Selaa lähdekoodia

add flag to set log level + use named logger

Noah Vogt 2 kuukautta sitten
vanhempi
sitoutus
796f234ade

+ 28 - 15
extract_lecturer_shorthands_pdf.py

@@ -14,6 +14,8 @@ from config import (
 )
 from parse import RawLecturer, Lecturer
 
+logger = logging.getLogger("modulplaner-backend.extract_lecturer_shorthands_pdf")
+
 
 def extract_rows_from_lecturer_shorthand_pdf(input_file) -> list[RawLecturer]:
     lecturers: list[RawLecturer] = []
@@ -34,7 +36,7 @@ def extract_rows_from_lecturer_shorthand_pdf(input_file) -> list[RawLecturer]:
             # even if it drifts slightly left.
             sep_x_1 = nachname_rects[0]["x0"] - 2
             sep_x_2 = vorname_rects[0]["x0"] - 2
-            logging.debug(
+            logger.debug(
                 "calculated separators: %d (Nachname), %d (Vorname)", sep_x_1, sep_x_2
             )
         else:
@@ -57,7 +59,7 @@ def extract_rows_from_lecturer_shorthand_pdf(input_file) -> list[RawLecturer]:
 
             # guard against empty lines list if page has no lines
             if not lines_y1:
-                logging.warning("First page has no lines")
+                logger.warning("First page has no lines")
                 crop_box = (0, 0, page.width, page.height)
             else:
                 crop_box = (0, min_line_y1, page.width, max_line_y1)
@@ -78,7 +80,7 @@ def extract_rows_from_lecturer_shorthand_pdf(input_file) -> list[RawLecturer]:
 
             for row_index, row in enumerate(table.rows):
                 if row is None:
-                    logging.debug("None table row found")
+                    logger.debug("None table row found")
                     continue
 
                 valid_cells = [cell for cell in row.cells if cell is not None]
@@ -93,7 +95,7 @@ def extract_rows_from_lecturer_shorthand_pdf(input_file) -> list[RawLecturer]:
 
                 row_bbox = (row_left, row_top, row_right, row_bottom)
 
-                logging.debug("row %d dimensions: %s", row_index, row_bbox)
+                logger.debug("row %d dimensions: %s", row_index, row_bbox)
 
                 # column 1: From start of row -> Nachname separator
                 col1_bbox = (row_left, row_top, sep_x_1, row_bottom)
@@ -102,19 +104,23 @@ def extract_rows_from_lecturer_shorthand_pdf(input_file) -> list[RawLecturer]:
                 # column 3: From Vorname separator -> End of row
                 col3_bbox = (sep_x_2, row_top, row_right, row_bottom)
 
-                logging.debug("col 1 bbox: %s", col1_bbox)
-                logging.debug("col 2 bbox: %s", col2_bbox)
-                logging.debug("col 3 bbox: %s", col3_bbox)
+                logger.debug("col 1 bbox: %s", col1_bbox)
+                logger.debug("col 2 bbox: %s", col2_bbox)
+                logger.debug("col 3 bbox: %s", col3_bbox)
 
                 row_text: str = cropped_page.crop(row_bbox).extract_text()
-                logging.debug("row text: %s", row_text)
+                logger.debug("row text: %s", row_text)
                 col1_text = cropped_page.crop(col1_bbox).extract_text()
-                logging.debug("col 1 text: %s", col1_text)
+                logger.debug("col 1 text: %s", col1_text)
                 col2_text = cropped_page.crop(col2_bbox).extract_text()
-                logging.debug("col 2 text: %s", col2_text)
+                logger.debug("col 2 text: %s", col2_text)
                 col3_text = cropped_page.crop(col3_bbox).extract_text()
-                logging.debug("col 3 text: %s", col3_text)
-                lecturers.append(RawLecturer(col1_text, col3_text, col2_text))
+                logger.debug("col 3 text: %s", col3_text)
+                lecturers.append(
+                    RawLecturer(
+                        shorthand=col1_text, firstname=col3_text, surname=col2_text
+                    )
+                )
 
     return lecturers
 
@@ -147,7 +153,7 @@ def parse_lecturers(raw_lecturers: list[RawLecturer]) -> list[Lecturer]:
     lecturers: list[Lecturer] = []
     for raw_lecturer in raw_lecturers:
         if is_table_header_row(raw_lecturer) or is_vak_example_row(raw_lecturer):
-            logging.debug("skipping raw lecturer: %s", raw_lecturer)
+            logger.debug("skipping raw lecturer: %s", raw_lecturer)
         else:
             new_lecturer: Lecturer = Lecturer(
                 short=raw_lecturer.shorthand,
@@ -155,7 +161,7 @@ def parse_lecturers(raw_lecturers: list[RawLecturer]) -> list[Lecturer]:
                 firstname=raw_lecturer.firstname,
             )
             if new_lecturer in lecturers:
-                logging.debug("skipped over duplicate lecturer: %s", new_lecturer)
+                logger.debug("skipped over duplicate lecturer: %s", new_lecturer)
             else:
                 lecturers.append(new_lecturer)
     return lecturers
@@ -175,9 +181,16 @@ def main() -> None:
         help="Path to the output JSON file",
         default=LECTURER_SHORTHAND_JSON_OUTPUT_FILE,
     )
