Skip to content

Commit b5f54b9

Browse files
authored
Fix IoTConsensusV2 receiver writer borrow race (#17495)
* Fix IoTConsensusV2 receiver writer borrow race
* Keep IoTConsensusV2 writer borrow fast path
* Fix data race
1 parent bfb4856 commit b5f54b9

1 file changed

Lines changed: 28 additions & 30 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/iotconsensusv2/IoTConsensusV2Receiver.java

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1001,47 +1001,45 @@ public IoTConsensusV2TsFileWriter tryToFindCorrespondingWriter(TCommitId commitI
10011001
return tsFileWriter.orElse(null);
10021002
}
10031003

1004-
@SuppressWarnings("java:S3655")
10051004
public IoTConsensusV2TsFileWriter borrowCorrespondingWriter(TCommitId commitId) {
1006-
Optional<IoTConsensusV2TsFileWriter> tsFileWriter =
1005+
final Optional<IoTConsensusV2TsFileWriter> correspondingWriter =
10071006
iotConsensusV2TsFileWriterPool.stream()
10081007
.filter(
10091008
item ->
10101009
item.isUsed()
10111010
&& Objects.equals(commitId, item.getCommitIdOfCorrespondingHolderEvent()))
10121011
.findFirst();
1012+
if (correspondingWriter.isPresent()) {
1013+
return correspondingWriter.get().refreshLastUsedTs();
1014+
}
10131015

1014-
// If the TsFileInsertionEvent is first using tsFileWriter, we will find the first available
1015-
// buffer for it.
1016-
if (!tsFileWriter.isPresent()) {
1017-
// We should synchronously find the idle writer to avoid concurrency issues.
1018-
lock.lock();
1019-
try {
1020-
// We need to check tsFileWriter.isPresent() here. Since there may be both retry-sent
1021-
// tsfile
1022-
// events and real-time-sent tsfile events, causing the receiver's tsFileWriter load to
1023-
// exceed IOTDB_CONFIG.getIoTConsensusV2PipelineSize().
1024-
while (!tsFileWriter.isPresent()) {
1025-
tsFileWriter =
1026-
iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst();
1027-
condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
1016+
lock.lock();
1017+
try {
1018+
while (true) {
1019+
final Optional<IoTConsensusV2TsFileWriter> idleWriter =
1020+
iotConsensusV2TsFileWriterPool.stream().filter(item -> !item.isUsed()).findFirst();
1021+
if (idleWriter.isPresent()) {
1022+
final IoTConsensusV2TsFileWriter writer = idleWriter.get();
1023+
// Publish commitId before marking the writer as used so lock-free lookup callers
1024+
// observing isUsed=true can always see the bound commitId as well.
1025+
writer.setCommitIdOfCorrespondingHolderEvent(commitId);
1026+
writer.setUsed(true);
1027+
return writer.refreshLastUsedTs();
10281028
}
1029-
tsFileWriter.get().setUsed(true);
1030-
tsFileWriter.get().setCommitIdOfCorrespondingHolderEvent(commitId);
1031-
} catch (final InterruptedException e) {
1032-
Thread.currentThread().interrupt();
1033-
final String errorStr =
1034-
String.format(
1035-
"IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.",
1036-
consensusPipeName);
1037-
LOGGER.warn(errorStr);
1038-
throw new RuntimeException(errorStr);
1039-
} finally {
1040-
lock.unlock();
1029+
1030+
condition.await(RETRY_WAIT_TIME, TimeUnit.MILLISECONDS);
10411031
}
1032+
} catch (final InterruptedException e) {
1033+
Thread.currentThread().interrupt();
1034+
final String errorStr =
1035+
String.format(
1036+
"IoTConsensusV2%s: receiver thread get interrupted when waiting for borrowing tsFileWriter.",
1037+
consensusPipeName);
1038+
LOGGER.warn(errorStr);
1039+
throw new RuntimeException(errorStr);
1040+
} finally {
1041+
lock.unlock();
10421042
}
1043-
1044-
return tsFileWriter.get().refreshLastUsedTs();
10451043
}
10461044

10471045
private void checkZombieTsFileWriter() {

0 commit comments

Comments
 (0)