Skip to content

Commit bf86272

Browse files
author
Martin Durant
committed
refactor PeriodicCallback but keep alias for now
1 parent 38212a4 commit bf86272

3 files changed

Lines changed: 74 additions & 27 deletions

File tree

streamz/sinks.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,5 @@ def __init__(self, upstream, file, end="\n", mode="a", **kwargs):
107107
weakref.finalize(self, self._fp.close)
108108
super().__init__(upstream, **kwargs)
109109

110-
def __del__(self):
111-
self._fp.close()
112-
113110
def update(self, x, who=None, metadata=None):
114111
self._fp.write(x + self._end)

streamz/sources.py

Lines changed: 68 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,25 +2,11 @@
22
from glob import glob
33
import os
44
import time
5-
import tornado.ioloop
65
from tornado import gen
76

87
from .core import Stream, convert_interval, RefCounter
98

109

11-
def PeriodicCallback(callback, callback_time, asynchronous=False, **kwargs):
12-
source = Stream(asynchronous=asynchronous)
13-
14-
def _():
15-
result = callback()
16-
source._emit(result)
17-
18-
pc = tornado.ioloop.PeriodicCallback(_, callback_time, **kwargs)
19-
pc.start()
20-
pc.start()
21-
return source
22-
23-
2410
def sink_to_file(filename, upstream, mode='w', prefix='', suffix='\n', flush=False):
2511
file = open(filename, mode=mode)
2612

@@ -34,6 +20,17 @@ def write(text):
3420

3521

3622
class Source(Stream):
23+
"""Start node for a set of Streams
24+
25+
Source nodes emit data into other nodes. They typically get this data
26+
by polling external sources, and are necessarily run by an event loop.
27+
28+
Parameters
29+
----------
30+
start: bool
31+
Whether to call the run method immediately. If False, nothing
32+
will happen until ``source.start()`` is called.
33+
"""
3734
_graphviz_shape = 'doubleoctagon'
3835

3936
def __init__(self, start=False, **kwargs):
@@ -49,15 +46,68 @@ def stop(self):
4946
self.stopped = True
5047

5148
def start(self):
52-
"""start polling"""
53-
self.stopped = False
54-
self.started = True
55-
self.loop.add_callback(self.run)
49+
"""start polling
50+
51+
If already running, this has no effect. If the source was started and then
52+
stopped again, this will restart the ``self.run`` coroutine.
53+
"""
54+
if self.stopped:
55+
self.stopped = False
56+
self.started = True
57+
self.loop.add_callback(self.run)
5658

5759
async def run(self):
60+
"""This coroutine will be invoked by start() and emit all data
61+
62+
You might either override ``_run()`` when all logic can be contained
63+
there, or override this method directly.
64+
65+
Note the use of ``.stopped`` to halt the coroutine; an overriding implementation should check it as well.
66+
67+
"""
5868
while not self.stopped:
5969
await self._run()
6070

71+
async def _run(self):
72+
"""This is the functionality to run on each cycle
73+
74+
Typically this may be used for polling some external IO source
75+
or time-based data emission. You might choose to include an
76+
``await asyncio.sleep()`` for the latter.
77+
"""
78+
raise NotImplementedError
79+
80+
81+
@Stream.register_api(staticmethod)
82+
class from_periodic(Source):
83+
"""Generate data from a function on given period
84+
85+
cf ``streamz.dataframe.PeriodicDataFrame``
86+
87+
Parameters
88+
----------
89+
callback: callable
90+
Function to call on each iteration. Takes no arguments.
91+
poll_interval: float
92+
Time to sleep between calls (s)
93+
"""
94+
95+
def __init__(self, callback, poll_interval=0.1, **kwargs):
96+
self._cb = callback
97+
self._poll = poll_interval
98+
super().__init__(**kwargs)
99+
100+
async def _run(self):
101+
await asyncio.gather(self._emit(self._cb()))
102+
await asyncio.sleep(self._poll)
103+
104+
105+
def PeriodicCallback(callback, callback_time, asynchronous=False, **kwargs):  # pragma: no cover
    """For backward compatibility - please use Stream.from_periodic

    Parameters
    ----------
    callback: callable
        Function to call on each iteration.
    callback_time: float
        Forwarded to ``Stream.from_periodic`` as its second positional
        argument (the polling interval).
    asynchronous: bool
        Forwarded unchanged to ``Stream.from_periodic``.
    **kwargs:
        If given, passed to ``callback`` as keyword arguments on every call.

    Returns
    -------
    Whatever ``Stream.from_periodic`` returns for these arguments.
    """
    target = callback
    if kwargs:
        # Keep a separate reference to the user's callable. Rebinding
        # ``callback`` to a lambda that itself calls ``callback`` makes the
        # lambda invoke itself, recursing until RecursionError.
        wrapped = callback

        def target():
            return wrapped(**kwargs)

    return Stream.from_periodic(target, callback_time, asynchronous=asynchronous)
110+
61111

62112
@Stream.register_api(staticmethod)
63113
class from_textfile(Source):

streamz/tests/test_core.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,18 @@
55
import operator
66
from operator import add
77
import os
8-
from time import time, sleep
8+
from time import sleep
99
import sys
1010

1111
import pytest
1212

13-
from tornado import gen
1413
from tornado.queues import Queue
1514
from tornado.ioloop import IOLoop
1615

1716
import streamz as sz
1817

19-
from streamz import Stream, RefCounter
20-
from streamz.sources import sink_to_file, PeriodicCallback
18+
from streamz import RefCounter
19+
from streamz.sources import sink_to_file
2120
from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger, # noqa: F401
2221
clean, await_for, metadata, wait_for) # noqa: F401
2322
from distributed.utils_test import loop # noqa: F401
@@ -421,7 +420,7 @@ def test_timed_window_ref_counts():
421420
@gen_test()
422421
def test_timed_window_metadata():
423422
source = Stream()
424-
L = metadata(source.timed_window(0.01)).sink_to_list()
423+
L = metadata(source.timed_window(0.06)).sink_to_list()
425424

426425
source.emit(0)
427426
source.emit(1, metadata=[{'v': 1}])
@@ -482,7 +481,8 @@ def test_sink_to_file():
482481
@gen_test()
483482
def test_counter():
484483
counter = itertools.count()
485-
source = PeriodicCallback(lambda: next(counter), 0.001, asynchronous=True)
484+
source = Stream.from_periodic(lambda: next(counter), 0.001, asynchronous=True,
485+
start=True)
486486
L = source.sink_to_list()
487487
yield gen.sleep(0.05)
488488

0 commit comments

Comments
 (0)