Skip to content

Commit 9c6b90a

Browse files
authored
[Pipe] Deduplicate historical tsfile events in IoTConsensusV2 pipes (#17472)
* Pipe: deduplicate historical tsfile events per task scope
* Pipe: address historical tsfile dedup review comments
* spotless
* Pipe: fix dedup scope cleanup and historical skip loop
* spotless
* Refine historical tsfile dedup supply semantics
* spotless
1 parent b5f54b9 commit 9c6b90a

File tree

11 files changed

+585
-59
lines changed

11 files changed

+585
-59
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ public PipeCompactedTsFileInsertionEvent(
9191
// init fields of PipeTsFileInsertionEvent
9292
flushPointCount = bindFlushPointCount(originalEvents);
9393
overridingProgressIndex = bindOverridingProgressIndex(originalEvents);
94+
bindTsFileDedupScopeID(anyOfOriginalEvents.getTsFileDedupScopeID());
9495
}
9596

9697
private static boolean bindIsWithMod(Set<PipeTsFileInsertionEvent> originalEvents) {
@@ -184,10 +185,10 @@ public boolean equalsInIoTConsensusV2(final Object o) {
184185

185186
/**
 * Eliminates the progress-index entry of every path in {@code originFilePaths} through {@link
 * PipeTsFileEpochProgressIndexKeeper}.
 *
 * <p>In the added ("+") version nothing is done unless {@code overridingProgressIndex} is null and
 * {@link #getTsFileDedupScopeID()} is non-null, and the keeper is addressed by the dedup scope ID.
 * The removed ("-") lines show the previous version, which only checked {@code
 * overridingProgressIndex} and addressed the keeper by {@code pipeName}.
 */
@Override
186187
public void eliminateProgressIndex() {
187-
if (Objects.isNull(overridingProgressIndex)) {
188+
if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(getTsFileDedupScopeID())) {
188189
for (final String originFilePath : originFilePaths) {
189190
PipeTsFileEpochProgressIndexKeeper.getInstance()
190-
.eliminateProgressIndex(dataRegionId, pipeName, originFilePath);
191+
.eliminateProgressIndex(dataRegionId, getTsFileDedupScopeID(), originFilePath);
191192
}
192193
}
193194
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

Lines changed: 36 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
9999

100100
protected volatile ProgressIndex overridingProgressIndex;
101101
private Set<String> tableNames;
102+
private String tsFileDedupScopeID;
102103

103104
// This is set to check the tsFile paths by privilege
104105
private Map<IDeviceID, String[]> treeSchemaMap;
@@ -398,13 +399,26 @@ public ProgressIndex forceGetProgressIndex() {
398399
}
399400

400401
/**
 * Eliminates this event's tsfile entry from {@link PipeTsFileEpochProgressIndexKeeper}.
 *
 * <p>In the added ("+") version the call is made only when {@code overridingProgressIndex} is
 * null and both {@code resource} and {@code tsFileDedupScopeID} are non-null. The entry is
 * addressed by the data region id parsed from {@code resource}, the dedup scope ID and the tsfile
 * path. The removed ("-") lines show the previous version, which did not check the scope ID and
 * addressed the keeper by {@code pipeName}.
 */
public void eliminateProgressIndex() {
401-
if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
402+
if (Objects.isNull(overridingProgressIndex)
403+
&& Objects.nonNull(resource)
404+
&& Objects.nonNull(tsFileDedupScopeID)) {
402405
PipeTsFileEpochProgressIndexKeeper.getInstance()
403406
.eliminateProgressIndex(
404-
Integer.parseInt(resource.getDataRegionId()), pipeName, resource.getTsFilePath());
407+
Integer.parseInt(resource.getDataRegionId()),
408+
tsFileDedupScopeID,
409+
resource.getTsFilePath());
405410
}
406411
}
407412

413+
/**
 * Stores the given tsfile dedup scope ID on this event, replacing any previously bound value.
 *
 * @param tsFileDedupScopeID the scope ID to bind; it is stored as-is, without a null check
 * @return this event, so the call can be chained
 */
public PipeTsFileInsertionEvent bindTsFileDedupScopeID(final String tsFileDedupScopeID) {
414+
this.tsFileDedupScopeID = tsFileDedupScopeID;
415+
return this;
416+
}
417+
418+
/**
 * Returns the tsfile dedup scope ID currently bound to this event.
 *
 * @return the bound scope ID; presumably null when {@code bindTsFileDedupScopeID} was never
 *     called (the field is declared without an initializer) - confirm against the constructors
 */
public String getTsFileDedupScopeID() {
419+
return tsFileDedupScopeID;
420+
}
421+
408422
@Override
409423
public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressReport(
410424
final String pipeName,
@@ -419,25 +433,26 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep
419433
final long startTime,
420434
final long endTime) {
421435
return new PipeTsFileInsertionEvent(
422-
getRawIsTableModelEvent(),
423-
getSourceDatabaseNameFromDataRegion(),
424-
resource,
425-
tsFile,
426-
isWithMod,
427-
isLoaded,
428-
isGeneratedByHistoricalExtractor,
429-
tableNames,
430-
pipeName,
431-
creationTime,
432-
pipeTaskMeta,
433-
treePattern,
434-
tablePattern,
435-
userId,
436-
userName,
437-
cliHostname,
438-
skipIfNoPrivileges,
439-
startTime,
440-
endTime);
436+
getRawIsTableModelEvent(),
437+
getSourceDatabaseNameFromDataRegion(),
438+
resource,
439+
tsFile,
440+
isWithMod,
441+
isLoaded,
442+
isGeneratedByHistoricalExtractor,
443+
tableNames,
444+
pipeName,
445+
creationTime,
446+
pipeTaskMeta,
447+
treePattern,
448+
tablePattern,
449+
userId,
450+
userName,
451+
cliHostname,
452+
skipIfNoPrivileges,
453+
startTime,
454+
endTime)
455+
.bindTsFileDedupScopeID(tsFileDedupScopeID);
441456
}
442457

443458
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java

Lines changed: 71 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
5151
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
5252
import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
53+
import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
5354
import org.apache.iotdb.db.storageengine.StorageEngine;
5455
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
5556
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
@@ -124,6 +125,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
124125

125126
private String pipeName;
126127
private long creationTime;
128+
private String tsFileDedupScopeID;
127129

128130
private PipeTaskMeta pipeTaskMeta;
129131
private ProgressIndex startIndex;
@@ -320,6 +322,14 @@ public void customize(
320322
}
321323

322324
dataRegionId = environment.getRegionId();
325+
tsFileDedupScopeID =
326+
pipeName
327+
+ "_"
328+
+ dataRegionId
329+
+ "_"
330+
+ creationTime
331+
+ "_"
332+
+ Integer.toHexString(System.identityHashCode(environment));
323333

324334
treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
325335
tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters);
@@ -807,11 +817,15 @@ public synchronized Event supply() {
807817
final PersistentResource resource = pendingQueue.poll();
808818
if (resource == null) {
809819
return supplyTerminateEvent();
810-
} else if (resource instanceof TsFileResource) {
811-
return supplyTsFileEvent((TsFileResource) resource);
812-
} else {
813-
return supplyDeletionEvent((DeletionResource) resource);
814820
}
821+
822+
if (resource instanceof TsFileResource) {
823+
final TsFileResource tsFileResource = (TsFileResource) resource;
824+
return consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)
825+
? supplyProgressReportEvent(tsFileResource.getMaxProgressIndex())
826+
: supplyTsFileEvent(tsFileResource);
827+
}
828+
return supplyDeletionEvent((DeletionResource) resource);
815829
}
816830

