Skip to content

Commit b3c46eb

Browse files
authored
Merge pull request #399 from wwoods/master
Fix for #394 #397 - non-unique values with scatter
2 parents f76fbd9 + 7f9e07b commit b3c46eb

2 files changed

Lines changed: 24 additions & 7 deletions

File tree

streamz/dask.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from tornado import gen
66

77
from dask.compatibility import apply
8-
from dask.base import tokenize
98
from distributed.client import default_client
109

1110
from .core import Stream
@@ -106,12 +105,11 @@ def update(self, x, who=None, metadata=None):
106105
self._retain_refs(metadata)
107106
# We need to make sure that x is treated as it is by dask
108107
# However, client.scatter works internally differently for
109-
# lists and dicts. So we always use a dict here to be sure
110-
# we know the format exactly. The key will be taken as the
111-
# dask identifier of the data.
112-
tokenized_x = f"{type(x).__name__}-{tokenize(x)}"
113-
future_as_dict = yield client.scatter({tokenized_x: x}, asynchronous=True)
114-
future = future_as_dict[tokenized_x]
108+
# lists and dicts. So we always use a list here to be sure
109+
# we know the format exactly. We do not use a key to avoid
110+
# issues like https://github.com/python-streamz/streams/issues/397.
111+
future_as_list = yield client.scatter([x], asynchronous=True, hash=False)
112+
future = future_as_list[0]
115113
f = yield self._emit(future, metadata=metadata)
116114
self._release_refs(metadata)
117115

streamz/tests/test_dask.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from operator import add
2+
import random
23
import time
34

45
import pytest
@@ -50,6 +51,24 @@ def add_to_dict(d):
5051
assert item["i"] == i
5152

5253

54+
@gen_cluster(client=True)
55+
def test_non_unique_emit(c, s, a, b):
56+
"""Regression for https://github.com/python-streamz/streams/issues/397
57+
58+
Non-unique stream entries still need to each be processed.
59+
"""
60+
source = Stream(asynchronous=True)
61+
futures = source.scatter().map(lambda x: random.random())
62+
L = futures.gather().sink_to_list()
63+
64+
for _ in range(3):
65+
# Emit non-unique values
66+
yield source.emit(0)
67+
68+
assert len(L) == 3
69+
assert L[0] != L[1] or L[0] != L[2]
70+
71+
5372
@gen_cluster(client=True)
5473
def test_scan(c, s, a, b):
5574
source = Stream(asynchronous=True)

0 commit comments

Comments (0)