switch.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. import asyncio
  2. import os
  3. import serial_asyncio
  4. from serial.tools import list_ports
  5. import yaml
  6. from aiohttp import web
  7. # =========================
  8. # Config loading via XDG
  9. # =========================
  10. def get_config_path():
  11. xdg_config_home = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
  12. return os.path.join(xdg_config_home, "diy_ptz_switch", "config.yml")
  13. def load_location_roles():
  14. config_file = get_config_path()
  15. if not os.path.exists(config_file):
  16. raise FileNotFoundError(f"Config file not found at: {config_file}")
  17. with open(config_file, "r") as f:
  18. config = yaml.safe_load(f)
  19. return config.get("location_roles", {})
  20. location_roles = load_location_roles()
  21. # =========================
  22. # Serial Port Mapping
  23. # =========================
  24. port_map = {}
  25. for port in list_ports.comports():
  26. if "LOCATION=" in port.hwid:
  27. loc = port.hwid.split("LOCATION=")[-1]
  28. if loc in location_roles:
  29. role = location_roles[loc]
  30. port_map[role] = port.device
  31. print("Port mapping by USB port:")
  32. for role, dev in port_map.items():
  33. print(f" {role}: {dev}")
  34. JOYSTICK_PORT = port_map.get("joystick")
  35. CAM1_PORT = port_map.get("cam1")
  36. CAM2_PORT = port_map.get("cam2")
  37. BAUDRATE = 2400
  38. # =========================
  39. # Globals
  40. # =========================
  41. current_target = "cam1" # Default
  42. cam_transports = {}
  43. # =========================
  44. # Pelco-D Packet Forwarding
  45. # =========================
  46. class JoystickProtocol(asyncio.Protocol):
  47. def __init__(self, forward_func):
  48. self.forward = forward_func
  49. self.buffer = bytearray()
  50. def data_received(self, data):
  51. print(f"[DEBUG] Raw data received: {data.hex()}")
  52. self.buffer += data
  53. self.parse_pelco_d_packets()
  54. def parse_pelco_d_packets(self):
  55. while len(self.buffer) >= 7:
  56. if self.buffer[0] != 0xFF:
  57. self.buffer.pop(0)
  58. continue
  59. packet = self.buffer[:7]
  60. self.buffer = self.buffer[7:]
  61. address = packet[1]
  62. cmd1 = packet[2]
  63. cmd2 = packet[3]
  64. data1 = packet[4]
  65. data2 = packet[5]
  66. print(
  67. f"[Joystick] Packet to camera addr {address:02X} — "
  68. f"Cmd1: {cmd1:02X}, Cmd2: {cmd2:02X}, "
  69. f"Data1: {data1:02X}, Data2: {data2:02X}, "
  70. f"Target: {current_target}"
  71. )
  72. self.forward(packet)
  73. class DummyCamProtocol(asyncio.Protocol):
  74. def connection_made(self, transport):
  75. pass
  76. # =========================
  77. # Pelco-D Preset Helpers
  78. # =========================
  79. def make_preset_command(cam_address: int, cmd2: int, preset_id: int):
  80. if not (1 <= preset_id <= 0xFF):
  81. raise ValueError("Preset ID must be between 1 and 255")
  82. cmd1 = 0x00
  83. data1 = 0x00
  84. data2 = preset_id
  85. packet = bytearray([0xFF, cam_address, cmd1, cmd2, data1, data2])
  86. checksum = sum(packet[1:]) % 256
  87. packet.append(checksum)
  88. return packet
  89. def send_preset_command(cam_name, cmd2, preset_id):
  90. transport = cam_transports.get(cam_name)
  91. if not transport:
  92. print(f"[WARN] No transport for {cam_name}")
  93. return
  94. packet = make_preset_command(1, cmd2, preset_id) # Camera address is hardcoded as 1
  95. print(f"[API] Sending preset cmd2={cmd2:02X} preset_id={preset_id} to {cam_name}")
  96. transport.write(packet)
  97. # =========================
  98. # HTTP Server (aiohttp)
  99. # =========================
  100. async def handle_status(request):
  101. return web.json_response({"current_target": current_target})
  102. async def handle_set_target(request):
  103. global current_target
  104. data = await request.json()
  105. target = data.get("target")
  106. if target not in cam_transports:
  107. return web.json_response({"error": f"Invalid target: {target}"}, status=400)
  108. current_target = target
  109. print(f"[API] Target set to: {current_target}")
  110. return web.json_response({"status": "ok", "target": current_target})
  111. async def handle_goto_preset(request):
  112. data = await request.json()
  113. preset_id = int(data.get("preset"))
  114. target = data.get("target", current_target)
  115. if target not in cam_transports:
  116. return web.json_response({"error": f"Invalid target: {target}"}, status=400)
  117. send_preset_command(target, cmd2=0x07, preset_id=preset_id)
  118. return web.json_response({"status": "ok", "action": "goto", "preset": preset_id, "target": target})
  119. async def handle_save_preset(request):
  120. data = await request.json()
  121. preset_id = int(data.get("preset"))
  122. target = data.get("target", current_target)
  123. if target == "both":
  124. for cam in ["cam1", "cam2"]:
  125. send_preset_command(cam, cmd2=0x03, preset_id=preset_id)
  126. elif target in cam_transports:
  127. send_preset_command(target, cmd2=0x03, preset_id=preset_id)
  128. else:
  129. return web.json_response({"error": f"Invalid target: {target}"}, status=400)
  130. return web.json_response({"status": "ok", "action": "save", "preset": preset_id, "target": target})
  131. def start_http_server():
  132. app = web.Application()
  133. app.router.add_get("/target/get", handle_status)
  134. app.router.add_post("/target/set", handle_set_target)
  135. app.router.add_post("/preset/goto", handle_goto_preset)
  136. app.router.add_post("/preset/save", handle_save_preset)
  137. return web._run_app(app, port=1423)
  138. # =========================
  139. # Main
  140. # =========================
  141. async def main():
  142. global cam_transports
  143. loop = asyncio.get_running_loop()
  144. def forward_packet(packet):
  145. transport = cam_transports.get(current_target)
  146. if transport:
  147. transport.write(packet)
  148. else:
  149. print(f"[WARN] No transport for {current_target}")
  150. # Connect to cameras
  151. cam1_transport, _ = await serial_asyncio.create_serial_connection(
  152. loop, DummyCamProtocol, CAM1_PORT, baudrate=BAUDRATE
  153. )
  154. cam2_transport, _ = await serial_asyncio.create_serial_connection(
  155. loop, DummyCamProtocol, CAM2_PORT, baudrate=BAUDRATE
  156. )
  157. cam_transports = {"cam1": cam1_transport, "cam2": cam2_transport}
  158. # Connect to joystick
  159. await serial_asyncio.create_serial_connection(
  160. loop,
  161. lambda: JoystickProtocol(forward_packet),
  162. JOYSTICK_PORT,
  163. baudrate=BAUDRATE,
  164. )
  165. # Start HTTP API in a separate task
  166. asyncio.create_task(start_http_server())
  167. # Wait forever
  168. await asyncio.Event().wait()
  169. if __name__ == "__main__":
  170. asyncio.run(main())