|
@@ -13,14 +13,24 @@
|
|
|
# You should have received a copy of the GNU General Public License
|
|
|
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
|
|
|
|
+import sys
|
|
|
import os
|
|
|
|
|
|
-from utils import log, CustomException
|
|
|
+from PyQt5.QtWidgets import ( # pylint: disable=no-name-in-module
|
|
|
+ QApplication,
|
|
|
+ QMessageBox,
|
|
|
+ QDialog,
|
|
|
+)
|
|
|
+
|
|
|
+from utils import log, CustomException, RadioButtonDialog
|
|
|
|
|
|
if os.name == "nt":
|
|
|
# pylint: disable=import-error
|
|
|
import wmi # pyright: ignore
|
|
|
import ctypes
|
|
|
+ from subprocess import PIPE, run
|
|
|
+ from re import match, search
|
|
|
+ import config as const
|
|
|
else:
|
|
|
from pycdio import DRIVER_DEVICE
|
|
|
from cdio import (
|
|
@@ -30,6 +40,82 @@ else:
|
|
|
)
|
|
|
|
|
|
|
|
|
+def choose_right_scsi_drive(drives: list) -> str:
|
|
|
+ if len(drives) != 1:
|
|
|
+ log("Warning: More than one SCSI drive found", color="yellow")
|
|
|
+ if (
|
|
|
+ # pylint: disable=possibly-used-before-assignment
|
|
|
+ const.CD_RECORD_PREFERED_SCSI_DRIVE in drives
|
|
|
+ and const.CD_RECORD_PREFERED_SCSI_DRIVE != ""
|
|
|
+ ):
|
|
|
+ return const.CD_RECORD_PREFERED_SCSI_DRIVE
|
|
|
+
|
|
|
+ # pylint: disable=possibly-used-before-assignment
|
|
|
+ dialog = RadioButtonDialog(drives, "Choose a SCSI Drive")
|
|
|
+ if dialog.exec_() == QDialog.Accepted:
|
|
|
+ print(f"Dialog accepted: {dialog.chosen}")
|
|
|
+ return dialog.chosen
|
|
|
+ log("Warning: Choosing first SCSI drive...", color="yellow")
|
|
|
+
|
|
|
+ return drives[0]
|
|
|
+
|
|
|
+
|
|
|
+def get_cdrecord_devname(cd_drive: str) -> str:
|
|
|
+ if os.name == "nt":
|
|
|
+ scsi_drives = get_scsi_drives()
|
|
|
+ return choose_right_scsi_drive(scsi_drives)
|
|
|
+
|
|
|
+ return cd_drive
|
|
|
+
|
|
|
+
|
|
|
+def bytes_line_ends_with(line: bytes, target: str) -> bool:
|
|
|
+ byte_len = len(line)
|
|
|
+ if byte_len < len(target):
|
|
|
+ return False
|
|
|
+
|
|
|
+ for index in range(-1, -1 * len(target), -1):
|
|
|
+ if ord(target[index]) != line[index]:
|
|
|
+ return False
|
|
|
+
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
+def get_scsi_drives() -> list[str]:
|
|
|
+ # pylint: disable=possibly-used-before-assignment, subprocess-run-check
|
|
|
+ try:
|
|
|
+ drives = []
|
|
|
+ result = run(["cdrecord", "-scanbus"], stdout=PIPE)
|
|
|
+ output = result.stdout
|
|
|
+ if result.returncode != 0:
|
|
|
+ raise CustomException("Command 'cdrecord -scanbus' failed.")
|
|
|
+ for line in output.split(b"\n"):
|
|
|
+ print(line)
|
|
|
+ # pylint: disable=possibly-used-before-assignment
|
|
|
+ if (
|
|
|
+ match(rb"\s*[0-9](,[0-9])+\s*[0-9]+", line)
|
|
|
+ and (not bytes_line_ends_with(line, "*"))
|
|
|
+ and (not bytes_line_ends_with(line, "HOST ADAPTOR"))
|
|
|
+ ):
|
|
|
+ matches = search(rb"[0-9](,[0-9])+", line)
|
|
|
+ if matches is None:
|
|
|
+ raise CustomException(f"could not parse line: '{line}'")
|
|
|
+ drives.append(matches.group().decode("ascii"))
|
|
|
+
|
|
|
+ if not drives:
|
|
|
+ raise CustomException("Could not find any SCSI drives")
|
|
|
+
|
|
|
+ return drives
|
|
|
+ except CustomException as error:
|
|
|
+ app = QApplication([])
|
|
|
+ QMessageBox.critical(
|
|
|
+ None,
|
|
|
+ "Error",
|
|
|
+ f"Could not get SCSI drives. Reason: {error}",
|
|
|
+ )
|
|
|
+ del app
|
|
|
+ sys.exit(1)
|
|
|
+
|
|
|
+
|
|
|
def get_cd_drives() -> list[str]:
|
|
|
if os.name == "nt":
|
|
|
c = wmi.WMI()
|