Skip to content

Commit 044620e

Browse files
authored
[To dev/1.3] Pipe: Fixed the log of disruptor queue & deleted the useless binary buffer (#17341) (#17359)
* wz * fx
1 parent 0b67cb8 commit 044620e

12 files changed

Lines changed: 37 additions & 170 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
3838
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
3939
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
40-
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
4140
import org.apache.iotdb.pipe.api.access.Row;
4241
import org.apache.iotdb.pipe.api.collector.RowCollector;
4342
import org.apache.iotdb.pipe.api.collector.TabletCollector;
@@ -51,7 +50,8 @@
5150
import org.slf4j.Logger;
5251
import org.slf4j.LoggerFactory;
5352

54-
import java.nio.ByteBuffer;
53+
import javax.annotation.Nonnull;
54+
5555
import java.util.ArrayList;
5656
import java.util.Collection;
5757
import java.util.List;
@@ -102,18 +102,11 @@ private PipeInsertNodeTabletInsertionEvent(
102102
this.allocatedMemoryBlock = new AtomicReference<>();
103103
}
104104

105+
@Nonnull
105106
public InsertNode getInsertNode() {
106107
return insertNode;
107108
}
108109

109-
public ByteBuffer getByteBuffer() throws WALPipeException {
110-
final InsertNode node = insertNode;
111-
if (Objects.isNull(node)) {
112-
throw new PipeException("InsertNode has been released");
113-
}
114-
return node.serializeToByteBuffer();
115-
}
116-
117110
public String getDeviceId() {
118111
final InsertNode node = insertNode;
119112
if (Objects.isNull(node)) {
@@ -214,9 +207,6 @@ public boolean isGeneratedByPipe() {
214207
public boolean mayEventTimeOverlappedWithTimeRange() {
215208
try {
216209
final InsertNode insertNode = getInsertNode();
217-
if (Objects.isNull(insertNode)) {
218-
return true;
219-
}
220210

221211
if (insertNode instanceof InsertRowNode) {
222212
final long timestamp = ((InsertRowNode) insertNode).getTime();
@@ -258,9 +248,6 @@ public boolean mayEventTimeOverlappedWithTimeRange() {
258248
public boolean mayEventPathsOverlappedWithPattern() {
259249
try {
260250
final InsertNode insertNode = getInsertNode();
261-
if (Objects.isNull(insertNode)) {
262-
return true;
263-
}
264251

265252
if (insertNode instanceof InsertRowNode || insertNode instanceof InsertTabletNode) {
266253
final PartialPath devicePartialPath = insertNode.getDevicePath();
@@ -355,9 +342,6 @@ private List<TabletInsertionDataContainer> initDataContainers() {
355342

356343
dataContainers = new ArrayList<>();
357344
final InsertNode node = getInsertNode();
358-
if (Objects.isNull(node)) {
359-
throw new PipeException("InsertNode has been released");
360-
}
361345
switch (node.getType()) {
362346
case INSERT_ROW:
363347
case INSERT_TABLET:

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/batch/PipeTabletEventPlainBatch.java

Lines changed: 5 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,11 @@
2424
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2525
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBatchReq;
2626
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
27-
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
2827
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
2928

3029
import org.apache.tsfile.utils.Pair;
3130
import org.apache.tsfile.utils.PublicBAOS;
3231
import org.apache.tsfile.utils.ReadWriteIOUtils;
33-
import org.slf4j.Logger;
34-
import org.slf4j.LoggerFactory;
3532

3633
import java.io.DataOutputStream;
3734
import java.io.IOException;
@@ -40,23 +37,14 @@
4037
import java.util.HashMap;
4138
import java.util.List;
4239
import java.util.Map;
43-
import java.util.Objects;
4440

4541
public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
46-
47-
private static final Logger LOGGER = LoggerFactory.getLogger(PipeTabletEventBatch.class);
48-
49-
private final List<ByteBuffer> binaryBuffers = new ArrayList<>();
5042
private final List<ByteBuffer> insertNodeBuffers = new ArrayList<>();
5143
private final List<ByteBuffer> tabletBuffers = new ArrayList<>();
5244

5345
// Used to rate limit when transferring data
5446
private final Map<Pair<String, Long>, Long> pipe2BytesAccumulated = new HashMap<>();
5547

56-
PipeTabletEventPlainBatch(final int maxDelayInMs, final long requestMaxBatchSizeInBytes) {
57-
super(maxDelayInMs, requestMaxBatchSizeInBytes, null);
58-
}
59-
6048
PipeTabletEventPlainBatch(
6149
final int maxDelayInMs,
6250
final long requestMaxBatchSizeInBytes,
@@ -65,8 +53,7 @@ public class PipeTabletEventPlainBatch extends PipeTabletEventBatch {
6553
}
6654

6755
@Override
68-
protected boolean constructBatch(final TabletInsertionEvent event)
69-
throws WALPipeException, IOException {
56+
protected boolean constructBatch(final TabletInsertionEvent event) throws IOException {
7057
final int bufferSize = buildTabletInsertionBuffer(event);
7158
totalBufferSize += bufferSize;
7259
pipe2BytesAccumulated.compute(
@@ -81,16 +68,14 @@ protected boolean constructBatch(final TabletInsertionEvent event)
8168
public synchronized void onSuccess() {
8269
super.onSuccess();
8370

84-
binaryBuffers.clear();
8571
insertNodeBuffers.clear();
8672
tabletBuffers.clear();
8773

8874
pipe2BytesAccumulated.clear();
8975
}
9076

9177
public PipeTransferTabletBatchReq toTPipeTransferReq() throws IOException {
92-
return PipeTransferTabletBatchReq.toTPipeTransferReq(
93-
binaryBuffers, insertNodeBuffers, tabletBuffers);
78+
return PipeTransferTabletBatchReq.toTPipeTransferReq(insertNodeBuffers, tabletBuffers);
9479
}
9580

9681
public Map<Pair<String, Long>, Long> deepCopyPipeName2BytesAccumulated() {
@@ -101,22 +86,16 @@ public Map<Pair<String, Long>, Long> getPipe2BytesAccumulated() {
10186
return pipe2BytesAccumulated;
10287
}
10388

104-
private int buildTabletInsertionBuffer(final TabletInsertionEvent event)
105-
throws IOException, WALPipeException {
89+
private int buildTabletInsertionBuffer(final TabletInsertionEvent event) throws IOException {
10690
final ByteBuffer buffer;
10791
if (event instanceof PipeInsertNodeTabletInsertionEvent) {
10892
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent =
10993
(PipeInsertNodeTabletInsertionEvent) event;
11094
// Read the bytebuffer from the wal file and transfer it directly without serializing or
11195
// deserializing if possible
11296
final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
113-
if (Objects.isNull(insertNode)) {
114-
buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
115-
binaryBuffers.add(buffer);
116-
} else {
117-
buffer = insertNode.serializeToByteBuffer();
118-
insertNodeBuffers.add(buffer);
119-
}
97+
buffer = insertNode.serializeToByteBuffer();
98+
insertNodeBuffers.add(buffer);
12099
} else {
121100
final PipeRawTabletInsertionEvent pipeRawTabletInsertionEvent =
122101
(PipeRawTabletInsertionEvent) event;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/payload/evolvable/request/PipeTransferTabletBatchReq.java

Lines changed: 8 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@
4545

4646
public class PipeTransferTabletBatchReq extends TPipeTransferReq {
4747

48-
private final transient List<PipeTransferTabletBinaryReq> binaryReqs = new ArrayList<>();
4948
private final transient List<PipeTransferTabletInsertNodeReq> insertNodeReqs = new ArrayList<>();
5049
private final transient List<PipeTransferTabletRawReq> tabletReqs = new ArrayList<>();
5150

@@ -61,26 +60,6 @@ public Pair<InsertRowsStatement, InsertMultiTabletsStatement> constructStatement
6160
final List<InsertRowStatement> insertRowStatementList = new ArrayList<>();
6261
final List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
6362

64-
for (final PipeTransferTabletBinaryReq binaryReq : binaryReqs) {
65-
final InsertBaseStatement statement = binaryReq.constructStatement();
66-
if (statement.isEmpty()) {
67-
continue;
68-
}
69-
if (statement instanceof InsertRowStatement) {
70-
insertRowStatementList.add((InsertRowStatement) statement);
71-
} else if (statement instanceof InsertTabletStatement) {
72-
insertTabletStatementList.add((InsertTabletStatement) statement);
73-
} else if (statement instanceof InsertRowsStatement) {
74-
insertRowStatementList.addAll(
75-
((InsertRowsStatement) statement).getInsertRowStatementList());
76-
} else {
77-
throw new UnsupportedOperationException(
78-
String.format(
79-
"unknown InsertBaseStatement %s constructed from PipeTransferTabletBinaryReq.",
80-
binaryReq));
81-
}
82-
}
83-
8463
for (final PipeTransferTabletInsertNodeReq insertNodeReq : insertNodeReqs) {
8564
final InsertBaseStatement statement = insertNodeReq.constructStatement();
8665
if (statement.isEmpty()) {
@@ -117,9 +96,7 @@ public Pair<InsertRowsStatement, InsertMultiTabletsStatement> constructStatement
11796
/////////////////////////////// Thrift ///////////////////////////////
11897

11998
public static PipeTransferTabletBatchReq toTPipeTransferReq(
120-
final List<ByteBuffer> binaryBuffers,
121-
final List<ByteBuffer> insertNodeBuffers,
122-
final List<ByteBuffer> tabletBuffers)
99+
final List<ByteBuffer> insertNodeBuffers, final List<ByteBuffer> tabletBuffers)
123100
throws IOException {
124101
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
125102

@@ -130,11 +107,8 @@ public static PipeTransferTabletBatchReq toTPipeTransferReq(
130107
batchReq.type = PipeRequestType.TRANSFER_TABLET_BATCH.getType();
131108
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
132109
final DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream)) {
133-
ReadWriteIOUtils.write(binaryBuffers.size(), outputStream);
134-
for (final ByteBuffer binaryBuffer : binaryBuffers) {
135-
ReadWriteIOUtils.write(binaryBuffer.limit(), outputStream);
136-
outputStream.write(binaryBuffer.array(), 0, binaryBuffer.limit());
137-
}
110+
// Binary req, for rolling upgrading
111+
ReadWriteIOUtils.write(0, outputStream);
138112

139113
ReadWriteIOUtils.write(insertNodeBuffers.size(), outputStream);
140114
for (final ByteBuffer insertNodeBuffer : insertNodeBuffers) {
@@ -157,16 +131,10 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq(
157131
final TPipeTransferReq transferReq) {
158132
final PipeTransferTabletBatchReq batchReq = new PipeTransferTabletBatchReq();
159133

160-
int size = ReadWriteIOUtils.readInt(transferReq.body);
161-
for (int i = 0; i < size; ++i) {
162-
final int length = ReadWriteIOUtils.readInt(transferReq.body);
163-
final byte[] body = new byte[length];
164-
transferReq.body.get(body);
165-
batchReq.binaryReqs.add(
166-
PipeTransferTabletBinaryReq.toTPipeTransferReq(ByteBuffer.wrap(body)));
167-
}
134+
// Binary req, for rolling upgrading
135+
ReadWriteIOUtils.readInt(transferReq.body);
168136

169-
size = ReadWriteIOUtils.readInt(transferReq.body);
137+
int size = ReadWriteIOUtils.readInt(transferReq.body);
170138
for (int i = 0; i < size; ++i) {
171139
batchReq.insertNodeReqs.add(
172140
PipeTransferTabletInsertNodeReq.toTPipeTransferRawReq(
@@ -188,11 +156,6 @@ public static PipeTransferTabletBatchReq fromTPipeTransferReq(
188156

189157
/////////////////////////////// TestOnly ///////////////////////////////
190158

191-
@TestOnly
192-
public List<PipeTransferTabletBinaryReq> getBinaryReqs() {
193-
return binaryReqs;
194-
}
195-
196159
@TestOnly
197160
public List<PipeTransferTabletInsertNodeReq> getInsertNodeReqs() {
198161
return insertNodeReqs;
@@ -214,8 +177,7 @@ public boolean equals(final Object obj) {
214177
return false;
215178
}
216179
final PipeTransferTabletBatchReq that = (PipeTransferTabletBatchReq) obj;
217-
return binaryReqs.equals(that.binaryReqs)
218-
&& insertNodeReqs.equals(that.insertNodeReqs)
180+
return insertNodeReqs.equals(that.insertNodeReqs)
219181
&& tabletReqs.equals(that.tabletReqs)
220182
&& version == that.version
221183
&& type == that.type
@@ -224,6 +186,6 @@ public boolean equals(final Object obj) {
224186

225187
@Override
226188
public int hashCode() {
227-
return Objects.hash(binaryReqs, insertNodeReqs, tabletReqs, version, type, body);
189+
return Objects.hash(insertNodeReqs, tabletReqs, version, type, body);
228190
}
229191
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/airgap/IoTDBDataRegionAirGapSink.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
3131
import org.apache.iotdb.db.pipe.metric.overview.PipeResourceMetrics;
3232
import org.apache.iotdb.db.pipe.metric.sink.PipeDataRegionSinkMetrics;
33-
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletBinaryReq;
3433
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletInsertNodeReq;
3534
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
3635
import org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTsFilePieceReq;
@@ -194,13 +193,9 @@ private void doTransferWrapper(
194193
private void doTransfer(
195194
final AirGapSocket socket,
196195
final PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletInsertionEvent)
197-
throws PipeException, WALPipeException, IOException {
196+
throws PipeException, IOException {
198197
final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
199-
final byte[] bytes =
200-
Objects.isNull(insertNode)
201-
? PipeTransferTabletBinaryReq.toTPipeTransferBytes(
202-
pipeInsertNodeTabletInsertionEvent.getByteBuffer())
203-
: PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode);
198+
final byte[] bytes = PipeTransferTabletInsertNodeReq.toTPipeTransferBytes(insertNode);
204199

205200
if (!send(
206201
pipeInsertNodeTabletInsertionEvent.getPipeName(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler.PipeConsensusTabletInsertNodeEventHandler;
4949
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.handler.PipeConsensusTsFileInsertionEventHandler;
5050
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.builder.PipeConsensusAsyncBatchReqBuilder;
51-
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
5251
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
5352
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
5453
import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
@@ -64,7 +63,6 @@
6463
import java.io.FileNotFoundException;
6564
import java.io.IOException;
6665
import java.util.Iterator;
67-
import java.util.Objects;
6866
import java.util.concurrent.BlockingQueue;
6967
import java.util.concurrent.LinkedBlockingDeque;
7068
import java.util.concurrent.LinkedBlockingQueue;
@@ -280,15 +278,8 @@ public void transfer(TabletInsertionEvent tabletInsertionEvent) throws Exception
280278
final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
281279
final ProgressIndex progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex();
282280
final TPipeConsensusTransferReq pipeConsensusTransferReq =
283-
Objects.isNull(insertNode)
284-
? PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
285-
pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
286-
tCommitId,
287-
tConsensusGroupId,
288-
progressIndex,
289-
thisDataNodeId)
290-
: PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
291-
insertNode, tCommitId, tConsensusGroupId, progressIndex, thisDataNodeId);
281+
PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
282+
insertNode, tCommitId, tConsensusGroupId, progressIndex, thisDataNodeId);
292283
final PipeConsensusTabletInsertNodeEventHandler pipeConsensusInsertNodeReqHandler =
293284
new PipeConsensusTabletInsertNodeEventHandler(
294285
pipeInsertNodeTabletInsertionEvent,

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusSyncSink.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@
3838
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
3939
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
4040
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.builder.PipeConsensusSyncBatchReqBuilder;
41-
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
4241
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
4342
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceReq;
4443
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTsFilePieceWithModReq;
@@ -255,21 +254,10 @@ private void doTransfer(PipeInsertNodeTabletInsertionEvent pipeInsertNodeTabletI
255254
insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
256255
progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex();
257256

258-
if (insertNode != null) {
259-
resp =
260-
syncPipeConsensusServiceClient.pipeConsensusTransfer(
261-
PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
262-
insertNode, tCommitId, tConsensusGroupId, progressIndex, thisDataNodeId));
263-
} else {
264-
resp =
265-
syncPipeConsensusServiceClient.pipeConsensusTransfer(
266-
PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
267-
pipeInsertNodeTabletInsertionEvent.getByteBuffer(),
268-
tCommitId,
269-
tConsensusGroupId,
270-
progressIndex,
271-
thisDataNodeId));
272-
}
257+
resp =
258+
syncPipeConsensusServiceClient.pipeConsensusTransfer(
259+
PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
260+
insertNode, tCommitId, tConsensusGroupId, progressIndex, thisDataNodeId));
273261
} catch (final Exception e) {
274262
throw new PipeConnectionException(
275263
String.format(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/payload/builder/PipeConsensusTransferBatchReqBuilder.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
2929
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
3030
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBatchReq;
31-
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
3231
import org.apache.iotdb.db.pipe.sink.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
3332
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
3433
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALPipeException;
@@ -199,17 +198,10 @@ protected int buildTabletInsertionBuffer(TabletInsertionEvent event) throws WALP
199198
final InsertNode insertNode = pipeInsertNodeTabletInsertionEvent.getInsertNode();
200199
// PipeConsensus will transfer binary data to TPipeConsensusTransferReq
201200
final ProgressIndex progressIndex = pipeInsertNodeTabletInsertionEvent.getProgressIndex();
202-
if (Objects.isNull(insertNode)) {
203-
buffer = pipeInsertNodeTabletInsertionEvent.getByteBuffer();
204-
batchReqs.add(
205-
PipeConsensusTabletBinaryReq.toTPipeConsensusTransferReq(
206-
buffer, commitId, consensusGroupId, progressIndex, thisDataNodeId));
207-
} else {
208-
buffer = insertNode.serializeToByteBuffer();
209-
batchReqs.add(
210-
PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
211-
insertNode, commitId, consensusGroupId, progressIndex, thisDataNodeId));
212-
}
201+
buffer = insertNode.serializeToByteBuffer();
202+
batchReqs.add(
203+
PipeConsensusTabletInsertNodeReq.toTPipeConsensusTransferReq(
204+
insertNode, commitId, consensusGroupId, progressIndex, thisDataNodeId));
213205

214206
return buffer.limit();
215207
}

0 commit comments

Comments
 (0)