T1L_Config/mainwindow.py

442 lines
17 KiB
Python
Raw Normal View History

2025-02-22 13:55:58 +08:00
import sys
import socket
2025-04-08 16:47:59 +08:00
import os
2025-04-15 14:18:19 +08:00
from time import sleep
2025-02-22 13:55:58 +08:00
import numpy as np
from scipy.fft import fft, fftfreq
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas, NavigationToolbar2QT
2025-04-08 16:47:59 +08:00
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QPushButton, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QFrame, QGridLayout, QStatusBar, QSizePolicy, QFileDialog
2025-02-22 13:55:58 +08:00
from PyQt5.QtCore import Qt
import matplotlib
import struct
import time
2025-04-08 16:47:59 +08:00
from typing import Tuple, Optional
2025-02-22 13:55:58 +08:00
# Enable high-DPI scaling and high-DPI pixmaps for the Qt application.
QApplication.setAttribute(Qt.AA_EnableHighDpiScaling)
QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps)

# Force matplotlib to use the Qt5Agg backend.
matplotlib.use('Qt5Agg')

# SimHei is a Chinese font on Windows, needed for the Chinese plot labels;
# disabling unicode_minus works around the minus sign '-' not rendering.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})
2025-04-08 16:47:59 +08:00
# Three-byte magic sequence that starts every packet header.
HEADER_MAGIC = b"\xaa\x55\xaa"
# Size of the packet header without its data field:
# 3 magic bytes + 1 (cmd) + 1 (version) + 1 (result_code).
HEADER_SIZE = len(HEADER_MAGIC) + 3
2025-02-22 13:55:58 +08:00
class MatplotlibCanvas(FigureCanvas):
    """Matplotlib canvas embedded in the Qt UI.

    The upper axes show the time-domain signal; the lower axes show its
    single-sided amplitude spectrum.
    """

    def __init__(self, parent=None):
        """Create the two-subplot figure and draw the placeholder view.

        Args:
            parent: Optional Qt parent widget of the canvas.
        """
        # Two stacked subplots: time domain (top) and frequency domain (bottom).
        self.fig, self.axs = plt.subplots(2, 1, figsize=(10, 5))
        # Widen the gap between the time-domain and frequency-domain plots.
        self.fig.subplots_adjust(hspace=0.6)
        super().__init__(self.fig)
        self.setParent(parent)
        self.init_plot()

    def _decorate_axes(self):
        """Apply the titles and axis labels used by every view."""
        time_ax, freq_ax = self.axs
        time_ax.set_title("时域信号", fontsize=12, fontweight='bold')
        time_ax.set_xlabel("采样点")
        time_ax.set_ylabel("幅度")
        freq_ax.set_title("频域信号", fontsize=12, fontweight='bold')
        freq_ax.set_xlabel("频率 (Hz)")
        freq_ax.set_ylabel("幅度")

    def _show_placeholder(self):
        """Write the centered "no data" hint into both axes."""
        for ax in self.axs:
            ax.text(0.5, 0.5, "暂无数据", fontsize=12, ha='center',
                    va='center', transform=ax.transAxes)

    def init_plot(self):
        """Draw the initial placeholder figure (labels plus "no data" hint)."""
        self._decorate_axes()
        self._show_placeholder()
        self.draw()  # refresh the canvas

    def plot_data(self, data, sample_rate=96000):
        """Plot ``data`` in the time domain and its amplitude spectrum.

        Args:
            data: One-dimensional sequence of samples.
            sample_rate: Sampling rate in Hz used to scale the frequency
                axis. Defaults to 96000, the value the plot previously
                assumed unconditionally.

        Raises:
            ValueError: If ``sample_rate`` is not positive.
        """
        if sample_rate <= 0:
            raise ValueError("sample_rate must be positive")

        samples = np.asarray(data)
        for ax in self.axs:
            ax.clear()
        self._decorate_axes()

        count = len(samples)
        if count == 0:
            # Nothing to transform: an empty FFT input and the 2/N scaling
            # would both fail, so fall back to the placeholder view.
            self._show_placeholder()
            self.draw()
            return

        time_ax, freq_ax = self.axs

        # Time-domain signal.
        time_ax.plot(samples, color='blue')

        # Frequency-domain signal: keep the first half of the spectrum and
        # scale the magnitudes by 2/N.
        half = count // 2
        spectrum = fft(samples)
        freqs = fftfreq(count, 1.0 / sample_rate)[:half]
        freq_ax.plot(freqs, 2.0 / count * np.abs(spectrum[:half]), color='red')

        self.draw()  # refresh the canvas
