DataNodeFrequencyTest1/DataNodeFrequencyTestnew.py
DESKTOP-2QGM7IR\Lab01 5de3c3cc90 第一次提交
2025-04-09 09:49:42 +08:00

839 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import binascii
from MainWindow import Ui_DataNodeFrequencyTest
from PyQt5.QtWidgets import QApplication, QMainWindow, QMessageBox
from AFG3000 import *
from Prologix import *
from DN101_NFC import *
from DN101_ZIGBEE import *
from Mysql import *
import serial, threading
import serial.tools.list_ports
from PyQt5.QtCore import QTimer, QThread, pyqtSignal
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import *
import numpy as np
import matplotlib
import time
from sqlalchemy import create_engine
import pandas as pd
import tkinter as tk
from tkinter import filedialog
from timing import Timer
from playsound import playsound
matplotlib.use("Qt5Agg")
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei'] # 指定默认字体
plt.rcParams['axes.unicode_minus'] = False # 解决保存图像是负号'-'显示为方块的问题
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
import win32con
#from win32process import SuspendThread, ResumeThread
import ctypes
import crcmod
cfp = configparser.ConfigParser()
cfp.read("config.ini")
SENSER_INF_PACK1 = 0x01
SENSER_INF_PACK2 = 0x07
EVEL_DATA_PACK1 = 0x02
EVEL_DATA_PACK2 = 0x06
WAVE_X_PACK = 0x03
WAVE_Y_PACK = 0x04
WAVE_Z_PACK = 0x05
NFC_PORT_STR = 'Silicon Labs Dual CP210x USB to UART Bridge: Enhanced COM Port'
ZIG_PORT_STR = 'Silicon Labs Dual CP210x USB to UART Bridge: Standard COM Port'
AFG = AFG3000()
#DMM = DMM_34401A(addr=22, port="COM12", baud=115200, ser_timeout=3, timeout=2)
db = Mysql()
Stop = 0
sensorType = -1
#vol = {
# "160Hz":1.575,
# "320Hz":1.000,
# "640Hz":1.000,
# "1000Hz":2.620,
# "2000Hz":2.9,
# "3000Hz":2.7,
# "4000Hz":1.82,
# "5000Hz":1.33,
# "6000Hz":0.66
#}
class Thread_01(QThread): # 线程1
trigger = pyqtSignal(str)
Voltage = pyqtSignal(str)
AmplSig = pyqtSignal(str)
TestOK = pyqtSignal(list)
ThreadStop = pyqtSignal()
TimeOut = pyqtSignal()
def __init__(self, Fredata,vol):
super().__init__()
self.Fredata = Fredata
self.vol = vol
handle = -1
def run(self):
try:
self.handle = ctypes.windll.kernel32.OpenThread(
win32con.PROCESS_ALL_ACCESS, False, int(QThread.currentThreadId()))
except Exception as e:
print('get thread handle failed', e)
global StopThread
for i in range(0, len(self.Fredata)):
print(self.Fredata[i]['Frequency'])
start_time = time.time()
ampl = 0.02
ampl_ins = 0.05
AFG.SetAmpl(ampl)
AFG.SetFrequency(str(self.Fredata[i]['Frequency']))
self.trigger.emit(str(self.Fredata[i]['Frequency']))
#Voltage = str(DMM.measure())
# Voltage = (AFG.measure())
# Voltage = str(round(Voltage, 2))
# self.Voltage.emit(Voltage)
retAmpl = AFG.query('VOLTAGE:AMPLITUDE?')
retAmpl = round(retAmpl,2)
self.AmplSig.emit(str(retAmpl))
time.sleep(0.5)
AFG.Open()
time.sleep(0.5)
current_freq = self.Fredata[i]['Frequency']
ampl_set = float(self.vol[current_freq])
start_time2 = time.time()
while(ampl<ampl_set):
ampl = ampl + ampl_ins
AFG.SetAmpl(ampl)
time.sleep(0.05)
end_time2 = time.time()
print("调整时间2", end_time2 - start_time)
retry_times = 3
Voltage = (AFG.measure())
start_time3 = time.time()
while retry_times:
vol_diff = 100-Voltage
vol_div = vol_diff/Voltage
vol_change = ampl*vol_div
ampl = ampl+vol_change
AFG.SetAmpl(ampl)
time.sleep(0.1)
Voltage = AFG.measure()
#print(Voltage,abs(Voltage-100))
if(abs(Voltage-100)<1):
retry_times = 0
else:
retry_times = retry_times -1
try:
vol_str = str(round(Voltage, 2))
self.Voltage.emit(vol_str)
retAmpl = AFG.query(':SOUR1:VOLT?')
retAmpl = round(retAmpl, 2)
self.AmplSig.emit(str(retAmpl))
except Exception as e:
print("A")
self.vol[current_freq] = str(ampl)
print(current_freq, ampl)
end_time3 = time.time()
a = []
a.append('1')
a.append(str(self.Fredata[i]['Frequency']))
a.append(str(Voltage))
a.append(time.time())
time.sleep(3)
self.TestOK.emit(a)
print("调整时间3", end_time3 - start_time)
timeCount = 0
while timeCount < 500:
global Stop
#print(timeCount)
if Stop == 1:
time.sleep(0.1)
Stop = 0
break
timeCount += 1
time.sleep(0.1)
end_time4 = time.time()
print("调整时间4", end_time4 - start_time)
if timeCount == 500:
print("timeout emit")
self.TimeOut.emit()
#print(Voltage)
# retAmpl = AFG.query('VOLTAGE:AMPLITUDE?')
retAmpl = AFG.query(':SOUR1:VOLT?')
#print(retAmpl)
while float(retAmpl) > 0.1:#微调ampl到最低值20mv
retAmpl -= 0.1
#print(retAmpl)
AFG.SetAmpl(retAmpl)
# retAmpl = AFG.query('VOLTAGE:AMPLITUDE?')
retAmpl = AFG.query(':SOUR1:VOLT?')
time.sleep(0.05)
#print(retAmpl)
time.sleep(0.1)
AFG.Close()
end_time = time.time()
print("调整时间:", end_time - start_time)
ampl = 0.02
AFG.SetAmpl(ampl)
AFG.SetFrequency('80')
AFG.Close()
self.ThreadStop.emit()
class MyFigureCanvas(FigureCanvas):
def __init__(self):
self.fig = Figure()
FigureCanvas.__init__(self, self.fig)
self.axes = self.fig.add_subplot(111)
#self.axes.set_ylim(ymin=0, ymax=30)
def plot(self, data,shortAddr):
self.axes.cla()
x = []
y1 = []
y2 = []
data1 = float(data[0]['rmsValue_Z']) * 1.41
for i in range(len(data)):
y1.append(float(data[i]['rmsValue_Z']))
x.append(data[i]['Frequency'])
y2.append(data1)
self.axes.plot(x, y1)
data1 = float(data[0]['rmsValue_Z']) * 1.41
self.axes.plot(x, y2)
self.axes.set_title('传感器短地址' + shortAddr)
self.axes.set_xlabel('Frequency(Hz)')
self.axes.set_ylabel('RMS(m/s^2)')
self.fig.subplots_adjust(left=0.15, bottom=0.2)
self.axes.grid()
self.fig.canvas.draw_idle()
now_time = datetime.datetime.now()
time_str = now_time.strftime('%Y-%m-%d')
strPath = os.getcwd()
path = strPath + '\\{0}\\{1}.png'.format(time_str,shortAddr)
#if os.path.exists(path):
self.fig.savefig(path)
def clear(self):
self.axes.cla()
self.axes.grid()
self.fig.canvas.draw_idle()
class DataNodeFrequencyTest(QMainWindow, Ui_DataNodeFrequencyTest):
def __init__(self):
super(DataNodeFrequencyTest, self).__init__()
self.setupUi(self)
self.timer = Timer() # 使用计时器
self.setWindowTitle("凯奥思无线传感器频响测试")
self.NFCRead_Btn.clicked.connect(self.NFCRead)
self.Test_Btn.clicked.connect(self.StartTest)
self.ConnecDev_Btn.clicked.connect(self.ConnectDev)
self.send_Btn.clicked.connect(self.send_variable)
self.stop_Btn.clicked.connect(self.stop_ask)
#self.Export_Btn.clicked.connect(self.Export2Excel)
self.timer_zig = QTimer(self)
self.timer_zig.timeout.connect(self.zig_update)
#self.timer_multimeter = QTimer(self)
#self.timer_multimeter.timeout.connect(self.multimeter_update)
self.timer_single = QTimer(self)
self.timer_nfc = QTimer(self)
self.Init()
self.vol = {}
def Init(self):
self.start = 0
self.Test = 0
self.Next = 0
self.TestOK = '0'
self.Fre = 0
self.Vol = '0'
self.AFGOKTime = 0
self.ShortAddr = ""
self.TestEnd = 0
self.start_time = 0
self.start_end = 0
# self.AFG3000 = AFG3000()
self.port_list = AFG.init()
ip = cfp.get("DB", "IP")
# for i in range(0, len(self.port_list)):
# self.Signal_Box.addItem(self.port_list[i])
# print(self.port_list)
#self.AUDIO_PATH = self.get_resources_path(os.path.join('res', 'Finish.mp3'))
#playsound(self.AUDIO_PATH)
self.port_list_multimeter = list(serial.tools.list_ports.comports())
for i in range(0, len(self.port_list_multimeter)):
self.multimeter_Box.addItem(self.port_list_multimeter[i].device)
self.sensorType_Box.addItem('mems无线传感器')
self.sensorType_Box.addItem('压电无线传感器')
thistime = time.strftime("%Y-%m-%d", time.localtime())
sql = "select count(*) from t_data_nfc"
data = db._Query(sql)
self.TotalCount.setText(str(data[0]['count(*)']))
sql = "select count(*) from t_data_nfc where TIME LIKE \'%{0}%\'".format(thistime)
data = db._Query(sql)
self.Count_Edit.setText(str(data[0]['count(*)']))
self.Signal_Edit.setText(cfp.get("Device", "deviceSignal"))
self.plot = MyFigureCanvas()
self.gridlayout = QGridLayout(self.groupBox_3) # 继承容器groupBox
self.toolbar = NavigationToolbar(self.plot, self)
self.gridlayout.addWidget(self.toolbar, 0, 0)
self.gridlayout.addWidget(self.plot, 1, 0)
self.show()
data = []
# sql = "select rmsValue_Z,speedValue_Z, Frequency from t_data_zigbee where shortAddr = '0109'"
# data = db._Query(sql)
# self.plot.plot(data, '0109')
self.plot.clear()
def get_resources_path(self,relative_path):
if getattr(sys, 'frozen', False):
base_path = sys._MEIPASS
else:
base_path = os.path.abspath('.')
return os.path.join(base_path, relative_path)
def Export2Excel(self):
root = tk.Tk()
root.withdraw()
Filepath = filedialog.askdirectory() # 获得选择好的文件
engine = create_engine('mysql+pymysql://root:root@192.168.1.75:3306/datanodedb')
sql = 'SELECT * FROM t_data_zigbee'
df_read = pd.read_sql_query(sql, engine)
Filepath = Filepath + '/data.xlsx'
print(Filepath)
df_read.to_excel(Filepath, index=False)
QMessageBox.warning(self, "标题", "导出成功", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
def send_variable(self):
print("short_Addr " + self.ShortAddr)
write_data = "AA55AA" + self.ShortAddr + "1700"
byte_data = bytes.fromhex(write_data)
checksum = sum(byte_data) & 0xFF
checksum_byte = bytearray([checksum])
byte_data = byte_data + checksum_byte
print(byte_data)
self.zig_ser.write(byte_data)
def stop_ask(self):
print("short_Addr " + self.ShortAddr)
write_data = "AA55AA" + self.ShortAddr + "1800"
byte_data = bytes.fromhex(write_data)
checksum = sum(byte_data) & 0xFF
checksum_byte = bytearray([checksum])
byte_data = byte_data + checksum_byte
print(byte_data)
self.zig_ser.write(byte_data)
def ConnectDev(self):
if self.start != 0:
#self.Result_edit.clear()
self.start = 0
AFG.Close()
#self.Result_edit.append('信号发生器断开成功!')
#DMM.Close()
#self.Result_edit.append('万用表断开成功!')
self.nfc_ser.close()
#self.Result_edit.append('NFC模块断开成功')
self.zig_ser.close()
#self.Result_edit.append('ZigBee模块断开成功')
return None
elif self.start == 0:
self.ConnecDev_Btn.setEnabled(False)
self.NFCRead_Btn.setEnabled(False)
self.Test_Btn.setEnabled(False)
try:
# 连接信号发生器
QApplication.processEvents()
#self.Result_edit.clear()
#print(self.port_list[self.Signal_Box.currentIndex()])
# self.AFG3000.Open(self.port_list[self.Signal_Box.currentIndex()])
if self.sensorType_Box.currentText() == 'mems无线传感器':
sensorType = 0
self.Fredata = db._Query('select * from t_data_frequency WHERE sensorType = 0 order by Frequency ;')
elif self.sensorType_Box.currentText() == '压电无线传感器':
sensorType = 1
self.Fredata = db._Query('select * from t_data_frequency order by Frequency;')
self.tabWidget.setCurrentIndex(2)
self.NFCList = self.tabWidget.widget(2)
layoutNFC = QVBoxLayout()
layoutNFC.addWidget(QLabel('频点列表(Hz)'))
for i in range(0, len(self.Fredata)):
layoutNFC.addWidget(QLabel(str(self.Fredata[i]['Frequency'])))
print(self.Fredata[i]['Frequency'])
self.vol[int(self.Fredata[i]['Frequency'])] = cfp.get("Frequency", str(self.Fredata[i]['Frequency']))
layoutNFC.addStretch(1)
self.NFCList.setLayout(layoutNFC)
cfp.set("Device", "deviceSignal", self.Signal_Edit.text())
# cfp.set("Device", "deviceMultimeter", self.multimeter_Box.currentText())
signal = cfp.get("Device", "deviceSignal")
multimeter = cfp.get("Device", "deviceMultimeter")
# multimeter = 'USB0::0x2A8D::0xB318::MY62200139::0::INSTR'
AFG.Connect(signal)
self.TestStatus.setText('信号发生器联机成功!')
# self.Result_edit.append('信号发生器联机成功!')
# 连接万用表
# DMM = DMM_34401A(addr=22, port="COM8", baud=115200, ser_timeout=3, timeout=2)
# self.DMM = DMM_34401A(addr=22, port=self.port_list_multimeter[self.multimeter_Box.currentIndex()].name, \
# baud=115200, timeout=5)
# print(DMM.cmdPoll("DETector:BANDwidth?"))
# DMM.set_measure_voltage_ac()
# self.timer_multimeter.start(200) # 启动串口接收分析数据
AFG.ConnectMultimeter(multimeter)
AFG.SetAC()
# self.Result_edit.append('万用表联机成功!')
self.TestStatus.setText('万用表联机成功!')
nfc_port, zig_port = self.get_nfc_zig_port()
print(nfc_port, zig_port)
self.nfc_ser = serial.Serial(nfc_port, 19200, rtscts=False, dsrdtr=False)
self.zig_ser = serial.Serial(zig_port, 115200, rtscts=False, dsrdtr=False)
self.nfc_reader = Nfc_Reader(self.nfc_ser)
self.zig_reader = Zig_Reader(self.zig_ser)
self.timer_zig.start(1000) # 启动串口接收分析数据
self.nfc_reader.rfidm_c2_init()
#self.Result_edit.append('NFC、ZigBee模块联机成功')
self.TestStatus.setText('NFC、ZigBee模块联机成功')
self.ConnecDev_Btn.setEnabled(True)
self.ConnecDev_Btn.setText('断开')
self.NFCRead_Btn.setEnabled(True)
self.TestStatus.setText('联机成功!')
self.start = 1
thistime = time.strftime("%Y-%m-%d", time.localtime())
sql = "select count(*) from t_data_nfc where TIME LIKE \'%{0}%\'".format(thistime)
data = db._Query(sql)
self.Count_Edit.setText(str(data[0]['count(*)']))
if data[0]['count(*)'] < 10 :
self.encourage_label.setStyleSheet("font: 75 15pt \"微软雅黑\";\n"
"color: rgb(106, 168, 79);")
self.encourage_label.setText('今天是新的一天,加油!')
if data[0]['count(*)'] > 10 :
self.encourage_label.setStyleSheet("font: 75 15pt \"微软雅黑\";\n"
"color: rgb(255, 0, 0);")
self.encourage_label.setText('再接再厉!')
except Exception:
QMessageBox.warning(self, "标题", "连接失败", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
self.ConnecDev_Btn.setEnabled(True)
self.start = 0
return -1
def StartTest(self):
# self.thread_01 = Thread_01() # 创建线程
# self.thread_01.start() # 开始线程
# return 0
try:
if self.Test != 0:
self.Test = 0
ret = ctypes.windll.kernel32.TerminateThread(self.thread_01.handle, 0)
AFG.SetAmpl(0.02)
AFG.SetFrequency('80')
AFG.Close()
self.Test_Btn.setText('开始测试')
# 停止计时并显示测试耗时
elapsed_time = self.timer.stop()
self.TestStatus.setText(f'测试结束,耗时:{elapsed_time:.2f}')
return 0
elif self.Test == 0:
self.Test = 1
#开始计时
self.timer.start()
#AFG.Open()
thistime = time.strftime("%Y-%m-%d", time.localtime())
sql = "select count(*) from t_data_nfc where TIME LIKE \'%{0}%\'".format(thistime)
data = db._Query(sql)
self.Count_Edit.setText(str(data[0]['count(*)']))
if data[0]['count(*)'] < 10:
self.encourage_label.setStyleSheet("font: 75 15pt \"微软雅黑\";\n"
"color: rgb(106, 168, 79);")
self.encourage_label.setText('今天是新的一天,加油!')
if data[0]['count(*)'] > 10:
self.encourage_label.setStyleSheet("font: 75 20pt \"微软雅黑\";\n"
"color: rgb(255, 0, 0);")
self.encourage_label.setText('再接再厉!')
self.TestStatus.setText('正在测试中……')
self.ConnecDev_Btn.setEnabled(False)
self.NFCRead_Btn.setEnabled(False)
self.Test_Btn.setText('停止测试')
self.plot.clear()
self.thread_01 = Thread_01(self.Fredata,self.vol) # 创建线程
self.thread_01.trigger.connect(self.deal)
self.thread_01.Voltage.connect(self.VoltageFun)
self.thread_01.TestOK.connect(self.TestOKFun)
self.thread_01.AmplSig.connect(self.AmplFun)
self.thread_01.ThreadStop.connect(self.ThreadStop)
self.thread_01.TimeOut.connect(self.TimeOut)
self.thread_01.start() # 开始线程
return 0
except Exception as e:
QMessageBox.warning(self, "标题", "测试异常", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
return -1
# self.AFG3000.SetFrequency('8k')
# self.AFG3000.query('FREQUENCY?');
def AmplFun(self,updateampl):
updateampl = float(updateampl)*1000
self.Ampl_Edit.setText(str(updateampl))
def deal(self, updatestr):
self.Frequency_Edit.setText(updatestr)
def VoltageFun(self, updateVoltage):
self.VOLTage_Edit.setText(updateVoltage)
def ThreadStop(self):
self.TestEnd = 1
self.Test = 0
self.Test_Btn.setText('开始测试')
# 停止计时并显示测试耗时
elapsed_time = self.timer.stop()
self.TestStatus.setText(f'测试结束,耗时:{elapsed_time:.2f}')
self.NFCRead_Btn.setEnabled(True)
self.Ampl_Edit.setText('20')
self.Frequency_Edit.setText('80')
sql = "select rmsValue_Z,speedValue_Z, Frequency from t_data_zigbee where shortAddr = '{0}' and rmsValue_Z is not null order by Frequency".format(
self.ShortAddr)
data = db._Query(sql)
self.plot.plot(data,self.ShortAddr)
self.Test_Btn.setEnabled(False)
thistime = time.strftime("%Y-%m-%d", time.localtime())
sql = "select count(*) from t_data_nfc"
data = db._Query(sql)
self.TotalCount.setText(str(data[0]['count(*)']))
sql = "select count(*) from t_data_nfc where TIME LIKE \'%{0}%\'".format(thistime)
data = db._Query(sql)
self.Count_Edit.setText(str(data[0]['count(*)']))
if data[0]['count(*)'] > 10:
self.encourage_label.setStyleSheet("font: 75 15pt \"微软雅黑\";\n"
"color: rgb(255, 0, 0);")
self.encourage_label.setText('再接再厉!')
elif data[0]['count(*)'] > 50 & data[0]['count(*)'] < 100:
self.encourage_label.setStyleSheet("font: 75 20pt \"微软雅黑\";\n"
"color: rgb(255, 0, 0);")
self.encourage_label.setText('GOOD')
elif data[0]['count(*)'] > 100:
self.encourage_label.setStyleSheet("font: 75 30pt \"微软雅黑\";\n"
"color: rgb(255, 0, 0);")
self.encourage_label.setText('GREAT')
def TimeOut(self):
self.encourage_label.setText('超时了')
# self.Test = 0
# ret = ctypes.windll.kernel32.TerminateThread(self.thread_01.handle, 0)
#
# AFG.SetAmpl(0.02)
# AFG.SetFrequency('80')
# AFG.Close()
# self.Test_Btn.setText('开始测试')
# self.TestStatus.setText('测试结束')
# QMessageBox.warning(self, "标题", "ZigBee接收超时请重新测试", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
def TestOKFun(self, updateTest):
print(updateTest)
self.TestOK = updateTest[0]
print(self.TestOK)
self.Fre = updateTest[1]
self.Vol = updateTest[2]
self.AFGOKTime = updateTest[3]
def get_nfc_zig_port(self):
serial_ports = self.get_serial_list()
for port in serial_ports:
port = list(port)
print(port)
if NFC_PORT_STR in port[1]:
nfc_port = port[0]
elif ZIG_PORT_STR in port[1]:
zig_port = port[0]
return (nfc_port, zig_port)
def get_serial_list(self):
serial_ports = serial.tools.list_ports.comports()
serial_list = []
for port in serial_ports:
serial_list.append(list(port))
return serial_list
#def multimeter_update(self):
#VOLTage = self.DMM.measure()
#print(VOLTage)
#self.VOLTage_Edit.setText(VOLTage)
def zig_update(self):
msg = ""
zig_info = self.zig_reader.read()
# 将解析的设备信息和特征值输出到界面上
if zig_info == SENSER_INF_PACK2:
for k, i in self.zig_reader.dev_info_msg.items():
msg = msg + k + ":" + str(i) + "\n"
for k, i in self.zig_reader.dev_conf_msg.items():
msg = msg + k + ":" + str(i) + "\n"
self.zig_conf_recv_flag = True
elif (zig_info == EVEL_DATA_PACK1) | (zig_info == EVEL_DATA_PACK2):
for k, i in self.zig_reader.dev_evel_msg.items():
# print(k, " :", i)
if isinstance(i, float):
msg = msg + k + ":" + format(i, ".4f") + "\n"
else:
msg = msg + k + ":" + str(i) + "\n"
elif zig_info == SENSER_INF_PACK1:
self.ShortAddr = bytes(self.zig_reader.senser_ask.msg_addr).hex()
#msg = "传感器: " + str(self.ShortAddr) + "\n测试结果 :" + self.TestOK
#self.ZigBee_Edit.setText(msg)
print("testOK " + self.TestOK)
if int(self.TestOK) == 1:
time.sleep(3)
print("short_Addr " + self.ShortAddr)
write_data = "AA55AA" + self.ShortAddr + "1700"
byte_data = bytes.fromhex(write_data)
checksum = sum(byte_data) & 0xFF
checksum_byte = bytearray([checksum])
byte_data = byte_data + checksum_byte
print(byte_data)
self.zig_ser.write(byte_data)
self.start_time = time.time()
if self.TestEnd == 1:
write_data = "AA55AA" + self.ShortAddr + "1800"
byte_data = bytes.fromhex(write_data)
checksum = sum(byte_data) & 0xFF
checksum_byte = bytearray([checksum])
byte_data = byte_data + checksum_byte
print(byte_data)
self.zig_ser.write(byte_data)
self.TestEnd = 0
elif zig_info == 0x19:
self.start_time2 = time.time()
self.TestOK = '2'
payload = (self.zig_reader.zigpack.payload[0] << 8) | self.zig_reader.zigpack.payload[1] # 大端模式合并
payload2 = (self.zig_reader.zigpack.payload2[0] << 8) | self.zig_reader.zigpack.payload2[1] # 大端模式合并
self.acc_z_rms = self.zig_reader.evel_cal(payload)
self.acc_z_pk = self.zig_reader.evel_cal(payload2)
print(self.acc_z_rms)
print(self.acc_z_pk)
print(self.start_time - self.start_time2)
msg = "传感器: " + str(self.ShortAddr) + "\n测试结果 :" + self.TestOK + "\n有效值:" + str(self.acc_z_rms) + "\n峰值:" + str(self.acc_z_pk)
self.ZigBee_Edit.setText(msg)
# zig_recv_file_name = self.zig_reader.dev_evel_msg['采集时间'][:10]+".txt"
# with open(zig_recv_file_name, "a") as zig_recv_file:
# zig_recv_file.write(msg)
# zig_recv_file.write(str(self.Fre))
# zig_recv_file.write(self.Vol)
# zig_recv_file.write("\n\n")
global Stop
try:
#print("update zig:", self.TestOK, len(msg), self.ShortAddr, self.zig_reader.dev_evel_msg['传感器地址'])
# if (int(self.TestOK) == 1) and (len(msg) > 0) and (self.ShortAddr == self.zig_reader.dev_evel_msg['传感器地址']):
# self.Next = 1
# self.TestOK = '0'
# print("OK1")
# self.encourage_label.setStyleSheet("font: 75 15pt \"微软雅黑\";\n"
# "color: rgb(0, 0, 0);")
# self.encourage_label.setText('第一次收到Zigbee信息')
# return
if (int(self.TestOK) == 2) and zig_info == 0x19 and self.nfc_reader.dev_conf_msg['Zigbee 本地地址'] == self.ShortAddr:
print("OK2")
Stop = 1
self.Next = 0
sql = "select count(*) from t_data_zigbee where shortAddr = '{0}' and Frequency = '{1}';".format(
self.ShortAddr,self.Fre)
data = db._Query(sql)
print(data[0]['count(*)'])
if data[0]['count(*)'] > 0:
sql = "update t_data_zigbee set sampleTime = '{0}',deviceTem = '',\
environmentTem = '',angle = '',batteryV = '',\
rmsPK_X = '',rmsValue_X = '',speedValue_X = '',speedValue_X = '',\
rmsPK_Y = '',rmsValue_Y = '',speedPK_Y = '',speedValue_Y = '',\
rmsPK_Z = '{1}',rmsValue_Z = '{2}',speedPK_Z = '',speedValue_Z = '',\
VOLTage = '' \
where shortAddr = '{3}' and Frequency = {4};".format(self.zig_reader.get_date_time(),
self.acc_z_pk,
self.acc_z_rms,
self.ShortAddr,self.Fre)
print(sql)
data = db._Operate(sql)
else:
sql = "insert into t_data_zigbee(shortAddr,sampleTime,deviceTem,environmentTem,angle,batteryV,\
rmsPK_X,rmsValue_X,speedPK_X,speedValue_X,rmsPK_Y,rmsValue_Y,speedPK_Y,speedValue_Y,rmsPK_Z,\
rmsValue_Z,speedPK_Z,speedValue_Z,Frequency,VOLTage) values('{0}','{1}','','','','',\
'','','','','','','','','{2}','{3}','','',{4},''\
)".format(self.ShortAddr,self.zig_reader.get_date_time(),
self.acc_z_pk,
self.acc_z_rms,
self.Fre)
#print(sql)
data = db._Operate(sql)
self.TestOK = '0'
self.start_time3 = time.time()
print("消耗时间:" + str(self.start_time3 - self.start_time))
except Exception as e:
print(e)
QMessageBox.warning(self, "标题", "数据存储异常", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
return -1
def nfc_update(self):
msg = ''
for k, i in self.nfc_reader.dev_info_msg.items():
msg = msg + k + ":" + str(i) + "\n"
for k, i in self.nfc_reader.dev_conf_msg.items():
msg = msg + k + ":" + str(i) + "\n"
thistime = time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
sql = "select count(*) from t_data_nfc where shortAddr = '{0}';".format(
self.nfc_reader.dev_conf_msg['Zigbee 本地地址'])
self.ShortAddr = self.nfc_reader.dev_conf_msg['Zigbee 本地地址']
data = db._Query(sql)
print(data[0]['count(*)'])
if data[0]['count(*)'] > 0:
sql = "update t_data_nfc set dataNodeName = '{0}',dataNodeNo = '{1}',\
hardWareVersion = '{2}',softVersion = '{3}',productNo = '{4}',\
productSN = '{5}',productType = '{6}',RSSI = '{7}',\
firstPowerTime = '{8}',TIME = '{9}',batteryV = '{10}'\
where shortAddr = '{11}';".format(self.nfc_reader.dev_info_msg['测点名称'],
self.nfc_reader.dev_info_msg['MAC地址'], \
self.nfc_reader.dev_info_msg['硬件版本'],
self.nfc_reader.dev_info_msg['软件版本'], \
self.nfc_reader.dev_info_msg['生产批号'],
self.nfc_reader.dev_info_msg['生产序列号'], \
self.nfc_reader.dev_info_msg['产品型号'],
self.nfc_reader.dev_info_msg['无线信号强度'], \
self.nfc_reader.dev_info_msg['首次上电日期'], thistime, \
self.nfc_reader.dev_info_msg['电池电压'],
self.nfc_reader.dev_conf_msg['Zigbee 本地地址'])
#print(sql)
data = db._Operate(sql);
else:
sql = "insert into t_data_nfc(dataNodeName,dataNodeNo,shortAddr,hardWareVersion,softVersion,productNo,\
productType,RSSI,productSN,firstPowerTime,batteryV,TIME) values('{0}','{1}','{2}','{3}','{4}','{5}',\
'{6}','{7}','{8}','{9}','{10}','{11}')".format(self.nfc_reader.dev_info_msg['测点名称'],
self.nfc_reader.dev_info_msg['MAC地址'], \
self.nfc_reader.dev_conf_msg['Zigbee 本地地址'],
self.nfc_reader.dev_info_msg['硬件版本'], \
self.nfc_reader.dev_info_msg['软件版本'],
self.nfc_reader.dev_info_msg['生产批号'], \
self.nfc_reader.dev_info_msg['产品型号'],
self.nfc_reader.dev_info_msg['无线信号强度'], \
self.nfc_reader.dev_info_msg['生产序列号'],
self.nfc_reader.dev_info_msg['首次上电日期'], \
self.nfc_reader.dev_info_msg['电池电压'], thistime)
#print(sql)
data = db._Operate(sql)
self.NFC_Edit.setText(msg)
self.tabWidget.setCurrentIndex(1)
def NFCRead(self):
try:
self.Test_Btn.setEnabled(True)
#self.Result_edit.clear()
self.tabWidget.setCurrentIndex(0)
QApplication.processEvents()
self.nfc_reader.get_uuid()
if self.nfc_reader.nfc_uuid:
#self.Result_edit.append("DN101 NFC UUID" + hex(self.nfc_reader.nfc_uuid)[2:])
time.sleep(0.5)
info = self.nfc_reader.read_info()
conf = self.nfc_reader.read_conf()
if info & conf:
self.nfc_reader.dev_info_analysis()
self.nfc_reader.dev_conf_analysis()
self.nfc_update()
self.TestStatus.setText('NFC读取成功')
#self.Result_edit.append("DN101 NFC读取成功")
print("addr " + self.nfc_reader.dev_conf_msg['Zigbee 本地地址'])
short_Addr = self.nfc_reader.dev_conf_msg['Zigbee 本地地址']
write_data = "DEDFEFD2" + short_Addr
byte_data = bytes.fromhex(write_data)
self.zig_ser.write(byte_data)
# 打印结果
print(byte_data.hex()) # 输出: dedeefd25223 (16进制字符串格式)
return info
else:
#self.Result_edit.append("NFC读取失败")
return False
else:
#self.Result_edit.append("NFC读取失败")
return False
except Exception:
QMessageBox.warning(self, "标题", "测试异常", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
return -1
if __name__ == '__main__':
app = QApplication(sys.argv)
MainWindow = QMainWindow()
ui = Ui_DataNodeFrequencyTest()
ui.setupUi(MainWindow)
View = DataNodeFrequencyTest()
View.show()
sys.exit(app.exec_())