Skip to content

Commit 3b12a25

Browse files
authored
[ISSUE #10076] Make orderly resetOffset wait on consume lock while preserving timeout semantics (#10175)
1 parent d66cfa9 commit 3b12a25

2 files changed

Lines changed: 156 additions & 8 deletions

File tree

client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@
9696

9797
public class MQClientInstance {
9898
private final static long LOCK_TIMEOUT_MILLIS = 3000;
99+
private final static long RESET_OFFSET_MAX_WAIT = 10;
99100
private final static Logger log = LoggerFactory.getLogger(MQClientInstance.class);
100101
private final ClientConfig clientConfig;
101102
private final String clientId;
@@ -1380,9 +1381,11 @@ public synchronized void resetOffset(String topic, String group, Map<MessageQueu
13801381
}
13811382
}
13821383

1383-
try {
1384-
TimeUnit.SECONDS.sleep(10);
1385-
} catch (InterruptedException ignored) {
1384+
if (!consumer.isConsumeOrderly()) {
1385+
try {
1386+
TimeUnit.SECONDS.sleep(RESET_OFFSET_MAX_WAIT);
1387+
} catch (InterruptedException ignored) {
1388+
}
13861389
}
13871390

13881391
Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
@@ -1391,8 +1394,10 @@ public synchronized void resetOffset(String topic, String group, Map<MessageQueu
13911394
Long offset = offsetTable.get(mq);
13921395
if (topic.equals(mq.getTopic()) && offset != null) {
13931396
try {
1397+
ProcessQueue pq = processQueueTable.get(mq);
1398+
waitResetOffsetReady(consumer, pq);
13941399
consumer.updateConsumeOffset(mq, offset);
1395-
consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
1400+
consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, pq);
13961401
iterator.remove();
13971402
} catch (Exception e) {
13981403
log.warn("reset offset failed. group={}, {}", group, mq, e);
@@ -1406,6 +1411,22 @@ public synchronized void resetOffset(String topic, String group, Map<MessageQueu
14061411
}
14071412
}
14081413

1414+
private void waitResetOffsetReady(DefaultMQPushConsumerImpl consumer, ProcessQueue pq) {
1415+
if (consumer.isConsumeOrderly()) {
1416+
Lock lock = pq.getConsumeLock().writeLock();
1417+
boolean locked = false;
1418+
try {
1419+
locked = lock.tryLock(RESET_OFFSET_MAX_WAIT, TimeUnit.SECONDS);
1420+
} catch (InterruptedException ignored) {
1421+
Thread.currentThread().interrupt();
1422+
} finally {
1423+
if (locked) {
1424+
lock.unlock();
1425+
}
1426+
}
1427+
}
1428+
}
1429+
14091430
@SuppressWarnings("unchecked")
14101431
public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {
14111432
MQConsumerInner impl = this.consumerTable.get(group);

client/src/test/java/org/apache/rocketmq/client/impl/factory/MQClientInstanceTest.java

Lines changed: 131 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,12 @@
7474
import java.util.Set;
7575
import java.util.concurrent.ConcurrentHashMap;
7676
import java.util.concurrent.ConcurrentMap;
77+
import java.util.concurrent.CountDownLatch;
7778
import java.util.concurrent.ExecutorService;
79+
import java.util.concurrent.TimeUnit;
80+
import java.util.concurrent.atomic.AtomicReference;
81+
import java.util.concurrent.locks.Lock;
82+
import java.util.concurrent.locks.ReadWriteLock;
7883

7984
import static org.assertj.core.api.Assertions.assertThat;
8085
import static org.junit.Assert.assertEquals;
@@ -397,6 +402,119 @@ public void testResetOffset() throws IllegalAccessException {
397402
eq(0L));
398403
}
399404

405+
@Test
406+
public void testResetOffsetOrderly() {
407+
topicRouteTable.put(topic, createTopicRouteData());
408+
brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
409+
MessageQueue messageQueue = createMessageQueue();
410+
ProcessQueue processQueue = new ProcessQueue();
411+
RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
412+
when(rebalanceImpl.removeUnnecessaryMessageQueue(eq(messageQueue), eq(processQueue)))
413+
.thenReturn(false, false, true);
414+
consumerTable.put(group, createMQConsumerInner(processQueue, true, rebalanceImpl));
415+
Map<MessageQueue, Long> offsetTable = new HashMap<>();
416+
offsetTable.put(messageQueue, 0L);
417+
418+
mqClientInstance.resetOffset(topic, group, offsetTable);
419+
420+
verify(rebalanceImpl).removeUnnecessaryMessageQueue(messageQueue, processQueue);
421+
}
422+
423+
@Test
424+
public void testResetOffsetOrderlyWhenWaitTimesOut() throws InterruptedException {
425+
topicRouteTable.put(topic, createTopicRouteData());
426+
brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
427+
MessageQueue messageQueue = createMessageQueue();
428+
ProcessQueue processQueue = mock(ProcessQueue.class);
429+
ReadWriteLock consumeLock = mock(ReadWriteLock.class);
430+
Lock writeLock = mock(Lock.class);
431+
RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
432+
when(processQueue.getConsumeLock()).thenReturn(consumeLock);
433+
when(consumeLock.writeLock()).thenReturn(writeLock);
434+
when(writeLock.tryLock(10, TimeUnit.SECONDS)).thenReturn(false);
435+
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) createMQConsumerInner(processQueue, true, rebalanceImpl);
436+
consumerTable.put(group, consumer);
437+
Map<MessageQueue, Long> offsetTable = new HashMap<>();
438+
offsetTable.put(messageQueue, 0L);
439+
440+
mqClientInstance.resetOffset(topic, group, offsetTable);
441+
442+
verify(consumer).updateConsumeOffset(messageQueue, 0L);
443+
verify(rebalanceImpl).removeUnnecessaryMessageQueue(messageQueue, processQueue);
444+
verify(writeLock, times(1)).tryLock(10, TimeUnit.SECONDS);
445+
verify(writeLock, times(0)).unlock();
446+
}
447+
448+
@Test
449+
public void testResetOffsetOrderlyWaitsForInflightConsumptionBeforeUpdatingOffset() throws Exception {
450+
topicRouteTable.put(topic, createTopicRouteData());
451+
brokerAddrTable.put(defaultBroker, createBrokerAddrMap());
452+
MessageQueue messageQueue = createMessageQueue();
453+
ProcessQueue processQueue = new ProcessQueue();
454+
RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
455+
when(rebalanceImpl.removeUnnecessaryMessageQueue(eq(messageQueue), eq(processQueue))).thenReturn(true);
456+
DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) createMQConsumerInner(processQueue, true, rebalanceImpl);
457+
consumerTable.put(group, consumer);
458+
Map<MessageQueue, Long> offsetTable = new HashMap<>();
459+
offsetTable.put(messageQueue, 0L);
460+
461+
CountDownLatch consumeLockHeld = new CountDownLatch(1);
462+
CountDownLatch releaseConsumeLock = new CountDownLatch(1);
463+
CountDownLatch suspendCalled = new CountDownLatch(1);
464+
CountDownLatch updateOffsetCalled = new CountDownLatch(1);
465+
AtomicReference<Throwable> backgroundFailure = new AtomicReference<>();
466+
467+
doAnswer(invocation -> {
468+
suspendCalled.countDown();
469+
return null;
470+
}).when(consumer).suspend();
471+
doAnswer(invocation -> {
472+
updateOffsetCalled.countDown();
473+
return null;
474+
}).when(consumer).updateConsumeOffset(messageQueue, 0L);
475+
476+
Thread consumingThread = new Thread(() -> {
477+
processQueue.getConsumeLock().readLock().lock();
478+
try {
479+
consumeLockHeld.countDown();
480+
if (!releaseConsumeLock.await(5, TimeUnit.SECONDS)) {
481+
backgroundFailure.compareAndSet(null,
482+
new AssertionError("Timed out while waiting to release orderly consume lock"));
483+
}
484+
} catch (InterruptedException e) {
485+
Thread.currentThread().interrupt();
486+
backgroundFailure.compareAndSet(null, e);
487+
} finally {
488+
processQueue.getConsumeLock().readLock().unlock();
489+
}
490+
});
491+
Thread resetThread = new Thread(() -> {
492+
try {
493+
mqClientInstance.resetOffset(topic, group, offsetTable);
494+
} catch (Throwable t) {
495+
backgroundFailure.compareAndSet(null, t);
496+
}
497+
});
498+
499+
consumingThread.start();
500+
assertTrue(consumeLockHeld.await(5, TimeUnit.SECONDS));
501+
502+
resetThread.start();
503+
assertTrue(suspendCalled.await(5, TimeUnit.SECONDS));
504+
assertFalse(updateOffsetCalled.await(200, TimeUnit.MILLISECONDS));
505+
506+
releaseConsumeLock.countDown();
507+
consumingThread.join(5000);
508+
resetThread.join(5000);
509+
510+
assertNull(backgroundFailure.get());
511+
assertFalse(consumingThread.isAlive());
512+
assertFalse(resetThread.isAlive());
513+
assertTrue(updateOffsetCalled.await(1, TimeUnit.SECONDS));
514+
verify(consumer).updateConsumeOffset(messageQueue, 0L);
515+
verify(rebalanceImpl).removeUnnecessaryMessageQueue(messageQueue, processQueue);
516+
}
517+
400518
@Test
401519
public void testGetConsumerStatus() {
402520
topicRouteTable.put(topic, createTopicRouteData());
@@ -475,17 +593,26 @@ private HashMap<Long, String> createBrokerAddrMap() {
475593
}
476594

477595
private MQConsumerInner createMQConsumerInner() {
596+
RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
597+
when(rebalanceImpl.removeUnnecessaryMessageQueue(any(MessageQueue.class), any(ProcessQueue.class))).thenReturn(true);
598+
return createMQConsumerInner(new ProcessQueue(), false, rebalanceImpl);
599+
}
600+
601+
private MQConsumerInner createMQConsumerInner(ProcessQueue processQueue, boolean orderly, RebalanceImpl rebalanceImpl) {
602+
ConcurrentMap<MessageQueue, ProcessQueue> processQueueMap = new ConcurrentHashMap<>();
603+
processQueueMap.put(createMessageQueue(), processQueue);
604+
return createMQConsumerInner(processQueueMap, orderly, rebalanceImpl);
605+
}
606+
607+
private MQConsumerInner createMQConsumerInner(ConcurrentMap<MessageQueue, ProcessQueue> processQueueMap, boolean orderly, RebalanceImpl rebalanceImpl) {
478608
DefaultMQPushConsumerImpl result = mock(DefaultMQPushConsumerImpl.class);
479609
Set<SubscriptionData> subscriptionDataSet = new HashSet<>();
480610
SubscriptionData subscriptionData = mock(SubscriptionData.class);
481611
subscriptionDataSet.add(subscriptionData);
482612
when(result.subscriptions()).thenReturn(subscriptionDataSet);
483-
RebalanceImpl rebalanceImpl = mock(RebalanceImpl.class);
484-
ConcurrentMap<MessageQueue, ProcessQueue> processQueueMap = new ConcurrentHashMap<>();
485-
ProcessQueue processQueue = new ProcessQueue();
486-
processQueueMap.put(createMessageQueue(), processQueue);
487613
when(rebalanceImpl.getProcessQueueTable()).thenReturn(processQueueMap);
488614
when(result.getRebalanceImpl()).thenReturn(rebalanceImpl);
615+
when(result.isConsumeOrderly()).thenReturn(orderly);
489616
OffsetStore offsetStore = mock(OffsetStore.class);
490617
when(result.getOffsetStore()).thenReturn(offsetStore);
491618
ConsumeMessageService consumeMessageService = mock(ConsumeMessageService.class);

0 commit comments

Comments
 (0)