switch.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219
  1. import asyncio
  2. import os
  3. import serial_asyncio
  4. from serial.tools import list_ports
  5. import yaml
  6. from aiohttp import web
  7. # =========================
  8. # Config loading via XDG
  9. # =========================
  10. def get_config_path():
  11. xdg_config_home = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
  12. return os.path.join(xdg_config_home, "diy_ptz_switch", "config.yml")
  13. def load_location_roles():
  14. config_file = get_config_path()
  15. if not os.path.exists(config_file):
  16. raise FileNotFoundError(f"Config file not found at: {config_file}")
  17. with open(config_file, "r", encoding="utf-8") as f:
  18. config = yaml.safe_load(f)
  19. return config.get("location_roles", {})
  20. location_roles = load_location_roles()
  21. # =========================
  22. # Serial Port Mapping
  23. # =========================
  24. port_map = {}
  25. for port in list_ports.comports():
  26. if "LOCATION=" in port.hwid:
  27. loc = port.hwid.split("LOCATION=")[-1]
  28. if loc in location_roles:
  29. role = location_roles[loc]
  30. port_map[role] = port.device
  31. print("Port mapping by USB port:")
  32. for role, dev in port_map.items():
  33. print(f" {role}: {dev}")
  34. JOYSTICK_PORT = port_map.get("joystick")
  35. CAM1_PORT = port_map.get("cam1")
  36. CAM2_PORT = port_map.get("cam2")
  37. BAUDRATE = 2400
  38. # =========================
  39. # Globals
  40. # =========================
  41. current_target = "cam1" # Default
  42. current_mode = "preview" # Default
  43. cam_transports = {}
  44. # =========================
  45. # Pelco-D Packet Forwarding
  46. # =========================
  47. class JoystickProtocol(asyncio.Protocol):
  48. def __init__(self, forward_func):
  49. self.forward = forward_func
  50. self.buffer = bytearray()
  51. def data_received(self, data):
  52. print(f"[DEBUG] Raw data received: {data.hex()}")
  53. self.buffer += data
  54. self.parse_pelco_d_packets()
  55. def parse_pelco_d_packets(self):
  56. while len(self.buffer) >= 7:
  57. if self.buffer[0] != 0xFF:
  58. self.buffer.pop(0)
  59. continue
  60. packet = self.buffer[:7]
  61. self.buffer = self.buffer[7:]
  62. address = packet[1]
  63. cmd1 = packet[2]
  64. cmd2 = packet[3]
  65. data1 = packet[4]
  66. data2 = packet[5]
  67. print(
  68. f"[Joystick] Packet to camera addr {address:02X} — "
  69. f"Cmd1: {cmd1:02X}, Cmd2: {cmd2:02X}, "
  70. f"Data1: {data1:02X}, Data2: {data2:02X}, "
  71. f"Target: {current_target}"
  72. )
  73. self.forward(packet)
  74. class DummyCamProtocol(asyncio.Protocol):
  75. def connection_made(self, transport):
  76. pass
  77. # =========================
  78. # Pelco-D Preset Helpers
  79. # =========================
  80. def make_preset_command(cam_address: int, cmd2: int, preset_id: int):
  81. if not (1 <= preset_id <= 0xFF):
  82. raise ValueError("Preset ID must be between 1 and 255")
  83. cmd1 = 0x00
  84. data1 = 0x00
  85. data2 = preset_id
  86. packet = bytearray([0xFF, cam_address, cmd1, cmd2, data1, data2])
  87. checksum = sum(packet[1:]) % 256
  88. packet.append(checksum)
  89. return packet
  90. def send_preset_command(cam_name, cmd2, preset_id):
  91. transport = cam_transports.get(cam_name)
  92. if not transport:
  93. print(f"[WARN] No transport for {cam_name}")
  94. return
  95. packet = make_preset_command(1, cmd2, preset_id) # Camera address is hardcoded as 1
  96. print(f"[API] Sending preset cmd2={cmd2:02X} preset_id={preset_id} to {cam_name}")
  97. transport.write(packet)
  98. # =========================
  99. # HTTP Server (aiohttp)
  100. # =========================
  101. async def handle_status(request):
  102. return web.json_response({"current_target": current_target})
  103. async def handle_set_target(request):
  104. global current_target
  105. data = await request.json()
  106. target = data.get("target")
  107. if target not in cam_transports:
  108. return web.json_response({"error": f"Invalid target: {target}"}, status=400)
  109. current_target = target
  110. print(f"[API] Target set to: {current_target}")
  111. return web.json_response({"status": "ok", "target": current_target})
  112. async def handle_goto_preset(request):
  113. data = await request.json()
  114. preset_id = int(data.get("preset"))
  115. target = data.get("target", current_target)
  116. if target not in cam_transports:
  117. return web.json_response({"error": f"Invalid target: {target}"}, status=400)
  118. send_preset_command(target, cmd2=0x07, preset_id=preset_id)
  119. return web.json_response({"status": "ok", "action": "goto", "preset": preset_id, "target": target})
  120. async def handle_save_preset(request):
  121. data = await request.json()
  122. preset_id = int(data.get("preset"))
  123. target = data.get("target", current_target)
  124. if target == "both":
  125. for cam in ["cam1", "cam2"]:
  126. send_preset_command(cam, cmd2=0x03, preset_id=preset_id)
  127. elif target in cam_transports:
  128. send_preset_command(target, cmd2=0x03, preset_id=preset_id)
  129. else:
  130. return web.json_response({"error": f"Invalid target: {target}"}, status=400)
  131. return web.json_response({"status": "ok", "action": "save", "preset": preset_id, "target": target})
  132. async def handle_set_mode(request):
  133. global current_mode
  134. mode = request.query.get("mode")
  135. if mode not in ("preview", "program"):
  136. return web.json_response({"error": "Invalid mode"}, status=400)
  137. current_mode = mode
  138. return web.json_response({"status": "ok", "mode": current_mode})
  139. async def handle_get_mode(request):
  140. return web.json_response({"mode": current_mode})
  141. def start_http_server():
  142. app = web.Application()
  143. app.router.add_get("/target/get", handle_status)
  144. app.router.add_post("/target/set", handle_set_target)
  145. app.router.add_post("/preset/goto", handle_goto_preset)
  146. app.router.add_post("/preset/save", handle_save_preset)
  147. app.router.add_get("/mode/get", handle_get_mode)
  148. app.router.add_post("/mode/set", handle_set_mode)
  149. return web._run_app(app, port=1423)
  150. # =========================
  151. # Main
  152. # =========================
  153. async def main():
  154. global cam_transports
  155. loop = asyncio.get_running_loop()
  156. def forward_packet(packet):
  157. transport = cam_transports.get(current_target)
  158. if transport:
  159. transport.write(packet)
  160. else:
  161. print(f"[WARN] No transport for {current_target}")
  162. # Connect to cameras
  163. cam1_transport, _ = await serial_asyncio.create_serial_connection(
  164. loop, DummyCamProtocol, CAM1_PORT, baudrate=BAUDRATE
  165. )
  166. cam2_transport, _ = await serial_asyncio.create_serial_connection(
  167. loop, DummyCamProtocol, CAM2_PORT, baudrate=BAUDRATE
  168. )
  169. cam_transports = {"cam1": cam1_transport, "cam2": cam2_transport}
  170. # Connect to joystick
  171. await serial_asyncio.create_serial_connection(
  172. loop,
  173. lambda: JoystickProtocol(forward_packet),
  174. JOYSTICK_PORT,
  175. baudrate=BAUDRATE,
  176. )
  177. # Start HTTP API in a separate task
  178. asyncio.create_task(start_http_server())
  179. # Wait forever
  180. await asyncio.Event().wait()
  181. if __name__ == "__main__":
  182. asyncio.run(main())