|
@@ -83,6 +83,8 @@ INBOX_THREAD_REGEX = re.compile(r"/direct_v2/inbox/threads/([\w_]+)")
|
|
REQUEST_PUBLISH_TIMEOUT = 5
|
|
REQUEST_PUBLISH_TIMEOUT = 5
|
|
REQUEST_RESPONSE_TIMEOUT = 30
|
|
REQUEST_RESPONSE_TIMEOUT = 30
|
|
|
|
|
|
|
|
+RECONNECT_ATTEMPTS = 5
|
|
|
|
+
|
|
|
|
|
|
class AndroidMQTT:
|
|
class AndroidMQTT:
|
|
_loop: asyncio.AbstractEventLoop
|
|
_loop: asyncio.AbstractEventLoop
|
|
@@ -524,11 +526,20 @@ class AndroidMQTT:
|
|
# endregion
|
|
# endregion
|
|
|
|
|
|
async def _reconnect(self) -> None:
|
|
async def _reconnect(self) -> None:
|
|
- try:
|
|
|
|
- self.log.trace("Trying to reconnect to MQTT")
|
|
|
|
- self._client.reconnect()
|
|
|
|
- except (SocketError, OSError, pmc.WebsocketConnectionError) as e:
|
|
|
|
- raise MQTTReconnectionError("MQTT reconnection failed") from e
|
|
|
|
|
|
+ if self._client.is_connected():
|
|
|
|
+ self.log.debug("Trying to reconnect to MQTT (currently connected)")
|
|
|
|
+ else:
|
|
|
|
+ self.log.debug("Trying to reconnect to MQTT (currently not connected)")
|
|
|
|
+ attempts = 0
|
|
|
|
+ while True:
|
|
|
|
+ try:
|
|
|
|
+ self._client.reconnect()
|
|
|
|
+ return
|
|
|
|
+ except (SocketError, OSError, pmc.WebsocketConnectionError) as e:
|
|
|
|
+ self.log.exception("Error on attempt %d reconnecting to MQTT", attempts)
|
|
|
|
+ attempts += 1
|
|
|
|
+ if attempts > RECONNECT_ATTEMPTS:
|
|
|
|
+ raise MQTTReconnectionError("MQTT reconnection failed") from e
|
|
|
|
|
|
def add_event_handler(
|
|
def add_event_handler(
|
|
self, evt_type: Type[T], handler: Callable[[T], Awaitable[None]]
|
|
self, evt_type: Type[T], handler: Callable[[T], Awaitable[None]]
|
|
@@ -685,7 +696,7 @@ class AndroidMQTT:
|
|
await self.publish(topic, payload)
|
|
await self.publish(topic, payload)
|
|
except asyncio.TimeoutError:
|
|
except asyncio.TimeoutError:
|
|
self.log.warning("Publish timed out - try forcing reconnect")
|
|
self.log.warning("Publish timed out - try forcing reconnect")
|
|
- self._client.reconnect()
|
|
|
|
|
|
+ await self._reconnect()
|
|
except MQTTNotConnected:
|
|
except MQTTNotConnected:
|
|
self.log.warning(
|
|
self.log.warning(
|
|
"MQTT disconnected before PUBACK - wait a hot minute, we should get "
|
|
"MQTT disconnected before PUBACK - wait a hot minute, we should get "
|
|
@@ -783,7 +794,7 @@ class AndroidMQTT:
|
|
await self.publish(RealtimeTopic.SEND_MESSAGE, req)
|
|
await self.publish(RealtimeTopic.SEND_MESSAGE, req)
|
|
except asyncio.TimeoutError:
|
|
except asyncio.TimeoutError:
|
|
self.log.warning("Publish timed out - try forcing reconnect")
|
|
self.log.warning("Publish timed out - try forcing reconnect")
|
|
- self._client.reconnect()
|
|
|
|
|
|
+ await self._reconnect()
|
|
except MQTTNotConnected:
|
|
except MQTTNotConnected:
|
|
self.log.warning(
|
|
self.log.warning(
|
|
"MQTT disconnected before PUBACK - wait a hot minute, we should get "
|
|
"MQTT disconnected before PUBACK - wait a hot minute, we should get "
|