import sys import socket import os from time import sleep import numpy as np from scipy.fft import fft, fftfreq import matplotlib.pyplot as plt from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas, NavigationToolbar2QT from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QPushButton, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QFrame, QGridLayout, QStatusBar, QSizePolicy, QFileDialog from PyQt5.QtCore import Qt, QTimer, pyqtSlot, QByteArray from PyQt5.QtNetwork import QTcpSocket, QAbstractSocket import matplotlib import struct import time from typing import Tuple, Optional # 启用高DPI支持 QApplication.setAttribute(Qt.AA_EnableHighDpiScaling) QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps) # 强制使用 Qt5Agg 后端 matplotlib.use('Qt5Agg') plt.rcParams['font.sans-serif'] = ['SimHei'] # Windows 中文字体 plt.rcParams['axes.unicode_minus'] = False # 解决负号 '-' 显示问题 HEADER_MAGIC = bytes([0xAA, 0x55, 0xAA]) HEADER_SIZE = 6 # PackgeHead除去data字段的大小(3+1+1+1) class MatplotlibCanvas(FigureCanvas): """ 用于在 Qt 界面中嵌入 Matplotlib 绘图 """ def __init__(self, parent=None): self.fig, self.axs = plt.subplots(2, 1, figsize=(10, 5)) # 2 个子图(时域 + 频域) self.fig.subplots_adjust(hspace=0.6) # 增大时域图和频域图的间距 super().__init__(self.fig) self.setParent(parent) self.init_plot() def init_plot(self): """ 初始化默认图像(占位提示) """ self.axs[0].set_title("时域信号", fontsize=12, fontweight='bold') self.axs[0].set_xlabel("采样点") self.axs[0].set_ylabel("幅度") self.axs[0].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[0].transAxes) self.axs[1].set_title("频域信号", fontsize=12, fontweight='bold') self.axs[1].set_xlabel("频率 (Hz)") self.axs[1].set_ylabel("幅度") self.axs[1].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[1].transAxes) self.draw() # 更新绘图 def plot_data(self, data): """ 绘制时域和频域图 """ self.axs[0].clear() self.axs[1].clear() # 时域信号 self.axs[0].plot(data, color='blue') self.axs[0].set_title("时域信号", fontsize=12, fontweight='bold') self.axs[0].set_xlabel("采样点") self.axs[0].set_ylabel("幅度") # 频域信号 N = len(data) T = 1.0 / 96000 # 假设采样率为 96000 Hz yf = fft(data) xf = fftfreq(N, T)[:N // 2] self.axs[1].plot(xf, 2.0 / N * np.abs(yf[:N // 2]), color='red') self.axs[1].set_title("频域信号", fontsize=12, fontweight='bold') self.axs[1].set_xlabel("频率 (Hz)") self.axs[1].set_ylabel("幅度") self.draw() # 更新绘图 class SocketClientApp(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("Socket Client & Data Plotter") self.setGeometry(100, 100, 700, 600) # 设置初始尺寸 self.buffer = QByteArray() self.reconnect_timer = QTimer(self) self.reconnect_timer.timeout.connect(self.attempt_reconnect) self.reconnect_interval = 5000 self.max_reconnect_attempts = 5 self.current_reconnect_attempts = 0 self.socket = QTcpSocket(self) self.socket.readyRead.connect(self.on_ready_read) self.socket.disconnected.connect(self.on_socket_disconnected) self.socket.errorOccurred.connect(self.on_socket_error) # 组件初始化 self.ip_label = QLabel("IP 地址:") self.port_label = QLabel("端口号:") self.ip_input = QLineEdit(self) self.port_input = QLineEdit(self) self.connect_button = QPushButton("连接") self.get_data_button = QPushButton("获取数据") self.upgrade_button = QPushButton("更新固件") self.upgrade_button_test = QPushButton("更新固件2") self.get_data_button.setEnabled(False) # 初始状态不可点击 self.upgrade_button.setEnabled(False) # 初始状态不可点击 self.upgrade_button_test.setEnabled(False) # 初始状态不可点击 self.ip_input.setText("192.168.0.200") self.port_input.setText("12345") # 预留绘图区域 self.canvas = MatplotlibCanvas(self) self.toolbar = NavigationToolbar2QT(self.canvas, self) # 添加工具栏(放大、缩小、拖拽) # 预留特征值显示区域 self.feature_label = QLabel("特征值:") self.acceleration_label = QLabel("加速度: -") self.velocity_label = QLabel("速度: -") # 设置分割线 self.line = QFrame() self.line.setFrameShape(QFrame.HLine) self.line.setFrameShadow(QFrame.Sunken) # 布局管理 main_layout = QVBoxLayout() # 输入区域(IP & 端口) input_layout = QHBoxLayout() input_layout.addWidget(self.ip_label) input_layout.addWidget(self.ip_input) input_layout.addWidget(self.port_label) input_layout.addWidget(self.port_input) main_layout.addLayout(input_layout) # 按钮区域 button_layout = QHBoxLayout() button_layout.addWidget(self.connect_button) button_layout.addWidget(self.get_data_button) button_layout.addWidget(self.upgrade_button) button_layout.addWidget(self.upgrade_button_test) main_layout.addLayout(button_layout) # 添加分割线 main_layout.addWidget(self.line) # 工具栏(放大、缩小、拖拽) main_layout.addWidget(self.toolbar) # 绘图 + 特征值区域 graph_layout = QVBoxLayout() graph_layout.addWidget(self.canvas) # 特征值显示区域 feature_layout = QGridLayout() feature_layout.addWidget(self.feature_label, 0, 0) feature_layout.addWidget(self.acceleration_label, 0, 1) feature_layout.addWidget(self.velocity_label, 0, 2) graph_layout.addLayout(feature_layout) main_layout.addLayout(graph_layout) # 增大特征值区域与状态栏的间距 main_layout.addSpacing(20) # 创建状态栏 self.status_bar = QStatusBar() self.setStatusBar(self.status_bar) # 状态栏显示初始消息 self.status_bar.showMessage("状态: 请填写 IP 和端口号") # 主窗口的中央部件 central_widget = QWidget() central_widget.setLayout(main_layout) self.setCentralWidget(central_widget) # 事件绑定 self.connect_button.clicked.connect(self.connect_to_server) self.get_data_button.clicked.connect(self.on_button_clicked) self.upgrade_button.clicked.connect(self.on_button_upgrade) self.upgrade_button_test.clicked.connect(self.on_button_upgrade_test) # 设置布局策略,确保控件大小随窗口调整 self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) def get_extremes(self,data): """获取最大和最小的10个值(排序法)""" sorted_data = np.sort(data) return { 'top10_max': sorted_data[-10:][::-1].tolist(), # 降序排列 'top10_min': sorted_data[:10].tolist() } def mean_without_max_optimized(self,data): """优化内存使用的版本""" arr = np.array(data) if len(arr) < 2: return np.nan max_val = np.max(arr) # 计算总和和计数时直接排除最大值 total = np.sum(arr) - max_val count = len(arr) - 1 return total / count def mean_without_min_optimized(self, data): """优化内存使用的版本""" arr = np.array(data) if len(arr) < 2: return np.nan min_val = np.min(arr) # 计算总和和计数时直接排除最大值 total = np.sum(arr) - min_val count = len(arr) - 1 return total / count def calculate_crc(self,data: bytes) -> int: """计算数据的累加和CRC""" crc = 0 for byte in data: crc += byte return crc & 0xFF # 只保留最低字节 def connect_to_server(self): ip = self.ip_input.text() port = int(self.port_input.text()) self.socket.abort() self.socket.connectToHost(ip, port) def on_socket_disconnected(self): self.status_bar.showMessage("状态: 连接断开,正在重连...") self.get_data_button.setEnabled(False) self.reconnect_timer.start(self.reconnect_interval) def on_socket_error(self, error): self.status_bar.showMessage(f"状态: 错误 - {self.socket.errorString()}") def attempt_reconnect(self): if self.current_reconnect_attempts >= self.max_reconnect_attempts: self.reconnect_timer.stop() self.status_bar.showMessage("状态: 重连失败,请检查网络") return self.current_reconnect_attempts += 1 self.connect_to_server() def on_ready_read(self): self.buffer += self.socket.readAll() def receive_data(self, length: int): while len(self.buffer) < length: if not self.socket.waitForReadyRead(1000): raise ConnectionError("等待数据超时") data = self.buffer[:length] self.buffer = self.buffer[length:] return bytes(data) def parse_package_head(self,data: bytes) -> Tuple[Optional[dict], Optional[bytes]]: """ 解析包头部 返回: (header_dict, remaining_data) """ if len(data) < HEADER_SIZE: return None, None # 检查魔数 if data[:3] != HEADER_MAGIC: print(f"Invalid header magic: {data[:3].hex()}") return None, None # 解析头部 cmd, version, result_code = struct.unpack_from('