Files
0003-SDG2042X-PyQt-GUI-for-…/script/SDG2042X_V0.1.py

1025 lines
38 KiB
Python
Executable File
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# Siglent SDG2042X Linux GUI (PyQt5)
# Features:
# - Basic, Burst, Sweep,
# - ARB Manager, SCPI CLI,
# - Storing Presets, storing to editable text file SDG2042x.dat
# - Screenshot
#
# Author: Thomas Gohle / tgohle@togo-lab.io
# www.togo-lab.io
# tgonet.de / gohle.de
#
# Copyright: CC BY-SA 4.0
# https://creativecommons.org/licenses/by-sa/4.0/
#
# Version: 0.1
#----------------------------------------------------
import os
import socket
import sys
import time
from PyQt5 import QtWidgets, QtCore, QtGui
# -------------------- Constants --------------------
Window_Name = "Linux GUI SDG2042X by Thomas Gohle - ToGo-Lab - Ver 0.1"
DEFAULT_PORT = 5025
SOCKET_TIMEOUT = 2.5
SIGLENT_WAVES = ["SINE", "SQUARE", "RAMP", "PULSE", "NOISE", "ARB", "DC"]
PRESET_FILE = "SDG2042x.dat"
NUM_SLOTS = 10
# -------------------- Transport --------------------
class SDGLan:
def __init__(self):
self.sock = None
self.addr = None
self.port = DEFAULT_PORT
def connect(self, addr, port=DEFAULT_PORT):
self.close()
self.addr = addr
self.port = port
self.sock = socket.create_connection((addr, port), timeout=SOCKET_TIMEOUT)
self.sock.settimeout(SOCKET_TIMEOUT)
def close(self):
try:
if self.sock:
self.sock.close()
finally:
self.sock = None
def write(self, cmd: str):
if not self.sock:
raise RuntimeError("Not connected")
if not cmd.endswith("\n"):
cmd += "\n"
self.sock.sendall(cmd.encode("ascii"))
def write_bytes(self, payload: bytes):
if not self.sock:
raise RuntimeError("Not connected")
self.sock.sendall(payload)
def query(self, cmd: str) -> str:
self.write(cmd)
data = self._recv_line()
return data.strip()
def query_raw(self, cmd: str) -> bytes:
self.write(cmd)
return self._recv_until_timeout()
# ---- block helpers ----
def _recv_exact(self, n: int) -> bytes:
buf = bytearray()
while len(buf) < n:
chunk = self.sock.recv(n - len(buf))
if not chunk:
break
buf.extend(chunk)
return bytes(buf)
def _recv_exact_capped(self, n: int, cap: int) -> bytes:
if n > cap:
raise RuntimeError(f"SCPI block length {n} exceeds cap {cap}")
return self._recv_exact(n)
def query_block(self, cmd: str, timeout_s: float = 8.0, cap: int = 25_000_000) -> bytes:
"""Read a SCPI definite-length block (#<d><len><payload>). Returns payload without header."""
if not self.sock:
raise RuntimeError("Not connected")
prev_to = self.sock.gettimeout()
try:
self.sock.settimeout(timeout_s)
self.write(cmd)
# seek '#'
first = self._recv_exact(1)
while first and first != b"#":
first = self._recv_exact(1)
if not first:
return b""
ndig_b = self._recv_exact(1)
if not ndig_b or not ndig_b.isdigit():
return b""
ndig = int(ndig_b.decode("ascii"))
len_str = self._recv_exact(ndig).decode("ascii")
if not len_str.isdigit():
return b""
length = int(len_str)
payload = self._recv_exact_capped(length, cap)
return payload
finally:
try:
self.sock.settimeout(prev_to)
except Exception:
pass
def query_scdp(self, timeout_s: float = 15.0, cap: int = 50_000_000) -> bytes:
"""Robust screenshot: handles BMP direct or SCPI block-wrapped payload."""
if not self.sock:
raise RuntimeError("Not connected")
prev_to = self.sock.gettimeout()
try:
self.sock.settimeout(timeout_s)
# Send SCDP
self.write("SCDP")
# Peek first two bytes
first2 = self._recv_exact(2)
if not first2:
return b""
# Case 1: BMP direct
if first2 == b"BM":
size_bytes = self._recv_exact(4)
if len(size_bytes) != 4:
return b""
total_size = int.from_bytes(size_bytes, "little")
# We already have 6 bytes
rest = self._recv_exact_capped(total_size - 6, cap)
return first2 + size_bytes + rest
# Case 2: SCPI block
if first2[:1] == b"#":
ndig = int(first2[1:2].decode())
len_str = self._recv_exact(ndig).decode("ascii")
if not len_str.isdigit():
return b""
length = int(len_str)
payload = self._recv_exact_capped(length, cap)
return payload
# Case 3: unknown header, read until timeout
tail = self._recv_until_timeout()
return first2 + tail
finally:
try:
self.sock.settimeout(prev_to)
except Exception:
pass
def _recv_line(self) -> str:
chunks = []
while True:
b = self.sock.recv(4096)
if not b:
break
chunks.append(b)
if b.endswith(b"\n"):
break
return b"".join(chunks).decode("ascii", errors="ignore")
def _recv_until_timeout(self) -> bytes:
data = bytearray()
while True:
try:
b = self.sock.recv(65536)
if not b:
break
data.extend(b)
except socket.timeout:
break
return bytes(data)
# -------------------- Utilities --------------------
def human_to_eng(s: str) -> str:
"""Convert '1k', '2.5M', '500mV', '10ms' etc. to float string (base units)."""
t = s.strip().lower().replace("vpp", "").replace("v", "")
t = t.replace("hz", "").replace("deg", "")
mult = 1.0
if t.endswith("ms"):
mult = 1e-3; t = t[:-2]
elif t.endswith("us"):
mult = 1e-6; t = t[:-2]
elif t.endswith("ns"):
mult = 1e-9; t = t[:-2]
elif t.endswith("s"):
mult = 1.0; t = t[:-1]
elif t.endswith("khz"):
mult = 1e3; t = t[:-3]
elif t.endswith("mhz"):
mult = 1e6; t = t[:-3]
elif t.endswith("ghz"):
mult = 1e9; t = t[:-3]
elif t.endswith("k"):
mult = 1e3; t = t[:-1]
elif t.endswith("m") and not t.endswith("mm"):
mult = 1e-3; t = t[:-1]
elif t.endswith("u"):
mult = 1e-6; t = t[:-1]
elif t.endswith("n"):
mult = 1e-9; t = t[:-1]
elif t.endswith("g"):
mult = 1e9; t = t[:-1]
try:
val = float(t)
except ValueError:
return s.strip()
return f"{val*mult}"
def quote_name(name: str) -> str:
"""Quote ARB names if they contain spaces or punctuation."""
safe = all(ch.isalnum() or ch in ("_", "-") for ch in name)
return name if safe else '"%s"' % name
def ensure_preset_file(path: str):
if not os.path.exists(path):
with open(path, "w", encoding="utf-8") as f:
f.write("# SDG2042X presets\n")
for i in range(1, 11):
f.write(f"SLOT{i}_NAME=\n")
f.write(f"SLOT{i}_BSWV=\n")
f.write(f"SLOT{i}_OUTP=\n")
return path
def read_presets(path: str) -> dict:
ensure_preset_file(path)
presets = {}
with open(path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if not line or line.startswith("#"):
continue
if "=" not in line:
continue
k, v = line.split("=", 1)
presets[k.strip()] = v.strip()
return presets
def write_presets(path: str, d: dict):
lines = ["# SDG2042X presets"]
for i in range(1, 11):
name = d.get(f"SLOT{i}_NAME", "")
bswv = d.get(f"SLOT{i}_BSWV", "")
outp = d.get(f"SLOT{i}_OUTP", "")
lines.append(f"SLOT{i}_NAME={name}")
lines.append(f"SLOT{i}_BSWV={bswv}")
lines.append(f"SLOT{i}_OUTP={outp}")
with open(path, "w", encoding="utf-8") as f:
f.write("\n".join(lines) + "\n")
# -------------------- Screenshot worker (threaded) --------------------
class ScreenshotWorker(QtCore.QThread):
done = QtCore.pyqtSignal(bytes, str, str) # data, filename, error ('' if ok)
def __init__(self, io: SDGLan, parent=None):
super().__init__(parent)
self.io = io
def run(self):
try:
# Clear previous errors that may reference old invalid commands
try:
self.io.write("*CLS")
except Exception:
pass
# Drain any residual bytes from prior operations
try:
_ = self.io._recv_until_timeout()
except Exception:
pass
data = self.io.query_scdp(timeout_s=20.0, cap=50_000_000)
if not data:
err = self.io.query("SYST:ERR?")
raise RuntimeError(f"no image data; last SYST:ERR? -> {err}")
ts = time.strftime("%Y%m%d-%H%M%S")
fn = f"{ts}_SDG2042X.bmp" if data.startswith(b"BM") else f"{ts}_SDG2042X.bin"
self.done.emit(data, fn, "")
except Exception as e:
self.done.emit(b"", "", str(e))
# -------------------- Tabs --------------------
class BasicTab(QtWidgets.QWidget):
apply_basic = QtCore.pyqtSignal(dict)
readback_basic = QtCore.pyqtSignal()
toggle_output = QtCore.pyqtSignal(bool)
store_slot = QtCore.pyqtSignal(int)
recall_slot = QtCore.pyqtSignal(int)
def __init__(self):
super().__init__()
self._build()
def _build(self):
wave_lbl = QtWidgets.QLabel("Waveform")
self.wave = QtWidgets.QComboBox(); self.wave.addItems(SIGLENT_WAVES)
frq_lbl = QtWidgets.QLabel("Freq")
self.frq = QtWidgets.QLineEdit("1 kHz")
amp_lbl = QtWidgets.QLabel("Amp Vpp")
self.amp = QtWidgets.QLineEdit("1.0")
off_lbl = QtWidgets.QLabel("Offset V")
self.off = QtWidgets.QLineEdit("0.0")
ph_lbl = QtWidgets.QLabel("Phase deg")
self.ph = QtWidgets.QLineEdit("0")
self.btn_apply = QtWidgets.QPushButton("Apply")
self.btn_read = QtWidgets.QPushButton("Readback")
self.btn_on = QtWidgets.QPushButton("Output ON")
self.btn_off = QtWidgets.QPushButton("Output OFF")
# Preset controls
preset_lbl = QtWidgets.QLabel("Preset #")
self.preset_num = QtWidgets.QSpinBox(); self.preset_num.setRange(1, 10); self.preset_num.setValue(1)
self.btn_store = QtWidgets.QPushButton("Store")
self.btn_recall = QtWidgets.QPushButton("Recall")
grid = QtWidgets.QGridLayout(); row = 0
grid.addWidget(wave_lbl, row, 0); grid.addWidget(self.wave, row, 1)
grid.addWidget(frq_lbl, row, 2); grid.addWidget(self.frq, row, 3); row += 1
grid.addWidget(amp_lbl, row, 0); grid.addWidget(self.amp, row, 1)
grid.addWidget(off_lbl, row, 2); grid.addWidget(self.off, row, 3); row += 1
grid.addWidget(ph_lbl, row, 0); grid.addWidget(self.ph, row, 1); row += 1
grid.addWidget(self.btn_apply, row, 0)
grid.addWidget(self.btn_read, row, 1)
grid.addWidget(self.btn_on, row, 2)
grid.addWidget(self.btn_off, row, 3); row += 1
grid.addWidget(preset_lbl, row, 0)
grid.addWidget(self.preset_num, row, 1)
grid.addWidget(self.btn_store, row, 2)
grid.addWidget(self.btn_recall, row, 3)
self.setLayout(grid)
self.btn_apply.clicked.connect(self._emit_apply)
self.btn_read.clicked.connect(lambda: self.readback_basic.emit())
self.btn_on.clicked.connect(lambda: self.toggle_output.emit(True))
self.btn_off.clicked.connect(lambda: self.toggle_output.emit(False))
self.btn_store.clicked.connect(lambda: self.store_slot.emit(self.preset_num.value()))
self.btn_recall.clicked.connect(lambda: self.recall_slot.emit(self.preset_num.value()))
def _emit_apply(self):
cfg = dict(
WVTP=self.wave.currentText(),
FRQ=human_to_eng(self.frq.text()),
AMP=human_to_eng(self.amp.text()),
OFST=human_to_eng(self.off.text()),
PHSE=human_to_eng(self.ph.text()),
)
self.apply_basic.emit(cfg)
class PresetsTab(QtWidgets.QWidget):
rename_slot = QtCore.pyqtSignal(int, str)
view_refresh = QtCore.pyqtSignal()
def __init__(self):
super().__init__()
self._build()
def _build(self):
self.table = QtWidgets.QTableWidget(10, 3)
self.table.setHorizontalHeaderLabels(["Slot", "Name", "Summary"])
self.table.horizontalHeader().setStretchLastSection(True)
self.table.verticalHeader().setVisible(False)
for i in range(10):
self.table.setItem(i, 0, QtWidgets.QTableWidgetItem(str(i+1)))
self.table.item(i, 0).setFlags(QtCore.Qt.ItemIsEnabled)
self.table.setItem(i, 1, QtWidgets.QTableWidgetItem(""))
self.table.setItem(i, 2, QtWidgets.QTableWidgetItem(""))
self.table.item(i, 2).setFlags(QtCore.Qt.ItemIsEnabled)
self.btn_load = QtWidgets.QPushButton("Reload File")
self.btn_save = QtWidgets.QPushButton("Save Names")
self.btn_open = QtWidgets.QPushButton("Open File...")
self.path_label = QtWidgets.QLabel(PRESET_FILE)
layout = QtWidgets.QVBoxLayout()
hl = QtWidgets.QHBoxLayout()
hl.addWidget(self.btn_load); hl.addWidget(self.btn_save); hl.addWidget(self.btn_open); hl.addStretch(1); hl.addWidget(self.path_label)
layout.addLayout(hl)
layout.addWidget(self.table)
self.setLayout(layout)
self.btn_load.clicked.connect(lambda: self.view_refresh.emit())
self.btn_save.clicked.connect(self._save_names)
self.btn_open.clicked.connect(self._open_file)
def _open_file(self):
path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Open Preset File", PRESET_FILE, "Text (*.txt *.dat);;All Files (*)")
if not path:
return
self.path_label.setText(path)
self.view_refresh.emit()
def _save_names(self):
for i in range(10):
name = self.table.item(i, 1).text().strip()
self.rename_slot.emit(i+1, name)
def update_from_presets(self, d: dict):
for i in range(1, 11):
name = d.get(f"SLOT{i}_NAME", "")
b = d.get(f"SLOT{i}_BSWV", "")
outp = d.get(f"SLOT{i}_OUTP", "")
summ = ""
if b:
up = b.upper()
fields = []
for key in ("WVTP,", "FRQ,", "AMP,", "OFST,"):
idx = up.find(key)
if idx >= 0:
val = b[idx+len(key):]
val = val.split(",")[0]
fields.append(f"{key[:-1]}={val}")
summ = " ".join(fields)
if outp:
summ += (" " if summ else "") + f"OUTP={outp}"
self.table.item(i-1, 1).setText(name)
self.table.item(i-1, 2).setText(summ)
# -------------------- Other tabs (Burst, Sweep, ARB, CLI) --------------------
class BurstTab(QtWidgets.QWidget):
apply_burst = QtCore.pyqtSignal(dict)
readback_burst = QtCore.pyqtSignal()
def __init__(self):
super().__init__()
self._build()
def _build(self):
state_lbl = QtWidgets.QLabel("State")
self.state = QtWidgets.QComboBox(); self.state.addItems(["OFF", "ON"])
mode_lbl = QtWidgets.QLabel("Mode")
self.mode = QtWidgets.QComboBox(); self.mode.addItems(["TRIG", "GAT"])
src_lbl = QtWidgets.QLabel("Source")
self.src = QtWidgets.QComboBox(); self.src.addItems(["INT", "EXT"])
ncy_lbl = QtWidgets.QLabel("Cycles")
self.ncy = QtWidgets.QLineEdit("5")
dly_lbl = QtWidgets.QLabel("Delay s")
self.dly = QtWidgets.QLineEdit("0")
gpol_lbl = QtWidgets.QLabel("Gate Pol")
self.gpol = QtWidgets.QComboBox(); self.gpol.addItems(["POS", "NEG"])
self.btn_apply = QtWidgets.QPushButton("Apply Burst")
self.btn_read = QtWidgets.QPushButton("Readback Burst")
grid = QtWidgets.QGridLayout(); row = 0
grid.addWidget(state_lbl, row, 0); grid.addWidget(self.state, row, 1)
grid.addWidget(mode_lbl, row, 2); grid.addWidget(self.mode, row, 3); row += 1
grid.addWidget(src_lbl, row, 0); grid.addWidget(self.src, row, 1)
grid.addWidget(ncy_lbl, row, 2); grid.addWidget(self.ncy, row, 3); row += 1
grid.addWidget(dly_lbl, row, 0); grid.addWidget(self.dly, row, 1)
grid.addWidget(gpol_lbl, row, 2); grid.addWidget(self.gpol, row, 3); row += 1
grid.addWidget(self.btn_apply, row, 0)
grid.addWidget(self.btn_read, row, 1)
self.setLayout(grid)
self.btn_apply.clicked.connect(self._emit_apply)
self.btn_read.clicked.connect(lambda: self.readback_burst.emit())
def _emit_apply(self):
try:
ncy_int = int(float(self.ncy.text()))
except Exception:
ncy_int = 1
cfg = dict(
STATE=self.state.currentText(),
MODE=self.mode.currentText(),
TRSR=self.src.currentText(),
NCYC=str(ncy_int),
DLAY=human_to_eng(self.dly.text()),
GATEPOL=self.gpol.currentText(),
)
self.apply_burst.emit(cfg)
class SweepTab(QtWidgets.QWidget):
apply_sweep = QtCore.pyqtSignal(dict)
readback_sweep = QtCore.pyqtSignal()
def __init__(self):
super().__init__()
self._build()
def _build(self):
state_lbl = QtWidgets.QLabel("State")
self.state = QtWidgets.QComboBox(); self.state.addItems(["OFF", "ON"])
typ_lbl = QtWidgets.QLabel("Type")
self.typ = QtWidgets.QComboBox(); self.typ.addItems(["LIN", "LOG"])
fstart_lbl = QtWidgets.QLabel("Start Hz")
self.fstart = QtWidgets.QLineEdit("1 kHz")
fstop_lbl = QtWidgets.QLabel("Stop Hz")
self.fstop = QtWidgets.QLineEdit("10 kHz")
time_lbl = QtWidgets.QLabel("Time s")
self.time = QtWidgets.QLineEdit("1.0")
dir_lbl = QtWidgets.QLabel("Direction")
self.direction = QtWidgets.QComboBox(); self.direction.addItems(["UP", "DOWN"])
src_lbl = QtWidgets.QLabel("Source")
self.src = QtWidgets.QComboBox(); self.src.addItems(["INT", "EXT"])
self.btn_apply = QtWidgets.QPushButton("Apply Sweep")
self.btn_read = QtWidgets.QPushButton("Readback Sweep")
grid = QtWidgets.QGridLayout(); row = 0
grid.addWidget(state_lbl, row, 0); grid.addWidget(self.state, row, 1)
grid.addWidget(typ_lbl, row, 2); grid.addWidget(self.typ, row, 3); row += 1
grid.addWidget(fstart_lbl, row, 0); grid.addWidget(self.fstart, row, 1)
grid.addWidget(fstop_lbl, row, 2); grid.addWidget(self.fstop, row, 3); row += 1
grid.addWidget(time_lbl, row, 0); grid.addWidget(self.time, row, 1)
grid.addWidget(dir_lbl, row, 2); grid.addWidget(self.direction, row, 3); row += 1
grid.addWidget(src_lbl, row, 0); grid.addWidget(self.src, row, 1); row += 1
grid.addWidget(self.btn_apply, row, 0)
grid.addWidget(self.btn_read, row, 1)
self.setLayout(grid)
self.btn_apply.clicked.connect(self._emit_apply)
self.btn_read.clicked.connect(lambda: self.readback_sweep.emit())
def _emit_apply(self):
cfg = dict(
STATE=self.state.currentText(),
WAV=self.typ.currentText(),
STAR=human_to_eng(self.fstart.text()),
STOP=human_to_eng(self.fstop.text()),
TIME=human_to_eng(self.time.text()),
DIR=self.direction.currentText(),
TRSR=self.src.currentText(),
)
self.apply_sweep.emit(cfg)
class ARBTab(QtWidgets.QWidget):
refresh_lists = QtCore.pyqtSignal()
set_wave = QtCore.pyqtSignal(str)
upload_wave = QtCore.pyqtSignal(str, str)
download_wave = QtCore.pyqtSignal(str, str)
def __init__(self):
super().__init__()
self._build()
def _build(self):
self.built_in = QtWidgets.QListWidget()
self.user = QtWidgets.QListWidget()
refresh = QtWidgets.QPushButton("Refresh")
set_btn = QtWidgets.QPushButton("Set Selected to Channel")
upload_btn = QtWidgets.QPushButton("Upload .bin/.csv → USER")
download_btn = QtWidgets.QPushButton("Download USER → file")
self.name_edit = QtWidgets.QLineEdit(); self.name_edit.setPlaceholderText("User name (optional)")
grid = QtWidgets.QGridLayout()
grid.addWidget(QtWidgets.QLabel("BUILDIN"), 0, 0)
grid.addWidget(QtWidgets.QLabel("USER"), 0, 1)
grid.addWidget(self.built_in, 1, 0)
grid.addWidget(self.user, 1, 1)
grid.addWidget(refresh, 2, 0)
grid.addWidget(set_btn, 2, 1)
grid.addWidget(self.name_edit, 3, 0)
grid.addWidget(upload_btn, 3, 1)
grid.addWidget(download_btn, 4, 1)
self.setLayout(grid)
refresh.clicked.connect(lambda: self.refresh_lists.emit())
set_btn.clicked.connect(self._emit_set)
upload_btn.clicked.connect(self._emit_upload)
download_btn.clicked.connect(self._emit_download)
def selected_name(self) -> str:
w = self.user.currentItem() or self.built_in.currentItem()
return w.text().strip() if w else ""
def _emit_set(self):
name = self.selected_name()
if name:
self.set_wave.emit(name)
def _emit_upload(self):
path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Select ARB file", "", "ARB Files (*.bin *.csv);;All Files (*)")
if not path:
return
name = self.name_edit.text().strip()
if not name:
name = os.path.splitext(os.path.basename(path))[0]
self.upload_wave.emit(name, path)
def _emit_download(self):
item = self.user.currentItem()
if not item:
return
name = item.text().strip()
path, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Save ARB file", f"{name}.bin", "Binary (*.bin);;All Files (*)")
if not path:
return
self.download_wave.emit(name, path)
class CLITab(QtWidgets.QWidget):
send_cmd = QtCore.pyqtSignal(str)
query_cmd = QtCore.pyqtSignal(str)
def __init__(self):
super().__init__()
self._build()
def _build(self):
self.inp = QtWidgets.QLineEdit()
self.inp.setPlaceholderText("Enter SCPI, e.g. C1:BSWV? or *IDN?")
btn_send = QtWidgets.QPushButton("Send")
btn_query = QtWidgets.QPushButton("Query")
self.out = QtWidgets.QPlainTextEdit(); self.out.setReadOnly(True); self.out.setMinimumHeight(220)
grid = QtWidgets.QGridLayout(); row = 0
grid.addWidget(QtWidgets.QLabel("Command"), row, 0)
grid.addWidget(self.inp, row, 1, 1, 4)
grid.addWidget(btn_send, row, 5)
grid.addWidget(btn_query, row, 6); row += 1
grid.addWidget(self.out, row, 0, 1, 7)
self.setLayout(grid)
btn_send.clicked.connect(self._do_send)
btn_query.clicked.connect(self._do_query)
self.inp.returnPressed.connect(self._do_query)
def append(self, s: str):
self.out.appendPlainText(s)
def _do_send(self):
cmd = self.inp.text().strip()
if cmd:
self.send_cmd.emit(cmd)
def _do_query(self):
cmd = self.inp.text().strip()
if cmd:
self.query_cmd.emit(cmd)
# -------------------- Main Window --------------------
class Main(QtWidgets.QWidget):
def __init__(self, preset_ip=None):
super().__init__()
self.setWindowTitle(Window_Name)
self.i = SDGLan()
self._scr_worker = None
self._build(preset_ip)
def _build(self, preset_ip):
ip_lbl = QtWidgets.QLabel("IP")
self.ip = QtWidgets.QLineEdit(preset_ip or "")
if not preset_ip:
self.ip.setPlaceholderText("e.g. 192.168.1.120")
port_lbl = QtWidgets.QLabel("Port")
self.port = QtWidgets.QLineEdit(str(DEFAULT_PORT))
self.btn_connect = QtWidgets.QPushButton("Connect")
self.btn_idn = QtWidgets.QPushButton("IDN?")
self.btn_err = QtWidgets.QPushButton("SYST:ERR?")
self.btn_scr = QtWidgets.QPushButton("Screenshot")
ch_lbl = QtWidgets.QLabel("Channel")
self.ch = QtWidgets.QComboBox(); self.ch.addItems(["C1", "C2"])
self.tabs = QtWidgets.QTabWidget()
self.t_basic = BasicTab(); self.tabs.addTab(self.t_basic, "Basic")
self.t_burst = BurstTab(); self.tabs.addTab(self.t_burst, "Burst")
self.t_sweep = SweepTab(); self.tabs.addTab(self.t_sweep, "Sweep")
self.t_arb = ARBTab(); self.tabs.addTab(self.t_arb, "ARB Manager")
self.t_cli = CLITab(); self.tabs.addTab(self.t_cli, "SCPI CLI")
self.t_presets = PresetsTab(); self.tabs.addTab(self.t_presets, "Presets")
self.log = QtWidgets.QPlainTextEdit(); self.log.setReadOnly(True); self.log.setMinimumHeight(220)
top = QtWidgets.QGridLayout(); row = 0
top.addWidget(ip_lbl, row, 0); top.addWidget(self.ip, row, 1, 1, 3)
top.addWidget(port_lbl, row, 4); top.addWidget(self.port, row, 5)
top.addWidget(self.btn_connect, row, 6); row += 1
top.addWidget(self.btn_idn, row, 0); top.addWidget(self.btn_err, row, 1); top.addWidget(self.btn_scr, row, 2)
top.addWidget(ch_lbl, row, 3); top.addWidget(self.ch, row, 4); row += 1
top.addWidget(self.tabs, row, 0, 1, 7); row += 1
top.addWidget(self.log, row, 0, 1, 7)
self.setLayout(top)
# wiring
self.btn_connect.clicked.connect(self.on_connect)
self.btn_idn.clicked.connect(self.on_idn)
self.btn_err.clicked.connect(self.on_err)
self.btn_scr.clicked.connect(self.on_screenshot)
self.t_basic.apply_basic.connect(self.on_apply_basic)
self.t_basic.readback_basic.connect(self.on_readback_basic)
self.t_basic.toggle_output.connect(self.on_output)
self.t_basic.store_slot.connect(self.on_store_slot)
self.t_basic.recall_slot.connect(self.on_recall_slot)
self.t_burst.apply_burst.connect(self.on_apply_burst)
self.t_burst.readback_burst.connect(self.on_readback_burst)
self.t_sweep.apply_sweep.connect(self.on_apply_sweep)
self.t_sweep.readback_sweep.connect(self.on_readback_sweep)
self.t_arb.refresh_lists.connect(self.on_arb_refresh)
self.t_arb.set_wave.connect(self.on_arb_set)
self.t_arb.upload_wave.connect(self.on_arb_upload)
self.t_arb.download_wave.connect(self.on_arb_download)
self.t_cli.send_cmd.connect(self.on_cli_send)
self.t_cli.query_cmd.connect(self.on_cli_query)
self.t_presets.rename_slot.connect(self.on_rename_slot)
self.t_presets.view_refresh.connect(self.refresh_presets_view)
ensure_preset_file(PRESET_FILE)
self.refresh_presets_view()
def logln(self, s: str):
self.log.appendPlainText(s)
def _pref(self) -> str:
return self.ch.currentText() + ":"
# ---------- Top actions ----------
def on_connect(self):
try:
addr = self.ip.text().strip(); port = int(self.port.text().strip())
self.i.connect(addr, port)
self.logln(f"Connected to {addr}:{port}")
except Exception as e:
self.logln(f"[ERR] connect: {e}")
def on_idn(self):
try:
self.logln(f"*IDN? -> {self.i.query('*IDN?')}")
except Exception as e:
self.logln(f"[ERR] IDN: {e}")
def on_err(self):
try:
self.logln(f"SYST:ERR? -> {self.i.query('SYST:ERR?')}")
except Exception as e:
self.logln(f"[ERR] SYST:ERR?: {e}")
# ---------- Screenshot (threaded) ----------
def on_screenshot(self):
if not self.i.sock:
self.logln("[ERR] screenshot: not connected")
return
if self._scr_worker and self._scr_worker.isRunning():
self.logln("[ERR] screenshot already running")
return
self.btn_scr.setEnabled(False)
self.setCursor(QtCore.Qt.WaitCursor)
self._scr_worker = ScreenshotWorker(self.i, self)
self._scr_worker.done.connect(self._on_screenshot_done)
self._scr_worker.start()
def _on_screenshot_done(self, data: bytes, fn: str, err: str):
self.btn_scr.setEnabled(True)
self.unsetCursor()
if err:
self.logln(f"[ERR] screenshot: {err}")
return
try:
with open(fn, "wb") as f:
f.write(data)
self.logln(f"Screenshot saved to {fn} ({len(data)} bytes)")
img = QtGui.QImage(fn)
if img.isNull():
self.logln("[ERR] screenshot: Qt could not decode image; file kept")
return
pixmap = QtGui.QPixmap.fromImage(img)
lbl = QtWidgets.QLabel(); lbl.setPixmap(pixmap)
w = QtWidgets.QWidget(); l = QtWidgets.QVBoxLayout(w); l.addWidget(lbl)
w.setWindowTitle("SDG2042X Screenshot")
w.resize(pixmap.width(), pixmap.height())
w.show()
self._scr_win = w
except Exception as e:
self.logln(f"[ERR] screenshot save/display: {e}")
# ---------- Basic ----------
def on_apply_basic(self, cfg: dict):
try:
cmd = (f"{self._pref()}BSWV "
f"WVTP,{cfg['WVTP']},FRQ,{cfg['FRQ']},AMP,{cfg['AMP']},"
f"OFST,{cfg['OFST']},PHSE,{cfg['PHSE']}")
self.i.write(cmd)
self.logln(f"SET: {cmd}")
except Exception as e:
self.logln(f"[ERR] basic apply: {e}")
def on_readback_basic(self):
try:
rb = self.i.query(f"{self._pref()}BSWV?")
st = self.i.query(f"{self._pref()}OUTP?")
self.logln(f"{self._pref()}BSWV? -> {rb}")
self.logln(f"{self._pref()}OUTP? -> {st}")
except Exception as e:
self.logln(f"[ERR] basic readback: {e}")
def on_output(self, turn_on: bool):
try:
cmd = f"{self._pref()}OUTP {'ON' if turn_on else 'OFF'}"
self.i.write(cmd)
st = self.i.query(f"{self._pref()}OUTP?")
self.logln(f"SET: {cmd}")
self.logln(f"{self._pref()}OUTP? -> {st}")
except Exception as e:
self.logln(f"[ERR] output: {e}")
# ---------- Burst ----------
def on_apply_burst(self, cfg: dict):
try:
parts = [
f"STATE,{cfg['STATE']}",
f"MODE,{cfg['MODE']}",
f"TRSR,{cfg['TRSR']}",
f"NCYC,{cfg['NCYC']}",
f"DLAY,{cfg['DLAY']}",
f"GATEPOL,{cfg['GATEPOL']}",
]
cmd = f"{self._pref()}BTWV " + ",".join(parts)
self.i.write(cmd)
self.logln(f"SET: {cmd}")
except Exception as e:
self.logln(f"[ERR] burst apply: {e}")
def on_readback_burst(self):
try:
rb = self.i.query(f"{self._pref()}BTWV?")
self.logln(f"{self._pref()}BTWV? -> {rb}")
except Exception as e:
self.logln(f"[ERR] burst readback: {e}")
# ---------- Sweep ----------
def on_apply_sweep(self, cfg: dict):
try:
parts = [
f"STATE,{cfg['STATE']}",
f"WAV,{cfg['WAV']}",
f"STAR,{cfg['STAR']}",
f"STOP,{cfg['STOP']}",
f"TIME,{cfg['TIME']}",
f"DIR,{cfg['DIR']}",
f"TRSR,{cfg['TRSR']}",
]
cmd = f"{self._pref()}SWWV " + ",".join(parts)
self.i.write(cmd)
err = self.i.query("SYST:ERR?")
if not err.startswith("0"):
parts[1] = f"SPAC,{cfg['WAV']}" # fallback for older FW
cmd2 = f"{self._pref()}SWWV " + ",".join(parts)
self.i.write(cmd2)
self.logln(f"SET (fallback): {cmd2}")
else:
self.logln(f"SET: {cmd}")
except Exception as e:
self.logln(f"[ERR] sweep apply: {e}")
def on_readback_sweep(self):
try:
rb = self.i.query(f"{self._pref()}SWWV?")
self.logln(f"{self._pref()}SWWV? -> {rb}")
except Exception as e:
self.logln(f"[ERR] sweep readback: {e}")
# ---------- ARB ops ----------
def on_arb_refresh(self):
try:
b = self.i.query("STL? BUILDIN")
u = self.i.query("STL? USER")
def parse_list(s: str):
parts = [p.strip() for p in s.replace(";", ",").split(',') if p.strip()]
return parts
b_list = parse_list(b)
u_list = parse_list(u)
self.t_arb.built_in.clear(); self.t_arb.user.clear()
self.t_arb.built_in.addItems(b_list)
self.t_arb.user.addItems(u_list)
self.logln(f"BUILDIN: {b_list}")
self.logln(f"USER: {u_list}")
except Exception as e:
self.logln(f"[ERR] ARB refresh: {e}")
def on_arb_set(self, name: str):
try:
qname = quote_name(name)
self.i.write(f"{self._pref()}BSWV WVTP,ARB")
self.i.write(f"{self._pref()}ARWV NAME,{qname}")
rb = self.i.query(f"{self._pref()}ARWV?")
self.logln(f"SET ARB: {qname}; ARWV? -> {rb}")
except Exception as e:
self.logln(f"[ERR] ARB set: {e}")
def _make_scpi_block(self, payload: bytes) -> bytes:
n = len(payload)
len_digits = str(len(str(n))).encode('ascii')
header = b'#' + len_digits + str(n).encode('ascii')
return header + payload
def on_arb_upload(self, name: str, path: str):
try:
with open(path, 'rb') as f:
data = f.read()
block = self._make_scpi_block(data)
qname = quote_name(name)
prefix = f"WVDT USER,{qname},".encode('ascii')
self.i.write_bytes(prefix + block + b"\n")
self.logln(f"Uploaded USER,{qname} ({len(data)} bytes)")
except Exception as e:
self.logln(f"[ERR] ARB upload: {e}")
def on_arb_download(self, name: str, savepath: str):
try:
qname = quote_name(name)
raw = self.i.query_raw(f"WVDT? USER,{qname}")
idx = raw.find(b"WAVEDATA,")
if idx == -1:
raise RuntimeError("No WAVEDATA header")
blk = raw[idx + len(b"WAVEDATA,"):]
if not blk or blk[0:1] != b'#':
raise RuntimeError("No SCPI block after WAVEDATA,")
ndig = int(chr(blk[1]))
start = 2
blen = int(blk[start:start+ndig].decode('ascii'))
start += ndig
payload = bytearray(blk[start:start+blen])
need = blen - len(payload)
while need > 0:
try:
chunk = self.i.sock.recv(min(65536, need))
if not chunk:
break
payload.extend(chunk)
need -= len(chunk)
except socket.timeout:
break
with open(savepath, 'wb') as f:
f.write(bytes(payload))
self.logln(f"Downloaded USER,{qname} -> {savepath} ({len(payload)} bytes)")
except Exception as e:
self.logln(f"[ERR] ARB download: {e}")
# ---------- CLI tab ----------
def on_cli_send(self, cmd: str):
try:
self.i.write(cmd)
self.t_cli.append(f"> {cmd}")
except Exception as e:
self.t_cli.append(f"[ERR] send: {e}")
def on_cli_query(self, cmd: str):
try:
resp = self.i.query(cmd)
self.t_cli.append(f"? {cmd}\n< {resp}")
except Exception as e:
self.t_cli.append(f"[ERR] query: {e}")
# ---------- Presets ----------
def on_store_slot(self, slot: int):
try:
if slot < 1 or slot > 10:
raise ValueError("slot out of range")
bswv = self.i.query(f"{self._pref()}BSWV?")
outp = self.i.query(f"{self._pref()}OUTP?")
outp_state = "ON" if "ON" in outp.upper().split(",")[0] else "OFF"
d = read_presets(PRESET_FILE)
d[f"SLOT{slot}_BSWV"] = bswv
d[f"SLOT{slot}_OUTP"] = outp_state
write_presets(PRESET_FILE, d)
self.logln(f"Stored slot {slot}: BSWV len={len(bswv)}, OUTP={outp_state}")
self.refresh_presets_view()
except Exception as e:
self.logln(f"[ERR] store slot {slot}: {e}")
def on_recall_slot(self, slot: int):
try:
if slot < 1 or slot > 10:
raise ValueError("slot out of range")
d = read_presets(PRESET_FILE)
b = d.get(f"SLOT{slot}_BSWV", "").strip()
o = d.get(f"SLOT{slot}_OUTP", "").strip().upper()
if not b:
raise RuntimeError("empty BSWV")
self.i.write(b)
if o in ("ON", "OFF"):
self.i.write(f"{self._pref()}OUTP {o}")
self.logln(f"Recalled slot {slot}: applied BSWV and OUTP={o or 'N/A'}")
except Exception as e:
self.logln(f"[ERR] recall slot {slot}: {e}")
def on_rename_slot(self, slot: int, name: str):
try:
d = read_presets(PRESET_FILE)
d[f"SLOT{slot}_NAME"] = name
write_presets(PRESET_FILE, d)
self.refresh_presets_view()
except Exception as e:
self.logln(f"[ERR] rename slot {slot}: {e}")
def refresh_presets_view(self):
try:
d = read_presets(PRESET_FILE)
self.t_presets.update_from_presets(d)
except Exception as e:
self.logln(f"[ERR] refresh presets: {e}")
# -------------------- CLI entry --------------------
def show_help():
print("Usage: sdg2042x_gui.py [options]\n")
print("Options:")
print(" -ip <addr>, --ip <addr> Prefill IP address field")
print(" -h, --help Show this help message")
if __name__ == "__main__":
preset_ip = None
argv = sys.argv[1:]
for i, tok in enumerate(argv):
if tok in ("-h", "--help"):
show_help(); sys.exit(0)
if tok in ("-ip", "--ip") and i + 1 < len(argv):
preset_ip = argv[i + 1]
app = QtWidgets.QApplication(sys.argv)
m = Main(preset_ip=preset_ip)
m.resize(1100, 680)
m.show()
sys.exit(app.exec_())