switch.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. # Copyright © 2025 Noah Vogt <noah@noahvogt.com>
  2. # This program is free software: you can redistribute it and/or modify
  3. # it under the terms of the GNU General Public License as published by
  4. # the Free Software Foundation, either version 3 of the License, or
  5. # (at your option) any later version.
  6. # This program is distributed in the hope that it will be useful,
  7. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  8. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  9. # GNU General Public License for more details.
  10. # You should have received a copy of the GNU General Public License
  11. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  12. import asyncio
  13. import serial_asyncio
  14. from serial.tools import list_ports
  15. # TODO: Don't use hardcoded usb port mapping anymore
  16. location_roles = {
  17. "1-1.4": "joystick",
  18. "1-1.1": "cam1",
  19. "1-1.2": "cam2",
  20. }
  21. port_map = {}
  22. for port in list_ports.comports():
  23. if "LOCATION=" in port.hwid:
  24. loc = port.hwid.split("LOCATION=")[-1]
  25. if loc in location_roles:
  26. role = location_roles[loc]
  27. port_map[role] = port.device
  28. print("port mapping by usb port:")
  29. print(port_map)
  30. JOYSTICK_PORT = port_map.get("joystick")
  31. CAM1_PORT = port_map.get("cam1")
  32. CAM2_PORT = port_map.get("cam2")
  33. # TODO: Don't hardcore baudrate anymore
  34. BAUDRATE = 2400
  35. DEFAULT_TARGET = "cam1" # default
  36. # Will hold writeable serial transports
  37. cam_transports = {}
  38. class JoystickProtocol(asyncio.Protocol):
  39. def __init__(self, forward_func):
  40. self.forward = forward_func
  41. self.buffer = bytearray()
  42. def data_received(self, data):
  43. print(f"[DEBUG] Raw data received: {data.hex()}")
  44. self.buffer += data
  45. self.parse_pelco_d_packets()
  46. def parse_pelco_d_packets(self) -> None:
  47. while len(self.buffer) >= 7:
  48. if self.buffer[0] != 0xFF:
  49. self.buffer.pop(0)
  50. continue
  51. packet = self.buffer[:7]
  52. self.buffer = self.buffer[7:]
  53. address = packet[1]
  54. cmd1 = packet[2]
  55. cmd2 = packet[3]
  56. data1 = packet[4]
  57. data2 = packet[5]
  58. # checksum = packet[6]
  59. print(
  60. f"[Joystick] Packet to camera addr {address:02X} — "
  61. f"Cmd1: {cmd1:02X}, Cmd2: {cmd2:02X}, "
  62. f"Data1: {data1:02X}, Data2: {data2:02X}, "
  63. f"Target: {DEFAULT_TARGET}"
  64. )
  65. self.forward(packet)
  66. class DummyCamProtocol(asyncio.Protocol):
  67. def connection_made(self, transport):
  68. pass # We don't need to receive data from the cams
  69. async def main():
  70. global cam_transports
  71. loop = asyncio.get_running_loop()
  72. def forward_packet(packet):
  73. transport = cam_transports.get(DEFAULT_TARGET)
  74. if transport:
  75. transport.write(packet)
  76. else:
  77. print(f"[WARN] No transport for {DEFAULT_TARGET}")
  78. # Open cam1 and cam2 for writing
  79. cam1_transport, _ = await serial_asyncio.create_serial_connection(
  80. loop, DummyCamProtocol, CAM1_PORT, baudrate=BAUDRATE
  81. )
  82. cam2_transport, _ = await serial_asyncio.create_serial_connection(
  83. loop, DummyCamProtocol, CAM2_PORT, baudrate=BAUDRATE
  84. )
  85. cam_transports = {
  86. "cam1": cam1_transport,
  87. "cam2": cam2_transport,
  88. }
  89. # Open joystick serial
  90. await serial_asyncio.create_serial_connection(
  91. loop,
  92. lambda: JoystickProtocol(forward_packet),
  93. JOYSTICK_PORT,
  94. baudrate=BAUDRATE,
  95. )
  96. # Keep the loop running
  97. await asyncio.Event().wait()
  98. asyncio.run(main())