Skip to content

Commit 4903f3c

Browse files
authored
[To dev/1.3] Fix kill query doesn't take effect bug (#17358)
1 parent 81397b2 commit 4903f3c

10 files changed

Lines changed: 201 additions & 84 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/ClientRPCServiceImpl.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,7 @@
205205
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onNpeOrUnexpectedException;
206206
import static org.apache.iotdb.db.utils.ErrorHandlingUtils.onQueryException;
207207
import static org.apache.iotdb.db.utils.QueryDataSetUtils.convertTsBlockByFetchSize;
208+
import static org.apache.iotdb.rpc.TSStatusCode.QUERY_WAS_KILLED;
208209

209210
public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
210211

@@ -241,6 +242,9 @@ public class ClientRPCServiceImpl implements IClientRPCServiceWithHandler {
241242
private static final int DEFAULT_MAX_TSBLOCK_SIZE_IN_BYTES =
242243
TSFileDescriptor.getInstance().getConfig().getMaxTsBlockSizeInBytes();
243244

245+
private static final String NO_QUERY_EXECUTION_ERR_MSG =
246+
"Query is not found, it may be killed by others, timeout or some other runtime errors, you can see more details in server log.";
247+
244248
@FunctionalInterface
245249
public interface SelectResult {
246250
boolean apply(TSExecuteStatementResp resp, IQueryExecution queryExecution, int fetchSize)
@@ -1147,15 +1151,18 @@ public TSFetchResultsResp fetchResultsV2(TSFetchResultsReq req) {
11471151
finished = true;
11481152
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
11491153
}
1150-
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
11511154

11521155
queryExecution = COORDINATOR.getQueryExecution(req.queryId);
11531156

11541157
if (queryExecution == null) {
1155-
resp.setHasResultSet(false);
1156-
resp.setMoreData(false);
1157-
return resp;
1158+
TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode());
1159+
noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
1160+
return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
11581161
}
1162+
1163+
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
1164+
1165+
queryExecution.updateCurrentRpcStartTime(startTime);
11591166
statementType = queryExecution.getStatementType();
11601167

11611168
try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
@@ -1686,16 +1693,16 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
16861693
return RpcUtils.getTSFetchResultsResp(getNotLoggedInStatus());
16871694
}
16881695

1689-
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
1690-
16911696
queryExecution = COORDINATOR.getQueryExecution(req.queryId);
16921697
if (queryExecution == null) {
1693-
resp.setHasResultSet(false);
1694-
resp.setMoreData(true);
1695-
return resp;
1698+
TSStatus noQueryExecutionStatus = new TSStatus(QUERY_WAS_KILLED.getStatusCode());
1699+
noQueryExecutionStatus.setMessage(NO_QUERY_EXECUTION_ERR_MSG);
1700+
return RpcUtils.getTSFetchResultsResp(noQueryExecutionStatus);
16961701
}
1702+
queryExecution.updateCurrentRpcStartTime(startTime);
16971703
statementType = queryExecution.getStatementType();
16981704

