Procházet zdrojové kódy

initial cd recording code

Noah Vogt před 1 rokem
rodič
revize
2e1969a984

+ 6 - 0
config/default_config.py

@@ -93,3 +93,9 @@ OBS_MIN_SUBDIRS = 7
 NEXTSONG_CACHE_FILE = ""
 OBS_SWITCH_TO_SCENE_HOTKEY_PREFIX = ["ctrl", "shift"]
 OBS_TRANSITION_HOTKEY = ["ctrl", "shift", "f12"]
+
+CD_RECORD_CACHEFILE = ""
+CD_RECORD_OUTPUT_BASEDIR = ""
+CD_RECORD_FFMPEG_INPUT_ARGS = ""
+CD_RECORD_MAX_SECONDS = 4800
+CD_RECORD_MIN_TRACK_MILIS = 4200

+ 2 - 2
force_song.py

@@ -22,7 +22,7 @@ from sys import argv
 
 from utils import (
     error_msg,
-    make_sure_cachefile_exists,
+    make_sure_file_exists,
     switch_to_song,
 )
 from input import validate_songchooser_config
@@ -47,7 +47,7 @@ def exit_if_force_int_is_illegal():
 
 def main() -> None:
     validate_songchooser_config()
-    make_sure_cachefile_exists()
+    make_sure_file_exists(const.NEXTSONG_CACHE_FILE)
     exit_if_force_int_is_illegal()
     switch_to_song(get_force_int())
 

+ 6 - 2
input/__init__.py

@@ -19,12 +19,16 @@ from .parse_prompt import parse_prompt_input, generate_final_prompt
 from .parse_file import (
     parse_metadata,
     parse_songtext,
-    get_songchooser_cachefile_content,
+    get_cachefile_content,
 )
 from .parse_argv import (
     parse_ssync_args_as_tuple,
     parse_slidegen_argv_as_tuple,
     SsyncFlags,
 )
-from .validate_config import validate_ssync_config, validate_songchooser_config
+from .validate_config import (
+    validate_ssync_config,
+    validate_songchooser_config,
+    validate_cd_record_config,
+)
 from .slide_selection_iterator import slide_selection_iterator

+ 5 - 3
input/parse_file.py

@@ -22,6 +22,7 @@ from utils import (
     structure_as_list,
     get_unique_structure_elements,
     get_songtext_by_structure,
+    expand_dir,
 )
 
 import config as const
@@ -90,16 +91,17 @@ def parse_songtext(slidegen) -> None:
     slidegen.songtext = output_dict
 
 
-def get_songchooser_cachefile_content() -> list:
+def get_cachefile_content(cachefile: str) -> list:
+    expanded_path = expand_dir(cachefile)
     try:
         with open(
-            const.NEXTSONG_CACHE_FILE, mode="r", encoding="utf-8-sig"
+            expanded_path, mode="r", encoding="utf-8-sig"
         ) as cachefile_reader:
             cachefile_content = cachefile_reader.readlines()
     except (FileNotFoundError, PermissionError, IOError) as error:
         error_msg(
             "Failed to access cachefile in '{}'. Reason: {}".format(
-                const.NEXTSONG_CACHE_FILE, error
+                expanded_path, error
             )
         )
     return cachefile_content

+ 10 - 0
input/validate_config.py

@@ -45,6 +45,16 @@ def validate_songchooser_config() -> None:
     general_config_validator(needed_constants)
 
 
+def validate_cd_record_config() -> None:
+    needed_constants: dict = {
+        "CD_RECORD_CACHEFILE": const.CD_RECORD_CACHEFILE,
+        "CD_RECORD_OUTPUT_BASEDIR": const.CD_RECORD_OUTPUT_BASEDIR,
+        "CD_RECORD_FFMPEG_INPUT_ARGS": const.CD_RECORD_FFMPEG_INPUT_ARGS,
+        "CD_RECORD_MAX_SECONDS": const.CD_RECORD_MAX_SECONDS,
+    }
+    general_config_validator(needed_constants)
+
+
 def general_config_validator(needed_constants: dict) -> None:
     for key in needed_constants:
         if needed_constants.get(key) == "":

+ 7 - 6
next_song.py

@@ -20,22 +20,23 @@ along with this program.  If not, see <http://www.gnu.org/licenses/>.
 from re import match
 
 from utils import (
-    calculate_yyyy_mm_dd_date,
+    get_yyyy_mm_dd_date,
     switch_to_song,
-    make_sure_cachefile_exists,
+    make_sure_file_exists,
 )
-from input import get_songchooser_cachefile_content, validate_songchooser_config
+from input import get_cachefile_content, validate_songchooser_config
+import config as const
 
 
 def cycle_to_next_song() -> None:
-    cachefile_content = get_songchooser_cachefile_content()
+    cachefile_content = get_cachefile_content(const.NEXTSONG_CACHE_FILE)
     if (
         not (
             len(cachefile_content) == 2
             and match(r"[0-9]{4}-[0-9]{2}-[0-9]{2}$", cachefile_content[0])
             and match(r"^[0-9]+$", cachefile_content[1])
         )
-        or cachefile_content[0].strip() != calculate_yyyy_mm_dd_date()
+        or cachefile_content[0].strip() != get_yyyy_mm_dd_date()
     ):
         switch_to_song(1)
     else:
@@ -44,7 +45,7 @@ def cycle_to_next_song() -> None:
 
 def main() -> None:
     validate_songchooser_config()
-    make_sure_cachefile_exists()
+    make_sure_file_exists(const.NEXTSONG_CACHE_FILE)
     cycle_to_next_song()
 
 

+ 189 - 0
set_cd_marker.py

@@ -0,0 +1,189 @@
+#!/usr/bin/env python3
+
+"""
+Copyright © 2024 Noah Vogt <noah@noahvogt.com>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program.  If not, see <http://www.gnu.org/licenses/>.
+"""
+
+from os import path, mkdir
+from shlex import split
+from subprocess import Popen
+
+from utils import (
+    get_yyyy_mm_dd_date,
+    make_sure_file_exists,
+    is_valid_cd_record_checkfile,
+    get_unix_milis,
+    log,
+    warn,
+    error_msg,
+    expand_dir,
+)
+from input import get_cachefile_content, validate_cd_record_config
+import config as const
+
+
+def start_cd_recording() -> int:
+    date = get_yyyy_mm_dd_date()
+    filename = path.join(
+        const.CD_RECORD_OUTPUT_BASEDIR,
+        date,
+        "{}.wav".format(date),
+    )
+
+    log("starting cd recording...")
+    cmd = "ffmpeg -y {} -ar 44100 -t {} {}".format(
+        const.CD_RECORD_FFMPEG_INPUT_ARGS,
+        const.CD_RECORD_MAX_SECONDS,
+        filename,
+    )
+    process = Popen(split(cmd))
+    return process.pid
+
+
+def create_cachefile_for_marker(
+    cachefile_content: list,
+    yyyy_mm_dd: str,
+    unix_milis: int,
+    *ffmpeg_recording_pid: int,
+    initial_run=False,
+) -> None:
+    cachefile = expand_dir(const.CD_RECORD_CACHEFILE)
+    if initial_run:
+        marker = 1
+    else:
+        marker = int(cachefile_content[1]) + 1
+        if marker > 99:
+            return
+
+    if (
+        not (initial_run)
+        and unix_milis - int(cachefile_content[4])
+        < const.CD_RECORD_MIN_TRACK_MILIS
+    ):
+        return
+
+    log("writing cd marker {} to cachefile...".format(marker))
+    try:
+        with open(cachefile, mode="w+", encoding="utf-8-sig") as file_writer:
+            file_writer.write(yyyy_mm_dd + "\n")
+            file_writer.write(str(marker) + "\n")
+            if initial_run:
+                file_writer.write("{}\n".format(ffmpeg_recording_pid[0]))
+                file_writer.write(str(unix_milis) + "\n")
+            else:
+                file_writer.write(cachefile_content[2].strip() + "\n")
+                file_writer.write(cachefile_content[3].strip() + "\n")
+            file_writer.write(str(unix_milis) + "\n")
+    except (FileNotFoundError, PermissionError, IOError) as error:
+        error_msg(
+            "Failed to write to cachefile '{}'. Reason: {}".format(
+                cachefile, error
+            )
+        )
+
+
+def update_cue_sheet(
+    cachefile_content: list, yyyy_mm_dd: str, unix_milis: int, initial_run=False
+) -> None:
+    cue_sheet_dir = path.join(
+        expand_dir(const.CD_RECORD_OUTPUT_BASEDIR), yyyy_mm_dd
+    )
+    cue_sheet_path = path.join(cue_sheet_dir, "sheet.cue")
+    wave_path = path.join(cue_sheet_dir, "{}.wav".format(yyyy_mm_dd))
+    if initial_run:
+        log("updating cue sheet...")
+        try:
+            if not path.exists(cue_sheet_dir):
+                mkdir(cue_sheet_dir)
+            with open(
+                cue_sheet_path, mode="w+", encoding="utf-8-sig"
+            ) as file_writer:
+                file_writer.write('FILE "{}" WAVE\n'.format(wave_path))
+                file_writer.write("  TRACK 01 AUDIO\n")
+                file_writer.write("    INDEX 01 00:00:00\n")
+        except (FileNotFoundError, PermissionError, IOError) as error:
+            error_msg(
+                "Failed to write to cue sheet file '{}'. Reason: {}".format(
+                    cue_sheet_path, error
+                )
+            )
+    else:
+        marker = int(cachefile_content[1]) + 1
+        if marker > 99:
+            warn("An Audio CD can only hold up to 99 tracks.")
+            return
+
+        start_milis = int(cachefile_content[3])
+        last_track_milis = int(cachefile_content[4])
+        if unix_milis - last_track_milis < const.CD_RECORD_MIN_TRACK_MILIS:
+            warn(
+                "Minimum track length of {}ms not satisfied".format(
+                    const.CD_RECORD_MIN_TRACK_MILIS
+                )
+            )
+            return
+
+        milis_diff = unix_milis - start_milis
+        mins = milis_diff // 60000
+        milis_diff -= 60000 * mins
+        secs = int(milis_diff / 1000)
+        milis_diff -= 1000 * secs
+        frames = int(75 / 1000 * milis_diff)
+
+        log("updating cue sheet...")
+        try:
+            with open(
+                cue_sheet_path, mode="a", encoding="utf-8-sig"
+            ) as file_writer:
+                file_writer.write("  TRACK {:02d} AUDIO\n".format(marker))
+                file_writer.write(
+                    "    INDEX 01 {:02d}:{:02d}:{:02d}\n".format(
+                        mins, secs, frames
+                    )
+                )
+        except (FileNotFoundError, PermissionError, IOError) as error:
+            error_msg(
+                "Failed to write to cue sheet file '{}'. Reason: {}".format(
+                    cue_sheet_path, error
+                )
+            )
+
+
+def set_cd_marker() -> None:
+    cachefile_content = get_cachefile_content(const.CD_RECORD_CACHEFILE)
+    yyyy_mm_dd = get_yyyy_mm_dd_date()
+    unix_milis = get_unix_milis()
+    cachefile_and_time_data = (cachefile_content, yyyy_mm_dd, unix_milis)
+
+    if is_valid_cd_record_checkfile(*cachefile_and_time_data[:-1]):
+        create_cachefile_for_marker(*cachefile_and_time_data)
+        update_cue_sheet(*cachefile_and_time_data)
+    else:
+        pid = start_cd_recording()
+        create_cachefile_for_marker(
+            *cachefile_and_time_data, pid, initial_run=True
+        )
+        update_cue_sheet(*cachefile_and_time_data, initial_run=True)
+
+
+def main() -> None:
+    validate_cd_record_config()
+    make_sure_file_exists(const.CD_RECORD_CACHEFILE)
+    set_cd_marker()
+
+
+if __name__ == "__main__":
+    main()

