Skip to content

Commit e1095fd

Browse files
authored
[To dev/1.3] Add result size limit and time slice control for TransformOperator (#17402)
1 parent 064ba9e commit e1095fd

2 files changed

Lines changed: 146 additions & 7 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/TransformOperator.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.Arrays;
5656
import java.util.List;
5757
import java.util.Map;
58+
import java.util.concurrent.TimeUnit;
5859

5960
public class TransformOperator implements ProcessOperator {
6061

@@ -230,6 +231,7 @@ public final boolean hasNext() throws Exception {
230231
@Override
231232
public TsBlock next() throws Exception {
232233

234+
long start = System.nanoTime();
233235
try {
234236
YieldableState yieldableState = iterateAllColumnsToNextValid();
235237
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
@@ -241,9 +243,10 @@ public TsBlock next() throws Exception {
241243
final TimeColumnBuilder timeBuilder = tsBlockBuilder.getTimeColumnBuilder();
242244
final ColumnBuilder[] columnBuilders = tsBlockBuilder.getValueColumnBuilders();
243245
final int columnCount = columnBuilders.length;
244-
245-
int rowCount = 0;
246-
while (!timeHeap.isEmpty()) {
246+
long maxRuntime = operatorContext.getMaxRunTime().roundTo(TimeUnit.NANOSECONDS);
247+
while (!timeHeap.isEmpty()
248+
&& !tsBlockBuilder.isFull()
249+
&& System.nanoTime() - start < maxRuntime) {
247250
final long currentTime = timeHeap.pollFirst();
248251

249252
// time
@@ -258,25 +261,26 @@ public TsBlock next() throws Exception {
258261
}
259262
timeHeap.add(currentTime);
260263

261-
tsBlockBuilder.declarePositions(rowCount);
262264
return tsBlockBuilder.build();
263265
}
264266
}
265267

266268
prepareEachColumn(columnCount);
267269

268-
++rowCount;
270+
tsBlockBuilder.declarePosition();
269271

270272
yieldableState = iterateAllColumnsToNextValid();
271273
if (yieldableState == YieldableState.NOT_YIELDABLE_WAITING_FOR_DATA) {
272-
tsBlockBuilder.declarePositions(rowCount);
273274
return tsBlockBuilder.build();
274275
}
275276

276277
inputLayer.updateRowRecordListEvictionUpperBound();
277278
}
278279

279-
tsBlockBuilder.declarePositions(rowCount);
280+
if (tsBlockBuilder.isEmpty()) {
281+
return null;
282+
}
283+
280284
return tsBlockBuilder.build();
281285
} catch (Exception e) {
282286
throw new RuntimeException(e);

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/operator/TransformOperatorTest.java

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,42 @@
2020
package org.apache.iotdb.db.queryengine.execution.operator;
2121

2222
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
23+
import org.apache.iotdb.commons.udf.service.UDFClassLoaderManager;
2324
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
25+
import org.apache.iotdb.db.queryengine.common.NodeRef;
2426
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
2527
import org.apache.iotdb.db.queryengine.common.QueryId;
2628
import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
2729
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
2830
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
2931
import org.apache.iotdb.db.queryengine.execution.operator.process.TransformOperator;
3032
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
33+
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
34+
import org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand;
3135
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
36+
import org.apache.iotdb.db.queryengine.plan.planner.plan.parameter.InputLocation;
3237
import org.apache.iotdb.db.queryengine.transformation.api.LayerReader;
3338
import org.apache.iotdb.db.queryengine.transformation.dag.input.QueryDataSetInputLayer;
3439
import org.apache.iotdb.db.queryengine.transformation.dag.input.TsBlockInputDataSet;
3540

3641
import com.google.common.collect.ImmutableList;
42+
import com.google.common.collect.ImmutableMap;
43+
import org.apache.tsfile.common.conf.TSFileDescriptor;
3744
import org.apache.tsfile.enums.TSDataType;
3845
import org.apache.tsfile.read.common.block.TsBlock;
3946
import org.apache.tsfile.read.common.block.column.LongColumn;
4047
import org.apache.tsfile.read.common.block.column.TimeColumn;
48+
import org.junit.Assert;
4149
import org.junit.Test;
4250

51+
import java.time.ZoneId;
52+
import java.util.HashMap;
53+
import java.util.List;
54+
import java.util.Map;
4355
import java.util.Optional;
4456

4557
import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
58+
import static org.apache.iotdb.db.queryengine.plan.expression.leaf.TimeSeriesOperand.constructColumnHeaderExpression;
4659

4760
public class TransformOperatorTest {
4861

@@ -139,4 +152,126 @@ public long ramBytesUsed() {
139152
reader.yield();
140153
reader.consumedAll();
141154
}
155+
156+
/**
 * Verifies that TransformOperator splits a large input into size-limited output blocks.
 *
 * <p>A stub child operator hands the TransformOperator a single TsBlock of {@code rowCount}
 * rows while the global max TsBlock line number is lowered to {@code maxLine}. The test drains
 * the operator and checks that every non-null block has at most {@code maxLine} rows, that no
 * row is lost, and that at least ceil(rowCount / maxLine) non-null blocks were produced. The
 * original max TsBlock line number is restored in the {@code finally} clause.
 */
@Test
157+
public void testTransformResultLimit() throws Exception {
158+
UDFClassLoaderManager.setupAndGetInstance();
159+
int savedMaxLine = TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber();
160+
try {
161+
// 2001 = 10 * 200 + 1, so blocks of at most maxLine rows require at least 11 batches.
int rowCount = 2001;
162+
int maxLine = 200;
163+
// Use the same variable the assertions below rely on, so limit and checks cannot diverge.
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(maxLine);
164+
QueryId queryId = new QueryId("stub_query_chunk");
165+
FragmentInstanceId instanceId =
166+
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
167+
// NOTE(review): the pool created below is never shut down in this test — confirm whether
// cleanup is needed.
FragmentInstanceStateMachine stateMachine =
168+
new FragmentInstanceStateMachine(
169+
instanceId,
170+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"));
171+
FragmentInstanceContext fragmentInstanceContext =
172+
createFragmentInstanceContext(instanceId, stateMachine);
173+
DriverContext driverContext = new DriverContext(fragmentInstanceContext, 0);
174+
PlanNodeId scanNodeId = new PlanNodeId("scan");
175+
driverContext.addOperatorContext(1, scanNodeId, SeriesScanOperator.class.getSimpleName());
176+
PlanNodeId transformNodeId = new PlanNodeId("transform");
177+
driverContext.addOperatorContext(2, transformNodeId, TransformOperator.class.getSimpleName());
178+
179+
long[] times = new long[rowCount];
180+
long[] values = new long[rowCount];
181+
for (int i = 0; i < rowCount; i++) {
182+
times[i] = i;
183+
values[i] = i * 10L;
184+
}
185+
TsBlock oneBatch =
186+
new TsBlock(
187+
new TimeColumn(rowCount, times), new LongColumn(rowCount, Optional.empty(), values));
188+
189+
// Stub child: returns the whole input as one TsBlock on the first next() call, null afterwards.
Operator childOperator =
190+
new Operator() {
191+
boolean consumed = false;
192+
193+
@Override
194+
public OperatorContext getOperatorContext() {
195+
return driverContext.getOperatorContexts().get(0);
196+
}
197+
198+
@Override
199+
public TsBlock next() {
200+
if (!consumed) {
201+
consumed = true;
202+
return oneBatch;
203+
}
204+
return null;
205+
}
206+
207+
@Override
208+
public boolean hasNext() {
209+
return !consumed;
210+
}
211+
212+
@Override
213+
public void close() {}
214+
215+
@Override
216+
public boolean isFinished() {
217+
return consumed;
218+
}
219+
220+
@Override
221+
public long calculateMaxPeekMemory() {
222+
return oneBatch.getSizeInBytes();
223+
}
224+
225+
@Override
226+
public long calculateMaxReturnSize() {
227+
return oneBatch.getSizeInBytes();
228+
}
229+
230+
@Override
231+
public long calculateRetainedSizeAfterCallingNext() {
232+
return 0;
233+
}
234+
235+
@Override
236+
public long ramBytesUsed() {
237+
return 0;
238+
}
239+
};
240+
241+
TimeSeriesOperand s1 = constructColumnHeaderExpression("root.sg.d1.s1", TSDataType.INT64);
242+
Map<String, List<InputLocation>> inputLocations =
243+
ImmutableMap.of(s1.getExpressionString(), ImmutableList.of(new InputLocation(0, 0)));
244+
Map<NodeRef<Expression>, TSDataType> expressionTypes = new HashMap<>();
245+
expressionTypes.put(NodeRef.of(s1), TSDataType.INT64);
246+
247+
TransformOperator transform =
248+
new TransformOperator(
249+
driverContext.getOperatorContexts().get(1),
250+
childOperator,
251+
ImmutableList.of(TSDataType.INT64),
252+
inputLocations,
253+
new Expression[] {s1},
254+
true,
255+
ZoneId.systemDefault(),
256+
expressionTypes,
257+
true);
258+
259+
int totalOutRows = 0;
260+
int nonNullNextCount = 0;
261+
// next() may return null, so only non-null blocks are counted and size-checked.
while (transform.hasNext()) {
262+
TsBlock out = transform.next();
263+
if (out != null) {
264+
nonNullNextCount++;
265+
Assert.assertTrue(
266+
"Each batch must be at most " + maxLine + " rows", out.getPositionCount() <= maxLine);
267+
totalOutRows += out.getPositionCount();
268+
}
269+
}
270+
Assert.assertEquals(rowCount, totalOutRows);
271+
// Ceiling division: the fewest blocks that can hold rowCount rows at maxLine rows per block.
int minBatches = (rowCount + maxLine - 1) / maxLine;
272+
Assert.assertTrue("non-null batches: " + nonNullNextCount, nonNullNextCount >= minBatches);
273+
} finally {
274+
TSFileDescriptor.getInstance().getConfig().setMaxTsBlockLineNumber(savedMaxLine);
275+
}
276+
}
142277
}

0 commit comments

Comments
 (0)