You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			1374 lines
		
	
	
		
			50 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			1374 lines
		
	
	
		
			50 KiB
		
	
	
	
		
			Python
		
	
| # -*- coding: utf-8 -*-
 | |
| """
 | |
| Copyright (C) 2024 Xiaomi Corporation.
 | |
| 
 | |
| The ownership and intellectual property rights of Xiaomi Home Assistant
 | |
| Integration and related Xiaomi cloud service API interface provided under this
 | |
| license, including source code and object code (collectively, "Licensed Work"),
 | |
| are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi
 | |
| hereby grants you a personal, limited, non-exclusive, non-transferable,
 | |
| non-sublicensable, and royalty-free license to reproduce, use, modify, and
 | |
| distribute the Licensed Work only for your use of Home Assistant for
 | |
| non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize
 | |
| you to use the Licensed Work for any other purpose, including but not limited
 | |
| to use Licensed Work to develop applications (APP), Web services, and other
 | |
| forms of software.
 | |
| 
 | |
| You may reproduce and distribute copies of the Licensed Work, with or without
 | |
| modifications, whether in source or object form, provided that you must give
 | |
| any other recipients of the Licensed Work a copy of this License and retain all
 | |
| copyright and disclaimers.
 | |
| 
 | |
| Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 | |
| CONDITIONS OF ANY KIND, either express or implied, including, without
 | |
| limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR
 | |
| OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or
 | |
| FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible
 | |
| for any direct, indirect, special, incidental, or consequential damages or
 | |
| losses arising from the use or inability to use the Licensed Work.
 | |
| 
 | |
| Xiaomi reserves all rights not expressly granted to you in this License.
 | |
| Except for the rights expressly granted by Xiaomi under this License, Xiaomi
 | |
| does not authorize you in any form to use the trademarks, copyrights, or other
 | |
| forms of intellectual property rights of Xiaomi and its affiliates, including,
 | |
| without limitation, without obtaining other written permission from Xiaomi, you
 | |
| shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that
 | |
| may make the public associate with Xiaomi in any form to publicize or promote
 | |
| the software or hardware devices that use the Licensed Work.
 | |
| 
 | |
| Xiaomi has the right to immediately terminate all your authorization under this
 | |
| License in the event:
 | |
| 1. You assert patent invalidation, litigation, or other claims against patents
 | |
| or other intellectual property rights of Xiaomi or its affiliates; or,
 | |
| 2. You make, have made, manufacture, sell, or offer to sell products that knock
 | |
| off Xiaomi or its affiliates' products.
 | |
| 
 | |
| MIoT lan device control, only support MIoT SPEC-v2 WiFi devices.
 | |
| """
 | |
| 
 | |
| 
 | |
| import json
 | |
| import time
 | |
| import asyncio
 | |
| from dataclasses import dataclass
 | |
| from enum import Enum, auto
 | |
| import logging
 | |
| import random
 | |
| import secrets
 | |
| import socket
 | |
| import struct
 | |
| import threading
 | |
| from typing import Any, Callable, Coroutine, Optional, final
 | |
| from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
 | |
| from cryptography.hazmat.primitives import padding
 | |
| from cryptography.hazmat.backends import default_backend
 | |
| from cryptography.hazmat.primitives import hashes
 | |
| 
 | |
| # pylint: disable=relative-beyond-top-level
 | |
| from .miot_error import MIoTError, MIoTLanError, MIoTErrorCode
 | |
| from .miot_network import InterfaceStatus, MIoTNetwork, NetworkInfo
 | |
| from .miot_mdns import MipsService, MipsServiceState
 | |
| from .common import (
 | |
|     randomize_float, load_yaml_file, gen_absolute_path, MIoTMatcher)
 | |
| 
 | |
| 
 | |
| _LOGGER = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
@dataclass
class _MIoTLanGetDevListData:
    """Arguments of a device-list query handed over to the internal loop."""
    # Callback receiving the result dict and ``handler_ctx``.
    handler: Callable[[dict, Any], None]
    # Opaque value passed back to ``handler`` unchanged.
    handler_ctx: Any
    # Timeout of the query, in milliseconds.
    timeout_ms: int
 | |
| 
 | |
| 
 | |
@dataclass
class _MIoTLanUnregisterBroadcastData:
    """Identifies the broadcast subscription that should be removed."""
    # Subscription key, same value that was used when registering.
    key: str
 | |
| 
 | |
| 
 | |
@dataclass
class _MIoTLanRegisterBroadcastData:
    """Describes a broadcast subscription to be registered."""
    # Subscription key (e.g. built as '<did>/p/<siid>/<piid>' by sub_prop).
    key: str
    # Callback receiving the message dict and ``handler_ctx``.
    handler: Callable[[dict, Any], None]
    # Opaque value passed back to ``handler`` unchanged.
    handler_ctx: Any
 | |
| 
 | |
| 
 | |
@dataclass
class _MIoTLanUnsubDeviceData:
    """Identifies the device-state subscription that should be removed."""
    # Subscription key, same value that was used when subscribing.
    key: str
 | |
| 
 | |
| 
 | |
@dataclass
class _MIoTLanSubDeviceData:
    """Describes a device-state subscription to be registered."""
    # Subscription key.
    key: str
    # Coroutine function called with (did, state dict, handler_ctx).
    handler: Callable[[str, dict, Any], Coroutine]
    # Opaque value passed back to ``handler`` unchanged.
    handler_ctx: Any
 | |
| 
 | |
| 
 | |
@dataclass
class _MIoTLanNetworkUpdateData:
    """A network interface status change forwarded to the internal loop."""
    # New status of the interface.
    status: InterfaceStatus
    # Name of the interface the status refers to.
    if_name: str
 | |
| 
 | |
| 
 | |
@dataclass
class _MIoTLanRequestData:
    """Book-keeping record of a request that is waiting for its reply."""
    # Message id of the request.
    msg_id: int
    # Optional callback receiving the reply dict and ``handler_ctx``.
    handler: Optional[Callable[[dict, Any], None]]
    # Opaque value passed back to ``handler`` unchanged.
    handler_ctx: Any
    # Timer handle associated with the request timeout, if any.
    timeout: Optional[asyncio.TimerHandle]
 | |
| 
 | |
| 
 | |
class _MIoTLanDeviceState(Enum):
    """Keep-alive state of a lan device.

    The values are consecutive integers: the keep-alive logic advances
    PING1 -> PING2 -> PING3 -> DEAD by constructing the member with
    ``value + 1``.
    """
    FRESH = 0
    PING1 = 1
    PING2 = 2
    PING3 = 3
    DEAD = 4
 | |
| 
 | |
| 
 | |
| class _MIoTLanDevice:
 | |
|     """MIoT lan device."""
 | |
|     # pylint: disable=unused-argument
 | |
|     OT_HEADER: int = 0x2131
 | |
|     OT_HEADER_LEN: int = 32
 | |
|     NETWORK_UNSTABLE_CNT_TH: int = 10
 | |
|     NETWORK_UNSTABLE_TIME_TH: float = 120
 | |
|     NETWORK_UNSTABLE_RESUME_TH: float = 300
 | |
|     FAST_PING_INTERVAL: float = 5
 | |
|     CONSTRUCT_STATE_PENDING: float = 15
 | |
|     KA_INTERVAL_MIN: float = 10
 | |
|     KA_INTERVAL_MAX: float = 50
 | |
| 
 | |
|     did: str
 | |
|     token: bytes
 | |
|     cipher: Cipher
 | |
|     ip: Optional[str]
 | |
| 
 | |
|     offset: int
 | |
|     subscribed: bool
 | |
|     sub_ts: int
 | |
|     supported_wildcard_sub: bool
 | |
| 
 | |
|     _manager: 'MIoTLan'
 | |
|     _if_name: Optional[str]
 | |
|     _sub_locked: bool
 | |
|     _state: _MIoTLanDeviceState
 | |
|     _online: bool
 | |
|     _online_offline_history: list[dict[str, Any]]
 | |
|     _online_offline_timer: Optional[asyncio.TimerHandle]
 | |
| 
 | |
|     _ka_timer: Optional[asyncio.TimerHandle]
 | |
|     _ka_internal: float
 | |
