import os import sys import csv import socket, struct import time from PyQt5.QtWidgets import QApplication, QDialog, QVBoxLayout, QHBoxLayout, QTableWidget, QTableWidgetItem, \ QPushButton, QCheckBox, QFileDialog, QMessageBox from PyQt5.QtCore import Qt class BatchOperationDialog(QDialog): def __init__(self): super().__init__() self.setWindowTitle("批量操作节点") self.setGeometry(100, 100, 1093, 737) # 初始化分页 self.current_page = 0 self.rows_per_page = 20 # 初始化数据 self.data = [] # 初始化UI self.init_ui() def init_ui(self): # 主布局 main_layout = QVBoxLayout() # 创建表格(8列,初始行数可以设置为20) self.table_widget = QTableWidget(self.rows_per_page, 8) # 默认显示20行 self.table_widget.setHorizontalHeaderLabels( ["选择", "IP地址", "MAC地址", "版本号", "通道1名称", "通道1绑定设备", "通道2名称", "通道2绑定设备"] ) # 使通道列可编辑 for row in range(self.table_widget.rowCount()): self.table_widget.setItem(row, 4, QTableWidgetItem()) # 通道1名称 self.table_widget.setItem(row, 5, QTableWidgetItem()) # 通道1绑定设备 self.table_widget.setItem(row, 6, QTableWidgetItem()) # 通道2名称 self.table_widget.setItem(row, 7, QTableWidgetItem()) # 通道2绑定设备 # 添加勾选框到第1列 for row in range(self.table_widget.rowCount()): check_box_item = QTableWidgetItem() check_box_item.setCheckState(0) # 初始状态为未选中 self.table_widget.setItem(row, 0, check_box_item) # 按钮布局 button_layout = QHBoxLayout() # 创建按钮 self.select_all_button = QPushButton("全选") self.select_none_button = QPushButton("全不选") self.get_version_button = QPushButton("获取版本") self.get_mac_button = QPushButton("获取MAC") self.upgrade_button = QPushButton("升级节点") self.load_ips_button = QPushButton("加载IP列表") self.load_all_button = QPushButton("加载完整信息列表") self.save_button = QPushButton("保存表数据") self.prev_page_button = QPushButton("上一页") self.next_page_button = QPushButton("下一页") # 按钮连接 self.select_all_button.clicked.connect(self.select_all) self.select_none_button.clicked.connect(self.select_none) self.get_version_button.clicked.connect(self.get_version) self.get_mac_button.clicked.connect(self.get_mac) self.upgrade_button.clicked.connect(self.upgrade_node) self.save_button.clicked.connect(self.save_data) self.load_ips_button.clicked.connect(self.load_ips) self.load_all_button.clicked.connect(self.load_all_info) self.prev_page_button.clicked.connect(self.prev_page) self.next_page_button.clicked.connect(self.next_page) # 将按钮加入布局 button_layout.addWidget(self.select_all_button) button_layout.addWidget(self.select_none_button) button_layout.addWidget(self.get_version_button) button_layout.addWidget(self.get_mac_button) button_layout.addWidget(self.upgrade_button) button_layout.addWidget(self.load_ips_button) button_layout.addWidget(self.load_all_button) button_layout.addWidget(self.save_button) button_layout.addWidget(self.prev_page_button) button_layout.addWidget(self.next_page_button) # 将表格和按钮布局添加到主布局 main_layout.addWidget(self.table_widget) main_layout.addLayout(button_layout) # 设置对话框的布局 self.setLayout(main_layout) # 允许最大化 # self.setWindowState(Qt.WindowMaximized) def select_all(self): for row in range(self.table_widget.rowCount()): item = self.table_widget.item(row, 0) item.setCheckState(2) # 选中 def select_none(self): for row in range(self.table_widget.rowCount()): item = self.table_widget.item(row, 0) item.setCheckState(0) # 未选中 def get_version(self): # 遍历表格的第1列,获取选中的行的IP地址 for row in range(self.table_widget.rowCount()): item = self.table_widget.item(row, 0) if item.checkState() == Qt.Checked: # 如果该行被选中 ip = self.table_widget.item(row, 1).text() # 获取IP地址 if ip: try: # 创建TCP连接到IP地址的12345端口 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(5) # 设置连接超时时间为5秒 s.connect((ip, 12345)) # 发送消息 message = bytes([0xAA, 0x55, 0xAA, 0x0F, 0x01, 0x00]) s.sendall(message) # 接收返回的数据(假设返回数据至少有8个字节) response = s.recv(1024) if len(response) >= 8: # 解析第7、8字节作为版本号 version = f"{response[6]}-{response[7]}" # 将版本号写入到第4列(版本号列) self.table_widget.setItem(row, 3, QTableWidgetItem(version)) else: print(f"接收到的数据长度不足:{len(response)},无法提取版本号") except Exception as e: print(f"与IP {ip} 连接失败: {e}") self.table_widget.setItem(row, 3, QTableWidgetItem("连接失败")) def get_mac(self): for row in range(self.table_widget.rowCount()): item = self.table_widget.item(row, 0) if item.checkState() == Qt.Checked: # 如果该行被选中 ip = self.table_widget.item(row, 1).text() # 获取IP地址 if ip: try: # 创建TCP连接到IP地址的12345端口 with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(5) # 设置连接超时时间为5秒 s.connect((ip, 12345)) # 发送消息 message = bytes([0xAA, 0x55, 0xAA, 0x12, 0x01, 0x00]) s.sendall(message) # 接收返回的数据(假设返回数据至少有8个字节) response = s.recv(1024) if len(response) >= 8: # 解析第7、8字节作为mac后2字节 mac = f"50-29-4D-20-{response[6]}-{response[7]}" # 将版本号写入到第3列(MAC列) self.table_widget.setItem(row, 2, QTableWidgetItem(mac)) else: print(f"接收到的数据长度不足:{len(response)},无法提取版本号") except Exception as e: print(f"与IP {ip} 连接失败: {e}") self.table_widget.setItem(row, 3, QTableWidgetItem("连接失败")) def calculate_crc(self, data: bytes) -> int: """计算数据的累加和CRC""" crc = 0 for byte in data: crc += byte return crc & 0xFF # 只保留最低字节 def upgrade_node(self): """打开文件选择对话框""" file_path, _ = QFileDialog.getOpenFileName( self, "选择升级文件", "", "二进制文件 (*.bin);;所有文件 (*)" ) if file_path: file_name = os.path.basename(file_path) # self.status_bar.showMessage(f"已选择: {file_name}") try: print("Starting upgrade process...") if not os.path.exists(file_path): raise FileNotFoundError(f"Upgrade file {file_path} not found") with open(file_path, "rb") as f: package_data = f.read() if not package_data: raise ValueError("Upgrade file is empty") print(f"Read upgrade package, size: {len(package_data)} bytes") upgrade_len = len(package_data) crc = self.calculate_crc(package_data) upgrade_req = struct.pack(" 0) self.next_page_button.setEnabled((self.current_page + 1) * self.rows_per_page < len(self.data)) def prev_page(self): if self.current_page > 0: self.current_page -= 1 self.update_table() def next_page(self): if (self.current_page + 1) * self.rows_per_page < len(self.data): self.current_page += 1 self.update_table() if __name__ == "__main__": app = QApplication(sys.argv) dialog = BatchOperationDialog() dialog.show() sys.exit(app.exec_())