table_extraction.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291
  1. import logging
  2. from pdfplumber.page import Page
  3. import pdfplumber
  4. from config import CLASS_PDF_TABLE_SETTINGS, ALLOWED_TIMESLOTS, CLASS_PDF_MIN_DIMENSIONS
  5. from .models import (
  6. Weekday,
  7. TimeSlot,
  8. YLevel,
  9. RawExtractedModule,
  10. UnmergedTimeEntries,
  11. Area,
  12. HorizontalLine,
  13. ClassPdfExtractionPageData,
  14. PageMetadata,
  15. )
  16. from .above_table_text import parse_above_table_text
  17. from .geometry import (
  18. get_timeslot_for_area,
  19. is_line_at_bottom,
  20. is_area_below,
  21. is_vertical_match,
  22. )
  23. from .img import is_mostly_white_area
  24. allowed_time_slots: list[TimeSlot] = [
  25. TimeSlot(*timeslot_tuple) for timeslot_tuple in ALLOWED_TIMESLOTS
  26. ]
  27. def get_weekday_from_text(text: str) -> Weekday | None:
  28. """
  29. Helper function that tries to get a Weekday from a string.
  30. Only accepts exact display name matches.
  31. """
  32. for weekday in Weekday:
  33. if weekday.display_name == text:
  34. return weekday
  35. return None
  36. def get_modules_from_weekday(
  37. weekday: Weekday,
  38. unmerged_time_entries: UnmergedTimeEntries,
  39. page: Page,
  40. timeslot_y_levels: dict[TimeSlot, YLevel],
  41. page_number: int,
  42. ) -> list[RawExtractedModule]:
  43. """
  44. Extracts the modules (raw text and start/end) of a weekday on a single pdf page
  45. """
  46. try:
  47. highest_y_level = timeslot_y_levels[allowed_time_slots[-1]].y2
  48. except KeyError:
  49. logging.warning("Highest allowed timeslot was not found. Trying lower one's.")
  50. for time_slot in allowed_time_slots[:-1]:
  51. try:
  52. highest_y_level = timeslot_y_levels[allowed_time_slots[-1]].y2
  53. except KeyError:
  54. continue
  55. finally:
  56. break
  57. modules = []
  58. while len(unmerged_time_entries.cells) > 0:
  59. area = unmerged_time_entries.cells.pop(0)
  60. if is_mostly_white_area(page, area):
  61. logging.debug("mostly white cell skipped")
  62. continue
  63. timeslot = get_timeslot_for_area(area, timeslot_y_levels)
  64. if timeslot is None:
  65. raise RuntimeError("Could not match TimeSlot to Cell Area")
  66. start_seconds = timeslot.start_seconds()
  67. line_at_bottom_found = False
  68. while not line_at_bottom_found:
  69. logging.debug("searching for line at bottom of: %s", area)
  70. logging.debug("line candidates:")
  71. for line in unmerged_time_entries.horizontal_lines:
  72. logging.debug("testing horizontal line: %s", line)
  73. if is_line_at_bottom(area, line, tolerance=20):
  74. line_at_bottom_found = True
  75. logging.debug("candidate line found")
  76. break
  77. else:
  78. if is_vertical_match(area.y2, highest_y_level):
  79. logging.debug("highest y level matched")
  80. break
  81. found_matching_next_cell_index = -1
  82. for index, potential_cell_below in enumerate(
  83. unmerged_time_entries.cells
  84. ):
  85. if is_area_below(potential_cell_below, area):
  86. found_matching_next_cell_index = index
  87. break
  88. else:
  89. raise RuntimeError(
  90. f"No matching cell below found to merge with on {weekday}"
  91. )
  92. logging.debug("vertically merging cells for %s", weekday)
  93. matched_area = unmerged_time_entries.cells.pop(
  94. found_matching_next_cell_index
  95. )
  96. logging.debug("matched cell area: %s", matched_area)
  97. area = Area(area.x1, area.y1, matched_area.x2, matched_area.y2)
  98. text = page.crop((area.x1, area.y1, area.x2, area.y2)).extract_text()
  99. timeslot = get_timeslot_for_area(area, timeslot_y_levels)
  100. if timeslot is None:
  101. raise RuntimeError("Could not match TimeSlot to Cell Area")
  102. end_seconds = timeslot.end_seconds()
  103. modules.append(
  104. RawExtractedModule(weekday, start_seconds, end_seconds, text, page_number)
  105. )
  106. return modules
  107. def extract_data_from_class_pdf(
  108. input_filename: str, lecturers_file=None
  109. ) -> list[ClassPdfExtractionPageData]:
  110. """
  111. Extracts all data from class timetable pdf's
  112. """
  113. extraction_data: list[ClassPdfExtractionPageData] = []
  114. previous_page_metadata: list[PageMetadata] = []
  115. unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries] = {}
  116. with pdfplumber.open(input_filename) as pdf:
  117. for page_index, page in enumerate(pdf.pages):
  118. weekday_areas: dict[Weekday, Area] = {}
  119. timeslot_y_levels: dict[TimeSlot, YLevel] = {}
  120. for day in Weekday:
  121. weekday_areas[day] = Area(0, 0, 0, 0)
  122. found_tables = page.find_tables(CLASS_PDF_TABLE_SETTINGS)
  123. logging.debug(
  124. "amount of tables found on page %d: %d",
  125. page_index + 1,
  126. len(found_tables),
  127. )
  128. usable_table_index: int = 0
  129. if len(found_tables) > 1:
  130. num_of_tables_with_at_least_min_dimensions: int = 0
  131. for table_index, table in enumerate(found_tables):
  132. x0, top, x1, bottom = table.bbox
  133. width = x1 - x0
  134. height = bottom - top
  135. logging.debug(
  136. "table num %d: width: %d, height: %d",
  137. table_index + 1,
  138. width,
  139. height,
  140. )
  141. if width >= CLASS_PDF_MIN_DIMENSIONS and height >= CLASS_PDF_MIN_DIMENSIONS:
  142. num_of_tables_with_at_least_min_dimensions += 1
  143. usable_table_index = table_index
  144. table = found_tables[usable_table_index]
  145. table_y1 = table.bbox[1]
  146. text_above_table = get_above_table_text(page, table_y1)
  147. empty_start_found = False
  148. # get weekday and timeslot areas
  149. expected_timeslot_index = 0
  150. for row_index, row in enumerate(table.rows):
  151. if row_index == 0:
  152. for column_index, cell in enumerate(row.cells):
  153. logging.debug("row: %d, col: %d", row_index, column_index)
  154. logging.debug(cell)
  155. if cell is None:
  156. logging.debug("None Table Cell Found")
  157. else:
  158. cell_text = page.crop(
  159. (cell[0], cell[1], cell[2], cell[3])
  160. ).extract_text()
  161. if not empty_start_found and len(cell_text) == 0:
  162. logging.debug("empty start found")
  163. empty_start_found = True
  164. weekday_enum = get_weekday_from_text(cell_text)
  165. if weekday_enum:
  166. logging.debug("Weekday %s found", cell_text)
  167. weekday_areas[weekday_enum] = Area(
  168. cell[0], cell[3], cell[2], 0
  169. )
  170. else:
  171. logging.debug("row: %d, col: %d", row_index, 0)
  172. cell = row.cells[0]
  173. if cell is None:
  174. logging.warning("Unexpected None Table Cell Found")
  175. else:
  176. cell_text = page.crop(
  177. (cell[0], cell[1], cell[2], cell[3])
  178. ).extract_text()
  179. target_timeslot = allowed_time_slots[expected_timeslot_index]
  180. if not (
  181. target_timeslot.start_time in cell_text
  182. and target_timeslot.end_time in cell_text
  183. ):
  184. logging.warning(
  185. "Unexpected Timeslot found: '%s'", cell_text
  186. )
  187. else:
  188. # assumes this is the last timeslot ever
  189. if target_timeslot == TimeSlot("20:30", "21:15"):
  190. for weekday in Weekday:
  191. new_area = Area(
  192. weekday_areas[weekday].x1,
  193. weekday_areas[weekday].y1,
  194. weekday_areas[weekday].x2,
  195. cell[3],
  196. )
  197. weekday_areas[weekday] = new_area
  198. timeslot_y_levels[target_timeslot] = YLevel(
  199. cell[1], cell[3]
  200. )
  201. expected_timeslot_index += 1
  202. for weekday in Weekday:
  203. unmerged_time_entries_by_weekday[weekday] = UnmergedTimeEntries([], [])
  204. target_area = weekday_areas[weekday]
  205. logging.debug("target_area: %s", target_area)
  206. for row_index, row in enumerate(table.rows):
  207. for column_index, cell in enumerate(row.cells):
  208. if cell is None:
  209. logging.debug("None table cell found")
  210. continue
  211. logging.debug("row: %d, col: %d", row_index, column_index)
  212. logging.debug("cell: %s", cell)
  213. if (
  214. target_area.x1 <= cell[0]
  215. and target_area.y1 <= cell[1]
  216. and target_area.x2 >= cell[2]
  217. and target_area.y2 >= cell[3]
  218. ):
  219. cell_dimensions = cell[0], cell[1], cell[2], cell[3]
  220. unmerged_time_entries_by_weekday[weekday].cells.append(
  221. Area(*cell_dimensions)
  222. )
  223. logging.debug("%s cell found", weekday)
  224. for line_found in page.lines:
  225. line_x1 = line_found["x0"]
  226. line_x2 = line_found["x1"]
  227. line_y1 = line_found["y0"]
  228. line_y2 = line_found["y1"]
  229. line_bottom = line_found["bottom"]
  230. # ignore non horizontal lines
  231. if line_y1 != line_y2:
  232. continue
  233. if target_area.x1 <= line_x1 and target_area.x2 >= line_x2:
  234. logging.debug("%s timeslot seperator line found", weekday)
  235. unmerged_time_entries_by_weekday[
  236. weekday
  237. ].horizontal_lines.append(
  238. HorizontalLine(line_x1, line_x2, line_bottom)
  239. )
  240. all_modules: list[RawExtractedModule] = []
  241. for weekday in Weekday:
  242. all_modules.extend(
  243. get_modules_from_weekday(
  244. weekday,
  245. unmerged_time_entries_by_weekday[weekday],
  246. page,
  247. timeslot_y_levels,
  248. page_index + 1,
  249. )
  250. )
  251. page_metadata = parse_above_table_text(
  252. text_above_table, previous_page_metadata
  253. )
  254. previous_page_metadata.append(page_metadata)
  255. extraction_data.append(
  256. ClassPdfExtractionPageData(all_modules, page_metadata)
  257. )
  258. return extraction_data
  259. def get_above_table_text(page: Page, table_y1: float) -> str:
  260. """
  261. Get the text above the timetable for metadata parsing
  262. """
  263. upper_region = page.crop((0, 0, page.width, table_y1))
  264. text_above_table = upper_region.extract_text()
  265. logging.debug("Text found above the table:")
  266. logging.debug(text_above_table)
  267. return text_above_table