Skip to content

Commit bdae09d

Browse files
imzsRongtongJin
andauthored
[RIP-80] #9928 Implementation of Priority Message (#9929)
* #9928 Implementation of Priority Message * switch to fastjson2 Change-Id: I3620b00b79a77a93a7cf0fdfac857fb495638ca6 * fix bazel CI, upgrade rocketmq-proto to 2.1.1 Change-Id: Ia66cc7b14b89dc319044ced5ba349315e487c849 * Fix bazel CI * fix CI, temporarily disable popKv in OffsetResetForPopIT Change-Id: I0b406cf0ece5067de430c4404aaa1f2dd46edcba --------- Co-authored-by: RongtongJin <user@example.com>
1 parent 9f9cab8 commit bdae09d

26 files changed

Lines changed: 621 additions & 53 deletions

File tree

WORKSPACE

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ maven_install(
7171
"org.bouncycastle:bcpkix-jdk15on:1.69",
7272
"com.google.code.gson:gson:2.8.9",
7373
"com.googlecode.concurrentlinkedhashmap:concurrentlinkedhashmap-lru:1.4.2",
74-
"org.apache.rocketmq:rocketmq-proto:2.0.4",
74+
"org.apache.rocketmq:rocketmq-proto:2.1.1",
7575
"com.google.protobuf:protobuf-java:3.20.1",
7676
"com.google.protobuf:protobuf-java-util:3.20.1",
7777
"com.conversantmedia:disruptor:1.2.10",

broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerService.java

Lines changed: 76 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
import org.apache.rocketmq.common.BrokerConfig;
2626
import org.apache.rocketmq.common.KeyBuilder;
2727
import org.apache.rocketmq.common.MixAll;
28+
import org.apache.rocketmq.common.PopAckConstants;
2829
import org.apache.rocketmq.common.ServiceThread;
2930
import org.apache.rocketmq.common.TopicConfig;
3031
import org.apache.rocketmq.common.TopicFilterType;
32+
import org.apache.rocketmq.common.attribute.TopicMessageType;
3133
import org.apache.rocketmq.common.constant.ConsumeInitMode;
3234
import org.apache.rocketmq.common.constant.LoggerName;
3335
import org.apache.rocketmq.common.constant.PermName;
@@ -38,6 +40,7 @@
3840
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
3941
import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
4042
import org.apache.rocketmq.remoting.protocol.header.ExtraInfoUtil;
43+
import org.apache.rocketmq.remoting.protocol.subscription.SubscriptionGroupConfig;
4144
import org.apache.rocketmq.store.AppendMessageStatus;
4245
import org.apache.rocketmq.store.GetMessageResult;
4346
import org.apache.rocketmq.store.GetMessageStatus;
@@ -324,6 +327,23 @@ protected CompletableFuture<PopConsumerContext> getMessageAsync(CompletableFutur
324327
});
325328
}
326329

330+
protected CompletableFuture<PopConsumerContext> getMessageFromTopicAsync(CompletableFuture<PopConsumerContext> future,
331+
String clientHost, String groupId, String topicId, long requestCount, int batchSize, MessageFilter filter,
332+
PopConsumerRecord.RetryType retryType) {
333+
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topicId);
334+
if (null == topicConfig) {
335+
return future;
336+
}
337+
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
338+
long index = (brokerController.getBrokerConfig().isPriorityOrderAsc() ?
339+
topicConfig.getReadQueueNums() - 1 - i : i) + requestCount;
340+
int current = (int) index % topicConfig.getReadQueueNums();
341+
future = this.getMessageAsync(future, clientHost, groupId,
342+
topicId, current, batchSize, filter, retryType);
343+
}
344+
return future;
345+
}
346+
327347
public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long popTime, long invisibleTime,
328348
String groupId, String topicId, int queueId, int batchSize, boolean fifo, String attemptId, int initMode,
329349
MessageFilter filter) {
@@ -336,6 +356,12 @@ public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long po
336356
return CompletableFuture.completedFuture(popConsumerContext);
337357
}
338358

