Sfoglia il codice sorgente

add support for usb joysticks + better documentation

Noah Vogt 4 settimane fa
parent
commit
5bd9f0cb41
4 ha cambiato i file con 298 aggiunte e 49 eliminazioni
  1. 71 4
      README.md
  2. 26 0
      dummy_cam.py
  3. 1 0
      requirements.txt
  4. 200 45
      switch.py

+ 71 - 4
README.md

@@ -1,32 +1,35 @@
 # diy_ptz_switch
 
 For a typical livestream production environment consisting of 2 ptz cameras and one joystick / controller board, most commercial options like the [Skaarhoj PTZ Fly](https://shop.skaarhoj.com/products/ptz-fly-w-blue-pill-inside) are
+
 - expensive
 - offering no API
 - nearly impossible to automate
 
-To fix this, I built a Python-based PTZ router that connects a traditional Pelco-D serial joystick to modern IP-based PTZ cameras. This server selects which camera receives the joystick input and provides a HTTP API for selecting the target camera and managing presets.
+To fix this, I built a Python-based PTZ router that enabled a single, simple PTZ Joystick to easily control two modern IP-based PTZ cameras. The server selects which camera receives the joystick input and provides a HTTP API for selecting the target camera and managing presets.
 
-The router receives Pelco-D packets from the joystick via a USB-to-RS485 converter and translates them into VISCA-over-IP commands, which are sent to the cameras over the network (UDP port 52381).
+The router receives **a)** Pelco-D packets from the joystick via a USB-to-RS485 converter *OR* **b)** normal packets from a USB Joystick and translates them into VISCA-over-IP commands, which are sent to the cameras over the network (UDP port 52381).
 
 See the following ascii diagram for the architecture.
 
                             +------------------------+
                             |      PTZ Joystick      |
                             |  (Pelco-D via RS-485)  |
+                            |    OR USB Joystick     |
                             +------------------------+
                                          |
                                          v
                          +--------------------------------+
                          |  /dev/ttyUSBX (JOYSTICK_PORT)  |
                          |  [async serial reader]         |
+                         |     OR [async evedv reader]    |
                          +--------------------------------+
                                          |
                                          v
                           +------------------------------+
                           |  Python asyncio PTZ Router   |
                           |------------------------------|
-                          | - Parse Pelco-D packets      |
+                          | - Parse Pelco-D/evdev packets|
                           | - Translate to VISCA commands|
                           | - current_target: cam1/cam2  |
                           | - Forward to selected cam(IP)|
@@ -59,6 +62,8 @@ The project uses a YAML configuration file located at `~/.config/diy_ptz_switch/
 Example configuration:
 
 ```yaml
+joystick_type: "usb_joystick" # "usb_joystick" or "pelco_serial" (default)
+
 location_roles:
   "1-4.4": joystick
   "1-4.1": cam1 # Optional if using IP
