Skip to content

Commit 33fa6fa

Browse files
authored
Subscription: Fixed multiple problems (#17418) (#17422)
1 parent 6309e69 commit 33fa6fa

5 files changed

Lines changed: 45 additions & 39 deletions

File tree

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/subscription/SubscriptionInfo.java

Lines changed: 25 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -220,26 +220,21 @@ public void validatePipePluginUsageByTopic(String pipePluginName) throws Subscri
220220
}
221221
}
222222

223-
public void validatePipePluginUsageByTopicInternal(String pipePluginName)
223+
private void validatePipePluginUsageByTopicInternal(String pipePluginName)
224224
throws SubscriptionException {
225-
acquireReadLock();
226-
try {
227-
topicMetaKeeper
228-
.getAllTopicMeta()
229-
.forEach(
230-
meta -> {
231-
if (pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) {
232-
final String exceptionMessage =
233-
String.format(
234-
"PipePlugin '%s' is already used by Topic '%s' as a processor.",
235-
pipePluginName, meta.getTopicName());
236-
LOGGER.warn(exceptionMessage);
237-
throw new SubscriptionException(exceptionMessage);
238-
}
239-
});
240-
} finally {
241-
releaseReadLock();
242-
}
225+
topicMetaKeeper
226+
.getAllTopicMeta()
227+
.forEach(
228+
meta -> {
229+
if (pipePluginName.equals(meta.getConfig().getAttribute().get("processor"))) {
230+
final String exceptionMessage =
231+
String.format(
232+
"PipePlugin '%s' is already used by Topic '%s' as a processor.",
233+
pipePluginName, meta.getTopicName());
234+
LOGGER.warn(exceptionMessage);
235+
throw new SubscriptionException(exceptionMessage);
236+
}
237+
});
243238
}
244239

245240
public void validateBeforeAlteringTopic(TopicMeta topicMeta) throws SubscriptionException {
@@ -326,21 +321,25 @@ public TSStatus createTopic(CreateTopicPlan plan) {
326321
public TSStatus alterTopic(AlterTopicPlan plan) {
327322
acquireWriteLock();
328323
try {
329-
topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName());
330-
topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta());
331-
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
324+
return alterTopicInternal(plan);
332325
} finally {
333326
releaseWriteLock();
334327
}
335328
}
336329

337-
public TSStatus alterMultipleTopics(AlterMultipleTopicsPlan plan) {
330+
private TSStatus alterTopicInternal(final AlterTopicPlan plan) {
331+
topicMetaKeeper.removeTopicMeta(plan.getTopicMeta().getTopicName());
332+
topicMetaKeeper.addTopicMeta(plan.getTopicMeta().getTopicName(), plan.getTopicMeta());
333+
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
334+
}
335+
336+
public TSStatus alterMultipleTopics(final AlterMultipleTopicsPlan plan) {
338337
acquireWriteLock();
339338
try {
340-
TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
339+
final TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
341340
status.setSubStatus(new ArrayList<>());
342-
for (AlterTopicPlan subPlan : plan.getSubPlans()) {
343-
TSStatus innerStatus = alterTopic(subPlan);
341+
for (final AlterTopicPlan subPlan : plan.getSubPlans()) {
342+
final TSStatus innerStatus = alterTopicInternal(subPlan);
344343
status.getSubStatus().add(innerStatus);
345344
if (innerStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
346345
status.setCode(TSStatusCode.ALTER_TOPIC_ERROR.getStatusCode());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionBrokerAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ private int getPrefetchingQueueCountInternal() {
279279
*/
280280
private static class Cache<T> {
281281

282-
private T value;
282+
private volatile T value;
283283
private volatile boolean valid = false;
284284
private final Supplier<T> supplier;
285285

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionConsumerAgent.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,6 @@ private void handleSingleConsumerGroupMetaChangesInternal(
9696

9797
// if consumer group meta does not exist on local agent
9898
if (Objects.isNull(metaInAgent)) {
99-
consumerGroupMetaKeeper.removeConsumerGroupMeta(consumerGroupId);
10099
consumerGroupMetaKeeper.addConsumerGroupMeta(consumerGroupId, metaFromCoordinator);
101100
SubscriptionAgent.broker().createBrokerIfNotExist(consumerGroupId);
102101
return;

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/agent/SubscriptionTopicAgent.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,12 @@ public boolean isTopicExisted(final String topicName) {
149149
public String getTopicFormat(final String topicName) {
150150
acquireReadLock();
151151
try {
152-
return topicMetaKeeper
153-
.getTopicMeta(topicName)
154-
.getConfig()
155-
.getStringOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE);
152+
return topicMetaKeeper.containsTopicMeta(topicName)
153+
? topicMetaKeeper
154+
.getTopicMeta(topicName)
155+
.getConfig()
156+
.getStringOrDefault(TopicConstant.FORMAT_KEY, TopicConstant.FORMAT_DEFAULT_VALUE)
157+
: null;
156158
} finally {
157159
releaseReadLock();
158160
}
@@ -161,10 +163,12 @@ public String getTopicFormat(final String topicName) {
161163
public String getTopicMode(final String topicName) {
162164
acquireReadLock();
163165
try {
164-
return topicMetaKeeper
165-
.getTopicMeta(topicName)
166-
.getConfig()
167-
.getStringOrDefault(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE);
166+
return topicMetaKeeper.containsTopicMeta(topicName)
167+
? topicMetaKeeper
168+
.getTopicMeta(topicName)
169+
.getConfig()
170+
.getStringOrDefault(TopicConstant.MODE_KEY, TopicConstant.MODE_DEFAULT_VALUE)
171+
: null;
168172
} finally {
169173
releaseReadLock();
170174
}
@@ -174,6 +178,7 @@ public Map<String, TopicConfig> getTopicConfigs(final Set<String> topicNames) {
174178
acquireReadLock();
175179
try {
176180
return topicNames.stream()
181+
.filter(topicMetaKeeper::containsTopicMeta)
177182
.collect(
178183
Collectors.toMap(
179184
topicName -> topicName,

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/subscription/meta/consumer/ConsumerGroupMeta.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.nio.ByteBuffer;
3434
import java.util.Collections;
3535
import java.util.HashSet;
36+
import java.util.Iterator;
3637
import java.util.Map;
3738
import java.util.Objects;
3839
import java.util.Optional;
@@ -120,11 +121,13 @@ public void addConsumer(final ConsumerMeta consumerMeta) {
120121

121122
public void removeConsumer(final String consumerId) {
122123
consumerIdToConsumerMeta.remove(consumerId);
123-
for (final Map.Entry<String, Set<String>> entry :
124-
topicNameToSubscribedConsumerIdSet.entrySet()) {
124+
final Iterator<Map.Entry<String, Set<String>>> iterator =
125+
topicNameToSubscribedConsumerIdSet.entrySet().iterator();
126+
while (iterator.hasNext()) {
127+
final Map.Entry<String, Set<String>> entry = iterator.next();
125128
entry.getValue().remove(consumerId);
126129
if (entry.getValue().isEmpty()) {
127-
topicNameToSubscribedConsumerIdSet.remove(entry.getKey());
130+
iterator.remove();
128131
}
129132
}
130133
}

0 commit comments

Comments
 (0)