添加版本获取,校准等

This commit is contained in:
zhangsheng 2025-05-26 20:31:47 +08:00
parent 06b8edf26f
commit 9b5792329d
3 changed files with 1729 additions and 101 deletions

589
SamplingDialog.py Normal file
View File

@ -0,0 +1,589 @@
from PyQt5.QtWidgets import (
QApplication, QDialog, QLabel, QLineEdit, QPushButton,QMessageBox,
QHBoxLayout, QVBoxLayout, QMainWindow, QWidget, QRadioButton, QButtonGroup,QGridLayout,QFormLayout
)
from PyQt5.QtGui import QRegExpValidator
from PyQt5.QtCore import QRegExp,QByteArray
import struct
from PyQt5.QtNetwork import QTcpSocket
import numpy as np
from numpy.ma.core import masked
from scipy.optimize import leastsq
import struct
HEADER_MAGIC = bytes([0xAA, 0x55, 0xAA])
HEADER_SIZE = 6 # PackgeHead除去data字段的大小(3+1+1+1)
class SamplingDialog(QDialog):
def __init__(self, parent=None):
super().__init__(parent)
self.setWindowTitle("采样参数设置")
self.rate_combo = QComboBox()
self.rate_combo.addItems([
"8000", "16000", "24000", "32000", "48000", "96000"
])
self.time_input = QLineEdit()
self.time_input.setPlaceholderText("输入采样时间(秒)")
self.ok_button = QPushButton("确定")
self.ok_button.clicked.connect(self.accept)
form_layout = QFormLayout()
form_layout.addRow("采样率Hz", self.rate_combo)
form_layout.addRow("采样时间(秒)", self.time_input)
layout = QVBoxLayout()
layout.addLayout(form_layout)
layout.addWidget(self.ok_button)
self.setLayout(layout)
def get_values(self):
# 返回采样率编号(整数)和采样时间(字符串或整数)
rate_code_map = {
"8000": 0x00,
"16000": 0x01,
"24000": 0x02,
"32000": 0x03,
"48000": 0x04,
"96000": 0x05
}
rate_str = self.rate_combo.currentText()
rate_code = rate_code_map.get(rate_str, 0x00)
return rate_code, self.time_input.text()
class PackageHead:
@staticmethod
def parse(data: bytes):
if len(data) < 6:
return None
head = data[0:3]
if head != b'\xAA\x55\xAA':
return None
cmd, ver, result_code = data[3:6]
return {
'cmd': cmd,
'version': ver,
'result_code': result_code,
'data': data[6:]
}
class GetCoe:
def __init__(self, temp_ch1_1, temp_ch1_2, dc_ch1_1, dc_ch1_2,ac_ch1_1,ac_ch1_2,temp_ch2_1,temp_ch2_2,dc_ch2_1,dc_ch2_2,
ac_ch2_1, ac_ch2_2):
self.temp_ch1_1 = temp_ch1_1
self.temp_ch1_2 = temp_ch1_2
self.dc_ch1_1 = dc_ch1_1
self.dc_ch1_2 = dc_ch1_2
self.ac_ch1_1 = ac_ch1_1
self.ac_ch1_2 = ac_ch1_2
self.temp_ch2_1 = temp_ch2_1
self.temp_ch2_2 = temp_ch2_2
self.dc_ch2_1 = dc_ch2_1
self.dc_ch2_2 = dc_ch2_2
self.ac_ch2_1 = ac_ch2_1
self.ac_ch2_2 = ac_ch2_2
class Eigenvalue:
def __init__(self, temp1, temp2, offset1, offset2):
self.temp1 = temp1
self.temp2 = temp2
self.offset1 = offset1
self.offset2 = offset2
class MacConfigDialog(QDialog):
def __init__(self, socket: QTcpSocket, parent=None):
super().__init__(parent)
self.setWindowTitle("配置 MAC")
self.socket = socket
self.setFixedWidth(300)
self.mac_input1 = QLineEdit()
self.mac_input2 = QLineEdit()
hex_reg = QRegExp("[0-9A-Fa-f]{1,2}")
validator = QRegExpValidator(hex_reg)
self.mac_input1.setValidator(validator)
self.mac_input2.setValidator(validator)
self.save_btn = QPushButton("保存")
self.cancel_btn = QPushButton("取消")
mac_line = QHBoxLayout()
mac_line.addWidget(QLabel("MAC 地址: 50:29:4D:20"))
mac_line.addWidget(self.mac_input1)
mac_line.addWidget(QLabel(":"))
mac_line.addWidget(self.mac_input2)
btn_line = QHBoxLayout()
btn_line.addWidget(self.save_btn)
btn_line.addWidget(self.cancel_btn)
main_layout = QVBoxLayout()
main_layout.addLayout(mac_line)
main_layout.addLayout(btn_line)
self.setLayout(main_layout)
self.save_btn.clicked.connect(self.send_mac_config)
self.cancel_btn.clicked.connect(self.reject)
def send_mac_config(self):
try:
mac1 = int(self.mac_input1.text(), 16)
mac2 = int(self.mac_input2.text(), 16)
except ValueError:
QMessageBox.warning(self, "输入错误", "请输入合法的十六进制数")
return
packet = bytes([
0xAA, 0x55, 0xAA,
0x0D, # cmd
0x01, # version
0x00 # result_code (发送时为0)
]) + struct.pack('2B', mac1, mac2)
self.socket.write(packet)
if not self.socket.waitForBytesWritten(1000):
QMessageBox.critical(self, "错误", "发送失败")
return
if not self.socket.waitForReadyRead(3000):
QMessageBox.critical(self, "超时", "未收到回应")
return
data = bytes(self.socket.readAll())
pkg = PackageHead.parse(data)
if not pkg:
QMessageBox.critical(self, "错误", "返回数据格式无效")
return
if pkg['result_code'] == 0x00:
QMessageBox.information(self, "成功", "配置成功")
self.accept()
else:
QMessageBox.warning(self, "失败", f"错误码: {pkg['result_code']}")
class IPConfigDialog(QDialog):
def __init__(self, socket: QTcpSocket, parent=None):
super().__init__(parent)
self.setWindowTitle("配置 IP")
self.socket = socket
self.setFixedWidth(300)
self.setFixedHeight(150)
self.dhcp_radio = QRadioButton("DHCP")
self.static_radio = QRadioButton("Static")
self.dhcp_radio.setChecked(True)
radio_layout = QHBoxLayout()
radio_layout.addWidget(self.dhcp_radio)
radio_layout.addWidget(self.static_radio)
self.radio_group = QButtonGroup()
self.radio_group.addButton(self.dhcp_radio)
self.radio_group.addButton(self.static_radio)
# IPv4地址输入框及标签
self.ip_input = QLineEdit()
self.mask_input = QLineEdit()
self.gw_input = QLineEdit()
# IP地址校验器限制0-255的IPv4格式
ip_regex = QRegExp(r"^(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)"
r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)"
r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)"
r"\.(25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)$")
ip_validator = QRegExpValidator(ip_regex)
self.ip_input.setValidator(ip_validator)
self.mask_input.setValidator(ip_validator)
self.gw_input.setValidator(ip_validator)
form_layout = QFormLayout()
form_layout.setLabelAlignment(Qt.AlignRight)
form_layout.addRow(QLabel("IP 地址:"), self.ip_input)
form_layout.addRow(QLabel("子网掩码:"), self.mask_input)
form_layout.addRow(QLabel("网关:"), self.gw_input)
self.save_btn = QPushButton("保存")
self.cancel_btn = QPushButton("取消")
btn_line = QHBoxLayout()
btn_line.addWidget(self.save_btn)
btn_line.addWidget(self.cancel_btn)
main_layout = QVBoxLayout()
main_layout.addLayout(radio_layout)
main_layout.addLayout(form_layout)
main_layout.addLayout(btn_line)
self.setLayout(main_layout)
self.save_btn.clicked.connect(self.send_ip_config)
self.cancel_btn.clicked.connect(self.reject)
# DHCP模式下禁用IP等输入
self.dhcp_radio.toggled.connect(self.update_input_enabled)
self.update_input_enabled()
def update_input_enabled(self):
enabled = self.static_radio.isChecked()
self.ip_input.setEnabled(enabled)
self.mask_input.setEnabled(enabled)
self.gw_input.setEnabled(enabled)
def send_ip_config(self):
mode = 0 if self.dhcp_radio.isChecked() else 1
def parse_ip(ip_str):
parts = ip_str.strip().split('.')
if len(parts) != 4:
raise ValueError()
return [int(p) for p in parts]
if mode == 0:
packet = bytes([
0xAA, 0x55, 0xAA,
0x0E, # cmd
0x01, # version
0x00 # result_code (发送时为0)
]) + struct.pack('B', mode)
else:
try:
ip = parse_ip(self.ip_input.text())
mask = parse_ip(self.mask_input.text())
gw = parse_ip(self.gw_input.text())
except ValueError:
QMessageBox.warning(self, "输入错误", "请输入合法的 IPv4 地址格式")
return
packet = bytes([
0xAA, 0x55, 0xAA,
0x0E, # cmd
0x01, # version
0x00 # result_code (发送时为0)
]) + struct.pack('B', mode) + bytes(ip) + bytes(mask) + bytes(gw)
self.socket.write(packet)
if not self.socket.waitForBytesWritten(1000):
QMessageBox.critical(self, "错误", "发送失败")
return
if not self.socket.waitForReadyRead(3000):
QMessageBox.critical(self, "超时", "未收到回应")
return
data = bytes(self.socket.readAll())
pkg = PackageHead.parse(data)
if not pkg:
QMessageBox.critical(self, "错误", "返回数据格式无效")
return
if pkg['result_code'] == 0x00:
QMessageBox.information(self, "成功", "配置成功")
self.accept()
else:
QMessageBox.warning(self, "失败", f"错误码: {pkg['result_code']}")
from PyQt5.QtWidgets import (
QDialog, QVBoxLayout, QHBoxLayout, QLabel, QPushButton, QComboBox,
QGridLayout, QApplication
)
from PyQt5.QtCore import Qt
class CalibrationDialog(QDialog):
def __init__(self, socket: QTcpSocket, parent=None):
super().__init__(parent)
self.setWindowTitle("校准设置")
self.socket = socket
self.type_index = 0
# 通道与类型选择
self.partial_data = QByteArray()
self.recv_state = ''
self.channel_cb = QComboBox()
self.channel_cb.addItems(["通道1", "通道2"])
self.type_cb = QComboBox()
self.type_cb.addItems(["温度", "电压DC", "电压AC"])
self.type_cb.currentTextChanged.connect(self.update_voltage_points)
top_layout = QHBoxLayout()
top_layout.addWidget(QLabel("通道:"))
top_layout.addWidget(self.channel_cb)
top_layout.addWidget(QLabel("类型:"))
top_layout.addWidget(self.type_cb)
# 电压点信息
self.voltage_labels = []
self.value_labels = []
self.get_buttons = []
self.voltage_points = []
grid = QGridLayout()
grid.addWidget(QLabel("电压点"), 0, 0)
grid.addWidget(QLabel("校准值"), 0, 1)
grid.addWidget(QLabel("操作按钮"), 0, 2)
for i in range(3):
v_label = QLabel()
self.voltage_labels.append(v_label)
val_label = QLabel("未获取")
self.value_labels.append(val_label)
btn = QPushButton("获取")
btn.clicked.connect(lambda _, index=i: self.fetch_voltage(index))
self.get_buttons.append(btn)
grid.addWidget(v_label, i+1, 0)
grid.addWidget(val_label, i+1, 1)
grid.addWidget(btn, i+1, 2)
self.update_voltage_points()
# 初始化与校准按钮
self.init_btn = QPushButton("初始化")
self.calibrate_btn = QPushButton("校准")
self.get_coe_btn = QPushButton("获取系数")
self.init_btn.clicked.connect(self.initialize_calibration)
self.calibrate_btn.clicked.connect(self.calibrate)
self.get_coe_btn.clicked.connect(self.get_coe)
self.socket.readyRead.connect(self.on_ready_read)
btn_layout = QHBoxLayout()
btn_layout.addStretch()
btn_layout.addWidget(self.init_btn)
btn_layout.addWidget(self.calibrate_btn)
btn_layout.addWidget(self.get_coe_btn)
main_layout = QVBoxLayout()
main_layout.addLayout(top_layout)
main_layout.addLayout(grid)
main_layout.addLayout(btn_layout)
self.setLayout(main_layout)
def update_voltage_points(self):
type_text = self.type_cb.currentText()
if type_text == "温度":
self.voltage_points = [0.5, 1.5, 2.5]
elif type_text == "电压DC":
self.voltage_points = [1, 8, 15]
else:
self.voltage_points = [1, 5, 9]
for i, v in enumerate(self.voltage_points):
self.voltage_labels[i].setText(f"{v:.1f}V")
self.value_labels[i].setText("未获取")
def fetch_voltage(self, index):
# 模拟读取值
# import random
# val = round(self.voltage_points[index] + random.uniform(-0.05, 0.05), 3)
# self.value_labels[index].setText(str(val))
self.type_index = index
if self.type_cb.currentIndex() != 2:
self.get_temp_dc()
else:
self.get_wave()
def initialize_calibration(self):
for label in self.value_labels:
label.setText("未获取")
def get_coe(self):
try:
self.socket.write(bytes(
[0xAA, 0x55, 0xAA, 0x10, 0x01, 0x00])) # 发送数据
self.socket.waitForReadyRead()
except Exception as e:
self.status_bar.showMessage(f"状态: 错误 - {str(e)}")
def calibrate(self):
try:
self.recv_state = ''
x = self.voltage_points
y = [float(label.text()) for label in self.value_labels]
if any(label.text() == "未获取" for label in self.value_labels):
raise ValueError("未获取所有电压点数据")
except ValueError:
from PyQt5.QtWidgets import QMessageBox
QMessageBox.warning(self, "错误", "请先获取所有电压点的校准值")
return
# 非线性最小二乘拟合 y = a*x + b
def func(p, x):
a, b = p
return a * x + b
def error(p, x, y):
return func(p, x) - y
p0 = [1, 0]
plsq = leastsq(error, p0, args=(np.array(x), np.array(y)))
a, b = plsq[0]
print(f"拟合结果: a = {a:.4f}, b = {b:.4f}")
# 获取通道号和类型索引
ch = self.channel_cb.currentIndex() + 1 # 通道1 对应 1
group = self.type_cb.currentIndex() # 0=temp, 1=dc, 2=ac
print(f"ch:{ch},group:{group}")
header = bytes([0xAA, 0x55, 0xAA, 0x11,0x01,0x00]) # 可根据实际协议修改
payload = struct.pack('<BBff', ch, group, a, b)
packet = header + payload
# 发送给 MCU
self.socket.write(packet)
self.socket.waitForReadyRead()
def get_extremes(self,data):
"""获取最大和最小的10个值排序法"""
sorted_data = np.sort(data)
return {
'top10_max': sorted_data[-10:][::-1].tolist(), # 降序排列
'top10_min': sorted_data[:10].tolist()
}
def mean_without_max_optimized(self,data):
"""优化内存使用的版本"""
arr = np.array(data)
if len(arr) < 2:
return np.nan
max_val = np.max(arr)
# 计算总和和计数时直接排除最大值
total = np.sum(arr) - max_val
count = len(arr) - 1
return total / count
def mean_without_min_optimized(self, data):
"""优化内存使用的版本"""
arr = np.array(data)
if len(arr) < 2:
return np.nan
min_val = np.min(arr)
# 计算总和和计数时直接排除最大值
total = np.sum(arr) - min_val
count = len(arr) - 1
return total / count
def get_temp_dc(self):
try:
self.recv_state = ''
self.socket.write(bytes(
[0xAA, 0x55, 0xAA, 0x0B, 0x01, 0x00])) # 发送数据
self.socket.waitForReadyRead()
except Exception as e:
QMessageBox.warning(self, "错误",f"{str(e)}")
def get_wave(self):
try:
self.recv_state = 'WAIT_HEADER'
rate_bytes = struct.pack('<I', 4)
time_bytes = struct.pack('<I', 1)
ch = self.channel_cb.currentIndex() + 1
packet = bytes([
0xAA, 0x55, 0xAA,
0x01, 0x01, 0x00,
ch
]) + rate_bytes + time_bytes
self.socket.write(packet)
print(packet)
self.socket.waitForReadyRead()
except Exception as e:
QMessageBox.warning(self, "错误",f"{str(e)}")
def on_ready_read(self):
while self.socket.bytesAvailable():
if self.recv_state == 'WAIT_HEADER':
if self.socket.bytesAvailable() >= HEADER_SIZE:
self.header = self.socket.read(HEADER_SIZE)
if self.header[:3] != HEADER_MAGIC:
print("无效 header magic跳过")
continue
self.cmd = self.header[3]
if self.cmd == 0x01:
self.recv_state = 'WAIT_WAVE_HEADER'
else:
print(f"未知或暂不处理的 cmd: {self.cmd}")
self.recv_state = 'WAIT_HEADER'
return
else:
return
elif self.recv_state == 'WAIT_WAVE_HEADER':
if self.socket.bytesAvailable() >= 5:
wave_header = self.socket.read(5)
self.sampling_rate = struct.unpack('<i', wave_header[1:5])[0]
self.expected_length = self.sampling_rate * 4
self.partial_data.clear()
self.recv_state = 'WAIT_WAVE_DATA'
else:
return
elif self.recv_state == 'WAIT_WAVE_DATA':
bytes_needed = self.expected_length - len(self.partial_data)
chunk = self.socket.read(min(self.socket.bytesAvailable(), bytes_needed))
self.partial_data += chunk
if len(self.partial_data) >= self.expected_length:
self.process_wave_packet(bytes(self.partial_data))
self.recv_state = 'WAIT_HEADER'
self.partial_data.clear()
else:
if self.socket.bytesAvailable() >= HEADER_SIZE:
recv_data = self.socket.read(1340)
if recv_data[:3] != HEADER_MAGIC:
print("无效 header magic跳过")
continue
cmd = recv_data[3]
if cmd == 0x0C:
body_format = '< H H H H'
body_size = struct.calcsize(body_format)
body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size]
unpacked_data = struct.unpack(body_format, body_data)
value = Eigenvalue(*unpacked_data)
print(f"温度1{value.temp1/1000},温度2{value.temp2/1000},偏置电压1{value.offset1/100},偏置电压2{value.offset2/100}")
if self.channel_cb.currentIndex() == 0 and self.type_cb.currentIndex() == 0:
self.value_labels[self.type_index].setText(str(value.temp1/1000))
elif self.channel_cb.currentIndex() == 0 and self.type_cb.currentIndex() == 1:
self.value_labels[self.type_index].setText(str(value.offset1 / 100))
elif self.channel_cb.currentIndex() == 1 and self.type_cb.currentIndex() == 0:
self.value_labels[self.type_index].setText(str(value.temp2/1000))
elif self.channel_cb.currentIndex() == 1 and self.type_cb.currentIndex() == 1:
self.value_labels[self.type_index].setText(str(value.offset2 / 100))
elif cmd == 0x11:
pkg = PackageHead.parse(recv_data)
if not pkg:
QMessageBox.critical(self, "错误", "返回数据格式无效")
return
if pkg['result_code'] == 0x00:
QMessageBox.information(self, "成功", "配置成功")
return
elif cmd == 0x10:
body_format = '< f f f f f f f f f f f f'
body_size = struct.calcsize(body_format)
body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size]
unpacked_data = struct.unpack(body_format, body_data)
value = GetCoe(*unpacked_data)
print(
f"{value.temp_ch1_1, value.temp_ch1_2, value.dc_ch1_1, value.dc_ch1_2, value.ac_ch1_1, value.ac_ch1_2}")
print(
f"{value.temp_ch2_1, value.temp_ch2_2, value.dc_ch2_1, value.dc_ch2_2, value.ac_ch2_1, value.ac_ch2_2}")
def process_wave_packet(self,wave_data):
data = wave_data # 接收所有数据
data = np.frombuffer(data, dtype=np.int32) # 根据实际数据格式转换
LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000/10.2
self.scaled_data = data * LSB_32BIT
result = self.get_extremes(self.scaled_data)
print("最大的10个值:", result['top10_max'])
print("最小的10个值:", result['top10_min'])
mean_max = self.mean_without_max_optimized(result['top10_max'])
print(f"top10_max 去除最大的数据后的平均值1{mean_max}")
mean_min = self.mean_without_min_optimized(result['top10_min'])
print(f"top10_min 去除最大的数据后的平均值2{mean_min}")
pp = mean_max - mean_min
if self.type_cb.currentIndex() == 2:
self.value_labels[self.type_index].setText(str(pp))
print(f"pp :{mean_max - mean_min}")
def closeEvent(self, event):
try:
print("closeEvent")
self.socket.readyRead.disconnect(self.on_ready_read)
except TypeError:
pass
super().closeEvent(event)

