DESKTOP-2QGM7IR\Lab01 5de3c3cc90 第一次提交
2025-04-09 09:49:42 +08:00

703 lines
30 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/python3
# -*- coding: utf-8 -*-
import sys
from MainWindow import Ui_DataNodeFrequencyTest
from PyQt5.QtWidgets import QApplication, QMainWindow, QMessageBox
from AFG3000 import *
from Prologix import *
from DN101_NFC import *
from DN101_ZIGBEE import *
from Mysql import *
import serial, threading
import serial.tools.list_ports
from PyQt5.QtCore import QTimer, QThread, pyqtSignal
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import *
import numpy as np
import matplotlib
import time
from sqlalchemy import create_engine
import pandas as pd
import tkinter as tk
from tkinter import filedialog
from playsound import playsound
matplotlib.use("Qt5Agg")
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
import matplotlib.pyplot as plt
plt.rcParams['font.sans-serif'] = ['SimHei'] # 指定默认字体
plt.rcParams['axes.unicode_minus'] = False # 解决保存图像是负号'-'显示为方块的问题
from matplotlib.backends.backend_qt5agg import NavigationToolbar2QT as NavigationToolbar
import win32con
from win32process import SuspendThread, ResumeThread
import ctypes
SENSER_INF_PACK1 = 0x01
SENSER_INF_PACK2 = 0x07
EVEL_DATA_PACK1 = 0x02
EVEL_DATA_PACK2 = 0x06
WAVE_X_PACK = 0x03
WAVE_Y_PACK = 0x04
WAVE_Z_PACK = 0x05
NFC_PORT_STR = 'Silicon Labs Dual CP210x USB to UART Bridge: Enhanced COM Port'
ZIG_PORT_STR = 'Silicon Labs Dual CP210x USB to UART Bridge: Standard COM Port'
AFG = AFG3000()
DMM = DMM_34401A(addr=22, port="COM13", baud=115200, ser_timeout=3, timeout=2)
db = Mysql()
Stop = 0
class Thread_01(QThread): # 线程1
trigger = pyqtSignal(str)
Voltage = pyqtSignal(str)
AmplSig = pyqtSignal(str)
TestOK = pyqtSignal(list)
ThreadStop = pyqtSignal()
TimeOut = pyqtSignal()
def __init__(self, Fredata):
super().__init__()
self.Fredata = Fredata
handle = -1
def run(self):
try:
self.handle = ctypes.windll.kernel32.OpenThread(
win32con.PROCESS_ALL_ACCESS, False, int(QThread.currentThreadId()))
except Exception as e:
print('get thread handle failed', e)
global StopThread
for i in range(0, len(self.Fredata)):
print(self.Fredata[i]['Frequency'])
ampl = 0.02
ampl_ins = 0.025
AFG.SetAmpl(ampl)
AFG.SetFrequency(str(self.Fredata[i]['Frequency']))
self.trigger.emit(str(self.Fredata[i]['Frequency']))
Voltage = str(DMM.measure())
self.Voltage.emit(Voltage)
retAmpl = AFG.query('VOLTAGE:AMPLITUDE?')
self.AmplSig.emit(str(retAmpl))
while (ampl < 1.3)&(float(self.Fredata[i]['Frequency'])<=160):
ampl += 0.05
AFG.SetAmpl(ampl)
time.sleep(0.1)
while (ampl < 1.6) & (float(self.Fredata[i]['Frequency']) > 160)& (float(self.Fredata[i]['Frequency']) <= 320):
ampl += 0.05
AFG.SetAmpl(ampl)
time.sleep(0.1)
while (ampl < 1.7) & (float(self.Fredata[i]['Frequency']) > 320)& (float(self.Fredata[i]['Frequency']) <= 640):
ampl += 0.05
AFG.SetAmpl(ampl)
time.sleep(0.1)
while (ampl < 1.7) & (float(self.Fredata[i]['Frequency']) > 640)& (float(self.Fredata[i]['Frequency']) <= 1000):
ampl += 0.05
AFG.SetAmpl(ampl)
time.sleep(0.1)
while (ampl < 1.2) & (float(self.Fredata[i]['Frequency']) > 1000)& (float(self.Fredata[i]['Frequency']) <= 2000):
ampl += 0.05
AFG.SetAmpl(ampl)
time.sleep(0.1)
while (ampl < 0.5) & (float(self.Fredata[i]['Frequency']) > 2000)& (float(self.Fredata[i]['Frequency']) <= 3000):
ampl += 0.05
AFG.SetAmpl(ampl)
time.sleep(0.1)
ampl_ins = 0.01
while (ampl < 0.05) & (float(self.Fredata[i]['Frequency']) > 3000)& (float(self.Fredata[i]['Frequency']) <= 4000):
ampl += 0.02
AFG.SetAmpl(ampl)
time.sleep(0.1)
ampl_ins = 0.01
while (ampl < 0.2) & (float(self.Fredata[i]['Frequency']) > 4000)& (float(self.Fredata[i]['Frequency']) <= 5000):
ampl += 0.05
AFG.SetAmpl(ampl)
time.sleep(0.1)
while (ampl < 0.4) & (float(self.Fredata[i]['Frequency']) > 5000)& (float(self.Fredata[i]['Frequency']) <= 7000):
ampl += 0.05
AFG.SetAmpl(ampl)
time.sleep(0.1)
try:
if(float(self.Fredata[i]['Frequency']) >= 1000):
DMM.set_measure_voltage_ac("AUTO", "200")
else:
DMM.set_measure_voltage_ac("AUTO", "20")
except:
print("DMM Set Error")
while float(Voltage) < 100:#调整电压到100mv
# if (float(self.Fredata[i]['Frequency']) >= 1000):
# Voltage = str(DMM.measure())
# ampl += 0.01
# self.Voltage.emit(Voltage)
# retAmpl = AFG.query('VOLTAGE:AMPLITUDE?')
# self.AmplSig.emit(str(retAmpl))
# AFG.SetAmpl(ampl)
# time.sleep(0.05)
# else:
Voltage = str(DMM.measure())
if float(Voltage) < 95:
ampl += ampl_ins
if (float(Voltage) > 95) and (float(Voltage) < 100):
ampl += 0.01
self.Voltage.emit(Voltage)
retAmpl = AFG.query('VOLTAGE:AMPLITUDE?')
self.AmplSig.emit(str(retAmpl))
AFG.SetAmpl(ampl)
if float(Voltage) > 100:
a = []
a.append('1')
a.append(self.Fredata[i]['Frequency'])
a.append(Voltage)
self.TestOK.emit(a)
timeCount = 0
while timeCount < 50:
global Stop
print(timeCount)
if Stop == 1:
time.sleep(1)
Stop = 0
break
timeCount += 1
time.sleep(1)
print(Voltage)
retAmpl = AFG.query('VOLTAGE:AMPLITUDE?')
#print(retAmpl)
while float(retAmpl) > 0.07:#微调ampl到最低值20mv
retAmpl -= 0.05
#print(retAmpl)
AFG.SetAmpl(retAmpl)
retAmpl = AFG.query('VOLTAGE:AMPLITUDE?')
time.sleep(0.05)
print(retAmpl)
time.sleep(1)
ampl = 0.02
AFG.SetAmpl(ampl)
AFG.SetFrequency('80')
AFG.Close()
self.ThreadStop.emit()
class MyFigureCanvas(FigureCanvas):
def __init__(self):
self.fig = Figure()
FigureCanvas.__init__(self, self.fig)
self.axes = self.fig.add_subplot(111)
#self.axes.set_ylim(ymin=0, ymax=30)
def plot(self, data,shortAddr):
self.axes.cla()
x = []
y1 = []
y2 = []
data1 = float(data[0]['rmsValue_Z']) * 1.41
for i in range(len(data)):
y1.append(float(data[i]['rmsValue_Z']))
x.append(data[i]['Frequency'])
y2.append(data1)
self.axes.plot(x, y1)
data1 = float(data[0]['rmsValue_Z']) * 1.41
self.axes.plot(x, y2)
self.axes.set_title('传感器短地址' + shortAddr)
self.axes.set_xlabel('Frequency(Hz)')
self.axes.set_ylabel('RMS(m/s^2)')
self.fig.subplots_adjust(left=0.15, bottom=0.2)
self.axes.grid()
self.fig.canvas.draw_idle()
path = 'G:/chaos/频响测试 - 副本/20230403/{0}.png'.format(shortAddr)
self.fig.savefig(path)
def clear(self):
self.axes.cla()
self.axes.grid()
self.fig.canvas.draw_idle()
class DataNodeFrequencyTest(QMainWindow, Ui_DataNodeFrequencyTest):
def __init__(self):
super(DataNodeFrequencyTest, self).__init__()
self.setupUi(self)
self.setWindowTitle("无线传感器频响测试")
self.NFCRead_Btn.clicked.connect(self.NFCRead)
self.Test_Btn.clicked.connect(self.StartTest)
self.ConnecDev_Btn.clicked.connect(self.ConnectDev)
self.Export_Btn.clicked.connect(self.Export2Excel)
self.timer_zig = QTimer(self)
self.timer_zig.timeout.connect(self.zig_update)
#self.timer_multimeter = QTimer(self)
#self.timer_multimeter.timeout.connect(self.multimeter_update)
self.timer_single = QTimer(self)
self.timer_nfc = QTimer(self)
self.Init()
def Init(self):
self.start = 0
self.Test = 0
self.Next = 0
self.TestOK = '0'
self.Fre = 0
self.Vol = '0'
# self.AFG3000 = AFG3000()
self.port_list = AFG.init()
for i in range(0, len(self.port_list)):
self.Signal_Box.addItem(self.port_list[i])
print(self.port_list)
self.AUDIO_PATH = self.get_resources_path(os.path.join('res', 'Finish.mp3'))
#playsound(self.AUDIO_PATH)
self.port_list_multimeter = list(serial.tools.list_ports.comports())
for i in range(0, len(self.port_list_multimeter)):
self.multimeter_Box.addItem(self.port_list_multimeter[i].description)
self.Fredata = db._Query('select * from t_data_frequency order by Frequency;');
thistime = time.strftime("%Y-%m-%d", time.localtime())
sql = "select count(*) from t_data_nfc"
data = db._Query(sql)
self.TotalCount.setText(str(data[0]['count(*)']))
sql = "select count(*) from t_data_nfc where TIME LIKE \'%{0}%\'".format(thistime)
data = db._Query(sql)
self.Count_Edit.setText(str(data[0]['count(*)']))
self.tabWidget.setCurrentIndex(2)
self.NFCList = self.tabWidget.widget(2)
layoutNFC = QVBoxLayout()
layoutNFC.addWidget(QLabel('频点列表(Hz)'))
for i in range(0, len(self.Fredata)):
layoutNFC.addWidget(QLabel(str(self.Fredata[i]['Frequency'])))
layoutNFC.addStretch(1)
self.NFCList.setLayout(layoutNFC)
self.plot = MyFigureCanvas()
self.gridlayout = QGridLayout(self.groupBox_3) # 继承容器groupBox
self.toolbar = NavigationToolbar(self.plot, self)
self.gridlayout.addWidget(self.toolbar, 0, 0)
self.gridlayout.addWidget(self.plot, 1, 0)
self.show()
data = []
# sql = "select rmsValue_Z,speedValue_Z, Frequency from t_data_zigbee where shortAddr = '450'"
# data = db._Query(sql)
# self.plot.plot(data, '450')
self.plot.clear()
def get_resources_path(self,relative_path):
if getattr(sys, 'frozen', False):
base_path = sys._MEIPASS
else:
base_path = os.path.abspath('.')
return os.path.join(base_path, relative_path)
def Export2Excel(self):
root = tk.Tk()
root.withdraw()
Filepath = filedialog.askdirectory() # 获得选择好的文件
engine = create_engine('mysql+pymysql://root:root@192.168.1.75:3306/datanodedb')
sql = 'SELECT * FROM t_data_zigbee'
df_read = pd.read_sql_query(sql, engine)
Filepath = Filepath + '/data.xlsx'
print(Filepath)
df_read.to_excel(Filepath, index=False)
QMessageBox.warning(self, "标题", "导出成功", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
def ConnectDev(self):
if self.start != 0:
self.Result_edit.clear()
self.start = 0
AFG.Close()
self.Result_edit.append('信号发生器断开成功!')
DMM.Close()
self.Result_edit.append('万用表断开成功!')
self.nfc_ser.close()
self.Result_edit.append('NFC模块断开成功')
self.zig_ser.close()
self.Result_edit.append('ZigBee模块断开成功')
return None
elif self.start == 0:
self.ConnecDev_Btn.setEnabled(False)
self.NFCRead_Btn.setEnabled(False)
self.Test_Btn.setEnabled(False)
try:
# 连接信号发生器
QApplication.processEvents()
self.Result_edit.clear()
#print(self.port_list[self.Signal_Box.currentIndex()])
# self.AFG3000.Open(self.port_list[self.Signal_Box.currentIndex()])
AFG.Connect('TCPIP0::192.168.1.140::inst0::INSTR')
self.Result_edit.append('信号发生器联机成功!')
# 连接万用表
# DMM = DMM_34401A(addr=22, port="COM8", baud=115200, ser_timeout=3, timeout=2)
# self.DMM = DMM_34401A(addr=22, port=self.port_list_multimeter[self.multimeter_Box.currentIndex()].name, \
# baud=115200, timeout=5)
print(DMM.cmdPoll("DETector:BANDwidth?"))
DMM.set_measure_voltage_ac()
# self.timer_multimeter.start(200) # 启动串口接收分析数据
self.Result_edit.append('万用表联机成功!')
nfc_port, zig_port = self.get_nfc_zig_port()
print(nfc_port, zig_port)
self.nfc_ser = serial.Serial(nfc_port, 19200, rtscts=False, dsrdtr=False)
self.zig_ser = serial.Serial(zig_port, 115200, rtscts=False, dsrdtr=False)
self.nfc_reader = Nfc_Reader(self.nfc_ser)
self.zig_reader = Zig_Reader(self.zig_ser)
self.timer_zig.start(100) # 启动串口接收分析数据
self.nfc_reader.rfidm_c2_init()
self.Result_edit.append('NFC、ZigBee模块联机成功')
self.ConnecDev_Btn.setEnabled(True)
self.ConnecDev_Btn.setText('断开')
self.NFCRead_Btn.setEnabled(True)
self.start = 1
except Exception as e:
print(e)
QMessageBox.warning(self, "标题", "连接失败", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
self.ConnecDev_Btn.setEnabled(True)
self.start = 0
return -1
def StartTest(self):
# self.thread_01 = Thread_01() # 创建线程
# self.thread_01.start() # 开始线程
# return 0
try:
if self.Test != 0:
self.Test = 0
ret = ctypes.windll.kernel32.TerminateThread(self.thread_01.handle, 0)
AFG.SetAmpl(0.02)
AFG.SetFrequency('80')
AFG.Close()
self.Test_Btn.setText('开始测试')
self.TestStatus.setText('测试结束')
return 0
elif self.Test == 0:
self.Test = 1
AFG.Open()
self.TestStatus.setText('正在测试中……')
self.ConnecDev_Btn.setEnabled(False)
self.NFCRead_Btn.setEnabled(False)
self.Test_Btn.setText('停止测试')
self.plot.clear()
self.thread_01 = Thread_01(self.Fredata) # 创建线程
self.thread_01.trigger.connect(self.deal)
self.thread_01.Voltage.connect(self.VoltageFun)
self.thread_01.TestOK.connect(self.TestOKFun)
self.thread_01.AmplSig.connect(self.AmplFun)
self.thread_01.ThreadStop.connect(self.ThreadStop)
self.thread_01.TimeOut.connect(self.TimeOut)
self.thread_01.start() # 开始线程
return 0
except Exception:
QMessageBox.warning(self, "标题", "测试异常", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
return -1
# self.AFG3000.SetFrequency('8k')
# self.AFG3000.query('FREQUENCY?');
def AmplFun(self,updateampl):
updateampl = float(updateampl)*1000
self.Ampl_Edit.setText(str(updateampl))
def deal(self, updatestr):
self.Frequency_Edit.setText(updatestr)
def VoltageFun(self, updateVoltage):
self.VOLTage_Edit.setText(updateVoltage)
def ThreadStop(self):
self.Test = 0
self.TestStatus.setText('测试结束')
self.Test_Btn.setText('开始测试')
self.NFCRead_Btn.setEnabled(True)
self.Ampl_Edit.setText('20')
self.Frequency_Edit.setText('80')
sql = "select rmsValue_Z,speedValue_Z, Frequency from t_data_zigbee where shortAddr = '{0}' order by Frequency".format(
self.ShortAddr)
data = db._Query(sql)
self.plot.plot(data,self.ShortAddr)
self.Test_Btn.setEnabled(False)
thistime = time.strftime("%Y-%m-%d", time.localtime())
sql = "select count(*) from t_data_nfc"
data = db._Query(sql)
self.TotalCount.setText(str(data[0]['count(*)']))
sql = "select count(*) from t_data_nfc where TIME LIKE \'%{0}%\'".format(thistime)
data = db._Query(sql)
self.Count_Edit.setText(str(data[0]['count(*)']))
#playsound(self.AUDIO_PATH)
def TimeOut(self):
self.Test = 0
ret = ctypes.windll.kernel32.TerminateThread(self.thread_01.handle, 0)
AFG.SetAmpl(0.02)
AFG.SetFrequency('80')
AFG.Close();
self.Test_Btn.setText('开始测试')
self.TestStatus.setText('测试结束')
QMessageBox.warning(self, "标题", "ZigBee接收超时请重新测试", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
def TestOKFun(self, updateTest):
print(updateTest)
self.TestOK = updateTest[0]
print(self.TestOK)
self.Fre = updateTest[1]
self.Vol = updateTest[2]
def get_nfc_zig_port(self):
serial_ports = self.get_serial_list()
for port in serial_ports:
port = list(port)
print(port)
if NFC_PORT_STR in port[1]:
nfc_port = port[0]
elif ZIG_PORT_STR in port[1]:
zig_port = port[0]
return (nfc_port, zig_port)
def get_serial_list(self):
serial_ports = serial.tools.list_ports.comports()
serial_list = []
for port in serial_ports:
serial_list.append(list(port))
return serial_list
#def multimeter_update(self):
#VOLTage = self.DMM.measure()
#print(VOLTage)
#self.VOLTage_Edit.setText(VOLTage)
def zig_update(self):
msg = ""
zig_info = self.zig_reader.read()
# 将解析的设备信息和特征值输出到界面上
if zig_info == SENSER_INF_PACK2:
for k, i in self.zig_reader.dev_info_msg.items():
msg = msg + k + ":" + str(i) + "\n"
for k, i in self.zig_reader.dev_conf_msg.items():
msg = msg + k + ":" + str(i) + "\n"
self.zig_conf_recv_flag = True
elif (zig_info == EVEL_DATA_PACK1) | (zig_info == EVEL_DATA_PACK2):
for k, i in self.zig_reader.dev_evel_msg.items():
# print(k, " :", i)
if isinstance(i, float):
msg = msg + k + ":" + format(i, ".4f") + "\n"
else:
msg = msg + k + ":" + str(i) + "\n"
if zig_info:
self.ZigBee_textEdit.setText(msg)
zig_recv_file_name = self.zig_reader.dev_evel_msg['采集时间'][:10]+".txt"
with open(zig_recv_file_name, "a") as zig_recv_file:
zig_recv_file.write(msg)
zig_recv_file.write(str(self.Fre))
zig_recv_file.write(self.Vol)
zig_recv_file.write("\n\n")
global Stop
try:
if int(self.TestOK) == 1 and (len(msg) > 0) and self.ShortAddr == self.zig_reader.dev_evel_msg['传感器地址']:
self.Next = 1
self.TestOK = '0'
print("OK1")
return
if (int(self.Next) == 1) and (len(msg) > 0) and self.ShortAddr == self.zig_reader.dev_evel_msg['传感器地址']:
print("OK2")
Stop = 1
self.Next = 0
sql = "select count(*) from t_data_zigbee where shortAddr = '{0}' and Frequency = '{1}';".format(
self.zig_reader.dev_evel_msg['传感器地址'],self.Fre)
data = db._Query(sql)
print(data[0]['count(*)'])
if data[0]['count(*)'] > 0:
sql = "update t_data_zigbee set sampleTime = '{0}',deviceTem = '{1}',\
environmentTem = '{2}',angle = '{3}',batteryV = '{4}',\
rmsPK_X = '{5}',rmsValue_X = '{6}',speedValue_X = '{7}',speedValue_X = '{8}',\
rmsPK_Y = '{9}',rmsValue_Y = '{10}',speedPK_Y = '{11}',speedValue_Y = '{12}',\
rmsPK_Z = '{13}',rmsValue_Z = '{14}',speedPK_Z = '{15}',speedValue_Z = '{16}',\
VOLTage = '{17}' \
where shortAddr = '{18}' and Frequency = '{19}';".format(self.zig_reader.dev_evel_msg['采集时间'],
self.zig_reader.dev_evel_msg['设备温度'],
self.zig_reader.dev_evel_msg['环境温度'],
self.zig_reader.dev_evel_msg['倾角'],
self.zig_reader.dev_evel_msg['电池电压'],
self.zig_reader.dev_evel_msg['X轴峰值'],
self.zig_reader.dev_evel_msg['X轴有效值'],
self.zig_reader.dev_evel_msg['X轴速度峰值'],
self.zig_reader.dev_evel_msg['X轴速度有效值'],
self.zig_reader.dev_evel_msg['Y轴峰值'],
self.zig_reader.dev_evel_msg['Y轴有效值'],
self.zig_reader.dev_evel_msg['Y轴速度峰值'],
self.zig_reader.dev_evel_msg['Y轴速度有效值'],
self.zig_reader.dev_evel_msg['Z轴峰值'],
self.zig_reader.dev_evel_msg['Z轴有效值'],
self.zig_reader.dev_evel_msg['Z轴速度峰值'],
self.zig_reader.dev_evel_msg['Z轴速度有效值'],self.Vol,
self.zig_reader.dev_evel_msg['传感器地址'],self.Fre)
print(sql)
data = db._Operate(sql)
else:
sql = "insert into t_data_zigbee(shortAddr,sampleTime,deviceTem,environmentTem,angle,batteryV,\
rmsPK_X,rmsValue_X,speedPK_X,speedValue_X,rmsPK_Y,rmsValue_Y,speedPK_Y,speedValue_Y,rmsPK_Z,\
rmsValue_Z,speedPK_Z,speedValue_Z,Frequency,VOLTage) values('{0}','{1}','{2}','{3}','{4}','{5}',\
'{6}','{7}','{8}','{9}','{10}','{11}','{12}','{13}','{14}','{15}','{16}','{17}',{18},'{19}'\
)".format(self.zig_reader.dev_evel_msg['传感器地址'],
self.zig_reader.dev_evel_msg['采集时间'],
self.zig_reader.dev_evel_msg['设备温度'],
self.zig_reader.dev_evel_msg['环境温度'],
self.zig_reader.dev_evel_msg['倾角'],
self.zig_reader.dev_evel_msg['电池电压'],
self.zig_reader.dev_evel_msg['X轴峰值'],
self.zig_reader.dev_evel_msg['X轴有效值'],
self.zig_reader.dev_evel_msg['X轴速度峰值'],
self.zig_reader.dev_evel_msg['X轴速度有效值'],
self.zig_reader.dev_evel_msg['Y轴峰值'],
self.zig_reader.dev_evel_msg['Y轴有效值'],
self.zig_reader.dev_evel_msg['Y轴速度峰值'],
self.zig_reader.dev_evel_msg['Y轴速度有效值'],
self.zig_reader.dev_evel_msg['Z轴峰值'],
self.zig_reader.dev_evel_msg['Z轴有效值'],
self.zig_reader.dev_evel_msg['Z轴速度峰值'],
self.zig_reader.dev_evel_msg['Z轴速度有效值'],self.Fre,self.Vol)
#print(sql)
data = db._Operate(sql)
self.TestOK = '0'
except Exception:
QMessageBox.warning(self, "标题", "数据存储异常", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
return -1
def nfc_update(self):
msg = ''
for k, i in self.nfc_reader.dev_info_msg.items():
msg = msg + k + ":" + str(i) + "\n"
for k, i in self.nfc_reader.dev_conf_msg.items():
msg = msg + k + ":" + str(i) + "\n"
thistime = time.strftime("%Y-%m-%d %H:%M:%S ", time.localtime())
sql = "select count(*) from t_data_nfc where shortAddr = '{0}';".format(
self.nfc_reader.dev_conf_msg['Zigbee 本地地址'])
self.ShortAddr = self.nfc_reader.dev_conf_msg['Zigbee 本地地址']
data = db._Query(sql)
print(data[0]['count(*)'])
if data[0]['count(*)'] > 0:
sql = "update t_data_nfc set dataNodeName = '{0}',dataNodeNo = '{1}',\
hardWareVersion = '{2}',softVersion = '{3}',productNo = '{4}',\
productSN = '{5}',productType = '{6}',RSSI = '{7}',\
firstPowerTime = '{8}',TIME = '{9}',batteryV = '{10}'\
where shortAddr = '{11}';".format(self.nfc_reader.dev_info_msg['测点名称'],
self.nfc_reader.dev_info_msg['MAC地址'], \
self.nfc_reader.dev_info_msg['硬件版本'],
self.nfc_reader.dev_info_msg['软件版本'], \
self.nfc_reader.dev_info_msg['生产批号'],
self.nfc_reader.dev_info_msg['生产序列号'], \
self.nfc_reader.dev_info_msg['产品型号'],
self.nfc_reader.dev_info_msg['无线信号强度'], \
self.nfc_reader.dev_info_msg['首次上电日期'], thistime, \
self.nfc_reader.dev_info_msg['电池电压'],
self.nfc_reader.dev_conf_msg['Zigbee 本地地址'])
#print(sql)
data = db._Operate(sql);
else:
sql = "insert into t_data_nfc(dataNodeName,dataNodeNo,shortAddr,hardWareVersion,softVersion,productNo,\
productType,RSSI,productSN,firstPowerTime,batteryV,TIME) values('{0}','{1}','{2}','{3}','{4}','{5}',\
'{6}','{7}','{8}','{9}','{10}','{11}')".format(self.nfc_reader.dev_info_msg['测点名称'],
self.nfc_reader.dev_info_msg['MAC地址'], \
self.nfc_reader.dev_conf_msg['Zigbee 本地地址'],
self.nfc_reader.dev_info_msg['硬件版本'], \
self.nfc_reader.dev_info_msg['软件版本'],
self.nfc_reader.dev_info_msg['生产批号'], \
self.nfc_reader.dev_info_msg['产品型号'],
self.nfc_reader.dev_info_msg['无线信号强度'], \
self.nfc_reader.dev_info_msg['生产序列号'],
self.nfc_reader.dev_info_msg['首次上电日期'], \
self.nfc_reader.dev_info_msg['电池电压'], thistime)
#print(sql)
data = db._Operate(sql);
self.NFC_textEdit.setText(msg)
def NFCRead(self):
try:
self.Test_Btn.setEnabled(True)
self.Result_edit.clear()
self.tabWidget.setCurrentIndex(0)
QApplication.processEvents()
self.nfc_reader.get_uuid()
if self.nfc_reader.nfc_uuid:
self.Result_edit.append("DN101 NFC UUID" + hex(self.nfc_reader.nfc_uuid)[2:])
time.sleep(0.5)
info = self.nfc_reader.read_info()
conf = self.nfc_reader.read_conf()
if info & conf:
self.nfc_reader.dev_info_analysis()
self.nfc_reader.dev_conf_analysis()
self.nfc_update()
self.Result_edit.append("DN101 NFC读取成功")
return info
else:
self.Result_edit.append("NFC读取失败")
return False
else:
self.Result_edit.append("NFC读取失败")
return False
except Exception:
QMessageBox.warning(self, "标题", "测试异常", QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
return -1
if __name__ == '__main__':
app = QApplication(sys.argv)
MainWindow = QMainWindow()
ui = Ui_DataNodeFrequencyTest()
ui.setupUi(MainWindow)
View = DataNodeFrequencyTest()
View.show()
sys.exit(app.exec_())