1717
1818package org .apache .rocketmq .tools .monitor ;
1919
20+ import org .apache .rocketmq .logging .org .slf4j .Logger ;
21+ import org .apache .rocketmq .logging .org .slf4j .LoggerFactory ;
22+ import org .apache .rocketmq .remoting .protocol .body .ConsumerRunningInfo ;
23+
2024import java .util .Iterator ;
2125import java .util .Map .Entry ;
2226import java .util .TreeMap ;
23- import org .apache .rocketmq .remoting .protocol .body .ConsumerRunningInfo ;
24- import org .apache .rocketmq .logging .org .slf4j .Logger ;
25- import org .apache .rocketmq .logging .org .slf4j .LoggerFactory ;
2627
2728public class DefaultMonitorListener implements MonitorListener {
2829 private final static String LOG_PREFIX = "[MONITOR] " ;
@@ -34,33 +35,43 @@ public DefaultMonitorListener() {
3435
3536 @ Override
3637 public void beginRound () {
37- logger .info (LOG_PREFIX + " =========================================beginRound" );
38+ logger .info ("{} =========================================beginRound", LOG_PREFIX );
3839 }
3940
4041 @ Override
4142 public void reportUndoneMsgs (UndoneMsgs undoneMsgs ) {
42- logger .info (String . format ( LOG_PREFIX + " reportUndoneMsgs: %s " , undoneMsgs ) );
43+ logger .info ("{} reportUndoneMsgs: {} " , LOG_PREFIX , undoneMsgs );
4344 }
4445
4546 @ Override
4647 public void reportFailedMsgs (FailedMsgs failedMsgs ) {
47- logger .info (String . format ( LOG_PREFIX + " reportFailedMsgs: %s " , failedMsgs ) );
48+ logger .info ("{} reportFailedMsgs: {} " , LOG_PREFIX , failedMsgs );
4849 }
4950
5051 @ Override
5152 public void reportDeleteMsgsEvent (DeleteMsgsEvent deleteMsgsEvent ) {
52- logger .info (String . format ( LOG_PREFIX + " reportDeleteMsgsEvent: %s " , deleteMsgsEvent ) );
53+ logger .info ("{} reportDeleteMsgsEvent: {} " , LOG_PREFIX , deleteMsgsEvent );
5354 }
5455
5556 @ Override
5657 public void reportConsumerRunningInfo (TreeMap <String , ConsumerRunningInfo > criTable ) {
58+ if (criTable == null || criTable .isEmpty ()) {
59+ logger .warn ("{}ConsumerRunningInfo is empty." , LOG_NOTIFY );
60+ return ;
61+ }
62+
63+ ConsumerRunningInfo firstValue = criTable .firstEntry ().getValue ();
64+ if (firstValue == null || firstValue .getProperties () == null ) {
65+ logger .warn ("{}ConsumerRunningInfo entry is empty." , LOG_NOTIFY );
66+ return ;
67+ }
68+
69+ String consumerGroup = firstValue .getProperties ().getProperty ("consumerGroup" );
5770
5871 {
5972 boolean result = ConsumerRunningInfo .analyzeSubscription (criTable );
6073 if (!result ) {
61- logger .info (String .format (LOG_NOTIFY
62- + "reportConsumerRunningInfo: ConsumerGroup: %s, Subscription different" , criTable
63- .firstEntry ().getValue ().getProperties ().getProperty ("consumerGroup" )));
74+ logger .info ("{}reportConsumerRunningInfo: ConsumerGroup: {}, Subscription different" , LOG_NOTIFY , consumerGroup );
6475 }
6576 }
6677
@@ -70,18 +81,18 @@ public void reportConsumerRunningInfo(TreeMap<String, ConsumerRunningInfo> criTa
7081 Entry <String , ConsumerRunningInfo > next = it .next ();
7182 String result = ConsumerRunningInfo .analyzeProcessQueue (next .getKey (), next .getValue ());
7283 if (!result .isEmpty ()) {
73- logger .info (String . format ( LOG_NOTIFY
74- + "reportConsumerRunningInfo: ConsumerGroup: %s, ClientId: %s, %s" ,
75- criTable . firstEntry (). getValue (). getProperties (). getProperty ( " consumerGroup" ) ,
76- next .getKey (),
77- result ) );
84+ logger .info ("{}reportConsumerRunningInfo: ConsumerGroup: {}, ClientId: {}, {}" ,
85+ LOG_NOTIFY ,
86+ consumerGroup ,
87+ next .getKey (),
88+ result );
7889 }
7990 }
8091 }
8192 }
8293
8394 @ Override
8495 public void endRound () {
85- logger .info (LOG_PREFIX + " =========================================endRound" );
96+ logger .info ("{} =========================================endRound", LOG_PREFIX );
8697 }
8798}
0 commit comments