754
feauture_calculate.py Normal file
View File

@ -0,0 +1,754 @@
from math import floor, ceil
import numpy as np
from numpy import real, sqrt, mean, array
from scipy import fftpack
# 频域滤波
from scipy.signal import butter, sosfilt, hilbert, argrelextrema
def fft_spectrum(signal: np.ndarray, fs: float):
"""
傅里叶变换频谱
:param signal:np.ndarray 时序振动信号
:param fs:float 采样频率
:return:
f:np.ndarray, 频率
y:np.ndarray, 幅值
"""
n = len(signal)
f = fs * np.arange(n // 2) / n
fft_data = abs(np.fft.fft(signal))[:int(n / 2)] * 2 / n # 归一化处理双边频谱反转后需要×2
return f, fft_data
def fft_filter(signal, lpf1, lpf2, fs):
"""
signal: np.ndarray 原始振动信号
fs: float 采样频率
lpf1: float 滤波频率-低频
lpf2: float 滤波频率-高频
:return
y: np.ndarray 滤波信号
"""
yy = fftpack.fft(signal)
m = len(yy)
k = m / fs
for i in range(0, floor(k * lpf1)):
yy[i] = 0
for i in range(ceil(k * lpf2 - 1), m):
yy[i] = 0
y = 2 * real(fftpack.ifft(yy))
return y
# 高通滤波
def fft_filter_high_pass(signal, lpf_high, fs):
"""
signal: np.ndarray 原始振动信号
fs: float 采样频率
lpf_high: float 滤波频率
:return
y: np.ndarray 滤波信号
"""
yy = fftpack.fft(signal)
m = len(yy)
k = m / fs
for i in range(0, floor(k * lpf_high) + 1):
yy[i] = 0
for i in range(ceil(k * (fs - lpf_high) - 1), m):
yy[i] = 0
y = real(fftpack.ifft(yy))
return y
# 低通滤波
def fft_filter_low_pass(signal, lpf_low, fs):
"""
signal: np.ndarray 原始振动信号
fs: float 采样频率
lpf_low: float 滤波频率
:return
y: np.ndarray 滤波信号
"""
yy = fftpack.fft(signal)
m = len(yy)
k = m / fs
for i in range(ceil(k * lpf_low - 1), m):
yy[i] = 0
y = 2 * real(fftpack.ifft(yy))
return y
# 希尔伯特变换
def hilbert_envelop(signal):
"""
输入原始信号输出包络时域信号
"""
hx = hilbert(signal) # 对信号进行希尔伯特变换。
analytic_signal = signal - hx * 1j # 进行hilbert变换后的解析信号
return np.abs(analytic_signal)
def acc2dis(data: np.ndarray, fs: float):
"""
采用时域积分的方式将振动加速度信号转化为速度信号和位移信号
Parameters
----------
data: np.ndarray, 振动加速度信号
fs: float, 采样频率
Return
------
s_ifft: np.ndarray, 积分速度信号
d_ifftnp.ndarray, 积分位移信号
"""
n = len(data)
a_mul_dt = data / fs
s = []
total = a_mul_dt[0]
for i in range(n - 1):
total = total + a_mul_dt[i + 1]
s.append(total)
s_fft = np.fft.fft(s)
s_fft[:2] = 0 # 去除直流分量
s_fft[-3:] = 0 # 去除直流分量
s_ifft = np.real(np.fft.ifft(s_fft))
s_mut_dt = s_ifft / fs
d = []
total = s_mut_dt[0]
for i in range(n - 2):
total = total + s_mut_dt[i + 1]
d.append(total)
d_fft = np.fft.fft(d)
d_fft[:2] = 0
d_fft[-3:] = 0
d_ifft = np.real(np.fft.ifft(d_fft))
return s_ifft * 1000, d_ifft * 1000000 # 单位转换
def calc_vel_pass_rms(signal, fs):
"""
signal: 振动加速度信号
fs: 采样频率
:return: 通频速度有效值10-1000Hz
"""
#x1 = fft_filter(signal, 10, 1000, fs)
x2, x3 = acc2dis(signal, fs)
#x4 = fft_filter(x2, 10, 1000, fs)
vel_pass_rms = np.sqrt(np.mean(x2 ** 2))
return vel_pass_rms
# 低频速度有效值3-1000Hz
def calc_vel_low_rms(signal, fs):
"""""
signal: 振动加速度信号
fs: 采样频率
Return
v_rms: 速度有效值
"""""
x1 = fft_filter(signal, 3, 1000, fs)
x2, x3 = acc2dis(x1, fs)
vel_low_rms = np.sqrt(np.mean(x2 ** 2))
return vel_low_rms
# 加速度有效值3-10KHz
def calc_acc_rms(signal, fs):
"""""
signal: 振动加速度信号
fs: 采样频率
Return
a_rms: 加速度有效值
"""""
#x1 = fft_filter(signal, 3,10000, fs)
acc_rms = np.sqrt(np.mean(signal ** 2))
return acc_rms
# 加速度峰值3-10KHz
def calc_acc_p(signal, fs):
"""""
signal: 振动加速度信号
fs: 采样频率
Return
a_p: 加速度有效值
"""""
#x1 = fft_filter(signal, 3,10000, fs)
x2 = sorted(abs(signal), reverse=True)
x3 = x2[0:100]
acc_p = np.mean(x3)
return acc_p
# 振动冲击值5K~10KHz
def calc_vibration_impulse(signal, fs):
"""""
signal: 振动加速度信号
fs: 采样频率
Return
impulse: 振动冲击值
"""""
x1 = fft_filter(signal, 5000, 10000, fs)
x2 = hilbert_envelop(x1)
x3 = fft_filter(x2, 3, 500, fs)
impulse = np.sqrt(np.mean(x3 ** 2))
return impulse
# 加速度峭度指标(3~10KHz)
def calc_acc_kurtosis(signal, fs):
"""""
signal: 振动加速度信号
fs: 采样频率
Return
acc_k: 峭度指标
"""""
x1 = fft_filter(signal, 3, 10000, fs)
K = sum(x1 ** 4) / len(x1)
a = sqrt(sum(x1 ** 2) / len(x1))
acc_kurtosis = K / (a ** 4)
return acc_kurtosis
# 加速度歪度指标(3-10KHz)
def calc_acc_skew(signal, fs):
"""""
signal: 振动加速度信号
fs: 采样频率
Return
acc_s: 歪度指标
"""""
x1 = fft_filter(signal, 3, 10000, fs)
S = sum(x1 ** 3) / len(x1)
a = sqrt(sum(x1 ** 2) / len(x1))
acc_skew = S / (a ** 3)
return acc_skew
# 速度峰值(3-1KHz)
def calc_vel_p(signal, fs):
"""""
signal: 振动加速度信号
fs: 采样频率
Return
vel_p: 速度峰值
"""""
#x1 = fft_filter(signal, 10, 1000, fs)
x2, x3 = acc2dis(signal, fs)
x4 = sorted(abs(x2), reverse=True)
x5 = x4[0:100]
vel_p = np.mean(x5)
return vel_p
def calc_fea_acc(signal, fs):
"""
加速度传感器特征值计算
"""
# 通频速度有效值
vel_pass_rms = calc_vel_pass_rms(signal, fs)
# 低频速度有效值
vel_low_rms = calc_vel_low_rms(signal, fs)
# 加速度有效值
acc_rms = calc_acc_rms(signal, fs)
# 加速度峰值
acc_p = calc_acc_p(signal, fs)
# 振动冲击值
vibration_impulse = calc_vibration_impulse(signal, fs)
# 加速度峭度指标
acc_kurtosis = calc_acc_kurtosis(signal, fs)
# 加速度歪度指标
acc_skew = calc_acc_skew(signal, fs)
# 速度峰值
vel_p = calc_vel_p(signal, fs)
return vel_pass_rms, vel_low_rms, acc_rms, acc_p, vibration_impulse, acc_kurtosis, acc_skew, vel_p
# 最大正向峰值(5-500Hz)
def calc_max_positive_p(signal, fs):
"""""
signal: 振动位移信号
fs: 采样频率
Return
max_positive_p: 最大正向峰值
"""""
x1 = fft_filter(signal, 5, 500, fs)
x2 = []
for i in range(1, len(x1)):
if x1[i - 1] > 0:
k = x1[i - 1]
x2.append(k)
max_positive_p = abs(max(x2) - np.mean(x1))
return max_positive_p
# 最大负向峰值(5-500Hz)
def calc_max_negative_p(signal, fs):
"""""
signal: 振动位移信号
fs: 采样频率
Return
max_negative_p: 最大负向峰值
"""""
x1 = fft_filter(signal, 5, 500, fs)
x2 = []
for i in range(1, len(x1)):
if x1[i - 1] < 0:
k = x1[i - 1]
x2.append(k)
max_negative_p = abs(min(x2) - np.mean(x1))
return max_negative_p
# 监测峰峰值(5-500Hz)
def calc_monitor_pp(signal, fs, L):
"""""
signal: 振动位移信号
fs: 采样频率
L: 传感器量程
Return
mon_pp: 监测峰峰值
"""""
x1 = fft_filter(signal, 5, 500, fs)
for i in range(1, len(x1)):
if x1[i - 1] < x1[i]:
x1[i] = x1[i - 1] + (1000 / fs) * (L * 0.05)
elif x1[i - 1] > x1[i]:
x1[i] = x1[i - 1] - x1[i - 1] * 0.63 * (1 / fs)
mon_pp = max(x1) - min(x1)
return mon_pp
# 诊断峰峰值(5-500Hz)
def calc_diagnosis_pp(signal, fs):
"""""
signal: 振动位移信号
fs: 采样频率
Return
dia_pp: 诊断峰峰值
"""""
x1 = fft_filter(signal, 5, 500, fs)
dia_pp = max(x1) - min(x1)
return dia_pp
# 峰值(5-500Hz)
def calc_peak(signal, fs):
"""""
signal: 振动位移信号
fs: 采样频率
Return
Peak: 峰值
"""""
x1 = fft_filter(signal, 5, 500, fs)
peak = (max(x1) - min(x1)) / 2
return peak
# 峰值因子(5-500Hz)
def calc_peaking_factor(signal, fs):
"""""
signal: 振动位移信号
fs: 采样频率
Return
Cf: 峰值因子
"""""
x1 = fft_filter(signal, 5, 500, fs)
PP = (max(x1) - min(x1)) / 2
rms = np.sqrt(np.mean(x1 ** 2))
peaking_factor = PP / rms
return peaking_factor
# 推导峰值(5-500Hz)
def calc_derive_p(signal, fs):
"""""
signal: 振动位移信号
fs: 采样频率
Return
de_p: 推导峰值
"""""
x1 = fft_filter(signal, 5, 500, fs)
rms = np.sqrt(np.mean(x1 ** 2))
de_p = 1.414 * rms
return de_p
def calc_fea_displacement(signal, fs, L):
"""
位移传感器特征值计算
"""
# 最大正向峰值
max_positive_p = calc_max_positive_p(signal, fs)
# 最大负向峰值
max_negative_p = calc_max_negative_p(signal, fs)
# 监测峰峰值
mon_pp = calc_monitor_pp(signal, fs, L)
# 诊断峰峰值
dia_pp = calc_diagnosis_pp(signal, fs)
# 峰值
peak = calc_peak(signal, fs)
# 峰值因子
peaking_factor = calc_peaking_factor(signal, fs)
# 推导峰值
de_p = calc_derive_p(signal, fs)
return max_positive_p, max_negative_p, mon_pp, dia_pp, peak, peaking_factor, de_p
def filter_wave(x: np.ndarray, order: int, wn: int or list, type: str, fs: float, output: str = 'sos'):
"""
振动信号过滤,实现高通低通旁通等功能
:param x: np.ndarray
:param order: int,阶次
:param wn: int or list, if type is lowpass or highpass,the value is int,otherwise the value is list 截止频率
,top and bottom limit
:param type:str,lowpass,highpass,bandpass,bands-top
:param fs:float,frequency
:param output:str,three types: ba,sos,zpk
:return:np.array,过滤的信号
"""
sos = butter(order, wn, type, False, fs=fs, output=output)
return sosfilt(sos, x)
def envelope_detection(x: np.ndarray):
"""
提取振动信号的包络曲线
Parameters
----------
x: np.ndarray, 原始振动信号
y np.ndarray, 希尔伯特变换变换后的解析信号
Return
------
z: np.ndarray, 包络曲线
"""
x1 = hilbert(x)
y = x - x1 * 1j
z = abs(y)
return z
def calc_ylb_SWE(signal, fs):
"""
:return ylb_SWE: 应力波能量
"""
f_min = 32000 # 低频截至频率
f_max = 40000 # 高频截至频率
signal_filter_data = filter_wave(signal, 8, [f_min, f_max], "bandpass", fs)
signal_envelope = envelope_detection(signal_filter_data)
ylb_SWE = sum(signal_envelope[signal_envelope > 0])
return ylb_SWE
def calc_ylb_SWPE(signal, fs):
"""
:return ylb_SWPE: 脉冲能量
"""
f_min = 32000 # 低频截至频率
f_max = 48000 # 高频截至频率
L_times = 0.1 # 最小排序法时有效:人工设置极限阈值
signal_filter_data = filter_wave(signal, 8, [f_min, f_max], "bandpass", fs)
signal_envelope = envelope_detection(signal_filter_data)
signal_envelope_less = signal_envelope[argrelextrema(signal_envelope, np.less)] # 求局部极小值点,np.greatest是极大值点
L_sort = np.sort(signal_envelope_less)
L_Limit = L_sort[round(len(L_sort) * L_times)] # 设置最小阀值
signal_ylb = signal_envelope.copy()
signal_ylb[signal_ylb <= L_Limit] = 0 # 低于阀值设置为0
signal_ylb[0] = 0 # 首元素设置为0
signal_ylb[len(signal_envelope) - 1] = 0 # 尾元素设置为0
signal_ylb[signal_ylb > L_Limit] = 1
signal_dt = np.diff(signal_ylb)
dt_s = [i for i in range(len(signal_dt)) if signal_dt[i] == 1]
dt_x = [j for j in range(len(signal_dt)) if signal_dt[j] == -1]
signal_ylb_SWPE = np.linspace(0, 0, len(dt_s)) # 应力波脉冲能量值(SWPE)赋初值0
for i in range(len(dt_s)):
signal_ylb_SWPE[i] = sum(signal_envelope[dt_s[i] + 1:dt_x[i] + 1] - L_Limit)
ylb_SWPE = sum(signal_ylb_SWPE)
return ylb_SWPE
def calc_ylb_SWPA(signal, fs):
"""
:return ylb_SWPA: 最大峰高
"""
f_min = 32000 # 低频截至频率
f_max = 40000 # 高频截至频率
L_times = 0.1 # 最小排序法时有效:人工设置极限阈值
signal_filter_data = filter_wave(signal, 8, [f_min, f_max], "bandpass", fs)
signal_envelope = envelope_detection(signal_filter_data)
signal_envelope_less = signal_envelope[argrelextrema(signal_envelope, np.less)]
L_sort = np.sort(signal_envelope_less)
L_Limit = L_sort[round(len(L_sort) * L_times)] # 设置最小阀值
signal_ylb = signal_envelope.copy()
signal_ylb[signal_ylb <= L_Limit] = 0 # 低于阀值设置为0
signal_ylb[0] = 0 # 首元素设置为0
signal_ylb[len(signal_envelope) - 1] = 0 # 尾元素设置为0
signal_ylb[signal_ylb > L_Limit] = 1
signal_dt = np.diff(signal_ylb)
dt_s = [i for i in range(len(signal_dt)) if signal_dt[i] == 1]
dt_x = [j for j in range(len(signal_dt)) if signal_dt[j] == -1]
signal_ylb_SWPA = np.linspace(0, 0, len(dt_s)) # 应力波脉冲能量峰值(SWPA)赋初值0
for i in range(len(dt_s)):
signal_ylb_SWPA[i] = max(signal_envelope[dt_s[i]+1:dt_x[i]+1])
ylb_SWPA = max(signal_ylb_SWPA) # 最大峰高 SWPA
return ylb_SWPA
def calc_fea_ylb(signal):
"""
应力波传感器特征值计算
:return ylb_SWE, ylb_SWPE, ylb_SWPA: 应力波能量 脉冲能量 最大峰高
"""
fs_ylb = 131072
f_min = 32000 # 低频截至频率
f_max = 40000 # 高频截至频率
L_times = 0.1 # 最小排序法时有效:人工设置极限阈值
signal_filter_data = filter_wave(signal, 8, [f_min, f_max], "bandpass", fs_ylb)
signal_envelope = envelope_detection(signal_filter_data)
signal_envelope_less = signal_envelope[argrelextrema(signal_envelope, np.less)]
L_sort = np.sort(signal_envelope_less)
L_Limit = L_sort[round(len(L_sort) * L_times)] # 设置最小阀值
signal_ylb = signal_envelope.copy()
signal_ylb[signal_ylb <= L_Limit] = 0 # 低于阀值设置为0
signal_ylb[0] = 0 # 首元素设置为0
signal_ylb[len(signal_envelope) - 1] = 0 # 尾元素设置为0
signal_ylb[signal_ylb > L_Limit] = 1
signal_dt = np.diff(signal_ylb)
dt_s = [i for i in range(len(signal_dt)) if signal_dt[i] == 1]
dt_x = [j for j in range(len(signal_dt)) if signal_dt[j] == -1]
signal_ylb_SWPA = np.linspace(0, 0, len(dt_s)) # 应力波脉冲能量峰值(SWPA)赋初值0
signal_ylb_SWPE = np.linspace(0, 0, len(dt_s)) # 应力波脉冲能量值(SWPE)赋初值0
for i in range(len(dt_s)):
signal_ylb_SWPA[i] = max(signal_envelope[dt_s[i]+1:dt_x[i]+1])
signal_ylb_SWPE[i] = sum(signal_envelope[dt_s[i] + 1:dt_x[i] + 1] - L_Limit)
ylb_SWE = sum(signal_envelope[signal_envelope > 0])
ylb_SWPE = sum(signal_ylb_SWPE)
ylb_SWPA = max(signal_ylb_SWPA) # 最大峰高 SWPA
return ylb_SWE, ylb_SWPE, ylb_SWPA
def calc_multiple_frequency(fft_data, ift, sp):
"""
获取给定频率的幅值
Parameters
----------
fft_data: 频谱数据 [频率, 幅值]
ift: 给定频率
sp: 采样频率除以采样点数
Returns
-------
f_iX:i倍频幅值
"""
try:
f_iX = max(fft_data[floor((ift - 0.5) / sp):ceil((ift + 0.5) / sp)])
except ValueError:
f_iX = 0
return f_iX
def calc_fea_HS(fft_data, fea, sp, f_st, f_ord):
"""
:param fft_data: 频谱数据
:param fea: 特征值
:param sp: 采样频率除以采样点数
:param f_st: 起始阶次
:param f_ord: 计算阶次
:return: fea_HS: f_st~(f_st+f_ord-1)倍频幅值
"""
fea_HS = []
for i in range(f_st, f_st + f_ord):
fea_HS.append(calc_multiple_frequency(fft_data, i * fea, sp))
return fea_HS
def calc_HS(fft_data, fea, sp, f_st, f_ord):
"""
计算特征值的HS函数
:param fft_data: 频谱数据
:param fea: 特征值
:param sp: 采样频率除以采样点数
:param f_st: 起始阶次
:param f_ord: 计算阶次
:return: HS:
"""
mul = 0.707
fea_HS = calc_fea_HS(fft_data, fea, sp, f_st, f_ord)
HS = sqrt(sum(array(fea_HS) ** 2)) * mul
return HS
def calc_HRS(fft_data, fea_min, fea_max, sp):
"""
:param fft_data: 频谱数据
:param fea_min: 频率下限
:param fea_max: 频率上限
:param sp: 采样频率除以采样点数
:return: HRS: 能量和
"""
mul = 0.707
fea_X = fft_data[floor((fea_min - 0.5) / sp):ceil((fea_max + 0.5) / sp)]
HRS = sqrt(sum(fea_X ** 2)) * mul
return HRS
def calc_fea_HCS_up(fft_data, fc, fb, sp, f_st, f_ord):
"""
获取给定中心频率和边带的上边带能量和
Parameters
----------
fft_data: 频谱数据 [频率, 幅值]
fc: 中心频率
fb: 边带频率
sp: 采样频率除以采样点数
f_st: 起始阶次
f_ord: 计算阶次
Returns
-------
HRS_up:f_st~(f_st+f_ord-1)倍上边带能量和
"""
mul = 0.707
HCS_up = []
for i in range(f_st, f_st + f_ord):
xb = []
for j in range(1, 6):
xb.append(calc_multiple_frequency(fft_data, i * fc + j * fb, sp))
xb.append(calc_multiple_frequency(fft_data, i * fc, sp))
HCS_up.append(sqrt(sum([x ** 2 for x in xb])) * mul)
return HCS_up
def calc_fea_HCS_low(fft_data, fc, fb, sp, f_st, f_ord):
"""
获取给定中心频率和边带的下边带能量和
Parameters
----------
fft_data: 频谱数据 [频率, 幅值]
fc: 中心频率
fb: 边带频率
sp: 采样频率除以采样点数
f_st: 起始阶次
f_ord: 计算阶次
Returns
-------
HRS_low:f_st~(f_st+f_ord-1)倍下边带能量和
"""
mul = 0.707
HCS_low = []
for i in range(f_st, f_st + f_ord):
xb = []
for j in range(1, 6):
xb.append(calc_multiple_frequency(fft_data, i * fc - j * fb, sp))
xb.append(calc_multiple_frequency(fft_data, i * fc, sp))
HCS_low.append(sqrt(sum([x ** 2 for x in xb])) * mul)
return HCS_low
def calc_fea_HDS(fft_data, fea, sp, f_st, f_ord):
"""
:param fft_data: 频谱数据
:param fea: 特征值
:param sp: 采样频率除以采样点数
:param f_st: 起始阶次
:param f_ord: 计算阶次
:return: fea_HS: 1/f_st~1/(f_st+f_ord-1)倍频幅值
"""
fea_HDS = []
for i in range(f_st, f_st + f_ord):
fea_HDS.append(calc_multiple_frequency(fft_data, 1 / i * fea, sp))
return fea_HDS
def calc_HDS(fft_data, fea, sp, f_st, f_ord):
"""
:param fft_data: 频谱数据
:param fea: 特征值
:param sp: 采样频率除以采样点数
:param f_st: 起始阶次
:param f_ord: 计算阶次
:return: HDS:
"""
mul = 0.707
fea_HDS = calc_fea_HDS(fft_data, fea, sp, f_st, f_ord)
HDS = sqrt(sum(array(fea_HDS) ** 2)) * mul
return HDS
def calc_fea_HCR(fft_data, cf, sf, sp, f_st, f_ord):
"""
计算给定中心频率和边带频率的边带能量比
Parameters
----------
fft_data: 频谱数据 [频率, 幅值]
cf: 中心频率
sf: 边带频率
sp: 采样频率除以采样点数
f_st: 起始阶次
f_ord: 计算阶次
Returns
-------
fea_HCR:f_st~(f_st+f_ord-1)倍能量比
"""
fea_HCR = []
for i in range(f_st, f_st + f_ord):
xb1 = []
xb2 = []
for j in range(1, 6):
xb1.append(calc_multiple_frequency(fft_data, i * cf - j * sf, sp))
xb2.append(calc_multiple_frequency(fft_data, i * cf + j * sf, sp))
try:
Hb1 = sum([x ** 2 for x in xb1]) # 下边带能量和
except ValueError:
Hb1 = 0
try:
Hb2 = sum([x ** 2 for x in xb2]) # 上边带能量和
except ValueError:
Hb2 = 0
xc = calc_multiple_frequency(fft_data, i * cf, sp) # 给定中心频率幅值
if xc == 0:
H = 0
else:
H = sqrt(Hb1 + Hb2) / xc
fea_HCR.append(H)
return fea_HCR
def calc_HCR(fft_data, cf, sf, sp, f_st, f_ord):
"""
计算给定中心频率和边带频率的边带能量比
Parameters
----------
fft_data: 频谱数据 [频率, 幅值]
cf: 中心频率
sf: 边带频率
sp: 采样频率除以采样点数
f_st: 起始阶次
f_ord: 计算阶次
Returns
-------
HCR:f_st~(f_st+f_ord-1)倍能量比的平方和开根号
"""
fea_HCR = calc_fea_HCR(fft_data, cf, sf, sp, f_st, f_ord)
HCR = sqrt(sum(array(fea_HCR) ** 2))
return HCR
if __name__ == "__main__":
freq = 8192
data = np.loadtxt(r"E:\Workspace\Software\Python\无线传感器算法\特征值计算\09f07f1800158d00-Y.csv", delimiter=',', usecols=(0))
data_list = data.tolist()
data_list = data_list[1:]
print("数据长度:", len(data_list))
#速度有效值
speed_rms_value = calc_vel_pass_rms(data_list, freq)
print("速度有效值:", speed_rms_value)
# 速度峰值
speed_vel_p = calc_vel_p(data_list, freq)
print("速度有峰值:", speed_vel_p)
# 加速度有效值
acc_rms = calc_acc_rms(data_list, freq)
print("加速度有效值:", acc_rms)
# 加速度峰值
acc_p = calc_acc_p(data_list, freq)
print("加速度峰值:", acc_p)

View File

@ -7,13 +7,18 @@ import numpy as np
from scipy.fft import fft, fftfreq from scipy.fft import fft, fftfreq
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas, NavigationToolbar2QT from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas, NavigationToolbar2QT
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QPushButton, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QFrame, QGridLayout, QStatusBar, QSizePolicy, QFileDialog from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget,QMessageBox, QPushButton, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QFrame, QGridLayout, QStatusBar, QSizePolicy, QFileDialog
from PyQt5.QtCore import Qt, QTimer, pyqtSlot, QByteArray from PyQt5.QtCore import Qt, QTimer, pyqtSlot, QByteArray
from PyQt5.QtNetwork import QTcpSocket, QAbstractSocket from PyQt5.QtNetwork import QTcpSocket, QAbstractSocket
import matplotlib import matplotlib
import struct import struct
import time import time
from typing import Tuple, Optional from typing import Tuple, Optional
from feauture_calculate import *
from SamplingDialog import *
import csv
import pandas as pd
# 启用高DPI支持 # 启用高DPI支持
QApplication.setAttribute(Qt.AA_EnableHighDpiScaling) QApplication.setAttribute(Qt.AA_EnableHighDpiScaling)
QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps) QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps)
@ -27,10 +32,18 @@ plt.rcParams['axes.unicode_minus'] = False # 解决负号 '-' 显示问题
HEADER_MAGIC = bytes([0xAA, 0x55, 0xAA]) HEADER_MAGIC = bytes([0xAA, 0x55, 0xAA])
HEADER_SIZE = 6 # PackgeHead除去data字段的大小(3+1+1+1) HEADER_SIZE = 6 # PackgeHead除去data字段的大小(3+1+1+1)
class Eigenvalue:
def __init__(self, temp1, temp2, offset1, offset2):
self.temp1 = temp1
self.temp2 = temp2
self.offset1 = offset1
self.offset2 = offset2
class MatplotlibCanvas(FigureCanvas): class MatplotlibCanvas(FigureCanvas):
""" 用于在 Qt 界面中嵌入 Matplotlib 绘图 """ """ 用于在 Qt 界面中嵌入 Matplotlib 绘图 """
def __init__(self, parent=None): def __init__(self, parent=None):
self.fig, self.axs = plt.subplots(2, 1, figsize=(10, 5)) # 2 个子图(时域 + 频域) self.fig, self.axs = plt.subplots(2, 2, figsize=(10, 5)) # 2 个子图(时域 + 频域)
self.fig.subplots_adjust(hspace=0.6) # 增大时域图和频域图的间距 self.fig.subplots_adjust(hspace=0.6) # 增大时域图和频域图的间距
super().__init__(self.fig) super().__init__(self.fig)
self.setParent(parent) self.setParent(parent)
@ -38,60 +51,116 @@ class MatplotlibCanvas(FigureCanvas):
def init_plot(self): def init_plot(self):
""" 初始化默认图像(占位提示) """ """ 初始化默认图像(占位提示) """
self.axs[0].set_title("时域信号", fontsize=12, fontweight='bold') self.axs[0,0].set_title("加速度时域信号", fontsize=12, fontweight='bold')
self.axs[0].set_xlabel("采样点") self.axs[0,0].set_xlabel("采样点")
self.axs[0].set_ylabel("幅度") self.axs[0,0].set_ylabel("幅度(m/s2)")
self.axs[0].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[0].transAxes) self.axs[0,0].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[0,0].transAxes)
self.axs[1].set_title("频域信号", fontsize=12, fontweight='bold') self.axs[0,1].set_title("加速度频域信号", fontsize=12, fontweight='bold')
self.axs[1].set_xlabel("频率 (Hz)") self.axs[0,1].set_xlabel("频率 (Hz)")
self.axs[1].set_ylabel("幅度") self.axs[0,1].set_ylabel("幅度(m/s2)")
self.axs[1].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[1].transAxes) self.axs[0,1].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[0,1].transAxes)
self.axs[1, 0].set_title("速度时域信号", fontsize=12, fontweight='bold')
self.axs[1, 0].set_xlabel("采样点")
self.axs[1, 0].set_ylabel("幅度(mm/s)")
self.axs[1, 0].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center',
transform=self.axs[1, 0].transAxes)
self.axs[1, 1].set_title("速度频域信号", fontsize=12, fontweight='bold')
self.axs[1, 1].set_xlabel("频率 (Hz)")
self.axs[1, 1].set_ylabel("幅度(mm/s)")
self.axs[1, 1].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center',
transform=self.axs[1, 1].transAxes)
self.draw() # 更新绘图 self.draw() # 更新绘图
def plot_data(self, data): def plot_data(self, data,sample):
""" 绘制时域和频域图 """ """ 绘制时域和频域图 """
self.axs[0].clear() self.axs[0,0].clear()
self.axs[1].clear() self.axs[0,1].clear()
# 时域信号 # 时域信号
self.axs[0].plot(data, color='blue') self.axs[0,0].plot(data, color='blue')
self.axs[0].set_title("时域信号", fontsize=12, fontweight='bold') self.axs[0,0].set_title("加速度时域信号", fontsize=12, fontweight='bold')
self.axs[0].set_xlabel("采样点") self.axs[0,0].set_xlabel("采样点")
self.axs[0].set_ylabel("幅度") self.axs[0,0].set_ylabel("幅度(m/s2)")
# 频域信号 # 频域信号
N = len(data) N = len(data)
T = 1.0 / 96000 # 假设采样率为 96000 Hz T = 1.0 / sample
yf = fft(data) yf = fft(data)[:N // 2]
xf = fftfreq(N, T)[:N // 2] xf = fftfreq(N, T)[:N // 2]
self.axs[1].plot(xf, 2.0 / N * np.abs(yf[:N // 2]), color='red') # self.axs[0,1].plot(xf, 2.0 / N * np.abs(yf[:N // 2]), color='red')
self.axs[1].set_title("频域信号", fontsize=12, fontweight='bold')
self.axs[1].set_xlabel("频率 (Hz)") # yf = yf[:4000]
self.axs[1].set_ylabel("幅度") # xf = xf[:4000]
self.axs[0,1].plot(xf, 2.0 / N * np.abs(yf), lw=0.2, color='black')
self.axs[0,1].plot(xf, 2.0 / N * np.abs(yf), '.', lw=0.3, color='red')
self.axs[0,1].set_title("加速度频域信号", fontsize=12, fontweight='bold')
self.axs[0,1].set_xlabel("频率 (Hz)")
self.axs[0,1].set_ylabel("幅度(m/s2)")
self.draw() # 更新绘图 self.draw() # 更新绘图
def plot_data_vel(self, data,sample):
""" 绘制时域和频域图 """
self.axs[1,0].clear()
self.axs[1,1].clear()
# 时域信号
self.axs[1,0].plot(data, color='blue')
self.axs[1,0].set_title("速度时域信号", fontsize=12, fontweight='bold')
self.axs[1,0].set_xlabel("采样点")
self.axs[1,0].set_ylabel("幅度(mm/s)")
# 频域信号
N = len(data)
T = 1.0 / sample
yf = fft(data)[:N // 2]
xf = fftfreq(N, T)[:N // 2]
# self.axs[1,1].plot(xf, 2.0 / N * np.abs(yf[:N // 2]), color='red')
yf = yf[:1000]
xf = xf[:1000]
self.axs[1,1].plot(xf, 2.0 / N * np.abs(yf), lw=0.2, color='black')
self.axs[1,1].plot(xf, 2.0 / N * np.abs(yf), '.', lw=0.3, color='red')
self.axs[1,1].set_title("速度频域信号", fontsize=12, fontweight='bold')
self.axs[1,1].set_xlabel("频率 (Hz)")
self.axs[1,1].set_ylabel("幅度(mm/s)")
self.draw() # 更新绘图
class SocketClientApp(QMainWindow): class SocketClientApp(QMainWindow):
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self.setWindowTitle("Socket Client & Data Plotter") self.setWindowTitle("Socket Client & Data Plotter")
self.setGeometry(100, 100, 700, 600) # 设置初始尺寸 self.setGeometry(100, 100, 900, 700) # 设置初始尺寸
self.rate_code = 4
self.time_str = '1'
self.recv_state = ''
self.current_cmd = None
self.expected_length = 0
self.partial_data = QByteArray()
self.scaled_data = ''
self.sampling_rate = 0
self.buffer = QByteArray() self.buffer = QByteArray()
self.reconnect_timer = QTimer(self) self.reconnect_timer = QTimer(self)
self.reconnect_timer.timeout.connect(self.attempt_reconnect) self.reconnect_timer.timeout.connect(self.attempt_reconnect)
self.reconnect_interval = 5000 self.reconnect_interval = 5000
self.max_reconnect_attempts = 5 self.max_reconnect_attempts = 10
self.current_reconnect_attempts = 0 self.current_reconnect_attempts = 0
self.socket = QTcpSocket(self) self.socket = QTcpSocket(self)
self.socket.readyRead.connect(self.on_ready_read) self.socket.readyRead.connect(self.on_ready_read)
self.socket.disconnected.connect(self.on_socket_disconnected) self.socket.disconnected.connect(self.on_socket_disconnected)
self.socket.errorOccurred.connect(self.on_socket_error) self.socket.errorOccurred.connect(self.on_socket_error)
self.socket.connected.connect(self.on_socket_connected)
# 组件初始化 # 组件初始化
self.ip_label = QLabel("IP 地址:") self.ip_label = QLabel("IP 地址:")
@ -99,13 +168,23 @@ class SocketClientApp(QMainWindow):
self.ip_input = QLineEdit(self) self.ip_input = QLineEdit(self)
self.port_input = QLineEdit(self) self.port_input = QLineEdit(self)
self.connect_button = QPushButton("连接") self.connect_button = QPushButton("连接")
self.get_data_button = QPushButton("获取数据") self.get_data_button = QPushButton("获取数据1")
self.get_data_button2 = QPushButton("获取数据2")
self.get_temp_button = QPushButton("温度、偏置")
self.upgrade_button = QPushButton("更新固件") self.upgrade_button = QPushButton("更新固件")
self.upgrade_button_test = QPushButton("更新固件2") self.mac_config_button = QPushButton("mac配置")
self.ipv4_config_button = QPushButton("网络配置")
self.calibration_config_button = QPushButton("校准")
self.get_version_button = QPushButton("版本")
self.upgrade_button_sampling = QPushButton("采样率/时间")
self.save_button_csv = QPushButton("存储到csv")
self.get_data_button.setEnabled(False) # 初始状态不可点击 self.get_data_button.setEnabled(False) # 初始状态不可点击
self.get_data_button2.setEnabled(False) # 初始状态不可点击
self.upgrade_button.setEnabled(False) # 初始状态不可点击 self.upgrade_button.setEnabled(False) # 初始状态不可点击
self.upgrade_button_test.setEnabled(False) # 初始状态不可点击 self.get_temp_button.setEnabled(False)
self.ip_input.setText("192.168.0.200") self.upgrade_button_sampling.setEnabled(False)
self.save_button_csv.setEnabled(False)
self.ip_input.setText("192.168.0.199")
self.port_input.setText("12345") self.port_input.setText("12345")
# 预留绘图区域 # 预留绘图区域
@ -114,9 +193,14 @@ class SocketClientApp(QMainWindow):
# 预留特征值显示区域 # 预留特征值显示区域
self.feature_label = QLabel("特征值:") self.feature_label = QLabel("特征值:")
self.acceleration_label = QLabel("加速度: -") self.acceleration_label_rms = QLabel("加速度有效值: -")
self.velocity_label = QLabel("速度: -") self.acceleration_label_pp = QLabel("加速度峰值: -")
self.velocity_label_rms = QLabel("速度有效值: -")
self.velocity_label_pp = QLabel("速度峰值: -")
self.temp1_label = QLabel("温度1: -")
self.temp2_label = QLabel("温度2-")
self.offset1_label = QLabel("偏置电压1-")
self.offset2_label = QLabel("偏置电压2-")
# 设置分割线 # 设置分割线
self.line = QFrame() self.line = QFrame()
self.line.setFrameShape(QFrame.HLine) self.line.setFrameShape(QFrame.HLine)
@ -137,8 +221,15 @@ class SocketClientApp(QMainWindow):
button_layout = QHBoxLayout() button_layout = QHBoxLayout()
button_layout.addWidget(self.connect_button) button_layout.addWidget(self.connect_button)
button_layout.addWidget(self.get_data_button) button_layout.addWidget(self.get_data_button)
button_layout.addWidget(self.get_data_button2)
button_layout.addWidget(self.get_temp_button)
button_layout.addWidget(self.upgrade_button) button_layout.addWidget(self.upgrade_button)
button_layout.addWidget(self.upgrade_button_test) button_layout.addWidget(self.upgrade_button_sampling)
button_layout.addWidget(self.mac_config_button)
button_layout.addWidget(self.ipv4_config_button)
button_layout.addWidget(self.calibration_config_button)
button_layout.addWidget(self.get_version_button)
button_layout.addWidget(self.save_button_csv)
main_layout.addLayout(button_layout) main_layout.addLayout(button_layout)
# 添加分割线 # 添加分割线
@ -153,9 +244,17 @@ class SocketClientApp(QMainWindow):
# 特征值显示区域 # 特征值显示区域
feature_layout = QGridLayout() feature_layout = QGridLayout()
feature_layout.setVerticalSpacing(10)
feature_layout.addWidget(self.feature_label, 0, 0) feature_layout.addWidget(self.feature_label, 0, 0)
feature_layout.addWidget(self.acceleration_label, 0, 1) feature_layout.addWidget(self.acceleration_label_rms, 0, 1)
feature_layout.addWidget(self.velocity_label, 0, 2) feature_layout.addWidget(self.acceleration_label_pp, 0, 2)
feature_layout.addWidget(self.velocity_label_rms, 1, 1)
feature_layout.addWidget(self.velocity_label_pp, 1, 2)
feature_layout.addWidget(self.temp1_label, 2, 1)
feature_layout.addWidget(self.temp2_label, 2, 2)
feature_layout.addWidget(self.offset1_label, 3, 1)
feature_layout.addWidget(self.offset2_label, 3, 2)
graph_layout.addLayout(feature_layout) graph_layout.addLayout(feature_layout)
main_layout.addLayout(graph_layout) main_layout.addLayout(graph_layout)
@ -178,9 +277,15 @@ class SocketClientApp(QMainWindow):
# 事件绑定 # 事件绑定
self.connect_button.clicked.connect(self.connect_to_server) self.connect_button.clicked.connect(self.connect_to_server)
self.get_data_button.clicked.connect(self.on_button_clicked) self.get_data_button.clicked.connect(self.on_button_clicked)
self.get_data_button2.clicked.connect(self.on_button_clicked2)
self.upgrade_button.clicked.connect(self.on_button_upgrade) self.upgrade_button.clicked.connect(self.on_button_upgrade)
self.upgrade_button_test.clicked.connect(self.on_button_upgrade_test) self.get_temp_button.clicked.connect(self.on_button_temp)
self.upgrade_button_sampling.clicked.connect(self.on_button_samping_set)
self.save_button_csv.clicked.connect(self.on_button_save_csv)
self.mac_config_button.clicked.connect(self.mac_config_dialog)
self.ipv4_config_button.clicked.connect(self.ipv4_config_dialog)
self.calibration_config_button.clicked.connect(self.calibration_dialog)
self.get_version_button.clicked.connect(self.on_button_get_version)
# 设置布局策略,确保控件大小随窗口调整 # 设置布局策略,确保控件大小随窗口调整
self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding) self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
@ -221,15 +326,33 @@ class SocketClientApp(QMainWindow):
for byte in data: for byte in data:
crc += byte crc += byte
return crc & 0xFF # 只保留最低字节 return crc & 0xFF # 只保留最低字节
def connect_to_server(self): def connect_to_server(self):
#self.process_wave_packet('')
ip = self.ip_input.text() ip = self.ip_input.text()
port = int(self.port_input.text()) port = int(self.port_input.text())
self.socket.abort() self.socket.abort()
self.socket.connectToHost(ip, port) self.socket.connectToHost(ip, port)
def on_socket_connected(self):
self.status_bar.showMessage("状态: 连接成功")
self.get_data_button.setEnabled(True)
self.get_data_button2.setEnabled(True)
self.upgrade_button.setEnabled(True)
self.get_temp_button.setEnabled(True)
self.upgrade_button_sampling.setEnabled(True)
self.save_button_csv.setEnabled(True)
def on_socket_disconnected(self): def on_socket_disconnected(self):
print("on_socket_disconnected")
self.status_bar.showMessage("状态: 连接断开,正在重连...") self.status_bar.showMessage("状态: 连接断开,正在重连...")
self.get_data_button.setEnabled(False) self.get_data_button.setEnabled(False)
self.get_data_button2.setEnabled(False)
self.upgrade_button.setEnabled(False)
self.get_temp_button.setEnabled(False)
self.upgrade_button_sampling.setEnabled(False)
self.save_button_csv.setEnabled(False)
self.current_reconnect_attempts = 0
self.reconnect_timer.start(self.reconnect_interval) self.reconnect_timer.start(self.reconnect_interval)
def on_socket_error(self, error): def on_socket_error(self, error):
@ -238,13 +361,122 @@ class SocketClientApp(QMainWindow):
def attempt_reconnect(self): def attempt_reconnect(self):
if self.current_reconnect_attempts >= self.max_reconnect_attempts: if self.current_reconnect_attempts >= self.max_reconnect_attempts:
self.reconnect_timer.stop() self.reconnect_timer.stop()
self.status_bar.showMessage("状态: 重连失败,请检查网络") self.status_bar.showMessage("状态: 多次重连失败")
return return
self.current_reconnect_attempts += 1 self.current_reconnect_attempts += 1
print(f"{self.current_reconnect_attempts} 次尝试重连...")
self.socket.abort()
self.connect_to_server() self.connect_to_server()
if self.socket.waitForConnected(1000):
self.status_bar.showMessage("状态: 重新连接成功")
self.get_data_button.setEnabled(True)
self.reconnect_timer.stop()
def on_button_save_csv(self):
file_path = QFileDialog.getExistingDirectory(
None,
"选择保存目录",
"",
QFileDialog.ShowDirsOnly
)
if not file_path:
return
file_path = os.path.join(file_path, "wave.csv")
try:
# 将 QByteArray 转换为字符串并写入文件
# 写入 CSV 文件
with open(file_path, 'w', newline='') as csvfile:
writer = csv.writer(csvfile)
for val in self.scaled_data:
writer.writerow([val])
print(f"已保存 {len(self.scaled_data)} 个 float 到 {file_path}")
QMessageBox.information(
self,
"保存成功",
f"文件已成功保存到:\n{file_path}"
)
except Exception as e:
QMessageBox.critical(
self,
"保存失败",
f"保存文件时出错:\n{str(e)}"
)
def on_button_samping_set(self):
dialog = SamplingDialog(self)
if dialog.exec_() == QDialog.Accepted:
self.rate_code, self.time_str = dialog.get_values()
def on_ready_read(self): def on_ready_read(self):
self.buffer += self.socket.readAll() while self.socket.bytesAvailable():
if self.recv_state == 'WAIT_HEADER':
self.start_time = time.time()
print(f"开始时间戳: {self.start_time}")
if self.socket.bytesAvailable() >= HEADER_SIZE:
self.header = self.socket.read(HEADER_SIZE)
if self.header[:3] != HEADER_MAGIC:
print("无效 header magic跳过")
continue
self.cmd = self.header[3]
if self.cmd == 0x01:
self.recv_state = 'WAIT_WAVE_HEADER'
else:
print(f"未知或暂不处理的 cmd: {self.cmd}")
self.recv_state = 'WAIT_HEADER'
return
else:
return
elif self.recv_state == 'WAIT_WAVE_HEADER':
if self.socket.bytesAvailable() >= 5:
wave_header = self.socket.read(5)
self.sampling_rate = struct.unpack('<i', wave_header[1:5])[0]
print(f"采样率: {self.sampling_rate}")
self.expected_length = self.sampling_rate * 4
self.partial_data.clear()
self.recv_state = 'WAIT_WAVE_DATA'
else:
return
elif self.recv_state == 'WAIT_WAVE_DATA':
bytes_needed = self.expected_length - len(self.partial_data)
chunk = self.socket.read(min(self.socket.bytesAvailable(), bytes_needed))
self.partial_data += chunk
if len(self.partial_data) >= self.expected_length:
end_time = time.time()
execution_time = end_time - self.start_time
print(f"结束时间戳: {end_time}")
print(f"代码执行时间: {execution_time}")
self.process_wave_packet(bytes(self.partial_data))
self.recv_state = 'WAIT_HEADER'
self.partial_data.clear()
else:
if self.socket.bytesAvailable() >= HEADER_SIZE:
recv_data = self.socket.read(1340)
if recv_data[:3] != HEADER_MAGIC:
print("无效 header magic跳过")
continue
cmd = recv_data[3]
if cmd == 0x0C:
body_format = '< H H H H'
body_size = struct.calcsize(body_format)
body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size]
unpacked_data = struct.unpack(body_format, body_data)
value = Eigenvalue(*unpacked_data)
print(f"温度1{value.temp1/1000},温度2{value.temp2/1000},偏置电压1{value.offset1/100},偏置电压2{value.offset2/100}")
self.temp1_label.setText(f"温度1{value.temp1/1000} v")
self.temp2_label.setText(f"温度2{value.temp2 / 1000} v")
self.offset1_label.setText(f"偏置电压1{value.offset1 / 100} v")
self.offset2_label.setText(f"偏置电压2{value.offset2 / 100} v")
elif cmd == 0x0F:
body_format = '< B B'
body_size = struct.calcsize(body_format)
body_data = recv_data[HEADER_SIZE:HEADER_SIZE + body_size]
unpacked_data = struct.unpack(body_format, body_data)
print(f"version{unpacked_data}")
QMessageBox.information(self, "版本", f"{unpacked_data[0]}.{unpacked_data[1]}")
def receive_data(self, length: int): def receive_data(self, length: int):
while len(self.buffer) < length: while len(self.buffer) < length:
@ -274,75 +506,89 @@ class SocketClientApp(QMainWindow):
'version': version, 'version': version,
'result_code': result_code 'result_code': result_code
}, data[HEADER_SIZE:] }, data[HEADER_SIZE:]
def process_wave_packet(self,wave_data):
def process_packet(self): data = wave_data # 接收所有数据
""" 循环接收数据,直到接收完整 """
try:
# 1. 接收包头
start_time = time.time()
header_data = self.receive_data(6)
header, _ = self.parse_package_head(header_data)
if not header:
print("Invalid package header")
return None
# 2. 根据命令类型处理数据
if header['cmd'] == 0x01:
# 先接收WaveDataRsp的固定部分(5字节)
wave_header = self.receive_data(5)
channel_id = wave_header[0]
sampling_rate = struct.unpack('<i', wave_header[1:5])[0]
print(f"channel_id {channel_id}")
print(f"sampling_rate {sampling_rate}")
# 计算需要接收的数据长度
data_length = sampling_rate * 4
wave_data = self.receive_data(data_length)
end_time = time.time()
# 计算并打印执行时间
execution_time = end_time - start_time
print(f"代码执行时间: {execution_time}")
# 合并成完整的数据包
# full_packet = header_data + wave_header + wave_data
return wave_data
else:
print(f"Unsupported command: {header['cmd']}")
return None
except ConnectionError as e:
print(f"Connection error: {e}")
return None
except Exception as e:
print(f"Error processing packet: {e}")
return None
def on_button_clicked(self):
""" 获取数据并绘制 """
try:
self.status_bar.showMessage("状态: 正在获取数据...")
self.socket.write(bytes([0xAA,0x55,0xAA,0x01,0x00,0x00,0x01,0x00,0x77,0x01,0x00,0x01,0x00,0x00,0x00])) # 发送数据
data = self.process_packet() # 接收所有数据
data = np.frombuffer(data, dtype=np.int32) # 根据实际数据格式转换 data = np.frombuffer(data, dtype=np.int32) # 根据实际数据格式转换
for i in range(min(100, len(data))): # 确保不超过数据长度 for i in range(min(100, len(data))): # 确保不超过数据长度
print(f"{data[i]:1f}", end=" ") print(f"{data[i]:1f}", end=" ")
print() # 换行 print() # 换行
LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000 LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000/10.2
scaled_data = data * LSB_32BIT # LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000
for i in range(min(100, len(scaled_data))): # 确保不超过数据长度 self.scaled_data = data * LSB_32BIT
print(f"{scaled_data[i]:2f}", end=" ") # for i in range(min(100, len(self.scaled_data))): # 确保不超过数据长度
print() # 换行 # print(f"{self.scaled_data[i]:2f}", end=" ")
result = self.get_extremes(scaled_data) # print() # 换行
print("最大的10个值:", result['top10_max']) # result = self.get_extremes(self.scaled_data)
print("最小的10个值:", result['top10_min']) # print("最大的10个值:", result['top10_max'])
mean_max = self.mean_without_max_optimized(result['top10_max']) # print("最小的10个值:", result['top10_min'])
print(f"top10_max 去除最大的数据后的平均值1{mean_max}") # mean_max = self.mean_without_max_optimized(result['top10_max'])
mean_min = self.mean_without_min_optimized(result['top10_min']) # print(f"top10_max 去除最大的数据后的平均值1{mean_max}")
print(f"top10_min 去除最大的数据后的平均值2{mean_min}") # mean_min = self.mean_without_min_optimized(result['top10_min'])
print(f"pp :{mean_max - mean_min}") # print(f"top10_min 去除最大的数据后的平均值2{mean_min}")
self.canvas.plot_data(scaled_data) # 在 Qt 界面中绘图 # print(f"pp :{mean_max - mean_min}")
print(f"采样率: {self.sampling_rate}")
# self.sampling_rate = 48000 # 假设采样率为 48000 Hz
self.canvas.plot_data(self.scaled_data,self.sampling_rate) # 在 Qt 界面中绘图
data_filter = fft_filter(self.scaled_data, 10, 1000,self.sampling_rate)
data_vel1,data_vel2 = acc2dis(data_filter,self.sampling_rate)
self.canvas.plot_data_vel(data_vel1,self.sampling_rate) # 在 Qt 界面中绘图
self.status_bar.showMessage("状态: 数据绘制完成") self.status_bar.showMessage("状态: 数据绘制完成")
# 速度有效值
speed_rms_value = calc_vel_pass_rms(self.scaled_data, self.sampling_rate)
print("速度有效值:", speed_rms_value)
# 速度峰值
speed_vel_p = calc_vel_p(self.scaled_data, self.sampling_rate)
print("速度峰值:", speed_vel_p)
# 加速度有效值
acc_rms = calc_acc_rms(self.scaled_data, self.sampling_rate)
print("加速度有效值:", acc_rms)
# 加速度峰值
acc_p = calc_acc_p(self.scaled_data, self.sampling_rate)
print("加速度峰值:", acc_p)
self.acceleration_label_rms.setText(f"加速度有效值:{round(acc_rms,3)} m/s^2")
self.acceleration_label_pp.setText(f"加速度峰值:{round(acc_p,3)} m/s^2")
self.velocity_label_rms.setText(f"速度有效值:{round(speed_rms_value,3)} mm/s")
self.velocity_label_pp.setText(f"速度峰值:{round(speed_vel_p,3)} mm/s")
def on_button_clicked(self):
try:
self.recv_state = 'WAIT_HEADER'
time_value = int(self.time_str)
rate_bytes = struct.pack('<I', self.rate_code)
time_bytes = struct.pack('<I', time_value)
packet = bytes([
0xAA, 0x55, 0xAA,
0x01, 0x00, 0x00,
0x01
]) + rate_bytes + time_bytes
self.socket.write(packet)
print(packet)
self.status_bar.showMessage(f"状态: 正在获取通道1数据采样率: {self.rate_code}, 时间: {time_value}s")
self.socket.waitForReadyRead()
except Exception as e: except Exception as e:
self.status_bar.showMessage(f"状态: 错误 - {str(e)}") self.status_bar.showMessage(f"状态: 错误 - {str(e)}")
def on_button_clicked2(self):
""" 获取数据并绘制 """
try:
self.recv_state = 'WAIT_HEADER'
time_value = int(self.time_str)
rate_bytes = struct.pack('<I', self.rate_code)
time_bytes = struct.pack('<I', time_value)
packet = bytes([
0xAA, 0x55, 0xAA,
0x01, 0x00, 0x00,
0x02
]) + rate_bytes + time_bytes
self.socket.write(packet)
self.status_bar.showMessage(f"状态: 正在获取通道2数据采样率: {self.rate_code}, 时间: {time_value}s")
self.socket.waitForReadyRead()
except Exception as e:
self.status_bar.showMessage(f"状态: 错误 - {str(e)}")
def on_button_upgrade(self): def on_button_upgrade(self):
"""打开文件选择对话框""" """打开文件选择对话框"""
file_path, _ = QFileDialog.getOpenFileName( file_path, _ = QFileDialog.getOpenFileName(
@ -440,6 +686,45 @@ class SocketClientApp(QMainWindow):
print("Upgrade packet ready to send (commented out actual send code)") print("Upgrade packet ready to send (commented out actual send code)")
except Exception as e: except Exception as e:
print(f"Upgrade failed: {str(e)}") print(f"Upgrade failed: {str(e)}")
def on_button_temp(self):
try:
self.recv_state = ''
self.status_bar.showMessage("状态: 正在获取数据...",3000)
self.socket.write(bytes(
[0xAA, 0x55, 0xAA, 0x0B, 0x01, 0x00])) # 发送数据
self.socket.waitForReadyRead()
except Exception as e:
self.status_bar.showMessage(f"状态: 错误 - {str(e)}")
def mac_config_dialog(self):
self.recv_state = ''
self.socket.readyRead.disconnect(self.on_ready_read)
dialog = MacConfigDialog(self.socket, self)
dialog.exec_()
self.socket.readyRead.connect(self.on_ready_read)
def ipv4_config_dialog(self):
self.recv_state = ''
self.socket.readyRead.disconnect(self.on_ready_read)
dialog = IPConfigDialog(self.socket, self)
dialog.exec_()
self.socket.readyRead.connect(self.on_ready_read)
def calibration_dialog(self):
self.recv_state = ''
self.socket.readyRead.disconnect(self.on_ready_read)
dialog = CalibrationDialog(self.socket, self)
dialog.exec_()
self.socket.readyRead.connect(self.on_ready_read)
def on_button_get_version(self):
try:
self.recv_state = ''
self.status_bar.showMessage("状态: 正在获取数据...",3000)
self.socket.write(bytes(
[0xAA, 0x55, 0xAA, 0x0F, 0x01, 0x00])) # 发送数据
self.socket.waitForReadyRead()
except Exception as e:
self.status_bar.showMessage(f"状态: 错误 - {str(e)}")
if __name__ == '__main__': if __name__ == '__main__':
app = QApplication(sys.argv) app = QApplication(sys.argv)