#!/usr/bin/env python3
# Siglent SDG2042X Linux GUI (PyQt5)
# Features:
#   Basic Signals, including Burst, Sweep context-aware parameter settings
#   - ARB Manager, SCPI CLI,
#   - Storing Presets, storing to editable preset file SDG2042x.dat
#   - path setting for screenshot and preset file
#   - timestamped Screenshot
#
# Author: Thomas Gohle / tgohle@togo-lab.io
# www.togo-lab.io
# tgonet.de / gohle.de
#
# Copyright: CC BY-SA 4.0
# https://creativecommons.org/licenses/by-sa/4.0/
#
# Version: 0.2.2 - 2026-04-20
#
# ---- PATCH NOTES Severity: High ----
#
# BUG-1   _recv_line(): partial recv could miss \n if it didn't land at
#         the end of a recv() chunk. Rewritten to accumulate into a
#         bytearray and check the joined buffer.
# BUG-2   on_arb_download() ran a blocking recv loop in the GUI thread,
#         freezing the UI during large transfers. Replaced with
#         ARBDownloadWorker(QThread) matching the ScreenshotWorker pattern.
# BUG-12  human_to_eng(): strip order was "hz" before suffix checks, so
#         "1.5 MHz" -> "1.5 m" -> mult=1e-3 (completely wrong).
#         Replaced with a regex-based parser; handles all original suffixes
#         plus "mv". Falls back to original string on parse failure.
# BUG-15  real SDG ARB download returned WAVEDATA payload without the
#         SCPI definite-length block form expected by v0.2. Download
#         parser now accepts both block and raw-after-WAVEDATA formats.
#         ARB store parsing was also reworked to keep BUILDIN index/name
#         pairs separate from USER waveform names.
#
# ---- PATCH NOTES Severity: Medium ----
#
# BUG-6   cleaned up screenshot path handling.
#         V0.1 used to save re-read the config from disk on every call,
#         which meant the Config tab’s in-memory state could be ignored until written out.
#         Screenshot action now uses self.cfg["SCREENSHOT_DIR"] directly,
#         so the live config is respected.
# BUG-10  Sweep fallback from WAV to SPAC was timing-fragile because
#         it queried SYST:ERR? immediately after the first write.
#         Sweep apply now waits on *OPC? before checking the error
#         queue, clears stale errors before each attempt, and verifies
#         the fallback attempt the same way.
#
# -----------------------------------------------

import os
import re
import socket
import sys
import time

from PyQt5 import QtWidgets, QtCore, QtGui

# -------------------- Constants --------------------
Window_Name = "-= WaveForm Generator [SDG2042X] GUI by ToGo-Lab =-"
DEFAULT_PORT = 5025
SOCKET_TIMEOUT = 4.0
SIGLENT_WAVES = ["SINE", "SQUARE", "RAMP", "PULSE", "NOISE", "ARB", "DC"]
CONFIG_FILE = "SDG2042x.config"
PRESET_BASENAME = "SDG2042x.dat"


# -------------------- Config helpers --------------------
def _expand(p: str) -> str:
    """Return *p* as an absolute path with ``~`` and ``$VARS`` expanded."""
    return os.path.abspath(os.path.expanduser(os.path.expandvars(p)))


def ensure_dir(path: str):
    """Create directory *path* (including parents) unless it is empty or exists."""
    if path and not os.path.isdir(path):
        os.makedirs(path, exist_ok=True)


