ソースを参照

further refactoring of parse/

Noah Vogt 2 ヶ月 前
コミット
caf95f95b2
4 ファイル変更251 行追加118 行削除
  1. 1 1
      generate_classes_json.py
  2. 14 0
      parse/models.py
  3. 0 1
      parse/parse_modules.py
  4. 236 116
      parse/table_extraction.py

+ 1 - 1
generate_classes_json.py

@@ -80,7 +80,7 @@ def main() -> None:
     args = parser.parse_args()
     lecturers_file = args.lecturers
 
-    logging.basicConfig(level=logging.DEBUG)
+    logging.basicConfig(level=logging.INFO)
 
     valid_lecturer_shorthands: list[str] | None = None
     if lecturers_file:

+ 14 - 0
parse/models.py

@@ -1,7 +1,10 @@
 from enum import Enum, unique
 from typing import Annotated, Any
+from dataclasses import dataclass
 
 from pydantic import BaseModel, PlainSerializer, Field, ConfigDict, BeforeValidator
+from pdfplumber.page import Page
+from pdfplumber.table import Table
 
 
 class XLevel(BaseModel):
@@ -194,6 +197,7 @@ class ClassPdfExtractionPageData(BaseModel):
     raw_extracted_modules: list[RawExtractedModule]
     page_metadata: PageMetadata
 
+
 class RawClassPdfExtractionPageData(BaseModel):
     raw_extracted_modules: list[RawExtractedModule]
     above_table_text: str
@@ -202,3 +206,13 @@ class RawClassPdfExtractionPageData(BaseModel):
 class StartsWithMatch(BaseModel):
     shorthand_found: str
     num_of_matches: int
+
+
+@dataclass
+class TimeSlotYLevelsCollectionData:
+    row_index: int
+    page: Page
+    table: Table
+    expected_timeslot_index: int
+    last_timeslot: TimeSlot
+    weekday_areas: dict[Weekday, Area]

+ 0 - 1
parse/parse_modules.py

@@ -268,4 +268,3 @@ def get_classes(extraction_data: list[ClassPdfExtractionPageData]) -> list[str]:
     Get the classes from the class page's metadata.
     """
     return [page_data.page_metadata.class_name for page_data in extraction_data]
-

+ 236 - 116
parse/table_extraction.py

@@ -22,6 +22,7 @@ from .models import (
     ClassPdfExtractionPageData,
     RawClassPdfExtractionPageData,
     PageMetadata,
+    TimeSlotYLevelsCollectionData,
 )
 from .above_table_text import parse_above_table_text
 from .geometry import (
@@ -38,6 +39,16 @@ allowed_time_slots: list[TimeSlot] = [
 ]
 
 
+def find_next_cell_below_index(current_area: Area, cells: list[Area]) -> int:
+    """
+    Returns the index of the first cell directly below current_area, or -1 if none.
+    """
+    for index, cell in enumerate(cells):
+        if is_area_below(cell, current_area):
+            return index
+    return -1
+
+
 def get_weekday_from_text(text: str) -> Weekday | None:
     """
     Helper function that tries to get a Weekday from a string.
@@ -49,6 +60,56 @@ def get_weekday_from_text(text: str) -> Weekday | None:
     return None
 
 
