33#
44# Copyright 2017 Francesco Santini <francesco.santini@gmail.com>
55# Licensed under a MIT license. See LICENSE for details
6- from __future__ import annotations # required for python < 3.9
7-
86import datetime
97import hashlib
108import struct
119import time
1210import uuid
1311from calendar import monthrange
14- from typing import Any , Dict , List , Literal , Tuple
12+ from typing import Any , Literal
1513
1614from ._e3dc_rscp_local import (
1715 E3DC_RSCP_local ,
@@ -102,9 +100,9 @@ def __init__(self, connectType: int, **kwargs: Any) -> None:
102100 self .maxBatChargePower = None
103101 self .maxBatDischargePower = None
104102 self .startDischargeDefault = None
105- self .powermeters : List [ Dict [str , Any ]] = []
106- self .pvis : List [ Dict [str , Any ]] = []
107- self .batteries : List [ Dict [str , Any ]] = []
103+ self .powermeters : list [ dict [str , Any ]] = []
104+ self .pvis : list [ dict [str , Any ]] = []
105+ self .batteries : list [ dict [str , Any ]] = []
108106 self .pmIndexExt = None
109107
110108 if "configuration" in kwargs :
@@ -203,10 +201,10 @@ def _set_serial(self, serial: str):
203201
204202 def sendRequest (
205203 self ,
206- request : Tuple [str | int | RscpTag , str | int | RscpType , Any ],
204+ request : tuple [str | int | RscpTag , str | int | RscpType , Any ],
207205 retries : int = 3 ,
208206 keepAlive : bool = False ,
209- ) -> Tuple [str | int | RscpTag , str | int | RscpType , Any ]:
207+ ) -> tuple [str | int | RscpTag , str | int | RscpType , Any ]:
210208 """This function uses the RSCP interface to make a request.
211209
212210 Does make retries in case of exceptions like Socket.Error
@@ -362,7 +360,7 @@ def poll_switches(self, keepAlive: bool = False):
362360 descList = switchDesc [2 ] # get the payload of the container
363361 statusList = switchStatus [2 ]
364362
365- switchList : List [ Dict [str , Any ]] = []
363+ switchList : list [ dict [str , Any ]] = []
366364
367365 for switch in range (len (descList )):
368366 switchID = rscpFindTagIndex (descList [switch ], RscpTag .HA_DATAPOINT_INDEX )
@@ -466,7 +464,7 @@ def get_idle_periods(self, keepAlive: bool = False):
466464 if idlePeriodsRaw [0 ] != RscpTag .EMS_GET_IDLE_PERIODS :
467465 return None
468466
469- idlePeriods : Dict [str , List [ Dict [str , Any ]]] = {
467+ idlePeriods : dict [str , list [ dict [str , Any ]]] = {
470468 "idleCharge" : [] * 7 ,
471469 "idleDischarge" : [] * 7 ,
472470 }
@@ -497,7 +495,7 @@ def get_idle_periods(self, keepAlive: bool = False):
497495 return idlePeriods
498496
499497 def set_idle_periods (
500- self , idlePeriods : Dict [str , List [ Dict [str , Any ]]], keepAlive : bool = False
498+ self , idlePeriods : dict [str , list [ dict [str , Any ]]], keepAlive : bool = False
501499 ):
502500 """Set idle periods via rscp protocol.
503501
@@ -546,7 +544,7 @@ def set_idle_periods(
546544 True if success
547545 False if error
548546 """
549- periodList : List [ Tuple [RscpTag , RscpType , Any ]] = []
547+ periodList : list [ tuple [RscpTag , RscpType , Any ]] = []
550548
551549 if "idleCharge" not in idlePeriods and "idleDischarge" not in idlePeriods :
552550 raise ValueError ("neither key idleCharge nor idleDischarge in object" )
@@ -1185,7 +1183,7 @@ def sendWallboxRequest(
11851183 request : RscpTag = RscpTag .WB_REQ_SET_EXTERN ,
11861184 wbIndex : int = 0 ,
11871185 keepAlive : bool = False ,
1188- ) -> Tuple [str | int | RscpTag , str | int | RscpType , Any ]:
1186+ ) -> tuple [str | int | RscpTag , str | int | RscpType , Any ]:
11891187 """Sends a low-level request with WB_EXTERN_DATA to the wallbox via rscp protocol locally.
11901188
11911189 Args:
@@ -1294,7 +1292,7 @@ def get_batteries(self, keepAlive: bool = False):
12941292 ]
12951293 """
12961294 maxBatteries = 8
1297- outObj : List [ Dict [str , int ]] = []
1295+ outObj : list [ dict [str , int ]] = []
12981296 for batIndex in range (maxBatteries ):
12991297 try :
13001298 req = self .sendRequest (
@@ -1326,7 +1324,7 @@ def get_batteries(self, keepAlive: bool = False):
13261324 def get_battery_data (
13271325 self ,
13281326 batIndex : int | None = None ,
1329- dcbs : List [int ] | None = None ,
1327+ dcbs : list [int ] | None = None ,
13301328 keepAlive : bool = False ,
13311329 ):
13321330 """Polls the battery data via rscp protocol.
@@ -1470,7 +1468,7 @@ def get_battery_data(
14701468 dcbCount = rscpFindTagIndex (req , RscpTag .BAT_DCB_COUNT )
14711469 deviceStateContainer = rscpFindTag (req , RscpTag .BAT_DEVICE_STATE )
14721470
1473- outObj : Dict [str , Any ] = {
1471+ outObj : dict [str , Any ] = {
14741472 "asoc" : rscpFindTagIndex (req , RscpTag .BAT_ASOC ),
14751473 "chargeCycles" : rscpFindTagIndex (req , RscpTag .BAT_CHARGE_CYCLES ),
14761474 "current" : rscpFindTagIndex (req , RscpTag .BAT_CURRENT ),
@@ -1555,9 +1553,9 @@ def get_battery_data(
15551553
15561554 # Initialize default values for DCB
15571555 sensorCount = 0
1558- temperatures : List [float ] = []
1556+ temperatures : list [float ] = []
15591557 seriesCellCount = 0
1560- voltages : List [float ] = []
1558+ voltages : list [float ] = []
15611559
15621560 # Set temperatures, if available for the device
15631561 temperatures_raw = rscpFindTag (req , RscpTag .BAT_DCB_ALL_CELL_TEMPERATURES )
@@ -1586,7 +1584,7 @@ def get_battery_data(
15861584 for cell_voltage in voltages_data :
15871585 voltages .append (cell_voltage [2 ])
15881586
1589- dcbobj : Dict [str , Any ] = {
1587+ dcbobj : dict [str , Any ] = {
15901588 "current" : rscpFindTagIndex (info , RscpTag .BAT_DCB_CURRENT ),
15911589 "currentAvg30s" : rscpFindTagIndex (
15921590 info , RscpTag .BAT_DCB_CURRENT_AVG_30S
@@ -1655,7 +1653,7 @@ def get_battery_data(
16551653 return outObj
16561654
16571655 def get_batteries_data (
1658- self , batteries : List [ Dict [str , Any ]] | None = None , keepAlive : bool = False
1656+ self , batteries : list [ dict [str , Any ]] | None = None , keepAlive : bool = False
16591657 ):
16601658 """Polls the batteries data via rscp protocol.
16611659
@@ -1669,7 +1667,7 @@ def get_batteries_data(
16691667 if batteries is None :
16701668 batteries = self .batteries
16711669
1672- outObj : List [ Dict [str , Any ]] = []
1670+ outObj : list [ dict [str , Any ]] = []
16731671
16741672 for battery in batteries :
16751673 if "dcbs" in battery :
@@ -1703,7 +1701,7 @@ def get_pvis(self, keepAlive: bool = False):
17031701 ]
17041702 """
17051703 maxPvis = 8
1706- outObj : List [ Dict [str , Any ]] = []
1704+ outObj : list [ dict [str , Any ]] = []
17071705 for pviIndex in range (maxPvis ):
17081706 req = self .sendRequest (
17091707 (
@@ -1743,8 +1741,8 @@ def get_pvis(self, keepAlive: bool = False):
17431741 def get_pvi_data (
17441742 self ,
17451743 pviIndex : int | None = None ,
1746- strings : List [int ] | None = None ,
1747- phases : List [int ] | None = None ,
1744+ strings : list [int ] | None = None ,
1745+ phases : list [int ] | None = None ,
17481746 keepAlive : bool = False ,
17491747 ):
17501748 """Polls the inverter data via rscp protocol.
@@ -1861,7 +1859,7 @@ def get_pvi_data(
18611859 frequency = rscpFindTag (req , RscpTag .PVI_FREQUENCY_UNDER_OVER )
18621860 deviceState = rscpFindTag (req , RscpTag .PVI_DEVICE_STATE )
18631861
1864- outObj : Dict [str , Any ] = {
1862+ outObj : dict [str , Any ] = {
18651863 "acMaxApparentPower" : rscpFindTagIndex (
18661864 rscpFindTag (req , RscpTag .PVI_AC_MAX_APPARENTPOWER ), RscpTag .PVI_VALUE
18671865 ),
@@ -2040,7 +2038,7 @@ def get_pvi_data(
20402038 return outObj
20412039
20422040 def get_pvis_data (
2043- self , pvis : List [ Dict [str , Any ]] | None = None , keepAlive : bool = False
2041+ self , pvis : list [ dict [str , Any ]] | None = None , keepAlive : bool = False
20442042 ):
20452043 """Polls the inverters data via rscp protocol.
20462044
@@ -2054,7 +2052,7 @@ def get_pvis_data(
20542052 if pvis is None :
20552053 pvis = self .pvis
20562054
2057- outObj : List [ Dict [str , Any ]] = []
2055+ outObj : list [ dict [str , Any ]] = []
20582056
20592057 for pvi in pvis :
20602058 if "strings" in pvi :
@@ -2095,7 +2093,7 @@ def get_powermeters(self, keepAlive: bool = False):
20952093 ]
20962094 """
20972095 maxPowermeters = 8
2098- outObj : List [ Dict [str , Any ]] = []
2096+ outObj : list [ dict [str , Any ]] = []
20992097 for pmIndex in range (
21002098 maxPowermeters
21012099 ): # max 8 powermeters according to E3DC spec
@@ -2212,7 +2210,7 @@ def get_powermeter_data(self, pmIndex: int | None = None, keepAlive: bool = Fals
22122210 return outObj
22132211
22142212 def get_powermeters_data (
2215- self , powermeters : List [ Dict [str , Any ]] | None = None , keepAlive : bool = False
2213+ self , powermeters : list [ dict [str , Any ]] | None = None , keepAlive : bool = False
22162214 ):
22172215 """Polls the powermeters data via rscp protocol.
22182216
@@ -2226,7 +2224,7 @@ def get_powermeters_data(
22262224 if powermeters is None :
22272225 powermeters = self .powermeters
22282226
2229- outObj : List [ Dict [str , Any ]] = []
2227+ outObj : list [ dict [str , Any ]] = []
22302228
22312229 for powermeter in powermeters :
22322230 outObj .append (
0 commit comments