def load_config(path: str) -> dict:
    """Read ``KEY=VALUE`` lines from *path* into a dict.

    Keys are upper-cased; blank lines, ``#`` comments and lines without
    ``=`` are skipped. A missing file is not an error. ``PRESET_DIR`` and
    ``SCREENSHOT_DIR`` are always present in the result (defaulting to the
    current working directory) and are returned as absolute paths.
    """
    d = {"PRESET_DIR": os.getcwd(), "SCREENSHOT_DIR": os.getcwd()}
    try:
        with open(path, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#") or "=" not in line:
                    continue
                k, v = line.split("=", 1)
                d[k.strip().upper()] = v.strip()
    except FileNotFoundError:
        pass
    d["PRESET_DIR"] = _expand(d.get("PRESET_DIR", os.getcwd()))
    d["SCREENSHOT_DIR"] = _expand(d.get("SCREENSHOT_DIR", os.getcwd()))
    return d


def save_config(path: str, d: dict):
    """Write the ``PRESET_DIR`` / ``SCREENSHOT_DIR`` entries of *d* to *path*."""
    lines = [
        "# SDG2042x.config",
        f"PRESET_DIR={d.get('PRESET_DIR','')}",
        f"SCREENSHOT_DIR={d.get('SCREENSHOT_DIR','')}",
        ""
    ]
    with open(path, "w", encoding="utf-8") as f:
        f.write("\n".join(lines))


# -------------------- Transport --------------------
class SDGLan:
    """Blocking TCP transport that sends SCPI text and reads replies."""

    def __init__(self):
        self.sock = None
        self.addr = None
        self.port = DEFAULT_PORT

    def connect(self, addr, port=DEFAULT_PORT):
        """Close any existing socket and open a new one to ``addr:port``."""
        self.close()
        self.addr = addr
        self.port = port
        self.sock = socket.create_connection((addr, port), timeout=SOCKET_TIMEOUT)
        self.sock.settimeout(SOCKET_TIMEOUT)

    def close(self):
        """Close the socket if open; ``self.sock`` is always reset to None."""
        try:
            if self.sock:
                self.sock.close()
        finally:
            self.sock = None

    def write(self, cmd: str):
        """Send *cmd* as ASCII, appending a newline when it has none.

        Raises RuntimeError when not connected.
        """
        if not self.sock:
            raise RuntimeError("Not connected")
        if not cmd.endswith("\n"):
            cmd += "\n"
        self.sock.sendall(cmd.encode("ascii"))

    def write_bytes(self, payload: bytes):
        """Send *payload* unchanged. Raises RuntimeError when not connected."""
        if not self.sock:
            raise RuntimeError("Not connected")
        self.sock.sendall(payload)

    def drain(self):
        """Read and discard any bytes until timeout. Safe if nothing pending."""
        # NOTE(review): with nothing pending this blocks for the full socket
        # timeout, because _recv_until_timeout() only returns on timeout/EOF.
        try:
            _ = self._recv_until_timeout()
        except Exception:
            pass

    def query(self, cmd: str) -> str:
        """Send *cmd* and return the stripped reply read by _recv_line()."""
        self.write(cmd)
        data = self._recv_line()
        return data.strip()

    def query_retry(self, cmd: str, retries: int = 2) -> str:
        """Query with pre-drain and retry on socket.timeout."""
        last_err = None
        for _ in range(retries + 1):
            try:
                self.drain()
                self.write(cmd)
                data = self._recv_line()
                return data.strip()
            except socket.timeout as e:
                last_err = e
                # Best effort: clear instrument status before the next attempt.
                try:
                    self.write("*CLS")
                except Exception:
                    pass
                continue
        if last_err:
            raise last_err
        return ""

    def query_raw(self, cmd: str) -> bytes:
        """Send *cmd* and return every byte received until timeout or EOF."""
        self.write(cmd)
        return self._recv_until_timeout()

    # ---- block helpers ----
    def _recv_exact(self, n: int) -> bytes:
        """Read up to *n* bytes; returns fewer only if the peer closes early."""
        buf = bytearray()
        while len(buf) < n:
            chunk = self.sock.recv(n - len(buf))
            if not chunk:
                break
            buf.extend(chunk)
        return bytes(buf)

    def _recv_exact_capped(self, n: int, cap: int) -> bytes:
        """Like _recv_exact(), but raise RuntimeError when *n* exceeds *cap*."""
        if n > cap:
            raise RuntimeError(f"SCPI block length {n} exceeds cap {cap}")
        return self._recv_exact(n)

    def query_block(self, cmd: str, timeout_s: float = 8.0, cap: int = 25_000_000) -> bytes:
        """Send *cmd* and return the payload of a ``#<n><len><payload>`` block.

        Bytes before the ``#`` are skipped. Returns ``b""`` when the stream
        ends or the header is malformed. The socket timeout is set to
        *timeout_s* for the call and restored afterwards.
        """
        if not self.sock:
            raise RuntimeError("Not connected")
        prev_to = self.sock.gettimeout()
        try:
            self.sock.settimeout(timeout_s)
            self.write(cmd)
            first = self._recv_exact(1)
            while first and first != b"#":
                first = self._recv_exact(1)
            if not first:
                return b""
            ndig_b = self._recv_exact(1)
            if not ndig_b or not ndig_b.isdigit():
                return b""
            ndig = int(ndig_b.decode("ascii"))
            # Validate on bytes: bytes.isdigit() is ASCII-only and False for
            # b"", so a garbage length field cannot raise while decoding.
            len_b = self._recv_exact(ndig)
            if not len_b.isdigit():
                return b""
            length = int(len_b.decode("ascii"))
            payload = self._recv_exact_capped(length, cap)
            return payload
        finally:
            try:
                self.sock.settimeout(prev_to)
            except Exception:
                pass

    def query_scdp(self, timeout_s: float = 15.0, cap: int = 50_000_000) -> bytes:
        """Send ``SCDP`` and return the screen dump.

        Three reply shapes are handled, chosen by the first two bytes:
        a ``BM`` stream whose next four bytes give the little-endian total
        size (header bytes are included in the result), a ``#``
        definite-length block (payload only), or anything else (returned
        as read until timeout). Returns ``b""`` on an empty or malformed
        reply.
        """
        if not self.sock:
            raise RuntimeError("Not connected")
        prev_to = self.sock.gettimeout()
        try:
            self.sock.settimeout(timeout_s)
            self.write("SCDP")
            first2 = self._recv_exact(2)
            if not first2:
                return b""
            if first2 == b"BM":
                size_bytes = self._recv_exact(4)
                if len(size_bytes) != 4:
                    return b""
                total_size = int.from_bytes(size_bytes, "little")
                # The six bytes already consumed count towards total_size; a
                # smaller declared size cannot describe a real image.
                if total_size < 6:
                    return b""
                rest = self._recv_exact_capped(total_size - 6, cap)
                return first2 + size_bytes + rest
            if first2[:1] == b"#":
                # Same header validation as query_block(): a malformed header
                # yields b"" instead of a ValueError from int().
                ndig_b = first2[1:2]
                if not ndig_b.isdigit():
                    return b""
                len_b = self._recv_exact(int(ndig_b.decode("ascii")))
                if not len_b.isdigit():
                    return b""
                length = int(len_b.decode("ascii"))
                payload = self._recv_exact_capped(length, cap)
                return payload
            tail = self._recv_until_timeout()
            return first2 + tail
        finally:
            try:
                self.sock.settimeout(prev_to)
            except Exception:
                pass

    # BUG-1 FIX: accumulate into bytearray and check the joined buffer,
    # not just the last recv() chunk. The original check b.endswith(b"\n")
    # on the raw chunk would miss a \n that arrived mid-buffer in a
    # subsequent recv() call, causing _recv_line() to block forever.
    def _recv_line(self) -> str:
        """Read until the buffer contains a newline or the peer closes."""
        buf = bytearray()
        while True:
            chunk = self.sock.recv(4096)
            if not chunk:
                break
            buf.extend(chunk)
            if b"\n" in buf:
                break
        return buf.decode("ascii", errors="ignore")

    def _recv_until_timeout(self) -> bytes:
        """Read until the socket times out or the peer closes."""
        data = bytearray()
        while True:
            try:
                b = self.sock.recv(65536)
                if not b:
                    break
                data.extend(b)
            except socket.timeout:
                break
        return bytes(data)


# -------------------- Utilities --------------------

# BUG-12 FIX: original code stripped "hz" before checking suffix multipliers,
# so "1.5 MHz" -> "1.5 m" -> mult = 1e-3 (completely wrong frequency sent to
# instrument). Replaced with a single regex match that reads suffix first,
# before any stripping. Handles all original suffixes + "mv".
_ENG_SUFFIXES = {
    "ghz": 1e9, "mhz": 1e6, "khz": 1e3, "hz": 1.0,
    "ms": 1e-3, "us": 1e-6, "ns": 1e-9, "s": 1.0,
    "mv": 1e-3, "vpp": 1.0, "v": 1.0,
    "deg": 1.0,
    "k": 1e3, "m": 1e-3, "u": 1e-6, "n": 1e-9, "g": 1e9,
}

# Build pattern: longest suffixes first to avoid partial matches (mhz before m)
_SUFFIX_PAT = "|".join(
    re.escape(s) for s in sorted(_ENG_SUFFIXES, key=len, reverse=True)
)
# Number part accepts "1", "1.", "1.5" and ".5", each with an optional exponent.
_ENG_RE = re.compile(
    r"^\s*([+-]?(?:\d+\.?\d*|\.\d+)(?:[eE][+-]?\d+)?)\s*(" + _SUFFIX_PAT + r")?\s*$",
    re.IGNORECASE,
)


def human_to_eng(s: str) -> str:
    """Convert text such as ``"1.5 MHz"`` into a plain number string.

    The suffix is matched case-insensitively against ``_ENG_SUFFIXES`` and
    the number is multiplied by the corresponding factor. Whole results
    below 1e15 in magnitude are returned as integers, everything else with
    up to 15 significant digits. Input that does not parse, or whose value
    is not finite, is returned stripped but otherwise unchanged so the
    instrument can reject it.
    """
    text = s.strip()
    m = _ENG_RE.match(text)
    if not m:
        return text  # fallback: return as-is, instrument will reject if wrong
    val = float(m.group(1))
    suffix = (m.group(2) or "").lower()
    result = val * _ENG_SUFFIXES.get(suffix, 1.0)
    # inf/nan cannot be converted with int(); hand the raw text back instead.
    if result != result or result in (float("inf"), float("-inf")):
        return text
    # Return integer string when value is whole, float string otherwise
    if abs(result) < 1e15 and result == int(result):
        return str(int(result))
    # 15 significant digits hides binary artefacts such as 0.0033000000000000004.
    return format(result, ".15g")


def quote_name(name: str) -> str:
    """Return *name* as-is if it is only alphanumerics, ``_`` or ``-``; else double-quoted."""
    safe = all(ch.isalnum() or ch in ("_", "-") for ch in name)
    return name if safe else '"%s"' % name


def _extract_length_from_header(header: bytes):
    """Return the integer following a ``LENGTH,`` field in *header*, or None."""
    txt = header.decode("ascii", errors="ignore").upper()
    m = re.search(r"(?:^|,)\s*LENGTH\s*,\s*(\d+)", txt)
    return int(m.group(1)) if m else None


def parse_stl_builtin(s: str):
    """Parse an ``STL`` reply into BUILDIN entries.

    The text is split on ``,`` and ``;``. Every ``M<digits>`` token followed
    by another token becomes
    ``{"source": "BUILDIN", "index": <int>, "name": <next token>}``;
    other tokens are skipped.
    """
    s = re.sub(r"^\s*STL\s*", "", s.strip(), flags=re.IGNORECASE)
    toks = [t.strip() for t in s.replace(";", ",").split(",") if t.strip()]
    out = []
    i = 0
    while i + 1 < len(toks):
        tok = toks[i]
        if re.fullmatch(r"M\d+", tok, flags=re.IGNORECASE):
            out.append({"source": "BUILDIN", "index": int(tok[1:]), "name": toks[i + 1]})
            i += 2
        else:
            i += 1
    return out


def parse_stl_user(s: str):
    """Parse an ``STL`` reply into ``{"source": "USER", "name": ...}`` entries.

    A leading ``WVNM`` token is dropped, and ``EMPTY`` tokens (any case)
    produce no entries.
    """
    s = re.sub(r"^\s*STL\s*", "", s.strip(), flags=re.IGNORECASE)
    toks = [t.strip() for t in s.replace(";", ",").split(",") if t.strip()]
    if toks and toks[0].upper() == "WVNM":
        toks = toks[1:]
    if len(toks) == 1 and toks[0].upper() == "EMPTY":
        return []
    return [{"source": "USER", "name": t} for t in toks if t.upper() != "EMPTY"]
t.upper() != "EMPTY"] def ensure_preset_file(path: str): if not os.path.exists(path): with open(path, "w", encoding="utf-8") as f: f.write("# SDG2042X presets\n") for i in range(1, 11): f.write(f"SLOT{i}_NAME=\n") f.write(f"SLOT{i}_BSWV=\n") f.write(f"SLOT{i}_OUTP=\n") return path def read_presets(path: str) -> dict: ensure_preset_file(path) presets = {} with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" not in line: continue k, v = line.split("=", 1) presets[k.strip()] = v.strip() return presets def write_presets(path: str, d: dict): lines = ["# SDG2042X presets"] for i in range(1, 11): name = d.get(f"SLOT{i}_NAME", "") bswv = d.get(f"SLOT{i}_BSWV", "") outp = d.get(f"SLOT{i}_OUTP", "") lines.append(f"SLOT{i}_NAME={name}") lines.append(f"SLOT{i}_BSWV={bswv}") lines.append(f"SLOT{i}_OUTP={outp}") with open(path, "w", encoding="utf-8") as f: f.write("\n".join(lines) + "\n") # -------------------- Screenshot worker (threaded) -------------------- class ScreenshotWorker(QtCore.QThread): done = QtCore.pyqtSignal(bytes, str, str) # data, filename, error def __init__(self, io: SDGLan, out_dir: str, parent=None): super().__init__(parent) self.io = io self.out_dir = out_dir def run(self): try: try: self.io.write("*CLS") except Exception: pass try: _ = self.io._recv_until_timeout() except Exception: pass data = self.io.query_scdp(timeout_s=20.0, cap=50_000_000) if not data: err = self.io.query_retry("SYST:ERR?") raise RuntimeError(f"no image data; last SYST:ERR? 
-> {err}") ts = time.strftime("%Y%m%d-%H%M%S") fn = f"{ts}_SDG2042X.bmp" if data.startswith(b"BM") else f"{ts}_SDG2042X.bin" ensure_dir(self.out_dir) full = os.path.join(self.out_dir, fn) self.done.emit(data, full, "") except Exception as e: self.done.emit(b"", "", str(e)) # -------------------- BUG-2 FIX: ARB download worker -------------------- # Original on_arb_download() called self.i.sock.recv() in a blocking loop # directly in the GUI thread, freezing the UI for large waveform transfers. # Moved to a QThread worker with the same done-signal pattern as # ScreenshotWorker. Main window connects the signal and handles save + log. class ARBDownloadWorker(QtCore.QThread): done = QtCore.pyqtSignal(bytes, str, str) # payload, savepath, error def __init__(self, io: SDGLan, name: str, savepath: str, parent=None): super().__init__(parent) self.io = io self.name = name self.savepath = savepath def run(self): try: qname = quote_name(self.name) raw = self.io.query_raw(f"WVDT? USER,{qname}") idx = raw.find(b"WAVEDATA,") if idx == -1: preview = raw[:120] raise RuntimeError(f"No WAVEDATA header in response; first bytes={preview!r}") header = raw[:idx] tail = raw[idx + len(b"WAVEDATA,"):] expected_len = _extract_length_from_header(header) tail_wo_ws = tail.lstrip(b" \t\r\n") if tail_wo_ws.startswith(b"#"): if len(tail_wo_ws) < 2 or not tail_wo_ws[1:2].isdigit(): raise RuntimeError("Malformed SCPI block header after WAVEDATA,") ndig = int(tail_wo_ws[1:2].decode("ascii")) hdr_len = 2 + ndig if len(tail_wo_ws) < hdr_len: raise RuntimeError("Incomplete SCPI block header") blen = int(tail_wo_ws[2:2 + ndig].decode("ascii")) payload = bytearray(tail_wo_ws[hdr_len:hdr_len + blen]) need = blen - len(payload) while need > 0: chunk = self.io.sock.recv(min(65536, need)) if not chunk: break payload.extend(chunk) need -= len(chunk) if len(payload) < blen: raise RuntimeError(f"Incomplete SCPI block payload: got {len(payload)} of {blen}") self.done.emit(bytes(payload), self.savepath, "") 
return payload = bytearray(tail) if expected_len is not None: need = expected_len - len(payload) while need > 0: chunk = self.io.sock.recv(min(65536, need)) if not chunk: break payload.extend(chunk) need -= len(chunk) if len(payload) < expected_len: raise RuntimeError( f"Incomplete raw waveform payload: got {len(payload)} of {expected_len}" ) payload = payload[:expected_len] self.done.emit(bytes(payload), self.savepath, "") except Exception as e: self.done.emit(b"", self.savepath, str(e)) # -------------------- Tabs -------------------- class BasicTab(QtWidgets.QWidget): apply_basic = QtCore.pyqtSignal(dict) readback_basic = QtCore.pyqtSignal() toggle_output = QtCore.pyqtSignal(bool) store_slot = QtCore.pyqtSignal(int) recall_slot = QtCore.pyqtSignal(int) def __init__(self): super().__init__() self._build() def _build(self): # Row 1: waveform wave_lbl = QtWidgets.QLabel("Waveform") self.wave = QtWidgets.QComboBox(); self.wave.addItems(SIGLENT_WAVES) # Row 2: FRQ/PERI fp_lbl = QtWidgets.QLabel("Freq/Period") self.fp_mode = QtWidgets.QComboBox(); self.fp_mode.addItems(["FRQ", "PERI"]) self.fp_val = QtWidgets.QLineEdit("1 kHz") # Row 3: amplitude or levels amp_mode_lbl = QtWidgets.QLabel("Amplitude mode") self.amp_mode = QtWidgets.QComboBox(); self.amp_mode.addItems(["AMP", "H/L levels"]) self.amp = QtWidgets.QLineEdit("1.0") # AMP Vpp self.hlev = QtWidgets.QLineEdit("0.5") # HLEV self.llev = QtWidgets.QLineEdit("-0.5") # LLEV # Row 4: offset and phase off_lbl = QtWidgets.QLabel("Offset V") self.off = QtWidgets.QLineEdit("0.0") ph_lbl = QtWidgets.QLabel("Phase deg") self.ph = QtWidgets.QLineEdit("0") # Extras self.duty = QtWidgets.QLineEdit("50") # Square/Pulse self.sym = QtWidgets.QLineEdit("50") # Ramp self.width = QtWidgets.QLineEdit("0.001") # Pulse s self.rise = QtWidgets.QLineEdit("1e-6") # Pulse s self.fall = QtWidgets.QLineEdit("1e-6") # Pulse s self.dly = QtWidgets.QLineEdit("0") # Pulse s self.mean = QtWidgets.QLineEdit("0") # Noise self.stdev = 
QtWidgets.QLineEdit("0.1") self.band = QtWidgets.QCheckBox("Noise bandwidth limit") self.arb_name = QtWidgets.QLineEdit("") # ARB selection # Buttons self.btn_apply = QtWidgets.QPushButton("Apply") self.btn_read = QtWidgets.QPushButton("Readback") self.btn_on = QtWidgets.QPushButton("Output ON") self.btn_off = QtWidgets.QPushButton("Output OFF") # Presets preset_lbl = QtWidgets.QLabel("Preset #") self.preset_num = QtWidgets.QSpinBox(); self.preset_num.setRange(1, 10); self.preset_num.setValue(1) self.btn_store = QtWidgets.QPushButton("Store") self.btn_recall = QtWidgets.QPushButton("Recall") grid = QtWidgets.QGridLayout(); row = 0 grid.addWidget(wave_lbl, row, 0); grid.addWidget(self.wave, row, 1); row += 1 grid.addWidget(fp_lbl, row, 0); grid.addWidget(self.fp_mode, row, 1); grid.addWidget(self.fp_val, row, 2); row += 1 grid.addWidget(amp_mode_lbl, row, 0); grid.addWidget(self.amp_mode, row, 1) grid.addWidget(QtWidgets.QLabel("AMP Vpp"), row, 2); grid.addWidget(self.amp, row, 3) grid.addWidget(QtWidgets.QLabel("HLEV"), row, 4); grid.addWidget(self.hlev, row, 5) grid.addWidget(QtWidgets.QLabel("LLEV"), row, 6); grid.addWidget(self.llev, row, 7); row += 1 grid.addWidget(off_lbl, row, 0); grid.addWidget(self.off, row, 1) grid.addWidget(ph_lbl, row, 2); grid.addWidget(self.ph, row, 3); row += 1 # Extras rows grid.addWidget(QtWidgets.QLabel("DUTY %"), row, 0); grid.addWidget(self.duty, row, 1) grid.addWidget(QtWidgets.QLabel("SYM %"), row, 2); grid.addWidget(self.sym, row, 3) grid.addWidget(QtWidgets.QLabel("WIDTH s"), row, 4); grid.addWidget(self.width, row, 5); row += 1 grid.addWidget(QtWidgets.QLabel("RISE s"), row, 0); grid.addWidget(self.rise, row, 1) grid.addWidget(QtWidgets.QLabel("FALL s"), row, 2); grid.addWidget(self.fall, row, 3) grid.addWidget(QtWidgets.QLabel("DLY s"), row, 4); grid.addWidget(self.dly, row, 5); row += 1 grid.addWidget(QtWidgets.QLabel("NOISE MEAN"), row, 0); grid.addWidget(self.mean, row, 1) grid.addWidget(QtWidgets.QLabel("NOISE STDEV"), 
row, 2); grid.addWidget(self.stdev, row, 3) grid.addWidget(self.band, row, 4); row += 1 grid.addWidget(QtWidgets.QLabel("ARB name"), row, 0); grid.addWidget(self.arb_name, row, 1, 1, 3); row += 1 grid.addWidget(self.btn_apply, row, 0) grid.addWidget(self.btn_read, row, 1) grid.addWidget(self.btn_on, row, 2) grid.addWidget(self.btn_off, row, 3); row += 1 grid.addWidget(preset_lbl, row, 0) grid.addWidget(self.preset_num, row, 1) grid.addWidget(self.btn_store, row, 2) grid.addWidget(self.btn_recall, row, 3) grid.setColumnStretch(7, 1) self.setLayout(grid) # Signals self.btn_apply.clicked.connect(self._emit_apply) self.btn_read.clicked.connect(lambda: self.readback_basic.emit()) self.btn_on.clicked.connect(lambda: self.toggle_output.emit(True)) self.btn_off.clicked.connect(lambda: self.toggle_output.emit(False)) self.btn_store.clicked.connect(lambda: self.store_slot.emit(self.preset_num.value())) self.btn_recall.clicked.connect(lambda: self.recall_slot.emit(self.preset_num.value())) self.wave.currentTextChanged.connect(self._update_context) self.amp_mode.currentTextChanged.connect(self._update_amp_mode) self._update_context() self._update_amp_mode() def _update_amp_mode(self): amp_mode = self.amp_mode.currentText() show_levels = (amp_mode != "AMP") self.amp.setEnabled(not show_levels) self.hlev.setEnabled(show_levels) self.llev.setEnabled(show_levels) def _update_context(self): w = self.wave.currentText() # enable all by default widgets = [ self.duty, self.sym, self.width, self.rise, self.fall, self.dly, self.mean, self.stdev, self.band, self.arb_name, self.ph, self.off, self.amp, self.hlev, self.llev, self.fp_val ] for wid in widgets: wid.setEnabled(True) if w == "SINE": self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=False) self.ph.setEnabled(True); self.off.setEnabled(True) elif w == "SQUARE": self._set_extras_enabled(duty=True, sym=False, pulse=False, noise=False, arb=False) self.ph.setEnabled(True) elif w == "RAMP": 
self._set_extras_enabled(duty=False, sym=True, pulse=False, noise=False, arb=False) self.ph.setEnabled(True) elif w == "PULSE": self._set_extras_enabled(duty=True, sym=False, pulse=True, noise=False, arb=False) self.ph.setEnabled(False) elif w == "NOISE": self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=True, arb=False) self.fp_val.setEnabled(False) self.amp.setEnabled(False); self.hlev.setEnabled(False); self.llev.setEnabled(False) self.off.setEnabled(False); self.ph.setEnabled(False) elif w == "ARB": self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=True) self.ph.setEnabled(True) elif w == "DC": self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=False) self.fp_val.setEnabled(False) self.amp.setEnabled(False); self.hlev.setEnabled(False); self.llev.setEnabled(False) self.ph.setEnabled(False) def _set_extras_enabled(self, *, duty: bool, sym: bool, pulse: bool, noise: bool, arb: bool): self.duty.setEnabled(duty) self.sym.setEnabled(sym) for w in (self.width, self.rise, self.fall, self.dly): w.setEnabled(pulse) self.mean.setEnabled(noise) self.stdev.setEnabled(noise) self.band.setEnabled(noise) self.arb_name.setEnabled(arb) def _emit_apply(self): w = self.wave.currentText() cfg = {"WVTP": w} if w not in ("NOISE", "DC"): key = self.fp_mode.currentText() cfg[key] = human_to_eng(self.fp_val.text()) if w not in ("NOISE", "DC"): if self.amp_mode.currentText() == "AMP": cfg["AMP"] = human_to_eng(self.amp.text()) else: cfg["HLEV"] = human_to_eng(self.hlev.text()) cfg["LLEV"] = human_to_eng(self.llev.text()) if w != "NOISE": cfg["OFST"] = human_to_eng(self.off.text()) if w in ("SINE", "SQUARE", "RAMP", "ARB"): cfg["PHSE"] = human_to_eng(self.ph.text()) if w in ("SQUARE", "PULSE"): cfg["DUTY"] = human_to_eng(self.duty.text()) if w == "RAMP": cfg["SYM"] = human_to_eng(self.sym.text()) if w == "PULSE": cfg["WIDTH"] = human_to_eng(self.width.text()) cfg["RISE"] = human_to_eng(self.rise.text()) cfg["FALL"] = 
human_to_eng(self.fall.text()) cfg["DLY"] = human_to_eng(self.dly.text()) if w == "NOISE": cfg["MEAN"] = human_to_eng(self.mean.text()) cfg["STDEV"] = human_to_eng(self.stdev.text()) cfg["BANDSTATE"] = "ON" if self.band.isChecked() else "OFF" if w == "ARB" and self.arb_name.text().strip(): cfg["ARWV_NAME"] = self.arb_name.text().strip() self.apply_basic.emit(cfg) class PresetsTab(QtWidgets.QWidget): rename_slot = QtCore.pyqtSignal(int, str) view_refresh = QtCore.pyqtSignal() def __init__(self): super().__init__() self._build() def _build(self): self.table = QtWidgets.QTableWidget(10, 3) self.table.setHorizontalHeaderLabels(["Slot", "Name", "Summary"]) self.table.horizontalHeader().setStretchLastSection(True) self.table.verticalHeader().setVisible(False) for i in range(10): self.table.setItem(i, 0, QtWidgets.QTableWidgetItem(str(i+1))) self.table.item(i, 0).setFlags(QtCore.Qt.ItemIsEnabled) self.table.setItem(i, 1, QtWidgets.QTableWidgetItem("")) self.table.setItem(i, 2, QtWidgets.QTableWidgetItem("")) self.table.item(i, 2).setFlags(QtCore.Qt.ItemIsEnabled) self.btn_load = QtWidgets.QPushButton("Reload File") self.btn_save = QtWidgets.QPushButton("Save Names") self.btn_open = QtWidgets.QPushButton("Open File...") self.path_label = QtWidgets.QLabel(PRESET_BASENAME) layout = QtWidgets.QVBoxLayout() hl = QtWidgets.QHBoxLayout() hl.addWidget(self.btn_load); hl.addWidget(self.btn_save); hl.addWidget(self.btn_open); hl.addStretch(1); hl.addWidget(self.path_label) layout.addLayout(hl) layout.addWidget(self.table) self.setLayout(layout) self.btn_load.clicked.connect(lambda: self.view_refresh.emit()) self.btn_save.clicked.connect(self._save_names) self.btn_open.clicked.connect(self._open_file) def _open_file(self): path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Open Preset File", PRESET_BASENAME, "Text (*.txt *.dat);;All Files (*)") if not path: return self.path_label.setText(path) self.view_refresh.emit() def _save_names(self): for i in range(10): name = 
self.table.item(i, 1).text().strip() self.rename_slot.emit(i+1, name) def update_from_presets(self, d: dict): for i in range(1, 11): name = d.get(f"SLOT{i}_NAME", "") b = d.get(f"SLOT{i}_BSWV", "") outp = d.get(f"SLOT{i}_OUTP", "") summ = "" if b: up = b.upper() fields = [] for key in ("WVTP,", "FRQ,", "AMP,", "OFST,"): idx = up.find(key) if idx >= 0: val = b[idx+len(key):] val = val.split(",")[0] fields.append(f"{key[:-1]}={val}") summ = " ".join(fields) if outp: summ += (" " if summ else "") + f"OUTP={outp}" self.table.item(i-1, 1).setText(name) self.table.item(i-1, 2).setText(summ) class ConfigTab(QtWidgets.QWidget): changed = QtCore.pyqtSignal(dict) # {"PRESET_DIR":..., "SCREENSHOT_DIR":...} def __init__(self, initial: dict): super().__init__() self._build(initial) def _build(self, cfg: dict): self.preset_dir = QtWidgets.QLineEdit(cfg.get("PRESET_DIR", "")) self.ss_dir = QtWidgets.QLineEdit(cfg.get("SCREENSHOT_DIR", "")) btn_browse_p = QtWidgets.QPushButton("…") btn_browse_s = QtWidgets.QPushButton("…") self.btn_save = QtWidgets.QPushButton("Save Config") self.btn_reload = QtWidgets.QPushButton("Reload Config") g = QtWidgets.QGridLayout(); row = 0 g.addWidget(QtWidgets.QLabel("Preset dir"), row, 0); g.addWidget(self.preset_dir, row, 1); g.addWidget(btn_browse_p, row, 2); row += 1 g.addWidget(QtWidgets.QLabel("Screenshot dir"), row, 0); g.addWidget(self.ss_dir, row, 1); g.addWidget(btn_browse_s, row, 2); row += 1 g.addWidget(self.btn_save, row, 0); g.addWidget(self.btn_reload, row, 1); g.setColumnStretch(1, 1) self.setLayout(g) btn_browse_p.clicked.connect(self._pick_preset_dir) btn_browse_s.clicked.connect(self._pick_ss_dir) self.btn_save.clicked.connect(self._save) self.btn_reload.clicked.connect(self._reload) def _pick_preset_dir(self): d = QtWidgets.QFileDialog.getExistingDirectory(self, "Select Preset Directory", self.preset_dir.text().strip() or os.getcwd()) if d: self.preset_dir.setText(d) def _pick_ss_dir(self): d = 
QtWidgets.QFileDialog.getExistingDirectory(self, "Select Screenshot Directory", self.ss_dir.text().strip() or os.getcwd()) if d: self.ss_dir.setText(d) def _save(self): d = {"PRESET_DIR": self.preset_dir.text().strip(), "SCREENSHOT_DIR": self.ss_dir.text().strip()} self.changed.emit(d) def _reload(self): d = load_config(CONFIG_FILE) self.preset_dir.setText(d.get("PRESET_DIR", "")) self.ss_dir.setText(d.get("SCREENSHOT_DIR", "")) class CLITab(QtWidgets.QWidget): send_cmd = QtCore.pyqtSignal(str) query_cmd = QtCore.pyqtSignal(str) def __init__(self): super().__init__() self._build() def _build(self): self.inp = QtWidgets.QLineEdit() self.inp.setPlaceholderText("Enter SCPI, e.g. C1:BSWV? or *IDN?") btn_send = QtWidgets.QPushButton("Send") btn_query = QtWidgets.QPushButton("Query") self.out = QtWidgets.QPlainTextEdit(); self.out.setReadOnly(True); self.out.setMinimumHeight(200) grid = QtWidgets.QGridLayout(); row = 0 grid.addWidget(QtWidgets.QLabel("Command"), row, 0) grid.addWidget(self.inp, row, 1, 1, 4) grid.addWidget(btn_send, row, 5) grid.addWidget(btn_query, row, 6); row += 1 grid.addWidget(self.out, row, 0, 1, 7) self.setLayout(grid) btn_send.clicked.connect(self._do_send) btn_query.clicked.connect(self._do_query) self.inp.returnPressed.connect(self._do_query) def append(self, s: str): self.out.appendPlainText(s) def _do_send(self): cmd = self.inp.text().strip() if cmd: self.send_cmd.emit(cmd) def _do_query(self): cmd = self.inp.text().strip() if cmd: self.query_cmd.emit(cmd) class BurstTab(QtWidgets.QWidget): apply_burst = QtCore.pyqtSignal(dict) readback_burst = QtCore.pyqtSignal() def __init__(self): super().__init__() self._build() def _build(self): state_lbl = QtWidgets.QLabel("State") self.state = QtWidgets.QComboBox(); self.state.addItems(["OFF", "ON"]) mode_lbl = QtWidgets.QLabel("Mode") self.mode = QtWidgets.QComboBox(); self.mode.addItems(["TRIG", "GAT"]) src_lbl = QtWidgets.QLabel("Source") self.src = QtWidgets.QComboBox(); self.src.addItems(["INT", 
"EXT"]) ncy_lbl = QtWidgets.QLabel("Cycles") self.ncy = QtWidgets.QLineEdit("5") dly_lbl = QtWidgets.QLabel("Delay s") self.dly = QtWidgets.QLineEdit("0") gpol_lbl = QtWidgets.QLabel("Gate Pol") self.gpol = QtWidgets.QComboBox(); self.gpol.addItems(["POS", "NEG"]) self.btn_apply = QtWidgets.QPushButton("Apply Burst") self.btn_read = QtWidgets.QPushButton("Readback Burst") grid = QtWidgets.QGridLayout(); row = 0 grid.addWidget(state_lbl, row, 0); grid.addWidget(self.state, row, 1) grid.addWidget(mode_lbl, row, 2); grid.addWidget(self.mode, row, 3); row += 1 grid.addWidget(src_lbl, row, 0); grid.addWidget(self.src, row, 1) grid.addWidget(ncy_lbl, row, 2); grid.addWidget(self.ncy, row, 3); row += 1 grid.addWidget(dly_lbl, row, 0); grid.addWidget(self.dly, row, 1) grid.addWidget(gpol_lbl, row, 2); grid.addWidget(self.gpol, row, 3); row += 1 grid.addWidget(self.btn_apply, row, 0) grid.addWidget(self.btn_read, row, 1) self.setLayout(grid) self.btn_apply.clicked.connect(self._emit_apply) self.btn_read.clicked.connect(lambda: self.readback_burst.emit()) def _emit_apply(self): try: ncy_int = int(float(self.ncy.text())) except Exception: ncy_int = 1 cfg = dict( STATE = self.state.currentText(), MODE = self.mode.currentText(), TRSR = self.src.currentText(), NCYC = str(ncy_int), DLAY = human_to_eng(self.dly.text()), GATEPOL = self.gpol.currentText(), ) self.apply_burst.emit(cfg) class SweepTab(QtWidgets.QWidget): apply_sweep = QtCore.pyqtSignal(dict) readback_sweep = QtCore.pyqtSignal() def __init__(self): super().__init__() self._build() def _build(self): state_lbl = QtWidgets.QLabel("State") self.state = QtWidgets.QComboBox(); self.state.addItems(["OFF", "ON"]) typ_lbl = QtWidgets.QLabel("Type") self.typ = QtWidgets.QComboBox(); self.typ.addItems(["LIN", "LOG"]) fstart_lbl = QtWidgets.QLabel("Start Hz") self.fstart = QtWidgets.QLineEdit("1 kHz") fstop_lbl = QtWidgets.QLabel("Stop Hz") self.fstop = QtWidgets.QLineEdit("10 kHz") time_lbl = QtWidgets.QLabel("Time s") self.time 
= QtWidgets.QLineEdit("1.0") dir_lbl = QtWidgets.QLabel("Direction") self.direction = QtWidgets.QComboBox(); self.direction.addItems(["UP", "DOWN"]) src_lbl = QtWidgets.QLabel("Source") self.src = QtWidgets.QComboBox(); self.src.addItems(["INT", "EXT"]) self.btn_apply = QtWidgets.QPushButton("Apply Sweep") self.btn_read = QtWidgets.QPushButton("Readback Sweep") grid = QtWidgets.QGridLayout(); row = 0 grid.addWidget(state_lbl, row, 0); grid.addWidget(self.state, row, 1) grid.addWidget(typ_lbl, row, 2); grid.addWidget(self.typ, row, 3); row += 1 grid.addWidget(fstart_lbl, row, 0); grid.addWidget(self.fstart, row, 1) grid.addWidget(fstop_lbl, row, 2); grid.addWidget(self.fstop, row, 3); row += 1 grid.addWidget(time_lbl, row, 0); grid.addWidget(self.time, row, 1) grid.addWidget(dir_lbl, row, 2); grid.addWidget(self.direction, row, 3); row += 1 grid.addWidget(src_lbl, row, 0); grid.addWidget(self.src, row, 1); row += 1 grid.addWidget(self.btn_apply, row, 0) grid.addWidget(self.btn_read, row, 1) self.setLayout(grid) self.btn_apply.clicked.connect(self._emit_apply) self.btn_read.clicked.connect(lambda: self.readback_sweep.emit()) def _emit_apply(self): cfg = dict( STATE = self.state.currentText(), WAV = self.typ.currentText(), STAR = human_to_eng(self.fstart.text()), STOP = human_to_eng(self.fstop.text()), TIME = human_to_eng(self.time.text()), DIR = self.direction.currentText(), TRSR = self.src.currentText(), ) self.apply_sweep.emit(cfg) class ARBTab(QtWidgets.QWidget): refresh_lists = QtCore.pyqtSignal() set_wave = QtCore.pyqtSignal(object) upload_wave = QtCore.pyqtSignal(str, str) download_wave = QtCore.pyqtSignal(str, str) def __init__(self): super().__init__() self._build() def _build(self): self.built_in = QtWidgets.QListWidget() self.user = QtWidgets.QListWidget() refresh = QtWidgets.QPushButton("Refresh") set_btn = QtWidgets.QPushButton("Set Selected to Channel") upload_btn = QtWidgets.QPushButton("Upload .bin/.csv → USER") download_btn = 
QtWidgets.QPushButton("Download USER → file") self.name_edit = QtWidgets.QLineEdit(); self.name_edit.setPlaceholderText("User name (optional)") grid = QtWidgets.QGridLayout() grid.addWidget(QtWidgets.QLabel("BUILDIN"), 0, 0) grid.addWidget(QtWidgets.QLabel("USER"), 0, 1) grid.addWidget(self.built_in, 1, 0) grid.addWidget(self.user, 1, 1) grid.addWidget(refresh, 2, 0) grid.addWidget(set_btn, 2, 1) grid.addWidget(self.name_edit, 3, 0) grid.addWidget(upload_btn, 3, 1) grid.addWidget(download_btn, 4, 1) self.setLayout(grid) refresh.clicked.connect(lambda: self.refresh_lists.emit()) set_btn.clicked.connect(self._emit_set) upload_btn.clicked.connect(self._emit_upload) download_btn.clicked.connect(self._emit_download) self.built_in.itemSelectionChanged.connect(self._on_builtin_selected) self.user.itemSelectionChanged.connect(self._on_user_selected) def _on_builtin_selected(self): if self.built_in.selectedItems(): self.user.blockSignals(True) self.user.clearSelection() self.user.setCurrentItem(None) self.user.blockSignals(False) def _on_user_selected(self): if self.user.selectedItems(): self.built_in.blockSignals(True) self.built_in.clearSelection() self.built_in.setCurrentItem(None) self.built_in.blockSignals(False) def selected_entry(self): w = self.user.currentItem() or self.built_in.currentItem() if not w: return None data = w.data(QtCore.Qt.UserRole) if isinstance(data, dict): return data name = w.text().strip() return {"source": "USER", "name": name} if name else None def _emit_set(self): entry = self.selected_entry() if entry: self.set_wave.emit(entry) def _emit_upload(self): path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Select ARB file", "", "ARB Files (*.bin *.csv);;All Files (*)") if not path: return name = self.name_edit.text().strip() if not name: name = os.path.splitext(os.path.basename(path))[0] self.upload_wave.emit(name, path) def _emit_download(self): item = self.user.currentItem() if not item: return entry = item.data(QtCore.Qt.UserRole) or 
# -------------------- Main Window --------------------
class Main(QtWidgets.QWidget):
    """Top-level window: connection bar, feature tabs and a log pane.

    All instrument traffic goes through ``self.i`` (an ``SDGLan``). Every
    slot catches its own exceptions and reports them in the log pane, so a
    failed command never propagates out of a Qt slot.
    """

    _MAX_SLOT = 10  # preset slots are numbered 1.._MAX_SLOT

    def __init__(self, preset_ip=None):
        """Load the config, make sure its directories exist and build the UI.

        preset_ip: optional address used to prefill the IP field.
        """
        super().__init__()
        self.setWindowTitle(Window_Name)
        self.i = SDGLan()
        self._scr_worker = None
        self._arb_dl_worker = None  # BUG-2: track ARB download worker
        self.cfg = load_config(CONFIG_FILE)
        ensure_dir(self.cfg["PRESET_DIR"])
        ensure_dir(self.cfg["SCREENSHOT_DIR"])
        self.preset_path = os.path.join(self.cfg["PRESET_DIR"], PRESET_BASENAME)
        self._build(preset_ip)

    def _build(self, preset_ip):
        """Create widgets, lay them out and connect every tab signal to its slot."""
        ip_lbl = QtWidgets.QLabel("IP")
        self.ip = QtWidgets.QLineEdit(preset_ip or "")
        if not preset_ip:
            self.ip.setPlaceholderText("e.g. 192.168.1.120")
        port_lbl = QtWidgets.QLabel("Port")
        self.port = QtWidgets.QLineEdit(str(DEFAULT_PORT))
        self.btn_connect = QtWidgets.QPushButton("Connect")
        self.btn_idn = QtWidgets.QPushButton("IDN?")
        self.btn_err = QtWidgets.QPushButton("SYST:ERR?")
        self.btn_scr = QtWidgets.QPushButton("Screenshot")
        ch_lbl = QtWidgets.QLabel("Channel")
        self.ch = QtWidgets.QComboBox()
        self.ch.addItems(["C1", "C2"])

        self.tabs = QtWidgets.QTabWidget()
        self.t_basic = BasicTab()
        self.tabs.addTab(self.t_basic, "Basic")
        self.t_burst = BurstTab()
        self.tabs.addTab(self.t_burst, "Burst")
        self.t_sweep = SweepTab()
        self.tabs.addTab(self.t_sweep, "Sweep")
        self.t_arb = ARBTab()
        self.tabs.addTab(self.t_arb, "ARB Manager")
        self.t_cli = CLITab()
        self.tabs.addTab(self.t_cli, "SCPI CLI")
        self.t_presets = PresetsTab()
        self.tabs.addTab(self.t_presets, "Presets")
        self.t_config = ConfigTab(self.cfg)
        self.tabs.addTab(self.t_config, "Config")

        self.log = QtWidgets.QPlainTextEdit()
        self.log.setReadOnly(True)
        self.log.setMinimumHeight(200)

        top = QtWidgets.QGridLayout()
        row = 0
        top.addWidget(ip_lbl, row, 0)
        top.addWidget(self.ip, row, 1, 1, 3)
        top.addWidget(port_lbl, row, 4)
        top.addWidget(self.port, row, 5)
        top.addWidget(self.btn_connect, row, 6)
        row += 1
        top.addWidget(self.btn_idn, row, 0)
        top.addWidget(self.btn_err, row, 1)
        top.addWidget(self.btn_scr, row, 2)
        top.addWidget(ch_lbl, row, 3)
        top.addWidget(self.ch, row, 4)
        row += 1
        top.addWidget(self.tabs, row, 0, 1, 7)
        row += 1
        top.addWidget(self.log, row, 0, 1, 7)
        self.setLayout(top)

        # wiring
        self.btn_connect.clicked.connect(self.on_connect)
        self.btn_idn.clicked.connect(self.on_idn)
        self.btn_err.clicked.connect(self.on_err)
        self.btn_scr.clicked.connect(self.on_screenshot)
        self.t_basic.apply_basic.connect(self.on_apply_basic)
        self.t_basic.readback_basic.connect(self.on_readback_basic)
        self.t_basic.toggle_output.connect(self.on_output)
        self.t_basic.store_slot.connect(self.on_store_slot)
        self.t_basic.recall_slot.connect(self.on_recall_slot)
        self.t_burst.apply_burst.connect(self.on_apply_burst)
        self.t_burst.readback_burst.connect(self.on_readback_burst)
        self.t_sweep.apply_sweep.connect(self.on_apply_sweep)
        self.t_sweep.readback_sweep.connect(self.on_readback_sweep)
        self.t_arb.refresh_lists.connect(self.on_arb_refresh)
        self.t_arb.set_wave.connect(self.on_arb_set)
        self.t_arb.upload_wave.connect(self.on_arb_upload)
        self.t_arb.download_wave.connect(self.on_arb_download)
        self.t_cli.send_cmd.connect(self.on_cli_send)
        self.t_cli.query_cmd.connect(self.on_cli_query)
        self.t_config.changed.connect(self.on_config_changed)
        # NOTE(review): on_rename_slot() is not connected to anything here;
        # confirm whether PresetsTab exposes a rename signal that should be wired.

        ensure_preset_file(self.preset_path)
        self.refresh_presets_view()

    # --------- helpers ---------
    def logln(self, s: str):
        """Append one line to the log pane."""
        self.log.appendPlainText(s)

    def _pref(self) -> str:
        """Return the SCPI channel prefix for the selected channel, e.g. ``"C1:"``."""
        return self.ch.currentText() + ":"

    def _check_slot(self, slot: int):
        """Raise ValueError unless *slot* is within 1.._MAX_SLOT."""
        if slot < 1 or slot > self._MAX_SLOT:
            raise ValueError("slot out of range")

    @staticmethod
    def _is_no_error(resp: str) -> bool:
        """Return True when a ``SYST:ERR?`` reply carries error code 0.

        Accepts an optional sign and leading whitespace, so both ``0,...``
        and the SCPI-standard ``+0,"No error"`` count as success.
        """
        return re.match(r"\s*[+-]?0+(?!\d)", resp) is not None

    # ---------- Top actions ----------
    def on_connect(self):
        """Open the LAN connection using the IP and port fields."""
        try:
            addr = self.ip.text().strip()
            if not addr:
                raise ValueError("IP address is empty")
            port = int(self.port.text().strip())
            if not 0 < port < 65536:
                raise ValueError(f"port out of range: {port}")
            self.i.connect(addr, port)
            self.logln(f"Connected to {addr}:{port}")
        except Exception as e:
            self.logln(f"[ERR] connect: {e}")

    def on_idn(self):
        """Query ``*IDN?`` and log the reply."""
        try:
            self.logln(f"*IDN? -> {self.i.query_retry('*IDN?')}")
        except Exception as e:
            self.logln(f"[ERR] IDN: {e}")

    def on_err(self):
        """Query ``SYST:ERR?`` and log the reply."""
        try:
            self.logln(f"SYST:ERR? -> {self.i.query_retry('SYST:ERR?')}")
        except Exception as e:
            self.logln(f"[ERR] SYST:ERR?: {e}")

    # ---------- Screenshot ----------
    def on_screenshot(self):
        """Start a ScreenshotWorker; the result arrives in _on_screenshot_done()."""
        if not self.i.sock:
            self.logln("[ERR] screenshot: not connected")
            return
        if self._scr_worker and self._scr_worker.isRunning():
            self.logln("[ERR] screenshot already running")
            return
        self.btn_scr.setEnabled(False)
        self.setCursor(QtCore.Qt.WaitCursor)
        try:
            # BUG-6 fix: use in-memory cfg, not re-read from disk
            outdir = self.cfg["SCREENSHOT_DIR"]
            worker = ScreenshotWorker(self.i, outdir, self)
            worker.done.connect(self._on_screenshot_done)
            self._scr_worker = worker
            worker.start()
        except Exception as e:
            # Without this the button would stay disabled and the wait cursor
            # would stick, because the done signal would never fire.
            self.btn_scr.setEnabled(True)
            self.unsetCursor()
            self.logln(f"[ERR] screenshot: {e}")

    def _on_screenshot_done(self, data: bytes, full_path: str, err: str):
        """Save the received image to *full_path* and show it in its own window.

        err: non-empty when the worker failed; then nothing is written.
        """
        self.btn_scr.setEnabled(True)
        self.unsetCursor()
        if err:
            self.logln(f"[ERR] screenshot: {err}")
            return
        try:
            with open(full_path, "wb") as f:
                f.write(data)
            self.logln(f"Screenshot saved to {full_path} ({len(data)} bytes)")
            img = QtGui.QImage(full_path)
            if img.isNull():
                self.logln("[ERR] screenshot: Qt decode failed")
                return
            pixmap = QtGui.QPixmap.fromImage(img)
            lbl = QtWidgets.QLabel()
            lbl.setPixmap(pixmap)
            win = QtWidgets.QWidget()
            box = QtWidgets.QVBoxLayout(win)
            box.addWidget(lbl)
            win.setWindowTitle("SDG2042X Screenshot")
            win.resize(pixmap.width(), pixmap.height())
            win.show()
            # Keep a reference so the preview window is not garbage-collected.
            self._scr_win = win
        except Exception as e:
            self.logln(f"[ERR] screenshot save/display: {e}")

    # ---------- Basic apply/readback ----------
    def on_apply_basic(self, cfg: dict):
        """Send a ``BSWV`` command built from *cfg* to the selected channel.

        ``WVTP`` is required. ``AMP`` takes precedence over ``HLEV``/``LLEV``.
        For ``WVTP == "ARB"`` with ``ARWV_NAME`` set, an ``ARWV NAME`` command
        follows.
        """
        try:
            level_keys = ("AMP",) if "AMP" in cfg else ("HLEV", "LLEV")
            optional = (
                ("FRQ", "PERI")
                + level_keys
                + ("OFST", "PHSE")
                + ("DUTY", "SYM", "WIDTH", "RISE", "FALL", "DLY", "MEAN", "STDEV", "BANDSTATE")
            )
            parts = [f"WVTP,{cfg['WVTP']}"]
            parts.extend(f"{key},{cfg[key]}" for key in optional if key in cfg)
            cmd = f"{self._pref()}BSWV " + ",".join(parts)
            self.i.write(cmd)
            self.logln(f"SET: {cmd}")
            if cfg.get("WVTP") == "ARB" and cfg.get("ARWV_NAME"):
                qname = quote_name(cfg["ARWV_NAME"])
                self.i.write(f"{self._pref()}ARWV NAME,{qname}")
                self.logln(f"SET ARWV NAME,{qname}")
        except Exception as e:
            self.logln(f"[ERR] basic apply: {e}")

    def on_readback_basic(self):
        """Query ``BSWV?`` and ``OUTP?`` for the selected channel and log both."""
        try:
            rb = self.i.query_retry(f"{self._pref()}BSWV?")
            st = self.i.query_retry(f"{self._pref()}OUTP?")
            self.logln(f"{self._pref()}BSWV? -> {rb}")
            self.logln(f"{self._pref()}OUTP? -> {st}")
        except Exception as e:
            self.logln(f"[ERR] basic readback: {e}")

    def on_output(self, turn_on: bool):
        """Switch the selected channel's output on or off and log the readback."""
        try:
            cmd = f"{self._pref()}OUTP {'ON' if turn_on else 'OFF'}"
            self.i.write(cmd)
            st = self.i.query_retry(f"{self._pref()}OUTP?")
            self.logln(f"SET: {cmd}")
            self.logln(f"{self._pref()}OUTP? -> {st}")
        except Exception as e:
            self.logln(f"[ERR] output: {e}")

    # ---------- Burst ----------
    def on_apply_burst(self, cfg: dict):
        """Send a ``BTWV`` command whose parameters are the items of *cfg*, in order."""
        try:
            parts = [f"{k},{v}" for k, v in cfg.items()]
            cmd = f"{self._pref()}BTWV " + ",".join(parts)
            self.i.write(cmd)
            self.logln(f"SET: {cmd}")
        except Exception as e:
            self.logln(f"[ERR] burst apply: {e}")

    def on_readback_burst(self):
        """Query ``BTWV?`` for the selected channel and log the reply."""
        try:
            rb = self.i.query_retry(f"{self._pref()}BTWV?")
            self.logln(f"{self._pref()}BTWV? -> {rb}")
        except Exception as e:
            self.logln(f"[ERR] burst readback: {e}")

    # ---------- Sweep ----------
    def _send_sweep(self, cmd: str):
        """Send one sweep command; return ``(*OPC? reply, SYST:ERR? reply)``.

        BUG-10 FIX: the error queue is cleared first and ``*OPC?`` is used as
        an explicit completion point, so the ``SYST:ERR?`` reply reflects this
        attempt only and is not read before the command has been processed.
        """
        try:
            self.i.write("*CLS")
        except Exception:
            # Clearing stale errors is best effort; a failure here must not
            # prevent the actual command from being sent.
            pass
        self.i.write(cmd)
        opc = self.i.query_retry("*OPC?")
        err = self.i.query_retry("SYST:ERR?")
        return opc, err

    def on_apply_sweep(self, cfg: dict):
        """Send a ``SWWV`` command built from *cfg*, retrying with ``SPAC`` for ``WAV``.

        If the instrument reports an error for the first form, the ``WAV``
        parameter key is replaced by ``SPAC`` and the command is sent again.
        Failure of both forms is logged with both error replies.
        """
        try:
            parts = [f"{k},{v}" for k, v in cfg.items()]
            cmd = f"{self._pref()}SWWV " + ",".join(parts)
            opc, err = self._send_sweep(cmd)
            if self._is_no_error(err):
                self.logln(f"SET: {cmd}")
                return

            # Swap only the parameter key, never text inside a value.
            parts2 = [
                "SPAC," + p[len("WAV,"):] if p.startswith("WAV,") else p
                for p in parts
            ]
            if parts2 == parts:
                raise RuntimeError(f"sweep apply failed; no fallback token found; *OPC? -> {opc}, SYST:ERR? -> {err}")
            cmd2 = f"{self._pref()}SWWV " + ",".join(parts2)
            opc2, err2 = self._send_sweep(cmd2)
            if not self._is_no_error(err2):
                raise RuntimeError(
                    f"sweep apply failed on both forms; first err={err}; fallback err={err2}; "
                    f"cmd1={cmd}; cmd2={cmd2}; *OPC? -> {opc2}"
                )
            self.logln(f"SET (fallback): {cmd2}")
        except Exception as e:
            self.logln(f"[ERR] sweep apply: {e}")

    def on_readback_sweep(self):
        """Query ``SWWV?`` for the selected channel and log the reply."""
        try:
            rb = self.i.query_retry(f"{self._pref()}SWWV?")
            self.logln(f"{self._pref()}SWWV? -> {rb}")
        except Exception as e:
            self.logln(f"[ERR] sweep readback: {e}")

    # ---------- ARB ops ----------
    def on_arb_refresh(self):
        """Re-read the BUILDIN and USER waveform lists and repopulate the ARB tab.

        Each list item stores its parsed entry dict under ``Qt.UserRole``.
        """
        try:
            b = self.i.query_retry("STL? BUILDIN")
            u = self.i.query_retry("STL? USER")
            b_list = parse_stl_builtin(b)
            u_list = parse_stl_user(u)
            self.t_arb.built_in.clear()
            self.t_arb.user.clear()
            for entry in b_list:
                item = QtWidgets.QListWidgetItem(f"M{entry['index']}: {entry['name']}")
                item.setData(QtCore.Qt.UserRole, entry)
                self.t_arb.built_in.addItem(item)
            for entry in u_list:
                item = QtWidgets.QListWidgetItem(entry['name'])
                item.setData(QtCore.Qt.UserRole, entry)
                self.t_arb.user.addItem(item)
            self.logln(f"BUILDIN entries: {len(b_list)}")
            self.logln(f"USER entries: {len(u_list)}")
        except Exception as e:
            self.logln(f"[ERR] ARB refresh: {e}")

    def on_arb_set(self, entry):
        """Switch the selected channel to ARB and select the waveform in *entry*.

        BUILDIN entries are selected by ``index``, all others by quoted ``name``.
        """
        try:
            if not entry:
                raise RuntimeError("no ARB entry selected")
            self.i.write(f"{self._pref()}BSWV WVTP,ARB")
            if entry.get("source") == "BUILDIN":
                idx = entry.get("index")
                if idx is None:
                    raise RuntimeError("built-in ARB entry missing index")
                self.i.write(f"{self._pref()}ARWV INDEX,{idx}")
                rb = self.i.query_retry(f"{self._pref()}ARWV?")
                self.logln(f"SET ARB BUILDIN M{idx}:{entry.get('name','')} ; ARWV? -> {rb}")
            else:
                name = entry.get("name", "")
                if not name:
                    raise RuntimeError("user ARB entry missing name")
                qname = quote_name(name)
                self.i.write(f"{self._pref()}ARWV NAME,{qname}")
                rb = self.i.query_retry(f"{self._pref()}ARWV?")
                self.logln(f"SET ARB USER {qname}; ARWV? -> {rb}")
        except Exception as e:
            self.logln(f"[ERR] ARB set: {e}")

    def _make_scpi_block(self, payload: bytes) -> bytes:
        """Wrap *payload* in a definite-length block: ``#<d><length><payload>``.

        ``<d>`` is a single digit giving the number of digits in ``<length>``.

        Raises:
            ValueError: if the length needs more than 9 digits and therefore
                cannot be described by a single-digit ``<d>``.
        """
        length = str(len(payload)).encode('ascii')
        if len(length) > 9:
            raise ValueError("payload too large for a definite-length SCPI block")
        return b'#' + str(len(length)).encode('ascii') + length + payload

    def on_arb_upload(self, name: str, path: str):
        """Send the file at *path* to the instrument as USER waveform *name*."""
        try:
            with open(path, 'rb') as f:
                data = f.read()
            if not data:
                raise RuntimeError("file is empty")
            block = self._make_scpi_block(data)
            qname = quote_name(name)
            prefix = f"WVDT USER,{qname},".encode('ascii')
            self.i.write_bytes(prefix + block + b"\n")
            self.logln(f"Uploaded USER,{qname} ({len(data)} bytes)")
        except Exception as e:
            self.logln(f"[ERR] ARB upload: {e}")
        else:
            # on_arb_refresh() reports its own failures, so a refresh problem
            # is not mislabelled as an upload error.
            self.on_arb_refresh()

    # BUG-2 FIX: ARB download moved off the GUI thread into ARBDownloadWorker.
    # The old code called self.i.sock.recv() in a while loop here, freezing
    # the UI for large waveforms. Now we spin up a QThread and handle the
    # result via _on_arb_download_done() connected to the worker's done signal.
    def on_arb_download(self, name: str, savepath: str):
        """Start an ARBDownloadWorker for USER waveform *name*."""
        if not self.i.sock:
            self.logln("[ERR] ARB download: not connected")
            return
        if self._arb_dl_worker and self._arb_dl_worker.isRunning():
            self.logln("[ERR] ARB download already in progress")
            return
        self.logln(f"ARB download started: USER,{name} ...")
        try:
            worker = ARBDownloadWorker(self.i, name, savepath, self)
            worker.done.connect(self._on_arb_download_done)
            self._arb_dl_worker = worker
            worker.start()
        except Exception as e:
            self.logln(f"[ERR] ARB download: {e}")

    def _on_arb_download_done(self, payload: bytes, savepath: str, err: str):
        """Write the downloaded *payload* to *savepath*, or log *err* if non-empty."""
        if err:
            self.logln(f"[ERR] ARB download: {err}")
            return
        try:
            with open(savepath, 'wb') as f:
                f.write(payload)
            self.logln(f"Downloaded -> {savepath} ({len(payload)} bytes)")
        except Exception as e:
            self.logln(f"[ERR] ARB download save: {e}")

    # ---------- CLI tab ----------
    def on_cli_send(self, cmd: str):
        """Send *cmd* without reading a reply and echo it in the CLI tab."""
        try:
            self.i.write(cmd)
            self.t_cli.append(f"> {cmd}")
        except Exception as e:
            self.t_cli.append(f"[ERR] send: {e}")

    def on_cli_query(self, cmd: str):
        """Send *cmd* as a query and show command and reply in the CLI tab."""
        try:
            resp = self.i.query_retry(cmd)
            self.t_cli.append(f"? {cmd}\n< {resp}")
        except Exception as e:
            self.t_cli.append(f"[ERR] query: {e}")

    # ---------- Presets ----------
    def on_store_slot(self, slot: int):
        """Read ``BSWV?``/``OUTP?`` from the selected channel and store them in *slot*."""
        try:
            self._check_slot(slot)
            bswv = self.i.query_retry(f"{self._pref()}BSWV?")
            outp = self.i.query_retry(f"{self._pref()}OUTP?")
            # Only the first comma-separated field of the reply is inspected.
            outp_state = "ON" if "ON" in outp.upper().split(',')[0] else "OFF"
            d = read_presets(self.preset_path)
            d[f"SLOT{slot}_BSWV"] = bswv
            d[f"SLOT{slot}_OUTP"] = outp_state
            write_presets(self.preset_path, d)
            self.logln(f"Stored slot {slot}: BSWV len={len(bswv)}, OUTP={outp_state}")
            self.refresh_presets_view()
        except Exception as e:
            self.logln(f"[ERR] store slot {slot}: {e}")

    def on_recall_slot(self, slot: int):
        """Send the BSWV string stored in *slot*, then its OUTP state if recorded."""
        try:
            self._check_slot(slot)
            d = read_presets(self.preset_path)
            b = d.get(f"SLOT{slot}_BSWV", "").strip()
            o = d.get(f"SLOT{slot}_OUTP", "").strip().upper()
            if not b:
                raise RuntimeError("empty BSWV")
            self.i.write(b)
            if o in ("ON", "OFF"):
                self.i.write(f"{self._pref()}OUTP {o}")
            self.logln(f"Recalled slot {slot}: applied BSWV and OUTP={o or 'N/A'}")
        except Exception as e:
            self.logln(f"[ERR] recall slot {slot}: {e}")

    def on_rename_slot(self, slot: int, name: str):
        """Store *name* as the display name of *slot* in the preset file."""
        try:
            # Same range check as store/recall, so no out-of-range key is written.
            self._check_slot(slot)
            d = read_presets(self.preset_path)
            d[f"SLOT{slot}_NAME"] = name
            write_presets(self.preset_path, d)
            self.refresh_presets_view()
        except Exception as e:
            self.logln(f"[ERR] rename slot {slot}: {e}")

    def refresh_presets_view(self):
        """Reload the preset file (creating it if needed) and update the Presets tab."""
        try:
            ensure_preset_file(self.preset_path)
            d = read_presets(self.preset_path)
            self.t_presets.update_from_presets(d)
            self.logln(f"Preset file: {self.preset_path}")
        except Exception as e:
            self.logln(f"[ERR] refresh presets: {e}")

    # ---------- Config ----------
    def on_config_changed(self, newd: dict):
        """Apply and persist new PRESET_DIR / SCREENSHOT_DIR settings.

        Missing or blank values fall back to the current working directory.
        Values are stripped so the in-memory config matches what
        load_config() will read back from the saved file.
        """
        try:
            def resolve(key):
                raw = (newd.get(key) or "").strip()
                return _expand(raw or os.getcwd())

            nd = {
                "PRESET_DIR": resolve("PRESET_DIR"),
                "SCREENSHOT_DIR": resolve("SCREENSHOT_DIR"),
            }
            ensure_dir(nd["PRESET_DIR"])
            ensure_dir(nd["SCREENSHOT_DIR"])
            save_config(CONFIG_FILE, nd)
            self.cfg = nd
            self.preset_path = os.path.join(self.cfg["PRESET_DIR"], PRESET_BASENAME)
            ensure_preset_file(self.preset_path)
            self.refresh_presets_view()
            self.logln(f"Config saved -> {CONFIG_FILE}")
        except Exception as e:
            self.logln(f"[ERR] save config: {e}")
