Skip to content

Commit 831fcc7

Browse files
authored
[ISSUE #7363] Fix get message from tiered storage return incorrect next pull offset (#7365)
1 parent 4a8e0d5 commit 831fcc7

3 files changed

Lines changed: 20 additions & 16 deletions

File tree

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageFetcher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(CompositeQue
319319
}
320320

321321
// if cache is miss, immediately pull messages
322-
LOGGER.warn("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
322+
LOGGER.info("TieredMessageFetcher#getMessageFromCacheAsync: cache miss: " +
323323
"topic: {}, queue: {}, queue offset: {}, max message num: {}",
324324
mq.getTopic(), mq.getQueueId(), queueOffset, maxCount);
325325

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredMessageStore.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,11 @@ public GetMessageResult getMessage(String group, String topic, int queueId, long
147147
public CompletableFuture<GetMessageResult> getMessageAsync(String group, String topic,
148148
int queueId, long offset, int maxMsgNums, MessageFilter messageFilter) {
149149

150+
// For system topic, force reading from local store
151+
if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) {
152+
return next.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter);
153+
}
154+
150155
if (fetchFromCurrentStore(topic, queueId, offset, maxMsgNums)) {
151156
logger.trace("GetMessageAsync from current store, topic: {}, queue: {}, offset: {}", topic, queueId, offset);
152157
} else {
@@ -158,6 +163,7 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String group, String
158163
return fetcher
159164
.getMessageAsync(group, topic, queueId, offset, maxMsgNums, messageFilter)
160165
.thenApply(result -> {
166+
161167
Attributes latencyAttributes = TieredStoreMetricsManager.newAttributesBuilder()
162168
.put(TieredStoreMetricsConstant.LABEL_OPERATION, TieredStoreMetricsConstant.OPERATION_API_GET_MESSAGE)
163169
.put(TieredStoreMetricsConstant.LABEL_TOPIC, topic)
@@ -166,8 +172,7 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String group, String
166172
TieredStoreMetricsManager.apiLatency.record(stopwatch.elapsed(TimeUnit.MILLISECONDS), latencyAttributes);
167173

168174
if (result.getStatus() == GetMessageStatus.OFFSET_FOUND_NULL ||
169-
result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_ONE ||
170-
result.getStatus() == GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
175+
result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
171176

172177
if (next.checkInStoreByConsumeOffset(topic, queueId, offset)) {
173178
TieredStoreMetricsManager.fallbackTotal.add(1, latencyAttributes);
@@ -178,14 +183,8 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String group, String
178183
}
179184
}
180185

181-
// Fetch system topic data from the broker when using the force level.
182-
if (result.getStatus() == GetMessageStatus.NO_MATCHED_LOGIC_QUEUE) {
183-
if (TieredStoreUtil.isSystemTopic(topic) || PopAckConstants.isStartWithRevivePrefix(topic)) {
184-
return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
185-
}
186-
}
187-
188186
if (result.getStatus() != GetMessageStatus.FOUND &&
187+
result.getStatus() != GetMessageStatus.NO_MATCHED_LOGIC_QUEUE &&
189188
result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_ONE &&
190189
result.getStatus() != GetMessageStatus.OFFSET_OVERFLOW_BADLY) {
191190
logger.warn("GetMessageAsync not found and message is not in next store, result: {}, " +
@@ -206,10 +205,14 @@ public CompletableFuture<GetMessageResult> getMessageAsync(String group, String
206205
if (minOffsetInQueue >= 0 && minOffsetInQueue < result.getMinOffset()) {
207206
result.setMinOffset(minOffsetInQueue);
208207
}
209-
long maxOffsetInQueue = next.getMaxOffsetInQueue(topic, queueId);
210-
if (maxOffsetInQueue >= 0 && maxOffsetInQueue > result.getMaxOffset()) {
211-
result.setMaxOffset(maxOffsetInQueue);
212-
}
208+
209+
// In general, the local cq offset is slightly greater than the commit offset in read message,
210+
// so there is no need to update the maximum offset to the local cq offset here,
211+
// otherwise it will cause repeated consumption after next begin offset over commit offset.
212+
213+
logger.trace("GetMessageAsync result, group: {}, topic: {}, queueId: {}, offset: {}, count:{}, {}",
214+
group, topic, queueId, offset, maxMsgNums, result);
215+
213216
return result;
214217
}).exceptionally(e -> {
215218
logger.error("GetMessageAsync from tiered store failed", e);

tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageStoreTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public void testGetMessageAsync() {
168168
GetMessageResult result1 = new GetMessageResult();
169169
result1.setStatus(GetMessageStatus.FOUND);
170170
GetMessageResult result2 = new GetMessageResult();
171-
result2.setStatus(GetMessageStatus.MESSAGE_WAS_REMOVING);
171+
result2.setStatus(GetMessageStatus.OFFSET_OVERFLOW_BADLY);
172172

173173
when(fetcher.getMessageAsync(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(CompletableFuture.completedFuture(result1));
174174
when(nextStore.getMessage(anyString(), anyString(), anyInt(), anyLong(), anyInt(), any())).thenReturn(result2);
@@ -188,7 +188,8 @@ public void testGetMessageAsync() {
188188
properties.setProperty("tieredStorageLevel", "3");
189189
configuration.update(properties);
190190
when(nextStore.checkInStoreByConsumeOffset(anyString(), anyInt(), anyLong())).thenReturn(true);
191-
Assert.assertSame(result2, store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0, null));
191+
Assert.assertEquals(result2.getStatus(),
192+
store.getMessage("group", mq.getTopic(), mq.getQueueId(), 0, 0, null).getStatus());
192193
}
193194

194195
@Test

0 commit comments

Comments
 (0)