+def merge_vertically_spanning_cells(
+    initial_area: Area,
+    remaining_cells: list[Area],
+    horizontal_lines: list[HorizontalLine],
+    highest_y: float,
+    weekday: Weekday,
+) -> Area:
+    """
+    Merges vertically adjacent cells until a bottom boundary (line or page end) is found.
+    Mutates remaining_cells by removing used cells.
+    Returns the final merged area.
+    """
+    current_area = initial_area
+
+    while True:
+        logging.debug(
+            "Searching for bottom boundary of area: %s on %s", current_area, weekday
+        )
+
+        # case 1: horizontal line at the bottom of current area?
+        if any(
+            is_line_at_bottom(current_area, line, tolerance=20)
+            for line in horizontal_lines
+        ):
+            logging.debug("Bottom boundary found: horizontal line")
+            return current_area
+
+        # case 2: reached the bottom of the timetable?
+        if is_vertical_match(current_area.y2, highest_y):
+            logging.debug("Bottom boundary found: highest y level")
+            return current_area
+
+        # case 3: find and merge with the next cell below
+        next_cell_index = find_next_cell_below_index(current_area, remaining_cells)
+        if next_cell_index == -1:
+            raise RuntimeError(
+                f"No bottom boundary or next cell found for module on {weekday}"
+            )
+
+        next_cell = remaining_cells.pop(next_cell_index)
+        logging.debug("Vertically merging with cell below: %s", next_cell)
+
+        current_area = Area(
+            x1=current_area.x1,
+            y1=current_area.y1,
+            x2=next_cell.x2,  # use the wider x2 in case of a slight misalignment
+            y2=next_cell.y2,
+        )
+
+
 def get_modules_from_weekday(
     weekday: Weekday,
     unmerged_time_entries: UnmergedTimeEntries,
@@ -57,71 +118,71 @@ def get_modules_from_weekday(
     page_number: int,
 ) -> list[RawExtractedModule]:
     """
-    Extracts the modules (raw text and start/end) of a weekday on a single pdf page
+    Extracts the modules (raw text and start/end) of a weekday on a single pdf page.
     """
-    highest_y_level = timeslot_y_levels[allowed_time_slots[-1]].y2
-    modules = []
-    while len(unmerged_time_entries.cells) > 0:
-        area = unmerged_time_entries.cells.pop(0)
-        if is_mostly_white_area(page, area):
+    cells = unmerged_time_entries.cells[:]
+    horizontal_lines = unmerged_time_entries.horizontal_lines
+
+    highest_y: float = get_highest_y_level(timeslot_y_levels, page_number)
+    modules: list[RawExtractedModule] = []
+    while cells:
+        initial_area = cells.pop(0)
+
+        if is_mostly_white_area(page, initial_area):
             logging.debug("mostly white cell skipped")
             continue
-        timeslot = get_timeslot_for_area(area, timeslot_y_levels)
-        if timeslot is None:
-            raise RuntimeError("Could not match TimeSlot to Cell Area")
-        start_seconds = timeslot.start_seconds()
-        line_at_bottom_found = False
-        while not line_at_bottom_found:
-            logging.debug("searching for line at bottom of: %s", area)
-            logging.debug("line candidates:")
-            for line in unmerged_time_entries.horizontal_lines:
-                logging.debug("testing horizontal line: %s", line)
-                if is_line_at_bottom(area, line, tolerance=20):
-                    line_at_bottom_found = True
-                    logging.debug("candidate line found")
-                    break
-            else:
-
-                if is_vertical_match(area.y2, highest_y_level):
-                    logging.debug("highest y level matched")
-                    break
-                found_matching_next_cell_index = -1
-                for index, potential_cell_below in enumerate(
-                    unmerged_time_entries.cells
-                ):
-                    if is_area_below(potential_cell_below, area):
-                        found_matching_next_cell_index = index
-                        break
-                else:
-                    raise RuntimeError(
-                        f"No matching cell below found to merge with on {weekday}"
-                    )
-                logging.debug("vertically merging cells for %s", weekday)
-                matched_area = unmerged_time_entries.cells.pop(
-                    found_matching_next_cell_index
-                )
-                logging.debug("matched cell area: %s", matched_area)
-                area = Area(
-                    x1=area.x1, y1=area.y1, x2=matched_area.x2, y2=matched_area.y2
-                )
 
-        text = page.crop((area.x1, area.y1, area.x2, area.y2)).extract_text()
-        timeslot = get_timeslot_for_area(area, timeslot_y_levels)
-        if timeslot is None:
-            raise RuntimeError("Could not match TimeSlot to Cell Area")
-        end_seconds = timeslot.end_seconds()
+        merged_area: Area = merge_vertically_spanning_cells(
+            initial_area, cells, horizontal_lines, highest_y, weekday
+        )
+
+        start_timeslot = get_timeslot_for_area(initial_area, timeslot_y_levels)
+        if start_timeslot is None:
+            raise RuntimeError(
+                f"Could not determine start timeslot for module on {weekday}"
+            )
+
+        end_timeslot = get_timeslot_for_area(merged_area, timeslot_y_levels)
+        if end_timeslot is None:
+            raise RuntimeError(
+                f"Could not determine end timeslot for merged module on {weekday}"
+            )
+
+        text: str = (
+            page.crop(
+                (merged_area.x1, merged_area.y1, merged_area.x2, merged_area.y2)
+            ).extract_text()
+            or ""  # do not raise error when extraction returns None for now
+        )
+
         modules.append(
             RawExtractedModule(
                 weekday=weekday,
-                start_seconds=start_seconds,
-                end_seconds=end_seconds,
+                start_seconds=start_timeslot.start_seconds(),
+                end_seconds=end_timeslot.end_seconds(),
                 text=text,
                 source_page_number=page_number,
             )
         )
+
     return modules
 
 
+def get_highest_y_level(timeslot_y_levels, page_number) -> float:
+    """
+    Gets the highest `YLevel` of all `TimeSlot`'s.
+
+    Raises:
+        RuntimeError: If no the highest allowed `TimeSlot` was not mapped to a `YLevel`
+    """
+    try:
+        highest_y_level = timeslot_y_levels[allowed_time_slots[-1]].y2
+    except KeyError as e:
+        logging.debug("timeslot_y_levels on page %d %s", page_number, timeslot_y_levels)
+        raise RuntimeError("Could not get YLevel for latest TimeSlot") from e
+    return highest_y_level
+
+
 def get_usable_table_index(found_tables: list) -> int:
     """
     Identifies the index of the timetable on the page based on dimensions.
@@ -170,73 +231,16 @@ def process_page(
     """
     with pdfplumber.open(input_filename) as pdf:
         page = pdf.pages[page_index]
-        weekday_areas: dict[Weekday, Area] = {}
         timeslot_y_levels: dict[TimeSlot, YLevel] = {}
         unmerged_time_entries_by_weekday: dict[Weekday, UnmergedTimeEntries] = {}
-
-        for day in Weekday:
-            weekday_areas[day] = Area(x1=0, y1=0, x2=0, y2=0)
+        weekday_areas: dict[Weekday, Area] = init_weekday_areas()
 
         table: Table = select_main_table(page, page_index)
-        text_above_table = get_above_table_text(page, table_y1=table.bbox[1])
-
-        empty_start_found = False
+        text_above_table: str = get_above_table_text(page, table_y1=table.bbox[1])
 
-        # get weekday and timeslot areas
-        expected_timeslot_index = 0
-        for row_index, row in enumerate(table.rows):
-            if row_index == 0:
-                for column_index, cell in enumerate(row.cells):
-                    logging.debug("row: %d, col: %d", row_index, column_index)
-                    logging.debug(cell)
-                    if cell is None:
-                        logging.debug("None Table Cell Found")
-                    else:
-                        cell_text = page.crop(
-                            (cell[0], cell[1], cell[2], cell[3])
-                        ).extract_text()
-                        if not empty_start_found and len(cell_text) == 0:
-                            logging.debug("empty start found")
-                            empty_start_found = True
-
-                        weekday_enum = get_weekday_from_text(cell_text)
-                        if weekday_enum:
-                            logging.debug("Weekday %s found", cell_text)
-                            weekday_areas[weekday_enum] = Area(
-                                x1=cell[0], y1=cell[3], x2=cell[2], y2=0
-                            )
-            else:
-                logging.debug("row: %d, col: %d", row_index, 0)
-                cell = row.cells[0]
-                if cell is None:
-                    logging.warning("Unexpected None Table Cell Found")
-                else:
-                    cell_text = page.crop(
-                        (cell[0], cell[1], cell[2], cell[3])
-                    ).extract_text()
-                    target_timeslot = allowed_time_slots[expected_timeslot_index]
-                    if not (
-                        target_timeslot.start_time in cell_text
-                        and target_timeslot.end_time in cell_text
-                    ):
-                        logging.warning("Unexpected Timeslot found: '%s'", cell_text)
-                    else:
-                        # assumes this is the last timeslot ever
-                        if target_timeslot == TimeSlot(
-                            start_time="20:30", end_time="21:15"
-                        ):
-                            for weekday in Weekday:
-                                new_area = Area(
-                                    x1=weekday_areas[weekday].x1,
-                                    y1=weekday_areas[weekday].y1,
-                                    x2=weekday_areas[weekday].x2,
-                                    y2=cell[3],
-                                )
-                                weekday_areas[weekday] = new_area
-                        timeslot_y_levels[target_timeslot] = YLevel(
-                            y1=cell[1], y2=cell[3]
-                        )
-                        expected_timeslot_index += 1
+        collect_weekday_areas_and_timeslot_y_levels(
+            weekday_areas, timeslot_y_levels, page, table
+        )
 
         collected_unmerged_time_entries_by_weekday(
             unmerged_time_entries_by_weekday, weekday_areas, table, page
@@ -257,6 +261,122 @@ def process_page(
             raw_extracted_modules=all_modules, above_table_text=text_above_table
         )
 
+
+def collect_weekday_areas_and_timeslot_y_levels(
+    weekday_areas: dict[Weekday, Area],
+    timeslot_y_levels: dict[TimeSlot, YLevel],
+    page: Page,
+    table: Table,
+) -> None:
+    """
+    Populates the passed weekday_areas and timeslot_y_levels dicts with the right
+    `Area`'s by `Weekday` and `YLevel` by TimeSlot respectively, via side effects.
+    """
+    expected_timeslot_index = 0
+    for row_index, row in enumerate(table.rows):
+        if row_index == 0:
+            collect_weekday_areas(weekday_areas, page, row, row_index)
+        else:
+            expected_timeslot_index: int = collect_timeslot_y_levels_of_row(
+                timeslot_y_levels,
+                TimeSlotYLevelsCollectionData(
+                    row_index=row_index,
+                    expected_timeslot_index=expected_timeslot_index,
+                    last_timeslot=get_last_timeslot(allowed_time_slots),
+                    page=page,
+                    table=table,
+                    weekday_areas=weekday_areas,
+                ),
+            )
+
+
+def collect_timeslot_y_levels_of_row(
+    timeslot_y_levels: dict[TimeSlot, YLevel],
+    collection_data: TimeSlotYLevelsCollectionData,
+) -> int:
+    """
+    Populates the passed and timeslot_y_levels dicts with the right
+    `YLevel`'s by `TimeSlot` via side effects.
+
+    Returns:
+        int for the current expected `TimeSlot` index
+    """
+    logging.debug("row: %d, col: %d", collection_data.row_index, 0)
+    row = collection_data.table.rows[collection_data.row_index]
+    cell = row.cells[0]
+    if cell is None:
+        logging.warning("None Table cell found, not collecting YLevel of Row")
+        return collection_data.expected_timeslot_index
+    cell_text = collection_data.page.crop(
+        (cell[0], cell[1], cell[2], cell[3])
+    ).extract_text()
+    target_timeslot = allowed_time_slots[collection_data.expected_timeslot_index]
+    if not (
+        target_timeslot.start_time in cell_text
+        and target_timeslot.end_time in cell_text
+    ):
+        logging.warning("Unexpected TimeSlot found: '%s'", cell_text)
+        return collection_data.expected_timeslot_index
+    if target_timeslot == collection_data.last_timeslot:
+        for weekday in Weekday:
+            new_area = Area(
+                x1=collection_data.weekday_areas[weekday].x1,
+                y1=collection_data.weekday_areas[weekday].y1,
+                x2=collection_data.weekday_areas[weekday].x2,
+                y2=cell[3],
+            )
+            collection_data.weekday_areas[weekday] = new_area
+    timeslot_y_levels[target_timeslot] = YLevel(y1=cell[1], y2=cell[3])
+    return collection_data.expected_timeslot_index + 1
+
+
+def collect_weekday_areas(weekday_areas, page, row, row_index) -> None:
+    """
+    Populates the passed weekday_areas dict with the right
+    `Area`'s by `Weekday` via side effects.
+    """
+    empty_start_found = False
+    for column_index, cell in enumerate(row.cells):
+        logging.debug("row: %d, col: %d", row_index, column_index)
+        logging.debug(cell)
+        if cell is None:
+            logging.debug("None Table Cell Found")
+        else:
+            cell_text = page.crop((cell[0], cell[1], cell[2], cell[3])).extract_text()
+            if not empty_start_found and len(cell_text) == 0:
+                logging.debug("empty start found")
+                empty_start_found = True
+
+            weekday_enum: Weekday | None = get_weekday_from_text(cell_text)
+            if weekday_enum:
+                logging.debug("Weekday %s found", cell_text)
+                weekday_areas[weekday_enum] = Area(
+                    x1=cell[0], y1=cell[3], x2=cell[2], y2=0
+                )
+
+
+def get_last_timeslot(time_slots: list[TimeSlot]) -> TimeSlot:
+    """
+    Get the last timeslot a weekday can have.
+    """
+    if len(time_slots) == 0:
+        raise RuntimeError("Cannot get the latest timeslot from an empty list")
+    last_timeslot = time_slots[-1]
+    logging.debug("last timeslot found: %s", last_timeslot)
+
+    return last_timeslot
+
+
+def init_weekday_areas() -> dict[Weekday, Area]:
+    """
+    Initializes the weekday areas with zero-valued `Area`'s for each `Weekday`
+    """
+    weekday_areas: dict[Weekday, Area] = {}
+    for day in Weekday:
+        weekday_areas[day] = Area(x1=0, y1=0, x2=0, y2=0)
+    return weekday_areas
+
+
 def select_main_table(page: Page, page_index: int) -> Table:
     """
     Selects the main table on the PDF Page. This should be the timetable.
@@ -279,7 +399,7 @@ def collected_unmerged_time_entries_by_weekday(
 ) -> None:
     """
     Populates the passed unmerged_time_entries_by_weekday dict with the
-    `UnmergedTimeEntries` by `Weekday`.
+    `UnmergedTimeEntries` by `Weekday` via side effects.
     """
     for weekday in Weekday:
         unmerged_time_entries_by_weekday[weekday] = UnmergedTimeEntries(
@@ -319,8 +439,8 @@ def collect_horizontal_lines(
 ) -> None:
     """
     Populates the passed unmerged_time_entries_by_weekday dict with the
-    `horizontal_lines` of the `UnmergedTimeEntries` by the passed weekday.
-    These horizontal Lines are timeslot seperator lines.
+    `horizontal_lines` of the `UnmergedTimeEntries` by the passed weekday
+    via side effects. These horizontal Lines are timeslot seperator lines.
     """
     for line_found in page.lines:
         line_x1 = line_found["x0"]