table_extraction.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. import logging
  2. from multiprocessing import Pool
  3. from pathlib import Path
  4. import pdfplumber
  5. from pdfplumber.page import Page
  6. from pdfplumber.table import Table
  7. from config import (
  8. CLASS_TIMETABLE_PDF_TABLE_SETTINGS,
  9. ALLOWED_TIMESLOTS,
  10. CLASS_TIMETABLE_PDF_MIN_DIMENSIONS,
  11. )
  12. from .models import (
  13. Weekday,
  14. TimeSlot,
  15. YLevel,
  16. RawExtractedModule,
  17. UnmergedTimeEntries,
  18. Area,
  19. HorizontalLine,
  20. ClassPdfExtractionPageData,
  21. RawClassPdfExtractionPageData,
  22. PageMetadata,
  23. TimeSlotYLevelsCollectionData,
  24. )
  25. from .above_table_text import parse_above_table_text
  26. from .geometry import (
  27. get_timeslot_for_area,
  28. is_line_at_bottom,
  29. is_area_below,
  30. is_vertical_match,
  31. )
  32. from .img import is_mostly_white_area
  # Module-wide logger used by every extraction step in this file.
  logger = logging.getLogger("modulplaner-backend.table_extraction")

  # One TimeSlot per configured (start, end) pair, kept in configuration order.
  # Code in this module treats the last entry as the final slot of a day.
  allowed_time_slots: list[TimeSlot] = [
      TimeSlot(start_time=bounds[0], end_time=bounds[1])
      for bounds in ALLOWED_TIMESLOTS
  ]
  def find_next_cell_below_index(current_area: Area, cells: list[Area]) -> int:
      """
      Locate the first entry of `cells` that `is_area_below` reports as lying
      below `current_area`.

      Returns:
          The position of that entry within `cells`, or -1 when nothing matches.
      """
      matching_positions = (
          position
          for position, candidate in enumerate(cells)
          if is_area_below(candidate, current_area)
      )
      return next(matching_positions, -1)
  def get_weekday_from_text(text: str) -> Weekday | None:
      """
      Map a piece of text to the `Weekday` whose `display_name` equals it.

      The comparison is plain string equality; the text is not trimmed or
      case-folded first.

      Returns:
          The first matching `Weekday`, or None when no display name matches.
      """
      return next((day for day in Weekday if day.display_name == text), None)
  def merge_vertically_spanning_cells(
      initial_area: Area,
      remaining_cells: list[Area],
      horizontal_lines: list[HorizontalLine],
      highest_y: float,
      weekday: Weekday,
  ) -> Area:
      """
      Grow `initial_area` downwards until a bottom boundary is reached.

      A boundary is either a horizontal line at the bottom of the current area
      or the lowest y level of the timetable (`highest_y`). While neither is
      found, the next cell below is removed from `remaining_cells` and merged
      into the area.

      Args:
          initial_area: The cell the merge starts from.
          remaining_cells: Candidate cells to merge with. Mutated: every cell
              that gets merged is popped from this list.
          horizontal_lines: Lines that may terminate the area.
          highest_y: The y2 of the last timeslot row.
          weekday: Only used for log and error messages.

      Returns:
          The merged area.

      Raises:
          RuntimeError: If neither a boundary nor a cell below can be found.
      """
      current_area = initial_area
      while True:
          logger.debug(
              "Searching for bottom boundary of area: %s on %s", current_area, weekday
          )

          # case 1: horizontal line at the bottom of current area?
          if any(
              is_line_at_bottom(current_area, line, tolerance=20)
              for line in horizontal_lines
          ):
              logger.debug("Bottom boundary found: horizontal line")
              return current_area

          # case 2: reached the bottom of the timetable?
          if is_vertical_match(current_area.y2, highest_y):
              logger.debug("Bottom boundary found: highest y level")
              return current_area

          # case 3: find and merge with the next cell below
          next_cell_index = find_next_cell_below_index(current_area, remaining_cells)
          if next_cell_index == -1:
              raise RuntimeError(
                  f"No bottom boundary or next cell found for module on {weekday}"
              )
          next_cell = remaining_cells.pop(next_cell_index)
          logger.debug("Vertically merging with cell below: %s", next_cell)
          current_area = Area(
              x1=current_area.x1,
              y1=current_area.y1,
              # Really keep the wider right edge on slight misalignment, so a
              # narrower cell below can never shrink the merged area.
              x2=max(current_area.x2, next_cell.x2),
              y2=next_cell.y2,
          )
  def get_modules_from_weekday(
      weekday: Weekday,
      unmerged_time_entries: UnmergedTimeEntries,
      page: Page,
      timeslot_y_levels: dict[TimeSlot, YLevel],
      page_number: int,
  ) -> list[RawExtractedModule]:
      """
      Build the raw modules (text plus start/end time) of one weekday on one page.

      Works on a copy of the weekday's cells: each non-white cell is merged with
      the cells below it, mapped to a start and an end `TimeSlot`, and the text
      inside the merged area is extracted from the page.

      Raises:
          RuntimeError: If the start or end timeslot of a module cannot be
              determined (errors of the helpers called here propagate as well).
      """
      pending_cells = list(unmerged_time_entries.cells)
      separator_lines = unmerged_time_entries.horizontal_lines
      bottom_y: float = get_highest_y_level(timeslot_y_levels, page_number)

      extracted: list[RawExtractedModule] = []
      while pending_cells:
          first_cell = pending_cells.pop(0)
          if is_mostly_white_area(page, first_cell):
              logger.debug("mostly white cell skipped")
              continue

          # Merging consumes the cells below from pending_cells.
          full_area: Area = merge_vertically_spanning_cells(
              first_cell, pending_cells, separator_lines, bottom_y, weekday
          )

          # The start comes from the first cell, the end from the merged area.
          first_slot = get_timeslot_for_area(first_cell, timeslot_y_levels)
          if first_slot is None:
              raise RuntimeError(
                  f"Could not determine start timeslot for module on {weekday}"
              )
          last_slot = get_timeslot_for_area(full_area, timeslot_y_levels)
          if last_slot is None:
              raise RuntimeError(
                  f"Could not determine end timeslot for merged module on {weekday}"
              )

          bounding_box = (full_area.x1, full_area.y1, full_area.x2, full_area.y2)
          cropped_text = page.crop(bounding_box).extract_text()
          module_text: str = cropped_text or ""  # tolerate a None extraction for now

          extracted.append(
              RawExtractedModule(
                  weekday=weekday,
                  start_seconds=first_slot.start_seconds(),
                  end_seconds=last_slot.end_seconds(),
                  text=module_text,
                  source_page_number=page_number,
              )
          )
      return extracted
  def get_highest_y_level(
      timeslot_y_levels: dict[TimeSlot, YLevel], page_number: int
  ) -> float:
      """
      Get the `y2` of the `YLevel` mapped to the last allowed `TimeSlot`.

      Args:
          timeslot_y_levels: The `YLevel`'s collected so far, keyed by `TimeSlot`.
          page_number: Only used for the debug log emitted on failure.

      Raises:
          RuntimeError: If no `TimeSlot`'s are configured, or if the last allowed
              `TimeSlot` was not mapped to a `YLevel`.
      """
      # An empty configuration would otherwise surface as a bare IndexError.
      if not allowed_time_slots:
          raise RuntimeError(
              "Could not get YLevel for latest TimeSlot: no TimeSlots configured"
          )
      latest_timeslot = allowed_time_slots[-1]
      try:
          return timeslot_y_levels[latest_timeslot].y2
      except KeyError as e:
          logger.debug("timeslot_y_levels on page %d %s", page_number, timeslot_y_levels)
          raise RuntimeError("Could not get YLevel for latest TimeSlot") from e
  def get_usable_table_index(found_tables: list) -> int:
      """
      Identifies the index of the timetable on the page based on dimensions.

      A table qualifies when both its width and its height (from `table.bbox`)
      are at least `CLASS_TIMETABLE_PDF_MIN_DIMENSIONS`.

      Returns:
          The index of the single qualifying table. If no table qualifies,
          index 0 is returned as a fallback and a warning is logged.

      Raises:
          RuntimeError: If `found_tables` is empty or more than one table
              qualifies.
      """
      if not found_tables:
          raise RuntimeError("No matching tables found.")

      valid_indices: list[int] = []
      for index, table in enumerate(found_tables):
          x0, top, x1, bottom = table.bbox
          width = x1 - x0
          height = bottom - top
          logger.debug(
              "table num %d: width: %d, height: %d",
              index + 1,
              width,
              height,
          )
          if (
              width >= CLASS_TIMETABLE_PDF_MIN_DIMENSIONS
              and height >= CLASS_TIMETABLE_PDF_MIN_DIMENSIONS
          ):
              valid_indices.append(index)

      if len(valid_indices) > 1:
          raise RuntimeError(
              f"Found {len(valid_indices)} valid tables, expected at most 1. "
              "Ambiguous table selection."
          )
      if valid_indices:
          return valid_indices[0]

      # Best-effort fallback kept from the original behaviour, but no longer
      # silent: the chosen table does not meet the size requirement.
      logger.warning(
          "No table reaches the minimum dimensions of %s, using the first table",
          CLASS_TIMETABLE_PDF_MIN_DIMENSIONS,
      )
      return 0
  def process_page(
      input_filename: Path, page_index: int
  ) -> RawClassPdfExtractionPageData:
      """
      Extract the raw modules and the text above the table of one PDF page.

      The PDF is opened inside this function, so the call only needs the file
      path and a zero-based page index; it is meant to run in a worker process.
      Modules are reported with the one-based page number (`page_index + 1`).
      """
      with pdfplumber.open(input_filename) as pdf:
          page = pdf.pages[page_index]

          # Containers that the collect_* helpers below fill via side effects.
          y_levels_by_timeslot: dict[TimeSlot, YLevel] = {}
          entries_by_weekday: dict[Weekday, UnmergedTimeEntries] = {}
          areas_by_weekday: dict[Weekday, Area] = init_weekday_areas()

          table: Table = select_main_table(page, page_index)
          header_text: str = get_above_table_text(page, table_y1=table.bbox[1])

          collect_weekday_areas_and_timeslot_y_levels(
              areas_by_weekday, y_levels_by_timeslot, page, table
          )
          collected_unmerged_time_entries_by_weekday(
              entries_by_weekday, areas_by_weekday, table, page
          )

          page_number = page_index + 1
          modules: list[RawExtractedModule] = [
              module
              for weekday in Weekday
              for module in get_modules_from_weekday(
                  weekday,
                  entries_by_weekday[weekday],
                  page,
                  y_levels_by_timeslot,
                  page_number,
              )
          ]
          return RawClassPdfExtractionPageData(
              raw_extracted_modules=modules, above_table_text=header_text
          )
  def collect_weekday_areas_and_timeslot_y_levels(
      weekday_areas: dict[Weekday, Area],
      timeslot_y_levels: dict[TimeSlot, YLevel],
      page: Page,
      table: Table,
  ) -> None:
      """
      Fill `weekday_areas` and `timeslot_y_levels` from the rows of `table`.

      Both dicts are populated via side effects: the first table row is handed
      to `collect_weekday_areas`, every later row to
      `collect_timeslot_y_levels_of_row`.
      """
      next_timeslot_index = 0
      for row_index, row in enumerate(table.rows):
          if row_index == 0:
              # first row: the weekday header
              collect_weekday_areas(weekday_areas, page, row, row_index)
              continue

          row_data = TimeSlotYLevelsCollectionData(
              row_index=row_index,
              expected_timeslot_index=next_timeslot_index,
              last_timeslot=get_last_timeslot(allowed_time_slots),
              page=page,
              table=table,
              weekday_areas=weekday_areas,
          )
          # The helper returns which timeslot the following row should hold.
          next_timeslot_index = collect_timeslot_y_levels_of_row(
              timeslot_y_levels, row_data
          )
  def collect_timeslot_y_levels_of_row(
      timeslot_y_levels: dict[TimeSlot, YLevel],
      collection_data: TimeSlotYLevelsCollectionData,
  ) -> int:
      """
      Populates the passed timeslot_y_levels dict with the `YLevel` of the
      expected `TimeSlot` via side effects, if the first cell of the row
      contains that slot's start and end time.

      When the matched slot is the last one, the `y2` of every weekday area in
      `collection_data.weekday_areas` is set to the bottom of this row's cell.

      Returns:
          int for the current expected `TimeSlot` index: incremented by one when
          the row matched, unchanged when the row was skipped.
      """
      current_index = collection_data.expected_timeslot_index
      logger.debug("row: %d, col: %d", collection_data.row_index, 0)

      row = collection_data.table.rows[collection_data.row_index]
      cell = row.cells[0]
      if cell is None:
          logger.warning("None Table cell found, not collecting YLevel of Row")
          return current_index

      # Rows that follow the last allowed timeslot have nothing left to match;
      # indexing allowed_time_slots here would raise an IndexError.
      if current_index >= len(allowed_time_slots):
          logger.warning(
              "All TimeSlots already collected, skipping row %d",
              collection_data.row_index,
          )
          return current_index

      # extract_text() may yield None; treat that like an empty cell.
      cell_text = (
          collection_data.page.crop(
              (cell[0], cell[1], cell[2], cell[3])
          ).extract_text()
          or ""
      )

      target_timeslot = allowed_time_slots[current_index]
      if not (
          target_timeslot.start_time in cell_text
          and target_timeslot.end_time in cell_text
      ):
          logger.warning("Unexpected TimeSlot found: '%s'", cell_text)
          return current_index

      if target_timeslot == collection_data.last_timeslot:
          # The last timeslot row marks the bottom edge of every weekday column.
          for weekday in Weekday:
              previous_area = collection_data.weekday_areas[weekday]
              collection_data.weekday_areas[weekday] = Area(
                  x1=previous_area.x1,
                  y1=previous_area.y1,
                  x2=previous_area.x2,
                  y2=cell[3],
              )

      timeslot_y_levels[target_timeslot] = YLevel(y1=cell[1], y2=cell[3])
      return current_index + 1
  def collect_weekday_areas(
      weekday_areas: dict[Weekday, Area], page: Page, row, row_index: int
  ) -> None:
      """
      Populates the passed weekday_areas dict with the right
      `Area`'s by `Weekday` via side effects.

      Each cell of `row` whose text equals a weekday display name yields an
      `Area` that starts at the bottom edge of that cell and spans its width.
      `y2` is stored as 0 here and has to be filled in by the caller later.

      Args:
          weekday_areas: Dict to populate.
          page: Page the cell text is extracted from.
          row: Table row whose `cells` are inspected.
          row_index: Only used for debug logging.
      """
      empty_start_found = False
      for column_index, cell in enumerate(row.cells):
          logger.debug("row: %d, col: %d", row_index, column_index)
          logger.debug("%s", cell)
          if cell is None:
              logger.debug("None Table Cell Found")
              continue

          # extract_text() may yield None; treat that like an empty cell.
          cell_text = (
              page.crop((cell[0], cell[1], cell[2], cell[3])).extract_text() or ""
          )
          if not empty_start_found and not cell_text:
              logger.debug("empty start found")
              empty_start_found = True

          weekday_enum: Weekday | None = get_weekday_from_text(cell_text)
          # Compare against None so a falsy enum member is not dropped.
          if weekday_enum is not None:
              logger.debug("Weekday %s found", cell_text)
              weekday_areas[weekday_enum] = Area(
                  x1=cell[0], y1=cell[3], x2=cell[2], y2=0
              )
  def get_last_timeslot(time_slots: list[TimeSlot]) -> TimeSlot:
      """
      Return the final entry of `time_slots`, i.e. the last timeslot a weekday
      can have.

      Raises:
          RuntimeError: If `time_slots` is empty.
      """
      if not time_slots:
          raise RuntimeError("Cannot get the latest timeslot from an empty list")

      final_slot = time_slots[-1]
      logger.debug("last timeslot found: %s", final_slot)
      return final_slot
  def init_weekday_areas() -> dict[Weekday, Area]:
      """
      Create the starting weekday areas: one separate all-zero `Area` for every
      member of `Weekday`.
      """
      return {day: Area(x1=0, y1=0, x2=0, y2=0) for day in Weekday}
  def select_main_table(page: Page, page_index: int) -> Table:
      """
      Pick the main table of the PDF page, which should be the timetable.

      Tables are searched with `CLASS_TIMETABLE_PDF_TABLE_SETTINGS`; which of
      them is returned is decided by `get_usable_table_index`, whose errors
      propagate to the caller.
      """
      candidates = page.find_tables(CLASS_TIMETABLE_PDF_TABLE_SETTINGS)
      logger.debug(
          "amount of tables found on page %d: %d",
          page_index + 1,
          len(candidates),
      )
      chosen_index = get_usable_table_index(candidates)
      return candidates[chosen_index]
  def collected_unmerged_time_entries_by_weekday(
      unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries],
      weekday_areas: dict[Weekday, Area],
      table: Table,
      page: Page,
  ) -> None:
      """
      Fill `unmerged_time_entries_by_weekday` with one `UnmergedTimeEntries`
      per `Weekday` via side effects.

      For every weekday, each table cell that lies completely inside that
      weekday's area is recorded as an `Area`; afterwards the horizontal lines
      of the weekday are gathered by `collect_horizontal_lines`.
      """
      for weekday in Weekday:
          entries = UnmergedTimeEntries(cells=[], horizontal_lines=[])
          unmerged_time_entries_by_weekday[weekday] = entries

          target_area: Area = weekday_areas[weekday]
          logger.debug("target_area: %s", target_area)

          for row_index, row in enumerate(table.rows):
              for column_index, cell in enumerate(row.cells):
                  if cell is None:
                      logger.debug("None table cell found")
                      continue
                  logger.debug("row: %d, col: %d", row_index, column_index)
                  logger.debug("cell: %s", cell)

                  left, top, right, bottom = cell[0], cell[1], cell[2], cell[3]
                  lies_inside_target = (
                      target_area.x1 <= left
                      and target_area.y1 <= top
                      and target_area.x2 >= right
                      and target_area.y2 >= bottom
                  )
                  if not lies_inside_target:
                      continue

                  entries.cells.append(Area(x1=left, y1=top, x2=right, y2=bottom))
                  logger.debug("%s cell found", weekday)

          collect_horizontal_lines(
              unmerged_time_entries_by_weekday, page, target_area, weekday
          )
  def collect_horizontal_lines(
      unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries],
      page: Page,
      target_area: Area,
      weekday: Weekday,
  ) -> None:
      """
      Append the timeslot separator lines of `weekday` to the
      `horizontal_lines` of its `UnmergedTimeEntries` via side effects.

      A line of `page.lines` is kept when its two y values are equal and its
      x range lies within the x range of `target_area`. The stored y value is
      the line's `bottom` entry.
      """
      for segment in page.lines:
          start_x, end_x = segment["x0"], segment["x1"]
          start_y, end_y = segment["y0"], segment["y1"]
          bottom_y = segment["bottom"]

          # only horizontal lines are of interest
          if start_y != end_y:
              continue
          # the line has to fit into the weekday column horizontally
          if not (target_area.x1 <= start_x and target_area.x2 >= end_x):
              continue

          logger.debug("%s timeslot seperator line found", weekday)
          unmerged_time_entries_by_weekday[weekday].horizontal_lines.append(
              HorizontalLine(x1=start_x, x2=end_x, y=bottom_y)
          )
  def extract_data_from_class_pdf(
      input_filename: Path, num_of_jobs: int = 1
  ) -> list[ClassPdfExtractionPageData]:
      """
      Extract all data from the given Class Timetable PDF.

      The pages are extracted by `process_pages_in_parallel` using
      `num_of_jobs` processes; afterwards the page metadata is derived by
      `process_metadata_sequentially`.
      """
      logger.info("Starting extraction with %d jobs", num_of_jobs)

      page_count: int = get_number_of_pdf_pages(input_filename)
      logger.info("Found %d pages to process", page_count)

      raw_pages: list[RawClassPdfExtractionPageData] = process_pages_in_parallel(
          num_of_jobs, input_filename, page_count
      )
      return process_metadata_sequentially(raw_pages)
  def process_metadata_sequentially(
      processed_pages: list[RawClassPdfExtractionPageData],
  ) -> list[ClassPdfExtractionPageData]:
      """
      Turn the above-table text of each processed page into `PageMetadata`.

      Pages are handled in order because `parse_above_table_text` also receives
      the metadata of all pages parsed before the current one.
      """
      metadata_so_far: list[PageMetadata] = []
      pages_with_metadata: list[ClassPdfExtractionPageData] = []

      for raw_page in processed_pages:
          metadata = parse_above_table_text(raw_page.above_table_text, metadata_so_far)
          metadata_so_far.append(metadata)
          pages_with_metadata.append(
              ClassPdfExtractionPageData(
                  raw_extracted_modules=raw_page.raw_extracted_modules,
                  page_metadata=metadata,
              )
          )
      return pages_with_metadata
  def process_pages_in_parallel(
      num_of_jobs: int, input_filename: Path, num_of_pages: int
  ) -> list[RawClassPdfExtractionPageData]:
      """
      Run `process_page` for every page index in a pool of `num_of_jobs`
      processes and return the results in page order.
      """
      page_jobs = [(input_filename, page_index) for page_index in range(num_of_pages)]
      with Pool(processes=num_of_jobs) as worker_pool:
          return worker_pool.starmap(process_page, page_jobs)
  def get_number_of_pdf_pages(input_filename: Path) -> int:
      """Open the PDF with pdfplumber and return how many pages it has."""
      with pdfplumber.open(input_filename) as document:
          return len(document.pages)
  def get_above_table_text(page: Page, table_y1: float) -> str:
      """
      Get the text above the timetable for metadata parsing.

      Args:
          page: The page to read from.
          table_y1: Top y coordinate of the table; the page is cropped from its
              top edge down to this value over the full page width.

      Returns:
          The extracted text, or an empty string when nothing was extracted.
      """
      upper_region = page.crop((0, 0, page.width, table_y1))
      # extract_text() may yield None; honour the declared `str` return type.
      text_above_table: str = upper_region.extract_text() or ""
      logger.debug("Text found above the table:")
      logger.debug("%s", text_above_table)
      return text_above_table
  455. logger.debug("Text found above the table:")
  456. logger.debug(text_above_table)
  457. return text_above_table