From f7d3858765eb986d0e850871c019bf9a4199261c Mon Sep 17 00:00:00 2001 From: pandx Date: Sat, 21 Jun 2025 10:36:45 +0800 Subject: [PATCH] format codes --- SamplingDialog.py | 79 +++++++++++++-------- feauture_calculate.py | 34 +++++---- mainwindow.py | 157 ++++++++++++++++++++++-------------------- 3 files changed, 151 insertions(+), 119 deletions(-) diff --git a/SamplingDialog.py b/SamplingDialog.py index 33ccc61..f2eb61e 100644 --- a/SamplingDialog.py +++ b/SamplingDialog.py @@ -1,19 +1,19 @@ +import struct -from PyQt5.QtWidgets import ( - QApplication, QDialog, QLabel, QLineEdit, QPushButton,QMessageBox, - QHBoxLayout, QVBoxLayout, QMainWindow, QWidget, QRadioButton, QButtonGroup,QGridLayout,QFormLayout -) -from PyQt5.QtGui import QRegExpValidator -from PyQt5.QtCore import QRegExp,QByteArray -import struct -from PyQt5.QtNetwork import QTcpSocket import numpy as np -from numpy.ma.core import masked +from PyQt5.QtCore import QRegExp, QByteArray +from PyQt5.QtGui import QRegExpValidator +from PyQt5.QtNetwork import QTcpSocket +from PyQt5.QtWidgets import ( + QDialog, QLineEdit, QMessageBox, + QRadioButton, QButtonGroup, QFormLayout +) from scipy.optimize import leastsq -import struct + HEADER_MAGIC = bytes([0xAA, 0x55, 0xAA]) HEADER_SIZE = 6 # PackgeHead除去data字段的大小(3+1+1+1) + class SamplingDialog(QDialog): def __init__(self, parent=None): super().__init__(parent) @@ -53,6 +53,8 @@ class SamplingDialog(QDialog): rate_str = self.rate_combo.currentText() rate_code = rate_code_map.get(rate_str, 0x00) return rate_code, self.time_input.text() + + class PackageHead: @staticmethod def parse(data: bytes): @@ -68,8 +70,11 @@ class PackageHead: 'result_code': result_code, 'data': data[6:] } + + class GetCoe: - def __init__(self, temp_ch1_1, temp_ch1_2, dc_ch1_1, dc_ch1_2,ac_ch1_1,ac_ch1_2,temp_ch2_1,temp_ch2_2,dc_ch2_1,dc_ch2_2, + def __init__(self, temp_ch1_1, temp_ch1_2, dc_ch1_1, dc_ch1_2, ac_ch1_1, ac_ch1_2, temp_ch2_1, temp_ch2_2, dc_ch2_1, + dc_ch2_2, ac_ch2_1, ac_ch2_2): self.temp_ch1_1 = temp_ch1_1 self.temp_ch1_2 = temp_ch1_2 @@ -83,12 +88,16 @@ class GetCoe: self.dc_ch2_2 = dc_ch2_2 self.ac_ch2_1 = ac_ch2_1 self.ac_ch2_2 = ac_ch2_2 + + class Eigenvalue: def __init__(self, temp1, temp2, offset1, offset2): self.temp1 = temp1 self.temp2 = temp2 self.offset1 = offset1 self.offset2 = offset2 + + class MacConfigDialog(QDialog): def __init__(self, socket: QTcpSocket, parent=None): super().__init__(parent) @@ -134,7 +143,7 @@ class MacConfigDialog(QDialog): 0xAA, 0x55, 0xAA, 0x0D, # cmd 0x01, # version - 0x00 # result_code (发送时为0) + 0x00 # result_code (发送时为0) ]) + struct.pack('2B', mac1, mac2) self.socket.write(packet) @@ -159,6 +168,7 @@ class MacConfigDialog(QDialog): else: QMessageBox.warning(self, "失败", f"错误码: {pkg['result_code']}") + class IPConfigDialog(QDialog): def __init__(self, socket: QTcpSocket, parent=None): super().__init__(parent) @@ -235,6 +245,7 @@ class IPConfigDialog(QDialog): if len(parts) != 4: raise ValueError() return [int(p) for p in parts] + if mode == 0: packet = bytes([ 0xAA, 0x55, 0xAA, @@ -257,8 +268,6 @@ class IPConfigDialog(QDialog): 0x00 # result_code (发送时为0) ]) + struct.pack('B', mode) + bytes(ip) + bytes(mask) + bytes(gw) - - self.socket.write(packet) if not self.socket.waitForBytesWritten(1000): QMessageBox.critical(self, "错误", "发送失败") @@ -280,12 +289,15 @@ class IPConfigDialog(QDialog): self.accept() else: QMessageBox.warning(self, "失败", f"错误码: {pkg['result_code']}") + + from PyQt5.QtWidgets import ( QDialog, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, QComboBox, - QGridLayout, QApplication + QGridLayout ) from PyQt5.QtCore import Qt + class CalibrationDialog(QDialog): def __init__(self, socket: QTcpSocket, parent=None): super().__init__(parent) @@ -332,9 +344,9 @@ class CalibrationDialog(QDialog): btn.clicked.connect(lambda _, index=i: self.fetch_voltage(index)) self.get_buttons.append(btn) - grid.addWidget(v_label, i+1, 0) - grid.addWidget(val_label, i+1, 1) - grid.addWidget(btn, i+1, 2) + grid.addWidget(v_label, i + 1, 0) + grid.addWidget(val_label, i + 1, 1) + grid.addWidget(btn, i + 1, 2) self.update_voltage_points() @@ -399,6 +411,7 @@ class CalibrationDialog(QDialog): # 发送给 MCU self.socket.write(packet) self.socket.waitForReadyRead() + def get_coe(self): try: self.socket.write(bytes( @@ -408,6 +421,7 @@ class CalibrationDialog(QDialog): except Exception as e: self.status_bar.showMessage(f"状态: 错误 - {str(e)}") + def calibrate(self): try: self.recv_state = '' @@ -442,14 +456,15 @@ class CalibrationDialog(QDialog): group = self.type_cb.currentIndex() # 0=temp, 1=dc, 2=ac print(f"ch:{ch},group:{group}") - header = bytes([0xAA, 0x55, 0xAA, 0x11,0x01,0x00]) # 可根据实际协议修改 + header = bytes([0xAA, 0x55, 0xAA, 0x11, 0x01, 0x00]) # 可根据实际协议修改 payload = struct.pack(' 0]) ylb_SWPE = sum(signal_ylb_SWPE) - ylb_SWPA = max(signal_ylb_SWPA) # 最大峰高 SWPA + ylb_SWPA = max(signal_ylb_SWPA) # 最大峰高 SWPA return ylb_SWE, ylb_SWPE, ylb_SWPA @@ -701,7 +703,7 @@ def calc_fea_HCR(fft_data, cf, sf, sp, f_st, f_ord): except ValueError: Hb1 = 0 try: - Hb2 = sum([x ** 2 for x in xb2]) # 上边带能量和 + Hb2 = sum([x ** 2 for x in xb2]) # 上边带能量和 except ValueError: Hb2 = 0 xc = calc_multiple_frequency(fft_data, i * cf, sp) # 给定中心频率幅值 @@ -732,15 +734,17 @@ def calc_HCR(fft_data, cf, sf, sp, f_st, f_ord): HCR = sqrt(sum(array(fea_HCR) ** 2)) return HCR + if __name__ == "__main__": freq = 8192 - data = np.loadtxt(r"E:\Workspace\Software\Python\无线传感器算法\特征值计算\09f07f1800158d00-Y.csv", delimiter=',', usecols=(0)) + data = np.loadtxt(r"E:\Workspace\Software\Python\无线传感器算法\特征值计算\09f07f1800158d00-Y.csv", delimiter=',', + usecols=(0)) data_list = data.tolist() data_list = data_list[1:] print("数据长度:", len(data_list)) - #速度有效值 + # 速度有效值 speed_rms_value = calc_vel_pass_rms(data_list, freq) print("速度有效值:", speed_rms_value) # 速度峰值 diff --git a/mainwindow.py b/mainwindow.py index 8f1fc3f..1eabcd0 100644 --- a/mainwindow.py +++ b/mainwindow.py @@ -1,23 +1,19 @@ -import sys -import socket -import os -from time import sleep - -import numpy as np -from scipy.fft import fft, fftfreq -import matplotlib.pyplot as plt -from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas, NavigationToolbar2QT -from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget,QMessageBox, QPushButton, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QFrame, QGridLayout, QStatusBar, QSizePolicy, QFileDialog -from PyQt5.QtCore import Qt, QTimer, pyqtSlot, QByteArray -from PyQt5.QtNetwork import QTcpSocket, QAbstractSocket -import matplotlib -import struct -import time -from typing import Tuple, Optional -from feauture_calculate import * -from SamplingDialog import * import csv -import pandas as pd +import os +import sys +import time +from time import sleep +from typing import Tuple, Optional + +import matplotlib +import matplotlib.pyplot as plt +from PyQt5.QtCore import QTimer +from PyQt5.QtWidgets import QFrame, QStatusBar, QSizePolicy, QFileDialog +from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas, NavigationToolbar2QT +from scipy.fft import fft, fftfreq + +from SamplingDialog import * +from feauture_calculate import * # 启用高DPI支持 QApplication.setAttribute(Qt.AA_EnableHighDpiScaling) @@ -40,8 +36,10 @@ class Eigenvalue: self.offset1 = offset1 self.offset2 = offset2 + class MatplotlibCanvas(FigureCanvas): """ 用于在 Qt 界面中嵌入 Matplotlib 绘图 """ + def __init__(self, parent=None): self.fig, self.axs = plt.subplots(2, 2, figsize=(10, 5)) # 2 个子图(时域 + 频域) self.fig.subplots_adjust(hspace=0.6) # 增大时域图和频域图的间距 @@ -51,15 +49,17 @@ class MatplotlibCanvas(FigureCanvas): def init_plot(self): """ 初始化默认图像(占位提示) """ - self.axs[0,0].set_title("加速度时域信号", fontsize=12, fontweight='bold') - self.axs[0,0].set_xlabel("采样点") - self.axs[0,0].set_ylabel("幅度(m/s2)") - self.axs[0,0].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[0,0].transAxes) + self.axs[0, 0].set_title("加速度时域信号", fontsize=12, fontweight='bold') + self.axs[0, 0].set_xlabel("采样点") + self.axs[0, 0].set_ylabel("幅度(m/s2)") + self.axs[0, 0].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', + transform=self.axs[0, 0].transAxes) - self.axs[0,1].set_title("加速度频域信号", fontsize=12, fontweight='bold') - self.axs[0,1].set_xlabel("频率 (Hz)") - self.axs[0,1].set_ylabel("幅度(m/s2)") - self.axs[0,1].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[0,1].transAxes) + self.axs[0, 1].set_title("加速度频域信号", fontsize=12, fontweight='bold') + self.axs[0, 1].set_xlabel("频率 (Hz)") + self.axs[0, 1].set_ylabel("幅度(m/s2)") + self.axs[0, 1].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', + transform=self.axs[0, 1].transAxes) self.axs[1, 0].set_title("速度时域信号", fontsize=12, fontweight='bold') self.axs[1, 0].set_xlabel("采样点") @@ -75,16 +75,16 @@ class MatplotlibCanvas(FigureCanvas): self.draw() # 更新绘图 - def plot_data(self, data,sample): + def plot_data(self, data, sample): """ 绘制时域和频域图 """ - self.axs[0,0].clear() - self.axs[0,1].clear() + self.axs[0, 0].clear() + self.axs[0, 1].clear() # 时域信号 - self.axs[0,0].plot(data, color='blue') - self.axs[0,0].set_title("加速度时域信号", fontsize=12, fontweight='bold') - self.axs[0,0].set_xlabel("采样点") - self.axs[0,0].set_ylabel("幅度(m/s2)") + self.axs[0, 0].plot(data, color='blue') + self.axs[0, 0].set_title("加速度时域信号", fontsize=12, fontweight='bold') + self.axs[0, 0].set_xlabel("采样点") + self.axs[0, 0].set_ylabel("幅度(m/s2)") # 频域信号 N = len(data) @@ -96,25 +96,25 @@ class MatplotlibCanvas(FigureCanvas): # yf = yf[:4000] # xf = xf[:4000] - self.axs[0,1].plot(xf, 2.0 / N * np.abs(yf), lw=0.2, color='black') - self.axs[0,1].plot(xf, 2.0 / N * np.abs(yf), '.', lw=0.3, color='red') + self.axs[0, 1].plot(xf, 2.0 / N * np.abs(yf), lw=0.2, color='black') + self.axs[0, 1].plot(xf, 2.0 / N * np.abs(yf), '.', lw=0.3, color='red') - self.axs[0,1].set_title("加速度频域信号", fontsize=12, fontweight='bold') - self.axs[0,1].set_xlabel("频率 (Hz)") - self.axs[0,1].set_ylabel("幅度(m/s2)") + self.axs[0, 1].set_title("加速度频域信号", fontsize=12, fontweight='bold') + self.axs[0, 1].set_xlabel("频率 (Hz)") + self.axs[0, 1].set_ylabel("幅度(m/s2)") self.draw() # 更新绘图 - def plot_data_vel(self, data,sample): + def plot_data_vel(self, data, sample): """ 绘制时域和频域图 """ - self.axs[1,0].clear() - self.axs[1,1].clear() + self.axs[1, 0].clear() + self.axs[1, 1].clear() # 时域信号 - self.axs[1,0].plot(data, color='blue') - self.axs[1,0].set_title("速度时域信号", fontsize=12, fontweight='bold') - self.axs[1,0].set_xlabel("采样点") - self.axs[1,0].set_ylabel("幅度(mm/s)") + self.axs[1, 0].plot(data, color='blue') + self.axs[1, 0].set_title("速度时域信号", fontsize=12, fontweight='bold') + self.axs[1, 0].set_xlabel("采样点") + self.axs[1, 0].set_ylabel("幅度(mm/s)") # 频域信号 N = len(data) @@ -126,15 +126,16 @@ class MatplotlibCanvas(FigureCanvas): yf = yf[:1000] xf = xf[:1000] - self.axs[1,1].plot(xf, 2.0 / N * np.abs(yf), lw=0.2, color='black') - self.axs[1,1].plot(xf, 2.0 / N * np.abs(yf), '.', lw=0.3, color='red') + self.axs[1, 1].plot(xf, 2.0 / N * np.abs(yf), lw=0.2, color='black') + self.axs[1, 1].plot(xf, 2.0 / N * np.abs(yf), '.', lw=0.3, color='red') - self.axs[1,1].set_title("速度频域信号", fontsize=12, fontweight='bold') - self.axs[1,1].set_xlabel("频率 (Hz)") - self.axs[1,1].set_ylabel("幅度(mm/s)") + self.axs[1, 1].set_title("速度频域信号", fontsize=12, fontweight='bold') + self.axs[1, 1].set_xlabel("频率 (Hz)") + self.axs[1, 1].set_ylabel("幅度(mm/s)") self.draw() # 更新绘图 + class SocketClientApp(QMainWindow): def __init__(self): super().__init__() @@ -293,7 +294,7 @@ class SocketClientApp(QMainWindow): # 设置布局策略,确保控件大小随窗口调整 self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) - def get_extremes(self,data): + def get_extremes(self, data): """获取最大和最小的10个值(排序法)""" sorted_data = np.sort(data) return { @@ -301,7 +302,7 @@ class SocketClientApp(QMainWindow): 'top10_min': sorted_data[:10].tolist() } - def mean_without_max_optimized(self,data): + def mean_without_max_optimized(self, data): """优化内存使用的版本""" arr = np.array(data) if len(arr) < 2: @@ -324,7 +325,8 @@ class SocketClientApp(QMainWindow): total = np.sum(arr) - min_val count = len(arr) - 1 return total / count - def calculate_crc(self,data: bytes) -> int: + + def calculate_crc(self, data: bytes) -> int: """计算数据的累加和CRC""" crc = 0 for byte in data: @@ -332,7 +334,7 @@ class SocketClientApp(QMainWindow): return crc & 0xFF # 只保留最低字节 def connect_to_server(self): - #self.process_wave_packet('') + # self.process_wave_packet('') if self.connect_button.text() == "断开": self.connect_button.setText("连接") self.socket.disconnectFromHost() @@ -392,6 +394,7 @@ class SocketClientApp(QMainWindow): self.status_bar.showMessage("状态: 重新连接成功") self.get_data_button.setEnabled(True) self.reconnect_timer.stop() + def on_button_save_csv(self): file_path = QFileDialog.getExistingDirectory( None, @@ -424,10 +427,12 @@ class SocketClientApp(QMainWindow): "保存失败", f"保存文件时出错:\n{str(e)}" ) + def on_button_samping_set(self): dialog = SamplingDialog(self) if dialog.exec_() == QDialog.Accepted: self.rate_code, self.time_str = dialog.get_values() + def on_ready_read(self): while self.socket.bytesAvailable(): if self.recv_state == 'WAIT_HEADER': @@ -466,7 +471,7 @@ class SocketClientApp(QMainWindow): execution_time = end_time - self.start_time print(f"结束时间戳: {end_time}") print(f"代码执行时间: {execution_time} 秒") - print(f"speed :{round((self.sampling_rate * 4 * 8) / execution_time,3)} Kbps") + print(f"speed :{round((self.sampling_rate * 4 * 8) / execution_time, 3)} Kbps") self.process_wave_packet(bytes(self.partial_data)) self.recv_state = 'WAIT_HEADER' self.partial_data.clear() @@ -483,8 +488,9 @@ class SocketClientApp(QMainWindow): body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size] unpacked_data = struct.unpack(body_format, body_data) value = Eigenvalue(*unpacked_data) - print(f"温度1:{value.temp1/1000},温度2:{value.temp2/1000},偏置电压1:{value.offset1/100},偏置电压2:{value.offset2/100}") - self.temp1_label.setText(f"温度1:{value.temp1/1000} v") + print( + f"温度1:{value.temp1 / 1000},温度2:{value.temp2 / 1000},偏置电压1:{value.offset1 / 100},偏置电压2:{value.offset2 / 100}") + self.temp1_label.setText(f"温度1:{value.temp1 / 1000} v") self.temp2_label.setText(f"温度2:{value.temp2 / 1000} v") self.offset1_label.setText(f"偏置电压1:{value.offset1 / 100} v") self.offset2_label.setText(f"偏置电压2:{value.offset2 / 100} v") @@ -496,7 +502,6 @@ class SocketClientApp(QMainWindow): print(f"version{unpacked_data}") QMessageBox.information(self, "版本", f"{unpacked_data[0]}.{unpacked_data[1]}") - def receive_data(self, length: int): while len(self.buffer) < length: if not self.socket.waitForReadyRead(1000): @@ -505,7 +510,7 @@ class SocketClientApp(QMainWindow): self.buffer = self.buffer[length:] return bytes(data) - def parse_package_head(self,data: bytes) -> Tuple[Optional[dict], Optional[bytes]]: + def parse_package_head(self, data: bytes) -> Tuple[Optional[dict], Optional[bytes]]: """ 解析包头部 返回: (header_dict, remaining_data) @@ -525,13 +530,14 @@ class SocketClientApp(QMainWindow): 'version': version, 'result_code': result_code }, data[HEADER_SIZE:] - def process_wave_packet(self,wave_data): + + def process_wave_packet(self, wave_data): data = wave_data # 接收所有数据 data = np.frombuffer(data, dtype=np.int32) # 根据实际数据格式转换 for i in range(min(100, len(data))): # 确保不超过数据长度 print(f"{data[i]:1f}", end=" ") print() # 换行 - LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000/10.2 + LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000 / 10.2 # LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000 self.scaled_data = data * LSB_32BIT # for i in range(min(100, len(self.scaled_data))): # 确保不超过数据长度 @@ -547,10 +553,10 @@ class SocketClientApp(QMainWindow): # print(f"pp :{mean_max - mean_min}") print(f"采样率: {self.sampling_rate}") # self.sampling_rate = 48000 # 假设采样率为 48000 Hz - self.canvas.plot_data(self.scaled_data,self.sampling_rate) # 在 Qt 界面中绘图 - data_filter = fft_filter(self.scaled_data, 10, 1000,self.sampling_rate) - data_vel1,data_vel2 = acc2dis(data_filter,self.sampling_rate) - self.canvas.plot_data_vel(data_vel1,self.sampling_rate) # 在 Qt 界面中绘图 + self.canvas.plot_data(self.scaled_data, self.sampling_rate) # 在 Qt 界面中绘图 + data_filter = fft_filter(self.scaled_data, 10, 1000, self.sampling_rate) + data_vel1, data_vel2 = acc2dis(data_filter, self.sampling_rate) + self.canvas.plot_data_vel(data_vel1, self.sampling_rate) # 在 Qt 界面中绘图 self.status_bar.showMessage("状态: 数据绘制完成") # 速度有效值 @@ -565,10 +571,10 @@ class SocketClientApp(QMainWindow): # 加速度峰值 acc_p = calc_acc_p(self.scaled_data, self.sampling_rate) print("加速度峰值:", acc_p) - self.acceleration_label_rms.setText(f"加速度有效值:{round(acc_rms,3)} m/s^2") - self.acceleration_label_pp.setText(f"加速度峰值:{round(acc_p,3)} m/s^2") - self.velocity_label_rms.setText(f"速度有效值:{round(speed_rms_value,3)} mm/s") - self.velocity_label_pp.setText(f"速度峰值:{round(speed_vel_p,3)} mm/s") + self.acceleration_label_rms.setText(f"加速度有效值:{round(acc_rms, 3)} m/s^2") + self.acceleration_label_pp.setText(f"加速度峰值:{round(acc_p, 3)} m/s^2") + self.velocity_label_rms.setText(f"速度有效值:{round(speed_rms_value, 3)} mm/s") + self.velocity_label_pp.setText(f"速度峰值:{round(speed_vel_p, 3)} mm/s") def on_button_clicked(self): try: @@ -589,6 +595,7 @@ class SocketClientApp(QMainWindow): self.socket.waitForReadyRead() except Exception as e: self.status_bar.showMessage(f"状态: 错误 - {str(e)}") + def on_button_clicked2(self): """ 获取数据并绘制 """ try: @@ -608,6 +615,7 @@ class SocketClientApp(QMainWindow): self.socket.waitForReadyRead() except Exception as e: self.status_bar.showMessage(f"状态: 错误 - {str(e)}") + def on_button_upgrade(self): """打开文件选择对话框""" file_path, _ = QFileDialog.getOpenFileName( @@ -631,12 +639,10 @@ class SocketClientApp(QMainWindow): print(f"Read upgrade package, size: {len(package_data)} bytes") - upgrade_len = len(package_data) crc = self.calculate_crc(package_data) upgrade_req = struct.pack("