| 
 | |
| # All functions SHOULD be called from the internal loop
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         manager: 'MIoTLan',
 | |
|         did: str,
 | |
|         token: str,
 | |
|         ip: Optional[str] = None
 | |
|     ) -> None:
 | |
|         self._manager: MIoTLan = manager
 | |
|         self.did = did
 | |
|         self.token = bytes.fromhex(token)
 | |
|         aes_key: bytes = self.__md5(self.token)
 | |
|         aex_iv: bytes = self.__md5(aes_key + self.token)
 | |
|         self.cipher = Cipher(
 | |
|             algorithms.AES128(aes_key), modes.CBC(aex_iv), default_backend())
 | |
|         self.ip = ip
 | |
|         self.offset = 0
 | |
|         self.subscribed = False
 | |
|         self.sub_ts = 0
 | |
|         self.supported_wildcard_sub = False
 | |
|         self._if_name = None
 | |
|         self._sub_locked = False
 | |
|         self._state = _MIoTLanDeviceState.DEAD
 | |
|         self._online = False
 | |
|         self._online_offline_history = []
 | |
|         self._online_offline_timer = None
 | |
| 
 | |
|         def ka_init_handler() -> None:
 | |
|             self._ka_internal = self.KA_INTERVAL_MIN
 | |
|             self.__update_keep_alive(state=_MIoTLanDeviceState.DEAD)
 | |
|         self._ka_timer = self._manager.internal_loop.call_later(
 | |
|             randomize_float(self.CONSTRUCT_STATE_PENDING, 0.5),
 | |
|             ka_init_handler,)
 | |
|         _LOGGER.debug('miot lan device add, %s', self.did)
 | |
| 
 | |
|     def keep_alive(self, ip: str, if_name: str) -> None:
 | |
|         self.ip = ip
 | |
|         if self._if_name != if_name:
 | |
|             self._if_name = if_name
 | |
|             _LOGGER.info(
 | |
|                 'device if_name change, %s, %s', self._if_name, self.did)
 | |
|         self.__update_keep_alive(state=_MIoTLanDeviceState.FRESH)
 | |
| 
 | |
|     @property
 | |
|     def online(self) -> bool:
 | |
|         return self._online
 | |
| 
 | |
|     @online.setter
 | |
|     def online(self, online: bool) -> None:
 | |
|         if self._online == online:
 | |
|             return
 | |
|         self._online = online
 | |
|         self._manager.broadcast_device_state(
 | |
|             did=self.did, state={
 | |
|                 'online': self._online, 'push_available': self.subscribed})
 | |
| 
 | |
|     @property
 | |
|     def if_name(self) -> Optional[str]:
 | |
|         return self._if_name
 | |
| 
 | |
|     def gen_packet(
 | |
|         self, out_buffer: bytearray, clear_data: dict, did: str, offset: int
 | |
|     ) -> int:
 | |
|         clear_bytes = json.dumps(clear_data).encode('utf-8')
 | |
|         padder = padding.PKCS7(algorithms.AES128.block_size).padder()
 | |
|         padded_data = padder.update(clear_bytes) + padder.finalize()
 | |
|         if len(padded_data) + self.OT_HEADER_LEN > len(out_buffer):
 | |
|             raise ValueError('rpc too long')
 | |
|         encryptor = self.cipher.encryptor()
 | |
|         encrypted_data = encryptor.update(padded_data) + encryptor.finalize()
 | |
|         data_len: int = len(encrypted_data)+self.OT_HEADER_LEN
 | |
|         out_buffer[:32] = struct.pack(
 | |
|             '>HHQI16s', self.OT_HEADER, data_len, int(did), offset,
 | |
|             self.token)
 | |
|         out_buffer[32:data_len] = encrypted_data
 | |
|         msg_md5: bytes = self.__md5(out_buffer[0:data_len])
 | |
|         out_buffer[16:32] = msg_md5
 | |
|         return data_len
 | |
| 
 | |
|     def decrypt_packet(self, encrypted_data: bytearray) -> dict:
 | |
|         data_len: int = struct.unpack('>H', encrypted_data[2:4])[0]
 | |
|         md5_orig: bytes = encrypted_data[16:32]
 | |
|         encrypted_data[16:32] = self.token
 | |
|         md5_calc: bytes = self.__md5(encrypted_data[0:data_len])
 | |
|         if md5_orig != md5_calc:
 | |
|             raise ValueError(f'invalid md5, {md5_orig}, {md5_calc}')
 | |
|         decryptor = self.cipher.decryptor()
 | |
|         decrypted_padded_data = decryptor.update(
 | |
|             encrypted_data[32:data_len]) + decryptor.finalize()
 | |
|         unpadder = padding.PKCS7(algorithms.AES128.block_size).unpadder()
 | |
|         decrypted_data = unpadder.update(
 | |
|             decrypted_padded_data) + unpadder.finalize()
 | |
|         # Some device will add a redundant \0 at the end of JSON string
 | |
|         decrypted_data = decrypted_data.rstrip(b'\x00')
 | |
|         return json.loads(decrypted_data)
 | |
| 
 | |
|     def subscribe(self) -> None:
 | |
|         if self._sub_locked:
 | |
|             return
 | |
|         self._sub_locked = True
 | |
|         try:
 | |
|             sub_ts: int = int(time.time())
 | |
|             self._manager.send2device(
 | |
|                 did=self.did,
 | |
|                 msg={
 | |
|                     'method': 'miIO.sub',
 | |
|                     'params': {
 | |
|                         'version': '2.0',
 | |
|                         'did': self._manager.virtual_did,
 | |
|                         'update_ts': sub_ts,
 | |
|                         'sub_method': '.'
 | |
|                     }
 | |
|                 },
 | |
|                 handler=self.__subscribe_handler,
 | |
|                 handler_ctx=sub_ts,
 | |
|                 timeout_ms=5000)
 | |
|         except Exception as err:  # pylint: disable=broad-exception-caught
 | |
|             _LOGGER.error('subscribe device error, %s', err)
 | |
| 
 | |
|         self._sub_locked = False
 | |
| 
 | |
|     def unsubscribe(self) -> None:
 | |
|         if not self.subscribed:
 | |
|             return
 | |
|         self._manager.send2device(
 | |
|             did=self.did,
 | |
|             msg={
 | |
|                 'method': 'miIO.unsub',
 | |
|                 'params': {
 | |
|                     'version': '2.0',
 | |
|                     'did': self._manager.virtual_did,
 | |
|                     'update_ts': self.sub_ts or 0,
 | |
|                     'sub_method': '.'
 | |
|                 }
 | |
|             },
 | |
|             handler=self.__unsubscribe_handler,
 | |
|             timeout_ms=5000)
 | |
|         self.subscribed = False
 | |
|         self._manager.broadcast_device_state(
 | |
|             did=self.did, state={
 | |
|                 'online': self._online, 'push_available': self.subscribed})
 | |
| 
 | |
|     def on_delete(self) -> None:
 | |
|         if self._ka_timer:
 | |
|             self._ka_timer.cancel()
 | |
|             self._ka_timer = None
 | |
|         if self._online_offline_timer:
 | |
|             self._online_offline_timer.cancel()
 | |
|             self._online_offline_timer = None
 | |
|         _LOGGER.debug('miot lan device delete, %s', self.did)
 | |
| 
 | |
|     def update_info(self, info: dict) -> None:
 | |
|         if (
 | |
|             'token' in info
 | |
|             and len(info['token']) == 32
 | |
|             and info['token'].upper() != self.token.hex().upper()
 | |
|         ):
 | |
|             # Update token
 | |
|             self.token = bytes.fromhex(info['token'])
 | |
|             aes_key: bytes = self.__md5(self.token)
 | |
|             aex_iv: bytes = self.__md5(aes_key + self.token)
 | |
|             self.cipher = Cipher(
 | |
|                 algorithms.AES128(aes_key),
 | |
|                 modes.CBC(aex_iv), default_backend())
 | |
|             _LOGGER.debug('update token, %s', self.did)
 | |
| 
 | |
|     def __subscribe_handler(self, msg: dict, sub_ts: int) -> None:
 | |
