proxy.py 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. from __future__ import annotations
  2. import asyncio
  3. import json
  4. import logging
  5. import urllib.request
  6. from aiohttp import ClientConnectionError
  7. try:
  8. from aiohttp_socks import ProxyConnectionError, ProxyError, ProxyTimeoutError
  9. except ImportError:
  10. class ProxyError(Exception):
  11. pass
  12. ProxyConnectionError = ProxyTimeoutError = ProxyError
  13. RETRYABLE_PROXY_EXCEPTIONS = (
  14. ProxyError,
  15. ProxyTimeoutError,
  16. ProxyConnectionError,
  17. ClientConnectionError,
  18. ConnectionError,
  19. asyncio.TimeoutError,
  20. )
  21. class ProxyHandler:
  22. current_proxy_url: str | None = None
  23. log = logging.getLogger("mauigpapi.proxy")
  24. def __init__(self, api_url: str | None) -> None:
  25. self.api_url = api_url
  26. def get_proxy_url_from_api(self) -> str | None:
  27. assert self.api_url is not None
  28. request = urllib.request.Request(self.api_url, method="GET")
  29. self.log.debug("Requesting proxy from: %s", self.api_url)
  30. try:
  31. with urllib.request.urlopen(request) as f:
  32. response = json.loads(f.read().decode())
  33. except Exception:
  34. self.log.exception("Failed to retrieve proxy from API")
  35. else:
  36. return response["proxy_url"]
  37. return None
  38. def update_proxy_url(self) -> bool:
  39. old_proxy = self.current_proxy_url
  40. new_proxy = None
  41. if self.api_url is not None:
  42. new_proxy = self.get_proxy_url_from_api()
  43. else:
  44. new_proxy = urllib.request.getproxies().get("http")
  45. if old_proxy != new_proxy:
  46. self.log.debug("Set new proxy URL: %s", new_proxy)
  47. self.current_proxy_url = new_proxy
  48. return True
  49. self.log.debug("Got same proxy URL: %s", new_proxy)
  50. return False
  51. def get_proxy_url(self) -> str | None:
  52. if not self.current_proxy_url:
  53. self.update_proxy_url()
  54. return self.current_proxy_url