Ver código fonte

Use new wrapper for creating background tasks

Tulir Asokan 2 anos atrás
pai
commit
eff6c2e078

+ 3 - 2
mauigpapi/mqtt/conn.py

@@ -28,6 +28,7 @@ import zlib
 from yarl import URL
 import paho.mqtt.client as pmc
 
+from mautrix.util import background_task
 from mautrix.util.logging import TraceLogger
 
 from ..errors import (
@@ -402,7 +403,7 @@ class AndroidMQTT:
                 self.log.trace(f"Got new seq_id: {parsed_item.seq_id}")
                 self._iris_seq_id = parsed_item.seq_id
                 self._iris_snapshot_at_ms = int(time.time() * 1000)
-                asyncio.create_task(
+                background_task.create(
                     self._dispatch(NewSequenceID(self._iris_seq_id, self._iris_snapshot_at_ms))
                 )
             for part in parsed_item.data:
@@ -698,7 +699,7 @@ class AndroidMQTT:
             self.log.info(f"Latest sequence ID is {latest_seq_id}, catching up from {seq_id}")
             self._iris_seq_id = latest_seq_id
             self._iris_snapshot_at_ms = resp_dict.get("subscribed_at_ms", int(time.time() * 1000))
-            asyncio.create_task(
+            background_task.create(
                 self._dispatch(NewSequenceID(self._iris_seq_id, self._iris_snapshot_at_ms))
             )
 

+ 4 - 4
mautrix_instagram/portal.py

@@ -85,7 +85,7 @@ from mautrix.types import (
     UserID,
     VideoInfo,
 )
-from mautrix.util import ffmpeg
+from mautrix.util import background_task, ffmpeg
 from mautrix.util.message_send_checkpoint import MessageSendCheckpointStatus
 
 from . import matrix as m, puppet as p, user as u
@@ -240,7 +240,7 @@ class Portal(DBPortal, BasePortal):
             event_type=event_type,
             message_type=msgtype,
         )
-        asyncio.create_task(self._send_message_status(event_id, err=None))
+        background_task.create(self._send_message_status(event_id, err=None))
         await self._send_delivery_receipt(event_id)
 
     async def _send_bridge_error(
@@ -274,7 +274,7 @@ class Portal(DBPortal, BasePortal):
                     body=f"\u26a0 Your {event_type_str} {error_type} bridged: {str(err)}",
                 ),
             )
-        asyncio.create_task(self._send_message_status(event_id, err))
+        background_task.create(self._send_message_status(event_id, err))
 
     async def _send_message_status(self, event_id: EventID, err: Exception | None) -> None:
         if not self.config["bridge.message_status_events"]:
@@ -1558,7 +1558,7 @@ class Portal(DBPortal, BasePortal):
                 await self.enqueue_immediate_backfill(source, 0)
 
         intent = sender.intent_for(self)
-        asyncio.create_task(intent.set_typing(self.mxid, timeout=0))
+        background_task.create(intent.set_typing(self.mxid, timeout=0))
         event_ids = []
         for event_type, content in await self.convert_instagram_item(source, sender, item):
             event_ids.append(

+ 5 - 4
mautrix_instagram/user.py

@@ -61,6 +61,7 @@ from mauigpapi.types.direct_inbox import DMInbox, DMInboxResponse
 from mautrix.appservice import AppService
 from mautrix.bridge import BaseUser, async_getter_lock
 from mautrix.types import EventID, MessageType, RoomID, TextMessageEventContent, UserID
+from mautrix.util import background_task
 from mautrix.util.bridge_state import BridgeState, BridgeStateEvent
 from mautrix.util.logging import TraceLogger
 from mautrix.util.opt_prometheus import Gauge, Summary, async_time
@@ -937,7 +938,7 @@ class User(DBUser, BaseUser):
                 )
             else:
                 self.log.warning(f"Got IrisSubscribeError {e}, refreshing...")
-                asyncio.create_task(self.refresh())
+                background_task.create(self.refresh())
         except MQTTReconnectionError as e:
             self.log.warning(
                 f"Unexpected connection error: {e}, reconnecting in 1 minute", exc_info=True
@@ -949,7 +950,7 @@ class User(DBUser, BaseUser):
                 error_code="ig-connection-error-socket",
             )
             self.mqtt.disconnect()
-            asyncio.create_task(self.delayed_start_listen(sleep=60))
+            background_task.create(self.delayed_start_listen(sleep=60))
         except (MQTTNotConnected, MQTTNotLoggedIn, MQTTConnectionUnauthorized) as e:
             self.log.warning(f"Unexpected connection error: {e}, checking auth and reconnecting")
             await self.send_bridge_notice(
@@ -959,7 +960,7 @@ class User(DBUser, BaseUser):
                 error_code="ig-connection-error-maybe-auth",
             )
             self.mqtt.disconnect()
-            asyncio.create_task(self.fetch_user_and_reconnect())
+            background_task.create(self.fetch_user_and_reconnect())
         except Exception as e:
             self.log.exception("Fatal error in listener, reconnecting in 5 minutes")
             await self.send_bridge_notice(
@@ -970,7 +971,7 @@ class User(DBUser, BaseUser):
                 info={"python_error": str(e)},
             )
             self.mqtt.disconnect()
-            asyncio.create_task(self.fetch_user_and_reconnect(sleep_first=300))
+            background_task.create(self.fetch_user_and_reconnect(sleep_first=300))
         else:
             if not self.shutdown:
                 await self.send_bridge_notice(

+ 3 - 2
mautrix_instagram/web/segment.py

@@ -1,11 +1,12 @@
 from __future__ import annotations
 
-import asyncio
 import logging
 
 from yarl import URL
 import aiohttp
 
+from mautrix.util import background_task
+
 from .. import user as u
 
 log = logging.getLogger("mau.web.public.analytics")
@@ -30,7 +31,7 @@ async def _track(user: u.User, event: str, properties: dict) -> None:
 
 def track(user: u.User, event: str, properties: dict | None = None):
     if segment_key:
-        asyncio.create_task(_track(user, event, properties or {}))
+        background_task.create(_track(user, event, properties or {}))
 
 
 def init(key, user_id: str | None = None):

+ 1 - 1
requirements.txt

@@ -4,7 +4,7 @@ commonmark>=0.8,<0.10
 aiohttp>=3,<4
 yarl>=1,<2
 attrs>=20.1
-mautrix>=0.19.3,<0.20
+mautrix>=0.19.4,<0.20
 asyncpg>=0.20,<0.28
 pycryptodome>=3,<4
 paho-mqtt>=1.5,<2