1705+
TSFetchResultsResp resp = RpcUtils.getTSFetchResultsResp(TSStatusCode.SUCCESS_STATUS);
16991706
try (SetThreadName queryName = new SetThreadName(queryExecution.getQueryId())) {
17001707
Pair<TSQueryDataSet, Boolean> pair =
17011708
convertTsBlockByFetchSize(queryExecution, req.fetchSize);
@@ -1705,7 +1712,7 @@ public TSFetchResultsResp fetchResults(TSFetchResultsReq req) {
17051712
resp.setHasResultSet(hasResultSet);
17061713
resp.setQueryDataSet(result);
17071714
resp.setIsAlign(true);
1708-
resp.setMoreData(finished);
1715+
resp.setMoreData(!finished);
17091716
return resp;
17101717
}
17111718
} catch (Exception e) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2323
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
24+
import org.apache.iotdb.commons.utils.TestOnly;
2425
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
2526
import org.apache.iotdb.db.queryengine.plan.analyze.PredicateUtils;
2627
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
@@ -51,7 +52,11 @@ public class MPPQueryContext {
5152
private long localQueryId;
5253
private SessionInfo session;
5354
private QueryType queryType = QueryType.READ;
55+
56+
/** The maximum execution time of the query, in milliseconds. */
5457
private long timeOut;
58+
59+
// time unit is ms
5560
private long startTime;
5661

5762
private TEndPoint localDataBlockEndpoint;
@@ -95,26 +100,22 @@ public class MPPQueryContext {
95100

96101
private boolean userQuery = false;
97102

103+
@TestOnly
98104
public MPPQueryContext(QueryId queryId) {
99105
this.queryId = queryId;
100106
this.endPointBlackList = ConcurrentHashMap.newKeySet();
101107
this.memoryReservationManager =
102108
new NotThreadSafeMemoryReservationManager(queryId, this.getClass().getName());
103109
}
104110

105-
// TODO too many callers just pass a null SessionInfo which should be forbidden
111+
@TestOnly
106112
public MPPQueryContext(
107113
String sql,
108114
QueryId queryId,
109115
SessionInfo session,
110116
TEndPoint localDataBlockEndpoint,
111117
TEndPoint localInternalEndpoint) {
112-
this(queryId);
113-
this.sql = sql;
114-
this.session = session;
115-
this.localDataBlockEndpoint = localDataBlockEndpoint;
116-
this.localInternalEndpoint = localInternalEndpoint;
117-
this.initResultNodeContext();
118+
this(sql, queryId, -1, session, localDataBlockEndpoint, localInternalEndpoint);
118119
}
119120

120121
public MPPQueryContext(
@@ -182,10 +183,12 @@ public QueryType getQueryType() {
182183
return queryType;
183184
}
184185

186+
/** Returns the maximum execution time of the query, in milliseconds. */
185187
public long getTimeOut() {
186188
return timeOut;
187189
}
188190

191+
/** Sets the maximum execution time of the query, in milliseconds. */
189192
public void setTimeOut(long timeOut) {
190193
this.timeOut = timeOut;
191194
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/QueryStateMachine.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,10 @@ public void transitionToCanceled() {
107107
transitionToDoneState(CANCELED);
108108
}
109109

110-
public void transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
110+
public boolean transitionToCanceled(Throwable throwable, TSStatus failureStatus) {
111111
this.failureStatus.compareAndSet(null, failureStatus);
112112
this.failureException.compareAndSet(null, throwable);
113-
transitionToDoneState(CANCELED);
113+
return transitionToDoneState(CANCELED);
114114
}
115115

116116
public void transitionToAborted() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler;
3838
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
3939
import org.apache.iotdb.db.queryengine.metric.QueryRelatedResourceMetricSet;
40+
import org.apache.iotdb.db.queryengine.plan.Coordinator;
4041
import org.apache.iotdb.db.queryengine.plan.planner.LocalExecutionPlanner;
4142
import org.apache.iotdb.db.queryengine.plan.planner.PipelineDriverFactory;
4243
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -427,6 +428,7 @@ private void cancelTimeoutFlushingInstances() {
427428
+ "ms, and now is in flushing state"));
428429
}
429430
});
431+
Coordinator.getInstance().cleanUpStaleQueries();
430432
}
431433

432434
public ExecutorService getIntoOperationExecutor() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.iotdb.commons.conf.IoTDBConstant;
3232
import org.apache.iotdb.db.conf.IoTDBConfig;
3333
import org.apache.iotdb.db.conf.IoTDBDescriptor;
34+
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
3435
import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
3536
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
3637
import org.apache.iotdb.db.queryengine.common.QueryId;
@@ -241,7 +242,6 @@ public int getQueryExecutionMapSize() {
241242
return queryExecutionMap.size();
242243
}
243244

