1025 lines
38 KiB
Python
Executable File
1025 lines
38 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
# Siglent SDG2042X Linux GUI (PyQt5)
|
||
# Features:
|
||
# - Basic, Burst, Sweep,
|
||
# - ARB Manager, SCPI CLI,
|
||
# - Storing Presets, storing to editable text file SDG2042x.dat
|
||
# - Screenshot
|
||
#
|
||
# Author: Thomas Gohle / tgohle@togo-lab.io
|
||
# www.togo-lab.io
|
||
# tgonet.de / gohle.de
|
||
#
|
||
# Copyright: CC BY-SA 4.0
|
||
# https://creativecommons.org/licenses/by-sa/4.0/
|
||
#
|
||
# Version: 0.1
|
||
#----------------------------------------------------
|
||
|
||
import os
|
||
import socket
|
||
import sys
|
||
import time
|
||
from PyQt5 import QtWidgets, QtCore, QtGui
|
||
|
||
# -------------------- Constants --------------------
|
||
# Title shown in the main window's title bar.
Window_Name = "Linux GUI SDG2042X – by Thomas Gohle - ToGo-Lab - Ver 0.1"

# Transport defaults: TCP port pre-filled in the GUI and the socket timeout
# (seconds) applied to every connection.
DEFAULT_PORT = 5025
SOCKET_TIMEOUT = 2.5

# Waveform type names offered in the Basic tab.
SIGLENT_WAVES = [
    "SINE",
    "SQUARE",
    "RAMP",
    "PULSE",
    "NOISE",
    "ARB",
    "DC",
]

# Preset storage: default file name and number of preset slots.
PRESET_FILE = "SDG2042x.dat"
NUM_SLOTS = 10
|
||
|
||
# -------------------- Transport --------------------
|
||
class SDGLan:
    """Blocking SCPI transport over a single TCP socket.

    ``sock`` is ``None`` while disconnected. All reads are synchronous and
    governed by the socket timeout; no locking is done here, so callers are
    responsible for not using one instance from two threads at once.
    """

    def __init__(self):
        self.sock = None
        self.addr = None
        self.port = DEFAULT_PORT

    def connect(self, addr, port=DEFAULT_PORT):
        """Close any existing connection and open a new one to ``addr:port``.

        Exceptions from ``socket.create_connection`` propagate; ``sock``
        stays ``None`` in that case because ``close()`` ran first.
        """
        self.close()
        self.addr = addr
        self.port = port
        self.sock = socket.create_connection((addr, port), timeout=SOCKET_TIMEOUT)
        self.sock.settimeout(SOCKET_TIMEOUT)

    def close(self):
        """Close the socket if open; ``sock`` is reset to ``None`` either way."""
        try:
            if self.sock:
                self.sock.close()
        finally:
            self.sock = None

    def write(self, cmd: str):
        """Send ``cmd`` as ASCII, appending a newline when it lacks one.

        Raises:
            RuntimeError: if not connected.
        """
        if not self.sock:
            raise RuntimeError("Not connected")
        if not cmd.endswith("\n"):
            cmd += "\n"
        self.sock.sendall(cmd.encode("ascii"))

    def write_bytes(self, payload: bytes):
        """Send ``payload`` unchanged (no terminator is added).

        Raises:
            RuntimeError: if not connected.
        """
        if not self.sock:
            raise RuntimeError("Not connected")
        self.sock.sendall(payload)

    def query(self, cmd: str) -> str:
        """Send ``cmd`` and return the reply line with whitespace stripped."""
        self.write(cmd)
        data = self._recv_line()
        return data.strip()

    def query_raw(self, cmd: str) -> bytes:
        """Send ``cmd`` and return every byte received until timeout or EOF."""
        self.write(cmd)
        return self._recv_until_timeout()

    # ---- block helpers ----
    def _recv_exact(self, n: int) -> bytes:
        """Read up to ``n`` bytes; returns fewer only if the peer closes first.

        A non-positive ``n`` returns ``b""`` without touching the socket.
        """
        buf = bytearray()
        while len(buf) < n:
            chunk = self.sock.recv(n - len(buf))
            if not chunk:
                break
            buf.extend(chunk)
        return bytes(buf)

    def _recv_exact_capped(self, n: int, cap: int) -> bytes:
        """Like ``_recv_exact`` but refuse announced lengths above ``cap``.

        Raises:
            RuntimeError: if ``n`` exceeds ``cap``.
        """
        if n > cap:
            raise RuntimeError(f"SCPI block length {n} exceeds cap {cap}")
        return self._recv_exact(n)

    def _read_block_payload(self, ndig_b: bytes, cap: int) -> bytes:
        """Read the remainder of a ``#<d><len><payload>`` block.

        ``ndig_b`` is the single digit-count byte that followed ``#``.
        Returns ``b""`` when the header is malformed or cut short, so both
        block readers treat a bad header the same way instead of raising.
        """
        if not ndig_b or not ndig_b.isdigit():
            return b""
        ndig = int(ndig_b)
        len_b = self._recv_exact(ndig)
        # A length field cut short by EOF must not be parsed as a smaller length.
        if len(len_b) != ndig or not len_b.isdigit():
            return b""
        return self._recv_exact_capped(int(len_b), cap)

    def _restore_timeout(self, value):
        """Best-effort restore of the socket timeout (socket may be gone)."""
        try:
            self.sock.settimeout(value)
        except Exception:
            pass

    def query_block(self, cmd: str, timeout_s: float = 8.0, cap: int = 25_000_000) -> bytes:
        """Read a SCPI definite-length block (#<d><len><payload>). Returns payload without header.

        Bytes before the first ``#`` are discarded. Returns ``b""`` when the
        connection ends before a block starts or the header is malformed.

        Raises:
            RuntimeError: if not connected or the announced length exceeds ``cap``.
        """
        if not self.sock:
            raise RuntimeError("Not connected")
        prev_to = self.sock.gettimeout()
        try:
            self.sock.settimeout(timeout_s)
            self.write(cmd)
            # seek '#'
            first = self._recv_exact(1)
            while first and first != b"#":
                first = self._recv_exact(1)
            if not first:
                return b""
            return self._read_block_payload(self._recv_exact(1), cap)
        finally:
            self._restore_timeout(prev_to)

    def query_scdp(self, timeout_s: float = 15.0, cap: int = 50_000_000) -> bytes:
        """Robust screenshot: handles BMP direct or SCPI block-wrapped payload.

        Returns the complete BMP (header included) when the reply starts with
        ``BM``, the bare payload when it is a ``#`` block, ``b""`` when
        nothing usable arrives, and otherwise everything received until
        timeout.

        Raises:
            RuntimeError: if not connected or the announced size exceeds ``cap``.
        """
        if not self.sock:
            raise RuntimeError("Not connected")
        prev_to = self.sock.gettimeout()
        try:
            self.sock.settimeout(timeout_s)
            self.write("SCDP")
            first2 = self._recv_exact(2)
            if not first2:
                return b""
            # Case 1: BMP direct; bytes 2..5 hold the little-endian file size.
            if first2 == b"BM":
                size_bytes = self._recv_exact(4)
                if len(size_bytes) != 4:
                    return b""
                header = first2 + size_bytes
                total_size = int.from_bytes(size_bytes, "little")
                if total_size <= len(header):
                    # Nothing left to read beyond the bytes already consumed.
                    return header
                return header + self._recv_exact_capped(total_size - len(header), cap)
            # Case 2: SCPI block; the second byte is the digit count.
            if first2[:1] == b"#":
                return self._read_block_payload(first2[1:2], cap)
            # Case 3: unknown header, read until timeout
            return first2 + self._recv_until_timeout()
        finally:
            self._restore_timeout(prev_to)

    def _recv_line(self) -> str:
        """Receive until a chunk ends with a newline or the peer closes.

        Non-ASCII bytes are dropped while decoding. A socket timeout is not
        caught here and propagates to the caller.
        """
        chunks = []
        while True:
            b = self.sock.recv(4096)
            if not b:
                break
            chunks.append(b)
            if b.endswith(b"\n"):
                break
        return b"".join(chunks).decode("ascii", errors="ignore")

    def _recv_until_timeout(self) -> bytes:
        """Collect received bytes until the socket times out or the peer closes."""
        data = bytearray()
        while True:
            try:
                b = self.sock.recv(65536)
                if not b:
                    break
                data.extend(b)
            except socket.timeout:
                break
        return bytes(data)