class SocketClientApp(QMainWindow):
def __init__(self):
    """Build the main window: address inputs, action buttons, plot canvas,
    feature-value labels and status bar, then wire up the button handlers.
    """
    super().__init__()
    self.setWindowTitle("Socket Client & Data Plotter")
    self.setGeometry(100, 100, 700, 600)  # initial position and size

    # Widgets.
    self.ip_label = QLabel("IP 地址:")
    self.port_label = QLabel("端口号:")
    self.ip_input = QLineEdit(self)
    self.port_input = QLineEdit(self)
    self.connect_button = QPushButton("连接")
    self.get_data_button = QPushButton("获取数据")
    self.upgrade_button = QPushButton("更新固件")
    self.upgrade_button_test = QPushButton("更新固件2")

    # These buttons start out disabled.
    for button in (self.get_data_button, self.upgrade_button,
                   self.upgrade_button_test):
        button.setEnabled(False)

    # Default address shown in the input fields.
    self.ip_input.setText("192.168.0.200")
    self.port_input.setText("12345")

    # Plot area and its navigation toolbar (zoom, pan).
    self.canvas = MatplotlibCanvas(self)
    self.toolbar = NavigationToolbar2QT(self.canvas, self)

    # Feature-value display area.
    self.feature_label = QLabel("特征值:")
    self.acceleration_label = QLabel("加速度: -")
    self.velocity_label = QLabel("速度: -")

    # Horizontal separator line.
    self.line = QFrame()
    self.line.setFrameShape(QFrame.HLine)
    self.line.setFrameShadow(QFrame.Sunken)

    # Input row (IP and port).
    input_layout = QHBoxLayout()
    for widget in (self.ip_label, self.ip_input,
                   self.port_label, self.port_input):
        input_layout.addWidget(widget)

    # Button row.
    button_layout = QHBoxLayout()
    for widget in (self.connect_button, self.get_data_button,
                   self.upgrade_button, self.upgrade_button_test):
        button_layout.addWidget(widget)

    # Feature labels side by side in a single grid row.
    feature_layout = QGridLayout()
    feature_labels = (self.feature_label, self.acceleration_label,
                      self.velocity_label)
    for column, label in enumerate(feature_labels):
        feature_layout.addWidget(label, 0, column)

    # Canvas with the feature labels below it.
    graph_layout = QVBoxLayout()
    graph_layout.addWidget(self.canvas)
    graph_layout.addLayout(feature_layout)

    # Assemble the window top to bottom.
    main_layout = QVBoxLayout()
    main_layout.addLayout(input_layout)
    main_layout.addLayout(button_layout)
    main_layout.addWidget(self.line)
    main_layout.addWidget(self.toolbar)
    main_layout.addLayout(graph_layout)
    # Extra gap between the feature area and the status bar.
    main_layout.addSpacing(20)

    # Status bar with the initial hint.
    self.status_bar = QStatusBar()
    self.setStatusBar(self.status_bar)
    self.status_bar.showMessage("状态: 请填写 IP 和端口号")

    # Central widget hosting the main layout.
    central_widget = QWidget()
    central_widget.setLayout(main_layout)
    self.setCentralWidget(central_widget)

    # Button handlers.
    handlers = (
        (self.connect_button, self.connect_to_server),
        (self.get_data_button, self.on_button_clicked),
        (self.upgrade_button, self.on_button_upgrade),
        (self.upgrade_button_test, self.on_button_upgrade_test),
    )
    for button, handler in handlers:
        button.clicked.connect(handler)

    # Let the window grow and shrink with the available space.
    self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
2025-04-15 14:18:19 +08:00
def get_extremes(self, data):
    """Return the 10 largest and 10 smallest values of ``data`` by sorting.

    Returns:
        A dict with ``'top10_max'`` (largest values, descending) and
        ``'top10_min'`` (smallest values, ascending), both as plain lists.
        Inputs with fewer than 10 items yield correspondingly shorter lists.
    """
    ordered = np.sort(data)
    smallest = ordered[:10]
    # The tail of the ascending sort holds the maxima; flip it to descending.
    largest = np.flip(ordered[-10:], axis=0)
    return {
        'top10_max': largest.tolist(),
        'top10_min': smallest.tolist(),
    }
