provisioning_api.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. # mautrix-signal - A Matrix-Signal puppeting bridge
  2. # Copyright (C) 2020 Tulir Asokan
  3. #
  4. # This program is free software: you can redistribute it and/or modify
  5. # it under the terms of the GNU Affero General Public License as published by
  6. # the Free Software Foundation, either version 3 of the License, or
  7. # (at your option) any later version.
  8. #
  9. # This program is distributed in the hope that it will be useful,
  10. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  12. # GNU Affero General Public License for more details.
  13. #
  14. # You should have received a copy of the GNU Affero General Public License
  15. # along with this program. If not, see <https://www.gnu.org/licenses/>.
  16. from typing import Awaitable, Dict
  17. import logging
  18. import json
  19. from aiohttp import web
  20. from mautrix.types import UserID
  21. from mautrix.util.logging import TraceLogger
  22. from .. import user as u
  23. class ProvisioningAPI:
  24. log: TraceLogger = logging.getLogger("mau.web.provisioning")
  25. app: web.Application
  26. def __init__(self, shared_secret: str) -> None:
  27. self.app = web.Application()
  28. self.shared_secret = shared_secret
  29. self.app.router.add_get("/api/whoami", self.status)
  30. self.app.router.add_options("/api/login", self.login_options)
  31. self.app.router.add_post("/api/login", self.login)
  32. self.app.router.add_post("/api/logout", self.logout)
  33. @property
  34. def _acao_headers(self) -> Dict[str, str]:
  35. return {
  36. "Access-Control-Allow-Origin": "*",
  37. "Access-Control-Allow-Headers": "Authorization, Content-Type",
  38. "Access-Control-Allow-Methods": "POST, OPTIONS",
  39. }
  40. @property
  41. def _headers(self) -> Dict[str, str]:
  42. return {
  43. **self._acao_headers,
  44. "Content-Type": "application/json",
  45. }
  46. async def login_options(self, _: web.Request) -> web.Response:
  47. return web.Response(status=200, headers=self._headers)
  48. def check_token(self, request: web.Request) -> Awaitable['u.User']:
  49. try:
  50. token = request.headers["Authorization"]
  51. token = token[len("Bearer "):]
  52. except KeyError:
  53. raise web.HTTPBadRequest(body='{"error": "Missing Authorization header"}',
  54. headers=self._headers)
  55. except IndexError:
  56. raise web.HTTPBadRequest(body='{"error": "Malformed Authorization header"}',
  57. headers=self._headers)
  58. if token != self.shared_secret:
  59. raise web.HTTPForbidden(body='{"error": "Invalid token"}', headers=self._headers)
  60. try:
  61. user_id = request.query["user_id"]
  62. except KeyError:
  63. raise web.HTTPBadRequest(body='{"error": "Missing user_id query param"}',
  64. headers=self._headers)
  65. return u.User.get_by_mxid(UserID(user_id))
  66. async def status(self, request: web.Request) -> web.Response:
  67. user = await self.check_token(request)
  68. data = {
  69. "permissions": user.permission_level,
  70. "mxid": user.mxid,
  71. "twitter": None,
  72. }
  73. if await user.is_logged_in():
  74. data["twitter"] = (await user.get_info()).serialize()
  75. return web.json_response(data, headers=self._acao_headers)
  76. async def login(self, request: web.Request) -> web.Response:
  77. user = await self.check_token(request)
  78. try:
  79. data = await request.json()
  80. except json.JSONDecodeError:
  81. raise web.HTTPBadRequest(body='{"error": "Malformed JSON"}', headers=self._headers)
  82. try:
  83. auth_token = data["auth_token"]
  84. csrf_token = data["csrf_token"]
  85. except KeyError:
  86. raise web.HTTPBadRequest(body='{"error": "Missing keys"}', headers=self._headers)
  87. try:
  88. await user.connect(auth_token=auth_token, csrf_token=csrf_token)
  89. except Exception:
  90. self.log.debug("Failed to log in", exc_info=True)
  91. raise web.HTTPUnauthorized(body='{"error": "Twitter authorization failed"}',
  92. headers=self._headers)
  93. return web.Response(body='{}', status=200, headers=self._headers)
  94. async def logout(self, request: web.Request) -> web.Response:
  95. user = await self.check_token(request)
  96. await user.logout()
  97. return web.json_response({}, headers=self._acao_headers)