You cannot select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
	
	
		
			1810 lines
		
	
	
		
			64 KiB
		
	
	
	
		
			Python
		
	
			
		
		
	
	
			1810 lines
		
	
	
		
			64 KiB
		
	
	
	
		
			Python
		
	
| # -*- coding: utf-8 -*-
 | |
| """
 | |
| Copyright (C) 2024 Xiaomi Corporation.
 | |
| 
 | |
| The ownership and intellectual property rights of Xiaomi Home Assistant
 | |
| Integration and related Xiaomi cloud service API interface provided under this
 | |
| license, including source code and object code (collectively, "Licensed Work"),
 | |
| are owned by Xiaomi. Subject to the terms and conditions of this License, Xiaomi
 | |
| hereby grants you a personal, limited, non-exclusive, non-transferable,
 | |
| non-sublicensable, and royalty-free license to reproduce, use, modify, and
 | |
| distribute the Licensed Work only for your use of Home Assistant for
 | |
| non-commercial purposes. For the avoidance of doubt, Xiaomi does not authorize
 | |
| you to use the Licensed Work for any other purpose, including but not limited
 | |
| to use Licensed Work to develop applications (APP), Web services, and other
 | |
| forms of software.
 | |
| 
 | |
| You may reproduce and distribute copies of the Licensed Work, with or without
 | |
| modifications, whether in source or object form, provided that you must give
 | |
| any other recipients of the Licensed Work a copy of this License and retain all
 | |
| copyright and disclaimers.
 | |
| 
 | |
| Xiaomi provides the Licensed Work on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 | |
| CONDITIONS OF ANY KIND, either express or implied, including, without
 | |
| limitation, any warranties, undertakes, or conditions of TITLE, NO ERROR OR
 | |
| OMISSION, CONTINUITY, RELIABILITY, NON-INFRINGEMENT, MERCHANTABILITY, or
 | |
| FITNESS FOR A PARTICULAR PURPOSE. In any event, you are solely responsible
 | |
| for any direct, indirect, special, incidental, or consequential damages or
 | |
| losses arising from the use or inability to use the Licensed Work.
 | |
| 
 | |
| Xiaomi reserves all rights not expressly granted to you in this License.
 | |
| Except for the rights expressly granted by Xiaomi under this License, Xiaomi
 | |
| does not authorize you in any form to use the trademarks, copyrights, or other
 | |
| forms of intellectual property rights of Xiaomi and its affiliates, including,
 | |
| without limitation, without obtaining other written permission from Xiaomi, you
 | |
| shall not use "Xiaomi", "Mijia" and other words related to Xiaomi or words that
 | |
| may make the public associate with Xiaomi in any form to publicize or promote
 | |
| the software or hardware devices that use the Licensed Work.
 | |
| 
 | |
| Xiaomi has the right to immediately terminate all your authorization under this
 | |
| License in the event:
 | |
| 1. You assert patent invalidation, litigation, or other claims against patents
 | |
| or other intellectual property rights of Xiaomi or its affiliates; or,
 | |
| 2. You make, have made, manufacture, sell, or offer to sell products that knock
 | |
| off Xiaomi or its affiliates' products.
 | |
| 
 | |
| MIoT Pub/Sub client.
 | |
| """
 | |
| import asyncio
 | |
| import json
 | |
| import logging
 | |
| import os
 | |
| import queue
 | |
| import random
 | |
| import re
 | |
| import ssl
 | |
| import struct
 | |
| import threading
 | |
| from abc import ABC, abstractmethod
 | |
| from dataclasses import dataclass
 | |
| from enum import Enum, auto
 | |
| from typing import Callable, Optional, final
 | |
| 
 | |
| from paho.mqtt.client import (
 | |
|     MQTT_ERR_SUCCESS,
 | |
|     MQTT_ERR_UNKNOWN,
 | |
|     Client,
 | |
|     MQTTv5)
 | |
| 
 | |
| # pylint: disable=relative-beyond-top-level
 | |
| from .common import MIoTMatcher
 | |
| from .const import MIHOME_MQTT_KEEPALIVE
 | |
| from .miot_error import MIoTErrorCode, MIoTMipsError
 | |
| from .miot_ev import MIoTEventLoop, TimeoutHandle
 | |
| 
 | |
| _LOGGER = logging.getLogger(__name__)
 | |
| 
 | |
| 
 | |
| class MipsMsgTypeOptions(Enum):
 | |
|     """MIoT Pub/Sub message type."""
 | |
|     ID = 0
 | |
|     RET_TOPIC = auto()
 | |
|     PAYLOAD = auto()
 | |
|     FROM = auto()
 | |
|     MAX = auto()
 | |
| 
 | |
| 
 | |
| class MipsMessage:
 | |
|     """MIoT Pub/Sub message."""
 | |
|     mid: int = 0
 | |
|     msg_from: str = None
 | |
|     ret_topic: str = None
 | |
|     payload: str = None
 | |
| 
 | |
|     @staticmethod
 | |
|     def unpack(data: bytes):
 | |
|         mips_msg = MipsMessage()
 | |
|         data_len = len(data)
 | |
|         data_start = 0
 | |
|         data_end = 0
 | |
|         while data_start < data_len:
 | |
|             data_end = data_start+5
 | |
|             unpack_len, unpack_type = struct.unpack(
 | |
|                 '<IB', data[data_start:data_end])
 | |
|             unpack_data = data[data_end:data_end+unpack_len]
 | |
|             #  string end with \x00
 | |
|             match unpack_type:
 | |
|                 case MipsMsgTypeOptions.ID.value:
 | |
|                     mips_msg.mid = int.from_bytes(
 | |
|                         unpack_data, byteorder='little')
 | |
|                 case MipsMsgTypeOptions.RET_TOPIC.value:
 | |
|                     mips_msg.ret_topic = str(
 | |
|                         unpack_data.strip(b'\x00'), 'utf-8')
 | |
|                 case MipsMsgTypeOptions.PAYLOAD.value:
 | |
|                     mips_msg.payload = str(unpack_data.strip(b'\x00'), 'utf-8')
 | |
|                 case MipsMsgTypeOptions.FROM.value:
 | |
|                     mips_msg.msg_from = str(
 | |
|                         unpack_data.strip(b'\x00'), 'utf-8')
 | |
|                 case _:
 | |
|                     pass
 | |
|             data_start = data_end+unpack_len
 | |
|         return mips_msg
 | |
| 
 | |
|     @staticmethod
 | |
|     def pack(
 | |
|         mid: int, payload: str, msg_from: str = None, ret_topic: str = None
 | |
|     ) -> bytes:
 | |
|         if mid is None or payload is None:
 | |
|             raise MIoTMipsError('invalid mid or payload')
 | |
|         pack_msg: bytes = b''
 | |
|         # mid
 | |
|         pack_msg += struct.pack('<IBI', 4, MipsMsgTypeOptions.ID.value, mid)
 | |
|         # msg_from
 | |
|         if msg_from:
 | |
|             pack_len = len(msg_from)
 | |
|             pack_msg += struct.pack(
 | |
|                 f'<IB{pack_len}sx', pack_len+1,
 | |
|                 MipsMsgTypeOptions.FROM.value, msg_from.encode('utf-8'))
 | |
|         # ret_topic
 | |
|         if ret_topic:
 | |
|             pack_len = len(ret_topic)
 | |
|             pack_msg += struct.pack(
 | |
|                 f'<IB{pack_len}sx', pack_len+1,
 | |
|                 MipsMsgTypeOptions.RET_TOPIC.value, ret_topic.encode('utf-8'))
 | |
|         # payload
 | |
|         pack_len = len(payload)
 | |
|         pack_msg += struct.pack(
 | |
|             f'<IB{pack_len}sx', pack_len+1,
 | |
|             MipsMsgTypeOptions.PAYLOAD.value, payload.encode('utf-8'))
 | |
|         return pack_msg
 | |
| 
 | |
|     def __str__(self) -> str:
 | |
|         return f'{self.mid}, {self.msg_from}, {self.ret_topic}, {self.payload}'
 | |
| 
 | |
| 
 | |
| class MipsCmdType(Enum):
 | |
|     """MIoT Pub/Sub command type."""
 | |
|     CONNECT = 0
 | |
|     DISCONNECT = auto()
 | |
|     DEINIT = auto()
 | |
|     SUB = auto()
 | |
|     UNSUB = auto()
 | |
|     CALL_API = auto()
 | |
|     REG_BROADCAST = auto()
 | |
|     UNREG_BROADCAST = auto()
 | |
| 
 | |
|     REG_MIPS_STATE = auto()
 | |
|     UNREG_MIPS_STATE = auto()
 | |
|     REG_DEVICE_STATE = auto()
 | |
|     UNREG_DEVICE_STATE = auto()
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MipsCmd:
 | |
|     """MIoT Pub/Sub command."""
 | |
|     type_: MipsCmdType
 | |
|     data: any
 | |
| 
 | |
|     def __init__(self, type_: MipsCmdType, data: any) -> None:
 | |
|         self.type_ = type_
 | |
|         self.data = data
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MipsRequest:
 | |
|     """MIoT Pub/Sub request."""
 | |
|     mid: int = None
 | |
|     on_reply: Callable[[str, any], None] = None
 | |
|     on_reply_ctx: any = None
 | |
|     timer: TimeoutHandle = None
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MipsRequestData:
 | |
|     """MIoT Pub/Sub request data."""
 | |
|     topic: str = None
 | |
|     payload: str = None
 | |
|     on_reply: Callable[[str, any], None] = None
 | |
|     on_reply_ctx: any = None
 | |
|     timeout_ms: int = None
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MipsSendBroadcastData:
 | |
|     """MIoT Pub/Sub send broadcast data."""
 | |
|     topic: str = None
 | |
|     payload: str = None
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MipsIncomingApiCall:
 | |
|     """MIoT Pub/Sub incoming API call."""
 | |
|     mid: int = None
 | |
|     ret_topic: str = None
 | |
|     timer: TimeoutHandle = None
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MipsApi:
 | |
|     """MIoT Pub/Sub API."""
 | |
|     topic: str = None
 | |
|     """
 | |
|     param1: session
 | |
|     param2: payload
 | |
|     param3: handler_ctx
 | |
|     """
 | |
|     handler: Callable[[MipsIncomingApiCall, str, any], None] = None
 | |
|     handler_ctx: any = None
 | |
| 
 | |
| 
 | |
| class MipsRegApi(MipsApi):
 | |
|     """.MIoT Pub/Sub register API."""
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MipsReplyData:
 | |
|     """MIoT Pub/Sub reply data."""
 | |
|     session: MipsIncomingApiCall = None
 | |
|     payload: str = None
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MipsBroadcast:
 | |
|     """MIoT Pub/Sub broadcast."""
 | |