244-
// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
245245
private ExecutorService getQueryExecutor() {
246246
int coordinatorReadExecutorSize = CONFIG.getCoordinatorReadExecutorSize();
247247
return IoTDBThreadPoolFactory.newFixedThreadPool(
@@ -254,7 +254,6 @@ private ExecutorService getWriteExecutor() {
254254
coordinatorWriteExecutorSize, ThreadName.MPP_COORDINATOR_WRITE_EXECUTOR.getName());
255255
}
256256

257-
// TODO: (xingtanzjr) need to redo once we have a concrete policy for the threadPool management
258257
private ScheduledExecutorService getScheduledExecutor() {
259258
return IoTDBThreadPoolFactory.newScheduledThreadPool(
260259
COORDINATOR_SCHEDULED_EXECUTOR_SIZE,
@@ -267,12 +266,11 @@ public QueryId createQueryId() {
267266

268267
public void cleanupQueryExecution(
269268
Long queryId, org.apache.thrift.TBase<?, ?> nativeApiRequest, Throwable t) {
270-
IQueryExecution queryExecution = getQueryExecution(queryId);
269+
IQueryExecution queryExecution = queryExecutionMap.remove(queryId);
271270
if (queryExecution != null) {
272271
try (SetThreadName threadName = new SetThreadName(queryExecution.getQueryId())) {
273272
LOGGER.debug("[CleanUpQuery]]");
274273
queryExecution.stopAndCleanup(t);
275-
queryExecutionMap.remove(queryId);
276274
if (queryExecution.isQuery() && queryExecution.isUserQuery()) {
277275
long costTime = queryExecution.getTotalExecutionTime();
278276
// print slow query
@@ -300,6 +298,35 @@ public void cleanupQueryExecution(
300298
}
301299
}
302300

301+
/**
302+
* We need to reclaim resources from queries that have exceeded their timeout by more than one
303+
* minute. This indicates that the associated clients have failed to perform proper resource
304+
* cleanup.
305+
*/
306+
public void cleanUpStaleQueries() {
307+
long currentTime = System.currentTimeMillis();
308+
queryExecutionMap.forEach(
309+
(queryId, queryExecution) -> {
310+
if (queryExecution.isActive()) {
311+
return;
312+
}
313+
long timeout = queryExecution.getTimeout();
314+
long queryStartTime = queryExecution.getStartExecutionTime();
315+
long executeTime = currentTime - queryStartTime;
316+
if (timeout > 0 && executeTime - 60_000L > timeout) {
317+
LOGGER.warn(
318+
"Cleaning up stale query with id {}, which has been running for {} ms, timeout duration is: {}ms",
319+
queryId,
320+
executeTime,
321+
timeout);
322+
cleanupQueryExecution(
323+
queryId,
324+
null,
325+
new QueryTimeoutRuntimeException(queryStartTime, currentTime, timeout));
326+
}
327+
});
328+
}
329+
303330
public void cleanupQueryExecution(Long queryId) {
304331
cleanupQueryExecution(queryId, null, null);
305332
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/ClusterSchemaFetchExecutor.java

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
3636
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
3737
import org.apache.iotdb.db.queryengine.plan.execution.ExecutionResult;
38+
import org.apache.iotdb.db.queryengine.plan.execution.IQueryExecution;
3839
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
3940
import org.apache.iotdb.db.queryengine.plan.statement.internal.DeviceSchemaFetchStatement;
4041
import org.apache.iotdb.db.queryengine.plan.statement.internal.SeriesSchemaFetchStatement;
@@ -250,29 +251,37 @@ private ClusterSchemaTree executeSchemaFetchQuery(
250251
String.format("Fetch Schema failed, because %s", executionResult.status.getMessage()),
251252
executionResult.status.getCode());
252253
}
254+
IQueryExecution queryExecution = coordinator.getQueryExecution(queryId);
253255
try (SetThreadName threadName = new SetThreadName(executionResult.queryId.getId())) {
254256
ClusterSchemaTree result = new ClusterSchemaTree();
255257
ClusterSchemaTree.SchemaNodeBatchDeserializer deserializer =
256258
new ClusterSchemaTree.SchemaNodeBatchDeserializer();
257259
Set<String> databaseSet = new HashSet<>();
258-
while (coordinator.getQueryExecution(queryId).hasNextResult()) {
259-
// The query will be transited to FINISHED when invoking getBatchResult() at the last time
260-
// So we don't need to clean up it manually
261-
Optional<TsBlock> tsBlock;
262-
try {
263-
tsBlock = coordinator.getQueryExecution(queryId).getBatchResult();
264-
} catch (IoTDBException e) {
265-
t = e;
266-
throw new RuntimeException("Fetch Schema failed. ", e);
267-
}
268-
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
269-
break;
270-
}
271-
Column column = tsBlock.get().getColumn(0);
272-
for (int i = 0; i < column.getPositionCount(); i++) {
273-
parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context);
260+
if (queryExecution != null) {
261+
while (queryExecution.hasNextResult()) {
262+
// The query transitions to FINISHED when getBatchResult() is invoked for the last
263+
// time
264+
// So there is no need to clean it up manually.
265+
Optional<TsBlock> tsBlock;
266+
try {
267+
tsBlock = queryExecution.getBatchResult();
268+
} catch (IoTDBException e) {
269+
t = e;
270+
throw new RuntimeException("Fetch Schema failed. ", e);
271+
}
272+
if (!tsBlock.isPresent() || tsBlock.get().isEmpty()) {
273+
break;
274+
}
275+
Column column = tsBlock.get().getColumn(0);
276+
for (int i = 0; i < column.getPositionCount(); i++) {
277+
parseFetchedData(column.getBinary(i), result, deserializer, databaseSet, context);
278+
}
274279
}
280+
} else {
281+
throw new RuntimeException(
282+
String.format("Fetch Schema failed, because queryExecution is null for %s", queryId));
275283
}
284+
276285
result.setDatabases(databaseSet);
277286
return result;
278287
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/IQueryExecution.java

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ public interface IQueryExecution {
3333

3434
void stop(Throwable t);
3535

36-
void stopAndCleanup();
37-
3836
void stopAndCleanup(Throwable t);
3937

4038
void cancel();
@@ -57,10 +55,32 @@ public interface IQueryExecution {
5755

5856
String getQueryId();
5957

58+
// time unit is ms
6059
long getStartExecutionTime();
6160

61+
/**
62+
* @param executionTime time unit should be ns
63+
*/
6264
void recordExecutionTime(long executionTime);
6365

66+
/**
67+
* update current rpc start time, which is used to calculate rpc execution time and update total
68+
* execution time
69+
*
70+
* @param startTime start time of current rpc, time unit is ns
71+
*/
72+
void updateCurrentRpcStartTime(long startTime);
73+
74+
/**
75+
* Check if there is an active RPC for this query. If {@code startTimeOfCurrentRpc == -1}, it
76+
* means there is no active RPC, otherwise there is an active RPC. An active RPC means that the
77+
* client is still fetching results and the QueryExecution should not be cleaned up until the RPC
78+
* finishes. On the other hand, if there is no active RPC, it means that the client has finished
79+
* fetching results or has not started fetching results yet, and the QueryExecution can be safely
80+
* cleaned up.
81+
*/
82+
boolean isActive();
83+
6484
/**
6585
* @return cost time in ns
6686
*/
@@ -69,6 +89,7 @@ public interface IQueryExecution {
6989
/** return ip for a thrift-based client, client-id for MQTT/REST client */
7090
String getClientHostname();
7191

92+
/** Returns the maximum execution time of the query, in milliseconds. */
7293
long getTimeout();
7394

7495
Optional<String> getExecuteSQL();

0 commit comments

Comments
 (0)