T1L_Config/batch_dialog.py
2025-06-26 09:29:17 +08:00

358 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import csv
import socket, struct
import time
from PyQt5.QtWidgets import QApplication, QDialog, QVBoxLayout, QHBoxLayout, QTableWidget, QTableWidgetItem, \
QPushButton, QCheckBox, QFileDialog, QMessageBox
from PyQt5.QtCore import Qt
class BatchOperationDialog(QDialog):
def __init__(self):
super().__init__()
self.setWindowTitle("批量操作节点")
self.setGeometry(100, 100, 1093, 737)
# 初始化分页
self.current_page = 0
self.rows_per_page = 20
# 初始化数据
self.data = []
# 初始化UI
self.init_ui()
def init_ui(self):
# 主布局
main_layout = QVBoxLayout()
# 创建表格8列初始行数可以设置为20
self.table_widget = QTableWidget(self.rows_per_page, 8) # 默认显示20行
self.table_widget.setHorizontalHeaderLabels(
["选择", "IP地址", "MAC地址", "版本号", "通道1名称", "通道1绑定设备", "通道2名称", "通道2绑定设备"]
)
# 使通道列可编辑
for row in range(self.table_widget.rowCount()):
self.table_widget.setItem(row, 4, QTableWidgetItem()) # 通道1名称
self.table_widget.setItem(row, 5, QTableWidgetItem()) # 通道1绑定设备
self.table_widget.setItem(row, 6, QTableWidgetItem()) # 通道2名称
self.table_widget.setItem(row, 7, QTableWidgetItem()) # 通道2绑定设备
# 添加勾选框到第1列
for row in range(self.table_widget.rowCount()):
check_box_item = QTableWidgetItem()
check_box_item.setCheckState(0) # 初始状态为未选中
self.table_widget.setItem(row, 0, check_box_item)
# 按钮布局
button_layout = QHBoxLayout()
# 创建按钮
self.select_all_button = QPushButton("全选")
self.select_none_button = QPushButton("全不选")
self.get_version_button = QPushButton("获取版本")
self.get_mac_button = QPushButton("获取MAC")
self.upgrade_button = QPushButton("升级节点")
self.load_ips_button = QPushButton("加载IP列表")
self.load_all_button = QPushButton("加载完整信息列表")
self.save_button = QPushButton("保存表数据")
self.prev_page_button = QPushButton("上一页")
self.next_page_button = QPushButton("下一页")
# 按钮连接
self.select_all_button.clicked.connect(self.select_all)
self.select_none_button.clicked.connect(self.select_none)
self.get_version_button.clicked.connect(self.get_version)
self.get_mac_button.clicked.connect(self.get_mac)
self.upgrade_button.clicked.connect(self.upgrade_node)
self.save_button.clicked.connect(self.save_data)
self.load_ips_button.clicked.connect(self.load_ips)
self.load_all_button.clicked.connect(self.load_all_info)
self.prev_page_button.clicked.connect(self.prev_page)
self.next_page_button.clicked.connect(self.next_page)
# 将按钮加入布局
button_layout.addWidget(self.select_all_button)
button_layout.addWidget(self.select_none_button)
button_layout.addWidget(self.get_version_button)
button_layout.addWidget(self.get_mac_button)
button_layout.addWidget(self.upgrade_button)
button_layout.addWidget(self.load_ips_button)
button_layout.addWidget(self.load_all_button)
button_layout.addWidget(self.save_button)
button_layout.addWidget(self.prev_page_button)
button_layout.addWidget(self.next_page_button)
# 将表格和按钮布局添加到主布局
main_layout.addWidget(self.table_widget)
main_layout.addLayout(button_layout)
# 设置对话框的布局
self.setLayout(main_layout)
# 允许最大化
# self.setWindowState(Qt.WindowMaximized)
def select_all(self):
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
item.setCheckState(2) # 选中
def select_none(self):
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
item.setCheckState(0) # 未选中
def get_version(self):
# 遍历表格的第1列获取选中的行的IP地址
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
if item.checkState() == Qt.Checked: # 如果该行被选中
ip = self.table_widget.item(row, 1).text() # 获取IP地址
if ip:
try:
# 创建TCP连接到IP地址的12345端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(5) # 设置连接超时时间为5秒
s.connect((ip, 12345))
# 发送消息
message = bytes([0xAA, 0x55, 0xAA, 0x0F, 0x01, 0x00])
s.sendall(message)
# 接收返回的数据假设返回数据至少有8个字节
response = s.recv(1024)
if len(response) >= 8:
# 解析第7、8字节作为版本号
version = f"{response[6]}-{response[7]}"
# 将版本号写入到第4列版本号列
self.table_widget.setItem(row, 3, QTableWidgetItem(version))
else:
print(f"接收到的数据长度不足:{len(response)},无法提取版本号")
except Exception as e:
print(f"与IP {ip} 连接失败: {e}")
self.table_widget.setItem(row, 3, QTableWidgetItem("连接失败"))
def get_mac(self):
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
if item.checkState() == Qt.Checked: # 如果该行被选中
ip = self.table_widget.item(row, 1).text() # 获取IP地址
if ip:
try:
# 创建TCP连接到IP地址的12345端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(5) # 设置连接超时时间为5秒
s.connect((ip, 12345))
# 发送消息
message = bytes([0xAA, 0x55, 0xAA, 0x12, 0x01, 0x00])
s.sendall(message)
# 接收返回的数据假设返回数据至少有8个字节
response = s.recv(1024)
if len(response) >= 8:
# 解析第7、8字节作为mac后2字节
mac = f"50-29-4D-20-{response[6]}-{response[7]}"
# 将版本号写入到第3列MAC列
self.table_widget.setItem(row, 2, QTableWidgetItem(mac))
else:
print(f"接收到的数据长度不足:{len(response)},无法提取版本号")
except Exception as e:
print(f"与IP {ip} 连接失败: {e}")
self.table_widget.setItem(row, 3, QTableWidgetItem("连接失败"))
def calculate_crc(self, data: bytes) -> int:
"""计算数据的累加和CRC"""
crc = 0
for byte in data:
crc += byte
return crc & 0xFF # 只保留最低字节
def upgrade_node(self):
"""打开文件选择对话框"""
file_path, _ = QFileDialog.getOpenFileName(
self, "选择升级文件", "", "二进制文件 (*.bin);;所有文件 (*)"
)
if file_path:
file_name = os.path.basename(file_path)
# self.status_bar.showMessage(f"已选择: {file_name}")
try:
print("Starting upgrade process...")
if not os.path.exists(file_path):
raise FileNotFoundError(f"Upgrade file {file_path} not found")
with open(file_path, "rb") as f:
package_data = f.read()
if not package_data:
raise ValueError("Upgrade file is empty")
print(f"Read upgrade package, size: {len(package_data)} bytes")
upgrade_len = len(package_data)
crc = self.calculate_crc(package_data)
upgrade_req = struct.pack("<IB", upgrade_len, crc) + package_data
header_magic = bytes([0xAA, 0x55, 0xAA])
cmd = 0x05
version = 1
result_code = 0
package_head = struct.pack("<3sBBB", header_magic, cmd, version, result_code)
full_packet = package_head + upgrade_req
print("Full packet prepared:")
print(f"Header magic: {package_head[:3].hex()}")
print(f"Command: {cmd}, Version: {version}")
print(f"Upgrade length: {upgrade_len}, CRC: {crc}")
print(f"Total packet size: {len(full_packet)} bytes")
# 分包处理每包最大1500字节
max_packet_size = 1500
total_size = len(full_packet)
num_packets = (total_size // max_packet_size) + (1 if total_size % max_packet_size != 0 else 0)
print(f"Total packets: {num_packets}, each up to {max_packet_size} bytes")
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
if item.checkState() == Qt.Checked: # 如果该行被选中
ip = self.table_widget.item(row, 1).text() # 获取IP地址
if ip:
try:
# 创建TCP连接到IP地址的12345端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(5) # 设置连接超时时间为5秒
s.connect((ip, 12345))
for i in range(num_packets):
start_index = i * max_packet_size
end_index = min((i + 1) * max_packet_size, total_size)
packet_chunk = full_packet[start_index:end_index]
print(f"Sending packet {i + 1}/{num_packets}, size: {len(packet_chunk)} bytes")
s.sendall(packet_chunk)
print(f"Packet {i + 1} sent successfully")
time.sleep(0.03)
except Exception as e:
print(f"与IP {ip} 连接失败: {e}")
# self.table_widget.setItem(row, 3, QTableWidgetItem("连接失败"))
QMessageBox.information(self, "升级", "升级全部完成")
except Exception as e:
print(f"Upgrade failed: {str(e)}")
QMessageBox.critical(
self,
"升级",
"升级包上传失败"
)
def save_data(self):
# 打开文件保存对话框
options = QFileDialog.Options()
file_path, _ = QFileDialog.getSaveFileName(self, "保存表数据", "", "CSV Files (*.csv);;All Files (*)", options=options)
if file_path:
# 保存表格数据到CSV文件
with open(file_path, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
# 写入表头
headers = [self.table_widget.horizontalHeaderItem(i).text() for i in range(self.table_widget.columnCount())]
writer.writerow(headers[1:])
# 写入表格内容
for row in range(self.table_widget.rowCount()):
row_data = [self.table_widget.item(row, col).text() if self.table_widget.item(row, col) else ""
for col in
range(1, self.table_widget.columnCount())] # Start from col=1 to skip the first column
writer.writerow(row_data)
print(f"数据已保存到 {file_path}")
def load_ips(self):
# 打开文件选择对话框
options = QFileDialog.Options()
file_path, _ = QFileDialog.getOpenFileName(self, "选择IP地址文件", "", "Text Files (*.txt);;All Files (*)", options=options)
if file_path:
# 读取IP地址
with open(file_path, 'r') as file:
lines = file.readlines()
invalid_ips = []
for i, line in enumerate(lines):
ip = line.strip()
# 验证IP地址格式
if not self.validate_ip(ip):
invalid_ips.append(i + 1)
if invalid_ips:
# 如果有无效IP弹出提示框
QMessageBox.warning(self, "错误", f"以下行的IP地址无效{', '.join(map(str, invalid_ips))}")
else:
# 将IP地址加载到表格
self.table_widget.setRowCount(len(lines))
for i, line in enumerate(lines):
ip = line.strip()
if i < self.table_widget.rowCount():
self.table_widget.setItem(i, 1, QTableWidgetItem(ip)) # 设置IP地址列
print(f"加载了{len(lines)}个IP地址")
def validate_ip(self, ip):
# IP地址验证正则表达式
import re
pattern = r"^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
return bool(re.match(pattern, ip))
def load_all_info(self):
# 打开CSV文件选择对话框
options = QFileDialog.Options()
file_path, _ = QFileDialog.getOpenFileName(self, "选择CSV文件", "", "CSV Files (*.csv);;All Files (*)",
options=options)
if file_path:
# 读取CSV文件
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.reader(file)
headers = next(reader) # 跳过表头
self.data = [row for row in reader]
if len(self.data) <= 20:
self.table_widget.setRowCount(len(self.data))
# 确保加载数据后更新分页
self.current_page = 0 # 每次加载新数据,默认回到第一页
self.update_table()
def update_table(self):
# 获取当前页的数据
start_row = self.current_page * self.rows_per_page
end_row = start_row + self.rows_per_page
rows_to_display = self.data[start_row:end_row]
# 更新表格内容
for row_num, row_data in enumerate(rows_to_display):
for col_num, cell_data in enumerate(row_data):
self.table_widget.setItem(row_num, col_num + 1, QTableWidgetItem(cell_data)) # +1跳过“选择”列
# 根据数据量调整上一页和下一页按钮的状态
self.prev_page_button.setEnabled(self.current_page > 0)
self.next_page_button.setEnabled((self.current_page + 1) * self.rows_per_page < len(self.data))
def prev_page(self):
if self.current_page > 0:
self.current_page -= 1
self.update_table()
def next_page(self):
if (self.current_page + 1) * self.rows_per_page < len(self.data):
self.current_page += 1
self.update_table()
if __name__ == "__main__":
app = QApplication(sys.argv)
dialog = BatchOperationDialog()
dialog.show()
sys.exit(app.exec_())