Skip to content

Commit 064ba9e

Browse files
authored
[To dev/1.3] Clean up tmp dirs of udf and sort while starting up (#17377)
(cherry picked from commit 32f1010)
1 parent 58f94e2 commit 064ba9e

3 files changed

Lines changed: 28 additions & 8 deletions

File tree

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -682,6 +682,15 @@ private void sendRestartRequestToConfigNode() throws StartupException {
682682
}
683683
}
684684

685+
private void cleanupSortTmpDir() {
686+
String sortTmpDir = config.getSortTmpDir();
687+
File tmpDir = new File(sortTmpDir);
688+
if (tmpDir.exists()) {
689+
FileUtils.deleteFileOrDirectory(tmpDir, true);
690+
logger.info("Cleaned up stale sort temp directory: {}", sortTmpDir);
691+
}
692+
}
693+
685694
private void prepareResources() throws StartupException {
686695
prepareUDFResources();
687696
prepareTriggerResources();
@@ -736,6 +745,9 @@ private void setUp() throws StartupException {
736745
registerManager.register(new JMXService());
737746
JMXService.registerMBean(getInstance(), mbeanName);
738747

748+
// Clean up stale sort temp files left from previous runs
749+
cleanupSortTmpDir();
750+
739751
// Get resources for trigger,udf,pipe...
740752
prepareResources();
741753

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/TemporaryQueryDataFileService.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.io.File;
3434
import java.io.IOException;
3535
import java.util.ArrayList;
36+
import java.util.Collections;
3637
import java.util.List;
3738
import java.util.Map;
3839
import java.util.concurrent.ConcurrentHashMap;
@@ -59,10 +60,9 @@ private TemporaryQueryDataFileService() {
5960

6061
public String register(SerializationRecorder recorder) throws IOException {
6162
String queryId = recorder.getQueryId();
62-
if (!recorders.containsKey(queryId)) {
63-
recorders.put(queryId, new ArrayList<>());
64-
}
65-
recorders.get(queryId).add(recorder);
63+
recorders
64+
.computeIfAbsent(queryId, k -> Collections.synchronizedList(new ArrayList<>()))
65+
.add(recorder);
6666

6767
String dirName = getDirName(queryId);
6868
makeDirIfNecessary(dirName);
@@ -109,6 +109,11 @@ private String getFileName(String dir, long index) {
109109
@Override
110110
public void start() throws StartupException {
111111
try {
112+
// Clean up stale temp directories left from previous runs (e.g., after a crash)
113+
File tmpDir = SystemFileFactory.INSTANCE.getFile(TEMPORARY_FILE_DIR);
114+
if (tmpDir.exists()) {
115+
FileUtils.deleteDirectory(tmpDir);
116+
}
112117
makeDirIfNecessary(TEMPORARY_FILE_DIR);
113118
} catch (IOException e) {
114119
throw new StartupException(e);
@@ -117,8 +122,11 @@ public void start() throws StartupException {
117122

118123
@Override
119124
public void stop() {
120-
for (Object queryId : recorders.keySet().toArray()) {
121-
deregister((String) queryId);
125+
recorders.clear();
126+
try {
127+
FileUtils.deleteDirectory(SystemFileFactory.INSTANCE.getFile(TEMPORARY_FILE_DIR));
128+
} catch (IOException e) {
129+
logger.warn("Failed to delete temp dir {}.", TEMPORARY_FILE_DIR, e);
122130
}
123131
}
124132

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/executable/ExecutableManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ private synchronized long generateNextRequestId() throws IOException {
8181

8282
private void downloadExecutables(List<String> uris, long requestId)
8383
throws IOException, URISyntaxException {
84-
// TODO: para download
8584
try {
8685
for (String uriString : uris) {
8786
final URL url = new URI(uriString).toURL();
@@ -238,7 +237,8 @@ protected void saveToDir(ByteBuffer byteBuffer, String destination) throws IOExc
238237
}
239238
Files.createFile(path);
240239
}
241-
// FileOutPutStream is not in append mode by default, so the file will be overridden if it
240+
// FileOutPutStream is not in append mode by default, so the file will be
241+
// overridden if it
242242
// already exists.
243243
try (FileOutputStream outputStream = new FileOutputStream(destination)) {
244244
outputStream.getChannel().write(byteBuffer);

0 commit comments

Comments
 (0)