def mean_without_max_optimized(self, data):
    """Return the mean of ``data`` after dropping one occurrence of its maximum.

    The maximum is subtracted from the total instead of building a filtered
    copy. Returns ``np.nan`` when ``data`` has fewer than two items.
    """
    values = np.array(data)
    size = len(values)
    if size < 2:
        return np.nan
    # Sum of everything except the (single) largest value.
    remaining_sum = np.sum(values) - np.max(values)
    return remaining_sum / (size - 1)
def mean_without_min_optimized(self, data):
    """Return the mean of ``data`` after dropping one occurrence of its minimum.

    The minimum is subtracted from the total instead of building a filtered
    copy. Returns ``np.nan`` when ``data`` has fewer than two items.
    """
    values = np.array(data)
    size = len(values)
    if size < 2:
        return np.nan
    # Sum of everything except the (single) smallest value.
    remaining_sum = np.sum(values) - np.min(values)
    return remaining_sum / (size - 1)
2025-04-08 16:47:59 +08:00
def calculate_crc(self, data: bytes) -> int:
    """Return the additive checksum of ``data``.

    Despite the name, this is a plain byte sum, truncated to its lowest
    byte.
    """
    return sum(data) & 0xFF
2025-02-22 13:55:58 +08:00
def connect_to_server(self):
    """Connect to the TCP server named by the IP and port input fields.

    Any connection left over from an earlier call is closed first. On
    success the new socket is stored in ``self.socket`` and the data and
    upgrade buttons are enabled. On failure the half-open socket is
    closed, ``self.socket`` is set to ``None``, the buttons are disabled
    and the error is shown in the status bar.
    """
    # Surrounding whitespace (e.g. from copy/paste) is not part of the address.
    self.ip = self.ip_input.text().strip()
    self.port = self.port_input.text().strip()
    if not self.ip or not self.port:
        self.status_bar.showMessage("状态: 请输入有效的 IP 和端口号")
        return

    action_buttons = (
        self.get_data_button,
        self.upgrade_button,
        self.upgrade_button_test,
    )

    # Release the previous connection instead of silently dropping it.
    previous = getattr(self, "socket", None)
    if previous is not None:
        try:
            previous.close()
        except OSError:
            # Best effort: the old socket is being discarded anyway.
            pass
    self.socket = None

    connection = None
    try:
        self.port = int(self.port)
        connection = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        connection.settimeout(2)
        # Enable the keep-alive option.
        connection.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        connection.connect((self.ip, self.port))
    except Exception as e:
        if connection is not None:
            connection.close()
        # Without a live connection the action buttons must not be usable.
        for button in action_buttons:
            button.setEnabled(False)
        self.status_bar.showMessage(f"状态: 连接失败 - {str(e)}")
        return

    self.socket = connection
    # Connected: allow fetching data and upgrading firmware.
    for button in action_buttons:
        button.setEnabled(True)
    self.status_bar.showMessage(f"状态: 连接成功! 服务器 {self.ip}:{self.port}")
2025-04-08 16:47:59 +08:00
def parse_package_head(self, data: bytes) -> Tuple[Optional[dict], Optional[bytes]]:
    """Parse the packet header at the start of ``data``.

    Returns:
        ``(header, remaining)`` where ``header`` is a dict with the keys
        ``'cmd'``, ``'version'`` and ``'result_code'`` and ``remaining`` is
        everything after the header. ``(None, None)`` is returned when
        ``data`` is shorter than the header or does not start with the
        magic bytes.
    """
    if len(data) < HEADER_SIZE:
        return None, None

    # Check the magic bytes.
    magic = data[:3]
    if magic != HEADER_MAGIC:
        print(f"Invalid header magic: {magic.hex()}")
        return None, None

    # Three unsigned bytes follow the magic.
    fields = struct.unpack_from('<BBB', data, 3)
    header = dict(zip(('cmd', 'version', 'result_code'), fields))
    return header, data[HEADER_SIZE:]
