|
19 | 19 | from streamz import Stream, RefCounter |
20 | 20 | from streamz.sources import sink_to_file, PeriodicCallback |
21 | 21 | from streamz.utils_test import (inc, double, gen_test, tmpfile, captured_logger, # noqa: F401 |
22 | | - clean, await_for, metadata) # noqa: F401 |
| 22 | + clean, await_for, metadata, wait_for) # noqa: F401 |
23 | 23 | from distributed.utils_test import loop # noqa: F401 |
24 | 24 |
|
25 | 25 |
|
@@ -1634,3 +1634,31 @@ def test_buffer_after_timed_window(): |
1634 | 1634 |
|
1635 | 1635 | def test_buffer_after_sliding_window(): |
1636 | 1636 | Stream().sliding_window(1).buffer(1) |
| 1637 | + |
| 1638 | + |
| 1639 | +def test_backpressure_connect_empty_stream(): |
| 1640 | + @Stream.register_api() |
| 1641 | + class from_list(Stream): |
| 1642 | + |
| 1643 | + def __init__(self, source, **kwargs): |
| 1644 | + self.source = source |
| 1645 | + super().__init__(ensure_io_loop=True, **kwargs) |
| 1646 | + |
| 1647 | + def start(self): |
| 1648 | + self.stopped = False |
| 1649 | + self.loop.add_callback(self.run) |
| 1650 | + |
| 1651 | + @gen.coroutine |
| 1652 | + def run(self): |
| 1653 | + while not self.stopped and len(self.source) > 0: |
| 1654 | + yield self._emit(self.source.pop(0)) |
| 1655 | + |
| 1656 | + source_list = [0, 1, 2, 3, 4] |
| 1657 | + source = Stream.from_list(source_list) |
| 1658 | + sout = Stream() |
| 1659 | + L = sout.rate_limit(1).sink_to_list() |
| 1660 | + source.connect(sout) |
| 1661 | + source.start() |
| 1662 | + |
| 1663 | + wait_for(lambda: L == [0], 0.01) |
| 1664 | + assert len(source_list) > 0 |
0 commit comments