Skip to content

Commit 7e73a1b

Browse files
author
Martin Durant
committed
Merge branch 'master' into master2streamz
2 parents 17c06a7 + b3c46eb commit 7e73a1b

2 files changed

Lines changed: 23 additions & 7 deletions

File tree

streamz/dask.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
from tornado import gen
66

77
from dask.compatibility import apply
8-
from dask.base import tokenize
98
from distributed.client import default_client
109

1110
from .core import Stream
@@ -102,12 +101,11 @@ def update(self, x, who=None, metadata=None):
102101
self._retain_refs(metadata)
103102
# We need to make sure that x is treated as it is by dask
104103
# However, client.scatter works internally different for
105-
# lists and dicts. So we always use a dict here to be sure
106-
# we know the format exactly. The key will be taken as the
107-
# dask identifier of the data.
108-
tokenized_x = f"{type(x).__name__}-{tokenize(x)}"
109-
future_as_dict = yield client.scatter({tokenized_x: x}, asynchronous=True)
110-
future = future_as_dict[tokenized_x]
104+
# lists and dicts. So we always use a list here to be sure
105+
# we know the format exactly. We do not use a key to avoid
106+
# issues like https://github.com/python-streamz/streams/issues/397.
107+
future_as_list = yield client.scatter([x], asynchronous=True, hash=False)
108+
future = future_as_list[0]
111109
f = yield self._emit(future, metadata=metadata)
112110
self._release_refs(metadata)
113111

streamz/tests/test_dask.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from operator import add
2+
import random
23
import time
34

45
import pytest
@@ -91,6 +92,23 @@ def test_partition_then_scatter_sync(loop):
9192
assert L == [1, 2, 3]
9293

9394

95+
def test_non_unique_emit(c, s, a, b):
96+
"""Regression for https://github.com/python-streamz/streams/issues/397
97+
98+
Non-unique stream entries still need to each be processed.
99+
"""
100+
source = Stream(asynchronous=True)
101+
futures = source.scatter().map(lambda x: random.random())
102+
L = futures.gather().sink_to_list()
103+
104+
for _ in range(3):
105+
# Emit non-unique values
106+
yield source.emit(0)
107+
108+
assert len(L) == 3
109+
assert L[0] != L[1] or L[0] != L[2]
110+
111+
94112
@gen_cluster(client=True)
95113
def test_scan(c, s, a, b):
96114
source = Stream(asynchronous=True)

0 commit comments

Comments (0)