|
||
|
||
# -------------------- Utilities --------------------
|
||
def human_to_eng(s: str) -> str:
    """Convert '1k', '2.5M', '500mV', '10ms' etc. to float string (base units).

    Unit words (``vpp``, ``v``, ``hz``, ``deg``, trailing ``s``) are matched
    case-insensitively and dropped. Scale prefixes:

    * ``k`` -> 1e3, ``g`` -> 1e9, ``u`` -> 1e-6, ``n`` -> 1e-9 (any case)
    * ``m`` -> 1e6 when directly followed by ``hz`` (any case) or when
      written as a bare upper-case ``M``; otherwise 1e-3 (``mV``, ``ms``,
      bare ``m``).

    Returns:
        ``str(value * multiplier)``, or the stripped input unchanged when no
        number can be parsed from it.
    """
    text = s.strip()
    t = text.lower().replace("vpp", "").replace("v", "")
    t = t.replace("hz", "").replace("deg", "")

    mult = 1.0
    if t.endswith(("ms", "us", "ns")):
        mult = {"m": 1e-3, "u": 1e-6, "n": 1e-9}[t[-2]]
        t = t[:-2]
    elif t.endswith("s"):
        t = t[:-1]
    elif t.endswith("m") and not t.endswith("mm"):
        # The unit words were stripped above, so look at the original text to
        # tell mega ("MHz", "mhz", bare "M") from milli ("mV", bare "m").
        pos = max(text.rfind("m"), text.rfind("M"))
        tail = text[pos + 1:].strip().lower() if pos >= 0 else ""
        mega = pos >= 0 and (tail == "hz" or (text[pos] == "M" and not tail))
        mult = 1e6 if mega else 1e-3
        t = t[:-1]
    elif t.endswith(("k", "u", "n", "g")):
        mult = {"k": 1e3, "u": 1e-6, "n": 1e-9, "g": 1e9}[t[-1]]
        t = t[:-1]

    try:
        val = float(t)
    except ValueError:
        return text
    return f"{val*mult}"
|
||
|
||
def quote_name(name: str) -> str:
    """Return ``name`` wrapped in double quotes unless it is purely
    alphanumeric plus ``_`` / ``-`` (an empty name is returned as is)."""
    for ch in name:
        if ch.isalnum() or ch == "_" or ch == "-":
            continue
        # First unsafe character decides: quote the whole name.
        return '"%s"' % name
    return name
|
||
|
||
def ensure_preset_file(path: str):
    """Create an empty ten-slot preset file at ``path`` if nothing exists there.

    An existing path is left untouched. Returns ``path`` in both cases.
    """
    if os.path.exists(path):
        return path

    rows = ["# SDG2042X presets\n"]
    for slot in range(1, 11):
        rows.append(f"SLOT{slot}_NAME=\n")
        rows.append(f"SLOT{slot}_BSWV=\n")
        rows.append(f"SLOT{slot}_OUTP=\n")

    with open(path, "w", encoding="utf-8") as fh:
        fh.writelines(rows)
    return path
|
||
|
||
def read_presets(path: str) -> dict:
    """Load ``KEY=VALUE`` pairs from the preset file at ``path``.

    The file is created first if missing. Blank lines, ``#`` comment lines
    and lines without ``=`` are skipped; keys and values are stripped, and a
    later duplicate key overwrites an earlier one.
    """
    ensure_preset_file(path)
    result = {}
    with open(path, "r", encoding="utf-8") as fh:
        for raw in fh:
            entry = raw.strip()
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            # Only the first '=' separates key from value.
            key, _, value = entry.partition("=")
            result[key.strip()] = value.strip()
    return result
|
||
|
||
def write_presets(path: str, d: dict):
    """Write slots 1..10 from ``d`` to ``path``, overwriting the file.

    For each slot the NAME, BSWV and OUTP entries are written in that order;
    keys missing from ``d`` are written with an empty value. Other keys in
    ``d`` are not written.
    """
    rows = ["# SDG2042X presets"]
    for slot in range(1, 11):
        for field in ("NAME", "BSWV", "OUTP"):
            key = f"SLOT{slot}_{field}"
            rows.append(f"{key}={d.get(key, '')}")
    text = "\n".join(rows) + "\n"
    with open(path, "w", encoding="utf-8") as fh:
        fh.write(text)
|
||
|
||
# -------------------- Screenshot worker (threaded) --------------------
|
||
class ScreenshotWorker(QtCore.QThread):
    """Thread that fetches one screenshot and reports it through ``done``.

    ``done`` carries ``(data, filename, error)``; ``error`` is the empty
    string on success, and ``data``/``filename`` are empty on failure.
    """

    done = QtCore.pyqtSignal(bytes, str, str)  # data, filename, error ('' if ok)

    def __init__(self, io: SDGLan, parent=None):
        super().__init__(parent)
        self.io = io

    @staticmethod
    def _best_effort(action):
        """Run ``action`` and ignore any exception it raises."""
        try:
            action()
        except Exception:
            pass

    def run(self):
        """Clear status, drain stale input, query the screenshot, emit ``done``."""
        try:
            # Clear previous errors that may reference old invalid commands
            self._best_effort(lambda: self.io.write("*CLS"))
            # Drain any residual bytes from prior operations
            self._best_effort(lambda: self.io._recv_until_timeout())

            image = self.io.query_scdp(timeout_s=20.0, cap=50_000_000)
            if not image:
                err = self.io.query("SYST:ERR?")
                raise RuntimeError(f"no image data; last SYST:ERR? -> {err}")

            ts = time.strftime("%Y%m%d-%H%M%S")
            # Only data that starts with the BMP magic gets a .bmp name.
            if image.startswith(b"BM"):
                fn = f"{ts}_SDG2042X.bmp"
            else:
                fn = f"{ts}_SDG2042X.bin"
            self.done.emit(image, fn, "")
        except Exception as exc:
            self.done.emit(b"", "", str(exc))
