# table_extraction.py
  1. import logging
  2. from pdfplumber.page import Page
  3. import pdfplumber
  4. from config import CLASS_PDF_TABLE_SETTINGS, ALLOWED_TIMESLOTS, CLASS_PDF_MIN_DIMENSIONS
  5. from .models import (
  6. Weekday,
  7. TimeSlot,
  8. YLevel,
  9. RawExtractedModule,
  10. UnmergedTimeEntries,
  11. Area,
  12. HorizontalLine,
  13. ClassPdfExtractionPageData,
  14. PageMetadata,
  15. )
  16. from .above_table_text import parse_above_table_text
  17. from .geometry import (
  18. get_timeslot_for_area,
  19. is_line_at_bottom,
  20. is_area_below,
  21. is_vertical_match,
  22. )
  23. from .img import is_mostly_white_area
  24. allowed_time_slots: list[TimeSlot] = [
  25. TimeSlot(start_time=timeslot_tuple[0], end_time=timeslot_tuple[1])
  26. for timeslot_tuple in ALLOWED_TIMESLOTS
  27. ]
  28. def get_weekday_from_text(text: str) -> Weekday | None:
  29. """
  30. Helper function that tries to get a Weekday from a string.
  31. Only accepts exact display name matches.
  32. """
  33. for weekday in Weekday:
  34. if weekday.display_name == text:
  35. return weekday
  36. return None
  37. def get_modules_from_weekday(
  38. weekday: Weekday,
  39. unmerged_time_entries: UnmergedTimeEntries,
  40. page: Page,
  41. timeslot_y_levels: dict[TimeSlot, YLevel],
  42. page_number: int,
  43. ) -> list[RawExtractedModule]:
  44. """
  45. Extracts the modules (raw text and start/end) of a weekday on a single pdf page
  46. """
  47. try:
  48. highest_y_level = timeslot_y_levels[allowed_time_slots[-1]].y2
  49. except KeyError:
  50. logging.warning("Highest allowed timeslot was not found. Trying lower one's.")
  51. for time_slot in allowed_time_slots[:-1]:
  52. try:
  53. highest_y_level = timeslot_y_levels[allowed_time_slots[-1]].y2
  54. except KeyError:
  55. continue
  56. finally:
  57. break
  58. modules = []
  59. while len(unmerged_time_entries.cells) > 0:
  60. area = unmerged_time_entries.cells.pop(0)
  61. if is_mostly_white_area(page, area):
  62. logging.debug("mostly white cell skipped")
  63. continue
  64. timeslot = get_timeslot_for_area(area, timeslot_y_levels)
  65. if timeslot is None:
  66. raise RuntimeError("Could not match TimeSlot to Cell Area")
  67. start_seconds = timeslot.start_seconds()
  68. line_at_bottom_found = False
  69. while not line_at_bottom_found:
  70. logging.debug("searching for line at bottom of: %s", area)
  71. logging.debug("line candidates:")
  72. for line in unmerged_time_entries.horizontal_lines:
  73. logging.debug("testing horizontal line: %s", line)
  74. if is_line_at_bottom(area, line, tolerance=20):
  75. line_at_bottom_found = True
  76. logging.debug("candidate line found")
  77. break
  78. else:
  79. if is_vertical_match(area.y2, highest_y_level):
  80. logging.debug("highest y level matched")
  81. break
  82. found_matching_next_cell_index = -1
  83. for index, potential_cell_below in enumerate(
  84. unmerged_time_entries.cells
  85. ):
  86. if is_area_below(potential_cell_below, area):
  87. found_matching_next_cell_index = index
  88. break
  89. else:
  90. raise RuntimeError(
  91. f"No matching cell below found to merge with on {weekday}"
  92. )
  93. logging.debug("vertically merging cells for %s", weekday)
  94. matched_area = unmerged_time_entries.cells.pop(
  95. found_matching_next_cell_index
  96. )
  97. logging.debug("matched cell area: %s", matched_area)
  98. area = Area(
  99. x1=area.x1, y1=area.y1, x2=matched_area.x2, y2=matched_area.y2
  100. )
  101. text = page.crop((area.x1, area.y1, area.x2, area.y2)).extract_text()
  102. timeslot = get_timeslot_for_area(area, timeslot_y_levels)
  103. if timeslot is None:
  104. raise RuntimeError("Could not match TimeSlot to Cell Area")
  105. end_seconds = timeslot.end_seconds()
  106. modules.append(
  107. RawExtractedModule(
  108. weekday=weekday,
  109. start_seconds=start_seconds,
  110. end_seconds=end_seconds,
  111. text=text,
  112. source_page_number=page_number,
  113. )
  114. )
  115. return modules
  116. def extract_data_from_class_pdf(
  117. input_filename: str, lecturers_file=None
  118. ) -> list[ClassPdfExtractionPageData]:
  119. """
  120. Extracts all data from class timetable pdf's
  121. """
  122. extraction_data: list[ClassPdfExtractionPageData] = []
  123. previous_page_metadata: list[PageMetadata] = []
  124. unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries] = {}
  125. with pdfplumber.open(input_filename) as pdf:
  126. for page_index, page in enumerate(pdf.pages):
  127. weekday_areas: dict[Weekday, Area] = {}
  128. timeslot_y_levels: dict[TimeSlot, YLevel] = {}
  129. for day in Weekday:
  130. weekday_areas[day] = Area(x1=0, y1=0, x2=0, y2=0)
  131. found_tables = page.find_tables(CLASS_PDF_TABLE_SETTINGS)
  132. logging.debug(
  133. "amount of tables found on page %d: %d",
  134. page_index + 1,
  135. len(found_tables),
  136. )
  137. usable_table_index: int = 0
  138. if len(found_tables) > 1:
  139. num_of_tables_with_at_least_min_dimensions: int = 0
  140. for table_index, table in enumerate(found_tables):
  141. x0, top, x1, bottom = table.bbox
  142. width = x1 - x0
  143. height = bottom - top
  144. logging.debug(
  145. "table num %d: width: %d, height: %d",
  146. table_index + 1,
  147. width,
  148. height,
  149. )
  150. if width >= CLASS_PDF_MIN_DIMENSIONS and height >= CLASS_PDF_MIN_DIMENSIONS:
  151. num_of_tables_with_at_least_min_dimensions += 1
  152. usable_table_index = table_index
  153. table = found_tables[usable_table_index]
  154. table_y1 = table.bbox[1]
  155. text_above_table = get_above_table_text(page, table_y1)
  156. empty_start_found = False
  157. # get weekday and timeslot areas
  158. expected_timeslot_index = 0
  159. for row_index, row in enumerate(table.rows):
  160. if row_index == 0:
  161. for column_index, cell in enumerate(row.cells):
  162. logging.debug("row: %d, col: %d", row_index, column_index)
  163. logging.debug(cell)
  164. if cell is None:
  165. logging.debug("None Table Cell Found")
  166. else:
  167. cell_text = page.crop(
  168. (cell[0], cell[1], cell[2], cell[3])
  169. ).extract_text()
  170. if not empty_start_found and len(cell_text) == 0:
  171. logging.debug("empty start found")
  172. empty_start_found = True
  173. weekday_enum = get_weekday_from_text(cell_text)
  174. if weekday_enum:
  175. logging.debug("Weekday %s found", cell_text)
  176. weekday_areas[weekday_enum] = Area(
  177. x1=cell[0], y1=cell[3], x2=cell[2], y2=0
  178. )
  179. else:
  180. logging.debug("row: %d, col: %d", row_index, 0)
  181. cell = row.cells[0]
  182. if cell is None:
  183. logging.warning("Unexpected None Table Cell Found")
  184. else:
  185. cell_text = page.crop(
  186. (cell[0], cell[1], cell[2], cell[3])
  187. ).extract_text()
  188. target_timeslot = allowed_time_slots[expected_timeslot_index]
  189. if not (
  190. target_timeslot.start_time in cell_text
  191. and target_timeslot.end_time in cell_text
  192. ):
  193. logging.warning(
  194. "Unexpected Timeslot found: '%s'", cell_text
  195. )
  196. else:
  197. # assumes this is the last timeslot ever
  198. if target_timeslot == TimeSlot(
  199. start_time="20:30", end_time="21:15"
  200. ):
  201. for weekday in Weekday:
  202. new_area = Area(
  203. x1=weekday_areas[weekday].x1,
  204. y1=weekday_areas[weekday].y1,
  205. x2=weekday_areas[weekday].x2,
  206. y2=cell[3],
  207. )
  208. weekday_areas[weekday] = new_area
  209. timeslot_y_levels[target_timeslot] = YLevel(
  210. y1=cell[1], y2=cell[3]
  211. )
  212. expected_timeslot_index += 1
  213. for weekday in Weekday:
  214. unmerged_time_entries_by_weekday[weekday] = UnmergedTimeEntries(
  215. cells=[], horizontal_lines=[]
  216. )
  217. target_area = weekday_areas[weekday]
  218. logging.debug("target_area: %s", target_area)
  219. for row_index, row in enumerate(table.rows):
  220. for column_index, cell in enumerate(row.cells):
  221. if cell is None:
  222. logging.debug("None table cell found")
  223. continue
  224. logging.debug("row: %d, col: %d", row_index, column_index)
  225. logging.debug("cell: %s", cell)
  226. if (
  227. target_area.x1 <= cell[0]
  228. and target_area.y1 <= cell[1]
  229. and target_area.x2 >= cell[2]
  230. and target_area.y2 >= cell[3]
  231. ):
  232. cell_dimensions = cell[0], cell[1], cell[2], cell[3]
  233. unmerged_time_entries_by_weekday[weekday].cells.append(
  234. Area(
  235. x1=cell[0], y1=cell[1], x2=cell[2], y2=cell[3]
  236. )
  237. )
  238. logging.debug("%s cell found", weekday)
  239. for line_found in page.lines:
  240. line_x1 = line_found["x0"]
  241. line_x2 = line_found["x1"]
  242. line_y1 = line_found["y0"]
  243. line_y2 = line_found["y1"]
  244. line_bottom = line_found["bottom"]
  245. # ignore non horizontal lines
  246. if line_y1 != line_y2:
  247. continue
  248. if target_area.x1 <= line_x1 and target_area.x2 >= line_x2:
  249. logging.debug("%s timeslot seperator line found", weekday)
  250. unmerged_time_entries_by_weekday[
  251. weekday
  252. ].horizontal_lines.append(
  253. HorizontalLine(x1=line_x1, x2=line_x2, y=line_bottom)
  254. )
  255. all_modules: list[RawExtractedModule] = []
  256. for weekday in Weekday:
  257. all_modules.extend(
  258. get_modules_from_weekday(
  259. weekday,
  260. unmerged_time_entries_by_weekday[weekday],
  261. page,
  262. timeslot_y_levels,
  263. page_index + 1,
  264. )
  265. )
  266. page_metadata = parse_above_table_text(
  267. text_above_table, previous_page_metadata
  268. )
  269. previous_page_metadata.append(page_metadata)
  270. extraction_data.append(
  271. ClassPdfExtractionPageData(
  272. raw_extracted_modules=all_modules, page_metadata=page_metadata
  273. )
  274. )
  275. return extraction_data
  276. def get_above_table_text(page: Page, table_y1: float) -> str:
  277. """
  278. Get the text above the timetable for metadata parsing
  279. """
  280. upper_region = page.crop((0, 0, page.width, table_y1))
  281. text_above_table = upper_region.extract_text()
  282. logging.debug("Text found above the table:")
  283. logging.debug(text_above_table)
  284. return text_above_table