Skip to content

Commit b4ce8da

Browse files
authored
Fix: currentGeneratorFuture and currentGenerator cannot be used to get the current progress if the DataNode is restarted before data partition table generation completes (#17491)
1 parent 3adba33 commit b4ce8da

8 files changed

Lines changed: 95 additions & 39 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,8 @@ private void buildActionMap() {
148148
(req, client) -> client.generateDataPartitionTable((TGenerateDataPartitionTableReq) req));
149149
actionMapBuilder.put(
150150
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
151-
(req, client) -> client.generateDataPartitionTableHeartbeat());
151+
(req, client) ->
152+
client.generateDataPartitionTableHeartbeat((TGenerateDataPartitionTableReq) req));
152153
actionMap = actionMapBuilder.build();
153154
}
154155

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -443,15 +443,15 @@ protected void setLoadManager() {
443443
}
444444

445445
public void close() throws IOException {
446-
if (consensusManager.get() != null) {
447-
consensusManager.get().close();
448-
}
449446
if (partitionManager != null) {
450447
partitionManager.getRegionMaintainer().shutdown();
451448
}
452449
if (procedureManager != null) {
453450
procedureManager.stopExecutor();
454451
}
452+
if (consensusManager.get() != null) {
453+
consensusManager.get().close();
454+
}
455455
}
456456

457457
@Override

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@
6262
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
6363
import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
6464
import org.apache.iotdb.confignode.procedure.env.RemoveDataNodeHandler;
65+
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
6566
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
6667
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
6768
import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
@@ -2329,6 +2330,25 @@ public Pair<Long, Boolean> checkDuplicateTableTask(
23292330
return new Pair<>(-1L, false);
23302331
}
23312332

2333+
/**
 * Checks whether the procedure executor currently holds an unfinished procedure of the given
 * type.
 *
 * <p>The first matching procedure found is logged at INFO level (with its detailed string form)
 * before this method returns.
 *
 * @param procedureClass the procedure type to look for; {@code null} yields {@code false}
 * @return {@code true} if at least one procedure that is an instance of {@code procedureClass}
 *     reports {@code isFinished() == false}, {@code false} otherwise
 */
