Skip to content

Commit 1f1d2c1

Browse files
authored
[To dev/1.3] Pipe: Fixed the OOM bug of tablet memory calculation & Optimized the tablet size by memory estimation (#17451)
* fix * fix * push * ger-limit * fix * [To dev/1.3] Pipe: Fixed the OOM bug of tablet memory calculation & Optimized the tablet size by memory estimation * fix * fix * fix
1 parent 230cd63 commit 1f1d2c1

7 files changed

Lines changed: 65 additions & 81 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.client.property.ClientPoolProperty.DefaultProperty;
2525
import org.apache.iotdb.commons.conf.CommonDescriptor;
2626
import org.apache.iotdb.commons.conf.IoTDBConstant;
27+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2728
import org.apache.iotdb.commons.utils.FileUtils;
2829
import org.apache.iotdb.commons.utils.TestOnly;
2930
import org.apache.iotdb.consensus.ConsensusFactory;
@@ -3388,6 +3389,15 @@ public void setPartitionCacheSize(int partitionCacheSize) {
33883389
this.partitionCacheSize = partitionCacheSize;
33893390
}
33903391

3392+
public int getPipeDataStructureTabletSizeInBytes() {
3393+
int size = PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes();
3394+
if (size > thriftMaxFrameSize) {
3395+
size = (int) (thriftMaxFrameSize * 0.8);
3396+
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletSizeInBytes(size);
3397+
}
3398+
return size;
3399+
}
3400+
33913401
public int getAuthorCacheSize() {
33923402
return authorCacheSize;
33933403
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/TsFileInsertionDataContainer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121

2222
import org.apache.iotdb.commons.path.PatternTreeMap;
2323
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
24-
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2524
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
2625
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
26+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2727
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
2828
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
2929
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryBlock;
@@ -92,7 +92,7 @@ protected TsFileInsertionDataContainer(
9292
this.allocatedMemoryBlockForTablet =
9393
PipeDataNodeResourceManager.memory()
9494
.forceAllocateForTabletWithRetry(
95-
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
95+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
9696
}
9797

9898
/**

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/container/scan/TsFileInsertionScanDataContainer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2525
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
2626
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
27+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2728
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2829
import org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
2930
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.util.ModsOperationUtil;
@@ -124,7 +125,7 @@ public TsFileInsertionScanDataContainer(
124125
this.allocatedMemoryBlockForBatchData =
125126
PipeDataNodeResourceManager.memory()
126127
.forceAllocateForTabletWithRetry(
127-
PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
128+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes());
128129
this.allocatedMemoryBlockForChunk =
129130
PipeDataNodeResourceManager.memory()
130131
.forceAllocateForTabletWithRetry(PipeConfig.getInstance().getPipeMaxReaderChunkSize());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/resource/memory/PipeMemoryWeightUtil.java

Lines changed: 25 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.db.pipe.resource.memory;
2121

2222
import org.apache.iotdb.commons.pipe.config.PipeConfig;
23+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2324
import org.apache.iotdb.db.pipe.event.common.row.PipeRow;
2425
import org.apache.iotdb.db.utils.MemUtils;
2526

@@ -33,10 +34,10 @@
3334
import org.apache.tsfile.utils.Pair;
3435
import org.apache.tsfile.utils.TsPrimitiveType;
3536
import org.apache.tsfile.write.record.Tablet;
36-
import org.apache.tsfile.write.schema.MeasurementSchema;
3737

3838
import java.util.List;
3939
import java.util.Map;
40+
import java.util.Objects;
4041

4142
public class PipeMemoryWeightUtil {
4243

@@ -107,7 +108,10 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(RowRecord
107108
}
108109
}
109110

110-
return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount);
111+
return calculateTabletRowCountAndMemoryBySize(
112+
totalSizeInBytes,
113+
schemaCount,
114+
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
111115
}
112116

113117
/**
@@ -152,7 +156,8 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(BatchData
152156
}
153157
}
154158

155-
return calculateTabletRowCountAndMemoryBySize(totalSizeInBytes, schemaCount);
159+
return calculateTabletRowCountAndMemoryBySize(
160+
totalSizeInBytes, schemaCount, batchData.length());
156161
}
157162

158163
/**
@@ -162,90 +167,43 @@ public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(BatchData
162167
* @return left is the row count of tablet, right is the memory cost of tablet in bytes
163168
*/
164169
public static Pair<Integer, Integer> calculateTabletRowCountAndMemory(PipeRow row) {
165-
return calculateTabletRowCountAndMemoryBySize(row.getCurrentRowSize(), row.size());
170+
return calculateTabletRowCountAndMemoryBySize(
171+
row.getCurrentRowSize(),
172+
row.size(),
173+
PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
166174
}
167175

168176
private static Pair<Integer, Integer> calculateTabletRowCountAndMemoryBySize(
169-
int rowSize, int schemaCount) {
170-
if (rowSize <= 0) {
177+
int rowBytesUsed, int schemaCount, int inputNum) {
178+
if (rowBytesUsed <= 0) {
171179
return new Pair<>(1, 0);
172180
}
173181

174182
// Calculate row number according to the max size of a pipe tablet.
175183
// "-100" is the estimated size of other data structures in a pipe tablet.
176184
// "*8" converts bytes to bits, because the bitmap size is 1 bit per schema.
177-
int rowNumber =
178-
8
179-
* (PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes() - 100)
180-
/ (8 * rowSize + schemaCount);
185+
// Here we estimate the max memory use of the input as inputNum * rowBytesUsed * 1.2, and cap the tablet size limit by that estimate.
186+
int sizeLimit =
187+
Math.min(
188+
IoTDBDescriptor.getInstance().getConfig().getPipeDataStructureTabletSizeInBytes(),
189+
(int) (inputNum * rowBytesUsed * 1.2));
190+
191+
int rowNumber = 8 * (sizeLimit - 100) / (8 * rowBytesUsed + schemaCount);
181192
rowNumber = Math.max(1, rowNumber);
182193

183194
if ( // This means the row number is larger than the max row count of a pipe tablet
184195
rowNumber > PipeConfig.getInstance().getPipeDataStructureTabletRowSize()) {
185196
// Bound the row number; the memory cost is the per-row size in bytes * the bounded row number
186197
return new Pair<>(
187198
PipeConfig.getInstance().getPipeDataStructureTabletRowSize(),
188-
rowSize * PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
199+
rowBytesUsed * PipeConfig.getInstance().getPipeDataStructureTabletRowSize());
189200
} else {
190-
return new Pair<>(
191-
rowNumber, PipeConfig.getInstance().getPipeDataStructureTabletSizeInBytes());
201+
return new Pair<>(rowNumber, sizeLimit);
192202
}
193203
}
194204

195-
public static long calculateTabletSizeInBytes(Tablet tablet) {
196-
long totalSizeInBytes = 0;
197-
198-
if (tablet == null) {
199-
return totalSizeInBytes;
200-
}
201-
202-
// timestamps
203-
if (tablet.timestamps != null) {
204-
totalSizeInBytes += tablet.timestamps.length * 8L;
205-
}
206-
207-
// values
208-
final List<MeasurementSchema> timeseries = tablet.getSchemas();
209-
if (timeseries != null) {
210-
for (int column = 0; column < timeseries.size(); column++) {
211-
final MeasurementSchema measurementSchema = timeseries.get(column);
212-
if (measurementSchema == null) {
213-
continue;
214-
}
215-
216-
final TSDataType tsDataType = measurementSchema.getType();
217-
if (tsDataType == null) {
218-
continue;
219-
}
220-
221-
if (tsDataType.isBinary()) {
222-
if (tablet.values == null || tablet.values.length <= column) {
223-
continue;
224-
}
225-
final Binary[] values = ((Binary[]) tablet.values[column]);
226-
if (values == null) {
227-
continue;
228-
}
229-
for (Binary value : values) {
230-
totalSizeInBytes += value == null ? 8 : value.ramBytesUsed();
231-
}
232-
} else {
233-
totalSizeInBytes += (long) tablet.getMaxRowNumber() * tsDataType.getDataTypeSize();
234-
}
235-
}
236-
}
237-
238-
// bitMaps
239-
if (tablet.bitMaps != null) {
240-
for (int i = 0; i < tablet.bitMaps.length; i++) {
241-
totalSizeInBytes += tablet.bitMaps[i] == null ? 0 : tablet.bitMaps[i].getSize();
242-
}
243-
}
244-
245-
// estimate other dataStructures size
246-
totalSizeInBytes += 100;
247-
248-
return totalSizeInBytes;
205+
public static long calculateTabletSizeInBytes(final Tablet tablet) {
206+
return Objects.nonNull(tablet) ? tablet.ramBytesUsed() : 0L;
249207
}
250208

251209
public static int calculateBatchDataRamBytesUsed(BatchData batchData) {

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionDataContainerTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.pipe.event;
2121

22+
import org.apache.iotdb.commons.conf.CommonDescriptor;
23+
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2224
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
2325
import org.apache.iotdb.commons.pipe.datastructure.pattern.PipePattern;
2426
import org.apache.iotdb.commons.pipe.datastructure.pattern.PrefixPipePattern;
@@ -47,6 +49,7 @@
4749
import org.apache.tsfile.write.schema.MeasurementSchema;
4850
import org.junit.After;
4951
import org.junit.Assert;
52+
import org.junit.Before;
5053
import org.junit.Test;
5154
import org.slf4j.Logger;
5255
import org.slf4j.LoggerFactory;
@@ -78,9 +81,19 @@ public class TsFileInsertionDataContainerTest {
7881
private File alignedTsFile;
7982
private File nonalignedTsFile;
8083
private TsFileResource resource;
84+
private boolean isPipeMemoryManagementEnabled;
85+
86+
@Before
87+
public void setUp() throws Exception {
88+
isPipeMemoryManagementEnabled = PipeConfig.getInstance().getPipeMemoryManagementEnabled();
89+
CommonDescriptor.getInstance().getConfig().setPipeMemoryManagementEnabled(false);
90+
}
8191

8292
@After
8393
public void tearDown() throws Exception {
94+
CommonDescriptor.getInstance()
95+
.getConfig()
96+
.setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled);
8497
if (alignedTsFile != null) {
8598
alignedTsFile.delete();
8699
}

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,9 @@ public class CommonConfig {
214214
private boolean pipeRetryLocallyForParallelOrUserConflict = true;
215215

216216
private int pipeDataStructureTabletRowSize = 2048;
217-
private int pipeDataStructureTabletSizeInBytes = 2097152;
217+
218+
// 60MB
219+
private int pipeDataStructureTabletSizeInBytes = 60 * 1024 * 1024;
218220
private double pipeDataStructureTabletMemoryBlockAllocationRejectThreshold = 0.3;
219221
private double pipeDataStructureTsFileMemoryBlockAllocationRejectThreshold = 0.3;
220222
private volatile double pipeTotalFloatingMemoryProportion = 0.5;

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,11 @@ protected TPipeTransferResp handleTransferHandshakeV1(final PipeTransferHandshak
141141
} catch (Exception e) {
142142
PipeLogger.log(
143143
LOGGER::warn,
144+
e,
144145
"Receiver id = %s: Failed to delete original receiver file dir %s, because %s.",
145146
receiverId.get(),
146147
receiverFileDirWithIdSuffix.get().getPath(),
147-
e.getMessage(),
148-
e);
148+
e.getMessage());
149149
}
150150
} else {
151151
if (LOGGER.isDebugEnabled()) {
@@ -487,11 +487,11 @@ private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) {
487487
} catch (final Exception e) {
488488
PipeLogger.log(
489489
LOGGER::warn,
490+
e,
490491
"Receiver id = %s: Failed to close current writing file writer %s, because %s.",
491492
receiverId.get(),
492493
writingFile == null ? "null" : writingFile.getPath(),
493-
e.getMessage(),
494-
e);
494+
e.getMessage());
495495
}
496496
writingFileWriter = null;
497497
} else {
@@ -526,11 +526,11 @@ private void deleteFile(final File file) {
526526
} catch (final Exception e) {
527527
PipeLogger.log(
528528
LOGGER::warn,
529+
e,
529530
"Receiver id = %s: Failed to delete original writing file %s, because %s.",
530531
receiverId.get(),
531532
file.getPath(),
532-
e.getMessage(),
533-
e);
533+
e.getMessage());
534534
}
535535
} else {
536536
if (LOGGER.isDebugEnabled()) {
@@ -611,11 +611,11 @@ protected final TPipeTransferResp handleTransferFileSealV1(final PipeTransferFil
611611
} catch (final Exception e) {
612612
PipeLogger.log(
613613
LOGGER::warn,
614+
e,
614615
"Receiver id = %s: Failed to seal file %s from req %s.",
615616
receiverId.get(),
616617
writingFile,
617-
req,
618-
e);
618+
req);
619619
return new TPipeTransferResp(
620620
RpcUtils.getStatus(
621621
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,
@@ -698,11 +698,11 @@ protected final TPipeTransferResp handleTransferFileSealV2(final PipeTransferFil
698698
} catch (final Exception e) {
699699
PipeLogger.log(
700700
LOGGER::warn,
701+
e,
701702
"Receiver id = %s: Failed to seal file %s from req %s.",
702703
receiverId.get(),
703704
files,
704-
req,
705-
e);
705+
req);
706706
return new TPipeTransferResp(
707707
RpcUtils.getStatus(
708708
TSStatusCode.PIPE_TRANSFER_FILE_ERROR,

0 commit comments

Comments
 (0)