|
@@ -30,7 +30,7 @@ import paho.mqtt.client as pmc
|
|
|
|
|
|
from mautrix.util import background_task
|
|
from mautrix.util import background_task
|
|
from mautrix.util.logging import TraceLogger
|
|
from mautrix.util.logging import TraceLogger
|
|
-from mautrix.util.proxy import ProxyHandler
|
|
|
|
|
|
+from mautrix.util.proxy import ProxyHandler, proxy_with_retry
|
|
|
|
|
|
from ..errors import (
|
|
from ..errors import (
|
|
IrisSubscribeError,
|
|
IrisSubscribeError,
|
|
@@ -262,7 +262,7 @@ class AndroidMQTT:
|
|
|
|
|
|
def _on_disconnect_handler(self, client: MQTToTClient, _: Any, rc: int) -> None:
|
|
def _on_disconnect_handler(self, client: MQTToTClient, _: Any, rc: int) -> None:
|
|
err_str = "Generic error." if rc == pmc.MQTT_ERR_NOMEM else pmc.error_string(rc)
|
|
err_str = "Generic error." if rc == pmc.MQTT_ERR_NOMEM else pmc.error_string(rc)
|
|
- self.log.debug(f"MQTT disconnection code %d: %s", rc, err_str)
|
|
|
|
|
|
+ self.log.debug("MQTT disconnection code %d: %s", rc, err_str)
|
|
self._clear_publish_waiters()
|
|
self._clear_publish_waiters()
|
|
|
|
|
|
async def _post_connect(self) -> None:
|
|
async def _post_connect(self) -> None:
|
|
@@ -561,17 +561,6 @@ class AndroidMQTT:
|
|
finally:
|
|
finally:
|
|
self.log.debug(f"Dispatcher loop {loop_id} stopped")
|
|
self.log.debug(f"Dispatcher loop {loop_id} stopped")
|
|
|
|
|
|
- async def update_proxy_and_sleep(self, retry, reason):
|
|
|
|
- sleep = retry * 2
|
|
|
|
- if retry > 1:
|
|
|
|
- if self.proxy_handler and self.proxy_handler.update_proxy_url(reason):
|
|
|
|
- self.setup_proxy()
|
|
|
|
- await self._dispatch(ProxyUpdate())
|
|
|
|
- await self._dispatch(
|
|
|
|
- Disconnect(reason=f"MQTT Error: no connection, retrying in {sleep} seconds")
|
|
|
|
- )
|
|
|
|
- await asyncio.sleep(sleep)
|
|
|
|
-
|
|
|
|
async def listen(
|
|
async def listen(
|
|
self,
|
|
self,
|
|
graphql_subs: set[str] | None = None,
|
|
graphql_subs: set[str] | None = None,
|
|
@@ -586,53 +575,56 @@ class AndroidMQTT:
|
|
self._iris_snapshot_at_ms = snapshot_at_ms
|
|
self._iris_snapshot_at_ms = snapshot_at_ms
|
|
|
|
|
|
self.log.debug("Connecting to Instagram MQTT")
|
|
self.log.debug("Connecting to Instagram MQTT")
|
|
- await self._reconnect()
|
|
|
|
connection_retries = 0
|
|
connection_retries = 0
|
|
|
|
|
|
- while True:
|
|
|
|
|
|
+ async def setup_connection():
|
|
|
|
+ await self._reconnect()
|
|
|
|
+
|
|
try:
|
|
try:
|
|
await asyncio.sleep(1)
|
|
await asyncio.sleep(1)
|
|
except asyncio.CancelledError:
|
|
except asyncio.CancelledError:
|
|
self.disconnect()
|
|
self.disconnect()
|
|
# this might not be necessary
|
|
# this might not be necessary
|
|
self._client.loop_misc()
|
|
self._client.loop_misc()
|
|
- break
|
|
|
|
|
|
+ return
|
|
rc = self._client.loop_misc()
|
|
rc = self._client.loop_misc()
|
|
|
|
|
|
# If disconnect() has been called
|
|
# If disconnect() has been called
|
|
# Beware, internal API, may have to change this to something more stable!
|
|
# Beware, internal API, may have to change this to something more stable!
|
|
if self._client._state == pmc.mqtt_cs_disconnecting:
|
|
if self._client._state == pmc.mqtt_cs_disconnecting:
|
|
- break # Stop listening
|
|
|
|
|
|
+ return # Stop listening
|
|
|
|
|
|
if rc != pmc.MQTT_ERR_SUCCESS:
|
|
if rc != pmc.MQTT_ERR_SUCCESS:
|
|
# If known/expected error
|
|
# If known/expected error
|
|
if rc == pmc.MQTT_ERR_CONN_LOST:
|
|
if rc == pmc.MQTT_ERR_CONN_LOST:
|
|
await self._dispatch(Disconnect(reason="Connection lost, retrying"))
|
|
await self._dispatch(Disconnect(reason="Connection lost, retrying"))
|
|
|
|
+ raise MQTTNotConnected
|
|
elif rc == pmc.MQTT_ERR_NOMEM:
|
|
elif rc == pmc.MQTT_ERR_NOMEM:
|
|
# This error is wrongly classified
|
|
# This error is wrongly classified
|
|
# See https://github.com/eclipse/paho.mqtt.python/issues/340
|
|
# See https://github.com/eclipse/paho.mqtt.python/issues/340
|
|
await self._dispatch(Disconnect(reason="Connection lost, retrying"))
|
|
await self._dispatch(Disconnect(reason="Connection lost, retrying"))
|
|
|
|
+ raise MQTTNotConnected
|
|
elif rc == pmc.MQTT_ERR_CONN_REFUSED:
|
|
elif rc == pmc.MQTT_ERR_CONN_REFUSED:
|
|
raise MQTTNotLoggedIn("MQTT connection refused")
|
|
raise MQTTNotLoggedIn("MQTT connection refused")
|
|
elif rc == pmc.MQTT_ERR_NO_CONN:
|
|
elif rc == pmc.MQTT_ERR_NO_CONN:
|
|
- if connection_retries > retry_limit:
|
|
|
|
- raise MQTTNotConnected(f"Connection failed {connection_retries} times")
|
|
|
|
- await self.update_proxy_and_sleep(connection_retries, "MQTT_ERR_NO_CONN")
|
|
|
|
|
|
+ raise MQTTNotConnected(f"Connection failed {connection_retries} times")
|
|
else:
|
|
else:
|
|
err = pmc.error_string(rc)
|
|
err = pmc.error_string(rc)
|
|
self.log.error("MQTT Error: %s", err)
|
|
self.log.error("MQTT Error: %s", err)
|
|
await self._dispatch(Disconnect(reason=f"MQTT Error: {err}, retrying"))
|
|
await self._dispatch(Disconnect(reason=f"MQTT Error: {err}, retrying"))
|
|
|
|
+ raise MQTTNotConnected
|
|
|
|
+
|
|
|
|
+ await proxy_with_retry(
|
|
|
|
+ "mqtt.listen",
|
|
|
|
+ lambda: setup_connection(),
|
|
|
|
+ logger=self.log,
|
|
|
|
+ proxy_handler=self.proxy_handler,
|
|
|
|
+ on_proxy_change=lambda: self._dispatch(ProxyUpdate()),
|
|
|
|
+ max_retries=retry_limit,
|
|
|
|
+ min_wait_seconds=10,
|
|
|
|
+ retryable_exceptions=(MQTTNotConnected, MQTTReconnectionError),
|
|
|
|
+ )
|
|
|
|
|
|
- try:
|
|
|
|
- await self._reconnect()
|
|
|
|
- except MQTTReconnectionError:
|
|
|
|
- if connection_retries > retry_limit:
|
|
|
|
- raise
|
|
|
|
- await self.update_proxy_and_sleep(connection_retries, "MQTTReconnectionError")
|
|
|
|
-
|
|
|
|
- connection_retries += 1
|
|
|
|
- else:
|
|
|
|
- connection_retries = 0
|
|
|
|
if self._event_dispatcher_task:
|
|
if self._event_dispatcher_task:
|
|
self._event_dispatcher_task.cancel()
|
|
self._event_dispatcher_task.cancel()
|
|
self._event_dispatcher_task = None
|
|
self._event_dispatcher_task = None
|