table_extraction.py 11 KB


  1. import logging
  2. from pdfplumber.page import Page
  3. import pdfplumber
  4. from config import TABLE_SETTINGS, ALLOWED_TIMESLOTS
  5. from .models import (
  6. Weekday,
  7. TimeSlot,
  8. YLevel,
  9. RawExtractedModule,
  10. UnmergedTimeEntries,
  11. Area,
  12. HorizontalLine,
  13. ClassPdfExtractionPageData,
  14. PageMetadata,
  15. )
  16. from .above_table_text import parse_above_table_text
  17. from .geometry import (
  18. get_timeslot_for_area,
  19. is_line_at_bottom,
  20. is_area_below,
  21. is_vertical_match,
  22. )
  23. from .img import is_mostly_white_area
  24. allowed_time_slots: list[TimeSlot] = [
  25. TimeSlot(*timeslot_tuple) for timeslot_tuple in ALLOWED_TIMESLOTS
  26. ]
  27. def get_weekday_from_text(text: str) -> Weekday | None:
  28. """
  29. Helper function that tries to get a Weekday from a string.
  30. Only accepts exact display name matches.
  31. """
  32. for weekday in Weekday:
  33. if weekday.display_name == text:
  34. return weekday
  35. return None
  36. def get_modules_from_weekday(
  37. weekday: Weekday,
  38. unmerged_time_entries: UnmergedTimeEntries,
  39. page: Page,
  40. timeslot_y_levels: dict[TimeSlot, YLevel],
  41. page_number: int,
  42. ) -> list[RawExtractedModule]:
  43. """
  44. Extracts the modules (raw text and start/end) of a weekday on a single pdf page
  45. """
  46. highest_y_level = timeslot_y_levels[allowed_time_slots[-1]].y2
  47. modules = []
  48. while len(unmerged_time_entries.cells) > 0:
  49. area = unmerged_time_entries.cells.pop(0)
  50. if is_mostly_white_area(page, area):
  51. logging.debug("mostly white cell skipped")
  52. continue
  53. timeslot = get_timeslot_for_area(area, timeslot_y_levels)
  54. if timeslot is None:
  55. raise RuntimeError("Could not match TimeSlot to Cell Area")
  56. start_seconds = timeslot.start_seconds()
  57. line_at_bottom_found = False
  58. while not line_at_bottom_found:
  59. logging.debug("searching for line at bottom of: %s", area)
  60. logging.debug("line candidates:")
  61. for line in unmerged_time_entries.horizontal_lines:
  62. logging.debug("testing horizontal line: %s", line)
  63. if is_line_at_bottom(area, line, tolerance=20):
  64. line_at_bottom_found = True
  65. logging.debug("candidate line found")
  66. break
  67. else:
  68. if is_vertical_match(area.y2, highest_y_level):
  69. logging.debug("highest y level matched")
  70. break
  71. found_matching_next_cell_index = -1
  72. for index, potential_cell_below in enumerate(
  73. unmerged_time_entries.cells
  74. ):
  75. if is_area_below(potential_cell_below, area):
  76. found_matching_next_cell_index = index
  77. break
  78. else:
  79. raise RuntimeError(
  80. f"No matching cell below found to merge with on {weekday}"
  81. )
  82. logging.debug("vertically merging cells for %s", weekday)
  83. matched_area = unmerged_time_entries.cells.pop(
  84. found_matching_next_cell_index
  85. )
  86. logging.debug("matched cell area: %s", matched_area)
  87. area = Area(area.x1, area.y1, matched_area.x2, matched_area.y2)
  88. text = page.crop((area.x1, area.y1, area.x2, area.y2)).extract_text()
  89. timeslot = get_timeslot_for_area(area, timeslot_y_levels)
  90. if timeslot is None:
  91. raise RuntimeError("Could not match TimeSlot to Cell Area")
  92. end_seconds = timeslot.end_seconds()
  93. modules.append(
  94. RawExtractedModule(weekday, start_seconds, end_seconds, text, page_number)
  95. )
  96. return modules
  97. def extract_data_from_class_pdf(input_filename: str) -> list[ClassPdfExtractionPageData]:
  98. """
  99. Extracts all data from class timetable pdf's
  100. """
  101. extraction_data: list[ClassPdfExtractionPageData] = []
  102. previous_page_metadata: list[PageMetadata] = []
  103. unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries] = {}
  104. with pdfplumber.open(input_filename) as pdf:
  105. for page_index, page in enumerate(pdf.pages):
  106. weekday_areas: dict[Weekday, Area] = {}
  107. timeslot_y_levels: dict[TimeSlot, YLevel] = {}
  108. for day in Weekday:
  109. weekday_areas[day] = Area(0, 0, 0, 0)
  110. found_tables = page.find_tables(TABLE_SETTINGS)
  111. logging.debug(
  112. "amount of tables found on page %d: %d",
  113. page_index + 1,
  114. len(found_tables),
  115. )
  116. table = found_tables[0]
  117. table_y1 = table.bbox[1]
  118. text_above_table = get_above_table_text(page, table_y1)
  119. empty_start_found = False
  120. # get weekday and timeslot areas
  121. expected_timeslot_index = 0
  122. for row_index, row in enumerate(table.rows):
  123. if row_index == 0:
  124. for column_index, cell in enumerate(row.cells):
  125. logging.debug("row: %d, col: %d", row_index, column_index)
  126. logging.debug(cell)
  127. if cell is None:
  128. logging.debug("None Table Cell Found")
  129. else:
  130. cell_text = page.crop(
  131. (cell[0], cell[1], cell[2], cell[3])
  132. ).extract_text()
  133. if not empty_start_found and len(cell_text) == 0:
  134. logging.debug("empty start found")
  135. empty_start_found = True
  136. weekday_enum = get_weekday_from_text(cell_text)
  137. if weekday_enum:
  138. logging.debug("Weekday %s found", cell_text)
  139. weekday_areas[weekday_enum] = Area(
  140. cell[0], cell[3], cell[2], 0
  141. )
  142. else:
  143. logging.debug("row: %d, col: %d", row_index, 0)
  144. cell = row.cells[0]
  145. if cell is None:
  146. logging.warning("Unexpected None Table Cell Found")
  147. else:
  148. cell_text = page.crop(
  149. (cell[0], cell[1], cell[2], cell[3])
  150. ).extract_text()
  151. target_timeslot = allowed_time_slots[expected_timeslot_index]
  152. if not (
  153. target_timeslot.start_time in cell_text
  154. and target_timeslot.end_time in cell_text
  155. ):
  156. logging.warning(
  157. "Unexpected Timeslot found: '%s'", cell_text
  158. )
  159. else:
  160. # assumes this is the last timeslot ever
  161. if target_timeslot == TimeSlot("20:30", "21:15"):
  162. for weekday in Weekday:
  163. new_area = Area(
  164. weekday_areas[weekday].x1,
  165. weekday_areas[weekday].y1,
  166. weekday_areas[weekday].x2,
  167. cell[3],
  168. )
  169. weekday_areas[weekday] = new_area
  170. timeslot_y_levels[target_timeslot] = YLevel(
  171. cell[1], cell[3]
  172. )
  173. expected_timeslot_index += 1
  174. for weekday in Weekday:
  175. unmerged_time_entries_by_weekday[weekday] = UnmergedTimeEntries([], [])
  176. target_area = weekday_areas[weekday]
  177. logging.debug("target_area: %s", target_area)
  178. for row_index, row in enumerate(table.rows):
  179. for column_index, cell in enumerate(row.cells):
  180. if cell is None:
  181. logging.debug("None table cell found")
  182. continue
  183. logging.debug("row: %d, col: %d", row_index, column_index)
  184. logging.debug("cell: %s", cell)
  185. if (
  186. target_area.x1 <= cell[0]
  187. and target_area.y1 <= cell[1]
  188. and target_area.x2 >= cell[2]
  189. and target_area.y2 >= cell[3]
  190. ):
  191. cell_dimensions = cell[0], cell[1], cell[2], cell[3]
  192. unmerged_time_entries_by_weekday[weekday].cells.append(
  193. Area(*cell_dimensions)
  194. )
  195. logging.debug("%s cell found", weekday)
  196. for line_found in page.lines:
  197. line_x1 = line_found["x0"]
  198. line_x2 = line_found["x1"]
  199. line_y1 = line_found["y0"]
  200. line_y2 = line_found["y1"]
  201. line_bottom = line_found["bottom"]
  202. # ignore non horizontal lines
  203. if line_y1 != line_y2:
  204. continue
  205. if target_area.x1 <= line_x1 and target_area.x2 >= line_x2:
  206. logging.debug("%s timeslot seperator line found", weekday)
  207. unmerged_time_entries_by_weekday[
  208. weekday
  209. ].horizontal_lines.append(
  210. HorizontalLine(line_x1, line_x2, line_bottom)
  211. )
  212. all_modules: list[RawExtractedModule] = []
  213. for weekday in Weekday:
  214. all_modules.extend(
  215. get_modules_from_weekday(
  216. weekday,
  217. unmerged_time_entries_by_weekday[weekday],
  218. page,
  219. timeslot_y_levels,
  220. page_index + 1,
  221. )
  222. )
  223. page_metadata = parse_above_table_text(
  224. text_above_table, previous_page_metadata
  225. )
  226. previous_page_metadata.append(page_metadata)
  227. extraction_data.append(
  228. ClassPdfExtractionPageData(all_modules, page_metadata)
  229. )
  230. return extraction_data
  231. def get_above_table_text(page: Page, table_y1: float) -> str:
  232. """
  233. Get the text above the timetable for metadata parsing
  234. """
  235. upper_region = page.crop((0, 0, page.width, table_y1))
  236. text_above_table = upper_region.extract_text()
  237. logging.debug("Text found above the table:")
  238. logging.debug(text_above_table)
  239. return text_above_table