Skip to content

Commit 5bd3bc4

Browse files
Author: Martin Durant (committed)
Commit message: Fixes #439
1 parent 4f877f4 · commit 5bd3bc4

4 files changed

Lines changed: 124 additions & 123 deletions

File tree

streamz/core.py

Lines changed: 0 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1902,89 +1902,6 @@ def cb(self):
19021902
yield self._emit(x, self.next_metadata)
19031903

19041904

1905-
@Stream.register_api()
1906-
class to_kafka(Stream):
1907-
""" Writes data in the stream to Kafka
1908-
1909-
This stream accepts a string or bytes object. Call ``flush`` to ensure all
1910-
messages are pushed. Responses from Kafka are pushed downstream.
1911-
1912-
Parameters
1913-
----------
1914-
topic : string
1915-
The topic which to write
1916-
producer_config : dict
1917-
Settings to set up the stream, see
1918-
https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration
1919-
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
1920-
Examples:
1921-
bootstrap.servers: Connection string (host:port) to Kafka
1922-
1923-
Examples
1924-
--------
1925-
>>> from streamz import Stream
1926-
>>> ARGS = {'bootstrap.servers': 'localhost:9092'}
1927-
>>> source = Stream()
1928-
>>> kafka = source.map(lambda x: str(x)).to_kafka('test', ARGS)
1929-
<to_kafka>
1930-
>>> for i in range(10):
1931-
... source.emit(i)
1932-
>>> kafka.flush()
1933-
"""
1934-
def __init__(self, upstream, topic, producer_config, **kwargs):
1935-
import confluent_kafka as ck
1936-
1937-
self.topic = topic
1938-
self.producer = ck.Producer(producer_config)
1939-
1940-
kwargs["ensure_io_loop"] = True
1941-
Stream.__init__(self, upstream, **kwargs)
1942-
self.stopped = False
1943-
self.polltime = 0.2
1944-
self.loop.add_callback(self.poll)
1945-
self.futures = []
1946-
1947-
@gen.coroutine
1948-
def poll(self):
1949-
while not self.stopped:
1950-
# executes callbacks for any delivered data, in this thread
1951-
# if no messages were sent, nothing happens
1952-
self.producer.poll(0)
1953-
yield gen.sleep(self.polltime)
1954-
1955-
def update(self, x, who=None, metadata=None):
1956-
future = gen.Future()
1957-
self.futures.append(future)
1958-
1959-
@gen.coroutine
1960-
def _():
1961-
while True:
1962-
try:
1963-
# this runs asynchronously, in C-K's thread
1964-
self.producer.produce(self.topic, x, callback=self.cb)
1965-
return
1966-
except BufferError:
1967-
yield gen.sleep(self.polltime)
1968-
except Exception as e:
1969-
future.set_exception(e)
1970-
return
1971-
1972-
self.loop.add_callback(_)
1973-
return future
1974-
1975-
@gen.coroutine
1976-
def cb(self, err, msg):
1977-
future = self.futures.pop(0)
1978-
if msg is not None and msg.value() is not None:
1979-
future.set_result(None)
1980-
yield self._emit(msg.value())
1981-
else:
1982-
future.set_exception(err or msg.error())
1983-
1984-
def flush(self, timeout=-1):
1985-
self.producer.flush(timeout)
1986-
1987-
19881905
def sync(loop, func, *args, **kwargs):
19891906
"""
19901907
Run coroutine in loop running in separate thread.

streamz/sinks.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,3 +109,86 @@ def __init__(self, upstream, file, end="\n", mode="a", **kwargs):
109109

110110
def update(self, x, who=None, metadata=None):
111111
self._fp.write(x + self._end)
112+
113+
114+
@Stream.register_api()
115+
class to_kafka(Stream):
116+
""" Writes data in the stream to Kafka
117+
118+
This stream accepts a string or bytes object. Call ``flush`` to ensure all
119+
messages are pushed. Responses from Kafka are pushed downstream.
120+
121+
Parameters
122+
----------
123+
topic : string
124+
The topic which to write
125+
producer_config : dict
126+
Settings to set up the stream, see
127+
https://docs.confluent.io/current/clients/confluent-kafka-python/#configuration
128+
https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
129+
Examples:
130+
bootstrap.servers: Connection string (host:port) to Kafka
131+
132+
Examples
133+
--------
134+
>>> from streamz import Stream
135+
>>> ARGS = {'bootstrap.servers': 'localhost:9092'}
136+
>>> source = Stream()
137+
>>> kafka = source.map(lambda x: str(x)).to_kafka('test', ARGS)
138+
<to_kafka>
139+
>>> for i in range(10):
140+
... source.emit(i)
141+
>>> kafka.flush()
142+
"""
143+
def __init__(self, upstream, topic, producer_config, **kwargs):
144+
import confluent_kafka as ck
145+
146+
self.topic = topic
147+
self.producer = ck.Producer(producer_config)
148+
149+
kwargs["ensure_io_loop"] = True
150+
Stream.__init__(self, upstream, **kwargs)
151+
self.stopped = False
152+
self.polltime = 0.2
153+
self.loop.add_callback(self.poll)
154+
self.futures = []
155+
156+
@gen.coroutine
157+
def poll(self):
158+
while not self.stopped:
159+
# executes callbacks for any delivered data, in this thread
160+
# if no messages were sent, nothing happens
161+
self.producer.poll(0)
162+
yield gen.sleep(self.polltime)
163+
164+
def update(self, x, who=None, metadata=None):
165+
future = gen.Future()
166+
self.futures.append(future)
167+
168+
@gen.coroutine
169+
def _():
170+
while True:
171+
try:
172+
# this runs asynchronously, in C-K's thread
173+
self.producer.produce(self.topic, x, callback=self.cb)
174+
return
175+
except BufferError:
176+
yield gen.sleep(self.polltime)
177+
except Exception as e:
178+
future.set_exception(e)
179+
return
180+
181+
self.loop.add_callback(_)
182+
return future
183+
184+
@gen.coroutine
185+
def cb(self, err, msg):
186+
future = self.futures.pop(0)
187+
if msg is not None and msg.value() is not None:
188+
future.set_result(None)
189+
yield self._emit(msg.value())
190+
else:
191+
future.set_exception(err or msg.error())
192+
193+
def flush(self, timeout=-1):
194+
self.producer.flush(timeout)

streamz/tests/test_dask.py

Lines changed: 30 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import asyncio
12
from operator import add
23
import random
34
import time
@@ -16,21 +17,21 @@
1617

1718

1819
@gen_cluster(client=True)
19-
def test_map(c, s, a, b):
20+
async def test_map(c, s, a, b):
2021
source = Stream(asynchronous=True)
2122
futures = scatter(source).map(inc)
2223
futures_L = futures.sink_to_list()
2324
L = futures.gather().sink_to_list()
2425

2526
for i in range(5):
26-
yield source.emit(i)
27+
await source.emit(i)
2728

2829
assert L == [1, 2, 3, 4, 5]
2930
assert all(isinstance(f, Future) for f in futures_L)
3031

3132

3233
@gen_cluster(client=True)
33-
def test_map_on_dict(c, s, a, b):
34+
async def test_map_on_dict(c, s, a, b):
3435
# dask treats dicts differently, so we have to make sure
3536
# the user sees no difference in the streamz api.
3637
# Regression test against #336
@@ -43,7 +44,7 @@ def add_to_dict(d):
4344
L = futures.gather().sink_to_list()
4445

4546
for i in range(5):
46-
yield source.emit({"i": i})
47+
await source.emit({"i": i})
4748

4849
assert len(L) == 5
4950
for i, item in enumerate(sorted(L, key=lambda x: x["x"])):
@@ -52,7 +53,7 @@ def add_to_dict(d):
5253

5354

5455
@gen_cluster(client=True)
55-
def test_partition_then_scatter_async(c, s, a, b):
56+
async def test_partition_then_scatter_async(c, s, a, b):
5657
# Ensure partition w/ timeout before scatter works correctly for
5758
# asynchronous
5859
start = time.monotonic()
@@ -63,10 +64,10 @@ def test_partition_then_scatter_async(c, s, a, b):
6364

6465
rc = RefCounter(loop=source.loop)
6566
for i in range(3):
66-
yield source.emit(i, metadata=[{'ref': rc}])
67+
await source.emit(i, metadata=[{'ref': rc}])
6768

6869
while rc.count != 0 and time.monotonic() - start < 1.:
69-
yield gen.sleep(1e-2)
70+
await gen.sleep(1e-2)
7071

7172
assert L == [1, 2, 3]
7273

@@ -92,7 +93,7 @@ def test_partition_then_scatter_sync(loop):
9293

9394

9495
@gen_cluster(client=True)
95-
def test_non_unique_emit(c, s, a, b):
96+
async def test_non_unique_emit(c, s, a, b):
9697
"""Regression for https://github.com/python-streamz/streams/issues/397
9798
9899
Non-unique stream entries still need to each be processed.
@@ -103,28 +104,28 @@ def test_non_unique_emit(c, s, a, b):
103104

