#!/usr/bin/env python3 # Siglent SDG2042X Linux GUI (PyQt5) # Features: # Basic Signals, including Burst, Sweep context-aware parameter settings # - ARB Manager, SCPI CLI, # - Storing Presets, storing to editable preset file SDG2042x.dat # - path setting for screenshot and preset file # - timestamped Screenshot # # Author: Thomas Gohle / tgohle@togo-lab.io # www.togo-lab.io # tgonet.de / gohle.de # # Copyright: CC BY-SA 4.0 # https://creativecommons.org/licenses/by-sa/4.0/ # # Version: 0.1 - 2025-10-2025 import os import socket import sys import time from PyQt5 import QtWidgets, QtCore, QtGui # -------------------- Constants -------------------- Window_Name = "-= WaveForm Generator [SDG2042X] GUI by ToGo-Lab =-" DEFAULT_PORT = 5025 SOCKET_TIMEOUT = 4.0 SIGLENT_WAVES = ["SINE", "SQUARE", "RAMP", "PULSE", "NOISE", "ARB", "DC"] CONFIG_FILE = "SDG2042x.config" PRESET_BASENAME = "SDG2042x.dat" # -------------------- Config helpers -------------------- def _expand(p: str) -> str: return os.path.abspath(os.path.expanduser(os.path.expandvars(p))) def ensure_dir(path: str): if path and not os.path.isdir(path): os.makedirs(path, exist_ok=True) def load_config(path: str) -> dict: d = {"PRESET_DIR": os.getcwd(), "SCREENSHOT_DIR": os.getcwd()} try: with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#") or "=" not in line: continue k, v = line.split("=", 1) d[k.strip().upper()] = v.strip() except FileNotFoundError: pass d["PRESET_DIR"] = _expand(d.get("PRESET_DIR", os.getcwd())) d["SCREENSHOT_DIR"] = _expand(d.get("SCREENSHOT_DIR", os.getcwd())) return d def save_config(path: str, d: dict): lines = [ "# SDG2042x.config", f"PRESET_DIR={d.get('PRESET_DIR','')}", f"SCREENSHOT_DIR={d.get('SCREENSHOT_DIR','')}", "" ] with open(path, "w", encoding="utf-8") as f: f.write("\n".join(lines)) # -------------------- Transport -------------------- class SDGLan: def __init__(self): self.sock = None self.addr = None self.port = DEFAULT_PORT def connect(self, addr, port=DEFAULT_PORT): self.close() self.addr = addr self.port = port self.sock = socket.create_connection((addr, port), timeout=SOCKET_TIMEOUT) self.sock.settimeout(SOCKET_TIMEOUT) def close(self): try: if self.sock: self.sock.close() finally: self.sock = None def write(self, cmd: str): if not self.sock: raise RuntimeError("Not connected") if not cmd.endswith("\n"): cmd += "\n" self.sock.sendall(cmd.encode("ascii")) def write_bytes(self, payload: bytes): if not self.sock: raise RuntimeError("Not connected") self.sock.sendall(payload) def drain(self): """Read and discard any bytes until timeout. Safe if nothing pending.""" try: _ = self._recv_until_timeout() except Exception: pass def query(self, cmd: str) -> str: self.write(cmd) data = self._recv_line() return data.strip() def query_retry(self, cmd: str, retries: int = 2) -> str: """Query with pre-drain and retry on socket.timeout.""" last_err = None for _ in range(retries + 1): try: self.drain() self.write(cmd) data = self._recv_line() return data.strip() except socket.timeout as e: last_err = e try: self.write("*CLS") except Exception: pass continue if last_err: raise last_err return "" def query_raw(self, cmd: str) -> bytes: self.write(cmd) return self._recv_until_timeout() # ---- block helpers ---- def _recv_exact(self, n: int) -> bytes: buf = bytearray() while len(buf) < n: chunk = self.sock.recv(n - len(buf)) if not chunk: break buf.extend(chunk) return bytes(buf) def _recv_exact_capped(self, n: int, cap: int) -> bytes: if n > cap: raise RuntimeError(f"SCPI block length {n} exceeds cap {cap}") return self._recv_exact(n) def query_block(self, cmd: str, timeout_s: float = 8.0, cap: int = 25_000_000) -> bytes: if not self.sock: raise RuntimeError("Not connected") prev_to = self.sock.gettimeout() try: self.sock.settimeout(timeout_s) self.write(cmd) first = self._recv_exact(1) while first and first != b"#": first = self._recv_exact(1) if not first: return b"" ndig_b = self._recv_exact(1) if not ndig_b or not ndig_b.isdigit(): return b"" ndig = int(ndig_b.decode("ascii")) len_str = self._recv_exact(ndig).decode("ascii") if not len_str.isdigit(): return b"" length = int(len_str) payload = self._recv_exact_capped(length, cap) return payload finally: try: self.sock.settimeout(prev_to) except Exception: pass def query_scdp(self, timeout_s: float = 15.0, cap: int = 50_000_000) -> bytes: if not self.sock: raise RuntimeError("Not connected") prev_to = self.sock.gettimeout() try: self.sock.settimeout(timeout_s) self.write("SCDP") first2 = self._recv_exact(2) if not first2: return b"" if first2 == b"BM": size_bytes = self._recv_exact(4) if len(size_bytes) != 4: return b"" total_size = int.from_bytes(size_bytes, "little") rest = self._recv_exact_capped(total_size - 6, cap) return first2 + size_bytes + rest if first2[:1] == b"#": ndig = int(first2[1:2].decode()) len_str = self._recv_exact(ndig).decode("ascii") if not len_str.isdigit(): return b"" length = int(len_str) payload = self._recv_exact_capped(length, cap) return payload tail = self._recv_until_timeout() return first2 + tail finally: try: self.sock.settimeout(prev_to) except Exception: pass def _recv_line(self) -> str: chunks = [] while True: b = self.sock.recv(4096) if not b: break chunks.append(b) if b.endswith(b"\n"): break return b"".join(chunks).decode("ascii", errors="ignore") def _recv_until_timeout(self) -> bytes: data = bytearray() while True: try: b = self.sock.recv(65536) if not b: break data.extend(b) except socket.timeout: break return bytes(data) # -------------------- Utilities -------------------- def human_to_eng(s: str) -> str: t = s.strip().lower().replace("vpp", "").replace("v", "") t = t.replace("hz", "").replace("deg", "") mult = 1.0 if t.endswith("ms"): mult = 1e-3; t = t[:-2] elif t.endswith("us"): mult = 1e-6; t = t[:-2] elif t.endswith("ns"): mult = 1e-9; t = t[:-2] elif t.endswith("s"): mult = 1.0; t = t[:-1] elif t.endswith("khz"): mult = 1e3; t = t[:-3] elif t.endswith("mhz"): mult = 1e6; t = t[:-3] elif t.endswith("ghz"): mult = 1e9; t = t[:-3] elif t.endswith("k"): mult = 1e3; t = t[:-1] elif t.endswith("m") and not t.endswith("mm"): mult = 1e-3; t = t[:-1] elif t.endswith("u"): mult = 1e-6; t = t[:-1] elif t.endswith("n"): mult = 1e-9; t = t[:-1] elif t.endswith("g"): mult = 1e9; t = t[:-1] try: val = float(t) except ValueError: return s.strip() return f"{val*mult}" def quote_name(name: str) -> str: safe = all(ch.isalnum() or ch in ("_", "-") for ch in name) return name if safe else '"%s"' % name def ensure_preset_file(path: str): if not os.path.exists(path): with open(path, "w", encoding="utf-8") as f: f.write("# SDG2042X presets\n") for i in range(1, 11): f.write(f"SLOT{i}_NAME=\n") f.write(f"SLOT{i}_BSWV=\n") f.write(f"SLOT{i}_OUTP=\n") return path def read_presets(path: str) -> dict: ensure_preset_file(path) presets = {} with open(path, "r", encoding="utf-8") as f: for line in f: line = line.strip() if not line or line.startswith("#"): continue if "=" not in line: continue k, v = line.split("=", 1) presets[k.strip()] = v.strip() return presets def write_presets(path: str, d: dict): lines = ["# SDG2042X presets"] for i in range(1, 11): name = d.get(f"SLOT{i}_NAME", "") bswv = d.get(f"SLOT{i}_BSWV", "") outp = d.get(f"SLOT{i}_OUTP", "") lines.append(f"SLOT{i}_NAME={name}") lines.append(f"SLOT{i}_BSWV={bswv}") lines.append(f"SLOT{i}_OUTP={outp}") with open(path, "w", encoding="utf-8") as f: f.write("\n".join(lines) + "\n") # -------------------- Screenshot worker (threaded) -------------------- class ScreenshotWorker(QtCore.QThread): done = QtCore.pyqtSignal(bytes, str, str) # data, filename, error def __init__(self, io: SDGLan, out_dir: str, parent=None): super().__init__(parent) self.io = io self.out_dir = out_dir def run(self): try: try: self.io.write("*CLS") except Exception: pass try: _ = self.io._recv_until_timeout() except Exception: pass data = self.io.query_scdp(timeout_s=20.0, cap=50_000_000) if not data: err = self.io.query_retry("SYST:ERR?") raise RuntimeError(f"no image data; last SYST:ERR? -> {err}") ts = time.strftime("%Y%m%d-%H%M%S") fn = f"{ts}_SDG2042X.bmp" if data.startswith(b"BM") else f"{ts}_SDG2042X.bin" ensure_dir(self.out_dir) full = os.path.join(self.out_dir, fn) self.done.emit(data, full, "") except Exception as e: self.done.emit(b"", "", str(e)) # -------------------- Tabs -------------------- class BasicTab(QtWidgets.QWidget): apply_basic = QtCore.pyqtSignal(dict) readback_basic = QtCore.pyqtSignal() toggle_output = QtCore.pyqtSignal(bool) store_slot = QtCore.pyqtSignal(int) recall_slot = QtCore.pyqtSignal(int) def __init__(self): super().__init__() self._build() def _build(self): # Row 1: waveform wave_lbl = QtWidgets.QLabel("Waveform") self.wave = QtWidgets.QComboBox(); self.wave.addItems(SIGLENT_WAVES) # Row 2: FRQ/PERI fp_lbl = QtWidgets.QLabel("Freq/Period") self.fp_mode = QtWidgets.QComboBox(); self.fp_mode.addItems(["FRQ", "PERI"]) self.fp_val = QtWidgets.QLineEdit("1 kHz") # Row 3: amplitude or levels amp_mode_lbl = QtWidgets.QLabel("Amplitude mode") self.amp_mode = QtWidgets.QComboBox(); self.amp_mode.addItems(["AMP", "H/L levels"]) self.amp = QtWidgets.QLineEdit("1.0") # AMP Vpp self.hlev = QtWidgets.QLineEdit("0.5") # HLEV self.llev = QtWidgets.QLineEdit("-0.5") # LLEV # Row 4: offset and phase off_lbl = QtWidgets.QLabel("Offset V") self.off = QtWidgets.QLineEdit("0.0") ph_lbl = QtWidgets.QLabel("Phase deg") self.ph = QtWidgets.QLineEdit("0") # Extras self.duty = QtWidgets.QLineEdit("50") # Square/Pulse self.sym = QtWidgets.QLineEdit("50") # Ramp self.width = QtWidgets.QLineEdit("0.001") # Pulse s self.rise = QtWidgets.QLineEdit("1e-6") # Pulse s self.fall = QtWidgets.QLineEdit("1e-6") # Pulse s self.dly = QtWidgets.QLineEdit("0") # Pulse s self.mean = QtWidgets.QLineEdit("0") # Noise self.stdev = QtWidgets.QLineEdit("0.1") self.band = QtWidgets.QCheckBox("Noise bandwidth limit") self.arb_name = QtWidgets.QLineEdit("") # ARB selection # Buttons self.btn_apply = QtWidgets.QPushButton("Apply") self.btn_read = QtWidgets.QPushButton("Readback") self.btn_on = QtWidgets.QPushButton("Output ON") self.btn_off = QtWidgets.QPushButton("Output OFF") # Presets preset_lbl = QtWidgets.QLabel("Preset #") self.preset_num = QtWidgets.QSpinBox(); self.preset_num.setRange(1, 10); self.preset_num.setValue(1) self.btn_store = QtWidgets.QPushButton("Store") self.btn_recall = QtWidgets.QPushButton("Recall") grid = QtWidgets.QGridLayout(); row = 0 grid.addWidget(wave_lbl, row, 0); grid.addWidget(self.wave, row, 1); row += 1 grid.addWidget(fp_lbl, row, 0); grid.addWidget(self.fp_mode, row, 1); grid.addWidget(self.fp_val, row, 2); row += 1 grid.addWidget(amp_mode_lbl, row, 0); grid.addWidget(self.amp_mode, row, 1) grid.addWidget(QtWidgets.QLabel("AMP Vpp"), row, 2); grid.addWidget(self.amp, row, 3) grid.addWidget(QtWidgets.QLabel("HLEV"), row, 4); grid.addWidget(self.hlev, row, 5) grid.addWidget(QtWidgets.QLabel("LLEV"), row, 6); grid.addWidget(self.llev, row, 7); row += 1 grid.addWidget(off_lbl, row, 0); grid.addWidget(self.off, row, 1) grid.addWidget(ph_lbl, row, 2); grid.addWidget(self.ph, row, 3); row += 1 # Extras rows grid.addWidget(QtWidgets.QLabel("DUTY %"), row, 0); grid.addWidget(self.duty, row, 1) grid.addWidget(QtWidgets.QLabel("SYM %"), row, 2); grid.addWidget(self.sym, row, 3) grid.addWidget(QtWidgets.QLabel("WIDTH s"), row, 4); grid.addWidget(self.width, row, 5); row += 1 grid.addWidget(QtWidgets.QLabel("RISE s"), row, 0); grid.addWidget(self.rise, row, 1) grid.addWidget(QtWidgets.QLabel("FALL s"), row, 2); grid.addWidget(self.fall, row, 3) grid.addWidget(QtWidgets.QLabel("DLY s"), row, 4); grid.addWidget(self.dly, row, 5); row += 1 grid.addWidget(QtWidgets.QLabel("NOISE MEAN"), row, 0); grid.addWidget(self.mean, row, 1) grid.addWidget(QtWidgets.QLabel("NOISE STDEV"), row, 2); grid.addWidget(self.stdev, row, 3) grid.addWidget(self.band, row, 4); row += 1 grid.addWidget(QtWidgets.QLabel("ARB name"), row, 0); grid.addWidget(self.arb_name, row, 1, 1, 3); row += 1 grid.addWidget(self.btn_apply, row, 0) grid.addWidget(self.btn_read, row, 1) grid.addWidget(self.btn_on, row, 2) grid.addWidget(self.btn_off, row, 3); row += 1 grid.addWidget(preset_lbl, row, 0) grid.addWidget(self.preset_num, row, 1) grid.addWidget(self.btn_store, row, 2) grid.addWidget(self.btn_recall, row, 3) grid.setColumnStretch(7, 1) self.setLayout(grid) # Signals self.btn_apply.clicked.connect(self._emit_apply) self.btn_read.clicked.connect(lambda: self.readback_basic.emit()) self.btn_on.clicked.connect(lambda: self.toggle_output.emit(True)) self.btn_off.clicked.connect(lambda: self.toggle_output.emit(False)) self.btn_store.clicked.connect(lambda: self.store_slot.emit(self.preset_num.value())) self.btn_recall.clicked.connect(lambda: self.recall_slot.emit(self.preset_num.value())) self.wave.currentTextChanged.connect(self._update_context) self.amp_mode.currentTextChanged.connect(self._update_amp_mode) self._update_context() self._update_amp_mode() def _update_amp_mode(self): amp_mode = self.amp_mode.currentText() show_levels = (amp_mode != "AMP") self.amp.setEnabled(not show_levels) self.hlev.setEnabled(show_levels) self.llev.setEnabled(show_levels) def _update_context(self): w = self.wave.currentText() # enable all by default widgets = [ self.duty, self.sym, self.width, self.rise, self.fall, self.dly, self.mean, self.stdev, self.band, self.arb_name, self.ph, self.off, self.amp, self.hlev, self.llev, self.fp_val ] for wid in widgets: wid.setEnabled(True) if w == "SINE": self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=False) self.ph.setEnabled(True); self.off.setEnabled(True) elif w == "SQUARE": self._set_extras_enabled(duty=True, sym=False, pulse=False, noise=False, arb=False) self.ph.setEnabled(True) elif w == "RAMP": self._set_extras_enabled(duty=False, sym=True, pulse=False, noise=False, arb=False) self.ph.setEnabled(True) elif w == "PULSE": self._set_extras_enabled(duty=True, sym=False, pulse=True, noise=False, arb=False) self.ph.setEnabled(False) elif w == "NOISE": self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=True, arb=False) self.fp_val.setEnabled(False) self.amp.setEnabled(False); self.hlev.setEnabled(False); self.llev.setEnabled(False) self.off.setEnabled(False); self.ph.setEnabled(False) elif w == "ARB": self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=True) self.ph.setEnabled(True) elif w == "DC": self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=False) self.fp_val.setEnabled(False) self.amp.setEnabled(False); self.hlev.setEnabled(False); self.llev.setEnabled(False) self.ph.setEnabled(False) def _set_extras_enabled(self, *, duty: bool, sym: bool, pulse: bool, noise: bool, arb: bool): self.duty.setEnabled(duty) self.sym.setEnabled(sym) for w in (self.width, self.rise, self.fall, self.dly): w.setEnabled(pulse) self.mean.setEnabled(noise) self.stdev.setEnabled(noise) self.band.setEnabled(noise) self.arb_name.setEnabled(arb) def _emit_apply(self): w = self.wave.currentText() cfg = {"WVTP": w} if w not in ("NOISE", "DC"): key = self.fp_mode.currentText() cfg[key] = human_to_eng(self.fp_val.text()) if w not in ("NOISE", "DC"): if self.amp_mode.currentText() == "AMP": cfg["AMP"] = human_to_eng(self.amp.text()) else: cfg["HLEV"] = human_to_eng(self.hlev.text()) cfg["LLEV"] = human_to_eng(self.llev.text()) if w != "NOISE": cfg["OFST"] = human_to_eng(self.off.text()) if w in ("SINE", "SQUARE", "RAMP", "ARB"): cfg["PHSE"] = human_to_eng(self.ph.text()) if w in ("SQUARE", "PULSE"): cfg["DUTY"] = human_to_eng(self.duty.text()) if w == "RAMP": cfg["SYM"] = human_to_eng(self.sym.text()) if w == "PULSE": cfg["WIDTH"] = human_to_eng(self.width.text()) cfg["RISE"] = human_to_eng(self.rise.text()) cfg["FALL"] = human_to_eng(self.fall.text()) cfg["DLY"] = human_to_eng(self.dly.text()) if w == "NOISE": cfg["MEAN"] = human_to_eng(self.mean.text()) cfg["STDEV"] = human_to_eng(self.stdev.text()) cfg["BANDSTATE"] = "ON" if self.band.isChecked() else "OFF" if w == "ARB" and self.arb_name.text().strip(): cfg["ARWV_NAME"] = self.arb_name.text().strip() self.apply_basic.emit(cfg) class PresetsTab(QtWidgets.QWidget): rename_slot = QtCore.pyqtSignal(int, str) view_refresh = QtCore.pyqtSignal() def __init__(self): super().__init__() self._build() def _build(self): self.table = QtWidgets.QTableWidget(10, 3) self.table.setHorizontalHeaderLabels(["Slot", "Name", "Summary"]) self.table.horizontalHeader().setStretchLastSection(True) self.table.verticalHeader().setVisible(False) for i in range(10): self.table.setItem(i, 0, QtWidgets.QTableWidgetItem(str(i+1))) self.table.item(i, 0).setFlags(QtCore.Qt.ItemIsEnabled) self.table.setItem(i, 1, QtWidgets.QTableWidgetItem("")) self.table.setItem(i, 2, QtWidgets.QTableWidgetItem("")) self.table.item(i, 2).setFlags(QtCore.Qt.ItemIsEnabled) self.btn_load = QtWidgets.QPushButton("Reload File") self.btn_save = QtWidgets.QPushButton("Save Names") self.btn_open = QtWidgets.QPushButton("Open File...") self.path_label = QtWidgets.QLabel(PRESET_BASENAME) layout = QtWidgets.QVBoxLayout() hl = QtWidgets.QHBoxLayout() hl.addWidget(self.btn_load); hl.addWidget(self.btn_save); hl.addWidget(self.btn_open); hl.addStretch(1); hl.addWidget(self.path_label) layout.addLayout(hl) layout.addWidget(self.table) self.setLayout(layout) self.btn_load.clicked.connect(lambda: self.view_refresh.emit()) self.btn_save.clicked.connect(self._save_names) self.btn_open.clicked.connect(self._open_file) def _open_file(self): path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Open Preset File", PRESET_BASENAME, "Text (*.txt *.dat);;All Files (*)") if not path: return self.path_label.setText(path) self.view_refresh.emit() def _save_names(self): for i in range(10): name = self.table.item(i, 1).text().strip() self.rename_slot.emit(i+1, name) def update_from_presets(self, d: dict): for i in range(1, 11): name = d.get(f"SLOT{i}_NAME", "") b = d.get(f"SLOT{i}_BSWV", "") outp = d.get(f"SLOT{i}_OUTP", "") summ = "" if b: up = b.upper() fields = [] for key in ("WVTP,", "FRQ,", "AMP,", "OFST,"): idx = up.find(key) if idx >= 0: val = b[idx+len(key):] val = val.split(",")[0] fields.append(f"{key[:-1]}={val}") summ = " ".join(fields) if outp: summ += (" " if summ else "") + f"OUTP={outp}" self.table.item(i-1, 1).setText(name) self.table.item(i-1, 2).setText(summ) class ConfigTab(QtWidgets.QWidget): changed = QtCore.pyqtSignal(dict) # {"PRESET_DIR":..., "SCREENSHOT_DIR":...} def __init__(self, initial: dict): super().__init__() self._build(initial) def _build(self, cfg: dict): self.preset_dir = QtWidgets.QLineEdit(cfg.get("PRESET_DIR","")) self.ss_dir = QtWidgets.QLineEdit(cfg.get("SCREENSHOT_DIR","")) btn_browse_p = QtWidgets.QPushButton("…") btn_browse_s = QtWidgets.QPushButton("…") self.btn_save = QtWidgets.QPushButton("Save Config") self.btn_reload = QtWidgets.QPushButton("Reload Config") g = QtWidgets.QGridLayout(); row = 0 g.addWidget(QtWidgets.QLabel("Preset dir"), row, 0); g.addWidget(self.preset_dir, row, 1); g.addWidget(btn_browse_p, row, 2); row += 1 g.addWidget(QtWidgets.QLabel("Screenshot dir"), row, 0); g.addWidget(self.ss_dir, row, 1); g.addWidget(btn_browse_s, row, 2); row += 1 g.addWidget(self.btn_save, row, 0); g.addWidget(self.btn_reload, row, 1); g.setColumnStretch(1,1) self.setLayout(g) btn_browse_p.clicked.connect(self._pick_preset_dir) btn_browse_s.clicked.connect(self._pick_ss_dir) self.btn_save.clicked.connect(self._save) self.btn_reload.clicked.connect(self._reload) def _pick_preset_dir(self): d = QtWidgets.QFileDialog.getExistingDirectory(self, "Select Preset Directory", self.preset_dir.text().strip() or os.getcwd()) if d: self.preset_dir.setText(d) def _pick_ss_dir(self): d = QtWidgets.QFileDialog.getExistingDirectory(self, "Select Screenshot Directory", self.ss_dir.text().strip() or os.getcwd()) if d: self.ss_dir.setText(d) def _save(self): d = {"PRESET_DIR": self.preset_dir.text().strip(), "SCREENSHOT_DIR": self.ss_dir.text().strip()} self.changed.emit(d) def _reload(self): d = load_config(CONFIG_FILE) self.preset_dir.setText(d.get("PRESET_DIR","")) self.ss_dir.setText(d.get("SCREENSHOT_DIR","")) class CLITab(QtWidgets.QWidget): send_cmd = QtCore.pyqtSignal(str) query_cmd = QtCore.pyqtSignal(str) def __init__(self): super().__init__() self._build() def _build(self): self.inp = QtWidgets.QLineEdit() self.inp.setPlaceholderText("Enter SCPI, e.g. C1:BSWV? or *IDN?") btn_send = QtWidgets.QPushButton("Send") btn_query = QtWidgets.QPushButton("Query") self.out = QtWidgets.QPlainTextEdit(); self.out.setReadOnly(True); self.out.setMinimumHeight(200) grid = QtWidgets.QGridLayout(); row = 0 grid.addWidget(QtWidgets.QLabel("Command"), row, 0) grid.addWidget(self.inp, row, 1, 1, 4) grid.addWidget(btn_send, row, 5) grid.addWidget(btn_query, row, 6); row += 1 grid.addWidget(self.out, row, 0, 1, 7) self.setLayout(grid) btn_send.clicked.connect(self._do_send) btn_query.clicked.connect(self._do_query) self.inp.returnPressed.connect(self._do_query) def append(self, s: str): self.out.appendPlainText(s) def _do_send(self): cmd = self.inp.text().strip() if cmd: self.send_cmd.emit(cmd) def _do_query(self): cmd = self.inp.text().strip() if cmd: self.query_cmd.emit(cmd) class BurstTab(QtWidgets.QWidget): apply_burst = QtCore.pyqtSignal(dict) readback_burst = QtCore.pyqtSignal() def __init__(self): super().__init__() self._build() def _build(self): state_lbl = QtWidgets.QLabel("State") self.state = QtWidgets.QComboBox(); self.state.addItems(["OFF", "ON"]) mode_lbl = QtWidgets.QLabel("Mode") self.mode = QtWidgets.QComboBox(); self.mode.addItems(["TRIG", "GAT"]) src_lbl = QtWidgets.QLabel("Source") self.src = QtWidgets.QComboBox(); self.src.addItems(["INT", "EXT"]) ncy_lbl = QtWidgets.QLabel("Cycles") self.ncy = QtWidgets.QLineEdit("5") dly_lbl = QtWidgets.QLabel("Delay s") self.dly = QtWidgets.QLineEdit("0") gpol_lbl = QtWidgets.QLabel("Gate Pol") self.gpol = QtWidgets.QComboBox(); self.gpol.addItems(["POS", "NEG"]) self.btn_apply = QtWidgets.QPushButton("Apply Burst") self.btn_read = QtWidgets.QPushButton("Readback Burst") grid = QtWidgets.QGridLayout(); row = 0 grid.addWidget(state_lbl, row, 0); grid.addWidget(self.state, row, 1) grid.addWidget(mode_lbl, row, 2); grid.addWidget(self.mode, row, 3); row += 1 grid.addWidget(src_lbl, row, 0); grid.addWidget(self.src, row, 1) grid.addWidget(ncy_lbl, row, 2); grid.addWidget(self.ncy, row, 3); row += 1 grid.addWidget(dly_lbl, row, 0); grid.addWidget(self.dly, row, 1) grid.addWidget(gpol_lbl, row, 2); grid.addWidget(self.gpol, row, 3); row += 1 grid.addWidget(self.btn_apply, row, 0) grid.addWidget(self.btn_read, row, 1) self.setLayout(grid) self.btn_apply.clicked.connect(self._emit_apply) self.btn_read.clicked.connect(lambda: self.readback_burst.emit()) def _emit_apply(self): try: ncy_int = int(float(self.ncy.text())) except Exception: ncy_int = 1 cfg = dict( STATE=self.state.currentText(), MODE=self.mode.currentText(), TRSR=self.src.currentText(), NCYC=str(ncy_int), DLAY=human_to_eng(self.dly.text()), GATEPOL=self.gpol.currentText(), ) self.apply_burst.emit(cfg) class SweepTab(QtWidgets.QWidget): apply_sweep = QtCore.pyqtSignal(dict) readback_sweep = QtCore.pyqtSignal() def __init__(self): super().__init__() self._build() def _build(self): state_lbl = QtWidgets.QLabel("State") self.state = QtWidgets.QComboBox(); self.state.addItems(["OFF", "ON"]) typ_lbl = QtWidgets.QLabel("Type") self.typ = QtWidgets.QComboBox(); self.typ.addItems(["LIN", "LOG"]) fstart_lbl = QtWidgets.QLabel("Start Hz") self.fstart = QtWidgets.QLineEdit("1 kHz") fstop_lbl = QtWidgets.QLabel("Stop Hz") self.fstop = QtWidgets.QLineEdit("10 kHz") time_lbl = QtWidgets.QLabel("Time s") self.time = QtWidgets.QLineEdit("1.0") dir_lbl = QtWidgets.QLabel("Direction") self.direction = QtWidgets.QComboBox(); self.direction.addItems(["UP", "DOWN"]) src_lbl = QtWidgets.QLabel("Source") self.src = QtWidgets.QComboBox(); self.src.addItems(["INT", "EXT"]) self.btn_apply = QtWidgets.QPushButton("Apply Sweep") self.btn_read = QtWidgets.QPushButton("Readback Sweep") grid = QtWidgets.QGridLayout(); row = 0 grid.addWidget(state_lbl, row, 0); grid.addWidget(self.state, row, 1) grid.addWidget(typ_lbl, row, 2); grid.addWidget(self.typ, row, 3); row += 1 grid.addWidget(fstart_lbl, row, 0); grid.addWidget(self.fstart, row, 1) grid.addWidget(fstop_lbl, row, 2); grid.addWidget(self.fstop, row, 3); row += 1 grid.addWidget(time_lbl, row, 0); grid.addWidget(self.time, row, 1) grid.addWidget(dir_lbl, row, 2); grid.addWidget(self.direction, row, 3); row += 1 grid.addWidget(src_lbl, row, 0); grid.addWidget(self.src, row, 1); row += 1 grid.addWidget(self.btn_apply, row, 0) grid.addWidget(self.btn_read, row, 1) self.setLayout(grid) self.btn_apply.clicked.connect(self._emit_apply) self.btn_read.clicked.connect(lambda: self.readback_sweep.emit()) def _emit_apply(self): cfg = dict( STATE=self.state.currentText(), WAV=self.typ.currentText(), STAR=human_to_eng(self.fstart.text()), STOP=human_to_eng(self.fstop.text()), TIME=human_to_eng(self.time.text()), DIR=self.direction.currentText(), TRSR=self.src.currentText(), ) self.apply_sweep.emit(cfg) class ARBTab(QtWidgets.QWidget): refresh_lists = QtCore.pyqtSignal() set_wave = QtCore.pyqtSignal(str) upload_wave = QtCore.pyqtSignal(str, str) download_wave = QtCore.pyqtSignal(str, str) def __init__(self): super().__init__() self._build() def _build(self): self.built_in = QtWidgets.QListWidget() self.user = QtWidgets.QListWidget() refresh = QtWidgets.QPushButton("Refresh") set_btn = QtWidgets.QPushButton("Set Selected to Channel") upload_btn = QtWidgets.QPushButton("Upload .bin/.csv → USER") download_btn = QtWidgets.QPushButton("Download USER → file") self.name_edit = QtWidgets.QLineEdit(); self.name_edit.setPlaceholderText("User name (optional)") grid = QtWidgets.QGridLayout() grid.addWidget(QtWidgets.QLabel("BUILDIN"), 0, 0) grid.addWidget(QtWidgets.QLabel("USER"), 0, 1) grid.addWidget(self.built_in, 1, 0) grid.addWidget(self.user, 1, 1) grid.addWidget(refresh, 2, 0) grid.addWidget(set_btn, 2, 1) grid.addWidget(self.name_edit, 3, 0) grid.addWidget(upload_btn, 3, 1) grid.addWidget(download_btn, 4, 1) self.setLayout(grid) refresh.clicked.connect(lambda: self.refresh_lists.emit()) set_btn.clicked.connect(self._emit_set) upload_btn.clicked.connect(self._emit_upload) download_btn.clicked.connect(self._emit_download) def selected_name(self) -> str: w = self.user.currentItem() or self.built_in.currentItem() return w.text().strip() if w else "" def _emit_set(self): name = self.selected_name() if name: self.set_wave.emit(name) def _emit_upload(self): path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Select ARB file", "", "ARB Files (*.bin *.csv);;All Files (*)") if not path: return name = self.name_edit.text().strip() if not name: name = os.path.splitext(os.path.basename(path))[0] self.upload_wave.emit(name, path) def _emit_download(self): item = self.user.currentItem() if not item: return name = item.text().strip() path, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Save ARB file", f"{name}.bin", "Binary (*.bin);;All Files (*)") if not path: return self.download_wave.emit(name, path) # -------------------- Main Window -------------------- class Main(QtWidgets.QWidget): def __init__(self, preset_ip=None): super().__init__() self.setWindowTitle(Window_Name) self.i = SDGLan() self._scr_worker = None self.cfg = load_config(CONFIG_FILE) ensure_dir(self.cfg["PRESET_DIR"]) ensure_dir(self.cfg["SCREENSHOT_DIR"]) self.preset_path = os.path.join(self.cfg["PRESET_DIR"], PRESET_BASENAME) self._build(preset_ip) def _build(self, preset_ip): ip_lbl = QtWidgets.QLabel("IP") self.ip = QtWidgets.QLineEdit(preset_ip or "") if not preset_ip: self.ip.setPlaceholderText("e.g. 192.168.1.120") port_lbl = QtWidgets.QLabel("Port") self.port = QtWidgets.QLineEdit(str(DEFAULT_PORT)) self.btn_connect = QtWidgets.QPushButton("Connect") self.btn_idn = QtWidgets.QPushButton("IDN?") self.btn_err = QtWidgets.QPushButton("SYST:ERR?") self.btn_scr = QtWidgets.QPushButton("Screenshot") ch_lbl = QtWidgets.QLabel("Channel") self.ch = QtWidgets.QComboBox(); self.ch.addItems(["C1", "C2"]) self.tabs = QtWidgets.QTabWidget() self.t_basic = BasicTab(); self.tabs.addTab(self.t_basic, "Basic") self.t_burst = BurstTab(); self.tabs.addTab(self.t_burst, "Burst") self.t_sweep = SweepTab(); self.tabs.addTab(self.t_sweep, "Sweep") self.t_arb = ARBTab(); self.tabs.addTab(self.t_arb, "ARB Manager") self.t_cli = CLITab(); self.tabs.addTab(self.t_cli, "SCPI CLI") self.t_presets = PresetsTab(); self.tabs.addTab(self.t_presets, "Presets") self.t_config = ConfigTab(self.cfg); self.tabs.addTab(self.t_config, "Config") self.log = QtWidgets.QPlainTextEdit(); self.log.setReadOnly(True); self.log.setMinimumHeight(200) top = QtWidgets.QGridLayout(); row = 0 top.addWidget(ip_lbl, row, 0); top.addWidget(self.ip, row, 1, 1, 3) top.addWidget(port_lbl, row, 4); top.addWidget(self.port, row, 5) top.addWidget(self.btn_connect, row, 6); row += 1 top.addWidget(self.btn_idn, row, 0); top.addWidget(self.btn_err, row, 1); top.addWidget(self.btn_scr, row, 2) top.addWidget(ch_lbl, row, 3); top.addWidget(self.ch, row, 4); row += 1 top.addWidget(self.tabs, row, 0, 1, 7); row += 1 top.addWidget(self.log, row, 0, 1, 7) self.setLayout(top) # wiring self.btn_connect.clicked.connect(self.on_connect) self.btn_idn.clicked.connect(self.on_idn) self.btn_err.clicked.connect(self.on_err) self.btn_scr.clicked.connect(self.on_screenshot) self.t_basic.apply_basic.connect(self.on_apply_basic) self.t_basic.readback_basic.connect(self.on_readback_basic) self.t_basic.toggle_output.connect(self.on_output) self.t_basic.store_slot.connect(self.on_store_slot) self.t_basic.recall_slot.connect(self.on_recall_slot) self.t_burst.apply_burst.connect(self.on_apply_burst) self.t_burst.readback_burst.connect(self.on_readback_burst) self.t_sweep.apply_sweep.connect(self.on_apply_sweep) self.t_sweep.readback_sweep.connect(self.on_readback_sweep) self.t_arb.refresh_lists.connect(self.on_arb_refresh) self.t_arb.set_wave.connect(self.on_arb_set) self.t_arb.upload_wave.connect(self.on_arb_upload) self.t_arb.download_wave.connect(self.on_arb_download) self.t_cli.send_cmd.connect(self.on_cli_send) self.t_cli.query_cmd.connect(self.on_cli_query) self.t_config.changed.connect(self.on_config_changed) ensure_preset_file(self.preset_path) self.refresh_presets_view() # --------- helpers --------- def logln(self, s: str): self.log.appendPlainText(s) def _pref(self) -> str: return self.ch.currentText() + ":" # ---------- Top actions ---------- def on_connect(self): try: addr = self.ip.text().strip(); port = int(self.port.text().strip()) self.i.connect(addr, port) self.logln(f"Connected to {addr}:{port}") except Exception as e: self.logln(f"[ERR] connect: {e}") def on_idn(self): try: self.logln(f"*IDN? -> {self.i.query_retry('*IDN?')}") except Exception as e: self.logln(f"[ERR] IDN: {e}") def on_err(self): try: self.logln(f"SYST:ERR? -> {self.i.query_retry('SYST:ERR?')}") except Exception as e: self.logln(f"[ERR] SYST:ERR?: {e}") # ---------- Screenshot ---------- def on_screenshot(self): if not self.i.sock: self.logln("[ERR] screenshot: not connected") return if hasattr(self, "_scr_worker") and self._scr_worker and self._scr_worker.isRunning(): self.logln("[ERR] screenshot already running") return self.btn_scr.setEnabled(False); self.setCursor(QtCore.Qt.WaitCursor) outdir = load_config(CONFIG_FILE)["SCREENSHOT_DIR"] self._scr_worker = ScreenshotWorker(self.i, outdir, self) self._scr_worker.done.connect(self._on_screenshot_done) self._scr_worker.start() def _on_screenshot_done(self, data: bytes, full_path: str, err: str): self.btn_scr.setEnabled(True); self.unsetCursor() if err: self.logln(f"[ERR] screenshot: {err}") return try: with open(full_path, "wb") as f: f.write(data) self.logln(f"Screenshot saved to {full_path} ({len(data)} bytes)") img = QtGui.QImage(full_path) if img.isNull(): self.logln("[ERR] screenshot: Qt decode failed") return pixmap = QtGui.QPixmap.fromImage(img) lbl = QtWidgets.QLabel(); lbl.setPixmap(pixmap) w = QtWidgets.QWidget(); l = QtWidgets.QVBoxLayout(w); l.addWidget(lbl) w.setWindowTitle("SDG2042X Screenshot"); w.resize(pixmap.width(), pixmap.height()); w.show() self._scr_win = w except Exception as e: self.logln(f"[ERR] screenshot save/display: {e}") # ---------- Basic apply/readback ---------- def on_apply_basic(self, cfg: dict): try: parts = [f"WVTP,{cfg['WVTP']}"] if 'FRQ' in cfg: parts.append(f"FRQ,{cfg['FRQ']}") if 'PERI' in cfg: parts.append(f"PERI,{cfg['PERI']}") if 'AMP' in cfg: parts.append(f"AMP,{cfg['AMP']}") else: if 'HLEV' in cfg: parts.append(f"HLEV,{cfg['HLEV']}") if 'LLEV' in cfg: parts.append(f"LLEV,{cfg['LLEV']}") if 'OFST' in cfg: parts.append(f"OFST,{cfg['OFST']}") if 'PHSE' in cfg: parts.append(f"PHSE,{cfg['PHSE']}") for key in ("DUTY","SYM","WIDTH","RISE","FALL","DLY","MEAN","STDEV","BANDSTATE"): if key in cfg: parts.append(f"{key},{cfg[key]}") cmd = f"{self._pref()}BSWV " + ",".join(parts) self.i.write(cmd); self.logln(f"SET: {cmd}") if cfg.get("WVTP") == "ARB" and cfg.get("ARWV_NAME"): qname = quote_name(cfg["ARWV_NAME"]) self.i.write(f"{self._pref()}ARWV NAME,{qname}") self.logln(f"SET ARWV NAME,{qname}") except Exception as e: self.logln(f"[ERR] basic apply: {e}") def on_readback_basic(self): try: rb = self.i.query_retry(f"{self._pref()}BSWV?") st = self.i.query_retry(f"{self._pref()}OUTP?") self.logln(f"{self._pref()}BSWV? -> {rb}") self.logln(f"{self._pref()}OUTP? -> {st}") except Exception as e: self.logln(f"[ERR] basic readback: {e}") def on_output(self, turn_on: bool): try: cmd = f"{self._pref()}OUTP {'ON' if turn_on else 'OFF'}" self.i.write(cmd) st = self.i.query_retry(f"{self._pref()}OUTP?") self.logln(f"SET: {cmd}") self.logln(f"{self._pref()}OUTP? -> {st}") except Exception as e: self.logln(f"[ERR] output: {e}") # ---------- Burst ---------- def on_apply_burst(self, cfg: dict): try: parts = [f"{k},{v}" for k, v in cfg.items()] cmd = f"{self._pref()}BTWV " + ",".join(parts) self.i.write(cmd); self.logln(f"SET: {cmd}") except Exception as e: self.logln(f"[ERR] burst apply: {e}") def on_readback_burst(self): try: rb = self.i.query_retry(f"{self._pref()}BTWV?"); self.logln(f"{self._pref()}BTWV? -> {rb}") except Exception as e: self.logln(f"[ERR] burst readback: {e}") # ---------- Sweep ---------- def on_apply_sweep(self, cfg: dict): try: parts = [f"{k},{v}" for k, v in cfg.items()] cmd = f"{self._pref()}SWWV " + ",".join(parts) self.i.write(cmd) err = self.i.query_retry("SYST:ERR?") if not err.startswith("0"): parts2 = [p.replace("WAV,", "SPAC,") for p in parts] cmd2 = f"{self._pref()}SWWV " + ",".join(parts2) self.i.write(cmd2); self.logln(f"SET (fallback): {cmd2}") else: self.logln(f"SET: {cmd}") except Exception as e: self.logln(f"[ERR] sweep apply: {e}") def on_readback_sweep(self): try: rb = self.i.query_retry(f"{self._pref()}SWWV?"); self.logln(f"{self._pref()}SWWV? -> {rb}") except Exception as e: self.logln(f"[ERR] sweep readback: {e}") # ---------- ARB ops ---------- def on_arb_refresh(self): try: b = self.i.query_retry("STL? BUILDIN") u = self.i.query_retry("STL? USER") def parse_list(s: str): return [p.strip() for p in s.replace(";", ",").split(',') if p.strip()] b_list = parse_list(b) u_list = parse_list(u) self.t_arb.built_in.clear(); self.t_arb.user.clear() self.t_arb.built_in.addItems(b_list) self.t_arb.user.addItems(u_list) self.logln(f"BUILDIN: {b_list}"); self.logln(f"USER: {u_list}") except Exception as e: self.logln(f"[ERR] ARB refresh: {e}") def on_arb_set(self, name: str): try: qname = quote_name(name) self.i.write(f"{self._pref()}BSWV WVTP,ARB") self.i.write(f"{self._pref()}ARWV NAME,{qname}") rb = self.i.query_retry(f"{self._pref()}ARWV?") self.logln(f"SET ARB: {qname}; ARWV? -> {rb}") except Exception as e: self.logln(f"[ERR] ARB set: {e}") def _make_scpi_block(self, payload: bytes) -> bytes: n = len(payload) len_digits = str(len(str(n))).encode('ascii') header = b'#' + len_digits + str(n).encode('ascii') return header + payload def on_arb_upload(self, name: str, path: str): try: with open(path, 'rb') as f: data = f.read() block = self._make_scpi_block(data) qname = quote_name(name) prefix = f"WVDT USER,{qname},".encode('ascii') self.i.write_bytes(prefix + block + b"\n") self.logln(f"Uploaded USER,{qname} ({len(data)} bytes)") except Exception as e: self.logln(f"[ERR] ARB upload: {e}") def on_arb_download(self, name: str, savepath: str): try: qname = quote_name(name) raw = self.i.query_raw(f"WVDT? USER,{qname}") idx = raw.find(b"WAVEDATA,") if idx == -1: raise RuntimeError("No WAVEDATA header") blk = raw[idx + len(b"WAVEDATA,"):] if not blk or blk[0:1] != b'#': raise RuntimeError("No SCPI block after WAVEDATA,") ndig = int(chr(blk[1])) start = 2 blen = int(blk[start:start+ndig].decode('ascii')) start += ndig payload = bytearray(blk[start:start+blen]) need = blen - len(payload) while need > 0: try: chunk = self.i.sock.recv(min(65536, need)) if not chunk: break payload.extend(chunk) need -= len(chunk) except socket.timeout: break with open(savepath, 'wb') as f: f.write(bytes(payload)) self.logln(f"Downloaded USER,{qname} -> {savepath} ({len(payload)} bytes)") except Exception as e: self.logln(f"[ERR] ARB download: {e}") # ---------- CLI tab ---------- def on_cli_send(self, cmd: str): try: self.i.write(cmd) self.t_cli.append(f"> {cmd}") except Exception as e: self.t_cli.append(f"[ERR] send: {e}") def on_cli_query(self, cmd: str): try: resp = self.i.query_retry(cmd) self.t_cli.append(f"? {cmd}\n< {resp}") except Exception as e: self.t_cli.append(f"[ERR] query: {e}") # ---------- Presets ---------- def on_store_slot(self, slot: int): try: if slot < 1 or slot > 10: raise ValueError("slot out of range") bswv = self.i.query_retry(f"{self._pref()}BSWV?") outp = self.i.query_retry(f"{self._pref()}OUTP?") outp_state = "ON" if "ON" in outp.upper().split(',')[0] else "OFF" d = read_presets(self.preset_path) d[f"SLOT{slot}_BSWV"] = bswv d[f"SLOT{slot}_OUTP"] = outp_state write_presets(self.preset_path, d) self.logln(f"Stored slot {slot}: BSWV len={len(bswv)}, OUTP={outp_state}") self.refresh_presets_view() except Exception as e: self.logln(f"[ERR] store slot {slot}: {e}") def on_recall_slot(self, slot: int): try: if slot < 1 or slot > 10: raise ValueError("slot out of range") d = read_presets(self.preset_path) b = d.get(f"SLOT{slot}_BSWV", "").strip() o = d.get(f"SLOT{slot}_OUTP", "").strip().upper() if not b: raise RuntimeError("empty BSWV") self.i.write(b) if o in ("ON", "OFF"): self.i.write(f"{self._pref()}OUTP {o}") self.logln(f"Recalled slot {slot}: applied BSWV and OUTP={o or 'N/A'}") except Exception as e: self.logln(f"[ERR] recall slot {slot}: {e}") def on_rename_slot(self, slot: int, name: str): try: d = read_presets(self.preset_path) d[f"SLOT{slot}_NAME"] = name write_presets(self.preset_path, d) self.refresh_presets_view() except Exception as e: self.logln(f"[ERR] rename slot {slot}: {e}") def refresh_presets_view(self): try: ensure_preset_file(self.preset_path) d = read_presets(self.preset_path) self.t_presets.update_from_presets(d) self.logln(f"Preset file: {self.preset_path}") except Exception as e: self.logln(f"[ERR] refresh presets: {e}") # ---------- Config ---------- def on_config_changed(self, newd: dict): try: nd = { "PRESET_DIR": _expand(newd.get("PRESET_DIR","")) or os.getcwd(), "SCREENSHOT_DIR": _expand(newd.get("SCREENSHOT_DIR","")) or os.getcwd(), } ensure_dir(nd["PRESET_DIR"]); ensure_dir(nd["SCREENSHOT_DIR"]) save_config(CONFIG_FILE, nd) self.cfg = nd self.preset_path = os.path.join(self.cfg["PRESET_DIR"], PRESET_BASENAME) ensure_preset_file(self.preset_path) self.refresh_presets_view() self.logln(f"Config saved -> {CONFIG_FILE}") except Exception as e: self.logln(f"[ERR] save config: {e}") # -------------------- CLI entry -------------------- def show_help(): print("Usage: sdg2042x_gui.py [options]\n") print("Options:") print(" -ip , --ip Prefill IP address field") print(" -h, --help Show this help message") if __name__ == "__main__": preset_ip = None argv = sys.argv[1:] for i, tok in enumerate(argv): if tok in ("-h", "--help"): show_help(); sys.exit(0) if tok in ("-ip", "--ip") and i + 1 < len(argv): preset_ip = argv[i + 1] app = QtWidgets.QApplication(sys.argv) m = Main(preset_ip=preset_ip) m.resize(1250, 780) m.show() sys.exit(app.exec_())