|         if (
 | |
|             'result' not in msg
 | |
|             or 'code' not in msg['result']
 | |
|             or msg['result']['code'] != 0
 | |
|         ):
 | |
|             _LOGGER.error('subscribe device error, %s, %s', self.did, msg)
 | |
|             return
 | |
|         self.subscribed = True
 | |
|         self.sub_ts = sub_ts
 | |
|         self._manager.broadcast_device_state(
 | |
|             did=self.did, state={
 | |
|                 'online': self._online, 'push_available': self.subscribed})
 | |
|         _LOGGER.info('subscribe success, %s, %s', self._if_name, self.did)
 | |
| 
 | |
|     def __unsubscribe_handler(self, msg: dict, ctx: Any) -> None:
 | |
|         if (
 | |
|             'result' not in msg
 | |
|             or 'code' not in msg['result']
 | |
|             or msg['result']['code'] != 0
 | |
|         ):
 | |
|             _LOGGER.error('unsubscribe device error, %s, %s', self.did, msg)
 | |
|             return
 | |
|         _LOGGER.info('unsubscribe success, %s, %s', self._if_name, self.did)
 | |
| 
 | |
|     def __update_keep_alive(self, state: _MIoTLanDeviceState) -> None:
 | |
|         last_state: _MIoTLanDeviceState = self._state
 | |
|         self._state = state
 | |
|         if self._state != _MIoTLanDeviceState.FRESH:
 | |
|             _LOGGER.debug('device status, %s, %s', self.did, self._state)
 | |
|         if self._ka_timer:
 | |
|             self._ka_timer.cancel()
 | |
|             self._ka_timer = None
 | |
|         match state:
 | |
|             case _MIoTLanDeviceState.FRESH:
 | |
|                 if last_state == _MIoTLanDeviceState.DEAD:
 | |
|                     self._ka_internal = self.KA_INTERVAL_MIN
 | |
|                     self.__change_online(True)
 | |
|                 self._ka_timer = self._manager.internal_loop.call_later(
 | |
|                     self.__get_next_ka_timeout(), self.__update_keep_alive,
 | |
|                     _MIoTLanDeviceState.PING1)
 | |
|             case (
 | |
|                     _MIoTLanDeviceState.PING1
 | |
|                     | _MIoTLanDeviceState.PING2
 | |
|                     | _MIoTLanDeviceState.PING3
 | |
|             ):
 | |
|                 # Set the timer first to avoid Any early returns
 | |
|                 self._ka_timer = self._manager.internal_loop.call_later(
 | |
|                     self.FAST_PING_INTERVAL, self.__update_keep_alive,
 | |
|                     _MIoTLanDeviceState(state.value+1))
 | |
|                 # Fast ping
 | |
|                 if self._if_name is None:
 | |
|                     _LOGGER.error('if_name is Not set for device, %s', self.did)
 | |
|                     return
 | |
|                 if self.ip is None:
 | |
|                     _LOGGER.error('ip is Not set for device, %s', self.did)
 | |
|                     return
 | |
|                 self._manager.ping(if_name=self._if_name, target_ip=self.ip)
 | |
|             case _MIoTLanDeviceState.DEAD:
 | |
|                 if last_state == _MIoTLanDeviceState.PING3:
 | |
|                     self._ka_internal = self.KA_INTERVAL_MIN
 | |
|                     self.__change_online(False)
 | |
|             case _:
 | |
|                 _LOGGER.error('invalid state, %s', state)
 | |
| 
 | |
|     def __get_next_ka_timeout(self) -> float:
 | |
|         self._ka_internal = min(self._ka_internal*2, self.KA_INTERVAL_MAX)
 | |
|         return randomize_float(self._ka_internal, 0.1)
 | |
| 
 | |
|     def __change_online(self, online: bool) -> None:
 | |
|         _LOGGER.info('change online, %s, %s', self.did, online)
 | |
|         ts_now: int = int(time.time())
 | |
|         self._online_offline_history.append({'ts': ts_now, 'online': online})
 | |
|         if len(self._online_offline_history) > self.NETWORK_UNSTABLE_CNT_TH:
 | |
|             self._online_offline_history.pop(0)
 | |
|         if self._online_offline_timer:
 | |
|             self._online_offline_timer.cancel()
 | |
|             self._online_offline_timer = None
 | |
|         if not online:
 | |
|             self.online = False
 | |
|         else:
 | |
|             if (
 | |
|                 len(self._online_offline_history) < self.NETWORK_UNSTABLE_CNT_TH
 | |
|                 or (
 | |
|                     ts_now - self._online_offline_history[0]['ts'] >
 | |
|                     self.NETWORK_UNSTABLE_TIME_TH)
 | |
|             ):
 | |
|                 self.online = True
 | |
|             else:
 | |
|                 _LOGGER.info('unstable device detected, %s', self.did)
 | |
|                 self._online_offline_timer = \
 | |
|                     self._manager.internal_loop.call_later(
 | |
|                         self.NETWORK_UNSTABLE_RESUME_TH,
 | |
|                         self.__online_resume_handler)
 | |
| 
 | |
|     def __online_resume_handler(self) -> None:
 | |
|         _LOGGER.info('unstable resume threshold past, %s', self.did)
 | |
|         self.online = True
 | |
| 
 | |
|     def __md5(self, data: bytes) -> bytes:
 | |
|         hasher = hashes.Hash(hashes.MD5(), default_backend())
 | |
|         hasher.update(data)
 | |
|         return hasher.finalize()
 | |
| 
 | |
| 
 | |
class MIoTLan:
    """MIoT lan device control."""
    # pylint: disable=unused-argument
    # pylint: disable=inconsistent-quotes

    # Protocol constants.
    OT_HEADER: bytes = b'\x21\x31'
    OT_PORT: int = 54321
    OT_PROBE_LEN: int = 32
    OT_MSG_LEN: int = 1400
    OT_SUPPORT_WILDCARD_SUB: int = 0xFE

    # Bounds of the probe interval.
    OT_PROBE_INTERVAL_MIN: float = 5
    OT_PROBE_INTERVAL_MAX: float = 45

    # Path handed to gen_absolute_path() when loading the profile models.
    PROFILE_MODELS_FILE: str = 'lan/profile_models.yaml'

    # Configuration and collaborators supplied to __init__.
    _main_loop: asyncio.AbstractEventLoop
    _net_ifs: set[str]
    _network: MIoTNetwork
    _mips_service: MipsService
    _enable_subscribe: bool
    _lan_devices: dict[str, _MIoTLanDevice]
    _virtual_did: str
    _probe_msg: bytes
    _write_buffer: bytearray
    _read_buffer: bytearray

    # Loop and thread created by init_async().
    _internal_loop: asyncio.AbstractEventLoop
    _thread: threading.Thread

    # Runtime state.
    _available_net_ifs: set[str]
    _broadcast_socks: dict[str, socket.socket]
    _local_port: Optional[int]
    _scan_timer: Optional[asyncio.TimerHandle]
    _last_scan_interval: Optional[float]
    _msg_id_counter: int
    _pending_requests: dict[int, _MIoTLanRequestData]
    _device_msg_matcher: MIoTMatcher
    _device_state_sub_map: dict[str, _MIoTLanSubDeviceData]
    _reply_msg_buffer: dict[str, asyncio.TimerHandle]

    # Lan state subscribers and lan-control votes, both keyed by caller key.
    _lan_state_sub_map: dict[str, Callable[[bool], Coroutine]]
    _lan_ctrl_vote_map: dict[str, bool]

    _profile_models: dict[str, dict]

    # Guards init_async() against concurrent initialization.
    _init_lock: asyncio.Lock
    _init_done: bool
 | |
| 
 | |
| # The following should be called from the main loop
 | |
| 
 | |
|     def __init__(
 | |
|         self,
 | |
|         net_ifs: list[str],
 | |
|         network: MIoTNetwork,
 | |
|         mips_service: MipsService,
 | |
|         enable_subscribe: bool = False,
 | |
|         virtual_did: Optional[int] = None,
 | |
|         loop: Optional[asyncio.AbstractEventLoop] = None
 | |
|     ) -> None:
 | |
|         if not network:
 | |
|             raise ValueError('network is required')
 | |