# -------------------- CLI entry --------------------
def show_help():
    """Print the command-line usage text to stdout."""
    print("Usage: sdg2042x_gui.py [options]\n")
    print("Options:")
    print(" -ip ADDR, --ip ADDR Prefill IP address field")
    print(" -h, --help Show this help message")


def parse_preset_ip(argv):
    """Return the address given with ``-ip``/``--ip`` in *argv*, or None.

    When the flag appears several times the last value wins. The token
    consumed as a value is not interpreted as a flag again. A flag with no
    value after it is ignored, but a warning is written to stderr instead of
    dropping it silently. Unknown tokens are skipped.
    """
    preset_ip = None
    tokens = iter(argv)
    for tok in tokens:
        if tok not in ("-ip", "--ip"):
            continue
        value = next(tokens, None)
        if value is None:
            print(f"warning: {tok} given without an address; ignoring", file=sys.stderr)
            break
        preset_ip = value
    return preset_ip


def main():
    """Parse ``sys.argv``, start the Qt application and return its exit code."""
    args = sys.argv[1:]
    # Help wins wherever it appears on the command line.
    if any(tok in ("-h", "--help") for tok in args):
        show_help()
        return 0
    preset_ip = parse_preset_ip(args)
    app = QtWidgets.QApplication(sys.argv)
    m = Main(preset_ip=preset_ip)
    m.resize(1250, 780)
    m.show()
    return app.exec_()


if __name__ == "__main__":
    sys.exit(main())