T1L_Config/batch_dialog.py

358 lines
16 KiB
Python
Raw Permalink Normal View History

import os
2025-06-25 10:58:59 +08:00
import sys
2025-06-25 11:26:59 +08:00
import csv
import socket, struct
import time
2025-06-25 10:58:59 +08:00
from PyQt5.QtWidgets import QApplication, QDialog, QVBoxLayout, QHBoxLayout, QTableWidget, QTableWidgetItem, \
2025-06-25 11:26:59 +08:00
QPushButton, QCheckBox, QFileDialog, QMessageBox
from PyQt5.QtCore import Qt
2025-06-25 10:58:59 +08:00
class BatchOperationDialog(QDialog):
def __init__(self):
super().__init__()
self.setWindowTitle("批量操作节点")
2025-06-26 09:29:17 +08:00
self.setGeometry(100, 100, 1093, 737)
2025-06-25 10:58:59 +08:00
2025-06-25 17:42:57 +08:00
# 初始化分页
self.current_page = 0
self.rows_per_page = 20
# 初始化数据
self.data = []
2025-06-25 10:58:59 +08:00
# 初始化UI
self.init_ui()
def init_ui(self):
# 主布局
main_layout = QVBoxLayout()
2025-06-25 17:42:57 +08:00
# 创建表格8列初始行数可以设置为20
self.table_widget = QTableWidget(self.rows_per_page, 8) # 默认显示20行
2025-06-25 10:58:59 +08:00
self.table_widget.setHorizontalHeaderLabels(
["选择", "IP地址", "MAC地址", "版本号", "通道1名称", "通道1绑定设备", "通道2名称", "通道2绑定设备"]
)
# 使通道列可编辑
for row in range(self.table_widget.rowCount()):
self.table_widget.setItem(row, 4, QTableWidgetItem()) # 通道1名称
self.table_widget.setItem(row, 5, QTableWidgetItem()) # 通道1绑定设备
self.table_widget.setItem(row, 6, QTableWidgetItem()) # 通道2名称
self.table_widget.setItem(row, 7, QTableWidgetItem()) # 通道2绑定设备
# 添加勾选框到第1列
for row in range(self.table_widget.rowCount()):
check_box_item = QTableWidgetItem()
check_box_item.setCheckState(0) # 初始状态为未选中
self.table_widget.setItem(row, 0, check_box_item)
# 按钮布局
button_layout = QHBoxLayout()
# 创建按钮
self.select_all_button = QPushButton("全选")
self.select_none_button = QPushButton("全不选")
self.get_version_button = QPushButton("获取版本")
self.get_mac_button = QPushButton("获取MAC")
self.upgrade_button = QPushButton("升级节点")
self.load_ips_button = QPushButton("加载IP列表")
self.load_all_button = QPushButton("加载完整信息列表")
self.save_button = QPushButton("保存表数据")
2025-06-25 17:42:57 +08:00
self.prev_page_button = QPushButton("上一页")
self.next_page_button = QPushButton("下一页")
2025-06-25 10:58:59 +08:00
# 按钮连接
self.select_all_button.clicked.connect(self.select_all)
self.select_none_button.clicked.connect(self.select_none)
self.get_version_button.clicked.connect(self.get_version)
self.get_mac_button.clicked.connect(self.get_mac)
self.upgrade_button.clicked.connect(self.upgrade_node)
self.save_button.clicked.connect(self.save_data)
self.load_ips_button.clicked.connect(self.load_ips)
self.load_all_button.clicked.connect(self.load_all_info)
2025-06-25 17:42:57 +08:00
self.prev_page_button.clicked.connect(self.prev_page)
self.next_page_button.clicked.connect(self.next_page)
2025-06-25 10:58:59 +08:00
# 将按钮加入布局
button_layout.addWidget(self.select_all_button)
button_layout.addWidget(self.select_none_button)
button_layout.addWidget(self.get_version_button)
button_layout.addWidget(self.get_mac_button)
button_layout.addWidget(self.upgrade_button)
button_layout.addWidget(self.load_ips_button)
button_layout.addWidget(self.load_all_button)
button_layout.addWidget(self.save_button)
2025-06-25 17:42:57 +08:00
button_layout.addWidget(self.prev_page_button)
button_layout.addWidget(self.next_page_button)
2025-06-25 10:58:59 +08:00
# 将表格和按钮布局添加到主布局
main_layout.addWidget(self.table_widget)
main_layout.addLayout(button_layout)
# 设置对话框的布局
self.setLayout(main_layout)
2025-06-25 11:26:59 +08:00
# 允许最大化
# self.setWindowState(Qt.WindowMaximized)
2025-06-25 10:58:59 +08:00
def select_all(self):
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
item.setCheckState(2) # 选中
def select_none(self):
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
item.setCheckState(0) # 未选中
def get_version(self):
# 遍历表格的第1列获取选中的行的IP地址
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
if item.checkState() == Qt.Checked: # 如果该行被选中
ip = self.table_widget.item(row, 1).text() # 获取IP地址
if ip:
try:
# 创建TCP连接到IP地址的12345端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(5) # 设置连接超时时间为5秒
s.connect((ip, 12345))
# 发送消息
message = bytes([0xAA, 0x55, 0xAA, 0x0F, 0x01, 0x00])
s.sendall(message)
# 接收返回的数据假设返回数据至少有8个字节
response = s.recv(1024)
if len(response) >= 8:
# 解析第7、8字节作为版本号
version = f"{response[6]}-{response[7]}"
# 将版本号写入到第4列版本号列
self.table_widget.setItem(row, 3, QTableWidgetItem(version))
else:
print(f"接收到的数据长度不足:{len(response)},无法提取版本号")
except Exception as e:
print(f"与IP {ip} 连接失败: {e}")
self.table_widget.setItem(row, 3, QTableWidgetItem("连接失败"))
2025-06-25 10:58:59 +08:00
def get_mac(self):
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
if item.checkState() == Qt.Checked: # 如果该行被选中
ip = self.table_widget.item(row, 1).text() # 获取IP地址
if ip:
try:
# 创建TCP连接到IP地址的12345端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(5) # 设置连接超时时间为5秒
s.connect((ip, 12345))
# 发送消息
message = bytes([0xAA, 0x55, 0xAA, 0x12, 0x01, 0x00])
s.sendall(message)
# 接收返回的数据假设返回数据至少有8个字节
response = s.recv(1024)
if len(response) >= 8:
# 解析第7、8字节作为mac后2字节
mac = f"50-29-4D-20-{response[6]}-{response[7]}"
# 将版本号写入到第3列MAC列
self.table_widget.setItem(row, 2, QTableWidgetItem(mac))
else:
print(f"接收到的数据长度不足:{len(response)},无法提取版本号")
except Exception as e:
print(f"与IP {ip} 连接失败: {e}")
self.table_widget.setItem(row, 3, QTableWidgetItem("连接失败"))
def calculate_crc(self, data: bytes) -> int:
"""计算数据的累加和CRC"""
crc = 0
for byte in data:
crc += byte
return crc & 0xFF # 只保留最低字节
2025-06-25 10:58:59 +08:00
def upgrade_node(self):
"""打开文件选择对话框"""
file_path, _ = QFileDialog.getOpenFileName(
self, "选择升级文件", "", "二进制文件 (*.bin);;所有文件 (*)"
)
if file_path:
file_name = os.path.basename(file_path)
# self.status_bar.showMessage(f"已选择: {file_name}")
try:
print("Starting upgrade process...")
if not os.path.exists(file_path):
raise FileNotFoundError(f"Upgrade file {file_path} not found")
with open(file_path, "rb") as f:
package_data = f.read()
if not package_data:
raise ValueError("Upgrade file is empty")
print(f"Read upgrade package, size: {len(package_data)} bytes")
upgrade_len = len(package_data)
crc = self.calculate_crc(package_data)
upgrade_req = struct.pack("<IB", upgrade_len, crc) + package_data
header_magic = bytes([0xAA, 0x55, 0xAA])
cmd = 0x05
version = 1
result_code = 0
package_head = struct.pack("<3sBBB", header_magic, cmd, version, result_code)
full_packet = package_head + upgrade_req
print("Full packet prepared:")
print(f"Header magic: {package_head[:3].hex()}")
print(f"Command: {cmd}, Version: {version}")
print(f"Upgrade length: {upgrade_len}, CRC: {crc}")
print(f"Total packet size: {len(full_packet)} bytes")
# 分包处理每包最大1500字节
max_packet_size = 1500
total_size = len(full_packet)
num_packets = (total_size // max_packet_size) + (1 if total_size % max_packet_size != 0 else 0)
print(f"Total packets: {num_packets}, each up to {max_packet_size} bytes")
for row in range(self.table_widget.rowCount()):
item = self.table_widget.item(row, 0)
if item.checkState() == Qt.Checked: # 如果该行被选中
ip = self.table_widget.item(row, 1).text() # 获取IP地址
if ip:
try:
# 创建TCP连接到IP地址的12345端口
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(5) # 设置连接超时时间为5秒
s.connect((ip, 12345))
for i in range(num_packets):
start_index = i * max_packet_size
end_index = min((i + 1) * max_packet_size, total_size)
packet_chunk = full_packet[start_index:end_index]
print(f"Sending packet {i + 1}/{num_packets}, size: {len(packet_chunk)} bytes")
s.sendall(packet_chunk)
print(f"Packet {i + 1} sent successfully")
time.sleep(0.03)
except Exception as e:
print(f"与IP {ip} 连接失败: {e}")
# self.table_widget.setItem(row, 3, QTableWidgetItem("连接失败"))
QMessageBox.information(self, "升级", "升级全部完成")
except Exception as e:
print(f"Upgrade failed: {str(e)}")
QMessageBox.critical(
self,
"升级",
"升级包上传失败"
)
2025-06-25 10:58:59 +08:00
def save_data(self):
2025-06-25 11:26:59 +08:00
# 打开文件保存对话框
options = QFileDialog.Options()
file_path, _ = QFileDialog.getSaveFileName(self, "保存表数据", "", "CSV Files (*.csv);;All Files (*)", options=options)
if file_path:
# 保存表格数据到CSV文件
with open(file_path, mode='w', newline='', encoding='utf-8') as file:
writer = csv.writer(file)
# 写入表头
headers = [self.table_widget.horizontalHeaderItem(i).text() for i in range(self.table_widget.columnCount())]
2025-06-25 17:42:57 +08:00
writer.writerow(headers[1:])
2025-06-25 11:26:59 +08:00
# 写入表格内容
for row in range(self.table_widget.rowCount()):
2025-06-25 17:42:57 +08:00
row_data = [self.table_widget.item(row, col).text() if self.table_widget.item(row, col) else ""
for col in
range(1, self.table_widget.columnCount())] # Start from col=1 to skip the first column
2025-06-25 11:26:59 +08:00
writer.writerow(row_data)
2025-06-25 17:42:57 +08:00
2025-06-25 11:26:59 +08:00
print(f"数据已保存到 {file_path}")
2025-06-25 10:58:59 +08:00
def load_ips(self):
2025-06-25 11:26:59 +08:00
# 打开文件选择对话框
options = QFileDialog.Options()
file_path, _ = QFileDialog.getOpenFileName(self, "选择IP地址文件", "", "Text Files (*.txt);;All Files (*)", options=options)
if file_path:
# 读取IP地址
with open(file_path, 'r') as file:
lines = file.readlines()
invalid_ips = []
for i, line in enumerate(lines):
ip = line.strip()
# 验证IP地址格式
if not self.validate_ip(ip):
invalid_ips.append(i + 1)
if invalid_ips:
# 如果有无效IP弹出提示框
QMessageBox.warning(self, "错误", f"以下行的IP地址无效{', '.join(map(str, invalid_ips))}")
else:
# 将IP地址加载到表格
2025-06-25 17:42:57 +08:00
self.table_widget.setRowCount(len(lines))
2025-06-25 11:26:59 +08:00
for i, line in enumerate(lines):
ip = line.strip()
if i < self.table_widget.rowCount():
self.table_widget.setItem(i, 1, QTableWidgetItem(ip)) # 设置IP地址列
print(f"加载了{len(lines)}个IP地址")
def validate_ip(self, ip):
# IP地址验证正则表达式
import re
pattern = r"^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$"
return bool(re.match(pattern, ip))
2025-06-25 10:58:59 +08:00
def load_all_info(self):
2025-06-25 11:26:59 +08:00
# 打开CSV文件选择对话框
options = QFileDialog.Options()
2025-06-25 17:42:57 +08:00
file_path, _ = QFileDialog.getOpenFileName(self, "选择CSV文件", "", "CSV Files (*.csv);;All Files (*)",
options=options)
2025-06-25 11:26:59 +08:00
if file_path:
# 读取CSV文件
with open(file_path, mode='r', encoding='utf-8') as file:
reader = csv.reader(file)
headers = next(reader) # 跳过表头
2025-06-25 17:42:57 +08:00
self.data = [row for row in reader]
if len(self.data) <= 20:
self.table_widget.setRowCount(len(self.data))
# 确保加载数据后更新分页
self.current_page = 0 # 每次加载新数据,默认回到第一页
self.update_table()
def update_table(self):
# 获取当前页的数据
start_row = self.current_page * self.rows_per_page
end_row = start_row + self.rows_per_page
rows_to_display = self.data[start_row:end_row]
# 更新表格内容
for row_num, row_data in enumerate(rows_to_display):
for col_num, cell_data in enumerate(row_data):
self.table_widget.setItem(row_num, col_num + 1, QTableWidgetItem(cell_data)) # +1跳过“选择”列
# 根据数据量调整上一页和下一页按钮的状态
self.prev_page_button.setEnabled(self.current_page > 0)
self.next_page_button.setEnabled((self.current_page + 1) * self.rows_per_page < len(self.data))
def prev_page(self):
if self.current_page > 0:
self.current_page -= 1
self.update_table()
def next_page(self):
if (self.current_page + 1) * self.rows_per_page < len(self.data):
self.current_page += 1
self.update_table()
2025-06-25 10:58:59 +08:00
if __name__ == "__main__":
app = QApplication(sys.argv)
dialog = BatchOperationDialog()
dialog.show()
sys.exit(app.exec_())