|     topic: str = None
 | |
|     """
 | |
|     param 1: msg topic
 | |
|     param 2: msg payload
 | |
|     param 3: handle_ctx
 | |
|     """
 | |
|     handler: Callable[[str, str, any], None] = None
 | |
|     handler_ctx: any = None
 | |
| 
 | |
|     def __str__(self) -> str:
 | |
|         return f'{self.topic}, {id(self.handler)}, {id(self.handler_ctx)}'
 | |
| 
 | |
| 
 | |
| class MipsRegBroadcast(MipsBroadcast):
 | |
|     """MIoT Pub/Sub register broadcast."""
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MipsState:
 | |
|     """MIoT Pub/Sub state."""
 | |
|     key: str = None
 | |
|     """
 | |
|     str: key
 | |
|     bool: mips connect state
 | |
|     any: ctx
 | |
|     """
 | |
|     handler: Callable[[str, bool], asyncio.Future] = None
 | |
| 
 | |
| 
 | |
| class MipsRegState(MipsState):
 | |
|     """MIoT Pub/Sub register state."""
 | |
| 
 | |
| 
 | |
| class MIoTDeviceState(Enum):
 | |
|     """MIoT device state define."""
 | |
|     DISABLE = 0
 | |
|     OFFLINE = auto()
 | |
|     ONLINE = auto()
 | |
| 
 | |
| 
 | |
| @dataclass
 | |
| class MipsDeviceState:
 | |
|     """MIoT Pub/Sub device state."""
 | |
|     did: str = None
 | |
|     """handler
 | |
|     str: did
 | |
|     MIoTDeviceState: online/offline/disable
 | |
|     any: ctx
 | |
|     """
 | |
|     handler: Callable[[str, MIoTDeviceState, any], None] = None
 | |
|     handler_ctx: any = None
 | |
| 
 | |
| 
 | |
| class MipsRegDeviceState(MipsDeviceState):
 | |
|     """MIoT Pub/Sub register device state."""
 | |
| 
 | |
| 
 | |
| class MipsClient(ABC):
 | |
|     """MIoT Pub/Sub client."""
 | |
|     # pylint: disable=unused-argument
 | |
|     MQTT_INTERVAL_MS = 1000
 | |
|     MIPS_QOS: int = 2
 | |
|     UINT32_MAX: int = 0xFFFFFFFF
 | |
|     MIPS_RECONNECT_INTERVAL_MIN: int = 30000
 | |
|     MIPS_RECONNECT_INTERVAL_MAX: int = 600000
 | |
|     MIPS_SUB_PATCH: int = 300
 | |
|     MIPS_SUB_INTERVAL: int = 1000
 | |
|     main_loop: asyncio.AbstractEventLoop
 | |
|     _logger: logging.Logger
 | |
|     _client_id: str
 | |
|     _host: str
 | |
|     _port: int
 | |
|     _username: str
 | |
|     _password: str
 | |
|     _ca_file: str
 | |
|     _cert_file: str
 | |
|     _key_file: str
 | |
| 
 | |
|     _mqtt_logger: logging.Logger
 | |
|     _mqtt: Client
 | |
|     _mqtt_fd: int
 | |
|     _mqtt_timer: TimeoutHandle
 | |
|     _mqtt_state: bool
 | |
| 
 | |
|     _event_connect: asyncio.Event
 | |
|     _event_disconnect: asyncio.Event
 | |
|     _mev: MIoTEventLoop
 | |
|     _mips_thread: threading.Thread
 | |
|     _mips_queue: queue.Queue
 | |
|     _cmd_event_fd: os.eventfd
 | |
|     _mips_reconnect_tag: bool
 | |
|     _mips_reconnect_interval: int
 | |
|     _mips_reconnect_timer: Optional[TimeoutHandle]
 | |
|     _mips_state_sub_map: dict[str, MipsState]
 | |
|     _mips_sub_pending_map: dict[str, int]
 | |
|     _mips_sub_pending_timer: Optional[TimeoutHandle]
 | |
| 
 | |
|     _on_mips_cmd: Callable[[MipsCmd], None]
 | |
|     _on_mips_message: Callable[[str, bytes], None]
 | |
|     _on_mips_connect: Callable[[int, dict], None]
 | |
|     _on_mips_disconnect: Callable[[int, dict], None]
 | |
| 
 | |
|     def __init__(
 | |
|             self, client_id: str, host: str, port: int,
 | |
|             username: str = None, password: str = None,
 | |
|             ca_file: str = None, cert_file: str = None, key_file: str = None,
 | |
|             loop: Optional[asyncio.AbstractEventLoop] = None
 | |
|     ) -> None:
 | |
|         # MUST run with running loop
 | |
|         self.main_loop = loop or asyncio.get_running_loop()
 | |
|         self._logger = None
 | |
|         self._client_id = client_id
 | |
|         self._host = host
 | |
|         self._port = port
 | |
|         self._username = username
 | |
|         self._password = password
 | |
|         self._ca_file = ca_file
 | |
|         self._cert_file = cert_file
 | |
|         self._key_file = key_file
 | |
| 
 | |
|         self._mqtt_logger = None
 | |
|         self._mqtt_fd = -1
 | |
|         self._mqtt_timer = None
 | |
|         self._mqtt_state = False
 | |
|         # mqtt init for API_VERSION2,
 | |
|         # callback_api_version=CallbackAPIVersion.VERSION2,
 | |
|         self._mqtt = Client(client_id=self._client_id, protocol=MQTTv5)
 | |
|         self._mqtt.enable_logger(logger=self._mqtt_logger)
 | |
| 
 | |
|         # Mips init
 | |
|         self._event_connect = asyncio.Event()
 | |
|         self._event_disconnect = asyncio.Event()
 | |
|         self._mips_reconnect_tag = False
 | |
|         self._mips_reconnect_interval = 0
 | |
|         self._mips_reconnect_timer = None
 | |
|         self._mips_state_sub_map = {}
 | |
|         self._mips_sub_pending_map = {}
 | |
|         self._mips_sub_pending_timer = None
 | |
|         self._mev = MIoTEventLoop()
 | |
|         self._mips_queue = queue.Queue()
 | |
|         self._cmd_event_fd = os.eventfd(0, os.O_NONBLOCK)
 | |
|         self.mev_set_read_handler(
 | |
|             self._cmd_event_fd, self.__mips_cmd_read_handler, None)
 | |
|         self._mips_thread = threading.Thread(target=self.__mips_loop_thread)
 | |
|         self._mips_thread.daemon = True
 | |
|         self._mips_thread.name = self._client_id
 | |
|         self._mips_thread.start()
 | |
| 
 | |
|         self._on_mips_cmd = None
 | |
|         self._on_mips_message = None
 | |
|         self._on_mips_connect = None
 | |
|         self._on_mips_disconnect = None
 | |
| 
 | |
|     @property
 | |
|     def client_id(self) -> str:
 | |
|         return self._client_id
 | |
| 
 | |
|     @property
 | |
|     def host(self) -> str:
 | |
|         return self._host
 | |
| 
 | |
|     @property
 | |
|     def port(self) -> int:
 | |
|         return self._port
 | |
| 
 | |
|     @final
 | |
|     @property
 | |
|     def mips_state(self) -> bool:
 | |
|         """mips connect state.
 | |
| 
 | |
|         Returns:
 | |
|             bool: True: connected, False: disconnected
 | |
|         """
 | |
|         return self._mqtt and self._mqtt.is_connected()
 | |
| 
 | |
|     @final
 | |
|     def mips_deinit(self) -> None:
 | |
|         self._mips_send_cmd(type_=MipsCmdType.DEINIT, data=None)
 | |
|         self._mips_thread.join()
 | |
|         self._mips_thread = None
 | |
| 
 | |
|         self._logger = None
 | |
|         self._client_id = None
 | |
|         self._host = None
 | |
|         self._port = None
 | |
|         self._username = None
 | |
|         self._password = None
 | |
|         self._ca_file = None
 | |
|         self._cert_file = None
 | |
|         self._key_file = None
 | |
|         self._mqtt_logger = None
 | |
|         self._mips_state_sub_map = None
 | |
|         self._mips_sub_pending_map = None
 | |
|         self._mips_sub_pending_timer = None
 | |
| 
 | |
|         self._event_connect = None
 | |
|         self._event_disconnect = None
 | |
| 
 | |
|     def update_mqtt_password(self, password: str) -> None:
 | |
|         self._password = password
 | |
|         self._mqtt.username_pw_set(
 | |
|             username=self._username, password=self._password)
 | |
| 
 | |
|     def log_debug(self, msg, *args, **kwargs) -> None:
 | |
|         if self._logger:
 | |
|             self._logger.debug(f'{self._client_id}, '+msg, *args, **kwargs)
 | |
| 
 | |
|     def log_info(self, msg, *args, **kwargs) -> None:
 | |
|         if self._logger:
 | |
|             self._logger.info(f'{self._client_id}, '+msg, *args, **kwargs)
 | |
| 
 | |
|     def log_error(self, msg, *args, **kwargs) -> None:
 | |
|         if self._logger:
 | |
|             self._logger.error(f'{self._client_id}, '+msg, *args, **kwargs)
 | |
| 
 | |
|     def enable_logger(self, logger: Optional[logging.Logger] = None) -> None:
 | |
|         self._logger = logger
 | |
| 
 | |
|     def enable_mqtt_logger(
 | |
|         self, logger: Optional[logging.Logger] = None
 | |
|     ) -> None:
 | |
|         if logger:
 | |
|             self._mqtt.enable_logger(logger=logger)
 | |
|         else:
 | |
|             self._mqtt.disable_logger()
 | |
| 
 | |
|     @final
 | |
|     def mips_connect(self) -> None:
 | |
|         """mips connect."""
 | |
|         return self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None)
 | |
| 
 | |
|     @final
 | |
|     async def mips_connect_async(self) -> None:
 | |
|         """mips connect async."""
 | |
|         self._mips_send_cmd(type_=MipsCmdType.CONNECT, data=None)
 | |
|         return await self._event_connect.wait()
 | |
| 
 | |
|     @final
 | |
|     def mips_disconnect(self) -> None:
 | |
|         """mips disconnect."""
 | |
|         return self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None)
 | |
| 
 | |
|     @final
 | |
|     async def mips_disconnect_async(self) -> None:
 | |
|         """mips disconnect async."""
 | |
|         self._mips_send_cmd(type_=MipsCmdType.DISCONNECT, data=None)
 | |
|         return await self._event_disconnect.wait()
 | |
| 
 | |
|     @final
 | |
|     def sub_mips_state(
 | |
|         self, key: str, handler: Callable[[str, bool], asyncio.Future]
 | |
|     ) -> bool:
 | |
