添加固件更新

This commit is contained in:
zhangsheng 2025-04-08 16:47:59 +08:00
parent 9ae3e480ca
commit 345de6a942

View File

@ -1,14 +1,16 @@
import sys
import socket
import os
import numpy as np
from scipy.fft import fft, fftfreq
import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas, NavigationToolbar2QT
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QPushButton, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QFrame, QGridLayout, QStatusBar, QSizePolicy
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QPushButton, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QFrame, QGridLayout, QStatusBar, QSizePolicy, QFileDialog
from PyQt5.QtCore import Qt
import matplotlib
import struct
import time
from typing import Tuple, Optional
# 启用高DPI支持
QApplication.setAttribute(Qt.AA_EnableHighDpiScaling)
QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps)
@ -19,6 +21,8 @@ matplotlib.use('Qt5Agg')
plt.rcParams['font.sans-serif'] = ['SimHei'] # Windows 中文字体
plt.rcParams['axes.unicode_minus'] = False # 解决负号 '-' 显示问题
HEADER_MAGIC = bytes([0xAA, 0x55, 0xAA])
HEADER_SIZE = 6 # PackgeHead除去data字段的大小(3+1+1+1)
class MatplotlibCanvas(FigureCanvas):
""" 用于在 Qt 界面中嵌入 Matplotlib 绘图 """
@ -81,7 +85,9 @@ class SocketClientApp(QMainWindow):
self.port_input = QLineEdit(self)
self.connect_button = QPushButton("连接")
self.get_data_button = QPushButton("获取数据")
self.upgrade_button = QPushButton("更新固件")
self.get_data_button.setEnabled(False) # 初始状态不可点击
self.upgrade_button.setEnabled(False) # 初始状态不可点击
self.ip_input.setText("192.168.0.200")
self.port_input.setText("12345")
@ -114,6 +120,7 @@ class SocketClientApp(QMainWindow):
button_layout = QHBoxLayout()
button_layout.addWidget(self.connect_button)
button_layout.addWidget(self.get_data_button)
button_layout.addWidget(self.upgrade_button)
main_layout.addLayout(button_layout)
# 添加分割线
@ -153,10 +160,17 @@ class SocketClientApp(QMainWindow):
# 事件绑定
self.connect_button.clicked.connect(self.connect_to_server)
self.get_data_button.clicked.connect(self.on_button_clicked)
self.upgrade_button.clicked.connect(self.on_button_upgrade)
# 设置布局策略,确保控件大小随窗口调整
self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
def calculate_crc(self,data: bytes) -> int:
"""计算数据的累加和CRC"""
crc = 0
for byte in data:
crc += byte
return crc & 0xFF # 只保留最低字节
def connect_to_server(self):
""" 连接到服务器 """
ip = self.ip_input.text()
@ -164,6 +178,7 @@ class SocketClientApp(QMainWindow):
if not ip or not port:
self.status_bar.showMessage("状态: 请输入有效的 IP 和端口号")
return
try:
@ -175,86 +190,92 @@ class SocketClientApp(QMainWindow):
self.socket.connect((ip, port))
self.get_data_button.setEnabled(True) # 连接成功,启用获取数据按钮
self.upgrade_button.setEnabled(True)
self.status_bar.showMessage(f"状态: 连接成功! 服务器 {ip}:{port}")
except Exception as e:
self.status_bar.showMessage(f"状态: 连接失败 - {str(e)}")
def receive_data(self):
def parse_package_head(self,data: bytes) -> Tuple[Optional[dict], Optional[bytes]]:
"""
解析包头部
返回: (header_dict, remaining_data)
"""
if len(data) < HEADER_SIZE:
return None, None
# 检查魔数
if data[:3] != HEADER_MAGIC:
print(f"Invalid header magic: {data[:3].hex()}")
return None, None
# 解析头部
cmd, version, result_code = struct.unpack_from('<BBB', data, 3)
return {
'cmd': cmd,
'version': version,
'result_code': result_code
}, data[HEADER_SIZE:]
def receive_data(self,length: int):
"""
从socket接收指定长度的数据
"""
data = bytearray()
while len(data) < length:
chunk = self.socket.recv(length - len(data))
if not chunk:
raise ConnectionError("Connection closed")
data.extend(chunk)
print(f"data size {len(data)}")
return bytes(data)
def process_packet(self):
""" 循环接收数据,直到接收完整 """
received_data = b"" # 初始化接收数据缓存
complete_data = b"" # 用于存储拼接后的完整数据
buffer_size = 0
# 记录开始时间
start_time = time.time()
try:
while True:
try:
# 每次接收 1024 字节的数据
chunk = self.socket.recv(1460)
buffer_size += len(chunk)
print(f"len partf {len(chunk)},buffer_size {buffer_size}")
if not chunk:
break # 如果没有数据,退出循环
received_data += chunk
except socket.timeout:
print(f"接收数据超时({2} 秒),退出接收循环")
break # 超时后退出接收循环
# 记录结束时间
end_time = time.time()
# 1. 接收包头
start_time = time.time()
header_data = self.receive_data(6)
header, _ = self.parse_package_head(header_data)
if not header:
print("Invalid package header")
return None
# 计算并打印执行时间
execution_time = end_time - start_time
print(f"代码执行时间: {execution_time}")
# 循环查找并解析帧头
while True:
# 查找帧头
frame_start = received_data.find(b'\xAA\x55\xAA')
if frame_start == -1:
break # 没有找到帧头,退出内层循环,等待更多数据
# 丢弃帧头之前的数据(可能是无效数据)
received_data = received_data[frame_start:]
# 检查数据是否足够解析长度字段
if len(received_data) < 7:
break # 数据不完整,退出内层循环,等待更多数据
try:
# 解析数据长度字段
data_length = struct.unpack('<I', received_data[3:7])[0]
except Exception as e:
print(f"解析长度字段失败: {e}")
received_data = received_data[1:] # 丢弃无效字节,继续查找帧头
continue
# 检查数据是否足够解析完整帧
print(f"解析长度字段 {len(received_data)}")
if len(received_data) < 7 + data_length:
break # 数据不完整,退出内层循环,等待更多数据
# 提取实际数据
actual_data = received_data[7:7 + data_length]
complete_data += actual_data # 将数据拼接到完整数据中
# 丢弃已处理的数据
received_data = received_data[7 + data_length:]
# 2. 根据命令类型处理数据
if header['cmd'] == 0x01:
# 先接收WaveDataRsp的固定部分(5字节)
wave_header = self.receive_data(5)
channel_id = wave_header[0]
sampling_rate = struct.unpack('<i', wave_header[1:5])[0]
print(f"channel_id {channel_id}")
print(f"sampling_rate {sampling_rate}")
# 计算需要接收的数据长度
data_length = sampling_rate * 4
wave_data = self.receive_data(data_length)
end_time = time.time()
# 计算并打印执行时间
execution_time = end_time - start_time
print(f"代码执行时间: {execution_time}")
# 合并成完整的数据包
# full_packet = header_data + wave_header + wave_data
return wave_data
else:
print(f"Unsupported command: {header['cmd']}")
return None
except ConnectionError as e:
print(f"Connection error: {e}")
return None
except Exception as e:
print(f"发生错误: {e}")
print(f" 数据: {len(complete_data)}")
# 打印前100个字节的数据以16进制方式
print("前100个字节的数据16进制表示")
for i in range(min(100, len(complete_data))): # 确保不超过数据长度
print(f"{complete_data[i]:02X}", end=" ") # 每个字节以16进制格式打印
print() # 换行
return complete_data
print(f"Error processing packet: {e}")
return None
def on_button_clicked(self):
""" 获取数据并绘制 """
try:
self.status_bar.showMessage("状态: 正在获取数据...")
self.socket.sendall(bytes([0xAA, 0x55, 0xAA,0x01])) # 发送数据
data = self.receive_data() # 接收所有数据
data = self.process_packet() # 接收所有数据
data = np.frombuffer(data, dtype=np.int32) # 根据实际数据格式转换
for i in range(min(100, len(data))): # 确保不超过数据长度
print(f"{data[i]:1f}", end=" ")
@ -269,6 +290,53 @@ class SocketClientApp(QMainWindow):
except Exception as e:
self.status_bar.showMessage(f"状态: 错误 - {str(e)}")
def on_button_upgrade(self):
"""打开文件选择对话框"""
file_path, _ = QFileDialog.getOpenFileName(
self, "选择升级文件", "", "二进制文件 (*.bin);;所有文件 (*)"
)
if file_path:
file_name = os.path.basename(file_path)
self.status_bar.showMessage(f"已选择: {file_name}")
try:
print("Starting upgrade process...")
if not os.path.exists(file_path):
raise FileNotFoundError(f"Upgrade file {file_path} not found")
with open(file_path, "rb") as f:
package_data = f.read()
if not package_data:
raise ValueError("Upgrade file is empty")
print(f"Read upgrade package, size: {len(package_data)} bytes")
upgrade_len = len(package_data)
crc = self.calculate_crc(package_data)
upgrade_req = struct.pack("<IB", upgrade_len, crc) + package_data
header_magic = bytes([0xAA, 0x55, 0xAA])
cmd = 0x05
version = 1
result_code = 0
package_head = struct.pack("<3sBBB", header_magic, cmd, version, result_code)
full_packet = package_head + upgrade_req
print("Full packet prepared:")
print(f"Header magic: {package_head[:3].hex()}")
print(f"Command: {cmd}, Version: {version}")
print(f"Upgrade length: {upgrade_len}, CRC: {crc}")
print(f"Total packet size: {len(full_packet)} bytes")
self.socket.sendall(full_packet)
print("Upgrade packet ready to send (commented out actual send code)")
except Exception as e:
print(f"Upgrade failed: {str(e)}")
if __name__ == '__main__':
app = QApplication(sys.argv)