|
||
|
||
# -------------------- Tabs --------------------
|
||
class BasicTab(QtWidgets.QWidget):
    """Form for the basic waveform parameters, output switch and preset slots.

    The widget performs no instrument I/O itself; every button only emits
    one of the signals below.
    """

    apply_basic = QtCore.pyqtSignal(dict)
    readback_basic = QtCore.pyqtSignal()
    toggle_output = QtCore.pyqtSignal(bool)
    store_slot = QtCore.pyqtSignal(int)
    recall_slot = QtCore.pyqtSignal(int)

    def __init__(self):
        super().__init__()
        self._build()

    def _build(self):
        """Create the widgets, lay them out on a grid and wire the buttons."""
        qw = QtWidgets

        self.wave = qw.QComboBox()
        self.wave.addItems(SIGLENT_WAVES)
        self.frq = qw.QLineEdit("1 kHz")
        self.amp = qw.QLineEdit("1.0")
        self.off = qw.QLineEdit("0.0")
        self.ph = qw.QLineEdit("0")

        self.btn_apply = qw.QPushButton("Apply")
        self.btn_read = qw.QPushButton("Readback")
        self.btn_on = qw.QPushButton("Output ON")
        self.btn_off = qw.QPushButton("Output OFF")

        # Preset controls
        self.preset_num = qw.QSpinBox()
        self.preset_num.setRange(1, 10)
        self.preset_num.setValue(1)
        self.btn_store = qw.QPushButton("Store")
        self.btn_recall = qw.QPushButton("Recall")

        # One tuple per grid row; column index follows position in the tuple.
        rows = (
            (qw.QLabel("Waveform"), self.wave, qw.QLabel("Freq"), self.frq),
            (qw.QLabel("Amp Vpp"), self.amp, qw.QLabel("Offset V"), self.off),
            (qw.QLabel("Phase deg"), self.ph),
            (self.btn_apply, self.btn_read, self.btn_on, self.btn_off),
            (qw.QLabel("Preset #"), self.preset_num, self.btn_store, self.btn_recall),
        )
        grid = qw.QGridLayout()
        for r, widgets in enumerate(rows):
            for c, widget in enumerate(widgets):
                grid.addWidget(widget, r, c)
        self.setLayout(grid)

        self.btn_apply.clicked.connect(self._emit_apply)
        self.btn_read.clicked.connect(lambda: self.readback_basic.emit())
        self.btn_on.clicked.connect(lambda: self.toggle_output.emit(True))
        self.btn_off.clicked.connect(lambda: self.toggle_output.emit(False))
        self.btn_store.clicked.connect(lambda: self.store_slot.emit(self.preset_num.value()))
        self.btn_recall.clicked.connect(lambda: self.recall_slot.emit(self.preset_num.value()))

    def _emit_apply(self):
        """Emit ``apply_basic`` with the form values converted to base units."""
        cfg = {"WVTP": self.wave.currentText()}
        numeric_fields = (
            ("FRQ", self.frq),
            ("AMP", self.amp),
            ("OFST", self.off),
            ("PHSE", self.ph),
        )
        for key, edit in numeric_fields:
            cfg[key] = human_to_eng(edit.text())
        self.apply_basic.emit(cfg)
|
||
|
||
class PresetsTab(QtWidgets.QWidget):
    """Ten-row table of preset slots (number, editable name, summary).

    ``rename_slot(slot, name)`` is emitted once per row by "Save Names";
    ``view_refresh`` asks the owner to reload and redisplay the presets.
    """

    rename_slot = QtCore.pyqtSignal(int, str)
    view_refresh = QtCore.pyqtSignal()

    def __init__(self):
        super().__init__()
        self._build()

    def _build(self):
        """Create the table, the button row and the signal wiring."""
        read_only = QtCore.Qt.ItemIsEnabled

        self.table = QtWidgets.QTableWidget(10, 3)
        self.table.setHorizontalHeaderLabels(["Slot", "Name", "Summary"])
        self.table.horizontalHeader().setStretchLastSection(True)
        self.table.verticalHeader().setVisible(False)
        for row in range(10):
            # Slot number and summary cells are not editable; the name is.
            self.table.setItem(row, 0, QtWidgets.QTableWidgetItem(str(row + 1)))
            self.table.item(row, 0).setFlags(read_only)
            self.table.setItem(row, 1, QtWidgets.QTableWidgetItem(""))
            self.table.setItem(row, 2, QtWidgets.QTableWidgetItem(""))
            self.table.item(row, 2).setFlags(read_only)

        self.btn_load = QtWidgets.QPushButton("Reload File")
        self.btn_save = QtWidgets.QPushButton("Save Names")
        self.btn_open = QtWidgets.QPushButton("Open File...")
        self.path_label = QtWidgets.QLabel(PRESET_FILE)

        outer = QtWidgets.QVBoxLayout()
        bar = QtWidgets.QHBoxLayout()
        for button in (self.btn_load, self.btn_save, self.btn_open):
            bar.addWidget(button)
        bar.addStretch(1)
        bar.addWidget(self.path_label)
        outer.addLayout(bar)
        outer.addWidget(self.table)
        self.setLayout(outer)

        self.btn_load.clicked.connect(lambda: self.view_refresh.emit())
        self.btn_save.clicked.connect(self._save_names)
        self.btn_open.clicked.connect(self._open_file)

    def _open_file(self):
        """Let the user pick a preset file, show its path, request a refresh."""
        chosen, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Open Preset File", PRESET_FILE, "Text (*.txt *.dat);;All Files (*)")
        if chosen:
            self.path_label.setText(chosen)
            self.view_refresh.emit()

    def _save_names(self):
        """Emit ``rename_slot`` for every row with the stripped name text."""
        for row in range(10):
            text = self.table.item(row, 1).text().strip()
            self.rename_slot.emit(row + 1, text)

    def update_from_presets(self, d: dict):
        """Fill the name and summary columns from a preset dictionary.

        The summary lists WVTP/FRQ/AMP/OFST values found in the stored
        ``SLOTn_BSWV`` string, followed by ``OUTP=...`` when present.
        """
        wanted = ("WVTP,", "FRQ,", "AMP,", "OFST,")
        for slot in range(1, 11):
            bswv = d.get(f"SLOT{slot}_BSWV", "")
            outp = d.get(f"SLOT{slot}_OUTP", "")

            parts = []
            if bswv:
                upper = bswv.upper()
                for key in wanted:
                    at = upper.find(key)
                    if at < 0:
                        continue
                    # Value runs from just after "KEY," to the next comma.
                    value = bswv[at + len(key):].split(",")[0]
                    parts.append(f"{key[:-1]}={value}")
            if outp:
                parts.append(f"OUTP={outp}")

            self.table.item(slot - 1, 1).setText(d.get(f"SLOT{slot}_NAME", ""))
            self.table.item(slot - 1, 2).setText(" ".join(parts))
