Skip to content

Commit 79e7003

Browse files
authored
[ISSUE #9945] Use UniqueKey as TimerDelKey by default
1 parent 5132258 commit 79e7003

File tree

6 files changed

+60
-10
lines changed

6 files changed

+60
-10
lines changed

broker/src/main/java/org/apache/rocketmq/broker/processor/RecallMessageProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ public MessageExtBrokerInner buildMessage(ChannelHandlerContext ctx, RecallMessa
135135
msgInner.setTags(RECALL_MESSAGE_TAG);
136136
msgInner.setTagsCode(RECALL_MESSAGE_TAG.hashCode());
137137
msgInner.setQueueId(0);
138-
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TIMER_DEL_UNIQKEY,
139-
TimerMessageStore.buildDeleteKey(handle.getTopic(), handle.getMessageId()));
138+
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TIMER_DEL_UNIQKEY, TimerMessageStore.buildDeleteKey(
139+
handle.getTopic(), handle.getMessageId(), brokerController.getMessageStoreConfig().isAppendTopicForTimerDeleteKey()));
140140
MessageAccessor.putProperty(msgInner,
141141
MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, handle.getMessageId());
142142
MessageAccessor.putProperty(msgInner,

broker/src/test/java/org/apache/rocketmq/broker/processor/RecallMessageProcessorTest.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ public void init() throws IllegalAccessException, NoSuchFieldException {
9696
}
9797

9898
@Test
99-
public void testBuildMessage() {
99+
public void testBuildMessage_withNamespace() {
100+
when(messageStoreConfig.isAppendTopicForTimerDeleteKey()).thenReturn(true);
100101
String timestampStr = String.valueOf(System.currentTimeMillis());
101102
String id = "id";
102103
RecallMessageHandle.HandleV1 handle = new RecallMessageHandle.HandleV1(TOPIC, "brokerName", timestampStr, id);
@@ -110,6 +111,22 @@ public void testBuildMessage() {
110111
Assert.assertEquals(TOPIC + "+" + id, properties.get(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
111112
}
112113

114+
@Test
115+
public void testBuildMessage_withoutNamespace() {
116+
when(messageStoreConfig.isAppendTopicForTimerDeleteKey()).thenReturn(false);
117+
String timestampStr = String.valueOf(System.currentTimeMillis());
118+
String id = "id";
119+
RecallMessageHandle.HandleV1 handle = new RecallMessageHandle.HandleV1(TOPIC, "brokerName", timestampStr, id);
120+
MessageExtBrokerInner msg =
121+
recallMessageProcessor.buildMessage(handlerContext, new RecallMessageRequestHeader(), handle);
122+
123+
Assert.assertEquals(TOPIC, msg.getTopic());
124+
Map<String, String> properties = MessageDecoder.string2messageProperties(msg.getPropertiesString());
125+
Assert.assertEquals(timestampStr, properties.get(MessageConst.PROPERTY_TIMER_DELIVER_MS));
126+
Assert.assertEquals(id, properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
127+
Assert.assertEquals(id, properties.get(MessageConst.PROPERTY_TIMER_DEL_UNIQKEY));
128+
}
129+
113130
@Test
114131
public void testHandlePutMessageResult() {
115132
MessageExt message = new MessageExt();

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,8 @@ public void setRocksdbCompressionType(String compressionType) {
533533

534534
private boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover = false;
535535

536+
private boolean appendTopicForTimerDeleteKey = false;
537+
536538
public boolean isRocksdbCQDoubleWriteEnable() {
537539
return rocksdbCQDoubleWriteEnable;
538540
}
@@ -2236,4 +2238,12 @@ public int getSharedByteBufferNum() {
22362238
public void setSharedByteBufferNum(int sharedByteBufferNum) {
22372239
this.sharedByteBufferNum = sharedByteBufferNum;
22382240
}
2241+
2242+
public boolean isAppendTopicForTimerDeleteKey() {
2243+
return appendTopicForTimerDeleteKey;
2244+
}
2245+
2246+
public void setAppendTopicForTimerDeleteKey(boolean appendTopicForTimerDeleteKey) {
2247+
this.appendTopicForTimerDeleteKey = appendTopicForTimerDeleteKey;
2248+
}
22392249
}

store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1741,7 +1741,7 @@ public void run() {
17411741
isRound = false;
17421742
}
17431743
if (null != uniqueKey && tr.getDeleteList() != null && tr.getDeleteList().size() > 0
1744-
&& tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey))) {
1744+
&& tr.getDeleteList().contains(buildDeleteKey(getRealTopic(msgExt), uniqueKey, storeConfig.isAppendTopicForTimerDeleteKey()))) {
17451745
avoidDeleteLose.remove(uniqueKey);
17461746
doRes = true;
17471747
tr.idempotentRelease();
@@ -2074,9 +2074,9 @@ public TimerCheckpoint getTimerCheckpoint() {
20742074
return timerCheckpoint;
20752075
}
20762076

2077-
// identify a message by topic + uk, like query operation
2078-
public static String buildDeleteKey(String realTopic, String uniqueKey) {
2079-
return realTopic + "+" + uniqueKey;
2077+
// identify a message by topic or topic + uk(like query operation)
2078+
public static String buildDeleteKey(String realTopic, String uniqueKey, Boolean appendTopicForTimerDeleteKey) {
2079+
return appendTopicForTimerDeleteKey ? (realTopic + "+" + uniqueKey) : uniqueKey;
20802080
}
20812081

20822082
private void recallToTimeline(long delayTime, long offsetPy, int sizePy, MessageExt messageExt) {

store/src/test/java/org/apache/rocketmq/store/timer/TimerMessageStoreTest.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ public void init() throws Exception {
110110
storeConfig.setFlushDiskType(FlushDiskType.ASYNC_FLUSH);
111111
storeConfig.setTimerInterceptDelayLevel(true);
112112
storeConfig.setTimerPrecisionMs(precisionMs);
113+
storeConfig.setAppendTopicForTimerDeleteKey(false); // reset default value
113114

114115
mockMessageStore = Mockito.mock(MessageStore.class);
115116
messageStore = new DefaultMessageStore(storeConfig, new BrokerStatsManager("TimerTest",false), new MyMessageArrivingListener(), new BrokerConfig(), new ConcurrentHashMap<>());
@@ -358,7 +359,7 @@ public void testDeleteTimerMessage() throws Exception {
358359

359360
MessageExtBrokerInner delMsg = buildMessage(delayMs, topic, false);
360361
transformTimerMessage(timerMessageStore,delMsg);
361-
MessageAccessor.putProperty(delMsg, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, TimerMessageStore.buildDeleteKey(topic, uniqKey));
362+
MessageAccessor.putProperty(delMsg, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, TimerMessageStore.buildDeleteKey(topic, uniqKey, false));
362363
delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties()));
363364
assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(delMsg).getPutMessageStatus());
364365

@@ -375,6 +376,7 @@ public void testDeleteTimerMessage() throws Exception {
375376

376377
@Test
377378
public void testDeleteTimerMessage_ukCollision() throws Exception {
379+
storeConfig.setAppendTopicForTimerDeleteKey(true); // append topic as namespace
378380
String topic = "TimerTest_testDeleteTimerMessage";
379381
String collisionTopic = "TimerTest_testDeleteTimerMessage_collision";
380382

@@ -397,13 +399,13 @@ public void testDeleteTimerMessage_ukCollision() throws Exception {
397399

398400
MessageExtBrokerInner delMsg = buildMessage(delayMs, "whatever", false);
399401
transformTimerMessage(timerMessageStore, delMsg);
400-
MessageAccessor.putProperty(delMsg, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, TimerMessageStore.buildDeleteKey(topic, firstUniqKey));
402+
MessageAccessor.putProperty(delMsg, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, TimerMessageStore.buildDeleteKey(topic, firstUniqKey, true));
401403
delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties()));
402404
assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(delMsg).getPutMessageStatus());
403405

404406
delMsg = buildMessage(delayMs, "whatever", false);
405407
transformTimerMessage(timerMessageStore, delMsg);
406-
MessageAccessor.putProperty(delMsg, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, TimerMessageStore.buildDeleteKey(collisionTopic, secondUniqKey));
408+
MessageAccessor.putProperty(delMsg, TimerMessageStore.TIMER_DELETE_UNIQUE_KEY, TimerMessageStore.buildDeleteKey(collisionTopic, secondUniqKey, true));
407409
delMsg.setPropertiesString(MessageDecoder.messageProperties2String(delMsg.getProperties()));
408410
assertEquals(PutMessageStatus.PUT_OK, messageStore.putMessage(delMsg).getPutMessageStatus());
409411

test/src/test/java/org/apache/rocketmq/test/recall/SendAndRecallDelayMessageIT.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,16 +42,34 @@
4242
import org.junit.AfterClass;
4343
import org.junit.Before;
4444
import org.junit.Test;
45+
import org.junit.runner.RunWith;
46+
import org.junit.runners.Parameterized;
4547

48+
@RunWith(Parameterized.class)
4649
public class SendAndRecallDelayMessageIT extends BaseConf {
4750

4851
private static String initTopic;
4952
private static String consumerGroup;
5053
private static RMQNormalProducer producer;
5154
private static RMQPopConsumer popConsumer;
5255

56+
private final boolean appendTopicForTimerDeleteKey;
57+
58+
public SendAndRecallDelayMessageIT(boolean appendTopicForTimerDeleteKey) {
59+
this.appendTopicForTimerDeleteKey = appendTopicForTimerDeleteKey;
60+
}
61+
62+
@Parameterized.Parameters
63+
public static List<Object[]> params() {
64+
List<Object[]> result = new ArrayList<>();
65+
result.add(new Object[] {false});
66+
result.add(new Object[] {true});
67+
return result;
68+
}
69+
5370
@Before
5471
public void init() {
72+
brokerController1.getMessageStoreConfig().setAppendTopicForTimerDeleteKey(appendTopicForTimerDeleteKey);
5573
initTopic = initTopic();
5674
consumerGroup = initConsumerGroup();
5775
producer = getProducer(NAMESRV_ADDR, initTopic);
@@ -126,6 +144,9 @@ public void testSendAndRecall() throws Exception {
126144

127145
@Test
128146
public void testSendAndRecall_ukCollision() throws Exception {
147+
if (!appendTopicForTimerDeleteKey) { // skip
148+
return;
149+
}
129150
int delaySecond = 5;
130151
String topic = MQRandomUtils.getRandomTopic();
131152
String collisionTopic = MQRandomUtils.getRandomTopic();

0 commit comments

Comments
 (0)