1+ import inspect
2+ import weakref
3+
14from tornado import gen
25
36from streamz import Stream
47
8+ # sinks add themselves here to avoid being garbage-collected
59_global_sinks = set ()
610
711
@@ -18,6 +22,16 @@ def __init__(self, upstream, **kwargs):
1822class sink (Sink ):
1923 """ Apply a function on every element
2024
25+ Parameters
26+ ----------
27+ func: callable
28+ A function that will be applied on every element.
29+ args:
30+ Positional arguments that will be passed to ``func`` after the incoming element.
31+ kwargs:
32+ Stream-specific arguments will be passed to ``Stream.__init__``, the rest of
33+ them will be passed to ``func``.
34+
2135 Examples
2236 --------
2337 >>> source = Stream()
@@ -40,10 +54,11 @@ class sink(Sink):
4054 def __init__ (self , upstream , func , * args , ** kwargs ):
4155 self .func = func
4256 # take the stream specific kwargs out
43- stream_name = kwargs .pop ("stream_name" , None )
44- self .kwargs = kwargs
57+ sig = set (inspect .signature (Stream ).parameters )
58+ stream_kwargs = {k : v for (k , v ) in kwargs .items () if k in sig }
59+ self .kwargs = {k : v for (k , v ) in kwargs .items () if k not in sig }
4560 self .args = args
46- super ().__init__ (upstream , stream_name = stream_name )
61+ super ().__init__ (upstream , ** stream_kwargs )
4762
4863 def update (self , x , who = None , metadata = None ):
4964 result = self .func (x , * self .args , ** self .kwargs )
@@ -59,8 +74,8 @@ class sink_to_textfile(Sink):
5974
6075 Type of elements must be ``str``.
6176
62- Arguments
63- ---------
77+ Parameters
78+ ----------
6479 file: str or file-like
6580 File to write the elements to. ``str`` is treated as a file name to open.
6681 If file-like, descriptor must be open in text mode. Note that the file
@@ -83,14 +98,14 @@ class sink_to_textfile(Sink):
8398 1
8499 """
85100 def __init__ (self , upstream , file , end = "\n " , mode = "a" , ** kwargs ):
86- self ._fp = open (file , mode = mode , buffering = 1 ) if isinstance (file , str ) else file
87101 self ._end = end
102+ self ._fp = open (file , mode = mode ) if isinstance (file , str ) else file
103+ weakref .finalize (self , self ._fp .close )
88104 super ().__init__ (upstream , ensure_io_loop = True , ** kwargs )
89105
90106 def __del__ (self ):
91107 self ._fp .close ()
92108
93109 @gen .coroutine
94110 def update (self , x , who = None , metadata = None ):
95- yield self .loop .run_in_executor (None , self ._fp .write , x )
96- yield self .loop .run_in_executor (None , self ._fp .write , self ._end )
111+ yield self .loop .run_in_executor (None , self ._fp .write , x + self ._end )
0 commit comments