Skip to content

Commit 4559929

Browse files
authored
Fix: send set configuration only to target nodes and harden compaction schedule interruption handling (#17469)
1 parent 8ebe276 commit 4559929

7 files changed

Lines changed: 83 additions & 10 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBSetConfigurationTableIT.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,9 @@ public void testSetConfiguration() {
109109
statement.execute(
110110
"set configuration inner_compaction_candidate_file_num='1',max_cross_compaction_candidate_file_num='1' on "
111111
+ dnId);
112+
if (dnId == 5) {
113+
statement.execute("set configuration compaction_schedule_thread_num='2' on 5");
114+
}
112115
}
113116
} catch (Exception e) {
114117
Assert.fail(e.getMessage());
@@ -131,6 +134,17 @@ public void testSetConfiguration() {
131134
"enable_cross_space_compaction=false",
132135
"inner_compaction_candidate_file_num=1",
133136
"max_cross_compaction_candidate_file_num=1"));
137+
boolean scheduleThreadNumChanged =
138+
checkConfigFileContains(
139+
dnId,
140+
EnvFactory.getEnv().getDataNodeWrapperList().get(i),
141+
"compaction_schedule_thread_num=2");
142+
if (scheduleThreadNumChanged && dnId != 5) {
143+
Assert.fail();
144+
}
145+
if (!scheduleThreadNumChanged && dnId == 5) {
146+
Assert.fail();
147+
}
134148
}
135149
}
136150

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1052,7 +1052,7 @@ public List<TSStatus> setConfiguration(TSetConfigurationReq req) {
10521052
if (!targetDataNodes.isEmpty()) {
10531053
DataNodeAsyncRequestContext<Object, TSStatus> clientHandler =
10541054
new DataNodeAsyncRequestContext<>(
1055-
CnToDnAsyncRequestType.SET_CONFIGURATION, req, dataNodeLocationMap);
1055+
CnToDnAsyncRequestType.SET_CONFIGURATION, req, targetDataNodes);
10561056
CnToDnInternalServiceAsyncRequestManager.getInstance()
10571057
.sendAsyncRequestWithRetry(clientHandler);
10581058
responseList.addAll(clientHandler.getResponseList());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3705,7 +3705,11 @@ public void executeTTLCheckForObjectFiles() throws InterruptedException {
37053705
if (!regionObjectDir.isDirectory()) {
37063706
continue;
37073707
}
3708-
CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir, databaseName);
3708+
try {
3709+
CompactionUtils.executeTTLCheckObjectFilesForTableModel(regionObjectDir, databaseName);
3710+
} catch (Exception e) {
3711+
logger.error("Failed to execute object ttl check", e);
3712+
}
37093713
}
37103714
CompactionMetrics.getInstance()
37113715
.updateTTLCheckForObjectFileCost(System.currentTimeMillis() - startTime);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/CompactionUtils.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,11 @@
6969
import javax.annotation.Nullable;
7070

7171
import java.io.File;
72+
import java.io.FileNotFoundException;
7273
import java.io.IOException;
7374
import java.nio.charset.StandardCharsets;
7475
import java.nio.file.Files;
76+
import java.nio.file.NoSuchFileException;
7577
import java.nio.file.attribute.BasicFileAttributes;
7678
import java.util.ArrayList;
7779
import java.util.Collection;
@@ -595,7 +597,10 @@ private static void recursiveTTLCheckForTableDir(
595597
checkTTLAndDeleteExpiredObjectFile(currentFile, basicFileAttributes, lowerBoundInMS);
596598
return;
597599
}
598-
} catch (IOException ignored) {
600+
} catch (FileNotFoundException | NoSuchFileException ignored) {
601+
// may be deleted by other thread
602+
} catch (IOException e) {
603+
logger.warn("Failed to read file attributes: {}", currentFile, e);
599604
}
600605
}
601606
File[] children = currentFile.listFiles();
@@ -606,8 +611,16 @@ private static void recursiveTTLCheckForTableDir(
606611
// block-aligned and reflects allocated directory entry blocks.
607612
acquireCompactionReadRate(currentFile.length());
608613
for (File child : children) {
609-
recursiveTTLCheckForTableDir(
610-
child, depth + 1, maxObjectFileDepth, canDistinguishDirectoryByFileName, lowerBoundInMS);
614+
try {
615+
recursiveTTLCheckForTableDir(
616+
child,
617+
depth + 1,
618+
maxObjectFileDepth,
619+
canDistinguishDirectoryByFileName,
620+
lowerBoundInMS);
621+
} catch (Exception e) {
622+
logger.warn("Failed to check table dir: {}", child, e);
623+
}
611624
}
612625
}
613626