359+
SubscriptionGroupConfig subscriptionGroupConfig =
360+
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(groupId);
361+
if (null == subscriptionGroupConfig || !subscriptionGroupConfig.isConsumeEnable()) {
362+
return CompletableFuture.completedFuture(popConsumerContext);
363+
}
364+
339365
log.debug("PopConsumerService popAsync, groupId={}, topicId={}, queueId={}, " +
340366
"batchSize={}, invisibleTime={}, fifo={}, attemptId={}, filter={}",
341367
groupId, topicId, queueId, batchSize, invisibleTime, fifo, attemptId, filter);
@@ -345,43 +371,46 @@ public CompletableFuture<PopConsumerContext> popAsync(String clientHost, long po
345371
String retryTopicV2 = KeyBuilder.buildPopRetryTopicV2(topicId, groupId);
346372
long requestCount = Objects.requireNonNull(ConcurrentHashMapUtils.computeIfAbsent(
347373
requestCountTable, requestKey, k -> new AtomicLong(0L))).getAndIncrement();
348-
boolean preferRetry = requestCount % 5L == 0L;
374+
boolean usePriorityMode = TopicMessageType.PRIORITY.equals(topicConfig.getTopicMessageType())
375+
&& !fifo && requestCount % 100L < subscriptionGroupConfig.getPriorityFactor();
376+
int probability = usePriorityMode ?
377+
brokerConfig.getPopFromRetryProbabilityForPriority() : brokerConfig.getPopFromRetryProbability();
378+
probability = Math.max(0, Math.min(100, probability)); // [51, 100] means always
379+
boolean preferRetry = probability > 0 && requestCount % (100 / probability) == 0L;
380+
requestCount = usePriorityMode ? 0 : requestCount; // use requestCount as randomQ
349381

350382
CompletableFuture<PopConsumerContext> getMessageFuture =
351383
CompletableFuture.completedFuture(popConsumerContext);
352384

353385
try {
354386
if (!fifo && preferRetry) {
355387
if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
356-
getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
357-
retryTopicV1, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
388+
getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
389+
retryTopicV1, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
358390
}
359391

360392
if (brokerConfig.isEnableRetryTopicV2()) {
361-
getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
362-
retryTopicV2, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
393+
getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
394+
retryTopicV2, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
363395
}
364396
}
365397

366398
if (queueId != -1) {
367399
getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
368400
topicId, queueId, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC);
369401
} else {
370-
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
371-
int current = (int) ((requestCount + i) % topicConfig.getReadQueueNums());
372-
getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
373-
topicId, current, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC);
374-
}
402+
getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
403+
topicId, requestCount, batchSize, filter, PopConsumerRecord.RetryType.NORMAL_TOPIC);
375404

376405
if (!fifo && !preferRetry) {
377406
if (brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
378-
getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
379-
retryTopicV1, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
407+
getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
408+
retryTopicV1, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V1);
380409
}
381410

382411
if (brokerConfig.isEnableRetryTopicV2()) {
383-
getMessageFuture = this.getMessageAsync(getMessageFuture, clientHost, groupId,
384-
retryTopicV2, 0, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
412+
getMessageFuture = this.getMessageFromTopicAsync(getMessageFuture, clientHost, groupId,
413+
retryTopicV2, requestCount, batchSize, filter, PopConsumerRecord.RetryType.RETRY_TOPIC_V2);
385414
}
386415
}
387416
}
@@ -568,21 +597,33 @@ public long revive(AtomicLong currentTime, int maxCount) {
568597
return consumerRecords.size();
569598
}
570599

