table_extraction.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. import logging
  2. from multiprocessing import Pool
  3. from pathlib import Path
  4. from pdfplumber.page import Page
  5. from pdfplumber.table import Table
  6. import pdfplumber
  7. from config import (
  8. CLASS_TIMETABLE_PDF_TABLE_SETTINGS,
  9. ALLOWED_TIMESLOTS,
  10. CLASS_TIMETABLE_PDF_MIN_DIMENSIONS,
  11. )
  12. from .models import (
  13. Weekday,
  14. TimeSlot,
  15. YLevel,
  16. RawExtractedModule,
  17. UnmergedTimeEntries,
  18. Area,
  19. HorizontalLine,
  20. ClassPdfExtractionPageData,
  21. RawClassPdfExtractionPageData,
  22. PageMetadata,
  23. )
  24. from .above_table_text import parse_above_table_text
  25. from .geometry import (
  26. get_timeslot_for_area,
  27. is_line_at_bottom,
  28. is_area_below,
  29. is_vertical_match,
  30. )
  31. from .img import is_mostly_white_area
  32. allowed_time_slots: list[TimeSlot] = [
  33. TimeSlot(start_time=timeslot_tuple[0], end_time=timeslot_tuple[1])
  34. for timeslot_tuple in ALLOWED_TIMESLOTS
  35. ]
  36. def get_weekday_from_text(text: str) -> Weekday | None:
  37. """
  38. Helper function that tries to get a Weekday from a string.
  39. Only accepts exact display name matches.
  40. """
  41. for weekday in Weekday:
  42. if weekday.display_name == text:
  43. return weekday
  44. return None
def get_modules_from_weekday(
    weekday: Weekday,
    unmerged_time_entries: UnmergedTimeEntries,
    page: Page,
    timeslot_y_levels: dict[TimeSlot, YLevel],
    page_number: int,
) -> list[RawExtractedModule]:
    """
    Extracts the modules (raw text and start/end) of a weekday on a single pdf page

    Consumes ``unmerged_time_entries.cells`` front-to-back (the list is emptied
    as a side effect). Each cell is vertically merged with cells below it until
    a horizontal separator line sits at its bottom edge, or the bottom of the
    last allowed timeslot is reached; the merged area then yields one module.

    Args:
        weekday: The weekday column the cells belong to.
        unmerged_time_entries: Cells and separator lines for this weekday.
        page: pdfplumber page used for cropping text and whiteness checks.
        timeslot_y_levels: Vertical extent of each timeslot on the page.
        page_number: Page number recorded on each extracted module.

    Returns:
        The RawExtractedModule entries found for this weekday.

    Raises:
        RuntimeError: If a cell area cannot be matched to a timeslot, or if no
            cell below exists to merge with before a bottom line is found.
    """
    # Bottom edge of the last allowed timeslot; merging never continues past it.
    highest_y_level = timeslot_y_levels[allowed_time_slots[-1]].y2
    modules: list[RawExtractedModule] = []
    while len(unmerged_time_entries.cells) > 0:
        area = unmerged_time_entries.cells.pop(0)
        # Visually blank cells carry no module text.
        if is_mostly_white_area(page, area):
            logging.debug("mostly white cell skipped")
            continue
        timeslot = get_timeslot_for_area(area, timeslot_y_levels)
        if timeslot is None:
            raise RuntimeError("Could not match TimeSlot to Cell Area")
        start_seconds = timeslot.start_seconds()
        line_at_bottom_found = False
        # Grow `area` downward until a separator line is found at its bottom.
        while not line_at_bottom_found:
            logging.debug("searching for line at bottom of: %s", area)
            logging.debug("line candidates:")
            for line in unmerged_time_entries.horizontal_lines:
                logging.debug("testing horizontal line: %s", line)
                if is_line_at_bottom(area, line, tolerance=20):
                    line_at_bottom_found = True
                    logging.debug("candidate line found")
                    break
            else:
                # No separator at the bottom: stop if the area already reaches
                # the bottom of the last timeslot, otherwise merge downward.
                if is_vertical_match(area.y2, highest_y_level):
                    logging.debug("highest y level matched")
                    break
                found_matching_next_cell_index = -1
                for index, potential_cell_below in enumerate(
                    unmerged_time_entries.cells
                ):
                    if is_area_below(potential_cell_below, area):
                        found_matching_next_cell_index = index
                        break
                else:
                    raise RuntimeError(
                        f"No matching cell below found to merge with on {weekday}"
                    )
                logging.debug("vertically merging cells for %s", weekday)
                matched_area = unmerged_time_entries.cells.pop(
                    found_matching_next_cell_index
                )
                logging.debug("matched cell area: %s", matched_area)
                # Extend the area down to the bottom of the merged cell.
                area = Area(
                    x1=area.x1, y1=area.y1, x2=matched_area.x2, y2=matched_area.y2
                )
        text = page.crop((area.x1, area.y1, area.x2, area.y2)).extract_text()
        # Re-match the merged area; presumably this yields the timeslot at the
        # area's bottom so end_seconds spans the whole merge — TODO confirm
        # against get_timeslot_for_area's matching rule.
        timeslot = get_timeslot_for_area(area, timeslot_y_levels)
        if timeslot is None:
            raise RuntimeError("Could not match TimeSlot to Cell Area")
        end_seconds = timeslot.end_seconds()
        modules.append(
            RawExtractedModule(
                weekday=weekday,
                start_seconds=start_seconds,
                end_seconds=end_seconds,
                text=text,
                source_page_number=page_number,
            )
        )
    return modules
  114. def get_usable_table_index(found_tables: list) -> int:
  115. """
  116. Identifies the index of the timetable on the page based on dimensions.
  117. Raises:
  118. RuntimeError: If no or multiple tables matching the minimum dimensions are found.
  119. """
  120. if not found_tables:
  121. raise RuntimeError("No matching tables found.")
  122. valid_indices = []
  123. for index, table in enumerate(found_tables):
  124. x0, top, x1, bottom = table.bbox
  125. width = x1 - x0
  126. height = bottom - top
  127. logging.debug(
  128. "table num %d: width: %d, height: %d",
  129. index + 1,
  130. width,
  131. height,
  132. )
  133. if (
  134. width >= CLASS_TIMETABLE_PDF_MIN_DIMENSIONS
  135. and height >= CLASS_TIMETABLE_PDF_MIN_DIMENSIONS
  136. ):
  137. valid_indices.append(index)
  138. if len(valid_indices) > 1:
  139. raise RuntimeError(
  140. f"Found {len(valid_indices)} valid tables, expected at most 1. "
  141. "Ambiguous table selection."
  142. )
  143. if len(valid_indices) == 1:
  144. return valid_indices[0]
  145. return 0
