diff --git a/script/SDG2042X_V0.1.py b/script/SDG2042X_V0.1.py index 0eacf4d..97b0009 100755 --- a/script/SDG2042X_V0.1.py +++ b/script/SDG2042X_V0.1.py @@ -1,10 +1,11 @@ #!/usr/bin/env python3 # Siglent SDG2042X Linux GUI (PyQt5) # Features: -# - Basic, Burst, Sweep, +# Basic Signals, including Burst, Sweep context-aware parameter settings # - ARB Manager, SCPI CLI, -# - Storing Presets, storing to editable text file SDG2042x.dat -# - Screenshot +# - Storing Presets, storing to editable preset file SDG2042x.dat +# - path setting for screenshot and preset file +# - timestamped Screenshot # # Author: Thomas Gohle / tgohle@togo-lab.io # www.togo-lab.io @@ -13,8 +14,7 @@ # Copyright: CC BY-SA 4.0 # https://creativecommons.org/licenses/by-sa/4.0/ # -# Version: 0.1 -#---------------------------------------------------- +# Version: 0.1 - 2025-10-2025 import os import socket @@ -23,12 +23,47 @@ import time from PyQt5 import QtWidgets, QtCore, QtGui # -------------------- Constants -------------------- -Window_Name = "Linux GUI SDG2042X – by Thomas Gohle - ToGo-Lab - Ver 0.1" +Window_Name = "Linux GUI SDG2042X – v16" DEFAULT_PORT = 5025 -SOCKET_TIMEOUT = 2.5 +SOCKET_TIMEOUT = 4.0 SIGLENT_WAVES = ["SINE", "SQUARE", "RAMP", "PULSE", "NOISE", "ARB", "DC"] -PRESET_FILE = "SDG2042x.dat" -NUM_SLOTS = 10 + +CONFIG_FILE = "SDG2042x.config" +PRESET_BASENAME = "SDG2042x.dat" + +# -------------------- Config helpers -------------------- +def _expand(p: str) -> str: + return os.path.abspath(os.path.expanduser(os.path.expandvars(p))) + +def ensure_dir(path: str): + if path and not os.path.isdir(path): + os.makedirs(path, exist_ok=True) + +def load_config(path: str) -> dict: + d = {"PRESET_DIR": os.getcwd(), "SCREENSHOT_DIR": os.getcwd()} + try: + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#") or "=" not in line: + continue + k, v = line.split("=", 1) + d[k.strip().upper()] = v.strip() + except FileNotFoundError: + pass + d["PRESET_DIR"] = _expand(d.get("PRESET_DIR", os.getcwd())) + d["SCREENSHOT_DIR"] = _expand(d.get("SCREENSHOT_DIR", os.getcwd())) + return d + +def save_config(path: str, d: dict): + lines = [ + "# SDG2042x.config", + f"PRESET_DIR={d.get('PRESET_DIR','')}", + f"SCREENSHOT_DIR={d.get('SCREENSHOT_DIR','')}", + "" + ] + with open(path, "w", encoding="utf-8") as f: + f.write("\n".join(lines)) # -------------------- Transport -------------------- class SDGLan: @@ -63,11 +98,38 @@ class SDGLan: raise RuntimeError("Not connected") self.sock.sendall(payload) + def drain(self): + """Read and discard any bytes until timeout. Safe if nothing pending.""" + try: + _ = self._recv_until_timeout() + except Exception: + pass + def query(self, cmd: str) -> str: self.write(cmd) data = self._recv_line() return data.strip() + def query_retry(self, cmd: str, retries: int = 2) -> str: + """Query with pre-drain and retry on socket.timeout.""" + last_err = None + for _ in range(retries + 1): + try: + self.drain() + self.write(cmd) + data = self._recv_line() + return data.strip() + except socket.timeout as e: + last_err = e + try: + self.write("*CLS") + except Exception: + pass + continue + if last_err: + raise last_err + return "" + def query_raw(self, cmd: str) -> bytes: self.write(cmd) return self._recv_until_timeout() @@ -88,14 +150,12 @@ class SDGLan: return self._recv_exact(n) def query_block(self, cmd: str, timeout_s: float = 8.0, cap: int = 25_000_000) -> bytes: - """Read a SCPI definite-length block (#). Returns payload without header.""" if not self.sock: raise RuntimeError("Not connected") prev_to = self.sock.gettimeout() try: self.sock.settimeout(timeout_s) self.write(cmd) - # seek '#' first = self._recv_exact(1) while first and first != b"#": first = self._recv_exact(1) @@ -118,28 +178,22 @@ class SDGLan: pass def query_scdp(self, timeout_s: float = 15.0, cap: int = 50_000_000) -> bytes: - """Robust screenshot: handles BMP direct or SCPI block-wrapped payload.""" if not self.sock: raise RuntimeError("Not connected") prev_to = self.sock.gettimeout() try: self.sock.settimeout(timeout_s) - # Send SCDP self.write("SCDP") - # Peek first two bytes first2 = self._recv_exact(2) if not first2: return b"" - # Case 1: BMP direct if first2 == b"BM": size_bytes = self._recv_exact(4) if len(size_bytes) != 4: return b"" total_size = int.from_bytes(size_bytes, "little") - # We already have 6 bytes rest = self._recv_exact_capped(total_size - 6, cap) return first2 + size_bytes + rest - # Case 2: SCPI block if first2[:1] == b"#": ndig = int(first2[1:2].decode()) len_str = self._recv_exact(ndig).decode("ascii") @@ -148,7 +202,6 @@ class SDGLan: length = int(len_str) payload = self._recv_exact_capped(length, cap) return payload - # Case 3: unknown header, read until timeout tail = self._recv_until_timeout() return first2 + tail finally: @@ -182,7 +235,6 @@ class SDGLan: # -------------------- Utilities -------------------- def human_to_eng(s: str) -> str: - """Convert '1k', '2.5M', '500mV', '10ms' etc. to float string (base units).""" t = s.strip().lower().replace("vpp", "").replace("v", "") t = t.replace("hz", "").replace("deg", "") mult = 1.0 @@ -217,7 +269,6 @@ def human_to_eng(s: str) -> str: return f"{val*mult}" def quote_name(name: str) -> str: - """Quote ARB names if they contain spaces or punctuation.""" safe = all(ch.isalnum() or ch in ("_", "-") for ch in name) return name if safe else '"%s"' % name @@ -259,31 +310,32 @@ def write_presets(path: str, d: dict): # -------------------- Screenshot worker (threaded) -------------------- class ScreenshotWorker(QtCore.QThread): - done = QtCore.pyqtSignal(bytes, str, str) # data, filename, error ('' if ok) + done = QtCore.pyqtSignal(bytes, str, str) # data, filename, error - def __init__(self, io: SDGLan, parent=None): + def __init__(self, io: SDGLan, out_dir: str, parent=None): super().__init__(parent) self.io = io + self.out_dir = out_dir def run(self): try: - # Clear previous errors that may reference old invalid commands try: self.io.write("*CLS") except Exception: pass - # Drain any residual bytes from prior operations try: _ = self.io._recv_until_timeout() except Exception: pass data = self.io.query_scdp(timeout_s=20.0, cap=50_000_000) if not data: - err = self.io.query("SYST:ERR?") + err = self.io.query_retry("SYST:ERR?") raise RuntimeError(f"no image data; last SYST:ERR? -> {err}") ts = time.strftime("%Y%m%d-%H%M%S") fn = f"{ts}_SDG2042X.bmp" if data.startswith(b"BM") else f"{ts}_SDG2042X.bin" - self.done.emit(data, fn, "") + ensure_dir(self.out_dir) + full = os.path.join(self.out_dir, fn) + self.done.emit(data, full, "") except Exception as e: self.done.emit(b"", "", str(e)) @@ -300,60 +352,189 @@ class BasicTab(QtWidgets.QWidget): self._build() def _build(self): + # Row 1: waveform wave_lbl = QtWidgets.QLabel("Waveform") self.wave = QtWidgets.QComboBox(); self.wave.addItems(SIGLENT_WAVES) - frq_lbl = QtWidgets.QLabel("Freq") - self.frq = QtWidgets.QLineEdit("1 kHz") - amp_lbl = QtWidgets.QLabel("Amp Vpp") - self.amp = QtWidgets.QLineEdit("1.0") + # Row 2: FRQ/PERI + fp_lbl = QtWidgets.QLabel("Freq/Period") + self.fp_mode = QtWidgets.QComboBox(); self.fp_mode.addItems(["FRQ", "PERI"]) + self.fp_val = QtWidgets.QLineEdit("1 kHz") + # Row 3: amplitude or levels + amp_mode_lbl = QtWidgets.QLabel("Amplitude mode") + self.amp_mode = QtWidgets.QComboBox(); self.amp_mode.addItems(["AMP", "H/L levels"]) + self.amp = QtWidgets.QLineEdit("1.0") # AMP Vpp + self.hlev = QtWidgets.QLineEdit("0.5") # HLEV + self.llev = QtWidgets.QLineEdit("-0.5") # LLEV + # Row 4: offset and phase off_lbl = QtWidgets.QLabel("Offset V") self.off = QtWidgets.QLineEdit("0.0") ph_lbl = QtWidgets.QLabel("Phase deg") self.ph = QtWidgets.QLineEdit("0") + # Extras + self.duty = QtWidgets.QLineEdit("50") # Square/Pulse + self.sym = QtWidgets.QLineEdit("50") # Ramp + self.width = QtWidgets.QLineEdit("0.001") # Pulse s + self.rise = QtWidgets.QLineEdit("1e-6") # Pulse s + self.fall = QtWidgets.QLineEdit("1e-6") # Pulse s + self.dly = QtWidgets.QLineEdit("0") # Pulse s + + self.mean = QtWidgets.QLineEdit("0") # Noise + self.stdev = QtWidgets.QLineEdit("0.1") + self.band = QtWidgets.QCheckBox("Noise bandwidth limit") + + self.arb_name = QtWidgets.QLineEdit("") # ARB selection + + # Buttons self.btn_apply = QtWidgets.QPushButton("Apply") self.btn_read = QtWidgets.QPushButton("Readback") self.btn_on = QtWidgets.QPushButton("Output ON") self.btn_off = QtWidgets.QPushButton("Output OFF") - # Preset controls + # Presets preset_lbl = QtWidgets.QLabel("Preset #") self.preset_num = QtWidgets.QSpinBox(); self.preset_num.setRange(1, 10); self.preset_num.setValue(1) self.btn_store = QtWidgets.QPushButton("Store") self.btn_recall = QtWidgets.QPushButton("Recall") grid = QtWidgets.QGridLayout(); row = 0 - grid.addWidget(wave_lbl, row, 0); grid.addWidget(self.wave, row, 1) - grid.addWidget(frq_lbl, row, 2); grid.addWidget(self.frq, row, 3); row += 1 - grid.addWidget(amp_lbl, row, 0); grid.addWidget(self.amp, row, 1) - grid.addWidget(off_lbl, row, 2); grid.addWidget(self.off, row, 3); row += 1 - grid.addWidget(ph_lbl, row, 0); grid.addWidget(self.ph, row, 1); row += 1 + grid.addWidget(wave_lbl, row, 0); grid.addWidget(self.wave, row, 1); row += 1 + grid.addWidget(fp_lbl, row, 0); grid.addWidget(self.fp_mode, row, 1); grid.addWidget(self.fp_val, row, 2); row += 1 + grid.addWidget(amp_mode_lbl, row, 0); grid.addWidget(self.amp_mode, row, 1) + grid.addWidget(QtWidgets.QLabel("AMP Vpp"), row, 2); grid.addWidget(self.amp, row, 3) + grid.addWidget(QtWidgets.QLabel("HLEV"), row, 4); grid.addWidget(self.hlev, row, 5) + grid.addWidget(QtWidgets.QLabel("LLEV"), row, 6); grid.addWidget(self.llev, row, 7); row += 1 + grid.addWidget(off_lbl, row, 0); grid.addWidget(self.off, row, 1) + grid.addWidget(ph_lbl, row, 2); grid.addWidget(self.ph, row, 3); row += 1 + + # Extras rows + grid.addWidget(QtWidgets.QLabel("DUTY %"), row, 0); grid.addWidget(self.duty, row, 1) + grid.addWidget(QtWidgets.QLabel("SYM %"), row, 2); grid.addWidget(self.sym, row, 3) + grid.addWidget(QtWidgets.QLabel("WIDTH s"), row, 4); grid.addWidget(self.width, row, 5); row += 1 + grid.addWidget(QtWidgets.QLabel("RISE s"), row, 0); grid.addWidget(self.rise, row, 1) + grid.addWidget(QtWidgets.QLabel("FALL s"), row, 2); grid.addWidget(self.fall, row, 3) + grid.addWidget(QtWidgets.QLabel("DLY s"), row, 4); grid.addWidget(self.dly, row, 5); row += 1 + grid.addWidget(QtWidgets.QLabel("NOISE MEAN"), row, 0); grid.addWidget(self.mean, row, 1) + grid.addWidget(QtWidgets.QLabel("NOISE STDEV"), row, 2); grid.addWidget(self.stdev, row, 3) + grid.addWidget(self.band, row, 4); row += 1 + grid.addWidget(QtWidgets.QLabel("ARB name"), row, 0); grid.addWidget(self.arb_name, row, 1, 1, 3); row += 1 + grid.addWidget(self.btn_apply, row, 0) grid.addWidget(self.btn_read, row, 1) grid.addWidget(self.btn_on, row, 2) grid.addWidget(self.btn_off, row, 3); row += 1 - grid.addWidget(preset_lbl, row, 0) grid.addWidget(self.preset_num, row, 1) grid.addWidget(self.btn_store, row, 2) grid.addWidget(self.btn_recall, row, 3) + + grid.setColumnStretch(7, 1) self.setLayout(grid) + # Signals self.btn_apply.clicked.connect(self._emit_apply) self.btn_read.clicked.connect(lambda: self.readback_basic.emit()) self.btn_on.clicked.connect(lambda: self.toggle_output.emit(True)) self.btn_off.clicked.connect(lambda: self.toggle_output.emit(False)) self.btn_store.clicked.connect(lambda: self.store_slot.emit(self.preset_num.value())) self.btn_recall.clicked.connect(lambda: self.recall_slot.emit(self.preset_num.value())) + self.wave.currentTextChanged.connect(self._update_context) + self.amp_mode.currentTextChanged.connect(self._update_amp_mode) + + self._update_context() + self._update_amp_mode() + + def _update_amp_mode(self): + amp_mode = self.amp_mode.currentText() + show_levels = (amp_mode != "AMP") + self.amp.setEnabled(not show_levels) + self.hlev.setEnabled(show_levels) + self.llev.setEnabled(show_levels) + + def _update_context(self): + w = self.wave.currentText() + # enable all by default + widgets = [ + self.duty, self.sym, self.width, self.rise, self.fall, self.dly, + self.mean, self.stdev, self.band, self.arb_name, + self.ph, self.off, self.amp, self.hlev, self.llev, self.fp_val + ] + for wid in widgets: + wid.setEnabled(True) + + if w == "SINE": + self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=False) + self.ph.setEnabled(True); self.off.setEnabled(True) + elif w == "SQUARE": + self._set_extras_enabled(duty=True, sym=False, pulse=False, noise=False, arb=False) + self.ph.setEnabled(True) + elif w == "RAMP": + self._set_extras_enabled(duty=False, sym=True, pulse=False, noise=False, arb=False) + self.ph.setEnabled(True) + elif w == "PULSE": + self._set_extras_enabled(duty=True, sym=False, pulse=True, noise=False, arb=False) + self.ph.setEnabled(False) + elif w == "NOISE": + self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=True, arb=False) + self.fp_val.setEnabled(False) + self.amp.setEnabled(False); self.hlev.setEnabled(False); self.llev.setEnabled(False) + self.off.setEnabled(False); self.ph.setEnabled(False) + elif w == "ARB": + self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=True) + self.ph.setEnabled(True) + elif w == "DC": + self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=False) + self.fp_val.setEnabled(False) + self.amp.setEnabled(False); self.hlev.setEnabled(False); self.llev.setEnabled(False) + self.ph.setEnabled(False) + + def _set_extras_enabled(self, *, duty: bool, sym: bool, pulse: bool, noise: bool, arb: bool): + self.duty.setEnabled(duty) + self.sym.setEnabled(sym) + for w in (self.width, self.rise, self.fall, self.dly): + w.setEnabled(pulse) + self.mean.setEnabled(noise) + self.stdev.setEnabled(noise) + self.band.setEnabled(noise) + self.arb_name.setEnabled(arb) def _emit_apply(self): - cfg = dict( - WVTP=self.wave.currentText(), - FRQ=human_to_eng(self.frq.text()), - AMP=human_to_eng(self.amp.text()), - OFST=human_to_eng(self.off.text()), - PHSE=human_to_eng(self.ph.text()), - ) + w = self.wave.currentText() + cfg = {"WVTP": w} + + if w not in ("NOISE", "DC"): + key = self.fp_mode.currentText() + cfg[key] = human_to_eng(self.fp_val.text()) + + if w not in ("NOISE", "DC"): + if self.amp_mode.currentText() == "AMP": + cfg["AMP"] = human_to_eng(self.amp.text()) + else: + cfg["HLEV"] = human_to_eng(self.hlev.text()) + cfg["LLEV"] = human_to_eng(self.llev.text()) + + if w != "NOISE": + cfg["OFST"] = human_to_eng(self.off.text()) + + if w in ("SINE", "SQUARE", "RAMP", "ARB"): + cfg["PHSE"] = human_to_eng(self.ph.text()) + + if w in ("SQUARE", "PULSE"): + cfg["DUTY"] = human_to_eng(self.duty.text()) + if w == "RAMP": + cfg["SYM"] = human_to_eng(self.sym.text()) + if w == "PULSE": + cfg["WIDTH"] = human_to_eng(self.width.text()) + cfg["RISE"] = human_to_eng(self.rise.text()) + cfg["FALL"] = human_to_eng(self.fall.text()) + cfg["DLY"] = human_to_eng(self.dly.text()) + if w == "NOISE": + cfg["MEAN"] = human_to_eng(self.mean.text()) + cfg["STDEV"] = human_to_eng(self.stdev.text()) + cfg["BANDSTATE"] = "ON" if self.band.isChecked() else "OFF" + if w == "ARB" and self.arb_name.text().strip(): + cfg["ARWV_NAME"] = self.arb_name.text().strip() + self.apply_basic.emit(cfg) class PresetsTab(QtWidgets.QWidget): @@ -379,7 +560,7 @@ class PresetsTab(QtWidgets.QWidget): self.btn_load = QtWidgets.QPushButton("Reload File") self.btn_save = QtWidgets.QPushButton("Save Names") self.btn_open = QtWidgets.QPushButton("Open File...") - self.path_label = QtWidgets.QLabel(PRESET_FILE) + self.path_label = QtWidgets.QLabel(PRESET_BASENAME) layout = QtWidgets.QVBoxLayout() hl = QtWidgets.QHBoxLayout() @@ -393,7 +574,7 @@ class PresetsTab(QtWidgets.QWidget): self.btn_open.clicked.connect(self._open_file) def _open_file(self): - path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Open Preset File", PRESET_FILE, "Text (*.txt *.dat);;All Files (*)") + path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Open Preset File", PRESET_BASENAME, "Text (*.txt *.dat);;All Files (*)") if not path: return self.path_label.setText(path) @@ -425,7 +606,91 @@ class PresetsTab(QtWidgets.QWidget): self.table.item(i-1, 1).setText(name) self.table.item(i-1, 2).setText(summ) -# -------------------- Other tabs (Burst, Sweep, ARB, CLI) -------------------- +class ConfigTab(QtWidgets.QWidget): + changed = QtCore.pyqtSignal(dict) # {"PRESET_DIR":..., "SCREENSHOT_DIR":...} + + def __init__(self, initial: dict): + super().__init__() + self._build(initial) + + def _build(self, cfg: dict): + self.preset_dir = QtWidgets.QLineEdit(cfg.get("PRESET_DIR","")) + self.ss_dir = QtWidgets.QLineEdit(cfg.get("SCREENSHOT_DIR","")) + btn_browse_p = QtWidgets.QPushButton("…") + btn_browse_s = QtWidgets.QPushButton("…") + self.btn_save = QtWidgets.QPushButton("Save Config") + self.btn_reload = QtWidgets.QPushButton("Reload Config") + + g = QtWidgets.QGridLayout(); row = 0 + g.addWidget(QtWidgets.QLabel("Preset dir"), row, 0); g.addWidget(self.preset_dir, row, 1); g.addWidget(btn_browse_p, row, 2); row += 1 + g.addWidget(QtWidgets.QLabel("Screenshot dir"), row, 0); g.addWidget(self.ss_dir, row, 1); g.addWidget(btn_browse_s, row, 2); row += 1 + g.addWidget(self.btn_save, row, 0); g.addWidget(self.btn_reload, row, 1); g.setColumnStretch(1,1) + self.setLayout(g) + + btn_browse_p.clicked.connect(self._pick_preset_dir) + btn_browse_s.clicked.connect(self._pick_ss_dir) + self.btn_save.clicked.connect(self._save) + self.btn_reload.clicked.connect(self._reload) + + def _pick_preset_dir(self): + d = QtWidgets.QFileDialog.getExistingDirectory(self, "Select Preset Directory", self.preset_dir.text().strip() or os.getcwd()) + if d: + self.preset_dir.setText(d) + + def _pick_ss_dir(self): + d = QtWidgets.QFileDialog.getExistingDirectory(self, "Select Screenshot Directory", self.ss_dir.text().strip() or os.getcwd()) + if d: + self.ss_dir.setText(d) + + def _save(self): + d = {"PRESET_DIR": self.preset_dir.text().strip(), "SCREENSHOT_DIR": self.ss_dir.text().strip()} + self.changed.emit(d) + + def _reload(self): + d = load_config(CONFIG_FILE) + self.preset_dir.setText(d.get("PRESET_DIR","")) + self.ss_dir.setText(d.get("SCREENSHOT_DIR","")) + +class CLITab(QtWidgets.QWidget): + send_cmd = QtCore.pyqtSignal(str) + query_cmd = QtCore.pyqtSignal(str) + + def __init__(self): + super().__init__() + self._build() + + def _build(self): + self.inp = QtWidgets.QLineEdit() + self.inp.setPlaceholderText("Enter SCPI, e.g. C1:BSWV? or *IDN?") + btn_send = QtWidgets.QPushButton("Send") + btn_query = QtWidgets.QPushButton("Query") + self.out = QtWidgets.QPlainTextEdit(); self.out.setReadOnly(True); self.out.setMinimumHeight(200) + + grid = QtWidgets.QGridLayout(); row = 0 + grid.addWidget(QtWidgets.QLabel("Command"), row, 0) + grid.addWidget(self.inp, row, 1, 1, 4) + grid.addWidget(btn_send, row, 5) + grid.addWidget(btn_query, row, 6); row += 1 + grid.addWidget(self.out, row, 0, 1, 7) + self.setLayout(grid) + + btn_send.clicked.connect(self._do_send) + btn_query.clicked.connect(self._do_query) + self.inp.returnPressed.connect(self._do_query) + + def append(self, s: str): + self.out.appendPlainText(s) + + def _do_send(self): + cmd = self.inp.text().strip() + if cmd: + self.send_cmd.emit(cmd) + + def _do_query(self): + cmd = self.inp.text().strip() + if cmd: + self.query_cmd.emit(cmd) + class BurstTab(QtWidgets.QWidget): apply_burst = QtCore.pyqtSignal(dict) readback_burst = QtCore.pyqtSignal() @@ -598,46 +863,6 @@ class ARBTab(QtWidgets.QWidget): return self.download_wave.emit(name, path) -class CLITab(QtWidgets.QWidget): - send_cmd = QtCore.pyqtSignal(str) - query_cmd = QtCore.pyqtSignal(str) - - def __init__(self): - super().__init__() - self._build() - - def _build(self): - self.inp = QtWidgets.QLineEdit() - self.inp.setPlaceholderText("Enter SCPI, e.g. C1:BSWV? or *IDN?") - btn_send = QtWidgets.QPushButton("Send") - btn_query = QtWidgets.QPushButton("Query") - self.out = QtWidgets.QPlainTextEdit(); self.out.setReadOnly(True); self.out.setMinimumHeight(220) - - grid = QtWidgets.QGridLayout(); row = 0 - grid.addWidget(QtWidgets.QLabel("Command"), row, 0) - grid.addWidget(self.inp, row, 1, 1, 4) - grid.addWidget(btn_send, row, 5) - grid.addWidget(btn_query, row, 6); row += 1 - grid.addWidget(self.out, row, 0, 1, 7) - self.setLayout(grid) - - btn_send.clicked.connect(self._do_send) - btn_query.clicked.connect(self._do_query) - self.inp.returnPressed.connect(self._do_query) - - def append(self, s: str): - self.out.appendPlainText(s) - - def _do_send(self): - cmd = self.inp.text().strip() - if cmd: - self.send_cmd.emit(cmd) - - def _do_query(self): - cmd = self.inp.text().strip() - if cmd: - self.query_cmd.emit(cmd) - # -------------------- Main Window -------------------- class Main(QtWidgets.QWidget): def __init__(self, preset_ip=None): @@ -645,6 +870,12 @@ class Main(QtWidgets.QWidget): self.setWindowTitle(Window_Name) self.i = SDGLan() self._scr_worker = None + + self.cfg = load_config(CONFIG_FILE) + ensure_dir(self.cfg["PRESET_DIR"]) + ensure_dir(self.cfg["SCREENSHOT_DIR"]) + self.preset_path = os.path.join(self.cfg["PRESET_DIR"], PRESET_BASENAME) + self._build(preset_ip) def _build(self, preset_ip): @@ -668,8 +899,9 @@ class Main(QtWidgets.QWidget): self.t_arb = ARBTab(); self.tabs.addTab(self.t_arb, "ARB Manager") self.t_cli = CLITab(); self.tabs.addTab(self.t_cli, "SCPI CLI") self.t_presets = PresetsTab(); self.tabs.addTab(self.t_presets, "Presets") + self.t_config = ConfigTab(self.cfg); self.tabs.addTab(self.t_config, "Config") - self.log = QtWidgets.QPlainTextEdit(); self.log.setReadOnly(True); self.log.setMinimumHeight(220) + self.log = QtWidgets.QPlainTextEdit(); self.log.setReadOnly(True); self.log.setMinimumHeight(200) top = QtWidgets.QGridLayout(); row = 0 top.addWidget(ip_lbl, row, 0); top.addWidget(self.ip, row, 1, 1, 3) @@ -697,19 +929,21 @@ class Main(QtWidgets.QWidget): self.t_burst.readback_burst.connect(self.on_readback_burst) self.t_sweep.apply_sweep.connect(self.on_apply_sweep) self.t_sweep.readback_sweep.connect(self.on_readback_sweep) + self.t_arb.refresh_lists.connect(self.on_arb_refresh) self.t_arb.set_wave.connect(self.on_arb_set) self.t_arb.upload_wave.connect(self.on_arb_upload) self.t_arb.download_wave.connect(self.on_arb_download) + self.t_cli.send_cmd.connect(self.on_cli_send) self.t_cli.query_cmd.connect(self.on_cli_query) - self.t_presets.rename_slot.connect(self.on_rename_slot) - self.t_presets.view_refresh.connect(self.refresh_presets_view) + self.t_config.changed.connect(self.on_config_changed) - ensure_preset_file(PRESET_FILE) + ensure_preset_file(self.preset_path) self.refresh_presets_view() + # --------- helpers --------- def logln(self, s: str): self.log.appendPlainText(s) @@ -727,69 +961,87 @@ class Main(QtWidgets.QWidget): def on_idn(self): try: - self.logln(f"*IDN? -> {self.i.query('*IDN?')}") + self.logln(f"*IDN? -> {self.i.query_retry('*IDN?')}") except Exception as e: self.logln(f"[ERR] IDN: {e}") def on_err(self): try: - self.logln(f"SYST:ERR? -> {self.i.query('SYST:ERR?')}") + self.logln(f"SYST:ERR? -> {self.i.query_retry('SYST:ERR?')}") except Exception as e: self.logln(f"[ERR] SYST:ERR?: {e}") - # ---------- Screenshot (threaded) ---------- + # ---------- Screenshot ---------- def on_screenshot(self): if not self.i.sock: self.logln("[ERR] screenshot: not connected") return - if self._scr_worker and self._scr_worker.isRunning(): + if hasattr(self, "_scr_worker") and self._scr_worker and self._scr_worker.isRunning(): self.logln("[ERR] screenshot already running") return - self.btn_scr.setEnabled(False) - self.setCursor(QtCore.Qt.WaitCursor) - self._scr_worker = ScreenshotWorker(self.i, self) + self.btn_scr.setEnabled(False); self.setCursor(QtCore.Qt.WaitCursor) + outdir = load_config(CONFIG_FILE)["SCREENSHOT_DIR"] + self._scr_worker = ScreenshotWorker(self.i, outdir, self) self._scr_worker.done.connect(self._on_screenshot_done) self._scr_worker.start() - def _on_screenshot_done(self, data: bytes, fn: str, err: str): - self.btn_scr.setEnabled(True) - self.unsetCursor() + def _on_screenshot_done(self, data: bytes, full_path: str, err: str): + self.btn_scr.setEnabled(True); self.unsetCursor() if err: self.logln(f"[ERR] screenshot: {err}") return try: - with open(fn, "wb") as f: + with open(full_path, "wb") as f: f.write(data) - self.logln(f"Screenshot saved to {fn} ({len(data)} bytes)") - img = QtGui.QImage(fn) + self.logln(f"Screenshot saved to {full_path} ({len(data)} bytes)") + img = QtGui.QImage(full_path) if img.isNull(): - self.logln("[ERR] screenshot: Qt could not decode image; file kept") + self.logln("[ERR] screenshot: Qt decode failed") return pixmap = QtGui.QPixmap.fromImage(img) lbl = QtWidgets.QLabel(); lbl.setPixmap(pixmap) w = QtWidgets.QWidget(); l = QtWidgets.QVBoxLayout(w); l.addWidget(lbl) - w.setWindowTitle("SDG2042X Screenshot") - w.resize(pixmap.width(), pixmap.height()) - w.show() + w.setWindowTitle("SDG2042X Screenshot"); w.resize(pixmap.width(), pixmap.height()); w.show() self._scr_win = w except Exception as e: self.logln(f"[ERR] screenshot save/display: {e}") - # ---------- Basic ---------- + # ---------- Basic apply/readback ---------- def on_apply_basic(self, cfg: dict): try: - cmd = (f"{self._pref()}BSWV " - f"WVTP,{cfg['WVTP']},FRQ,{cfg['FRQ']},AMP,{cfg['AMP']}," - f"OFST,{cfg['OFST']},PHSE,{cfg['PHSE']}") - self.i.write(cmd) - self.logln(f"SET: {cmd}") + parts = [f"WVTP,{cfg['WVTP']}"] + if 'FRQ' in cfg: + parts.append(f"FRQ,{cfg['FRQ']}") + if 'PERI' in cfg: + parts.append(f"PERI,{cfg['PERI']}") + if 'AMP' in cfg: + parts.append(f"AMP,{cfg['AMP']}") + else: + if 'HLEV' in cfg: parts.append(f"HLEV,{cfg['HLEV']}") + if 'LLEV' in cfg: parts.append(f"LLEV,{cfg['LLEV']}") + if 'OFST' in cfg: + parts.append(f"OFST,{cfg['OFST']}") + if 'PHSE' in cfg: + parts.append(f"PHSE,{cfg['PHSE']}") + for key in ("DUTY","SYM","WIDTH","RISE","FALL","DLY","MEAN","STDEV","BANDSTATE"): + if key in cfg: + parts.append(f"{key},{cfg[key]}") + + cmd = f"{self._pref()}BSWV " + ",".join(parts) + self.i.write(cmd); self.logln(f"SET: {cmd}") + + if cfg.get("WVTP") == "ARB" and cfg.get("ARWV_NAME"): + qname = quote_name(cfg["ARWV_NAME"]) + self.i.write(f"{self._pref()}ARWV NAME,{qname}") + self.logln(f"SET ARWV NAME,{qname}") + except Exception as e: self.logln(f"[ERR] basic apply: {e}") def on_readback_basic(self): try: - rb = self.i.query(f"{self._pref()}BSWV?") - st = self.i.query(f"{self._pref()}OUTP?") + rb = self.i.query_retry(f"{self._pref()}BSWV?") + st = self.i.query_retry(f"{self._pref()}OUTP?") self.logln(f"{self._pref()}BSWV? -> {rb}") self.logln(f"{self._pref()}OUTP? -> {st}") except Exception as e: @@ -799,7 +1051,7 @@ class Main(QtWidgets.QWidget): try: cmd = f"{self._pref()}OUTP {'ON' if turn_on else 'OFF'}" self.i.write(cmd) - st = self.i.query(f"{self._pref()}OUTP?") + st = self.i.query_retry(f"{self._pref()}OUTP?") self.logln(f"SET: {cmd}") self.logln(f"{self._pref()}OUTP? -> {st}") except Exception as e: @@ -808,47 +1060,29 @@ class Main(QtWidgets.QWidget): # ---------- Burst ---------- def on_apply_burst(self, cfg: dict): try: - parts = [ - f"STATE,{cfg['STATE']}", - f"MODE,{cfg['MODE']}", - f"TRSR,{cfg['TRSR']}", - f"NCYC,{cfg['NCYC']}", - f"DLAY,{cfg['DLAY']}", - f"GATEPOL,{cfg['GATEPOL']}", - ] + parts = [f"{k},{v}" for k, v in cfg.items()] cmd = f"{self._pref()}BTWV " + ",".join(parts) - self.i.write(cmd) - self.logln(f"SET: {cmd}") + self.i.write(cmd); self.logln(f"SET: {cmd}") except Exception as e: self.logln(f"[ERR] burst apply: {e}") def on_readback_burst(self): try: - rb = self.i.query(f"{self._pref()}BTWV?") - self.logln(f"{self._pref()}BTWV? -> {rb}") + rb = self.i.query_retry(f"{self._pref()}BTWV?"); self.logln(f"{self._pref()}BTWV? -> {rb}") except Exception as e: self.logln(f"[ERR] burst readback: {e}") # ---------- Sweep ---------- def on_apply_sweep(self, cfg: dict): try: - parts = [ - f"STATE,{cfg['STATE']}", - f"WAV,{cfg['WAV']}", - f"STAR,{cfg['STAR']}", - f"STOP,{cfg['STOP']}", - f"TIME,{cfg['TIME']}", - f"DIR,{cfg['DIR']}", - f"TRSR,{cfg['TRSR']}", - ] + parts = [f"{k},{v}" for k, v in cfg.items()] cmd = f"{self._pref()}SWWV " + ",".join(parts) self.i.write(cmd) - err = self.i.query("SYST:ERR?") + err = self.i.query_retry("SYST:ERR?") if not err.startswith("0"): - parts[1] = f"SPAC,{cfg['WAV']}" # fallback for older FW - cmd2 = f"{self._pref()}SWWV " + ",".join(parts) - self.i.write(cmd2) - self.logln(f"SET (fallback): {cmd2}") + parts2 = [p.replace("WAV,", "SPAC,") for p in parts] + cmd2 = f"{self._pref()}SWWV " + ",".join(parts2) + self.i.write(cmd2); self.logln(f"SET (fallback): {cmd2}") else: self.logln(f"SET: {cmd}") except Exception as e: @@ -856,26 +1090,23 @@ class Main(QtWidgets.QWidget): def on_readback_sweep(self): try: - rb = self.i.query(f"{self._pref()}SWWV?") - self.logln(f"{self._pref()}SWWV? -> {rb}") + rb = self.i.query_retry(f"{self._pref()}SWWV?"); self.logln(f"{self._pref()}SWWV? -> {rb}") except Exception as e: self.logln(f"[ERR] sweep readback: {e}") # ---------- ARB ops ---------- def on_arb_refresh(self): try: - b = self.i.query("STL? BUILDIN") - u = self.i.query("STL? USER") + b = self.i.query_retry("STL? BUILDIN") + u = self.i.query_retry("STL? USER") def parse_list(s: str): - parts = [p.strip() for p in s.replace(";", ",").split(',') if p.strip()] - return parts + return [p.strip() for p in s.replace(";", ",").split(',') if p.strip()] b_list = parse_list(b) u_list = parse_list(u) self.t_arb.built_in.clear(); self.t_arb.user.clear() self.t_arb.built_in.addItems(b_list) self.t_arb.user.addItems(u_list) - self.logln(f"BUILDIN: {b_list}") - self.logln(f"USER: {u_list}") + self.logln(f"BUILDIN: {b_list}"); self.logln(f"USER: {u_list}") except Exception as e: self.logln(f"[ERR] ARB refresh: {e}") @@ -884,7 +1115,7 @@ class Main(QtWidgets.QWidget): qname = quote_name(name) self.i.write(f"{self._pref()}BSWV WVTP,ARB") self.i.write(f"{self._pref()}ARWV NAME,{qname}") - rb = self.i.query(f"{self._pref()}ARWV?") + rb = self.i.query_retry(f"{self._pref()}ARWV?") self.logln(f"SET ARB: {qname}; ARWV? -> {rb}") except Exception as e: self.logln(f"[ERR] ARB set: {e}") @@ -948,7 +1179,7 @@ class Main(QtWidgets.QWidget): def on_cli_query(self, cmd: str): try: - resp = self.i.query(cmd) + resp = self.i.query_retry(cmd) self.t_cli.append(f"? {cmd}\n< {resp}") except Exception as e: self.t_cli.append(f"[ERR] query: {e}") @@ -958,13 +1189,13 @@ class Main(QtWidgets.QWidget): try: if slot < 1 or slot > 10: raise ValueError("slot out of range") - bswv = self.i.query(f"{self._pref()}BSWV?") - outp = self.i.query(f"{self._pref()}OUTP?") - outp_state = "ON" if "ON" in outp.upper().split(",")[0] else "OFF" - d = read_presets(PRESET_FILE) + bswv = self.i.query_retry(f"{self._pref()}BSWV?") + outp = self.i.query_retry(f"{self._pref()}OUTP?") + outp_state = "ON" if "ON" in outp.upper().split(',')[0] else "OFF" + d = read_presets(self.preset_path) d[f"SLOT{slot}_BSWV"] = bswv d[f"SLOT{slot}_OUTP"] = outp_state - write_presets(PRESET_FILE, d) + write_presets(self.preset_path, d) self.logln(f"Stored slot {slot}: BSWV len={len(bswv)}, OUTP={outp_state}") self.refresh_presets_view() except Exception as e: @@ -974,7 +1205,7 @@ class Main(QtWidgets.QWidget): try: if slot < 1 or slot > 10: raise ValueError("slot out of range") - d = read_presets(PRESET_FILE) + d = read_presets(self.preset_path) b = d.get(f"SLOT{slot}_BSWV", "").strip() o = d.get(f"SLOT{slot}_OUTP", "").strip().upper() if not b: @@ -988,20 +1219,39 @@ class Main(QtWidgets.QWidget): def on_rename_slot(self, slot: int, name: str): try: - d = read_presets(PRESET_FILE) + d = read_presets(self.preset_path) d[f"SLOT{slot}_NAME"] = name - write_presets(PRESET_FILE, d) + write_presets(self.preset_path, d) self.refresh_presets_view() except Exception as e: self.logln(f"[ERR] rename slot {slot}: {e}") def refresh_presets_view(self): try: - d = read_presets(PRESET_FILE) + ensure_preset_file(self.preset_path) + d = read_presets(self.preset_path) self.t_presets.update_from_presets(d) + self.logln(f"Preset file: {self.preset_path}") except Exception as e: self.logln(f"[ERR] refresh presets: {e}") + # ---------- Config ---------- + def on_config_changed(self, newd: dict): + try: + nd = { + "PRESET_DIR": _expand(newd.get("PRESET_DIR","")) or os.getcwd(), + "SCREENSHOT_DIR": _expand(newd.get("SCREENSHOT_DIR","")) or os.getcwd(), + } + ensure_dir(nd["PRESET_DIR"]); ensure_dir(nd["SCREENSHOT_DIR"]) + save_config(CONFIG_FILE, nd) + self.cfg = nd + self.preset_path = os.path.join(self.cfg["PRESET_DIR"], PRESET_BASENAME) + ensure_preset_file(self.preset_path) + self.refresh_presets_view() + self.logln(f"Config saved -> {CONFIG_FILE}") + except Exception as e: + self.logln(f"[ERR] save config: {e}") + # -------------------- CLI entry -------------------- def show_help(): print("Usage: sdg2042x_gui.py [options]\n") @@ -1019,6 +1269,6 @@ if __name__ == "__main__": preset_ip = argv[i + 1] app = QtWidgets.QApplication(sys.argv) m = Main(preset_ip=preset_ip) - m.resize(1100, 680) + m.resize(1250, 780) m.show() sys.exit(app.exec_()) diff --git a/script/SDG2042x.dat b/script/SDG2042x.dat deleted file mode 100644 index ba6681a..0000000 --- a/script/SDG2042x.dat +++ /dev/null @@ -1,31 +0,0 @@ -# SDG2042X presets -SLOT1_NAME=Sine 1kHz -SLOT1_BSWV=C1:BSWV WVTP,SINE,FRQ,1000HZ,PERI,0.001S,AMP,3V,AMPVRMS,1.0605Vrms,AMPDBM,13.5205dBm,MAX_OUTPUT_AMP,20V,OFST,0V,HLEV,1.5V,LLEV,-1.5V,PHSE,0 -SLOT1_OUTP=ON -SLOT2_NAME= -SLOT2_BSWV= -SLOT2_OUTP= -SLOT3_NAME= -SLOT3_BSWV= -SLOT3_OUTP= -SLOT4_NAME= -SLOT4_BSWV= -SLOT4_OUTP= -SLOT5_NAME= -SLOT5_BSWV= -SLOT5_OUTP= -SLOT6_NAME= -SLOT6_BSWV= -SLOT6_OUTP= -SLOT7_NAME= -SLOT7_BSWV= -SLOT7_OUTP= -SLOT8_NAME= -SLOT8_BSWV= -SLOT8_OUTP= -SLOT9_NAME= -SLOT9_BSWV= -SLOT9_OUTP= -SLOT10_NAME= -SLOT10_BSWV= -SLOT10_OUTP= diff --git a/script/archive/SDG2042X_V0.0.py b/script/archive/SDG2042X_V0.0.py new file mode 100755 index 0000000..0eacf4d --- /dev/null +++ b/script/archive/SDG2042X_V0.0.py @@ -0,0 +1,1024 @@ +#!/usr/bin/env python3 +# Siglent SDG2042X Linux GUI (PyQt5) +# Features: +# - Basic, Burst, Sweep, +# - ARB Manager, SCPI CLI, +# - Storing Presets, storing to editable text file SDG2042x.dat +# - Screenshot +# +# Author: Thomas Gohle / tgohle@togo-lab.io +# www.togo-lab.io +# tgonet.de / gohle.de +# +# Copyright: CC BY-SA 4.0 +# https://creativecommons.org/licenses/by-sa/4.0/ +# +# Version: 0.1 +#---------------------------------------------------- + +import os +import socket +import sys +import time +from PyQt5 import QtWidgets, QtCore, QtGui + +# -------------------- Constants -------------------- +Window_Name = "Linux GUI SDG2042X – by Thomas Gohle - ToGo-Lab - Ver 0.1" +DEFAULT_PORT = 5025 +SOCKET_TIMEOUT = 2.5 +SIGLENT_WAVES = ["SINE", "SQUARE", "RAMP", "PULSE", "NOISE", "ARB", "DC"] +PRESET_FILE = "SDG2042x.dat" +NUM_SLOTS = 10 + +# -------------------- Transport -------------------- +class SDGLan: + def __init__(self): + self.sock = None + self.addr = None + self.port = DEFAULT_PORT + + def connect(self, addr, port=DEFAULT_PORT): + self.close() + self.addr = addr + self.port = port + self.sock = socket.create_connection((addr, port), timeout=SOCKET_TIMEOUT) + self.sock.settimeout(SOCKET_TIMEOUT) + + def close(self): + try: + if self.sock: + self.sock.close() + finally: + self.sock = None + + def write(self, cmd: str): + if not self.sock: + raise RuntimeError("Not connected") + if not cmd.endswith("\n"): + cmd += "\n" + self.sock.sendall(cmd.encode("ascii")) + + def write_bytes(self, payload: bytes): + if not self.sock: + raise RuntimeError("Not connected") + self.sock.sendall(payload) + + def query(self, cmd: str) -> str: + self.write(cmd) + data = self._recv_line() + return data.strip() + + def query_raw(self, cmd: str) -> bytes: + self.write(cmd) + return self._recv_until_timeout() + + # ---- block helpers ---- + def _recv_exact(self, n: int) -> bytes: + buf = bytearray() + while len(buf) < n: + chunk = self.sock.recv(n - len(buf)) + if not chunk: + break + buf.extend(chunk) + return bytes(buf) + + def _recv_exact_capped(self, n: int, cap: int) -> bytes: + if n > cap: + raise RuntimeError(f"SCPI block length {n} exceeds cap {cap}") + return self._recv_exact(n) + + def query_block(self, cmd: str, timeout_s: float = 8.0, cap: int = 25_000_000) -> bytes: + """Read a SCPI definite-length block (#). Returns payload without header.""" + if not self.sock: + raise RuntimeError("Not connected") + prev_to = self.sock.gettimeout() + try: + self.sock.settimeout(timeout_s) + self.write(cmd) + # seek '#' + first = self._recv_exact(1) + while first and first != b"#": + first = self._recv_exact(1) + if not first: + return b"" + ndig_b = self._recv_exact(1) + if not ndig_b or not ndig_b.isdigit(): + return b"" + ndig = int(ndig_b.decode("ascii")) + len_str = self._recv_exact(ndig).decode("ascii") + if not len_str.isdigit(): + return b"" + length = int(len_str) + payload = self._recv_exact_capped(length, cap) + return payload + finally: + try: + self.sock.settimeout(prev_to) + except Exception: + pass + + def query_scdp(self, timeout_s: float = 15.0, cap: int = 50_000_000) -> bytes: + """Robust screenshot: handles BMP direct or SCPI block-wrapped payload.""" + if not self.sock: + raise RuntimeError("Not connected") + prev_to = self.sock.gettimeout() + try: + self.sock.settimeout(timeout_s) + # Send SCDP + self.write("SCDP") + # Peek first two bytes + first2 = self._recv_exact(2) + if not first2: + return b"" + # Case 1: BMP direct + if first2 == b"BM": + size_bytes = self._recv_exact(4) + if len(size_bytes) != 4: + return b"" + total_size = int.from_bytes(size_bytes, "little") + # We already have 6 bytes + rest = self._recv_exact_capped(total_size - 6, cap) + return first2 + size_bytes + rest + # Case 2: SCPI block + if first2[:1] == b"#": + ndig = int(first2[1:2].decode()) + len_str = self._recv_exact(ndig).decode("ascii") + if not len_str.isdigit(): + return b"" + length = int(len_str) + payload = self._recv_exact_capped(length, cap) + return payload + # Case 3: unknown header, read until timeout + tail = self._recv_until_timeout() + return first2 + tail + finally: + try: + self.sock.settimeout(prev_to) + except Exception: + pass + + def _recv_line(self) -> str: + chunks = [] + while True: + b = self.sock.recv(4096) + if not b: + break + chunks.append(b) + if b.endswith(b"\n"): + break + return b"".join(chunks).decode("ascii", errors="ignore") + + def _recv_until_timeout(self) -> bytes: + data = bytearray() + while True: + try: + b = self.sock.recv(65536) + if not b: + break + data.extend(b) + except socket.timeout: + break + return bytes(data) + +# -------------------- Utilities -------------------- +def human_to_eng(s: str) -> str: + """Convert '1k', '2.5M', '500mV', '10ms' etc. to float string (base units).""" + t = s.strip().lower().replace("vpp", "").replace("v", "") + t = t.replace("hz", "").replace("deg", "") + mult = 1.0 + if t.endswith("ms"): + mult = 1e-3; t = t[:-2] + elif t.endswith("us"): + mult = 1e-6; t = t[:-2] + elif t.endswith("ns"): + mult = 1e-9; t = t[:-2] + elif t.endswith("s"): + mult = 1.0; t = t[:-1] + elif t.endswith("khz"): + mult = 1e3; t = t[:-3] + elif t.endswith("mhz"): + mult = 1e6; t = t[:-3] + elif t.endswith("ghz"): + mult = 1e9; t = t[:-3] + elif t.endswith("k"): + mult = 1e3; t = t[:-1] + elif t.endswith("m") and not t.endswith("mm"): + mult = 1e-3; t = t[:-1] + elif t.endswith("u"): + mult = 1e-6; t = t[:-1] + elif t.endswith("n"): + mult = 1e-9; t = t[:-1] + elif t.endswith("g"): + mult = 1e9; t = t[:-1] + try: + val = float(t) + except ValueError: + return s.strip() + return f"{val*mult}" + +def quote_name(name: str) -> str: + """Quote ARB names if they contain spaces or punctuation.""" + safe = all(ch.isalnum() or ch in ("_", "-") for ch in name) + return name if safe else '"%s"' % name + +def ensure_preset_file(path: str): + if not os.path.exists(path): + with open(path, "w", encoding="utf-8") as f: + f.write("# SDG2042X presets\n") + for i in range(1, 11): + f.write(f"SLOT{i}_NAME=\n") + f.write(f"SLOT{i}_BSWV=\n") + f.write(f"SLOT{i}_OUTP=\n") + return path + +def read_presets(path: str) -> dict: + ensure_preset_file(path) + presets = {} + with open(path, "r", encoding="utf-8") as f: + for line in f: + line = line.strip() + if not line or line.startswith("#"): + continue + if "=" not in line: + continue + k, v = line.split("=", 1) + presets[k.strip()] = v.strip() + return presets + +def write_presets(path: str, d: dict): + lines = ["# SDG2042X presets"] + for i in range(1, 11): + name = d.get(f"SLOT{i}_NAME", "") + bswv = d.get(f"SLOT{i}_BSWV", "") + outp = d.get(f"SLOT{i}_OUTP", "") + lines.append(f"SLOT{i}_NAME={name}") + lines.append(f"SLOT{i}_BSWV={bswv}") + lines.append(f"SLOT{i}_OUTP={outp}") + with open(path, "w", encoding="utf-8") as f: + f.write("\n".join(lines) + "\n") + +# -------------------- Screenshot worker (threaded) -------------------- +class ScreenshotWorker(QtCore.QThread): + done = QtCore.pyqtSignal(bytes, str, str) # data, filename, error ('' if ok) + + def __init__(self, io: SDGLan, parent=None): + super().__init__(parent) + self.io = io + + def run(self): + try: + # Clear previous errors that may reference old invalid commands + try: + self.io.write("*CLS") + except Exception: + pass + # Drain any residual bytes from prior operations + try: + _ = self.io._recv_until_timeout() + except Exception: + pass + data = self.io.query_scdp(timeout_s=20.0, cap=50_000_000) + if not data: + err = self.io.query("SYST:ERR?") + raise RuntimeError(f"no image data; last SYST:ERR? -> {err}") + ts = time.strftime("%Y%m%d-%H%M%S") + fn = f"{ts}_SDG2042X.bmp" if data.startswith(b"BM") else f"{ts}_SDG2042X.bin" + self.done.emit(data, fn, "") + except Exception as e: + self.done.emit(b"", "", str(e)) + +# -------------------- Tabs -------------------- +class BasicTab(QtWidgets.QWidget): + apply_basic = QtCore.pyqtSignal(dict) + readback_basic = QtCore.pyqtSignal() + toggle_output = QtCore.pyqtSignal(bool) + store_slot = QtCore.pyqtSignal(int) + recall_slot = QtCore.pyqtSignal(int) + + def __init__(self): + super().__init__() + self._build() + + def _build(self): + wave_lbl = QtWidgets.QLabel("Waveform") + self.wave = QtWidgets.QComboBox(); self.wave.addItems(SIGLENT_WAVES) + frq_lbl = QtWidgets.QLabel("Freq") + self.frq = QtWidgets.QLineEdit("1 kHz") + amp_lbl = QtWidgets.QLabel("Amp Vpp") + self.amp = QtWidgets.QLineEdit("1.0") + off_lbl = QtWidgets.QLabel("Offset V") + self.off = QtWidgets.QLineEdit("0.0") + ph_lbl = QtWidgets.QLabel("Phase deg") + self.ph = QtWidgets.QLineEdit("0") + + self.btn_apply = QtWidgets.QPushButton("Apply") + self.btn_read = QtWidgets.QPushButton("Readback") + self.btn_on = QtWidgets.QPushButton("Output ON") + self.btn_off = QtWidgets.QPushButton("Output OFF") + + # Preset controls + preset_lbl = QtWidgets.QLabel("Preset #") + self.preset_num = QtWidgets.QSpinBox(); self.preset_num.setRange(1, 10); self.preset_num.setValue(1) + self.btn_store = QtWidgets.QPushButton("Store") + self.btn_recall = QtWidgets.QPushButton("Recall") + + grid = QtWidgets.QGridLayout(); row = 0 + grid.addWidget(wave_lbl, row, 0); grid.addWidget(self.wave, row, 1) + grid.addWidget(frq_lbl, row, 2); grid.addWidget(self.frq, row, 3); row += 1 + grid.addWidget(amp_lbl, row, 0); grid.addWidget(self.amp, row, 1) + grid.addWidget(off_lbl, row, 2); grid.addWidget(self.off, row, 3); row += 1 + grid.addWidget(ph_lbl, row, 0); grid.addWidget(self.ph, row, 1); row += 1 + grid.addWidget(self.btn_apply, row, 0) + grid.addWidget(self.btn_read, row, 1) + grid.addWidget(self.btn_on, row, 2) + grid.addWidget(self.btn_off, row, 3); row += 1 + + grid.addWidget(preset_lbl, row, 0) + grid.addWidget(self.preset_num, row, 1) + grid.addWidget(self.btn_store, row, 2) + grid.addWidget(self.btn_recall, row, 3) + self.setLayout(grid) + + self.btn_apply.clicked.connect(self._emit_apply) + self.btn_read.clicked.connect(lambda: self.readback_basic.emit()) + self.btn_on.clicked.connect(lambda: self.toggle_output.emit(True)) + self.btn_off.clicked.connect(lambda: self.toggle_output.emit(False)) + self.btn_store.clicked.connect(lambda: self.store_slot.emit(self.preset_num.value())) + self.btn_recall.clicked.connect(lambda: self.recall_slot.emit(self.preset_num.value())) + + def _emit_apply(self): + cfg = dict( + WVTP=self.wave.currentText(), + FRQ=human_to_eng(self.frq.text()), + AMP=human_to_eng(self.amp.text()), + OFST=human_to_eng(self.off.text()), + PHSE=human_to_eng(self.ph.text()), + ) + self.apply_basic.emit(cfg) + +class PresetsTab(QtWidgets.QWidget): + rename_slot = QtCore.pyqtSignal(int, str) + view_refresh = QtCore.pyqtSignal() + + def __init__(self): + super().__init__() + self._build() + + def _build(self): + self.table = QtWidgets.QTableWidget(10, 3) + self.table.setHorizontalHeaderLabels(["Slot", "Name", "Summary"]) + self.table.horizontalHeader().setStretchLastSection(True) + self.table.verticalHeader().setVisible(False) + for i in range(10): + self.table.setItem(i, 0, QtWidgets.QTableWidgetItem(str(i+1))) + self.table.item(i, 0).setFlags(QtCore.Qt.ItemIsEnabled) + self.table.setItem(i, 1, QtWidgets.QTableWidgetItem("")) + self.table.setItem(i, 2, QtWidgets.QTableWidgetItem("")) + self.table.item(i, 2).setFlags(QtCore.Qt.ItemIsEnabled) + + self.btn_load = QtWidgets.QPushButton("Reload File") + self.btn_save = QtWidgets.QPushButton("Save Names") + self.btn_open = QtWidgets.QPushButton("Open File...") + self.path_label = QtWidgets.QLabel(PRESET_FILE) + + layout = QtWidgets.QVBoxLayout() + hl = QtWidgets.QHBoxLayout() + hl.addWidget(self.btn_load); hl.addWidget(self.btn_save); hl.addWidget(self.btn_open); hl.addStretch(1); hl.addWidget(self.path_label) + layout.addLayout(hl) + layout.addWidget(self.table) + self.setLayout(layout) + + self.btn_load.clicked.connect(lambda: self.view_refresh.emit()) + self.btn_save.clicked.connect(self._save_names) + self.btn_open.clicked.connect(self._open_file) + + def _open_file(self): + path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Open Preset File", PRESET_FILE, "Text (*.txt *.dat);;All Files (*)") + if not path: + return + self.path_label.setText(path) + self.view_refresh.emit() + + def _save_names(self): + for i in range(10): + name = self.table.item(i, 1).text().strip() + self.rename_slot.emit(i+1, name) + + def update_from_presets(self, d: dict): + for i in range(1, 11): + name = d.get(f"SLOT{i}_NAME", "") + b = d.get(f"SLOT{i}_BSWV", "") + outp = d.get(f"SLOT{i}_OUTP", "") + summ = "" + if b: + up = b.upper() + fields = [] + for key in ("WVTP,", "FRQ,", "AMP,", "OFST,"): + idx = up.find(key) + if idx >= 0: + val = b[idx+len(key):] + val = val.split(",")[0] + fields.append(f"{key[:-1]}={val}") + summ = " ".join(fields) + if outp: + summ += (" " if summ else "") + f"OUTP={outp}" + self.table.item(i-1, 1).setText(name) + self.table.item(i-1, 2).setText(summ) + +# -------------------- Other tabs (Burst, Sweep, ARB, CLI) -------------------- +class BurstTab(QtWidgets.QWidget): + apply_burst = QtCore.pyqtSignal(dict) + readback_burst = QtCore.pyqtSignal() + + def __init__(self): + super().__init__() + self._build() + + def _build(self): + state_lbl = QtWidgets.QLabel("State") + self.state = QtWidgets.QComboBox(); self.state.addItems(["OFF", "ON"]) + mode_lbl = QtWidgets.QLabel("Mode") + self.mode = QtWidgets.QComboBox(); self.mode.addItems(["TRIG", "GAT"]) + src_lbl = QtWidgets.QLabel("Source") + self.src = QtWidgets.QComboBox(); self.src.addItems(["INT", "EXT"]) + ncy_lbl = QtWidgets.QLabel("Cycles") + self.ncy = QtWidgets.QLineEdit("5") + dly_lbl = QtWidgets.QLabel("Delay s") + self.dly = QtWidgets.QLineEdit("0") + gpol_lbl = QtWidgets.QLabel("Gate Pol") + self.gpol = QtWidgets.QComboBox(); self.gpol.addItems(["POS", "NEG"]) + + self.btn_apply = QtWidgets.QPushButton("Apply Burst") + self.btn_read = QtWidgets.QPushButton("Readback Burst") + + grid = QtWidgets.QGridLayout(); row = 0 + grid.addWidget(state_lbl, row, 0); grid.addWidget(self.state, row, 1) + grid.addWidget(mode_lbl, row, 2); grid.addWidget(self.mode, row, 3); row += 1 + grid.addWidget(src_lbl, row, 0); grid.addWidget(self.src, row, 1) + grid.addWidget(ncy_lbl, row, 2); grid.addWidget(self.ncy, row, 3); row += 1 + grid.addWidget(dly_lbl, row, 0); grid.addWidget(self.dly, row, 1) + grid.addWidget(gpol_lbl, row, 2); grid.addWidget(self.gpol, row, 3); row += 1 + grid.addWidget(self.btn_apply, row, 0) + grid.addWidget(self.btn_read, row, 1) + self.setLayout(grid) + + self.btn_apply.clicked.connect(self._emit_apply) + self.btn_read.clicked.connect(lambda: self.readback_burst.emit()) + + def _emit_apply(self): + try: + ncy_int = int(float(self.ncy.text())) + except Exception: + ncy_int = 1 + cfg = dict( + STATE=self.state.currentText(), + MODE=self.mode.currentText(), + TRSR=self.src.currentText(), + NCYC=str(ncy_int), + DLAY=human_to_eng(self.dly.text()), + GATEPOL=self.gpol.currentText(), + ) + self.apply_burst.emit(cfg) + +class SweepTab(QtWidgets.QWidget): + apply_sweep = QtCore.pyqtSignal(dict) + readback_sweep = QtCore.pyqtSignal() + + def __init__(self): + super().__init__() + self._build() + + def _build(self): + state_lbl = QtWidgets.QLabel("State") + self.state = QtWidgets.QComboBox(); self.state.addItems(["OFF", "ON"]) + typ_lbl = QtWidgets.QLabel("Type") + self.typ = QtWidgets.QComboBox(); self.typ.addItems(["LIN", "LOG"]) + fstart_lbl = QtWidgets.QLabel("Start Hz") + self.fstart = QtWidgets.QLineEdit("1 kHz") + fstop_lbl = QtWidgets.QLabel("Stop Hz") + self.fstop = QtWidgets.QLineEdit("10 kHz") + time_lbl = QtWidgets.QLabel("Time s") + self.time = QtWidgets.QLineEdit("1.0") + dir_lbl = QtWidgets.QLabel("Direction") + self.direction = QtWidgets.QComboBox(); self.direction.addItems(["UP", "DOWN"]) + src_lbl = QtWidgets.QLabel("Source") + self.src = QtWidgets.QComboBox(); self.src.addItems(["INT", "EXT"]) + + self.btn_apply = QtWidgets.QPushButton("Apply Sweep") + self.btn_read = QtWidgets.QPushButton("Readback Sweep") + + grid = QtWidgets.QGridLayout(); row = 0 + grid.addWidget(state_lbl, row, 0); grid.addWidget(self.state, row, 1) + grid.addWidget(typ_lbl, row, 2); grid.addWidget(self.typ, row, 3); row += 1 + grid.addWidget(fstart_lbl, row, 0); grid.addWidget(self.fstart, row, 1) + grid.addWidget(fstop_lbl, row, 2); grid.addWidget(self.fstop, row, 3); row += 1 + grid.addWidget(time_lbl, row, 0); grid.addWidget(self.time, row, 1) + grid.addWidget(dir_lbl, row, 2); grid.addWidget(self.direction, row, 3); row += 1 + grid.addWidget(src_lbl, row, 0); grid.addWidget(self.src, row, 1); row += 1 + grid.addWidget(self.btn_apply, row, 0) + grid.addWidget(self.btn_read, row, 1) + self.setLayout(grid) + + self.btn_apply.clicked.connect(self._emit_apply) + self.btn_read.clicked.connect(lambda: self.readback_sweep.emit()) + + def _emit_apply(self): + cfg = dict( + STATE=self.state.currentText(), + WAV=self.typ.currentText(), + STAR=human_to_eng(self.fstart.text()), + STOP=human_to_eng(self.fstop.text()), + TIME=human_to_eng(self.time.text()), + DIR=self.direction.currentText(), + TRSR=self.src.currentText(), + ) + self.apply_sweep.emit(cfg) + +class ARBTab(QtWidgets.QWidget): + refresh_lists = QtCore.pyqtSignal() + set_wave = QtCore.pyqtSignal(str) + upload_wave = QtCore.pyqtSignal(str, str) + download_wave = QtCore.pyqtSignal(str, str) + + def __init__(self): + super().__init__() + self._build() + + def _build(self): + self.built_in = QtWidgets.QListWidget() + self.user = QtWidgets.QListWidget() + refresh = QtWidgets.QPushButton("Refresh") + set_btn = QtWidgets.QPushButton("Set Selected to Channel") + upload_btn = QtWidgets.QPushButton("Upload .bin/.csv → USER") + download_btn = QtWidgets.QPushButton("Download USER → file") + self.name_edit = QtWidgets.QLineEdit(); self.name_edit.setPlaceholderText("User name (optional)") + + grid = QtWidgets.QGridLayout() + grid.addWidget(QtWidgets.QLabel("BUILDIN"), 0, 0) + grid.addWidget(QtWidgets.QLabel("USER"), 0, 1) + grid.addWidget(self.built_in, 1, 0) + grid.addWidget(self.user, 1, 1) + grid.addWidget(refresh, 2, 0) + grid.addWidget(set_btn, 2, 1) + grid.addWidget(self.name_edit, 3, 0) + grid.addWidget(upload_btn, 3, 1) + grid.addWidget(download_btn, 4, 1) + self.setLayout(grid) + + refresh.clicked.connect(lambda: self.refresh_lists.emit()) + set_btn.clicked.connect(self._emit_set) + upload_btn.clicked.connect(self._emit_upload) + download_btn.clicked.connect(self._emit_download) + + def selected_name(self) -> str: + w = self.user.currentItem() or self.built_in.currentItem() + return w.text().strip() if w else "" + + def _emit_set(self): + name = self.selected_name() + if name: + self.set_wave.emit(name) + + def _emit_upload(self): + path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Select ARB file", "", "ARB Files (*.bin *.csv);;All Files (*)") + if not path: + return + name = self.name_edit.text().strip() + if not name: + name = os.path.splitext(os.path.basename(path))[0] + self.upload_wave.emit(name, path) + + def _emit_download(self): + item = self.user.currentItem() + if not item: + return + name = item.text().strip() + path, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Save ARB file", f"{name}.bin", "Binary (*.bin);;All Files (*)") + if not path: + return + self.download_wave.emit(name, path) + +class CLITab(QtWidgets.QWidget): + send_cmd = QtCore.pyqtSignal(str) + query_cmd = QtCore.pyqtSignal(str) + + def __init__(self): + super().__init__() + self._build() + + def _build(self): + self.inp = QtWidgets.QLineEdit() + self.inp.setPlaceholderText("Enter SCPI, e.g. C1:BSWV? or *IDN?") + btn_send = QtWidgets.QPushButton("Send") + btn_query = QtWidgets.QPushButton("Query") + self.out = QtWidgets.QPlainTextEdit(); self.out.setReadOnly(True); self.out.setMinimumHeight(220) + + grid = QtWidgets.QGridLayout(); row = 0 + grid.addWidget(QtWidgets.QLabel("Command"), row, 0) + grid.addWidget(self.inp, row, 1, 1, 4) + grid.addWidget(btn_send, row, 5) + grid.addWidget(btn_query, row, 6); row += 1 + grid.addWidget(self.out, row, 0, 1, 7) + self.setLayout(grid) + + btn_send.clicked.connect(self._do_send) + btn_query.clicked.connect(self._do_query) + self.inp.returnPressed.connect(self._do_query) + + def append(self, s: str): + self.out.appendPlainText(s) + + def _do_send(self): + cmd = self.inp.text().strip() + if cmd: + self.send_cmd.emit(cmd) + + def _do_query(self): + cmd = self.inp.text().strip() + if cmd: + self.query_cmd.emit(cmd) + +# -------------------- Main Window -------------------- +class Main(QtWidgets.QWidget): + def __init__(self, preset_ip=None): + super().__init__() + self.setWindowTitle(Window_Name) + self.i = SDGLan() + self._scr_worker = None + self._build(preset_ip) + + def _build(self, preset_ip): + ip_lbl = QtWidgets.QLabel("IP") + self.ip = QtWidgets.QLineEdit(preset_ip or "") + if not preset_ip: + self.ip.setPlaceholderText("e.g. 192.168.1.120") + port_lbl = QtWidgets.QLabel("Port") + self.port = QtWidgets.QLineEdit(str(DEFAULT_PORT)) + self.btn_connect = QtWidgets.QPushButton("Connect") + self.btn_idn = QtWidgets.QPushButton("IDN?") + self.btn_err = QtWidgets.QPushButton("SYST:ERR?") + self.btn_scr = QtWidgets.QPushButton("Screenshot") + ch_lbl = QtWidgets.QLabel("Channel") + self.ch = QtWidgets.QComboBox(); self.ch.addItems(["C1", "C2"]) + + self.tabs = QtWidgets.QTabWidget() + self.t_basic = BasicTab(); self.tabs.addTab(self.t_basic, "Basic") + self.t_burst = BurstTab(); self.tabs.addTab(self.t_burst, "Burst") + self.t_sweep = SweepTab(); self.tabs.addTab(self.t_sweep, "Sweep") + self.t_arb = ARBTab(); self.tabs.addTab(self.t_arb, "ARB Manager") + self.t_cli = CLITab(); self.tabs.addTab(self.t_cli, "SCPI CLI") + self.t_presets = PresetsTab(); self.tabs.addTab(self.t_presets, "Presets") + + self.log = QtWidgets.QPlainTextEdit(); self.log.setReadOnly(True); self.log.setMinimumHeight(220) + + top = QtWidgets.QGridLayout(); row = 0 + top.addWidget(ip_lbl, row, 0); top.addWidget(self.ip, row, 1, 1, 3) + top.addWidget(port_lbl, row, 4); top.addWidget(self.port, row, 5) + top.addWidget(self.btn_connect, row, 6); row += 1 + top.addWidget(self.btn_idn, row, 0); top.addWidget(self.btn_err, row, 1); top.addWidget(self.btn_scr, row, 2) + top.addWidget(ch_lbl, row, 3); top.addWidget(self.ch, row, 4); row += 1 + top.addWidget(self.tabs, row, 0, 1, 7); row += 1 + top.addWidget(self.log, row, 0, 1, 7) + self.setLayout(top) + + # wiring + self.btn_connect.clicked.connect(self.on_connect) + self.btn_idn.clicked.connect(self.on_idn) + self.btn_err.clicked.connect(self.on_err) + self.btn_scr.clicked.connect(self.on_screenshot) + + self.t_basic.apply_basic.connect(self.on_apply_basic) + self.t_basic.readback_basic.connect(self.on_readback_basic) + self.t_basic.toggle_output.connect(self.on_output) + self.t_basic.store_slot.connect(self.on_store_slot) + self.t_basic.recall_slot.connect(self.on_recall_slot) + + self.t_burst.apply_burst.connect(self.on_apply_burst) + self.t_burst.readback_burst.connect(self.on_readback_burst) + self.t_sweep.apply_sweep.connect(self.on_apply_sweep) + self.t_sweep.readback_sweep.connect(self.on_readback_sweep) + self.t_arb.refresh_lists.connect(self.on_arb_refresh) + self.t_arb.set_wave.connect(self.on_arb_set) + self.t_arb.upload_wave.connect(self.on_arb_upload) + self.t_arb.download_wave.connect(self.on_arb_download) + self.t_cli.send_cmd.connect(self.on_cli_send) + self.t_cli.query_cmd.connect(self.on_cli_query) + + self.t_presets.rename_slot.connect(self.on_rename_slot) + self.t_presets.view_refresh.connect(self.refresh_presets_view) + + ensure_preset_file(PRESET_FILE) + self.refresh_presets_view() + + def logln(self, s: str): + self.log.appendPlainText(s) + + def _pref(self) -> str: + return self.ch.currentText() + ":" + + # ---------- Top actions ---------- + def on_connect(self): + try: + addr = self.ip.text().strip(); port = int(self.port.text().strip()) + self.i.connect(addr, port) + self.logln(f"Connected to {addr}:{port}") + except Exception as e: + self.logln(f"[ERR] connect: {e}") + + def on_idn(self): + try: + self.logln(f"*IDN? -> {self.i.query('*IDN?')}") + except Exception as e: + self.logln(f"[ERR] IDN: {e}") + + def on_err(self): + try: + self.logln(f"SYST:ERR? -> {self.i.query('SYST:ERR?')}") + except Exception as e: + self.logln(f"[ERR] SYST:ERR?: {e}") + + # ---------- Screenshot (threaded) ---------- + def on_screenshot(self): + if not self.i.sock: + self.logln("[ERR] screenshot: not connected") + return + if self._scr_worker and self._scr_worker.isRunning(): + self.logln("[ERR] screenshot already running") + return + self.btn_scr.setEnabled(False) + self.setCursor(QtCore.Qt.WaitCursor) + self._scr_worker = ScreenshotWorker(self.i, self) + self._scr_worker.done.connect(self._on_screenshot_done) + self._scr_worker.start() + + def _on_screenshot_done(self, data: bytes, fn: str, err: str): + self.btn_scr.setEnabled(True) + self.unsetCursor() + if err: + self.logln(f"[ERR] screenshot: {err}") + return + try: + with open(fn, "wb") as f: + f.write(data) + self.logln(f"Screenshot saved to {fn} ({len(data)} bytes)") + img = QtGui.QImage(fn) + if img.isNull(): + self.logln("[ERR] screenshot: Qt could not decode image; file kept") + return + pixmap = QtGui.QPixmap.fromImage(img) + lbl = QtWidgets.QLabel(); lbl.setPixmap(pixmap) + w = QtWidgets.QWidget(); l = QtWidgets.QVBoxLayout(w); l.addWidget(lbl) + w.setWindowTitle("SDG2042X Screenshot") + w.resize(pixmap.width(), pixmap.height()) + w.show() + self._scr_win = w + except Exception as e: + self.logln(f"[ERR] screenshot save/display: {e}") + + # ---------- Basic ---------- + def on_apply_basic(self, cfg: dict): + try: + cmd = (f"{self._pref()}BSWV " + f"WVTP,{cfg['WVTP']},FRQ,{cfg['FRQ']},AMP,{cfg['AMP']}," + f"OFST,{cfg['OFST']},PHSE,{cfg['PHSE']}") + self.i.write(cmd) + self.logln(f"SET: {cmd}") + except Exception as e: + self.logln(f"[ERR] basic apply: {e}") + + def on_readback_basic(self): + try: + rb = self.i.query(f"{self._pref()}BSWV?") + st = self.i.query(f"{self._pref()}OUTP?") + self.logln(f"{self._pref()}BSWV? -> {rb}") + self.logln(f"{self._pref()}OUTP? -> {st}") + except Exception as e: + self.logln(f"[ERR] basic readback: {e}") + + def on_output(self, turn_on: bool): + try: + cmd = f"{self._pref()}OUTP {'ON' if turn_on else 'OFF'}" + self.i.write(cmd) + st = self.i.query(f"{self._pref()}OUTP?") + self.logln(f"SET: {cmd}") + self.logln(f"{self._pref()}OUTP? -> {st}") + except Exception as e: + self.logln(f"[ERR] output: {e}") + + # ---------- Burst ---------- + def on_apply_burst(self, cfg: dict): + try: + parts = [ + f"STATE,{cfg['STATE']}", + f"MODE,{cfg['MODE']}", + f"TRSR,{cfg['TRSR']}", + f"NCYC,{cfg['NCYC']}", + f"DLAY,{cfg['DLAY']}", + f"GATEPOL,{cfg['GATEPOL']}", + ] + cmd = f"{self._pref()}BTWV " + ",".join(parts) + self.i.write(cmd) + self.logln(f"SET: {cmd}") + except Exception as e: + self.logln(f"[ERR] burst apply: {e}") + + def on_readback_burst(self): + try: + rb = self.i.query(f"{self._pref()}BTWV?") + self.logln(f"{self._pref()}BTWV? -> {rb}") + except Exception as e: + self.logln(f"[ERR] burst readback: {e}") + + # ---------- Sweep ---------- + def on_apply_sweep(self, cfg: dict): + try: + parts = [ + f"STATE,{cfg['STATE']}", + f"WAV,{cfg['WAV']}", + f"STAR,{cfg['STAR']}", + f"STOP,{cfg['STOP']}", + f"TIME,{cfg['TIME']}", + f"DIR,{cfg['DIR']}", + f"TRSR,{cfg['TRSR']}", + ] + cmd = f"{self._pref()}SWWV " + ",".join(parts) + self.i.write(cmd) + err = self.i.query("SYST:ERR?") + if not err.startswith("0"): + parts[1] = f"SPAC,{cfg['WAV']}" # fallback for older FW + cmd2 = f"{self._pref()}SWWV " + ",".join(parts) + self.i.write(cmd2) + self.logln(f"SET (fallback): {cmd2}") + else: + self.logln(f"SET: {cmd}") + except Exception as e: + self.logln(f"[ERR] sweep apply: {e}") + + def on_readback_sweep(self): + try: + rb = self.i.query(f"{self._pref()}SWWV?") + self.logln(f"{self._pref()}SWWV? -> {rb}") + except Exception as e: + self.logln(f"[ERR] sweep readback: {e}") + + # ---------- ARB ops ---------- + def on_arb_refresh(self): + try: + b = self.i.query("STL? BUILDIN") + u = self.i.query("STL? USER") + def parse_list(s: str): + parts = [p.strip() for p in s.replace(";", ",").split(',') if p.strip()] + return parts + b_list = parse_list(b) + u_list = parse_list(u) + self.t_arb.built_in.clear(); self.t_arb.user.clear() + self.t_arb.built_in.addItems(b_list) + self.t_arb.user.addItems(u_list) + self.logln(f"BUILDIN: {b_list}") + self.logln(f"USER: {u_list}") + except Exception as e: + self.logln(f"[ERR] ARB refresh: {e}") + + def on_arb_set(self, name: str): + try: + qname = quote_name(name) + self.i.write(f"{self._pref()}BSWV WVTP,ARB") + self.i.write(f"{self._pref()}ARWV NAME,{qname}") + rb = self.i.query(f"{self._pref()}ARWV?") + self.logln(f"SET ARB: {qname}; ARWV? -> {rb}") + except Exception as e: + self.logln(f"[ERR] ARB set: {e}") + + def _make_scpi_block(self, payload: bytes) -> bytes: + n = len(payload) + len_digits = str(len(str(n))).encode('ascii') + header = b'#' + len_digits + str(n).encode('ascii') + return header + payload + + def on_arb_upload(self, name: str, path: str): + try: + with open(path, 'rb') as f: + data = f.read() + block = self._make_scpi_block(data) + qname = quote_name(name) + prefix = f"WVDT USER,{qname},".encode('ascii') + self.i.write_bytes(prefix + block + b"\n") + self.logln(f"Uploaded USER,{qname} ({len(data)} bytes)") + except Exception as e: + self.logln(f"[ERR] ARB upload: {e}") + + def on_arb_download(self, name: str, savepath: str): + try: + qname = quote_name(name) + raw = self.i.query_raw(f"WVDT? USER,{qname}") + idx = raw.find(b"WAVEDATA,") + if idx == -1: + raise RuntimeError("No WAVEDATA header") + blk = raw[idx + len(b"WAVEDATA,"):] + if not blk or blk[0:1] != b'#': + raise RuntimeError("No SCPI block after WAVEDATA,") + ndig = int(chr(blk[1])) + start = 2 + blen = int(blk[start:start+ndig].decode('ascii')) + start += ndig + payload = bytearray(blk[start:start+blen]) + need = blen - len(payload) + while need > 0: + try: + chunk = self.i.sock.recv(min(65536, need)) + if not chunk: + break + payload.extend(chunk) + need -= len(chunk) + except socket.timeout: + break + with open(savepath, 'wb') as f: + f.write(bytes(payload)) + self.logln(f"Downloaded USER,{qname} -> {savepath} ({len(payload)} bytes)") + except Exception as e: + self.logln(f"[ERR] ARB download: {e}") + + # ---------- CLI tab ---------- + def on_cli_send(self, cmd: str): + try: + self.i.write(cmd) + self.t_cli.append(f"> {cmd}") + except Exception as e: + self.t_cli.append(f"[ERR] send: {e}") + + def on_cli_query(self, cmd: str): + try: + resp = self.i.query(cmd) + self.t_cli.append(f"? {cmd}\n< {resp}") + except Exception as e: + self.t_cli.append(f"[ERR] query: {e}") + + # ---------- Presets ---------- + def on_store_slot(self, slot: int): + try: + if slot < 1 or slot > 10: + raise ValueError("slot out of range") + bswv = self.i.query(f"{self._pref()}BSWV?") + outp = self.i.query(f"{self._pref()}OUTP?") + outp_state = "ON" if "ON" in outp.upper().split(",")[0] else "OFF" + d = read_presets(PRESET_FILE) + d[f"SLOT{slot}_BSWV"] = bswv + d[f"SLOT{slot}_OUTP"] = outp_state + write_presets(PRESET_FILE, d) + self.logln(f"Stored slot {slot}: BSWV len={len(bswv)}, OUTP={outp_state}") + self.refresh_presets_view() + except Exception as e: + self.logln(f"[ERR] store slot {slot}: {e}") + + def on_recall_slot(self, slot: int): + try: + if slot < 1 or slot > 10: + raise ValueError("slot out of range") + d = read_presets(PRESET_FILE) + b = d.get(f"SLOT{slot}_BSWV", "").strip() + o = d.get(f"SLOT{slot}_OUTP", "").strip().upper() + if not b: + raise RuntimeError("empty BSWV") + self.i.write(b) + if o in ("ON", "OFF"): + self.i.write(f"{self._pref()}OUTP {o}") + self.logln(f"Recalled slot {slot}: applied BSWV and OUTP={o or 'N/A'}") + except Exception as e: + self.logln(f"[ERR] recall slot {slot}: {e}") + + def on_rename_slot(self, slot: int, name: str): + try: + d = read_presets(PRESET_FILE) + d[f"SLOT{slot}_NAME"] = name + write_presets(PRESET_FILE, d) + self.refresh_presets_view() + except Exception as e: + self.logln(f"[ERR] rename slot {slot}: {e}") + + def refresh_presets_view(self): + try: + d = read_presets(PRESET_FILE) + self.t_presets.update_from_presets(d) + except Exception as e: + self.logln(f"[ERR] refresh presets: {e}") + +# -------------------- CLI entry -------------------- +def show_help(): + print("Usage: sdg2042x_gui.py [options]\n") + print("Options:") + print(" -ip , --ip Prefill IP address field") + print(" -h, --help Show this help message") + +if __name__ == "__main__": + preset_ip = None + argv = sys.argv[1:] + for i, tok in enumerate(argv): + if tok in ("-h", "--help"): + show_help(); sys.exit(0) + if tok in ("-ip", "--ip") and i + 1 < len(argv): + preset_ip = argv[i + 1] + app = QtWidgets.QApplication(sys.argv) + m = Main(preset_ip=preset_ip) + m.resize(1100, 680) + m.show() + sys.exit(app.exec_())