|
@@ -1,31 +1,31 @@
|
|
|
-# Copyright © 2025 Noah Vogt <noah@noahvogt.com>
|
|
|
-
|
|
|
-# This program is free software: you can redistribute it and/or modify
|
|
|
-# it under the terms of the GNU General Public License as published by
|
|
|
-# the Free Software Foundation, either version 3 of the License, or
|
|
|
-# (at your option) any later version.
|
|
|
-
|
|
|
-# This program is distributed in the hope that it will be useful,
|
|
|
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
|
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
|
-# GNU General Public License for more details.
|
|
|
-
|
|
|
-# You should have received a copy of the GNU General Public License
|
|
|
-# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
-
|
|
|
import asyncio
|
|
|
import serial_asyncio
|
|
|
from serial.tools import list_ports
|
|
|
-
|
|
|
-# TODO: Don't use hardcoded usb port mapping anymore
|
|
|
-location_roles = {
|
|
|
- "1-1.4": "joystick",
|
|
|
- "1-1.1": "cam1",
|
|
|
- "1-1.2": "cam2",
|
|
|
-}
|
|
|
-
|
|
|
+import os
|
|
|
+import yaml
|
|
|
+from aiohttp import web
|
|
|
+
|
|
|
+# =========================
|
|
|
+# Config loading via XDG
|
|
|
+# =========================
|
|
|
+def get_config_path():
|
|
|
+ xdg_config_home = os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config"))
|
|
|
+ return os.path.join(xdg_config_home, "diy_ptz_switch", "config.yml")
|
|
|
+
|
|
|
+def load_location_roles():
|
|
|
+ config_file = get_config_path()
|
|
|
+ if not os.path.exists(config_file):
|
|
|
+ raise FileNotFoundError(f"Config file not found at: {config_file}")
|
|
|
+ with open(config_file, "r") as f:
|
|
|
+ config = yaml.safe_load(f)
|
|
|
+ return config.get("location_roles", {})
|
|
|
+
|
|
|
+location_roles = load_location_roles()
|
|
|
+
|
|
|
+# =========================
|
|
|
+# Serial Port Mapping
|
|
|
+# =========================
|
|
|
port_map = {}
|
|
|
-
|
|
|
for port in list_ports.comports():
|
|
|
if "LOCATION=" in port.hwid:
|
|
|
loc = port.hwid.split("LOCATION=")[-1]
|
|
@@ -33,21 +33,24 @@ for port in list_ports.comports():
|
|
|
role = location_roles[loc]
|
|
|
port_map[role] = port.device
|
|
|
|
|
|
-print("port mapping by usb port:")
|
|
|
-print(port_map)
|
|
|
+print("Port mapping by USB port:")
|
|
|
+for role, dev in port_map.items():
|
|
|
+ print(f" {role}: {dev}")
|
|
|
|
|
|
JOYSTICK_PORT = port_map.get("joystick")
|
|
|
CAM1_PORT = port_map.get("cam1")
|
|
|
CAM2_PORT = port_map.get("cam2")
|
|
|
-# TODO: Don't hardcore baudrate anymore
|
|
|
BAUDRATE = 2400
|
|
|
|
|
|
-DEFAULT_TARGET = "cam1" # default
|
|
|
-
|
|
|
-# Will hold writeable serial transports
|
|
|
+# =========================
|
|
|
+# Globals
|
|
|
+# =========================
|
|
|
+current_target = "cam1" # Default
|
|
|
cam_transports = {}
|
|
|
|
|
|
-
|
|
|
+# =========================
|
|
|
+# Pelco-D Packet Forwarding
|
|
|
+# =========================
|
|
|
class JoystickProtocol(asyncio.Protocol):
|
|
|
def __init__(self, forward_func):
|
|
|
self.forward = forward_func
|
|
@@ -56,10 +59,9 @@ class JoystickProtocol(asyncio.Protocol):
|
|
|
def data_received(self, data):
|
|
|
print(f"[DEBUG] Raw data received: {data.hex()}")
|
|
|
self.buffer += data
|
|
|
-
|
|
|
self.parse_pelco_d_packets()
|
|
|
|
|
|
- def parse_pelco_d_packets(self) -> None:
|
|
|
+ def parse_pelco_d_packets(self):
|
|
|
while len(self.buffer) >= 7:
|
|
|
if self.buffer[0] != 0xFF:
|
|
|
self.buffer.pop(0)
|
|
@@ -73,48 +75,116 @@ class JoystickProtocol(asyncio.Protocol):
|
|
|
cmd2 = packet[3]
|
|
|
data1 = packet[4]
|
|
|
data2 = packet[5]
|
|
|
- # checksum = packet[6]
|
|
|
|
|
|
print(
|
|
|
f"[Joystick] Packet to camera addr {address:02X} — "
|
|
|
f"Cmd1: {cmd1:02X}, Cmd2: {cmd2:02X}, "
|
|
|
f"Data1: {data1:02X}, Data2: {data2:02X}, "
|
|
|
- f"Target: {DEFAULT_TARGET}"
|
|
|
+ f"Target: {current_target}"
|
|
|
)
|
|
|
|
|
|
self.forward(packet)
|
|
|
|
|
|
-
|
|
|
class DummyCamProtocol(asyncio.Protocol):
|
|
|
def connection_made(self, transport):
|
|
|
- pass # We don't need to receive data from the cams
|
|
|
-
|
|
|
-
|
|
|
+ pass
|
|
|
+
|
|
|
+# =========================
|
|
|
+# Pelco-D Preset Helpers
|
|
|
+# =========================
|
|
|
+def make_preset_command(cam_address: int, cmd2: int, preset_id: int):
|
|
|
+ if not (1 <= preset_id <= 0xFF):
|
|
|
+ raise ValueError("Preset ID must be between 1 and 255")
|
|
|
+
|
|
|
+ cmd1 = 0x00
|
|
|
+ data1 = 0x00
|
|
|
+ data2 = preset_id
|
|
|
+ packet = bytearray([0xFF, cam_address, cmd1, cmd2, data1, data2])
|
|
|
+ checksum = sum(packet[1:]) % 256
|
|
|
+ packet.append(checksum)
|
|
|
+ return packet
|
|
|
+
|
|
|
+def send_preset_command(cam_name, cmd2, preset_id):
|
|
|
+ transport = cam_transports.get(cam_name)
|
|
|
+ if not transport:
|
|
|
+ print(f"[WARN] No transport for {cam_name}")
|
|
|
+ return
|
|
|
+ packet = make_preset_command(1, cmd2, preset_id) # Camera address is hardcoded as 1
|
|
|
+ print(f"[API] Sending preset cmd2={cmd2:02X} preset_id={preset_id} to {cam_name}")
|
|
|
+ transport.write(packet)
|
|
|
+
|
|
|
+# =========================
|
|
|
+# HTTP Server (aiohttp)
|
|
|
+# =========================
|
|
|
+async def handle_status(request):
|
|
|
+ return web.json_response({"current_target": current_target})
|
|
|
+
|
|
|
+async def handle_set_target(request):
|
|
|
+ global current_target
|
|
|
+ data = await request.json()
|
|
|
+ target = data.get("target")
|
|
|
+ if target not in cam_transports:
|
|
|
+ return web.json_response({"error": f"Invalid target: {target}"}, status=400)
|
|
|
+ current_target = target
|
|
|
+ print(f"[API] Target set to: {current_target}")
|
|
|
+ return web.json_response({"status": "ok", "target": current_target})
|
|
|
+
|
|
|
+async def handle_goto_preset(request):
|
|
|
+ data = await request.json()
|
|
|
+ preset_id = int(data.get("preset"))
|
|
|
+ target = data.get("target", current_target)
|
|
|
+ if target not in cam_transports:
|
|
|
+ return web.json_response({"error": f"Invalid target: {target}"}, status=400)
|
|
|
+ send_preset_command(target, cmd2=0x07, preset_id=preset_id)
|
|
|
+ return web.json_response({"status": "ok", "action": "goto", "preset": preset_id, "target": target})
|
|
|
+
|
|
|
+async def handle_save_preset(request):
|
|
|
+ data = await request.json()
|
|
|
+ preset_id = int(data.get("preset"))
|
|
|
+ target = data.get("target", current_target)
|
|
|
+ if target == "both":
|
|
|
+ for cam in ["cam1", "cam2"]:
|
|
|
+ send_preset_command(cam, cmd2=0x03, preset_id=preset_id)
|
|
|
+ elif target in cam_transports:
|
|
|
+ send_preset_command(target, cmd2=0x03, preset_id=preset_id)
|
|
|
+ else:
|
|
|
+ return web.json_response({"error": f"Invalid target: {target}"}, status=400)
|
|
|
+
|
|
|
+ return web.json_response({"status": "ok", "action": "save", "preset": preset_id, "target": target})
|
|
|
+
|
|
|
+def start_http_server():
|
|
|
+ app = web.Application()
|
|
|
+ app.router.add_get("/target/get", handle_status)
|
|
|
+ app.router.add_post("/target/set", handle_set_target)
|
|
|
+ app.router.add_post("/preset/goto", handle_goto_preset)
|
|
|
+ app.router.add_post("/preset/save", handle_save_preset)
|
|
|
+ return web._run_app(app, port=1423)
|
|
|
+
|
|
|
+# =========================
|
|
|
+# Main
|
|
|
+# =========================
|
|
|
async def main():
|
|
|
global cam_transports
|
|
|
+
|
|
|
loop = asyncio.get_running_loop()
|
|
|
|
|
|
def forward_packet(packet):
|
|
|
- transport = cam_transports.get(DEFAULT_TARGET)
|
|
|
+ transport = cam_transports.get(current_target)
|
|
|
if transport:
|
|
|
transport.write(packet)
|
|
|
else:
|
|
|
- print(f"[WARN] No transport for {DEFAULT_TARGET}")
|
|
|
+ print(f"[WARN] No transport for {current_target}")
|
|
|
|
|
|
- # Open cam1 and cam2 for writing
|
|
|
+ # Connect to cameras
|
|
|
cam1_transport, _ = await serial_asyncio.create_serial_connection(
|
|
|
loop, DummyCamProtocol, CAM1_PORT, baudrate=BAUDRATE
|
|
|
)
|
|
|
cam2_transport, _ = await serial_asyncio.create_serial_connection(
|
|
|
loop, DummyCamProtocol, CAM2_PORT, baudrate=BAUDRATE
|
|
|
)
|
|
|
+ cam_transports = {"cam1": cam1_transport, "cam2": cam2_transport}
|
|
|
|
|
|
- cam_transports = {
|
|
|
- "cam1": cam1_transport,
|
|
|
- "cam2": cam2_transport,
|
|
|
- }
|
|
|
-
|
|
|
- # Open joystick serial
|
|
|
+ # Connect to joystick
|
|
|
await serial_asyncio.create_serial_connection(
|
|
|
loop,
|
|
|
lambda: JoystickProtocol(forward_packet),
|
|
@@ -122,8 +192,12 @@ async def main():
|
|
|
baudrate=BAUDRATE,
|
|
|
)
|
|
|
|
|
|
- # Keep the loop running
|
|
|
+ # Start HTTP API in a separate task
|
|
|
+ asyncio.create_task(start_http_server())
|
|
|
+
|
|
|
+ # Wait forever
|
|
|
await asyncio.Event().wait()
|
|
|
|
|
|
+if __name__ == "__main__":
|
|
|
+ asyncio.run(main())
|
|
|
|
|
|
-asyncio.run(main())
|