-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathstreams.py
More file actions
136 lines (105 loc) · 4.03 KB
/
streams.py
File metadata and controls
136 lines (105 loc) · 4.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Copyright 2017-2020 Palantir Technologies, Inc.
# Copyright 2021- Python Language Server Contributors.
import logging
import threading
import asyncio
try:
import ujson as json
except Exception: # pylint: disable=broad-except
import json
log = logging.getLogger(__name__)
class JsonRpcStreamReader:
def __init__(self, rfile):
self._rfile = rfile
def close(self):
self._rfile.close()
def listen(self, message_consumer):
"""Blocking call to listen for messages on the rfile.
Args:
message_consumer (fn): function that is passed each message as it is read off the socket.
"""
while not self._rfile.closed:
try:
request_str = self._read_message()
except ValueError:
if self._rfile.closed:
return
log.exception("Failed to read from rfile")
if request_str is None:
break
try:
message_consumer(json.loads(request_str.decode('utf-8')))
except ValueError:
log.exception("Failed to parse JSON message %s", request_str)
continue
def _read_message(self):
"""Reads the contents of a message.
Returns:
body of message if parsable else None
"""
line = self._rfile.readline()
if not line:
return None
content_length = self._content_length(line)
# Blindly consume all header lines
while line and line.strip():
line = self._rfile.readline()
if not line:
return None
# Grab the body
return self._rfile.read(content_length)
async def listen_async(self, message_consumer):
"""Blocking call to listen for messages on the rfile.
Args:
message_consumer (fn): function that is passed each message as it is read off the socket.
"""
while not self._rfile.closed:
try:
request_str = await asyncio.to_thread(self._read_message)
except ValueError:
if self._rfile.closed:
return
log.exception("Failed to read from rfile")
if request_str is None:
break
try:
await message_consumer(json.loads(request_str.decode('utf-8')))
except ValueError:
log.exception("Failed to parse JSON message %s", request_str)
continue
@staticmethod
def _content_length(line):
"""Extract the content length from an input line."""
if line.startswith(b'Content-Length: '):
_, value = line.split(b'Content-Length: ')
value = value.strip()
try:
return int(value)
except ValueError as e:
raise ValueError(f"Invalid Content-Length header: {value}") from e
return None
class JsonRpcStreamWriter:
def __init__(self, wfile, **json_dumps_args):
self._wfile = wfile
self._wfile_lock = threading.Lock()
self._json_dumps_args = json_dumps_args
def close(self):
with self._wfile_lock:
self._wfile.close()
def write(self, message):
with self._wfile_lock:
if self._wfile.closed:
return
try:
body = json.dumps(message, **self._json_dumps_args)
# Ensure we get the byte length, not the character length
content_length = len(body) if isinstance(body, bytes) else len(body.encode('utf-8'))
response = (
f"Content-Length: {content_length}\r\n"
f"Content-Type: application/vscode-jsonrpc; charset=utf8\r\n\r\n"
f"{body}"
)
self._wfile.write(response.encode('utf-8'))
self._wfile.flush()
except Exception: # pylint: disable=broad-except
log.exception("Failed to write message to output file %s", message)