slide_selection_iterator.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. # Copyright © 2024 Noah Vogt <noah@noahvogt.com>
  2. # This program is free software: you can redistribute it and/or modify
  3. # it under the terms of the GNU General Public License as published by
  4. # the Free Software Foundation, either version 3 of the License, or
  5. # (at your option) any later version.
  6. # This program is distributed in the hope that it will be useful,
  7. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  8. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  9. # GNU General Public License for more details.
  10. # You should have received a copy of the GNU General Public License
  11. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  12. import os
  13. from threading import Thread
  14. from pathlib import Path
  15. import contextlib
  16. import io
  17. import re
  18. import obsws_python as obs
  19. from utils import (
  20. log,
  21. create_min_obs_subdirs,
  22. error_msg,
  23. expand_dir,
  24. )
  25. from input import parse_metadata, generate_final_prompt
  26. from slides import SlideStyle
  27. import config as const
  28. import slidegen
  29. def slide_selection_iterator(
  30. disable_async_enabled: bool, slide_style: SlideStyle
  31. ) -> None:
  32. iterator_prompt = "Exit now? [y/N]: "
  33. structure_prompt = (
  34. "Choose song structure (leave blank for full song)"
  35. + " eg. [1,R,2,R] / [1-4]: "
  36. )
  37. rclone_local_dir = expand_dir(const.RCLONE_LOCAL_DIR)
  38. song_counter = 0
  39. threads = []
  40. while True:
  41. song_counter += 1
  42. input_prompt_prefix = "[{}{}] ".format(
  43. const.OBS_SUBDIR_NAMING, song_counter
  44. )
  45. prompt_answer = str(input(input_prompt_prefix + iterator_prompt))
  46. if prompt_answer.lower() == "y":
  47. create_min_obs_subdirs()
  48. break
  49. os.system(
  50. "cd {} && fzf {} > {}".format(
  51. rclone_local_dir,
  52. const.FZF_ARGS,
  53. os.path.join(
  54. const.SSYNC_CACHE_DIR, const.SSYNC_CHOSEN_FILE_NAMING
  55. ),
  56. )
  57. )
  58. chosen_song_file = read_chosen_song_file()
  59. if len(chosen_song_file) == 0:
  60. log("no slides chosen, skipping...")
  61. else:
  62. src_dir = os.path.join(rclone_local_dir, chosen_song_file)
  63. dest_dir = create_and_get_dest_dir(
  64. expand_dir(const.OBS_SLIDES_DIR), song_counter
  65. )
  66. full_song_structure = get_structure_for_prompt(
  67. slide_style, src_dir, dest_dir
  68. )
  69. log(
  70. "full song structure of '{}':\n{}".format(
  71. chosen_song_file,
  72. full_song_structure,
  73. ),
  74. color="magenta",
  75. )
  76. structure_prompt_answer = input(
  77. input_prompt_prefix + structure_prompt
  78. ).strip()
  79. log(
  80. "generating slides '{}' to '{}{}'...".format(
  81. chosen_song_file, const.OBS_SUBDIR_NAMING, song_counter
  82. )
  83. )
  84. threads.extend(
  85. generate_slides_for_selected_song(
  86. slide_style,
  87. src_dir,
  88. dest_dir,
  89. generate_final_prompt(
  90. structure_prompt_answer, full_song_structure
  91. ),
  92. disable_async_enabled,
  93. )
  94. )
  95. log("waiting for subprocesses to finish ...")
  96. for thread in threads:
  97. if thread.is_alive():
  98. thread.join()
  99. log("subprocesses finished.")
  100. remove_chosenfile()
  101. add_slides_to_obs_slideshow_inputs()
  102. def add_slides_to_obs_slideshow_inputs():
  103. pattern = re.compile(rf"{const.FILE_NAMING}(\d+)\.jpg$", re.IGNORECASE)
  104. folders = []
  105. for i in range(1, const.OBS_MIN_SUBDIRS + 1):
  106. folders.append(
  107. Path(const.OBS_SLIDES_DIR).joinpath(
  108. Path(f"{const.OBS_SUBDIR_NAMING}{i}")
  109. )
  110. )
  111. for folder in folders:
  112. slides = []
  113. for p in folder.iterdir():
  114. m = pattern.match(p.name)
  115. if m:
  116. slides.append((int(m.group(1)), str(p.resolve())))
  117. slides.sort(key=lambda x: x[0])
  118. ordered_files = [{"value": path} for _, path in slides]
  119. while True:
  120. try:
  121. # suppress stderr from obsws_python internals
  122. with contextlib.redirect_stderr(io.StringIO()):
  123. cl = obs.ReqClient(
  124. host=const.OBS_WEBSOCKET_HOSTNAME,
  125. port=const.OBS_WEBSOCKET_PORT,
  126. password=const.OBS_WEBSOCKET_PASSWORD,
  127. )
  128. source = (
  129. f"{const.SSYNC_SLIDESHOW_INPUT_NAMING}"
  130. + f"{str(folder.name)[len(const.OBS_SUBDIR_NAMING) :]}"
  131. )
  132. try:
  133. with contextlib.redirect_stderr(io.StringIO()):
  134. current_settings = cl.get_input_settings(
  135. source
  136. ).input_settings # type: ignore
  137. new_settings = dict(current_settings)
  138. new_settings["files"] = ordered_files
  139. cl.set_input_settings(
  140. name=source,
  141. settings=new_settings,
  142. overlay=False,
  143. )
  144. log(f"{len(ordered_files)} slides put in " + f"'{source}'.")
  145. break
  146. except obs.error.OBSSDKRequestError: # type: ignore
  147. log(
  148. message=str(
  149. f"Error: Cannot access slideshow input: '{source}' "
  150. + "Please add to OBS and press enter to try again: "
  151. ),
  152. color="red",
  153. end="",
  154. )
  155. input()
  156. except (ConnectionError, obs.error.OBSSDKError): # type: ignore
  157. log(
  158. message=str(
  159. "Error: Cannot connect to OBS Websocket. Please start OBS "
  160. + "and press enter to try again: "
  161. ),
  162. color="red",
  163. end="",
  164. )
  165. input()
  166. def generate_slides_for_selected_song(
  167. classic_slide_style: SlideStyle,
  168. src_dir: str,
  169. dest_dir: str,
  170. calculated_prompt: str | list[str],
  171. disable_async_enabled: bool,
  172. ) -> list[Thread]:
  173. executing_slidegen_instance = slidegen.Slidegen(
  174. classic_slide_style,
  175. src_dir,
  176. dest_dir,
  177. calculated_prompt,
  178. )
  179. return executing_slidegen_instance.execute(disable_async_enabled)
  180. def get_structure_for_prompt(classic_slide_style, src_dir, dest_dir):
  181. dummy_slidegen_instance = slidegen.Slidegen(
  182. classic_slide_style,
  183. src_dir,
  184. dest_dir,
  185. "",
  186. )
  187. parse_metadata(dummy_slidegen_instance)
  188. full_song_structure = dummy_slidegen_instance.metadata["structure"]
  189. return full_song_structure
  190. def get_file_list_inside(rclone_local_dir):
  191. file_list_str = ""
  192. try:
  193. for file in os.listdir(rclone_local_dir):
  194. file_list_str += file + "\n"
  195. except (FileNotFoundError, PermissionError, IOError) as error:
  196. error_msg(
  197. "Failed to access items in '{}'. Reason: {}".format(
  198. rclone_local_dir, error
  199. )
  200. )
  201. file_list_str = file_list_str[:-1]
  202. file_list_str = file_list_str.replace("\n", "\\n")
  203. return file_list_str
  204. def remove_chosenfile() -> None:
  205. try:
  206. if os.path.isfile(
  207. os.path.join(const.SSYNC_CACHE_DIR, const.SSYNC_CHOSEN_FILE_NAMING)
  208. ):
  209. os.remove(
  210. os.path.join(
  211. const.SSYNC_CACHE_DIR, const.SSYNC_CHOSEN_FILE_NAMING
  212. ),
  213. )
  214. except (FileNotFoundError, PermissionError, IOError) as error:
  215. error_msg("Failed to remove chosenfile. Reason: {}".format(error))
  216. def create_and_get_dest_dir(obs_slides_dir, index) -> str:
  217. dest_dir = os.path.join(
  218. obs_slides_dir,
  219. const.OBS_SUBDIR_NAMING + str(index),
  220. )
  221. os.mkdir(dest_dir)
  222. return dest_dir
  223. def read_chosen_song_file() -> str:
  224. with open(
  225. os.path.join(const.SSYNC_CACHE_DIR, const.SSYNC_CHOSEN_FILE_NAMING),
  226. encoding="utf-8-sig",
  227. mode="r",
  228. ) as tempfile_file_opener:
  229. chosen_song_file = tempfile_file_opener.read()[:-1].strip()
  230. return chosen_song_file