|         """Subscribe mips state.
 | |
|         NOTICE: callback to main loop thread
 | |
|         """
 | |
|         if isinstance(key, str) is False or handler is None:
 | |
|             raise MIoTMipsError('invalid params')
 | |
|         return self._mips_send_cmd(
 | |
|             type_=MipsCmdType.REG_MIPS_STATE,
 | |
|             data=MipsRegState(key=key, handler=handler))
 | |
| 
 | |
|     @final
 | |
|     def unsub_mips_state(self, key: str) -> bool:
 | |
|         """Unsubscribe mips state."""
 | |
|         if isinstance(key, str) is False:
 | |
|             raise MIoTMipsError('invalid params')
 | |
|         return self._mips_send_cmd(
 | |
|             type_=MipsCmdType.UNREG_MIPS_STATE, data=MipsRegState(key=key))
 | |
| 
 | |
|     @final
 | |
|     def mev_set_timeout(
 | |
|         self, timeout_ms: int, handler: Callable[[any], None],
 | |
|         handler_ctx: any = None
 | |
|     ) -> Optional[TimeoutHandle]:
 | |
|         """set timeout.
 | |
|         NOTICE: Internal function, only mips threads are allowed to call
 | |
|         """
 | |
|         if self._mev is None:
 | |
|             return None
 | |
|         return self._mev.set_timeout(
 | |
|             timeout_ms=timeout_ms,  handler=handler, handler_ctx=handler_ctx)
 | |
| 
 | |
|     @final
 | |
|     def mev_clear_timeout(self, handle: TimeoutHandle) -> None:
 | |
|         """clear timeout.
 | |
|         NOTICE: Internal function, only mips threads are allowed to call
 | |
|         """
 | |
|         if self._mev is None:
 | |
|             return
 | |
|         self._mev.clear_timeout(handle)
 | |
| 
 | |
|     @final
 | |
|     def mev_set_read_handler(
 | |
|         self, fd: int, handler: Callable[[any], None], handler_ctx: any
 | |
|     ) -> bool:
 | |
|         """set read handler.
 | |
|         NOTICE: Internal function, only mips threads are allowed to call
 | |
|         """
 | |
|         if self._mev is None:
 | |
|             return False
 | |
|         return self._mev.set_read_handler(
 | |
|             fd=fd, handler=handler, handler_ctx=handler_ctx)
 | |
| 
 | |
|     @final
 | |
|     def mev_set_write_handler(
 | |
|         self, fd: int, handler: Callable[[any], None], handler_ctx: any
 | |
|     ) -> bool:
 | |
|         """set write handler.
 | |
|         NOTICE: Internal function, only mips threads are allowed to call
 | |
|         """
 | |
|         if self._mev is None:
 | |
|             return False
 | |
|         return self._mev.set_write_handler(
 | |
|             fd=fd, handler=handler, handler_ctx=handler_ctx)
 | |
| 
 | |
|     @property
 | |
|     def on_mips_cmd(self) -> Callable[[MipsCmd], None]:
 | |
|         return self._on_mips_cmd
 | |
| 
 | |
|     @on_mips_cmd.setter
 | |
|     def on_mips_cmd(self, handler: Callable[[MipsCmd], None]) -> None:
 | |
|         """MUST set after __init__ done.
 | |
|         NOTICE thread safe, this function will be called at the **mips** thread
 | |
|         """
 | |
|         self._on_mips_cmd = handler
 | |
| 
 | |
|     @property
 | |
|     def on_mips_message(self) -> Callable[[str, bytes], None]:
 | |
|         return self._on_mips_message
 | |
| 
 | |
|     @on_mips_message.setter
 | |
|     def on_mips_message(self, handler: Callable[[str, bytes], None]) -> None:
 | |
|         """MUST set after __init__ done.
 | |
|         NOTICE thread safe, this function will be called at the **mips** thread
 | |
|         """
 | |
|         self._on_mips_message = handler
 | |
| 
 | |
|     @property
 | |
|     def on_mips_connect(self) -> Callable[[int, dict], None]:
 | |
|         return self._on_mips_connect
 | |
| 
 | |
|     @on_mips_connect.setter
 | |
|     def on_mips_connect(self, handler: Callable[[int, dict], None]) -> None:
 | |
|         """MUST set after __init__ done.
 | |
|         NOTICE thread safe, this function will be called at the
 | |
|         **main loop** thread
 | |
|         """
 | |
|         self._on_mips_connect = handler
 | |
| 
 | |
|     @property
 | |
|     def on_mips_disconnect(self) -> Callable[[int, dict], None]:
 | |
|         return self._on_mips_disconnect
 | |
| 
 | |
|     @on_mips_disconnect.setter
 | |
|     def on_mips_disconnect(self, handler: Callable[[int, dict], None]) -> None:
 | |
|         """MUST set after __init__ done.
 | |
|         NOTICE thread safe, this function will be called at the
 | |
|         **main loop** thread
 | |
|         """
 | |
|         self._on_mips_disconnect = handler
 | |
| 
 | |
|     @abstractmethod
 | |
|     def sub_prop(
 | |
|         self, did: str, handler: Callable[[dict, any], None],
 | |
|         siid: int = None, piid: int = None, handler_ctx: any = None
 | |
|     ) -> bool: ...
 | |
| 
 | |
|     @abstractmethod
 | |
|     def unsub_prop(
 | |
|         self, did: str, siid: int = None, piid: int = None
 | |
|     ) -> bool: ...
 | |
| 
 | |
|     @abstractmethod
 | |
|     def sub_event(
 | |
|         self, did: str, handler: Callable[[dict, any], None],
 | |
|         siid: int = None, eiid: int = None, handler_ctx: any = None
 | |
|     ) -> bool: ...
 | |
| 
 | |
|     @abstractmethod
 | |
|     def unsub_event(
 | |
|         self, did: str, siid: int = None, eiid: int = None
 | |
|     ) -> bool: ...
 | |
| 
 | |
|     @abstractmethod
 | |
|     async def get_dev_list_async(
 | |
|         self, payload: str = None, timeout_ms: int = 10000
 | |
|     ) -> dict[str, dict]: ...
 | |
| 
 | |
|     @abstractmethod
 | |
|     async def get_prop_async(
 | |
|         self, did: str, siid: int, piid: int, timeout_ms: int = 10000
 | |
|     ) -> any: ...
 | |
| 
 | |
|     @abstractmethod
 | |
|     async def set_prop_async(
 | |
|         self, did: str, siid: int, piid: int, value: any,
 | |
|         timeout_ms: int = 10000
 | |
|     ) -> bool: ...
 | |
| 
 | |
|     @abstractmethod
 | |
|     async def action_async(
 | |
|         self, did: str, siid: int, aiid: int, in_list: list,
 | |
|         timeout_ms: int = 10000
 | |
|     ) -> tuple[bool, list]: ...
 | |
| 
 | |
|     @final
 | |
|     def _mips_sub_internal(self, topic: str) -> None:
 | |
|         """mips subscribe.
 | |
|         NOTICE: Internal function, only mips threads are allowed to call
 | |
|         """
 | |
|         self.__thread_check()
 | |
|         if not self._mqtt or not self._mqtt.is_connected():
 | |
|             return
 | |
|         try:
 | |
|             if topic not in self._mips_sub_pending_map:
 | |
|                 self._mips_sub_pending_map[topic] = 0
 | |
|             if not self._mips_sub_pending_timer:
 | |
|                 self._mips_sub_pending_timer = self.mev_set_timeout(
 | |
|                     10, self.__mips_sub_internal_pending_handler, topic)
 | |
|         except Exception as err:  # pylint: disable=broad-exception-caught
 | |
|             # Catch all exception
 | |
|             self.log_error(f'mips sub internal error, {topic}. {err}')
 | |
| 
 | |
|     @final
 | |
|     def _mips_unsub_internal(self, topic: str) -> None:
 | |
|         """mips unsubscribe.
 | |
|         NOTICE: Internal function, only mips threads are allowed to call
 | |
|         """
 | |
|         self.__thread_check()
 | |
|         if not self._mqtt or not self._mqtt.is_connected():
 | |
|             return
 | |
|         try:
 | |
|             result, mid = self._mqtt.unsubscribe(topic=topic)
 | |
|             if result == MQTT_ERR_SUCCESS:
 | |
|                 self.log_debug(
 | |
|                     f'mips unsub internal success, {result}, {mid}, {topic}')
 | |
|                 return
 | |
|             self.log_error(
 | |
|                 f'mips unsub internal error, {result}, {mid}, {topic}')
 | |
|         except Exception as err:  # pylint: disable=broad-exception-caught
 | |
|             # Catch all exception
 | |
|             self.log_error(f'mips unsub internal error, {topic}, {err}')
 | |
| 
 | |
|     @final
 | |
|     def _mips_publish_internal(
 | |
|         self, topic: str, payload: str | bytes,
 | |
|         wait_for_publish: bool = False, timeout_ms: int = 10000
 | |
|     ) -> bool:
 | |
|         """mips publish message.
 | |
|         NOTICE: Internal function, only mips threads are allowed to call
 | |
| 
 | |
|         """
 | |
|         self.__thread_check()
 | |
|         if not self._mqtt or not self._mqtt.is_connected():
 | |
|             return False
 | |
|         try:
 | |
|             handle = self._mqtt.publish(
 | |
|                 topic=topic, payload=payload, qos=self.MIPS_QOS)
 | |
|             # self.log_debug(f'_mips_publish_internal, {topic}, {payload}')
 | |
|             if wait_for_publish is True:
 | |
|                 handle.wait_for_publish(timeout_ms/1000.0)
 | |
|             return True
 | |
|         except Exception as err:  # pylint: disable=broad-exception-caught
 | |
|             # Catch other exception
 | |
|             self.log_error(f'mips publish internal error, {err}')
 | |
|         return False
 | |
| 
 | |
|     @final
 | |
|     def _mips_send_cmd(self, type_: MipsCmdType, data: any) -> bool:
 | |
|         if self._mips_queue is None or self._cmd_event_fd is None:
 | |
|             raise MIoTMipsError('send mips cmd disable')
 | |
|         # Put data to queue
 | |
|         self._mips_queue.put(MipsCmd(type_=type_, data=data))
 | |
|         # Write event fd
 | |
|         os.eventfd_write(self._cmd_event_fd, 1)
 | |
|         # self.log_debug(f'send mips cmd, {type}, {data}')
 | |
|         return True
 | |
| 
 | |
|     def __thread_check(self) -> None:
 | |
|         if threading.current_thread() is not self._mips_thread:
 | |
|             raise MIoTMipsError('illegal call')
 | |
| 
 | |
|     def __mips_cmd_read_handler(self, ctx: any) -> None:
 | |
|         fd_value = os.eventfd_read(self._cmd_event_fd)
 | |