@@ -637,7 +650,10 @@ private static void checkTTLAndDeleteExpiredObjectFile(
637650
FileMetrics.getInstance().decreaseObjectFileNum(1);
638651
FileMetrics.getInstance().decreaseObjectFileSize(attributes.size());
639652
logger.info("Remove object file {}, size is {}(byte)", file.getPath(), attributes.size());
640-
} catch (Exception ignored) {
653+
} catch (FileNotFoundException | NoSuchFileException ignored) {
654+
// may be deleted by other thread
655+
} catch (Exception e) {
656+
logger.warn("Failed to delete expired object file: {}", file, e);
641657
}
642658
}
643659

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskManager.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public class CompactionScheduleTaskManager implements IService {
6565
ConcurrentHashMap.newKeySet();
6666
private ReentrantLock lock = new ReentrantLock();
6767
private volatile boolean init = false;
68+
private volatile boolean isStoppingAllScheduleTask = false;
6869

6970
@Override
7071
public void start() throws StartupException {
@@ -76,8 +77,13 @@ public void start() throws StartupException {
7677
logger.info("Compaction schedule task manager started.");
7778
}
7879

80+
public boolean isStoppingAllScheduleTask() {
81+
return isStoppingAllScheduleTask;
82+
}
83+
7984
public void stopCompactionScheduleTasks() throws InterruptedException {
8085
lock.lock();
86+
isStoppingAllScheduleTask = true;
8187
try {
8288
for (Future<Void> task : submitCompactionScheduleTaskFutures) {
8389
task.cancel(true);
@@ -121,6 +127,7 @@ public void checkAndMayApplyConfigurationChange() throws InterruptedException {
121127

122128
public void startScheduleTasks() {
123129
lock.lock();
130+
isStoppingAllScheduleTask = false;
124131
try {
125132
// compaction selector
126133
for (int workerId = 0; workerId < compactionSelectorNum; workerId++) {
@@ -144,6 +151,7 @@ public void startScheduleTasks() {
144151
@Override
145152
public void stop() {
146153
lock.lock();
154+
isStoppingAllScheduleTask = true;
147155
try {
148156
if (!init) {
149157
return;
@@ -160,6 +168,7 @@ public void stop() {
160168
@Override
161169
public void waitAndStop(long milliseconds) {
162170
lock.lock();
171+
isStoppingAllScheduleTask = true;
163172
try {
164173
if (!init) {
165174
return;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/CompactionScheduleTaskWorker.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,26 @@ public Void call() {
7272
dataRegion.executeCompaction();
7373
}
7474
} catch (InterruptedException ignored) {
75+
boolean isStoppedByUser =
76+
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
7577
logger.info(
76-
"[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted", workerId);
77-
return null;
78+
"[CompactionScheduleTaskWorker-{}] compaction schedule is interrupted, isStopByUser: {}",
79+
workerId,
80+
isStoppedByUser);
81+
if (isStoppedByUser) {
82+
return null;
83+
}
84+
} catch (Exception e) {
85+
logger.error(
86+
"[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task",
87+
workerId,
88+
e);
89+
} catch (Throwable t) {
90+
logger.error(
91+
"[CompactionScheduleTaskWorker-{}] Failed to execute compaction schedule task and cannot recover",
92+
workerId,
93+
t);
94+
throw t;
7895
}
7996
}
8097
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/schedule/TTLScheduleTask.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,21 @@ public Void call() throws Exception {
7878
}
7979
}
8080
} catch (InterruptedException ignored) {
81-
logger.info("[TTLCheckTask-{}] TTL checker is interrupted", workerId);
82-
return null;
81+
boolean isStoppedByUser =
82+
CompactionScheduleTaskManager.getInstance().isStoppingAllScheduleTask();
83+
logger.info(
84+
"[TTLCheckTask-{}] TTL checker is interrupted, isStoppedByUser: {}",
85+
workerId,
86+
isStoppedByUser);
87+
if (isStoppedByUser) {
88+
return null;
89+
}
90+
} catch (Exception e) {
91+
logger.error("[TTLCheckTask-{}] Failed to execute ttl check", workerId, e);
92+
} catch (Throwable t) {
93+
logger.error(
94+
"[TTLCheckTask-{}] Failed to execute ttl check and cannot recover", workerId, t);
95+
throw t;
8396
}
8497
}
8598
}

0 commit comments

Comments
 (0)