public boolean isExistUnfinishedProcedure(
2334+
Class<? extends StateMachineProcedure<?, ?>> procedureClass) {
2335+
// Nothing can be an instance of a null class, so report "no such procedure".
if (procedureClass == null) {
2336+
return false;
2337+
}
2338+
2339+
// Scan every procedure known to the executor and stop at the first unfinished match.
for (Procedure<ConfigNodeProcedureEnv> procedure : getExecutor().getProcedures().values()) {
2340+
if (!procedure.isFinished() && procedureClass.isInstance(procedure)) {
2341+
LOGGER.info(
2342+
"[{}] procedure details are {}",
2343+
procedureClass.getSimpleName(),
2344+
procedure.toStringDetails());
2345+
return true;
2346+
}
2347+
}
2348+
2349+
// No unfinished procedure of the requested type was found.
return false;
2350+
}
2351+
23322352
// ======================================================
23332353
/*
23342354
GET-SET Region

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -388,6 +388,25 @@ public TConfigNodeLocation getLeaderLocation() {
388388
return null;
389389
}
390390

391+
public TConfigNodeLocation getNotNullLeaderLocation() {
392+
Peer leaderPeer = getLeaderPeer();
393+
394+
while (leaderPeer == null) {
395+
try {
396+
Thread.sleep(1000);
397+
} catch (InterruptedException ignored) {
398+
399+
}
400+
leaderPeer = getLeaderPeer();
401+
}
402+
403+
Peer finalLeaderPeer = leaderPeer;
404+
return getNodeManager().getRegisteredConfigNodes().stream()
405+
.filter(leader -> leader.getConfigNodeId() == finalLeaderPeer.getNodeId())
406+
.findFirst()
407+
.orElse(null);
408+
}
409+
391410
/**
392411
* @return true if ConfigNode-leader is elected, false otherwise.
393412
*/

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,11 +521,13 @@ private Flow requestPartitionTablesHeartBeat() {
521521

522522
if (!dataPartitionTables.containsKey(dataNodeId)) {
523523
try {
524+
TGenerateDataPartitionTableReq req = new TGenerateDataPartitionTableReq();
525+
req.setDatabases(databasesWithLostDataPartition);
524526
Object response =
525527
SyncDataNodeClientPool.getInstance()
526528
.sendSyncRequestToDataNodeWithGivenRetry(
527529
dataNode.getLocation().getInternalEndPoint(),
528-
null,
530+
req,
529531
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
530532
MAX_RETRY_COUNT);
531533

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java

Lines changed: 38 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
5656
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
5757
import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeMetrics;
58+
import org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
5859
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
5960
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
6061
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
@@ -195,6 +196,9 @@ public void active() {
195196
configManager.initConsensusManager();
196197
upgrade();
197198
TConfigNodeLocation leaderNodeLocation = waitForLeaderElected();
199+
if (leaderNodeLocation == null) {
200+
leaderNodeLocation = configManager.getConsensusManager().getNotNullLeaderLocation();
201+
}
198202
setUpMetricService();
199203
// Notice: We always set up Seed-ConfigNode's RPC service lastly to ensure
200204
// that the external service is not provided until ConfigNode is fully available
@@ -225,36 +229,42 @@ public void active() {
225229

226230
/* After the ConfigNode leader election, a leader switch may occur, which could cause the procedure not to be created. This can happen if the original leader has not yet executed the procedure creation, while the other followers have already finished starting up. Therefore, having the original leader (before the leader switch) initiate the process ensures that only one procedure will be created. */
227231
if (leaderNodeLocation.getConfigNodeId() == configNodeId) {
228-
dataPartitionTableCheckFuture =
229-
dataPartitionTableCheckExecutor.submit(
230-
() -> {
231-
LOGGER.info(
232-
"[DataPartitionIntegrity] Prepare to start dataPartitionTableIntegrityCheck after all datanodes started up");
233-
Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());
234-
235-
while (true) {
236-
List<Integer> dnList =
237-
configManager
238-
.getLoadManager()
239-
.filterDataNodeThroughStatus(NodeStatus.Running);
240-
if (dnList != null && !dnList.isEmpty()) {
241-
LOGGER.info("Starting dataPartitionTableIntegrityCheck...");
242-
TSStatus status =
243-
configManager.getProcedureManager().dataPartitionTableIntegrityCheck();
244-
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
245-
LOGGER.error(
246-
"Data partition table integrity check failed! Current status code is {}, status message is {}",
247-
status.getCode(),
248-
status.getMessage());
232+
if (!configManager
233+
.getProcedureManager()
234+
.isExistUnfinishedProcedure(DataPartitionTableIntegrityCheckProcedure.class)) {
235+
dataPartitionTableCheckFuture =
236+
dataPartitionTableCheckExecutor.submit(
237+
() -> {
238+
LOGGER.info(
239+
"[DataPartitionIntegrity] Prepare to start dataPartitionTableIntegrityCheck after all datanodes started up");
240+
Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());
241+
242+
while (true) {
243+
List<Integer> dnList =
244+
configManager
245+
.getLoadManager()
246+
.filterDataNodeThroughStatus(NodeStatus.Running);
247+
if (dnList != null && !dnList.isEmpty()) {
248+
LOGGER.info("Starting dataPartitionTableIntegrityCheck...");
249+
TSStatus status =
250+
configManager
251+
.getProcedureManager()
252+
.dataPartitionTableIntegrityCheck();
253+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
254+
LOGGER.error(
255+
"Data partition table integrity check failed! Current status code is {}, status message is {}",
256+
status.getCode(),
257+
status.getMessage());
258+
}
259+
break;
260+
} else {
261+
LOGGER.info("No running datanodes found, waiting...");
262+
Thread.sleep(5000);
249263
}
250-
break;
251-
} else {
252-
LOGGER.info("No running datanodes found, waiting...");
253-
Thread.sleep(5000);
254264
}
255-
}
256-
return null;
257-
});
265+
return null;
266+
});
267+
}
258268
}
259269
return;
260270
} else {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3228,7 +3228,8 @@ public TGenerateDataPartitionTableResp generateDataPartitionTable(
32283228
}
32293229

32303230
@Override
3231-
public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat() {
3231+
public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat(
3232+
TGenerateDataPartitionTableReq req) {
32323233
TGenerateDataPartitionTableHeartbeatResp resp = new TGenerateDataPartitionTableHeartbeatResp();
32333234
// Must be lower than the RPC request timeout, in milliseconds
32343235
final long timeoutMs = 50000;
@@ -3238,10 +3239,13 @@ public TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartb
32383239
// To resolve this situation that the DataNode is registered and didn't request
32393240
// generateDataPartitionTable interface yet.
32403241
if (currentGeneratorFuture == null || currentGenerator == null) {
3241-
resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
3242-
resp.setMessage("No DataPartitionTable generation task found");
3243-
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
3244-
return resp;
3242+
generateDataPartitionTable(req);
3243+
if (currentGeneratorFuture == null || currentGenerator == null) {
3244+
resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
3245+
resp.setMessage("No DataPartitionTable generation task found");
3246+
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
3247+
return resp;
3248+
}
32453249
}
32463250

32473251
currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS);

iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1330,7 +1330,7 @@ service IDataNodeRPCService {
13301330
/**
13311331
* Check the status of DataPartitionTable generation task
13321332
*/
1333-
TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat()
1333+
TGenerateDataPartitionTableHeartbeatResp generateDataPartitionTableHeartbeat(TGenerateDataPartitionTableReq req)
13341334

13351335
/**
13361336
* END: Data Partition Table Integrity Check

0 commit comments

Comments
 (0)