|         if fd_value == 0:
 | |
|             return
 | |
|         while self._mips_queue.empty() is False:
 | |
|             mips_cmd: MipsCmd = self._mips_queue.get(block=False)
 | |
|             if mips_cmd.type_ == MipsCmdType.CONNECT:
 | |
|                 self._mips_reconnect_tag = True
 | |
|                 self.__mips_try_reconnect(immediately=True)
 | |
|             elif mips_cmd.type_ == MipsCmdType.DISCONNECT:
 | |
|                 self._mips_reconnect_tag = False
 | |
|                 self.__mips_disconnect()
 | |
|             elif mips_cmd.type_ == MipsCmdType.DEINIT:
 | |
|                 self.log_info('mips client recv deinit cmd')
 | |
|                 self.__mips_disconnect()
 | |
|                 # Close cmd event fd
 | |
|                 if self._cmd_event_fd:
 | |
|                     self.mev_set_read_handler(
 | |
|                         self._cmd_event_fd, None, None)
 | |
|                     os.close(self._cmd_event_fd)
 | |
|                     self._cmd_event_fd = None
 | |
|                 if self._mips_queue:
 | |
|                     self._mips_queue = None
 | |
|                 # ev loop stop
 | |
|                 if self._mev:
 | |
|                     self._mev.loop_stop()
 | |
|                     self._mev = None
 | |
|                 break
 | |
|             elif mips_cmd.type_ == MipsCmdType.REG_MIPS_STATE:
 | |
|                 state: MipsState = mips_cmd.data
 | |
|                 self._mips_state_sub_map[state.key] = state
 | |
|                 self.log_debug(f'mips register mips state, {state.key}')
 | |
|             elif mips_cmd.type_ == MipsCmdType.UNREG_MIPS_STATE:
 | |
|                 state: MipsState = mips_cmd.data
 | |
|                 del self._mips_state_sub_map[state.key]
 | |
|                 self.log_debug(f'mips unregister mips state, {state.key}')
 | |
|             else:
 | |
|                 if self._on_mips_cmd:
 | |
|                     self._on_mips_cmd(mips_cmd=mips_cmd)
 | |
| 
 | |
|     def __mqtt_read_handler(self, ctx: any) -> None:
 | |
|         self.__mqtt_loop_handler(ctx=ctx)
 | |
| 
 | |
|     def __mqtt_write_handler(self, ctx: any) -> None:
 | |
|         self.mev_set_write_handler(self._mqtt_fd, None, None)
 | |
|         self.__mqtt_loop_handler(ctx=ctx)
 | |
| 
 | |
|     def __mqtt_timer_handler(self, ctx: any) -> None:
 | |
|         self.__mqtt_loop_handler(ctx=ctx)
 | |
|         if self._mqtt:
 | |
|             self._mqtt_timer = self.mev_set_timeout(
 | |
|                 self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None)
 | |
| 
 | |
|     def __mqtt_loop_handler(self, ctx: any) -> None:
 | |
|         try:
 | |
|             if self._mqtt:
 | |
|                 self._mqtt.loop_read()
 | |
|             if self._mqtt:
 | |
|                 self._mqtt.loop_write()
 | |
|             if self._mqtt:
 | |
|                 self._mqtt.loop_misc()
 | |
|             if self._mqtt and self._mqtt.want_write():
 | |
|                 self.mev_set_write_handler(
 | |
|                     self._mqtt_fd, self.__mqtt_write_handler, None)
 | |
|         except Exception as err:  # pylint: disable=broad-exception-caught
 | |
|             # Catch all exception
 | |
|             self.log_error(f'__mqtt_loop_handler, {err}')
 | |
|             raise err
 | |
| 
 | |
|     def __mips_loop_thread(self) -> None:
 | |
|         self.log_info('mips_loop_thread start')
 | |
|         # Set mqtt config
 | |
|         if self._username:
 | |
|             self._mqtt.username_pw_set(
 | |
|                 username=self._username, password=self._password)
 | |
|         if (
 | |
|             self._ca_file
 | |
|             and self._cert_file
 | |
|             and self._key_file
 | |
|         ):
 | |
|             self._mqtt.tls_set(
 | |
|                 tls_version=ssl.PROTOCOL_TLS_CLIENT,
 | |
|                 ca_certs=self._ca_file,
 | |
|                 certfile=self._cert_file,
 | |
|                 keyfile=self._key_file)
 | |
|         else:
 | |
|             self._mqtt.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT)
 | |
|         self._mqtt.tls_insecure_set(True)
 | |
|         self._mqtt.on_connect = self.__on_connect
 | |
|         self._mqtt.on_connect_fail = self.__on_connect_failed
 | |
|         self._mqtt.on_disconnect = self.__on_disconnect
 | |
|         self._mqtt.on_message = self.__on_message
 | |
|         # Run event loop
 | |
|         self._mev.loop_forever()
 | |
|         self.log_info('mips_loop_thread exit!')
 | |
| 
 | |
|     def __on_connect(self, client, user_data, flags, rc, props) -> None:
 | |
|         if not self._mqtt.is_connected():
 | |
|             return
 | |
|         self.log_info(f'mips connect, {flags}, {rc}, {props}')
 | |
|         self._mqtt_state = True
 | |
|         if self._on_mips_connect:
 | |
|             self.mev_set_timeout(
 | |
|                 timeout_ms=0,
 | |
|                 handler=lambda ctx:
 | |
|                     self._on_mips_connect(rc, props))
 | |
|         for item in self._mips_state_sub_map.values():
 | |
|             if item.handler is None:
 | |
|                 continue
 | |
|             self.main_loop.call_soon_threadsafe(
 | |
|                 self.main_loop.create_task,
 | |
|                 item.handler(item.key, True))
 | |
|         # Resolve future
 | |
|         self._event_connect.set()
 | |
|         self._event_disconnect.clear()
 | |
| 
 | |
|     def __on_connect_failed(self, client, user_data, flags, rc) -> None:
 | |
|         self.log_error(f'mips connect failed, {flags}, {rc}')
 | |
|         # Try to reconnect
 | |
|         self.__mips_try_reconnect()
 | |
| 
 | |
|     def __on_disconnect(self,  client, user_data, rc, props) -> None:
 | |
|         if self._mqtt_state:
 | |
|             self.log_error(f'mips disconnect, {rc}, {props}')
 | |
|             self._mqtt_state = False
 | |
|             if self._mqtt_timer:
 | |
|                 self.mev_clear_timeout(self._mqtt_timer)
 | |
|                 self._mqtt_timer = None
 | |
|             if self._mqtt_fd != -1:
 | |
|                 self.mev_set_read_handler(self._mqtt_fd, None, None)
 | |
|                 self.mev_set_write_handler(self._mqtt_fd, None, None)
 | |
|                 self._mqtt_fd = -1
 | |
|             # Clear retry sub
 | |
|             if self._mips_sub_pending_timer:
 | |
|                 self.mev_clear_timeout(self._mips_sub_pending_timer)
 | |
|                 self._mips_sub_pending_timer = None
 | |
|             self._mips_sub_pending_map = {}
 | |
|             if self._on_mips_disconnect:
 | |
|                 self.mev_set_timeout(
 | |
|                     timeout_ms=0,
 | |
|                     handler=lambda ctx:
 | |
|                         self._on_mips_disconnect(rc, props))
 | |
|             # Call state sub handler
 | |
|             for item in self._mips_state_sub_map.values():
 | |
|                 if item.handler is None:
 | |
|                     continue
 | |
|                 self.main_loop.call_soon_threadsafe(
 | |
|                     self.main_loop.create_task,
 | |
|                     item.handler(item.key, False))
 | |
| 
 | |
|         # Try to reconnect
 | |
|         self.__mips_try_reconnect()
 | |
|         # Set event
 | |
|         self._event_disconnect.set()
 | |
|         self._event_connect.clear()
 | |
| 
 | |
|     def __on_message(self, client, user_data, msg) -> None:
 | |
|         self._on_mips_message(topic=msg.topic, payload=msg.payload)
 | |
| 
 | |
|     def __mips_try_reconnect(self, immediately: bool = False) -> None:
 | |
|         if self._mips_reconnect_timer:
 | |
|             self.mev_clear_timeout(self._mips_reconnect_timer)
 | |
|             self._mips_reconnect_timer = None
 | |
|         if not self._mips_reconnect_tag:
 | |
|             return
 | |
|         interval: int = 0
 | |
|         if not immediately:
 | |
|             interval = self.__get_next_reconnect_time()
 | |
|             self.log_error(
 | |
|                 'mips try reconnect after %sms', interval)
 | |
|         self._mips_reconnect_timer = self.mev_set_timeout(
 | |
|             interval, self.__mips_connect, None)
 | |
| 
 | |
|     def __mips_sub_internal_pending_handler(self, ctx: any) -> None:
 | |
|         subbed_count = 1
 | |
|         for topic in list(self._mips_sub_pending_map.keys()):
 | |
|             if subbed_count > self.MIPS_SUB_PATCH:
 | |
|                 break
 | |
|             count = self._mips_sub_pending_map[topic]
 | |
|             if count > 3:
 | |
|                 self._mips_sub_pending_map.pop(topic)
 | |
|                 self.log_error(f'retry mips sub internal error, {topic}')
 | |
|                 continue
 | |
|             subbed_count += 1
 | |
|             result, mid = self._mqtt.subscribe(topic, qos=self.MIPS_QOS)
 | |
|             if result == MQTT_ERR_SUCCESS:
 | |
|                 self._mips_sub_pending_map.pop(topic)
 | |
|                 self.log_debug(f'mips sub internal success, {topic}')
 | |
|                 continue
 | |
|             self._mips_sub_pending_map[topic] = count+1
 | |
|             self.log_error(
 | |
|                 f'retry mips sub internal, {count}, {topic}, {result}, {mid}')
 | |
| 
 | |
|         if len(self._mips_sub_pending_map):
 | |
|             self._mips_sub_pending_timer = self.mev_set_timeout(
 | |
|                 self.MIPS_SUB_INTERVAL,
 | |
|                 self.__mips_sub_internal_pending_handler, None)
 | |
|         else:
 | |
|             self._mips_sub_pending_timer = None
 | |
| 
 | |
|     def __mips_connect(self, ctx: any = None) -> None:
 | |
|         result = MQTT_ERR_UNKNOWN
 | |
|         if self._mips_reconnect_timer:
 | |
|             self.mev_clear_timeout(self._mips_reconnect_timer)
 | |
|             self._mips_reconnect_timer = None
 | |
|         try:
 | |
|             # Try clean mqtt fd before mqtt connect
 | |
|             if self._mqtt_timer:
 | |
|                 self.mev_clear_timeout(self._mqtt_timer)
 | |
|                 self._mqtt_timer = None
 | |
