Skip to content

Commit 6309e69

Browse files
authored
[To dev/1.3] Pipe: Cleaned multiple potential problems in pipe module (#17396) (#17417)
* Cleaned multiple potential problems in pipe module (#17396) * fix * fix * fix * fix * gras-shop * fix * spls * fix * pipe-dn * logger-bug * fix * fix
1 parent 7478110 commit 6309e69

22 files changed

Lines changed: 70 additions & 174 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public PipeHeartbeat(
5555
// the final results and namely these dataNodes are omitted in calculation.
5656
remainingEventCountMap.put(
5757
pipeMeta.getStaticMeta(),
58-
Objects.nonNull(pipeCompletedListFromAgent)
58+
Objects.nonNull(pipeRemainingEventCountListFromAgent)
5959
? pipeRemainingEventCountListFromAgent.get(i)
6060
: 0L);
6161
remainingTimeMap.put(

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/source/IoTDBConfigRegionSource.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,10 @@ protected void triggerSnapshot() {
101101
@Override
102102
public synchronized EnrichedEvent supply() throws Exception {
103103
final EnrichedEvent event = super.supply();
104-
PipeEventCommitManager.getInstance()
105-
.enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
104+
if (Objects.nonNull(event)) {
105+
PipeEventCommitManager.getInstance()
106+
.enrichWithCommitterKeyAndCommitId(event, creationTime, regionId);
107+
}
106108
return event;
107109
}
108110

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/runtime/PipeAgentLauncher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public static synchronized void launchPipePluginAgent(
7878
curList.add(uninstalledOrConflictedPipePluginMetaList.get(index + offset));
7979
offset++;
8080
}
81-
index += (offset + 1);
81+
index += offset;
8282
fetchAndSavePipePluginJars(curList);
8383
}
8484

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,7 @@ protected void calculateMemoryUsage(
722722
MESSAGE_PIPE_NOT_ENOUGH_MEMORY,
723723
needMemory,
724724
freeMemorySizeInBytes,
725-
freeMemorySizeInBytes,
725+
reservedMemorySizeInBytes,
726726
PipeMemoryManager.getTotalMemorySizeInBytes());
727727
LOGGER.warn(message);
728728
throw new PipeException(message);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ private void collectEvent(final Event event) {
196196
((PipeHeartbeatEvent) event).recordConnectorQueueSize(pendingQueue);
197197
}
198198

199-
pendingQueue.directOffer(event);
199+
pendingQueue.offer(event);
200200
collectInvocationCount.incrementAndGet();
201201
}
202202

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeRealtimePriorityBlockingQueue.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public PipeRealtimePriorityBlockingQueue() {
7272
}
7373

7474
@Override
75-
public boolean directOffer(final Event event) {
75+
public boolean offer(final Event event) {
7676
checkBeforeOffer(event);
7777

7878
if (event instanceof TsFileInsertionEvent) {
@@ -85,18 +85,13 @@ public boolean directOffer(final Event event) {
8585
((EnrichedEvent) event).decreaseReferenceCount(PipeEventCollector.class.getName(), false);
8686
return false;
8787
} else {
88-
return super.directOffer(event);
88+
return super.offer(event);
8989
}
9090
}
9191

92-
@Override
93-
public boolean waitedOffer(final Event event) {
94-
return directOffer(event);
95-
}
96-
9792
@Override
9893
public boolean put(final Event event) {
99-
directOffer(event);
94+
offer(event);
10095
return true;
10196
}
10297

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/overview/PipeResourceMetrics.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public void unbindFrom(final AbstractMetricService metricService) {
137137
// phantom reference count
138138
metricService.remove(MetricType.AUTO_GAUGE, Metric.PIPE_PHANTOM_REFERENCE_COUNT.toString());
139139

140-
metricService.remove(MetricType.RATE, Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
140+
metricService.remove(MetricType.COUNTER, Metric.PIPE_TSFILE_SEND_DISK_IO.toString());
141141
}
142142

143143
public void recordDiskIO(final long bytes) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/metric/receiver/PipeDataNodeReceiverMetrics.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -282,14 +282,14 @@ private void unbind(AbstractMetricService metricService) {
282282
Tag.NAME.toString(),
283283
RECEIVER,
284284
Tag.TYPE.toString(),
285-
"handshakeDatanodeV1");
285+
"handshakeDataNodeV1");
286286
metricService.remove(
287287
MetricType.TIMER,
288288
Metric.PIPE_DATANODE_RECEIVER.toString(),
289289
Tag.NAME.toString(),
290290
RECEIVER,
291291
Tag.TYPE.toString(),
292-
"handshakeDatanodeV2");
292+
"handshakeDataNodeV2");
293293
metricService.remove(
294294
MetricType.TIMER,
295295
Metric.PIPE_DATANODE_RECEIVER.toString(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,7 @@ public void collectWindowOutputs(
728728
throw new UnsupportedOperationException(
729729
String.format(
730730
"The output tablet does not support column type %s",
731-
valueColumnTypes[rowIndex]));
731+
valueColumnTypes[columnIndex]));
732732
}
733733
} else {
734734
bitMaps[columnIndex].mark(rowIndex);
@@ -742,7 +742,7 @@ public void collectWindowOutputs(
742742
int filteredCount = 0;
743743
for (int i = 0; i < columnNameStringList.length; ++i) {
744744
if (!bitMaps[i].isAllMarked()) {
745-
originColumnIndex2FilteredColumnIndexMapperList[i] = ++filteredCount;
745+
originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
746746
}
747747
}
748748

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeDynamicMemoryBlock.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public class PipeDynamicMemoryBlock {
4444

4545
PipeDynamicMemoryBlock(
4646
final @NotNull PipeModelFixedMemoryBlock fixedMemoryBlock, final long memoryUsageInBytes) {
47-
this.memoryUsageInBytes = Math.min(memoryUsageInBytes, 0);
47+
this.memoryUsageInBytes = Math.max(memoryUsageInBytes, 0);
4848
this.fixedMemoryBlock = fixedMemoryBlock;
4949
}
5050

@@ -116,7 +116,7 @@ public void updateMemoryEfficiency(
116116
if (Double.isNaN(historyMemoryEfficiency)
117117
|| Double.isInfinite(historyMemoryEfficiency)
118118
|| historyMemoryEfficiency < 0.0) {
119-
currentMemoryEfficiency = 0.0;
119+
historyMemoryEfficiency = 0.0;
120120
}
121121

122122
this.historyMemoryEfficiency = Math.min(historyMemoryEfficiency, 1.0);

0 commit comments

Comments
 (0)