# connect.py
  1. """Openconnect-Microsoft-login connection helpers."""
  2. import logging
  3. import time
  4. from dataclasses import dataclass
  5. from urllib.parse import urlparse
  6. from otppy import OTP
  7. from selenium import webdriver
  8. from selenium.common.exceptions import (
  9. ElementClickInterceptedException,
  10. ElementNotInteractableException,
  11. NoSuchElementException,
  12. StaleElementReferenceException,
  13. TimeoutException,
  14. )
  15. from selenium.webdriver.common.by import By
  16. from selenium.webdriver.firefox.options import Options as FirefoxOptions
  17. from selenium.webdriver.support import expected_conditions as ec
  18. from selenium.webdriver.support.wait import WebDriverWait
# Names / IDs of the page elements the automation looks up on the login pages.
USERNAME_INPUT_NAME = "loginfmt"
PASSWORD_INPUT_NAME = "passwd"
MFA_INPUT_NAME = "otc"
CONTINUE_BUTTON_ID = "idSIButton9"
MFA_CONTINUE_BUTTON_ID = "idSubmit_SAOTCC_Continue"
MFA_ERROR_TEXT_ID = "idSpan_SAOTCC_Error_OTC"
# Element ID that `_is_on_install_page` waits for to recognise the VPN install page.
VPN_INSTALL_PAGE_EXCLUSIVE_ELEMENT_ID = "provisioning_action_label"

# Retry budgets for the polling loops.
MAX_RETRY_DOMAIN_CHECK = 16
MFA_MAX_RETRY_COUNT = 3

# Seconds slept between two polling attempts.
ELEMENT_CHECK_DELAY = 0.5
DOMAIN_CHECK_DELAY = 0.5

