diff --git a/SamplingDialog.py b/SamplingDialog.py new file mode 100644 index 0000000..2c228e0 --- /dev/null +++ b/SamplingDialog.py @@ -0,0 +1,589 @@ + +from PyQt5.QtWidgets import ( + QApplication, QDialog, QLabel, QLineEdit, QPushButton,QMessageBox, + QHBoxLayout, QVBoxLayout, QMainWindow, QWidget, QRadioButton, QButtonGroup,QGridLayout,QFormLayout +) +from PyQt5.QtGui import QRegExpValidator +from PyQt5.QtCore import QRegExp,QByteArray +import struct +from PyQt5.QtNetwork import QTcpSocket +import numpy as np +from numpy.ma.core import masked +from scipy.optimize import leastsq +import struct +HEADER_MAGIC = bytes([0xAA, 0x55, 0xAA]) +HEADER_SIZE = 6 # PackgeHead除去data字段的大小(3+1+1+1) + +class SamplingDialog(QDialog): + def __init__(self, parent=None): + super().__init__(parent) + self.setWindowTitle("采样参数设置") + + self.rate_combo = QComboBox() + self.rate_combo.addItems([ + "8000", "16000", "24000", "32000", "48000", "96000" + ]) + + self.time_input = QLineEdit() + self.time_input.setPlaceholderText("输入采样时间(秒)") + + self.ok_button = QPushButton("确定") + self.ok_button.clicked.connect(self.accept) + + form_layout = QFormLayout() + form_layout.addRow("采样率(Hz)", self.rate_combo) + form_layout.addRow("采样时间(秒)", self.time_input) + + layout = QVBoxLayout() + layout.addLayout(form_layout) + layout.addWidget(self.ok_button) + + self.setLayout(layout) + + def get_values(self): + # 返回采样率编号(整数)和采样时间(字符串或整数) + rate_code_map = { + "8000": 0x00, + "16000": 0x01, + "24000": 0x02, + "32000": 0x03, + "48000": 0x04, + "96000": 0x05 + } + rate_str = self.rate_combo.currentText() + rate_code = rate_code_map.get(rate_str, 0x00) + return rate_code, self.time_input.text() +class PackageHead: + @staticmethod + def parse(data: bytes): + if len(data) < 6: + return None + head = data[0:3] + if head != b'\xAA\x55\xAA': + return None + cmd, ver, result_code = data[3:6] + return { + 'cmd': cmd, + 'version': ver, + 'result_code': result_code, + 'data': data[6:] + } +class GetCoe: + def __init__(self, temp_ch1_1, temp_ch1_2, dc_ch1_1, dc_ch1_2,ac_ch1_1,ac_ch1_2,temp_ch2_1,temp_ch2_2,dc_ch2_1,dc_ch2_2, + ac_ch2_1, ac_ch2_2): + self.temp_ch1_1 = temp_ch1_1 + self.temp_ch1_2 = temp_ch1_2 + self.dc_ch1_1 = dc_ch1_1 + self.dc_ch1_2 = dc_ch1_2 + self.ac_ch1_1 = ac_ch1_1 + self.ac_ch1_2 = ac_ch1_2 + self.temp_ch2_1 = temp_ch2_1 + self.temp_ch2_2 = temp_ch2_2 + self.dc_ch2_1 = dc_ch2_1 + self.dc_ch2_2 = dc_ch2_2 + self.ac_ch2_1 = ac_ch2_1 + self.ac_ch2_2 = ac_ch2_2 +class Eigenvalue: + def __init__(self, temp1, temp2, offset1, offset2): + self.temp1 = temp1 + self.temp2 = temp2 + self.offset1 = offset1 + self.offset2 = offset2 +class MacConfigDialog(QDialog): + def __init__(self, socket: QTcpSocket, parent=None): + super().__init__(parent) + self.setWindowTitle("配置 MAC") + self.socket = socket + self.setFixedWidth(300) + self.mac_input1 = QLineEdit() + self.mac_input2 = QLineEdit() + hex_reg = QRegExp("[0-9A-Fa-f]{1,2}") + validator = QRegExpValidator(hex_reg) + self.mac_input1.setValidator(validator) + self.mac_input2.setValidator(validator) + self.save_btn = QPushButton("保存") + self.cancel_btn = QPushButton("取消") + + mac_line = QHBoxLayout() + mac_line.addWidget(QLabel("MAC 地址: 50:29:4D:20")) + mac_line.addWidget(self.mac_input1) + mac_line.addWidget(QLabel(":")) + mac_line.addWidget(self.mac_input2) + + btn_line = QHBoxLayout() + btn_line.addWidget(self.save_btn) + btn_line.addWidget(self.cancel_btn) + + main_layout = QVBoxLayout() + main_layout.addLayout(mac_line) + main_layout.addLayout(btn_line) + self.setLayout(main_layout) + + self.save_btn.clicked.connect(self.send_mac_config) + self.cancel_btn.clicked.connect(self.reject) + + def send_mac_config(self): + try: + mac1 = int(self.mac_input1.text(), 16) + mac2 = int(self.mac_input2.text(), 16) + except ValueError: + QMessageBox.warning(self, "输入错误", "请输入合法的十六进制数") + return + + packet = bytes([ + 0xAA, 0x55, 0xAA, + 0x0D, # cmd + 0x01, # version + 0x00 # result_code (发送时为0) + ]) + struct.pack('2B', mac1, mac2) + + self.socket.write(packet) + if not self.socket.waitForBytesWritten(1000): + QMessageBox.critical(self, "错误", "发送失败") + return + + if not self.socket.waitForReadyRead(3000): + QMessageBox.critical(self, "超时", "未收到回应") + return + + data = bytes(self.socket.readAll()) + pkg = PackageHead.parse(data) + + if not pkg: + QMessageBox.critical(self, "错误", "返回数据格式无效") + return + + if pkg['result_code'] == 0x00: + QMessageBox.information(self, "成功", "配置成功") + self.accept() + else: + QMessageBox.warning(self, "失败", f"错误码: {pkg['result_code']}") + +class IPConfigDialog(QDialog): + def __init__(self, socket: QTcpSocket, parent=None): + super().__init__(parent) + self.setWindowTitle("配置 IP") + self.socket = socket + self.setFixedWidth(300) + self.setFixedHeight(150) + self.dhcp_radio = QRadioButton("DHCP") + self.static_radio = QRadioButton("Static") + self.dhcp_radio.setChecked(True) + radio_layout = QHBoxLayout() + radio_layout.addWidget(self.dhcp_radio) + radio_layout.addWidget(self.static_radio) + + self.radio_group = QButtonGroup() + self.radio_group.addButton(self.dhcp_radio) + self.radio_group.addButton(self.static_radio) + + # IPv4地址输入框及标签 + self.ip_input = QLineEdit() + self.mask_input = QLineEdit() + self.gw_input = QLineEdit() + + # IP地址校验器,限制0-255的IPv4格式 + ip_regex = QRegExp(r"^(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)" + r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)" + r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)" + r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)$") + ip_validator = QRegExpValidator(ip_regex) + self.ip_input.setValidator(ip_validator) + self.mask_input.setValidator(ip_validator) + self.gw_input.setValidator(ip_validator) + + form_layout = QFormLayout() + form_layout.setLabelAlignment(Qt.AlignRight) + + form_layout.addRow(QLabel("IP 地址:"), self.ip_input) + form_layout.addRow(QLabel("子网掩码:"), self.mask_input) + form_layout.addRow(QLabel("网关:"), self.gw_input) + + self.save_btn = QPushButton("保存") + self.cancel_btn = QPushButton("取消") + btn_line = QHBoxLayout() + btn_line.addWidget(self.save_btn) + btn_line.addWidget(self.cancel_btn) + + main_layout = QVBoxLayout() + main_layout.addLayout(radio_layout) + main_layout.addLayout(form_layout) + main_layout.addLayout(btn_line) + self.setLayout(main_layout) + + self.save_btn.clicked.connect(self.send_ip_config) + self.cancel_btn.clicked.connect(self.reject) + + # DHCP模式下禁用IP等输入 + self.dhcp_radio.toggled.connect(self.update_input_enabled) + self.update_input_enabled() + + def update_input_enabled(self): + enabled = self.static_radio.isChecked() + self.ip_input.setEnabled(enabled) + self.mask_input.setEnabled(enabled) + self.gw_input.setEnabled(enabled) + + def send_ip_config(self): + mode = 0 if self.dhcp_radio.isChecked() else 1 + + def parse_ip(ip_str): + parts = ip_str.strip().split('.') + if len(parts) != 4: + raise ValueError() + return [int(p) for p in parts] + if mode == 0: + packet = bytes([ + 0xAA, 0x55, 0xAA, + 0x0E, # cmd + 0x01, # version + 0x00 # result_code (发送时为0) + ]) + struct.pack('B', mode) + else: + try: + ip = parse_ip(self.ip_input.text()) + mask = parse_ip(self.mask_input.text()) + gw = parse_ip(self.gw_input.text()) + except ValueError: + QMessageBox.warning(self, "输入错误", "请输入合法的 IPv4 地址格式") + return + packet = bytes([ + 0xAA, 0x55, 0xAA, + 0x0E, # cmd + 0x01, # version + 0x00 # result_code (发送时为0) + ]) + struct.pack('B', mode) + bytes(ip) + bytes(mask) + bytes(gw) + + + + self.socket.write(packet) + if not self.socket.waitForBytesWritten(1000): + QMessageBox.critical(self, "错误", "发送失败") + return + + if not self.socket.waitForReadyRead(3000): + QMessageBox.critical(self, "超时", "未收到回应") + return + + data = bytes(self.socket.readAll()) + pkg = PackageHead.parse(data) + + if not pkg: + QMessageBox.critical(self, "错误", "返回数据格式无效") + return + + if pkg['result_code'] == 0x00: + QMessageBox.information(self, "成功", "配置成功") + self.accept() + else: + QMessageBox.warning(self, "失败", f"错误码: {pkg['result_code']}") +from PyQt5.QtWidgets import ( + QDialog, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, QComboBox, + QGridLayout, QApplication +) +from PyQt5.QtCore import Qt + +class CalibrationDialog(QDialog): + def __init__(self, socket: QTcpSocket, parent=None): + super().__init__(parent) + self.setWindowTitle("校准设置") + self.socket = socket + self.type_index = 0 + # 通道与类型选择 + self.partial_data = QByteArray() + self.recv_state = '' + self.channel_cb = QComboBox() + self.channel_cb.addItems(["通道1", "通道2"]) + + self.type_cb = QComboBox() + self.type_cb.addItems(["温度", "电压DC", "电压AC"]) + self.type_cb.currentTextChanged.connect(self.update_voltage_points) + + top_layout = QHBoxLayout() + top_layout.addWidget(QLabel("通道:")) + top_layout.addWidget(self.channel_cb) + top_layout.addWidget(QLabel("类型:")) + top_layout.addWidget(self.type_cb) + + # 电压点信息 + self.voltage_labels = [] + self.value_labels = [] + self.get_buttons = [] + self.voltage_points = [] + + grid = QGridLayout() + grid.addWidget(QLabel("电压点"), 0, 0) + grid.addWidget(QLabel("校准值"), 0, 1) + grid.addWidget(QLabel("操作按钮"), 0, 2) + + for i in range(3): + v_label = QLabel() + self.voltage_labels.append(v_label) + + val_label = QLabel("未获取") + self.value_labels.append(val_label) + + btn = QPushButton("获取") + btn.clicked.connect(lambda _, index=i: self.fetch_voltage(index)) + self.get_buttons.append(btn) + + grid.addWidget(v_label, i+1, 0) + grid.addWidget(val_label, i+1, 1) + grid.addWidget(btn, i+1, 2) + + self.update_voltage_points() + + # 初始化与校准按钮 + self.init_btn = QPushButton("初始化") + self.calibrate_btn = QPushButton("校准") + self.get_coe_btn = QPushButton("获取系数") + self.init_btn.clicked.connect(self.initialize_calibration) + self.calibrate_btn.clicked.connect(self.calibrate) + self.get_coe_btn.clicked.connect(self.get_coe) + self.socket.readyRead.connect(self.on_ready_read) + + btn_layout = QHBoxLayout() + btn_layout.addStretch() + btn_layout.addWidget(self.init_btn) + btn_layout.addWidget(self.calibrate_btn) + btn_layout.addWidget(self.get_coe_btn) + + main_layout = QVBoxLayout() + main_layout.addLayout(top_layout) + main_layout.addLayout(grid) + main_layout.addLayout(btn_layout) + + self.setLayout(main_layout) + + def update_voltage_points(self): + type_text = self.type_cb.currentText() + if type_text == "温度": + self.voltage_points = [0.5, 1.5, 2.5] + elif type_text == "电压DC": + self.voltage_points = [1, 8, 15] + else: + self.voltage_points = [1, 5, 9] + + for i, v in enumerate(self.voltage_points): + self.voltage_labels[i].setText(f"{v:.1f}V") + self.value_labels[i].setText("未获取") + + def fetch_voltage(self, index): + # 模拟读取值 + # import random + # val = round(self.voltage_points[index] + random.uniform(-0.05, 0.05), 3) + # self.value_labels[index].setText(str(val)) + self.type_index = index + if self.type_cb.currentIndex() != 2: + self.get_temp_dc() + else: + self.get_wave() + + def initialize_calibration(self): + for label in self.value_labels: + label.setText("未获取") + def get_coe(self): + try: + self.socket.write(bytes( + [0xAA, 0x55, 0xAA, 0x10, 0x01, 0x00])) # 发送数据 + self.socket.waitForReadyRead() + + + except Exception as e: + self.status_bar.showMessage(f"状态: 错误 - {str(e)}") + def calibrate(self): + try: + self.recv_state = '' + x = self.voltage_points + y = [float(label.text()) for label in self.value_labels] + if any(label.text() == "未获取" for label in self.value_labels): + raise ValueError("未获取所有电压点数据") + except ValueError: + from PyQt5.QtWidgets import QMessageBox + QMessageBox.warning(self, "错误", "请先获取所有电压点的校准值") + return + + + + # 非线性最小二乘拟合 y = a*x + b + def func(p, x): + a, b = p + return a * x + b + + def error(p, x, y): + return func(p, x) - y + + p0 = [1, 0] + plsq = leastsq(error, p0, args=(np.array(x), np.array(y))) + a, b = plsq[0] + + print(f"拟合结果: a = {a:.4f}, b = {b:.4f}") + + # 获取通道号和类型索引 + ch = self.channel_cb.currentIndex() + 1 # 通道1 对应 1 + group = self.type_cb.currentIndex() # 0=temp, 1=dc, 2=ac + print(f"ch:{ch},group:{group}") + + header = bytes([0xAA, 0x55, 0xAA, 0x11,0x01,0x00]) # 可根据实际协议修改 + payload = struct.pack('= HEADER_SIZE: + self.header = self.socket.read(HEADER_SIZE) + if self.header[:3] != HEADER_MAGIC: + print("无效 header magic,跳过") + continue + self.cmd = self.header[3] + if self.cmd == 0x01: + self.recv_state = 'WAIT_WAVE_HEADER' + else: + print(f"未知或暂不处理的 cmd: {self.cmd}") + self.recv_state = 'WAIT_HEADER' + return + else: + return + elif self.recv_state == 'WAIT_WAVE_HEADER': + if self.socket.bytesAvailable() >= 5: + wave_header = self.socket.read(5) + self.sampling_rate = struct.unpack('= self.expected_length: + self.process_wave_packet(bytes(self.partial_data)) + self.recv_state = 'WAIT_HEADER' + self.partial_data.clear() + else: + if self.socket.bytesAvailable() >= HEADER_SIZE: + recv_data = self.socket.read(1340) + if recv_data[:3] != HEADER_MAGIC: + print("无效 header magic,跳过") + continue + cmd = recv_data[3] + if cmd == 0x0C: + body_format = '< H H H H' + body_size = struct.calcsize(body_format) + body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size] + unpacked_data = struct.unpack(body_format, body_data) + value = Eigenvalue(*unpacked_data) + print(f"温度1:{value.temp1/1000},温度2:{value.temp2/1000},偏置电压1:{value.offset1/100},偏置电压2:{value.offset2/100}") + if self.channel_cb.currentIndex() == 0 and self.type_cb.currentIndex() == 0: + self.value_labels[self.type_index].setText(str(value.temp1/1000)) + elif self.channel_cb.currentIndex() == 0 and self.type_cb.currentIndex() == 1: + self.value_labels[self.type_index].setText(str(value.offset1 / 100)) + elif self.channel_cb.currentIndex() == 1 and self.type_cb.currentIndex() == 0: + self.value_labels[self.type_index].setText(str(value.temp2/1000)) + elif self.channel_cb.currentIndex() == 1 and self.type_cb.currentIndex() == 1: + self.value_labels[self.type_index].setText(str(value.offset2 / 100)) + elif cmd == 0x11: + pkg = PackageHead.parse(recv_data) + if not pkg: + QMessageBox.critical(self, "错误", "返回数据格式无效") + return + if pkg['result_code'] == 0x00: + QMessageBox.information(self, "成功", "配置成功") + return + elif cmd == 0x10: + body_format = '< f f f f f f f f f f f f' + body_size = struct.calcsize(body_format) + body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size] + unpacked_data = struct.unpack(body_format, body_data) + value = GetCoe(*unpacked_data) + print( + f"{value.temp_ch1_1, value.temp_ch1_2, value.dc_ch1_1, value.dc_ch1_2, value.ac_ch1_1, value.ac_ch1_2}") + print( + f"{value.temp_ch2_1, value.temp_ch2_2, value.dc_ch2_1, value.dc_ch2_2, value.ac_ch2_1, value.ac_ch2_2}") + def process_wave_packet(self,wave_data): + data = wave_data # 接收所有数据 + data = np.frombuffer(data, dtype=np.int32) # 根据实际数据格式转换 + LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000/10.2 + self.scaled_data = data * LSB_32BIT + + result = self.get_extremes(self.scaled_data) + print("最大的10个值:", result['top10_max']) + print("最小的10个值:", result['top10_min']) + mean_max = self.mean_without_max_optimized(result['top10_max']) + print(f"top10_max 去除最大的数据后的平均值1:{mean_max}") + mean_min = self.mean_without_min_optimized(result['top10_min']) + print(f"top10_min 去除最大的数据后的平均值2:{mean_min}") + pp = mean_max - mean_min + if self.type_cb.currentIndex() == 2: + self.value_labels[self.type_index].setText(str(pp)) + print(f"pp :{mean_max - mean_min}") + + def closeEvent(self, event): + try: + print("closeEvent") + self.socket.readyRead.disconnect(self.on_ready_read) + except TypeError: + pass + super().closeEvent(event) \ No newline at end of file diff --git a/feauture_calculate.py b/feauture_calculate.py new file mode 100644 index 0000000..727d35b --- /dev/null +++ b/feauture_calculate.py @@ -0,0 +1,754 @@ +from math import floor, ceil +import numpy as np +from numpy import real, sqrt, mean, array +from scipy import fftpack +# 频域滤波 +from scipy.signal import butter, sosfilt, hilbert, argrelextrema + + +def fft_spectrum(signal: np.ndarray, fs: float): + """ + 傅里叶变换频谱 + :param signal:np.ndarray 时序振动信号 + :param fs:float 采样频率 + :return: + f:np.ndarray, 频率 + y:np.ndarray, 幅值 + """ + n = len(signal) + f = fs * np.arange(n // 2) / n + fft_data = abs(np.fft.fft(signal))[:int(n / 2)] * 2 / n # 归一化处理,双边频谱反转后需要×2 + return f, fft_data + + +def fft_filter(signal, lpf1, lpf2, fs): + """ + signal: np.ndarray 原始振动信号 + fs: float 采样频率 + lpf1: float 滤波频率-低频 + lpf2: float 滤波频率-高频 + :return + y: np.ndarray 滤波信号 + """ + yy = fftpack.fft(signal) + m = len(yy) + k = m / fs + for i in range(0, floor(k * lpf1)): + yy[i] = 0 + for i in range(ceil(k * lpf2 - 1), m): + yy[i] = 0 + y = 2 * real(fftpack.ifft(yy)) + return y +# 高通滤波 +def fft_filter_high_pass(signal, lpf_high, fs): + """ + signal: np.ndarray 原始振动信号 + fs: float 采样频率 + lpf_high: float 滤波频率 + :return + y: np.ndarray 滤波信号 + """ + yy = fftpack.fft(signal) + m = len(yy) + k = m / fs + for i in range(0, floor(k * lpf_high) + 1): + yy[i] = 0 + for i in range(ceil(k * (fs - lpf_high) - 1), m): + yy[i] = 0 + y = real(fftpack.ifft(yy)) + return y +# 低通滤波 +def fft_filter_low_pass(signal, lpf_low, fs): + """ + signal: np.ndarray 原始振动信号 + fs: float 采样频率 + lpf_low: float 滤波频率 + :return + y: np.ndarray 滤波信号 + """ + yy = fftpack.fft(signal) + m = len(yy) + k = m / fs + for i in range(ceil(k * lpf_low - 1), m): + yy[i] = 0 + y = 2 * real(fftpack.ifft(yy)) + return y + +# 希尔伯特变换 +def hilbert_envelop(signal): + """ + 输入原始信号,输出包络时域信号 + """ + hx = hilbert(signal) # 对信号进行希尔伯特变换。 + analytic_signal = signal - hx * 1j # 进行hilbert变换后的解析信号 + return np.abs(analytic_signal) + + +def acc2dis(data: np.ndarray, fs: float): + """ + 采用时域积分的方式,将振动加速度信号转化为速度信号和位移信号 + Parameters + ---------- + data: np.ndarray, 振动加速度信号 + fs: float, 采样频率 + + Return + ------ + s_ifft: np.ndarray, 积分速度信号 + d_ifft:np.ndarray, 积分位移信号 + """ + n = len(data) + a_mul_dt = data / fs + + s = [] + total = a_mul_dt[0] + for i in range(n - 1): + total = total + a_mul_dt[i + 1] + s.append(total) + s_fft = np.fft.fft(s) + s_fft[:2] = 0 # 去除直流分量 + s_fft[-3:] = 0 # 去除直流分量 + s_ifft = np.real(np.fft.ifft(s_fft)) + + s_mut_dt = s_ifft / fs + d = [] + total = s_mut_dt[0] + for i in range(n - 2): + total = total + s_mut_dt[i + 1] + d.append(total) + d_fft = np.fft.fft(d) + d_fft[:2] = 0 + d_fft[-3:] = 0 + d_ifft = np.real(np.fft.ifft(d_fft)) + return s_ifft * 1000, d_ifft * 1000000 # 单位转换 + + +def calc_vel_pass_rms(signal, fs): + """ + signal: 振动加速度信号 + fs: 采样频率 + :return: 通频速度有效值(10-1000Hz) + """ + #x1 = fft_filter(signal, 10, 1000, fs) + x2, x3 = acc2dis(signal, fs) + #x4 = fft_filter(x2, 10, 1000, fs) + vel_pass_rms = np.sqrt(np.mean(x2 ** 2)) + return vel_pass_rms + + +# 低频速度有效值(3-1000Hz) +def calc_vel_low_rms(signal, fs): + """"" + signal: 振动加速度信号 + fs: 采样频率 + Return + v_rms: 速度有效值 + """"" + x1 = fft_filter(signal, 3, 1000, fs) + x2, x3 = acc2dis(x1, fs) + vel_low_rms = np.sqrt(np.mean(x2 ** 2)) + return vel_low_rms + + +# 加速度有效值(3-10KHz) +def calc_acc_rms(signal, fs): + """"" + signal: 振动加速度信号 + fs: 采样频率 + Return + a_rms: 加速度有效值 + """"" + #x1 = fft_filter(signal, 3,10000, fs) + acc_rms = np.sqrt(np.mean(signal ** 2)) + return acc_rms + + +# 加速度峰值(3-10KHz) +def calc_acc_p(signal, fs): + """"" + signal: 振动加速度信号 + fs: 采样频率 + Return + a_p: 加速度有效值 + """"" + #x1 = fft_filter(signal, 3,10000, fs) + x2 = sorted(abs(signal), reverse=True) + x3 = x2[0:100] + acc_p = np.mean(x3) + return acc_p + + +# 振动冲击值(5K~10KHz) +def calc_vibration_impulse(signal, fs): + """"" + signal: 振动加速度信号 + fs: 采样频率 + Return + impulse: 振动冲击值 + """"" + x1 = fft_filter(signal, 5000, 10000, fs) + x2 = hilbert_envelop(x1) + x3 = fft_filter(x2, 3, 500, fs) + impulse = np.sqrt(np.mean(x3 ** 2)) + return impulse + + +# 加速度峭度指标(3~10KHz) +def calc_acc_kurtosis(signal, fs): + """"" + signal: 振动加速度信号 + fs: 采样频率 + Return + acc_k: 峭度指标 + """"" + x1 = fft_filter(signal, 3, 10000, fs) + K = sum(x1 ** 4) / len(x1) + a = sqrt(sum(x1 ** 2) / len(x1)) + acc_kurtosis = K / (a ** 4) + return acc_kurtosis + + +# 加速度歪度指标(3-10KHz) +def calc_acc_skew(signal, fs): + """"" + signal: 振动加速度信号 + fs: 采样频率 + Return + acc_s: 歪度指标 + """"" + x1 = fft_filter(signal, 3, 10000, fs) + S = sum(x1 ** 3) / len(x1) + a = sqrt(sum(x1 ** 2) / len(x1)) + acc_skew = S / (a ** 3) + return acc_skew + + +# 速度峰值(3-1KHz) +def calc_vel_p(signal, fs): + """"" + signal: 振动加速度信号 + fs: 采样频率 + Return + vel_p: 速度峰值 + """"" + #x1 = fft_filter(signal, 10, 1000, fs) + x2, x3 = acc2dis(signal, fs) + x4 = sorted(abs(x2), reverse=True) + x5 = x4[0:100] + vel_p = np.mean(x5) + return vel_p + + +def calc_fea_acc(signal, fs): + """ + 加速度传感器特征值计算 + """ + # 通频速度有效值 + vel_pass_rms = calc_vel_pass_rms(signal, fs) + # 低频速度有效值 + vel_low_rms = calc_vel_low_rms(signal, fs) + # 加速度有效值 + acc_rms = calc_acc_rms(signal, fs) + # 加速度峰值 + acc_p = calc_acc_p(signal, fs) + # 振动冲击值 + vibration_impulse = calc_vibration_impulse(signal, fs) + # 加速度峭度指标 + acc_kurtosis = calc_acc_kurtosis(signal, fs) + # 加速度歪度指标 + acc_skew = calc_acc_skew(signal, fs) + # 速度峰值 + vel_p = calc_vel_p(signal, fs) + return vel_pass_rms, vel_low_rms, acc_rms, acc_p, vibration_impulse, acc_kurtosis, acc_skew, vel_p + + +# 最大正向峰值(5-500Hz) +def calc_max_positive_p(signal, fs): + """"" + signal: 振动位移信号 + fs: 采样频率 + Return + max_positive_p: 最大正向峰值 + """"" + x1 = fft_filter(signal, 5, 500, fs) + x2 = [] + for i in range(1, len(x1)): + if x1[i - 1] > 0: + k = x1[i - 1] + x2.append(k) + max_positive_p = abs(max(x2) - np.mean(x1)) + return max_positive_p + + +# 最大负向峰值(5-500Hz) +def calc_max_negative_p(signal, fs): + """"" + signal: 振动位移信号 + fs: 采样频率 + Return + max_negative_p: 最大负向峰值 + """"" + x1 = fft_filter(signal, 5, 500, fs) + x2 = [] + for i in range(1, len(x1)): + if x1[i - 1] < 0: + k = x1[i - 1] + x2.append(k) + max_negative_p = abs(min(x2) - np.mean(x1)) + return max_negative_p + + +# 监测峰峰值(5-500Hz) +def calc_monitor_pp(signal, fs, L): + """"" + signal: 振动位移信号 + fs: 采样频率 + L: 传感器量程 + Return + mon_pp: 监测峰峰值 + """"" + x1 = fft_filter(signal, 5, 500, fs) + for i in range(1, len(x1)): + if x1[i - 1] < x1[i]: + x1[i] = x1[i - 1] + (1000 / fs) * (L * 0.05) + elif x1[i - 1] > x1[i]: + x1[i] = x1[i - 1] - x1[i - 1] * 0.63 * (1 / fs) + mon_pp = max(x1) - min(x1) + return mon_pp + + +# 诊断峰峰值(5-500Hz) +def calc_diagnosis_pp(signal, fs): + """"" + signal: 振动位移信号 + fs: 采样频率 + Return + dia_pp: 诊断峰峰值 + """"" + x1 = fft_filter(signal, 5, 500, fs) + dia_pp = max(x1) - min(x1) + return dia_pp + + +# 峰值(5-500Hz) +def calc_peak(signal, fs): + """"" + signal: 振动位移信号 + fs: 采样频率 + Return + Peak: 峰值 + """"" + x1 = fft_filter(signal, 5, 500, fs) + peak = (max(x1) - min(x1)) / 2 + return peak + + +# 峰值因子(5-500Hz) +def calc_peaking_factor(signal, fs): + """"" + signal: 振动位移信号 + fs: 采样频率 + Return + Cf: 峰值因子 + """"" + x1 = fft_filter(signal, 5, 500, fs) + PP = (max(x1) - min(x1)) / 2 + rms = np.sqrt(np.mean(x1 ** 2)) + peaking_factor = PP / rms + return peaking_factor + + +# 推导峰值(5-500Hz) +def calc_derive_p(signal, fs): + """"" + signal: 振动位移信号 + fs: 采样频率 + Return + de_p: 推导峰值 + """"" + x1 = fft_filter(signal, 5, 500, fs) + rms = np.sqrt(np.mean(x1 ** 2)) + de_p = 1.414 * rms + return de_p + + +def calc_fea_displacement(signal, fs, L): + """ + 位移传感器特征值计算 + """ + # 最大正向峰值 + max_positive_p = calc_max_positive_p(signal, fs) + # 最大负向峰值 + max_negative_p = calc_max_negative_p(signal, fs) + # 监测峰峰值 + mon_pp = calc_monitor_pp(signal, fs, L) + # 诊断峰峰值 + dia_pp = calc_diagnosis_pp(signal, fs) + # 峰值 + peak = calc_peak(signal, fs) + # 峰值因子 + peaking_factor = calc_peaking_factor(signal, fs) + # 推导峰值 + de_p = calc_derive_p(signal, fs) + return max_positive_p, max_negative_p, mon_pp, dia_pp, peak, peaking_factor, de_p + + +def filter_wave(x: np.ndarray, order: int, wn: int or list, type: str, fs: float, output: str = 'sos'): + """ + 振动信号过滤,实现高通,低通,旁通等功能。 + :param x: np.ndarray + :param order: int,阶次 + :param wn: int or list, if type is lowpass or highpass,the value is int,otherwise the value is list 截止频率 + ,top and bottom limit + :param type:str,lowpass,highpass,bandpass,bands-top + :param fs:float,frequency + :param output:str,three types: ba,sos,zpk + :return:np.array,过滤的信号 + """ + sos = butter(order, wn, type, False, fs=fs, output=output) + return sosfilt(sos, x) + + +def envelope_detection(x: np.ndarray): + """ + 提取振动信号的包络曲线 + Parameters + ---------- + x: np.ndarray, 原始振动信号 + y: np.ndarray, 希尔伯特变换变换后的解析信号 + Return + ------ + z: np.ndarray, 包络曲线 + """ + x1 = hilbert(x) + y = x - x1 * 1j + z = abs(y) + return z + + +def calc_ylb_SWE(signal, fs): + """ + :return ylb_SWE: 应力波能量 + """ + f_min = 32000 # 低频截至频率 + f_max = 40000 # 高频截至频率 + signal_filter_data = filter_wave(signal, 8, [f_min, f_max], "bandpass", fs) + signal_envelope = envelope_detection(signal_filter_data) + ylb_SWE = sum(signal_envelope[signal_envelope > 0]) + return ylb_SWE + + + + + +def calc_ylb_SWPE(signal, fs): + """ + :return ylb_SWPE: 脉冲能量 + """ + f_min = 32000 # 低频截至频率 + f_max = 48000 # 高频截至频率 + L_times = 0.1 # 最小排序法时有效:人工设置极限阈值 + signal_filter_data = filter_wave(signal, 8, [f_min, f_max], "bandpass", fs) + signal_envelope = envelope_detection(signal_filter_data) + signal_envelope_less = signal_envelope[argrelextrema(signal_envelope, np.less)] # 求局部极小值点,np.greatest是极大值点 + L_sort = np.sort(signal_envelope_less) + L_Limit = L_sort[round(len(L_sort) * L_times)] # 设置最小阀值 + signal_ylb = signal_envelope.copy() + signal_ylb[signal_ylb <= L_Limit] = 0 # 低于阀值设置为0 + signal_ylb[0] = 0 # 首元素设置为0 + signal_ylb[len(signal_envelope) - 1] = 0 # 尾元素设置为0 + signal_ylb[signal_ylb > L_Limit] = 1 + signal_dt = np.diff(signal_ylb) + dt_s = [i for i in range(len(signal_dt)) if signal_dt[i] == 1] + dt_x = [j for j in range(len(signal_dt)) if signal_dt[j] == -1] + signal_ylb_SWPE = np.linspace(0, 0, len(dt_s)) # 应力波脉冲能量值(SWPE)赋初值0 + for i in range(len(dt_s)): + signal_ylb_SWPE[i] = sum(signal_envelope[dt_s[i] + 1:dt_x[i] + 1] - L_Limit) + ylb_SWPE = sum(signal_ylb_SWPE) + return ylb_SWPE + + +def calc_ylb_SWPA(signal, fs): + """ + :return ylb_SWPA: 最大峰高 + """ + f_min = 32000 # 低频截至频率 + f_max = 40000 # 高频截至频率 + L_times = 0.1 # 最小排序法时有效:人工设置极限阈值 + signal_filter_data = filter_wave(signal, 8, [f_min, f_max], "bandpass", fs) + signal_envelope = envelope_detection(signal_filter_data) + signal_envelope_less = signal_envelope[argrelextrema(signal_envelope, np.less)] + L_sort = np.sort(signal_envelope_less) + L_Limit = L_sort[round(len(L_sort) * L_times)] # 设置最小阀值 + signal_ylb = signal_envelope.copy() + signal_ylb[signal_ylb <= L_Limit] = 0 # 低于阀值设置为0 + signal_ylb[0] = 0 # 首元素设置为0 + signal_ylb[len(signal_envelope) - 1] = 0 # 尾元素设置为0 + signal_ylb[signal_ylb > L_Limit] = 1 + signal_dt = np.diff(signal_ylb) + dt_s = [i for i in range(len(signal_dt)) if signal_dt[i] == 1] + dt_x = [j for j in range(len(signal_dt)) if signal_dt[j] == -1] + signal_ylb_SWPA = np.linspace(0, 0, len(dt_s)) # 应力波脉冲能量峰值(SWPA)赋初值0 + for i in range(len(dt_s)): + signal_ylb_SWPA[i] = max(signal_envelope[dt_s[i]+1:dt_x[i]+1]) + ylb_SWPA = max(signal_ylb_SWPA) # 最大峰高 SWPA + return ylb_SWPA + + +def calc_fea_ylb(signal): + """ + 应力波传感器特征值计算 + :return ylb_SWE, ylb_SWPE, ylb_SWPA: 应力波能量 脉冲能量 最大峰高 + """ + fs_ylb = 131072 + f_min = 32000 # 低频截至频率 + f_max = 40000 # 高频截至频率 + L_times = 0.1 # 最小排序法时有效:人工设置极限阈值 + signal_filter_data = filter_wave(signal, 8, [f_min, f_max], "bandpass", fs_ylb) + signal_envelope = envelope_detection(signal_filter_data) + signal_envelope_less = signal_envelope[argrelextrema(signal_envelope, np.less)] + L_sort = np.sort(signal_envelope_less) + L_Limit = L_sort[round(len(L_sort) * L_times)] # 设置最小阀值 + signal_ylb = signal_envelope.copy() + signal_ylb[signal_ylb <= L_Limit] = 0 # 低于阀值设置为0 + signal_ylb[0] = 0 # 首元素设置为0 + signal_ylb[len(signal_envelope) - 1] = 0 # 尾元素设置为0 + signal_ylb[signal_ylb > L_Limit] = 1 + signal_dt = np.diff(signal_ylb) + dt_s = [i for i in range(len(signal_dt)) if signal_dt[i] == 1] + dt_x = [j for j in range(len(signal_dt)) if signal_dt[j] == -1] + signal_ylb_SWPA = np.linspace(0, 0, len(dt_s)) # 应力波脉冲能量峰值(SWPA)赋初值0 + signal_ylb_SWPE = np.linspace(0, 0, len(dt_s)) # 应力波脉冲能量值(SWPE)赋初值0 + for i in range(len(dt_s)): + signal_ylb_SWPA[i] = max(signal_envelope[dt_s[i]+1:dt_x[i]+1]) + signal_ylb_SWPE[i] = sum(signal_envelope[dt_s[i] + 1:dt_x[i] + 1] - L_Limit) + ylb_SWE = sum(signal_envelope[signal_envelope > 0]) + ylb_SWPE = sum(signal_ylb_SWPE) + ylb_SWPA = max(signal_ylb_SWPA) # 最大峰高 SWPA + return ylb_SWE, ylb_SWPE, ylb_SWPA + + +def calc_multiple_frequency(fft_data, ift, sp): + """ + 获取给定频率的幅值 + Parameters + ---------- + fft_data: 频谱数据 [频率, 幅值] + ift: 给定频率 + sp: 采样频率除以采样点数 + Returns + ------- + f_iX:i倍频幅值 + """ + try: + f_iX = max(fft_data[floor((ift - 0.5) / sp):ceil((ift + 0.5) / sp)]) + except ValueError: + f_iX = 0 + return f_iX + + +def calc_fea_HS(fft_data, fea, sp, f_st, f_ord): + """ + :param fft_data: 频谱数据 + :param fea: 特征值 + :param sp: 采样频率除以采样点数 + :param f_st: 起始阶次 + :param f_ord: 计算阶次 + :return: fea_HS: f_st~(f_st+f_ord-1)倍频幅值 + """ + fea_HS = [] + for i in range(f_st, f_st + f_ord): + fea_HS.append(calc_multiple_frequency(fft_data, i * fea, sp)) + return fea_HS + + +def calc_HS(fft_data, fea, sp, f_st, f_ord): + """ + 计算特征值的HS函数 + :param fft_data: 频谱数据 + :param fea: 特征值 + :param sp: 采样频率除以采样点数 + :param f_st: 起始阶次 + :param f_ord: 计算阶次 + :return: HS: + """ + mul = 0.707 + fea_HS = calc_fea_HS(fft_data, fea, sp, f_st, f_ord) + HS = sqrt(sum(array(fea_HS) ** 2)) * mul + return HS + + +def calc_HRS(fft_data, fea_min, fea_max, sp): + """ + :param fft_data: 频谱数据 + :param fea_min: 频率下限 + :param fea_max: 频率上限 + :param sp: 采样频率除以采样点数 + :return: HRS: 能量和 + """ + mul = 0.707 + fea_X = fft_data[floor((fea_min - 0.5) / sp):ceil((fea_max + 0.5) / sp)] + HRS = sqrt(sum(fea_X ** 2)) * mul + return HRS + + +def calc_fea_HCS_up(fft_data, fc, fb, sp, f_st, f_ord): + """ + 获取给定中心频率和边带的上边带能量和 + Parameters + ---------- + fft_data: 频谱数据 [频率, 幅值] + fc: 中心频率 + fb: 边带频率 + sp: 采样频率除以采样点数 + f_st: 起始阶次 + f_ord: 计算阶次 + Returns + ------- + HRS_up:f_st~(f_st+f_ord-1)倍上边带能量和 + """ + mul = 0.707 + HCS_up = [] + for i in range(f_st, f_st + f_ord): + xb = [] + for j in range(1, 6): + xb.append(calc_multiple_frequency(fft_data, i * fc + j * fb, sp)) + xb.append(calc_multiple_frequency(fft_data, i * fc, sp)) + HCS_up.append(sqrt(sum([x ** 2 for x in xb])) * mul) + return HCS_up + + +def calc_fea_HCS_low(fft_data, fc, fb, sp, f_st, f_ord): + """ + 获取给定中心频率和边带的下边带能量和 + Parameters + ---------- + fft_data: 频谱数据 [频率, 幅值] + fc: 中心频率 + fb: 边带频率 + sp: 采样频率除以采样点数 + f_st: 起始阶次 + f_ord: 计算阶次 + Returns + ------- + HRS_low:f_st~(f_st+f_ord-1)倍下边带能量和 + """ + mul = 0.707 + HCS_low = [] + for i in range(f_st, f_st + f_ord): + xb = [] + for j in range(1, 6): + xb.append(calc_multiple_frequency(fft_data, i * fc - j * fb, sp)) + xb.append(calc_multiple_frequency(fft_data, i * fc, sp)) + HCS_low.append(sqrt(sum([x ** 2 for x in xb])) * mul) + return HCS_low + + +def calc_fea_HDS(fft_data, fea, sp, f_st, f_ord): + """ + :param fft_data: 频谱数据 + :param fea: 特征值 + :param sp: 采样频率除以采样点数 + :param f_st: 起始阶次 + :param f_ord: 计算阶次 + :return: fea_HS: 1/f_st~1/(f_st+f_ord-1)倍频幅值 + """ + fea_HDS = [] + for i in range(f_st, f_st + f_ord): + fea_HDS.append(calc_multiple_frequency(fft_data, 1 / i * fea, sp)) + return fea_HDS + + +def calc_HDS(fft_data, fea, sp, f_st, f_ord): + """ + :param fft_data: 频谱数据 + :param fea: 特征值 + :param sp: 采样频率除以采样点数 + :param f_st: 起始阶次 + :param f_ord: 计算阶次 + :return: HDS: + """ + mul = 0.707 + fea_HDS = calc_fea_HDS(fft_data, fea, sp, f_st, f_ord) + HDS = sqrt(sum(array(fea_HDS) ** 2)) * mul + return HDS + + +def calc_fea_HCR(fft_data, cf, sf, sp, f_st, f_ord): + """ + 计算给定中心频率和边带频率的边带能量比 + Parameters + ---------- + fft_data: 频谱数据 [频率, 幅值] + cf: 中心频率 + sf: 边带频率 + sp: 采样频率除以采样点数 + f_st: 起始阶次 + f_ord: 计算阶次 + Returns + ------- + fea_HCR:f_st~(f_st+f_ord-1)倍能量比 + """ + fea_HCR = [] + for i in range(f_st, f_st + f_ord): + xb1 = [] + xb2 = [] + for j in range(1, 6): + xb1.append(calc_multiple_frequency(fft_data, i * cf - j * sf, sp)) + xb2.append(calc_multiple_frequency(fft_data, i * cf + j * sf, sp)) + try: + Hb1 = sum([x ** 2 for x in xb1]) # 下边带能量和 + except ValueError: + Hb1 = 0 + try: + Hb2 = sum([x ** 2 for x in xb2]) # 上边带能量和 + except ValueError: + Hb2 = 0 + xc = calc_multiple_frequency(fft_data, i * cf, sp) # 给定中心频率幅值 + if xc == 0: + H = 0 + else: + H = sqrt(Hb1 + Hb2) / xc + fea_HCR.append(H) + return fea_HCR + + +def calc_HCR(fft_data, cf, sf, sp, f_st, f_ord): + """ + 计算给定中心频率和边带频率的边带能量比 + Parameters + ---------- + fft_data: 频谱数据 [频率, 幅值] + cf: 中心频率 + sf: 边带频率 + sp: 采样频率除以采样点数 + f_st: 起始阶次 + f_ord: 计算阶次 + Returns + ------- + HCR:f_st~(f_st+f_ord-1)倍能量比的平方和开根号 + """ + fea_HCR = calc_fea_HCR(fft_data, cf, sf, sp, f_st, f_ord) + HCR = sqrt(sum(array(fea_HCR) ** 2)) + return HCR + +if __name__ == "__main__": + freq = 8192 + data = np.loadtxt(r"E:\Workspace\Software\Python\无线传感器算法\特征值计算\09f07f1800158d00-Y.csv", delimiter=',', usecols=(0)) + data_list = data.tolist() + data_list = data_list[1:] + + print("数据长度:", len(data_list)) + + #速度有效值 + speed_rms_value = calc_vel_pass_rms(data_list, freq) + print("速度有效值:", speed_rms_value) + # 速度峰值 + speed_vel_p = calc_vel_p(data_list, freq) + print("速度有峰值:", speed_vel_p) + # 加速度有效值 + acc_rms = calc_acc_rms(data_list, freq) + print("加速度有效值:", acc_rms) + # 加速度峰值 + acc_p = calc_acc_p(data_list, freq) + print("加速度峰值:", acc_p) diff --git a/mainwindow.py b/mainwindow.py index 58a298b..95a9ea3 100644 --- a/mainwindow.py +++ b/mainwindow.py @@ -7,13 +7,18 @@ import numpy as np from scipy.fft import fft, fftfreq import matplotlib.pyplot as plt from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas, NavigationToolbar2QT -from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QPushButton, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QFrame, QGridLayout, QStatusBar, QSizePolicy, QFileDialog +from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget,QMessageBox, QPushButton, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QFrame, QGridLayout, QStatusBar, QSizePolicy, QFileDialog from PyQt5.QtCore import Qt, QTimer, pyqtSlot, QByteArray from PyQt5.QtNetwork import QTcpSocket, QAbstractSocket import matplotlib import struct import time from typing import Tuple, Optional +from feauture_calculate import * +from SamplingDialog import * +import csv +import pandas as pd + # 启用高DPI支持 QApplication.setAttribute(Qt.AA_EnableHighDpiScaling) QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps) @@ -27,10 +32,18 @@ plt.rcParams['axes.unicode_minus'] = False # 解决负号 '-' 显示问题 HEADER_MAGIC = bytes([0xAA, 0x55, 0xAA]) HEADER_SIZE = 6 # PackgeHead除去data字段的大小(3+1+1+1) + +class Eigenvalue: + def __init__(self, temp1, temp2, offset1, offset2): + self.temp1 = temp1 + self.temp2 = temp2 + self.offset1 = offset1 + self.offset2 = offset2 + class MatplotlibCanvas(FigureCanvas): """ 用于在 Qt 界面中嵌入 Matplotlib 绘图 """ def __init__(self, parent=None): - self.fig, self.axs = plt.subplots(2, 1, figsize=(10, 5)) # 2 个子图(时域 + 频域) + self.fig, self.axs = plt.subplots(2, 2, figsize=(10, 5)) # 2 个子图(时域 + 频域) self.fig.subplots_adjust(hspace=0.6) # 增大时域图和频域图的间距 super().__init__(self.fig) self.setParent(parent) @@ -38,60 +51,116 @@ class MatplotlibCanvas(FigureCanvas): def init_plot(self): """ 初始化默认图像(占位提示) """ - self.axs[0].set_title("时域信号", fontsize=12, fontweight='bold') - self.axs[0].set_xlabel("采样点") - self.axs[0].set_ylabel("幅度") - self.axs[0].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[0].transAxes) + self.axs[0,0].set_title("加速度时域信号", fontsize=12, fontweight='bold') + self.axs[0,0].set_xlabel("采样点") + self.axs[0,0].set_ylabel("幅度(m/s2)") + self.axs[0,0].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[0,0].transAxes) - self.axs[1].set_title("频域信号", fontsize=12, fontweight='bold') - self.axs[1].set_xlabel("频率 (Hz)") - self.axs[1].set_ylabel("幅度") - self.axs[1].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[1].transAxes) + self.axs[0,1].set_title("加速度频域信号", fontsize=12, fontweight='bold') + self.axs[0,1].set_xlabel("频率 (Hz)") + self.axs[0,1].set_ylabel("幅度(m/s2)") + self.axs[0,1].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[0,1].transAxes) + + self.axs[1, 0].set_title("速度时域信号", fontsize=12, fontweight='bold') + self.axs[1, 0].set_xlabel("采样点") + self.axs[1, 0].set_ylabel("幅度(mm/s)") + self.axs[1, 0].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', + transform=self.axs[1, 0].transAxes) + + self.axs[1, 1].set_title("速度频域信号", fontsize=12, fontweight='bold') + self.axs[1, 1].set_xlabel("频率 (Hz)") + self.axs[1, 1].set_ylabel("幅度(mm/s)") + self.axs[1, 1].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', + transform=self.axs[1, 1].transAxes) self.draw() # 更新绘图 - def plot_data(self, data): + def plot_data(self, data,sample): """ 绘制时域和频域图 """ - self.axs[0].clear() - self.axs[1].clear() + self.axs[0,0].clear() + self.axs[0,1].clear() # 时域信号 - self.axs[0].plot(data, color='blue') - self.axs[0].set_title("时域信号", fontsize=12, fontweight='bold') - self.axs[0].set_xlabel("采样点") - self.axs[0].set_ylabel("幅度") + self.axs[0,0].plot(data, color='blue') + self.axs[0,0].set_title("加速度时域信号", fontsize=12, fontweight='bold') + self.axs[0,0].set_xlabel("采样点") + self.axs[0,0].set_ylabel("幅度(m/s2)") # 频域信号 N = len(data) - T = 1.0 / 96000 # 假设采样率为 96000 Hz - yf = fft(data) + T = 1.0 / sample + yf = fft(data)[:N // 2] xf = fftfreq(N, T)[:N // 2] - self.axs[1].plot(xf, 2.0 / N * np.abs(yf[:N // 2]), color='red') - self.axs[1].set_title("频域信号", fontsize=12, fontweight='bold') - self.axs[1].set_xlabel("频率 (Hz)") - self.axs[1].set_ylabel("幅度") + # self.axs[0,1].plot(xf, 2.0 / N * np.abs(yf[:N // 2]), color='red') + + # yf = yf[:4000] + # xf = xf[:4000] + self.axs[0,1].plot(xf, 2.0 / N * np.abs(yf), lw=0.2, color='black') + self.axs[0,1].plot(xf, 2.0 / N * np.abs(yf), '.', lw=0.3, color='red') + + self.axs[0,1].set_title("加速度频域信号", fontsize=12, fontweight='bold') + self.axs[0,1].set_xlabel("频率 (Hz)") + self.axs[0,1].set_ylabel("幅度(m/s2)") self.draw() # 更新绘图 + def plot_data_vel(self, data,sample): + """ 绘制时域和频域图 """ + self.axs[1,0].clear() + self.axs[1,1].clear() + + # 时域信号 + self.axs[1,0].plot(data, color='blue') + self.axs[1,0].set_title("速度时域信号", fontsize=12, fontweight='bold') + self.axs[1,0].set_xlabel("采样点") + self.axs[1,0].set_ylabel("幅度(mm/s)") + + # 频域信号 + N = len(data) + T = 1.0 / sample + yf = fft(data)[:N // 2] + xf = fftfreq(N, T)[:N // 2] + + # self.axs[1,1].plot(xf, 2.0 / N * np.abs(yf[:N // 2]), color='red') + + yf = yf[:1000] + xf = xf[:1000] + self.axs[1,1].plot(xf, 2.0 / N * np.abs(yf), lw=0.2, color='black') + self.axs[1,1].plot(xf, 2.0 / N * np.abs(yf), '.', lw=0.3, color='red') + + self.axs[1,1].set_title("速度频域信号", fontsize=12, fontweight='bold') + self.axs[1,1].set_xlabel("频率 (Hz)") + self.axs[1,1].set_ylabel("幅度(mm/s)") + + self.draw() # 更新绘图 class SocketClientApp(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("Socket Client & Data Plotter") - self.setGeometry(100, 100, 700, 600) # 设置初始尺寸 + self.setGeometry(100, 100, 900, 700) # 设置初始尺寸 + self.rate_code = 4 + self.time_str = '1' + self.recv_state = '' + self.current_cmd = None + self.expected_length = 0 + self.partial_data = QByteArray() + self.scaled_data = '' + self.sampling_rate = 0 self.buffer = QByteArray() self.reconnect_timer = QTimer(self) self.reconnect_timer.timeout.connect(self.attempt_reconnect) self.reconnect_interval = 5000 - self.max_reconnect_attempts = 5 + self.max_reconnect_attempts = 10 self.current_reconnect_attempts = 0 self.socket = QTcpSocket(self) self.socket.readyRead.connect(self.on_ready_read) self.socket.disconnected.connect(self.on_socket_disconnected) self.socket.errorOccurred.connect(self.on_socket_error) + self.socket.connected.connect(self.on_socket_connected) # 组件初始化 self.ip_label = QLabel("IP 地址:") @@ -99,13 +168,23 @@ class SocketClientApp(QMainWindow): self.ip_input = QLineEdit(self) self.port_input = QLineEdit(self) self.connect_button = QPushButton("连接") - self.get_data_button = QPushButton("获取数据") + self.get_data_button = QPushButton("获取数据1") + self.get_data_button2 = QPushButton("获取数据2") + self.get_temp_button = QPushButton("温度、偏置") self.upgrade_button = QPushButton("更新固件") - self.upgrade_button_test = QPushButton("更新固件2") + self.mac_config_button = QPushButton("mac配置") + self.ipv4_config_button = QPushButton("网络配置") + self.calibration_config_button = QPushButton("校准") + self.get_version_button = QPushButton("版本") + self.upgrade_button_sampling = QPushButton("采样率/时间") + self.save_button_csv = QPushButton("存储到csv") self.get_data_button.setEnabled(False) # 初始状态不可点击 + self.get_data_button2.setEnabled(False) # 初始状态不可点击 self.upgrade_button.setEnabled(False) # 初始状态不可点击 - self.upgrade_button_test.setEnabled(False) # 初始状态不可点击 - self.ip_input.setText("192.168.0.200") + self.get_temp_button.setEnabled(False) + self.upgrade_button_sampling.setEnabled(False) + self.save_button_csv.setEnabled(False) + self.ip_input.setText("192.168.0.199") self.port_input.setText("12345") # 预留绘图区域 @@ -114,9 +193,14 @@ class SocketClientApp(QMainWindow): # 预留特征值显示区域 self.feature_label = QLabel("特征值:") - self.acceleration_label = QLabel("加速度: -") - self.velocity_label = QLabel("速度: -") - + self.acceleration_label_rms = QLabel("加速度有效值: -") + self.acceleration_label_pp = QLabel("加速度峰值: -") + self.velocity_label_rms = QLabel("速度有效值: -") + self.velocity_label_pp = QLabel("速度峰值: -") + self.temp1_label = QLabel("温度1: -") + self.temp2_label = QLabel("温度2:-") + self.offset1_label = QLabel("偏置电压1:-") + self.offset2_label = QLabel("偏置电压2:-") # 设置分割线 self.line = QFrame() self.line.setFrameShape(QFrame.HLine) @@ -137,8 +221,15 @@ class SocketClientApp(QMainWindow): button_layout = QHBoxLayout() button_layout.addWidget(self.connect_button) button_layout.addWidget(self.get_data_button) + button_layout.addWidget(self.get_data_button2) + button_layout.addWidget(self.get_temp_button) button_layout.addWidget(self.upgrade_button) - button_layout.addWidget(self.upgrade_button_test) + button_layout.addWidget(self.upgrade_button_sampling) + button_layout.addWidget(self.mac_config_button) + button_layout.addWidget(self.ipv4_config_button) + button_layout.addWidget(self.calibration_config_button) + button_layout.addWidget(self.get_version_button) + button_layout.addWidget(self.save_button_csv) main_layout.addLayout(button_layout) # 添加分割线 @@ -153,9 +244,17 @@ class SocketClientApp(QMainWindow): # 特征值显示区域 feature_layout = QGridLayout() + feature_layout.setVerticalSpacing(10) feature_layout.addWidget(self.feature_label, 0, 0) - feature_layout.addWidget(self.acceleration_label, 0, 1) - feature_layout.addWidget(self.velocity_label, 0, 2) + feature_layout.addWidget(self.acceleration_label_rms, 0, 1) + feature_layout.addWidget(self.acceleration_label_pp, 0, 2) + feature_layout.addWidget(self.velocity_label_rms, 1, 1) + feature_layout.addWidget(self.velocity_label_pp, 1, 2) + feature_layout.addWidget(self.temp1_label, 2, 1) + feature_layout.addWidget(self.temp2_label, 2, 2) + feature_layout.addWidget(self.offset1_label, 3, 1) + feature_layout.addWidget(self.offset2_label, 3, 2) + graph_layout.addLayout(feature_layout) main_layout.addLayout(graph_layout) @@ -178,9 +277,15 @@ class SocketClientApp(QMainWindow): # 事件绑定 self.connect_button.clicked.connect(self.connect_to_server) self.get_data_button.clicked.connect(self.on_button_clicked) + self.get_data_button2.clicked.connect(self.on_button_clicked2) self.upgrade_button.clicked.connect(self.on_button_upgrade) - self.upgrade_button_test.clicked.connect(self.on_button_upgrade_test) - + self.get_temp_button.clicked.connect(self.on_button_temp) + self.upgrade_button_sampling.clicked.connect(self.on_button_samping_set) + self.save_button_csv.clicked.connect(self.on_button_save_csv) + self.mac_config_button.clicked.connect(self.mac_config_dialog) + self.ipv4_config_button.clicked.connect(self.ipv4_config_dialog) + self.calibration_config_button.clicked.connect(self.calibration_dialog) + self.get_version_button.clicked.connect(self.on_button_get_version) # 设置布局策略,确保控件大小随窗口调整 self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) @@ -221,15 +326,33 @@ class SocketClientApp(QMainWindow): for byte in data: crc += byte return crc & 0xFF # 只保留最低字节 + def connect_to_server(self): + #self.process_wave_packet('') ip = self.ip_input.text() port = int(self.port_input.text()) self.socket.abort() self.socket.connectToHost(ip, port) + def on_socket_connected(self): + self.status_bar.showMessage("状态: 连接成功") + self.get_data_button.setEnabled(True) + self.get_data_button2.setEnabled(True) + self.upgrade_button.setEnabled(True) + self.get_temp_button.setEnabled(True) + self.upgrade_button_sampling.setEnabled(True) + self.save_button_csv.setEnabled(True) + def on_socket_disconnected(self): + print("on_socket_disconnected") self.status_bar.showMessage("状态: 连接断开,正在重连...") self.get_data_button.setEnabled(False) + self.get_data_button2.setEnabled(False) + self.upgrade_button.setEnabled(False) + self.get_temp_button.setEnabled(False) + self.upgrade_button_sampling.setEnabled(False) + self.save_button_csv.setEnabled(False) + self.current_reconnect_attempts = 0 self.reconnect_timer.start(self.reconnect_interval) def on_socket_error(self, error): @@ -238,13 +361,122 @@ class SocketClientApp(QMainWindow): def attempt_reconnect(self): if self.current_reconnect_attempts >= self.max_reconnect_attempts: self.reconnect_timer.stop() - self.status_bar.showMessage("状态: 重连失败,请检查网络") + self.status_bar.showMessage("状态: 多次重连失败") return + self.current_reconnect_attempts += 1 + print(f"第 {self.current_reconnect_attempts} 次尝试重连...") + + self.socket.abort() self.connect_to_server() + if self.socket.waitForConnected(1000): + self.status_bar.showMessage("状态: 重新连接成功") + self.get_data_button.setEnabled(True) + self.reconnect_timer.stop() + def on_button_save_csv(self): + file_path = QFileDialog.getExistingDirectory( + None, + "选择保存目录", + "", + QFileDialog.ShowDirsOnly + ) + + if not file_path: + return + file_path = os.path.join(file_path, "wave.csv") + try: + # 将 QByteArray 转换为字符串并写入文件 + # 写入 CSV 文件 + with open(file_path, 'w', newline='') as csvfile: + writer = csv.writer(csvfile) + for val in self.scaled_data: + writer.writerow([val]) + + print(f"已保存 {len(self.scaled_data)} 个 float 到 {file_path}") + + QMessageBox.information( + self, + "保存成功", + f"文件已成功保存到:\n{file_path}" + ) + except Exception as e: + QMessageBox.critical( + self, + "保存失败", + f"保存文件时出错:\n{str(e)}" + ) + def on_button_samping_set(self): + dialog = SamplingDialog(self) + if dialog.exec_() == QDialog.Accepted: + self.rate_code, self.time_str = dialog.get_values() def on_ready_read(self): - self.buffer += self.socket.readAll() + while self.socket.bytesAvailable(): + if self.recv_state == 'WAIT_HEADER': + self.start_time = time.time() + print(f"开始时间戳: {self.start_time}") + if self.socket.bytesAvailable() >= HEADER_SIZE: + self.header = self.socket.read(HEADER_SIZE) + if self.header[:3] != HEADER_MAGIC: + print("无效 header magic,跳过") + continue + self.cmd = self.header[3] + if self.cmd == 0x01: + self.recv_state = 'WAIT_WAVE_HEADER' + else: + print(f"未知或暂不处理的 cmd: {self.cmd}") + self.recv_state = 'WAIT_HEADER' + return + else: + return + elif self.recv_state == 'WAIT_WAVE_HEADER': + if self.socket.bytesAvailable() >= 5: + wave_header = self.socket.read(5) + self.sampling_rate = struct.unpack('= self.expected_length: + end_time = time.time() + execution_time = end_time - self.start_time + print(f"结束时间戳: {end_time}") + print(f"代码执行时间: {execution_time} 秒") + self.process_wave_packet(bytes(self.partial_data)) + self.recv_state = 'WAIT_HEADER' + self.partial_data.clear() + else: + if self.socket.bytesAvailable() >= HEADER_SIZE: + recv_data = self.socket.read(1340) + if recv_data[:3] != HEADER_MAGIC: + print("无效 header magic,跳过") + continue + cmd = recv_data[3] + if cmd == 0x0C: + body_format = '< H H H H' + body_size = struct.calcsize(body_format) + body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size] + unpacked_data = struct.unpack(body_format, body_data) + value = Eigenvalue(*unpacked_data) + print(f"温度1:{value.temp1/1000},温度2:{value.temp2/1000},偏置电压1:{value.offset1/100},偏置电压2:{value.offset2/100}") + self.temp1_label.setText(f"温度1:{value.temp1/1000} v") + self.temp2_label.setText(f"温度2:{value.temp2 / 1000} v") + self.offset1_label.setText(f"偏置电压1:{value.offset1 / 100} v") + self.offset2_label.setText(f"偏置电压2:{value.offset2 / 100} v") + elif cmd == 0x0F: + body_format = '< B B' + body_size = struct.calcsize(body_format) + body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size] + unpacked_data = struct.unpack(body_format, body_data) + print(f"version{unpacked_data}") + QMessageBox.information(self, "版本", f"{unpacked_data[0]}.{unpacked_data[1]}") + def receive_data(self, length: int): while len(self.buffer) < length: @@ -274,75 +506,89 @@ class SocketClientApp(QMainWindow): 'version': version, 'result_code': result_code }, data[HEADER_SIZE:] + def process_wave_packet(self,wave_data): + data = wave_data # 接收所有数据 + data = np.frombuffer(data, dtype=np.int32) # 根据实际数据格式转换 + for i in range(min(100, len(data))): # 确保不超过数据长度 + print(f"{data[i]:1f}", end=" ") + print() # 换行 + LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000/10.2 + # LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000 + self.scaled_data = data * LSB_32BIT + # for i in range(min(100, len(self.scaled_data))): # 确保不超过数据长度 + # print(f"{self.scaled_data[i]:2f}", end=" ") + # print() # 换行 + # result = self.get_extremes(self.scaled_data) + # print("最大的10个值:", result['top10_max']) + # print("最小的10个值:", result['top10_min']) + # mean_max = self.mean_without_max_optimized(result['top10_max']) + # print(f"top10_max 去除最大的数据后的平均值1:{mean_max}") + # mean_min = self.mean_without_min_optimized(result['top10_min']) + # print(f"top10_min 去除最大的数据后的平均值2:{mean_min}") + # print(f"pp :{mean_max - mean_min}") + print(f"采样率: {self.sampling_rate}") + # self.sampling_rate = 48000 # 假设采样率为 48000 Hz + self.canvas.plot_data(self.scaled_data,self.sampling_rate) # 在 Qt 界面中绘图 + data_filter = fft_filter(self.scaled_data, 10, 1000,self.sampling_rate) + data_vel1,data_vel2 = acc2dis(data_filter,self.sampling_rate) + self.canvas.plot_data_vel(data_vel1,self.sampling_rate) # 在 Qt 界面中绘图 + self.status_bar.showMessage("状态: 数据绘制完成") + # 速度有效值 - def process_packet(self): - """ 循环接收数据,直到接收完整 """ - try: - # 1. 接收包头 - start_time = time.time() - header_data = self.receive_data(6) - header, _ = self.parse_package_head(header_data) - if not header: - print("Invalid package header") - return None - - # 2. 根据命令类型处理数据 - if header['cmd'] == 0x01: - # 先接收WaveDataRsp的固定部分(5字节) - wave_header = self.receive_data(5) - channel_id = wave_header[0] - sampling_rate = struct.unpack('