|
||
|
||
# -------------------- Other tabs (Burst, Sweep, ARB, CLI) --------------------
|
||
class BurstTab(QtWidgets.QWidget):
    """Form for burst settings; emits ``apply_burst`` / ``readback_burst``."""

    apply_burst = QtCore.pyqtSignal(dict)
    readback_burst = QtCore.pyqtSignal()

    def __init__(self):
        super().__init__()
        self._build()

    @staticmethod
    def _combo(choices):
        """Return a combo box pre-filled with ``choices``."""
        box = QtWidgets.QComboBox()
        box.addItems(choices)
        return box

    def _build(self):
        """Create the widgets, lay them out on a grid and wire the buttons."""
        label = QtWidgets.QLabel

        self.state = self._combo(["OFF", "ON"])
        self.mode = self._combo(["TRIG", "GAT"])
        self.src = self._combo(["INT", "EXT"])
        self.ncy = QtWidgets.QLineEdit("5")
        self.dly = QtWidgets.QLineEdit("0")
        self.gpol = self._combo(["POS", "NEG"])

        self.btn_apply = QtWidgets.QPushButton("Apply Burst")
        self.btn_read = QtWidgets.QPushButton("Readback Burst")

        # One tuple per grid row; column index follows position in the tuple.
        rows = (
            (label("State"), self.state, label("Mode"), self.mode),
            (label("Source"), self.src, label("Cycles"), self.ncy),
            (label("Delay s"), self.dly, label("Gate Pol"), self.gpol),
            (self.btn_apply, self.btn_read),
        )
        grid = QtWidgets.QGridLayout()
        for r, widgets in enumerate(rows):
            for c, widget in enumerate(widgets):
                grid.addWidget(widget, r, c)
        self.setLayout(grid)

        self.btn_apply.clicked.connect(self._emit_apply)
        self.btn_read.clicked.connect(lambda: self.readback_burst.emit())

    def _emit_apply(self):
        """Emit ``apply_burst`` with the current form values.

        The cycle count is truncated to an integer; unparsable text yields 1.
        """
        try:
            cycles = int(float(self.ncy.text()))
        except Exception:
            cycles = 1
        cfg = {
            "STATE": self.state.currentText(),
            "MODE": self.mode.currentText(),
            "TRSR": self.src.currentText(),
            "NCYC": str(cycles),
            "DLAY": human_to_eng(self.dly.text()),
            "GATEPOL": self.gpol.currentText(),
        }
        self.apply_burst.emit(cfg)
|
||
|
||
class SweepTab(QtWidgets.QWidget):
    """Form for sweep settings; emits ``apply_sweep`` / ``readback_sweep``."""

    apply_sweep = QtCore.pyqtSignal(dict)
    readback_sweep = QtCore.pyqtSignal()

    def __init__(self):
        super().__init__()
        self._build()

    @staticmethod
    def _combo(choices):
        """Return a combo box pre-filled with ``choices``."""
        box = QtWidgets.QComboBox()
        box.addItems(choices)
        return box

    def _build(self):
        """Create the widgets, lay them out on a grid and wire the buttons."""
        label = QtWidgets.QLabel

        self.state = self._combo(["OFF", "ON"])
        self.typ = self._combo(["LIN", "LOG"])
        self.fstart = QtWidgets.QLineEdit("1 kHz")
        self.fstop = QtWidgets.QLineEdit("10 kHz")
        self.time = QtWidgets.QLineEdit("1.0")
        self.direction = self._combo(["UP", "DOWN"])
        self.src = self._combo(["INT", "EXT"])

        self.btn_apply = QtWidgets.QPushButton("Apply Sweep")
        self.btn_read = QtWidgets.QPushButton("Readback Sweep")

        # One tuple per grid row; column index follows position in the tuple.
        rows = (
            (label("State"), self.state, label("Type"), self.typ),
            (label("Start Hz"), self.fstart, label("Stop Hz"), self.fstop),
            (label("Time s"), self.time, label("Direction"), self.direction),
            (label("Source"), self.src),
            (self.btn_apply, self.btn_read),
        )
        grid = QtWidgets.QGridLayout()
        for r, widgets in enumerate(rows):
            for c, widget in enumerate(widgets):
                grid.addWidget(widget, r, c)
        self.setLayout(grid)

        self.btn_apply.clicked.connect(self._emit_apply)
        self.btn_read.clicked.connect(lambda: self.readback_sweep.emit())

    def _emit_apply(self):
        """Emit ``apply_sweep`` with the form values converted to base units."""
        cfg = {
            "STATE": self.state.currentText(),
            "WAV": self.typ.currentText(),
            "STAR": human_to_eng(self.fstart.text()),
            "STOP": human_to_eng(self.fstop.text()),
            "TIME": human_to_eng(self.time.text()),
            "DIR": self.direction.currentText(),
            "TRSR": self.src.currentText(),
        }
        self.apply_sweep.emit(cfg)
|
||
|
||
class ARBTab(QtWidgets.QWidget):
    """Arbitrary-waveform manager: two name lists plus set/upload/download.

    Signals: ``refresh_lists()``, ``set_wave(name)``,
    ``upload_wave(name, path)`` and ``download_wave(name, path)``.
    """

    refresh_lists = QtCore.pyqtSignal()
    set_wave = QtCore.pyqtSignal(str)
    upload_wave = QtCore.pyqtSignal(str, str)
    download_wave = QtCore.pyqtSignal(str, str)

    def __init__(self):
        super().__init__()
        self._build()

    def _build(self):
        """Create the lists and buttons, place them on a grid, wire them."""
        qw = QtWidgets

        self.built_in = qw.QListWidget()
        self.user = qw.QListWidget()
        refresh = qw.QPushButton("Refresh")
        set_btn = qw.QPushButton("Set Selected to Channel")
        upload_btn = qw.QPushButton("Upload .bin/.csv → USER")
        download_btn = qw.QPushButton("Download USER → file")
        self.name_edit = qw.QLineEdit()
        self.name_edit.setPlaceholderText("User name (optional)")

        # (widget, row, column) in the order they are added to the grid.
        placements = (
            (qw.QLabel("BUILDIN"), 0, 0),
            (qw.QLabel("USER"), 0, 1),
            (self.built_in, 1, 0),
            (self.user, 1, 1),
            (refresh, 2, 0),
            (set_btn, 2, 1),
            (self.name_edit, 3, 0),
            (upload_btn, 3, 1),
            (download_btn, 4, 1),
        )
        grid = qw.QGridLayout()
        for widget, r, c in placements:
            grid.addWidget(widget, r, c)
        self.setLayout(grid)

        refresh.clicked.connect(lambda: self.refresh_lists.emit())
        set_btn.clicked.connect(self._emit_set)
        upload_btn.clicked.connect(self._emit_upload)
        download_btn.clicked.connect(self._emit_download)

    def selected_name(self) -> str:
        """Return the current USER item text, else the BUILDIN one, else ''."""
        item = self.user.currentItem() or self.built_in.currentItem()
        if not item:
            return ""
        return item.text().strip()

    def _emit_set(self):
        """Emit ``set_wave`` for the selected name, if there is one."""
        chosen = self.selected_name()
        if chosen:
            self.set_wave.emit(chosen)

    def _emit_upload(self):
        """Ask for a file and emit ``upload_wave(name, path)``.

        The name comes from the edit box, falling back to the file's base
        name without extension.
        """
        path, _ = QtWidgets.QFileDialog.getOpenFileName(self, "Select ARB file", "", "ARB Files (*.bin *.csv);;All Files (*)")
        if not path:
            return
        typed = self.name_edit.text().strip()
        if typed:
            wave_name = typed
        else:
            wave_name = os.path.splitext(os.path.basename(path))[0]
        self.upload_wave.emit(wave_name, path)

    def _emit_download(self):
        """Ask where to save the selected USER waveform and emit ``download_wave``."""
        current = self.user.currentItem()
        if not current:
            return
        wave_name = current.text().strip()
        path, _ = QtWidgets.QFileDialog.getSaveFileName(self, "Save ARB file", f"{wave_name}.bin", "Binary (*.bin);;All Files (*)")
        if path:
            self.download_wave.emit(wave_name, path)