def process_page(
    input_filename: Path, page_index: int
) -> RawClassPdfExtractionPageData:
    """
    Process a single page of the PDF to extract modules and header text.
    Designed to be run in a separate process.

    Opens the PDF itself (rather than receiving a Page) so the function is
    picklable/self-contained for multiprocessing.

    Args:
        input_filename: Path to the class timetable PDF.
        page_index: 0-based index of the page to process.

    Returns:
        RawClassPdfExtractionPageData with all modules found on the page and
        the raw text above the timetable.
    """
    with pdfplumber.open(input_filename) as pdf:
        page = pdf.pages[page_index]
        weekday_areas: dict[Weekday, Area] = {}
        timeslot_y_levels: dict[TimeSlot, YLevel] = {}
        unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries] = {}
        # Start every weekday with an empty placeholder area; the real bounds
        # are filled in while walking the table below.
        for day in Weekday:
            weekday_areas[day] = Area(x1=0, y1=0, x2=0, y2=0)
        table: Table = select_main_table(page, page_index)
        text_above_table = get_above_table_text(page, table_y1=table.bbox[1])
        empty_start_found = False
        # get weekday and timeslot areas
        expected_timeslot_index = 0
        for row_index, row in enumerate(table.rows):
            if row_index == 0:
                # Header row: each cell should name a weekday; its x-range and
                # bottom edge define that weekday's column (y2 filled in later).
                for column_index, cell in enumerate(row.cells):
                    logging.debug("row: %d, col: %d", row_index, column_index)
                    logging.debug(cell)
                    if cell is None:
                        logging.debug("None Table Cell Found")
                    else:
                        cell_text = page.crop(
                            (cell[0], cell[1], cell[2], cell[3])
                        ).extract_text()
                        # Only logged; no behavior depends on this flag later.
                        if not empty_start_found and len(cell_text) == 0:
                            logging.debug("empty start found")
                            empty_start_found = True
                        weekday_enum = get_weekday_from_text(cell_text)
                        if weekday_enum:
                            logging.debug("Weekday %s found", cell_text)
                            weekday_areas[weekday_enum] = Area(
                                x1=cell[0], y1=cell[3], x2=cell[2], y2=0
                            )
            else:
                # Body rows: the first cell holds the timeslot label; it must
                # match the next expected allowed timeslot.
                logging.debug("row: %d, col: %d", row_index, 0)
                cell = row.cells[0]
                if cell is None:
                    logging.warning("Unexpected None Table Cell Found")
                else:
                    cell_text = page.crop(
                        (cell[0], cell[1], cell[2], cell[3])
                    ).extract_text()
                    target_timeslot = allowed_time_slots[expected_timeslot_index]
                    if not (
                        target_timeslot.start_time in cell_text
                        and target_timeslot.end_time in cell_text
                    ):
                        logging.warning("Unexpected Timeslot found: '%s'", cell_text)
                    else:
                        # assumes this is the last timeslot ever
                        # NOTE(review): the final timeslot is hard-coded here;
                        # if the timetable gains later slots this must change.
                        if target_timeslot == TimeSlot(
                            start_time="20:30", end_time="21:15"
                        ):
                            # Close every weekday area at the bottom of the
                            # last timeslot row.
                            for weekday in Weekday:
                                new_area = Area(
                                    x1=weekday_areas[weekday].x1,
                                    y1=weekday_areas[weekday].y1,
                                    x2=weekday_areas[weekday].x2,
                                    y2=cell[3],
                                )
                                weekday_areas[weekday] = new_area
                        timeslot_y_levels[target_timeslot] = YLevel(
                            y1=cell[1], y2=cell[3]
                        )
                        expected_timeslot_index += 1
        # Bucket the table cells (and separator lines) by weekday column.
        collected_unmerged_time_entries_by_weekday(
            unmerged_time_entries_by_weekday, weekday_areas, table, page
        )
        all_modules: list[RawExtractedModule] = []
        for weekday in Weekday:
            all_modules.extend(
                get_modules_from_weekday(
                    weekday,
                    unmerged_time_entries_by_weekday[weekday],
                    page,
                    timeslot_y_levels,
                    page_index + 1,  # 1-based page number for reporting
                )
            )
        return RawClassPdfExtractionPageData(
            raw_extracted_modules=all_modules, above_table_text=text_above_table
        )
  234. def select_main_table(page: Page, page_index: int) -> Table:
  235. """
  236. Selects the main table on the PDF Page. This should be the timetable.
  237. """
  238. found_tables = page.find_tables(CLASS_TIMETABLE_PDF_TABLE_SETTINGS)
  239. logging.debug(
  240. "amount of tables found on page %d: %d",
  241. page_index + 1,
  242. len(found_tables),
  243. )
  244. table = found_tables[get_usable_table_index(found_tables)]
  245. return table
  246. def collected_unmerged_time_entries_by_weekday(
  247. unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries],
  248. weekday_areas: dict[Weekday, Area],
  249. table: Table,
  250. page: Page,
  251. ) -> None:
  252. """
  253. Populates the passed unmerged_time_entries_by_weekday dict with the
  254. `UnmergedTimeEntries` by `Weekday`.
  255. """
  256. for weekday in Weekday:
  257. unmerged_time_entries_by_weekday[weekday] = UnmergedTimeEntries(
  258. cells=[], horizontal_lines=[]
  259. )
  260. target_area: Area = weekday_areas[weekday]
  261. logging.debug("target_area: %s", target_area)
  262. for row_index, row in enumerate(table.rows):
  263. for column_index, cell in enumerate(row.cells):
  264. if cell is None:
  265. logging.debug("None table cell found")
  266. continue
  267. logging.debug("row: %d, col: %d", row_index, column_index)
  268. logging.debug("cell: %s", cell)
  269. if (
  270. target_area.x1 <= cell[0]
  271. and target_area.y1 <= cell[1]
  272. and target_area.x2 >= cell[2]
  273. and target_area.y2 >= cell[3]
  274. ):
  275. unmerged_time_entries_by_weekday[weekday].cells.append(
  276. Area(x1=cell[0], y1=cell[1], x2=cell[2], y2=cell[3])
  277. )
  278. logging.debug("%s cell found", weekday)
  279. collect_horizontal_lines(
  280. unmerged_time_entries_by_weekday, page, target_area, weekday
  281. )
  282. def collect_horizontal_lines(
  283. unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries],
  284. page: Page,
  285. target_area: Area,
  286. weekday: Weekday,
  287. ) -> None:
  288. """
  289. Populates the passed unmerged_time_entries_by_weekday dict with the
  290. `horizontal_lines` of the `UnmergedTimeEntries` by the passed weekday.
  291. These horizontal Lines are timeslot seperator lines.
  292. """
  293. for line_found in page.lines:
  294. line_x1 = line_found["x0"]
  295. line_x2 = line_found["x1"]
  296. line_y1 = line_found["y0"]
  297. line_y2 = line_found["y1"]
  298. line_bottom = line_found["bottom"]
  299. # ignore non horizontal lines
  300. if line_y1 != line_y2:
  301. continue
  302. if target_area.x1 <= line_x1 and target_area.x2 >= line_x2:
  303. logging.debug("%s timeslot seperator line found", weekday)
  304. unmerged_time_entries_by_weekday[weekday].horizontal_lines.append(
  305. HorizontalLine(x1=line_x1, x2=line_x2, y=line_bottom)
  306. )
  307. def extract_data_from_class_pdf(
  308. input_filename: Path, num_of_jobs: int = 1
  309. ) -> list[ClassPdfExtractionPageData]:
  310. """
  311. Extracts all data from the specified Class Timetable PDF filename.
  312. Can run via multiple jobs.
  313. """
  314. logging.info("Starting extraction with %d jobs", num_of_jobs)
  315. num_pages: int = get_number_of_pdf_pages(input_filename)
  316. logging.info("Found %d pages to process", num_pages)
  317. processed_pages: list[RawClassPdfExtractionPageData] = process_pages_in_parallel(
  318. num_of_jobs, input_filename, num_pages
  319. )
  320. extraction_data: list[ClassPdfExtractionPageData] = process_metadata_sequentially(
  321. processed_pages
  322. )
  323. return extraction_data
  324. def process_metadata_sequentially(
  325. processed_pages: list[RawClassPdfExtractionPageData],
  326. ) -> list[ClassPdfExtractionPageData]:
  327. """
  328. Process the above table text into `PageMetadata`'s of the processed pages.
  329. """
  330. extraction_data: list[ClassPdfExtractionPageData] = []
  331. previous_page_metadata: list[PageMetadata] = []
  332. for processed_page in processed_pages:
  333. page_metadata = parse_above_table_text(
  334. processed_page.above_table_text, previous_page_metadata
  335. )
  336. previous_page_metadata.append(page_metadata)
  337. extraction_data.append(
  338. ClassPdfExtractionPageData(
  339. raw_extracted_modules=processed_page.raw_extracted_modules,
  340. page_metadata=page_metadata,
  341. )
  342. )
  343. return extraction_data
  344. def process_pages_in_parallel(
  345. num_of_jobs: int, input_filename: Path, num_of_pages: int
  346. ) -> list[RawClassPdfExtractionPageData]:
  347. """Extracts the pdf pages in parallel based on the number of jobs"""
  348. with Pool(processes=num_of_jobs) as pool:
  349. results = pool.starmap(
  350. process_page, [(input_filename, i) for i in range(num_of_pages)]
  351. )
  352. return results
  353. def get_number_of_pdf_pages(input_filename: Path) -> int:
  354. """Get the number of pdf pages using the pdfplumber library"""
  355. with pdfplumber.open(input_filename) as pdf:
  356. num_pages = len(pdf.pages)
  357. return num_pages
  358. def get_above_table_text(page: Page, table_y1: float) -> str:
  359. """
  360. Get the text above the timetable for metadata parsing
  361. """
  362. upper_region = page.crop((0, 0, page.width, table_y1))
  363. text_above_table = upper_region.extract_text()
  364. logging.debug("Text found above the table:")
  365. logging.debug(text_above_table)
  366. return text_above_table