2424import org .apache .iotdb .common .rpc .thrift .TSStatus ;
2525import org .apache .iotdb .common .rpc .thrift .TSeriesPartitionSlot ;
2626import org .apache .iotdb .common .rpc .thrift .TTimePartitionSlot ;
27+ import org .apache .iotdb .commons .cluster .NodeStatus ;
2728import org .apache .iotdb .commons .enums .DataPartitionTableGeneratorState ;
2829import org .apache .iotdb .commons .partition .DataPartitionTable ;
2930import org .apache .iotdb .commons .partition .DatabaseScopedDataPartitionTable ;
3334import org .apache .iotdb .confignode .client .sync .SyncDataNodeClientPool ;
3435import org .apache .iotdb .confignode .consensus .request .read .partition .GetDataPartitionPlan ;
3536import org .apache .iotdb .confignode .consensus .request .write .partition .CreateDataPartitionPlan ;
37+ import org .apache .iotdb .confignode .manager .load .LoadManager ;
3638import org .apache .iotdb .confignode .manager .node .NodeManager ;
3739import org .apache .iotdb .confignode .procedure .env .ConfigNodeProcedureEnv ;
3840import org .apache .iotdb .confignode .procedure .exception .ProcedureException ;
@@ -95,6 +97,7 @@ public class DataPartitionTableIntegrityCheckProcedure
9597 private static final long ROLL_BACK_NEXT_STATE_INTERVAL = 60000 ;
9698
9799 NodeManager dataNodeManager ;
100+ LoadManager loadManager ;
98101 private List <TDataNodeConfiguration > allDataNodes = new ArrayList <>();
99102
100103 // ============Need serialize BEGIN=============/
@@ -135,6 +138,7 @@ protected Flow executeFromState(
135138 try {
136139 // Ensure to get the real-time DataNodes in the current cluster at every step
137140 dataNodeManager = env .getConfigManager ().getNodeManager ();
141+ loadManager = env .getConfigManager ().getLoadManager ();
138142 allDataNodes = dataNodeManager .getRegisteredDataNodes ();
139143
140144 switch (state ) {
@@ -214,6 +218,10 @@ protected DataPartitionTableIntegrityCheckProcedureState getInitialState() {
214218 return DataPartitionTableIntegrityCheckProcedureState .COLLECT_EARLIEST_TIMESLOTS ;
215219 }
216220
217225 /**
218226 * Collect earliest timeslot information from all DataNodes. Each DataNode returns a Map<String,
219227 * Long> where key is database name and value is the earliest timeslot id.
@@ -236,15 +244,31 @@ private Flow collectEarliestTimeslots() {
236244 // Collect earliest timeslots from all DataNodes
237245 allDataNodes .removeAll (skipDataNodes );
238246 for (TDataNodeConfiguration dataNode : allDataNodes ) {
247+ // Check if DataNode is alive before sending request
248+ NodeStatus nodeStatus = loadManager .getNodeStatus (dataNode .getLocation ().getDataNodeId ());
249+ if (!NodeStatus .Running .equals (nodeStatus )) {
250+ failedDataNodes .add (dataNode );
251+ continue ;
252+ }
253+
239254 try {
240- TGetEarliestTimeslotsResp resp =
241- (TGetEarliestTimeslotsResp )
242- SyncDataNodeClientPool .getInstance ()
243- .sendSyncRequestToDataNodeWithGivenRetry (
244- dataNode .getLocation ().getInternalEndPoint (),
245- null ,
246- CnToDnSyncRequestType .COLLECT_EARLIEST_TIMESLOTS ,
247- MAX_RETRY_COUNT );
255+ Object response =
256+ SyncDataNodeClientPool .getInstance ()
257+ .sendSyncRequestToDataNodeWithGivenRetry (
258+ dataNode .getLocation ().getInternalEndPoint (),
259+ null ,
260+ CnToDnSyncRequestType .COLLECT_EARLIEST_TIMESLOTS ,
261+ MAX_RETRY_COUNT );
262+
263+ if (response instanceof TSStatus ) {
264+ failedDataNodes .add (dataNode );
265+ LOG .error (
266+ "[DataPartitionIntegrity] Failed to collect earliest timeslots from the DataNode[id={}], already out of max retry time" ,
267+ dataNode .getLocation ().getDataNodeId ());
268+ continue ;
269+ }
270+
271+ TGetEarliestTimeslotsResp resp = (TGetEarliestTimeslotsResp ) response ;
248272 if (resp .getStatus ().getCode () != TSStatusCode .SUCCESS_STATUS .getStatusCode ()) {
249273 failedDataNodes .add (dataNode );
250274 LOG .error (
@@ -423,18 +447,34 @@ private Flow requestPartitionTables() {
423447 allDataNodes .removeAll (failedDataNodes );
424448 for (TDataNodeConfiguration dataNode : allDataNodes ) {
425449 int dataNodeId = dataNode .getLocation ().getDataNodeId ();
450+ // Check if DataNode is alive before sending request
451+ NodeStatus nodeStatus = loadManager .getNodeStatus (dataNodeId );
452+ if (!NodeStatus .Running .equals (nodeStatus )) {
453+ failedDataNodes .add (dataNode );
454+ continue ;
455+ }
456+
426457 if (!dataPartitionTables .containsKey (dataNodeId )) {
427458 try {
428459 TGenerateDataPartitionTableReq req = new TGenerateDataPartitionTableReq ();
429460 req .setDatabases (databasesWithLostDataPartition );
430- TGenerateDataPartitionTableResp resp =
431- (TGenerateDataPartitionTableResp )
432- SyncDataNodeClientPool .getInstance ()
433- .sendSyncRequestToDataNodeWithGivenRetry (
434- dataNode .getLocation ().getInternalEndPoint (),
435- req ,
436- CnToDnSyncRequestType .GENERATE_DATA_PARTITION_TABLE ,
437- MAX_RETRY_COUNT );
461+ Object response =
462+ SyncDataNodeClientPool .getInstance ()
463+ .sendSyncRequestToDataNodeWithGivenRetry (
464+ dataNode .getLocation ().getInternalEndPoint (),
465+ req ,
466+ CnToDnSyncRequestType .GENERATE_DATA_PARTITION_TABLE ,
467+ MAX_RETRY_COUNT );
468+
469+ if (response instanceof TSStatus ) {
470+ failedDataNodes .add (dataNode );
471+ LOG .error (
472+ "[DataPartitionIntegrity] Failed to request DataPartitionTable generation from the DataNode[id={}], already out of max retry time" ,
473+ dataNode .getLocation ().getDataNodeId ());
474+ continue ;
475+ }
476+
477+ TGenerateDataPartitionTableResp resp = (TGenerateDataPartitionTableResp ) response ;
438478 if (resp .getStatus ().getCode () != TSStatusCode .SUCCESS_STATUS .getStatusCode ()) {
439479 failedDataNodes .add (dataNode );
440480 LOG .error (
@@ -472,17 +512,33 @@ private Flow requestPartitionTablesHeartBeat() {
472512 int completeCount = 0 ;
473513 for (TDataNodeConfiguration dataNode : allDataNodes ) {
474514 int dataNodeId = dataNode .getLocation ().getDataNodeId ();
515+ // Check if DataNode is alive before sending request
516+ NodeStatus nodeStatus = loadManager .getNodeStatus (dataNodeId );
517+ if (!NodeStatus .Running .equals (nodeStatus )) {
518+ failedDataNodes .add (dataNode );
519+ continue ;
520+ }
475521
476522 if (!dataPartitionTables .containsKey (dataNodeId )) {
477523 try {
524+ Object response =
525+ SyncDataNodeClientPool .getInstance ()
526+ .sendSyncRequestToDataNodeWithGivenRetry (
527+ dataNode .getLocation ().getInternalEndPoint (),
528+ null ,
529+ CnToDnSyncRequestType .GENERATE_DATA_PARTITION_TABLE_HEART_BEAT ,
530+ MAX_RETRY_COUNT );
531+
532+ if (response instanceof TSStatus ) {
533+ failedDataNodes .add (dataNode );
534+ LOG .error (
535+ "[DataPartitionIntegrity] Failed to request DataPartitionTable generation heart beat from the DataNode[id={}], already out of max retry time" ,
536+ dataNode .getLocation ().getDataNodeId ());
537+ continue ;
538+ }
539+
478540 TGenerateDataPartitionTableHeartbeatResp resp =
479- (TGenerateDataPartitionTableHeartbeatResp )
480- SyncDataNodeClientPool .getInstance ()
481- .sendSyncRequestToDataNodeWithGivenRetry (
482- dataNode .getLocation ().getInternalEndPoint (),
483- null ,
484- CnToDnSyncRequestType .GENERATE_DATA_PARTITION_TABLE_HEART_BEAT ,
485- MAX_RETRY_COUNT );
541+ (TGenerateDataPartitionTableHeartbeatResp ) response ;
486542 DataPartitionTableGeneratorState state =
487543 DataPartitionTableGeneratorState .getStateByCode (resp .getErrorCode ());
488544
0 commit comments