diff --git a/README.md b/README.md index 99e3c6fbbe..fc871294ea 100644 --- a/README.md +++ b/README.md @@ -396,6 +396,8 @@ replacement = { INPUT_CLUSTERS: [BinaryInput.cluster_id], OUTPUT_CLUSTERS: [BinaryInput.cluster_id, Groups.cluster_id], }, + }, +} ``` You can see that we have replaced `ElectricalMeasurement.cluster_id` from endpoint 1 in the `signature` dict with the name of our cluster that we created: `ElectricalMeasurementCluster` and on endpoint 2 we replaced `AnalogInput.cluster_id` with the implementation we created for that: `AnalogInputCluster`. This instructs Zigpy to use these `CustomCluster` derivatives instead of the normal cluster definitions for these clusters and this is why this part of the quirk is called `replacement`. diff --git a/tests/test_aqara_trv.py b/tests/test_aqara_trv.py new file mode 100644 index 0000000000..68dae606c4 --- /dev/null +++ b/tests/test_aqara_trv.py @@ -0,0 +1,149 @@ +"""Tests for Aqara E1 thermostat.""" + +from unittest import mock + +import pytest +from zigpy.zcl import foundation + +from zhaquirks.xiaomi.aqara.thermostat_agl001 import ( + AGL001, + SENSOR, + SENSOR_ATTR, + SENSOR_TEMP, +) + + +@pytest.mark.parametrize("quirk", (AGL001,)) +async def test_external_sensor_mode(zigpy_device_from_quirk, quirk): + """Test Aqara E1 thermostat external sensor mode setting.""" + + # Create virtual device from the quirk + thermostat_dev = zigpy_device_from_quirk(quirk) + + # Access the Aqara specific cluster + aqara_cluster = thermostat_dev.endpoints[1].opple_cluster + + # Simulate a successful response for multiple calls + async def async_success(*args, **kwargs): + return [foundation.Status.SUCCESS] + + # Test changing to external sensor mode (1) + with mock.patch.object(aqara_cluster, "request", side_effect=async_success) as m1: + await aqara_cluster.write_attributes({SENSOR: 1}) + + # Verify that the request was called twice (once for each write_attributes call) + assert m1.call_count == 2 + + # Verify that the SENSOR_ATTR attribute was used in both calls + first_call_args = m1.call_args_list[0][0] + second_call_args = m1.call_args_list[1][0] + + assert first_call_args[1] == foundation.GeneralCommand.Write_Attributes + assert second_call_args[1] == foundation.GeneralCommand.Write_Attributes + + # Verify that the SENSOR_ATTR is present in the attributes list + assert any(attr.attrid == SENSOR_ATTR for attr in first_call_args[3]) + assert any(attr.attrid == SENSOR_ATTR for attr in second_call_args[3]) + + # Get the attribute values + first_attr = next( + attr for attr in first_call_args[3] if attr.attrid == SENSOR_ATTR + ) + second_attr = next( + attr for attr in second_call_args[3] if attr.attrid == SENSOR_ATTR + ) + + first_attr_value = first_attr.value.value + second_attr_value = second_attr.value.value + + assert first_attr_value.startswith(b"\xaa\x71") + assert b"\x02" in first_attr_value # Action code for external sensor + + assert second_attr_value.startswith(b"\xaa\x71") + assert b"\x02" in second_attr_value # Action code for external sensor + + +@pytest.mark.parametrize("quirk", (AGL001,)) +async def test_internal_sensor_mode(zigpy_device_from_quirk, quirk): + """Test Aqara E1 thermostat internal sensor mode setting.""" + + # Create virtual device from the quirk + thermostat_dev = zigpy_device_from_quirk(quirk) + + # Access the Aqara specific cluster + aqara_cluster = thermostat_dev.endpoints[1].opple_cluster + + # Simulate a successful response for multiple calls + async def async_success(*args, **kwargs): + return [foundation.Status.SUCCESS] + + # Test changing to internal sensor mode (0) + with mock.patch.object(aqara_cluster, "request", side_effect=async_success) as m1: + await aqara_cluster.write_attributes({SENSOR: 0}) + + # Verify that the request was called twice (once for each write_attributes call) + assert m1.call_count == 2 + + # Verify that the SENSOR_ATTR attribute was used in both calls + first_call_args = m1.call_args_list[0][0] + second_call_args = m1.call_args_list[1][0] + + assert first_call_args[1] == foundation.GeneralCommand.Write_Attributes + assert second_call_args[1] == foundation.GeneralCommand.Write_Attributes + + # Verify that the SENSOR_ATTR is present in the attributes list + assert any(attr.attrid == SENSOR_ATTR for attr in first_call_args[3]) + assert any(attr.attrid == SENSOR_ATTR for attr in second_call_args[3]) + + # Get the attribute values + first_attr = next( + attr for attr in first_call_args[3] if attr.attrid == SENSOR_ATTR + ) + second_attr = next( + attr for attr in second_call_args[3] if attr.attrid == SENSOR_ATTR + ) + + first_attr_value = first_attr.value.value + second_attr_value = second_attr.value.value + + assert first_attr_value.startswith(b"\xaa\x71") + assert b"\x04" in first_attr_value # Action code for internal sensor + + assert second_attr_value.startswith(b"\xaa\x71") + assert b"\x04" in second_attr_value # Action code for internal sensor + + +@pytest.mark.parametrize("quirk", (AGL001,)) +async def test_external_sensor_temperature(zigpy_device_from_quirk, quirk): + """Test Aqara E1 thermostat external temperature setting.""" + + # Create virtual device from the quirk + thermostat_dev = zigpy_device_from_quirk(quirk) + + # Access the Aqara specific cluster + aqara_cluster = thermostat_dev.endpoints[1].opple_cluster + + # Simulate a successful response + async def async_success(*args, **kwargs): + return [foundation.Status.SUCCESS] + + # Test sending an external temperature (2500 = 25.00°C) + with mock.patch.object(aqara_cluster, "request", side_effect=async_success) as m1: + await aqara_cluster.write_attributes({SENSOR_TEMP: 2500}) + + # Verify that the request was called + assert m1.call_count == 1 + + # Verify that the SENSOR_ATTR attribute was used + args = m1.call_args[0] + assert args[1] == foundation.GeneralCommand.Write_Attributes + attr = next(attr for attr in args[3] if attr.attrid == SENSOR_ATTR) + + # Verify that the Aqara header is present + attr_value = attr.value.value + assert attr_value.startswith(b"\xaa\x71") + assert b"\x05" in attr_value # Action code for setting temperature + + # Verify that the temperature value is present + sensor_id = b"\x00\x15\x8d\x00\x01\x9d\x1b\x98" + assert sensor_id in attr_value diff --git a/zhaquirks/xiaomi/aqara/thermostat_agl001.py b/zhaquirks/xiaomi/aqara/thermostat_agl001.py index 0fdb90bda1..0ad963e6c1 100644 --- a/zhaquirks/xiaomi/aqara/thermostat_agl001.py +++ b/zhaquirks/xiaomi/aqara/thermostat_agl001.py @@ -5,6 +5,7 @@ from functools import reduce import math import struct +import time from typing import Any from zigpy.profiles import zha @@ -49,8 +50,14 @@ SENSOR = 0x027E BATTERY_PERCENTAGE = 0x040A +SENSOR_TEMP = 0x1392 # Fake address to pass external sensor temperature +SENSOR_ATTR = 0xFFF2 +SENSOR_ATTR_NAME = "sensor_attr" + XIAOMI_CLUSTER_ID = 0xFCC0 +SENSOR_ID = bytearray.fromhex("00158d00019d1b98") + DAYS_MAP = { "mon": 0x02, "tue": 0x04, @@ -145,7 +152,8 @@ class ScheduleEvent: _is_next_day = False def __init__(self, value, is_next_day=False): - """Create ScheduleEvent object from bytes or string.""" + """Initialize schedule event from bytes or string.""" + if isinstance(value, bytes): self._verify_buffer_len(value) self._time = self._read_time_from_buf(value) @@ -178,8 +186,8 @@ def _read_time_from_buf(buf): return time @staticmethod - def _parse_time(string): - parts = string.split(":") + def _parse_time(str): + parts = str.split(":") if len(parts) != 2: raise ValueError("Time must contain ':' separator") @@ -193,8 +201,8 @@ def _read_temp_from_buf(buf): return struct.unpack_from(">H", buf, offset=4)[0] / 100 @staticmethod - def _parse_temp(string): - return float(string) + def _parse_temp(str): + return float(str) @staticmethod def _validate_time(time): @@ -222,23 +230,23 @@ def _write_temp_to_buf(self, buf): struct.pack_into(">H", buf, 4, int(self._temp * 100)) def is_next_day(self): - """Return if event is on the next day.""" + """Return whether the event is for the next day.""" return self._is_next_day def set_next_day(self, is_next_day): - """Set if event is on the next day.""" + """Set whether the event is for the next day.""" self._is_next_day = is_next_day def get_time(self): - """Return event time.""" + """Return the time of the event in minutes.""" return self._time def __str__(self): - """Return event as string.""" + """Return string representation of the event.""" return f"{math.floor(self._time / 60)}:{f'{self._time % 60:0>2}'},{f'{self._temp:.1f}'}" def serialize(self): - """Serialize event to bytes.""" + """Serialize the event to bytes.""" result = bytearray(6) self._write_time_to_buf(result) self._write_temp_to_buf(result) @@ -249,7 +257,7 @@ class ScheduleSettings(t.LVBytes): """Schedule settings object.""" def __new__(cls, value): - """Create ScheduleSettings object from bytes or string.""" + """Create a new schedule settings object from bytes or string.""" day_selection = None events = [None] * 4 if isinstance(value, bytes): @@ -316,9 +324,10 @@ def _read_day_selection(value): byte = struct.unpack_from("c", value, offset=1)[0][0] if byte & 0x01: raise ValueError("Incorrect day selected") - for i, v in DAYS_MAP.items(): - if byte & v: + for i, day_code in DAYS_MAP.items(): + if byte & day_code: day_selection.append(i) + ScheduleSettings._verify_day_selection_in_str(day_selection) elif isinstance(value, str): day_selection = value.split(",") @@ -358,7 +367,7 @@ def _get_day_selection_byte(day_selection): return byte def __str__(self): - """Return ScheduleSettings as string.""" + """Return string representation of the schedule settings.""" day_selection = ScheduleSettings._read_day_selection(self) events = [None] * 4 for i in range(4): @@ -388,6 +397,8 @@ class AqaraThermostatSpecificCluster(XiaomiAqaraE1Cluster): SCHEDULE_SETTINGS: ("schedule_settings", ScheduleSettings, True), SENSOR: ("sensor", t.uint8_t, True), BATTERY_PERCENTAGE: ("battery_percentage", t.uint8_t, True), + SENSOR_TEMP: ("sensor_temp", t.uint32_t, True), + SENSOR_ATTR: (SENSOR_ATTR_NAME, t.LVBytes, True), } ) @@ -402,6 +413,136 @@ def _update_attribute(self, attrid, value): ) super()._update_attribute(attrid, value) + def aqara_header(self, counter: int, params: bytearray, action: int) -> bytearray: + """Create Aqara header for setting external sensor.""" + header = bytes([0xAA, 0x71, len(params) + 3, 0x44, counter]) + integrity = 512 - sum(header) + + return header + bytes([integrity, action, 0x41, len(params)]) + + def _float_to_hex(self, f): + """Convert float to hex.""" + return hex(struct.unpack(" list: + """Write attributes to device with internal 'attributes' validation.""" + attrs: dict[str | int, Any] = {} + + for attr, value in attributes.items(): + # implemented with help from https://github.com/Koenkk/zigbee-herdsman-converters/blob/master/devices/xiaomi.js + attr_def = self.find_attribute(attr) + + if attr_def and attr_def.id == SENSOR_TEMP: + # set external sensor temp. this function expect value to be passed multiplied by 100 + temperatureBuf = bytearray.fromhex( + self._float_to_hex(round(float(value)))[2:] + ) + + params = SENSOR_ID + params += bytes([0x00, 0x01, 0x00, 0x55]) + params += temperatureBuf + + attrs[SENSOR_ATTR_NAME] = self.aqara_header(0x12, params, 0x05) + params + + elif attr_def and attr_def.id == SENSOR: + # set internal/external temperature sensor + device = bytearray.fromhex( + f"{self.endpoint.device.ieee}".replace(":", "") + ) + + timestamp = bytes(reversed(t.uint32_t(int(time.time())).serialize())) + + if value == 0: + # internal sensor + params1 = timestamp + params1 += bytes([0x3D, 0x05]) + params1 += device + params1 += bytes(12) + + params2 = timestamp + params2 += bytes([0x3D, 0x04]) + params2 += device + params2 += bytes(12) + + attrs1 = {} + attrs1[SENSOR_ATTR_NAME] = ( + self.aqara_header(0x12, params1, 0x04) + params1 + ) + attrs[SENSOR_ATTR_NAME] = ( + self.aqara_header(0x13, params2, 0x04) + params2 + ) + + await super().write_attributes(attrs1, manufacturer) + else: + # external sensor + params1 = timestamp + params1 += bytes([0x3D, 0x04]) + params1 += device + params1 += SENSOR_ID + params1 += bytes([0x00, 0x01, 0x00, 0x55]) + params1 += bytes( + [ + 0x13, + 0x0A, + 0x02, + 0x00, + 0x00, + 0x64, + 0x04, + 0xCE, + 0xC2, + 0xB6, + 0xC8, + ] + ) + params1 += bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3D]) + params1 += bytes([0x64]) + params1 += bytes([0x65]) + + params2 = timestamp + params2 += bytes([0x3D, 0x05]) + params2 += device + params2 += SENSOR_ID + params2 += bytes([0x08, 0x00, 0x07, 0xFD]) + params2 += bytes( + [ + 0x16, + 0x0A, + 0x02, + 0x0A, + 0xC9, + 0xE8, + 0xB1, + 0xB8, + 0xD4, + 0xDA, + 0xCF, + 0xDF, + 0xC0, + 0xEB, + ] + ) + params2 += bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x3D]) + params2 += bytes([0x04]) + params2 += bytes([0x65]) + + attrs1 = {} + attrs1[SENSOR_ATTR_NAME] = ( + self.aqara_header(0x12, params1, 0x02) + params1 + ) + attrs[SENSOR_ATTR_NAME] = ( + self.aqara_header(0x13, params2, 0x02) + params2 + ) + + await super().write_attributes(attrs1, manufacturer) + else: + attrs[attr] = value + + result = await super().write_attributes(attrs, manufacturer) + return result + class AGL001(XiaomiCustomDevice): """Aqara E1 Radiator Thermostat (AGL001) Device."""