Skip to content

Commit 17c06a7

Browse files
author
Martin Durant
committed
Merge branch 'master' into master2streamz
2 parents f060d9f + f76fbd9 commit 17c06a7

9 files changed

Lines changed: 266 additions & 222 deletions

File tree

.coveragerc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ omit =
1313
exclude_lines =
1414
if __name__ == '__main__':
1515
pragma: no cover
16+
NotImplementedError

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# https://flake8.readthedocs.io/en/latest/user/error-codes.html
55

66
# Note: there cannot be spaces after comma's here
7-
exclude = __init__.py, compatibility.py
7+
exclude = __init__.py,tests
88
ignore =
99
# Extra space in brackets
1010
E20,

streamz/compatibility.py

Lines changed: 0 additions & 8 deletions
This file was deleted.

streamz/core.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828

2929
from collections.abc import Iterable
3030

31-
from .compatibility import get_thread_identity
31+
from threading import get_ident as get_thread_identity
3232
from .orderedweakset import OrderedWeakrefSet
3333

3434
no_default = '--no-default--'
@@ -526,8 +526,10 @@ def disconnect(self, downstream):
526526

527527
@property
528528
def upstream(self):
529-
if len(self.upstreams) != 1:
529+
if len(self.upstreams) > 1:
530530
raise ValueError("Stream has multiple upstreams")
531+
elif len(self.upstreams) == 0:
532+
return None
531533
else:
532534
return self.upstreams[0]
533535

@@ -549,6 +551,13 @@ def remove(self, predicate):
549551
""" Only pass through elements for which the predicate returns False """
550552
return self.filter(lambda x: not predicate(x))
551553

554+
def stop(self):
555+
"""Call on any stream node to halt all upstream sources"""
556+
prev, s = self.upstream, self
557+
while s:
558+
prev, s = s, s.upstream
559+
prev.stopped = True
560+
552561
@property
553562
def scan(self):
554563
return self.accumulate

streamz/sinks.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,5 @@ def __init__(self, upstream, file, end="\n", mode="a", **kwargs):
107107
weakref.finalize(self, self._fp.close)
108108
super().__init__(upstream, **kwargs)
109109

110-
def __del__(self):
111-
self._fp.close()
112-
113110
def update(self, x, who=None, metadata=None):
114111
self._fp.write(x + self._end)

0 commit comments

Comments
 (0)