Skip to content

Commit 4a8e0d5

Browse files
guyinyouguyinyou
andauthored
[ISSUE #7351] Allow mqadmin to operate slave nodes
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
1 parent fd32dae commit 4a8e0d5

2 files changed

Lines changed: 0 additions & 118 deletions

File tree

broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -406,9 +406,6 @@ public boolean rejectRequest() {
406406
private synchronized RemotingCommand updateAndCreateTopic(ChannelHandlerContext ctx,
407407
RemotingCommand request) throws RemotingCommandException {
408408
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
409-
if (validateSlave(response)) {
410-
return response;
411-
}
412409
final CreateTopicRequestHeader requestHeader =
413410
(CreateTopicRequestHeader) request.decodeCommandCustomHeader(CreateTopicRequestHeader.class);
414411

@@ -519,9 +516,6 @@ private synchronized RemotingCommand updateAndCreateStaticTopic(ChannelHandlerCo
519516
private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
520517
RemotingCommand request) throws RemotingCommandException {
521518
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
522-
if (validateSlave(response)) {
523-
return response;
524-
}
525519
DeleteTopicRequestHeader requestHeader =
526520
(DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
527521

@@ -1413,9 +1407,6 @@ public void onException(Throwable e) {
14131407
private RemotingCommand updateAndCreateSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request)
14141408
throws RemotingCommandException {
14151409
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
1416-
if (validateSlave(response)) {
1417-
return response;
1418-
}
14191410

14201411
LOGGER.info("AdminBrokerProcessor#updateAndCreateSubscriptionGroup called by {}",
14211412
RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
@@ -1480,9 +1471,6 @@ private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx,
14801471
private RemotingCommand deleteSubscriptionGroup(ChannelHandlerContext ctx,
14811472
RemotingCommand request) throws RemotingCommandException {
14821473
final RemotingCommand response = RemotingCommand.createResponseCommand(null);
1483-
if (validateSlave(response)) {
1484-
return response;
1485-
}
14861474
DeleteSubscriptionGroupRequestHeader requestHeader =
14871475
(DeleteSubscriptionGroupRequestHeader) request.decodeCommandCustomHeader(DeleteSubscriptionGroupRequestHeader.class);
14881476

broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java

Lines changed: 0 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,6 @@
7676
import org.apache.rocketmq.store.DefaultMessageStore;
7777
import org.apache.rocketmq.store.MessageStore;
7878
import org.apache.rocketmq.store.SelectMappedBufferResult;
79-
import org.apache.rocketmq.store.config.BrokerRole;
8079
import org.apache.rocketmq.store.config.MessageStoreConfig;
8180
import org.apache.rocketmq.store.logfile.DefaultMappedFile;
8281
import org.apache.rocketmq.store.stats.BrokerStats;
@@ -250,32 +249,6 @@ public void testUpdateAndCreateTopic() throws Exception {
250249
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
251250
}
252251

253-
@Test
254-
public void testUpdateAndCreateTopicOnSlaveInRocksdb() throws Exception {
255-
if (notToBeExecuted()) {
256-
return;
257-
}
258-
initRocksdbTopicManager();
259-
testUpdateAndCreateTopicOnSlave();
260-
}
261-
262-
@Test
263-
public void testUpdateAndCreateTopicOnSlave() throws Exception {
264-
// setup
265-
MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
266-
when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
267-
defaultMessageStore = mock(DefaultMessageStore.class);
268-
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
269-
270-
// test on slave
271-
String topic = "TEST_CREATE_TOPIC";
272-
RemotingCommand request = buildCreateTopicRequest(topic);
273-
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
274-
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
275-
assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
276-
"please execute it from master broker.");
277-
}
278-
279252
@Test
280253
public void testDeleteTopicInRocksdb() throws Exception {
281254
if (notToBeExecuted()) {
@@ -301,31 +274,6 @@ public void testDeleteTopic() throws Exception {
301274
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
302275
}
303276

304-
@Test
305-
public void testDeleteTopicOnSlaveInRocksdb() throws Exception {
306-
if (notToBeExecuted()) {
307-
return;
308-
}
309-
initRocksdbTopicManager();
310-
testDeleteTopicOnSlave();
311-
}
312-
313-
@Test
314-
public void testDeleteTopicOnSlave() throws Exception {
315-
// setup
316-
MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
317-
when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
318-
defaultMessageStore = mock(DefaultMessageStore.class);
319-
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
320-
321-
String topic = "TEST_DELETE_TOPIC";
322-
RemotingCommand request = buildDeleteTopicRequest(topic);
323-
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
324-
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
325-
assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
326-
"please execute it from master broker.");
327-
}
328-
329277
@Test
330278
public void testDeleteWithPopRetryTopic() throws Exception {
331279
String topic = "topicA";
@@ -538,36 +486,6 @@ public void testUpdateAndCreateSubscriptionGroup() throws RemotingCommandExcepti
538486
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
539487
}
540488

541-
@Test
542-
public void testUpdateAndCreateSubscriptionGroupOnSlaveInRocksdb() throws Exception {
543-
initRocksdbSubscriptionManager();
544-
testUpdateAndCreateSubscriptionGroupOnSlave();
545-
}
546-
547-
@Test
548-
public void testUpdateAndCreateSubscriptionGroupOnSlave() throws RemotingCommandException {
549-
// Setup
550-
MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
551-
when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
552-
defaultMessageStore = mock(DefaultMessageStore.class);
553-
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
554-
555-
// Test
556-
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
557-
SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
558-
subscriptionGroupConfig.setBrokerId(1);
559-
subscriptionGroupConfig.setGroupName("groupId");
560-
subscriptionGroupConfig.setConsumeEnable(Boolean.TRUE);
561-
subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.TRUE);
562-
subscriptionGroupConfig.setRetryMaxTimes(111);
563-
subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.TRUE);
564-
request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes());
565-
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
566-
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
567-
assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
568-
"please execute it from master broker.");
569-
}
570-
571489
@Test
572490
public void testGetAllSubscriptionGroupInRocksdb() throws Exception {
573491
initRocksdbSubscriptionManager();
@@ -596,30 +514,6 @@ public void testDeleteSubscriptionGroup() throws RemotingCommandException {
596514
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
597515
}
598516

599-
@Test
600-
public void testDeleteSubscriptionGroupOnSlaveInRocksdb() throws Exception {
601-
initRocksdbSubscriptionManager();
602-
testDeleteSubscriptionGroupOnSlave();
603-
}
604-
605-
@Test
606-
public void testDeleteSubscriptionGroupOnSlave() throws RemotingCommandException {
607-
// Setup
608-
MessageStoreConfig messageStoreConfig = mock(MessageStoreConfig.class);
609-
when(messageStoreConfig.getBrokerRole()).thenReturn(BrokerRole.SLAVE);
610-
defaultMessageStore = mock(DefaultMessageStore.class);
611-
when(brokerController.getMessageStoreConfig()).thenReturn(messageStoreConfig);
612-
613-
// Test
614-
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, null);
615-
request.addExtField("groupName", "GID-Group-Name");
616-
request.addExtField("removeOffset", "true");
617-
RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
618-
assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
619-
assertThat(response.getRemark()).isEqualTo("Can't modify topic or subscription group from slave broker, " +
620-
"please execute it from master broker.");
621-
}
622-
623517
@Test
624518
public void testGetTopicStatsInfo() throws RemotingCommandException {
625519
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, null);

0 commit comments

Comments
 (0)