T1L_Config/SamplingDialog.py
2025-06-21 10:36:45 +08:00

627 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import struct
import numpy as np
from PyQt5.QtCore import QRegExp, QByteArray
from PyQt5.QtGui import QRegExpValidator
from PyQt5.QtNetwork import QTcpSocket
from PyQt5.QtWidgets import (
QDialog, QLineEdit, QMessageBox,
QRadioButton, QButtonGroup, QFormLayout
)
from scipy.optimize import leastsq
HEADER_MAGIC = bytes([0xAA, 0x55, 0xAA])
HEADER_SIZE = 6 # PackgeHead除去data字段的大小(3+1+1+1)
class SamplingDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("采样参数设置")
self.rate_combo = QComboBox()
self.rate_combo.addItems([
"8000", "16000", "24000", "32000", "48000", "96000"
])
self.time_input = QLineEdit()
self.time_input.setPlaceholderText("输入采样时间(秒)")
self.ok_button = QPushButton("确定")
self.ok_button.clicked.connect(self.accept)
form_layout = QFormLayout()
form_layout.addRow("采样率Hz", self.rate_combo)
form_layout.addRow("采样时间(秒)", self.time_input)
layout = QVBoxLayout()
layout.addLayout(form_layout)
layout.addWidget(self.ok_button)
self.setLayout(layout)
def get_values(self):
# 返回采样率编号(整数)和采样时间(字符串或整数)
rate_code_map = {
"8000": 0x00,
"16000": 0x01,
"24000": 0x02,
"32000": 0x03,
"48000": 0x04,
"96000": 0x05
}
rate_str = self.rate_combo.currentText()
rate_code = rate_code_map.get(rate_str, 0x00)
return rate_code, self.time_input.text()
class PackageHead:
@staticmethod
def parse(data: bytes):
if len(data) < 6:
return None
head = data[0:3]
if head != b'\xAA\x55\xAA':
return None
cmd, ver, result_code = data[3:6]
return {
'cmd': cmd,
'version': ver,
'result_code': result_code,
'data': data[6:]
}
class GetCoe:
def __init__(self, temp_ch1_1, temp_ch1_2, dc_ch1_1, dc_ch1_2, ac_ch1_1, ac_ch1_2, temp_ch2_1, temp_ch2_2, dc_ch2_1,
dc_ch2_2,
ac_ch2_1, ac_ch2_2):
self.temp_ch1_1 = temp_ch1_1
self.temp_ch1_2 = temp_ch1_2
self.dc_ch1_1 = dc_ch1_1
self.dc_ch1_2 = dc_ch1_2
self.ac_ch1_1 = ac_ch1_1
self.ac_ch1_2 = ac_ch1_2
self.temp_ch2_1 = temp_ch2_1
self.temp_ch2_2 = temp_ch2_2
self.dc_ch2_1 = dc_ch2_1
self.dc_ch2_2 = dc_ch2_2
self.ac_ch2_1 = ac_ch2_1
self.ac_ch2_2 = ac_ch2_2
class Eigenvalue:
def __init__(self, temp1, temp2, offset1, offset2):
self.temp1 = temp1
self.temp2 = temp2
self.offset1 = offset1
self.offset2 = offset2
class MacConfigDialog(QDialog):
def __init__(self, socket: QTcpSocket, parent=None):
super().__init__(parent)
self.setWindowTitle("配置 MAC")
self.socket = socket
self.setFixedWidth(300)
self.mac_input1 = QLineEdit()
self.mac_input2 = QLineEdit()
hex_reg = QRegExp("[0-9A-Fa-f]{1,2}")
validator = QRegExpValidator(hex_reg)
self.mac_input1.setValidator(validator)
self.mac_input2.setValidator(validator)
self.save_btn = QPushButton("保存")
self.cancel_btn = QPushButton("取消")
mac_line = QHBoxLayout()
mac_line.addWidget(QLabel("MAC 地址: 50:29:4D:20"))
mac_line.addWidget(self.mac_input1)
mac_line.addWidget(QLabel(":"))
mac_line.addWidget(self.mac_input2)
btn_line = QHBoxLayout()
btn_line.addWidget(self.save_btn)
btn_line.addWidget(self.cancel_btn)
main_layout = QVBoxLayout()
main_layout.addLayout(mac_line)
main_layout.addLayout(btn_line)
self.setLayout(main_layout)
self.save_btn.clicked.connect(self.send_mac_config)
self.cancel_btn.clicked.connect(self.reject)
def send_mac_config(self):
try:
mac1 = int(self.mac_input1.text(), 16)
mac2 = int(self.mac_input2.text(), 16)
except ValueError:
QMessageBox.warning(self, "输入错误", "请输入合法的十六进制数")
return
packet = bytes([
0xAA, 0x55, 0xAA,
0x0D, # cmd
0x01, # version
0x00 # result_code (发送时为0)
]) + struct.pack('2B', mac1, mac2)
self.socket.write(packet)
if not self.socket.waitForBytesWritten(1000):
QMessageBox.critical(self, "错误", "发送失败")
return
if not self.socket.waitForReadyRead(3000):
QMessageBox.critical(self, "超时", "未收到回应")
return
data = bytes(self.socket.readAll())
pkg = PackageHead.parse(data)
if not pkg:
QMessageBox.critical(self, "错误", "返回数据格式无效")
return
if pkg['result_code'] == 0x00:
QMessageBox.information(self, "成功", "配置成功")
self.accept()
else:
QMessageBox.warning(self, "失败", f"错误码: {pkg['result_code']}")
class IPConfigDialog(QDialog):
def __init__(self, socket: QTcpSocket, parent=None):
super().__init__(parent)
self.setWindowTitle("配置 IP")
self.socket = socket
self.setFixedWidth(300)
self.setFixedHeight(150)
self.dhcp_radio = QRadioButton("DHCP")
self.static_radio = QRadioButton("Static")
self.dhcp_radio.setChecked(True)
radio_layout = QHBoxLayout()
radio_layout.addWidget(self.dhcp_radio)
radio_layout.addWidget(self.static_radio)
self.radio_group = QButtonGroup()
self.radio_group.addButton(self.dhcp_radio)
self.radio_group.addButton(self.static_radio)
# IPv4地址输入框及标签
self.ip_input = QLineEdit()
self.mask_input = QLineEdit()
self.gw_input = QLineEdit()
# IP地址校验器限制0-255的IPv4格式
ip_regex = QRegExp(r"^(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)"
r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)"
r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)"
r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)$")
ip_validator = QRegExpValidator(ip_regex)
self.ip_input.setValidator(ip_validator)
self.mask_input.setValidator(ip_validator)
self.gw_input.setValidator(ip_validator)
form_layout = QFormLayout()
form_layout.setLabelAlignment(Qt.AlignRight)
form_layout.addRow(QLabel("IP 地址:"), self.ip_input)
form_layout.addRow(QLabel("子网掩码:"), self.mask_input)
form_layout.addRow(QLabel("网关:"), self.gw_input)
self.ip_input.setText("192.168.0.151")
self.mask_input.setText("255.255.255.0")
self.gw_input.setText("192.168.0.1")
self.save_btn = QPushButton("保存")
self.cancel_btn = QPushButton("取消")
btn_line = QHBoxLayout()
btn_line.addWidget(self.save_btn)
btn_line.addWidget(self.cancel_btn)
main_layout = QVBoxLayout()
main_layout.addLayout(radio_layout)
main_layout.addLayout(form_layout)
main_layout.addLayout(btn_line)
self.setLayout(main_layout)
self.save_btn.clicked.connect(self.send_ip_config)
self.cancel_btn.clicked.connect(self.reject)
# DHCP模式下禁用IP等输入
self.dhcp_radio.toggled.connect(self.update_input_enabled)
self.update_input_enabled()
def update_input_enabled(self):
enabled = self.static_radio.isChecked()
self.ip_input.setEnabled(enabled)
self.mask_input.setEnabled(enabled)
self.gw_input.setEnabled(enabled)
def send_ip_config(self):
mode = 0 if self.dhcp_radio.isChecked() else 1
def parse_ip(ip_str):
parts = ip_str.strip().split('.')
if len(parts) != 4:
raise ValueError()
return [int(p) for p in parts]
if mode == 0:
packet = bytes([
0xAA, 0x55, 0xAA,
0x0E, # cmd
0x01, # version
0x00 # result_code (发送时为0)
]) + struct.pack('B', mode)
else:
try:
ip = parse_ip(self.ip_input.text())
mask = parse_ip(self.mask_input.text())
gw = parse_ip(self.gw_input.text())
except ValueError:
QMessageBox.warning(self, "输入错误", "请输入合法的 IPv4 地址格式")
return
packet = bytes([
0xAA, 0x55, 0xAA,
0x0E, # cmd
0x01, # version
0x00 # result_code (发送时为0)
]) + struct.pack('B', mode) + bytes(ip) + bytes(mask) + bytes(gw)
self.socket.write(packet)
if not self.socket.waitForBytesWritten(1000):
QMessageBox.critical(self, "错误", "发送失败")
return
if not self.socket.waitForReadyRead(3000):
QMessageBox.critical(self, "超时", "未收到回应")
return
data = bytes(self.socket.readAll())
pkg = PackageHead.parse(data)
if not pkg:
QMessageBox.critical(self, "错误", "返回数据格式无效")
return
if pkg['result_code'] == 0x00:
QMessageBox.information(self, "成功", "配置成功")
self.accept()
else:
QMessageBox.warning(self, "失败", f"错误码: {pkg['result_code']}")
from PyQt5.QtWidgets import (
QDialog, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, QComboBox,
QGridLayout
)
from PyQt5.QtCore import Qt
class CalibrationDialog(QDialog):
def __init__(self, socket: QTcpSocket, parent=None):
super().__init__(parent)
self.setWindowTitle("校准设置")
self.socket = socket
self.type_index = 0
self.setFixedWidth(300)
# 通道与类型选择
self.partial_data = QByteArray()
self.recv_state = ''
self.channel_cb = QComboBox()
self.channel_cb.addItems(["通道1", "通道2"])
self.type_cb = QComboBox()
self.type_cb.addItems(["温度", "电压DC", "电压AC"])
self.type_cb.currentTextChanged.connect(self.update_voltage_points)
self.channel_cb.currentTextChanged.connect(self.update_voltage_points)
top_layout = QHBoxLayout()
top_layout.addWidget(QLabel("通道:"))
top_layout.addWidget(self.channel_cb)
top_layout.addWidget(QLabel("类型:"))
top_layout.addWidget(self.type_cb)
# 电压点信息
self.voltage_labels = []
self.value_labels = []
self.get_buttons = []
self.voltage_points = []
grid = QGridLayout()
grid.addWidget(QLabel("电压点"), 0, 0)
grid.addWidget(QLabel("校准值"), 0, 1)
grid.addWidget(QLabel("操作按钮"), 0, 2)
for i in range(3):
v_label = QLabel()
self.voltage_labels.append(v_label)
val_label = QLabel("未获取")
self.value_labels.append(val_label)
btn = QPushButton("获取")
btn.clicked.connect(lambda _, index=i: self.fetch_voltage(index))
self.get_buttons.append(btn)
grid.addWidget(v_label, i + 1, 0)
grid.addWidget(val_label, i + 1, 1)
grid.addWidget(btn, i + 1, 2)
self.update_voltage_points()
# 初始化与校准按钮
self.init_btn = QPushButton("初始化")
self.calibrate_btn = QPushButton("校准")
self.get_coe_btn = QPushButton("获取系数")
self.init_btn.clicked.connect(self.initialize_calibration)
self.calibrate_btn.clicked.connect(self.calibrate)
self.get_coe_btn.clicked.connect(self.get_coe)
self.socket.readyRead.connect(self.on_ready_read)
btn_layout = QHBoxLayout()
btn_layout.addStretch()
btn_layout.addWidget(self.init_btn)
btn_layout.addWidget(self.calibrate_btn)
btn_layout.addWidget(self.get_coe_btn)
main_layout = QVBoxLayout()
main_layout.addLayout(top_layout)
main_layout.addLayout(grid)
main_layout.addLayout(btn_layout)
self.setLayout(main_layout)
def update_voltage_points(self):
type_text = self.type_cb.currentText()
if type_text == "温度":
self.voltage_points = [0.5, 1.5, 2.5]
elif type_text == "电压DC":
self.voltage_points = [1, 8, 15]
else:
self.voltage_points = [1, 5, 9]
for i, v in enumerate(self.voltage_points):
self.voltage_labels[i].setText(f"{v:.1f}V")
self.value_labels[i].setText("未获取")
def fetch_voltage(self, index):
# 模拟读取值
# import random
# val = round(self.voltage_points[index] + random.uniform(-0.05, 0.05), 3)
# self.value_labels[index].setText(str(val))
self.type_index = index
if self.type_cb.currentIndex() != 2:
self.get_temp_dc()
else:
self.get_wave()
def initialize_calibration(self):
for label in self.value_labels:
label.setText("未获取")
# 获取通道号和类型索引
ch = self.channel_cb.currentIndex() + 1 # 通道1 对应 1
group = self.type_cb.currentIndex() # 0=temp, 1=dc, 2=ac
print(f"ch:{ch},group:{group}")
a = 1
b = 0
header = bytes([0xAA, 0x55, 0xAA, 0x11, 0x01, 0x00]) # 可根据实际协议修改
payload = struct.pack('<BBff', ch, group, a, b)
packet = header + payload
# 发送给 MCU
self.socket.write(packet)
self.socket.waitForReadyRead()
def get_coe(self):
try:
self.socket.write(bytes(
[0xAA, 0x55, 0xAA, 0x10, 0x01, 0x00])) # 发送数据
self.socket.waitForReadyRead()
except Exception as e:
self.status_bar.showMessage(f"状态: 错误 - {str(e)}")
def calibrate(self):
try:
self.recv_state = ''
x = self.voltage_points
y = [float(label.text()) for label in self.value_labels]
if any(label.text() == "未获取" for label in self.value_labels):
raise ValueError("未获取所有电压点数据")
except ValueError:
from PyQt5.QtWidgets import QMessageBox
QMessageBox.warning(self, "错误", "请先获取所有电压点的校准值")
return
# 非线性最小二乘拟合 y = a*x + b
def func(p, x):
a, b = p
return a * x + b
def error(p, x, y):
return func(p, x) - y
p0 = [1, 0]
plsq = leastsq(error, p0, args=(np.array(y), np.array(x)))
a, b = plsq[0]
# import numpy as np
#
# a, b = np.polyfit(x, y, 1)
print(f"拟合结果: a = {a:.4f}, b = {b:.4f}")
# 获取通道号和类型索引
ch = self.channel_cb.currentIndex() + 1 # 通道1 对应 1
group = self.type_cb.currentIndex() # 0=temp, 1=dc, 2=ac
print(f"ch:{ch},group:{group}")
header = bytes([0xAA, 0x55, 0xAA, 0x11, 0x01, 0x00]) # 可根据实际协议修改
payload = struct.pack('<BBff', ch, group, a, b)
packet = header + payload
# 发送给 MCU
self.socket.write(packet)
self.socket.waitForReadyRead()
def get_extremes(self, data):
"""获取最大和最小的10个值排序法"""
sorted_data = np.sort(data)
return {
'top10_max': sorted_data[-10:][::-1].tolist(), # 降序排列
'top10_min': sorted_data[:10].tolist()
}
def mean_without_max_optimized(self, data):
"""优化内存使用的版本"""
arr = np.array(data)
if len(arr) < 2:
return np.nan
max_val = np.max(arr)
# 计算总和和计数时直接排除最大值
total = np.sum(arr) - max_val
count = len(arr) - 1
return total / count
def mean_without_min_optimized(self, data):
"""优化内存使用的版本"""
arr = np.array(data)
if len(arr) < 2:
return np.nan
min_val = np.min(arr)
# 计算总和和计数时直接排除最大值
total = np.sum(arr) - min_val
count = len(arr) - 1
return total / count
def get_temp_dc(self):
try:
self.recv_state = ''
self.socket.write(bytes(
[0xAA, 0x55, 0xAA, 0x0B, 0x01, 0x00])) # 发送数据
self.socket.waitForReadyRead()
except Exception as e:
QMessageBox.warning(self, "错误", f"{str(e)}")
def get_wave(self):
try:
self.recv_state = 'WAIT_HEADER'
rate_bytes = struct.pack('<I', 4)
time_bytes = struct.pack('<I', 1)
ch = self.channel_cb.currentIndex() + 1
packet = bytes([
0xAA, 0x55, 0xAA,
0x01, 0x01, 0x00,
ch
]) + rate_bytes + time_bytes
self.socket.write(packet)
print(packet)
self.socket.waitForReadyRead()
except Exception as e:
QMessageBox.warning(self, "错误", f"{str(e)}")
def on_ready_read(self):
while self.socket.bytesAvailable():
if self.recv_state == 'WAIT_HEADER':
if self.socket.bytesAvailable() >= HEADER_SIZE:
self.header = self.socket.read(HEADER_SIZE)
if self.header[:3] != HEADER_MAGIC:
print("无效 header magic跳过")
continue
self.cmd = self.header[3]
if self.cmd == 0x01:
self.recv_state = 'WAIT_WAVE_HEADER'
else:
print(f"未知或暂不处理的 cmd: {self.cmd}")
self.recv_state = 'WAIT_HEADER'
return
else:
return
elif self.recv_state == 'WAIT_WAVE_HEADER':
if self.socket.bytesAvailable() >= 5:
wave_header = self.socket.read(5)
self.sampling_rate = struct.unpack('<i', wave_header[1:5])[0]
self.expected_length = self.sampling_rate * 4
self.partial_data.clear()
self.recv_state = 'WAIT_WAVE_DATA'
else:
return
elif self.recv_state == 'WAIT_WAVE_DATA':
bytes_needed = self.expected_length - len(self.partial_data)
chunk = self.socket.read(min(self.socket.bytesAvailable(), bytes_needed))
self.partial_data += chunk
if len(self.partial_data) >= self.expected_length:
self.process_wave_packet(bytes(self.partial_data))
self.recv_state = 'WAIT_HEADER'
self.partial_data.clear()
else:
if self.socket.bytesAvailable() >= HEADER_SIZE:
recv_data = self.socket.read(1340)
if recv_data[:3] != HEADER_MAGIC:
print("无效 header magic跳过")
continue
cmd = recv_data[3]
if cmd == 0x0C:
body_format = '< H H H H'
body_size = struct.calcsize(body_format)
body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size]
unpacked_data = struct.unpack(body_format, body_data)
value = Eigenvalue(*unpacked_data)
print(
f"温度1{value.temp1 / 1000},温度2{value.temp2 / 1000},偏置电压1{value.offset1 / 100},偏置电压2{value.offset2 / 100}")
if self.channel_cb.currentIndex() == 0 and self.type_cb.currentIndex() == 0:
self.value_labels[self.type_index].setText(str(value.temp1 / 1000))
elif self.channel_cb.currentIndex() == 0 and self.type_cb.currentIndex() == 1:
self.value_labels[self.type_index].setText(str(value.offset1 / 100))
elif self.channel_cb.currentIndex() == 1 and self.type_cb.currentIndex() == 0:
self.value_labels[self.type_index].setText(str(value.temp2 / 1000))
elif self.channel_cb.currentIndex() == 1 and self.type_cb.currentIndex() == 1:
self.value_labels[self.type_index].setText(str(value.offset2 / 100))
elif cmd == 0x11:
pkg = PackageHead.parse(recv_data)
if not pkg:
QMessageBox.critical(self, "错误", "返回数据格式无效")
return
if pkg['result_code'] == 0x00:
QMessageBox.information(self, "成功", "配置成功")
return
elif cmd == 0x10:
body_format = '< f f f f f f f f f f f f'
body_size = struct.calcsize(body_format)
body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size]
unpacked_data = struct.unpack(body_format, body_data)
value = GetCoe(*unpacked_data)
print(
f"{value.temp_ch1_1, value.temp_ch1_2, value.dc_ch1_1, value.dc_ch1_2, value.ac_ch1_1, value.ac_ch1_2}")
print(
f"{value.temp_ch2_1, value.temp_ch2_2, value.dc_ch2_1, value.dc_ch2_2, value.ac_ch2_1, value.ac_ch2_2}")
def process_wave_packet(self, wave_data):
data = wave_data # 接收所有数据
data = np.frombuffer(data, dtype=np.int32) # 根据实际数据格式转换
LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000 / 10.2
self.scaled_data = data * LSB_32BIT
result = self.get_extremes(self.scaled_data)
print("最大的10个值:", result['top10_max'])
print("最小的10个值:", result['top10_min'])
mean_max = self.mean_without_max_optimized(result['top10_max'])
print(f"top10_max 去除最大的数据后的平均值1{mean_max}")
mean_min = self.mean_without_min_optimized(result['top10_min'])
print(f"top10_min 去除最大的数据后的平均值2{mean_min}")
pp = mean_max - mean_min
if self.type_cb.currentIndex() == 2:
self.value_labels[self.type_index].setText(str(round(pp, 6)))
print(f"pp :{mean_max - mean_min}")
def closeEvent(self, event):
try:
print("closeEvent")
self.socket.readyRead.disconnect(self.on_ready_read)
except TypeError:
pass
super().closeEvent(event)