def receive_data(self, length: int):
    """Read exactly ``length`` bytes from ``self.socket``.

    Keeps calling ``recv`` until the requested amount has arrived and
    prints the running total after every chunk.

    Raises:
        ConnectionError: If ``recv`` returns no data before ``length``
            bytes were read.
    """
    buffer = bytearray()
    remaining = length
    while remaining > 0:
        chunk = self.socket.recv(remaining)
        if not chunk:
            raise ConnectionError("Connection closed")
        buffer += chunk
        remaining = length - len(buffer)
        print(f"data size {len(buffer)}")
    return bytes(buffer)
def process_packet(self):
    """Receive one response packet and return its waveform payload.

    Reads the packet header, then for command ``0x01`` a 5-byte wave
    header (1-byte channel id plus a little-endian signed 32-bit value)
    followed by the waveform data. The 32-bit value is used as the number
    of 4-byte samples to read.

    Returns:
        The raw waveform bytes, or ``None`` when the header is invalid,
        the command is unsupported, the sample count is negative, or
        receiving fails.
    """
    try:
        # 1. Receive and parse the packet header.
        start_time = time.time()
        header_data = self.receive_data(HEADER_SIZE)
        header, _ = self.parse_package_head(header_data)
        if header is None:
            print("Invalid package header")
            return None

        # 2. Only the wave-data command is handled.
        if header['cmd'] != 0x01:
            print(f"Unsupported command: {header['cmd']}")
            return None

        # Fixed part of the wave response: channel id + sample count.
        wave_header = self.receive_data(5)
        channel_id, sampling_rate = struct.unpack('<Bi', wave_header)
        print(f"channel_id {channel_id}")
        print(f"sampling_rate {sampling_rate}")

        # The count is signed on the wire; a negative value cannot describe
        # a payload, so reject it rather than return an empty result.
        if sampling_rate < 0:
            print(f"Invalid sample count: {sampling_rate}")
            return None

        # Payload length in bytes: 4 bytes per sample.
        wave_data = self.receive_data(sampling_rate * 4)

        # Report how long receiving the packet took.
        execution_time = time.time() - start_time
        print(f"代码执行时间: {execution_time}")
        return wave_data
    except ConnectionError as e:
        print(f"Connection error: {e}")
        return None
    except Exception as e:
        print(f"Error processing packet: {e}")
        return None
2025-02-22 13:55:58 +08:00
def on_button_clicked(self):
    """Request waveform data from the device, print statistics and plot it.

    Sends the fixed request frame, receives the response through
    ``process_packet``, scales the 32-bit samples and hands them to the
    canvas. Any failure is reported in the status bar.
    """
    try:
        self.status_bar.showMessage("状态: 正在获取数据...")
        # Fixed request frame sent to the device.
        self.socket.sendall(bytes([0xAA,0x55,0xAA,0x01,0x00,0x00,0x01,0x00,0x77,0x01,0x00,0x01,0x00,0x00,0x00]))
        raw = self.process_packet()
        if not raw:
            # process_packet returns None on failure and b"" for a
            # zero-sample response; neither can be decoded or plotted.
            raise ValueError("未收到有效的波形数据")

        # The wire format is little-endian (the packet headers are parsed
        # with '<'), so decode the samples explicitly as little-endian int32.
        data = np.frombuffer(raw, dtype='<i4')

        # Print at most the first 100 raw samples.
        for value in data[:100]:
            print(f"{value:1f}", end=" ")
        print()

        # Scale factor applied to each raw 32-bit sample.
        LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000
        scaled_data = data * LSB_32BIT

        # Print at most the first 100 scaled samples.
        for value in scaled_data[:100]:
            print(f"{value:2f}", end=" ")
        print()

        # Peak-to-peak estimate: mean of the top 10 values without the
        # largest, minus mean of the bottom 10 values without the smallest.
        result = self.get_extremes(scaled_data)
        print("最大的10个值:", result['top10_max'])
        print("最小的10个值:", result['top10_min'])
        mean_max = self.mean_without_max_optimized(result['top10_max'])
        print(f"top10_max 去除最大的数据后的平均值1{mean_max}")
        mean_min = self.mean_without_min_optimized(result['top10_min'])
        print(f"top10_min 去除最大的数据后的平均值2{mean_min}")
        print(f"pp :{mean_max - mean_min}")

        self.canvas.plot_data(scaled_data)  # draw inside the Qt window
        self.status_bar.showMessage("状态: 数据绘制完成")
    except Exception as e:
        self.status_bar.showMessage(f"状态: 错误 - {str(e)}")
