123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- import asyncio
- import os
- import serial_asyncio
- from serial.tools import list_ports
- import yaml
- from aiohttp import web
- # =========================
- # Config loading via XDG
- # =========================
- def get_config_path():
- xdg_config_home = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
- return os.path.join(xdg_config_home, "diy_ptz_switch", "config.yml")
- def load_location_roles():
- config_file = get_config_path()
- if not os.path.exists(config_file):
- raise FileNotFoundError(f"Config file not found at: {config_file}")
- with open(config_file, "r") as f:
- config = yaml.safe_load(f)
- return config.get("location_roles", {})
- location_roles = load_location_roles()
- # =========================
- # Serial Port Mapping
- # =========================
- port_map = {}
- for port in list_ports.comports():
- if "LOCATION=" in port.hwid:
- loc = port.hwid.split("LOCATION=")[-1]
- if loc in location_roles:
- role = location_roles[loc]
- port_map[role] = port.device
- print("Port mapping by USB port:")
- for role, dev in port_map.items():
- print(f" {role}: {dev}")
- JOYSTICK_PORT = port_map.get("joystick")
- CAM1_PORT = port_map.get("cam1")
- CAM2_PORT = port_map.get("cam2")
- BAUDRATE = 2400
- # =========================
- # Globals
- # =========================
- current_target = "cam1" # Default
- cam_transports = {}
- # =========================
- # Pelco-D Packet Forwarding
- # =========================
- class JoystickProtocol(asyncio.Protocol):
- def __init__(self, forward_func):
- self.forward = forward_func
- self.buffer = bytearray()
- def data_received(self, data):
- print(f"[DEBUG] Raw data received: {data.hex()}")
- self.buffer += data
- self.parse_pelco_d_packets()
- def parse_pelco_d_packets(self):
- while len(self.buffer) >= 7:
- if self.buffer[0] != 0xFF:
- self.buffer.pop(0)
- continue
- packet = self.buffer[:7]
- self.buffer = self.buffer[7:]
- address = packet[1]
- cmd1 = packet[2]
- cmd2 = packet[3]
- data1 = packet[4]
- data2 = packet[5]
- print(
- f"[Joystick] Packet to camera addr {address:02X} — "
- f"Cmd1: {cmd1:02X}, Cmd2: {cmd2:02X}, "
- f"Data1: {data1:02X}, Data2: {data2:02X}, "
- f"Target: {current_target}"
- )
- self.forward(packet)
- class DummyCamProtocol(asyncio.Protocol):
- def connection_made(self, transport):
- pass
- # =========================
- # Pelco-D Preset Helpers
- # =========================
- def make_preset_command(cam_address: int, cmd2: int, preset_id: int):
- if not (1 <= preset_id <= 0xFF):
- raise ValueError("Preset ID must be between 1 and 255")
- cmd1 = 0x00
- data1 = 0x00
- data2 = preset_id
- packet = bytearray([0xFF, cam_address, cmd1, cmd2, data1, data2])
- checksum = sum(packet[1:]) % 256
- packet.append(checksum)
- return packet
- def send_preset_command(cam_name, cmd2, preset_id):
- transport = cam_transports.get(cam_name)
- if not transport:
- print(f"[WARN] No transport for {cam_name}")
- return
- packet = make_preset_command(1, cmd2, preset_id) # Camera address is hardcoded as 1
- print(f"[API] Sending preset cmd2={cmd2:02X} preset_id={preset_id} to {cam_name}")
- transport.write(packet)
- # =========================
- # HTTP Server (aiohttp)
- # =========================
- async def handle_status(request):
- return web.json_response({"current_target": current_target})
- async def handle_set_target(request):
- global current_target
- data = await request.json()
- target = data.get("target")
- if target not in cam_transports:
- return web.json_response({"error": f"Invalid target: {target}"}, status=400)
- current_target = target
- print(f"[API] Target set to: {current_target}")
- return web.json_response({"status": "ok", "target": current_target})
- async def handle_goto_preset(request):
- data = await request.json()
- preset_id = int(data.get("preset"))
- target = data.get("target", current_target)
- if target not in cam_transports:
- return web.json_response({"error": f"Invalid target: {target}"}, status=400)
- send_preset_command(target, cmd2=0x07, preset_id=preset_id)
- return web.json_response({"status": "ok", "action": "goto", "preset": preset_id, "target": target})
- async def handle_save_preset(request):
- data = await request.json()
- preset_id = int(data.get("preset"))
- target = data.get("target", current_target)
- if target == "both":
- for cam in ["cam1", "cam2"]:
- send_preset_command(cam, cmd2=0x03, preset_id=preset_id)
- elif target in cam_transports:
- send_preset_command(target, cmd2=0x03, preset_id=preset_id)
- else:
- return web.json_response({"error": f"Invalid target: {target}"}, status=400)
- return web.json_response({"status": "ok", "action": "save", "preset": preset_id, "target": target})
- def start_http_server():
- app = web.Application()
- app.router.add_get("/target/get", handle_status)
- app.router.add_post("/target/set", handle_set_target)
- app.router.add_post("/preset/goto", handle_goto_preset)
- app.router.add_post("/preset/save", handle_save_preset)
- return web._run_app(app, port=1423)
- # =========================
- # Main
- # =========================
- async def main():
- global cam_transports
- loop = asyncio.get_running_loop()
- def forward_packet(packet):
- transport = cam_transports.get(current_target)
- if transport:
- transport.write(packet)
- else:
- print(f"[WARN] No transport for {current_target}")
- # Connect to cameras
- cam1_transport, _ = await serial_asyncio.create_serial_connection(
- loop, DummyCamProtocol, CAM1_PORT, baudrate=BAUDRATE
- )
- cam2_transport, _ = await serial_asyncio.create_serial_connection(
- loop, DummyCamProtocol, CAM2_PORT, baudrate=BAUDRATE
- )
- cam_transports = {"cam1": cam1_transport, "cam2": cam2_transport}
- # Connect to joystick
- await serial_asyncio.create_serial_connection(
- loop,
- lambda: JoystickProtocol(forward_packet),
- JOYSTICK_PORT,
- baudrate=BAUDRATE,
- )
- # Start HTTP API in a separate task
- asyncio.create_task(start_http_server())
- # Wait forever
- await asyncio.Event().wait()
- if __name__ == "__main__":
- asyncio.run(main())
|