Skip to content

Commit 570bd11

Browse files
Refactor parallel writer (#608)
* refactor of parallel on disk graph index writes * refactor of parallel graph index writes * added unit test of 1 v 2 phase writes * update for review comments * fix for bug that could result in numTasks=0 * initial impl * updates to javadoc comments and imports * deprecated writeInline and moved error block * cleanup * use asyncFileChannel for inline writes * preallocate disk space * Revert "preallocate disk space" This reverts commit f18f30c. * force() writes to disk instead of just OS buffer * debugging code for failing test * removing debugging code for failing test * remove double write * check channel for write completion * force inline writes before neighbors * remove async channel for inline * removing faulty test * removing faulty test * removing faulty test * removing faulty test * unit test parallel writes * add invalidation for buffer cache * Jvector 606 (#624) * Move buffer position in ByteBufferIndexWriter#writeFloats * Add simpler way to write VectorFloat to an IndexWriter --------- Co-authored-by: Michael Marshall <mmarshall@apache.org> * continue to use standard writer in tests for now --------- Co-authored-by: Michael Marshall <mmarshall@apache.org>
1 parent 20c348e commit 570bd11

12 files changed

Lines changed: 773 additions & 513 deletions

File tree

benchmarks-jmh/src/main/java/io/github/jbellis/jvector/bench/ParallelWriteBenchmark.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121
import io.github.jbellis.jvector.graph.ListRandomAccessVectorValues;
2222
import io.github.jbellis.jvector.graph.NodesIterator;
2323
import io.github.jbellis.jvector.graph.RandomAccessVectorValues;
24-
import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndex;
25-
import io.github.jbellis.jvector.graph.disk.OnDiskGraphIndexWriter;
26-
import io.github.jbellis.jvector.graph.disk.OrdinalMapper;
24+
import io.github.jbellis.jvector.graph.disk.*;
2725
import io.github.jbellis.jvector.graph.disk.feature.Feature;
2826
import io.github.jbellis.jvector.graph.disk.feature.FeatureId;
2927
import io.github.jbellis.jvector.graph.disk.feature.FusedPQ;
@@ -189,12 +187,17 @@ public void writeSequentialThenParallelAndVerify(Blackhole blackhole) throws IOE
189187
private void writeGraph(ImmutableGraphIndex graph,
190188
Path path,
191189
boolean parallel) throws IOException {
192-
try (var writer = new OnDiskGraphIndexWriter.Builder(graph, path)
193-
.withParallelWrites(parallel)
194-
.with(nvqFeature)
195-
.with(fusedPQFeature)
196-
.withMapper(identityMapper)
197-
.build()) {
190+
try (RandomAccessOnDiskGraphIndexWriter writer = parallel ?
191+
new OnDiskParallelGraphIndexWriter.Builder(graph, path)
192+
.with(nvqFeature)
193+
.with(fusedPQFeature)
194+
.withMapper(identityMapper)
195+
.build() :
196+
new OnDiskGraphIndexWriter.Builder(graph, path)
197+
.with(nvqFeature)
198+
.with(fusedPQFeature)
199+
.withMapper(identityMapper)
200+
.build()) {
198201
var view = graph.getView();
199202
Map<FeatureId, IntFunction<Feature.State>> writeSuppliers = new EnumMap<>(FeatureId.class);
200203
writeSuppliers.put(FeatureId.NVQ_VECTORS, inlineSuppliers.get(FeatureId.NVQ_VECTORS));

jvector-base/src/main/java/io/github/jbellis/jvector/disk/BufferedRandomAccessWriter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,22 @@ public void flush() throws IOException {
9292
stream.flush();
9393
}
9494

95+
/**
96+
* Invalidates any buffered read cache by seeking to the current position.
97+
* This is necessary when external writes (e.g., via AsynchronousFileChannel)
98+
* have modified the file, bypassing this writer's buffer.
99+
* <p>
100+
* This method flushes any pending writes and then seeks to the current position,
101+
 * which discards this writer's buffered read state so the next read goes to the file.
 * NOTE(review): RandomAccessFile itself does not cache reads — this relies on seek()
 * resetting this writer's own buffer; confirm that seek() actually discards it.
102+
*
103+
* @throws IOException if an I/O error occurs
104+
*/
105+
public void invalidateCache() throws IOException {
106+
flush();
107+
long currentPos = raf.getFilePointer();
108+
raf.seek(currentPos);
109+
}
110+
95111
/**
96112
* Returns the CRC32 checksum for the range [startOffset .. endOffset)
97113
*

jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/GraphIndexWriter.java

Lines changed: 13 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@
3434
* <p>
3535
* Implementations support different strategies for writing graph data,
3636
* including random access, sequential, and parallel writing modes.
37-
* Use {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, IndexWriter)}
38-
* or {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, Path)}
37+
* Use {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, Path)}
3938
* factory methods to obtain appropriate builder instances.
4039
*
4140
* @see GraphIndexWriterTypes
@@ -56,58 +55,31 @@ public interface GraphIndexWriter extends Closeable {
5655
void write(Map<FeatureId, IntFunction<Feature.State>> featureStateSuppliers) throws IOException;
5756

5857
/**
59-
* Factory method to obtain a builder for the specified writer type with an IndexWriter.
60-
* <p>
61-
* This overload accepts any IndexWriter but certain types have specific requirements:
62-
* <ul>
63-
* <li>ON_DISK requires a RandomAccessWriter (will throw IllegalArgumentException otherwise)</li>
64-
* <li>ON_DISK_SEQUENTIAL accepts any IndexWriter</li>
65-
* <li>ON_DISK_PARALLEL is not supported via this method (use the Path overload instead)</li>
66-
* </ul>
58+
* Factory method to obtain a builder for the specified writer type.
6759
*
6860
* @param type the type of writer to create
6961
* @param graphIndex the graph index to write
70-
* @param out the output writer
62+
* @param out the Path to the output file
7163
* @return a builder for the specified writer type
7264
* @throws IllegalArgumentException if the type requires a specific writer type that wasn't provided
7365
*/
74-
static AbstractGraphIndexWriter.Builder<? extends AbstractGraphIndexWriter<?>, ? extends IndexWriter>
75-
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, IndexWriter out) {
66+
static AbstractGraphIndexWriter.Builder<? extends AbstractGraphIndexWriter<?>, ? extends RandomAccessWriter>
67+
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, Path out) throws FileNotFoundException {
7668
switch (type) {
77-
case ON_DISK_PARALLEL:
78-
if (!(out instanceof RandomAccessWriter)) {
79-
throw new IllegalArgumentException("ON_DISK_PARALLEL requires a RandomAccessWriter");
80-
}
81-
return new OnDiskGraphIndexWriter.Builder(graphIndex, (RandomAccessWriter) out);
82-
case ON_DISK_SEQUENTIAL:
83-
return new OnDiskSequentialGraphIndexWriter.Builder(graphIndex, out);
69+
case RANDOM_ACCESS:
70+
return new OnDiskGraphIndexWriter.Builder(graphIndex, out);
71+
case RANDOM_ACCESS_PARALLEL:
72+
return new OnDiskParallelGraphIndexWriter.Builder(graphIndex, out);
8473
default:
85-
throw new IllegalArgumentException("Unknown GraphIndexWriterType: " + type);
74+
throw new IllegalArgumentException("Unknown RandomAccess GraphIndexWriterType: " + type);
8675
}
8776
}
8877

89-
/**
90-
* Factory method to obtain a builder for the specified writer type with a file Path.
91-
* <p>
92-
* This overload accepts a Path and is required for:
93-
* <ul>
94-
* <li>ON_DISK_PARALLEL - enables async I/O for improved throughput</li>
95-
* </ul>
96-
* Other writer types should use the {@link #getBuilderFor(GraphIndexWriterTypes, ImmutableGraphIndex, IndexWriter)}
97-
* overload instead.
98-
*
99-
* @param type the type of writer to create (currently only ON_DISK_PARALLEL is supported)
100-
* @param graphIndex the graph index to write
101-
* @param out the output file path
102-
* @return a builder for the specified writer type
103-
* @throws FileNotFoundException if the file cannot be created or opened
104-
* @throws IllegalArgumentException if the type is not supported via this method
105-
*/
10678
static AbstractGraphIndexWriter.Builder<? extends AbstractGraphIndexWriter<?>, ? extends IndexWriter>
107-
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, Path out) throws FileNotFoundException {
79+
getBuilderFor(GraphIndexWriterTypes type, ImmutableGraphIndex graphIndex, IndexWriter out) {
10880
switch (type) {
109-
case ON_DISK_PARALLEL:
110-
return new OnDiskGraphIndexWriter.Builder(graphIndex, out);
81+
case ON_DISK_SEQUENTIAL:
82+
return new OnDiskSequentialGraphIndexWriter.Builder(graphIndex, out);
11183
default:
11284
throw new IllegalArgumentException("Unknown GraphIndexWriterType: " + type);
11385
}

jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/GraphIndexWriterTypes.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,19 @@ public enum GraphIndexWriterTypes {
3232
*/
3333
ON_DISK_SEQUENTIAL,
3434

35+
/**
36+
 * Random-access on-disk writer backed by a RandomAccessWriter.
37+
 * Seeks within the output file as needed and is the current default implementation.
38+
 * Writes the header in place at the start of the file (unlike the sequential writer's footer).
39+
* Accepts any RandomAccessWriter.
40+
*/
41+
RANDOM_ACCESS,
42+
3543
/**
3644
* Parallel on-disk writer that uses asynchronous I/O for improved throughput.
3745
* Builds records in parallel across multiple threads and writes them
3846
* asynchronously using AsynchronousFileChannel.
3947
* Requires a Path to be provided for async file channel access.
4048
*/
41-
ON_DISK_PARALLEL
49+
RANDOM_ACCESS_PARALLEL
4250
}

jvector-base/src/main/java/io/github/jbellis/jvector/graph/disk/NodeRecordTask.java

Lines changed: 67 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,27 @@
2121
import io.github.jbellis.jvector.graph.disk.feature.Feature;
2222
import io.github.jbellis.jvector.graph.disk.feature.FeatureId;
2323

24+
import java.io.IOException;
2425
import java.nio.ByteBuffer;
25-
import java.util.ArrayList;
26+
import java.nio.channels.AsynchronousFileChannel;
2627
import java.util.List;
2728
import java.util.Map;
2829
import java.util.concurrent.Callable;
30+
import java.util.concurrent.ExecutionException;
2931
import java.util.function.IntFunction;
3032

3133
/**
32-
* A task that builds L0 records for a range of nodes in memory.
34+
* A task that writes L0 records for a range of nodes directly to disk using synchronous position-based writes.
3335
* <p>
3436
* This task is designed to be executed in a thread pool, with each worker thread
3537
* owning its own ImmutableGraphIndex.View for thread-safe neighbor iteration.
3638
* Each task processes a contiguous range of ordinals to reduce task creation overhead.
39+
* <p>
40+
* This writes directly to the AsynchronousFileChannel using position-based writes with writeFully
41+
* to ensure all bytes are written before returning. This eliminates race conditions where the OS
42+
* buffer cache hasn't flushed data before subsequent reads occur.
3743
*/
38-
class NodeRecordTask implements Callable<List<NodeRecordTask.Result>> {
44+
class NodeRecordTask implements Callable<Void> {
3945
private final int startOrdinal; // Inclusive
4046
private final int endOrdinal; // Exclusive
4147
private final OrdinalMapper ordinalMapper;
@@ -45,22 +51,8 @@ class NodeRecordTask implements Callable<List<NodeRecordTask.Result>> {
4551
private final Map<FeatureId, IntFunction<Feature.State>> featureStateSuppliers;
4652
private final int recordSize;
4753
private final long baseOffset; // Base file offset for L0 (offsets calculated per-ordinal)
48-
private final ByteBuffer buffer;
49-
50-
/**
51-
* Result of building a node record.
52-
*/
53-
static class Result {
54-
final int newOrdinal;
55-
final long fileOffset;
56-
final ByteBuffer data;
57-
58-
Result(int newOrdinal, long fileOffset, ByteBuffer data) {
59-
this.newOrdinal = newOrdinal;
60-
this.fileOffset = fileOffset;
61-
this.data = data;
62-
}
63-
}
54+
private final AsynchronousFileChannel channel;
55+
private final ByteBuffer buffer; // Thread-local buffer for building record components
6456

6557
NodeRecordTask(int startOrdinal,
6658
int endOrdinal,
@@ -71,6 +63,7 @@ static class Result {
7163
Map<FeatureId, IntFunction<Feature.State>> featureStateSuppliers,
7264
int recordSize,
7365
long baseOffset,
66+
AsynchronousFileChannel channel,
7467
ByteBuffer buffer) {
7568
this.startOrdinal = startOrdinal;
7669
this.endOrdinal = endOrdinal;
@@ -81,41 +74,75 @@ static class Result {
8174
this.featureStateSuppliers = featureStateSuppliers;
8275
this.recordSize = recordSize;
8376
this.baseOffset = baseOffset;
77+
this.channel = channel;
8478
this.buffer = buffer;
8579
}
8680

87-
@Override
88-
public List<Result> call() throws Exception {
89-
List<Result> results = new ArrayList<>(endOrdinal - startOrdinal);
81+
/**
82+
* Writes a buffer fully to the channel at the specified position.
83+
* Ensures all bytes are written by looping until the buffer is empty.
84+
* This is critical for correctness as AsynchronousFileChannel.write() may not write all bytes in one call.
85+
*
86+
* @param channel the channel to write to
87+
* @param buffer the buffer to write (will be fully consumed)
88+
* @param position the file position to write at
89+
* @throws IOException if an I/O error occurs
90+
* @throws ExecutionException if the write operation fails
91+
* @throws InterruptedException if interrupted while waiting for write completion
92+
*/
93+
private static void writeFully(AsynchronousFileChannel channel, ByteBuffer buffer, long position)
94+
throws IOException, ExecutionException, InterruptedException {
95+
long currentPosition = position;
96+
while (buffer.hasRemaining()) {
97+
int written = channel.write(buffer, currentPosition).get();
98+
if (written < 0) {
99+
throw new IOException("Channel closed while writing");
100+
}
101+
currentPosition += written;
102+
}
103+
}
90104

105+
@Override
106+
public Void call() throws Exception {
91107
// Reuse writer and buffer across all ordinals in this range
92108
var writer = new ByteBufferIndexWriter(buffer);
93109

94110
for (int newOrdinal = startOrdinal; newOrdinal < endOrdinal; newOrdinal++) {
95-
// Calculate file offset for this ordinal
96-
long fileOffset = baseOffset + (long) newOrdinal * recordSize;
111+
var originalOrdinal = ordinalMapper.newToOld(newOrdinal);
112+
long recordOffset = baseOffset + (long) newOrdinal * recordSize;
113+
long currentPosition = recordOffset;
97114

98115
// Reset buffer for this ordinal
99116
writer.reset();
100117

101-
var originalOrdinal = ordinalMapper.newToOld(newOrdinal);
102-
103118
// Write node ordinal
104119
writer.writeInt(newOrdinal);
120+
ByteBuffer ordinalData = writer.cloneBuffer();
121+
writeFully(channel, ordinalData, currentPosition);
122+
currentPosition += Integer.BYTES;
105123

106124
// Handle OMITTED nodes (holes in ordinal space)
107125
if (originalOrdinal == OrdinalMapper.OMITTED) {
108-
// Write placeholder: skip inline features and write empty neighbor list
126+
// Write placeholder: zeros for features and empty neighbor list
127+
writer.reset();
109128
for (var feature : inlineFeatures) {
110129
// Write zeros for missing features
111130
for (int i = 0; i < feature.featureSize(); i++) {
112131
writer.writeByte(0);
113132
}
114133
}
134+
ByteBuffer featureData = writer.cloneBuffer();
135+
writeFully(channel, featureData, currentPosition);
136+
currentPosition += featureData.remaining(); // BUG(review): writeFully loops until the buffer has no remaining bytes, so remaining() is always 0 here and the position never advances — capture the byte count before calling writeFully (or advance by the features' total featureSize()) so the empty neighbor list is not written over the zeroed feature data.
137+
138+
// Write empty neighbor list
139+
writer.reset();
115140
writer.writeInt(0); // neighbor count
116141
for (int n = 0; n < graph.getDegree(0); n++) {
117142
writer.writeInt(-1); // padding
118143
}
144+
ByteBuffer neighborData = writer.cloneBuffer();
145+
writeFully(channel, neighborData, currentPosition);
119146
} else {
120147
// Validate node exists
121148
if (!graph.containsNode(originalOrdinal)) {
@@ -124,20 +151,22 @@ public List<Result> call() throws Exception {
124151
newOrdinal, originalOrdinal));
125152
}
126153

127-
// Write inline features
154+
// Write inline features (skip if supplier is null - feature was pre-written)
128155
for (var feature : inlineFeatures) {
129156
var supplier = featureStateSuppliers.get(feature.id());
130-
if (supplier == null) {
131-
// Write zeros for missing supplier
132-
for (int i = 0; i < feature.featureSize(); i++) {
133-
writer.writeByte(0);
134-
}
135-
} else {
157+
if (supplier != null) {
158+
// Feature not pre-written, write it now
159+
writer.reset();
136160
feature.writeInline(writer, supplier.apply(originalOrdinal));
161+
ByteBuffer featureData = writer.cloneBuffer();
162+
writeFully(channel, featureData, currentPosition);
137163
}
164+
// Skip to next feature position (whether we wrote it or not)
165+
currentPosition += feature.featureSize();
138166
}
139167

140168
// Write neighbors
169+
writer.reset();
141170
var neighbors = view.getNeighborsIterator(0, originalOrdinal);
142171
if (neighbors.size() > graph.getDegree(0)) {
143172
throw new IllegalStateException(
@@ -161,21 +190,13 @@ public List<Result> call() throws Exception {
161190
for (; n < graph.getDegree(0); n++) {
162191
writer.writeInt(-1);
163192
}
164-
}
165193

166-
// Verify we wrote exactly the expected amount
167-
if (writer.bytesWritten() != recordSize) {
168-
throw new IllegalStateException(
169-
String.format("Record size mismatch for ordinal %d: expected %d bytes, wrote %d bytes",
170-
newOrdinal, recordSize, writer.bytesWritten()));
194+
ByteBuffer neighborData = writer.cloneBuffer();
195+
writeFully(channel, neighborData, currentPosition);
171196
}
172-
173-
// Writer handles flip, copy, and reset internally
174-
// The copy ensures thread-local buffer can be safely reused for the next ordinal
175-
ByteBuffer dataCopy = writer.cloneBuffer();
176-
results.add(new Result(newOrdinal, fileOffset, dataCopy));
177197
}
178198

179-
return results;
199+
return null;
180200
}
181201
}
202+

0 commit comments

Comments
 (0)