|             if self._mqtt_fd != -1:
 | |
|                 self.mev_set_read_handler(self._mqtt_fd, None, None)
 | |
|                 self.mev_set_write_handler(self._mqtt_fd, None, None)
 | |
|                 self._mqtt_fd = -1
 | |
|             result = self._mqtt.connect(
 | |
|                 host=self._host, port=self._port,
 | |
|                 clean_start=True, keepalive=MIHOME_MQTT_KEEPALIVE)
 | |
|             self.log_info(f'__mips_connect success, {result}')
 | |
|         except (TimeoutError, OSError) as error:
 | |
|             self.log_error('__mips_connect, connect error, %s', error)
 | |
| 
 | |
|         if result == MQTT_ERR_SUCCESS:
 | |
|             self._mqtt_fd = self._mqtt.socket()
 | |
|             self.log_debug(f'__mips_connect, _mqtt_fd, {self._mqtt_fd}')
 | |
|             self.mev_set_read_handler(
 | |
|                 self._mqtt_fd, self.__mqtt_read_handler, None)
 | |
|             if self._mqtt.want_write():
 | |
|                 self.mev_set_write_handler(
 | |
|                     self._mqtt_fd, self.__mqtt_write_handler, None)
 | |
|             self._mqtt_timer = self.mev_set_timeout(
 | |
|                 self.MQTT_INTERVAL_MS, self.__mqtt_timer_handler, None)
 | |
|         else:
 | |
|             self.log_error(f'__mips_connect error result, {result}')
 | |
|             self.__mips_try_reconnect()
 | |
| 
 | |
|     def __mips_disconnect(self) -> None:
 | |
|         if self._mips_reconnect_timer:
 | |
|             self.mev_clear_timeout(self._mips_reconnect_timer)
 | |
|             self._mips_reconnect_timer = None
 | |
|         if self._mqtt_timer:
 | |
|             self.mev_clear_timeout(self._mqtt_timer)
 | |
|             self._mqtt_timer = None
 | |
|         if self._mqtt_fd != -1:
 | |
|             self.mev_set_read_handler(self._mqtt_fd, None, None)
 | |
|             self.mev_set_write_handler(self._mqtt_fd, None, None)
 | |
|             self._mqtt_fd = -1
 | |
|         self._mqtt.disconnect()
 | |
| 
 | |
|     def __get_next_reconnect_time(self) -> int:
 | |
|         if self._mips_reconnect_interval == 0:
 | |
|             self._mips_reconnect_interval = self.MIPS_RECONNECT_INTERVAL_MIN
 | |
|         else:
 | |
|             self._mips_reconnect_interval = min(
 | |
|                 self._mips_reconnect_interval*2,
 | |
|                 self.MIPS_RECONNECT_INTERVAL_MAX)
 | |
|         return self._mips_reconnect_interval
 | |
| 
 | |
| 
 | |
| class MipsCloudClient(MipsClient):
 | |
|     """MIoT Pub/Sub Cloud Client."""
 | |
|     # pylint: disable=unused-argument
 | |
|     # pylint: disable=inconsistent-quotes
 | |
|     _msg_matcher: MIoTMatcher
 | |
| 
 | |
|     def __init__(
 | |
|             self, uuid: str, cloud_server: str, app_id: str,
 | |
|             token: str, port: int = 8883,
 | |
|             loop: Optional[asyncio.AbstractEventLoop] = None
 | |
|     ) -> None:
 | |
|         self._msg_matcher = MIoTMatcher()
 | |
|         super().__init__(
 | |
|             client_id=f'ha.{uuid}', host=f'{cloud_server}-ha.mqtt.io.mi.com',
 | |
|             port=port, username=app_id, password=token, loop=loop)
 | |
| 
 | |
|         self.on_mips_cmd = self.__on_mips_cmd_handler
 | |
|         self.on_mips_message = self.__on_mips_message_handler
 | |
|         self.on_mips_connect = self.__on_mips_connect_handler
 | |
|         self.on_mips_disconnect = self.__on_mips_disconnect_handler
 | |
| 
 | |
|     def deinit(self) -> None:
 | |
|         self.mips_deinit()
 | |
|         self._msg_matcher = None
 | |
|         self.on_mips_cmd = None
 | |
|         self.on_mips_message = None
 | |
|         self.on_mips_connect = None
 | |
| 
 | |
|     @final
 | |
|     def connect(self) -> None:
 | |
|         self.mips_connect()
 | |
| 
 | |
|     @final
 | |
|     async def connect_async(self) -> None:
 | |
|         await self.mips_connect_async()
 | |
| 
 | |
|     @final
 | |
|     def disconnect(self) -> None:
 | |
|         self.mips_disconnect()
 | |
|         self._msg_matcher = MIoTMatcher()
 | |
| 
 | |
|     @final
 | |
|     async def disconnect_async(self) -> None:
 | |
|         await self.mips_disconnect_async()
 | |
|         self._msg_matcher = MIoTMatcher()
 | |
| 
 | |
|     def update_access_token(self, access_token: str) -> bool:
 | |
|         if not isinstance(access_token, str):
 | |
|             raise MIoTMipsError('invalid token')
 | |
|         return self.update_mqtt_password(password=access_token)
 | |
| 
 | |
|     @final
 | |
|     def sub_prop(
 | |
|         self, did: str, handler: Callable[[dict, any], None],
 | |
|         siid: int = None, piid: int = None, handler_ctx: any = None
 | |
|     ) -> bool:
 | |
|         if not isinstance(did, str) or handler is None:
 | |
|             raise MIoTMipsError('invalid params')
 | |
| 
 | |
|         topic: str = (
 | |
|             f'device/{did}/up/properties_changed/'
 | |
|             f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}')
 | |
| 
 | |
|         def on_prop_msg(topic: str, payload: str, ctx: any) -> bool:
 | |
|             try:
 | |
|                 msg: dict = json.loads(payload)
 | |
|             except json.JSONDecodeError:
 | |
|                 self.log_error(
 | |
|                     f'on_prop_msg, invalid msg, {topic}, {payload}')
 | |
|                 return
 | |
|             if (
 | |
|                 not isinstance(msg.get('params', None), dict)
 | |
|                 or 'siid' not in msg['params']
 | |
|                 or 'piid' not in msg['params']
 | |
|                 or 'value' not in msg['params']
 | |
|             ):
 | |
|                 self.log_error(
 | |
|                     f'on_prop_msg, invalid msg, {topic}, {payload}')
 | |
|                 return
 | |
|             if handler:
 | |
|                 self.log_debug('on properties_changed, %s', payload)
 | |
|                 handler(msg['params'], ctx)
 | |
|         return self.__reg_broadcast(
 | |
|             topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx)
 | |
| 
 | |
|     @final
 | |
|     def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool:
 | |
|         if not isinstance(did, str):
 | |
|             raise MIoTMipsError('invalid params')
 | |
|         topic: str = (
 | |
|             f'device/{did}/up/properties_changed/'
 | |
|             f'{"#" if siid is None or piid is None else f"{siid}/{piid}"}')
 | |
|         return self.__unreg_broadcast(topic=topic)
 | |
| 
 | |
|     @final
 | |
|     def sub_event(
 | |
|         self, did: str, handler: Callable[[dict, any], None],
 | |
|         siid: int = None, eiid: int = None, handler_ctx: any = None
 | |
|     ) -> bool:
 | |
|         if not isinstance(did, str) or handler is None:
 | |
|             raise MIoTMipsError('invalid params')
 | |
|         # Spelling error: event_occured
 | |
|         topic: str = (
 | |
|             f'device/{did}/up/event_occured/'
 | |
|             f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}')
 | |
| 
 | |
|         def on_event_msg(topic: str, payload: str, ctx: any) -> bool:
 | |
|             try:
 | |
|                 msg: dict = json.loads(payload)
 | |
|             except json.JSONDecodeError:
 | |
|                 self.log_error(
 | |
|                     f'on_event_msg, invalid msg, {topic}, {payload}')
 | |
|                 return
 | |
|             if (
 | |
|                 not isinstance(msg.get('params', None), dict)
 | |
|                 or 'siid' not in msg['params']
 | |
|                 or 'eiid' not in msg['params']
 | |
|                 or 'arguments' not in msg['params']
 | |
|             ):
 | |
|                 self.log_error(
 | |
|                     f'on_event_msg, invalid msg, {topic}, {payload}')
 | |
|                 return
 | |
|             if handler:
 | |
|                 self.log_debug('on on_event_msg, %s', payload)
 | |
|                 msg['params']['from'] = 'cloud'
 | |
|                 handler(msg['params'], ctx)
 | |
|         return self.__reg_broadcast(
 | |
|             topic=topic, handler=on_event_msg, handler_ctx=handler_ctx)
 | |
| 
 | |
|     @final
 | |
|     def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool:
 | |
|         if not isinstance(did, str):
 | |
|             raise MIoTMipsError('invalid params')
 | |
|         # Spelling error: event_occured
 | |
|         topic: str = (
 | |
|             f'device/{did}/up/event_occured/'
 | |
|             f'{"#" if siid is None or eiid is None else f"{siid}/{eiid}"}')
 | |
|         return self.__unreg_broadcast(topic=topic)
 | |
| 
 | |
|     @final
 | |
|     def sub_device_state(
 | |
|         self, did: str, handler: Callable[[str, MIoTDeviceState, any], None],
 | |
|         handler_ctx: any = None
 | |
|     ) -> bool:
 | |
|         """subscribe online state."""
 | |
|         if not isinstance(did, str) or handler is None:
 | |
|             raise MIoTMipsError('invalid params')
 | |
|         topic: str = f'device/{did}/state/#'
 | |
| 
 | |
|         def on_state_msg(topic: str, payload: str, ctx: any) -> None:
 | |
|             msg: dict = json.loads(payload)
 | |
|             # {"device_id":"xxxx","device_name":"米家智能插座3   ","event":"online",
 | |
|             # "model": "cuco.plug.v3","timestamp":1709001070828,"uid":xxxx}
 | |
|             if msg is None or 'device_id' not in msg or 'event' not in msg:
 | |
|                 self.log_error(f'on_state_msg, recv unknown msg, {payload}')
 | |
|                 return
 | |
|             if msg['device_id'] != did:
 | |
|                 self.log_error(
 | |
|                     f'on_state_msg, err msg, {did}!={msg["device_id"]}')
 | |
|                 return
 | |
|             if handler:
 | |
|                 self.log_debug('cloud, device state changed, %s', payload)
 | |
|                 handler(
 | |
|                     did, MIoTDeviceState.ONLINE if msg['event'] == 'online'
 | |
|                     else MIoTDeviceState.OFFLINE, ctx)
 | |
|         return self.__reg_broadcast(
 | |
|             topic=topic, handler=on_state_msg, handler_ctx=handler_ctx)
 | |