104105
for _ in range(3):
105106
# Emit non-unique values
106-
yield source.emit(0)
107+
await source.emit(0)
107108

108109
assert len(L) == 3
109110
assert L[0] != L[1] or L[0] != L[2]
110111

111112

112113
@gen_cluster(client=True)
113-
def test_scan(c, s, a, b):
114+
async def test_scan(c, s, a, b):
114115
source = Stream(asynchronous=True)
115116
futures = scatter(source).map(inc).scan(add)
116117
futures_L = futures.sink_to_list()
117118
L = futures.gather().sink_to_list()
118119

119120
for i in range(5):
120-
yield source.emit(i)
121+
await source.emit(i)
121122

122123
assert L == [1, 3, 6, 10, 15]
123124
assert all(isinstance(f, Future) for f in futures_L)
124125

125126

126127
@gen_cluster(client=True)
127-
def test_scan_state(c, s, a, b):
128+
async def test_scan_state(c, s, a, b):
128129
source = Stream(asynchronous=True)
129130

130131
def f(acc, i):
@@ -133,33 +134,33 @@ def f(acc, i):
133134

134135
L = scatter(source).scan(f, returns_state=True).gather().sink_to_list()
135136
for i in range(3):
136-
yield source.emit(i)
137+
await source.emit(i)
137138

