Skip to content

Commit 02c5e25

Browse files
committed
Create new map_async api
I tried map with a coroutine and it failed spectacularly: ``` @gen_test() def test_map_async_tornado(): @gen.coroutine def add_tor(x=0, y=0): return x + y source = Stream(asynchronous=True) L = source.map(add_tor, y=1).map(add_tor, y=2).sink_to_list() yield source.emit(0) yield gen.moment # yield to the event loop to ensure it finished > assert L == [3] E assert [<Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")>] == [3] E E At index 0 diff: <Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")> != 3 E E Full diff: E [ E - 3, E + <Future finished exception=TypeError("unsupported operand type(s) for +: '_asyncio.Future' and 'int'")>, E ] ``` So I made a new `map_async` that uses native asyncio plumbing to await the coroutine before feeding it downstream.
1 parent 8c73290 commit 02c5e25

2 files changed

Lines changed: 91 additions & 0 deletions

File tree

streamz/core.py

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -718,6 +718,61 @@ def update(self, x, who=None, metadata=None):
718718
return self._emit(result, metadata=metadata)
719719

720720

721+
@Stream.register_api()
722+
class map_async(Stream):
723+
""" Apply an async function to every element in the stream
724+
725+
Parameters
726+
----------
727+
func: async callable
728+
*args :
729+
The arguments to pass to the function.
730+
**kwargs:
731+
Keyword arguments to pass to func
732+
733+
Examples
734+
--------
735+
>>> async def mult(x, factor=1):
736+
... return factor*x
737+
>>> async def run():
738+
... source = Stream(asynchronous=True)
739+
... source.map_async(mult, factor=2).sink(print)
740+
... for i in range(5):
741+
... await source.emit(i)
742+
>>> asyncio.run(run())
743+
0
744+
2
745+
4
746+
6
747+
8
748+
749+
"""
750+
def __init__(self, upstream, func, *args, **kwargs):
751+
self.func = func
752+
stream_name = kwargs.pop('stream_name', None)
753+
self.kwargs = kwargs
754+
self.args = args
755+
self.work_queue = asyncio.Queue()
756+
757+
Stream.__init__(self, upstream, stream_name=stream_name, ensure_io_loop=True)
758+
self.cb_task = self.loop.asyncio_loop.create_task(self.cb())
759+
760+
def update(self, x, who=None, metadata=None):
761+
coro = self.func(x, *self.args, **self.kwargs)
762+
return self.work_queue.put_nowait((coro, metadata))
763+
764+
async def cb(self):
765+
while True:
766+
coro, metadata = await self.work_queue.get()
767+
try:
768+
result = await coro
769+
except Exception as e:
770+
logger.exception(e)
771+
raise
772+
else:
773+
return self._emit(result, metadata=metadata)
774+
775+
721776
@Stream.register_api()
722777
class starmap(Stream):
723778
""" Apply a function to every element in the stream, splayed out

streamz/tests/test_core.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,42 @@ def add(x=0, y=0):
126126
assert L[0] == 11
127127

128128

129+
@gen_test()
130+
def test_map_async_tornado():
131+
@gen.coroutine
132+
def add_tor(x=0, y=0):
133+
return x + y
134+
135+
async def add_native(x=0, y=0):
136+
return x + y
137+
138+
source = Stream(asynchronous=True)
139+
L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()
140+
141+
yield source.emit(0)
142+
143+
yield gen.moment # Must yield to the event loop to ensure it finished
144+
assert L == [3]
145+
146+
147+
@pytest.mark.asyncio
148+
async def test_map_async():
149+
@gen.coroutine
150+
def add_tor(x=0, y=0):
151+
return x + y
152+
153+
async def add_native(x=0, y=0):
154+
return x + y
155+
156+
source = Stream(asynchronous=True)
157+
L = source.map_async(add_tor, y=1).map_async(add_native, y=2).sink_to_list()
158+
159+
await source.emit(0)
160+
161+
await asyncio.sleep(0) # Must yield to the event loop to ensure it finished
162+
assert L == [3]
163+
164+
129165
def test_map_args():
130166
source = Stream()
131167
L = source.map(operator.add, 10).sink_to_list()

0 commit comments

Comments
 (0)