Browse Source

use visca over ip instead of rs485 to send command to ptz cameras

Noah Vogt 1 week ago
parent
commit
ea35fe024f
2 changed files with 167 additions and 55 deletions
  1. 28 12
      README.md
  2. 139 43
      switch.py

+ 28 - 12
README.md

@@ -4,9 +4,10 @@ For a typical livestream production environment consisting of 2 ptz cameras and
 - expensive
 - offering no API
 - nearly impossible to automate
-- offering no rs485 serial connection support (basically only IP via RJ45)
 
-To fix this, I thought why not connect the ptz cameras via their rs485 to a rs485 <-> usb serial converter to a computer and use a server that selects which ptz camera is sent the current joystick input. It also has a http api that allows seleting the current camera target and ptz commands like save_preset or goto_preset.
+To fix this, I built a Python-based PTZ router that connects a traditional Pelco-D serial joystick to modern IP-based PTZ cameras. This server selects which camera receives the joystick input and provides a HTTP API for selecting the target camera and managing presets.
+
+The router receives Pelco-D packets from the joystick via a USB-to-RS485 converter and translates them into VISCA-over-IP commands, which are sent to the cameras over the network (UDP port 52381).
 
 See the following ascii diagram for the architecture.
 
@@ -26,8 +27,9 @@ See the following ascii diagram for the architecture.
                           |  Python asyncio PTZ Router   |
                           |------------------------------|
                           | - Parse Pelco-D packets      |
+                          | - Translate to VISCA commands|
                           | - current_target: cam1/cam2  |
-                          | - Forward to selected cam    |
+                          | - Forward to selected cam(IP)|
                           | - Handle HTTP API requests   |
                           |                              |
                           |   +----------------------+   |
@@ -43,16 +45,30 @@ See the following ascii diagram for the architecture.
                           +------------------------------+
                                          |
                     +--------------------+------------------+
-                    |                                       |
+                    | (Network / UDP)                       | (Network / UDP)
                     v                                       v
       +----------------------------+         +----------------------------+
-      |  /dev/ttyUSBY (CAM1_PORT)  |         |  /dev/ttyUSBZ (CAM2_PORT)  |
-      |  [async serial writer]     |         |  [async serial writer]     |
+      |  Camera 1 (192.168.1.3)    |         |  Camera 2 (192.168.1.4)    |
+      |  [VISCA over IP]           |         |  [VISCA over IP]           |
       +----------------------------+         +----------------------------+
-                    |                                        |
-                    v                                        v
-        +------------------------+              +------------------------+
-        |  PTZ Camera 1          |              |   PTZ Camera 2         |
-        |  (Pelco-D via RS-485)  |              |  (Pelco-D via RS-485)  |
-        +------------------------+              +------------------------+
+
+## Configuration
+
+The project uses a YAML configuration file located at `~/.config/diy_ptz_switch/config.yml`.
+
+Example configuration:
+
+```yaml
+location_roles:
+  "1-4.4": joystick
+  "1-4.1": cam1 # Optional if using IP
+  "1-4.2": cam2 # Optional if using IP
+
+cameras:
+  cam1: "192.168.1.3"
+  cam2: "192.168.1.4"
+```
+
+- `location_roles`: Maps USB port locations to roles (like `joystick`).
+- `cameras`: Maps camera names to their IP addresses for VISCA-over-IP communication.
 

+ 139 - 43
switch.py

@@ -1,6 +1,7 @@
 import asyncio
 import os
 import argparse
+import socket
 
 import serial_asyncio
 from serial.tools import list_ports
@@ -11,14 +12,106 @@ from asyncio import Queue
 write_queue = Queue()
 cam_transports = {}
 
