|
1 | 1 | import asyncio |
2 | 2 | from glob import glob |
| 3 | +import queue |
3 | 4 | import os |
4 | 5 | import time |
5 | 6 | from tornado import gen |
6 | 7 | import weakref |
7 | 8 |
|
8 | | -from .core import Stream, convert_interval, RefCounter |
| 9 | +from .core import Stream, convert_interval, RefCounter, sync |
9 | 10 |
|
10 | 11 |
|
11 | 12 | def sink_to_file(filename, upstream, mode='w', prefix='', suffix='\n', flush=False): |
@@ -779,3 +780,135 @@ async def run(self): |
779 | 780 | break |
780 | 781 | await asyncio.gather(*self._emit(x)) |
781 | 782 | self.stopped = True |
| 783 | + |
| 784 | + |
| 785 | +@Stream.register_api() |
| 786 | +class from_websocket(Source): |
| 787 | + """Read binary data from a websocket |
| 788 | +
|
| 789 | + This source will accept connections on a given port and handle messages |
| 790 | + coming in. |
| 791 | +
|
| 792 | + The websockets library must be installed. |
| 793 | +
|
| 794 | + :param host: str |
| 795 | + Typically "localhost" |
| 796 | + :param port: int |
| 797 | + Which port to listen on (must be available) |
| 798 | + :param serve_kwargs: dict |
| 799 | + Passed to ``websockets.serve`` |
| 800 | + :param kwargs: |
| 801 | + Passed to superclass |
| 802 | + """ |
| 803 | + |
| 804 | + def __init__(self, host, port, serve_kwargs=None, **kwargs): |
| 805 | + self.host = host |
| 806 | + self.port = port |
| 807 | + self.s_kw = serve_kwargs |
| 808 | + self.server = None |
| 809 | + super().__init__(**kwargs) |
| 810 | + |
| 811 | + @gen.coroutine |
| 812 | + def _read(self, ws, path): |
| 813 | + while not self.stopped: |
| 814 | + data = yield ws.recv() |
| 815 | + yield self._emit(data) |
| 816 | + |
| 817 | + async def run(self): |
| 818 | + import websockets |
| 819 | + self.server = await websockets.serve( |
| 820 | + self._read, self.host, self.port, **(self.s_kw or {}) |
| 821 | + ) |
| 822 | + |
| 823 | + def stop(self): |
| 824 | + self.server.close() |
| 825 | + sync(self.loop, self.server.wait_closed) |
| 826 | + |
| 827 | + |
| 828 | +@Stream.register_api() |
| 829 | +class from_q(Source): |
| 830 | + """Source events from a threading.Queue, running another event framework |
| 831 | +
|
| 832 | + The queue is polled, i.e., there is a latency/overhead tradeoff, since |
| 833 | + we cannot use ``await`` directly with a multithreaded queue. |
| 834 | +
|
| 835 | + Allows mixing of another event loop, for example pyqt, on another thread. |
| 836 | + Note that, by default, a streamz.Source such as this one will start |
| 837 | + an event loop in a new thread, unless otherwise specified. |
| 838 | + """ |
| 839 | + |
| 840 | + def __init__(self, q, sleep_time=0.01, **kwargs): |
| 841 | + """ |
| 842 | + :param q: threading.Queue |
| 843 | + Any items pushed into here will become streamz events |
| 844 | + :param sleep_time: int |
| 845 | + Sets how long we wait before checking the input queue when |
| 846 | + empty (in s) |
| 847 | + :param kwargs: |
| 848 | + passed to streamz.Source |
| 849 | + """ |
| 850 | + self.q = q |
| 851 | + self.sleep = sleep_time |
| 852 | + super().__init__(**kwargs) |
| 853 | + |
| 854 | + async def _run(self): |
| 855 | + """Poll threading queue for events |
| 856 | + This uses check-and-wait, but overhead is low. Could maybe have |
| 857 | + a sleep-free version with an threading.Event. |
| 858 | + """ |
| 859 | + try: |
| 860 | + out = self.q.get_nowait() |
| 861 | + await self.emit(out, asynchronous=True) |
| 862 | + except queue.Empty: |
| 863 | + await asyncio.sleep(self.sleep) |
| 864 | + |
| 865 | + |
| 866 | +@Stream.register_api() |
| 867 | +class from_mqtt(from_q): |
| 868 | + """Read from MQTT source |
| 869 | +
|
| 870 | + See https://en.wikipedia.org/wiki/MQTT for a description of the protocol |
| 871 | + and its uses. |
| 872 | +
|
| 873 | + See also ``sinks.to_mqtt``. |
| 874 | +
|
| 875 | + Requires ``paho.mqtt`` |
| 876 | +
|
| 877 | + The outputs are ``paho.mqtt.client.MQTTMessage`` instances, which each have |
| 878 | + attributes timestamp, payload, topic, ... |
| 879 | +
|
| 880 | + NB: paho.mqtt.python runs on its own thread in this implementation. We may |
| 881 | + wish to instead call client.loop() directly |
| 882 | +
|
| 883 | + :param host: str |
| 884 | + :param port: int |
| 885 | + :param topic: str |
| 886 | + (May in the future support a list of topics) |
| 887 | + :param keepalive: int |
| 888 | + See mqtt docs - to keep the channel alive |
| 889 | + :param client_kwargs: |
| 890 | + Passed to the client's ``connect()`` method |
| 891 | + """ |
| 892 | + def __init__(self, host, port, topic, keepalive=60 , client_kwargs=None, **kwargs): |
| 893 | + self.host = host |
| 894 | + self.port = port |
| 895 | + self.keepalive = keepalive |
| 896 | + self.topic = topic |
| 897 | + self.client_kwargs = client_kwargs |
| 898 | + super().__init__(q=queue.Queue(), **kwargs) |
| 899 | + |
| 900 | + def _on_connect(self, client, userdata, flags, rc): |
| 901 | + client.subscribe(self.topic) |
| 902 | + |
| 903 | + def _on_message(self, client, userdata, msg): |
| 904 | + self.q.put(msg) |
| 905 | + |
| 906 | + async def run(self): |
| 907 | + import paho.mqtt.client as mqtt |
| 908 | + client = mqtt.Client() |
| 909 | + client.on_connect = self._on_connect |
| 910 | + client.on_message = self._on_message |
| 911 | + client.connect(self.host, self.port, self.keepalive, **(self.client_kwargs or {})) |
| 912 | + client.loop_start() |
| 913 | + await super().run() |
| 914 | + client.disconnect() |
0 commit comments