|
||
|
||
class CLITab(QtWidgets.QWidget):
    """Free-form SCPI console: one input line, Send/Query buttons, a log.

    ``send_cmd(text)`` and ``query_cmd(text)`` are emitted with the stripped
    input; pressing Return in the input behaves like Query.
    """

    send_cmd = QtCore.pyqtSignal(str)
    query_cmd = QtCore.pyqtSignal(str)

    def __init__(self):
        super().__init__()
        self._build()

    def _build(self):
        """Create the widgets, place them on a grid and wire the actions."""
        qw = QtWidgets

        self.inp = qw.QLineEdit()
        self.inp.setPlaceholderText("Enter SCPI, e.g. C1:BSWV? or *IDN?")
        btn_send = qw.QPushButton("Send")
        btn_query = qw.QPushButton("Query")
        self.out = qw.QPlainTextEdit()
        self.out.setReadOnly(True)
        self.out.setMinimumHeight(220)

        grid = qw.QGridLayout()
        grid.addWidget(qw.QLabel("Command"), 0, 0)
        grid.addWidget(self.inp, 0, 1, 1, 4)
        grid.addWidget(btn_send, 0, 5)
        grid.addWidget(btn_query, 0, 6)
        grid.addWidget(self.out, 1, 0, 1, 7)
        self.setLayout(grid)

        btn_send.clicked.connect(self._do_send)
        btn_query.clicked.connect(self._do_query)
        self.inp.returnPressed.connect(self._do_query)

    def append(self, s: str):
        """Append ``s`` to the output pane."""
        self.out.appendPlainText(s)

    def _command_text(self) -> str:
        """Return the input line with surrounding whitespace removed."""
        return self.inp.text().strip()

    def _do_send(self):
        """Emit ``send_cmd`` unless the input is empty."""
        text = self._command_text()
        if text:
            self.send_cmd.emit(text)

    def _do_query(self):
        """Emit ``query_cmd`` unless the input is empty."""
        text = self._command_text()
        if text:
            self.query_cmd.emit(text)
|
||
|
||
# -------------------- Main Window --------------------
|
||
class Main(QtWidgets.QWidget):
|
||
def __init__(self, preset_ip=None):
    """Create the transport object, then build the window.

    ``preset_ip`` is passed to ``_build`` as the initial content of the IP
    field.
    """
    super().__init__()
    # State first, so it exists before any widget code runs.
    self.i = SDGLan()
    self._scr_worker = None
    self.setWindowTitle(Window_Name)
    self._build(preset_ip)
|
||
|
||
def _build(self, preset_ip):
    """Create all top-level widgets and tabs, lay them out and wire signals.

    Ends by making sure the default preset file exists and refreshing the
    presets view.
    """
    qw = QtWidgets

    self.ip = qw.QLineEdit(preset_ip or "")
    if not preset_ip:
        self.ip.setPlaceholderText("e.g. 192.168.1.120")
    self.port = qw.QLineEdit(str(DEFAULT_PORT))
    self.btn_connect = qw.QPushButton("Connect")
    self.btn_idn = qw.QPushButton("IDN?")
    self.btn_err = qw.QPushButton("SYST:ERR?")
    self.btn_scr = qw.QPushButton("Screenshot")
    self.ch = qw.QComboBox()
    self.ch.addItems(["C1", "C2"])

    self.tabs = qw.QTabWidget()
    self.t_basic = BasicTab()
    self.t_burst = BurstTab()
    self.t_sweep = SweepTab()
    self.t_arb = ARBTab()
    self.t_cli = CLITab()
    self.t_presets = PresetsTab()
    pages = (
        (self.t_basic, "Basic"),
        (self.t_burst, "Burst"),
        (self.t_sweep, "Sweep"),
        (self.t_arb, "ARB Manager"),
        (self.t_cli, "SCPI CLI"),
        (self.t_presets, "Presets"),
    )
    for page, title in pages:
        self.tabs.addTab(page, title)

    self.log = qw.QPlainTextEdit()
    self.log.setReadOnly(True)
    self.log.setMinimumHeight(220)

    # (widget, row, column, row span, column span) in insertion order.
    placements = (
        (qw.QLabel("IP"), 0, 0, 1, 1),
        (self.ip, 0, 1, 1, 3),
        (qw.QLabel("Port"), 0, 4, 1, 1),
        (self.port, 0, 5, 1, 1),
        (self.btn_connect, 0, 6, 1, 1),
        (self.btn_idn, 1, 0, 1, 1),
        (self.btn_err, 1, 1, 1, 1),
        (self.btn_scr, 1, 2, 1, 1),
        (qw.QLabel("Channel"), 1, 3, 1, 1),
        (self.ch, 1, 4, 1, 1),
        (self.tabs, 2, 0, 1, 7),
        (self.log, 3, 0, 1, 7),
    )
    top = qw.QGridLayout()
    for widget, r, c, row_span, col_span in placements:
        top.addWidget(widget, r, c, row_span, col_span)
    self.setLayout(top)

    # wiring: (signal, handler) pairs
    links = (
        (self.btn_connect.clicked, self.on_connect),
        (self.btn_idn.clicked, self.on_idn),
        (self.btn_err.clicked, self.on_err),
        (self.btn_scr.clicked, self.on_screenshot),
        (self.t_basic.apply_basic, self.on_apply_basic),
        (self.t_basic.readback_basic, self.on_readback_basic),
        (self.t_basic.toggle_output, self.on_output),
        (self.t_basic.store_slot, self.on_store_slot),
        (self.t_basic.recall_slot, self.on_recall_slot),
        (self.t_burst.apply_burst, self.on_apply_burst),
        (self.t_burst.readback_burst, self.on_readback_burst),
        (self.t_sweep.apply_sweep, self.on_apply_sweep),
        (self.t_sweep.readback_sweep, self.on_readback_sweep),
        (self.t_arb.refresh_lists, self.on_arb_refresh),
        (self.t_arb.set_wave, self.on_arb_set),
        (self.t_arb.upload_wave, self.on_arb_upload),
        (self.t_arb.download_wave, self.on_arb_download),
        (self.t_cli.send_cmd, self.on_cli_send),
        (self.t_cli.query_cmd, self.on_cli_query),
        (self.t_presets.rename_slot, self.on_rename_slot),
        (self.t_presets.view_refresh, self.refresh_presets_view),
    )
    for signal, handler in links:
        signal.connect(handler)

    ensure_preset_file(PRESET_FILE)
    self.refresh_presets_view()
