Skip to content

Commit 7f9e07b

Browse files
author
wwoods
committed
Fix for #394 #397 - non-unique values with scatter
Before this, non-unique values being emitted to scatter would result in an underlying Dask error. This fix ensures that even duplicated values will each be executed in the stream — behavior that is more intuitive and obviates the need for the "pure" specification used in standard Dask.
1 parent b82ca42 commit 7f9e07b

2 files changed

Lines changed: 24 additions & 7 deletions

File tree

streamz/dask.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from tornado import gen
66

77
from dask.compatibility import apply
8-
from dask.base import tokenize
98
from distributed.client import default_client
109

1110
from .core import Stream
@@ -106,12 +105,11 @@ def update(self, x, who=None, metadata=None):
106105
self._retain_refs(metadata)
107106
# We need to make sure that x is treated as it is by dask
108107
# However, client.scatter works internally differently for
109-
# lists and dicts. So we always use a dict here to be sure
110-
# we know the format exactly. The key will be taken as the
111-
# dask identifier of the data.
112-
tokenized_x = f"{type(x).__name__}-{tokenize(x)}"
113-
future_as_dict = yield client.scatter({tokenized_x: x}, asynchronous=True)
114-
future = future_as_dict[tokenized_x]
108+
# lists and dicts. So we always use a list here to be sure
109+
# we know the format exactly. We do not use a key to avoid
110+
# issues like https://github.com/python-streamz/streamz/issues/397.
111+
future_as_list = yield client.scatter([x], asynchronous=True, hash=False)
112+
future = future_as_list[0]
115113
f = yield self._emit(future, metadata=metadata)
116114
self._release_refs(metadata)
117115

streamz/tests/test_dask.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from operator import add
2+
import random
23
import time
34

45
import pytest
@@ -50,6 +51,24 @@ def add_to_dict(d):
5051
assert item["i"] == i
5152

5253

@gen_cluster(client=True)
def test_non_unique_emit(c, s, a, b):
    """Regression test for https://github.com/python-streamz/streamz/issues/397

    Non-unique stream entries still need to each be processed.
    """
    source = Stream(asynchronous=True)
    # Map each emitted value to a fresh random number so we can observe
    # whether every emit — including duplicates — was actually executed.
    futures = source.scatter().map(lambda x: random.random())
    L = futures.gather().sink_to_list()

    for _ in range(3):
        # Emit non-unique values; before the fix, scattering duplicate
        # values triggered an underlying Dask error.
        yield source.emit(0)

    # All three emits must have been processed ...
    assert len(L) == 3
    # ... and each produced an independent result (random values are
    # almost surely distinct, so at least one pair must differ).
    assert L[0] != L[1] or L[0] != L[2]
5372
@gen_cluster(client=True)
5473
def test_scan(c, s, a, b):
5574
source = Stream(asynchronous=True)

0 commit comments

Comments
 (0)