connect.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374
  1. """Openconnect-Microsoft-login connection helpers."""
  2. import logging
  3. import time
  4. from dataclasses import dataclass
  5. from typing import Optional
  6. from urllib.parse import urlparse
  7. from otppy import OTP
  8. from selenium import webdriver
  9. from selenium.common.exceptions import ElementClickInterceptedException
  10. from selenium.common.exceptions import ElementNotInteractableException
  11. from selenium.common.exceptions import NoSuchElementException
  12. from selenium.common.exceptions import StaleElementReferenceException
  13. from selenium.common.exceptions import TimeoutException
  14. from selenium.webdriver.common.by import By
  15. from selenium.webdriver.firefox.options import Options as FirefoxOptions
  16. from selenium.webdriver.support import expected_conditions as EC
  17. from selenium.webdriver.support.ui import WebDriverWait
# Locators for elements on the Microsoft login pages. The *_NAME values are
# looked up with By.NAME, the *_ID values with By.ID.
USERNAME_INPUT_NAME = "loginfmt"
PASSWORD_INPUT_NAME = "passwd"
MFA_INPUT_NAME = "otc"
CONTINUE_BUTTON_ID = "idSIButton9"
MFA_CONTINUE_BUTTON_ID = "idSubmit_SAOTCC_Continue"
# Element whose presence after submitting an MFA code is treated as a rejection.
MFA_ERROR_TEXT_ID = "idSpan_SAOTCC_Error_OTC"
# Element whose presence is used to recognise the VPN install page.
VPN_INSTALL_PAGE_EXCLUSIVE_ELEMENT_ID = "provisioning_action_label"

# Retry budgets for the polling loops.
MAX_RETRY_DOMAIN_CHECK = 16
MFA_MAX_RETRY_COUNT = 3
# Pauses between polling attempts; both are passed to time.sleep (seconds).
ELEMENT_CHECK_DELAY = 0.5
DOMAIN_CHECK_DELAY = 0.5