|         if not mips_service:
 | |
|             raise ValueError('mips_service is required')
 | |
|         self._main_loop = loop or asyncio.get_event_loop()
 | |
|         self._net_ifs = set(net_ifs)
 | |
|         self._network = network
 | |
|         self._network.sub_network_info(
 | |
|             key='miot_lan',
 | |
|             handler=self.__on_network_info_change_external_async)
 | |
|         self._mips_service = mips_service
 | |
|         self._mips_service.sub_service_change(
 | |
|             key='miot_lan', group_id='*',
 | |
|             handler=self.__on_mips_service_change)
 | |
|         self._enable_subscribe = enable_subscribe
 | |
|         self._virtual_did = str(virtual_did) \
 | |
|             if (virtual_did is not None) \
 | |
|             else str(secrets.randbits(64))
 | |
|         # Init socket probe message
 | |
|         probe_bytes = bytearray(self.OT_PROBE_LEN)
 | |
|         probe_bytes[:20] = (
 | |
|             b'!1\x00\x20\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFFMDID')
 | |
|         probe_bytes[20:28] = struct.pack('>Q', int(self._virtual_did))
 | |
|         probe_bytes[28:32] = b'\x00\x00\x00\x00'
 | |
|         self._probe_msg = bytes(probe_bytes)
 | |
|         self._read_buffer = bytearray(self.OT_MSG_LEN)
 | |
|         self._write_buffer = bytearray(self.OT_MSG_LEN)
 | |
| 
 | |
|         self._lan_devices = {}
 | |
|         self._available_net_ifs = set()
 | |
|         self._broadcast_socks = {}
 | |
|         self._local_port = None
 | |
|         self._scan_timer = None
 | |
|         self._last_scan_interval = None
 | |
|         self._msg_id_counter = int(random.random()*0x7FFFFFFF)
 | |
|         self._pending_requests = {}
 | |
|         self._device_msg_matcher = MIoTMatcher()
 | |
|         self._device_state_sub_map = {}
 | |
|         self._reply_msg_buffer = {}
 | |
| 
 | |
|         self._lan_state_sub_map = {}
 | |
|         self._lan_ctrl_vote_map = {}
 | |
| 
 | |
|         self._init_lock = asyncio.Lock()
 | |
|         self._init_done = False
 | |
| 
 | |
|         if (
 | |
|             len(self._mips_service.get_services()) == 0
 | |
|             and len(self._net_ifs) > 0
 | |
|         ):
 | |
|             _LOGGER.info('no central hub gateway service, init miot lan')
 | |
|             self._main_loop.call_later(
 | |
|                 0, lambda: self._main_loop.create_task(
 | |
|                     self.init_async()))
 | |
| 
 | |
|     def __assert_service_ready(self) -> None:
 | |
|         if not self._init_done:
 | |
|             raise MIoTLanError(
 | |
|                 'MIoT lan is not ready',
 | |
|                 MIoTErrorCode.CODE_LAN_UNAVAILABLE)
 | |
| 
 | |
|     @property
 | |
|     def virtual_did(self) -> str:
 | |
|         return self._virtual_did
 | |
| 
 | |
|     @property
 | |
|     def internal_loop(self) -> asyncio.AbstractEventLoop:
 | |
|         return self._internal_loop
 | |
| 
 | |
|     @property
 | |
|     def init_done(self) -> bool:
 | |
|         return self._init_done
 | |
| 
 | |
|     async def init_async(self) -> None:
 | |
|         # Avoid race condition
 | |
|         async with self._init_lock:
 | |
|             if self._init_done:
 | |
|                 _LOGGER.info('miot lan already init')
 | |
|                 return
 | |
|             if len(self._net_ifs) == 0:
 | |
|                 _LOGGER.info('no net_ifs')
 | |
|                 return
 | |
|             if not any(self._lan_ctrl_vote_map.values()):
 | |
|                 _LOGGER.info('no vote for lan ctrl')
 | |
|                 return
 | |
|             if len(self._mips_service.get_services()) > 0:
 | |
|                 _LOGGER.info('central hub gateway service exist')
 | |
|                 return
 | |
|             for if_name in list(self._network.network_info.keys()):
 | |
|                 self._available_net_ifs.add(if_name)
 | |
|             if len(self._available_net_ifs) == 0:
 | |
|                 _LOGGER.info('no available net_ifs')
 | |
|                 return
 | |
|             if self._net_ifs.isdisjoint(self._available_net_ifs):
 | |
|                 _LOGGER.info('no valid net_ifs')
 | |
|                 return
 | |
|             try:
 | |
|                 self._profile_models = await self._main_loop.run_in_executor(
 | |
|                     None, load_yaml_file,
 | |
|                     gen_absolute_path(self.PROFILE_MODELS_FILE))
 | |
|             except Exception as err:  # pylint: disable=broad-exception-caught
 | |
|                 _LOGGER.error('load profile models error, %s', err)
 | |
|                 self._profile_models = {}
 | |
|             self._internal_loop = asyncio.new_event_loop()
 | |
|             # All tasks meant for the internal loop should happen in this thread
 | |
|             self._thread = threading.Thread(target=self.__internal_loop_thread)
 | |
|             self._thread.name = 'miot_lan'
 | |
|             self._thread.daemon = True
 | |
|             self._thread.start()
 | |
|             self._init_done = True
 | |
|             for handler in list(self._lan_state_sub_map.values()):
 | |
|                 self._main_loop.create_task(handler(True))
 | |
|             _LOGGER.info(
 | |
|                 'miot lan init, %s ,%s', self._net_ifs, self._available_net_ifs)
 | |
| 
 | |
|     def __internal_loop_thread(self) -> None:
 | |
|         _LOGGER.info('miot lan thread start')
 | |
|         self.__init_socket()
 | |
|         self._scan_timer = self._internal_loop.call_later(
 | |
|             int(3*random.random()), self.__scan_devices)
 | |
|         self._internal_loop.run_forever()
 | |
|         _LOGGER.info('miot lan thread exit')
 | |
| 
 | |
|     async def deinit_async(self) -> None:
 | |
|         if not self._init_done:
 | |
|             _LOGGER.info('miot lan not init')
 | |
|             return
 | |
|         self._init_done = False
 | |
|         self._internal_loop.call_soon_threadsafe(self.__deinit)
 | |
|         self._thread.join()
 | |
|         self._internal_loop.close()
 | |
| 
 | |
|         self._profile_models = {}
 | |
|         self._lan_devices = {}
 | |
|         self._broadcast_socks = {}
 | |
|         self._local_port = None
 | |
|         self._scan_timer = None
 | |
|         self._last_scan_interval = None
 | |
|         self._msg_id_counter = int(random.random()*0x7FFFFFFF)
 | |
|         self._pending_requests = {}
 | |
|         self._device_msg_matcher = MIoTMatcher()
 | |
|         self._device_state_sub_map = {}
 | |
|         self._reply_msg_buffer = {}
 | |
|         for handler in list(self._lan_state_sub_map.values()):
 | |
|             self._main_loop.create_task(handler(False))
 | |
|         _LOGGER.info('miot lan deinit')
 | |
| 
 | |
|     async def update_net_ifs_async(self, net_ifs: list[str]) -> None:
 | |
|         _LOGGER.info('update net_ifs, %s', net_ifs)
 | |
|         if not isinstance(net_ifs, list):
 | |
|             _LOGGER.error('invalid net_ifs, %s', net_ifs)
 | |
|             return
 | |
|         if len(net_ifs) == 0:
 | |
|             # Deinit lan
 | |
|             await self.deinit_async()
 | |
|             self._net_ifs = set(net_ifs)
 | |
|             return
 | |
|         available_net_ifs = set()
 | |
|         for if_name in list(self._network.network_info.keys()):
 | |
|             available_net_ifs.add(if_name)
 | |
|         if set(net_ifs).isdisjoint(available_net_ifs):
 | |
|             _LOGGER.error('no valid net_ifs, %s', net_ifs)
 | |
|             await self.deinit_async()
 | |
|             self._net_ifs = set(net_ifs)
 | |
|             self._available_net_ifs = available_net_ifs
 | |
|             return
 | |
|         if not self._init_done:
 | |
