rpc.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. # Copyright (c) 2020 Tulir Asokan
  2. #
  3. # This Source Code Form is subject to the terms of the Mozilla Public
  4. # License, v. 2.0. If a copy of the MPL was not distributed with this
  5. # file, You can obtain one at http://mozilla.org/MPL/2.0/.
  6. from typing import Optional, Dict, List, Callable, Awaitable, Any, Tuple
  7. from uuid import UUID, uuid4
  8. import asyncio
  9. import logging
  10. import json
  11. from mautrix.util.logging import TraceLogger
  12. from .errors import NotConnected, UnexpectedError, UnexpectedResponse, make_response_error
  # Synthetic RPC event names. They are dispatched through the same handler
  # registry as real RPC events so callers can register callbacks for socket
  # connect and disconnect.
  CONNECT_EVENT: str = "_socket_connected"
  DISCONNECT_EVENT: str = "_socket_disconnected"

  # Signature of an RPC event callback: an async callable that receives the
  # decoded event payload and returns nothing.
  EventHandler = Callable[[Dict[str, Any]], Awaitable[None]]
  class SignaldRPCClient:
      """Asyncio client speaking newline-delimited JSON over a unix socket.

      Outgoing requests are JSON objects carrying an ``id`` and a ``type``;
      incoming objects with an ``id`` resolve the matching pending request,
      while objects without one are dispatched to the handlers registered for
      their ``type`` via :meth:`add_rpc_handler`. A background task keeps the
      socket connected, retrying every five seconds after a failed attempt.
      """

      loop: asyncio.AbstractEventLoop
      log: TraceLogger

      socket_path: str
      _reader: Optional[asyncio.StreamReader]
      _writer: Optional[asyncio.StreamWriter]
      is_connected: bool
      _connect_future: asyncio.Future
      _communicate_task: Optional[asyncio.Task]

      _response_waiters: Dict[UUID, asyncio.Future]
      _rpc_event_handlers: Dict[str, List[EventHandler]]

      def __init__(self, socket_path: str, log: Optional[TraceLogger] = None,
                   loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
          """Create a client for the unix socket at *socket_path*.

          No connection is opened here; call :meth:`connect` for that.

          :param socket_path: Filesystem path of the unix socket.
          :param log: Logger to use; defaults to the ``mausignald`` logger.
          :param loop: Event loop used to create futures; defaults to
              ``asyncio.get_event_loop()``.
          """
          self.socket_path = socket_path
          self.log = log or logging.getLogger("mausignald")
          self.loop = loop or asyncio.get_event_loop()
          self._reader = None
          self._writer = None
          self._communicate_task = None
          self.is_connected = False
          self._connect_future = self.loop.create_future()
          self._response_waiters = {}
          self._rpc_event_handlers = {CONNECT_EVENT: [], DISCONNECT_EVENT: []}
          # Pending requests can never complete once the socket is gone, so
          # fail them whenever the disconnect event fires.
          self.add_rpc_handler(DISCONNECT_EVENT, self._abandon_responses)

      async def wait_for_connected(self, timeout: Optional[int] = None) -> bool:
          """Wait until the socket is connected.

          :param timeout: Maximum number of seconds to wait, ``None`` for no limit.
          :return: The value of ``is_connected`` after waiting.
          :raises asyncio.TimeoutError: If *timeout* elapses first.
          """
          if self.is_connected:
              return True
          # Shield the shared future so a timeout here does not cancel it for
          # every other waiter.
          await asyncio.wait_for(asyncio.shield(self._connect_future), timeout)
          return self.is_connected

      async def connect(self) -> None:
          """Start the background connection task and wait for the first connect.

          Returns immediately if a writer already exists. If the background task
          is already running (e.g. it is still retrying), no second task is
          spawned; the call just waits for the pending connection.
          """
          if self._writer is not None:
              return
          if self._communicate_task is None or self._communicate_task.done():
              self._communicate_task = asyncio.create_task(self._communicate_forever())
          await self._connect_future

      async def _communicate_forever(self) -> None:
          """Connect, read until the connection ends, then reconnect, forever."""
          while True:
              try:
                  self._reader, self._writer = await asyncio.open_unix_connection(
                      self.socket_path)
              except OSError as e:
                  self.log.error(f"Connection to {self.socket_path} failed: {e}")
                  await asyncio.sleep(5)
                  continue

              # Start reading before running the connect handlers so that any
              # request those handlers send can receive its response.
              read_loop = asyncio.create_task(self._try_read_loop())
              self.is_connected = True
              await self._run_rpc_handler(CONNECT_EVENT, {})
              if not self._connect_future.done():
                  self._connect_future.set_result(True)

              await read_loop
              self.is_connected = False
              await self._run_rpc_handler(DISCONNECT_EVENT, {})
              # A resolved future cannot be reused; hand out a fresh one for
              # the next connection attempt.
              self._connect_future = self.loop.create_future()

      async def disconnect(self) -> None:
          """Close the connection and stop the background connection task.

          Requests still waiting for a response are failed with ``NotConnected``,
          since cancelling the background task means the disconnect event
          handlers are not run for this shutdown.
          """
          writer = self._writer
          if writer is not None:
              try:
                  writer.write_eof()
                  await writer.drain()
              finally:
                  # Release the transport even if the graceful EOF failed.
                  writer.close()
          if self._communicate_task:
              self._communicate_task.cancel()
              self._communicate_task = None
          self._writer = None
          self._reader = None
          self.is_connected = False
          self._connect_future = self.loop.create_future()
          await self._abandon_responses({})

      def add_rpc_handler(self, method: str, handler: EventHandler) -> None:
          """Register *handler* to be called for incoming events of type *method*."""
          self._rpc_event_handlers.setdefault(method, []).append(handler)

      def remove_rpc_handler(self, method: str, handler: EventHandler) -> None:
          """Unregister *handler* for *method*.

          :raises ValueError: If *handler* is not registered for *method*.
          """
          self._rpc_event_handlers.setdefault(method, []).remove(handler)

      async def _run_rpc_handler(self, command: str, req: Dict[str, Any]) -> None:
          """Call every handler registered for *command* with *req*, in order.

          Exceptions raised by a handler are logged and do not stop the
          remaining handlers.
          """
          try:
              handlers = self._rpc_event_handlers[command]
          except KeyError:
              self.log.warning("No handlers for RPC request %s", command)
              self.log.trace("Data unhandled request: %s", req)
              return
          for handler in handlers:
              try:
                  await handler(req)
              except Exception:
                  self.log.exception("Exception in RPC event handler")

      def _run_response_handlers(self, req_id: UUID, command: str, req: Any) -> None:
          """Resolve the pending request *req_id* with the response *req*.

          The waiter receives ``(command, data)`` on success, ``UnexpectedError``
          for ``unexpected_error`` responses, or the error built by
          ``make_response_error`` when the response has a top-level ``error``.
          """
          waiter = self._response_waiters.pop(req_id, None)
          if waiter is None:
              self.log.debug(f"Nobody waiting for response to {req_id}")
              return
          if waiter.done():
              # Already failed or cancelled; setting a result again would
              # raise InvalidStateError.
              self.log.debug(f"Nobody waiting for response to {req_id}")
              return
          data = req.get("data")
          if command == "unexpected_error":
              try:
                  message = data["message"]
              except (KeyError, TypeError):
                  # TypeError covers a response without any "data" at all.
                  message = "Unexpected error with no message"
              waiter.set_exception(UnexpectedError(message))
          elif "error" in req and isinstance(req["error"], (str, dict)):
              waiter.set_exception(make_response_error(req))
          else:
              waiter.set_result((command, data))

      async def _handle_incoming_line(self, line: str) -> None:
          """Parse one line from the server and route it.

          Lines with an ``id`` complete a pending request; lines without one are
          dispatched to the RPC event handlers in a separate task.
          """
          try:
              req = json.loads(line)
          except json.JSONDecodeError:
              self.log.debug(f"Got non-JSON data from server: {line}")
              return
          try:
              req_type = req["type"]
          except (KeyError, TypeError):
              # TypeError: valid JSON that is not an object (list, number, ...).
              self.log.debug(f"Got invalid request from server: {line}")
              return

          self.log.trace("Got data from server: %s", req)

          req_id = req.get("id")
          if req_id is None:
              # Run handlers in the background so a slow handler does not
              # block reading further lines.
              asyncio.create_task(self._run_rpc_handler(req_type, req))
          else:
              self._run_response_handlers(UUID(req_id), req_type, req)

      async def _try_read_loop(self) -> None:
          """Run the read loop, logging instead of propagating any exception."""
          try:
              await self._read_loop()
          except Exception:
              self.log.exception("Fatal error in read loop")

      async def _read_loop(self) -> None:
          """Read and handle lines until EOF or until the reader is cleared."""
          while self._reader is not None and not self._reader.at_eof():
              line = await self._reader.readline()
              if not line:
                  continue
              try:
                  line_str = line.decode("utf-8")
              except UnicodeDecodeError:
                  self.log.exception("Got non-unicode request from server: %s", line)
                  continue
              try:
                  await self._handle_incoming_line(line_str)
              except Exception:
                  self.log.exception("Failed to handle incoming request %s", line_str)
          self.log.debug("Reader disconnected")
          writer = self._writer
          self._reader = None
          self._writer = None
          if writer is not None:
              # Close our side too so the transport is released.
              writer.close()

      def _create_request(self, command: str, req_id: Optional[UUID] = None, **data: Any
                          ) -> Tuple[asyncio.Future, Dict[str, Any]]:
          """Build a request payload and register a waiter for its response.

          :param command: Value of the request's ``type`` field.
          :param req_id: Request ID to use; a random UUID is generated if omitted.
          :param data: Extra fields merged into the request object.
          :return: The response future and the request payload.
          """
          req_id = req_id or uuid4()
          req = {"id": str(req_id), "type": command, **data}
          self.log.trace("Request %s: %s %s", req_id, command, data)
          return self._wait_response(req_id), req

      def _wait_response(self, req_id: UUID) -> asyncio.Future:
          """Return the future for *req_id*, creating and registering it if needed."""
          try:
              future = self._response_waiters[req_id]
          except KeyError:
              future = self._response_waiters[req_id] = self.loop.create_future()
          return future

      async def _abandon_responses(self, unused_data: Dict[str, Any]) -> None:
          """Fail every pending request with ``NotConnected`` and forget it."""
          for req_id, waiter in self._response_waiters.items():
              if not waiter.done():
                  self.log.trace(f"Abandoning response for {req_id}")
                  waiter.set_exception(
                      NotConnected("Disconnected from signald before RPC completed"))
          # Nothing will ever answer these IDs again; drop them so the table
          # does not grow across reconnects.
          self._response_waiters.clear()

      async def _send_request(self, data: Dict[str, Any]) -> None:
          """Serialize *data* as one JSON line and write it to the socket.

          :raises NotConnected: If there is no open writer.
          """
          if self._writer is None:
              raise NotConnected("Not connected to signald")

          self._writer.write(json.dumps(data).encode("utf-8"))
          self._writer.write(b"\n")
          await self._writer.drain()
          self.log.trace("Sent data to server: %s", data)

      async def _raw_request(self, command: str, req_id: Optional[UUID] = None, **data: Any
                             ) -> Tuple[str, Dict[str, Any]]:
          """Send a request and wait for its response.

          :return: The response type and the response's ``data`` field.
          :raises NotConnected: If the socket is not connected, or disconnects
              before the response arrives.
          """
          req_id = req_id or uuid4()
          future, payload = self._create_request(command, req_id, **data)
          try:
              await self._send_request(payload)
          except Exception:
              # The request never went out, so no response will arrive;
              # unregister the waiter instead of leaking it.
              self._response_waiters.pop(req_id, None)
              raise
          # Shield so cancelling the caller does not cancel the shared waiter.
          return await asyncio.shield(future)

      async def request(self, command: str, expected_response: str, **data: Any) -> Any:
          """Send *command* and return the response data.

          :param expected_response: Response type the server is expected to send.
          :raises UnexpectedResponse: If the response type differs from
              *expected_response*.
          """
          resp_type, resp_data = await self._raw_request(command, **data)
          if resp_type != expected_response:
              raise UnexpectedResponse(resp_type, resp_data)
          return resp_data

      async def request_v1(self, command: str, **data: Any) -> Any:
          """Send a ``version="v1"`` request whose response type equals *command*."""
          return await self.request(command, expected_response=command, version="v1", **data)