Skip to content

Commit fd32dae

Browse files
authored
[ISSUE #6633] Not clear uninitialized files and fix metadata recover (#7342)
1 parent 1a8e7cb commit fd32dae

2 files changed

Lines changed: 22 additions & 41 deletions

File tree

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFile.java

Lines changed: 18 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
*/
1717
package org.apache.rocketmq.tieredstore.file;
1818

19-
import com.alibaba.fastjson.JSON;
2019
import com.google.common.annotations.VisibleForTesting;
2120
import java.nio.ByteBuffer;
2221
import java.util.ArrayList;
@@ -25,13 +24,13 @@
2524
import java.util.HashSet;
2625
import java.util.LinkedList;
2726
import java.util.List;
28-
import java.util.Objects;
2927
import java.util.Set;
3028
import java.util.concurrent.CompletableFuture;
3129
import java.util.concurrent.CopyOnWriteArrayList;
3230
import java.util.concurrent.locks.ReentrantReadWriteLock;
3331
import java.util.stream.Collectors;
3432
import javax.annotation.Nullable;
33+
import org.apache.rocketmq.common.BoundaryType;
3534
import org.apache.rocketmq.logging.org.slf4j.Logger;
3635
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
3736
import org.apache.rocketmq.tieredstore.common.AppendResult;
@@ -43,7 +42,6 @@
4342
import org.apache.rocketmq.tieredstore.provider.FileSegmentAllocator;
4443
import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
4544
import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
46-
import org.apache.rocketmq.common.BoundaryType;
4745

4846
public class TieredFlatFile {
4947

@@ -177,7 +175,10 @@ protected void recoverMetadata() {
177175
}
178176
}
179177

180-
private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fileSegment) {
178+
/**
179+
* FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full
180+
*/
181+
public void updateFileSegment(TieredFileSegment fileSegment) {
181182

182183
FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
183184
this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
@@ -186,45 +187,24 @@ private FileSegmentMetadata getOrCreateFileSegmentMetadata(TieredFileSegment fil
186187
if (metadata == null) {
187188
metadata = new FileSegmentMetadata(
188189
this.filePath, fileSegment.getBaseOffset(), fileSegment.getFileType().getType());
189-
metadata.setCreateTimestamp(fileSegment.getMinTimestamp());
190-
metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
191-
metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
192-
if (fileSegment.isClosed()) {
193-
metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
194-
}
195-
this.tieredMetadataStore.updateFileSegment(metadata);
190+
metadata.setCreateTimestamp(System.currentTimeMillis());
196191
}
197-
return metadata;
198-
}
199-
200-
/**
201-
* FileQueue Status: Sealed | Sealed | Sealed | Not sealed, Allow appended && Not Full
202-
*/
203-
public void updateFileSegment(TieredFileSegment fileSegment) {
204-
FileSegmentMetadata segmentMetadata = getOrCreateFileSegmentMetadata(fileSegment);
205192

206-
if (segmentMetadata.getStatus() == FileSegmentMetadata.STATUS_NEW
207-
&& fileSegment.isFull()
208-
&& !fileSegment.needCommit()) {
193+
metadata.setSize(fileSegment.getCommitPosition());
194+
metadata.setBeginTimestamp(fileSegment.getMinTimestamp());
195+
metadata.setEndTimestamp(fileSegment.getMaxTimestamp());
209196

210-
segmentMetadata.markSealed();
197+
if (fileSegment.isFull() && !fileSegment.needCommit()) {
198+
if (metadata.getStatus() == FileSegmentMetadata.STATUS_NEW) {
199+
metadata.markSealed();
200+
}
211201
}
212202

213203
if (fileSegment.isClosed()) {
214-
segmentMetadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
204+
metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
215205
}
216206

217-
segmentMetadata.setSize(fileSegment.getCommitPosition());
218-
segmentMetadata.setEndTimestamp(fileSegment.getMaxTimestamp());
219-
220-
FileSegmentMetadata metadata = tieredMetadataStore.getFileSegment(
221-
this.filePath, fileSegment.getFileType(), fileSegment.getBaseOffset());
222-
223-
if (!Objects.equals(metadata, segmentMetadata)) {
224-
this.tieredMetadataStore.updateFileSegment(segmentMetadata);
225-
logger.debug("TieredFlatFile#UpdateSegmentMetadata, filePath: {}, content: {}",
226-
segmentMetadata.getPath(), JSON.toJSONString(segmentMetadata));
227-
}
207+
this.tieredMetadataStore.updateFileSegment(metadata);
228208
}
229209

230210
private void checkAndFixFileSize() {
@@ -598,6 +578,9 @@ public void destroy() {
598578
logger.error("TieredFlatFile#destroy: mark file segment: {} is deleted failed", fileSegment.getPath(), e);
599579
}
600580
fileSegment.destroyFile();
581+
if (!fileSegment.exists()) {
582+
tieredMetadataStore.deleteFileSegment(filePath, fileType, fileSegment.getBaseOffset());
583+
}
601584
}
602585
fileSegmentList.clear();
603586
needCommitFileSegmentList.clear();

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/file/TieredFlatFileManager.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -136,15 +136,13 @@ public void doCleanExpiredFile() {
136136
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
137137
for (CompositeQueueFlatFile flatFile : deepCopyFlatFileToList()) {
138138
TieredStoreExecutor.cleanExpiredFileExecutor.submit(() -> {
139-
flatFile.getCompositeFlatFileLock().lock();
140139
try {
140+
flatFile.getCompositeFlatFileLock().lock();
141141
flatFile.cleanExpiredFile(expiredTimeStamp);
142142
flatFile.destroyExpiredFile();
143-
if (flatFile.getConsumeQueueBaseOffset() == -1) {
144-
logger.info("Clean flatFile because file not initialized, topic={}, queueId={}",
145-
flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId());
146-
destroyCompositeFile(flatFile.getMessageQueue());
147-
}
143+
} catch (Throwable t) {
144+
logger.error("Do Clean expired file error, topic={}, queueId={}",
145+
flatFile.getMessageQueue().getTopic(), flatFile.getMessageQueue().getQueueId(), t);
148146
} finally {
149147
flatFile.getCompositeFlatFileLock().unlock();
150148
}

0 commit comments

Comments
 (0)