Skip to content

Commit 09969c5

Browse files
committed
Rework map_async to handle failures better
`map` stops the flow of items in the stream when the function raises but `map_async` is outside of the direct line of return so it fails weirdly during an exception. To address that, I added the idea about stopping the stream or not. This way, if the stream does not deliberately invoke `stop` during an exception, the stream continues to process inputs after an exception. Since the `map_async` now conceives of stopping or not, I added a boolean in the node state to control the loop inside the worker task. In the case of an exception during mapping, `map_async` will now release the references held on the metadata for the offending input. I added an example that shows off the failure modes of `map` and `map_async` that plainly demonstrates that exceptions can leave the stream in a weird state.
1 parent 3e0beab commit 09969c5

File tree

2 files changed

+61
-6
lines changed

2 files changed

+61
-6
lines changed

examples/map_failure_modes.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
import asyncio
2+
from itertools import count
3+
from streamz import Stream
4+
5+
6+
async def flaky_async(x, from_where):
7+
return flaky_sync(x, from_where)
8+
9+
10+
def flaky_sync(x, from_where):
11+
if x % 5 == 4:
12+
raise ValueError(f"I flaked out on {from_where}")
13+
return x
14+
15+
16+
def make_counter(name):
17+
return Stream.from_iterable(count(), asynchronous=True, stream_name=name)
18+
19+
20+
async def main():
21+
async_non_stop_source = make_counter("async not stopping")
22+
s_async = async_non_stop_source.map_async(flaky_async, async_non_stop_source)
23+
s_async.rate_limit("500ms").sink(print, async_non_stop_source.name)
24+
25+
sync_source = make_counter("sync")
26+
s_sync = sync_source.map(flaky_sync, sync_source)
27+
s_sync.rate_limit("500ms").sink(print, sync_source.name)
28+
29+
async_stopping_source = make_counter("async stopping")
30+
s_async = async_stopping_source.map_async(flaky_async, async_stopping_source, stop_on_exception=True)
31+
s_async.rate_limit("500ms").sink(print, async_stopping_source.name)
32+
33+
async_non_stop_source.start()
34+
sync_source.start()
35+
async_stopping_source.start()
36+
print(f"{async_non_stop_source.started=}, {sync_source.started=}, {async_stopping_source.started=}")
37+
await asyncio.sleep(3)
38+
print(f"{async_non_stop_source.stopped=}, {sync_source.stopped=}, {async_stopping_source.stopped=}")
39+
40+
41+
if __name__ == "__main__":
42+
try:
43+
asyncio.run(main())
44+
except KeyboardInterrupt:
45+
pass

streamz/core.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,8 @@ class map_async(Stream):
730730
The arguments to pass to the function.
731731
parallelism:
732732
The maximum number of parallel Tasks for evaluating func, default value is 1
733+
stop_on_exception:
734+
If the mapped func raises an exception, should the stream stop or not. Default value is False.
733735
**kwargs:
734736
Keyword arguments to pass to func
735737
@@ -749,16 +751,23 @@ class map_async(Stream):
749751
6
750752
8
751753
"""
752-
def __init__(self, upstream, func, *args, parallelism=1, **kwargs):
754+
def __init__(self, upstream, func, *args, parallelism=1, stop_on_exception=False, **kwargs):
753755
self.func = func
754756
stream_name = kwargs.pop('stream_name', None)
755757
self.kwargs = kwargs
756758
self.args = args
759+
self.running = True
760+
self.stop_on_exception = stop_on_exception
757761
self.work_queue = asyncio.Queue(maxsize=parallelism)
758762

759763
Stream.__init__(self, upstream, stream_name=stream_name, ensure_io_loop=True)
760764
self.work_task = self._create_task(self.work_callback())
761765

766+
def stop(self):
767+
if self.running:
768+
self.running = False
769+
super().stop()
770+
762771
def update(self, x, who=None, metadata=None):
763772
return self._create_task(self._insert_job(x, metadata))
764773

@@ -768,19 +777,20 @@ def _create_task(self, coro):
768777
return self.loop.asyncio_loop.create_task(coro)
769778

770779
async def work_callback(self):
771-
while True:
780+
while self.running:
781+
task, metadata = await self.work_queue.get()
782+
self.work_queue.task_done()
772783
try:
773-
task, metadata = await self.work_queue.get()
774-
self.work_queue.task_done()
775784
result = await task
776785
except Exception as e:
777786
logger.exception(e)
778-
raise
787+
if self.stop_on_exception:
788+
self.stop()
779789
else:
780790
results = self._emit(result, metadata=metadata)
781791
if results:
782792
await asyncio.gather(*results)
783-
self._release_refs(metadata)
793+
self._release_refs(metadata)
784794

785795
async def _wait_for_work_slot(self):
786796
while self.work_queue.full():

0 commit comments

Comments
 (0)