|
||
|
||
def logln(self, s: str):
    """Append ``s`` as a new entry in the main log pane."""
    pane = self.log
    pane.appendPlainText(s)
|
||
|
||
def _pref(self) -> str:
|
||
return self.ch.currentText() + ":"
|
||
|
||
# ---------- Top actions ----------
|
||
def on_connect(self):
    """Connect to the address/port typed in the top bar and log the outcome.

    Any failure (including a non-numeric port) is logged, not raised.
    """
    try:
        host = self.ip.text().strip()
        port_no = int(self.port.text().strip())
        self.i.connect(host, port_no)
        self.logln(f"Connected to {host}:{port_no}")
    except Exception as exc:
        self.logln(f"[ERR] connect: {exc}")
|
||
|
||
def on_idn(self):
    """Query ``*IDN?`` and log the reply; failures are logged, not raised."""
    try:
        reply = self.i.query('*IDN?')
        self.logln(f"*IDN? -> {reply}")
    except Exception as exc:
        self.logln(f"[ERR] IDN: {exc}")
|
||
|
||
def on_err(self):
    """Query ``SYST:ERR?`` and log the reply; failures are logged, not raised."""
    try:
        reply = self.i.query('SYST:ERR?')
        self.logln(f"SYST:ERR? -> {reply}")
    except Exception as exc:
        self.logln(f"[ERR] SYST:ERR?: {exc}")
|
||
|
||
# ---------- Screenshot (threaded) ----------
|
||
def on_screenshot(self):
    """Start a background screenshot unless disconnected or one is running.

    While the worker runs the Screenshot button is disabled and a wait
    cursor is shown; ``_on_screenshot_done`` receives the result.
    """
    if not self.i.sock:
        self.logln("[ERR] screenshot: not connected")
        return

    previous = self._scr_worker
    if previous and previous.isRunning():
        self.logln("[ERR] screenshot already running")
        return

    self.btn_scr.setEnabled(False)
    self.setCursor(QtCore.Qt.WaitCursor)

    worker = ScreenshotWorker(self.i, self)
    # Keep a reference on self before connecting and starting the thread.
    self._scr_worker = worker
    worker.done.connect(self._on_screenshot_done)
    worker.start()
|
||
|
||
def _on_screenshot_done(self, data: bytes, fn: str, err: str):
|
||
self.btn_scr.setEnabled(True)
|
||
self.unsetCursor()
|
||
if err:
|
||
self.logln(f"[ERR] screenshot: {err}")
|
||
return
|
||
try:
|
||
with open(fn, "wb") as f:
|
||
f.write(data)
|
||
self.logln(f"Screenshot saved to {fn} ({len(data)} bytes)")
|
||
img = QtGui.QImage(fn)
|
||
if img.isNull():
|
||
self.logln("[ERR] screenshot: Qt could not decode image; file kept")
|
||
return
|
||
pixmap = QtGui.QPixmap.fromImage(img)
|
||
lbl = QtWidgets.QLabel(); lbl.setPixmap(pixmap)
|
||
w = QtWidgets.QWidget(); l = QtWidgets.QVBoxLayout(w); l.addWidget(lbl)
|
||
w.setWindowTitle("SDG2042X Screenshot")
|
||
w.resize(pixmap.width(), pixmap.height())
|
||
w.show()
|
||
self._scr_win = w
|
||
except Exception as e:
|
||
self.logln(f"[ERR] screenshot save/display: {e}")
|
||
|
||
# ---------- Basic ----------
|
||
def on_apply_basic(self, cfg: dict):
    """Send one ``BSWV`` command built from ``cfg`` and log it.

    ``cfg`` must contain WVTP, FRQ, AMP, OFST and PHSE; any failure is
    logged, not raised.
    """
    try:
        prefix = self._pref()
        settings = ",".join(
            f"{key},{cfg[key]}" for key in ("WVTP", "FRQ", "AMP", "OFST", "PHSE")
        )
        cmd = f"{prefix}BSWV {settings}"
        self.i.write(cmd)
        self.logln(f"SET: {cmd}")
    except Exception as exc:
        self.logln(f"[ERR] basic apply: {exc}")
|
||
|
||
def on_readback_basic(self):
    """Query ``BSWV?`` and ``OUTP?`` for the selected channel and log both.

    Both queries run before anything is logged; failures are logged.
    """
    try:
        replies = [
            (suffix, self.i.query(f"{self._pref()}{suffix}"))
            for suffix in ("BSWV?", "OUTP?")
        ]
        for suffix, reply in replies:
            self.logln(f"{self._pref()}{suffix} -> {reply}")
    except Exception as exc:
        self.logln(f"[ERR] basic readback: {exc}")
|
||
|
||
def on_output(self, turn_on: bool):
    """Switch the selected channel's output and log command plus readback."""
    try:
        state = "ON" if turn_on else "OFF"
        cmd = f"{self._pref()}OUTP {state}"
        self.i.write(cmd)
        # Read the state back before logging anything.
        readback = self.i.query(f"{self._pref()}OUTP?")
        self.logln(f"SET: {cmd}")
        self.logln(f"{self._pref()}OUTP? -> {readback}")
    except Exception as exc:
        self.logln(f"[ERR] output: {exc}")
|
||
|
||
# ---------- Burst ----------
|
||
def on_apply_burst(self, cfg: dict):
    """Send one ``BTWV`` command built from ``cfg`` and log it.

    ``cfg`` must contain STATE, MODE, TRSR, NCYC, DLAY and GATEPOL; any
    failure is logged, not raised.
    """
    try:
        order = ("STATE", "MODE", "TRSR", "NCYC", "DLAY", "GATEPOL")
        parts = [f"{key},{cfg[key]}" for key in order]
        cmd = f"{self._pref()}BTWV " + ",".join(parts)
        self.i.write(cmd)
        self.logln(f"SET: {cmd}")
    except Exception as exc:
        self.logln(f"[ERR] burst apply: {exc}")
|
||
|
||
def on_readback_burst(self):
    """Query ``BTWV?`` for the selected channel and log the reply."""
    try:
        reply = self.i.query(f"{self._pref()}BTWV?")
        self.logln(f"{self._pref()}BTWV? -> {reply}")
    except Exception as exc:
        self.logln(f"[ERR] burst readback: {exc}")