|             self._net_ifs = set(net_ifs)
 | |
|             await self.init_async()
 | |
|             return
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__update_net_ifs,
 | |
|             net_ifs)
 | |
| 
 | |
|     async def vote_for_lan_ctrl_async(self, key: str, vote: bool) -> None:
 | |
|         _LOGGER.info('vote for lan ctrl, %s, %s', key, vote)
 | |
|         self._lan_ctrl_vote_map[key] = vote
 | |
|         if not any(self._lan_ctrl_vote_map.values()):
 | |
|             await self.deinit_async()
 | |
|             return
 | |
|         await self.init_async()
 | |
| 
 | |
|     async def update_subscribe_option(self, enable_subscribe: bool) -> None:
 | |
|         _LOGGER.info('update subscribe option, %s', enable_subscribe)
 | |
|         if not self._init_done:
 | |
|             self._enable_subscribe = enable_subscribe
 | |
|             return
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__update_subscribe_option,
 | |
|             {'enable_subscribe': enable_subscribe})
 | |
| 
 | |
|     def update_devices(self, devices: dict[str, dict]) -> bool:
 | |
|         _LOGGER.info('update devices, %s', devices)
 | |
|         if not self._init_done:
 | |
|             return False
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__update_devices, devices)
 | |
|         return True
 | |
| 
 | |
|     def delete_devices(self, devices: list[str]) -> bool:
 | |
|         _LOGGER.info('delete devices, %s', devices)
 | |
|         if not self._init_done:
 | |
|             return False
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__delete_devices, devices)
 | |
|         return True
 | |
| 
 | |
|     def sub_lan_state(
 | |
|         self, key: str, handler: Callable[[bool], Coroutine]
 | |
|     ) -> None:
 | |
|         self._lan_state_sub_map[key] = handler
 | |
| 
 | |
|     def unsub_lan_state(self, key: str) -> None:
 | |
|         self._lan_state_sub_map.pop(key, None)
 | |
| 
 | |
|     @final
 | |
|     def sub_device_state(
 | |
|         self, key: str, handler: Callable[[str, dict, Any], Coroutine],
 | |
|         handler_ctx: Any = None
 | |
|     ) -> bool:
 | |
|         if not self._init_done:
 | |
|             return False
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__sub_device_state,
 | |
|             _MIoTLanSubDeviceData(
 | |
|                 key=key, handler=handler, handler_ctx=handler_ctx))
 | |
|         return True
 | |
| 
 | |
|     @final
 | |
|     def unsub_device_state(self, key: str) -> bool:
 | |
|         if not self._init_done:
 | |
|             return False
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__unsub_device_state, _MIoTLanUnsubDeviceData(key=key))
 | |
|         return True
 | |
| 
 | |
|     @final
 | |
|     def sub_prop(
 | |
|         self,
 | |
|         did: str,
 | |
|         handler: Callable[[dict, Any], None],
 | |
|         siid: Optional[int] = None,
 | |
|         piid: Optional[int] = None,
 | |
|         handler_ctx: Any = None
 | |
|     ) -> bool:
 | |
|         if not self._init_done:
 | |
|             return False
 | |
|         if not self._enable_subscribe:
 | |
|             return False
 | |
|         key = (
 | |
|             f'{did}/p/'
 | |
|             f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}')
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__sub_broadcast,
 | |
|             _MIoTLanRegisterBroadcastData(
 | |
|                 key=key, handler=handler, handler_ctx=handler_ctx))
 | |
|         return True
 | |
| 
 | |
|     @final
 | |
|     def unsub_prop(
 | |
|         self,
 | |
|         did: str,
 | |
|         siid: Optional[int] = None,
 | |
|         piid: Optional[int] = None
 | |
|     ) -> bool:
 | |
|         if not self._init_done:
 | |
|             return False
 | |
|         if not self._enable_subscribe:
 | |
|             return False
 | |
|         key = (
 | |
|             f'{did}/p/'
 | |
|             f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}')
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__unsub_broadcast,
 | |
|             _MIoTLanUnregisterBroadcastData(key=key))
 | |
|         return True
 | |
| 
 | |
|     @final
 | |
|     def sub_event(
 | |
|         self,
 | |
|         did: str,
 | |
|         handler: Callable[[dict, Any], None],
 | |
|         siid: Optional[int] = None,
 | |
|         eiid: Optional[int] = None,
 | |
|         handler_ctx: Any = None
 | |
|     ) -> bool:
 | |
|         if not self._init_done:
 | |
|             return False
 | |
|         if not self._enable_subscribe:
 | |
|             return False
 | |
|         key = (
 | |
|             f'{did}/e/'
 | |
|             f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}')
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__sub_broadcast,
 | |
|             _MIoTLanRegisterBroadcastData(
 | |
|                 key=key, handler=handler, handler_ctx=handler_ctx))
 | |
|         return True
 | |
| 
 | |
|     @final
 | |
|     def unsub_event(
 | |
|         self,
 | |
|         did: str,
 | |
|         siid: Optional[int] = None,
 | |
|         eiid: Optional[int] = None
 | |
|     ) -> bool:
 | |
|         if not self._init_done:
 | |
|             return False
 | |
|         if not self._enable_subscribe:
 | |
|             return False
 | |
|         key = (
 | |
|             f'{did}/e/'
 | |
|             f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}')
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__unsub_broadcast,
 | |
|             _MIoTLanUnregisterBroadcastData(key=key))
 | |
|         return True
 | |
| 
 | |
|     @final
 | |
|     async def get_prop_async(
 | |
|         self, did: str, siid: int, piid: int, timeout_ms: int = 10000
 | |
|     ) -> Any:
 | |
|         self.__assert_service_ready()
 | |
|         result_obj = await self.__call_api_async(
 | |
|             did=did, msg={
 | |
|                 'method': 'get_properties',
 | |
|                 'params': [{'did': did, 'siid': siid, 'piid': piid}]
 | |
|             }, timeout_ms=timeout_ms)
 | |
| 
 | |
|         if (
 | |
|             result_obj and 'result' in result_obj
 | |
|             and len(result_obj['result']) == 1
 | |
|             and 'did' in result_obj['result'][0]
 | |
|             and result_obj['result'][0]['did'] == did
 | |
|         ):
 | |
|             return result_obj['result'][0].get('value', None)
 | |
|         return None
 | |
| 
 | |
|     @final
 | |
|     async def set_prop_async(
 | |
|         self, did: str, siid: int, piid: int, value: Any,
 | |
|         timeout_ms: int = 10000
 | |
|     ) -> dict:
 | |
|         self.__assert_service_ready()
 | |
|         result_obj = await self.__call_api_async(
 | |
|             did=did, msg={
 | |
|                 'method': 'set_properties',
 | |
|                 'params': [{
 | |
|                     'did': did, 'siid': siid, 'piid': piid, 'value': value}]
 | |
|             }, timeout_ms=timeout_ms)
 | |
|         if result_obj:
 | |
|             if (
 | |
|                 'result' in result_obj
 | |
|                 and len(result_obj['result']) == 1
 | |
|                 and 'did' in result_obj['result'][0]
 | |
|                 and result_obj['result'][0]['did'] == did
 | |
|                 and 'code' in result_obj['result'][0]
 | |
|             ):
 | |
|                 return result_obj['result'][0]
 | |
|             if 'code' in result_obj:
 | |
|                 return result_obj
 | |
|         raise MIoTError('Invalid result', MIoTErrorCode.CODE_INTERNAL_ERROR)
 | |
| 
 | |
|     @final
 | |
|     async def action_async(
 | |
|         self, did: str, siid: int, aiid: int, in_list: list,
 | |
|         timeout_ms: int = 10000
 | |
|     ) -> dict:
 | |
|         self.__assert_service_ready()
 | |
|         result_obj = await self.__call_api_async(
 | |
|             did=did, msg={
 | |
|                 'method': 'action',
 | |
|                 'params': {
 | |
|                     'did': did, 'siid': siid, 'aiid': aiid, 'in': in_list}
 | |
|             }, timeout_ms=timeout_ms)
 | |
|         if result_obj:
 | |
|             if 'result' in result_obj and 'code' in result_obj['result']:
 | |