# Module logger; `login` attaches a console handler to it when asked to.
LOGGER = logging.getLogger("OCMA")
LOGGER.setLevel(logging.INFO)
  32. @dataclass
  33. class VPNCookie:
  34. """VPN cookie data container."""
  35. domain: str
  36. cookie: str
  37. def login( # noqa: PLR0913 # pylint: disable=too-many-arguments,too-many-positional-arguments
  38. username: str,
  39. password: str,
  40. mfa_secret: str | None = None,
  41. vpn_site: str = "https://vpn.fhnw.ch",
  42. headless: bool = True,
  43. log_messages: bool = False,
  44. ) -> VPNCookie:
  45. """
  46. Log in to the vpn.
  47. Parameters
  48. ----------
  49. username : str
  50. Microsoft account username.
  51. password : str
  52. Microsoft account password.
  53. mfa_secret : str | None, optional
  54. Multi-factor secret, by default None
  55. vpn_site : str, optional
  56. VPN site to log into, by default "https://vpn.fhnw.ch"
  57. headless : bool, optional
  58. If the browser should be run in headless mode, by default True
  59. log_messages : bool, optional
  60. If messages should be logged to the console, by default False
  61. Returns
  62. -------
  63. VPNCookie
  64. Cookie to log into the openconnect VPN.
  65. Raises
  66. ------
  67. ValueError
  68. If anything went sideways during the login.
  69. """
  70. if log_messages:
  71. formatter = logging.Formatter(
  72. "%(asctime)s: %(levelname)s - %(name)s - %(message)s",
  73. )
  74. fh_s = logging.StreamHandler()
  75. fh_s.setLevel(logging.INFO)
  76. fh_s.setFormatter(formatter)
  77. LOGGER.addHandler(fh_s)
  78. LOGGER.info("Starting")
  79. options = FirefoxOptions()
  80. if headless:
  81. LOGGER.info("Running in headless mode")
  82. options.add_argument("--headless")
  83. driver = webdriver.Firefox(options=options)
  84. driver.get(vpn_site)
  85. retries = MAX_RETRY_DOMAIN_CHECK
  86. while not _is_on_ms_login_page(driver) and retries > 0:
  87. retries -= 1
  88. time.sleep(DOMAIN_CHECK_DELAY)
  89. if retries < 0:
  90. raise ValueError(
  91. f"We never reached the MS login page! Currently on {driver.current_url}",
  92. )
  93. _fill_login(driver, username, password)
  94. _fill_mfa(driver, mfa_secret)
  95. _confirm_stay_signed_in(driver)
  96. _is_on_install_page(driver)
  97. return _get_webvpn_cookie(driver)
  98. def _fill_login(driver: webdriver.Firefox, username: str, password: str) -> None:
  99. LOGGER.info("Filling in login information")
  100. _check_on_ms_login_page(driver)
  101. if not _find_element(driver, By.NAME, USERNAME_INPUT_NAME):
  102. raise ValueError("Could not find login page!")
  103. if not _element_interactable(driver, By.NAME, USERNAME_INPUT_NAME):
  104. raise ValueError("Could not find username field!")
  105. driver.find_element(By.NAME, USERNAME_INPUT_NAME).send_keys(username)
  106. click_continue(driver)
  107. if _find_element(driver, by=By.ID, item="usernameError", wait=5):
  108. error_msg = driver.find_element(By.ID, "usernameError").text
  109. LOGGER.error("Invalid username or other error (Message: %s)", error_msg)
  110. raise ValueError(f"Invalid username or other error (Message: {error_msg})")
  111. if not _element_interactable(driver, By.NAME, PASSWORD_INPUT_NAME):
  112. raise ValueError("Could not find password input!")
  113. driver.find_element(By.NAME, PASSWORD_INPUT_NAME).send_keys(password)
  114. click_continue(driver)
  115. if _find_element(driver, by=By.ID, item="passwordError", wait=5):
  116. error_msg = driver.find_element(By.ID, "passwordError").text
  117. LOGGER.error("Invalid password or other error (Message: %s)", error_msg)
  118. raise ValueError(f"Invalid password or other error (Message: {error_msg})")
  119. retries = MAX_RETRY_DOMAIN_CHECK
  120. while on_login_form(driver) and retries > 0:
  121. LOGGER.info(
  122. "Login information may not have yet been confirmed. Checking again (Try %s of %s)",
  123. MAX_RETRY_DOMAIN_CHECK - retries + 1,
  124. MAX_RETRY_DOMAIN_CHECK,
  125. )
  126. time.sleep(ELEMENT_CHECK_DELAY)
  127. if on_login_form(driver):
  128. LOGGER.info("Login information has not yet been confirmed. Trying again")
  129. click_continue(driver) # Confirm password
  130. retries -= 1
  131. if retries < 0:
  132. raise ValueError("Failed to confirm login form!")
  133. LOGGER.info("Login information filled out")
  134. def on_login_form(driver: webdriver.Firefox) -> bool:
  135. """
  136. Check if we are on the login form.
  137. Parameters
  138. ----------
  139. driver : webdriver.Firefox
  140. The web driver to check on.
  141. Returns
  142. -------
  143. bool
  144. True if we are on the login form, else false.
  145. """
  146. return "saml2" in driver.current_url # or driver.current_url.endswith("login")
  147. def _fill_mfa(driver: webdriver.Firefox, mfa_secret: str | None) -> None:
  148. LOGGER.info("Checking if we can fill in MFA information")
  149. _check_on_ms_login_page(driver)
  150. # Check if we have the MFA page
  151. if driver.current_url.endswith("/login") and mfa_secret is None:
  152. raise ValueError("You need to supply your MFA secret to log in!")
  153. _check_approval_screen(driver)
  154. # Check if we have an MFA code to enter
  155. if mfa_secret is None:
  156. LOGGER.info("Filling in login information")
  157. return
  158. for i in range(MFA_MAX_RETRY_COUNT):
  159. LOGGER.info(
  160. "Filling in MFA information (Try %s of %s)",
  161. i + 1,
  162. MFA_MAX_RETRY_COUNT,
  163. )
  164. if not _find_element(driver, By.NAME, MFA_INPUT_NAME):
  165. time.sleep(ELEMENT_CHECK_DELAY)
  166. continue
  167. mfa_code = get_mfa_code(mfa_secret)
  168. driver.find_element(By.NAME, MFA_INPUT_NAME).send_keys(mfa_code)
  169. click_continue(driver, MFA_CONTINUE_BUTTON_ID) # Confirm otp
  170. LOGGER.info("Confirmed MFA code")
  171. if not driver.current_url.endswith("/login"):
  172. # We have moved on
  173. LOGGER.info("We have moved away from the MFA page")
  174. break
  175. # Check for any errors
  176. if not _find_element(driver, By.ID, MFA_ERROR_TEXT_ID, 2):
  177. # Did not find an error
  178. LOGGER.info("MFA seems to have been accepted, no errors")
  179. break
  180. def _check_approval_screen(driver: webdriver.Firefox) -> None:
  181. if not _find_element(driver, By.ID, "idDiv_SAOTCAS_Title"):
  182. return
  183. found_sign_in_other_way = False
  184. for _ in range(16):
  185. try:
  186. driver.find_element(By.ID, "signInAnotherWay").click()
  187. found_sign_in_other_way = True
  188. break
  189. except (ElementClickInterceptedException, StaleElementReferenceException):
  190. pass
  191. except NoSuchElementException:
  192. pass
  193. time.sleep(ELEMENT_CHECK_DELAY)
  194. if not found_sign_in_other_way:
  195. raise ValueError("Failed to find the option to sing in another way!")
  196. for _ in range(16):
  197. try:
  198. driver.find_element(By.XPATH, "//div[@data-value='PhoneAppOTP']").click()
  199. break
  200. except (ElementClickInterceptedException, StaleElementReferenceException):
  201. pass
  202. except NoSuchElementException:
  203. pass
  204. time.sleep(ELEMENT_CHECK_DELAY)
  205. return
  206. def _confirm_stay_signed_in(driver: webdriver.Firefox) -> bool:
  207. LOGGER.info("Checking if we should confirm if we should stay signed in")
  208. if not _is_on_ms_login_page(driver):
  209. LOGGER.info("Already logged in, can't confirm 'stay signed in'")
  210. return False
  211. if not driver.current_url.endswith("/common/SAS/ProcessAuth"):
  212. LOGGER.info(
  213. "We have reached a dead end. We have completed the login, but not on the 'stay signed in' page",
  214. )
  215. return False
  216. click_continue(driver) # Confirm stay signed in
  217. LOGGER.info("Confirmed to 'stay signed in'")
  218. return True
  219. def _find_element(driver: webdriver.Firefox, by: str, item: str, wait: int = 8) -> bool:
  220. try:
  221. w = WebDriverWait(driver, wait)
  222. w.until(ec.presence_of_element_located((by, item)))
  223. except TimeoutException:
  224. return False
  225. return True
  226. def _element_interactable(
  227. driver: webdriver.Firefox,
  228. by: str,
  229. item: str,
  230. wait: int = 8,
  231. ) -> bool:
  232. try:
  233. w = WebDriverWait(driver, wait)
  234. w.until(ec.element_to_be_clickable((by, item)))
  235. except ElementNotInteractableException:
  236. return False
  237. return True
  238. def _is_on_install_page(driver: webdriver.Firefox) -> None:
  239. if not _find_element(driver, By.ID, VPN_INSTALL_PAGE_EXCLUSIVE_ELEMENT_ID):
  240. raise ValueError("Could not find install page!")
  241. def _get_webvpn_cookie(driver: webdriver.Firefox) -> VPNCookie:
  242. webvpn_cookie = driver.get_cookie("webvpn")
  243. driver.close()
  244. if webvpn_cookie is None:
  245. raise ValueError(
  246. "Failed to find the webvpn cookie. Maybe the authentication has failed?",
  247. )
  248. webvpn_domain = webvpn_cookie["domain"]
  249. webvpn_value = webvpn_cookie["value"]
  250. return VPNCookie(domain=webvpn_domain, cookie=webvpn_value)
  251. def _check_on_ms_login_page(driver: webdriver.Firefox) -> None:
  252. if not _is_on_ms_login_page(driver):
  253. raise ValueError("We should still be on the MS login page but we aren't!")
  254. def _is_on_ms_login_page(driver: webdriver.Firefox) -> bool:
  255. return urlparse(driver.current_url).netloc == "login.microsoftonline.com"
  256. def click_continue(driver: webdriver.Firefox, btn_id: str = CONTINUE_BUTTON_ID) -> bool:
  257. """
  258. Click the continue button.
  259. Parameters
  260. ----------
  261. driver : webdriver.Firefox
  262. The driver for which to click the continue button.
  263. btn_id : str, optional
  264. The buttons ID to be clicked, by default CONTINUE_BUTTON_ID
  265. Returns
  266. -------
  267. bool
  268. True if still on login page, false if not.
  269. """
  270. for _ in range(16):
  271. try:
  272. driver.find_element(By.ID, btn_id).click()
  273. except (ElementClickInterceptedException, StaleElementReferenceException):
  274. pass
  275. except NoSuchElementException:
  276. pass
  277. else:
  278. return True
  279. time.sleep(ELEMENT_CHECK_DELAY)
  280. return False
  281. def get_mfa_code(secret: str) -> str:
  282. """
  283. Get the multi-factor authentication code for the given secret.
  284. Parameters
  285. ----------
  286. secret : str
  287. MFA secret
  288. Returns
  289. -------
  290. str
  291. MFA code
  292. """
  293. otp = OTP.fromb32(secret)
  294. code: str = otp.TOTP()[0]
  295. return code