| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- import asyncio
- import os
- import argparse
- import socket
- import serial_asyncio
- from serial.tools import list_ports
- import yaml
- from aiohttp import web
- from asyncio import Queue
- write_queue = Queue()
- cam_transports = {}
- VISCA_PORT = 52381
- class ViscaOverIP:
- def __init__(self, ip, port):
- self.ip = ip
- self.port = port
- self.sequence_number = 1
- self.transport = None
- async def connect(self, loop):
- class ViscaProtocol(asyncio.DatagramProtocol):
- def connection_made(self, transport):
- pass
- self.transport, _ = await loop.create_datagram_endpoint(
- ViscaProtocol,
- remote_addr=(self.ip, self.port)
- )
- def write(self, visca_payload):
- if not self.transport:
- print(f"[ERROR] No transport for {self.ip}")
- return
- header = bytearray([0x01, 0x00]) # Payload type: VISCA command
- header += len(visca_payload).to_bytes(2, 'big')
- header += self.sequence_number.to_bytes(4, 'big')
- packet = header + visca_payload
- self.transport.sendto(packet)
- self.sequence_number = (self.sequence_number + 1) & 0xFFFFFFFF
- def translate_pelco_to_visca(packet):
- """
- Translates a 7-byte Pelco-D packet to a VISCA command.
- """
- if len(packet) < 7:
- return None
- cmd1 = packet[2]
- cmd2 = packet[3]
- pan_speed = packet[4]
- tilt_speed = packet[5]
- # Map speeds (Pelco 00-3F to VISCA 01-18/17)
- v_pan_speed = max(1, min(0x18, int(pan_speed * 0x18 / 0x3F)))
- v_tilt_speed = max(1, min(0x17, int(tilt_speed * 0x17 / 0x3F)))
- # Pan/Tilt Drive
- # 81 01 06 01 VV WW 0x 0y FF
- # x: 01=left, 02=right, 03=stop
- # y: 01=up, 02=down, 03=stop
- pan_dir = 0x03
- if cmd2 & 0x02: # Right
- pan_dir = 0x02
- elif cmd2 & 0x04: # Left
- pan_dir = 0x01
- tilt_dir = 0x03
- if cmd2 & 0x08: # Up
- tilt_dir = 0x01
- elif cmd2 & 0x10: # Down
- tilt_dir = 0x02
- if pan_dir != 0x03 or tilt_dir != 0x03:
- return bytearray([0x81, 0x01, 0x06, 0x01, v_pan_speed, v_tilt_speed, pan_dir, tilt_dir, 0xFF])
- # Zoom
- # 81 01 04 07 0p FF (0p: 00=Stop, 02=Tele/In, 03=Wide/Out)
- if cmd2 & 0x20: # Zoom In (Tele)
- return bytearray([0x81, 0x01, 0x04, 0x07, 0x02, 0xFF])
- elif cmd2 & 0x40: # Zoom Out (Wide)
- return bytearray([0x81, 0x01, 0x04, 0x07, 0x03, 0xFF])
- # Focus
- # 81 01 04 08 0p FF (02=Far, 03=Near)
- if cmd1 & 0x01: # Focus Near
- return bytearray([0x81, 0x01, 0x04, 0x08, 0x03, 0xFF])
- elif cmd1 & 0x02: # Focus Far
- return bytearray([0x81, 0x01, 0x04, 0x08, 0x02, 0xFF])
- # If it's a stop packet (cmd1=0, cmd2=0) or we don't recognize it
- if cmd1 == 0 and cmd2 == 0:
- # General stop for Pan/Tilt and Zoom
- # Note: VISCA Zoom stop is separate but we'll prioritize P/T stop
- return bytearray([0x81, 0x01, 0x06, 0x01, 0x00, 0x00, 0x03, 0x03, 0xFF])
- return None
- async def writer_task():
- while True:
- cam_name, packet = await write_queue.get()
- visca_obj = cam_transports.get(cam_name)
- if visca_obj:
- try:
- visca_obj.write(packet)
- except Exception as e:
- print(f"[ERROR] Write failed for {cam_name}: {e}")
- else:
- print(f"[WARN] No transport for {cam_name}")
- write_queue.task_done()
- def enqueue_write(cam_name, packet):
- write_queue.put_nowait((cam_name, packet))
- def get_config_path():
- parser = argparse.ArgumentParser(description="PTZ router")
- parser.add_argument(
- "-c", "--config", type=str, help="Path to config.yml file"
- )
- args = parser.parse_args()
- if args.config:
- return args.config
- xdg_config_home = os.environ.get(
- "XDG_CONFIG_HOME", os.path.expanduser("~/.config")
- )
- return os.path.join(xdg_config_home, "diy_ptz_switch", "config.yml")
- def load_config():
- config_file = get_config_path()
- if not os.path.exists(config_file):
- raise FileNotFoundError(f"Config file not found at: {config_file}")
- with open(config_file, "r", encoding="utf-8") as f:
- config = yaml.safe_load(f)
- return config
- config = load_config()
- location_roles = config.get("location_roles", {})
- camera_ips = config.get("cameras", {})
- port_map = {}
- for port in list_ports.comports():
- if "LOCATION=" in port.hwid:
- loc = port.hwid.split("LOCATION=")[-1]
- if loc in location_roles:
- role = location_roles[loc]
- port_map[role] = port.device
- print("Port mapping by USB port:")
- for role, dev in port_map.items():
- print(f" {role}: {dev}")
- JOYSTICK_PORT = port_map.get("joystick")
- BAUDRATE = 2400
- current_target = "cam1" # Default
- current_mode = "preview" # Default
- class JoystickProtocol(asyncio.Protocol):
- def __init__(self, forward_func):
- self.forward = forward_func
- self.buffer = bytearray()
- def data_received(self, data):
- print(f"[DEBUG] Raw data received: {data.hex()}")
- self.buffer += data
- self.parse_pelco_d_packets()
- def parse_pelco_d_packets(self):
- while len(self.buffer) >= 7:
- if self.buffer[0] != 0xFF:
- self.buffer.pop(0)
- continue
- packet = self.buffer[:7]
- self.buffer = self.buffer[7:]
- address = packet[1]
- cmd1 = packet[2]
- cmd2 = packet[3]
- data1 = packet[4]
- data2 = packet[5]
- print(
- f"[Joystick] Pelco packet to addr {address:02X} — "
- f"Cmd1: {cmd1:02X}, Cmd2: {cmd2:02X}, "
- f"Data1: {data1:02X}, Data2: {data2:02X}, "
- f"Target: {current_target}"
- )
- visca_packet = translate_pelco_to_visca(packet)
- if visca_packet:
- self.forward(visca_packet)
- def make_visca_preset_command(cmd2, preset_id):
- if not (0 <= preset_id <= 0xFF):
- raise ValueError("Preset ID must be between 0 and 255")
- # VISCA: 81 01 04 3F 0p pp FF
- # 0p: 01=Set, 02=Recall
- action = 0x02 if cmd2 == 0x07 else 0x01
- return bytearray([0x81, 0x01, 0x04, 0x3F, action, preset_id, 0xFF])
- def send_preset_command(cam_name, cmd2, preset_id):
- packet = make_visca_preset_command(cmd2, preset_id)
- print(
- f"[API] Queueing VISCA preset action={cmd2:02X} preset_id={preset_id} for {cam_name}"
- )
- enqueue_write(cam_name, packet)
- async def handle_status(request):
- return web.json_response({"current_target": current_target})
- async def handle_set_target(request):
- global current_target
- data = await request.json()
- target = data.get("target")
- if target not in cam_transports:
- return web.json_response(
- {"error": f"Invalid target: {target}"}, status=400
- )
- current_target = target
- print(f"[API] Target set to: {current_target}")
- return web.json_response({"status": "ok", "target": current_target})
- async def handle_goto_preset(request):
- data = await request.json()
- preset_id = int(data.get("preset"))
- target = data.get("target", current_target)
- if target not in cam_transports:
- return web.json_response(
- {"error": f"Invalid target: {target}"}, status=400
- )
- send_preset_command(target, cmd2=0x07, preset_id=preset_id)
- return web.json_response(
- {
- "status": "ok",
- "action": "goto",
- "preset": preset_id,
- "target": target,
- }
- )
- async def handle_save_preset(request):
- data = await request.json()
- preset_id = int(data.get("preset"))
- target = data.get("target", current_target)
- if target == "both":
- for cam in ["cam1", "cam2"]:
- send_preset_command(cam, cmd2=0x03, preset_id=preset_id)
- elif target in cam_transports:
- send_preset_command(target, cmd2=0x03, preset_id=preset_id)
- else:
- return web.json_response(
- {"error": f"Invalid target: {target}"}, status=400
- )
- return web.json_response(
- {
- "status": "ok",
- "action": "save",
- "preset": preset_id,
- "target": target,
- }
- )
- async def handle_set_mode(request):
- global current_mode
- mode = request.query.get("mode")
- if mode not in ("preview", "program"):
- return web.json_response({"error": "Invalid mode"}, status=400)
- current_mode = mode
- return web.json_response({"status": "ok", "mode": current_mode})
- async def handle_get_mode(request):
- return web.json_response({"mode": current_mode})
- async def start_http_server():
- app = web.Application()
- app.router.add_get("/target/get", handle_status)
- app.router.add_post("/target/set", handle_set_target)
- app.router.add_post("/preset/goto", handle_goto_preset)
- app.router.add_post("/preset/save", handle_save_preset)
- app.router.add_get("/mode/get", handle_get_mode)
- app.router.add_post("/mode/set", handle_set_mode)
- runner = web.AppRunner(app)
- await runner.setup()
- site = web.TCPSite(runner, "0.0.0.0", 1423)
- await site.start()
- async def main():
- global cam_transports
- loop = asyncio.get_running_loop()
- def forward_packet(packet):
- if current_target in cam_transports:
- enqueue_write(current_target, packet)
- else:
- print(f"[WARN] No transport for {current_target}")
- # Connect to cameras via VISCA over IP
- for cam_name, ip in camera_ips.items():
- if cam_name.startswith("cam"):
- print(f"[INFO] Connecting to {cam_name} at {ip}")
- visca_obj = ViscaOverIP(ip, VISCA_PORT)
- await visca_obj.connect(loop)
- cam_transports[cam_name] = visca_obj
- if not cam_transports:
- print("[WARN] No cameras configured")
- asyncio.create_task(writer_task())
- if JOYSTICK_PORT:
- await serial_asyncio.create_serial_connection(
- loop,
- lambda: JoystickProtocol(forward_packet),
- JOYSTICK_PORT,
- baudrate=BAUDRATE,
- )
- else:
- print("[WARN] No joystick port found")
- asyncio.create_task(start_http_server())
- # Wait forever
- await asyncio.Event().wait()
- if __name__ == "__main__":
- asyncio.run(main())
- if __name__ == "__main__":
- asyncio.run(main())
|