+    parser.add_argument(
+        "--log-level",
+        help="Set the logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
+        default="INFO",
+        type=str.upper,
+        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+    )
     args = parser.parse_args()
 
-    logging.basicConfig(level=logging.INFO)
+    logging.basicConfig(level=args.log_level)
 
     raw_lecturers: list[RawLecturer] = extract_rows_from_lecturer_shorthand_pdf(
         args.input

+ 15 - 6
generate_classes_json.py

@@ -17,6 +17,8 @@ from parse import (
 
 from config import CLASS_TIMETABLE_PDF_INPUT_FILE, CLASSES_JSON_OUTPUT_FILE
 
+logger = logging.getLogger("modulplaner-backend")
+
 
 def get_valid_lecturers(file_path: str) -> list[str]:
     """
@@ -24,18 +26,18 @@ def get_valid_lecturers(file_path: str) -> list[str]:
     """
     valid_lecturers: list[str] = []
     try:
-        logging.warning("reading lecturers file: '%s'", file_path)
+        logger.warning("reading lecturers file: '%s'", file_path)
         with open(file_path, "r", encoding="utf-8") as f:
             data = json.load(f)
             if isinstance(data, list):
                 for entry in data:
                     if isinstance(entry, dict) and "short" in entry:
                         valid_lecturers.append(entry["short"])
-        logging.info(
+        logger.info(
             "Loaded %d valid lecturers from %s", len(valid_lecturers), file_path
         )
     except Exception as e:
-        logging.error("Failed to load valid lecturers from '%s': %s", file_path, e)
+        logger.error("Failed to load valid lecturers from '%s': %s", file_path, e)
     return valid_lecturers
 
 
@@ -76,11 +78,18 @@ def main() -> None:
         type=int,
         default=1,
     )
+    parser.add_argument(
+        "--log-level",
+        help="Set the logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
+        default="INFO",
+        type=str.upper,
+        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+    )
 
     args = parser.parse_args()
     lecturers_file = args.lecturers
 
-    logging.basicConfig(level=logging.INFO)
+    logging.basicConfig(level=args.log_level)
 
     valid_lecturer_shorthands: list[str] | None = None
     if lecturers_file:
@@ -89,7 +98,7 @@ def main() -> None:
     extraction_data: list[ClassPdfExtractionPageData]
 
     if args.load_intermediate:
-        logging.info("Loading intermediate data from %s", args.load_intermediate)
+        logger.info("Loading intermediate data from %s", args.load_intermediate)
         with open(args.load_intermediate, "r", encoding="utf-8") as f:
             extraction_data = TypeAdapter(
                 list[ClassPdfExtractionPageData]
@@ -97,7 +106,7 @@ def main() -> None:
     else:
         extraction_data = extract_data_from_class_pdf(args.input, num_of_jobs=args.jobs)
         if args.save_intermediate:
-            logging.info("Saving intermediate data to %s", args.save_intermediate)
+            logger.info("Saving intermediate data to %s", args.save_intermediate)
             with open(args.save_intermediate, "w", encoding="utf-8") as f:
                 f.write(
                     TypeAdapter(list[ClassPdfExtractionPageData])

+ 4 - 2
parse/above_table_text.py

@@ -11,6 +11,8 @@ from .models import (
     Time,
 )
 
+logger = logging.getLogger("modulplaner-backend.above_table_text")
+
 
 def parse_above_table_text(
     txt: str, previous_page_metadata: list[PageMetadata]
@@ -76,13 +78,13 @@ def get_class_name(third_line: str) -> str:
 def get_degree_program(
     third_line: str, class_name: str, previous_page_metadata: list[PageMetadata]
 ) -> DegreeProgram:
-    logging.debug("class_name: '%s'", class_name)
+    logger.debug("class_name: '%s'", class_name)
     if "Kontext BWL" and "Kommunikation" and "GSW" in third_line:
         return DegreeProgram.MIXED_BWL_GSW_KOMM
     for degree_program in DegreeProgram:
         if degree_program.value in third_line:
             return degree_program
-    logging.warning("Using heuristics to guess the degree_program in %s", third_line)
+    logger.warning("Using heuristics to guess the degree_program in %s", third_line)
     try:
         for page_metadata in previous_page_metadata:
             if page_metadata.class_name == class_name[:-1]:

+ 3 - 1
parse/img.py

@@ -5,6 +5,8 @@ from pdfplumber.page import Page
 
 from .models import Area
 
+logger = logging.getLogger("modulplaner-backend.img")
+
 
 def is_mostly_white_area(page: Page, area: Area) -> bool:
     """
@@ -31,6 +33,6 @@ def is_mostly_white_area(page: Page, area: Area) -> bool:
 
     total_pixels = arr.shape[0] * arr.shape[1]
     whitish_percentage = is_whitish.sum() / total_pixels
-    logging.debug("whitish: %.2f%%", whitish_percentage * 100)
+    logger.debug("whitish: %.2f%%", whitish_percentage * 100)
 
     return whitish_percentage > 0.9

+ 8 - 6
parse/parse_modules.py

@@ -1,5 +1,5 @@
-from typing import List
 import logging
+from typing import List
 
 from pydantic import TypeAdapter
 
@@ -16,6 +16,8 @@ from .models import (
     ClassPdfExtractionPageData,
 )
 
+logger = logging.getLogger("modulplaner-backend.parse_modules")
+
 
 def get_modules_for_class_json(
     modules: list[RawExtractedModule],
@@ -110,7 +112,7 @@ def parse_module_class_pdf_cell_text(
     Parse a single Class Timetable PDF module cell text.
     """
     lines = text.split("\n")
-    logging.debug("Parsing module cell text: \n%s", text)
+    logger.debug("Parsing module cell text: \n%s", text)
     if len(lines) != 3 and len(lines) != 2:
         raise RuntimeError("Invalid Number of Lines in the cell text.")
     if len(lines) == 3:
@@ -149,7 +151,7 @@ def get_lecturer_shorthands(
             if len(word) == LECTURER_SHORTHAND_SIZE:
                 lecturer_shorthands.append(word)
             else:
-                logging.warning("Could not get Lecturer Shorthand from word: %s", word)
+                logger.warning("Could not get Lecturer Shorthand from word: %s", word)
     else:
         for word in words:
             exact_starts_with_match = matches_startswith(
@@ -168,7 +170,7 @@ def get_lecturer_shorthands(
                     minus_last_char_starts_with_match.shorthand_found
                 )
             else:
-                logging.warning("Could not get Lecturer Shorthand from word: %s", word)
+                logger.warning("Could not get Lecturer Shorthand from word: %s", word)
     return lecturer_shorthands
 
 
@@ -213,13 +215,13 @@ def get_module_shorthand(
                     f"cut off class name part '{class_name_part}'"
                     + f" of class name '{class_name}' in line '{first_line}'"
                 )
-                logging.debug(debug_msg)
+                logger.debug(debug_msg)
                 break
 
         for foreign_class_name in all_class_names:
             if word.endswith(foreign_class_name):
                 word = word[: word.rfind(foreign_class_name)]
-                logging.debug(
+                logger.debug(
                     "cut off class name '%s' in line '%s'",
                     foreign_class_name,
                     first_line,

+ 31 - 28
parse/table_extraction.py

@@ -2,9 +2,9 @@ import logging
 from multiprocessing import Pool
 from pathlib import Path
 
+import pdfplumber
 from pdfplumber.page import Page
 from pdfplumber.table import Table
-import pdfplumber
 
 from config import (
     CLASS_TIMETABLE_PDF_TABLE_SETTINGS,
@@ -33,6 +33,9 @@ from .geometry import (
 )
 from .img import is_mostly_white_area
 
+logger = logging.getLogger("modulplaner-backend.table_extraction")
+
+
 allowed_time_slots: list[TimeSlot] = [
     TimeSlot(start_time=timeslot_tuple[0], end_time=timeslot_tuple[1])
     for timeslot_tuple in ALLOWED_TIMESLOTS
@@ -75,7 +78,7 @@ def merge_vertically_spanning_cells(
     current_area = initial_area
 
     while True:
-        logging.debug(
+        logger.debug(
             "Searching for bottom boundary of area: %s on %s", current_area, weekday
         )
 
@@ -84,12 +87,12 @@ def merge_vertically_spanning_cells(
             is_line_at_bottom(current_area, line, tolerance=20)
             for line in horizontal_lines
         ):
-            logging.debug("Bottom boundary found: horizontal line")
+            logger.debug("Bottom boundary found: horizontal line")
             return current_area
 
         # case 2: reached the bottom of the timetable?
         if is_vertical_match(current_area.y2, highest_y):
-            logging.debug("Bottom boundary found: highest y level")
+            logger.debug("Bottom boundary found: highest y level")
             return current_area
 
         # case 3: find and merge with the next cell below
@@ -100,7 +103,7 @@ def merge_vertically_spanning_cells(
             )
 
         next_cell = remaining_cells.pop(next_cell_index)
-        logging.debug("Vertically merging with cell below: %s", next_cell)
+        logger.debug("Vertically merging with cell below: %s", next_cell)
 
         current_area = Area(
             x1=current_area.x1,
@@ -129,7 +132,7 @@ def get_modules_from_weekday(
         initial_area = cells.pop(0)
 
         if is_mostly_white_area(page, initial_area):
-            logging.debug("mostly white cell skipped")
+            logger.debug("mostly white cell skipped")
             continue
 
         merged_area: Area = merge_vertically_spanning_cells(
@@ -178,7 +181,7 @@ def get_highest_y_level(timeslot_y_levels, page_number) -> float:
     try:
         highest_y_level = timeslot_y_levels[allowed_time_slots[-1]].y2
     except KeyError as e:
-        logging.debug("timeslot_y_levels on page %d %s", page_number, timeslot_y_levels)
+        logger.debug("timeslot_y_levels on page %d %s", page_number, timeslot_y_levels)
         raise RuntimeError("Could not get YLevel for latest TimeSlot") from e
     return highest_y_level
 
@@ -198,7 +201,7 @@ def get_usable_table_index(found_tables: list) -> int:
         x0, top, x1, bottom = table.bbox
         width = x1 - x0
         height = bottom - top
-        logging.debug(
+        logger.debug(
             "table num %d: width: %d, height: %d",
             index + 1,
             width,
@@ -301,11 +304,11 @@ def collect_timeslot_y_levels_of_row(
     Returns:
         int for the current expected `TimeSlot` index
     """
-    logging.debug("row: %d, col: %d", collection_data.row_index, 0)
+    logger.debug("row: %d, col: %d", collection_data.row_index, 0)
     row = collection_data.table.rows[collection_data.row_index]
     cell = row.cells[0]
     if cell is None:
-        logging.warning("None Table cell found, not collecting YLevel of Row")
+        logger.warning("None Table cell found, not collecting YLevel of Row")
         return collection_data.expected_timeslot_index
     cell_text = collection_data.page.crop(
         (cell[0], cell[1], cell[2], cell[3])
@@ -315,7 +318,7 @@ def collect_timeslot_y_levels_of_row(
         target_timeslot.start_time in cell_text
         and target_timeslot.end_time in cell_text
     ):
-        logging.warning("Unexpected TimeSlot found: '%s'", cell_text)
+        logger.warning("Unexpected TimeSlot found: '%s'", cell_text)
         return collection_data.expected_timeslot_index
     if target_timeslot == collection_data.last_timeslot:
         for weekday in Weekday:
@@ -337,19 +340,19 @@ def collect_weekday_areas(weekday_areas, page, row, row_index) -> None:
     """
     empty_start_found = False
     for column_index, cell in enumerate(row.cells):
-        logging.debug("row: %d, col: %d", row_index, column_index)
-        logging.debug(cell)
+        logger.debug("row: %d, col: %d", row_index, column_index)
+        logger.debug(cell)
         if cell is None:
-            logging.debug("None Table Cell Found")
+            logger.debug("None Table Cell Found")
         else:
             cell_text = page.crop((cell[0], cell[1], cell[2], cell[3])).extract_text()
             if not empty_start_found and len(cell_text) == 0:
-                logging.debug("empty start found")
+                logger.debug("empty start found")
                 empty_start_found = True
 
             weekday_enum: Weekday | None = get_weekday_from_text(cell_text)
             if weekday_enum:
-                logging.debug("Weekday %s found", cell_text)
+                logger.debug("Weekday %s found", cell_text)
                 weekday_areas[weekday_enum] = Area(
                     x1=cell[0], y1=cell[3], x2=cell[2], y2=0
                 )
@@ -362,7 +365,7 @@ def get_last_timeslot(time_slots: list[TimeSlot]) -> TimeSlot:
     if len(time_slots) == 0:
         raise RuntimeError("Cannot get the latest timeslot from an empty list")
     last_timeslot = time_slots[-1]
-    logging.debug("last timeslot found: %s", last_timeslot)
+    logger.debug("last timeslot found: %s", last_timeslot)
 
     return last_timeslot
 
@@ -382,7 +385,7 @@ def select_main_table(page: Page, page_index: int) -> Table:
     Selects the main table on the PDF Page. This should be the timetable.
     """
     found_tables = page.find_tables(CLASS_TIMETABLE_PDF_TABLE_SETTINGS)
-    logging.debug(
+    logger.debug(
         "amount of tables found on page %d: %d",
         page_index + 1,
         len(found_tables),
@@ -406,15 +409,15 @@ def collected_unmerged_time_entries_by_weekday(
             cells=[], horizontal_lines=[]
         )
         target_area: Area = weekday_areas[weekday]
-        logging.debug("target_area: %s", target_area)
+        logger.debug("target_area: %s", target_area)
 
         for row_index, row in enumerate(table.rows):
             for column_index, cell in enumerate(row.cells):
                 if cell is None:
-                    logging.debug("None table cell found")
+                    logger.debug("None table cell found")
                     continue
-                logging.debug("row: %d, col: %d", row_index, column_index)
-                logging.debug("cell: %s", cell)
+                logger.debug("row: %d, col: %d", row_index, column_index)
+                logger.debug("cell: %s", cell)
                 if (
                     target_area.x1 <= cell[0]
                     and target_area.y1 <= cell[1]
@@ -424,7 +427,7 @@ def collected_unmerged_time_entries_by_weekday(
                     unmerged_time_entries_by_weekday[weekday].cells.append(
                         Area(x1=cell[0], y1=cell[1], x2=cell[2], y2=cell[3])
                     )
-                    logging.debug("%s cell found", weekday)
+                    logger.debug("%s cell found", weekday)
 
         collect_horizontal_lines(
             unmerged_time_entries_by_weekday, page, target_area, weekday
@@ -454,7 +457,7 @@ def collect_horizontal_lines(
             continue
 
         if target_area.x1 <= line_x1 and target_area.x2 >= line_x2:
-            logging.debug("%s timeslot seperator line found", weekday)
+            logger.debug("%s timeslot seperator line found", weekday)
             unmerged_time_entries_by_weekday[weekday].horizontal_lines.append(
                 HorizontalLine(x1=line_x1, x2=line_x2, y=line_bottom)
             )
@@ -467,10 +470,10 @@ def extract_data_from_class_pdf(
     Extracts all data from the specified Class Timetable PDF filename.
     Can run via multiple jobs.
     """
-    logging.info("Starting extraction with %d jobs", num_of_jobs)
+    logger.info("Starting extraction with %d jobs", num_of_jobs)
 
     num_pages: int = get_number_of_pdf_pages(input_filename)
-    logging.info("Found %d pages to process", num_pages)
+    logger.info("Found %d pages to process", num_pages)
 
     processed_pages: list[RawClassPdfExtractionPageData] = process_pages_in_parallel(
         num_of_jobs, input_filename, num_pages
@@ -531,7 +534,7 @@ def get_above_table_text(page: Page, table_y1: float) -> str:
     upper_region = page.crop((0, 0, page.width, table_y1))
     text_above_table = upper_region.extract_text()
 
-    logging.debug("Text found above the table:")
-    logging.debug(text_above_table)
+    logger.debug("Text found above the table:")
+    logger.debug(text_above_table)
 
     return text_above_table

+ 19 - 10
rip_modulplaner_frontend_data.py

@@ -17,6 +17,8 @@ from config import (
     REQUESTS_TIMEOUT,
 )
 
+logger = logging.getLogger("modulplaner-backend.rip_frontend_data")
+
 
 def download_file(url: str, local_path: Path) -> bool:
     """
@@ -30,16 +32,16 @@ def download_file(url: str, local_path: Path) -> bool:
 
         with open(local_path, "wb") as f:
             f.write(response.content)
-        logging.info("Downloaded: %s", local_path)
+        logger.info("Downloaded: %s", local_path)
         return True
     except requests.exceptions.HTTPError as e:
         if e.response.status_code == 404:
-            logging.warning("File not found (404): %s", url)
+            logger.warning("File not found (404): %s", url)
         else:
-            logging.error("Failed to download %s: %s", url, e)
+            logger.error("Failed to download %s: %s", url, e)
         return False
     except Exception as e:
-        logging.error("Error downloading %s: %s", url, e)
+        logger.error("Error downloading %s: %s", url, e)
         return False
 
 
@@ -49,12 +51,12 @@ def get_semester_versions(
     """
     Downloads and parses the semester-versions.json file.
     """
-    logging.info("Fetching semester list...")
+    logger.info("Fetching semester list...")
     if not download_file(
         f"{base_url}/{FRONTEND_RIPPER_SEMESTER_VERSIONS_FILE}",
         output_dir / FRONTEND_RIPPER_SEMESTER_VERSIONS_FILE,
     ):
-        logging.error("Could not download semester-versions.json. Exiting.")
+        logger.error("Could not download semester-versions.json. Exiting.")
         return None
 
     try:
@@ -63,7 +65,7 @@ def get_semester_versions(
         ) as f:
             return json.load(f)
     except json.JSONDecodeError:
-        logging.error("Error parsing semester-versions.json")
+        logger.error("Error parsing semester-versions.json")
         return None
 
 
@@ -71,7 +73,7 @@ def process_semester(semester: str, base_url: str, output_dir: Path) -> None:
     """
     Downloads files associated with a specific semester.
     """
-    logging.info("Processing Semester: %s", semester)
+    logger.info("Processing Semester: %s", semester)
 
     semester_level_files = ["blockclasses.json", "config.json"]
     for s_file in semester_level_files:
@@ -89,7 +91,7 @@ def process_semester(semester: str, base_url: str, output_dir: Path) -> None:
                         output_dir / semester / blockclass_file,
                     )
         except (json.JSONDecodeError, OSError) as e:
-            logging.error("Error reading config.json for %s: %s", semester, e)
+            logger.error("Error reading config.json for %s: %s", semester, e)
 
 
 def process_version(
@@ -121,9 +123,16 @@ def main():
         help="Output directory for downloaded files",
         default=FRONTEND_RIPPER_OUTPUT_DIR_DEFAULT,
     )
+    parser.add_argument(
+        "--log-level",
+        help="Set the logging level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
+        default="INFO",
+        type=str.upper,
+        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
+    )
     args = parser.parse_args()
 
-    logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")
+    logging.basicConfig(level=args.log_level)
 
     base_url = args.base_url
     output_dir = Path(args.output_dir)