Skip to content

Commit 75cad35

Browse files
committed
Move operation unsubscription to BaseSubscriptionServer
1 parent 7b21f0f commit 75cad35

3 files changed

Lines changed: 11 additions & 12 deletions

File tree

graphql_ws/base.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -168,3 +168,11 @@ def on_message(self, connection_context, message):
168168
return self.send_error(connection_context, None, e)
169169

170170
return self.process_message(connection_context, parsed_message)
171+
172+
def unsubscribe(self, connection_context, op_id):
173+
if connection_context.has_operation(op_id):
174+
# Close async iterator
175+
connection_context.get_operation(op_id).dispose()
176+
# Close operation
177+
connection_context.remove_operation(op_id)
178+
self.on_operation_complete(connection_context, op_id)

graphql_ws/base_async.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,5 @@ async def on_close(self, connection_context):
111111
async def on_stop(self, connection_context, op_id):
112112
await self.unsubscribe(connection_context, op_id)
113113

114-
async def unsubscribe(self, connection_context, op_id):
115-
await super().unsubscribe(connection_context, op_id)
116-
117114
async def on_operation_complete(self, connection_context, op_id):
118115
pass

graphql_ws/base_sync.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,6 @@
88
class BaseSyncSubscriptionServer(BaseSubscriptionServer):
99
graphql_executor = SyncExecutor
1010

11-
def unsubscribe(self, connection_context, op_id):
12-
if connection_context.has_operation(op_id):
13-
# Close async iterator
14-
connection_context.get_operation(op_id).dispose()
15-
# Close operation
16-
connection_context.remove_operation(op_id)
17-
self.on_operation_complete(connection_context, op_id)
18-
1911
def on_operation_complete(self, connection_context, op_id):
2012
pass
2113

@@ -51,7 +43,7 @@ def on_start(self, connection_context, op_id, params):
5143
assert isinstance(
5244
execution_result, Observable
5345
), "A subscription must return an observable"
54-
execution_result.subscribe(
46+
disposable = execution_result.subscribe(
5547
SubscriptionObserver(
5648
connection_context,
5749
op_id,
@@ -60,6 +52,8 @@ def on_start(self, connection_context, op_id, params):
6052
self.on_close,
6153
)
6254
)
55+
connection_context.register_operation(op_id, disposable)
56+
6357
except Exception as e:
6458
self.send_error(connection_context, op_id, str(e))
6559

0 commit comments

Comments
 (0)