VulcanBoard.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. #!/usr/bin/env python3
  2. # Copyright © 2024 Noah Vogt <noah@noahvogt.com>
  3. # This program is free software: you can redistribute it and/or modify
  4. # it under the terms of the GNU General Public License as published by
  5. # the Free Software Foundation, either version 3 of the License, or
  6. # (at your option) any later version.
  7. # This program is distributed in the hope that it will be useful,
  8. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. # GNU General Public License for more details.
  11. # You should have received a copy of the GNU General Public License
  12. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. # pylint: disable=invalid-name
  14. import threading
  15. import sys
  16. import asyncio
  17. import time
  18. import colorama
  19. import uvicorn
  20. from fastapi import FastAPI, HTTPException
  21. from kivy.app import App
  22. from kivy.uix.gridlayout import GridLayout
  23. from kivy.utils import get_color_from_hex
  24. from kivy.config import Config as kvConfig
  25. from kivy.core.window import Window
  26. from kivy.clock import Clock
  27. from util import log, error_exit_gui
  28. from config import (
  29. get_config_path,
  30. ConfigLoader,
  31. Config,
  32. get_state_from_id,
  33. get_state_id_from_exit_code,
  34. DEFAULT_BUTTON_BG_COLOR,
  35. DEFAULT_BUTTON_FG_COLOR,
  36. EMPTY_BUTTON_BG_COLOR,
  37. DEFAULT_STATE_ID,
  38. ERROR_SINK_STATE_ID,
  39. )
  40. from ui import AutoResizeButton
  41. class VulcanBoardApp(App):
  42. def __init__(self, **kwargs):
  43. super().__init__(**kwargs)
  44. self.last_touch_time = 0 # For debounce handling
  45. def build(self):
  46. self.loop = self.ensure_asyncio_loop_running()
  47. self.button_grid = {}
  48. self.button_config_map = {}
  49. self.icon = "icon.png"
  50. config_loader = ConfigLoader(get_config_path())
  51. config = config_loader.get_config() # pyright: ignore
  52. if isinstance(config, str):
  53. error_exit_gui(config)
  54. else:
  55. config: Config = config
  56. Window.borderless = config.borderless
  57. if config.set_window_pos:
  58. Window.left = config.window_pos_x
  59. Window.top = config.window_pos_y
  60. if config.use_auto_fullscreen_mode:
  61. Window.fullscreen = "auto"
  62. kvConfig.set("kivy", "exit_on_escape", "0")
  63. self.button_config_map = {
  64. (btn["position"][0], btn["position"][1]): btn
  65. for btn in config.buttons
  66. }
  67. layout = GridLayout(
  68. cols=config.columns,
  69. rows=config.rows,
  70. spacing=config.spacing,
  71. padding=config.padding,
  72. )
  73. # Populate grid with buttons and placeholders
  74. for row in range(config.rows):
  75. for col in range(config.columns):
  76. defined_button = self.button_config_map.get((row, col))
  77. if defined_button:
  78. states = defined_button.get("states", [])
  79. state = get_state_from_id(states, DEFAULT_STATE_ID)
  80. btn = AutoResizeButton(
  81. text=state.get("txt", ""),
  82. background_color=get_color_from_hex(
  83. state.get("bg_color", DEFAULT_BUTTON_BG_COLOR)
  84. ),
  85. color=get_color_from_hex(
  86. state.get("fg_color", DEFAULT_BUTTON_FG_COLOR)
  87. ),
  88. halign="center",
  89. valign="middle",
  90. background_normal="",
  91. state_id=DEFAULT_STATE_ID,
  92. )
  93. if defined_button.get("autostart", False):
  94. self.async_task(
  95. self.execute_command_async(defined_button, btn)
  96. )
  97. # Use debounce wrapper instead of raw on_release to
  98. # avoid double execution on single taps on touchscreens
  99. btn.bind( # pyright: ignore pylint: disable=no-member
  100. on_release=lambda btn_instance, button=defined_button: self.on_button_pressed_once(
  101. button, btn_instance
  102. )
  103. )
  104. else:
  105. btn = AutoResizeButton(
  106. background_color=get_color_from_hex(
  107. EMPTY_BUTTON_BG_COLOR
  108. ),
  109. )
  110. self.button_grid[(row, col)] = btn
  111. layout.add_widget(btn)
  112. self.api_app = FastAPI(title="VulcanBoard API")
  113. self._setup_api_routes()
  114. uvicorn_config = uvicorn.Config(
  115. app=self.api_app,
  116. host="0.0.0.0",
  117. port=config.api_port,
  118. log_level="info",
  119. )
  120. server = uvicorn.Server(uvicorn_config)
  121. self.async_task(server.serve())
  122. return layout
  123. def on_button_pressed_once(self, button, btn_instance):
  124. now = time.time()
  125. if now - self.last_touch_time > 0.3: # 300 ms debounce
  126. self.last_touch_time = now
  127. self.async_task(self.execute_command_async(button, btn_instance))
  128. def async_task(self, coroutine):
  129. asyncio.run_coroutine_threadsafe(coroutine, self.loop)
  130. def ensure_asyncio_loop_running(self):
  131. if hasattr(self, "loop"):
  132. return self.loop
  133. loop = asyncio.new_event_loop()
  134. def run_loop():
  135. asyncio.set_event_loop(loop)
  136. loop.run_forever()
  137. threading.Thread(target=run_loop, daemon=True).start()
  138. return loop
  139. def config_error_exit(self, popup):
  140. popup.dismiss()
  141. sys.exit(1)
  142. async def execute_command_async(self, button: dict, btn: AutoResizeButton):
  143. follow_up_state_loop = True
  144. states = button["states"]
  145. while follow_up_state_loop:
  146. state = get_state_from_id(states, btn.state_id)
  147. follow_up_state_loop = False
  148. try:
  149. process = await asyncio.create_subprocess_shell(
  150. state["cmd"], shell=True
  151. )
  152. exit_code = await process.wait()
  153. log(f"Executed command: {state['cmd']}")
  154. except Exception as e:
  155. exit_code = ERROR_SINK_STATE_ID
  156. log(f"Error executing command: {e}", color="yellow")
  157. if len(states) != 1:
  158. if isinstance(
  159. follow_up_state := state.get("follow_up_state"), int
  160. ):
  161. follow_up_state_loop = True
  162. exit_code = follow_up_state
  163. Clock.schedule_once(
  164. lambda _: self.update_button_feedback(
  165. states, btn, exit_code
  166. )
  167. )
  168. affects_buttons = button.get("affects_buttons", None)
  169. if affects_buttons:
  170. for affected_btn_dims in affects_buttons:
  171. btn_pos = (affected_btn_dims[0], affected_btn_dims[1])
  172. affected_button = self.button_grid[btn_pos]
  173. Clock.schedule_once(
  174. lambda _, btn_pos=btn_pos, affected_button=affected_button: self.update_button_feedback(
  175. self.button_config_map[btn_pos]["states"],
  176. affected_button,
  177. exit_code,
  178. )
  179. )
  180. def update_button_feedback(
  181. self, states: list, btn: AutoResizeButton, exit_code: int
  182. ):
  183. state_id = get_state_id_from_exit_code(states, exit_code)
  184. state = get_state_from_id(states, state_id)
  185. btn.text = state.get("txt", "")
  186. btn.state_id = state_id
  187. btn.background_color = get_color_from_hex(
  188. state.get("bg_color", DEFAULT_BUTTON_BG_COLOR)
  189. )
  190. btn.color = get_color_from_hex(
  191. state.get("fg_color", DEFAULT_BUTTON_FG_COLOR)
  192. )
  193. def update_button_state_from_api(
  194. self, dt, row: int, col: int, state_id: int
  195. ):
  196. btn = self.button_grid.get((row, col))
  197. btn_config = self.button_config_map.get((row, col))
  198. if not btn or not btn_config:
  199. return
  200. states = btn_config.get("states", [])
  201. state = get_state_from_id(states, state_id)
  202. btn.text = state.get("txt", "")
  203. btn.state_id = state_id
  204. btn.background_color = get_color_from_hex(
  205. state.get("bg_color", DEFAULT_BUTTON_BG_COLOR)
  206. )
  207. btn.color = get_color_from_hex(
  208. state.get("fg_color", DEFAULT_BUTTON_FG_COLOR)
  209. )
  210. def _setup_api_routes(self):
  211. @self.api_app.get("/get_states")
  212. def get_states(x: int, y: int):
  213. btn_config = self.button_config_map.get((x, y))
  214. if not btn_config:
  215. raise HTTPException(status_code=404, detail="Button not found")
  216. return {"states": btn_config.get("states", [])}
  217. @self.api_app.get("/get_current_state")
  218. def get_current_state(x: int, y: int):
  219. btn = self.button_grid.get((x, y))
  220. if not btn:
  221. raise HTTPException(status_code=404, detail="Button not found")
  222. return {"state_id": getattr(btn, "state_id", DEFAULT_STATE_ID)}
  223. @self.api_app.get("/set_state")
  224. @self.api_app.post("/set_state")
  225. def set_state(x: int, y: int, state: int):
  226. btn_config = self.button_config_map.get((x, y))
  227. btn = self.button_grid.get((x, y))
  228. if not btn_config or not btn:
  229. raise HTTPException(status_code=404, detail="Button not found")
  230. states = btn_config.get("states", [])
  231. valid_state = None
  232. for s in states:
  233. if s.get("id", DEFAULT_STATE_ID) == state:
  234. valid_state = s
  235. break
  236. if not valid_state:
  237. raise HTTPException(
  238. status_code=400,
  239. detail=f"State {state} not found for this button",
  240. )
  241. Clock.schedule_once(
  242. lambda dt: self.update_button_state_from_api(dt, x, y, state)
  243. )
  244. return {"status": "success", "state_id": state}
  245. def start_asyncio_loop():
  246. asyncio.set_event_loop(asyncio.new_event_loop())
  247. asyncio.get_event_loop().run_forever()
  248. if __name__ == "__main__":
  249. colorama.init()
  250. VulcanBoardApp().run()