Files

1481 lines
58 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# Siglent SDG2042X Linux GUI (PyQt5)
# Features:
# Basic Signals, including Burst, Sweep context-aware parameter settings
# - ARB Manager, SCPI CLI,
# - Storing Presets, storing to editable preset file SDG2042x.dat
# - path setting for screenshot and preset file
# - timestamped Screenshot
#
# Author: Thomas Gohle / tgohle@togo-lab.io
# www.togo-lab.io
# tgonet.de / gohle.de
#
# Copyright: CC BY-SA 4.0
# https://creativecommons.org/licenses/by-sa/4.0/
#
# Version: 0.2.1 - 2026-04-20
#
# ---- PATCH NOTES Severity: High ----
#
# BUG-1 _recv_line(): partial recv could miss \n if it didn't land at
# the end of a recv() chunk. Rewritten to accumulate into a
# bytearray and check the joined buffer.
# BUG-2 on_arb_download() ran a blocking recv loop in the GUI thread,
# freezing the UI during large transfers. Replaced with
# ARBDownloadWorker(QThread) matching the ScreenshotWorker pattern.
# BUG-12 human_to_eng(): strip order was "hz" before suffix checks, so
# "1.5 MHz" -> "1.5 m" -> mult=1e-3 (completely wrong).
# Replaced with a regex-based parser; handles all original suffixes
# plus "mv". Falls back to original string on parse failure.
# BUG-15 real SDG ARB download returned WAVEDATA payload without the
# SCPI definite-length block form expected by v0.2. Download
# parser now accepts both block and raw-after-WAVEDATA formats.
# ARB store parsing was also reworked to keep BUILDIN index/name
# pairs separate from USER waveform names.
#
# ---- PATCH NOTES Severity: Medium ----
#
# BUG-6 cleaned up screenshot path handling.
# V0.1 used to save re-read the config from disk on every call,
# which meant the Config tabs in-memory state could be ignored until written out.
# Screenshot action now uses self.cfg["SCREENSHOT_DIR"] directly,
# so the live config is respected.
#
# -----------------------------------------------
import os
import re
import socket
import sys
import time
from PyQt5 import QtWidgets, QtCore, QtGui
# -------------------- Constants --------------------
Window_Name = "-= WaveForm Generator [SDG2042X] GUI by ToGo-Lab =-"
DEFAULT_PORT = 5025
SOCKET_TIMEOUT = 4.0
SIGLENT_WAVES = ["SINE", "SQUARE", "RAMP", "PULSE", "NOISE", "ARB", "DC"]
CONFIG_FILE = "SDG2042x.config"
PRESET_BASENAME = "SDG2042x.dat"
# -------------------- Config helpers --------------------
def _expand(p: str) -> str:
return os.path.abspath(os.path.expanduser(os.path.expandvars(p)))
def ensure_dir(path: str):
if path and not os.path.isdir(path):
os.makedirs(path, exist_ok=True)
def load_config(path: str) -> dict:
d = {"PRESET_DIR": os.getcwd(), "SCREENSHOT_DIR": os.getcwd()}
try:
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#") or "=" not in line:
continue
k, v = line.split("=", 1)
d[k.strip().upper()] = v.strip()
except FileNotFoundError:
pass
d["PRESET_DIR"] = _expand(d.get("PRESET_DIR", os.getcwd()))
d["SCREENSHOT_DIR"] = _expand(d.get("SCREENSHOT_DIR", os.getcwd()))
return d
def save_config(path: str, d: dict):
lines = [
"# SDG2042x.config",
f"PRESET_DIR={d.get('PRESET_DIR','')}",
f"SCREENSHOT_DIR={d.get('SCREENSHOT_DIR','')}",
""
]
with open(path, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
# -------------------- Transport --------------------
class SDGLan:
def __init__(self):
self.sock = None
self.addr = None
self.port = DEFAULT_PORT
def connect(self, addr, port=DEFAULT_PORT):
self.close()
self.addr = addr
self.port = port
self.sock = socket.create_connection((addr, port), timeout=SOCKET_TIMEOUT)
self.sock.settimeout(SOCKET_TIMEOUT)
def close(self):
try:
if self.sock:
self.sock.close()
finally:
self.sock = None
def write(self, cmd: str):
if not self.sock:
raise RuntimeError("Not connected")
if not cmd.endswith("\n"):
cmd += "\n"
self.sock.sendall(cmd.encode("ascii"))
def write_bytes(self, payload: bytes):
if not self.sock:
raise RuntimeError("Not connected")
self.sock.sendall(payload)
def drain(self):
"""Read and discard any bytes until timeout. Safe if nothing pending."""
try:
_ = self._recv_until_timeout()
except Exception:
pass
def query(self, cmd: str) -> str:
self.write(cmd)
data = self._recv_line()
return data.strip()
def query_retry(self, cmd: str, retries: int = 2) -> str:
"""Query with pre-drain and retry on socket.timeout."""
last_err = None
for _ in range(retries + 1):
try:
self.drain()
self.write(cmd)
data = self._recv_line()
return data.strip()
except socket.timeout as e:
last_err = e
try:
self.write("*CLS")
except Exception:
pass
continue
if last_err:
raise last_err
return ""
def query_raw(self, cmd: str) -> bytes:
self.write(cmd)
return self._recv_until_timeout()
# ---- block helpers ----
def _recv_exact(self, n: int) -> bytes:
buf = bytearray()
while len(buf) < n:
chunk = self.sock.recv(n - len(buf))
if not chunk:
break
buf.extend(chunk)
return bytes(buf)
def _recv_exact_capped(self, n: int, cap: int) -> bytes:
if n > cap:
raise RuntimeError(f"SCPI block length {n} exceeds cap {cap}")
return self._recv_exact(n)
def query_block(self, cmd: str, timeout_s: float = 8.0, cap: int = 25_000_000) -> bytes:
if not self.sock:
raise RuntimeError("Not connected")
prev_to = self.sock.gettimeout()
try:
self.sock.settimeout(timeout_s)
self.write(cmd)
first = self._recv_exact(1)
while first and first != b"#":
first = self._recv_exact(1)
if not first:
return b""
ndig_b = self._recv_exact(1)
if not ndig_b or not ndig_b.isdigit():
return b""
ndig = int(ndig_b.decode("ascii"))
len_str = self._recv_exact(ndig).decode("ascii")
if not len_str.isdigit():
return b""
length = int(len_str)
payload = self._recv_exact_capped(length, cap)
return payload
finally:
try:
self.sock.settimeout(prev_to)
except Exception:
pass
def query_scdp(self, timeout_s: float = 15.0, cap: int = 50_000_000) -> bytes:
if not self.sock:
raise RuntimeError("Not connected")
prev_to = self.sock.gettimeout()
try:
self.sock.settimeout(timeout_s)
self.write("SCDP")
first2 = self._recv_exact(2)
if not first2:
return b""
if first2 == b"BM":
size_bytes = self._recv_exact(4)
if len(size_bytes) != 4:
return b""
total_size = int.from_bytes(size_bytes, "little")
rest = self._recv_exact_capped(total_size - 6, cap)
return first2 + size_bytes + rest
if first2[:1] == b"#":
ndig = int(first2[1:2].decode())
len_str = self._recv_exact(ndig).decode("ascii")
if not len_str.isdigit():
return b""
length = int(len_str)
payload = self._recv_exact_capped(length, cap)
return payload
tail = self._recv_until_timeout()
return first2 + tail
finally:
try:
self.sock.settimeout(prev_to)
except Exception:
pass
# BUG-1 FIX: accumulate into bytearray and check the joined buffer,
# not just the last recv() chunk. The original check b.endswith(b"\n")
# on the raw chunk would miss a \n that arrived mid-buffer in a
# subsequent recv() call, causing _recv_line() to block forever.
def _recv_line(self) -> str:
buf = bytearray()
while True:
chunk = self.sock.recv(4096)
if not chunk:
break
buf.extend(chunk)
if b"\n" in buf:
break
return buf.decode("ascii", errors="ignore")
def _recv_until_timeout(self) -> bytes:
data = bytearray()
while True:
try:
b = self.sock.recv(65536)
if not b:
break
data.extend(b)
except socket.timeout:
break
return bytes(data)
# -------------------- Utilities --------------------
# BUG-12 FIX: original code stripped "hz" before checking suffix multipliers,
# so "1.5 MHz" -> "1.5 m" -> mult = 1e-3 (completely wrong frequency sent to
# instrument). Replaced with a single regex match that reads suffix first,
# before any stripping. Handles all original suffixes + "mv".
_ENG_SUFFIXES = {
"ghz": 1e9,
"mhz": 1e6,
"khz": 1e3,
"hz": 1.0,
"ms": 1e-3,
"us": 1e-6,
"ns": 1e-9,
"s": 1.0,
"mv": 1e-3,
"vpp": 1.0,
"v": 1.0,
"deg": 1.0,
"k": 1e3,
"m": 1e-3,
"u": 1e-6,
"n": 1e-9,
"g": 1e9,
}
# Build pattern: longest suffixes first to avoid partial matches (mhz before m)
_SUFFIX_PAT = "|".join(
re.escape(s) for s in sorted(_ENG_SUFFIXES, key=len, reverse=True)
)
_ENG_RE = re.compile(
r"^\s*([+-]?\d+\.?\d*(?:[eE][+-]?\d+)?)\s*(" + _SUFFIX_PAT + r")?\s*$",
re.IGNORECASE,
)
def human_to_eng(s: str) -> str:
m = _ENG_RE.match(s.strip())
if not m:
return s.strip() # fallback: return as-is, instrument will reject if wrong
val = float(m.group(1))
suffix = (m.group(2) or "").lower()
mult = _ENG_SUFFIXES.get(suffix, 1.0)
result = val * mult
# Return integer string when value is whole, float string otherwise
if result == int(result) and abs(result) < 1e15:
return str(int(result))
return repr(result) # repr preserves full float precision
def quote_name(name: str) -> str:
safe = all(ch.isalnum() or ch in ("_", "-") for ch in name)
return name if safe else '"%s"' % name
def _extract_length_from_header(header: bytes):
txt = header.decode("ascii", errors="ignore").upper()
m = re.search(r"(?:^|,)\s*LENGTH\s*,\s*(\d+)", txt)
return int(m.group(1)) if m else None
def parse_stl_builtin(s: str):
s = re.sub(r"^\s*STL\s*", "", s.strip(), flags=re.IGNORECASE)
toks = [t.strip() for t in s.replace(";", ",").split(",") if t.strip()]
out = []
i = 0
while i + 1 < len(toks):
tok = toks[i]
if re.fullmatch(r"M\d+", tok, flags=re.IGNORECASE):
out.append({"source": "BUILDIN", "index": int(tok[1:]), "name": toks[i + 1]})
i += 2
else:
i += 1
return out
def parse_stl_user(s: str):
s = re.sub(r"^\s*STL\s*", "", s.strip(), flags=re.IGNORECASE)
toks = [t.strip() for t in s.replace(";", ",").split(",") if t.strip()]
if toks and toks[0].upper() == "WVNM":
toks = toks[1:]
if len(toks) == 1 and toks[0].upper() == "EMPTY":
return []
return [{"source": "USER", "name": t} for t in toks if t.upper() != "EMPTY"]
def ensure_preset_file(path: str):
if not os.path.exists(path):
with open(path, "w", encoding="utf-8") as f:
f.write("# SDG2042X presets\n")
for i in range(1, 11):
f.write(f"SLOT{i}_NAME=\n")
f.write(f"SLOT{i}_BSWV=\n")
f.write(f"SLOT{i}_OUTP=\n")
return path
def read_presets(path: str) -> dict:
ensure_preset_file(path)
presets = {}
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
k, v = line.split("=", 1)
presets[k.strip()] = v.strip()
return presets
def write_presets(path: str, d: dict):
lines = ["# SDG2042X presets"]
for i in range(1, 11):
name = d.get(f"SLOT{i}_NAME", "")
bswv = d.get(f"SLOT{i}_BSWV", "")
outp = d.get(f"SLOT{i}_OUTP", "")
lines.append(f"SLOT{i}_NAME={name}")
lines.append(f"SLOT{i}_BSWV={bswv}")
lines.append(f"SLOT{i}_OUTP={outp}")
with open(path, "w", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n")
# -------------------- Screenshot worker (threaded) --------------------
class ScreenshotWorker(QtCore.QThread):
done = QtCore.pyqtSignal(bytes, str, str) # data, filename, error
def __init__(self, io: SDGLan, out_dir: str, parent=None):
super().__init__(parent)
self.io = io
self.out_dir = out_dir
def run(self):
try:
try:
self.io.write("*CLS")
except Exception:
pass
try:
_ = self.io._recv_until_timeout()
except Exception:
pass
data = self.io.query_scdp(timeout_s=20.0, cap=50_000_000)
if not data:
err = self.io.query_retry("SYST:ERR?")
raise RuntimeError(f"no image data; last SYST:ERR? -> {err}")
ts = time.strftime("%Y%m%d-%H%M%S")
fn = f"{ts}_SDG2042X.bmp" if data.startswith(b"BM") else f"{ts}_SDG2042X.bin"
ensure_dir(self.out_dir)
full = os.path.join(self.out_dir, fn)
self.done.emit(data, full, "")
except Exception as e:
self.done.emit(b"", "", str(e))
# -------------------- BUG-2 FIX: ARB download worker --------------------
# Original on_arb_download() called self.i.sock.recv() in a blocking loop
# directly in the GUI thread, freezing the UI for large waveform transfers.
# Moved to a QThread worker with the same done-signal pattern as
# ScreenshotWorker. Main window connects the signal and handles save + log.
class ARBDownloadWorker(QtCore.QThread):
done = QtCore.pyqtSignal(bytes, str, str) # payload, savepath, error
def __init__(self, io: SDGLan, name: str, savepath: str, parent=None):
super().__init__(parent)
self.io = io
self.name = name
self.savepath = savepath
def run(self):
try:
qname = quote_name(self.name)
raw = self.io.query_raw(f"WVDT? USER,{qname}")
idx = raw.find(b"WAVEDATA,")
if idx == -1:
preview = raw[:120]
raise RuntimeError(f"No WAVEDATA header in response; first bytes={preview!r}")
header = raw[:idx]
tail = raw[idx + len(b"WAVEDATA,"):]
expected_len = _extract_length_from_header(header)
tail_wo_ws = tail.lstrip(b" \t\r\n")
if tail_wo_ws.startswith(b"#"):
if len(tail_wo_ws) < 2 or not tail_wo_ws[1:2].isdigit():
raise RuntimeError("Malformed SCPI block header after WAVEDATA,")
ndig = int(tail_wo_ws[1:2].decode("ascii"))
hdr_len = 2 + ndig
if len(tail_wo_ws) < hdr_len:
raise RuntimeError("Incomplete SCPI block header")
blen = int(tail_wo_ws[2:2 + ndig].decode("ascii"))
payload = bytearray(tail_wo_ws[hdr_len:hdr_len + blen])
need = blen - len(payload)
while need > 0:
chunk = self.io.sock.recv(min(65536, need))
if not chunk:
break
payload.extend(chunk)
need -= len(chunk)
if len(payload) < blen:
raise RuntimeError(f"Incomplete SCPI block payload: got {len(payload)} of {blen}")
self.done.emit(bytes(payload), self.savepath, "")
return
payload = bytearray(tail)
if expected_len is not None:
need = expected_len - len(payload)
while need > 0:
chunk = self.io.sock.recv(min(65536, need))
if not chunk:
break
payload.extend(chunk)
need -= len(chunk)
if len(payload) < expected_len:
raise RuntimeError(
f"Incomplete raw waveform payload: got {len(payload)} of {expected_len}"
)
payload = payload[:expected_len]
self.done.emit(bytes(payload), self.savepath, "")
except Exception as e:
self.done.emit(b"", self.savepath, str(e))
# -------------------- Tabs --------------------
class BasicTab(QtWidgets.QWidget):
apply_basic = QtCore.pyqtSignal(dict)
readback_basic = QtCore.pyqtSignal()
toggle_output = QtCore.pyqtSignal(bool)
store_slot = QtCore.pyqtSignal(int)
recall_slot = QtCore.pyqtSignal(int)
def __init__(self):
super().__init__()
self._build()
def _build(self):
# Row 1: waveform
wave_lbl = QtWidgets.QLabel("Waveform")
self.wave = QtWidgets.QComboBox(); self.wave.addItems(SIGLENT_WAVES)
# Row 2: FRQ/PERI
fp_lbl = QtWidgets.QLabel("Freq/Period")
self.fp_mode = QtWidgets.QComboBox(); self.fp_mode.addItems(["FRQ", "PERI"])
self.fp_val = QtWidgets.QLineEdit("1 kHz")
# Row 3: amplitude or levels
amp_mode_lbl = QtWidgets.QLabel("Amplitude mode")
self.amp_mode = QtWidgets.QComboBox(); self.amp_mode.addItems(["AMP", "H/L levels"])
self.amp = QtWidgets.QLineEdit("1.0") # AMP Vpp
self.hlev = QtWidgets.QLineEdit("0.5") # HLEV
self.llev = QtWidgets.QLineEdit("-0.5") # LLEV
# Row 4: offset and phase
off_lbl = QtWidgets.QLabel("Offset V")
self.off = QtWidgets.QLineEdit("0.0")
ph_lbl = QtWidgets.QLabel("Phase deg")
self.ph = QtWidgets.QLineEdit("0")
# Extras
self.duty = QtWidgets.QLineEdit("50") # Square/Pulse
self.sym = QtWidgets.QLineEdit("50") # Ramp
self.width = QtWidgets.QLineEdit("0.001") # Pulse s
self.rise = QtWidgets.QLineEdit("1e-6") # Pulse s
self.fall = QtWidgets.QLineEdit("1e-6") # Pulse s
self.dly = QtWidgets.QLineEdit("0") # Pulse s
self.mean = QtWidgets.QLineEdit("0") # Noise
self.stdev = QtWidgets.QLineEdit("0.1")
self.band = QtWidgets.QCheckBox("Noise bandwidth limit")
self.arb_name = QtWidgets.QLineEdit("") # ARB selection
# Buttons
self.btn_apply = QtWidgets.QPushButton("Apply")
self.btn_read = QtWidgets.QPushButton("Readback")
self.btn_on = QtWidgets.QPushButton("Output ON")
self.btn_off = QtWidgets.QPushButton("Output OFF")
# Presets
preset_lbl = QtWidgets.QLabel("Preset #")
self.preset_num = QtWidgets.QSpinBox(); self.preset_num.setRange(1, 10); self.preset_num.setValue(1)
self.btn_store = QtWidgets.QPushButton("Store")
self.btn_recall = QtWidgets.QPushButton("Recall")
grid = QtWidgets.QGridLayout(); row = 0
grid.addWidget(wave_lbl, row, 0); grid.addWidget(self.wave, row, 1); row += 1
grid.addWidget(fp_lbl, row, 0); grid.addWidget(self.fp_mode, row, 1); grid.addWidget(self.fp_val, row, 2); row += 1
grid.addWidget(amp_mode_lbl, row, 0); grid.addWidget(self.amp_mode, row, 1)
grid.addWidget(QtWidgets.QLabel("AMP Vpp"), row, 2); grid.addWidget(self.amp, row, 3)
grid.addWidget(QtWidgets.QLabel("HLEV"), row, 4); grid.addWidget(self.hlev, row, 5)
grid.addWidget(QtWidgets.QLabel("LLEV"), row, 6); grid.addWidget(self.llev, row, 7); row += 1
grid.addWidget(off_lbl, row, 0); grid.addWidget(self.off, row, 1)
grid.addWidget(ph_lbl, row, 2); grid.addWidget(self.ph, row, 3); row += 1
# Extras rows
grid.addWidget(QtWidgets.QLabel("DUTY %"), row, 0); grid.addWidget(self.duty, row, 1)
grid.addWidget(QtWidgets.QLabel("SYM %"), row, 2); grid.addWidget(self.sym, row, 3)
grid.addWidget(QtWidgets.QLabel("WIDTH s"), row, 4); grid.addWidget(self.width, row, 5); row += 1
grid.addWidget(QtWidgets.QLabel("RISE s"), row, 0); grid.addWidget(self.rise, row, 1)
grid.addWidget(QtWidgets.QLabel("FALL s"), row, 2); grid.addWidget(self.fall, row, 3)
grid.addWidget(QtWidgets.QLabel("DLY s"), row, 4); grid.addWidget(self.dly, row, 5); row += 1
grid.addWidget(QtWidgets.QLabel("NOISE MEAN"), row, 0); grid.addWidget(self.mean, row, 1)
grid.addWidget(QtWidgets.QLabel("NOISE STDEV"), row, 2); grid.addWidget(self.stdev, row, 3)
grid.addWidget(self.band, row, 4); row += 1
grid.addWidget(QtWidgets.QLabel("ARB name"), row, 0); grid.addWidget(self.arb_name, row, 1, 1, 3); row += 1
grid.addWidget(self.btn_apply, row, 0)
grid.addWidget(self.btn_read, row, 1)
grid.addWidget(self.btn_on, row, 2)
grid.addWidget(self.btn_off, row, 3); row += 1
grid.addWidget(preset_lbl, row, 0)
grid.addWidget(self.preset_num, row, 1)
grid.addWidget(self.btn_store, row, 2)
grid.addWidget(self.btn_recall, row, 3)
grid.setColumnStretch(7, 1)
self.setLayout(grid)
# Signals
self.btn_apply.clicked.connect(self._emit_apply)
self.btn_read.clicked.connect(lambda: self.readback_basic.emit())
self.btn_on.clicked.connect(lambda: self.toggle_output.emit(True))
self.btn_off.clicked.connect(lambda: self.toggle_output.emit(False))
self.btn_store.clicked.connect(lambda: self.store_slot.emit(self.preset_num.value()))
self.btn_recall.clicked.connect(lambda: self.recall_slot.emit(self.preset_num.value()))
self.wave.currentTextChanged.connect(self._update_context)
self.amp_mode.currentTextChanged.connect(self._update_amp_mode)
self._update_context()
self._update_amp_mode()
def _update_amp_mode(self):
amp_mode = self.amp_mode.currentText()
show_levels = (amp_mode != "AMP")
self.amp.setEnabled(not show_levels)
self.hlev.setEnabled(show_levels)
self.llev.setEnabled(show_levels)
def _update_context(self):
w = self.wave.currentText()
# enable all by default
widgets = [
self.duty, self.sym, self.width, self.rise, self.fall, self.dly,
self.mean, self.stdev, self.band, self.arb_name,
self.ph, self.off, self.amp, self.hlev, self.llev, self.fp_val
]
for wid in widgets:
wid.setEnabled(True)
if w == "SINE":
self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=False)
self.ph.setEnabled(True); self.off.setEnabled(True)
elif w == "SQUARE":
self._set_extras_enabled(duty=True, sym=False, pulse=False, noise=False, arb=False)
self.ph.setEnabled(True)
elif w == "RAMP":
self._set_extras_enabled(duty=False, sym=True, pulse=False, noise=False, arb=False)
self.ph.setEnabled(True)
elif w == "PULSE":
self._set_extras_enabled(duty=True, sym=False, pulse=True, noise=False, arb=False)
self.ph.setEnabled(False)
elif w == "NOISE":
self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=True, arb=False)
self.fp_val.setEnabled(False)
self.amp.setEnabled(False); self.hlev.setEnabled(False); self.llev.setEnabled(False)
self.off.setEnabled(False); self.ph.setEnabled(False)
elif w == "ARB":
self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=True)
self.ph.setEnabled(True)
elif w == "DC":
self._set_extras_enabled(duty=False, sym=False, pulse=False, noise=False, arb=False)
self.fp_val.setEnabled(False)
self.amp.setEnabled(False); self.hlev.setEnabled(False); self.llev.setEnabled(False)
self.ph.setEnabled(False)
def _set_extras_enabled(self, *, duty: bool, sym: bool, pulse: bool, noise: bool, arb: bool):
self.duty.setEnabled(duty)
self.sym.setEnabled(sym)
for w in (self.width, self.rise, self.fall, self.dly):
w.setEnabled(pulse)
self.mean.setEnabled(noise)
self.stdev.setEnabled(noise)
self.band.setEnabled(noise)
self.arb_name.setEnabled(arb)
def _emit_apply(self):
w = self.wave.currentText()
cfg = {"WVTP": w}
if w not in ("NOISE", "DC"):
key = self.fp_mode.currentText()
cfg[key] = human_to_eng(self.fp_val.text())
if w not in ("NOISE", "DC"):
if self.amp_mode.currentText() == "AMP":
cfg["AMP"] = human_to_eng(self.amp.text())
else:
cfg["HLEV"] = human_to_eng(self.hlev.text())
cfg["LLEV"] = human_to_eng(self.llev.text())
if w != "NOISE":
cfg["OFST"] = human_to_eng(self.off.text())
if w in ("SINE", "SQUARE", "RAMP", "ARB"):
cfg["PHSE"] = human_to_eng(self.ph.text())
if w in ("SQUARE", "PULSE"):
cfg["DUTY"] = human_to_eng(self.duty.text())
if w == "RAMP":
cfg["SYM"] = human_to_eng(self.sym.text())
if w == "PULSE":
cfg["WIDTH"] = human_to_eng(self.width.text())
cfg["RISE"] = human_to_eng(self.rise.text())
cfg["FALL"] = human_to_eng(self.fall.text())
cfg["DLY"] = human_to_eng(self.dly.text())
if w == "NOISE":
cfg["MEAN"] = human_to_eng(self.mean.text())
cfg["STDEV"] = human_to_eng(self.stdev.text())
cfg["BANDSTATE"] = "ON" if self.band.isChecked() else "OFF"
if w == "ARB" and self.arb_name.text().strip():
cfg["ARWV_NAME"] = self.arb_name.text().strip()
self.apply_basic.emit(cfg)
class PresetsTab(QtWidgets.QWidget):
rename_slot = QtCore.pyqtSignal(int, str)
view_refresh = QtCore.pyqtSignal()
def __init__(self):
super().__init__()
self._build()
def _build(self):
self.table = QtWidgets.QTableWidget(10, 3)
self.table.setHorizontalHeaderLabels(["Slot", "Name", "Summary"])
self.table.horizontalHeader().setStretchLastSection(True)
self.table.verticalHeader().setVisible(False)
for i in range(10):
self.table.setItem(i, 0, QtWidgets.QTableWidgetItem(str(i+1)))
self.table.item(i, 0).setFlags(QtCore.Qt.ItemIsEnabled)
self.table.setItem(i, 1, QtWidgets.QTableWidgetItem(""))
self.table.setItem(i, 2, QtWidgets.QTableWidgetItem(""))
self.table.item(i, 2).setFlags(QtCore.Qt.ItemIsEnabled)
self.btn_load = QtWidgets.QPushButton("Reload File")
self.btn_save = QtWidgets.QPushButton("Save Names")
self.btn_open = QtWidgets.QPushButton("Open File...")
self.path_label = QtWidgets.QLabel(PRESET_BASENAME)
layout = QtWidgets.QVBoxLayout()
hl = QtWidgets.QHBoxLayout()
hl.addWidget(self.btn_load); hl.addWidget(self.btn_save); hl.addWidget(self.btn_open); hl.addStretch(1); hl.addWidget(self.path_label)
layout.addLayout(hl)
layout.addWidget(self.table)
self.setLayout(layout)
self.btn_load.clicked.connect(lambda: self.view_refresh.emit())
self.btn_save.clicked.connect(self._save_names)
self.btn_open.clicked.connect(self._open_file)
def _open_file(self):
path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Open Preset File", PRESET_BASENAME, "Text (*.txt *.dat);;All Files (*)")
if not path:
return
self.path_label.setText(path)
self.view_refresh.emit()
def _save_names(self):
for i in range(10):
name = self.table.item(i, 1).text().strip()
self.rename_slot.emit(i+1, name)
def update_from_presets(self, d: dict):
for i in range(1, 11):
name = d.get(f"SLOT{i}_NAME", "")
b = d.get(f"SLOT{i}_BSWV", "")
outp = d.get(f"SLOT{i}_OUTP", "")
summ = ""
if b:
up = b.upper()
fields = []
for key in ("WVTP,", "FRQ,", "AMP,", "OFST,"):
idx = up.find(key)
if idx >= 0:
val = b[idx+len(key):]
val = val.split(",")[0]
fields.append(f"{key[:-1]}={val}")
summ = " ".join(fields)
if outp:
summ += (" " if summ else "") + f"OUTP={outp}"
self.table.item(i-1, 1).setText(name)
self.table.item(i-1, 2).setText(summ)
class ConfigTab(QtWidgets.QWidget):
changed = QtCore.pyqtSignal(dict) # {"PRESET_DIR":..., "SCREENSHOT_DIR":...}
def __init__(self, initial: dict):
super().__init__()
self._build(initial)
def _build(self, cfg: dict):
self.preset_dir = QtWidgets.QLineEdit(cfg.get("PRESET_DIR", ""))
self.ss_dir = QtWidgets.QLineEdit(cfg.get("SCREENSHOT_DIR", ""))
btn_browse_p = QtWidgets.QPushButton("")
btn_browse_s = QtWidgets.QPushButton("")
self.btn_save = QtWidgets.QPushButton("Save Config")
self.btn_reload = QtWidgets.QPushButton("Reload Config")
g = QtWidgets.QGridLayout(); row = 0
g.addWidget(QtWidgets.QLabel("Preset dir"), row, 0); g.addWidget(self.preset_dir, row, 1); g.addWidget(btn_browse_p, row, 2); row += 1
g.addWidget(QtWidgets.QLabel("Screenshot dir"), row, 0); g.addWidget(self.ss_dir, row, 1); g.addWidget(btn_browse_s, row, 2); row += 1
g.addWidget(self.btn_save, row, 0); g.addWidget(self.btn_reload, row, 1); g.setColumnStretch(1, 1)
self.setLayout(g)
btn_browse_p.clicked.connect(self._pick_preset_dir)
btn_browse_s.clicked.connect(self._pick_ss_dir)
self.btn_save.clicked.connect(self._save)
self.btn_reload.clicked.connect(self._reload)
def _pick_preset_dir(self):
d = QtWidgets.QFileDialog.getExistingDirectory(self, "Select Preset Directory", self.preset_dir.text().strip() or os.getcwd())
if d:
self.preset_dir.setText(d)
def _pick_ss_dir(self):
d = QtWidgets.QFileDialog.getExistingDirectory(self, "Select Screenshot Directory", self.ss_dir.text().strip() or os.getcwd())
if d:
self.ss_dir.setText(d)
def _save(self):
d = {"PRESET_DIR": self.preset_dir.text().strip(), "SCREENSHOT_DIR": self.ss_dir.text().strip()}
self.changed.emit(d)
def _reload(self):
d = load_config(CONFIG_FILE)
self.preset_dir.setText(d.get("PRESET_DIR", ""))
self.ss_dir.setText(d.get("SCREENSHOT_DIR", ""))
class CLITab(QtWidgets.QWidget):
send_cmd = QtCore.pyqtSignal(str)
query_cmd = QtCore.pyqtSignal(str)
def __init__(self):
super().__init__()
self._build()
def _build(self):
self.inp = QtWidgets.QLineEdit()
self.inp.setPlaceholderText("Enter SCPI, e.g. C1:BSWV? or *IDN?")
btn_send = QtWidgets.QPushButton("Send")
btn_query = QtWidgets.QPushButton("Query")
self.out = QtWidgets.QPlainTextEdit(); self.out.setReadOnly(True); self.out.setMinimumHeight(200)
grid = QtWidgets.QGridLayout(); row = 0
grid.addWidget(QtWidgets.QLabel("Command"), row, 0)
grid.addWidget(self.inp, row, 1, 1, 4)
grid.addWidget(btn_send, row, 5)
grid.addWidget(btn_query, row, 6); row += 1
grid.addWidget(self.out, row, 0, 1, 7)
self.setLayout(grid)
btn_send.clicked.connect(self._do_send)
btn_query.clicked.connect(self._do_query)
self.inp.returnPressed.connect(self._do_query)
def append(self, s: str):
self.out.appendPlainText(s)
def _do_send(self):
cmd = self.inp.text().strip()
if cmd:
self.send_cmd.emit(cmd)
def _do_query(self):
cmd = self.inp.text().strip()
if cmd:
self.query_cmd.emit(cmd)
class BurstTab(QtWidgets.QWidget):
apply_burst = QtCore.pyqtSignal(dict)
readback_burst = QtCore.pyqtSignal()
def __init__(self):
super().__init__()
self._build()
def _build(self):
state_lbl = QtWidgets.QLabel("State")
self.state = QtWidgets.QComboBox(); self.state.addItems(["OFF", "ON"])
mode_lbl = QtWidgets.QLabel("Mode")
self.mode = QtWidgets.QComboBox(); self.mode.addItems(["TRIG", "GAT"])
src_lbl = QtWidgets.QLabel("Source")
self.src = QtWidgets.QComboBox(); self.src.addItems(["INT", "EXT"])
ncy_lbl = QtWidgets.QLabel("Cycles")
self.ncy = QtWidgets.QLineEdit("5")
dly_lbl = QtWidgets.QLabel("Delay s")
self.dly = QtWidgets.QLineEdit("0")
gpol_lbl = QtWidgets.QLabel("Gate Pol")
self.gpol = QtWidgets.QComboBox(); self.gpol.addItems(["POS", "NEG"])
self.btn_apply = QtWidgets.QPushButton("Apply Burst")
self.btn_read = QtWidgets.QPushButton("Readback Burst")
grid = QtWidgets.QGridLayout(); row = 0
grid.addWidget(state_lbl, row, 0); grid.addWidget(self.state, row, 1)
grid.addWidget(mode_lbl, row, 2); grid.addWidget(self.mode, row, 3); row += 1
grid.addWidget(src_lbl, row, 0); grid.addWidget(self.src, row, 1)
grid.addWidget(ncy_lbl, row, 2); grid.addWidget(self.ncy, row, 3); row += 1
grid.addWidget(dly_lbl, row, 0); grid.addWidget(self.dly, row, 1)
grid.addWidget(gpol_lbl, row, 2); grid.addWidget(self.gpol, row, 3); row += 1
grid.addWidget(self.btn_apply, row, 0)
grid.addWidget(self.btn_read, row, 1)
self.setLayout(grid)
self.btn_apply.clicked.connect(self._emit_apply)
self.btn_read.clicked.connect(lambda: self.readback_burst.emit())
def _emit_apply(self):
try:
ncy_int = int(float(self.ncy.text()))
except Exception:
ncy_int = 1
cfg = dict(
STATE = self.state.currentText(),
MODE = self.mode.currentText(),
TRSR = self.src.currentText(),
NCYC = str(ncy_int),
DLAY = human_to_eng(self.dly.text()),
GATEPOL = self.gpol.currentText(),
)
self.apply_burst.emit(cfg)
class SweepTab(QtWidgets.QWidget):
apply_sweep = QtCore.pyqtSignal(dict)
readback_sweep = QtCore.pyqtSignal()
def __init__(self):
super().__init__()
self._build()
def _build(self):
state_lbl = QtWidgets.QLabel("State")
self.state = QtWidgets.QComboBox(); self.state.addItems(["OFF", "ON"])
typ_lbl = QtWidgets.QLabel("Type")
self.typ = QtWidgets.QComboBox(); self.typ.addItems(["LIN", "LOG"])
fstart_lbl = QtWidgets.QLabel("Start Hz")
self.fstart = QtWidgets.QLineEdit("1 kHz")
fstop_lbl = QtWidgets.QLabel("Stop Hz")
self.fstop = QtWidgets.QLineEdit("10 kHz")
time_lbl = QtWidgets.QLabel("Time s")
self.time = QtWidgets.QLineEdit("1.0")
dir_lbl = QtWidgets.QLabel("Direction")
self.direction = QtWidgets.QComboBox(); self.direction.addItems(["UP", "DOWN"])
src_lbl = QtWidgets.QLabel("Source")
self.src = QtWidgets.QComboBox(); self.src.addItems(["INT", "EXT"])
self.btn_apply = QtWidgets.QPushButton("Apply Sweep")
self.btn_read = QtWidgets.QPushButton("Readback Sweep")
grid = QtWidgets.QGridLayout(); row = 0
grid.addWidget(state_lbl, row, 0); grid.addWidget(self.state, row, 1)
grid.addWidget(typ_lbl, row, 2); grid.addWidget(self.typ, row, 3); row += 1
grid.addWidget(fstart_lbl, row, 0); grid.addWidget(self.fstart, row, 1)
grid.addWidget(fstop_lbl, row, 2); grid.addWidget(self.fstop, row, 3); row += 1
grid.addWidget(time_lbl, row, 0); grid.addWidget(self.time, row, 1)
grid.addWidget(dir_lbl, row, 2); grid.addWidget(self.direction, row, 3); row += 1
grid.addWidget(src_lbl, row, 0); grid.addWidget(self.src, row, 1); row += 1
grid.addWidget(self.btn_apply, row, 0)
grid.addWidget(self.btn_read, row, 1)
self.setLayout(grid)
self.btn_apply.clicked.connect(self._emit_apply)
self.btn_read.clicked.connect(lambda: self.readback_sweep.emit())
def _emit_apply(self):
cfg = dict(
STATE = self.state.currentText(),
WAV = self.typ.currentText(),
STAR = human_to_eng(self.fstart.text()),
STOP = human_to_eng(self.fstop.text()),
TIME = human_to_eng(self.time.text()),
DIR = self.direction.currentText(),
TRSR = self.src.currentText(),
)
self.apply_sweep.emit(cfg)
class ARBTab(QtWidgets.QWidget):
refresh_lists = QtCore.pyqtSignal()
set_wave = QtCore.pyqtSignal(object)
upload_wave = QtCore.pyqtSignal(str, str)
download_wave = QtCore.pyqtSignal(str, str)
def __init__(self):
super().__init__()
self._build()
def _build(self):
self.built_in = QtWidgets.QListWidget()
self.user = QtWidgets.QListWidget()
refresh = QtWidgets.QPushButton("Refresh")
set_btn = QtWidgets.QPushButton("Set Selected to Channel")
upload_btn = QtWidgets.QPushButton("Upload .bin/.csv → USER")
download_btn = QtWidgets.QPushButton("Download USER → file")
self.name_edit = QtWidgets.QLineEdit(); self.name_edit.setPlaceholderText("User name (optional)")
grid = QtWidgets.QGridLayout()
grid.addWidget(QtWidgets.QLabel("BUILDIN"), 0, 0)
grid.addWidget(QtWidgets.QLabel("USER"), 0, 1)
grid.addWidget(self.built_in, 1, 0)
grid.addWidget(self.user, 1, 1)
grid.addWidget(refresh, 2, 0)
grid.addWidget(set_btn, 2, 1)
grid.addWidget(self.name_edit, 3, 0)
grid.addWidget(upload_btn, 3, 1)
grid.addWidget(download_btn, 4, 1)
self.setLayout(grid)
refresh.clicked.connect(lambda: self.refresh_lists.emit())
set_btn.clicked.connect(self._emit_set)
upload_btn.clicked.connect(self._emit_upload)
download_btn.clicked.connect(self._emit_download)
self.built_in.itemSelectionChanged.connect(self._on_builtin_selected)
self.user.itemSelectionChanged.connect(self._on_user_selected)
def _on_builtin_selected(self):
if self.built_in.selectedItems():
self.user.blockSignals(True)
self.user.clearSelection()
self.user.setCurrentItem(None)
self.user.blockSignals(False)
def _on_user_selected(self):
if self.user.selectedItems():
self.built_in.blockSignals(True)
self.built_in.clearSelection()
self.built_in.setCurrentItem(None)
self.built_in.blockSignals(False)
def selected_entry(self):
w = self.user.currentItem() or self.built_in.currentItem()
if not w:
return None
data = w.data(QtCore.Qt.UserRole)
if isinstance(data, dict):
return data
name = w.text().strip()
return {"source": "USER", "name": name} if name else None
def _emit_set(self):
entry = self.selected_entry()
if entry:
self.set_wave.emit(entry)
def _emit_upload(self):
path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Select ARB file", "", "ARB Files (*.bin *.csv);;All Files (*)")
if not path:
return
name = self.name_edit.text().strip()
if not name:
name = os.path.splitext(os.path.basename(path))[0]
self.upload_wave.emit(name, path)
def _emit_download(self):
item = self.user.currentItem()
if not item:
return
entry = item.data(QtCore.Qt.UserRole) or {"source": "USER", "name": item.text().strip()}
name = entry.get("name", "wave")
path, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Save ARB file", f"{name}.bin", "Binary (*.bin);;All Files (*)")
if not path:
return
self.download_wave.emit(name, path)
# -------------------- Main Window --------------------
class Main(QtWidgets.QWidget):
def __init__(self, preset_ip=None):
super().__init__()
self.setWindowTitle(Window_Name)
self.i = SDGLan()
self._scr_worker = None
self._arb_dl_worker = None # BUG-2: track ARB download worker
self.cfg = load_config(CONFIG_FILE)
ensure_dir(self.cfg["PRESET_DIR"])
ensure_dir(self.cfg["SCREENSHOT_DIR"])
self.preset_path = os.path.join(self.cfg["PRESET_DIR"], PRESET_BASENAME)
self._build(preset_ip)
def _build(self, preset_ip):
ip_lbl = QtWidgets.QLabel("IP")
self.ip = QtWidgets.QLineEdit(preset_ip or "")
if not preset_ip:
self.ip.setPlaceholderText("e.g. 192.168.1.120")
port_lbl = QtWidgets.QLabel("Port")
self.port = QtWidgets.QLineEdit(str(DEFAULT_PORT))
self.btn_connect = QtWidgets.QPushButton("Connect")
self.btn_idn = QtWidgets.QPushButton("IDN?")
self.btn_err = QtWidgets.QPushButton("SYST:ERR?")
self.btn_scr = QtWidgets.QPushButton("Screenshot")
ch_lbl = QtWidgets.QLabel("Channel")
self.ch = QtWidgets.QComboBox(); self.ch.addItems(["C1", "C2"])
self.tabs = QtWidgets.QTabWidget()
self.t_basic = BasicTab(); self.tabs.addTab(self.t_basic, "Basic")
self.t_burst = BurstTab(); self.tabs.addTab(self.t_burst, "Burst")
self.t_sweep = SweepTab(); self.tabs.addTab(self.t_sweep, "Sweep")
self.t_arb = ARBTab(); self.tabs.addTab(self.t_arb, "ARB Manager")
self.t_cli = CLITab(); self.tabs.addTab(self.t_cli, "SCPI CLI")
self.t_presets = PresetsTab(); self.tabs.addTab(self.t_presets, "Presets")
self.t_config = ConfigTab(self.cfg); self.tabs.addTab(self.t_config, "Config")
self.log = QtWidgets.QPlainTextEdit(); self.log.setReadOnly(True); self.log.setMinimumHeight(200)
top = QtWidgets.QGridLayout(); row = 0
top.addWidget(ip_lbl, row, 0); top.addWidget(self.ip, row, 1, 1, 3)
top.addWidget(port_lbl, row, 4); top.addWidget(self.port, row, 5)
top.addWidget(self.btn_connect, row, 6); row += 1
top.addWidget(self.btn_idn, row, 0); top.addWidget(self.btn_err, row, 1); top.addWidget(self.btn_scr, row, 2)
top.addWidget(ch_lbl, row, 3); top.addWidget(self.ch, row, 4); row += 1
top.addWidget(self.tabs, row, 0, 1, 7); row += 1
top.addWidget(self.log, row, 0, 1, 7)
self.setLayout(top)
# wiring
self.btn_connect.clicked.connect(self.on_connect)
self.btn_idn.clicked.connect(self.on_idn)
self.btn_err.clicked.connect(self.on_err)
self.btn_scr.clicked.connect(self.on_screenshot)
self.t_basic.apply_basic.connect(self.on_apply_basic)
self.t_basic.readback_basic.connect(self.on_readback_basic)
self.t_basic.toggle_output.connect(self.on_output)
self.t_basic.store_slot.connect(self.on_store_slot)
self.t_basic.recall_slot.connect(self.on_recall_slot)
self.t_burst.apply_burst.connect(self.on_apply_burst)
self.t_burst.readback_burst.connect(self.on_readback_burst)
self.t_sweep.apply_sweep.connect(self.on_apply_sweep)
self.t_sweep.readback_sweep.connect(self.on_readback_sweep)
self.t_arb.refresh_lists.connect(self.on_arb_refresh)
self.t_arb.set_wave.connect(self.on_arb_set)
self.t_arb.upload_wave.connect(self.on_arb_upload)
self.t_arb.download_wave.connect(self.on_arb_download)
self.t_cli.send_cmd.connect(self.on_cli_send)
self.t_cli.query_cmd.connect(self.on_cli_query)
self.t_config.changed.connect(self.on_config_changed)
ensure_preset_file(self.preset_path)
self.refresh_presets_view()
# --------- helpers ---------
def logln(self, s: str):
self.log.appendPlainText(s)
def _pref(self) -> str:
return self.ch.currentText() + ":"
# ---------- Top actions ----------
def on_connect(self):
try:
addr = self.ip.text().strip(); port = int(self.port.text().strip())
self.i.connect(addr, port)
self.logln(f"Connected to {addr}:{port}")
except Exception as e:
self.logln(f"[ERR] connect: {e}")
def on_idn(self):
try:
self.logln(f"*IDN? -> {self.i.query_retry('*IDN?')}")
except Exception as e:
self.logln(f"[ERR] IDN: {e}")
def on_err(self):
try:
self.logln(f"SYST:ERR? -> {self.i.query_retry('SYST:ERR?')}")
except Exception as e:
self.logln(f"[ERR] SYST:ERR?: {e}")
# ---------- Screenshot ----------
def on_screenshot(self):
if not self.i.sock:
self.logln("[ERR] screenshot: not connected")
return
if hasattr(self, "_scr_worker") and self._scr_worker and self._scr_worker.isRunning():
self.logln("[ERR] screenshot already running")
return
self.btn_scr.setEnabled(False); self.setCursor(QtCore.Qt.WaitCursor)
outdir = self.cfg["SCREENSHOT_DIR"] # BUG-6 fix: use in-memory cfg, not re-read from disk
self._scr_worker = ScreenshotWorker(self.i, outdir, self)
self._scr_worker.done.connect(self._on_screenshot_done)
self._scr_worker.start()
def _on_screenshot_done(self, data: bytes, full_path: str, err: str):
self.btn_scr.setEnabled(True); self.unsetCursor()
if err:
self.logln(f"[ERR] screenshot: {err}")
return
try:
with open(full_path, "wb") as f:
f.write(data)
self.logln(f"Screenshot saved to {full_path} ({len(data)} bytes)")
img = QtGui.QImage(full_path)
if img.isNull():
self.logln("[ERR] screenshot: Qt decode failed")
return
pixmap = QtGui.QPixmap.fromImage(img)
lbl = QtWidgets.QLabel(); lbl.setPixmap(pixmap)
w = QtWidgets.QWidget(); l = QtWidgets.QVBoxLayout(w); l.addWidget(lbl)
w.setWindowTitle("SDG2042X Screenshot"); w.resize(pixmap.width(), pixmap.height()); w.show()
self._scr_win = w
except Exception as e:
self.logln(f"[ERR] screenshot save/display: {e}")
# ---------- Basic apply/readback ----------
def on_apply_basic(self, cfg: dict):
try:
parts = [f"WVTP,{cfg['WVTP']}"]
if 'FRQ' in cfg: parts.append(f"FRQ,{cfg['FRQ']}")
if 'PERI' in cfg: parts.append(f"PERI,{cfg['PERI']}")
if 'AMP' in cfg:
parts.append(f"AMP,{cfg['AMP']}")
else:
if 'HLEV' in cfg: parts.append(f"HLEV,{cfg['HLEV']}")
if 'LLEV' in cfg: parts.append(f"LLEV,{cfg['LLEV']}")
if 'OFST' in cfg: parts.append(f"OFST,{cfg['OFST']}")
if 'PHSE' in cfg: parts.append(f"PHSE,{cfg['PHSE']}")
for key in ("DUTY","SYM","WIDTH","RISE","FALL","DLY","MEAN","STDEV","BANDSTATE"):
if key in cfg:
parts.append(f"{key},{cfg[key]}")
cmd = f"{self._pref()}BSWV " + ",".join(parts)
self.i.write(cmd); self.logln(f"SET: {cmd}")
if cfg.get("WVTP") == "ARB" and cfg.get("ARWV_NAME"):
qname = quote_name(cfg["ARWV_NAME"])
self.i.write(f"{self._pref()}ARWV NAME,{qname}")
self.logln(f"SET ARWV NAME,{qname}")
except Exception as e:
self.logln(f"[ERR] basic apply: {e}")
def on_readback_basic(self):
try:
rb = self.i.query_retry(f"{self._pref()}BSWV?")
st = self.i.query_retry(f"{self._pref()}OUTP?")
self.logln(f"{self._pref()}BSWV? -> {rb}")
self.logln(f"{self._pref()}OUTP? -> {st}")
except Exception as e:
self.logln(f"[ERR] basic readback: {e}")
def on_output(self, turn_on: bool):
try:
cmd = f"{self._pref()}OUTP {'ON' if turn_on else 'OFF'}"
self.i.write(cmd)
st = self.i.query_retry(f"{self._pref()}OUTP?")
self.logln(f"SET: {cmd}")
self.logln(f"{self._pref()}OUTP? -> {st}")
except Exception as e:
self.logln(f"[ERR] output: {e}")
# ---------- Burst ----------
def on_apply_burst(self, cfg: dict):
try:
parts = [f"{k},{v}" for k, v in cfg.items()]
cmd = f"{self._pref()}BTWV " + ",".join(parts)
self.i.write(cmd); self.logln(f"SET: {cmd}")
except Exception as e:
self.logln(f"[ERR] burst apply: {e}")
def on_readback_burst(self):
try:
rb = self.i.query_retry(f"{self._pref()}BTWV?"); self.logln(f"{self._pref()}BTWV? -> {rb}")
except Exception as e:
self.logln(f"[ERR] burst readback: {e}")
# ---------- Sweep ----------
def on_apply_sweep(self, cfg: dict):
try:
parts = [f"{k},{v}" for k, v in cfg.items()]
cmd = f"{self._pref()}SWWV " + ",".join(parts)
self.i.write(cmd)
err = self.i.query_retry("SYST:ERR?")
if not err.startswith("0"):
parts2 = [p.replace("WAV,", "SPAC,") for p in parts]
cmd2 = f"{self._pref()}SWWV " + ",".join(parts2)
self.i.write(cmd2); self.logln(f"SET (fallback): {cmd2}")
else:
self.logln(f"SET: {cmd}")
except Exception as e:
self.logln(f"[ERR] sweep apply: {e}")
def on_readback_sweep(self):
try:
rb = self.i.query_retry(f"{self._pref()}SWWV?"); self.logln(f"{self._pref()}SWWV? -> {rb}")
except Exception as e:
self.logln(f"[ERR] sweep readback: {e}")
# ---------- ARB ops ----------
def on_arb_refresh(self):
try:
b = self.i.query_retry("STL? BUILDIN")
u = self.i.query_retry("STL? USER")
b_list = parse_stl_builtin(b)
u_list = parse_stl_user(u)
self.t_arb.built_in.clear()
self.t_arb.user.clear()
for entry in b_list:
item = QtWidgets.QListWidgetItem(f"M{entry['index']}: {entry['name']}")
item.setData(QtCore.Qt.UserRole, entry)
self.t_arb.built_in.addItem(item)
for entry in u_list:
item = QtWidgets.QListWidgetItem(entry['name'])
item.setData(QtCore.Qt.UserRole, entry)
self.t_arb.user.addItem(item)
self.logln(f"BUILDIN entries: {len(b_list)}")
self.logln(f"USER entries: {len(u_list)}")
except Exception as e:
self.logln(f"[ERR] ARB refresh: {e}")
def on_arb_set(self, entry):
try:
if not entry:
raise RuntimeError("no ARB entry selected")
self.i.write(f"{self._pref()}BSWV WVTP,ARB")
if entry.get("source") == "BUILDIN":
idx = entry.get("index")
if idx is None:
raise RuntimeError("built-in ARB entry missing index")
self.i.write(f"{self._pref()}ARWV INDEX,{idx}")
rb = self.i.query_retry(f"{self._pref()}ARWV?")
self.logln(f"SET ARB BUILDIN M{idx}:{entry.get('name','')} ; ARWV? -> {rb}")
else:
name = entry.get("name", "")
if not name:
raise RuntimeError("user ARB entry missing name")
qname = quote_name(name)
self.i.write(f"{self._pref()}ARWV NAME,{qname}")
rb = self.i.query_retry(f"{self._pref()}ARWV?")
self.logln(f"SET ARB USER {qname}; ARWV? -> {rb}")
except Exception as e:
self.logln(f"[ERR] ARB set: {e}")
def _make_scpi_block(self, payload: bytes) -> bytes:
n = len(payload)
len_digits = str(len(str(n))).encode('ascii')
header = b'#' + len_digits + str(n).encode('ascii')
return header + payload
def on_arb_upload(self, name: str, path: str):
try:
with open(path, 'rb') as f:
data = f.read()
block = self._make_scpi_block(data)
qname = quote_name(name)
prefix = f"WVDT USER,{qname},".encode('ascii')
self.i.write_bytes(prefix + block + b"\n")
self.logln(f"Uploaded USER,{qname} ({len(data)} bytes)")
try:
self.on_arb_refresh()
except Exception:
pass
except Exception as e:
self.logln(f"[ERR] ARB upload: {e}")
# BUG-2 FIX: ARB download moved off the GUI thread into ARBDownloadWorker.
# The old code called self.i.sock.recv() in a while loop here, freezing
# the UI for large waveforms. Now we spin up a QThread and handle the
# result via _on_arb_download_done() connected to the worker's done signal.
def on_arb_download(self, name: str, savepath: str):
if not self.i.sock:
self.logln("[ERR] ARB download: not connected")
return
if self._arb_dl_worker and self._arb_dl_worker.isRunning():
self.logln("[ERR] ARB download already in progress")
return
self.logln(f"ARB download started: USER,{name} ...")
self._arb_dl_worker = ARBDownloadWorker(self.i, name, savepath, self)
self._arb_dl_worker.done.connect(self._on_arb_download_done)
self._arb_dl_worker.start()
def _on_arb_download_done(self, payload: bytes, savepath: str, err: str):
if err:
self.logln(f"[ERR] ARB download: {err}")
return
try:
with open(savepath, 'wb') as f:
f.write(payload)
self.logln(f"Downloaded -> {savepath} ({len(payload)} bytes)")
except Exception as e:
self.logln(f"[ERR] ARB download save: {e}")
# ---------- CLI tab ----------
def on_cli_send(self, cmd: str):
try:
self.i.write(cmd)
self.t_cli.append(f"> {cmd}")
except Exception as e:
self.t_cli.append(f"[ERR] send: {e}")
def on_cli_query(self, cmd: str):
try:
resp = self.i.query_retry(cmd)
self.t_cli.append(f"? {cmd}\n< {resp}")
except Exception as e:
self.t_cli.append(f"[ERR] query: {e}")
# ---------- Presets ----------
def on_store_slot(self, slot: int):
try:
if slot < 1 or slot > 10:
raise ValueError("slot out of range")
bswv = self.i.query_retry(f"{self._pref()}BSWV?")
outp = self.i.query_retry(f"{self._pref()}OUTP?")
outp_state = "ON" if "ON" in outp.upper().split(',')[0] else "OFF"
d = read_presets(self.preset_path)
d[f"SLOT{slot}_BSWV"] = bswv
d[f"SLOT{slot}_OUTP"] = outp_state
write_presets(self.preset_path, d)
self.logln(f"Stored slot {slot}: BSWV len={len(bswv)}, OUTP={outp_state}")
self.refresh_presets_view()
except Exception as e:
self.logln(f"[ERR] store slot {slot}: {e}")
def on_recall_slot(self, slot: int):
try:
if slot < 1 or slot > 10:
raise ValueError("slot out of range")
d = read_presets(self.preset_path)
b = d.get(f"SLOT{slot}_BSWV", "").strip()
o = d.get(f"SLOT{slot}_OUTP", "").strip().upper()
if not b:
raise RuntimeError("empty BSWV")
self.i.write(b)
if o in ("ON", "OFF"):
self.i.write(f"{self._pref()}OUTP {o}")
self.logln(f"Recalled slot {slot}: applied BSWV and OUTP={o or 'N/A'}")
except Exception as e:
self.logln(f"[ERR] recall slot {slot}: {e}")
def on_rename_slot(self, slot: int, name: str):
try:
d = read_presets(self.preset_path)
d[f"SLOT{slot}_NAME"] = name
write_presets(self.preset_path, d)
self.refresh_presets_view()
except Exception as e:
self.logln(f"[ERR] rename slot {slot}: {e}")
def refresh_presets_view(self):
try:
ensure_preset_file(self.preset_path)
d = read_presets(self.preset_path)
self.t_presets.update_from_presets(d)
self.logln(f"Preset file: {self.preset_path}")
except Exception as e:
self.logln(f"[ERR] refresh presets: {e}")
# ---------- Config ----------
def on_config_changed(self, newd: dict):
try:
nd = {
"PRESET_DIR": _expand(newd.get("PRESET_DIR","")) or os.getcwd(),
"SCREENSHOT_DIR": _expand(newd.get("SCREENSHOT_DIR","")) or os.getcwd(),
}
ensure_dir(nd["PRESET_DIR"]); ensure_dir(nd["SCREENSHOT_DIR"])
save_config(CONFIG_FILE, nd)
self.cfg = nd
self.preset_path = os.path.join(self.cfg["PRESET_DIR"], PRESET_BASENAME)
ensure_preset_file(self.preset_path)
self.refresh_presets_view()
self.logln(f"Config saved -> {CONFIG_FILE}")
except Exception as e:
self.logln(f"[ERR] save config: {e}")
# -------------------- CLI entry --------------------
def show_help():
print("Usage: sdg2042x_gui.py [options]\n")
print("Options:")
print(" -ip <addr>, --ip <addr> Prefill IP address field")
print(" -h, --help Show this help message")
if __name__ == "__main__":
preset_ip = None
argv = sys.argv[1:]
for i, tok in enumerate(argv):
if tok in ("-h", "--help"):
show_help(); sys.exit(0)
if tok in ("-ip", "--ip") and i + 1 < len(argv):
preset_ip = argv[i + 1]
app = QtWidgets.QApplication(sys.argv)
m = Main(preset_ip=preset_ip)
m.resize(1250, 780)
m.show()
sys.exit(app.exec_())