|
7 | 7 |
|
8 | 8 | import datetime |
9 | 9 | import hashlib |
| 10 | +import struct |
10 | 11 | import time |
11 | 12 | import uuid |
12 | 13 | from calendar import monthrange |
@@ -981,6 +982,295 @@ def get_system_status(self, keepAlive: bool = False): |
981 | 982 | outObj = {k: SystemStatusBools[v] for k, v in outObj.items()} |
982 | 983 | return outObj |
983 | 984 |
|
| 985 | + def get_wallbox_data(self, wbIndex: int = 0, keepAlive: bool = False): |
| 986 | + """Polls the wallbox status via rscp protocol locally. |
| 987 | +
|
| 988 | + Args: |
| 989 | + wbIndex (Optional[int]): Index of the wallbox to poll data for |
| 990 | + keepAlive (Optional[bool]): True to keep connection alive |
| 991 | +
|
| 992 | + Returns: |
| 993 | + dict: Dictionary containing the wallbox status structured as follows:: |
| 994 | +
|
| 995 | + { |
| 996 | + "appSoftware": <version of the app>, |
| 997 | + "batteryToCar": <true if the wallbox may use the battery, otherwise false>, |
| 998 | + "chargingActive": <true if charging is currently active, otherwise false>, |
| 999 | + "chargingCanceled": <true if charging was manually canceled, otherwise false>, |
| 1000 | + "consumptionNet": <power currently consumed by the wallbox, provided by the grid in watts>, |
| 1001 | + "consumptionSun": <power currently consumed by the wallbox, provided by the solar panels in watts>, |
| 1002 | + "energyAll": <total consumed energy this month in watthours>, |
| 1003 | + "energyNet": <consumed net energy this month in watthours>, |
| 1004 | + "energySun": <consumed solar energy this month in watthours>, |
| 1005 | + "index": <index of the requested wallbox>, |
| 1006 | + "keyState": <state of the key switch at the wallbox>, |
| 1007 | + "maxChargeCurrent": <configured maximum charge current in A>, |
| 1008 | + "phases": <number of phases used for charging>, |
| 1009 | + "schukoOn": <true if the connected schuko of the wallbox is on, otherwise false>, |
| 1010 | + "soc": <state of charge>, |
| 1011 | + "sunModeOn": <true if sun-only-mode is active, false if mixed mode is active> |
| 1012 | + } |
| 1013 | + """ |
| 1014 | + req = self.sendRequest( |
| 1015 | + ( |
| 1016 | + RscpTag.WB_REQ_DATA, |
| 1017 | + RscpType.Container, |
| 1018 | + [ |
| 1019 | + (RscpTag.WB_INDEX, RscpType.UChar8, wbIndex), |
| 1020 | + (RscpTag.WB_REQ_EXTERN_DATA_ALG, RscpType.NoneType, None), |
| 1021 | + (RscpTag.WB_REQ_EXTERN_DATA_SUN, RscpType.NoneType, None), |
| 1022 | + (RscpTag.WB_REQ_EXTERN_DATA_NET, RscpType.NoneType, None), |
| 1023 | + (RscpTag.WB_REQ_APP_SOFTWARE, RscpType.NoneType, None), |
| 1024 | + (RscpTag.WB_REQ_KEY_STATE, RscpType.NoneType, None), |
| 1025 | + ], |
| 1026 | + ), |
| 1027 | + keepAlive=True, |
| 1028 | + ) |
| 1029 | + |
| 1030 | + outObj = { |
| 1031 | + "index": rscpFindTagIndex(req, RscpTag.WB_INDEX), |
| 1032 | + "appSoftware": rscpFindTagIndex(req, RscpTag.WB_APP_SOFTWARE), |
| 1033 | + } |
| 1034 | + |
| 1035 | + extern_data_alg = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_ALG) |
| 1036 | + if extern_data_alg is not None: |
| 1037 | + extern_data = rscpFindTagIndex(extern_data_alg, RscpTag.WB_EXTERN_DATA) |
| 1038 | + status_byte = extern_data[2] |
| 1039 | + outObj["sunModeOn"] = (status_byte & 128) != 0 |
| 1040 | + outObj["chargingCanceled"] = (status_byte & 64) != 0 |
| 1041 | + outObj["chargingActive"] = (status_byte & 32) != 0 |
| 1042 | + outObj["plugLocked"] = (status_byte & 16) != 0 |
| 1043 | + outObj["plugged"] = (status_byte & 8) != 0 |
| 1044 | + outObj["soc"] = extern_data[0] |
| 1045 | + outObj["phases"] = extern_data[1] |
| 1046 | + outObj["maxChargeCurrent"] = extern_data[3] |
| 1047 | + outObj["schukoOn"] = extern_data[5] != 0 |
| 1048 | + |
| 1049 | + extern_data_sun = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_SUN) |
| 1050 | + if extern_data_sun is not None: |
| 1051 | + extern_data = rscpFindTagIndex(extern_data_sun, RscpTag.WB_EXTERN_DATA) |
| 1052 | + outObj["consumptionSun"] = struct.unpack("h", extern_data[0:2])[0] |
| 1053 | + outObj["energySun"] = struct.unpack("i", extern_data[2:6])[0] |
| 1054 | + |
| 1055 | + extern_data_net = rscpFindTag(req, RscpTag.WB_EXTERN_DATA_NET) |
| 1056 | + if extern_data_net is not None: |
| 1057 | + extern_data = rscpFindTagIndex(extern_data_net, RscpTag.WB_EXTERN_DATA) |
| 1058 | + outObj["consumptionNet"] = struct.unpack("h", extern_data[0:2])[0] |
| 1059 | + outObj["energyNet"] = struct.unpack("i", extern_data[2:6])[0] |
| 1060 | + |
| 1061 | + if "energySun" in outObj and "energyNet" in outObj: |
| 1062 | + outObj["energyAll"] = outObj["energyNet"] + outObj["energySun"] |
| 1063 | + |
| 1064 | + key_state = rscpFindTag(req, RscpTag.WB_KEY_STATE) |
| 1065 | + if key_state is not None: |
| 1066 | + outObj["keyState"] = rscpFindTagIndex(key_state, RscpTag.WB_KEY_STATE) |
| 1067 | + |
| 1068 | + req = self.sendRequest( |
| 1069 | + (RscpTag.EMS_REQ_BATTERY_TO_CAR_MODE, RscpType.NoneType, None), |
| 1070 | + keepAlive=keepAlive, |
| 1071 | + ) |
| 1072 | + battery_to_car = rscpFindTag(req, RscpTag.EMS_BATTERY_TO_CAR_MODE) |
| 1073 | + if battery_to_car is not None: |
| 1074 | + outObj["batteryToCar"] = rscpFindTagIndex( |
| 1075 | + battery_to_car, RscpTag.EMS_BATTERY_TO_CAR_MODE |
| 1076 | + ) |
| 1077 | + |
| 1078 | + outObj = {k: v for k, v in sorted(outObj.items())} |
| 1079 | + return outObj |
| 1080 | + |
| 1081 | + def set_wallbox_sunmode( |
| 1082 | + self, enable: bool, wbIndex: int = 0, keepAlive: bool = False |
| 1083 | + ) -> bool: |
| 1084 | + """Sets the sun mode of the wallbox via rscp protocol locally. |
| 1085 | +
|
| 1086 | + Args: |
| 1087 | + enable (bool): True to enable sun mode, otherwise false, |
| 1088 | + wbIndex (Optional[int]): index of the requested wallbox, |
| 1089 | + keepAlive (Optional[bool]): True to keep connection alive |
| 1090 | +
|
| 1091 | + Returns: |
| 1092 | + True if success |
| 1093 | + False if error |
| 1094 | + """ |
| 1095 | + return self.sendWallboxSetRequest( |
| 1096 | + dataIndex=0, value=1 if enable else 2, wbIndex=wbIndex, keepAlive=keepAlive |
| 1097 | + ) |
| 1098 | + |
| 1099 | + def set_wallbox_schuko( |
| 1100 | + self, on: bool, wbIndex: int = 0, keepAlive: bool = False |
| 1101 | + ) -> bool: |
| 1102 | + """Sets the Schuko of the wallbox via rscp protocol locally. |
| 1103 | +
|
| 1104 | + Args: |
| 1105 | + on (bool): True to activate the Schuko, otherwise false |
| 1106 | + wbIndex (Optional[int]): index of the requested wallbox, |
| 1107 | + keepAlive (Optional[bool]): True to keep connection alive |
| 1108 | +
|
| 1109 | + Returns: |
| 1110 | + True if success (wallbox has understood the request, but might have ignored an unsupported value) |
| 1111 | + False if error |
| 1112 | + """ |
| 1113 | + return self.sendWallboxSetRequest( |
| 1114 | + dataIndex=5, value=1 if on else 0, wbIndex=wbIndex, keepAlive=keepAlive |
| 1115 | + ) |
| 1116 | + |
| 1117 | + def set_wallbox_max_charge_current( |
| 1118 | + self, max_charge_current: int, wbIndex: int = 0, keepAlive: bool = False |
| 1119 | + ) -> bool: |
| 1120 | + """Sets the maximum charge current of the wallbox via rscp protocol locally. |
| 1121 | +
|
| 1122 | + Args: |
| 1123 | + max_charge_current (int): maximum allowed charge current in A |
| 1124 | + wbIndex (Optional[int]): index of the requested wallbox, |
| 1125 | + keepAlive (Optional[bool]): True to keep connection alive |
| 1126 | +
|
| 1127 | + Returns: |
| 1128 | + True if success (wallbox has understood the request, but might have clipped the value) |
| 1129 | + False if error |
| 1130 | + """ |
| 1131 | + return self.sendWallboxSetRequest( |
| 1132 | + dataIndex=2, |
| 1133 | + value=max_charge_current, |
| 1134 | + request=RscpTag.WB_REQ_SET_PARAM_1, |
| 1135 | + wbIndex=wbIndex, |
| 1136 | + keepAlive=keepAlive, |
| 1137 | + ) |
| 1138 | + |
| 1139 | + def toggle_wallbox_charging( |
| 1140 | + self, wbIndex: int = 0, keepAlive: bool = False |
| 1141 | + ) -> bool: |
| 1142 | + """Toggles charging of the wallbox via rscp protocol locally. |
| 1143 | +
|
| 1144 | + Args: |
| 1145 | + wbIndex (Optional[int]): index of the requested wallbox, |
| 1146 | + keepAlive (Optional[bool]): True to keep connection alive |
| 1147 | +
|
| 1148 | + Returns: |
| 1149 | + True if success |
| 1150 | + False if error |
| 1151 | + """ |
| 1152 | + return self.sendWallboxSetRequest( |
| 1153 | + dataIndex=4, value=1, wbIndex=wbIndex, keepAlive=keepAlive |
| 1154 | + ) |
| 1155 | + |
| 1156 | + def toggle_wallbox_phases(self, wbIndex: int = 0, keepAlive: bool = False) -> bool: |
| 1157 | + """Toggles the number of phases used for charging by the wallbox between 1 and 3 via rscp protocol locally. |
| 1158 | +
|
| 1159 | + Args: |
| 1160 | + wbIndex (Optional[int]): index of the requested wallbox, |
| 1161 | + keepAlive (Optional[bool]): True to keep connection alive |
| 1162 | +
|
| 1163 | + Returns: |
| 1164 | + True if success |
| 1165 | + False if error |
| 1166 | + """ |
| 1167 | + return self.sendWallboxSetRequest( |
| 1168 | + dataIndex=3, value=1, wbIndex=wbIndex, keepAlive=keepAlive |
| 1169 | + ) |
| 1170 | + |
| 1171 | + def sendWallboxRequest( |
| 1172 | + self, |
| 1173 | + dataIndex: int, |
| 1174 | + value: int, |
| 1175 | + request: RscpTag = RscpTag.WB_REQ_SET_EXTERN, |
| 1176 | + wbIndex: int = 0, |
| 1177 | + keepAlive: bool = False, |
| 1178 | + ) -> Tuple[str | int | RscpTag, str | int | RscpType, Any]: |
| 1179 | + """Sends a low-level request with WB_EXTERN_DATA to the wallbox via rscp protocol locally. |
| 1180 | +
|
| 1181 | + Args: |
| 1182 | + dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5) |
| 1183 | + value (int): byte value to be set in the WB_EXTERN_DATA array at the given index |
| 1184 | + request (Optional[RscpTag]): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2), |
| 1185 | + wbIndex (Optional[int]): index of the requested wallbox, |
| 1186 | + keepAlive (Optional[bool]): True to keep connection alive |
| 1187 | +
|
| 1188 | + Returns: |
| 1189 | + An object with the received data |
| 1190 | + """ |
| 1191 | + dataArray = bytearray([0, 0, 0, 0, 0, 0]) |
| 1192 | + dataArray[dataIndex] = value |
| 1193 | + result = self.sendRequest( |
| 1194 | + ( |
| 1195 | + RscpTag.WB_REQ_DATA, |
| 1196 | + RscpType.Container, |
| 1197 | + [ |
| 1198 | + (RscpTag.WB_INDEX, RscpType.UChar8, wbIndex), |
| 1199 | + ( |
| 1200 | + request, |
| 1201 | + RscpType.Container, |
| 1202 | + [ |
| 1203 | + (RscpTag.WB_EXTERN_DATA, RscpType.ByteArray, dataArray), |
| 1204 | + ( |
| 1205 | + RscpTag.WB_EXTERN_DATA_LEN, |
| 1206 | + RscpType.UChar8, |
| 1207 | + len(dataArray), |
| 1208 | + ), |
| 1209 | + ], |
| 1210 | + ), |
| 1211 | + ], |
| 1212 | + ), |
| 1213 | + keepAlive=keepAlive, |
| 1214 | + ) |
| 1215 | + return result |
| 1216 | + |
| 1217 | + def sendWallboxSetRequest( |
| 1218 | + self, |
| 1219 | + dataIndex: int, |
| 1220 | + value: int, |
| 1221 | + request: RscpTag = RscpTag.WB_REQ_SET_EXTERN, |
| 1222 | + wbIndex: int = 0, |
| 1223 | + keepAlive: bool = False, |
| 1224 | + ) -> bool: |
| 1225 | + """Sends a low-level set request with WB_EXTERN_DATA to the wallbox via rscp protocol locally and evaluates the response. |
| 1226 | +
|
| 1227 | + Args: |
| 1228 | + dataIndex (int): byte index in the WB_EXTERN_DATA array (values: 0-5) |
| 1229 | + value (int): byte value to be set in the WB_EXTERN_DATA array at the given index |
| 1230 | + request (Optional[RscpTag]): request identifier (WB_REQ_SET_EXTERN, WB_REQ_SET_PARAM_1 or WB_REQ_SET_PARAM_2), |
| 1231 | + wbIndex (Optional[int]): index of the requested wallbox, |
| 1232 | + keepAlive (Optional[bool]): True to keep connection alive |
| 1233 | +
|
| 1234 | + Returns: |
| 1235 | + True if success |
| 1236 | + False if error |
| 1237 | + """ |
| 1238 | + response = self.sendWallboxRequest( |
| 1239 | + dataIndex, value, request, wbIndex, keepAlive |
| 1240 | + ) |
| 1241 | + |
| 1242 | + if response[0] != RscpTag.WB_DATA.name: |
| 1243 | + return False |
| 1244 | + responseData = response[2][-1] |
| 1245 | + return ( |
| 1246 | + responseData[0][2:] == request.name[6:] |
| 1247 | + and responseData[1] != RscpType.Error.name |
| 1248 | + ) |
| 1249 | + |
| 1250 | + def set_battery_to_car_mode(self, enabled: bool, keepAlive: bool = False): |
| 1251 | + """Sets whether the wallbox may use the battery. |
| 1252 | +
|
| 1253 | + Args: |
| 1254 | + enabled (bool): True to enable charging the car using the battery |
| 1255 | + keepAlive (Optional[bool]): True to keep connection alive |
| 1256 | +
|
| 1257 | + Returns: |
| 1258 | + True if success |
| 1259 | + False if error |
| 1260 | + """ |
| 1261 | + enabledValue = 1 if enabled else 0 |
| 1262 | + |
| 1263 | + response = self.sendRequest( |
| 1264 | + (RscpTag.EMS_REQ_SET_BATTERY_TO_CAR_MODE, RscpType.UChar8, enabledValue), |
| 1265 | + keepAlive=keepAlive, |
| 1266 | + ) |
| 1267 | + |
| 1268 | + return response == ( |
| 1269 | + RscpTag.EMS_SET_BATTERY_TO_CAR_MODE.name, |
| 1270 | + RscpType.UChar8.name, |
| 1271 | + enabledValue, |
| 1272 | + ) |
| 1273 | + |
984 | 1274 | def get_batteries(self, keepAlive: bool = False): |
985 | 1275 | """Scans for installed batteries via rscp protocol. |
986 | 1276 |
|
|
0 commit comments