From 9acfeffdab02026c8e2f7ecf51bdf274b8cb098c Mon Sep 17 00:00:00 2001 From: Rishabh Dewangan <107680241+Rishabh-git10@users.noreply.github.com> Date: Tue, 9 Jun 2026 20:09:57 +0000 Subject: [PATCH 1/2] fix(grpc): allow multiple wildcard subscriptions when topic validation disabled Signed-off-by: Rishabh Dewangan <107680241+Rishabh-git10@users.noreply.github.com> --- ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py | 58 ++++++++++++++------ ext/dapr-ext-grpc/tests/test_servicier.py | 32 +++++++++++ 2 files changed, 73 insertions(+), 17 deletions(-) diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py b/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py index a729fab8e..63bb8b649 100644 --- a/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py @@ -81,6 +81,17 @@ def __init__(self): self._registered_topics: List[appcallback_v1.TopicSubscription] = [] self._registered_bindings: List[str] = [] + self._wildcard_topics: List[Tuple[str, str, str, bool, TopicSubscribeCallable]] = [] + + def _match_topic(self, pattern: str, topic: str) -> bool: + import re + + re_pattern = re.escape(pattern) + re_pattern = re_pattern.replace(r'\+', r'[^/]*') + re_pattern = re_pattern.replace(r'\*', r'[^/.]+') + re_pattern = re_pattern.replace(r'\#', r'.*') + return bool(re.match(f'^{re_pattern}$', topic)) + def register_method(self, method: str, cb: InvokeMethodCallable) -> None: """Registers method for service invocation.""" if method in self._invoke_method_map: @@ -98,17 +109,16 @@ def register_topic( disable_topic_validation: Optional[bool] = False, ) -> None: """Registers topic subscription for pubsub.""" - if not disable_topic_validation: - topic_key = pubsub_name + DELIMITER + topic - else: - topic_key = pubsub_name + topic_key = pubsub_name + DELIMITER + topic pubsub_topic = topic_key + DELIMITER + path = '' if rule is not None: path = getattr(cb, '__name__', rule.match) pubsub_topic = pubsub_topic + path if pubsub_topic in self._topic_map: raise ValueError(f'{topic} is already registered with {pubsub_name}') self._topic_map[pubsub_topic] = cb + self._wildcard_topics.append((pubsub_name, topic, path, disable_topic_validation, cb)) registered_topic = self._registered_topics_map.get(topic_key) sub: appcallback_v1.TopicSubscription = appcallback_v1.TopicSubscription() @@ -197,14 +207,21 @@ def ListTopicSubscriptions(self, request, context): def OnTopicEvent(self, request: TopicEventRequest, context): """Subscribes events from Pubsub.""" pubsub_topic = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path - no_validation_key = request.pubsub_name + DELIMITER + request.path - if pubsub_topic not in self._topic_map: - if no_validation_key in self._topic_map: - pubsub_topic = no_validation_key - else: + if pubsub_topic in self._topic_map: + cb = self._topic_map[pubsub_topic] + else: + cb = None + for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics: + if p_name == request.pubsub_name and request.path == path: + if self._match_topic(t_pattern, request.topic) or ( + d_val and not any(c in t_pattern for c in ['+', '#', '*']) + ): + cb = w_cb + break + if cb is None: context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore - raise NotImplementedError(f'topic {request.topic} is not implemented!') + raise NotImplementedError(f'topic {request.topic} topic not implemented!') customdata: Struct = request.extensions extensions = dict() @@ -222,7 +239,7 @@ def OnTopicEvent(self, request: TopicEventRequest, context): event.SetSubject(request.topic) event.SetExtensions(extensions) - response = self._topic_map[pubsub_topic](event) + response = cb(event) if isinstance(response, TopicEventResponse): return appcallback_v1.TopicEventResponse(status=response.status.value) return empty_pb2.Empty() @@ -293,13 +310,20 @@ def _handle_bulk_topic_event( ) -> Optional[TopicEventBulkResponse]: """Process bulk topic event request - routes each entry to the appropriate topic handler.""" topic_key = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path - no_validation_key = request.pubsub_name + DELIMITER + request.path - if topic_key not in self._topic_map and no_validation_key not in self._topic_map: - return None # we don't have a handler - - handler_key = topic_key if topic_key in self._topic_map else no_validation_key - cb = self._topic_map[handler_key] # callback + if topic_key in self._topic_map: + cb = self._topic_map[topic_key] # callback + else: + cb = None + for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics: + if p_name == request.pubsub_name and request.path == path: + if self._match_topic(t_pattern, request.topic) or ( + d_val and not any(c in t_pattern for c in ['+', '#', '*']) + ): + cb = w_cb + break + if cb is None: + return None # we don't have a handler statuses = [] for entry in request.entries: diff --git a/ext/dapr-ext-grpc/tests/test_servicier.py b/ext/dapr-ext-grpc/tests/test_servicier.py index 1fe18c5dc..e322abc90 100644 --- a/ext/dapr-ext-grpc/tests/test_servicier.py +++ b/ext/dapr-ext-grpc/tests/test_servicier.py @@ -182,6 +182,38 @@ def test_non_registered_topic(self): self.fake_context, ) + def test_multiple_wildcard_subscriptions(self): + self._servicer.register_topic( + 'pubsub_multi_wildcard', + 'orders/+/items', + self._topic1_method, + None, + disable_topic_validation=True, + ) + self._servicer.register_topic( + 'pubsub_multi_wildcard', + 'inventory/#', + self._topic2_method, + None, + disable_topic_validation=True, + ) + + self._servicer.OnTopicEvent( + appcallback_v1.TopicEventRequest( + pubsub_name='pubsub_multi_wildcard', topic='orders/123/items' + ), + self.fake_context, + ) + self._topic1_method.assert_called_once() + + self._servicer.OnTopicEvent( + appcallback_v1.TopicEventRequest( + pubsub_name='pubsub_multi_wildcard', topic='inventory/warehouse/aisle4' + ), + self.fake_context, + ) + self._topic2_method.assert_called_once() + class BulkTopicEventTests(unittest.TestCase): def setUp(self): From 223946d6b7aebb7e2530ebfe2ab837868ed3e7ac Mon Sep 17 00:00:00 2001 From: Rishabh Dewangan <107680241+Rishabh-git10@users.noreply.github.com> Date: Wed, 10 Jun 2026 19:06:34 +0000 Subject: [PATCH 2/2] refactor(grpc): address review feedback by optimizing routing logic Signed-off-by: Rishabh Dewangan <107680241+Rishabh-git10@users.noreply.github.com> --- ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py | 68 ++++++++------------ ext/dapr-ext-grpc/tests/test_servicier.py | 6 +- 2 files changed, 31 insertions(+), 43 deletions(-) diff --git a/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py b/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py index 63bb8b649..9694e1c53 100644 --- a/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py +++ b/ext/dapr-ext-grpc/dapr/ext/grpc/_servicer.py @@ -81,16 +81,24 @@ def __init__(self): self._registered_topics: List[appcallback_v1.TopicSubscription] = [] self._registered_bindings: List[str] = [] - self._wildcard_topics: List[Tuple[str, str, str, bool, TopicSubscribeCallable]] = [] + self._route_map: Dict[Tuple[str, str], TopicSubscribeCallable] = {} + self._validation_disabled_pubsubs: Dict[str, TopicSubscribeCallable] = {} - def _match_topic(self, pattern: str, topic: str) -> bool: - import re + def _get_topic_callback( + self, pubsub_name: str, topic: str, path: str + ) -> Optional[TopicSubscribeCallable]: + pubsub_topic = pubsub_name + DELIMITER + topic + DELIMITER + path + if pubsub_topic in self._topic_map: + return self._topic_map[pubsub_topic] + + if (pubsub_name, path) in self._route_map: + return self._route_map[(pubsub_name, path)] + + if path == '': + if pubsub_name in self._validation_disabled_pubsubs: + return self._validation_disabled_pubsubs[pubsub_name] - re_pattern = re.escape(pattern) - re_pattern = re_pattern.replace(r'\+', r'[^/]*') - re_pattern = re_pattern.replace(r'\*', r'[^/.]+') - re_pattern = re_pattern.replace(r'\#', r'.*') - return bool(re.match(f'^{re_pattern}$', topic)) + return None def register_method(self, method: str, cb: InvokeMethodCallable) -> None: """Registers method for service invocation.""" @@ -111,14 +119,16 @@ def register_topic( """Registers topic subscription for pubsub.""" topic_key = pubsub_name + DELIMITER + topic pubsub_topic = topic_key + DELIMITER - path = '' if rule is not None: path = getattr(cb, '__name__', rule.match) pubsub_topic = pubsub_topic + path if pubsub_topic in self._topic_map: raise ValueError(f'{topic} is already registered with {pubsub_name}') self._topic_map[pubsub_topic] = cb - self._wildcard_topics.append((pubsub_name, topic, path, disable_topic_validation, cb)) + self._route_map[(pubsub_name, topic)] = cb + + if disable_topic_validation: + self._validation_disabled_pubsubs[pubsub_name] = cb registered_topic = self._registered_topics_map.get(topic_key) sub: appcallback_v1.TopicSubscription = appcallback_v1.TopicSubscription() @@ -206,22 +216,10 @@ def ListTopicSubscriptions(self, request, context): def OnTopicEvent(self, request: TopicEventRequest, context): """Subscribes events from Pubsub.""" - pubsub_topic = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path - - if pubsub_topic in self._topic_map: - cb = self._topic_map[pubsub_topic] - else: - cb = None - for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics: - if p_name == request.pubsub_name and request.path == path: - if self._match_topic(t_pattern, request.topic) or ( - d_val and not any(c in t_pattern for c in ['+', '#', '*']) - ): - cb = w_cb - break - if cb is None: - context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore - raise NotImplementedError(f'topic {request.topic} topic not implemented!') + cb = self._get_topic_callback(request.pubsub_name, request.topic, request.path) + if cb is None: + context.set_code(grpc.StatusCode.UNIMPLEMENTED) # type: ignore + raise NotImplementedError(f'topic {request.topic} is not implemented!') customdata: Struct = request.extensions extensions = dict() @@ -309,21 +307,9 @@ def _handle_bulk_topic_event( self, request: TopicEventBulkRequest, context ) -> Optional[TopicEventBulkResponse]: """Process bulk topic event request - routes each entry to the appropriate topic handler.""" - topic_key = request.pubsub_name + DELIMITER + request.topic + DELIMITER + request.path - - if topic_key in self._topic_map: - cb = self._topic_map[topic_key] # callback - else: - cb = None - for p_name, t_pattern, path, d_val, w_cb in self._wildcard_topics: - if p_name == request.pubsub_name and request.path == path: - if self._match_topic(t_pattern, request.topic) or ( - d_val and not any(c in t_pattern for c in ['+', '#', '*']) - ): - cb = w_cb - break - if cb is None: - return None # we don't have a handler + cb = self._get_topic_callback(request.pubsub_name, request.topic, request.path) + if cb is None: + return None # we don't have a handler statuses = [] for entry in request.entries: diff --git a/ext/dapr-ext-grpc/tests/test_servicier.py b/ext/dapr-ext-grpc/tests/test_servicier.py index e322abc90..ff6910ee0 100644 --- a/ext/dapr-ext-grpc/tests/test_servicier.py +++ b/ext/dapr-ext-grpc/tests/test_servicier.py @@ -200,7 +200,7 @@ def test_multiple_wildcard_subscriptions(self): self._servicer.OnTopicEvent( appcallback_v1.TopicEventRequest( - pubsub_name='pubsub_multi_wildcard', topic='orders/123/items' + pubsub_name='pubsub_multi_wildcard', topic='orders/123/items', path='orders/+/items' ), self.fake_context, ) @@ -208,7 +208,9 @@ def test_multiple_wildcard_subscriptions(self): self._servicer.OnTopicEvent( appcallback_v1.TopicEventRequest( - pubsub_name='pubsub_multi_wildcard', topic='inventory/warehouse/aisle4' + pubsub_name='pubsub_multi_wildcard', + topic='inventory/warehouse/aisle4', + path='inventory/#', ), self.fake_context, )