Skip to content

Commit 2eba95a

Browse files
committed
Ensure flatten.update([]) does not kill the stream
Trap the StopIteration that erupts from update when passed an empty iterable so that the stream can continue.
1 parent e4d708c commit 2eba95a

File tree

3 files changed

+16
-3
lines changed

3 files changed

+16
-3
lines changed

examples/scrape.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import toolz
66
from bs4 import BeautifulSoup
77

8-
from streamz import Stream, identity
8+
from streamz import Stream
99

1010

1111
def links_of_page(content_page):
@@ -38,7 +38,6 @@ def topk_dict(d, k=10):
3838
.map(lambda x: x.content))
3939
links = (content.zip(pages)
4040
.map(links_of_page)
41-
.filter(identity)
4241
.flatten())
4342
links.connect(source)
4443

streamz/core.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1637,7 +1637,10 @@ class flatten(Stream):
16371637
def update(self, x, who=None, metadata=None):
16381638
L = []
16391639
items = chain(x)
1640-
item = next(items)
1640+
try:
1641+
item = next(items)
1642+
except StopIteration:
1643+
return L
16411644
for item_next in items:
16421645
y = self._emit(item)
16431646
item = item_next

streamz/tests/test_core.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,17 @@ def test_flatten(iterators):
814814
assert L == [1, 2, 3, 4, 5, 6, 7, 8]
815815

816816

817+
def test_flatten_empty():
818+
source = Stream()
819+
L = source.flatten().sink_to_list()
820+
821+
source.emit([1, 2])
822+
source.emit([])
823+
source.emit([3, 4])
824+
825+
assert L == [1, 2, 3, 4]
826+
827+
817828
def test_unique():
818829
source = Stream()
819830
L = source.unique().sink_to_list()

0 commit comments

Comments
 (0)