|
||
|
||
# ---------- Sweep ----------
|
||
def on_apply_sweep(self, cfg: dict):
    """Send a ``SWWV`` command built from ``cfg``, with one fallback attempt.

    After the first write ``SYST:ERR?`` is queried; if the reply does not
    start with ``0`` the command is re-sent with the sweep-type keyword
    ``SPAC`` in place of ``WAV``. Any failure is logged, not raised.
    """
    try:
        source_keys = ("STATE", "WAV", "STAR", "STOP", "TIME", "DIR", "TRSR")
        values = [cfg[key] for key in source_keys]

        def compose(type_keyword):
            # Same values every time; only the second keyword varies.
            keywords = ("STATE", type_keyword, "STAR", "STOP", "TIME", "DIR", "TRSR")
            body = ",".join(f"{k},{v}" for k, v in zip(keywords, values))
            return f"{self._pref()}SWWV " + body

        cmd = compose("WAV")
        self.i.write(cmd)
        status = self.i.query("SYST:ERR?")
        if status.startswith("0"):
            self.logln(f"SET: {cmd}")
        else:
            cmd2 = compose("SPAC")  # fallback for older FW
            self.i.write(cmd2)
            self.logln(f"SET (fallback): {cmd2}")
    except Exception as exc:
        self.logln(f"[ERR] sweep apply: {exc}")
|
||
|
||
def on_readback_sweep(self):
    """Query ``SWWV?`` for the selected channel and log the reply."""
    try:
        reply = self.i.query(f"{self._pref()}SWWV?")
        self.logln(f"{self._pref()}SWWV? -> {reply}")
    except Exception as exc:
        self.logln(f"[ERR] sweep readback: {exc}")
|
||
|
||
# ---------- ARB ops ----------
|
||
def on_arb_refresh(self):
    """Query the BUILDIN and USER waveform lists and repopulate the ARB tab.

    Replies are split on ``,`` and ``;``; empty entries are dropped. Any
    failure is logged, not raised.
    """
    try:
        builtin_reply = self.i.query("STL? BUILDIN")
        user_reply = self.i.query("STL? USER")

        def split_names(reply: str):
            names = []
            for piece in reply.replace(";", ",").split(','):
                piece = piece.strip()
                if piece:
                    names.append(piece)
            return names

        b_list = split_names(builtin_reply)
        u_list = split_names(user_reply)

        arb = self.t_arb
        arb.built_in.clear()
        arb.user.clear()
        arb.built_in.addItems(b_list)
        arb.user.addItems(u_list)
        self.logln(f"BUILDIN: {b_list}")
        self.logln(f"USER: {u_list}")
    except Exception as exc:
        self.logln(f"[ERR] ARB refresh: {exc}")
|
||
|
||
def on_arb_set(self, name: str):
    """Switch the selected channel to ARB and select waveform ``name``.

    Sends ``BSWV WVTP,ARB`` then ``ARWV NAME,<name>`` (quoted if needed),
    reads ``ARWV?`` back and logs the result. Failures are logged.
    """
    try:
        quoted = quote_name(name)
        for command in (
            f"{self._pref()}BSWV WVTP,ARB",
            f"{self._pref()}ARWV NAME,{quoted}",
        ):
            self.i.write(command)
        readback = self.i.query(f"{self._pref()}ARWV?")
        self.logln(f"SET ARB: {quoted}; ARWV? -> {readback}")
    except Exception as exc:
        self.logln(f"[ERR] ARB set: {exc}")
|
||
|
||
def _make_scpi_block(self, payload: bytes) -> bytes:
|
||
n = len(payload)
|
||
len_digits = str(len(str(n))).encode('ascii')
|
||
header = b'#' + len_digits + str(n).encode('ascii')
|
||
return header + payload
|
||
|
||
def on_arb_upload(self, name: str, path: str):
    """Send the file at *path* to the instrument as user waveform *name*.

    The file is read in binary mode, wrapped by ``self._make_scpi_block``
    and transmitted as ``WVDT USER,<quoted name>,<block>`` terminated by a
    newline.  Any exception (unreadable file, non-ASCII name, socket
    error, ...) is caught and logged with ``[ERR]``.

    Args:
        name: Waveform name; passed through ``quote_name`` before use.
        path: Path of the local file whose bytes are uploaded.
    """
    try:
        with open(path, "rb") as src:
            waveform = src.read()
        framed = self._make_scpi_block(waveform)
        quoted = quote_name(name)
        header = f"WVDT USER,{quoted},".encode("ascii")
        message = b"".join((header, framed, b"\n"))
        self.i.write_bytes(message)
        self.logln(f"Uploaded USER,{quoted} ({len(waveform)} bytes)")
    except Exception as exc:
        self.logln(f"[ERR] ARB upload: {exc}")
|
||
|
||
def on_arb_download(self, name: str, savepath: str):
    """Download user waveform *name* and write its binary payload to *savepath*.

    Sends ``WVDT? USER,<quoted name>`` via ``self.i.query_raw``, locates the
    ``WAVEDATA,`` marker in the reply and parses the block header that
    follows it (``#``, one digit N, then N digits giving the payload length).
    If the first reply holds fewer payload bytes than the header declares,
    the remainder is read directly from ``self.i.sock``.

    The received bytes are always written to *savepath*.  When the transfer
    ends early (socket timeout or closed connection) a ``[WARN]`` line with
    the received and expected byte counts is logged instead of the normal
    "Downloaded" line, so a truncated file is not reported as a success.
    Every exception is caught and logged with ``[ERR]``.

    Args:
        name: Waveform name; passed through ``quote_name`` before use.
        savepath: Destination file, opened in binary write mode.
    """
    try:
        qname = quote_name(name)
        raw = self.i.query_raw(f"WVDT? USER,{qname}")

        marker = b"WAVEDATA,"
        idx = raw.find(marker)
        if idx == -1:
            raise RuntimeError("No WAVEDATA header")
        blk = raw[idx + len(marker):]
        if not blk.startswith(b"#"):
            raise RuntimeError("No SCPI block after WAVEDATA,")

        # Header layout: '#', a single non-zero digit N, then N length digits.
        # Slicing (not indexing) keeps a too-short reply from raising an
        # opaque IndexError.
        ndig_field = blk[1:2]
        if not ndig_field.isdigit() or ndig_field == b"0":
            raise RuntimeError("Malformed SCPI block header")
        ndig = int(ndig_field)
        start = 2 + ndig
        len_field = blk[2:start]
        if len(len_field) != ndig or not len_field.isdigit():
            raise RuntimeError("Truncated SCPI block header")
        blen = int(len_field)

        payload = bytearray(blk[start:start + blen])
        need = blen - len(payload)
        while need > 0:
            try:
                chunk = self.i.sock.recv(min(65536, need))
            except socket.timeout:
                break
            if not chunk:
                # Peer closed the connection before sending everything.
                break
            payload.extend(chunk)
            need -= len(chunk)

        with open(savepath, "wb") as f:
            f.write(payload)

        if need > 0:
            self.logln(
                f"[WARN] ARB download incomplete: USER,{qname} -> {savepath} "
                f"({len(payload)} of {blen} bytes)"
            )
        else:
            self.logln(f"Downloaded USER,{qname} -> {savepath} ({len(payload)} bytes)")
    except Exception as e:
        self.logln(f"[ERR] ARB download: {e}")
