Skip to content

Commit 3b5cbf8

Browse files
authored
[ISSUE #8698] Remove batch write in kv cq store and update rocksdb cq check tool (#8739)
1 parent 59bafe8 commit 3b5cbf8

File tree

6 files changed

+39
-28
lines changed

6 files changed

+39
-28
lines changed

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@
215215
import org.apache.rocketmq.store.RocksDBMessageStore;
216216
import org.apache.rocketmq.store.SelectMappedBufferResult;
217217
import org.apache.rocketmq.store.config.BrokerRole;
218+
import org.apache.rocketmq.store.plugin.AbstractPluginMessageStore;
218219
import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
219220
import org.apache.rocketmq.store.queue.CqUnit;
220221
import org.apache.rocketmq.store.queue.ReferredIterator;
@@ -470,16 +471,21 @@ private RemotingCommand checkRocksdbCqWriteProgress(ChannelHandlerContext ctx, R
470471
String requestTopic = requestHeader.getTopic();
471472
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
472473
response.setCode(ResponseCode.SUCCESS);
473-
474-
DefaultMessageStore messageStore = (DefaultMessageStore) brokerController.getMessageStore();
475-
RocksDBMessageStore rocksDBMessageStore = messageStore.getRocksDBMessageStore();
476-
if (!messageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
474+
MessageStore messageStore = brokerController.getMessageStore();
475+
DefaultMessageStore defaultMessageStore;
476+
if (messageStore instanceof AbstractPluginMessageStore) {
477+
defaultMessageStore = (DefaultMessageStore) ((AbstractPluginMessageStore) messageStore).getNext();
478+
} else {
479+
defaultMessageStore = (DefaultMessageStore) messageStore;
480+
}
481+
RocksDBMessageStore rocksDBMessageStore = defaultMessageStore.getRocksDBMessageStore();
482+
if (!defaultMessageStore.getMessageStoreConfig().isRocksdbCQDoubleWriteEnable()) {
477483
response.setBody(JSON.toJSONBytes(ImmutableMap.of("diffResult", "rocksdbCQWriteEnable is false, checkRocksdbCqWriteProgressCommand is invalid")));
478484
return response;
479485
}
480486

481-
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = messageStore.getConsumeQueueTable();
482-
StringBuilder diffResult = new StringBuilder("check success, all is ok!\n");
487+
ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueueInterface>> cqTable = defaultMessageStore.getConsumeQueueTable();
488+
StringBuilder diffResult = new StringBuilder();
483489
try {
484490
if (StringUtils.isNotBlank(requestTopic)) {
485491
processConsumeQueuesForTopic(cqTable.get(requestTopic), requestTopic, rocksDBMessageStore, diffResult,false);
@@ -516,15 +522,15 @@ private void processConsumeQueuesForTopic(ConcurrentMap<Integer, ConsumeQueueInt
516522
Pair<CqUnit, Long> fileCqUnit = jsonCq.getCqUnitAndStoreTime(i);
517523
Pair<CqUnit, Long> kvCqUnit = kvCq.getCqUnitAndStoreTime(i);
518524
if (fileCqUnit == null || kvCqUnit == null) {
519-
diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file: %s \n",
525+
diffResult.append(String.format("[topic: %s, queue: %s, offset: %s] \n kv : %s \n file : %s \n",
520526
topic, queueId, i, kvCqUnit != null ? kvCqUnit.getObject1() : "null", fileCqUnit != null ? fileCqUnit.getObject1() : "null"));
521527
return;
522528
}
523529
if (!checkCqUnitEqual(kvCqUnit.getObject1(), fileCqUnit.getObject1())) {
524-
String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file: %s \n kv: %s",
530+
String diffInfo = String.format("[topic:%s, queue: %s offset: %s] \n file : %s \n kv : %s \n",
525531
topic, queueId, i, kvCqUnit.getObject1(), fileCqUnit.getObject1());
526532
LOGGER.error(diffInfo);
527-
diffResult.append(diffInfo).append("\n");
533+
diffResult.append(diffInfo).append(System.lineSeparator());
528534
return;
529535
}
530536
}

store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -428,15 +428,15 @@ public class MessageStoreConfig {
428428

429429
private boolean rocksdbCQDoubleWriteEnable = false;
430430

431-
private boolean enableBatchWriteKvCq = true;
431+
private int batchWriteKvCqSize = 16;
432432

433433

434-
public boolean isEnableBatchWriteKvCq() {
435-
return enableBatchWriteKvCq;
434+
public int getBatchWriteKvCqSize() {
435+
return batchWriteKvCqSize;
436436
}
437437

438-
public void setEnableBatchWriteKvCq(boolean enableBatchWriteKvCq) {
439-
this.enableBatchWriteKvCq = enableBatchWriteKvCq;
438+
public void setBatchWriteKvCqSize(int batchWriteKvCqSize) {
439+
this.batchWriteKvCqSize = batchWriteKvCqSize;
440440
}
441441

442442
public boolean isRocksdbCQDoubleWriteEnable() {

store/src/main/java/org/apache/rocketmq/store/plugin/AbstractPluginMessageStore.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -661,4 +661,8 @@ public void recoverTopicQueueTable() {
661661
public void notifyMessageArriveIfNecessary(DispatchRequest dispatchRequest) {
662662
next.notifyMessageArriveIfNecessary(dispatchRequest);
663663
}
664+
665+
public MessageStore getNext() {
666+
return next;
667+
}
664668
}

store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
5555
public static final byte CTRL_1 = '\u0001';
5656
public static final byte CTRL_2 = '\u0002';
5757

58-
private static final int BATCH_SIZE = 16;
58+
private final int batchSize;
5959
public static final int MAX_KEY_LEN = 300;
6060

6161
private final ScheduledExecutorService scheduledExecutorService;
@@ -77,8 +77,6 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
7777
private final Map<ByteBuffer, Pair<ByteBuffer, DispatchRequest>> tempTopicQueueMaxOffsetMap;
7878
private volatile boolean isCQError = false;
7979

80-
private boolean enableBatchWriteKvCq;
81-
8280
public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
8381
super(messageStore);
8482

@@ -88,11 +86,11 @@ public RocksDBConsumeQueueStore(DefaultMessageStore messageStore) {
8886
this.rocksDBConsumeQueueOffsetTable = new RocksDBConsumeQueueOffsetTable(rocksDBConsumeQueueTable, rocksDBStorage, messageStore);
8987

9088
this.writeBatch = new WriteBatch();
91-
this.enableBatchWriteKvCq = messageStoreConfig.isEnableBatchWriteKvCq();
92-
this.bufferDRList = new ArrayList(BATCH_SIZE);
93-
this.cqBBPairList = new ArrayList(BATCH_SIZE);
94-
this.offsetBBPairList = new ArrayList(BATCH_SIZE);
95-
for (int i = 0; i < BATCH_SIZE; i++) {
89+
this.batchSize = messageStoreConfig.getBatchWriteKvCqSize();
90+
this.bufferDRList = new ArrayList(batchSize);
91+
this.cqBBPairList = new ArrayList(batchSize);
92+
this.offsetBBPairList = new ArrayList(batchSize);
93+
for (int i = 0; i < batchSize; i++) {
9694
this.cqBBPairList.add(RocksDBConsumeQueueTable.getCQByteBufferPair());
9795
this.offsetBBPairList.add(RocksDBConsumeQueueOffsetTable.getOffsetByteBufferPair());
9896
}
@@ -166,12 +164,13 @@ private boolean shutdownInner() {
166164

167165
@Override
168166
public void putMessagePositionInfoWrapper(DispatchRequest request) throws RocksDBException {
167+
if (request == null || this.bufferDRList.size() >= batchSize) {
168+
putMessagePosition();
169+
}
170+
169171
if (request != null) {
170172
this.bufferDRList.add(request);
171173
}
172-
if (request == null || !enableBatchWriteKvCq || this.bufferDRList.size() >= BATCH_SIZE) {
173-
putMessagePosition();
174-
}
175174
}
176175

177176
public void putMessagePosition() throws RocksDBException {

tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@
104104
import org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand;
105105
import org.apache.rocketmq.tools.command.offset.SkipAccumulationSubCommand;
106106
import org.apache.rocketmq.tools.command.producer.ProducerSubCommand;
107+
import org.apache.rocketmq.tools.command.queue.CheckRocksdbCqWriteProgressCommand;
107108
import org.apache.rocketmq.tools.command.queue.QueryConsumeQueueCommand;
108109
import org.apache.rocketmq.tools.command.stats.StatsAllSubCommand;
109110
import org.apache.rocketmq.tools.command.topic.AllocateMQSubCommand;
@@ -304,6 +305,7 @@ public static void initCommand() {
304305
initCommand(new ListAclSubCommand());
305306
initCommand(new CopyAclsSubCommand());
306307
initCommand(new RocksDBConfigToJsonCommand());
308+
initCommand(new CheckRocksdbCqWriteProgressCommand());
307309
}
308310

309311
private static void printHelp() {

tools/src/main/java/org/apache/rocketmq/tools/command/queue/CheckRocksdbCqWriteProgressCommand.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class CheckRocksdbCqWriteProgressCommand implements SubCommand {
3434

3535
@Override
3636
public String commandName() {
37-
return "checkRocksdbCqWriteProgressCommandCommand";
37+
return "checkRocksdbCqWriteProgress";
3838
}
3939

4040
@Override
@@ -82,9 +82,9 @@ public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) {
8282
String brokerAddr = brokerData.getBrokerAddrs().get(0L);
8383
CheckRocksdbCqWriteProgressResponseBody body = defaultMQAdminExt.checkRocksdbCqWriteProgress(brokerAddr, topic);
8484
if (StringUtils.isNotBlank(topic)) {
85-
System.out.printf(body.getDiffResult());
85+
System.out.print(body.getDiffResult());
8686
} else {
87-
System.out.printf(brokerName + " | " + brokerAddr + " | " + body.getDiffResult());
87+
System.out.print(brokerName + " | " + brokerAddr + " | \n" + body.getDiffResult());
8888
}
8989
}
9090

0 commit comments

Comments
 (0)