@@ -69,6 +74,68 @@ cameras:
   cam2: "192.168.1.4"
 ```
 
-- `location_roles`: Maps USB port locations to roles (like `joystick`).
+- `joystick_type`: Sets the input method.
+  - `pelco_serial` (default): Uses a traditional Pelco-D serial joystick via a USB-to-RS485 converter.
+  - `usb_joystick`: Uses a modern USB 3D PTZ joystick (HID device) using the `evdev` library.
+- `location_roles`: Maps USB port locations to roles (like `joystick`). Required for `pelco_serial`.
 - `cameras`: Maps camera names to their IP addresses for VISCA-over-IP communication.
 
+## Easy selecting
+
+You want to test the `switch.py` server, but you don't have two IP-enabled PTZ Cameras in your network? Just run two instances of `dummy_cam.py` like this:
+
+```bash
+python3 dummy_cam.py --ip 127.0.0.1 --name "Dummy Cam 1"
+python3 dummy_cam.py --ip 127.0.0.2 --name "Dummy Cam 2"
+```
+
+Obviously, the camera targets in your `config.yml` need to point to the dummy cam IP's:
+
+```yaml
+cameras:
+  cam1: "127.0.0.1"
+  cam2: "127.0.0.2"
+```
+
+## Recommended Hardware
+
+There are basically two technologies to choose from when getting a joystick:
+
+- **Potentiometer Joysticks:**
+    - These rely on physical contact. A wiper moves across a resistive element (like a volume knob) to change the voltage based on position.
+    - **Precision:** Generally lower. They are prone to "jitter" as the wiper moves, which can cause micro-stutters in your PTZ pans.
+    - **Durability:** The resistive track wears down over time due to friction, eventually leading to dead zones or "drifting" (where the camera moves even when the stick is centered).
+    - **Cost:** Very affordable; common in budget CCTV controllers.
+- **Hall Effect Joysticks:**
+    - These are contactless. They use magnets and a sensor to detect position based on magnetic field strength.
+    - **Precision:** Extremely high. Because there is no physical friction, the movement is smooth and the signal is very "clean," which is ideal for slow, cinematic camera crawls.
+    - **Durability:** Virtually infinite mechanical life. Since nothing is rubbing together, the sensor won't wear out or drift over time.
+    - **Cost:** More expensive, but considered the industry standard for professional broadcast production (like the Skaarhoj units mentioned above).
+
+If you ever encountered joystick drift while controlling cameras live using a potentiometer joystick, you will never ever go back to using potentiometer again. The inclusion of the links for the potentiometer hardware is only here for completeness, and not that i would recommend them to anyone.
+
+### Hall Effect Joysticks
+|Product|Link|Estimated Cost|
+|---|---|---|
+|Anxinshi USB PTZ Controller|[https://de.aliexpress.com/item/32825990133.html](https://de.aliexpress.com/item/32825990133.html)|120€|
+
+### Potentiometer Joystick
+
+|Product|Link|Estimated Cost|
+|---|---|---|
+|Cheapeast PTZ Joystick with RS-485 Output|[https://www.amazon.de/dp/B0DX3BFXM1](https://www.amazon.de/dp/B0DX3BFXM1)|50€|
+|RS-485 to USB Converter|[https://de.aliexpress.com/item/1005007539998595.html](https://de.aliexpress.com/item/1005007539998595.html)|2€|
+
+Note that out of 4 RS-485 to USB Converters I bought, only 3 worked.
+
+## Roadmap
+
+Possible changes in future releases:
+
+- Add back the options to output the camera signals over RS-485 (currently fixed to IP/Ethernet/RJ45, but the git history contains working code from the RS-485 days)
+- Code Cleanup (don't look at all the pylint warnings)
+- API Doc (probably using OpenAPI/swagger)
+- more RESTful API (use `PUT` instead of `POST` requests)
+- less hardcoding of values (ports, baudrate, usb device name), they should be configurable via `config.yml`
+- add a testsuite
+- a rust rewrite?

+ 26 - 0
dummy_cam.py

@@ -0,0 +1,26 @@
+import socket
+import argparse
+
+def main():
+    parser = argparse.ArgumentParser(description="Dummy VISCA PTZ Camera")
+    parser.add_argument("--ip", type=str, default="127.0.0.1", help="IP to bind to")
+    parser.add_argument("--port", type=int, default=52381, help="UDP port to bind to")
+    parser.add_argument("--name", type=str, default="Cam", help="Name for log outputs")
+    args = parser.parse_args()
+
+    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+    sock.bind((args.ip, args.port))
+    
+    print(f"[{args.name}] Listening for VISCA UDP packets on {args.ip}:{args.port}...")
+
+    try:
+        while True:
+            data, addr = sock.recvfrom(1024)
+            # Format the output as spaced hex for easy reading (e.g., 81 01 06 01 ...)
+            hex_data = ' '.join(f'{b:02X}' for b in data)
+            print(f"[{args.name}] Received from {addr}: {hex_data}")
+    except KeyboardInterrupt:
+        print(f"\n[{args.name}] Shutting down.")
+
+if __name__ == "__main__":
+    main()

+ 1 - 0
requirements.txt

@@ -2,3 +2,4 @@ pyyaml
 pyserial
 pyserial_asyncio
 aiohttp
+evdev

+ 200 - 45
switch.py

@@ -1,13 +1,15 @@
 import asyncio
 import os
 import argparse
-import socket
+from asyncio import Queue
 
-import serial_asyncio
-from serial.tools import list_ports
 import yaml
 from aiohttp import web
-from asyncio import Queue
+
+import serial_asyncio
+from serial.tools import list_ports
+import evdev
+from evdev import ecodes
 
 write_queue = Queue()
 cam_transports = {}
@@ -28,8 +30,7 @@ class ViscaOverIP:
                 pass
 
         self.transport, _ = await loop.create_datagram_endpoint(
-            ViscaProtocol,
-            remote_addr=(self.ip, self.port)
+            ViscaProtocol, remote_addr=(self.ip, self.port)
         )
 
     def write(self, visca_payload):
@@ -38,8 +39,8 @@ class ViscaOverIP:
             return
 
         header = bytearray([0x01, 0x00])  # Payload type: VISCA command
-        header += len(visca_payload).to_bytes(2, 'big')
-        header += self.sequence_number.to_bytes(4, 'big')
+        header += len(visca_payload).to_bytes(2, "big")
+        header += self.sequence_number.to_bytes(4, "big")
 
         packet = header + visca_payload
         self.transport.sendto(packet)
@@ -80,20 +81,32 @@ def translate_pelco_to_visca(packet):
         tilt_dir = 0x02
 
     if pan_dir != 0x03 or tilt_dir != 0x03:
-        return bytearray([0x81, 0x01, 0x06, 0x01, v_pan_speed, v_tilt_speed, pan_dir, tilt_dir, 0xFF])
+        return bytearray(
+            [
+                0x81,
+                0x01,
+                0x06,
+                0x01,
+                v_pan_speed,
+                v_tilt_speed,
+                pan_dir,
+                tilt_dir,
+                0xFF,
+            ]
+        )
 
     # Zoom
     # 81 01 04 07 0p FF (0p: 00=Stop, 02=Tele/In, 03=Wide/Out)
     if cmd2 & 0x20:  # Zoom In (Tele)
         return bytearray([0x81, 0x01, 0x04, 0x07, 0x02, 0xFF])
-    elif cmd2 & 0x40:  # Zoom Out (Wide)
+    if cmd2 & 0x40:  # Zoom Out (Wide)
         return bytearray([0x81, 0x01, 0x04, 0x07, 0x03, 0xFF])
 
     # Focus
     # 81 01 04 08 0p FF (02=Far, 03=Near)
     if cmd1 & 0x01:  # Focus Near
         return bytearray([0x81, 0x01, 0x04, 0x08, 0x03, 0xFF])
-    elif cmd1 & 0x02:  # Focus Far
+    if cmd1 & 0x02:  # Focus Far
         return bytearray([0x81, 0x01, 0x04, 0x08, 0x02, 0xFF])
 
     # If it's a stop packet (cmd1=0, cmd2=0) or we don't recognize it
@@ -151,6 +164,7 @@ def load_config():
 config = load_config()
 location_roles = config.get("location_roles", {})
 camera_ips = config.get("cameras", {})
+joystick_type = config.get("joystick_type", "pelco_serial")
 
 port_map = {}
 for port in list_ports.comports():
@@ -167,8 +181,99 @@ for role, dev in port_map.items():
 JOYSTICK_PORT = port_map.get("joystick")
 BAUDRATE = 2400
 
-current_target = "cam1"  # Default
-current_mode = "preview"  # Default
+CURRENT_TARGET = "cam1"  # Default
+CURRENT_MODE = "preview"  # Default
+
+
+async def evdev_joystick_task(forward_func):
+    devices = [evdev.InputDevice(path) for path in evdev.list_devices()]
+    target_device = None
+    for device in devices:
+        if "shenzhenxiaolong" in device.name.lower():
+            target_device = device
+            break
+
+    if not target_device:
+        print("[WARN] Sony/Anxinshi USB joystick not found.")
+        return
+
+    print(f"[INFO] Using USB joystick: {target_device.name}")
+
+    x_val = 512
+    y_val = 512
+    z_val = 512
+    deadzone = 50
+
+    last_pt_packet = None
+    last_z_packet = None
+
+    try:
+        async for event in target_device.async_read_loop():
+            if event.type == ecodes.EV_ABS:
+                if event.code == ecodes.ABS_X:
+                    x_val = event.value
+                elif event.code == ecodes.ABS_Y:
+                    y_val = event.value
+                elif event.code == ecodes.ABS_Z:
+                    z_val = event.value
+            elif event.type == ecodes.EV_SYN:
+                pan_dir = 0x03
+                pan_speed = 0x00
+                if x_val < 512 - deadzone:
+                    pan_dir = 0x01  # Left
+                    pan_speed = int((512 - x_val) / 512.0 * 0x18)
+                elif x_val > 512 + deadzone:
+                    pan_dir = 0x02  # Right
+                    pan_speed = int((x_val - 512) / 511.0 * 0x18)
+                pan_speed = (
+                    max(1, min(0x18, pan_speed)) if pan_dir != 0x03 else 0x00
+                )
+
+                tilt_dir = 0x03
+                tilt_speed = 0x00
+                if y_val < 512 - deadzone:
+                    tilt_dir = 0x01  # Up
+                    tilt_speed = int((512 - y_val) / 512.0 * 0x17)
+                elif y_val > 512 + deadzone:
+                    tilt_dir = 0x02  # Down
+                    tilt_speed = int((y_val - 512) / 511.0 * 0x17)
+                tilt_speed = (
+                    max(1, min(0x17, tilt_speed)) if tilt_dir != 0x03 else 0x00
+                )
+
+                pt_packet = bytearray(
+                    [
+                        0x81,
+                        0x01,
+                        0x06,
+                        0x01,
+                        pan_speed,
+                        tilt_speed,
+                        pan_dir,
+                        tilt_dir,
+                        0xFF,
+                    ]
+                )
+                if pt_packet != last_pt_packet:
+                    forward_func(pt_packet)
+                    last_pt_packet = pt_packet
+
+                zoom_cmd = 0x00
+                if z_val < 512 - deadzone:
+                    z_speed = int((512 - z_val) / 512.0 * 7)
+                    z_speed = max(0, min(7, z_speed))
+                    zoom_cmd = 0x30 | z_speed
+                elif z_val > 512 + deadzone:
+                    z_speed = int((z_val - 512) / 511.0 * 7)
+                    z_speed = max(0, min(7, z_speed))
+                    zoom_cmd = 0x20 | z_speed
+
+                z_packet = bytearray([0x81, 0x01, 0x04, 0x07, zoom_cmd, 0xFF])
+                if z_packet != last_z_packet:
+                    forward_func(z_packet)
+                    last_z_packet = z_packet
+    except Exception as e:
+        print(f"[ERROR] USB Joystick loop error: {e}")
 
 
 class JoystickProtocol(asyncio.Protocol):
@@ -200,7 +305,7 @@ class JoystickProtocol(asyncio.Protocol):
                 f"[Joystick] Pelco packet to addr {address:02X} — "
                 f"Cmd1: {cmd1:02X}, Cmd2: {cmd2:02X}, "
                 f"Data1: {data1:02X}, Data2: {data2:02X}, "
-                f"Target: {current_target}"
+                f"Target: {CURRENT_TARGET}"
             )
 
             visca_packet = translate_pelco_to_visca(packet)
@@ -209,7 +314,7 @@ class JoystickProtocol(asyncio.Protocol):
 
 
 def make_visca_preset_command(cmd2, preset_id):
-    if not (0 <= preset_id <= 0xFF):
+    if not 0 <= preset_id <= 0xFF:
         raise ValueError("Preset ID must be between 0 and 255")
 
     # VISCA: 81 01 04 3F 0p pp FF
@@ -227,26 +332,54 @@ def send_preset_command(cam_name, cmd2, preset_id):
 
 
 async def handle_status(request):
-    return web.json_response({"current_target": current_target})
+    return web.json_response({"current_target": CURRENT_TARGET})
 
 
 async def handle_set_target(request):
-    global current_target
-    data = await request.json()
-    target = data.get("target")
+    global CURRENT_TARGET
+    target = None
+    if request.can_read_body:
+        try:
+            data = await request.json()
+            target = data.get("target")
+        except Exception:
+            pass
+    if not target:
+        target = request.query.get("target")
+
     if target not in cam_transports:
         return web.json_response(
             {"error": f"Invalid target: {target}"}, status=400
         )
-    current_target = target
-    print(f"[API] Target set to: {current_target}")
-    return web.json_response({"status": "ok", "target": current_target})
+    CURRENT_TARGET = target
+    print(f"[API] Target set to: {CURRENT_TARGET}")
+    return web.json_response({"status": "ok", "target": CURRENT_TARGET})
 
 
 async def handle_goto_preset(request):
-    data = await request.json()
-    preset_id = int(data.get("preset"))
-    target = data.get("target", current_target)
+    preset_id = None
+    target = None
+    if request.can_read_body:
+        try:
+            data = await request.json()
+            preset_id = data.get("preset")
+            target = data.get("target")
+        except Exception:
+            pass
+
+    if preset_id is None:
+        preset_id = request.query.get("preset")
+    if target is None:
+        target = request.query.get("target", CURRENT_TARGET)
+
+    if preset_id is None:
+        return web.json_response({"error": "Missing preset"}, status=400)
+
+    try:
+        preset_id = int(preset_id)
+    except ValueError:
+        return web.json_response({"error": "Invalid preset ID"}, status=400)
+
     if target not in cam_transports:
         return web.json_response(
             {"error": f"Invalid target: {target}"}, status=400
@@ -263,9 +396,29 @@ async def handle_goto_preset(request):
 
 
 async def handle_save_preset(request):
-    data = await request.json()
-    preset_id = int(data.get("preset"))
-    target = data.get("target", current_target)
+    preset_id = None
+    target = None
+    if request.can_read_body:
+        try:
+            data = await request.json()
+            preset_id = data.get("preset")
+            target = data.get("target")
+        except Exception:
+            pass
+
+    if preset_id is None:
+        preset_id = request.query.get("preset")
+    if target is None:
+        target = request.query.get("target", CURRENT_TARGET)
+
+    if preset_id is None:
+        return web.json_response({"error": "Missing preset"}, status=400)
+
+    try:
+        preset_id = int(preset_id)
+    except ValueError:
+        return web.json_response({"error": "Invalid preset ID"}, status=400)
+
     if target == "both":
         for cam in ["cam1", "cam2"]:
             send_preset_command(cam, cmd2=0x03, preset_id=preset_id)
@@ -287,16 +440,16 @@ async def handle_save_preset(request):
 
 
 async def handle_set_mode(request):
-    global current_mode
+    global CURRENT_MODE
     mode = request.query.get("mode")
     if mode not in ("preview", "program"):
         return web.json_response({"error": "Invalid mode"}, status=400)
-    current_mode = mode
-    return web.json_response({"status": "ok", "mode": current_mode})
+    CURRENT_MODE = mode
+    return web.json_response({"status": "ok", "mode": CURRENT_MODE})
 
 
 async def handle_get_mode(request):
-    return web.json_response({"mode": current_mode})
+    return web.json_response({"mode": CURRENT_MODE})
 
 
 async def start_http_server():
@@ -320,10 +473,10 @@ async def main():
     loop = asyncio.get_running_loop()
 
     def forward_packet(packet):
-        if current_target in cam_transports:
-            enqueue_write(current_target, packet)
+        if CURRENT_TARGET in cam_transports:
+            enqueue_write(CURRENT_TARGET, packet)
         else:
-            print(f"[WARN] No transport for {current_target}")
+            print(f"[WARN] No transport for {CURRENT_TARGET}")
 
     # Connect to cameras via VISCA over IP
     for cam_name, ip in camera_ips.items():
@@ -338,16 +491,19 @@ async def main():
 
     asyncio.create_task(writer_task())
 
-
-    if JOYSTICK_PORT:
-        await serial_asyncio.create_serial_connection(
-            loop,
-            lambda: JoystickProtocol(forward_packet),
-            JOYSTICK_PORT,
-            baudrate=BAUDRATE,
-        )
+    if joystick_type == "usb_joystick":
+        print("[INFO] Starting USB joystick task")
+        asyncio.create_task(evdev_joystick_task(forward_packet))
     else:
-        print("[WARN] No joystick port found")
+        if JOYSTICK_PORT:
+            await serial_asyncio.create_serial_connection(
+                loop,
+                lambda: JoystickProtocol(forward_packet),
+                JOYSTICK_PORT,
+                baudrate=BAUDRATE,
+            )
+        else:
+            print("[WARN] No joystick port found for serial connection")
 
     asyncio.create_task(start_http_server())
 
@@ -359,6 +515,5 @@ if __name__ == "__main__":
     asyncio.run(main())
 
 
-
 if __name__ == "__main__":
     asyncio.run(main())