|
||
|
||
# ---------- CLI tab ----------
|
||
def on_cli_send(self, cmd: str):
    """Send *cmd* to the instrument and echo it in the CLI view.

    The command is written first; the ``> <cmd>`` echo is appended to
    ``self.t_cli`` only after the write succeeded.  On any exception an
    ``[ERR] send: ...`` line is appended instead.

    Args:
        cmd: Command text, passed unchanged to ``self.i.write``.
    """
    try:
        self.i.write(cmd)
        echo_line = f"> {cmd}"
        self.t_cli.append(echo_line)
    except Exception as exc:
        failure_line = f"[ERR] send: {exc}"
        self.t_cli.append(failure_line)
|
||
|
||
def on_cli_query(self, cmd: str):
    """Send *cmd* as a query and show both the command and its reply.

    On success a two-line entry (``? <cmd>`` / ``< <reply>``) is appended to
    ``self.t_cli``; on any exception an ``[ERR] query: ...`` line is
    appended instead.

    Args:
        cmd: Query text, passed unchanged to ``self.i.query``.
    """
    try:
        reply = self.i.query(cmd)
        transcript = f"? {cmd}\n< {reply}"
        self.t_cli.append(transcript)
    except Exception as exc:
        failure_line = f"[ERR] query: {exc}"
        self.t_cli.append(failure_line)
|
||
|
||
# ---------- Presets ----------
|
||
def on_store_slot(self, slot: int):
    """Read the current BSWV/OUTP state and save it in preset slot *slot*.

    The raw ``BSWV?`` reply and a normalised ``ON``/``OFF`` output state are
    stored in ``PRESET_FILE`` under ``SLOT<slot>_BSWV`` and
    ``SLOT<slot>_OUTP``; afterwards the presets view is refreshed.  Every
    failure is caught and logged via ``self.logln``; nothing is raised.

    Args:
        slot: 1-based slot number; must lie in ``1..NUM_SLOTS``.
    """
    try:
        # Use the module constant so the limit cannot drift from NUM_SLOTS.
        if not 1 <= slot <= NUM_SLOTS:
            raise ValueError("slot out of range")
        bswv = self.i.query(f"{self._pref()}BSWV?")
        if not bswv.strip():
            # An empty reply would replace the slot with a preset that
            # on_recall_slot rejects ("empty BSWV"); keep the old contents.
            raise RuntimeError("empty BSWV reply")
        outp = self.i.query(f"{self._pref()}OUTP?")
        # Only the first comma-separated field of the reply is inspected.
        outp_state = "ON" if "ON" in outp.upper().split(",")[0] else "OFF"
        presets = read_presets(PRESET_FILE)
        presets[f"SLOT{slot}_BSWV"] = bswv
        presets[f"SLOT{slot}_OUTP"] = outp_state
        write_presets(PRESET_FILE, presets)
        self.logln(f"Stored slot {slot}: BSWV len={len(bswv)}, OUTP={outp_state}")
        self.refresh_presets_view()
    except Exception as e:
        self.logln(f"[ERR] store slot {slot}: {e}")
|
||
|
||
def on_recall_slot(self, slot: int):
    """Apply the BSWV string and output state stored in preset slot *slot*.

    Reads ``SLOT<slot>_BSWV`` and ``SLOT<slot>_OUTP`` from ``PRESET_FILE``.
    The BSWV string is sent to the instrument verbatim; the output state is
    sent only when it is ``ON`` or ``OFF``.  Every failure (bad slot, empty
    preset, I/O error) is caught and logged via ``self.logln``.

    Args:
        slot: 1-based slot number; must lie in ``1..NUM_SLOTS``.
    """
    try:
        # Use the module constant so the limit cannot drift from NUM_SLOTS.
        if not 1 <= slot <= NUM_SLOTS:
            raise ValueError("slot out of range")
        presets = read_presets(PRESET_FILE)
        bswv_cmd = presets.get(f"SLOT{slot}_BSWV", "").strip()
        outp = presets.get(f"SLOT{slot}_OUTP", "").strip().upper()
        if not bswv_cmd:
            raise RuntimeError("empty BSWV")
        # NOTE(review): the stored string is sent as-is, so it carries
        # whatever prefix it was saved with, while OUTP below uses the
        # current self._pref() - confirm this is intended.
        self.i.write(bswv_cmd)
        outp_applied = outp in ("ON", "OFF")
        if outp_applied:
            self.i.write(f"{self._pref()}OUTP {outp}")
        # Report only a state that was really sent; any other stored value
        # was ignored and is logged as N/A.
        shown = outp if outp_applied else "N/A"
        self.logln(f"Recalled slot {slot}: applied BSWV and OUTP={shown}")
    except Exception as e:
        self.logln(f"[ERR] recall slot {slot}: {e}")
|
||
|
||
def on_rename_slot(self, slot: int, name: str):
    """Store *name* as the label of preset slot *slot* and refresh the view.

    The label is written to ``PRESET_FILE`` under ``SLOT<slot>_NAME``.  Any
    exception is caught and logged with ``[ERR]``.

    Args:
        slot: Slot number used to build the key (not range-checked here).
        name: New label text, stored unchanged.
    """
    try:
        presets = read_presets(PRESET_FILE)
        label_key = f"SLOT{slot}_NAME"
        presets[label_key] = name
        write_presets(PRESET_FILE, presets)
        self.refresh_presets_view()
    except Exception as exc:
        self.logln(f"[ERR] rename slot {slot}: {exc}")
|
||
|
||
def refresh_presets_view(self):
    """Reload ``PRESET_FILE`` and hand its contents to the presets tab.

    The data returned by ``read_presets`` is passed to
    ``self.t_presets.update_from_presets``.  Any exception is caught and
    logged with ``[ERR]``.
    """
    try:
        stored = read_presets(PRESET_FILE)
        self.t_presets.update_from_presets(stored)
    except Exception as exc:
        self.logln(f"[ERR] refresh presets: {exc}")
|
||
|
||
# -------------------- CLI entry --------------------
|
||
def show_help():
    """Print the command-line usage summary to standard output."""
    help_lines = (
        # The embedded newline leaves a blank line after the usage line.
        "Usage: sdg2042x_gui.py [options]\n",
        "Options:",
        " -ip <addr>, --ip <addr> Prefill IP address field",
        " -h, --help Show this help message",
    )
    for line in help_lines:
        print(line)
|
||
|
||
if __name__ == "__main__":
    # Hand-rolled option scan: -h/--help prints usage and exits, -ip/--ip
    # takes the following token as the address used to prefill the IP field.
    preset_ip = None
    argv = sys.argv[1:]
    help_flags = ("-h", "--help")
    ip_flags = ("-ip", "--ip")

    # Pair each token with its successor; the last token is paired with
    # None, so an address flag without a value is silently ignored.
    for token, following in zip(argv, argv[1:] + [None]):
        if token in help_flags:
            show_help()
            sys.exit(0)
        if token in ip_flags and following is not None:
            # No early exit: when the flag is repeated, the last one wins.
            preset_ip = following

    app = QtWidgets.QApplication(sys.argv)
    m = Main(preset_ip=preset_ip)
    m.resize(1100, 680)
    m.show()
    sys.exit(app.exec_())
|