ソースを参照

Fix iris subscribe timeout and clear response waiters on disconnect

Tulir Asokan 3 年 前
コミット
e05ca5f1c6
1 ファイル変更16 行追加2 行削除
  1. 16 2
      mauigpapi/mqtt/conn.py

+ 16 - 2
mauigpapi/mqtt/conn.py

@@ -159,6 +159,20 @@ class AndroidMQTT:
         self._client.on_socket_register_write = self._on_socket_register_write
         self._client.on_socket_unregister_write = self._on_socket_unregister_write
 
+    def _clear_response_waiters(self) -> None:
+        for waiter in self._response_waiters.values():
+            if not waiter.done():
+                waiter.set_exception(
+                    MQTTNotConnected("MQTT disconnected before request returned response")
+                )
+        for waiter in self._publish_waiters.values():
+            if not waiter.done():
+                waiter.set_exception(
+                    MQTTNotConnected("MQTT disconnected before request was published")
+                )
+        self._response_waiters = {}
+        self._publish_waiters = {}
+
     def _form_client_id(self) -> bytes:
         subscribe_topics = [
             RealtimeTopic.PUBSUB,
@@ -239,7 +253,7 @@ class AndroidMQTT:
     def _on_disconnect_handler(self, client: MQTToTClient, _: Any, rc: int) -> None:
         err_str = "Generic error." if rc == pmc.MQTT_ERR_NOMEM else pmc.error_string(rc)
         self.log.debug(f"MQTT disconnection code %d: %s", rc, err_str)
-        # self._clear_response_waiters()
+        self._clear_response_waiters()
 
     async def _post_connect(self) -> None:
         await self._dispatch(Connect())
@@ -604,7 +618,7 @@ class AndroidMQTT:
             RealtimeTopic.SUB_IRIS,
             RealtimeTopic.SUB_IRIS_RESPONSE,
             {"seq_id": seq_id, "snapshot_at_ms": snapshot_at_ms},
-            timeout=20 * 1000,
+            timeout=20,
         )
         self.log.debug("Iris subscribe response: %s", resp.payload.decode("utf-8"))
         resp_dict = json.loads(resp.payload.decode("utf-8"))