from PyQt5.QtWidgets import ( QApplication, QDialog, QLabel, QLineEdit, QPushButton,QMessageBox, QHBoxLayout, QVBoxLayout, QMainWindow, QWidget, QRadioButton, QButtonGroup,QGridLayout,QFormLayout ) from PyQt5.QtGui import QRegExpValidator from PyQt5.QtCore import QRegExp,QByteArray import struct from PyQt5.QtNetwork import QTcpSocket import numpy as np from numpy.ma.core import masked from scipy.optimize import leastsq import struct HEADER_MAGIC = bytes([0xAA, 0x55, 0xAA]) HEADER_SIZE = 6 # PackgeHead除去data字段的大小(3+1+1+1) class SamplingDialog(QDialog): def __init__(self, parent=None): super().__init__(parent) self.setWindowTitle("采样参数设置") self.rate_combo = QComboBox() self.rate_combo.addItems([ "8000", "16000", "24000", "32000", "48000", "96000" ]) self.time_input = QLineEdit() self.time_input.setPlaceholderText("输入采样时间(秒)") self.ok_button = QPushButton("确定") self.ok_button.clicked.connect(self.accept) form_layout = QFormLayout() form_layout.addRow("采样率(Hz)", self.rate_combo) form_layout.addRow("采样时间(秒)", self.time_input) layout = QVBoxLayout() layout.addLayout(form_layout) layout.addWidget(self.ok_button) self.setLayout(layout) def get_values(self): # 返回采样率编号(整数)和采样时间(字符串或整数) rate_code_map = { "8000": 0x00, "16000": 0x01, "24000": 0x02, "32000": 0x03, "48000": 0x04, "96000": 0x05 } rate_str = self.rate_combo.currentText() rate_code = rate_code_map.get(rate_str, 0x00) return rate_code, self.time_input.text() class PackageHead: @staticmethod def parse(data: bytes): if len(data) < 6: return None head = data[0:3] if head != b'\xAA\x55\xAA': return None cmd, ver, result_code = data[3:6] return { 'cmd': cmd, 'version': ver, 'result_code': result_code, 'data': data[6:] } class GetCoe: def __init__(self, temp_ch1_1, temp_ch1_2, dc_ch1_1, dc_ch1_2,ac_ch1_1,ac_ch1_2,temp_ch2_1,temp_ch2_2,dc_ch2_1,dc_ch2_2, ac_ch2_1, ac_ch2_2): self.temp_ch1_1 = temp_ch1_1 self.temp_ch1_2 = temp_ch1_2 self.dc_ch1_1 = dc_ch1_1 self.dc_ch1_2 = dc_ch1_2 self.ac_ch1_1 = ac_ch1_1 self.ac_ch1_2 = ac_ch1_2 self.temp_ch2_1 = temp_ch2_1 self.temp_ch2_2 = temp_ch2_2 self.dc_ch2_1 = dc_ch2_1 self.dc_ch2_2 = dc_ch2_2 self.ac_ch2_1 = ac_ch2_1 self.ac_ch2_2 = ac_ch2_2 class Eigenvalue: def __init__(self, temp1, temp2, offset1, offset2): self.temp1 = temp1 self.temp2 = temp2 self.offset1 = offset1 self.offset2 = offset2 class MacConfigDialog(QDialog): def __init__(self, socket: QTcpSocket, parent=None): super().__init__(parent) self.setWindowTitle("配置 MAC") self.socket = socket self.setFixedWidth(300) self.mac_input1 = QLineEdit() self.mac_input2 = QLineEdit() hex_reg = QRegExp("[0-9A-Fa-f]{1,2}") validator = QRegExpValidator(hex_reg) self.mac_input1.setValidator(validator) self.mac_input2.setValidator(validator) self.save_btn = QPushButton("保存") self.cancel_btn = QPushButton("取消") mac_line = QHBoxLayout() mac_line.addWidget(QLabel("MAC 地址: 50:29:4D:20")) mac_line.addWidget(self.mac_input1) mac_line.addWidget(QLabel(":")) mac_line.addWidget(self.mac_input2) btn_line = QHBoxLayout() btn_line.addWidget(self.save_btn) btn_line.addWidget(self.cancel_btn) main_layout = QVBoxLayout() main_layout.addLayout(mac_line) main_layout.addLayout(btn_line) self.setLayout(main_layout) self.save_btn.clicked.connect(self.send_mac_config) self.cancel_btn.clicked.connect(self.reject) def send_mac_config(self): try: mac1 = int(self.mac_input1.text(), 16) mac2 = int(self.mac_input2.text(), 16) except ValueError: QMessageBox.warning(self, "输入错误", "请输入合法的十六进制数") return packet = bytes([ 0xAA, 0x55, 0xAA, 0x0D, # cmd 0x01, # version 0x00 # result_code (发送时为0) ]) + struct.pack('2B', mac1, mac2) self.socket.write(packet) if not self.socket.waitForBytesWritten(1000): QMessageBox.critical(self, "错误", "发送失败") return if not self.socket.waitForReadyRead(3000): QMessageBox.critical(self, "超时", "未收到回应") return data = bytes(self.socket.readAll()) pkg = PackageHead.parse(data) if not pkg: QMessageBox.critical(self, "错误", "返回数据格式无效") return if pkg['result_code'] == 0x00: QMessageBox.information(self, "成功", "配置成功") self.accept() else: QMessageBox.warning(self, "失败", f"错误码: {pkg['result_code']}") class IPConfigDialog(QDialog): def __init__(self, socket: QTcpSocket, parent=None): super().__init__(parent) self.setWindowTitle("配置 IP") self.socket = socket self.setFixedWidth(300) self.setFixedHeight(150) self.dhcp_radio = QRadioButton("DHCP") self.static_radio = QRadioButton("Static") self.dhcp_radio.setChecked(True) radio_layout = QHBoxLayout() radio_layout.addWidget(self.dhcp_radio) radio_layout.addWidget(self.static_radio) self.radio_group = QButtonGroup() self.radio_group.addButton(self.dhcp_radio) self.radio_group.addButton(self.static_radio) # IPv4地址输入框及标签 self.ip_input = QLineEdit() self.mask_input = QLineEdit() self.gw_input = QLineEdit() # IP地址校验器,限制0-255的IPv4格式 ip_regex = QRegExp(r"^(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)" r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)" r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)" r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)$") ip_validator = QRegExpValidator(ip_regex) self.ip_input.setValidator(ip_validator) self.mask_input.setValidator(ip_validator) self.gw_input.setValidator(ip_validator) form_layout = QFormLayout() form_layout.setLabelAlignment(Qt.AlignRight) form_layout.addRow(QLabel("IP 地址:"), self.ip_input) form_layout.addRow(QLabel("子网掩码:"), self.mask_input) form_layout.addRow(QLabel("网关:"), self.gw_input) self.save_btn = QPushButton("保存") self.cancel_btn = QPushButton("取消") btn_line = QHBoxLayout() btn_line.addWidget(self.save_btn) btn_line.addWidget(self.cancel_btn) main_layout = QVBoxLayout() main_layout.addLayout(radio_layout) main_layout.addLayout(form_layout) main_layout.addLayout(btn_line) self.setLayout(main_layout) self.save_btn.clicked.connect(self.send_ip_config) self.cancel_btn.clicked.connect(self.reject) # DHCP模式下禁用IP等输入 self.dhcp_radio.toggled.connect(self.update_input_enabled) self.update_input_enabled() def update_input_enabled(self): enabled = self.static_radio.isChecked() self.ip_input.setEnabled(enabled) self.mask_input.setEnabled(enabled) self.gw_input.setEnabled(enabled) def send_ip_config(self): mode = 0 if self.dhcp_radio.isChecked() else 1 def parse_ip(ip_str): parts = ip_str.strip().split('.') if len(parts) != 4: raise ValueError() return [int(p) for p in parts] if mode == 0: packet = bytes([ 0xAA, 0x55, 0xAA, 0x0E, # cmd 0x01, # version 0x00 # result_code (发送时为0) ]) + struct.pack('B', mode) else: try: ip = parse_ip(self.ip_input.text()) mask = parse_ip(self.mask_input.text()) gw = parse_ip(self.gw_input.text()) except ValueError: QMessageBox.warning(self, "输入错误", "请输入合法的 IPv4 地址格式") return packet = bytes([ 0xAA, 0x55, 0xAA, 0x0E, # cmd 0x01, # version 0x00 # result_code (发送时为0) ]) + struct.pack('B', mode) + bytes(ip) + bytes(mask) + bytes(gw) self.socket.write(packet) if not self.socket.waitForBytesWritten(1000): QMessageBox.critical(self, "错误", "发送失败") return if not self.socket.waitForReadyRead(3000): QMessageBox.critical(self, "超时", "未收到回应") return data = bytes(self.socket.readAll()) pkg = PackageHead.parse(data) if not pkg: QMessageBox.critical(self, "错误", "返回数据格式无效") return if pkg['result_code'] == 0x00: QMessageBox.information(self, "成功", "配置成功") self.accept() else: QMessageBox.warning(self, "失败", f"错误码: {pkg['result_code']}") from PyQt5.QtWidgets import ( QDialog, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, QComboBox, QGridLayout, QApplication ) from PyQt5.QtCore import Qt class CalibrationDialog(QDialog): def __init__(self, socket: QTcpSocket, parent=None): super().__init__(parent) self.setWindowTitle("校准设置") self.socket = socket self.type_index = 0 # 通道与类型选择 self.partial_data = QByteArray() self.recv_state = '' self.channel_cb = QComboBox() self.channel_cb.addItems(["通道1", "通道2"]) self.type_cb = QComboBox() self.type_cb.addItems(["温度", "电压DC", "电压AC"]) self.type_cb.currentTextChanged.connect(self.update_voltage_points) top_layout = QHBoxLayout() top_layout.addWidget(QLabel("通道:")) top_layout.addWidget(self.channel_cb) top_layout.addWidget(QLabel("类型:")) top_layout.addWidget(self.type_cb) # 电压点信息 self.voltage_labels = [] self.value_labels = [] self.get_buttons = [] self.voltage_points = [] grid = QGridLayout() grid.addWidget(QLabel("电压点"), 0, 0) grid.addWidget(QLabel("校准值"), 0, 1) grid.addWidget(QLabel("操作按钮"), 0, 2) for i in range(3): v_label = QLabel() self.voltage_labels.append(v_label) val_label = QLabel("未获取") self.value_labels.append(val_label) btn = QPushButton("获取") btn.clicked.connect(lambda _, index=i: self.fetch_voltage(index)) self.get_buttons.append(btn) grid.addWidget(v_label, i+1, 0) grid.addWidget(val_label, i+1, 1) grid.addWidget(btn, i+1, 2) self.update_voltage_points() # 初始化与校准按钮 self.init_btn = QPushButton("初始化") self.calibrate_btn = QPushButton("校准") self.get_coe_btn = QPushButton("获取系数") self.init_btn.clicked.connect(self.initialize_calibration) self.calibrate_btn.clicked.connect(self.calibrate) self.get_coe_btn.clicked.connect(self.get_coe) self.socket.readyRead.connect(self.on_ready_read) btn_layout = QHBoxLayout() btn_layout.addStretch() btn_layout.addWidget(self.init_btn) btn_layout.addWidget(self.calibrate_btn) btn_layout.addWidget(self.get_coe_btn) main_layout = QVBoxLayout() main_layout.addLayout(top_layout) main_layout.addLayout(grid) main_layout.addLayout(btn_layout) self.setLayout(main_layout) def update_voltage_points(self): type_text = self.type_cb.currentText() if type_text == "温度": self.voltage_points = [0.5, 1.5, 2.5] elif type_text == "电压DC": self.voltage_points = [1, 8, 15] else: self.voltage_points = [1, 5, 9] for i, v in enumerate(self.voltage_points): self.voltage_labels[i].setText(f"{v:.1f}V") self.value_labels[i].setText("未获取") def fetch_voltage(self, index): # 模拟读取值 # import random # val = round(self.voltage_points[index] + random.uniform(-0.05, 0.05), 3) # self.value_labels[index].setText(str(val)) self.type_index = index if self.type_cb.currentIndex() != 2: self.get_temp_dc() else: self.get_wave() def initialize_calibration(self): for label in self.value_labels: label.setText("未获取") def get_coe(self): try: self.socket.write(bytes( [0xAA, 0x55, 0xAA, 0x10, 0x01, 0x00])) # 发送数据 self.socket.waitForReadyRead() except Exception as e: self.status_bar.showMessage(f"状态: 错误 - {str(e)}") def calibrate(self): try: self.recv_state = '' x = self.voltage_points y = [float(label.text()) for label in self.value_labels] if any(label.text() == "未获取" for label in self.value_labels): raise ValueError("未获取所有电压点数据") except ValueError: from PyQt5.QtWidgets import QMessageBox QMessageBox.warning(self, "错误", "请先获取所有电压点的校准值") return # 非线性最小二乘拟合 y = a*x + b def func(p, x): a, b = p return a * x + b def error(p, x, y): return func(p, x) - y p0 = [1, 0] plsq = leastsq(error, p0, args=(np.array(x), np.array(y))) a, b = plsq[0] print(f"拟合结果: a = {a:.4f}, b = {b:.4f}") # 获取通道号和类型索引 ch = self.channel_cb.currentIndex() + 1 # 通道1 对应 1 group = self.type_cb.currentIndex() # 0=temp, 1=dc, 2=ac print(f"ch:{ch},group:{group}") header = bytes([0xAA, 0x55, 0xAA, 0x11,0x01,0x00]) # 可根据实际协议修改 payload = struct.pack('= HEADER_SIZE: self.header = self.socket.read(HEADER_SIZE) if self.header[:3] != HEADER_MAGIC: print("无效 header magic,跳过") continue self.cmd = self.header[3] if self.cmd == 0x01: self.recv_state = 'WAIT_WAVE_HEADER' else: print(f"未知或暂不处理的 cmd: {self.cmd}") self.recv_state = 'WAIT_HEADER' return else: return elif self.recv_state == 'WAIT_WAVE_HEADER': if self.socket.bytesAvailable() >= 5: wave_header = self.socket.read(5) self.sampling_rate = struct.unpack('= self.expected_length: self.process_wave_packet(bytes(self.partial_data)) self.recv_state = 'WAIT_HEADER' self.partial_data.clear() else: if self.socket.bytesAvailable() >= HEADER_SIZE: recv_data = self.socket.read(1340) if recv_data[:3] != HEADER_MAGIC: print("无效 header magic,跳过") continue cmd = recv_data[3] if cmd == 0x0C: body_format = '< H H H H' body_size = struct.calcsize(body_format) body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size] unpacked_data = struct.unpack(body_format, body_data) value = Eigenvalue(*unpacked_data) print(f"温度1:{value.temp1/1000},温度2:{value.temp2/1000},偏置电压1:{value.offset1/100},偏置电压2:{value.offset2/100}") if self.channel_cb.currentIndex() == 0 and self.type_cb.currentIndex() == 0: self.value_labels[self.type_index].setText(str(value.temp1/1000)) elif self.channel_cb.currentIndex() == 0 and self.type_cb.currentIndex() == 1: self.value_labels[self.type_index].setText(str(value.offset1 / 100)) elif self.channel_cb.currentIndex() == 1 and self.type_cb.currentIndex() == 0: self.value_labels[self.type_index].setText(str(value.temp2/1000)) elif self.channel_cb.currentIndex() == 1 and self.type_cb.currentIndex() == 1: self.value_labels[self.type_index].setText(str(value.offset2 / 100)) elif cmd == 0x11: pkg = PackageHead.parse(recv_data) if not pkg: QMessageBox.critical(self, "错误", "返回数据格式无效") return if pkg['result_code'] == 0x00: QMessageBox.information(self, "成功", "配置成功") return elif cmd == 0x10: body_format = '< f f f f f f f f f f f f' body_size = struct.calcsize(body_format) body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size] unpacked_data = struct.unpack(body_format, body_data) value = GetCoe(*unpacked_data) print( f"{value.temp_ch1_1, value.temp_ch1_2, value.dc_ch1_1, value.dc_ch1_2, value.ac_ch1_1, value.ac_ch1_2}") print( f"{value.temp_ch2_1, value.temp_ch2_2, value.dc_ch2_1, value.dc_ch2_2, value.ac_ch2_1, value.ac_ch2_2}") def process_wave_packet(self,wave_data): data = wave_data # 接收所有数据 data = np.frombuffer(data, dtype=np.int32) # 根据实际数据格式转换 LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000/10.2 self.scaled_data = data * LSB_32BIT result = self.get_extremes(self.scaled_data) print("最大的10个值:", result['top10_max']) print("最小的10个值:", result['top10_min']) mean_max = self.mean_without_max_optimized(result['top10_max']) print(f"top10_max 去除最大的数据后的平均值1:{mean_max}") mean_min = self.mean_without_min_optimized(result['top10_min']) print(f"top10_min 去除最大的数据后的平均值2:{mean_min}") pp = mean_max - mean_min if self.type_cb.currentIndex() == 2: self.value_labels[self.type_index].setText(str(pp)) print(f"pp :{mean_max - mean_min}") def closeEvent(self, event): try: print("closeEvent") self.socket.readyRead.disconnect(self.on_ready_read) except TypeError: pass super().closeEvent(event)