2025-04-08 16:47:59 +08:00
def _select_upgrade_file(self):
    """Ask the user for a firmware file and show the chosen name.

    Returns:
        The selected path, or an empty string when the dialog was cancelled.
    """
    file_path, _ = QFileDialog.getOpenFileName(
        self, "选择升级文件", "", "二进制文件 (*.bin);;所有文件 (*)"
    )
    if file_path:
        self.status_bar.showMessage(f"已选择: {os.path.basename(file_path)}")
    return file_path

def _build_upgrade_packet(self, file_path):
    """Read the firmware file and frame it as an upgrade packet.

    Packet layout: header (magic, cmd=0x05, version=1, result_code=0),
    then the little-endian 32-bit image length and the 1-byte checksum,
    then the image itself.

    Raises:
        OSError: If the file cannot be opened or read.
        ValueError: If the file is empty.
    """
    print("Starting upgrade process...")
    # open() already raises FileNotFoundError for a missing file, so no
    # separate existence check is needed.
    with open(file_path, "rb") as f:
        package_data = f.read()
    if not package_data:
        raise ValueError("Upgrade file is empty")
    print(f"Read upgrade package, size: {len(package_data)} bytes")

    cmd = 0x05
    version = 1
    result_code = 0
    upgrade_len = len(package_data)
    crc = self.calculate_crc(package_data)

    package_head = struct.pack("<3sBBB", HEADER_MAGIC, cmd, version, result_code)
    upgrade_req = struct.pack("<IB", upgrade_len, crc) + package_data
    full_packet = package_head + upgrade_req

    print("Full packet prepared:")
    print(f"Header magic: {package_head[:3].hex()}")
    print(f"Command: {cmd}, Version: {version}")
    print(f"Upgrade length: {upgrade_len}, CRC: {crc}")
    print(f"Total packet size: {len(full_packet)} bytes")
    return full_packet

def on_button_upgrade(self):
    """Let the user pick a firmware file and send it as an upgrade packet.

    Does nothing when the file dialog is cancelled. Failures are printed
    and shown in the status bar.
    """
    file_path = self._select_upgrade_file()
    if not file_path:
        return
    try:
        full_packet = self._build_upgrade_packet(file_path)
        self.socket.sendall(full_packet)
        print("Upgrade packet sent")
        self.status_bar.showMessage("状态: 升级包已发送")
    except Exception as e:
        print(f"Upgrade failed: {str(e)}")
        self.status_bar.showMessage(f"状态: 错误 - {str(e)}")

def on_button_upgrade_test(self):
    """Test variant of the upgrade: send, wait, reconnect, send again.

    Sends the upgrade packet, waits 15 seconds, closes the socket,
    reconnects through ``connect_to_server`` and sends the same packet a
    second time. Does nothing when the file dialog is cancelled. Failures
    are printed and shown in the status bar.
    """
    file_path = self._select_upgrade_file()
    if not file_path:
        return
    try:
        full_packet = self._build_upgrade_packet(file_path)
        self.socket.sendall(full_packet)
        # NOTE(review): this blocks the handler for 15 s; as it runs from a
        # button click the window is presumably unresponsive meanwhile.
        # Consider a timer-based wait if that matters.
        sleep(15)
        self.socket.close()
        self.connect_to_server()
        self.socket.sendall(full_packet)
        print("Upgrade packet sent (before and after reconnect)")
        self.status_bar.showMessage("状态: 升级包已发送")
    except Exception as e:
        print(f"Upgrade failed: {str(e)}")
        self.status_bar.showMessage(f"状态: 错误 - {str(e)}")
2025-02-22 13:55:58 +08:00
if __name__ == '__main__':
    # Script entry point: create the Qt application, show the main window
    # and exit with the event loop's return code.
    app = QApplication(sys.argv)
    window = SocketClientApp()
    window.show()
    exit_code = app.exec_()
    sys.exit(exit_code)