+VISCA_PORT = 52381
+
+
+class ViscaOverIP:
+    def __init__(self, ip, port):
+        self.ip = ip
+        self.port = port
+        self.sequence_number = 1
+        self.transport = None
+
+    async def connect(self, loop):
+        class ViscaProtocol(asyncio.DatagramProtocol):
+            def connection_made(self, transport):
+                pass
+
+        self.transport, _ = await loop.create_datagram_endpoint(
+            ViscaProtocol,
+            remote_addr=(self.ip, self.port)
+        )
+
+    def write(self, visca_payload):
+        if not self.transport:
+            print(f"[ERROR] No transport for {self.ip}")
+            return
+
+        header = bytearray([0x01, 0x00])  # Payload type: VISCA command
+        header += len(visca_payload).to_bytes(2, 'big')
+        header += self.sequence_number.to_bytes(4, 'big')
+
+        packet = header + visca_payload
+        self.transport.sendto(packet)
+        self.sequence_number = (self.sequence_number + 1) & 0xFFFFFFFF
+
+
+def translate_pelco_to_visca(packet):
+    """
+    Translates a 7-byte Pelco-D packet to a VISCA command.
+    """
+    if len(packet) < 7:
+        return None
+
+    cmd1 = packet[2]
+    cmd2 = packet[3]
+    pan_speed = packet[4]
+    tilt_speed = packet[5]
+
+    # Map speeds (Pelco 00-3F to VISCA 01-18/17)
+    v_pan_speed = max(1, min(0x18, int(pan_speed * 0x18 / 0x3F)))
+    v_tilt_speed = max(1, min(0x17, int(tilt_speed * 0x17 / 0x3F)))
+
+    # Pan/Tilt Drive
+    # 81 01 06 01 VV WW 0x 0y FF
+    # x: 01=left, 02=right, 03=stop
+    # y: 01=up, 02=down, 03=stop
+
+    pan_dir = 0x03
+    if cmd2 & 0x02:  # Right
+        pan_dir = 0x02
+    elif cmd2 & 0x04:  # Left
+        pan_dir = 0x01
+
+    tilt_dir = 0x03
+    if cmd2 & 0x08:  # Up
+        tilt_dir = 0x01
+    elif cmd2 & 0x10:  # Down
+        tilt_dir = 0x02
+
+    if pan_dir != 0x03 or tilt_dir != 0x03:
+        return bytearray([0x81, 0x01, 0x06, 0x01, v_pan_speed, v_tilt_speed, pan_dir, tilt_dir, 0xFF])
+
+    # Zoom
+    # 81 01 04 07 0p FF (0p: 00=Stop, 02=Tele/In, 03=Wide/Out)
+    if cmd2 & 0x20:  # Zoom In (Tele)
+        return bytearray([0x81, 0x01, 0x04, 0x07, 0x02, 0xFF])
+    elif cmd2 & 0x40:  # Zoom Out (Wide)
+        return bytearray([0x81, 0x01, 0x04, 0x07, 0x03, 0xFF])
+
+    # Focus
+    # 81 01 04 08 0p FF (02=Far, 03=Near)
+    if cmd1 & 0x01:  # Focus Near
+        return bytearray([0x81, 0x01, 0x04, 0x08, 0x03, 0xFF])
+    elif cmd1 & 0x02:  # Focus Far
+        return bytearray([0x81, 0x01, 0x04, 0x08, 0x02, 0xFF])
+
+    # If it's a stop packet (cmd1=0, cmd2=0) or we don't recognize it
+    if cmd1 == 0 and cmd2 == 0:
+        # General stop for Pan/Tilt and Zoom
+        # Note: VISCA Zoom stop is separate but we'll prioritize P/T stop
+        return bytearray([0x81, 0x01, 0x06, 0x01, 0x00, 0x00, 0x03, 0x03, 0xFF])
+
+    return None
+
 
 async def writer_task():
     while True:
         cam_name, packet = await write_queue.get()
-        transport = cam_transports.get(cam_name)
-        if transport:
+        visca_obj = cam_transports.get(cam_name)
+        if visca_obj:
             try:
-                transport.write(packet)
+                visca_obj.write(packet)
             except Exception as e:
                 print(f"[ERROR] Write failed for {cam_name}: {e}")
         else:
@@ -46,16 +139,18 @@ def get_config_path():
     return os.path.join(xdg_config_home, "diy_ptz_switch", "config.yml")
 
 
-def load_location_roles():
+def load_config():
     config_file = get_config_path()
     if not os.path.exists(config_file):
         raise FileNotFoundError(f"Config file not found at: {config_file}")
     with open(config_file, "r", encoding="utf-8") as f:
         config = yaml.safe_load(f)
-    return config.get("location_roles", {})
+    return config
 
 
-location_roles = load_location_roles()
+config = load_config()
+location_roles = config.get("location_roles", {})
+camera_ips = config.get("cameras", {})
 
 port_map = {}
 for port in list_ports.comports():
@@ -70,8 +165,6 @@ for role, dev in port_map.items():
     print(f"  {role}: {dev}")
 
 JOYSTICK_PORT = port_map.get("joystick")
