switch.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. import asyncio
  2. import os
  3. import argparse
  4. import serial_asyncio
  5. from serial.tools import list_ports
  6. import yaml
  7. from aiohttp import web
  8. def get_config_path():
  9. parser = argparse.ArgumentParser(description="PTZ router")
  10. parser.add_argument(
  11. "-c", "--config", type=str, help="Path to config.yml file"
  12. )
  13. args = parser.parse_args()
  14. if args.config:
  15. return args.config
  16. xdg_config_home = os.environ.get(
  17. "XDG_CONFIG_HOME", os.path.expanduser("~/.config")
  18. )
  19. return os.path.join(xdg_config_home, "diy_ptz_switch", "config.yml")
  20. def load_location_roles():
  21. config_file = get_config_path()
  22. if not os.path.exists(config_file):
  23. raise FileNotFoundError(f"Config file not found at: {config_file}")
  24. with open(config_file, "r", encoding="utf-8") as f:
  25. config = yaml.safe_load(f)
  26. return config.get("location_roles", {})
  27. location_roles = load_location_roles()
  28. port_map = {}
  29. for port in list_ports.comports():
  30. if "LOCATION=" in port.hwid:
  31. loc = port.hwid.split("LOCATION=")[-1]
  32. if loc in location_roles:
  33. role = location_roles[loc]
  34. port_map[role] = port.device
  35. print("Port mapping by USB port:")
  36. for role, dev in port_map.items():
  37. print(f" {role}: {dev}")
  38. JOYSTICK_PORT = port_map.get("joystick")
  39. CAM1_PORT = port_map.get("cam1")
  40. CAM2_PORT = port_map.get("cam2")
  41. BAUDRATE = 2400
  42. current_target = "cam1" # Default
  43. current_mode = "preview" # Default
  44. cam_transports = {}
  45. class JoystickProtocol(asyncio.Protocol):
  46. def __init__(self, forward_func):
  47. self.forward = forward_func
  48. self.buffer = bytearray()
  49. def data_received(self, data):
  50. print(f"[DEBUG] Raw data received: {data.hex()}")
  51. self.buffer += data
  52. self.parse_pelco_d_packets()
  53. def parse_pelco_d_packets(self):
  54. while len(self.buffer) >= 7:
  55. if self.buffer[0] != 0xFF:
  56. self.buffer.pop(0)
  57. continue
  58. packet = self.buffer[:7]
  59. self.buffer = self.buffer[7:]
  60. address = packet[1]
  61. cmd1 = packet[2]
  62. cmd2 = packet[3]
  63. data1 = packet[4]
  64. data2 = packet[5]
  65. print(
  66. f"[Joystick] Packet to camera addr {address:02X} — "
  67. f"Cmd1: {cmd1:02X}, Cmd2: {cmd2:02X}, "
  68. f"Data1: {data1:02X}, Data2: {data2:02X}, "
  69. f"Target: {current_target}"
  70. )
  71. self.forward(packet)
  72. class DummyCamProtocol(asyncio.Protocol):
  73. def connection_made(self, transport):
  74. pass
  75. def make_preset_command(cam_address: int, cmd2: int, preset_id: int):
  76. if not (1 <= preset_id <= 0xFF):
  77. raise ValueError("Preset ID must be between 1 and 255")
  78. cmd1 = 0x00
  79. data1 = 0x00
  80. data2 = preset_id
  81. packet = bytearray([0xFF, cam_address, cmd1, cmd2, data1, data2])
  82. checksum = sum(packet[1:]) % 256
  83. packet.append(checksum)
  84. return packet
  85. def send_preset_command(cam_name, cmd2, preset_id):
  86. transport = cam_transports.get(cam_name)
  87. if not transport:
  88. print(f"[WARN] No transport for {cam_name}")
  89. return
  90. packet = make_preset_command(
  91. 1, cmd2, preset_id
  92. ) # Camera address is hardcoded as 1
  93. print(
  94. f"[API] Sending preset cmd2={cmd2:02X} preset_id={preset_id} to {cam_name}"
  95. )
  96. transport.write(packet)
  97. async def handle_status(request):
  98. return web.json_response({"current_target": current_target})
  99. async def handle_set_target(request):
  100. global current_target
  101. data = await request.json()
  102. target = data.get("target")
  103. if target not in cam_transports:
  104. return web.json_response(
  105. {"error": f"Invalid target: {target}"}, status=400
  106. )
  107. current_target = target
  108. print(f"[API] Target set to: {current_target}")
  109. return web.json_response({"status": "ok", "target": current_target})
  110. async def handle_goto_preset(request):
  111. data = await request.json()
  112. preset_id = int(data.get("preset"))
  113. target = data.get("target", current_target)
  114. if target not in cam_transports:
  115. return web.json_response(
  116. {"error": f"Invalid target: {target}"}, status=400
  117. )
  118. send_preset_command(target, cmd2=0x07, preset_id=preset_id)
  119. return web.json_response(
  120. {
  121. "status": "ok",
  122. "action": "goto",
  123. "preset": preset_id,
  124. "target": target,
  125. }
  126. )
  127. async def handle_save_preset(request):
  128. data = await request.json()
  129. preset_id = int(data.get("preset"))
  130. target = data.get("target", current_target)
  131. if target == "both":
  132. for cam in ["cam1", "cam2"]:
  133. send_preset_command(cam, cmd2=0x03, preset_id=preset_id)
  134. elif target in cam_transports:
  135. send_preset_command(target, cmd2=0x03, preset_id=preset_id)
  136. else:
  137. return web.json_response(
  138. {"error": f"Invalid target: {target}"}, status=400
  139. )
  140. return web.json_response(
  141. {
  142. "status": "ok",
  143. "action": "save",
  144. "preset": preset_id,
  145. "target": target,
  146. }
  147. )
  148. async def handle_set_mode(request):
  149. global current_mode
  150. mode = request.query.get("mode")
  151. if mode not in ("preview", "program"):
  152. return web.json_response({"error": "Invalid mode"}, status=400)
  153. current_mode = mode
  154. return web.json_response({"status": "ok", "mode": current_mode})
  155. async def handle_get_mode(request):
  156. return web.json_response({"mode": current_mode})
  157. def start_http_server():
  158. app = web.Application()
  159. app.router.add_get("/target/get", handle_status)
  160. app.router.add_post("/target/set", handle_set_target)
  161. app.router.add_post("/preset/goto", handle_goto_preset)
  162. app.router.add_post("/preset/save", handle_save_preset)
  163. app.router.add_get("/mode/get", handle_get_mode)
  164. app.router.add_post("/mode/set", handle_set_mode)
  165. return web._run_app(app, port=1423)
  166. async def main():
  167. global cam_transports
  168. loop = asyncio.get_running_loop()
  169. def forward_packet(packet):
  170. transport = cam_transports.get(current_target)
  171. if transport:
  172. transport.write(packet)
  173. else:
  174. print(f"[WARN] No transport for {current_target}")
  175. # Connect to cameras
  176. cam1_transport, _ = await serial_asyncio.create_serial_connection(
  177. loop, DummyCamProtocol, CAM1_PORT, baudrate=BAUDRATE
  178. )
  179. cam2_transport, _ = await serial_asyncio.create_serial_connection(
  180. loop, DummyCamProtocol, CAM2_PORT, baudrate=BAUDRATE
  181. )
  182. cam_transports = {"cam1": cam1_transport, "cam2": cam2_transport}
  183. # Connect to joystick
  184. await serial_asyncio.create_serial_connection(
  185. loop,
  186. lambda: JoystickProtocol(forward_packet),
  187. JOYSTICK_PORT,
  188. baudrate=BAUDRATE,
  189. )
  190. # Start HTTP API in a separate task
  191. asyncio.create_task(start_http_server())
  192. # Wait forever
  193. await asyncio.Event().wait()
  194. if __name__ == "__main__":
  195. asyncio.run(main())