| 
 | |
|     @final
 | |
|     def unsub_device_state(self, did: str) -> bool:
 | |
|         if not isinstance(did, str):
 | |
|             raise MIoTMipsError('invalid params')
 | |
|         topic: str = f'device/{did}/state/#'
 | |
|         return self.__unreg_broadcast(topic=topic)
 | |
| 
 | |
|     async def get_dev_list_async(
 | |
|         self, payload: str = None, timeout_ms: int = 10000
 | |
|     ) -> dict[str, dict]:
 | |
|         raise NotImplementedError('please call in http client')
 | |
| 
 | |
|     async def get_prop_async(
 | |
|         self, did: str, siid: int, piid: int,  timeout_ms: int = 10000
 | |
|     ) -> any:
 | |
|         raise NotImplementedError('please call in http client')
 | |
| 
 | |
|     async def set_prop_async(
 | |
|         self, did: str, siid: int, piid: int, value: any,
 | |
|         timeout_ms: int = 10000
 | |
|     ) -> bool:
 | |
|         raise NotImplementedError('please call in http client')
 | |
| 
 | |
|     async def action_async(
 | |
|         self, did: str, siid: int, aiid: int, in_list: list,
 | |
|         timeout_ms: int = 10000
 | |
|     ) -> tuple[bool, list]:
 | |
|         raise NotImplementedError('please call in http client')
 | |
| 
 | |
|     def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None:
 | |
|         """
 | |
|         NOTICE thread safe, this function will be called at the **mips** thread
 | |
|         """
 | |
|         if mips_cmd.type_ == MipsCmdType.REG_BROADCAST:
 | |
|             reg_bc: MipsRegBroadcast = mips_cmd.data
 | |
|             if not self._msg_matcher.get(topic=reg_bc.topic):
 | |
|                 sub_bc: MipsBroadcast = MipsBroadcast(
 | |
|                     topic=reg_bc.topic, handler=reg_bc.handler,
 | |
|                     handler_ctx=reg_bc.handler_ctx)
 | |
|                 self._msg_matcher[reg_bc.topic] = sub_bc
 | |
|                 self._mips_sub_internal(topic=reg_bc.topic)
 | |
|             else:
 | |
|                 self.log_debug(f'mips cloud re-reg broadcast, {reg_bc.topic}')
 | |
|         elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST:
 | |
|             unreg_bc: MipsRegBroadcast = mips_cmd.data
 | |
|             if self._msg_matcher.get(topic=unreg_bc.topic):
 | |
|                 del self._msg_matcher[unreg_bc.topic]
 | |
|                 self._mips_unsub_internal(topic=unreg_bc.topic)
 | |
| 
 | |
|     def __reg_broadcast(
 | |
|         self, topic: str, handler: Callable[[str, str, any], None],
 | |
|         handler_ctx: any = None
 | |
|     ) -> bool:
 | |
|         return self._mips_send_cmd(
 | |
|             type_=MipsCmdType.REG_BROADCAST,
 | |
|             data=MipsRegBroadcast(
 | |
|                 topic=topic, handler=handler, handler_ctx=handler_ctx))
 | |
| 
 | |
|     def __unreg_broadcast(self, topic: str) -> bool:
 | |
|         return self._mips_send_cmd(
 | |
|             type_=MipsCmdType.UNREG_BROADCAST,
 | |
|             data=MipsRegBroadcast(topic=topic))
 | |
| 
 | |
|     def __on_mips_connect_handler(self, rc, props) -> None:
 | |
|         """sub topic."""
 | |
|         for topic, _ in list(
 | |
|                 self._msg_matcher.iter_all_nodes()):
 | |
|             self._mips_sub_internal(topic=topic)
 | |
| 
 | |
|     def __on_mips_disconnect_handler(self, rc, props) -> None:
 | |
|         """unsub topic."""
 | |
|         pass
 | |
| 
 | |
|     def __on_mips_message_handler(self, topic: str, payload) -> None:
 | |
|         """
 | |
|         NOTICE thread safe, this function will be called at the **mips** thread
 | |
|         """
 | |
|         # broadcast
 | |
|         bc_list: list[MipsBroadcast] = list(
 | |
|             self._msg_matcher.iter_match(topic))
 | |
|         if not bc_list:
 | |
|             return
 | |
|         # self.log_debug(f"on broadcast, {topic}, {payload}")
 | |
|         for item in bc_list or []:
 | |
|             if item.handler is None:
 | |
|                 continue
 | |
|             # NOTICE: call threadsafe
 | |
|             self.main_loop.call_soon_threadsafe(
 | |
|                 item.handler, topic, payload, item.handler_ctx)
 | |
| 
 | |
| 
 | |
| class MipsLocalClient(MipsClient):
 | |
|     """MIoT Pub/Sub Local Client."""
 | |
|     # pylint: disable=unused-argument
 | |
|     # pylint: disable=inconsistent-quotes
 | |
|     MIPS_RECONNECT_INTERVAL_MIN: int = 6000
 | |
|     MIPS_RECONNECT_INTERVAL_MAX: int = 60000
 | |
|     MIPS_SUB_PATCH: int = 1000
 | |
|     MIPS_SUB_INTERVAL: int = 100
 | |
|     _did: str
 | |
|     _group_id: str
 | |
|     _home_name: str
 | |
|     _mips_seed_id: int
 | |
|     _reply_topic: str
 | |
|     _dev_list_change_topic: str
 | |
|     _request_map: dict[str, MipsRequest]
 | |
|     _msg_matcher: MIoTMatcher
 | |
|     _device_state_sub_map: dict[str, MipsDeviceState]
 | |
|     _get_prop_queue: dict[str, list]
 | |
|     _get_prop_timer: asyncio.TimerHandle
 | |
|     _on_dev_list_changed: Callable[[any, list[str]], asyncio.Future]
 | |
| 
 | |
|     def __init__(
 | |
|         self, did: str, host: str, group_id: str,
 | |
|         ca_file: str, cert_file: str, key_file: str,
 | |
|         port: int = 8883, home_name: str = '',
 | |
|         loop: Optional[asyncio.AbstractEventLoop] = None
 | |
|     ) -> None:
 | |
|         self._did = did
 | |
|         self._group_id = group_id
 | |
|         self._home_name = home_name
 | |
|         self._mips_seed_id = random.randint(0, self.UINT32_MAX)
 | |
|         self._reply_topic = f'{did}/reply'
 | |
|         self._dev_list_change_topic = f'{did}/appMsg/devListChange'
 | |
|         self._request_map = {}
 | |
|         self._msg_matcher = MIoTMatcher()
 | |
|         self._device_state_sub_map = {}
 | |
|         self._get_prop_queue = {}
 | |
|         self._get_prop_timer = None
 | |
|         self._on_dev_list_changed = None
 | |
| 
 | |
|         super().__init__(
 | |
|             client_id=did, host=host, port=port,
 | |
|             ca_file=ca_file, cert_file=cert_file, key_file=key_file, loop=loop)
 | |
|         # MIPS local thread name use group_id
 | |
|         self._mips_thread.name = self._group_id
 | |
| 
 | |
|         self.on_mips_cmd = self.__on_mips_cmd_handler
 | |
|         self.on_mips_message = self.__on_mips_message_handler
 | |
|         self.on_mips_connect = self.__on_mips_connect_handler
 | |
| 
 | |
|     @property
 | |
|     def group_id(self) -> str:
 | |
|         return self._group_id
 | |
| 
 | |
|     def deinit(self) -> None:
 | |
|         self.mips_deinit()
 | |
|         self._did = None
 | |
|         self._mips_seed_id = None
 | |
|         self._reply_topic = None
 | |
|         self._dev_list_change_topic = None
 | |
|         self._request_map = None
 | |
|         self._msg_matcher = None
 | |
|         self._device_state_sub_map = None
 | |
|         self._get_prop_queue = None
 | |
|         self._get_prop_timer = None
 | |
|         self._on_dev_list_changed = None
 | |
| 
 | |
|         self.on_mips_cmd = None
 | |
|         self.on_mips_message = None
 | |
|         self.on_mips_connect = None
 | |
| 
 | |
|     def log_debug(self, msg, *args, **kwargs) -> None:
 | |
|         if self._logger:
 | |
|             self._logger.debug(f'{self._home_name}, '+msg, *args, **kwargs)
 | |
| 
 | |
|     def log_info(self, msg, *args, **kwargs) -> None:
 | |
|         if self._logger:
 | |
|             self._logger.info(f'{self._home_name}, '+msg, *args, **kwargs)
 | |
| 
 | |
|     def log_error(self, msg, *args, **kwargs) -> None:
 | |
|         if self._logger:
 | |
|             self._logger.error(f'{self._home_name}, '+msg, *args, **kwargs)
 | |
| 
 | |
|     @final
 | |
|     def connect(self) -> None:
 | |
|         self.mips_connect()
 | |
| 
 | |
|     @final
 | |
|     async def connect_async(self) -> None:
 | |
|         await self.mips_connect_async()
 | |
| 
 | |
|     @final
 | |
|     def disconnect(self) -> None:
 | |
|         self.mips_disconnect()
 | |
|         self._request_map = {}
 | |
|         self._msg_matcher = MIoTMatcher()
 | |
|         self._device_state_sub_map = {}
 | |
| 
 | |
|     @final
 | |
|     async def disconnect_async(self) -> None:
 | |
|         await self.mips_disconnect_async()
 | |
|         self._request_map = {}
 | |
|         self._msg_matcher = MIoTMatcher()
 | |
|         self._device_state_sub_map = {}
 | |
| 
 | |
|     @final
 | |
|     def sub_prop(
 | |
|         self, did: str, handler: Callable[[dict, any], None],
 | |
|         siid: int = None, piid: int = None, handler_ctx: any = None
 | |
|     ) -> bool:
 | |
|         topic: str = (
 | |
|             f'appMsg/notify/iot/{did}/property/'
 | |
|             f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}')
 | |
| 
 | |
|         def on_prop_msg(topic: str, payload: str, ctx: any):
 | |
|             msg: dict = json.loads(payload)
 | |
|             if (
 | |
|                 msg is None
 | |
|                 or 'did' not in msg
 | |
|                 or 'siid' not in msg
 | |
|                 or 'piid' not in msg
 | |
|                 or 'value' not in msg
 | |
|             ):
 | |
|                 # self.log_error(f'on_prop_msg, recv unknown msg, {payload}')
 | |
|                 return
 | |
|             if handler:
 | |
|                 self.log_debug('local, on properties_changed, %s', payload)
 | |
|                 handler(msg, ctx)
 | |
|         return self.__reg_broadcast(
 | |
|             topic=topic, handler=on_prop_msg, handler_ctx=handler_ctx)
 | |
