|
1 | 1 | from asyncio import ensure_future |
2 | 2 | from graphql.execution.executors.asyncio import AsyncioExecutor |
| 3 | +from graphql.execution.executors.gevent import GeventExecutor |
3 | 4 | from websockets.protocol import CONNECTING, OPEN |
4 | 5 | from inspect import isawaitable, isasyncgen |
5 | 6 | from graphql import graphql, format_error |
| 7 | +from graphql.execution import ExecutionResult |
6 | 8 | from collections import OrderedDict |
7 | 9 | import json |
8 | | - |
9 | | - |
| 10 | +import gevent |
| 11 | +from rx.core.anonymousobservable import AnonymousObservable |
10 | 12 | GRAPHQL_WS = 'graphql-ws' |
11 | 13 | WS_PROTOCOL = GRAPHQL_WS |
12 | 14 |
|
@@ -48,6 +50,24 @@ def get_operation(self, op_id): |
48 | 50 | def remove_operation(self, op_id): |
49 | 51 | del self.operations[op_id] |
50 | 52 |
|
| 53 | +class GEventConnectionContext(ConnectionContext): |
| 54 | + |
| 55 | + def receive(self): |
| 56 | + msg = self.ws.receive() |
| 57 | + return msg |
| 58 | + |
| 59 | + def send(self, data): |
| 60 | + if self.closed: |
| 61 | + return |
| 62 | + self.ws.send(data) |
| 63 | + |
| 64 | + @property |
| 65 | + def closed(self): |
| 66 | + return self.ws.closed |
| 67 | + |
| 68 | + def close(self, code): |
| 69 | + self.ws.close(code) |
| 70 | + |
51 | 71 |
|
52 | 72 | class AioHTTPConnectionContext(ConnectionContext): |
53 | 73 | async def receive(self): |
@@ -185,6 +205,76 @@ def unsubscribe(self, connection_context, op_id): |
185 | 205 | def on_operation_complete(self, connection_context, op_id): |
186 | 206 | pass |
187 | 207 |
|
| 208 | +class GeventSubscriptionServer(BaseWebSocketSubscriptionServer): |
| 209 | + |
| 210 | + def get_graphql_params(self, *args, **kwargs): |
| 211 | + params = super(GeventSubscriptionServer, self).get_graphql_params(*args, **kwargs) |
| 212 | + return dict(params, executor=GeventExecutor()) |
| 213 | + |
| 214 | + def handle(self, ws): |
| 215 | + connection_context = GEventConnectionContext(ws) |
| 216 | + self.on_open(connection_context) |
| 217 | + while True: |
| 218 | + try: |
| 219 | + if connection_context.closed: |
| 220 | + raise ConnectionClosedException() |
| 221 | + message = connection_context.receive() |
| 222 | + except ConnectionClosedException: |
| 223 | + self.on_close(connection_context) |
| 224 | + return |
| 225 | + self.on_message(connection_context, message) |
| 226 | + |
| 227 | + def on_message(self, connection_context, message): |
| 228 | + try: |
| 229 | + parsed_message = json.loads(message) |
| 230 | + assert isinstance( |
| 231 | + parsed_message, dict), "Payload must be an object." |
| 232 | + except Exception as e: |
| 233 | + self.send_error(connection_context, None, e) |
| 234 | + return |
| 235 | + |
| 236 | + self.process_message(connection_context, parsed_message) |
| 237 | + |
| 238 | + def on_open(self, connection_context): |
| 239 | + pass |
| 240 | + |
| 241 | + def on_connect(self, connection_context, payload): |
| 242 | + pass |
| 243 | + |
| 244 | + def on_close(self, connection_context): |
| 245 | + remove_operations = list(connection_context.operations.keys()) |
| 246 | + for op_id in remove_operations: |
| 247 | + self.unsubscribe(connection_context, op_id) |
| 248 | + |
| 249 | + def on_connection_init(self, connection_context, op_id, payload): |
| 250 | + try: |
| 251 | + self.on_connect(connection_context, payload) |
| 252 | + self.send_message(connection_context, op_type=GQL_CONNECTION_ACK) |
| 253 | + |
| 254 | + except Exception as e: |
| 255 | + self.send_error(connection_context, op_id, e, GQL_CONNECTION_ERROR) |
| 256 | + connection_context.close(1011) |
| 257 | + |
| 258 | + def on_connection_terminate(self, connection_context, op_id): |
| 259 | + connection_context.close(1011) |
| 260 | + |
| 261 | + |
| 262 | + def on_start(self, connection_context, op_id, params): |
| 263 | + try: |
| 264 | + execution_result = graphql( |
| 265 | + self.schema, **params, allow_subscriptions=True |
| 266 | + ) |
| 267 | + def process_result(value): |
| 268 | + self.send_execution_result(connection_context, op_id, value) |
| 269 | + execution_result.subscribe(on_next=process_result, |
| 270 | + on_completed=lambda: print("Done!"), |
| 271 | + on_error=lambda error: print("Error Occurred: {0}".format(error)) |
| 272 | + ) |
| 273 | + except Exception as e: |
| 274 | + self.send_error(connection_context, op_id, str(e)) |
| 275 | + |
| 276 | + def on_stop(self, connection_context, op_id): |
| 277 | + self.unsubscribe(connection_context, op_id) |
188 | 278 |
|
189 | 279 | class WebSocketSubscriptionServer(BaseWebSocketSubscriptionServer): |
190 | 280 |
|
|
0 commit comments