table_extraction.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537
  1. import logging
  2. from multiprocessing import Pool
  3. from pathlib import Path
  4. from pdfplumber.page import Page
  5. from pdfplumber.table import Table
  6. import pdfplumber
  7. from config import (
  8. CLASS_TIMETABLE_PDF_TABLE_SETTINGS,
  9. ALLOWED_TIMESLOTS,
  10. CLASS_TIMETABLE_PDF_MIN_DIMENSIONS,
  11. )
  12. from .models import (
  13. Weekday,
  14. TimeSlot,
  15. YLevel,
  16. RawExtractedModule,
  17. UnmergedTimeEntries,
  18. Area,
  19. HorizontalLine,
  20. ClassPdfExtractionPageData,
  21. RawClassPdfExtractionPageData,
  22. PageMetadata,
  23. TimeSlotYLevelsCollectionData,
  24. )
  25. from .above_table_text import parse_above_table_text
  26. from .geometry import (
  27. get_timeslot_for_area,
  28. is_line_at_bottom,
  29. is_area_below,
  30. is_vertical_match,
  31. )
  32. from .img import is_mostly_white_area
# Module-level list of all TimeSlot instances the timetable may contain,
# built from the (start, end) tuples in config.
# NOTE(review): assumed ordered earliest -> latest; get_highest_y_level()
# treats the last element as the latest slot — confirm ALLOWED_TIMESLOTS
# is sorted in config.
allowed_time_slots: list[TimeSlot] = [
    TimeSlot(start_time=timeslot_tuple[0], end_time=timeslot_tuple[1])
    for timeslot_tuple in ALLOWED_TIMESLOTS
]
  37. def find_next_cell_below_index(current_area: Area, cells: list[Area]) -> int:
  38. """
  39. Returns the index of the first cell directly below current_area, or -1 if none.
  40. """
  41. for index, cell in enumerate(cells):
  42. if is_area_below(cell, current_area):
  43. return index
  44. return -1
  45. def get_weekday_from_text(text: str) -> Weekday | None:
  46. """
  47. Helper function that tries to get a Weekday from a string.
  48. Only accepts exact display name matches.
  49. """
  50. for weekday in Weekday:
  51. if weekday.display_name == text:
  52. return weekday
  53. return None
  54. def merge_vertically_spanning_cells(
  55. initial_area: Area,
  56. remaining_cells: list[Area],
  57. horizontal_lines: list[HorizontalLine],
  58. highest_y: float,
  59. weekday: Weekday,
  60. ) -> Area:
  61. """
  62. Merges vertically adjacent cells until a bottom boundary (line or page end) is found.
  63. Mutates remaining_cells by removing used cells.
  64. Returns the final merged area.
  65. """
  66. current_area = initial_area
  67. while True:
  68. logging.debug(
  69. "Searching for bottom boundary of area: %s on %s", current_area, weekday
  70. )
  71. # case 1: horizontal line at the bottom of current area?
  72. if any(
  73. is_line_at_bottom(current_area, line, tolerance=20)
  74. for line in horizontal_lines
  75. ):
  76. logging.debug("Bottom boundary found: horizontal line")
  77. return current_area
  78. # case 2: reached the bottom of the timetable?
  79. if is_vertical_match(current_area.y2, highest_y):
  80. logging.debug("Bottom boundary found: highest y level")
  81. return current_area
  82. # case 3: find and merge with the next cell below
  83. next_cell_index = find_next_cell_below_index(current_area, remaining_cells)
  84. if next_cell_index == -1:
  85. raise RuntimeError(
  86. f"No bottom boundary or next cell found for module on {weekday}"
  87. )
  88. next_cell = remaining_cells.pop(next_cell_index)
  89. logging.debug("Vertically merging with cell below: %s", next_cell)
  90. current_area = Area(
  91. x1=current_area.x1,
  92. y1=current_area.y1,
  93. x2=next_cell.x2, # use the wider x2 in case of a slight misalignment
  94. y2=next_cell.y2,
  95. )
  96. def get_modules_from_weekday(
  97. weekday: Weekday,
  98. unmerged_time_entries: UnmergedTimeEntries,
  99. page: Page,
  100. timeslot_y_levels: dict[TimeSlot, YLevel],
  101. page_number: int,
  102. ) -> list[RawExtractedModule]:
  103. """
  104. Extracts the modules (raw text and start/end) of a weekday on a single pdf page.
  105. """
  106. cells = unmerged_time_entries.cells[:]
  107. horizontal_lines = unmerged_time_entries.horizontal_lines
  108. highest_y: float = get_highest_y_level(timeslot_y_levels, page_number)
  109. modules: list[RawExtractedModule] = []
  110. while cells:
  111. initial_area = cells.pop(0)
  112. if is_mostly_white_area(page, initial_area):
  113. logging.debug("mostly white cell skipped")
  114. continue
  115. merged_area: Area = merge_vertically_spanning_cells(
  116. initial_area, cells, horizontal_lines, highest_y, weekday
  117. )
  118. start_timeslot = get_timeslot_for_area(initial_area, timeslot_y_levels)
  119. if start_timeslot is None:
  120. raise RuntimeError(
  121. f"Could not determine start timeslot for module on {weekday}"
  122. )
  123. end_timeslot = get_timeslot_for_area(merged_area, timeslot_y_levels)
  124. if end_timeslot is None:
  125. raise RuntimeError(
  126. f"Could not determine end timeslot for merged module on {weekday}"
  127. )
  128. text: str = (
  129. page.crop(
  130. (merged_area.x1, merged_area.y1, merged_area.x2, merged_area.y2)
  131. ).extract_text()
  132. or "" # do not raise error when extraction returns None for now
  133. )
  134. modules.append(
  135. RawExtractedModule(
  136. weekday=weekday,
  137. start_seconds=start_timeslot.start_seconds(),
  138. end_seconds=end_timeslot.end_seconds(),
  139. text=text,
  140. source_page_number=page_number,
  141. )
  142. )
  143. return modules
  144. def get_highest_y_level(timeslot_y_levels, page_number) -> float:
  145. """
  146. Gets the highest `YLevel` of all `TimeSlot`'s.
  147. Raises:
  148. RuntimeError: If no the highest allowed `TimeSlot` was not mapped to a `YLevel`
  149. """
  150. try:
  151. highest_y_level = timeslot_y_levels[allowed_time_slots[-1]].y2
  152. except KeyError as e:
  153. logging.debug("timeslot_y_levels on page %d %s", page_number, timeslot_y_levels)
  154. raise RuntimeError("Could not get YLevel for latest TimeSlot") from e
  155. return highest_y_level
  156. def get_usable_table_index(found_tables: list) -> int:
  157. """
  158. Identifies the index of the timetable on the page based on dimensions.
  159. Raises:
  160. RuntimeError: If no or multiple tables matching the minimum dimensions are found.
  161. """
  162. if not found_tables:
  163. raise RuntimeError("No matching tables found.")
  164. valid_indices = []
  165. for index, table in enumerate(found_tables):
  166. x0, top, x1, bottom = table.bbox
  167. width = x1 - x0
  168. height = bottom - top
  169. logging.debug(
  170. "table num %d: width: %d, height: %d",
  171. index + 1,
  172. width,
  173. height,
  174. )
  175. if (
  176. width >= CLASS_TIMETABLE_PDF_MIN_DIMENSIONS
  177. and height >= CLASS_TIMETABLE_PDF_MIN_DIMENSIONS
  178. ):
  179. valid_indices.append(index)
  180. if len(valid_indices) > 1:
  181. raise RuntimeError(
  182. f"Found {len(valid_indices)} valid tables, expected at most 1. "
  183. "Ambiguous table selection."
  184. )
  185. if len(valid_indices) == 1:
  186. return valid_indices[0]
  187. return 0
  188. def process_page(
  189. input_filename: Path, page_index: int
  190. ) -> RawClassPdfExtractionPageData:
  191. """
  192. Process a single page of the PDF to extract modules and header text.
  193. Designed to be run in a separate process.
  194. """
  195. with pdfplumber.open(input_filename) as pdf:
  196. page = pdf.pages[page_index]
  197. timeslot_y_levels: dict[TimeSlot, YLevel] = {}
  198. unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries] = {}
  199. weekday_areas: dict[Weekday, Area] = init_weekday_areas()
  200. table: Table = select_main_table(page, page_index)
  201. text_above_table: str = get_above_table_text(page, table_y1=table.bbox[1])
  202. collect_weekday_areas_and_timeslot_y_levels(
  203. weekday_areas, timeslot_y_levels, page, table
  204. )
  205. collected_unmerged_time_entries_by_weekday(
  206. unmerged_time_entries_by_weekday, weekday_areas, table, page
  207. )
  208. all_modules: list[RawExtractedModule] = []
  209. for weekday in Weekday:
  210. all_modules.extend(
  211. get_modules_from_weekday(
  212. weekday,
  213. unmerged_time_entries_by_weekday[weekday],
  214. page,
  215. timeslot_y_levels,
  216. page_index + 1,
  217. )
  218. )
  219. return RawClassPdfExtractionPageData(
  220. raw_extracted_modules=all_modules, above_table_text=text_above_table
  221. )
  222. def collect_weekday_areas_and_timeslot_y_levels(
  223. weekday_areas: dict[Weekday, Area],
  224. timeslot_y_levels: dict[TimeSlot, YLevel],
  225. page: Page,
  226. table: Table,
  227. ) -> None:
  228. """
  229. Populates the passed weekday_areas and timeslot_y_levels dicts with the right
  230. `Area`'s by `Weekday` and `YLevel` by TimeSlot respectively, via side effects.
  231. """
  232. expected_timeslot_index = 0
  233. for row_index, row in enumerate(table.rows):
  234. if row_index == 0:
  235. collect_weekday_areas(weekday_areas, page, row, row_index)
  236. else:
  237. expected_timeslot_index: int = collect_timeslot_y_levels_of_row(
  238. timeslot_y_levels,
  239. TimeSlotYLevelsCollectionData(
  240. row_index=row_index,
  241. expected_timeslot_index=expected_timeslot_index,
  242. last_timeslot=get_last_timeslot(allowed_time_slots),
  243. page=page,
  244. table=table,
  245. weekday_areas=weekday_areas,
  246. ),
  247. )
  248. def collect_timeslot_y_levels_of_row(
  249. timeslot_y_levels: dict[TimeSlot, YLevel],
  250. collection_data: TimeSlotYLevelsCollectionData,
  251. ) -> int:
  252. """
  253. Populates the passed and timeslot_y_levels dicts with the right
  254. `YLevel`'s by `TimeSlot` via side effects.
  255. Returns:
  256. int for the current expected `TimeSlot` index
  257. """
  258. logging.debug("row: %d, col: %d", collection_data.row_index, 0)
  259. row = collection_data.table.rows[collection_data.row_index]
  260. cell = row.cells[0]
  261. if cell is None:
  262. logging.warning("None Table cell found, not collecting YLevel of Row")
  263. return collection_data.expected_timeslot_index
  264. cell_text = collection_data.page.crop(
  265. (cell[0], cell[1], cell[2], cell[3])
  266. ).extract_text()
  267. target_timeslot = allowed_time_slots[collection_data.expected_timeslot_index]
  268. if not (
  269. target_timeslot.start_time in cell_text
  270. and target_timeslot.end_time in cell_text
  271. ):
  272. logging.warning("Unexpected TimeSlot found: '%s'", cell_text)
  273. return collection_data.expected_timeslot_index
  274. if target_timeslot == collection_data.last_timeslot:
  275. for weekday in Weekday:
  276. new_area = Area(
  277. x1=collection_data.weekday_areas[weekday].x1,
  278. y1=collection_data.weekday_areas[weekday].y1,
  279. x2=collection_data.weekday_areas[weekday].x2,
  280. y2=cell[3],
  281. )
  282. collection_data.weekday_areas[weekday] = new_area
  283. timeslot_y_levels[target_timeslot] = YLevel(y1=cell[1], y2=cell[3])
  284. return collection_data.expected_timeslot_index + 1
  285. def collect_weekday_areas(weekday_areas, page, row, row_index) -> None:
  286. """
  287. Populates the passed weekday_areas dict with the right
  288. `Area`'s by `Weekday` via side effects.
  289. """
  290. empty_start_found = False
  291. for column_index, cell in enumerate(row.cells):
  292. logging.debug("row: %d, col: %d", row_index, column_index)
  293. logging.debug(cell)
  294. if cell is None:
  295. logging.debug("None Table Cell Found")
  296. else:
  297. cell_text = page.crop((cell[0], cell[1], cell[2], cell[3])).extract_text()
  298. if not empty_start_found and len(cell_text) == 0:
  299. logging.debug("empty start found")
  300. empty_start_found = True
  301. weekday_enum: Weekday | None = get_weekday_from_text(cell_text)
  302. if weekday_enum:
  303. logging.debug("Weekday %s found", cell_text)
  304. weekday_areas[weekday_enum] = Area(
  305. x1=cell[0], y1=cell[3], x2=cell[2], y2=0
  306. )
  307. def get_last_timeslot(time_slots: list[TimeSlot]) -> TimeSlot:
  308. """
  309. Get the last timeslot a weekday can have.
  310. """
  311. if len(time_slots) == 0:
  312. raise RuntimeError("Cannot get the latest timeslot from an empty list")
  313. last_timeslot = time_slots[-1]
  314. logging.debug("last timeslot found: %s", last_timeslot)
  315. return last_timeslot
  316. def init_weekday_areas() -> dict[Weekday, Area]:
  317. """
  318. Initializes the weekday areas with zero-valued `Area`'s for each `Weekday`
  319. """
  320. weekday_areas: dict[Weekday, Area] = {}
  321. for day in Weekday:
  322. weekday_areas[day] = Area(x1=0, y1=0, x2=0, y2=0)
  323. return weekday_areas
  324. def select_main_table(page: Page, page_index: int) -> Table:
  325. """
  326. Selects the main table on the PDF Page. This should be the timetable.
  327. """
  328. found_tables = page.find_tables(CLASS_TIMETABLE_PDF_TABLE_SETTINGS)
  329. logging.debug(
  330. "amount of tables found on page %d: %d",
  331. page_index + 1,
  332. len(found_tables),
  333. )
  334. table = found_tables[get_usable_table_index(found_tables)]
  335. return table
  336. def collected_unmerged_time_entries_by_weekday(
  337. unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries],
  338. weekday_areas: dict[Weekday, Area],
  339. table: Table,
  340. page: Page,
  341. ) -> None:
  342. """
  343. Populates the passed unmerged_time_entries_by_weekday dict with the
  344. `UnmergedTimeEntries` by `Weekday` via side effects.
  345. """
  346. for weekday in Weekday:
  347. unmerged_time_entries_by_weekday[weekday] = UnmergedTimeEntries(
  348. cells=[], horizontal_lines=[]
  349. )
  350. target_area: Area = weekday_areas[weekday]
  351. logging.debug("target_area: %s", target_area)
  352. for row_index, row in enumerate(table.rows):
  353. for column_index, cell in enumerate(row.cells):
  354. if cell is None:
  355. logging.debug("None table cell found")
  356. continue
  357. logging.debug("row: %d, col: %d", row_index, column_index)
  358. logging.debug("cell: %s", cell)
  359. if (
  360. target_area.x1 <= cell[0]
  361. and target_area.y1 <= cell[1]
  362. and target_area.x2 >= cell[2]
  363. and target_area.y2 >= cell[3]
  364. ):
  365. unmerged_time_entries_by_weekday[weekday].cells.append(
  366. Area(x1=cell[0], y1=cell[1], x2=cell[2], y2=cell[3])
  367. )
  368. logging.debug("%s cell found", weekday)
  369. collect_horizontal_lines(
  370. unmerged_time_entries_by_weekday, page, target_area, weekday
  371. )
  372. def collect_horizontal_lines(
  373. unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries],
  374. page: Page,
  375. target_area: Area,
  376. weekday: Weekday,
  377. ) -> None:
  378. """
  379. Populates the passed unmerged_time_entries_by_weekday dict with the
  380. `horizontal_lines` of the `UnmergedTimeEntries` by the passed weekday
  381. via side effects. These horizontal Lines are timeslot seperator lines.
  382. """
  383. for line_found in page.lines:
  384. line_x1 = line_found["x0"]
  385. line_x2 = line_found["x1"]
  386. line_y1 = line_found["y0"]
  387. line_y2 = line_found["y1"]
  388. line_bottom = line_found["bottom"]
  389. # ignore non horizontal lines
  390. if line_y1 != line_y2:
  391. continue
  392. if target_area.x1 <= line_x1 and target_area.x2 >= line_x2:
  393. logging.debug("%s timeslot seperator line found", weekday)
  394. unmerged_time_entries_by_weekday[weekday].horizontal_lines.append(
  395. HorizontalLine(x1=line_x1, x2=line_x2, y=line_bottom)
  396. )
  397. def extract_data_from_class_pdf(
  398. input_filename: Path, num_of_jobs: int = 1
  399. ) -> list[ClassPdfExtractionPageData]:
  400. """
  401. Extracts all data from the specified Class Timetable PDF filename.
  402. Can run via multiple jobs.
  403. """
  404. logging.info("Starting extraction with %d jobs", num_of_jobs)
  405. num_pages: int = get_number_of_pdf_pages(input_filename)
  406. logging.info("Found %d pages to process", num_pages)
  407. processed_pages: list[RawClassPdfExtractionPageData] = process_pages_in_parallel(
  408. num_of_jobs, input_filename, num_pages
  409. )
  410. extraction_data: list[ClassPdfExtractionPageData] = process_metadata_sequentially(
  411. processed_pages
  412. )
  413. return extraction_data
  414. def process_metadata_sequentially(
  415. processed_pages: list[RawClassPdfExtractionPageData],
  416. ) -> list[ClassPdfExtractionPageData]:
  417. """
  418. Process the above table text into `PageMetadata`'s of the processed pages.
  419. """
  420. extraction_data: list[ClassPdfExtractionPageData] = []
  421. previous_page_metadata: list[PageMetadata] = []
  422. for processed_page in processed_pages:
  423. page_metadata = parse_above_table_text(
  424. processed_page.above_table_text, previous_page_metadata
  425. )
  426. previous_page_metadata.append(page_metadata)
  427. extraction_data.append(
  428. ClassPdfExtractionPageData(
  429. raw_extracted_modules=processed_page.raw_extracted_modules,
  430. page_metadata=page_metadata,
  431. )
  432. )
  433. return extraction_data
  434. def process_pages_in_parallel(
  435. num_of_jobs: int, input_filename: Path, num_of_pages: int
  436. ) -> list[RawClassPdfExtractionPageData]:
  437. """Extracts the pdf pages in parallel based on the number of jobs"""
  438. with Pool(processes=num_of_jobs) as pool:
  439. results = pool.starmap(
  440. process_page, [(input_filename, i) for i in range(num_of_pages)]
  441. )
  442. return results
  443. def get_number_of_pdf_pages(input_filename: Path) -> int:
  444. """Get the number of pdf pages using the pdfplumber library"""
  445. with pdfplumber.open(input_filename) as pdf:
  446. num_pages = len(pdf.pages)
  447. return num_pages
  448. def get_above_table_text(page: Page, table_y1: float) -> str:
  449. """
  450. Get the text above the timetable for metadata parsing
  451. """
  452. upper_region = page.crop((0, 0, page.width, table_y1))
  453. text_above_table = upper_region.extract_text()
  454. logging.debug("Text found above the table:")
  455. logging.debug(text_above_table)
  456. return text_above_table