Skip to content

Commit c72b967

Browse files
authored
[To dev/1.3] Subscription: add removeUserData for messages (#17479)
1 parent d34bd8d commit c72b967

6 files changed

Lines changed: 287 additions & 35 deletions

File tree

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.subscription.it.local;
21+
22+
import org.apache.iotdb.isession.ISession;
23+
import org.apache.iotdb.it.env.EnvFactory;
24+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
25+
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
26+
import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeException;
27+
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
28+
import org.apache.iotdb.session.subscription.SubscriptionSession;
29+
import org.apache.iotdb.session.subscription.consumer.SubscriptionPullConsumer;
30+
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
31+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
32+
33+
import org.junit.Assert;
34+
import org.junit.Before;
35+
import org.junit.Ignore;
36+
import org.junit.Test;
37+
import org.junit.experimental.categories.Category;
38+
import org.junit.runner.RunWith;
39+
40+
import java.lang.reflect.Field;
41+
import java.time.Duration;
42+
import java.util.List;
43+
import java.util.Set;
44+
import java.util.SortedMap;
45+
import java.util.concurrent.locks.LockSupport;
46+
47+
import static org.junit.Assert.fail;
48+
49+
@RunWith(IoTDBTestRunner.class)
50+
@Category({LocalStandaloneIT.class})
51+
public class IoTDBSubscriptionMessageIT extends AbstractSubscriptionLocalIT {
52+
53+
@Override
54+
@Before
55+
public void setUp() throws Exception {
56+
super.setUp();
57+
}
58+
59+
@Ignore
60+
@Test
61+
public void testPullConsumerCommitAfterRemoveUserData() throws Exception {
62+
final String topicName = "topic_remove_user_data";
63+
insertHistoricalData(0, 100);
64+
createTopic(topicName);
65+
66+
final String host = EnvFactory.getEnv().getIP();
67+
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
68+
try (final SubscriptionPullConsumer consumer =
69+
new SubscriptionPullConsumer.Builder()
70+
.host(host)
71+
.port(port)
72+
.consumerId("c_remove_user_data")
73+
.consumerGroupId("cg_remove_user_data")
74+
.autoCommit(false)
75+
.buildPullConsumer()) {
76+
consumer.open();
77+
consumer.subscribe(topicName);
78+
79+
final List<SubscriptionMessage> messages = pollMessages(consumer);
80+
Assert.assertFalse(messages.isEmpty());
81+
82+
for (final SubscriptionMessage message : messages) {
83+
Assert.assertNotNull(message.getCommitContext());
84+
Assert.assertFalse(message.getResultSets().isEmpty());
85+
86+
message.removeUserData();
87+
88+
Assert.assertNotNull(message.getCommitContext());
89+
Assert.assertThrows(SubscriptionRuntimeException.class, message::getResultSets);
90+
Assert.assertThrows(SubscriptionRuntimeException.class, message::getRecordTabletIterator);
91+
message.removeUserData();
92+
}
93+
94+
consumer.commitSync(messages);
95+
consumer.unsubscribe(topicName);
96+
}
97+
}
98+
99+
@Ignore
100+
@Test
101+
public void testPullConsumerAutoCommitStoresCommitContextsOnly() throws Exception {
102+
final String topicName = "topic_auto_commit_context_only";
103+
insertHistoricalData(100, 200);
104+
createTopic(topicName);
105+
106+
final String host = EnvFactory.getEnv().getIP();
107+
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
108+
try (final SubscriptionPullConsumer consumer =
109+
new SubscriptionPullConsumer.Builder()
110+
.host(host)
111+
.port(port)
112+
.consumerId("c_auto_commit")
113+
.consumerGroupId("cg_auto_commit")
114+
.autoCommit(true)
115+
.autoCommitIntervalMs(60_000L)
116+
.buildPullConsumer()) {
117+
consumer.open();
118+
consumer.subscribe(topicName);
119+
120+
final List<SubscriptionMessage> messages = pollMessages(consumer);
121+
Assert.assertFalse(messages.isEmpty());
122+
messages.forEach(SubscriptionMessage::removeUserData);
123+
124+
final SortedMap<Long, Set<SubscriptionCommitContext>> uncommittedCommitContexts =
125+
getUncommittedCommitContexts(consumer);
126+
Assert.assertFalse(uncommittedCommitContexts.isEmpty());
127+
128+
final Object storedObject =
129+
uncommittedCommitContexts.values().iterator().next().iterator().next();
130+
Assert.assertTrue(storedObject instanceof SubscriptionCommitContext);
131+
Assert.assertFalse(storedObject instanceof SubscriptionMessage);
132+
133+
consumer.unsubscribe(topicName);
134+
}
135+
}
136+
137+
private void insertHistoricalData(final int start, final int end) {
138+
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
139+
for (int i = start; i < end; ++i) {
140+
session.executeNonQueryStatement(
141+
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
142+
}
143+
session.executeNonQueryStatement("flush");
144+
} catch (final Exception e) {
145+
e.printStackTrace();
146+
fail(e.getMessage());
147+
}
148+
}
149+
150+
private void createTopic(final String topicName) {
151+
final String host = EnvFactory.getEnv().getIP();
152+
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
153+
try (final SubscriptionSession session = new SubscriptionSession(host, port)) {
154+
session.open();
155+
session.createTopic(topicName);
156+
} catch (final Exception e) {
157+
e.printStackTrace();
158+
fail(e.getMessage());
159+
}
160+
}
161+
162+
private List<SubscriptionMessage> pollMessages(final SubscriptionPullConsumer consumer)
163+
throws Exception {
164+
for (int i = 0; i < 10; ++i) {
165+
final List<SubscriptionMessage> messages =
166+
consumer.poll(Duration.ofMillis(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS));
167+
if (!messages.isEmpty()) {
168+
return messages;
169+
}
170+
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
171+
}
172+
fail("Failed to poll subscription messages within the expected timeout.");
173+
throw new IllegalStateException("unreachable");
174+
}
175+
176+
@SuppressWarnings("unchecked")
177+
private SortedMap<Long, Set<SubscriptionCommitContext>> getUncommittedCommitContexts(
178+
final SubscriptionPullConsumer consumer) throws Exception {
179+
final Field field =
180+
SubscriptionPullConsumer.class.getDeclaredField("uncommittedCommitContexts");
181+
field.setAccessible(true);
182+
return (SortedMap<Long, Set<SubscriptionCommitContext>>) field.get(consumer);
183+
}
184+
}

iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionConsumer.java

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1120,22 +1120,40 @@ private List<SubscriptionPollResponse> pollTabletsInternal(
11201120
/////////////////////////////// commit sync (ack & nack) ///////////////////////////////
11211121

11221122
protected void ack(final Iterable<SubscriptionMessage> messages) throws SubscriptionException {
1123+
ackCommitContexts(extractCommitContexts(messages));
1124+
}
1125+
1126+
protected void ackCommitContexts(final Iterable<SubscriptionCommitContext> commitContexts)
1127+
throws SubscriptionException {
1128+
commit(commitContexts, false);
1129+
}
1130+
1131+
private Iterable<SubscriptionCommitContext> extractCommitContexts(
1132+
final Iterable<SubscriptionMessage> messages) {
1133+
final List<SubscriptionCommitContext> commitContexts = new ArrayList<>();
1134+
for (final SubscriptionMessage message : messages) {
1135+
commitContexts.add(message.getCommitContext());
1136+
}
1137+
return commitContexts;
1138+
}
1139+
1140+
private void commit(final Iterable<SubscriptionCommitContext> commitContexts, final boolean nack)
1141+
throws SubscriptionException {
11231142
final Map<Integer, List<SubscriptionCommitContext>> dataNodeIdToSubscriptionCommitContexts =
11241143
new HashMap<>();
1125-
for (final SubscriptionMessage message : messages) {
1144+
for (final SubscriptionCommitContext commitContext : commitContexts) {
11261145
dataNodeIdToSubscriptionCommitContexts
1127-
.computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>())
1128-
.add(message.getCommitContext());
1146+
.computeIfAbsent(commitContext.getDataNodeId(), (id) -> new ArrayList<>())
1147+
.add(commitContext);
11291148
}
11301149
for (final Entry<Integer, List<SubscriptionCommitContext>> entry :
11311150
dataNodeIdToSubscriptionCommitContexts.entrySet()) {
1132-
commitInternal(entry.getKey(), entry.getValue(), false);
1151+
commitInternal(entry.getKey(), entry.getValue(), nack);
11331152
}
11341153
}
11351154

11361155
protected void nack(final Iterable<SubscriptionMessage> messages) throws SubscriptionException {
1137-
final Map<Integer, List<SubscriptionCommitContext>> dataNodeIdToSubscriptionCommitContexts =
1138-
new HashMap<>();
1156+
final List<SubscriptionCommitContext> commitContexts = new ArrayList<>();
11391157
for (final SubscriptionMessage message : messages) {
11401158
// make every effort to delete stale intermediate file
11411159
if (Objects.equals(SubscriptionMessageType.TS_FILE.getType(), message.getMessageType())
@@ -1147,29 +1165,18 @@ protected void nack(final Iterable<SubscriptionMessage> messages) throws Subscri
11471165
} catch (final Exception ignored) {
11481166
}
11491167
}
1150-
dataNodeIdToSubscriptionCommitContexts
1151-
.computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>())
1152-
.add(message.getCommitContext());
1153-
}
1154-
for (final Entry<Integer, List<SubscriptionCommitContext>> entry :
1155-
dataNodeIdToSubscriptionCommitContexts.entrySet()) {
1156-
commitInternal(entry.getKey(), entry.getValue(), true);
1168+
commitContexts.add(message.getCommitContext());
11571169
}
1170+
commit(commitContexts, true);
11581171
}
11591172