+ 75 - 0
stop_cd_recording.py

@@ -0,0 +1,75 @@
+#!/usr/bin/env python3
+
+"""
+Copyright © 2024 Noah Vogt <noah@noahvogt.com>
+
+This program is free software: you can redistribute it and/or modify
+it under the terms of the GNU General Public License as published by
+the Free Software Foundation, either version 3 of the License, or
+(at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License
+along with this program.  If not, see <http://www.gnu.org/licenses/>.
+"""
+
+from os import kill
+from signal import SIGTERM
+
+from utils import (
+    get_yyyy_mm_dd_date,
+    make_sure_file_exists,
+    is_valid_cd_record_checkfile,
+    log,
+    error_msg,
+    expand_dir,
+)
+from input import get_cachefile_content, validate_cd_record_config
+import config as const
+
+
+def mark_end_of_recording(cachefile_content: list) -> None:
+    cachefile = expand_dir(const.CD_RECORD_CACHEFILE)
+
+    log("marking end of recording...")
+    try:
+        with open(cachefile, mode="w+", encoding="utf-8-sig") as file_writer:
+            file_writer.write(cachefile_content[0].strip() + "\n")
+            file_writer.write("9001\n")
+            file_writer.write(cachefile_content[2].strip() + "\n")
+            file_writer.write(cachefile_content[3].strip() + "\n")
+            file_writer.write(cachefile_content[4].strip() + "\n")
+    except (FileNotFoundError, PermissionError, IOError) as error:
+        error_msg(
+            "Failed to write to cachefile '{}'. Reason: {}".format(
+                cachefile, error
+            )
+        )
+
+
+def stop_cd_recording() -> None:
+    cachefile_content = get_cachefile_content(const.CD_RECORD_CACHEFILE)
+    yyyy_mm_dd = get_yyyy_mm_dd_date()
+
+    if is_valid_cd_record_checkfile(cachefile_content, yyyy_mm_dd):
+        try:
+            kill(int(cachefile_content[2]), SIGTERM)
+        except ProcessLookupError:
+            error_msg("Recording not running, cannot be stopped.")
+        mark_end_of_recording(cachefile_content)
+    else:
+        error_msg("CD Record Checkfile is invalid.")
+
+
+def main() -> None:
+    validate_cd_record_config()
+    make_sure_file_exists(const.CD_RECORD_CACHEFILE)
+    stop_cd_recording()
+
+
+if __name__ == "__main__":
+    main()