|                 return result_obj['result']
 | |
|             if 'code' in result_obj:
 | |
|                 return result_obj
 | |
|         raise MIoTError('Invalid result', MIoTErrorCode.CODE_INTERNAL_ERROR)
 | |
| 
 | |
|     @final
 | |
|     async def get_dev_list_async(
 | |
|         self, timeout_ms: int = 10000
 | |
|     ) -> dict[str, dict]:
 | |
|         if not self._init_done:
 | |
|             return {}
 | |
| 
 | |
|         def get_device_list_handler(msg: dict, fut: asyncio.Future):
 | |
|             self._main_loop.call_soon_threadsafe(
 | |
|                 fut.set_result, msg)
 | |
| 
 | |
|         fut: asyncio.Future = self._main_loop.create_future()
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__get_dev_list,
 | |
|             _MIoTLanGetDevListData(
 | |
|                 handler=get_device_list_handler,
 | |
|                 handler_ctx=fut,
 | |
|                 timeout_ms=timeout_ms))
 | |
|         return await fut
 | |
| 
 | |
|     async def __call_api_async(
 | |
|         self, did: str, msg: dict, timeout_ms: int = 10000
 | |
|     ) -> dict:
 | |
|         def call_api_handler(msg: dict, fut: asyncio.Future):
 | |
|             self._main_loop.call_soon_threadsafe(
 | |
|                 fut.set_result, msg)
 | |
| 
 | |
|         fut: asyncio.Future = self._main_loop.create_future()
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__call_api, did, msg, call_api_handler, fut, timeout_ms)
 | |
|         return await fut
 | |
| 
 | |
|     async def __on_network_info_change_external_async(
 | |
|         self,
 | |
|         status: InterfaceStatus,
 | |
|         info: NetworkInfo
 | |
|     ) -> None:
 | |
|         _LOGGER.info(
 | |
|             'on network info change, status: %s, info: %s', status, info)
 | |
|         available_net_ifs = set()
 | |
|         for if_name in list(self._network.network_info.keys()):
 | |
|             available_net_ifs.add(if_name)
 | |
|         if len(available_net_ifs) == 0:
 | |
|             await self.deinit_async()
 | |
|             self._available_net_ifs = available_net_ifs
 | |
|             return
 | |
|         if self._net_ifs.isdisjoint(available_net_ifs):
 | |
|             _LOGGER.info('no valid net_ifs')
 | |
|             await self.deinit_async()
 | |
|             self._available_net_ifs = available_net_ifs
 | |
|             return
 | |
|         if not self._init_done:
 | |
|             self._available_net_ifs = available_net_ifs
 | |
|             await self.init_async()
 | |
|             return
 | |
|         self._internal_loop.call_soon_threadsafe(
 | |
|             self.__on_network_info_change,
 | |
|             _MIoTLanNetworkUpdateData(status=status, if_name=info.name))
 | |
| 
 | |
|     async def __on_mips_service_change(
 | |
|         self, group_id: str,  state: MipsServiceState, data: dict
 | |
|     ) -> None:
 | |
|         _LOGGER.info(
 | |
|             'on mips service change, %s, %s, %s',  group_id, state, data)
 | |
|         if len(self._mips_service.get_services()) > 0:
 | |
|             _LOGGER.info('find central service, deinit miot lan')
 | |
|             await self.deinit_async()
 | |
|         else:
 | |
|             _LOGGER.info('no central service, init miot lan')
 | |
|             await self.init_async()
 | |
| 
 | |
| # The following methods SHOULD ONLY be called in the internal loop
 | |
| 
 | |
|     def ping(self, if_name: str | None, target_ip: str) -> None:
 | |
|         if not target_ip:
 | |
|             return
 | |
|         self.__sendto(
 | |
|             if_name=if_name, data=self._probe_msg, address=target_ip,
 | |
|             port=self.OT_PORT)
 | |
| 
 | |
|     def send2device(
 | |
|         self, did: str,
 | |
|         msg: dict,
 | |
|         handler: Optional[Callable[[dict, Any], None]] = None,
 | |
|         handler_ctx: Any = None,
 | |
|         timeout_ms: Optional[int] = None
 | |
|     ) -> None:
 | |
|         if timeout_ms and not handler:
 | |
|             raise ValueError('handler is required when timeout_ms is set')
 | |
|         device: _MIoTLanDevice | None = self._lan_devices.get(did)
 | |
|         if not device:
 | |
|             raise ValueError('invalid device')
 | |
|         if not device.cipher:
 | |
|             raise ValueError('invalid device cipher')
 | |
|         if not device.if_name:
 | |
|             raise ValueError('invalid device if_name')
 | |
|         if not device.ip:
 | |
|             raise ValueError('invalid device ip')
 | |
|         in_msg = {'id': self.__gen_msg_id(), **msg}
 | |
|         msg_len = device.gen_packet(
 | |
|             out_buffer=self._write_buffer,
 | |
|             clear_data=in_msg,
 | |
|             did=did,
 | |
|             offset=int(time.time())-device.offset)
 | |
| 
 | |
|         return self.__make_request(
 | |
|             msg_id=in_msg['id'],
 | |
|             msg=self._write_buffer[0: msg_len],
 | |
|             if_name=device.if_name,
 | |
|             ip=device.ip,
 | |
|             handler=handler,
 | |
|             handler_ctx=handler_ctx,
 | |
|             timeout_ms=timeout_ms)
 | |
| 
 | |
|     def __make_request(
 | |
|         self,
 | |
|         msg_id: int,
 | |
|         msg: bytearray,
 | |
|         if_name: str,
 | |
|         ip: str,
 | |
|         handler: Optional[Callable[[dict, Any], None]],
 | |
|         handler_ctx: Any = None,
 | |
|         timeout_ms: Optional[int] = None
 | |
|     ) -> None:
 | |
|         def request_timeout_handler(req_data: _MIoTLanRequestData):
 | |
|             self._pending_requests.pop(req_data.msg_id, None)
 | |
|             if req_data and req_data.handler:
 | |
|                 req_data.handler({
 | |
|                     'code': MIoTErrorCode.CODE_TIMEOUT.value,
 | |
|                     'error': 'timeout'},
 | |
|                     req_data.handler_ctx)
 | |
| 
 | |
|         timer: Optional[asyncio.TimerHandle] = None
 | |
|         request_data = _MIoTLanRequestData(
 | |
|             msg_id=msg_id,
 | |
|             handler=handler,
 | |
|             handler_ctx=handler_ctx,
 | |
|             timeout=timer)
 | |
|         if timeout_ms:
 | |
|             timer = self._internal_loop.call_later(
 | |
|                 timeout_ms/1000, request_timeout_handler, request_data)
 | |
|             request_data.timeout = timer
 | |
|         self._pending_requests[msg_id] = request_data
 | |
|         self.__sendto(if_name=if_name, data=msg, address=ip, port=self.OT_PORT)
 | |
| 
 | |
|     def broadcast_device_state(self, did: str, state: dict) -> None:
 | |
|         for handler in self._device_state_sub_map.values():
 | |
|             self._main_loop.call_soon_threadsafe(
 | |
|                 self._main_loop.create_task,
 | |
|                 handler.handler(did, state, handler.handler_ctx))
 | |
| 
 | |
|     def __gen_msg_id(self) -> int:
 | |
|         if not self._msg_id_counter:
 | |
|             self._msg_id_counter = int(random.random()*0x7FFFFFFF)
 | |
|         self._msg_id_counter += 1
 | |
|         if self._msg_id_counter > 0x80000000:
 | |
|             self._msg_id_counter = 1
 | |
|         return self._msg_id_counter
 | |
| 
 | |
|     def __call_api(
 | |
|         self,
 | |
|         did: str,
 | |
|         msg: dict,
 | |
|         handler: Callable,
 | |
|         handler_ctx: Any,
 | |
|         timeout_ms: int = 10000
 | |
|     ) -> None:
 | |
|         try:
 | |
|             self.send2device(
 | |
|                 did=did,
 | |
|                 msg={'from': 'ha.xiaomi_home', **msg},
 | |
|                 handler=handler,
 | |
|                 handler_ctx=handler_ctx,
 | |
|                 timeout_ms=timeout_ms)
 | |
