File tree Expand file tree Collapse file tree
store/src/main/java/org/apache/rocketmq/store Expand file tree Collapse file tree Original file line number Diff line number Diff line change @@ -122,7 +122,7 @@ protected PutMessageThreadLocal initialValue() {
122122
123123 this .flushDiskWatcher = new FlushDiskWatcher ();
124124
125- this .topicQueueLock = new TopicQueueLock ();
125+ this .topicQueueLock = new TopicQueueLock (messageStore . getMessageStoreConfig (). getTopicQueueLockNum () );
126126
127127 this .commitLogSize = messageStore .getMessageStoreConfig ().getMappedFileSizeCommitLog ();
128128 }
Original file line number Diff line number Diff line change @@ -34,6 +34,14 @@ public TopicQueueLock() {
3434 }
3535 }
3636
37+ public TopicQueueLock (int size ) {
38+ this .size = size ;
39+ this .lockList = new ArrayList <>(size );
40+ for (int i = 0 ; i < this .size ; i ++) {
41+ this .lockList .add (new ReentrantLock ());
42+ }
43+ }
44+
3745 public void lock (String topicQueueKey ) {
3846 Lock lock = this .lockList .get ((topicQueueKey .hashCode () & 0x7fffffff ) % this .size );
3947 lock .lock ();
Original file line number Diff line number Diff line change @@ -401,6 +401,8 @@ public class MessageStoreConfig {
401401 private long memTableFlushInterval = 60 * 60 * 1000L ;
402402 private boolean enableRocksDBLog = false ;
403403
404+ private int topicQueueLockNum = 32 ;
405+
404406 public boolean isDebugLockEnable () {
405407 return debugLockEnable ;
406408 }
@@ -1751,4 +1753,12 @@ public boolean isEnableRocksDBLog() {
17511753 public void setEnableRocksDBLog (boolean enableRocksDBLog ) {
17521754 this .enableRocksDBLog = enableRocksDBLog ;
17531755 }
1756+
1757+ public int getTopicQueueLockNum () {
1758+ return topicQueueLockNum ;
1759+ }
1760+
1761+ public void setTopicQueueLockNum (int topicQueueLockNum ) {
1762+ this .topicQueueLockNum = topicQueueLockNum ;
1763+ }
17541764}
You can’t perform that action at this time.
0 commit comments