571-
public void createRetryTopicIfNeeded(String groupId, String topicId) {
572-
TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topicId);
573-
if (topicConfig != null) {
600+
public void createRetryTopicIfNeeded(String groupId, String retryTopic) {
601+
TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(retryTopic);
602+
if (topicConfig != null && !brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
574603
return;
575604
}
576605

577-
topicConfig = new TopicConfig(topicId, 1, 1,
606+
int retryQueueNum = PopAckConstants.retryQueueNum;
607+
if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
608+
String normalTopic = KeyBuilder.parseNormalTopic(retryTopic, groupId);
609+
TopicConfig normalConfig = brokerController.getTopicConfigManager().selectTopicConfig(normalTopic); // always exists
610+
retryQueueNum = normalConfig.getWriteQueueNums();
611+
if (topicConfig != null && topicConfig.getWriteQueueNums() == normalConfig.getWriteQueueNums()) {
612+
return;
613+
}
614+
}
615+
616+
topicConfig = new TopicConfig(retryTopic, retryQueueNum, retryQueueNum,
578617
PermName.PERM_READ | PermName.PERM_WRITE, 0);
579618
topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG);
580619
brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
581620

582-
long offset = this.brokerController.getConsumerOffsetManager().queryOffset(groupId, topicId, 0);
583-
if (offset < 0) {
584-
this.brokerController.getConsumerOffsetManager().commitOffset(
585-
"InitPopOffset", groupId, topicId, 0, 0);
621+
for (int i = 0; i < retryQueueNum; i++) {
622+
long offset = this.brokerController.getConsumerOffsetManager().queryOffset(groupId, retryTopic, i);
623+
if (offset < 0) {
624+
this.brokerController.getConsumerOffsetManager().commitOffset(
625+
"InitPopOffset", groupId, retryTopic, i, 0);
626+
}
586627
}
587628
}
588629

@@ -605,7 +646,7 @@ public boolean reviveRetry(PopConsumerRecord record, MessageExt messageExt) {
605646
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
606647
msgInner.setTopic(retryTopic);
607648
msgInner.setBody(messageExt.getBody() != null ? messageExt.getBody() : new byte[] {});
608-
msgInner.setQueueId(0);
649+
msgInner.setQueueId(getRetryQueueId(retryTopic, messageExt));
609650
if (messageExt.getTags() != null) {
610651
msgInner.setTags(messageExt.getTags());
611652
} else {
@@ -647,6 +688,18 @@ public boolean reviveRetry(PopConsumerRecord record, MessageExt messageExt) {
647688
return true;
648689
}
649690

691+
private int getRetryQueueId(String retryTopic, MessageExt oriMsg) {
692+
if (!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
693+
return 0;
694+
}
695+
int oriQueueId = oriMsg.getQueueId(); // original qid of normal or retry topic
696+
if (oriQueueId > brokerController.getTopicConfigManager().selectTopicConfig(retryTopic).getWriteQueueNums() - 1) {
697+
log.warn("not expected, {}, {}, {}", retryTopic, oriQueueId, oriMsg.getMsgId());
698+
return 0; // fallback
699+
}
700+
return oriQueueId;
701+
}
702+
650703
// Export kv store record to revive topic
651704
@SuppressWarnings("ExtractMethodRecommender")
652705
public synchronized void transferToFsStore() {

broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.apache.rocketmq.common.PopAckConstants;
4040
import org.apache.rocketmq.common.ServiceThread;
4141
import org.apache.rocketmq.common.TopicConfig;
42+
import org.apache.rocketmq.common.attribute.TopicMessageType;
4243
import org.apache.rocketmq.common.constant.ConsumeInitMode;
4344
import org.apache.rocketmq.common.constant.LoggerName;
4445
import org.apache.rocketmq.common.constant.PermName;
@@ -512,11 +513,15 @@ public RemotingCommand processRequest(final ChannelHandlerContext ctx, RemotingC
512513
// considered the same type because they share the same retry flag in previous fields.
513514
// Therefore, needRetryV1 is designed as a subset of needRetry, and within a single request,
514515
// only one type of retry topic is able to call popMsgFromQueue.
515-
boolean needRetry = randomQ < brokerConfig.getPopFromRetryProbability();
516+
boolean usePriorityMode = TopicMessageType.PRIORITY.equals(topicConfig.getTopicMessageType())
517+
&& !requestHeader.isOrder() && randomQ < subscriptionGroupConfig.getPriorityFactor();
518+
boolean needRetry = randomQ < (usePriorityMode ?
519+
brokerConfig.getPopFromRetryProbabilityForPriority() : brokerConfig.getPopFromRetryProbability());
516520
boolean needRetryV1 = false;
517521
if (brokerConfig.isEnableRetryTopicV2() && brokerConfig.isRetrieveMessageFromPopRetryTopicV1()) {
518522
needRetryV1 = randomQ % 2 == 0;
519523
}
524+
randomQ = usePriorityMode ? 0 : randomQ; // reset randomQ
520525
long popTime = System.currentTimeMillis();
521526
CompletableFuture<Long> getMessageFuture = CompletableFuture.completedFuture(0L);
522527
if (needRetry && !requestHeader.isOrder()) {
@@ -653,7 +658,9 @@ private CompletableFuture<Long> popMsgFromTopic(TopicConfig topicConfig, boolean
653658
StringBuilder msgOffsetInfo, StringBuilder orderCountInfo, int randomQ, CompletableFuture<Long> getMessageFuture) {
654659
if (topicConfig != null) {
655660
for (int i = 0; i < topicConfig.getReadQueueNums(); i++) {
656-
int queueId = (randomQ + i) % topicConfig.getReadQueueNums();
661+
int index = (brokerController.getBrokerConfig().isPriorityOrderAsc() ?
662+
topicConfig.getReadQueueNums() - 1 - i : i) + randomQ;
663+
int queueId = index % topicConfig.getReadQueueNums();
657664
getMessageFuture = getMessageFuture.thenCompose(restNum ->
658665
popMsgFromQueue(topicConfig.getTopicName(), requestHeader.getAttemptId(), isRetry,
659666
getMessageResult, requestHeader, queueId, restNum, reviveQid, channel, popTime, messageFilter,

broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,6 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
112112
msgInner.setTopic(popCheckPoint.getTopic());
113113
}
114114
msgInner.setBody(messageExt.getBody());
115-
msgInner.setQueueId(0);
116115
if (messageExt.getTags() != null) {
117116
msgInner.setTags(messageExt.getTags());
118117
} else {
@@ -131,6 +130,7 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
131130
msgInner.getProperties().put(MessageConst.PROPERTY_ORIGIN_GROUP, popCheckPoint.getCId());
132131
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
133132
addRetryTopicIfNotExist(msgInner.getTopic(), popCheckPoint.getCId());
133+
msgInner.setQueueId(getRetryQueueId(msgInner.getTopic(), messageExt));
134134
PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
135135
brokerController.getBrokerMetricsManager().getPopMetricsManager().incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus());
136136
if (brokerController.getBrokerConfig().isEnablePopLog()) {
@@ -150,30 +150,55 @@ private boolean reviveRetry(PopCheckPoint popCheckPoint, MessageExt messageExt)
150150
return true;
151151
}
152152

153-
private void initPopRetryOffset(String topic, String consumerGroup) {
154-
long offset = this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, topic, 0);
155-
if (offset < 0) {
156-
this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset", consumerGroup, topic,
157-
0, 0);
153+
private void initPopRetryOffset(String retryTopic, String consumerGroup, int retryQueueNum) {
154+
for (int i = 0; i < retryQueueNum; i++) {
155+
long offset = this.brokerController.getConsumerOffsetManager().queryOffset(consumerGroup, retryTopic, i);
156+
if (offset < 0) {
157+
this.brokerController.getConsumerOffsetManager().commitOffset("initPopRetryOffset", consumerGroup, retryTopic, i, 0);
158+
}
158159
}
159160
}
160161

161-
public void addRetryTopicIfNotExist(String topic, String consumerGroup) {
162+
public void addRetryTopicIfNotExist(String retryTopic, String consumerGroup) {
162163
if (brokerController != null) {
163-
TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(topic);
164-
if (topicConfig != null) {
164+
TopicConfig topicConfig = brokerController.getTopicConfigManager().selectTopicConfig(retryTopic);
165+
if (topicConfig != null && !brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
165166
return;
166167
}
167-
topicConfig = new TopicConfig(topic);
168-
topicConfig.setReadQueueNums(PopAckConstants.retryQueueNum);
169-
topicConfig.setWriteQueueNums(PopAckConstants.retryQueueNum);
168+
169+
int retryQueueNum = PopAckConstants.retryQueueNum;
170+
if (brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
171+
String normalTopic = KeyBuilder.parseNormalTopic(retryTopic, consumerGroup);
172+
TopicConfig normalConfig = brokerController.getTopicConfigManager().selectTopicConfig(normalTopic); // always exists
173+
retryQueueNum = normalConfig.getWriteQueueNums();
174+
if (topicConfig != null && topicConfig.getWriteQueueNums() == normalConfig.getWriteQueueNums()) {
175+
return;
176+
}
177+
}
178+
179+
// create new one, or update in case of queue expansion
180+
topicConfig = new TopicConfig(retryTopic);
181+
topicConfig.setReadQueueNums(retryQueueNum);
182+
topicConfig.setWriteQueueNums(retryQueueNum);
170183
topicConfig.setTopicFilterType(TopicFilterType.SINGLE_TAG);
171184
topicConfig.setPerm(6);
172185
topicConfig.setTopicSysFlag(0);
173186
brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
174187

175-
initPopRetryOffset(topic, consumerGroup);
188+
initPopRetryOffset(retryTopic, consumerGroup, retryQueueNum);
189+
}
190+
}
191+
192+
private int getRetryQueueId(String retryTopic, MessageExt messageExt) {
193+
if (!brokerController.getBrokerConfig().isUseSeparateRetryQueue()) {
194+
return 0;
195+
}
196+
int oriQueueId = messageExt.getQueueId(); // original qid of normal or retry topic
197+
if (oriQueueId > brokerController.getTopicConfigManager().selectTopicConfig(retryTopic).getWriteQueueNums() - 1) {
198+
POP_LOGGER.warn("not expected, {}, {}, {}", retryTopic, oriQueueId, messageExt.getMsgId());
199+
return 0; // fallback
176200
}
201+
return oriQueueId;
177202
}
178203

179204
protected List<MessageExt> getReviveMessage(long offset, int queueId) {

broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,16 @@ public RemotingCommand sendMessage(final ChannelHandlerContext ctx,
281281
}
282282

283283
MessageAccessor.setProperties(msgInner, oriProps);
284+
// check properties to ensure exclusive, don't check topic meta config to keep the behavior consistent
285+
int msgPriority = msgInner.getPriority();
286+
if (msgPriority >= 0) {
287+
if (TopicMessageType.PRIORITY.equals(TopicMessageType.parseFromMessageProperty(msgInner.getProperties()))) {
288+
queueIdInt = Math.min(msgPriority, topicConfig.getWriteQueueNums() - 1);
289+
msgInner.setQueueId(queueIdInt);
290+
} else {
291+
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_PRIORITY);
292+
}
293+
}
284294

285295
CleanupPolicy cleanupPolicy = CleanupPolicyUtils.getDeletePolicy(Optional.of(topicConfig));
286296
if (Objects.equals(cleanupPolicy, CleanupPolicy.COMPACTION)) {

0 commit comments

Comments
 (0)