265 lines
10 KiB
Python
265 lines
10 KiB
Python
import sys
|
||
import socket
|
||
import numpy as np
|
||
from scipy.fft import fft, fftfreq
|
||
import matplotlib.pyplot as plt
|
||
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas, NavigationToolbar2QT
|
||
from PyQt5.QtWidgets import QApplication, QMainWindow, QWidget, QPushButton, QVBoxLayout, QHBoxLayout, QLabel, QLineEdit, QFrame, QGridLayout, QStatusBar, QSizePolicy
|
||
from PyQt5.QtCore import Qt
|
||
import matplotlib
|
||
import struct
|
||
import time
|
||
# 启用高DPI支持
|
||
QApplication.setAttribute(Qt.AA_EnableHighDpiScaling)
|
||
QApplication.setAttribute(Qt.AA_UseHighDpiPixmaps)
|
||
|
||
# 强制使用 Qt5Agg 后端
|
||
matplotlib.use('Qt5Agg')
|
||
|
||
plt.rcParams['font.sans-serif'] = ['SimHei'] # Windows 中文字体
|
||
plt.rcParams['axes.unicode_minus'] = False # 解决负号 '-' 显示问题
|
||
|
||
|
||
class MatplotlibCanvas(FigureCanvas):
|
||
""" 用于在 Qt 界面中嵌入 Matplotlib 绘图 """
|
||
def __init__(self, parent=None):
|
||
self.fig, self.axs = plt.subplots(2, 1, figsize=(10, 5)) # 2 个子图(时域 + 频域)
|
||
self.fig.subplots_adjust(hspace=0.6) # 增大时域图和频域图的间距
|
||
super().__init__(self.fig)
|
||
self.setParent(parent)
|
||
self.init_plot()
|
||
|
||
def init_plot(self):
|
||
""" 初始化默认图像(占位提示) """
|
||
self.axs[0].set_title("时域信号", fontsize=12, fontweight='bold')
|
||
self.axs[0].set_xlabel("采样点")
|
||
self.axs[0].set_ylabel("幅度")
|
||
self.axs[0].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[0].transAxes)
|
||
|
||
self.axs[1].set_title("频域信号", fontsize=12, fontweight='bold')
|
||
self.axs[1].set_xlabel("频率 (Hz)")
|
||
self.axs[1].set_ylabel("幅度")
|
||
self.axs[1].text(0.5, 0.5, "暂无数据", fontsize=12, ha='center', va='center', transform=self.axs[1].transAxes)
|
||
|
||
self.draw() # 更新绘图
|
||
|
||
def plot_data(self, data):
|
||
""" 绘制时域和频域图 """
|
||
self.axs[0].clear()
|
||
self.axs[1].clear()
|
||
|
||
# 时域信号
|
||
self.axs[0].plot(data, color='blue')
|
||
self.axs[0].set_title("时域信号", fontsize=12, fontweight='bold')
|
||
self.axs[0].set_xlabel("采样点")
|
||
self.axs[0].set_ylabel("幅度")
|
||
|
||
# 频域信号
|
||
N = len(data)
|
||
T = 1.0 / 96000 # 假设采样率为 96000 Hz
|
||
yf = fft(data)
|
||
xf = fftfreq(N, T)[:N // 2]
|
||
|
||
self.axs[1].plot(xf, 2.0 / N * np.abs(yf[:N // 2]), color='red')
|
||
self.axs[1].set_title("频域信号", fontsize=12, fontweight='bold')
|
||
self.axs[1].set_xlabel("频率 (Hz)")
|
||
self.axs[1].set_ylabel("幅度")
|
||
|
||
self.draw() # 更新绘图
|
||
|
||
|
||
class SocketClientApp(QMainWindow):
|
||
def __init__(self):
|
||
super().__init__()
|
||
self.setWindowTitle("Socket Client & Data Plotter")
|
||
self.setGeometry(100, 100, 700, 600) # 设置初始尺寸
|
||
|
||
# 组件初始化
|
||
self.ip_label = QLabel("IP 地址:")
|
||
self.port_label = QLabel("端口号:")
|
||
self.ip_input = QLineEdit(self)
|
||
self.port_input = QLineEdit(self)
|
||
self.connect_button = QPushButton("连接")
|
||
self.get_data_button = QPushButton("获取数据")
|
||
self.get_data_button.setEnabled(False) # 初始状态不可点击
|
||
self.ip_input.setText("192.168.0.200")
|
||
self.port_input.setText("12345")
|
||
|
||
# 预留绘图区域
|
||
self.canvas = MatplotlibCanvas(self)
|
||
self.toolbar = NavigationToolbar2QT(self.canvas, self) # 添加工具栏(放大、缩小、拖拽)
|
||
|
||
# 预留特征值显示区域
|
||
self.feature_label = QLabel("特征值:")
|
||
self.acceleration_label = QLabel("加速度: -")
|
||
self.velocity_label = QLabel("速度: -")
|
||
|
||
# 设置分割线
|
||
self.line = QFrame()
|
||
self.line.setFrameShape(QFrame.HLine)
|
||
self.line.setFrameShadow(QFrame.Sunken)
|
||
|
||
# 布局管理
|
||
main_layout = QVBoxLayout()
|
||
|
||
# 输入区域(IP & 端口)
|
||
input_layout = QHBoxLayout()
|
||
input_layout.addWidget(self.ip_label)
|
||
input_layout.addWidget(self.ip_input)
|
||
input_layout.addWidget(self.port_label)
|
||
input_layout.addWidget(self.port_input)
|
||
main_layout.addLayout(input_layout)
|
||
|
||
# 按钮区域
|
||
button_layout = QHBoxLayout()
|
||
button_layout.addWidget(self.connect_button)
|
||
button_layout.addWidget(self.get_data_button)
|
||
main_layout.addLayout(button_layout)
|
||
|
||
# 添加分割线
|
||
main_layout.addWidget(self.line)
|
||
|
||
# 工具栏(放大、缩小、拖拽)
|
||
main_layout.addWidget(self.toolbar)
|
||
|
||
# 绘图 + 特征值区域
|
||
graph_layout = QVBoxLayout()
|
||
graph_layout.addWidget(self.canvas)
|
||
|
||
# 特征值显示区域
|
||
feature_layout = QGridLayout()
|
||
feature_layout.addWidget(self.feature_label, 0, 0)
|
||
feature_layout.addWidget(self.acceleration_label, 0, 1)
|
||
feature_layout.addWidget(self.velocity_label, 0, 2)
|
||
graph_layout.addLayout(feature_layout)
|
||
|
||
main_layout.addLayout(graph_layout)
|
||
|
||
# 增大特征值区域与状态栏的间距
|
||
main_layout.addSpacing(20)
|
||
|
||
# 创建状态栏
|
||
self.status_bar = QStatusBar()
|
||
self.setStatusBar(self.status_bar)
|
||
|
||
# 状态栏显示初始消息
|
||
self.status_bar.showMessage("状态: 请填写 IP 和端口号")
|
||
|
||
# 主窗口的中央部件
|
||
central_widget = QWidget()
|
||
central_widget.setLayout(main_layout)
|
||
self.setCentralWidget(central_widget)
|
||
|
||
# 事件绑定
|
||
self.connect_button.clicked.connect(self.connect_to_server)
|
||
self.get_data_button.clicked.connect(self.on_button_clicked)
|
||
|
||
# 设置布局策略,确保控件大小随窗口调整
|
||
self.setSizePolicy(QSizePolicy.Expanding, QSizePolicy.Expanding)
|
||
|
||
def connect_to_server(self):
|
||
""" 连接到服务器 """
|
||
ip = self.ip_input.text()
|
||
port = self.port_input.text()
|
||
|
||
if not ip or not port:
|
||
self.status_bar.showMessage("状态: 请输入有效的 IP 和端口号")
|
||
return
|
||
|
||
try:
|
||
port = int(port)
|
||
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
# 设置 keep-alive 选项
|
||
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
|
||
self.socket.connect((ip, port))
|
||
|
||
self.get_data_button.setEnabled(True) # 连接成功,启用获取数据按钮
|
||
self.status_bar.showMessage(f"状态: 连接成功! 服务器 {ip}:{port}")
|
||
|
||
except Exception as e:
|
||
self.status_bar.showMessage(f"状态: 连接失败 - {str(e)}")
|
||
|
||
def receive_data(self):
|
||
""" 循环接收数据,直到接收完整 """
|
||
received_data = b"" # 初始化接收数据缓存
|
||
complete_data = b"" # 用于存储拼接后的完整数据
|
||
buffer_size = 0
|
||
# 记录开始时间
|
||
start_time = time.time()
|
||
try:
|
||
while True:
|
||
# 每次接收 1024 字节的数据
|
||
chunk = self.socket.recv(1000)
|
||
buffer_size += len(chunk)
|
||
print(f"len partf {len(chunk)},buffer_size {buffer_size}")
|
||
# 拼接数据
|
||
if buffer_size > 97000*4:
|
||
break # 如果没有数据,退出循环
|
||
received_data += chunk
|
||
# 记录结束时间
|
||
end_time = time.time()
|
||
|
||
# 计算并打印执行时间
|
||
execution_time = end_time - start_time
|
||
print(f"代码执行时间: {execution_time} 秒")
|
||
# 循环查找并解析帧头
|
||
while True:
|
||
# 查找帧头
|
||
frame_start = received_data.find(b'\xAA\x55\xAA')
|
||
if frame_start == -1:
|
||
break # 没有找到帧头,退出内层循环,等待更多数据
|
||
|
||
# 丢弃帧头之前的数据(可能是无效数据)
|
||
received_data = received_data[frame_start:]
|
||
|
||
# 检查数据是否足够解析长度字段
|
||
if len(received_data) < 7:
|
||
break # 数据不完整,退出内层循环,等待更多数据
|
||
|
||
try:
|
||
# 解析数据长度字段
|
||
data_length = struct.unpack('<I', received_data[3:7])[0]
|
||
except Exception as e:
|
||
print(f"解析长度字段失败: {e}")
|
||
received_data = received_data[1:] # 丢弃无效字节,继续查找帧头
|
||
continue
|
||
|
||
# 检查数据是否足够解析完整帧
|
||
print(f"解析长度字段 {len(received_data)}")
|
||
if len(received_data) < 7 + data_length:
|
||
break # 数据不完整,退出内层循环,等待更多数据
|
||
|
||
# 提取实际数据
|
||
actual_data = received_data[7:7 + data_length]
|
||
print(f"解析到的长度字段 {len(actual_data)}")
|
||
complete_data += actual_data # 将数据拼接到完整数据中
|
||
print(f"解析到的长度字段2 {len(complete_data)}")
|
||
# 丢弃已处理的数据
|
||
received_data = received_data[7 + data_length:]
|
||
print(f"还剩下解析长度字段 {len(received_data)}")
|
||
|
||
except Exception as e:
|
||
print(f"发生错误: {e}")
|
||
print(f" 数据: {len(complete_data)}")
|
||
return complete_data
|
||
|
||
def on_button_clicked(self):
|
||
""" 获取数据并绘制 """
|
||
try:
|
||
self.status_bar.showMessage("状态: 正在获取数据...")
|
||
data = self.receive_data() # 接收所有数据
|
||
data = np.frombuffer(data, dtype=np.int32) # 根据实际数据格式转换
|
||
LSB_32BIT = (2.8 / (2 ** 31)) * ((750 + 287) / 287) * 1000
|
||
scaled_data = data * LSB_32BIT
|
||
self.canvas.plot_data(scaled_data) # 在 Qt 界面中绘图
|
||
self.status_bar.showMessage("状态: 数据绘制完成")
|
||
except Exception as e:
|
||
self.status_bar.showMessage(f"状态: 错误 - {str(e)}")
|
||
|
||
|
||
if __name__ == '__main__':
|
||
app = QApplication(sys.argv)
|
||
window = SocketClientApp()
|
||
window.show()
|
||
sys.exit(app.exec_())
|