add manager codes

This commit is contained in:
zhangsheng 2025-01-22 17:13:45 +08:00
parent bb9ab6db01
commit 4de0f19d9c
13 changed files with 589 additions and 215 deletions

View File

@ -32,12 +32,13 @@ void JsonData::CmtCmd_80(char* send_data,int& send_length)
memcpy(gateway_ver.comm_mode,"WiFi",sizeof(gateway_ver.comm_mode));
#endif
memcpy(gateway_ver.comm_mode,"有线",sizeof(gateway_ver.comm_mode));
memcpy(gateway_ver.terminal_name,ReadStrByOpt(NETWORKCONFIG, "eth0", "hostName").c_str(),sizeof(gateway_ver.terminal_name));
GetSysStatusCMT(gateway_ver.cpu_use,gateway_ver.memory_use,gateway_ver.disk_remain,gateway_ver.temperature);
memcpy(send_data,&gateway_ver,sizeof(GatewayVersion));
send_length = sizeof(GatewayVersion);
}
void JsonData::CmtCmd_81(char* send_data,int& send_length)
void JsonData::CmtCmd_81(char* recv_body,int& count,char* send_data,int& send_length)
{
int featureInterVal;
int featureInterTime;
@ -46,15 +47,37 @@ void JsonData::CmtCmd_81(char* send_data,int& send_length)
int maxSensorNum;
int sensorCount;
array_t arrRes;
char whereCon[512]={0};
if (recv_body != NULL)
{
printf("count = %d\n",count);
char short_addr_[256]={0};
for (size_t i = 0; i < count; i++){
char temp[5]={0};
memcpy(temp,recv_body + i * 4,4);
printf("short_addr = %s\n",temp);
strcat(short_addr_,"'");
strcat(short_addr_,temp);
strcat(short_addr_,"'");
if (i + 1 != count){
strcat(short_addr_ ,",");
}
}
sprintf(whereCon,"zigbeeShortAddr IN (%s)",short_addr_);
arrRes = sqlite_db_ctrl::instance().GetDataMultiLineTransaction(T_SENSOR_INFO(TNAME), "*", whereCon);
}else{
arrRes = sqlite_db_ctrl::instance().GetDataMultiLineTransaction(T_SENSOR_INFO(TNAME), "*", NULL);
}
int iResult = arrRes.size();
printf("result = %d\n",iResult);
size_t j = 0;
if (iResult > 0) {
SensorInfo sensor_info[iResult];
for (; j < iResult; j++)
{
memcpy(sensor_info[j].sensor_name,arrRes[j][1].c_str(),sizeof(sensor_info[j].sensor_name));
memcpy(sensor_info[j].measurement_id,arrRes[j][44].c_str(),sizeof(sensor_info[j].measurement_id));
sensor_info[j].short_addr = atoi(arrRes[j][30].c_str());
memcpy(sensor_info[j].short_addr , arrRes[j][30].c_str(),sizeof(sensor_info[j].short_addr));
memcpy(sensor_info[j].hw_ver,arrRes[j][8].c_str(),sizeof(sensor_info[j].hw_ver));
memcpy(sensor_info[j].soft_ver,arrRes[j][9].c_str(),sizeof(sensor_info[j].soft_ver));
std::vector<std::string> vParamRSSI;
@ -130,27 +153,31 @@ void JsonData::CmtCmd_81(char* send_data,int& send_length)
int static_Count = sqlite_db_ctrl::instance().GetTableRows(szTableName,strsql.c_str());
scheduler::instance().GetScheduleConfig(featureInterVal,waveInterVal,featureInterTime,waveInterTime,maxSensorNum);
zlog_info(zct,"wavex = %d,featureInterVal = %d,waveInterVal",waveX_Count,featureInterVal);
sensor_info[j].wave_x_reporting_rate = (float(waveX_Count)/(86400/(float)waveInterVal)) * 100;
sensor_info[j].wave_y_reporting_rate = (float(waveY_Count)/(86400/(float)waveInterVal)) * 100;
sensor_info[j].wave_z_reporting_rate = (float(waveZ_Count)/(86400/(float)waveInterVal)) * 100;
sensor_info[j].eigen_value_reporting_rate = (float(static_Count)/(86400/(float)featureInterVal)) *100;
zlog_info(zct,"wavex = %d,featureInterVal = %d,waveInterVal = %d",waveX_Count,featureInterVal,waveInterVal);
int day_count = 86400 / waveInterVal;
sensor_info[j].wave_x_reporting_rate = float( waveX_Count/ day_count) * 100;
printf("day_count = %d,wave_x_reporting_rate = %d\n",day_count,sensor_info[j].wave_x_reporting_rate );
sensor_info[j].wave_y_reporting_rate = (float(waveY_Count)/(86400/waveInterVal)) * 100;
sensor_info[j].wave_z_reporting_rate = (float(waveZ_Count)/(86400/waveInterVal)) * 100;
sensor_info[j].eigen_value_reporting_rate = (float(static_Count)/(86400/featureInterVal)) * 100;
day_count = 86400 / featureInterVal;
printf("static_Count = %d,day_count = %d,wave_x_reporting_rate = %d\n",static_Count,day_count,sensor_info[j].eigen_value_reporting_rate );
memset(whereCon,0,sizeof(whereCon));
sprintf(whereCon,"channelID = '%s-Z'",arrRes[j][44].c_str());
std::string integratRMS = sqlite_db_ctrl::instance().GetData(T_DATA_INFO(TNAME), "integratRMS",whereCon);
memcpy(sensor_info[j].velocity_rms , integratRMS.c_str(),sizeof(sensor_info[j].velocity_rms));;
sensor_info[j].upgrade_status = 0;
memcpy(sensor_info[j].upgrade_time,"2024-01-08 12:00:00",sizeof(sensor_info[j].upgrade_time));
sensor_info[j].velocity_rms = atof(integratRMS.c_str());
sensor_info[j].upgrade_status = atoi(arrRes[j][46].c_str());
memset(whereCon,0,sizeof(whereCon));
sprintf(whereCon,"short_addr = '%s' ORDER BY start_timestamp DESC limit 0,1",arrRes[j][30].c_str());
std::string upgrade_time = sqlite_db_ctrl::instance().GetData("firmware_upgrade", " start_timestamp ", whereCon);
memcpy(sensor_info[j].upgrade_time,upgrade_time.c_str(),sizeof(sensor_info[j].upgrade_time));
sensor_info[j].version = 1;
}
memcpy(send_data,&sensor_info,sizeof(SensorInfo) * j);
memcpy(send_data + sizeof(SensorInfo) * j,&sensor_info,sizeof(SensorInfo) * j);
memcpy(send_data + sizeof(SensorInfo) * j * 2,&sensor_info,sizeof(SensorInfo) * j);
memcpy(send_data + sizeof(SensorInfo) * j * 3,&sensor_info,sizeof(SensorInfo) * j);
memcpy(send_data + sizeof(SensorInfo) * j * 4,&sensor_info,sizeof(SensorInfo) * j);
send_length = sizeof(SensorInfo) * j * 5;
send_length = sizeof(SensorInfo) * j;
zlog_info(zct,"sizeof(SensorInfo) = %d",sizeof(SensorInfo)*j);
zlog_info(zct,"send_length = %d",send_length);
}
@ -198,13 +225,30 @@ void JsonData::CmtCmd_82(char* MeasurementID,char* send_data,int& channel,int& s
free(buffer);
}
void JsonData::CmtCmd_83(char* MeasurementID,char* send_data,int& send_length)
void JsonData::CmtCmd_83(char* recv_body,int& count,char* send_data,int& send_length)
{
vec_t vecRes;
std::string filename = "";
char whereCon[128]={0};
sprintf(whereCon,"MeasurementID = '%s'",MeasurementID);
if (recv_body){
printf("count = %d\n",count);
char MeasurementID_[256]={0};
for (size_t i = 0; i < count; i++){
char temp[5]={0};
memcpy(temp,recv_body + i * 20,20);
printf("short_addr = %s\n",temp);
strcat(MeasurementID_,"'");
strcat(MeasurementID_,temp);
strcat(MeasurementID_,"'");
if (i + 1 != count){
strcat(MeasurementID_ ,",");
}
}
sprintf(whereCon,"MeasurementID IN (%s)",MeasurementID_);
vecRes = sqlite_db_ctrl::instance().GetDataSingleLine(T_SENSOR_INFO(TNAME), "*", whereCon);
}else{
vecRes = sqlite_db_ctrl::instance().GetDataSingleLine(T_SENSOR_INFO(TNAME), "*", NULL);
}
if (vecRes.size() > 0){
DownloadConfig download_config;
memcpy(download_config.gw_mac,GlobalConfig::MacAddr_G.c_str(),sizeof(download_config.gw_mac));
@ -334,12 +378,140 @@ void JsonData::CmtCmd_84(char* filename)
}
void JsonData::CmtCmd_85(char* filename)
void JsonData::CmtCmd_85(char* filename,char* file_md5,char* send_data,int& send_length)
{
zlog_info(zct,"file_path = %s,file_md5 = %d",filename,file_md5);
sleep(3);
int iRet = system("/opt/opt.sh");
zlog_info(zct, "iRet = %d", iRet);
if (iRet == -1) {
zlog_error(zct, "system() error");
}
}
void JsonData::CmtCmd_86(char* recv_body,int& count,char* filename,char* file_md5,char* send_data,int& send_length)
{
zlog_info(zct,"file_path = %s,file_md5 = %d",filename,file_md5);
UpgradeSensorRes upgrade_sensor_res;
upgrade_sensor_res.code = 0;
sprintf(upgrade_sensor_res.message ,"%s","");
char cmd[128]={0};
char file_path[64]={0};
sprintf(cmd, "mv /opt/%s /opt/DataNode/",filename);
system(cmd);
sprintf(file_path, "/opt/DataNode/%s",filename);
FILE * pFile=NULL;
size_t thisSize = 0;
char *buffer=NULL;
pFile = fopen (file_path,"rb");
if (pFile==NULL) {
zlog_error(zct,"Error opening file");
}
else
{
while (fgetc(pFile) != EOF) {
++thisSize;
}
rewind(pFile);
buffer = (char*)malloc(thisSize);
fread (buffer, sizeof (char), thisSize, pFile);
fclose (pFile);
}
zlog_info(zct,"Read %zu bytes", thisSize);
char sensor_type[6] = {0};
char sf_version[10] = {0};
memcpy(sensor_type, buffer, 5);
zlog_info(zct,"model:%s", sensor_type);
char c[2] = {0};
c[0] = buffer[5];
uint8_t hw_ver = atoi(c);
c[0] = buffer[6];
uint8_t sf_ver_m = atoi(c);
c[0] = buffer[7];
uint8_t sf_ver_s = atoi(c);
sprintf(sf_version,"%d.%d",sf_ver_m,sf_ver_s);
unsigned char ch_crc = 0x00;
int packgeSize = 0;
ch_crc = buffer[12];
packgeSize = BUILD_UINT32(buffer[8],buffer[9],buffer[10],buffer[11]);
zlog_info(zct,"sf_ver_m = %d",sf_ver_m);
zlog_info(zct,"sf_ver_s = %d",sf_ver_s);
zlog_info(zct,"hw_ver = %d",hw_ver);
zlog_info(zct,"sensor_type = %s",sensor_type);
zlog_info(zct,"packgeSize = %d",packgeSize);
zlog_info(zct,"ch_crc = %02x",ch_crc);
unsigned char sum = 0x00;
for(size_t i = 13; i < thisSize;i++){
sum += buffer[i];
}
if (sum % 256 != ch_crc){
zlog_error(zct,"package CRC error,filename = %s",file_path);
upgrade_sensor_res.code = 1;
sprintf(upgrade_sensor_res.message ,"%s","package CRC error");
memcpy(send_data,(char*)&upgrade_sensor_res,sizeof(DownloadConfigRes));
return;
}
zlog_info(zct,"sum = %x\n",sum % 256);
char localtimestamp[32] = {0};
GetTimeNet(localtimestamp, 1);
for (size_t i = 0; i < count; i++)
{
char wherecon[100] = {0};
char insertSql[200] = {0};
char updateSql[100] = {0};
char short_addr_[21] = {0};
memcpy(short_addr_,recv_body + i * 4,4);
sprintf(wherecon," zigbeeShortAddr = '%s' ",short_addr_);
vec_t vecResult = sqlite_db_ctrl::instance().GetDataSingleLine(T_SENSOR_INFO(TNAME), " hardVersion,softVersion,ProductNo,zigbeeShortAddr ", wherecon);
if (hw_ver != atoi(vecResult[0].c_str())){
zlog_error(zct,"hardVersion error,filename = %s",file_path);
upgrade_sensor_res.code = 2;
sprintf(upgrade_sensor_res.message ,"%s","hardVersion error");
memcpy(send_data,(char*)&upgrade_sensor_res,sizeof(DownloadConfigRes));
return;
}
sprintf(insertSql, " '%s','%s','','','','','%d.%d','%s',1,'%s'",vecResult[3].c_str(),localtimestamp,sf_ver_m,sf_ver_s,vecResult[1].c_str(),filename);
sqlite_db_ctrl::instance().InsertData(" firmware_upgrade ", insertSql);
//0 默认状态1 升级中2 升级成功,3 升级失败
memset(wherecon,0,sizeof(wherecon));
memset(updateSql,0,sizeof(updateSql));
sprintf(wherecon," zigbeeShortAddr = '%s'",vecResult[3].c_str());
sprintf(updateSql, " upgradeStatus = %d ", 1);
sqlite_db_ctrl::instance().UpdateTableData(T_SENSOR_INFO(TNAME), updateSql,wherecon);
uint16_t short_addr;
char *end_ptr = NULL;
short_addr = strtol(vecResult[3].c_str(), &end_ptr, 16);
int res = scheduler::instance().UpgradeSensor(short_addr,std::string(sensor_type),atoi(vecResult[0].c_str()),vecResult[1],std::string(sf_version));
}
free(buffer);
}
void JsonData::CmtCmd_88(char* recv_body,int& count)
{
}
void JsonData::CmtCmd_86(char* filename)
{
std::string get_file_md5(const std::string& file_path) {
// 使用 md5sum 命令计算文件的 MD5 值
std::string command = "md5sum " + file_path + " | awk '{ print $1 }'";
// 使用 popen 执行命令并获取输出
std::array<char, 128> buffer;
std::string result;
std::shared_ptr<FILE> pipe(popen(command.c_str(), "r"), pclose);
if (!pipe) {
std::cerr << "Error: Unable to run md5sum command" << std::endl;
return "";
}
// 从命令的输出流中读取数据
while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) {
result += buffer.data();
}
// 去除末尾的换行符
result.erase(result.find_last_not_of("\n") + 1);
return result;
}

View File

@ -5,7 +5,7 @@
#include <iostream>
#include <json/json.h>
#include "common/parameter_defination.hpp"
#include "localserver/cmt_protocol.h"
#include "localserver/cmt_server.hpp"
class JsonData {
public:
@ -79,12 +79,13 @@ public:
//CMT tcp
void CmtCmd_80(char* send_data,int& return_length);
void CmtCmd_81(char* send_data,int& return_length);
void CmtCmd_81(char* recv_body,int& count,char* send_data,int& return_length);
void CmtCmd_82(char* MeasurementID,char* send_data,int& channel,int& return_length);
void CmtCmd_83(char* MeasurementID,char* send_data,int& return_length);
void CmtCmd_83(char* recv_body,int& count,char* send_data,int& return_length);
void CmtCmd_84(char* filename);
void CmtCmd_85(char* filename);
void CmtCmd_86(char* filename);
void CmtCmd_85(char* filename,char* file_md5,char* send_data,int& send_length);
void CmtCmd_86(char* recv_body,int& count,char* filename,char* file_md5,char* send_data,int& send_length);
void CmtCmd_88(char* recv_body,int& count);
private:
Json::FastWriter show_value_;

View File

@ -644,7 +644,7 @@ std::string JsonData::JsonCmd_Cgi_60(Param_60 &param){
jsonVal["success"] = true;
jsonVal["message"] = "";
char file_path[64]={0};
char cmd[64]={0};
char cmd[128]={0};
sprintf(cmd, "mv /opt/%s /opt/DataNode/",param.fileName.c_str());
system(cmd);
sprintf(file_path, "/opt/DataNode/%s",param.fileName.c_str());
@ -751,16 +751,17 @@ std::string JsonData::JsonCmd_Cgi_60(Param_60 &param){
char *end_ptr = NULL;
short_addr = strtol(vecResult[3].c_str(), &end_ptr, 16);
int res = scheduler::instance().UpgradeSensor(short_addr,std::string(sensor_type),atoi(vecResult[0].c_str()),vecResult[1],std::string(sf_version));
if (res != 0)
{
if (res != 0){
jsonVal["success"] = false;
jsonVal["message"] = "UpgradeSensor error";
free(buffer);
return show_value_.write(jsonVal);
}
zlog_info(zct,"000000000000");
}
free(buffer);
zlog_info(zct,"11111111111");
return show_value_.write(jsonVal);
}

View File

@ -6,11 +6,11 @@
extern zlog_category_t *zct;
void LocalServer::HandleTcp_cmd(const char* recv_data,char* send_data,uint8_t& rescmd,int& send_length)
void LocalServer::HandleTcp_cmd(const char* recv_data,char* send_data,uint8_t& rescmd,int& send_length,int recvbody_length)
{
zlog_info(zct,"HandleTcp_cmd");
PackgeHead head;
memcpy(&head,recv_data,sizeof(PackgeHead));
PackageHead head;
memcpy(&head,recv_data,sizeof(PackageHead));
uint8_t cmd = head.cmd;
rescmd = head.cmd;
printf("cmd = %d\n",cmd);
@ -28,56 +28,82 @@ void LocalServer::HandleTcp_cmd(const char* recv_data,char* send_data,uint8_t& r
}break;
case kGateWayVersion: {
jd.CmtCmd_80(send_data,send_length);
// memcpy(send_data,&send_data_,sizeof(send_length));
// if (send_data_){
// free(send_data_);
// }
}break;
case kSensorInfo:{
jd.CmtCmd_81(send_data,send_length);
char *recv_body = NULL;
if (recvbody_length > 0){
recv_body = (char*)malloc(recvbody_length - 4);
memcpy(recv_body,recv_data + sizeof(PackageHead) + 4,recvbody_length - 4);
}
int count = 0;
memcpy((char*)&count,recv_data + sizeof(PackageHead),4);
jd.CmtCmd_81(recv_body,count,send_data,send_length);
if (recv_body){
free(recv_body);
}
}break;
case kSensorWave:{
WaveReq wave_req;
memcpy(&wave_req,recv_data + sizeof(PackgeHead),sizeof(WaveReq));
memcpy(&wave_req,recv_data + sizeof(PackageHead),sizeof(WaveReq));
jd.CmtCmd_82(wave_req.measurement_id,send_data,wave_req.channel,send_length);
}break;
case KDownloadConfig:{
DownloadConfigReq download_config;
memcpy(&download_config,recv_data + sizeof(PackgeHead),sizeof(DownloadConfigReq));
jd.CmtCmd_83(download_config.measurement_id,send_data,send_length);
char *recv_body = NULL;
int count = 0;
if (recvbody_length > 0){
recv_body = (char*)malloc(recvbody_length - 4);
memcpy((char*)&count,(char*)recv_data + sizeof(PackageHead),4);
memcpy(recv_body,recv_data + sizeof(PackageHead) + 4,recvbody_length - 4);
}
jd.CmtCmd_83(recv_body,count,send_data,send_length);
zlog_info(zct, "CmtCmd_83 = %d",send_length);
if (recv_body){
free(recv_body);
}
}break;
case KUploadConfig:{
UploadConfigReq upload_config;
memcpy(&upload_config,recv_data + sizeof(PackgeHead),sizeof(UploadConfigReq));
memcpy(&upload_config,recv_data + sizeof(PackageHead),sizeof(UploadConfigReq));
jd.CmtCmd_84(upload_config.filename);
}break;
case KUpgadeGateway:{
sleep(3);
int iRet = system("/opt/opt.sh");
zlog_info(zct, "iRet = %d", iRet);
if (iRet == -1) {
zlog_error(zct, "system() error");
}
UpgradeGwReq upgrade_gw;
memcpy(&upgrade_gw,recv_data + sizeof(PackageHead),sizeof(UpgradeGwReq));
jd.CmtCmd_85(upgrade_gw.filename,upgrade_gw.md5,send_data,send_length);
}break;
case KUpgradeSensor:{
char *recv_body = NULL;
int count = 0;
if (recvbody_length > 0){
recv_body = (char*)malloc(recvbody_length - sizeof(UpgradeSensorReq) - 4);
memcpy((char*)&count,(char*)recv_data + sizeof(PackageHead) + sizeof(UpgradeSensorReq) ,4);
memcpy(recv_body,recv_data + sizeof(PackageHead) + sizeof(UpgradeSensorReq) + 4,recvbody_length - sizeof(UpgradeSensorReq) - 4);
}
UpgradeSensorReq upgrade_sensor;
memcpy(&upgrade_sensor,recv_data + sizeof(PackgeHead),sizeof(UpgradeSensorReq));
jd.CmtCmd_86(upgrade_sensor.filename);
memcpy(&upgrade_sensor,recv_data + sizeof(PackageHead),sizeof(UpgradeSensorReq));
jd.CmtCmd_86(recv_body,count,upgrade_sensor.filename,upgrade_sensor.md5,send_data,send_length);
}break;
case KEigenvalue:{
}
break;
case KUpgradeSensorStop:{
char *recv_body = NULL;
int count = 0;
if (recvbody_length > 0){
recv_body = (char*)malloc(recvbody_length - 4);
memcpy((char*)&count,(char*)recv_data + sizeof(PackageHead) ,4);
memcpy(recv_body,recv_data + sizeof(PackageHead) + 4 ,recvbody_length - 4);
}
UpgradeSensorReq upgrade_sensor;
memcpy(&upgrade_sensor,recv_data + sizeof(PackageHead),sizeof(UpgradeSensorReq));
jd.CmtCmd_88(recv_body,count);
}
break;
default:
return NULL;
break;
}

View File

@ -0,0 +1,94 @@
#include "cmt_server.hpp"
#include <iostream>
#include <cstdio>
#include <cstring>
#include <memory>
#include <array>
#include <zlog.h>
#include "localserver/local_server.hpp"
extern zlog_category_t *zct;
void CMTSession::start() {
memset(data_, 0, CMT_TCP_LEN);
sessionSet_.insert(shared_from_this()); // 将会话添加到会话集合中
do_read();
}
void CMTSession::set_data(char *data, int len) {
memcpy(data_, data, len);
}
void CMTSession::do_write(std::size_t length) {
auto self(shared_from_this());
zlog_debug(zct, "[CMT] response len = %d", length);
boost::asio::async_write(socket_, boost::asio::buffer(data_, length), [this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (ec) {
zlog_error(zct, "[CMT] fail to send data, %s, %d", ec.category().name(), ec.value());
socket_.close();
sessionSet_.erase(shared_from_this());
} else {
zlog_debug(zct, "[CMT] waiting for next message...");
do_read();
// socket_.close();
// sessionSet_.erase(shared_from_this());
}
});
}
void CMTServer::do_accept() {
acceptor_.async_accept(socket_, [this](boost::system::error_code ec) {
zlog_debug(zct, "[CMT] accept a socket");
if (!ec) {
std::make_shared<CMTSession>(std::move(socket_), sessionSet_)->start();
}
do_accept();
});
}
void CMTSession::do_read() {
auto self(shared_from_this());
socket_.async_read_some(boost::asio::buffer(data_, CMT_TCP_LEN), [this, self](boost::system::error_code ec, std::size_t length) {
if (!ec) {
if (length == 0) {
zlog_warn(zct, "[CMT] Client disconnected (length 0)");
sessionSet_.erase(shared_from_this());
return;
}
if (length < 6 || data_[0] != 0xAA || data_[1] != 0x55 || data_[2] != 0xAA) {
zlog_warn(zct, "[CMT] invalid data package, len:%d", length);
do_read();
}
uint8_t cmd = data_[3];
int payload_len = 0;
memcpy((char*)&payload_len, (char*)&data_[4], 4);
payload_len = htonl(payload_len);
zlog_debug(zct, "[CMT] cmd: %d, message len: %d, payload len: %d, head:%2x-%2x-%2x", cmd, length, payload_len, data_[0], data_[1], data_[2]);
zlog_debug(zct, "[CMT] payload bytes %d, %d", data_[4], data_[5]);
int send_data_len = 0;
char send_data[96100] = {0};
LocalServer::HandleTcp_cmd(data_,send_data,cmd,send_data_len,payload_len);
PackageHead pkg_head;
pkg_head.head[0] = 0xAA;
pkg_head.head[1] = 0x55;
pkg_head.head[2] = 0xAA;
pkg_head.cmd = cmd;
pkg_head.len = send_data_len;
memset(data_,0,sizeof(data_));
memcpy(data_, &pkg_head, head_len_);
memcpy(data_ + head_len_, &send_data, send_data_len);
do_write(head_len_ + send_data_len);
} else if (ec == boost::asio::error::eof){
zlog_warn(zct, "[CMT] Client disconnect the connection");
sessionSet_.erase(shared_from_this());
} else if (ec == boost::asio::error::connection_reset){
zlog_warn(zct, "[CMT] Connection reset by client");
sessionSet_.erase(shared_from_this());
} else {
zlog_error(zct, "[CMT] Error message, reason: %s", ec.message().c_str());
socket_.close();
sessionSet_.erase(shared_from_this());
}
});
}

View File

@ -1,15 +1,25 @@
#ifndef CMT_PROTOCOL_HPP_
#define CMT_PROTOCOL_HPP_
#include <iostream>
#include <boost/asio.hpp>
#include <cstdlib>
#include <memory>
#include <utility>
#include <unordered_set>
using boost::asio::ip::tcp;
#define CMT_TCP_LEN 100000
//1 tcp server可以参考本安有线中的代码侦听端口10000
//2 定义无线网关与传感器用到的结构
//3 包头为
#pragma pack(1)
struct PackgeHead{
struct PackageHead{
uint8_t head[3]; // 固定值0xAA55AA
uint8_t cmd;
uint16_t len;
int len;
char data[0];
};
@ -45,10 +55,15 @@ enum LooseStatus{
LOOSE = 1
};
//传感器信息cmd 81
struct SensorInfoReq{
char short_addr[4];
};
struct SensorInfo{
int version;
char sensor_name[64];
char measurement_id[20];
int short_addr;
char short_addr[5];
char hw_ver[12];
char soft_ver[12];
int gateway_rssi;
@ -63,7 +78,7 @@ struct SensorInfo{
int wave_x_reporting_rate;
int wave_y_reporting_rate;
int wave_z_reporting_rate;
char velocity_rms[10];
float velocity_rms;
int upgrade_status; // 参考 SensorUpgradeStatus
char upgrade_time[20];
};
@ -128,16 +143,29 @@ struct DownloadConfig{
//上传配置 cmd 84
struct UploadConfigReq{
char filename[32];
char filename[64];
char md5[33];
};
//网关更新 cmd 85
//传感器更新 cmd 86
struct UpgradeSensorReq{
char filename[32];
int upgrade_short_addr[32];
struct UpgradeGwReq{
char filename[64];
char md5[33];
};
struct UpgradeGwRes{
int code;
char message[64];
};
//传感器更新 cmd 86
struct UpgradeSensorReq{
char filename[64];
char md5[33];
char upgrade_short_addr[0];
};
struct UpgradeSensorRes{
int code;
char message[64];
};
//获取特征值 cmd 87
@ -153,4 +181,46 @@ struct Search {
int gw_type; // GatewayType
};
class CMTSession : public std::enable_shared_from_this<CMTSession> {
public:
CMTSession(tcp::socket socket, std::unordered_set<std::shared_ptr<CMTSession>>& sessionSet) : socket_(std::move(socket)), sessionSet_(sessionSet) {
web_version_ = "1.0.1";
version_ = 1;
head_len_ = sizeof(PackageHead);
}
void start();
void set_data(char *data, int len);
void do_write(std::size_t length);
private:
void do_read();
tcp::socket socket_;
std::unordered_set<std::shared_ptr<CMTSession>>& sessionSet_;
char data_[CMT_TCP_LEN];
std::string web_version_;
int version_;
int head_len_;
};
class CMTServer {
public:
CMTServer(boost::asio::io_service& io_service, short port) : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)), socket_(io_service) { do_accept(); }
// 向所有客户端发送消息
void send_message_to_all(char *message, int len) {
for (auto& session : sessionSet_) {
session->set_data(message, len);
session->do_write(len);
}
}
private:
void do_accept();
tcp::acceptor acceptor_;
tcp::socket socket_;
std::unordered_set<std::shared_ptr<CMTSession>> sessionSet_;
};
#endif

View File

@ -80,7 +80,7 @@ public:
virtual ~LocalServer();
static void HandleFromServer(const char *pData, int pLen, const char *topic);
static std::string HandleCgi_cmd(std::string &pData);
static void HandleTcp_cmd(const char* recvData,char* send_data,uint8_t& rescmd,int& reslength);
static void HandleTcp_cmd(const char* recvData,char* send_data,uint8_t& rescmd,int& reslength,int recvbody_length = 0);
};
#endif // LOCAL_SERVER_HPP_

View File

@ -18,6 +18,7 @@
#include "dbaccess/sql_db.hpp"
#include "uart/uart.hpp"
#include "minilzo/minilzo.h"
#include "scheduler/schedule.hpp"
extern std::vector<RecvData> g_VecWaveDataX;
extern std::vector<RecvData> g_VecWaveDataY;
@ -43,6 +44,7 @@ int main(int argc, char *argv[]) {
zlog_info(zbt, " Firmware compile time:%s %s,version %s", __DATE__, __TIME__, GlobalConfig::Version.c_str());
zlog_info(zbt, "####CIDNSOFT start####");
return 0;
g_VecWaveDataX.reserve(1000);
g_VecWaveDataY.reserve(1000);
@ -116,7 +118,7 @@ int main(int argc, char *argv[]) {
startTcpCgi.detach();
//启动CMT server
boost::thread startTcpCmt(attrs, StartTCPServer);
boost::thread startTcpCmt(attrs, StartCMTServer);
startTcpCmt.detach();
sleep(5);

View File

@ -544,24 +544,28 @@ int SensorScheduler::UpdateConfigResult(int short_addr, int result) {
int SensorScheduler::UpgradeSensor(int short_addr, std::string sensor_type, int hw_version,
std::string current_sw_version, std::string upgrade_sw_version) {
int id = 0;
auto iter = short_addr_map_.find(short_addr);
if (iter == short_addr_map_.end()) {
zlog_error(zct, "cannot find id for short_addr %x", short_addr);
return 1;
} else {
id = iter->second;
}
UpgradeInfo info;
info.try_times = 0;
info.sensor_type = sensor_type;
info.hw_version = hw_version;
info.current_sw_version = current_sw_version;
info.upgrade_sw_version = upgrade_sw_version;
long ts = GetLocalTs();
info.submit_time = GetUTCTime(ts);
upgrade_[id] = info;
zlog_info(zbt, "[%d] short addr:%x add upgrade info", id, short_addr);
UpgradeCfg::WriteCfg(upgrade_);
// auto iter = short_addr_map_.find(short_addr);
// if (iter == short_addr_map_.end()) {
// zlog_error(zct, "cannot find id for short_addr %x", short_addr);
// return 1;
// } else {
// id = iter->second;
// }
// zlog_info(zbt, "[%d] short addr:%x sensor_type = %s,hw_version = %d,\
// current_sw_version = %s,upgrade_sw_version = %s", id, short_addr,\
// sensor_type.c_str(),hw_version,current_sw_version.c_str(),upgrade_sw_version.c_str());
// UpgradeInfo info;
// info.try_times = 0;
// info.sensor_type = sensor_type;
// info.hw_version = hw_version;
// info.current_sw_version = current_sw_version;
// info.upgrade_sw_version = upgrade_sw_version;
// long ts = GetLocalTs();
// info.submit_time = GetUTCTime(ts);
// upgrade_[id] = info;
// zlog_info(zbt, "[%d] short addr:%x add upgrade info", id, short_addr);
// UpgradeCfg::WriteCfg(upgrade_);
//zlog_info(zbt, "[%d] 222short addr:%x add upgrade info", id, short_addr);
return 0;
}

View File

@ -59,32 +59,37 @@ int UpgradeCfg::ReadCfg(std::map<int, UpgradeInfo> &upgrade) {
}
int UpgradeCfg::WriteCfg(std::map<int, UpgradeInfo> &upgrade) {
if (upgrade.size() == 0) {
ClearCfg();
return 0;
}
Json::Value root;
for (auto item : upgrade) {
Json::Value upgrade_item;
upgrade_item["id"] = item.first;
upgrade_item["try_times"] = item.second.try_times;
upgrade_item["type"] = item.second.sensor_type;
upgrade_item["hw_version"] = item.second.hw_version;
upgrade_item["current_sw_version"] = item.second.current_sw_version;
upgrade_item["upgrade_sw_version"] = item.second.upgrade_sw_version;
upgrade_item["submit_time"] = item.second.submit_time;
Json::Value try_world_time;
for (auto entry : item.second.try_world_time1) {
try_world_time.append(entry);
}
upgrade_item["try_world_time"] = try_world_time;
root.append(upgrade_item);
}
Json::StyledStreamWriter streamWriter;
std::ofstream out_file(UPGRADE_CONFIG);
streamWriter.write(out_file, root);
out_file.close();
// if (upgrade.size() == 0) {
// ClearCfg();
// return 0;
// }
zlog_info(zct,"123 = %d",upgrade.size());
// Json::Value root;
// for (auto item : upgrade) {
// zlog_info(zct,"1111");
// Json::Value upgrade_item;
// upgrade_item["id"] = item.first;
// upgrade_item["try_times"] = item.second.try_times;
// upgrade_item["type"] = item.second.sensor_type;
// upgrade_item["hw_version"] = item.second.hw_version;
// upgrade_item["current_sw_version"] = item.second.current_sw_version;
// upgrade_item["upgrade_sw_version"] = item.second.upgrade_sw_version;
// upgrade_item["submit_time"] = item.second.submit_time;
// Json::Value try_world_time;
// zlog_info(zct,"2222");
// for (auto entry : item.second.try_world_time1) {
// try_world_time.append(entry);
// }
// zlog_info(zct,"333");
// upgrade_item["try_world_time"] = try_world_time;
// root.append(upgrade_item);
// zlog_info(zct,"444");
// }
// zlog_info(zct,"456");
// Json::StyledStreamWriter streamWriter;
// std::ofstream out_file(UPGRADE_CONFIG);
// streamWriter.write(out_file, root);
// out_file.close();
return 0;
}

View File

@ -20,6 +20,9 @@
#include "utility/udp_scan.hpp"
#include "utility/search_dev.hpp"
#include "localserver/local_server.hpp"
#include "localserver/cmt_server.hpp"
#include <memory>
extern zlog_category_t *zct;
extern zlog_category_t *zbt;
@ -37,16 +40,12 @@ void StartCgiServer() {
}
}
void StartTCPServer() {
zlog_info(zbt, "start deal TCP");
while (1) {
TcpCgi::startTCPServer();
sleep(10);
}
std::unique_ptr<CMTServer> g_mgr_server;
void StartCMTServer() {
boost::asio::io_service io_service;
g_mgr_server = std::make_unique<CMTServer>(io_service, 10000); io_service.run();
}
void RunLED() {
while (1) {
gpio_set(GlobalConfig::GPIO_G.runLed, 1);

View File

@ -17,7 +17,7 @@ void CheckThread(); //循环检测线程
void StartMqttClient(); //启动mqtt服务
void SearchThread(); //组播功能, 提供发现设备功能
void StartCgiServer(); //启动cgi处理服务端
void StartTCPServer();
void StartCMTServer();
void HeartRep();
void UartStart(); //
void TestUart();

View File

@ -1,6 +1,6 @@
#include "tcp_cgi.hpp"
#include "localserver/local_server.hpp"
#include "localserver/cmt_protocol.h"
#include "localserver/cmt_server.hpp"
#include <zlog.h>
extern zlog_category_t *zct;
@ -61,108 +61,108 @@ void TcpCgi::startCgiServer() {
}
void TcpCgi::startTCPServer() {
int listenfd, connfd;
int mw_optval = 1;
// int listenfd, connfd;
// int mw_optval = 1;
struct sockaddr_in servaddr;
char buff[40960];
int n;
if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
zlog_error(zbt, "create socket error: %s(errno: %d)", strerror(errno), errno);
return;
}
setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&mw_optval, sizeof(mw_optval));
// struct sockaddr_in servaddr;
// char buff[40960];
// int n;
// if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
// zlog_error(zbt, "create socket error: %s(errno: %d)", strerror(errno), errno);
// return;
// }
// setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&mw_optval, sizeof(mw_optval));
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(10000);
// memset(&servaddr, 0, sizeof(servaddr));
// servaddr.sin_family = AF_INET;
// servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
// servaddr.sin_port = htons(10000);
if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
zlog_error(zbt, "bind socket error: %s(errno: %d)", strerror(errno), errno);
return;
}
// if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) {
// zlog_error(zbt, "bind socket error: %s(errno: %d)", strerror(errno), errno);
// return;
// }
if (listen(listenfd, 10) == -1) {
zlog_error(zbt, "listen socket error: %s(errno: %d)", strerror(errno), errno);
return;
}
char *send_all_data = NULL;
send_all_data = (char*)malloc(384000); // 96000*4 = 384000
if (send_all_data == NULL) {
zlog_error(zct,"send_all_data Memory error");
}
// if (listen(listenfd, 10) == -1) {
// zlog_error(zbt, "listen socket error: %s(errno: %d)", strerror(errno), errno);
// return;
// }
// char *send_all_data = NULL;
// send_all_data = (char*)malloc(384000); // 96000*4 = 384000
// if (send_all_data == NULL) {
// zlog_error(zct,"send_all_data Memory error");
// }
while (1) {
if ((connfd = accept(listenfd, (struct sockaddr *)NULL, NULL)) == -1) {
zlog_error(zct, "accept socket error: %s(errno: %d)", strerror(errno), errno);
continue;
}
n = recv(connfd, buff, 40960, 0);
if (n <= 0) {
zlog_info(zct, "recv 0 and will close");
close(connfd);
} else {
printf("recv len = %d\n", n);
buff[n] = '\0';
PackgeHead pkg;
pkg.head[0] = 0xAA;
pkg.head[1] = 0x55;
pkg.head[2] = 0xAA;
int recv_data_len = 0;
char send_data[96100] = {0};
LocalServer::HandleTcp_cmd(buff,send_data,pkg.cmd,recv_data_len);
printf("response len = %d\n", recv_data_len);
int PKG_ONCE_SIZE = 1300;
int package_num = recv_data_len / PKG_ONCE_SIZE + 1;
if (recv_data_len % PKG_ONCE_SIZE == 0) {
package_num -= 1;
}
Search search;
int PKG_HEADER_LEN = sizeof(PackgeHead);
printf("package_num = %d\n", package_num);
if (recv_data_len > PKG_ONCE_SIZE) {
for (int i = 0; i < package_num; ++i) {
if (package_num - 1 == i) {
memcpy(send_all_data , send_data + i * PKG_ONCE_SIZE, recv_data_len - i * PKG_ONCE_SIZE);
send(connfd, send_all_data, recv_data_len - i * PKG_ONCE_SIZE,0);
printf("i = %d,send size = %d\n",i,recv_data_len - i * PKG_ONCE_SIZE + PKG_HEADER_LEN);
}else if(i == 0){
printf("i = %d,head size = %d,total size = %d\n",i,PKG_HEADER_LEN, PKG_ONCE_SIZE + PKG_HEADER_LEN);
pkg.len = recv_data_len;
memcpy(send_all_data, &pkg, PKG_HEADER_LEN);
memcpy(send_all_data + PKG_HEADER_LEN, send_data + i * PKG_ONCE_SIZE, PKG_ONCE_SIZE);
send(connfd, send_all_data, PKG_ONCE_SIZE + PKG_HEADER_LEN,0);
printf("i = %d,send size = %d\n",i,PKG_ONCE_SIZE + PKG_HEADER_LEN);
memset(send_all_data,0,38400);
} else {
printf("i = %d,package_num = %d\n",i, package_num);
memcpy(send_all_data, send_data + i * PKG_ONCE_SIZE, PKG_ONCE_SIZE);
send(connfd, send_all_data, PKG_ONCE_SIZE,0);
printf("i = %d,send size = %d\n",i,PKG_ONCE_SIZE);
memset(send_all_data,0,38400);
mssleep(50);
}
}
} else {
pkg.len = recv_data_len;
memcpy(send_all_data, &pkg, PKG_HEADER_LEN);
memcpy(send_all_data + PKG_HEADER_LEN, &send_data, recv_data_len);
send(connfd, send_all_data, recv_data_len + PKG_HEADER_LEN,0);
printf("only one pkg, send data len:%d\n", recv_data_len + PKG_HEADER_LEN);
// while (1) {
// if ((connfd = accept(listenfd, (struct sockaddr *)NULL, NULL)) == -1) {
// zlog_error(zct, "accept socket error: %s(errno: %d)", strerror(errno), errno);
// continue;
// }
// n = recv(connfd, buff, 40960, 0);
// if (n <= 0) {
// zlog_info(zct, "recv 0 and will close");
// close(connfd);
// } else {
// printf("recv len = %d\n", n);
// buff[n] = '\0';
// PackageHead pkg;
// pkg.head[0] = 0xAA;
// pkg.head[1] = 0x55;
// pkg.head[2] = 0xAA;
// int recv_data_len = 0;
// char send_data[96100] = {0};
// LocalServer::HandleTcp_cmd(buff,send_data,pkg.cmd,recv_data_len);
// printf("response len = %d\n", recv_data_len);
// int PKG_ONCE_SIZE = 1300;
// int package_num = recv_data_len / PKG_ONCE_SIZE + 1;
// if (recv_data_len % PKG_ONCE_SIZE == 0) {
// package_num -= 1;
// }
// Search search;
// int PKG_HEADER_LEN = sizeof(PackageHead);
// printf("package_num = %d\n", package_num);
// if (recv_data_len > PKG_ONCE_SIZE) {
// for (int i = 0; i < package_num; ++i) {
// if (package_num - 1 == i) {
// memcpy(send_all_data , send_data + i * PKG_ONCE_SIZE, recv_data_len - i * PKG_ONCE_SIZE);
// send(connfd, send_all_data, recv_data_len - i * PKG_ONCE_SIZE,0);
// printf("i = %d,send size = %d\n",i,recv_data_len - i * PKG_ONCE_SIZE + PKG_HEADER_LEN);
// }else if(i == 0){
// printf("i = %d,head size = %d,total size = %d\n",i,PKG_HEADER_LEN, PKG_ONCE_SIZE + PKG_HEADER_LEN);
// pkg.len = recv_data_len;
// memcpy(send_all_data, &pkg, PKG_HEADER_LEN);
// memcpy(send_all_data + PKG_HEADER_LEN, send_data + i * PKG_ONCE_SIZE, PKG_ONCE_SIZE);
// send(connfd, send_all_data, PKG_ONCE_SIZE + PKG_HEADER_LEN,0);
// printf("i = %d,send size = %d\n",i,PKG_ONCE_SIZE + PKG_HEADER_LEN);
// memset(send_all_data,0,38400);
// } else {
// printf("i = %d,package_num = %d\n",i, package_num);
// memcpy(send_all_data, send_data + i * PKG_ONCE_SIZE, PKG_ONCE_SIZE);
// send(connfd, send_all_data, PKG_ONCE_SIZE,0);
// printf("i = %d,send size = %d\n",i,PKG_ONCE_SIZE);
// memset(send_all_data,0,38400);
// mssleep(50);
// }
// }
// } else {
// pkg.len = recv_data_len;
// memcpy(send_all_data, &pkg, PKG_HEADER_LEN);
// memcpy(send_all_data + PKG_HEADER_LEN, &send_data, recv_data_len);
// send(connfd, send_all_data, recv_data_len + PKG_HEADER_LEN,0);
// printf("only one pkg, send data len:%d\n", recv_data_len + PKG_HEADER_LEN);
}
// }
close(connfd);
}
}
close(listenfd);
if (send_all_data)
{
free(send_all_data);
send_all_data = NULL;
}
// close(connfd);
// }
// }
// close(listenfd);
// if (send_all_data)
// {
// free(send_all_data);
// send_all_data = NULL;
// }
return;
}