# proxy.py (2.7 KB)
  1. from __future__ import annotations
  2. from typing import Callable
  3. import asyncio
  4. import json
  5. import logging
  6. import urllib.request
  7. from aiohttp import ClientConnectionError
  8. from mautrix.util.logging import TraceLogger
  9. try:
  10. from aiohttp_socks import ProxyConnectionError, ProxyError, ProxyTimeoutError
  11. except ImportError:
  12. class ProxyError(Exception):
  13. pass
  14. ProxyConnectionError = ProxyTimeoutError = ProxyError
# Exception types that proxy_with_retry catches and retries on.
# The three Proxy* entries are the aiohttp_socks exceptions when that package
# is importable; otherwise they all alias the single local placeholder class.
RETRYABLE_PROXY_EXCEPTIONS = (
    ProxyError,
    ProxyTimeoutError,
    ProxyConnectionError,
    ClientConnectionError,
    ConnectionError,
    asyncio.TimeoutError,
)
  23. class ProxyHandler:
  24. current_proxy_url: str | None = None
  25. log = logging.getLogger("mauigpapi.proxy")
  26. def __init__(self, api_url: str | None) -> None:
  27. self.api_url = api_url
  28. def get_proxy_url_from_api(self) -> str | None:
  29. assert self.api_url is not None
  30. request = urllib.request.Request(self.api_url, method="GET")
  31. self.log.debug("Requesting proxy from: %s", self.api_url)
  32. try:
  33. with urllib.request.urlopen(request) as f:
  34. response = json.loads(f.read().decode())
  35. except Exception:
  36. self.log.exception("Failed to retrieve proxy from API")
  37. else:
  38. return response["proxy_url"]
  39. return None
  40. def update_proxy_url(self) -> bool:
  41. old_proxy = self.current_proxy_url
  42. new_proxy = None
  43. if self.api_url is not None:
  44. new_proxy = self.get_proxy_url_from_api()
  45. else:
  46. new_proxy = urllib.request.getproxies().get("http")
  47. if old_proxy != new_proxy:
  48. self.log.debug("Set new proxy URL: %s", new_proxy)
  49. self.current_proxy_url = new_proxy
  50. return True
  51. self.log.debug("Got same proxy URL: %s", new_proxy)
  52. return False
  53. def get_proxy_url(self) -> str | None:
  54. if not self.current_proxy_url:
  55. self.update_proxy_url()
  56. return self.current_proxy_url
  57. async def proxy_with_retry(
  58. name: str,
  59. func: Callable,
  60. logger: TraceLogger,
  61. proxy_handler: ProxyHandler,
  62. on_proxy_change: Callable,
  63. max_retries: int = 10,
  64. ):
  65. errors = 0
  66. while True:
  67. try:
  68. return await func()
  69. except RETRYABLE_PROXY_EXCEPTIONS as e:
  70. errors += 1
  71. if errors > max_retries:
  72. raise
  73. wait = min(errors * 10, 60)
  74. logger.warning(
  75. "%s while trying to %s, retrying in %d seconds",
  76. e.__class__.__name__,
  77. name,
  78. wait,
  79. )
  80. if errors > 1 and proxy_handler.update_proxy_url():
  81. await on_proxy_change()