1+ import threading
12import socket
3+ import logging
4+ from time import sleep
25
36PH803W_DEFAULT_TCP_PORT = 12416
4- PH803W_PING_INTERVAL = 4000
5- RECONNECT_DELAY = 10000
6- RESPONSE_TIMEOUT = 5000
7+ PH803W_PING_INTERVAL = 4
8+ RECONNECT_DELAY = 10
9+ # RESPONSE_TIMEOUT = 5000
10+ ABORT_AFTER_CONSECUTIVE_EMPTY = 10
11+
12+ _LOGGER = logging .getLogger (__name__ )
13+
14+
15+ class DeviceError (ConnectionError ):
16+ pass
717
818
919class Device (object ):
1020 def __init__ (self , host ):
11- self .result = {}
1221 self .host = host
22+ self .passcode = ""
23+ self ._measurements = []
24+ self ._measurements_counter = 0
1325 self ._socket = socket .socket (socket .AF_INET , socket .SOCK_STREAM )
1426 self ._loop = True
27+ self ._empty_counter = 0
28+ self ._pong_thread = None
1529
16- async def run_async (self ) :
17- self .run ()
30+ async def run_async (self , once : bool = True ) -> bool :
31+ return self .run (once )
1832
19- def run (self , once : bool = False ) -> bool :
33+ def run (self , once : bool = True ) -> bool :
34+ if once :
35+ self ._connect ()
36+ return self ._run (once )
37+ else :
38+ measurements = - 1
39+ while self ._loop :
40+ if measurements == self ._measurements_counter :
41+ _LOGGER .error ("Aborting reconnects, no new measurements" )
42+ break
43+ measurements = self ._measurements_counter
44+ self ._connect ()
45+ try :
46+ self ._run (once )
47+ except Exception as e :
48+ _LOGGER .warning ("Exception in run loop" , e )
49+ sleep (RECONNECT_DELAY )
50+ return not self ._loop and self ._measurements_counter > 0
51+
52+ def _connect (self ) -> bool :
2053 self ._loop = True
2154 self ._socket .connect ((self .host , PH803W_DEFAULT_TCP_PORT ))
2255
56+ # Send request for connection
2357 data = bytes .fromhex ("0000000303000006" )
2458 self ._socket .sendall (data )
59+
60+ # Receive response and passcode
2561 response = self ._socket .recv (1024 )
2662 passcode_lenth = response [9 ]
2763 passcode_raw = response [10 : 10 + passcode_lenth ]
28- passcode = passcode_raw .decode ("utf-8" )
29- # print( passcode)
64+ self . passcode = passcode_raw .decode ("utf-8" )
65+ _LOGGER . debug ( self . passcode )
3066
67+ # Send passcode confirmation
3168 data = (
3269 bytes .fromhex ("000000030f00000800" )
3370 + passcode_lenth .to_bytes (1 , "little" )
3471 + passcode_raw
3572 )
3673 self ._socket .sendall (data )
74+
75+ # Receive confirmation
3776 response = self ._socket .recv (1024 )
3877 if response [8 ] != 0 :
39- # print("Error connecting")
40- return False
78+ raise DeviceError ("Error connecting" )
4179
42- # Connection established, from now on some cyclig bahavior
80+ def _run (self , once : bool = True ) -> bool :
81+ # Connection established, start requesting data,
82+ # from now on some cyclig bahavior
4383 data = bytes .fromhex ("000000030400009002" )
4484 self ._socket .sendall (data )
45- empty_counter = 0
46- data = bytes .fromhex ("0000000303000015" )
47- while self ._loop and empty_counter < 10 :
85+
86+ # If continous reading ping/pong needs to be run cyclic
87+ if not once :
88+ self ._pong_thread = threading .Thread (target = self ._ping_loop )
89+ self ._pong_thread .daemon = True
90+ self ._pong_thread .start ()
91+ else :
92+ self ._send_ping ()
93+
94+ while self ._loop :
4895 response = self ._socket .recv (1024 )
96+ if self ._empty_counter > ABORT_AFTER_CONSECUTIVE_EMPTY :
97+ _LOGGER .error ("Too many empty consecutive packages" )
98+ raise DeviceError ("Too many empty consecutive packages" )
4999 if len (response ) == 0 :
50- empty_counter += 1
100+ self . _empty_counter += 1
51101 continue
52- empty_counter = 0
53- # print(response)
54- if len (response ) == 18 :
55- flag1 = response [8 ]
56- if flag1 & 0b0000_0100 :
57- print ("In water" )
58- flag2 = response [9 ]
59- if flag2 & 0b0000_0010 :
60- print ("ORP on" )
61- if flag2 & 0b0000_0001 :
62- print ("PH on" )
63- # state_raw = response[8 : 9]
64- ph_raw = response [10 :12 ]
65- ph = int .from_bytes (ph_raw , "big" ) * 0.01
66- redox_raw = response [12 :14 ]
67- redox = int .from_bytes (redox_raw , "big" ) - 2000
68- unknown1_raw = response [14 :16 ]
69- unknown1 = int .from_bytes (unknown1_raw , "big" )
70- unknown2_raw = response [15 :18 ]
71- unknown2 = int .from_bytes (unknown2_raw , "big" )
72- print (
73- "pH: %s, Redox: %s, U1: %s, U2: %s"
74- % (ph , redox , unknown1 , unknown2 ),
75- flush = True ,
76- )
77- self ._socket .sendall (data )
78- response = self ._socket .recv (1024 )
79- if once :
80- break
102+ self ._empty_counter = 0
103+
104+ self ._handle_response (response )
105+
106+ if once and len (self ._measurements ) > 0 :
107+ self ._loop = False
108+
109+ return (once and len (self ._measurements ) > 0 ) or not once
81110
82111 def _handle_response (self , data ):
83112 if data [0 ] != 0 and data [1 ] != 0 and data [2 ] != 0 and data [2 ] != 3 :
84- # print("Ignore data package because invalid prefix: %s" % data[0:3])
85- self .result ["status" ] = [
86- "Error" ,
87- "Ignore data package because invalid prefix" ,
88- ]
113+ _LOGGER .warning ("Ignore data package because invalid prefix: %s" % data [0 :3 ])
89114 return
90115 data_length = data [4 ]
91116 if len (data ) != data_length + 5 :
92117 if len (data ) > data_length :
93118 additional_data = data [data_length : len (data )]
94119 data = data [0 :data_length ]
95- # print(
96- # "Split into two data packages because additional data detected. First %s - Second %s}"
97- # % (data.toString("hex"), additional_data.toString("hex"))
98- # )
120+ _LOGGER .debug ("Split into two data packages because additional data detected." )
99121 self ._handle_response (additional_data )
100122 else :
101- # print(
102- # "Ignore data package because invalid length(%s): %s"
103- # % (data_length, data)
104- # )
105- self .result ["status" ] = [
106- "Error" ,
107- "Ignore data package because invalid length" ,
108- ]
123+ _LOGGER .warning (
124+ "Ignore data package because invalid length(%s): %s"
125+ % (data_length , data )
126+ )
109127 return
110128
111129 message_type = data [7 ]
@@ -120,46 +138,54 @@ def _handle_response(self, data):
120138 elif message_type == 0x94 :
121139 self ._handle_data_extended_response (data )
122140 else :
123- # print(
124- # "Ignore data package because invalid message type %s: %s"
125- # % (message_type, data)
126- # )
127- self .result ["status" ] = [
128- "Ignore" ,
129- "Ignore data package because invalid length" ,
130- message_type ,
131- data ,
132- ]
141+ pass
142+ _LOGGER .warning (
143+ "Ignore data package because invalid message type %s: %s"
144+ % (message_type , data )
145+ )
133146
134147 def _handle_passcode_response (self , data ):
135148 pass
149+ _LOGGER .warning ("Passcode resonse ignored" )
136150
137- def _handle_login_response (self , data ):
138- pass
139-
140- def _handle_ping_pong_response (self ):
151+ def _handle_login_response (self , data ):
141152 pass
153+ _LOGGER .warning ("Login resonse ignored" )
142154
155+ def _handle_data_response (self , data ):
156+ if len (data ) == 18 :
157+ meas = Measurement (data )
158+ self ._measurements .append (meas )
159+ self ._measurements_counter += 1
160+ else :
161+ pass
162+ _LOGGER .debug (meas )
163+
143164 def _handle_data_extended_response (self , data ):
144165 pass
166+ _LOGGER .warning ("Extended data ignored" )
167+
168+ def _handle_ping_pong_response (self ):
169+ # if self._pong_thread is None or self._pong_thread.done:
170+ # self._pong_thread = asyncio.create_task(self._async_queue_ping())
171+ # pass
172+ # else:
173+ # _LOGGER.debug("Pong thread alredy running")
174+ _LOGGER .debug ("Pong message received" )
145175
146176 def _send_ping (self ):
147- pass
177+ pong_data = bytes .fromhex ("0000000303000015" )
178+ self ._socket .sendall (pong_data )
179+ _LOGGER .debug ("Ping sent" )
148180
149- # if (this.pingWaitTimeout) {
150- # clearTimeout(this.pingWaitTimeout);
151- # this.pingWaitTimeout = null;
152- # }
153- # if (this.pingTimeout) {
154- # clearTimeout(this.pingTimeout);
155- # this.pingTimeout = null;
156- # }
157- # debug('received pong');
158- # this.pingTimeout = setTimeout(() => {
159- # this.pingTimeout = null;
160- # this._sendPing();
161- # }, this.options.pingInterval || PH803W_PING_INTERVAL);
162- # } def _handle_data_response(self, data):
181+ def _ping_loop (self ):
182+ while self ._loop :
183+ sleep (PH803W_PING_INTERVAL )
184+ self ._send_ping ()
185+
186+ # async def _async_queue_ping(self):
187+ # await asyncio.sleep(PH803W_PING_INTERVAL)
188+ # self._send_ping()
163189
164190 def abort (self ):
165191 self ._loop = False
@@ -176,3 +202,25 @@ def __enter__(self):
176202
177203 def __exit__ (self , type , value , traceback ):
178204 self ._socket .close ()
205+
206+ class Measurement :
207+ def __init__ (self , data ) -> None :
208+ flag1 = data [8 ]
209+ self .in_water = flag1 & 0b0000_0100 != 0
210+ flag2 = data [9 ]
211+ self .orp_on = flag2 & 0b0000_0010 != 0
212+ self .ph_on = flag2 & 0b0000_0001 != 0
213+ ph_raw = data [10 :12 ]
214+ self .ph = int .from_bytes (ph_raw , "big" ) * 0.01
215+ redox_raw = data [12 :14 ]
216+ self .redox = int .from_bytes (redox_raw , "big" ) - 2000
217+ unknown1_raw = data [14 :16 ]
218+ self .unknown1 = int .from_bytes (unknown1_raw , "big" )
219+ unknown2_raw = data [15 :18 ]
220+ self .unknown2 = int .from_bytes (unknown2_raw , "big" )
221+
222+ def __str__ (self ) -> str :
223+ return (
224+ "pH: %s, Redox: %s, In-water: %s, pH-on: %s, Orp-on: %s"
225+ % (self .ph , self .redox , self .in_water , self .ph_on , self .orp_on )
226+ )
0 commit comments