|
2 | 2 | from rx import Observable, Observer |
3 | 3 |
|
4 | 4 | from .base import BaseSubscriptionServer |
5 | | -from .constants import GQL_CONNECTION_ACK, GQL_CONNECTION_ERROR |
| 5 | +from .constants import GQL_COMPLETE, GQL_CONNECTION_ACK, GQL_CONNECTION_ERROR |
6 | 6 |
|
7 | 7 |
|
8 | 8 | class BaseSyncSubscriptionServer(BaseSubscriptionServer): |
@@ -49,30 +49,33 @@ def on_start(self, connection_context, op_id, params): |
49 | 49 | op_id, |
50 | 50 | self.send_execution_result, |
51 | 51 | self.send_error, |
52 | | - self.on_close, |
| 52 | + self.send_message, |
53 | 53 | ) |
54 | 54 | ) |
55 | 55 | connection_context.register_operation(op_id, disposable) |
56 | 56 |
|
57 | 57 | except Exception as e: |
58 | | - self.send_error(connection_context, op_id, str(e)) |
| 58 | + self.send_error(connection_context, op_id, e) |
| 59 | + self.send_message(connection_context, op_id, GQL_COMPLETE) |
59 | 60 |
|
60 | 61 |
|
61 | 62 | class SubscriptionObserver(Observer): |
62 | 63 | def __init__( |
63 | | - self, connection_context, op_id, send_execution_result, send_error, on_close |
| 64 | + self, connection_context, op_id, send_execution_result, send_error, send_message |
64 | 65 | ): |
65 | 66 | self.connection_context = connection_context |
66 | 67 | self.op_id = op_id |
67 | 68 | self.send_execution_result = send_execution_result |
68 | 69 | self.send_error = send_error |
69 | | - self.on_close = on_close |
| 70 | + self.send_message = send_message |
70 | 71 |
|
71 | 72 | def on_next(self, value): |
72 | 73 | self.send_execution_result(self.connection_context, self.op_id, value) |
73 | 74 |
|
74 | 75 | def on_completed(self): |
75 | | - self.on_close(self.connection_context) |
| 76 | + self.send_message(self.connection_context, self.op_id, GQL_COMPLETE) |
| 77 | + self.connection_context.remove_operation(self.op_id) |
76 | 78 |
|
77 | 79 | def on_error(self, error): |
78 | 80 | self.send_error(self.connection_context, self.op_id, error) |
| 81 | + self.on_completed() |
0 commit comments