File tree Expand file tree Collapse file tree 3 files changed +22
-30
lines changed
Expand file tree Collapse file tree 3 files changed +22
-30
lines changed Original file line number Diff line number Diff line change 1- from streamz import Stream
21import asyncio
3- from tornado .platform .asyncio import AsyncIOMainLoop
4- AsyncIOMainLoop ().install ()
2+ from streamz import Stream
53
64
7- source = Stream ()
5+ source = Stream (asynchronous = True )
86s = source .sliding_window (2 ).map (sum )
9- L = s .sink_to_list () # store result in a list
10-
11- s .rate_limit (0.5 ).sink (source .emit ) # pipe output back to input
12- s .rate_limit (1.0 ).sink (lambda x : print (L )) # print state of L every second
13-
14- source .emit (0 ) # seed with initial values
15- source .emit (1 )
16-
7+ L = s .sink_to_list () # store result in a list
178
18- def run_asyncio_loop ():
19- loop = asyncio .get_event_loop ()
20- try :
21- loop .run_forever ()
22- except KeyboardInterrupt :
23- pass
24- finally :
25- loop .close ()
9+ s .rate_limit ('500ms' ).sink (source .emit ) # pipe output back to input
10+ s .rate_limit ('1s' ).sink (lambda x : print (L )) # print state of L every second
2611
12+ source .emit (1 ) # seed with initial values, does not block thread due to Future return
2713
28- run_asyncio_loop ()
14+ try :
15+ asyncio .get_event_loop ().run_forever ()
16+ except (KeyboardInterrupt , asyncio .CancelledError ):
17+ pass
Original file line number Diff line number Diff line change 11from streamz import Stream
2- from tornado .ioloop import IOLoop
32
43source = Stream ()
54s = source .sliding_window (2 ).map (sum )
6- L = s .sink_to_list () # store result in a list
5+ L = s .sink_to_list () # store result in a list
76
87s .rate_limit ('500ms' ).sink (source .emit ) # pipe output back to input
98s .rate_limit ('1s' ).sink (lambda x : print (L )) # print state of L every second
109
11- source .emit (0 ) # seed with initial values
12- source .emit (1 )
10+ try :
11+ source .emit (1 ) # seed with initial values, blocks thread due to cycle in stream
12+ except KeyboardInterrupt :
13+ pass
Original file line number Diff line number Diff line change 44
55source = Stream (asynchronous = True )
66s = source .sliding_window (2 ).map (sum )
7- L = s .sink_to_list () # store result in a list
7+ L = s .sink_to_list () # store result in a list
88
9- s .rate_limit ('500ms' ).sink (source .emit ) # pipe output back to input
9+ s .rate_limit ('500ms' ).sink (source .emit ) # pipe output back to input
1010s .rate_limit ('1s' ).sink (lambda x : print (L )) # print state of L every second
1111
12- source .emit (0 ) # seed with initial values
13- source .emit (1 )
12+ source .emit (1 ) # seed with initial values, does not block thread due to Future return
1413
15- IOLoop .current ().start ()
14+ try :
15+ IOLoop .current ().start ()
16+ except KeyboardInterrupt :
17+ pass
You can’t perform that action at this time.
0 commit comments