11601173
private void nack(final List<SubscriptionPollResponse> responses) throws SubscriptionException {
1161-
final Map<Integer, List<SubscriptionCommitContext>> dataNodeIdToSubscriptionCommitContexts =
1162-
new HashMap<>();
1174+
final List<SubscriptionCommitContext> commitContexts = new ArrayList<>();
11631175
for (final SubscriptionPollResponse response : responses) {
11641176
// there is no stale intermediate file here
1165-
dataNodeIdToSubscriptionCommitContexts
1166-
.computeIfAbsent(response.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>())
1167-
.add(response.getCommitContext());
1168-
}
1169-
for (final Entry<Integer, List<SubscriptionCommitContext>> entry :
1170-
dataNodeIdToSubscriptionCommitContexts.entrySet()) {
1171-
commitInternal(entry.getKey(), entry.getValue(), true);
1177+
commitContexts.add(response.getCommitContext());
11721178
}
1179+
commit(commitContexts, true);
11731180
}
11741181

11751182
private void commitInternal(

iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/SubscriptionPullConsumer.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
2323
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
24+
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
2425
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
2526
import org.apache.iotdb.session.subscription.util.CollectionUtils;
2627
import org.apache.iotdb.session.subscription.util.IdentifierUtils;
@@ -62,7 +63,7 @@ public class SubscriptionPullConsumer extends SubscriptionConsumer {
6263
private final boolean autoCommit;
6364
private final long autoCommitIntervalMs;
6465

65-
private SortedMap<Long, Set<SubscriptionMessage>> uncommittedMessages;
66+
private SortedMap<Long, Set<SubscriptionCommitContext>> uncommittedCommitContexts;
6667

6768
private final AtomicBoolean isClosed = new AtomicBoolean(true);
6869

@@ -117,7 +118,7 @@ public synchronized void open() throws SubscriptionException {
117118

118119
// submit auto poll worker if enabling auto commit
119120
if (autoCommit) {
120-
uncommittedMessages = new ConcurrentSkipListMap<>();
121+
uncommittedCommitContexts = new ConcurrentSkipListMap<>();
121122
submitAutoCommitWorker();
122123
}
123124
}
@@ -195,9 +196,12 @@ public List<SubscriptionMessage> poll(final Set<String> topicNames, final long t
195196
if (currentTimestamp % autoCommitIntervalMs == 0) {
196197
index -= 1;
197198
}
198-
uncommittedMessages
199+
uncommittedCommitContexts
199200
.computeIfAbsent(index, o -> new ConcurrentSkipListSet<>())
200-
.addAll(messages);
201+
.addAll(
202+
messages.stream()
203+
.map(SubscriptionMessage::getCommitContext)
204+
.collect(Collectors.toList()));
201205
}
202206

203207
return messages;
@@ -264,11 +268,11 @@ public void run() {
264268
index -= 1;
265269
}
266270

267-
for (final Map.Entry<Long, Set<SubscriptionMessage>> entry :
268-
uncommittedMessages.headMap(index).entrySet()) {
271+
for (final Map.Entry<Long, Set<SubscriptionCommitContext>> entry :
272+
uncommittedCommitContexts.headMap(index).entrySet()) {
269273
try {
270-
ack(entry.getValue());
271-
uncommittedMessages.remove(entry.getKey());
274+
ackCommitContexts(entry.getValue());
275+
uncommittedCommitContexts.remove(entry.getKey());
272276
} catch (final Exception e) {
273277
LOGGER.warn("something unexpected happened when auto commit messages...", e);
274278
}
@@ -277,10 +281,11 @@ public void run() {
277281
}
278282

279283
private void commitAllUncommittedMessages() {
280-
for (final Map.Entry<Long, Set<SubscriptionMessage>> entry : uncommittedMessages.entrySet()) {
284+
for (final Map.Entry<Long, Set<SubscriptionCommitContext>> entry :
285+
uncommittedCommitContexts.entrySet()) {
281286
try {
282-
ack(entry.getValue());
283-
uncommittedMessages.remove(entry.getKey());
287+
ackCommitContexts(entry.getValue());
288+
uncommittedCommitContexts.remove(entry.getKey());
284289
} catch (final Exception e) {
285290
LOGGER.warn("something unexpected happened when commit messages during close", e);
286291
}
@@ -421,7 +426,7 @@ protected Map<String, String> allReportMessage() {
421426
allReportMessage.put("autoCommit", String.valueOf(autoCommit));
422427
allReportMessage.put("autoCommitIntervalMs", String.valueOf(autoCommitIntervalMs));
423428
if (autoCommit) {
424-
allReportMessage.put("uncommittedMessages", uncommittedMessages.toString());
429+
allReportMessage.put("uncommittedCommitContexts", uncommittedCommitContexts.toString());
425430
}
426431
return allReportMessage;
427432
}

0 commit comments

Comments
 (0)