|         except Exception as err:  # pylint: disable=broad-exception-caught
 | |
|             _LOGGER.error('send2device error, %s', err)
 | |
|             handler({
 | |
|                 'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value,
 | |
|                 'error': str(err)},
 | |
|                 handler_ctx)
 | |
| 
 | |
|     def __sub_device_state(self, data: _MIoTLanSubDeviceData) -> None:
 | |
|         self._device_state_sub_map[data.key] = data
 | |
| 
 | |
|     def __unsub_device_state(self, data: _MIoTLanUnsubDeviceData) -> None:
 | |
|         self._device_state_sub_map.pop(data.key, None)
 | |
| 
 | |
|     def __sub_broadcast(self, data: _MIoTLanRegisterBroadcastData) -> None:
 | |
|         self._device_msg_matcher[data.key] = data
 | |
|         _LOGGER.debug('lan register broadcast, %s', data.key)
 | |
| 
 | |
|     def __unsub_broadcast(self, data: _MIoTLanUnregisterBroadcastData) -> None:
 | |
|         if self._device_msg_matcher.get(topic=data.key):
 | |
|             del self._device_msg_matcher[data.key]
 | |
|         _LOGGER.debug('lan unregister broadcast, %s', data.key)
 | |
| 
 | |
|     def __get_dev_list(self, data: _MIoTLanGetDevListData) -> None:
 | |
|         dev_list = {
 | |
|             device.did: {
 | |
|                 'online': device.online,
 | |
|                 'push_available': device.subscribed
 | |
|             }
 | |
|             for device in self._lan_devices.values()
 | |
|             if device.online}
 | |
|         data.handler(
 | |
|             dev_list, data.handler_ctx)
 | |
| 
 | |
|     def __update_devices(self, devices: dict[str, dict]) -> None:
 | |
|         for did, info in devices.items():
 | |
|             # did MUST be digit(UINT64)
 | |
|             if not did.isdigit():
 | |
|                 _LOGGER.info('invalid did, %s', did)
 | |
|                 continue
 | |
|             if (
 | |
|                     'model' not in info
 | |
|                     or info['model'] in self._profile_models):
 | |
|                 # Do not support the local control of
 | |
|                 # Profile device for the time being
 | |
|                 _LOGGER.info(
 | |
|                     'model not support local ctrl, %s, %s',
 | |
|                     did, info.get('model'))
 | |
|                 continue
 | |
|             if did not in self._lan_devices:
 | |
|                 if 'token' not in info:
 | |
|                     _LOGGER.error(
 | |
|                         'token not found, %s, %s', did, info)
 | |
|                     continue
 | |
|                 if len(info['token']) != 32:
 | |
|                     _LOGGER.error(
 | |
|                         'invalid device token, %s, %s', did, info)
 | |
|                     continue
 | |
|                 self._lan_devices[did] = _MIoTLanDevice(
 | |
|                     manager=self, did=did, token=info['token'],
 | |
|                     ip=info.get('ip', None))
 | |
|             else:
 | |
|                 self._lan_devices[did].update_info(info)
 | |
| 
 | |
|     def __delete_devices(self, devices: list[str]) -> None:
 | |
|         for did in devices:
 | |
|             lan_device = self._lan_devices.pop(did, None)
 | |
|             if not lan_device:
 | |
|                 continue
 | |
|             lan_device.on_delete()
 | |
| 
 | |
|     def __on_network_info_change(self, data: _MIoTLanNetworkUpdateData) -> None:
 | |
|         if data.status == InterfaceStatus.ADD:
 | |
|             self._available_net_ifs.add(data.if_name)
 | |
|             if data.if_name in self._net_ifs:
 | |
|                 self.__create_socket(if_name=data.if_name)
 | |
|         elif data.status == InterfaceStatus.REMOVE:
 | |
|             self._available_net_ifs.remove(data.if_name)
 | |
|             self.__destroy_socket(if_name=data.if_name)
 | |
| 
 | |
|     def __update_net_ifs(self, net_ifs: list[str]) -> None:
 | |
|         if self._net_ifs != set(net_ifs):
 | |
|             self._net_ifs = set(net_ifs)
 | |
|             for if_name in self._net_ifs:
 | |
|                 self.__create_socket(if_name=if_name)
 | |
|             for if_name in list(self._broadcast_socks.keys()):
 | |
|                 if if_name not in self._net_ifs:
 | |
|                     self.__destroy_socket(if_name=if_name)
 | |
| 
 | |
|     def __update_subscribe_option(self, options: dict) -> None:
 | |
|         if 'enable_subscribe' in options:
 | |
|             if options['enable_subscribe'] != self._enable_subscribe:
 | |
|                 self._enable_subscribe = options['enable_subscribe']
 | |
|                 if not self._enable_subscribe:
 | |
|                     # Unsubscribe all
 | |
|                     for device in self._lan_devices.values():
 | |
|                         device.unsubscribe()
 | |
| 
 | |
|     def __deinit(self) -> None:
 | |
|         # Release all resources
 | |
|         if self._scan_timer:
 | |
|             self._scan_timer.cancel()
 | |
|             self._scan_timer = None
 | |
|         for device in self._lan_devices.values():
 | |
|             device.on_delete()
 | |
|         self._lan_devices.clear()
 | |
|         for req_data in self._pending_requests.values():
 | |
|             if req_data.timeout:
 | |
|                 req_data.timeout.cancel()
 | |
|                 req_data.timeout = None
 | |
|         self._pending_requests.clear()
 | |
|         for timer in self._reply_msg_buffer.values():
 | |
|             timer.cancel()
 | |
|         self._reply_msg_buffer.clear()
 | |
|         self._device_msg_matcher = MIoTMatcher()
 | |
|         self.__deinit_socket()
 | |
|         self._internal_loop.stop()
 | |
| 
 | |
|     def __init_socket(self) -> None:
 | |
|         self.__deinit_socket()
 | |
|         for if_name in self._net_ifs:
 | |
|             if if_name not in self._available_net_ifs:
 | |
|                 return
 | |
|             self.__create_socket(if_name=if_name)
 | |
| 
 | |
|     def __create_socket(self, if_name: str) -> None:
 | |
|         if if_name in self._broadcast_socks:
 | |
|             _LOGGER.info('socket already created, %s', if_name)
 | |
|             return
 | |
|         # Create socket
 | |
|         try:
 | |
