Skip to content

Commit 230cd63

Browse files
authored
[to dev/1.3] fix: prevent NPE when isFinished() is called before DataDriver init (#17441)
1 parent 28c4e02 commit 230cd63

4 files changed

Lines changed: 138 additions & 3 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,11 @@ protected boolean init(SettableFuture<?> blockedFuture) {
7474
return true;
7575
}
7676

77+
@Override
78+
public boolean isInit() {
79+
return init;
80+
}
81+
7782
/**
7883
* Init seq file list and unseq file list in {@link
7984
* org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and set it into each

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ private boolean isFinishedInternal() {
213213
finished =
214214
state.get() != State.ALIVE
215215
|| driverContext.isDone()
216-
|| root.isFinished()
216+
|| (isInit() && root.isFinished())
217217
|| sink.isClosed();
218218
} catch (Exception e) {
219219
throw new RuntimeException(e);
@@ -224,6 +224,8 @@ private boolean isFinishedInternal() {
224224
return finished;
225225
}
226226

227+
abstract boolean isInit();
228+
227229
@SuppressWarnings({"squid:S1181", "squid:S112"})
228230
private ListenableFuture<?> processInternal() {
229231
long startTimeNanos = System.nanoTime();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ protected boolean init(SettableFuture<?> blockedFuture) {
4242
return true;
4343
}
4444

45+
@Override
46+
boolean isInit() {
47+
return true;
48+
}
49+
4550
@Override
4651
protected void releaseResource() {
4752
driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();

iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java

Lines changed: 125 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,9 @@
2222
import org.apache.iotdb.commons.exception.IllegalPathException;
2323
import org.apache.iotdb.commons.exception.MetadataException;
2424
import org.apache.iotdb.commons.path.MeasurementPath;
25+
import org.apache.iotdb.commons.path.PartialPath;
2526
import org.apache.iotdb.db.conf.IoTDBDescriptor;
27+
import org.apache.iotdb.db.exception.WriteProcessException;
2628
import org.apache.iotdb.db.exception.query.QueryProcessException;
2729
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
2830
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
@@ -35,7 +37,9 @@
3537
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
3638
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
3739
import org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
40+
import org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
3841
import org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
42+
import org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator;
3943
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
4044
import org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
4145
import org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
@@ -51,7 +55,6 @@
5155
import com.google.common.util.concurrent.ListenableFuture;
5256
import io.airlift.units.Duration;
5357
import org.apache.tsfile.enums.TSDataType;
54-
import org.apache.tsfile.exception.write.WriteProcessException;
5558
import org.apache.tsfile.read.common.block.TsBlock;
5659
import org.apache.tsfile.read.common.block.column.IntColumn;
5760
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -91,7 +94,11 @@ public class DataDriverTest {
9194
TimeUnit.MILLISECONDS);
9295

9396
@Before
94-
public void setUp() throws MetadataException, IOException, WriteProcessException {
97+
public void setUp()
98+
throws MetadataException,
99+
IOException,
100+
WriteProcessException,
101+
org.apache.tsfile.exception.write.WriteProcessException {
95102
IoTDBDescriptor.getInstance().getConfig().setDriverTaskExecutionTimeSliceInMs(10000);
96103
SeriesReaderTestUtil.setUp(
97104
measurementSchemas, deviceIds, seqResources, unSeqResources, DATA_DRIVER_TEST_SG);
@@ -248,4 +255,120 @@ public void batchTest() {
248255
instanceNotificationExecutor.shutdown();
249256
}
250257
}
258+
259+
@Test
260+
public void testCallIsFinishedBeforeDataSourcePrepared() throws IllegalPathException {
261+
ExecutorService instanceNotificationExecutor =
262+
IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification");
263+
try {
264+
MeasurementPath measurementPath1 =
265+
new MeasurementPath(
266+
new PartialPath(DATA_DRIVER_TEST_SG + ".device0"),
267+
new MeasurementSchema("sensor0", TSDataType.INT32));
268+
Set<String> allSensors = new HashSet<>();
269+
allSensors.add("sensor0");
270+
allSensors.add("sensor1");
271+
QueryId queryId = new QueryId("stub_query");
272+
FragmentInstanceId instanceId =
273+
new FragmentInstanceId(new PlanFragmentId(queryId, 0), "stub-instance");
274+
FragmentInstanceStateMachine stateMachine =
275+
new FragmentInstanceStateMachine(instanceId, instanceNotificationExecutor);
276+
DataRegion dataRegion = Mockito.mock(DataRegion.class);
277+
Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true);
278+
FragmentInstanceContext fragmentInstanceContext =
279+
createFragmentInstanceContext(instanceId, stateMachine);
280+
fragmentInstanceContext.setDataRegion(dataRegion);
281+
DataDriverContext driverContext = new DataDriverContext(fragmentInstanceContext, 0);
282+
PlanNodeId planNodeId1 = new PlanNodeId("1");
283+
driverContext.addOperatorContext(1, planNodeId1, SeriesScanOperator.class.getSimpleName());
284+
PlanNodeId planNodeId2 = new PlanNodeId("2");
285+
driverContext.addOperatorContext(2, planNodeId2, SeriesScanOperator.class.getSimpleName());
286+
driverContext.addOperatorContext(
287+
3, new PlanNodeId("3"), FullOuterTimeJoinOperator.class.getSimpleName());
288+
driverContext.addOperatorContext(4, new PlanNodeId("4"), LimitOperator.class.getSimpleName());
289+
290+
SeriesScanOptions.Builder scanOptionsBuilder = new SeriesScanOptions.Builder();
291+
scanOptionsBuilder.withAllSensors(allSensors);
292+
SeriesScanOperator seriesScanOperator1 =
293+
new SeriesScanOperator(
294+
driverContext.getOperatorContexts().get(0),
295+
planNodeId1,
296+
measurementPath1,
297+
Ordering.ASC,
298+
scanOptionsBuilder.build());
299+
driverContext.addSourceOperator(seriesScanOperator1);
300+
driverContext.addPath(measurementPath1);
301+
seriesScanOperator1
302+
.getOperatorContext()
303+
.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
304+
305+
MeasurementPath measurementPath2 =
306+
new MeasurementPath(
307+
new PartialPath(DATA_DRIVER_TEST_SG + ".device0"),
308+
new MeasurementSchema("sensor1", TSDataType.INT32));
309+
SeriesScanOperator seriesScanOperator2 =
310+
new SeriesScanOperator(
311+
driverContext.getOperatorContexts().get(1),
312+
planNodeId2,
313+
measurementPath2,
314+
Ordering.ASC,
315+
scanOptionsBuilder.build());
316+
driverContext.addSourceOperator(seriesScanOperator2);
317+
driverContext.addPath(measurementPath2);
318+
319+
seriesScanOperator2
320+
.getOperatorContext()
321+
.setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
322+
323+
LeftOuterTimeJoinOperator timeJoinOperator =
324+
new LeftOuterTimeJoinOperator(
325+
driverContext.getOperatorContexts().get(2),
326+
seriesScanOperator1,
327+
1,
328+
seriesScanOperator2,
329+
Arrays.asList(TSDataType.INT32, TSDataType.INT32),
330+
new AscTimeComparator());
331+
SingleDeviceViewOperator fakeOperator =
332+
new SingleDeviceViewOperator(
333+
driverContext.getOperatorContexts().get(3),
334+
"d1",
335+
timeJoinOperator,
336+
Arrays.asList(0),
337+
Arrays.asList(TSDataType.INT32, TSDataType.INT32));
338+
fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
339+
340+
fragmentInstanceContext.setSourcePaths(driverContext.getPaths());
341+
String deviceId = DATA_DRIVER_TEST_SG + ".device0";
342+
Mockito.when(
343+
dataRegion.query(
344+
eq(driverContext.getPaths()),
345+
eq(deviceId),
346+
eq(fragmentInstanceContext),
347+
Mockito.isNull(),
348+
Mockito.isNull(),
349+
Mockito.anyLong()))
350+
.thenReturn(null);
351+
fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
352+
fragmentInstanceContext.initializeNumOfDrivers(1);
353+
354+
StubSink stubSink = new StubSink(fragmentInstanceContext);
355+
driverContext.setSink(stubSink);
356+
IDriver dataDriver = null;
357+
try {
358+
dataDriver = new DataDriver(fakeOperator, driverContext, 0);
359+
assertEquals(
360+
fragmentInstanceContext.getId(), dataDriver.getDriverTaskId().getFragmentInstanceId());
361+
assertFalse(dataDriver.isFinished());
362+
} finally {
363+
if (dataDriver != null) {
364+
dataDriver.close();
365+
}
366+
}
367+
} catch (QueryProcessException e) {
368+
e.printStackTrace();
369+
fail();
370+
} finally {
371+
instanceNotificationExecutor.shutdown();
372+
}
373+
}
251374
}

0 commit comments

Comments
 (0)