-CAM1_PORT = port_map.get("cam1")
-CAM2_PORT = port_map.get("cam2")
 BAUDRATE = 2400
 
 current_target = "cam1"  # Default
@@ -104,39 +197,31 @@ class JoystickProtocol(asyncio.Protocol):
             data2 = packet[5]
 
             print(
-                f"[Joystick] Packet to camera addr {address:02X} — "
+                f"[Joystick] Pelco packet to addr {address:02X} — "
                 f"Cmd1: {cmd1:02X}, Cmd2: {cmd2:02X}, "
                 f"Data1: {data1:02X}, Data2: {data2:02X}, "
                 f"Target: {current_target}"
             )
 
-            self.forward(packet)
-
-
-class DummyCamProtocol(asyncio.Protocol):
-    def connection_made(self, transport):
-        pass
+            visca_packet = translate_pelco_to_visca(packet)
+            if visca_packet:
+                self.forward(visca_packet)
 
 
-def make_preset_command(cam_address: int, cmd2: int, preset_id: int):
-    if not (1 <= preset_id <= 0xFF):
-        raise ValueError("Preset ID must be between 1 and 255")
+def make_visca_preset_command(cmd2, preset_id):
+    if not (0 <= preset_id <= 0xFF):
+        raise ValueError("Preset ID must be between 0 and 255")
 
-    cmd1 = 0x00
-    data1 = 0x00
-    data2 = preset_id
-    packet = bytearray([0xFF, cam_address, cmd1, cmd2, data1, data2])
-    checksum = sum(packet[1:]) % 256
-    packet.append(checksum)
-    return packet
+    # VISCA: 81 01 04 3F 0p pp FF
+    # 0p: 01=Set, 02=Recall
+    action = 0x02 if cmd2 == 0x07 else 0x01
+    return bytearray([0x81, 0x01, 0x04, 0x3F, action, preset_id, 0xFF])
 
 
 def send_preset_command(cam_name, cmd2, preset_id):
-    # cam id is hardcoded, should be the same on all cameras or else they will
-    # not accept the preset commands
-    packet = make_preset_command(1, cmd2, preset_id)
+    packet = make_visca_preset_command(cmd2, preset_id)
     print(
-        f"[API] Queueing preset cmd2={cmd2:02X} preset_id={preset_id} for {cam_name}"
+        f"[API] Queueing VISCA preset action={cmd2:02X} preset_id={preset_id} for {cam_name}"
     )
     enqueue_write(cam_name, packet)
 
@@ -240,23 +325,29 @@ async def main():
         else:
             print(f"[WARN] No transport for {current_target}")
 
-    # Connect to cameras
-    cam1_transport, _ = await serial_asyncio.create_serial_connection(
-        loop, DummyCamProtocol, CAM1_PORT, baudrate=BAUDRATE
-    )
-    cam2_transport, _ = await serial_asyncio.create_serial_connection(
-        loop, DummyCamProtocol, CAM2_PORT, baudrate=BAUDRATE
-    )
-    cam_transports = {"cam1": cam1_transport, "cam2": cam2_transport}
+    # Connect to cameras via VISCA over IP
+    for cam_name, ip in camera_ips.items():
+        if cam_name.startswith("cam"):
+            print(f"[INFO] Connecting to {cam_name} at {ip}")
+            visca_obj = ViscaOverIP(ip, VISCA_PORT)
+            await visca_obj.connect(loop)
+            cam_transports[cam_name] = visca_obj
+
+    if not cam_transports:
+        print("[WARN] No cameras configured")
 
     asyncio.create_task(writer_task())
 
-    await serial_asyncio.create_serial_connection(
-        loop,
-        lambda: JoystickProtocol(forward_packet),
-        JOYSTICK_PORT,
-        baudrate=BAUDRATE,
-    )
+
+    if JOYSTICK_PORT:
+        await serial_asyncio.create_serial_connection(
+            loop,
+            lambda: JoystickProtocol(forward_packet),
+            JOYSTICK_PORT,
+            baudrate=BAUDRATE,
+        )
+    else:
+        print("[WARN] No joystick port found")
 
     asyncio.create_task(start_http_server())
 
@@ -266,3 +357,8 @@ async def main():
 
 if __name__ == "__main__":
     asyncio.run(main())
+
+
+
+if __name__ == "__main__":
+    asyncio.run(main())