Skip to content

Commit 81397b2

Browse files
authored
[To dev/1.3] Pipe: Fixed the on committed hook square bug & Trimmed the raw tablet hook & Fixed the premature report for source event & Skipped the parsing of time-covered tsFile (#17360) (#17362)
* new * fix-data * fix * revert * fix
1 parent 7475bd4 commit 81397b2

3 files changed

Lines changed: 36 additions & 11 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeRawTabletInsertionEvent.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,14 @@ private PipeRawTabletInsertionEvent(
8585
this.allocatedMemoryBlock =
8686
PipeDataNodeResourceManager.memory().forceAllocateForTabletWithRetry(0);
8787

88-
addOnCommittedHook(
89-
() -> {
90-
if (shouldReportOnCommit) {
91-
eliminateProgressIndex();
92-
}
93-
});
88+
if (needToReport) {
89+
addOnCommittedHook(
90+
() -> {
91+
if (shouldReportOnCommit) {
92+
eliminateProgressIndex();
93+
}
94+
});
95+
}
9496
}
9597

9698
public PipeRawTabletInsertionEvent(
@@ -182,10 +184,8 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
182184
}
183185

184186
protected void eliminateProgressIndex() {
185-
if (needToReport) {
186-
if (sourceEvent instanceof PipeTsFileInsertionEvent) {
187-
((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
188-
}
187+
if (sourceEvent instanceof PipeTsFileInsertionEvent) {
188+
((PipeTsFileInsertionEvent) sourceEvent).eliminateProgressIndex();
189189
}
190190
}
191191

@@ -253,6 +253,14 @@ public boolean mayEventPathsOverlappedWithPattern() {
253253
}
254254

255255
public void markAsNeedToReport() {
256+
if (!needToReport) {
257+
addOnCommittedHook(
258+
() -> {
259+
if (shouldReportOnCommit) {
260+
eliminateProgressIndex();
261+
}
262+
});
263+
}
256264
this.needToReport = true;
257265
}
258266

@@ -270,6 +278,11 @@ public EnrichedEvent getSourceEvent() {
270278
return sourceEvent;
271279
}
272280

281+
@Override
282+
public boolean isShouldReportOnCommit() {
283+
return shouldReportOnCommit && needToReport;
284+
}
285+
273286
/////////////////////////// TabletInsertionEvent ///////////////////////////
274287

275288
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,17 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
375375
|| startTime <= resource.getFileEndTime() && resource.getFileStartTime() <= endTime;
376376
}
377377

378+
@Override
379+
public boolean shouldParseTime() {
380+
if (!isTimeParsed
381+
&& Objects.nonNull(resource)
382+
&& startTime <= resource.getFileStartTime()
383+
&& resource.getFileEndTime() <= endTime) {
384+
isTimeParsed = true;
385+
}
386+
return !isTimeParsed;
387+
}
388+
378389
@Override
379390
public boolean mayEventPathsOverlappedWithPattern() {
380391
if (Objects.isNull(resource) || !resource.isClosed()) {

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/interval/PipeCommitInterval.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2525
import org.apache.iotdb.commons.pipe.datastructure.interval.Interval;
2626

27+
import java.util.ArrayList;
2728
import java.util.List;
2829
import java.util.Objects;
2930

@@ -43,7 +44,7 @@ public PipeCommitInterval(
4344
this.pipeTaskMeta = pipeTaskMeta;
4445
this.currentIndex =
4546
Objects.nonNull(currentIndex) ? currentIndex : MinimumProgressIndex.INSTANCE;
46-
this.onCommittedHooks = onCommittedHooks;
47+
this.onCommittedHooks = new ArrayList<>(onCommittedHooks);
4748
}
4849

4950
@Override

0 commit comments

Comments
 (0)