138139
assert L == [0, 1, 3]
139140

140141

141142
@gen_cluster(client=True)
142-
def test_zip(c, s, a, b):
143+
async def test_zip(c, s, a, b):
143144
a = Stream(asynchronous=True)
144145
b = Stream(asynchronous=True)
145146
c = scatter(a).zip(scatter(b))
146147

147148
L = c.gather().sink_to_list()
148149

149-
yield a.emit(1)
150-
yield b.emit('a')
151-
yield a.emit(2)
152-
yield b.emit('b')
150+
await a.emit(1)
151+
await b.emit('a')
152+
await a.emit(2)
153+
await b.emit('b')
153154

154155
assert L == [(1, 'a'), (2, 'b')]
155156

156157

157158
@gen_cluster(client=True)
158-
def test_accumulate(c, s, a, b):
159+
async def test_accumulate(c, s, a, b):
159160
source = Stream(asynchronous=True)
160161
L = source.scatter().accumulate(lambda acc, x: acc + x, with_state=True).gather().sink_to_list()
161162
for i in range(3):
162-
yield source.emit(i)
163+
await source.emit(i)
163164
assert L[-1][1] == 3
164165

165166

@@ -169,10 +170,9 @@ def test_sync(loop): # noqa: F811
169170
source = Stream()
170171
L = source.scatter().map(inc).gather().sink_to_list()
171172

172-
@gen.coroutine
173-
def f():
173+
async def f():
174174
for i in range(10):
175-
yield source.emit(i, asynchronous=True)
175+
await source.emit(i, asynchronous=True)
176176

177177
sync(loop, f)
178178

@@ -193,24 +193,24 @@ def test_sync_2(loop): # noqa: F811
193193

194194

195195
@gen_cluster(client=True, nthreads=[('127.0.0.1', 1)] * 2)
196-
def test_buffer(c, s, a, b):
196+
async def test_buffer(c, s, a, b):
197197
source = Stream(asynchronous=True)
198198
L = source.scatter().map(slowinc, delay=0.5).buffer(5).gather().sink_to_list()
199199

200200
start = time.time()
201201
for i in range(5):
202-
yield source.emit(i)
202+
await source.emit(i)
203203
end = time.time()
204204
assert end - start < 0.5
205205

206206
for i in range(5, 10):
207-
yield source.emit(i)
207+
await source.emit(i)
208208

209209
end2 = time.time()
210210
assert end2 - start > (0.5 / 3)
211211

212212
while len(L) < 10:
213-
yield gen.sleep(0.01)
213+
await gen.sleep(0.01)
214214
assert time.time() - start < 5
215215

216216
assert L == list(map(inc, range(10)))
@@ -242,7 +242,7 @@ def test_buffer_sync(loop): # noqa: F811
242242

243243

244244
@pytest.mark.xfail(reason='')
245-
def test_stream_shares_client_loop(loop): # noqa: F811
245+
async def test_stream_shares_client_loop(loop): # noqa: F811
246246
with cluster() as (s, [a, b]):
247247
with Client(s['address'], loop=loop) as client: # noqa: F841
248248
source = Stream()
@@ -251,14 +251,14 @@ def test_stream_shares_client_loop(loop): # noqa: F811
251251

252252

253253
@gen_cluster(client=True)
254-
def test_starmap(c, s, a, b):
254+
async def test_starmap(c, s, a, b):
255255
def add(x, y, z=0):
256256
return x + y + z
257257

258258
source = Stream(asynchronous=True)
259259
L = source.scatter().starmap(add, z=10).gather().sink_to_list()
260260

261261
for i in range(5):
262-
yield source.emit((i, i))
262+
await source.emit((i, i))
263263

264264
assert L == [10, 12, 14, 16, 18]

Comments (0)