| 
 | |
|     @final
 | |
|     def unsub_prop(self, did: str, siid: int = None, piid: int = None) -> bool:
 | |
|         topic: str = (
 | |
|             f'appMsg/notify/iot/{did}/property/'
 | |
|             f'{"#" if siid is None or piid is None else f"{siid}.{piid}"}')
 | |
|         return self.__unreg_broadcast(topic=topic)
 | |
| 
 | |
|     @final
 | |
|     def sub_event(
 | |
|         self, did: str, handler: Callable[[dict, any], None],
 | |
|         siid: int = None, eiid: int = None, handler_ctx: any = None
 | |
|     ) -> bool:
 | |
|         topic: str = (
 | |
|             f'appMsg/notify/iot/{did}/event/'
 | |
|             f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}')
 | |
| 
 | |
|         def on_event_msg(topic: str, payload: str, ctx: any):
 | |
|             msg: dict = json.loads(payload)
 | |
|             if (
 | |
|                 msg is None
 | |
|                 or 'did' not in msg
 | |
|                 or 'siid' not in msg
 | |
|                 or 'eiid' not in msg
 | |
|                 or 'arguments' not in msg
 | |
|             ):
 | |
|                 # self.log_error(f'on_event_msg, recv unknown msg, {payload}')
 | |
|                 return
 | |
|             if handler:
 | |
|                 self.log_debug('local, on event_occurred, %s', payload)
 | |
|                 handler(msg, ctx)
 | |
|         return self.__reg_broadcast(
 | |
|             topic=topic, handler=on_event_msg, handler_ctx=handler_ctx)
 | |
| 
 | |
|     @final
 | |
|     def unsub_event(self, did: str, siid: int = None, eiid: int = None) -> bool:
 | |
|         topic: str = (
 | |
|             f'appMsg/notify/iot/{did}/event/'
 | |
|             f'{"#" if siid is None or eiid is None else f"{siid}.{eiid}"}')
 | |
|         return self.__unreg_broadcast(topic=topic)
 | |
| 
 | |
|     @final
 | |
|     async def get_prop_safe_async(
 | |
|         self, did: str, siid: int, piid: int, timeout_ms: int = 10000
 | |
|     ) -> any:
 | |
|         self._get_prop_queue.setdefault(did, [])
 | |
|         fut: asyncio.Future = self.main_loop.create_future()
 | |
|         self._get_prop_queue[did].append({
 | |
|             'param': json.dumps({
 | |
|                 'did': did,
 | |
|                 'siid': siid,
 | |
|                 'piid': piid
 | |
|             }),
 | |
|             'fut': fut,
 | |
|             'timeout_ms': timeout_ms
 | |
|         })
 | |
|         if self._get_prop_timer is None:
 | |
|             self._get_prop_timer = self.main_loop.create_task(
 | |
|                 self.__get_prop_timer_handle())
 | |
|         return await fut
 | |
| 
 | |
|     @final
 | |
|     async def get_prop_async(
 | |
|         self, did: str, siid: int, piid: int, timeout_ms: int = 10000
 | |
|     ) -> any:
 | |
|         result_obj = await self.__request_async(
 | |
|             topic='proxy/get',
 | |
|             payload=json.dumps({
 | |
|                 'did': did,
 | |
|                 'siid': siid,
 | |
|                 'piid': piid
 | |
|             }),
 | |
|             timeout_ms=timeout_ms)
 | |
|         if not isinstance(result_obj, dict) or 'value' not in result_obj:
 | |
|             return None
 | |
|         return result_obj['value']
 | |
| 
 | |
|     @final
 | |
|     async def set_prop_async(
 | |
|         self, did: str, siid: int, piid: int, value: any,
 | |
|         timeout_ms: int = 10000
 | |
|     ) -> dict:
 | |
|         payload_obj: dict = {
 | |
|             'did': did,
 | |
|             'rpc': {
 | |
|                 'id': self.__gen_mips_id,
 | |
|                 'method': 'set_properties',
 | |
|                 'params': [{
 | |
|                     'did': did,
 | |
|                     'siid': siid,
 | |
|                     'piid': piid,
 | |
|                     'value': value
 | |
|                 }]
 | |
|             }
 | |
|         }
 | |
|         result_obj = await self.__request_async(
 | |
|             topic='proxy/rpcReq',
 | |
|             payload=json.dumps(payload_obj),
 | |
|             timeout_ms=timeout_ms)
 | |
|         if result_obj:
 | |
|             if (
 | |
|                 'result' in result_obj
 | |
|                 and len(result_obj['result']) == 1
 | |
|                 and 'did' in result_obj['result'][0]
 | |
|                 and result_obj['result'][0]['did'] == did
 | |
|                 and 'code' in result_obj['result'][0]
 | |
|             ):
 | |
|                 return result_obj['result'][0]
 | |
|             if 'error' in result_obj:
 | |
|                 return result_obj['error']
 | |
|         return {
 | |
|             'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value,
 | |
|             'message': 'Invalid result'}
 | |
| 
 | |
|     @final
 | |
|     async def action_async(
 | |
|         self, did: str, siid: int, aiid: int, in_list: list,
 | |
|         timeout_ms: int = 10000
 | |
|     ) -> dict:
 | |
|         payload_obj: dict = {
 | |
|             'did': did,
 | |
|             'rpc': {
 | |
|                 'id': self.__gen_mips_id,
 | |
|                 'method': 'action',
 | |
|                 'params': {
 | |
|                     'did': did,
 | |
|                     'siid': siid,
 | |
|                     'aiid': aiid,
 | |
|                     'in': in_list
 | |
|                 }
 | |
|             }
 | |
|         }
 | |
|         result_obj = await self.__request_async(
 | |
|             topic='proxy/rpcReq', payload=json.dumps(payload_obj),
 | |
|             timeout_ms=timeout_ms)
 | |
|         if result_obj:
 | |
|             if 'result' in result_obj and 'code' in result_obj['result']:
 | |
|                 return result_obj['result']
 | |
|             if 'error' in result_obj:
 | |
|                 return result_obj['error']
 | |
|         return {
 | |
|             'code': MIoTErrorCode.CODE_INTERNAL_ERROR.value,
 | |
|             'message': 'Invalid result'}
 | |
| 
 | |
|     @final
 | |
|     async def get_dev_list_async(
 | |
|         self, payload: str = None, timeout_ms: int = 10000
 | |
|     ) -> dict[str, dict]:
 | |
|         result_obj = await self.__request_async(
 | |
|             topic='proxy/getDevList', payload=payload or '{}',
 | |
|             timeout_ms=timeout_ms)
 | |
|         if not result_obj or 'devList' not in result_obj:
 | |
|             return None
 | |
|         device_list = {}
 | |
|         for did, info in result_obj['devList'].items():
 | |
|             name: str = info.get('name', None)
 | |
|             urn: str = info.get('urn', None)
 | |
|             model: str = info.get('model', None)
 | |
|             if name is None or urn is None or model is None:
 | |
|                 self.log_error(f'invalid device info, {did}, {info}')
 | |
|                 continue
 | |
|             device_list[did] = {
 | |
|                 'did': did,
 | |
|                 'name': name,
 | |
|                 'urn': urn,
 | |
|                 'model': model,
 | |
|                 'online': info.get('online', False),
 | |
|                 'icon': info.get('icon', None),
 | |
|                 'fw_version': None,
 | |
|                 'home_id': '',
 | |
|                 'home_name': '',
 | |
|                 'room_id': info.get('roomId', ''),
 | |
|                 'room_name': info.get('roomName', ''),
 | |
|                 'specv2_access': info.get('specV2Access', False),
 | |
|                 'push_available': info.get('pushAvailable', False),
 | |
|                 'manufacturer': model.split('.')[0],
 | |
|             }
 | |
|         return device_list
 | |
| 
 | |
|     @final
 | |
|     async def get_action_group_list_async(
 | |
|         self, timeout_ms: int = 10000
 | |
|     ) -> list[str]:
 | |
|         result_obj = await self.__request_async(
 | |
|             topic='proxy/getMijiaActionGroupList',
 | |
|             payload='{}',
 | |
|             timeout_ms=timeout_ms)
 | |
|         if not result_obj or 'result' not in result_obj:
 | |
|             return None
 | |
|         return result_obj['result']
 | |
| 
 | |
|     @final
 | |
|     async def exec_action_group_list_async(
 | |
|         self, ag_id: str, timeout_ms: int = 10000
 | |
|     ) -> dict:
 | |
|         result_obj = await self.__request_async(
 | |
|             topic='proxy/execMijiaActionGroup',
 | |
|             payload=f'{{"id":"{ag_id}"}}',
 | |
|             timeout_ms=timeout_ms)
 | |
|         if result_obj:
 | |
|             if 'result' in result_obj:
 | |
|                 return result_obj['result']
 | |
|             if 'error' in result_obj:
 | |
|                 return result_obj['error']
 | |
|         return {
 | |
|             'code': MIoTErrorCode.CODE_MIPS_INVALID_RESULT.value,
 | |
|             'message': 'invalid result'}
 | |
| 
 | |
|     @final
 | |
|     @property
 | |
|     def on_dev_list_changed(self) -> Callable[[any, list[str]], asyncio.Future]:
 | |
|         return self._on_dev_list_changed
 | |
| 
 | |
|     @final
 | |
|     @on_dev_list_changed.setter
 | |
|     def on_dev_list_changed(
 | |
|         self, func: Callable[[any, list[str]], asyncio.Future]
 | |
|     ) -> None:
 | |
|         """run in main loop."""
 | |
|         self._on_dev_list_changed = func
 | |
| 
 | |
|     @final
 | |
|     def __on_mips_cmd_handler(self, mips_cmd: MipsCmd) -> None:
 | |
|         if mips_cmd.type_ == MipsCmdType.CALL_API:
 | |
|             req_data: MipsRequestData = mips_cmd.data
 | |
|             req = MipsRequest()
 | |
|             req.mid = self.__gen_mips_id
 | |
|             req.on_reply = req_data.on_reply
 | |
|             req.on_reply_ctx = req_data.on_reply_ctx
 | |
|             pub_topic: str = f'master/{req_data.topic}'
 | |
|             result = self.__mips_publish(
 | |
|                 topic=pub_topic, payload=req_data.payload, mid=req.mid,
 | |
|                 ret_topic=self._reply_topic)
 | |
|             self.log_debug(
 | |
|                 f'mips local call api, {result}, {req.mid}, {pub_topic}, '
 | |
|                 f'{req_data.payload}')
 | |
| 
 | |
|             def on_request_timeout(req: MipsRequest):
 | |
|                 self.log_error(
 | |
|                     f'on mips request timeout, {req.mid}, {pub_topic}'
 | |
|                     f', {req_data.payload}')
 | |
