Skip to content

Commit f76fbd9

Browse files
authored
Merge pull request #395 from martindurant/source_refactor
refactor and make less tornado
2 parents b82ca42 + 13ff9df commit f76fbd9

9 files changed

Lines changed: 266 additions & 222 deletions

File tree

.coveragerc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,4 @@ omit =
1313
exclude_lines =
1414
if __name__ == '__main__':
1515
pragma: no cover
16+
NotImplementedError

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
# https://flake8.readthedocs.io/en/latest/user/error-codes.html
55

66
# Note: there cannot be spaces after comma's here
7-
exclude = __init__.py, compatibility.py
7+
exclude = __init__.py,tests
88
ignore =
99
# Extra space in brackets
1010
E20,

streamz/compatibility.py

Lines changed: 0 additions & 8 deletions
This file was deleted.

streamz/core.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323

2424
from collections.abc import Iterable
2525

26-
from .compatibility import get_thread_identity
26+
from threading import get_ident as get_thread_identity
2727
from .orderedweakset import OrderedWeakrefSet
2828

2929
no_default = '--no-default--'
@@ -512,8 +512,10 @@ def disconnect(self, downstream):
512512

513513
@property
514514
def upstream(self):
515-
if len(self.upstreams) != 1:
515+
if len(self.upstreams) > 1:
516516
raise ValueError("Stream has multiple upstreams")
517+
elif len(self.upstreams) == 0:
518+
return None
517519
else:
518520
return self.upstreams[0]
519521

@@ -535,6 +537,13 @@ def remove(self, predicate):
535537
""" Only pass through elements for which the predicate returns False """
536538
return self.filter(lambda x: not predicate(x))
537539

540+
def stop(self):
541+
"""Call on any stream node to halt all upstream sources"""
542+
prev, s = self.upstream, self
543+
while s:
544+
prev, s = s, s.upstream
545+
prev.stopped = True
546+
538547
@property
539548
def scan(self):
540549
return self.accumulate

streamz/sinks.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,5 @@ def __init__(self, upstream, file, end="\n", mode="a", **kwargs):
107107
weakref.finalize(self, self._fp.close)
108108
super().__init__(upstream, **kwargs)
109109

110-
def __del__(self):
111-
self._fp.close()
112-
113110
def update(self, x, who=None, metadata=None):
114111
self._fp.write(x + self._end)

0 commit comments

Comments
 (0)