+ 7 - 4
utils/__init__.py

@@ -13,10 +13,9 @@ GNU General Public License for more details.
 
 You should have received a copy of the GNU General Public License
 along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
 """
 
-from .log import error_msg, log
+from .log import error_msg, warn, log
 from .strings import (
     get_songtext_by_structure,
     structure_as_list,
@@ -26,5 +25,9 @@ from .img import get_empty_image
 from .create_min_obs_subdirs import create_min_obs_subdirs
 from .clear_obs_slides_dir import clear_obs_slides_dir
 from .path import expand_dir
-from .date import calculate_yyyy_mm_dd_date
-from .songchooser import make_sure_cachefile_exists, switch_to_song
+from .date import get_yyyy_mm_dd_date, get_unix_milis
+from .scripts import (
+    make_sure_file_exists,
+    switch_to_song,
+    is_valid_cd_record_checkfile,
+)

+ 6 - 1
utils/date.py

@@ -16,7 +16,12 @@ along with this program.  If not, see <http://www.gnu.org/licenses/>.
 """
 
 from datetime import date
+from time import time
 
 
-def calculate_yyyy_mm_dd_date() -> str:
+def get_yyyy_mm_dd_date() -> str:
     return date.strftime(date.today(), "%Y-%m-%d")
+
+
+def get_unix_milis() -> int:
+    return int(time() * 1000)

+ 5 - 1
utils/log.py

@@ -25,5 +25,9 @@ def error_msg(msg: str):
     sys.exit(1)
 
 
-def log(message: str, color: str = "green") -> None:
+def warn(message: str) -> None:
+    print(colored("[*] Warning: {}".format(message), "yellow"))
+
+
+def log(message: str, color="green") -> None:
     print(colored("[*] {}".format(message), color))

+ 28 - 7
utils/songchooser.py → utils/scripts.py

@@ -17,24 +17,25 @@ along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 from os import path
 from time import sleep
+from re import match
 
 from pyautogui import keyDown, keyUp
 
-from utils import error_msg, log, calculate_yyyy_mm_dd_date
+from utils import error_msg, log, get_yyyy_mm_dd_date
 import config as const
 
 
-def make_sure_cachefile_exists() -> None:
-    if not path.isfile(const.NEXTSONG_CACHE_FILE):
+def make_sure_file_exists(cachefile: str) -> None:
+    if not path.isfile(cachefile):
         try:
             with open(
-                const.NEXTSONG_CACHE_FILE, mode="w+", encoding="utf-8-sig"
+                cachefile, mode="w+", encoding="utf-8-sig"
             ) as file_creator:
                 file_creator.write("")
         except (FileNotFoundError, PermissionError, IOError) as error:
             error_msg(
-                "Failed to create cachefile in '{}'. Reason: {}".format(
-                    const.NEXTSONG_CACHE_FILE, error
+                "Failed to create file in '{}'. Reason: {}".format(
+                    cachefile, error
                 )
             )
 
@@ -67,7 +68,7 @@ def create_cachfile_for_song(song) -> None:
         with open(
             const.NEXTSONG_CACHE_FILE, mode="w", encoding="utf-8-sig"
         ) as file_writer:
-            file_writer.write(calculate_yyyy_mm_dd_date() + "\n")
+            file_writer.write(get_yyyy_mm_dd_date() + "\n")
             file_writer.write(str(song) + "\n")
     except (FileNotFoundError, PermissionError, IOError) as error:
         error_msg(
@@ -75,3 +76,23 @@ def create_cachfile_for_song(song) -> None:
                 const.NEXTSONG_CACHE_FILE, error
             )
         )
+
+
+def is_valid_cd_record_checkfile(
+    cachefile_content: list, yyyy_mm_dd: str
+) -> bool:
+    return (
+        len(cachefile_content) == 5
+        # YYYY-MM-DD
+        and bool(match(r"[0-9]{4}-[0-9]{2}-[0-9]{2}$", cachefile_content[0]))
+        # last marker
+        and bool(match(r"^[0-9]+$", cachefile_content[1]))
+        # pid of ffmpeg recording instance
+        and bool(match(r"^[0-9]+$", cachefile_content[2]))
+        # unix milis @ recording start
+        and bool(match(r"^[0-9]+$", cachefile_content[3]))
+        # unix milis @ last track
+        and bool(match(r"^[0-9]+$", cachefile_content[4]))
+        # date matches today
+        and cachefile_content[0].strip() == yyyy_mm_dd
+    )