|             sock = socket.socket(
 | |
|                 socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
 | |
|             sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
 | |
|             sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
 | |
|             # Set SO_BINDTODEVICE
 | |
|             sock.setsockopt(
 | |
|                 socket.SOL_SOCKET, socket.SO_BINDTODEVICE, if_name.encode())
 | |
|             sock.bind(('', self._local_port or 0))
 | |
|             self._internal_loop.add_reader(
 | |
|                 sock.fileno(), self.__socket_read_handler, (if_name, sock))
 | |
|             self._broadcast_socks[if_name] = sock
 | |
|             self._local_port = self._local_port or sock.getsockname()[1]
 | |
|             _LOGGER.info(
 | |
|                 'created socket, %s, %s', if_name, self._local_port)
 | |
|         except Exception as err:  # pylint: disable=broad-exception-caught
 | |
|             _LOGGER.error('create socket error, %s, %s', if_name, err)
 | |
| 
 | |
|     def __deinit_socket(self) -> None:
 | |
|         for if_name in list(self._broadcast_socks.keys()):
 | |
|             self.__destroy_socket(if_name)
 | |
|         self._broadcast_socks.clear()
 | |
| 
 | |
|     def __destroy_socket(self, if_name: str) -> None:
 | |
|         sock = self._broadcast_socks.pop(if_name, None)
 | |
|         if not sock:
 | |
|             return
 | |
|         self._internal_loop.remove_reader(sock.fileno())
 | |
|         sock.close()
 | |
|         _LOGGER.info('destroyed socket, %s', if_name)
 | |
| 
 | |
|     def __socket_read_handler(self, ctx: tuple[str, socket.socket]) -> None:
 | |
|         try:
 | |
|             data_len, addr = ctx[1].recvfrom_into(
 | |
|                 self._read_buffer, self.OT_MSG_LEN, socket.MSG_DONTWAIT)
 | |
|             if data_len < 0:
 | |
|                 # Socket error
 | |
|                 _LOGGER.error('socket read error, %s, %s', ctx[0], data_len)
 | |
|                 return
 | |
|             if addr[1] != self.OT_PORT:
 | |
|                 # Not ot msg
 | |
|                 return
 | |
|             self.__raw_message_handler(
 | |
|                 self._read_buffer[:data_len], data_len, addr[0], ctx[0])
 | |
|         except Exception as err:  # pylint: disable=broad-exception-caught
 | |
|             _LOGGER.error('socket read handler error, %s', err)
 | |
| 
 | |
|     def __raw_message_handler(
 | |
|         self, data: bytearray, data_len: int, ip: str, if_name: str
 | |
|     ) -> None:
 | |
|         if data[:2] != self.OT_HEADER:
 | |
|             return
 | |
|         # Keep alive message
 | |
|         did: str = str(struct.unpack('>Q', data[4:12])[0])
 | |
|         device: _MIoTLanDevice | None = self._lan_devices.get(did)
 | |
|         if not device:
 | |
|             return
 | |
|         timestamp: int = struct.unpack('>I', data[12:16])[0]
 | |
|         device.offset = int(time.time()) - timestamp
 | |
|         # Keep alive if this is a probe
 | |
|         if data_len == self.OT_PROBE_LEN or device.subscribed:
 | |
|             device.keep_alive(ip=ip, if_name=if_name)
 | |
|         # Manage device subscribe status
 | |
|         if (
 | |
|             self._enable_subscribe
 | |
|             and data_len == self.OT_PROBE_LEN
 | |
|             and data[16:20] == b'MSUB'
 | |
|             and data[24:27] == b'PUB'
 | |
|         ):
 | |
|             device.supported_wildcard_sub = (
 | |
|                 int(data[28]) == self.OT_SUPPORT_WILDCARD_SUB)
 | |
|             sub_ts = struct.unpack('>I', data[20:24])[0]
 | |
|             sub_type = int(data[27])
 | |
|             if (
 | |
|                 device.supported_wildcard_sub
 | |
|                 and sub_type in [0, 1, 4]
 | |
|                 and sub_ts != device.sub_ts
 | |
|             ):
 | |
|                 device.subscribed = False
 | |
|                 device.subscribe()
 | |
|         if data_len > self.OT_PROBE_LEN:
 | |
|             # handle device message
 | |
|             try:
 | |
|                 decrypted_data = device.decrypt_packet(data)
 | |
|                 self.__message_handler(did, decrypted_data)
 | |
|             except Exception as err:   # pylint: disable=broad-exception-caught
 | |
|                 _LOGGER.error('decrypt packet error, %s, %s', did, err)
 | |
|                 return
 | |
| 
 | |
|     def __message_handler(self, did: str, msg: dict) -> None:
 | |
|         if 'id' not in msg:
 | |
|             _LOGGER.warning('invalid message, no id, %s, %s', did, msg)
 | |
|             return
 | |
|         # Reply
 | |
|         req: _MIoTLanRequestData | None = \
 | |
|             self._pending_requests.pop(msg['id'], None)
 | |
|         if req:
 | |
|             if req.timeout:
 | |
|                 req.timeout.cancel()
 | |
|                 req.timeout = None
 | |
|             if req.handler is not None:
 | |
|                 self._main_loop.call_soon_threadsafe(
 | |
|                     req.handler, msg, req.handler_ctx)
 | |
|             return
 | |
|         # Handle up link message
 | |
|         if 'method' not in msg or 'params' not in msg:
 | |
|             _LOGGER.debug(
 | |
|                 'invalid message, no method or params, %s, %s', did, msg)
 | |
|             return
 | |
|         # Filter dup message
 | |
|         if self.__filter_dup_message(did, msg['id']):
 | |
|             self.send2device(
 | |
|                 did=did, msg={'id': msg['id'], 'result': {'code': 0}})
 | |
|             return
 | |
|         _LOGGER.debug('lan message, %s, %s', did, msg)
 | |
|         if msg['method'] == 'properties_changed':
 | |
|             for param in msg['params']:
 | |
|                 if 'siid' not in param and 'piid' not in param:
 | |
|                     _LOGGER.debug(
 | |
|                         'invalid message, no siid or piid, %s, %s', did, msg)
 | |
|                     continue
 | |
|                 key = f'{did}/p/{param["siid"]}/{param["piid"]}'
 | |
|                 subs: list[_MIoTLanRegisterBroadcastData] = list(
 | |
|                     self._device_msg_matcher.iter_match(key))
 | |
|                 for sub in subs:
 | |
|                     self._main_loop.call_soon_threadsafe(
 | |
|                         sub.handler, param, sub.handler_ctx)
 | |
|         elif (
 | |
|                 msg['method'] == 'event_occured'
 | |
|                 and 'siid' in msg['params']
 | |
|                 and 'eiid' in msg['params']
 | |
|         ):
 | |
|             key = f'{did}/e/{msg["params"]["siid"]}/{msg["params"]["eiid"]}'
 | |
|             subs: list[_MIoTLanRegisterBroadcastData] = list(
 | |
|                 self._device_msg_matcher.iter_match(key))
 | |
|             for sub in subs:
 | |
|                 self._main_loop.call_soon_threadsafe(
 | |
|                     sub.handler, msg['params'], sub.handler_ctx)
 | |
|         else:
 | |
|             _LOGGER.debug(
 | |
|                 'invalid message, unknown method, %s, %s', did, msg)
 | |
|         # Reply
 | |
|         self.send2device(
 | |
|             did=did, msg={'id': msg['id'], 'result': {'code': 0}})
 | |
| 
 | |
|     def __filter_dup_message(self, did: str, msg_id: int) -> bool:
 | |
|         filter_id = f'{did}.{msg_id}'
 | |
|         if filter_id in self._reply_msg_buffer:
 | |
|             return True
 | |
|         self._reply_msg_buffer[filter_id] = self._internal_loop.call_later(
 | |
|             5,
 | |
|             lambda filter_id: self._reply_msg_buffer.pop(filter_id, None),
 | |
|             filter_id)
 | |
|         return False
 | |
| 
 | |
|     def __sendto(
 | |
|         self, if_name: str | None, data: bytes, address: str, port: int
 | |
|     ) -> None:
 | |
|         if if_name is None:
 | |
|             # Broadcast
 | |
|             for if_n, sock in self._broadcast_socks.items():
 | |
|                 _LOGGER.debug('send broadcast, %s', if_n)
 | |
|                 sock.sendto(data, socket.MSG_DONTWAIT, (address, port))
 | |
|         else:
 | |
|             # Unicast
 | |
|             sock = self._broadcast_socks.get(if_name, None)
 | |
|             if not sock:
 | |
|                 _LOGGER.error('invalid socket, %s', if_name)
 | |
|                 return
 | |
|             sock.sendto(data, socket.MSG_DONTWAIT, (address, port))
 | |
| 
 | |
|     def __scan_devices(self) -> None:
 | |
|         if self._scan_timer:
 | |
|             self._scan_timer.cancel()
 | |
|             self._scan_timer = None
 | |
|         try:
 | |
|             # Scan devices
 | |
|             self.ping(if_name=None, target_ip='255.255.255.255')
 | |
|         except Exception as err: # pylint: disable=broad-exception-caught
 | |
|             # Ignore any exceptions to avoid blocking the loop
 | |
|             _LOGGER.error('ping device error, %s', err)
 | |
|             pass
 | |
|         scan_time = self.__get_next_scan_time()
 | |
|         self._scan_timer = self._internal_loop.call_later(
 | |
|             scan_time, self.__scan_devices)
 | |
|         _LOGGER.debug('next scan time: %ss', scan_time)
 | |
| 
 | |
|     def __get_next_scan_time(self) -> float:
 | |
|         if not self._last_scan_interval:
 | |
|             self._last_scan_interval = self.OT_PROBE_INTERVAL_MIN
 | |
|         self._last_scan_interval = min(
 | |
|             self._last_scan_interval*2, self.OT_PROBE_INTERVAL_MAX)
 | |
|         return self._last_scan_interval
 |