complete get mac, version and upgrade functions.

This commit is contained in:
pandx 2025-06-25 18:17:03 +08:00
parent 39f89b8f25
commit 2bbd391407

View File

@ -1,5 +1,9 @@
import os
import sys import sys
import csv import csv
import socket, struct
import time
from PyQt5.QtWidgets import QApplication, QDialog, QVBoxLayout, QHBoxLayout, QTableWidget, QTableWidgetItem, \ from PyQt5.QtWidgets import QApplication, QDialog, QVBoxLayout, QHBoxLayout, QTableWidget, QTableWidgetItem, \
QPushButton, QCheckBox, QFileDialog, QMessageBox QPushButton, QCheckBox, QFileDialog, QMessageBox
from PyQt5.QtCore import Qt from PyQt5.QtCore import Qt
@ -104,13 +108,150 @@ class BatchOperationDialog(QDialog):
item.setCheckState(0) # 未选中 item.setCheckState(0) # 未选中
def get_version(self): def get_version(self):
print("获取版本按钮被点击") # 遍历表格的第1列获取选中的行的IP地址
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
if item.checkState() == Qt.Checked: # 如果该行被选中
ip = self.table_widget.item(row, 1).text() # 获取IP地址
if ip:
try:
# 创建TCP连接到IP地址的12345端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(5) # 设置连接超时时间为5秒
s.connect((ip, 12345))
# 发送消息
message = bytes([0xAA, 0x55, 0xAA, 0x0F, 0x01, 0x00])
s.sendall(message)
# 接收返回的数据假设返回数据至少有8个字节
response = s.recv(1024)
if len(response) >= 8:
# 解析第7、8字节作为版本号
version = f"{response[6]}-{response[7]}"
# 将版本号写入到第4列版本号列
self.table_widget.setItem(row, 3, QTableWidgetItem(version))
else:
print(f"接收到的数据长度不足:{len(response)},无法提取版本号")
except Exception as e:
print(f"与IP {ip} 连接失败: {e}")
self.table_widget.setItem(row, 3, QTableWidgetItem("连接失败"))
def get_mac(self): def get_mac(self):
print("获取MAC按钮被点击") for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
if item.checkState() == Qt.Checked: # 如果该行被选中
ip = self.table_widget.item(row, 1).text() # 获取IP地址
if ip:
try:
# 创建TCP连接到IP地址的12345端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(5) # 设置连接超时时间为5秒
s.connect((ip, 12345))
# 发送消息
message = bytes([0xAA, 0x55, 0xAA, 0x12, 0x01, 0x00])
s.sendall(message)
# 接收返回的数据假设返回数据至少有8个字节
response = s.recv(1024)
if len(response) >= 8:
# 解析第7、8字节作为mac后2字节
mac = f"50-29-4D-20-{response[6]}-{response[7]}"
# 将版本号写入到第3列MAC列
self.table_widget.setItem(row, 2, QTableWidgetItem(mac))
else:
print(f"接收到的数据长度不足:{len(response)},无法提取版本号")
except Exception as e:
print(f"与IP {ip} 连接失败: {e}")
self.table_widget.setItem(row, 3, QTableWidgetItem("连接失败"))
def calculate_crc(self, data: bytes) -> int:
"""计算数据的累加和CRC"""
crc = 0
for byte in data:
crc += byte
return crc & 0xFF # 只保留最低字节
def upgrade_node(self): def upgrade_node(self):
print("升级节点按钮被点击") """打开文件选择对话框"""
file_path, _ = QFileDialog.getOpenFileName(
self, "选择升级文件", "", "二进制文件 (*.bin);;所有文件 (*)"
)
if file_path:
file_name = os.path.basename(file_path)
# self.status_bar.showMessage(f"已选择: {file_name}")
try:
print("Starting upgrade process...")
if not os.path.exists(file_path):
raise FileNotFoundError(f"Upgrade file {file_path} not found")
with open(file_path, "rb") as f:
package_data = f.read()
if not package_data:
raise ValueError("Upgrade file is empty")
print(f"Read upgrade package, size: {len(package_data)} bytes")
upgrade_len = len(package_data)
crc = self.calculate_crc(package_data)
upgrade_req = struct.pack("<IB", upgrade_len, crc) + package_data
header_magic = bytes([0xAA, 0x55, 0xAA])
cmd = 0x05
version = 1
result_code = 0
package_head = struct.pack("<3sBBB", header_magic, cmd, version, result_code)
full_packet = package_head + upgrade_req
print("Full packet prepared:")
print(f"Header magic: {package_head[:3].hex()}")
print(f"Command: {cmd}, Version: {version}")
print(f"Upgrade length: {upgrade_len}, CRC: {crc}")
print(f"Total packet size: {len(full_packet)} bytes")
# 分包处理每包最大1500字节
max_packet_size = 1500
total_size = len(full_packet)
num_packets = (total_size // max_packet_size) + (1 if total_size % max_packet_size != 0 else 0)
print(f"Total packets: {num_packets}, each up to {max_packet_size} bytes")
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
if item.checkState() == Qt.Checked: # 如果该行被选中
ip = self.table_widget.item(row, 1).text() # 获取IP地址
if ip:
try:
# 创建TCP连接到IP地址的12345端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(5) # 设置连接超时时间为5秒
s.connect((ip, 12345))
for i in range(num_packets):
start_index = i * max_packet_size
end_index = min((i + 1) * max_packet_size, total_size)
packet_chunk = full_packet[start_index:end_index]
print(f"Sending packet {i + 1}/{num_packets}, size: {len(packet_chunk)} bytes")
s.sendall(packet_chunk)
print(f"Packet {i + 1} sent successfully")
time.sleep(0.03)
except Exception as e:
print(f"与IP {ip} 连接失败: {e}")
# self.table_widget.setItem(row, 3, QTableWidgetItem("连接失败"))
QMessageBox.information(self, "升级", "升级全部完成")
except Exception as e:
print(f"Upgrade failed: {str(e)}")
QMessageBox.critical(
self,
"升级",
"升级包上传失败"
)
def save_data(self): def save_data(self):
# 打开文件保存对话框 # 打开文件保存对话框