Skip to content

Commit e4ae253

Browse files
authored
Subscription: add removeUserData for messages (#17460)
* feat(subscription): add removeUserData for messages * test(subscription): move message IT to dual suite * Revert "test(subscription): move message IT to dual suite" This reverts commit 38f2f80. * add @ignore
1 parent 5c036db commit e4ae253

6 files changed

Lines changed: 300 additions & 39 deletions

File tree

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.subscription.it.local;
21+
22+
import org.apache.iotdb.isession.ISession;
23+
import org.apache.iotdb.it.env.EnvFactory;
24+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
25+
import org.apache.iotdb.itbase.category.LocalStandaloneIT;
26+
import org.apache.iotdb.rpc.subscription.exception.SubscriptionRuntimeException;
27+
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
28+
import org.apache.iotdb.session.subscription.SubscriptionTreeSession;
29+
import org.apache.iotdb.session.subscription.consumer.tree.SubscriptionTreePullConsumer;
30+
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
31+
import org.apache.iotdb.subscription.it.IoTDBSubscriptionITConstant;
32+
33+
import org.junit.Assert;
34+
import org.junit.Before;
35+
import org.junit.Ignore;
36+
import org.junit.Test;
37+
import org.junit.experimental.categories.Category;
38+
import org.junit.runner.RunWith;
39+
40+
import java.lang.reflect.Field;
41+
import java.time.Duration;
42+
import java.util.List;
43+
import java.util.Set;
44+
import java.util.SortedMap;
45+
import java.util.concurrent.locks.LockSupport;
46+
47+
import static org.junit.Assert.fail;
48+
49+
@RunWith(IoTDBTestRunner.class)
50+
@Category({LocalStandaloneIT.class})
51+
public class IoTDBSubscriptionMessageIT extends AbstractSubscriptionLocalIT {
52+
53+
@Override
54+
@Before
55+
public void setUp() throws Exception {
56+
super.setUp();
57+
}
58+
59+
@Ignore
60+
@Test
61+
public void testPullConsumerCommitAfterRemoveUserData() throws Exception {
62+
final String topicName = "topic_remove_user_data";
63+
insertHistoricalData(0, 100);
64+
createTopic(topicName);
65+
66+
final String host = EnvFactory.getEnv().getIP();
67+
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
68+
try (final SubscriptionTreePullConsumer consumer =
69+
new SubscriptionTreePullConsumer.Builder()
70+
.host(host)
71+
.port(port)
72+
.consumerId("c_remove_user_data")
73+
.consumerGroupId("cg_remove_user_data")
74+
.autoCommit(false)
75+
.buildPullConsumer()) {
76+
consumer.open();
77+
consumer.subscribe(topicName);
78+
79+
final List<SubscriptionMessage> messages = pollMessages(consumer);
80+
Assert.assertFalse(messages.isEmpty());
81+
82+
for (final SubscriptionMessage message : messages) {
83+
Assert.assertNotNull(message.getCommitContext());
84+
Assert.assertFalse(message.getResultSets().isEmpty());
85+
86+
message.removeUserData();
87+
88+
Assert.assertNotNull(message.getCommitContext());
89+
Assert.assertThrows(SubscriptionRuntimeException.class, message::getResultSets);
90+
Assert.assertThrows(SubscriptionRuntimeException.class, message::getRecordTabletIterator);
91+
message.removeUserData();
92+
}
93+
94+
consumer.commitSync(messages);
95+
consumer.unsubscribe(topicName);
96+
}
97+
}
98+
99+
@Ignore
100+
@Test
101+
public void testPullConsumerAutoCommitStoresCommitContextsOnly() throws Exception {
102+
final String topicName = "topic_auto_commit_context_only";
103+
insertHistoricalData(100, 200);
104+
createTopic(topicName);
105+
106+
final String host = EnvFactory.getEnv().getIP();
107+
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
108+
try (final SubscriptionTreePullConsumer consumer =
109+
new SubscriptionTreePullConsumer.Builder()
110+
.host(host)
111+
.port(port)
112+
.consumerId("c_auto_commit")
113+
.consumerGroupId("cg_auto_commit")
114+
.autoCommit(true)
115+
.autoCommitIntervalMs(60_000L)
116+
.buildPullConsumer()) {
117+
consumer.open();
118+
consumer.subscribe(topicName);
119+
120+
final List<SubscriptionMessage> messages = pollMessages(consumer);
121+
Assert.assertFalse(messages.isEmpty());
122+
messages.forEach(SubscriptionMessage::removeUserData);
123+
124+
final SortedMap<Long, Set<SubscriptionCommitContext>> uncommittedCommitContexts =
125+
getUncommittedCommitContexts(consumer);
126+
Assert.assertFalse(uncommittedCommitContexts.isEmpty());
127+
128+
final Object storedObject =
129+
uncommittedCommitContexts.values().iterator().next().iterator().next();
130+
Assert.assertTrue(storedObject instanceof SubscriptionCommitContext);
131+
Assert.assertFalse(storedObject instanceof SubscriptionMessage);
132+
133+
consumer.unsubscribe(topicName);
134+
}
135+
}
136+
137+
private void insertHistoricalData(final int start, final int end) {
138+
try (final ISession session = EnvFactory.getEnv().getSessionConnection()) {
139+
for (int i = start; i < end; ++i) {
140+
session.executeNonQueryStatement(
141+
String.format("insert into root.db.d1(time, s1) values (%s, 1)", i));
142+
}
143+
session.executeNonQueryStatement("flush");
144+
} catch (final Exception e) {
145+
e.printStackTrace();
146+
fail(e.getMessage());
147+
}
148+
}
149+
150+
private void createTopic(final String topicName) {
151+
final String host = EnvFactory.getEnv().getIP();
152+
final int port = Integer.parseInt(EnvFactory.getEnv().getPort());
153+
try (final SubscriptionTreeSession session = new SubscriptionTreeSession(host, port)) {
154+
session.open();
155+
session.createTopic(topicName);
156+
} catch (final Exception e) {
157+
e.printStackTrace();
158+
fail(e.getMessage());
159+
}
160+
}
161+
162+
private List<SubscriptionMessage> pollMessages(final SubscriptionTreePullConsumer consumer)
163+
throws Exception {
164+
for (int i = 0; i < 10; ++i) {
165+
final List<SubscriptionMessage> messages =
166+
consumer.poll(Duration.ofMillis(IoTDBSubscriptionITConstant.POLL_TIMEOUT_MS));
167+
if (!messages.isEmpty()) {
168+
return messages;
169+
}
170+
LockSupport.parkNanos(IoTDBSubscriptionITConstant.SLEEP_NS);
171+
}
172+
fail("Failed to poll subscription messages within the expected timeout.");
173+
throw new IllegalStateException("unreachable");
174+
}
175+
176+
@SuppressWarnings("unchecked")
177+
private SortedMap<Long, Set<SubscriptionCommitContext>> getUncommittedCommitContexts(
178+
final SubscriptionTreePullConsumer consumer) throws Exception {
179+
final Field field =
180+
SubscriptionTreePullConsumer.class
181+
.getSuperclass()
182+
.getDeclaredField("uncommittedCommitContexts");
183+
field.setAccessible(true);
184+
return (SortedMap<Long, Set<SubscriptionCommitContext>>) field.get(consumer);
185+
}
186+
}

iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionConsumer.java

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1145,22 +1145,40 @@ private List<SubscriptionPollResponse> pollTabletsInternal(
11451145
/////////////////////////////// commit sync (ack & nack) ///////////////////////////////
11461146

11471147
protected void ack(final Iterable<SubscriptionMessage> messages) throws SubscriptionException {
1148+
ackCommitContexts(extractCommitContexts(messages));
1149+
}
1150+
1151+
protected void ackCommitContexts(final Iterable<SubscriptionCommitContext> commitContexts)
1152+
throws SubscriptionException {
1153+
commit(commitContexts, false);
1154+
}
1155+
1156+
private Iterable<SubscriptionCommitContext> extractCommitContexts(
1157+
final Iterable<SubscriptionMessage> messages) {
1158+
final List<SubscriptionCommitContext> commitContexts = new ArrayList<>();
1159+
for (final SubscriptionMessage message : messages) {
1160+
commitContexts.add(message.getCommitContext());
1161+
}
1162+
return commitContexts;
1163+
}
1164+
1165+
private void commit(final Iterable<SubscriptionCommitContext> commitContexts, final boolean nack)
1166+
throws SubscriptionException {
11481167
final Map<Integer, List<SubscriptionCommitContext>> dataNodeIdToSubscriptionCommitContexts =
11491168
new HashMap<>();
1150-
for (final SubscriptionMessage message : messages) {
1169+
for (final SubscriptionCommitContext commitContext : commitContexts) {
11511170
dataNodeIdToSubscriptionCommitContexts
1152-
.computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>())
1153-
.add(message.getCommitContext());
1171+
.computeIfAbsent(commitContext.getDataNodeId(), (id) -> new ArrayList<>())
1172+
.add(commitContext);
11541173
}
11551174
for (final Entry<Integer, List<SubscriptionCommitContext>> entry :
11561175
dataNodeIdToSubscriptionCommitContexts.entrySet()) {
1157-
commitInternal(entry.getKey(), entry.getValue(), false);
1176+
commitInternal(entry.getKey(), entry.getValue(), nack);
11581177
}
11591178
}
11601179

11611180
protected void nack(final Iterable<SubscriptionMessage> messages) throws SubscriptionException {
1162-
final Map<Integer, List<SubscriptionCommitContext>> dataNodeIdToSubscriptionCommitContexts =
1163-
new HashMap<>();
1181+
final List<SubscriptionCommitContext> commitContexts = new ArrayList<>();
11641182
for (final SubscriptionMessage message : messages) {
11651183
// make every effort to delete stale intermediate file
11661184
if (Objects.equals(SubscriptionMessageType.TS_FILE.getType(), message.getMessageType())
@@ -1172,29 +1190,18 @@ protected void nack(final Iterable<SubscriptionMessage> messages) throws Subscri
11721190
} catch (final Exception ignored) {
11731191
}
11741192
}
1175-
dataNodeIdToSubscriptionCommitContexts
1176-
.computeIfAbsent(message.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>())
1177-
.add(message.getCommitContext());
1178-
}
1179-
for (final Entry<Integer, List<SubscriptionCommitContext>> entry :
1180-
dataNodeIdToSubscriptionCommitContexts.entrySet()) {
1181-
commitInternal(entry.getKey(), entry.getValue(), true);
1193+
commitContexts.add(message.getCommitContext());
11821194
}
1195+
commit(commitContexts, true);
11831196
}
11841197

11851198
private void nack(final List<SubscriptionPollResponse> responses) throws SubscriptionException {
1186-
final Map<Integer, List<SubscriptionCommitContext>> dataNodeIdToSubscriptionCommitContexts =
1187-
new HashMap<>();
1199+
final List<SubscriptionCommitContext> commitContexts = new ArrayList<>();
11881200
for (final SubscriptionPollResponse response : responses) {
11891201
// there is no stale intermediate file here
1190-
dataNodeIdToSubscriptionCommitContexts
1191-
.computeIfAbsent(response.getCommitContext().getDataNodeId(), (id) -> new ArrayList<>())
1192-
.add(response.getCommitContext());
1193-
}
1194-
for (final Entry<Integer, List<SubscriptionCommitContext>> entry :
1195-
dataNodeIdToSubscriptionCommitContexts.entrySet()) {
1196-
commitInternal(entry.getKey(), entry.getValue(), true);
1202+
commitContexts.add(response.getCommitContext());
11971203
}
1204+
commit(commitContexts, true);
11981205
}
11991206

12001207
private void commitInternal(

iotdb-client/subscription/src/main/java/org/apache/iotdb/session/subscription/consumer/base/AbstractSubscriptionPullConsumer.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.apache.iotdb.rpc.subscription.config.ConsumerConstant;
2323
import org.apache.iotdb.rpc.subscription.exception.SubscriptionException;
24+
import org.apache.iotdb.rpc.subscription.payload.poll.SubscriptionCommitContext;
2425
import org.apache.iotdb.session.subscription.consumer.AsyncCommitCallback;
2526
import org.apache.iotdb.session.subscription.payload.SubscriptionMessage;
2627
import org.apache.iotdb.session.subscription.util.CollectionUtils;
@@ -64,7 +65,7 @@ public abstract class AbstractSubscriptionPullConsumer extends AbstractSubscript
6465
private final boolean autoCommit;
6566
private final long autoCommitIntervalMs;
6667

67-
private SortedMap<Long, Set<SubscriptionMessage>> uncommittedMessages;
68+
private SortedMap<Long, Set<SubscriptionCommitContext>> uncommittedCommitContexts;
6869

6970
private final AtomicBoolean isClosed = new AtomicBoolean(true);
7071

@@ -123,7 +124,7 @@ protected synchronized void open() throws SubscriptionException {
123124

124125
// submit auto poll worker if enabling auto commit
125126
if (autoCommit) {
126-
uncommittedMessages = new ConcurrentSkipListMap<>();
127+
uncommittedCommitContexts = new ConcurrentSkipListMap<>();
127128
submitAutoCommitWorker();
128129
}
129130
}
@@ -201,9 +202,12 @@ protected List<SubscriptionMessage> poll(final Set<String> topicNames, final lon
201202
if (currentTimestamp % autoCommitIntervalMs == 0) {
202203
index -= 1;
203204
}
204-
uncommittedMessages
205+
uncommittedCommitContexts
205206
.computeIfAbsent(index, o -> new ConcurrentSkipListSet<>())
206-
.addAll(messages);
207+
.addAll(
208+
messages.stream()
209+
.map(SubscriptionMessage::getCommitContext)
210+
.collect(Collectors.toList()));
207211
}
208212

209213
return messages;
@@ -271,11 +275,11 @@ public void run() {
271275
index -= 1;
272276
}
273277

274-
for (final Map.Entry<Long, Set<SubscriptionMessage>> entry :
275-
uncommittedMessages.headMap(index).entrySet()) {
278+
for (final Map.Entry<Long, Set<SubscriptionCommitContext>> entry :
279+
uncommittedCommitContexts.headMap(index).entrySet()) {
276280
try {
277-
ack(entry.getValue());
278-
uncommittedMessages.remove(entry.getKey());
281+
ackCommitContexts(entry.getValue());
282+
uncommittedCommitContexts.remove(entry.getKey());
279283
} catch (final Exception e) {
280284
LOGGER.warn("something unexpected happened when auto commit messages...", e);
281285
}
@@ -284,10 +288,11 @@ public void run() {
284288
}
285289

286290
private void commitAllUncommittedMessages() {
287-
for (final Map.Entry<Long, Set<SubscriptionMessage>> entry : uncommittedMessages.entrySet()) {
291+
for (final Map.Entry<Long, Set<SubscriptionCommitContext>> entry :
292+
uncommittedCommitContexts.entrySet()) {
288293
try {
289-
ack(entry.getValue());
290-
uncommittedMessages.remove(entry.getKey());
294+
ackCommitContexts(entry.getValue());
295+
uncommittedCommitContexts.remove(entry.getKey());
291296
} catch (final Exception e) {
292297
LOGGER.warn("something unexpected happened when commit messages during close", e);
293298
}
@@ -314,7 +319,7 @@ protected Map<String, String> allReportMessage() {
314319
allReportMessage.put("autoCommit", String.valueOf(autoCommit));
315320
allReportMessage.put("autoCommitIntervalMs", String.valueOf(autoCommitIntervalMs));
316321
if (autoCommit) {
317-
allReportMessage.put("uncommittedMessages", uncommittedMessages.toString());
322+
allReportMessage.put("uncommittedCommitContexts", uncommittedCommitContexts.toString());
318323
}
319324
return allReportMessage;
320325
}

0 commit comments

Comments
 (0)