|                 self._request_map.pop(str(req.mid), None)
 | |
|                 req.on_reply(
 | |
|                     '{"error":{"code":-10006, "message":"timeout"}}',
 | |
|                     req.on_reply_ctx)
 | |
|             req.timer = self.mev_set_timeout(
 | |
|                 req_data.timeout_ms, on_request_timeout, req)
 | |
|             self._request_map[str(req.mid)] = req
 | |
|         elif mips_cmd.type_ == MipsCmdType.REG_BROADCAST:
 | |
|             reg_bc: MipsRegBroadcast = mips_cmd.data
 | |
|             sub_topic: str = f'{self._did}/{reg_bc.topic}'
 | |
|             if not self._msg_matcher.get(sub_topic):
 | |
|                 sub_bc: MipsBroadcast = MipsBroadcast(
 | |
|                     topic=sub_topic, handler=reg_bc.handler,
 | |
|                     handler_ctx=reg_bc.handler_ctx)
 | |
|                 self._msg_matcher[sub_topic] = sub_bc
 | |
|                 self._mips_sub_internal(topic=f'master/{reg_bc.topic}')
 | |
|             else:
 | |
|                 self.log_debug(f'mips re-reg broadcast, {sub_topic}')
 | |
|         elif mips_cmd.type_ == MipsCmdType.UNREG_BROADCAST:
 | |
|             unreg_bc: MipsRegBroadcast = mips_cmd.data
 | |
|             # Central hub gateway needs to add prefix
 | |
|             unsub_topic: str = f'{self._did}/{unreg_bc.topic}'
 | |
|             if self._msg_matcher.get(unsub_topic):
 | |
|                 del self._msg_matcher[unsub_topic]
 | |
|                 self._mips_unsub_internal(
 | |
|                     topic=re.sub(f'^{self._did}', 'master', unsub_topic))
 | |
|         elif mips_cmd.type_ == MipsCmdType.REG_DEVICE_STATE:
 | |
|             reg_dev_state: MipsRegDeviceState = mips_cmd.data
 | |
|             self._device_state_sub_map[reg_dev_state.did] = reg_dev_state
 | |
|             self.log_debug(
 | |
|                 f'mips local reg device state, {reg_dev_state.did}')
 | |
|         elif mips_cmd.type_ == MipsCmdType.UNREG_DEVICE_STATE:
 | |
|             unreg_dev_state: MipsRegDeviceState = mips_cmd.data
 | |
|             del self._device_state_sub_map[unreg_dev_state.did]
 | |
|             self.log_debug(
 | |
|                 f'mips local unreg device state, {unreg_dev_state.did}')
 | |
|         else:
 | |
|             self.log_error(
 | |
|                 f'mips local recv unknown cmd, {mips_cmd.type_}, '
 | |
|                 f'{mips_cmd.data}')
 | |
| 
 | |
|     def __on_mips_connect_handler(self, rc, props) -> None:
 | |
|         self.log_debug('__on_mips_connect_handler')
 | |
|         # Sub did/#, include reply topic
 | |
|         self._mips_sub_internal(f'{self._did}/#')
 | |
|         # Sub device list change
 | |
|         self._mips_sub_internal('master/appMsg/devListChange')
 | |
|         # Do not need to subscribe api topics, for they are covered by did/#
 | |
|         # Sub api topic.
 | |
|         # Sub broadcast topic
 | |
|         for topic, _ in list(self._msg_matcher.iter_all_nodes()):
 | |
|             self._mips_sub_internal(
 | |
|                 topic=re.sub(f'^{self._did}', 'master', topic))
 | |
| 
 | |
|     @final
 | |
|     def __on_mips_message_handler(self, topic: str, payload: bytes) -> None:
 | |
|         mips_msg: MipsMessage = MipsMessage.unpack(payload)
 | |
|         # self.log_debug(
 | |
|         #     f"mips local client, on_message, {topic} -> {mips_msg}")
 | |
|         # Reply
 | |
|         if topic == self._reply_topic:
 | |
|             self.log_debug(f'on request reply, {mips_msg}')
 | |
|             req: MipsRequest = self._request_map.pop(str(mips_msg.mid), None)
 | |
|             if req:
 | |
|                 # Cancel timer
 | |
|                 self.mev_clear_timeout(req.timer)
 | |
|                 if req.on_reply:
 | |
|                     self.main_loop.call_soon_threadsafe(
 | |
|                         req.on_reply, mips_msg.payload or '{}',
 | |
|                         req.on_reply_ctx)
 | |
|             return
 | |
|         # Broadcast
 | |
|         bc_list: list[MipsBroadcast] = list(self._msg_matcher.iter_match(
 | |
|             topic=topic))
 | |
|         if bc_list:
 | |
|             self.log_debug(f'on broadcast, {topic}, {mips_msg}')
 | |
|             for item in bc_list or []:
 | |
|                 if item.handler is None:
 | |
|                     continue
 | |
|                 self.main_loop.call_soon_threadsafe(
 | |
|                     item.handler, topic[topic.find('/')+1:],
 | |
|                     mips_msg.payload or '{}', item.handler_ctx)
 | |
|             return
 | |
|         # Device list change
 | |
|         if topic == self._dev_list_change_topic:
 | |
|             payload_obj: dict = json.loads(mips_msg.payload)
 | |
|             dev_list = payload_obj.get('devList', None)
 | |
|             if not isinstance(dev_list, list) or not dev_list:
 | |
|                 _LOGGER.error(
 | |
|                     'unknown devListChange msg, %s', mips_msg.payload)
 | |
|                 return
 | |
|             if self._on_dev_list_changed:
 | |
|                 self.main_loop.call_soon_threadsafe(
 | |
|                     self.main_loop.create_task,
 | |
|                     self._on_dev_list_changed(self, payload_obj['devList']))
 | |
|             return
 | |
| 
 | |
|         self.log_debug(
 | |
|             f'mips local client, recv unknown msg, {topic} -> {mips_msg}')
 | |
| 
 | |
|     @property
 | |
|     def __gen_mips_id(self) -> int:
 | |
|         mips_id: int = self._mips_seed_id
 | |
|         self._mips_seed_id = int((self._mips_seed_id+1) % self.UINT32_MAX)
 | |
|         return mips_id
 | |
| 
 | |
|     def __mips_publish(
 | |
|             self, topic: str, payload: str | bytes, mid: int = None,
 | |
|             ret_topic: str = None, wait_for_publish: bool = False,
 | |
|             timeout_ms: int = 10000
 | |
|     ) -> bool:
 | |
|         mips_msg: bytes = MipsMessage.pack(
 | |
|             mid=mid or self.__gen_mips_id, payload=payload,
 | |
|             msg_from='local', ret_topic=ret_topic)
 | |
|         return self._mips_publish_internal(
 | |
|             topic=topic.strip(), payload=mips_msg,
 | |
|             wait_for_publish=wait_for_publish, timeout_ms=timeout_ms)
 | |
| 
 | |
|     def __request(
 | |
|             self, topic: str, payload: str,
 | |
|             on_reply: Callable[[str, any], None],
 | |
|             on_reply_ctx: any = None, timeout_ms: int = 10000
 | |
|     ) -> bool:
 | |
|         if topic is None or payload is None or on_reply is None:
 | |
|             raise MIoTMipsError('invalid params')
 | |
|         req_data: MipsRequestData = MipsRequestData()
 | |
|         req_data.topic = topic
 | |
|         req_data.payload = payload
 | |
|         req_data.on_reply = on_reply
 | |
|         req_data.on_reply_ctx = on_reply_ctx
 | |
|         req_data.timeout_ms = timeout_ms
 | |
|         return self._mips_send_cmd(type_=MipsCmdType.CALL_API, data=req_data)
 | |
| 
 | |
|     def __reg_broadcast(
 | |
|         self, topic: str, handler: Callable[[str, str, any], None],
 | |
|         handler_ctx: any
 | |
|     ) -> bool:
 | |
|         return self._mips_send_cmd(
 | |
|             type_=MipsCmdType.REG_BROADCAST,
 | |
|             data=MipsRegBroadcast(
 | |
|                 topic=topic, handler=handler, handler_ctx=handler_ctx))
 | |
| 
 | |
|     def __unreg_broadcast(self, topic) -> bool:
 | |
|         return self._mips_send_cmd(
 | |
|             type_=MipsCmdType.UNREG_BROADCAST,
 | |
|             data=MipsRegBroadcast(topic=topic))
 | |
| 
 | |
|     @final
 | |
|     async def __request_async(
 | |
|         self, topic: str, payload: str, timeout_ms: int = 10000
 | |
|     ) -> dict:
 | |
|         fut_handler: asyncio.Future = self.main_loop.create_future()
 | |
| 
 | |
|         def on_msg_reply(payload: str, ctx: any):
 | |
|             fut: asyncio.Future = ctx
 | |
|             if fut:
 | |
|                 self.main_loop.call_soon_threadsafe(fut.set_result, payload)
 | |
|         if not self.__request(
 | |
|                 topic=topic,
 | |
|                 payload=payload,
 | |
|                 on_reply=on_msg_reply,
 | |
|                 on_reply_ctx=fut_handler,
 | |
|                 timeout_ms=timeout_ms):
 | |
|             # Request error
 | |
|             fut_handler.set_result('internal request error')
 | |
| 
 | |
|         result = await fut_handler
 | |
|         try:
 | |
|             return json.loads(result)
 | |
|         except json.JSONDecodeError:
 | |
|             return {
 | |
|                 'code': MIoTErrorCode.CODE_MIPS_INVALID_RESULT.value,
 | |
|                 'message': f'Error: {result}'}
 | |
| 
 | |
|     async def __get_prop_timer_handle(self) -> None:
 | |
|         for did in list(self._get_prop_queue.keys()):
 | |
|             item = self._get_prop_queue[did].pop()
 | |
|             _LOGGER.debug('get prop, %s, %s', did, item)
 | |
|             result_obj = await self.__request_async(
 | |
|                 topic='proxy/get',
 | |
|                 payload=item['param'],
 | |
|                 timeout_ms=item['timeout_ms'])
 | |
|             if result_obj is None or 'value' not in result_obj:
 | |
|                 item['fut'].set_result(None)
 | |
|             else:
 | |
|                 item['fut'].set_result(result_obj['value'])
 | |
| 
 | |
|             if not self._get_prop_queue[did]:
 | |
|                 self._get_prop_queue.pop(did, None)
 | |
| 
 | |
|         if self._get_prop_queue:
 | |
|             self._get_prop_timer = self.main_loop.call_later(
 | |
|                 0.1, lambda: self.main_loop.create_task(
 | |
|                     self.__get_prop_timer_handle()))
 | |
|         else:
 | |
|             self._get_prop_timer = None
 |