817831
private Event supplyTerminateEvent() {
@@ -834,20 +848,54 @@ private Event supplyTerminateEvent() {
834848
return terminateEvent;
835849
}
836850

837-
private Event supplyTsFileEvent(final TsFileResource resource) {
838-
if (!filteredTsFileResources2TableNames.containsKey(resource)) {
839-
final ProgressReportEvent progressReportEvent =
840-
new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
841-
progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex());
842-
final boolean isReferenceCountIncreased =
843-
progressReportEvent.increaseReferenceCount(
844-
PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
845-
if (!isReferenceCountIncreased) {
851+
/**
 * Decides whether the historical tsfile event for {@code resource} is skipped and, if so,
 * releases the bookkeeping held for it.
 *
 * <p>Returns {@code false} without side effects when {@code resource} is not a key of {@code
 * filteredTsFileResources2TableNames} or when {@code shouldSkipHistoricalTsFileEvent} rejects it.
 * Otherwise the resource is removed from that map, an info message is logged, the resource is
 * unpinned through {@code PipeDataNodeResourceManager.tsfile()}, and {@code true} is returned.
 *
 * <p>Lines marked "-" in this hunk belong to the removed body of the old {@code
 * supplyTsFileEvent} and are not part of this method.
 *
 * @param resource the historical tsfile resource polled from the pending queue
 * @return {@code true} if the tsfile event was consumed as skipped, {@code false} otherwise
 */
protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(final TsFileResource resource) {
852+
if (!filteredTsFileResources2TableNames.containsKey(resource)
853+
|| !shouldSkipHistoricalTsFileEvent(resource)) {
854+
return false;
855+
}
856+
857+
filteredTsFileResources2TableNames.remove(resource);
858+
LOGGER.info(
859+
"Pipe {}@{}: skip historical tsfile {} because realtime source in current task {} has already captured it.",
860+
pipeName,
861+
dataRegionId,
862+
resource.getTsFilePath(),
863+
tsFileDedupScopeID);
864+
// The finally block runs the unpin before the method actually returns true. An IOException
// raised while unpinning is only logged as a warning and does not change the result.
try {
865+
return true;
866+
} finally {
867+
try {
868+
PipeDataNodeResourceManager.tsfile()
869+
.unpinTsFileResource(resource, shouldTransferModFile, pipeName);
870+
} catch (final IOException e) {
846871
LOGGER.warn(
847-
"The reference count of the event {} cannot be increased, skipping it.",
848-
progressReportEvent);
872+
"Pipe {}@{}: failed to unpin skipped historical TsFileResource, original path: {}",
873+
pipeName,
874+
dataRegionId,
875+
resource.getTsFilePath(),
876+
e);
849877
}
850-
return isReferenceCountIncreased ? progressReportEvent : null;
878+
}
879+
}
880+
881+
/**
 * Builds a {@code ProgressReportEvent} for this pipe that carries the given progress index.
 *
 * <p>The event is created from {@code pipeName}, {@code creationTime} and {@code pipeTaskMeta},
 * bound to {@code progressIndex}, and its reference count is increased with this class's name as
 * the holder message.
 *
 * @param progressIndex the progress index to bind to the report event
 * @return the report event, or {@code null} (after logging a warning) when its reference count
 *     could not be increased
 */
protected Event supplyProgressReportEvent(final ProgressIndex progressIndex) {
882+
final ProgressReportEvent progressReportEvent =
883+
new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
884+
progressReportEvent.bindProgressIndex(progressIndex);
885+
final boolean isReferenceCountIncreased =
886+
progressReportEvent.increaseReferenceCount(
887+
PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
888+
if (!isReferenceCountIncreased) {
889+
LOGGER.warn(
890+
"The reference count of the event {} cannot be increased, skipping it.",
891+
progressReportEvent);
892+
}
893+
return isReferenceCountIncreased ? progressReportEvent : null;
894+
}
895+
896+
protected Event supplyTsFileEvent(final TsFileResource resource) {
897+
if (!filteredTsFileResources2TableNames.containsKey(resource)) {
898+
return supplyProgressReportEvent(resource.getMaxProgressIndex());
851899
}
852900

853901
final PipeTsFileInsertionEvent event =
@@ -916,6 +964,13 @@ private Event supplyTsFileEvent(final TsFileResource resource) {
916964
}
917965
}
918966

967+
/**
 * Tells whether the historical event for {@code resource} should be skipped.
 *
 * <p>All three conditions must hold, evaluated left to right with short-circuiting: the pipe name
 * starts with {@code PipeStaticMeta.CONSENSUS_PIPE_PREFIX}, the data region consensus
 * implementation is an {@code IoTConsensusV2}, and {@link PipeTsFileEpochProgressIndexKeeper}
 * already contains the tsfile path under this source's {@code dataRegionId} and {@code
 * tsFileDedupScopeID}.
 *
 * @param resource the historical tsfile resource being considered
 * @return {@code true} if all three conditions hold
 */
private boolean shouldSkipHistoricalTsFileEvent(final TsFileResource resource) {
968+
return pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
969+
&& DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
970+
&& PipeTsFileEpochProgressIndexKeeper.getInstance()
971+
.containsTsFile(dataRegionId, tsFileDedupScopeID, resource.getTsFilePath());
972+
}
973+
919974
private Event supplyDeletionEvent(final DeletionResource deletionResource) {
920975
final PipeDeleteDataNodeEvent event =
921976
new PipeDeleteDataNodeEvent(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ private void extractTabletInsertion(final PipeRealtimeEvent event) {
8383
if (canNotUseTabletAnymore(event)) {
8484
event.getTsFileEpoch().migrateState(this, curState -> TsFileEpoch.State.USING_TSFILE);
8585
PipeTsFileEpochProgressIndexKeeper.getInstance()
86-
.registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource());
86+
.registerProgressIndex(
87+
dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getResource());
8788
} else {
8889
event
8990
.getTsFileEpoch()
@@ -156,7 +157,8 @@ private void extractTsFileInsertion(final PipeRealtimeEvent event) {
156157
case USING_TABLET:
157158
// If the state is USING_TABLET, discard the event
158159
PipeTsFileEpochProgressIndexKeeper.getInstance()
159-
.eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath());
160+
.eliminateProgressIndex(
161+
dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getFilePath());
160162
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(), false);
161163
return;
162164
case EMPTY:
@@ -283,7 +285,8 @@ private Event supplyTsFileInsertion(final PipeRealtimeEvent event) {
283285
PipeDataNodeAgent.runtime()
284286
.report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
285287
PipeTsFileEpochProgressIndexKeeper.getInstance()
286-
.eliminateProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getFilePath());
288+
.eliminateProgressIndex(
289+
dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getFilePath());
287290
return null;
288291
}
289292
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
133133

134134
protected String pipeID;
135135
private String taskID;
136+
private String tsFileDedupScopeID;
136137
protected long userId;
137138
protected String userName;
138139
protected String cliHostname;
@@ -226,6 +227,7 @@ public void customize(
226227
creationTime = environment.getCreationTime();
227228
pipeID = pipeName + "_" + creationTime;
228229
taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
230+
tsFileDedupScopeID = taskID + "_" + Integer.toHexString(System.identityHashCode(environment));
229231

230232
treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
231233
tablePattern = TablePattern.parsePipePatternFromSourceParameters(parameters);
@@ -322,6 +324,8 @@ public void close() throws Exception {
322324
if (dataRegionId >= 0) {
323325
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId, this);
324326
PipeTimePartitionListener.getInstance().stopListen(dataRegionId, this);
327+
PipeTsFileEpochProgressIndexKeeper.getInstance()
328+
.clearProgressIndex(dataRegionId, tsFileDedupScopeID);
325329
}
326330

327331
synchronized (isClosed) {
@@ -580,7 +584,7 @@ private void maySkipProgressIndexForRealtimeEvent(final PipeRealtimeEvent event)
580584
if (PipeTsFileEpochProgressIndexKeeper.getInstance()
581585
.isProgressIndexAfterOrEquals(
582586
dataRegionId,
583-
pipeName,
587+
tsFileDedupScopeID,
584588
event.getTsFileEpoch().getFilePath(),
585589
getProgressIndex4RealtimeEvent(event))) {
586590
event.skipReportOnCommit();
@@ -652,6 +656,10 @@ public String getTaskID() {
652656
return taskID;
653657
}
654658

659+
/**
 * Returns the tsfile dedup scope ID of this realtime source.
 *
 * <p>The field is assigned in {@code customize(...)} as {@code taskID}, an underscore, and the
 * hex form of the identity hash code of the runtime environment object.
 *
 * @return the scope ID; presumably null before {@code customize(...)} has run - confirm
 */
public final String getTsFileDedupScopeID() {
660+
return tsFileDedupScopeID;
661+
}
662+
655663
public void increaseExtractEpochSize() {
656664
extractEpochSize.incrementAndGet();
657665
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ protected void doExtract(PipeRealtimeEvent event) {
5252

5353
event.getTsFileEpoch().migrateState(this, state -> TsFileEpoch.State.USING_TSFILE);
5454
PipeTsFileEpochProgressIndexKeeper.getInstance()
55-
.registerProgressIndex(dataRegionId, pipeName, event.getTsFileEpoch().getResource());
55+
.registerProgressIndex(
56+
dataRegionId, getTsFileDedupScopeID(), event.getTsFileEpoch().getResource());
5657

5758
if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
5859
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(), false);
@@ -104,7 +105,9 @@ public Event supply() {
104105
.report(pipeTaskMeta, new PipeRuntimeNonCriticalException(errorMessage));
105106
PipeTsFileEpochProgressIndexKeeper.getInstance()
106107
.eliminateProgressIndex(
107-
dataRegionId, pipeName, realtimeEvent.getTsFileEpoch().getFilePath());
108+
dataRegionId,
109+
getTsFileDedupScopeID(),
110+
realtimeEvent.getTsFileEpoch().getFilePath());
108111
}
109112

110113
realtimeEvent.decreaseReferenceCount(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ private void assignToSource(
190190
if (innerEvent instanceof PipeTsFileInsertionEvent) {
191191
final PipeTsFileInsertionEvent tsFileInsertionEvent =
192192
(PipeTsFileInsertionEvent) innerEvent;
193+
tsFileInsertionEvent.bindTsFileDedupScopeID(source.getTsFileDedupScopeID());
193194
tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile());
194195
}
195196

0 commit comments

Comments
 (0)