# Module-wide logger. No handler is attached here; login() attaches a console
# handler when called with log_messages=True.
LOGGER = logging.getLogger("OCMA")
LOGGER.setLevel(logging.INFO)
  31. @dataclass
  32. class VPNCookie:
  33. """VPN cookie data container."""
  34. domain: str
  35. cookie: str
  36. def login(
  37. username: str,
  38. password: str,
  39. mfa_secret: Optional[str] = None,
  40. vpn_site: str = "https://vpn.fhnw.ch",
  41. headless: bool = True,
  42. log_messages: bool = False,
  43. ) -> VPNCookie:
  44. """
  45. Log in to the vpn.
  46. Parameters
  47. ----------
  48. username : str
  49. Microsoft account username.
  50. password : str
  51. Microsoft account password.
  52. mfa_secret : Optional[str], optional
  53. Multifactor secret, by default None
  54. vpn_site : _type_, optional
  55. VPN site to log into, by default "https://vpn.fhnw.ch"
  56. headless : bool, optional
  57. If the browser should be run in headless mode, by default True
  58. log_messages : bool, optional
  59. If messages should be logged to the console, by default False
  60. Returns
  61. -------
  62. VPNCookie
  63. Cookie to log into the openconnect VPN.
  64. Raises
  65. ------
  66. ValueError
  67. If anything went sideways during the login.
  68. """
  69. if log_messages:
  70. formatter = logging.Formatter(
  71. "%(asctime)s: %(levelname)s - %(name)s - %(message)s"
  72. )
  73. fh_s = logging.StreamHandler()
  74. fh_s.setLevel(logging.INFO)
  75. fh_s.setFormatter(formatter)
  76. LOGGER.addHandler(fh_s)
  77. LOGGER.info("Starting")
  78. options = FirefoxOptions()
  79. if headless:
  80. LOGGER.info("Running in headless mode")
  81. options.add_argument("--headless")
  82. driver = webdriver.Firefox(options=options)
  83. driver.get(vpn_site)
  84. retries = MAX_RETRY_DOMAIN_CHECK
  85. while not _is_on_ms_login_page(driver) and retries > 0:
  86. retries -= 1
  87. time.sleep(DOMAIN_CHECK_DELAY)
  88. if retries < 0:
  89. raise ValueError(
  90. f"We never reached the MS login page! Currently on {driver.current_url}"
  91. )
  92. _fill_login(driver, username, password)
  93. _fill_mfa(driver, mfa_secret)
  94. _confirm_stay_signed_in(driver)
  95. _is_on_install_page(driver)
  96. return _get_webvpn_cookie(driver)
  97. def _fill_login(driver: webdriver.Firefox, username: str, password: str) -> None:
  98. LOGGER.info("Filling in login information")
  99. _check_on_ms_login_page(driver)
  100. if not _find_element(driver, By.NAME, USERNAME_INPUT_NAME):
  101. raise ValueError("Could not find login page!")
  102. if not _element_interactable(driver, By.NAME, USERNAME_INPUT_NAME):
  103. raise ValueError("Could not find username field!")
  104. driver.find_element(By.NAME, USERNAME_INPUT_NAME).send_keys(username)
  105. click_continue(driver)
  106. if _find_element(driver, by=By.ID, item="usernameError", wait=5):
  107. error_msg = driver.find_element(By.ID, "usernameError").text
  108. LOGGER.error("Invalid username or other error (Message: %s)", error_msg)
  109. raise ValueError(f"Invalid username or other error (Message: {error_msg})")
  110. if not _element_interactable(driver, By.NAME, PASSWORD_INPUT_NAME):
  111. raise ValueError("Could not find password input!")
  112. driver.find_element(By.NAME, PASSWORD_INPUT_NAME).send_keys(password)
  113. click_continue(driver)
  114. if _find_element(driver, by=By.ID, item="passwordError", wait=5):
  115. error_msg = driver.find_element(By.ID, "passwordError").text
  116. LOGGER.error("Invalid password or other error (Message: %s)", error_msg)
  117. raise ValueError(f"Invalid password or other error (Message: {error_msg})")
  118. retries = MAX_RETRY_DOMAIN_CHECK
  119. while on_login_form(driver) and retries > 0:
  120. LOGGER.info(
  121. "Login information may not have yet been confirmed. Checking again (Try %s of %s)",
  122. MAX_RETRY_DOMAIN_CHECK - retries + 1,
  123. MAX_RETRY_DOMAIN_CHECK,
  124. )
  125. time.sleep(ELEMENT_CHECK_DELAY)
  126. if on_login_form(driver):
  127. LOGGER.info("Login information has not yet been confirmed. Trying again")
  128. click_continue(driver) # Confirm password
  129. retries -= 1
  130. if retries < 0:
  131. raise ValueError("Failed to confirm login form!")
  132. LOGGER.info("Login information filled out")
  133. def on_login_form(driver: webdriver.Firefox) -> bool:
  134. """
  135. Check if we are on the login form.
  136. Parameters
  137. ----------
  138. driver : webdriver.Firefox
  139. The web driver to check on.
  140. Returns
  141. -------
  142. bool
  143. True if we are on the login form, else false.
  144. """
  145. return "saml2" in driver.current_url # or driver.current_url.endswith("login")
  146. def _fill_mfa(driver: webdriver.Firefox, mfa_secret: Optional[str]) -> None:
  147. LOGGER.info("Checking if we can fill in MFA information")
  148. _check_on_ms_login_page(driver)
  149. # Check if we have the MFA page
  150. if driver.current_url.endswith("/login") and mfa_secret is None:
  151. raise ValueError("You need to supply your MFA secret to log in!")
  152. _check_approval_screen(driver)
  153. # Check if we have an MFA code to enter
  154. if mfa_secret is None:
  155. LOGGER.info("Filling in login information")
  156. return
  157. for i in range(MFA_MAX_RETRY_COUNT):
  158. LOGGER.info(
  159. "Filling in MFA information (Try %s of %s)", i + 1, MFA_MAX_RETRY_COUNT
  160. )
  161. if not _find_element(driver, By.NAME, MFA_INPUT_NAME):
  162. time.sleep(ELEMENT_CHECK_DELAY)
  163. continue
  164. mfa_code = get_mfa_code(mfa_secret)
  165. driver.find_element(By.NAME, MFA_INPUT_NAME).send_keys(mfa_code)
  166. click_continue(driver, MFA_CONTINUE_BUTTON_ID) # Confirm otp
  167. LOGGER.info("Confirmed MFA code")
  168. if not driver.current_url.endswith("/login"):
  169. # We have moved on
  170. LOGGER.info("We have moved away from the MFA page")
  171. break
  172. # Check for any errors
  173. if not _find_element(driver, By.ID, MFA_ERROR_TEXT_ID, 2):
  174. # Did not find an error
  175. LOGGER.info("MFA seems to have been accepted, no errors")
  176. break
  177. def _check_approval_screen(driver: webdriver.Firefox) -> None:
  178. if not _find_element(driver, By.ID, "idDiv_SAOTCAS_Title"):
  179. return
  180. found_sign_in_other_way = False
  181. for _ in range(16):
  182. try:
  183. driver.find_element(By.ID, "signInAnotherWay").click()
  184. found_sign_in_other_way = True
  185. break
  186. except (ElementClickInterceptedException, StaleElementReferenceException):
  187. pass
  188. except NoSuchElementException:
  189. pass
  190. time.sleep(ELEMENT_CHECK_DELAY)
  191. if not found_sign_in_other_way:
  192. raise ValueError("Failed to find the option to sing in another way!")
  193. for _ in range(16):
  194. try:
  195. driver.find_element(By.XPATH, "//div[@data-value='PhoneAppOTP']").click()
  196. break
  197. except (ElementClickInterceptedException, StaleElementReferenceException):
  198. pass
  199. except NoSuchElementException:
  200. pass
  201. time.sleep(ELEMENT_CHECK_DELAY)
  202. return
  203. def _confirm_stay_signed_in(driver: webdriver.Firefox) -> bool:
  204. LOGGER.info("Checking if we should confirm if we should stay signed in")
  205. if not _is_on_ms_login_page(driver):
  206. LOGGER.info("Already logged in, can't confirm 'stay signed in'")
  207. return False
  208. if not driver.current_url.endswith("/common/SAS/ProcessAuth"):
  209. LOGGER.info(
  210. "We have reached a dead end. We have completed the login, but not on the 'stay signed in' page"
  211. )
  212. return False
  213. click_continue(driver) # Confirm stay signed in
  214. LOGGER.info("Confirmed to 'stay signed in'")
  215. return True
  216. def _find_element(driver: webdriver.Firefox, by: str, item: str, wait: int = 8) -> bool:
  217. try:
  218. w = WebDriverWait(driver, wait)
  219. w.until(EC.presence_of_element_located((by, item)))
  220. return True
  221. except TimeoutException:
  222. return False
  223. def _element_interactable(
  224. driver: webdriver.Firefox, by: str, item: str, wait: int = 8
  225. ) -> bool:
  226. try:
  227. w = WebDriverWait(driver, wait)
  228. w.until(EC.element_to_be_clickable((by, item)))
  229. return True
  230. except ElementNotInteractableException:
  231. return False
  232. def _is_on_install_page(driver: webdriver.Firefox) -> None:
  233. if not _find_element(driver, By.ID, VPN_INSTALL_PAGE_EXCLUSIVE_ELEMENT_ID):
  234. raise ValueError("Could not find install page!")
  235. def _get_webvpn_cookie(driver: webdriver.Firefox) -> VPNCookie:
  236. webvpn_cookie = driver.get_cookie("webvpn")
  237. driver.close()
  238. if webvpn_cookie is None:
  239. raise ValueError(
  240. "Failed to find the webvpn cookie. Maybe the authentication has failed?"
  241. )
  242. webvpn_domain = webvpn_cookie["domain"]
  243. webvpn_value = webvpn_cookie["value"]
  244. return VPNCookie(domain=webvpn_domain, cookie=webvpn_value)
  245. def _check_on_ms_login_page(driver: webdriver.Firefox) -> None:
  246. if not _is_on_ms_login_page(driver):
  247. raise ValueError("We should still be on the MS login page but we aren't!")
  248. def _is_on_ms_login_page(driver: webdriver.Firefox) -> bool:
  249. return urlparse(driver.current_url).netloc == "login.microsoftonline.com"
  250. def click_continue(driver: webdriver.Firefox, btn_id: str = CONTINUE_BUTTON_ID) -> bool:
  251. """
  252. Click the continue button.
  253. Parameters
  254. ----------
  255. driver : webdriver.Firefox
  256. The driver for which to click the continue button.
  257. btn_id : str, optional
  258. The buttons ID to be clicked, by default CONTINUE_BUTTON_ID
  259. Returns
  260. -------
  261. bool
  262. True if still on login page, false if not.
  263. """
  264. for _ in range(16):
  265. try:
  266. driver.find_element(By.ID, btn_id).click()
  267. return True
  268. except (ElementClickInterceptedException, StaleElementReferenceException):
  269. pass
  270. except NoSuchElementException:
  271. pass
  272. time.sleep(ELEMENT_CHECK_DELAY)
  273. return False
  274. def get_mfa_code(secret: str) -> str:
  275. """
  276. Get the multifactor authentication code for the given secret.
  277. Parameters
  278. ----------
  279. secret : str
  280. MFA secret
  281. Returns
  282. -------
  283. str
  284. MFA code
  285. """
  286. otp = OTP.fromb32(secret)
  287. code: str = otp.TOTP()[0]
  288. return code