From 4de0f19d9c60d9e9ca1042724e6a9dc59692b1a4 Mon Sep 17 00:00:00 2001 From: zhangsheng Date: Wed, 22 Jan 2025 17:13:45 +0800 Subject: [PATCH] add manager codes --- jsonparse/cmt_parse.cpp | 216 ++++++++++++++++-- jsonparse/communication_cmd.hpp | 11 +- jsonparse/web_cmd_parse3.cpp | 7 +- localserver/cmt_cmd.cpp | 74 ++++-- localserver/cmt_server.cpp | 94 ++++++++ .../{cmt_protocol.h => cmt_server.hpp} | 90 +++++++- localserver/local_server.hpp | 2 +- main.cpp | 6 +- scheduler/schedule.cpp | 40 ++-- scheduler/upgrade_cfg.cpp | 57 ++--- threadfunc/thread_func.cpp | 15 +- threadfunc/thread_func.hpp | 2 +- utility/tcp_cgi.cpp | 190 +++++++-------- 13 files changed, 589 insertions(+), 215 deletions(-) create mode 100644 localserver/cmt_server.cpp rename localserver/{cmt_protocol.h => cmt_server.hpp} (58%) diff --git a/jsonparse/cmt_parse.cpp b/jsonparse/cmt_parse.cpp index 5e622dd..89b6d99 100644 --- a/jsonparse/cmt_parse.cpp +++ b/jsonparse/cmt_parse.cpp @@ -32,12 +32,13 @@ void JsonData::CmtCmd_80(char* send_data,int& send_length) memcpy(gateway_ver.comm_mode,"WiFi",sizeof(gateway_ver.comm_mode)); #endif memcpy(gateway_ver.comm_mode,"有线",sizeof(gateway_ver.comm_mode)); + memcpy(gateway_ver.terminal_name,ReadStrByOpt(NETWORKCONFIG, "eth0", "hostName").c_str(),sizeof(gateway_ver.terminal_name)); GetSysStatusCMT(gateway_ver.cpu_use,gateway_ver.memory_use,gateway_ver.disk_remain,gateway_ver.temperature); memcpy(send_data,&gateway_ver,sizeof(GatewayVersion)); send_length = sizeof(GatewayVersion); } -void JsonData::CmtCmd_81(char* send_data,int& send_length) +void JsonData::CmtCmd_81(char* recv_body,int& count,char* send_data,int& send_length) { int featureInterVal; int featureInterTime; @@ -46,15 +47,37 @@ void JsonData::CmtCmd_81(char* send_data,int& send_length) int maxSensorNum; int sensorCount; array_t arrRes; - arrRes = sqlite_db_ctrl::instance().GetDataMultiLineTransaction(T_SENSOR_INFO(TNAME), "*", NULL); + char whereCon[512]={0}; + if (recv_body != NULL) + { + printf("count = %d\n",count); + char short_addr_[256]={0}; + for (size_t i = 0; i < count; i++){ + char temp[5]={0}; + memcpy(temp,recv_body + i * 4,4); + printf("short_addr = %s\n",temp); + strcat(short_addr_,"'"); + strcat(short_addr_,temp); + strcat(short_addr_,"'"); + if (i + 1 != count){ + strcat(short_addr_ ,","); + } + } + sprintf(whereCon,"zigbeeShortAddr IN (%s)",short_addr_); + arrRes = sqlite_db_ctrl::instance().GetDataMultiLineTransaction(T_SENSOR_INFO(TNAME), "*", whereCon); + }else{ + arrRes = sqlite_db_ctrl::instance().GetDataMultiLineTransaction(T_SENSOR_INFO(TNAME), "*", NULL); + } int iResult = arrRes.size(); + printf("result = %d\n",iResult); size_t j = 0; if (iResult > 0) { SensorInfo sensor_info[iResult]; for (; j < iResult; j++) { + memcpy(sensor_info[j].sensor_name,arrRes[j][1].c_str(),sizeof(sensor_info[j].sensor_name)); memcpy(sensor_info[j].measurement_id,arrRes[j][44].c_str(),sizeof(sensor_info[j].measurement_id)); - sensor_info[j].short_addr = atoi(arrRes[j][30].c_str()); + memcpy(sensor_info[j].short_addr , arrRes[j][30].c_str(),sizeof(sensor_info[j].short_addr)); memcpy(sensor_info[j].hw_ver,arrRes[j][8].c_str(),sizeof(sensor_info[j].hw_ver)); memcpy(sensor_info[j].soft_ver,arrRes[j][9].c_str(),sizeof(sensor_info[j].soft_ver)); std::vector vParamRSSI; @@ -130,27 +153,31 @@ void JsonData::CmtCmd_81(char* send_data,int& send_length) int static_Count = sqlite_db_ctrl::instance().GetTableRows(szTableName,strsql.c_str()); scheduler::instance().GetScheduleConfig(featureInterVal,waveInterVal,featureInterTime,waveInterTime,maxSensorNum); - zlog_info(zct,"wavex = %d,featureInterVal = %d,waveInterVal",waveX_Count,featureInterVal); - sensor_info[j].wave_x_reporting_rate = (float(waveX_Count)/(86400/(float)waveInterVal)) * 100; - sensor_info[j].wave_y_reporting_rate = (float(waveY_Count)/(86400/(float)waveInterVal)) * 100; - sensor_info[j].wave_z_reporting_rate = (float(waveZ_Count)/(86400/(float)waveInterVal)) * 100; - sensor_info[j].eigen_value_reporting_rate = (float(static_Count)/(86400/(float)featureInterVal)) *100; + zlog_info(zct,"wavex = %d,featureInterVal = %d,waveInterVal = %d",waveX_Count,featureInterVal,waveInterVal); + int day_count = 86400 / waveInterVal; + + sensor_info[j].wave_x_reporting_rate = float( waveX_Count/ day_count) * 100; + printf("day_count = %d,wave_x_reporting_rate = %d\n",day_count,sensor_info[j].wave_x_reporting_rate ); + sensor_info[j].wave_y_reporting_rate = (float(waveY_Count)/(86400/waveInterVal)) * 100; + sensor_info[j].wave_z_reporting_rate = (float(waveZ_Count)/(86400/waveInterVal)) * 100; + sensor_info[j].eigen_value_reporting_rate = (float(static_Count)/(86400/featureInterVal)) * 100; + day_count = 86400 / featureInterVal; + printf("static_Count = %d,day_count = %d,wave_x_reporting_rate = %d\n",static_Count,day_count,sensor_info[j].eigen_value_reporting_rate ); memset(whereCon,0,sizeof(whereCon)); sprintf(whereCon,"channelID = '%s-Z'",arrRes[j][44].c_str()); std::string integratRMS = sqlite_db_ctrl::instance().GetData(T_DATA_INFO(TNAME), "integratRMS",whereCon); - memcpy(sensor_info[j].velocity_rms , integratRMS.c_str(),sizeof(sensor_info[j].velocity_rms));; - sensor_info[j].upgrade_status = 0; - memcpy(sensor_info[j].upgrade_time,"2024-01-08 12:00:00",sizeof(sensor_info[j].upgrade_time)); + sensor_info[j].velocity_rms = atof(integratRMS.c_str()); + sensor_info[j].upgrade_status = atoi(arrRes[j][46].c_str()); + memset(whereCon,0,sizeof(whereCon)); + sprintf(whereCon,"short_addr = '%s' ORDER BY start_timestamp DESC limit 0,1",arrRes[j][30].c_str()); + std::string upgrade_time = sqlite_db_ctrl::instance().GetData("firmware_upgrade", " start_timestamp ", whereCon); + memcpy(sensor_info[j].upgrade_time,upgrade_time.c_str(),sizeof(sensor_info[j].upgrade_time)); sensor_info[j].version = 1; } memcpy(send_data,&sensor_info,sizeof(SensorInfo) * j); - memcpy(send_data + sizeof(SensorInfo) * j,&sensor_info,sizeof(SensorInfo) * j); - memcpy(send_data + sizeof(SensorInfo) * j * 2,&sensor_info,sizeof(SensorInfo) * j); - memcpy(send_data + sizeof(SensorInfo) * j * 3,&sensor_info,sizeof(SensorInfo) * j); - memcpy(send_data + sizeof(SensorInfo) * j * 4,&sensor_info,sizeof(SensorInfo) * j); - send_length = sizeof(SensorInfo) * j * 5; + send_length = sizeof(SensorInfo) * j; zlog_info(zct,"sizeof(SensorInfo) = %d",sizeof(SensorInfo)*j); zlog_info(zct,"send_length = %d",send_length); } @@ -198,13 +225,30 @@ void JsonData::CmtCmd_82(char* MeasurementID,char* send_data,int& channel,int& s free(buffer); } -void JsonData::CmtCmd_83(char* MeasurementID,char* send_data,int& send_length) +void JsonData::CmtCmd_83(char* recv_body,int& count,char* send_data,int& send_length) { vec_t vecRes; std::string filename = ""; char whereCon[128]={0}; - sprintf(whereCon,"MeasurementID = '%s'",MeasurementID); - vecRes = sqlite_db_ctrl::instance().GetDataSingleLine(T_SENSOR_INFO(TNAME), "*", whereCon); + if (recv_body){ + printf("count = %d\n",count); + char MeasurementID_[256]={0}; + for (size_t i = 0; i < count; i++){ + char temp[5]={0}; + memcpy(temp,recv_body + i * 20,20); + printf("short_addr = %s\n",temp); + strcat(MeasurementID_,"'"); + strcat(MeasurementID_,temp); + strcat(MeasurementID_,"'"); + if (i + 1 != count){ + strcat(MeasurementID_ ,","); + } + } + sprintf(whereCon,"MeasurementID IN (%s)",MeasurementID_); + vecRes = sqlite_db_ctrl::instance().GetDataSingleLine(T_SENSOR_INFO(TNAME), "*", whereCon); + }else{ + vecRes = sqlite_db_ctrl::instance().GetDataSingleLine(T_SENSOR_INFO(TNAME), "*", NULL); + } if (vecRes.size() > 0){ DownloadConfig download_config; memcpy(download_config.gw_mac,GlobalConfig::MacAddr_G.c_str(),sizeof(download_config.gw_mac)); @@ -334,12 +378,140 @@ void JsonData::CmtCmd_84(char* filename) } -void JsonData::CmtCmd_85(char* filename) +void JsonData::CmtCmd_85(char* filename,char* file_md5,char* send_data,int& send_length) +{ + zlog_info(zct,"file_path = %s,file_md5 = %d",filename,file_md5); + sleep(3); + int iRet = system("/opt/opt.sh"); + zlog_info(zct, "iRet = %d", iRet); + if (iRet == -1) { + zlog_error(zct, "system() error"); + } +} + +void JsonData::CmtCmd_86(char* recv_body,int& count,char* filename,char* file_md5,char* send_data,int& send_length) +{ + zlog_info(zct,"file_path = %s,file_md5 = %d",filename,file_md5); + UpgradeSensorRes upgrade_sensor_res; + upgrade_sensor_res.code = 0; + sprintf(upgrade_sensor_res.message ,"%s",""); + char cmd[128]={0}; + char file_path[64]={0}; + sprintf(cmd, "mv /opt/%s /opt/DataNode/",filename); + system(cmd); + sprintf(file_path, "/opt/DataNode/%s",filename); + FILE * pFile=NULL; + size_t thisSize = 0; + char *buffer=NULL; + pFile = fopen (file_path,"rb"); + if (pFile==NULL) { + zlog_error(zct,"Error opening file"); + } + else + { + while (fgetc(pFile) != EOF) { + ++thisSize; + } + rewind(pFile); + buffer = (char*)malloc(thisSize); + fread (buffer, sizeof (char), thisSize, pFile); + fclose (pFile); + } + zlog_info(zct,"Read %zu bytes", thisSize); + char sensor_type[6] = {0}; + char sf_version[10] = {0}; + memcpy(sensor_type, buffer, 5); + zlog_info(zct,"model:%s", sensor_type); + char c[2] = {0}; + c[0] = buffer[5]; + uint8_t hw_ver = atoi(c); + c[0] = buffer[6]; + uint8_t sf_ver_m = atoi(c); + c[0] = buffer[7]; + uint8_t sf_ver_s = atoi(c); + sprintf(sf_version,"%d.%d",sf_ver_m,sf_ver_s); + unsigned char ch_crc = 0x00; + int packgeSize = 0; + ch_crc = buffer[12]; + packgeSize = BUILD_UINT32(buffer[8],buffer[9],buffer[10],buffer[11]); + zlog_info(zct,"sf_ver_m = %d",sf_ver_m); + zlog_info(zct,"sf_ver_s = %d",sf_ver_s); + zlog_info(zct,"hw_ver = %d",hw_ver); + zlog_info(zct,"sensor_type = %s",sensor_type); + zlog_info(zct,"packgeSize = %d",packgeSize); + zlog_info(zct,"ch_crc = %02x",ch_crc); + unsigned char sum = 0x00; + for(size_t i = 13; i < thisSize;i++){ + sum += buffer[i]; + } + if (sum % 256 != ch_crc){ + zlog_error(zct,"package CRC error,filename = %s",file_path); + upgrade_sensor_res.code = 1; + sprintf(upgrade_sensor_res.message ,"%s","package CRC error"); + memcpy(send_data,(char*)&upgrade_sensor_res,sizeof(DownloadConfigRes)); + return; + } + zlog_info(zct,"sum = %x\n",sum % 256); + char localtimestamp[32] = {0}; + GetTimeNet(localtimestamp, 1); + for (size_t i = 0; i < count; i++) + { + char wherecon[100] = {0}; + char insertSql[200] = {0}; + char updateSql[100] = {0}; + char short_addr_[21] = {0}; + memcpy(short_addr_,recv_body + i * 4,4); + sprintf(wherecon," zigbeeShortAddr = '%s' ",short_addr_); + vec_t vecResult = sqlite_db_ctrl::instance().GetDataSingleLine(T_SENSOR_INFO(TNAME), " hardVersion,softVersion,ProductNo,zigbeeShortAddr ", wherecon); + if (hw_ver != atoi(vecResult[0].c_str())){ + zlog_error(zct,"hardVersion error,filename = %s",file_path); + upgrade_sensor_res.code = 2; + sprintf(upgrade_sensor_res.message ,"%s","hardVersion error"); + memcpy(send_data,(char*)&upgrade_sensor_res,sizeof(DownloadConfigRes)); + return; + } + sprintf(insertSql, " '%s','%s','','','','','%d.%d','%s',1,'%s'",vecResult[3].c_str(),localtimestamp,sf_ver_m,sf_ver_s,vecResult[1].c_str(),filename); + sqlite_db_ctrl::instance().InsertData(" firmware_upgrade ", insertSql); + //0 默认状态,1 升级中,2 升级成功,3 升级失败 + memset(wherecon,0,sizeof(wherecon)); + memset(updateSql,0,sizeof(updateSql)); + sprintf(wherecon," zigbeeShortAddr = '%s'",vecResult[3].c_str()); + sprintf(updateSql, " upgradeStatus = %d ", 1); + sqlite_db_ctrl::instance().UpdateTableData(T_SENSOR_INFO(TNAME), updateSql,wherecon); + uint16_t short_addr; + char *end_ptr = NULL; + short_addr = strtol(vecResult[3].c_str(), &end_ptr, 16); + int res = scheduler::instance().UpgradeSensor(short_addr,std::string(sensor_type),atoi(vecResult[0].c_str()),vecResult[1],std::string(sf_version)); + } + free(buffer); +} + +void JsonData::CmtCmd_88(char* recv_body,int& count) { } -void JsonData::CmtCmd_86(char* filename) -{ +std::string get_file_md5(const std::string& file_path) { + // 使用 md5sum 命令计算文件的 MD5 值 + std::string command = "md5sum " + file_path + " | awk '{ print $1 }'"; + // 使用 popen 执行命令并获取输出 + std::array buffer; + std::string result; + std::shared_ptr pipe(popen(command.c_str(), "r"), pclose); + + if (!pipe) { + std::cerr << "Error: Unable to run md5sum command" << std::endl; + return ""; + } + + // 从命令的输出流中读取数据 + while (fgets(buffer.data(), buffer.size(), pipe.get()) != nullptr) { + result += buffer.data(); + } + + // 去除末尾的换行符 + result.erase(result.find_last_not_of("\n") + 1); + + return result; } \ No newline at end of file diff --git a/jsonparse/communication_cmd.hpp b/jsonparse/communication_cmd.hpp index 057954a..d588d0a 100644 --- a/jsonparse/communication_cmd.hpp +++ b/jsonparse/communication_cmd.hpp @@ -5,7 +5,7 @@ #include #include #include "common/parameter_defination.hpp" -#include "localserver/cmt_protocol.h" +#include "localserver/cmt_server.hpp" class JsonData { public: @@ -79,12 +79,13 @@ public: //CMT tcp void CmtCmd_80(char* send_data,int& return_length); - void CmtCmd_81(char* send_data,int& return_length); + void CmtCmd_81(char* recv_body,int& count,char* send_data,int& return_length); void CmtCmd_82(char* MeasurementID,char* send_data,int& channel,int& return_length); - void CmtCmd_83(char* MeasurementID,char* send_data,int& return_length); + void CmtCmd_83(char* recv_body,int& count,char* send_data,int& return_length); void CmtCmd_84(char* filename); - void CmtCmd_85(char* filename); - void CmtCmd_86(char* filename); + void CmtCmd_85(char* filename,char* file_md5,char* send_data,int& send_length); + void CmtCmd_86(char* recv_body,int& count,char* filename,char* file_md5,char* send_data,int& send_length); + void CmtCmd_88(char* recv_body,int& count); private: Json::FastWriter show_value_; diff --git a/jsonparse/web_cmd_parse3.cpp b/jsonparse/web_cmd_parse3.cpp index b8648be..953ea90 100644 --- a/jsonparse/web_cmd_parse3.cpp +++ b/jsonparse/web_cmd_parse3.cpp @@ -644,7 +644,7 @@ std::string JsonData::JsonCmd_Cgi_60(Param_60 ¶m){ jsonVal["success"] = true; jsonVal["message"] = ""; char file_path[64]={0}; - char cmd[64]={0}; + char cmd[128]={0}; sprintf(cmd, "mv /opt/%s /opt/DataNode/",param.fileName.c_str()); system(cmd); sprintf(file_path, "/opt/DataNode/%s",param.fileName.c_str()); @@ -751,16 +751,17 @@ std::string JsonData::JsonCmd_Cgi_60(Param_60 ¶m){ char *end_ptr = NULL; short_addr = strtol(vecResult[3].c_str(), &end_ptr, 16); int res = scheduler::instance().UpgradeSensor(short_addr,std::string(sensor_type),atoi(vecResult[0].c_str()),vecResult[1],std::string(sf_version)); - if (res != 0) - { + if (res != 0){ jsonVal["success"] = false; jsonVal["message"] = "UpgradeSensor error"; free(buffer); return show_value_.write(jsonVal); } + zlog_info(zct,"000000000000"); } free(buffer); + zlog_info(zct,"11111111111"); return show_value_.write(jsonVal); } diff --git a/localserver/cmt_cmd.cpp b/localserver/cmt_cmd.cpp index 3dbe711..fe1a87c 100644 --- a/localserver/cmt_cmd.cpp +++ b/localserver/cmt_cmd.cpp @@ -6,11 +6,11 @@ extern zlog_category_t *zct; -void LocalServer::HandleTcp_cmd(const char* recv_data,char* send_data,uint8_t& rescmd,int& send_length) +void LocalServer::HandleTcp_cmd(const char* recv_data,char* send_data,uint8_t& rescmd,int& send_length,int recvbody_length) { zlog_info(zct,"HandleTcp_cmd"); - PackgeHead head; - memcpy(&head,recv_data,sizeof(PackgeHead)); + PackageHead head; + memcpy(&head,recv_data,sizeof(PackageHead)); uint8_t cmd = head.cmd; rescmd = head.cmd; printf("cmd = %d\n",cmd); @@ -28,56 +28,82 @@ void LocalServer::HandleTcp_cmd(const char* recv_data,char* send_data,uint8_t& r }break; case kGateWayVersion: { jd.CmtCmd_80(send_data,send_length); - // memcpy(send_data,&send_data_,sizeof(send_length)); - // if (send_data_){ - // free(send_data_); - // } }break; case kSensorInfo:{ - jd.CmtCmd_81(send_data,send_length); - + char *recv_body = NULL; + if (recvbody_length > 0){ + recv_body = (char*)malloc(recvbody_length - 4); + memcpy(recv_body,recv_data + sizeof(PackageHead) + 4,recvbody_length - 4); + } + int count = 0; + memcpy((char*)&count,recv_data + sizeof(PackageHead),4); + jd.CmtCmd_81(recv_body,count,send_data,send_length); + if (recv_body){ + free(recv_body); + } }break; case kSensorWave:{ WaveReq wave_req; - memcpy(&wave_req,recv_data + sizeof(PackgeHead),sizeof(WaveReq)); + memcpy(&wave_req,recv_data + sizeof(PackageHead),sizeof(WaveReq)); jd.CmtCmd_82(wave_req.measurement_id,send_data,wave_req.channel,send_length); }break; case KDownloadConfig:{ - DownloadConfigReq download_config; - memcpy(&download_config,recv_data + sizeof(PackgeHead),sizeof(DownloadConfigReq)); - jd.CmtCmd_83(download_config.measurement_id,send_data,send_length); + char *recv_body = NULL; + int count = 0; + if (recvbody_length > 0){ + recv_body = (char*)malloc(recvbody_length - 4); + memcpy((char*)&count,(char*)recv_data + sizeof(PackageHead),4); + memcpy(recv_body,recv_data + sizeof(PackageHead) + 4,recvbody_length - 4); + } + jd.CmtCmd_83(recv_body,count,send_data,send_length); zlog_info(zct, "CmtCmd_83 = %d",send_length); + if (recv_body){ + free(recv_body); + } }break; case KUploadConfig:{ UploadConfigReq upload_config; - memcpy(&upload_config,recv_data + sizeof(PackgeHead),sizeof(UploadConfigReq)); + memcpy(&upload_config,recv_data + sizeof(PackageHead),sizeof(UploadConfigReq)); jd.CmtCmd_84(upload_config.filename); }break; case KUpgadeGateway:{ - sleep(3); - int iRet = system("/opt/opt.sh"); - zlog_info(zct, "iRet = %d", iRet); - if (iRet == -1) { - zlog_error(zct, "system() error"); - } + + UpgradeGwReq upgrade_gw; + memcpy(&upgrade_gw,recv_data + sizeof(PackageHead),sizeof(UpgradeGwReq)); + jd.CmtCmd_85(upgrade_gw.filename,upgrade_gw.md5,send_data,send_length); }break; case KUpgradeSensor:{ + char *recv_body = NULL; + int count = 0; + if (recvbody_length > 0){ + recv_body = (char*)malloc(recvbody_length - sizeof(UpgradeSensorReq) - 4); + memcpy((char*)&count,(char*)recv_data + sizeof(PackageHead) + sizeof(UpgradeSensorReq) ,4); + memcpy(recv_body,recv_data + sizeof(PackageHead) + sizeof(UpgradeSensorReq) + 4,recvbody_length - sizeof(UpgradeSensorReq) - 4); + } UpgradeSensorReq upgrade_sensor; - memcpy(&upgrade_sensor,recv_data + sizeof(PackgeHead),sizeof(UpgradeSensorReq)); - jd.CmtCmd_86(upgrade_sensor.filename); + memcpy(&upgrade_sensor,recv_data + sizeof(PackageHead),sizeof(UpgradeSensorReq)); + jd.CmtCmd_86(recv_body,count,upgrade_sensor.filename,upgrade_sensor.md5,send_data,send_length); }break; case KEigenvalue:{ } break; case KUpgradeSensorStop:{ - + char *recv_body = NULL; + int count = 0; + if (recvbody_length > 0){ + recv_body = (char*)malloc(recvbody_length - 4); + memcpy((char*)&count,(char*)recv_data + sizeof(PackageHead) ,4); + memcpy(recv_body,recv_data + sizeof(PackageHead) + 4 ,recvbody_length - 4); + } + UpgradeSensorReq upgrade_sensor; + memcpy(&upgrade_sensor,recv_data + sizeof(PackageHead),sizeof(UpgradeSensorReq)); + jd.CmtCmd_88(recv_body,count); } break; default: - return NULL; break; } diff --git a/localserver/cmt_server.cpp b/localserver/cmt_server.cpp new file mode 100644 index 0000000..2b4d6c3 --- /dev/null +++ b/localserver/cmt_server.cpp @@ -0,0 +1,94 @@ +#include "cmt_server.hpp" +#include +#include +#include +#include +#include +#include + +#include "localserver/local_server.hpp" + +extern zlog_category_t *zct; + +void CMTSession::start() { + memset(data_, 0, CMT_TCP_LEN); + sessionSet_.insert(shared_from_this()); // 将会话添加到会话集合中 + do_read(); +} +void CMTSession::set_data(char *data, int len) { + memcpy(data_, data, len); +} +void CMTSession::do_write(std::size_t length) { + auto self(shared_from_this()); + zlog_debug(zct, "[CMT] response len = %d", length); + boost::asio::async_write(socket_, boost::asio::buffer(data_, length), [this, self](boost::system::error_code ec, std::size_t /*length*/) { + if (ec) { + zlog_error(zct, "[CMT] fail to send data, %s, %d", ec.category().name(), ec.value()); + socket_.close(); + sessionSet_.erase(shared_from_this()); + } else { + zlog_debug(zct, "[CMT] waiting for next message..."); + do_read(); + // socket_.close(); + // sessionSet_.erase(shared_from_this()); + } + }); +} + +void CMTServer::do_accept() { + acceptor_.async_accept(socket_, [this](boost::system::error_code ec) { + zlog_debug(zct, "[CMT] accept a socket"); + if (!ec) { + std::make_shared(std::move(socket_), sessionSet_)->start(); + } + + do_accept(); + }); +} +void CMTSession::do_read() { + auto self(shared_from_this()); + socket_.async_read_some(boost::asio::buffer(data_, CMT_TCP_LEN), [this, self](boost::system::error_code ec, std::size_t length) { + if (!ec) { + if (length == 0) { + zlog_warn(zct, "[CMT] Client disconnected (length 0)"); + sessionSet_.erase(shared_from_this()); + return; + } + + if (length < 6 || data_[0] != 0xAA || data_[1] != 0x55 || data_[2] != 0xAA) { + zlog_warn(zct, "[CMT] invalid data package, len:%d", length); + do_read(); + } + uint8_t cmd = data_[3]; + int payload_len = 0; + memcpy((char*)&payload_len, (char*)&data_[4], 4); + payload_len = htonl(payload_len); + zlog_debug(zct, "[CMT] cmd: %d, message len: %d, payload len: %d, head:%2x-%2x-%2x", cmd, length, payload_len, data_[0], data_[1], data_[2]); + zlog_debug(zct, "[CMT] payload bytes %d, %d", data_[4], data_[5]); + int send_data_len = 0; + char send_data[96100] = {0}; + LocalServer::HandleTcp_cmd(data_,send_data,cmd,send_data_len,payload_len); + PackageHead pkg_head; + pkg_head.head[0] = 0xAA; + pkg_head.head[1] = 0x55; + pkg_head.head[2] = 0xAA; + pkg_head.cmd = cmd; + pkg_head.len = send_data_len; + memset(data_,0,sizeof(data_)); + memcpy(data_, &pkg_head, head_len_); + memcpy(data_ + head_len_, &send_data, send_data_len); + do_write(head_len_ + send_data_len); + + } else if (ec == boost::asio::error::eof){ + zlog_warn(zct, "[CMT] Client disconnect the connection"); + sessionSet_.erase(shared_from_this()); + } else if (ec == boost::asio::error::connection_reset){ + zlog_warn(zct, "[CMT] Connection reset by client"); + sessionSet_.erase(shared_from_this()); + } else { + zlog_error(zct, "[CMT] Error message, reason: %s", ec.message().c_str()); + socket_.close(); + sessionSet_.erase(shared_from_this()); + } + }); +} diff --git a/localserver/cmt_protocol.h b/localserver/cmt_server.hpp similarity index 58% rename from localserver/cmt_protocol.h rename to localserver/cmt_server.hpp index 2e2ae6f..b5c8138 100644 --- a/localserver/cmt_protocol.h +++ b/localserver/cmt_server.hpp @@ -1,15 +1,25 @@ #ifndef CMT_PROTOCOL_HPP_ #define CMT_PROTOCOL_HPP_ #include +#include +#include +#include +#include +#include + +using boost::asio::ip::tcp; + +#define CMT_TCP_LEN 100000 + //1 tcp server可以参考本安有线中的代码,侦听端口10000 //2 定义无线网关与传感器用到的结构 //3 包头为 #pragma pack(1) -struct PackgeHead{ +struct PackageHead{ uint8_t head[3]; // 固定值:0xAA55AA uint8_t cmd; - uint16_t len; + int len; char data[0]; }; @@ -45,10 +55,15 @@ enum LooseStatus{ LOOSE = 1 }; //传感器信息cmd 81 + +struct SensorInfoReq{ + char short_addr[4]; +}; struct SensorInfo{ int version; + char sensor_name[64]; char measurement_id[20]; - int short_addr; + char short_addr[5]; char hw_ver[12]; char soft_ver[12]; int gateway_rssi; @@ -63,7 +78,7 @@ struct SensorInfo{ int wave_x_reporting_rate; int wave_y_reporting_rate; int wave_z_reporting_rate; - char velocity_rms[10]; + float velocity_rms; int upgrade_status; // 参考 SensorUpgradeStatus char upgrade_time[20]; }; @@ -128,16 +143,29 @@ struct DownloadConfig{ //上传配置 cmd 84 struct UploadConfigReq{ - char filename[32]; + char filename[64]; + char md5[33]; }; //网关更新 cmd 85 - -//传感器更新 cmd 86 -struct UpgradeSensorReq{ - char filename[32]; - int upgrade_short_addr[32]; +struct UpgradeGwReq{ + char filename[64]; + char md5[33]; }; +struct UpgradeGwRes{ + int code; + char message[64]; +}; +//传感器更新 cmd 86 +struct UpgradeSensorReq{ + char filename[64]; + char md5[33]; + char upgrade_short_addr[0]; +}; +struct UpgradeSensorRes{ + int code; + char message[64]; +}; //获取特征值 cmd 87 @@ -153,4 +181,46 @@ struct Search { int gw_type; // GatewayType }; +class CMTSession : public std::enable_shared_from_this { +public: + CMTSession(tcp::socket socket, std::unordered_set>& sessionSet) : socket_(std::move(socket)), sessionSet_(sessionSet) { + web_version_ = "1.0.1"; + version_ = 1; + head_len_ = sizeof(PackageHead); + } + + void start(); + void set_data(char *data, int len); + void do_write(std::size_t length); + +private: + void do_read(); + + tcp::socket socket_; + std::unordered_set>& sessionSet_; + char data_[CMT_TCP_LEN]; + std::string web_version_; + int version_; + int head_len_; +}; + +class CMTServer { +public: + CMTServer(boost::asio::io_service& io_service, short port) : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)), socket_(io_service) { do_accept(); } + + // 向所有客户端发送消息 + void send_message_to_all(char *message, int len) { + for (auto& session : sessionSet_) { + session->set_data(message, len); + session->do_write(len); + } + } +private: + void do_accept(); + + tcp::acceptor acceptor_; + tcp::socket socket_; + std::unordered_set> sessionSet_; +}; + #endif diff --git a/localserver/local_server.hpp b/localserver/local_server.hpp index d08842c..f448952 100644 --- a/localserver/local_server.hpp +++ b/localserver/local_server.hpp @@ -80,7 +80,7 @@ public: virtual ~LocalServer(); static void HandleFromServer(const char *pData, int pLen, const char *topic); static std::string HandleCgi_cmd(std::string &pData); - static void HandleTcp_cmd(const char* recvData,char* send_data,uint8_t& rescmd,int& reslength); + static void HandleTcp_cmd(const char* recvData,char* send_data,uint8_t& rescmd,int& reslength,int recvbody_length = 0); }; #endif // LOCAL_SERVER_HPP_ diff --git a/main.cpp b/main.cpp index 3c51654..58a8bb4 100644 --- a/main.cpp +++ b/main.cpp @@ -18,6 +18,7 @@ #include "dbaccess/sql_db.hpp" #include "uart/uart.hpp" #include "minilzo/minilzo.h" +#include "scheduler/schedule.hpp" extern std::vector g_VecWaveDataX; extern std::vector g_VecWaveDataY; @@ -43,7 +44,8 @@ int main(int argc, char *argv[]) { zlog_info(zbt, " Firmware compile time:%s %s,version %s", __DATE__, __TIME__, GlobalConfig::Version.c_str()); zlog_info(zbt, "####CIDNSOFT start####"); - + return 0; + g_VecWaveDataX.reserve(1000); g_VecWaveDataY.reserve(1000); g_VecWaveDataZ.reserve(1500); @@ -116,7 +118,7 @@ int main(int argc, char *argv[]) { startTcpCgi.detach(); //启动CMT server - boost::thread startTcpCmt(attrs, StartTCPServer); + boost::thread startTcpCmt(attrs, StartCMTServer); startTcpCmt.detach(); sleep(5); diff --git a/scheduler/schedule.cpp b/scheduler/schedule.cpp index 79c3bb9..bb88836 100644 --- a/scheduler/schedule.cpp +++ b/scheduler/schedule.cpp @@ -544,24 +544,28 @@ int SensorScheduler::UpdateConfigResult(int short_addr, int result) { int SensorScheduler::UpgradeSensor(int short_addr, std::string sensor_type, int hw_version, std::string current_sw_version, std::string upgrade_sw_version) { int id = 0; - auto iter = short_addr_map_.find(short_addr); - if (iter == short_addr_map_.end()) { - zlog_error(zct, "cannot find id for short_addr %x", short_addr); - return 1; - } else { - id = iter->second; - } - UpgradeInfo info; - info.try_times = 0; - info.sensor_type = sensor_type; - info.hw_version = hw_version; - info.current_sw_version = current_sw_version; - info.upgrade_sw_version = upgrade_sw_version; - long ts = GetLocalTs(); - info.submit_time = GetUTCTime(ts); - upgrade_[id] = info; - zlog_info(zbt, "[%d] short addr:%x add upgrade info", id, short_addr); - UpgradeCfg::WriteCfg(upgrade_); + // auto iter = short_addr_map_.find(short_addr); + // if (iter == short_addr_map_.end()) { + // zlog_error(zct, "cannot find id for short_addr %x", short_addr); + // return 1; + // } else { + // id = iter->second; + // } + // zlog_info(zbt, "[%d] short addr:%x sensor_type = %s,hw_version = %d,\ + // current_sw_version = %s,upgrade_sw_version = %s", id, short_addr,\ + // sensor_type.c_str(),hw_version,current_sw_version.c_str(),upgrade_sw_version.c_str()); + // UpgradeInfo info; + // info.try_times = 0; + // info.sensor_type = sensor_type; + // info.hw_version = hw_version; + // info.current_sw_version = current_sw_version; + // info.upgrade_sw_version = upgrade_sw_version; + // long ts = GetLocalTs(); + // info.submit_time = GetUTCTime(ts); + // upgrade_[id] = info; + // zlog_info(zbt, "[%d] short addr:%x add upgrade info", id, short_addr); + // UpgradeCfg::WriteCfg(upgrade_); + //zlog_info(zbt, "[%d] 222short addr:%x add upgrade info", id, short_addr); return 0; } diff --git a/scheduler/upgrade_cfg.cpp b/scheduler/upgrade_cfg.cpp index 3ddcce8..e61276d 100644 --- a/scheduler/upgrade_cfg.cpp +++ b/scheduler/upgrade_cfg.cpp @@ -59,32 +59,37 @@ int UpgradeCfg::ReadCfg(std::map &upgrade) { } int UpgradeCfg::WriteCfg(std::map &upgrade) { - if (upgrade.size() == 0) { - ClearCfg(); - return 0; - } - - Json::Value root; - for (auto item : upgrade) { - Json::Value upgrade_item; - upgrade_item["id"] = item.first; - upgrade_item["try_times"] = item.second.try_times; - upgrade_item["type"] = item.second.sensor_type; - upgrade_item["hw_version"] = item.second.hw_version; - upgrade_item["current_sw_version"] = item.second.current_sw_version; - upgrade_item["upgrade_sw_version"] = item.second.upgrade_sw_version; - upgrade_item["submit_time"] = item.second.submit_time; - Json::Value try_world_time; - for (auto entry : item.second.try_world_time1) { - try_world_time.append(entry); - } - upgrade_item["try_world_time"] = try_world_time; - root.append(upgrade_item); - } - Json::StyledStreamWriter streamWriter; - std::ofstream out_file(UPGRADE_CONFIG); - streamWriter.write(out_file, root); - out_file.close(); + // if (upgrade.size() == 0) { + // ClearCfg(); + // return 0; + // } + zlog_info(zct,"123 = %d",upgrade.size()); + // Json::Value root; + // for (auto item : upgrade) { + // zlog_info(zct,"1111"); + // Json::Value upgrade_item; + // upgrade_item["id"] = item.first; + // upgrade_item["try_times"] = item.second.try_times; + // upgrade_item["type"] = item.second.sensor_type; + // upgrade_item["hw_version"] = item.second.hw_version; + // upgrade_item["current_sw_version"] = item.second.current_sw_version; + // upgrade_item["upgrade_sw_version"] = item.second.upgrade_sw_version; + // upgrade_item["submit_time"] = item.second.submit_time; + // Json::Value try_world_time; + // zlog_info(zct,"2222"); + // for (auto entry : item.second.try_world_time1) { + // try_world_time.append(entry); + // } + // zlog_info(zct,"333"); + // upgrade_item["try_world_time"] = try_world_time; + // root.append(upgrade_item); + // zlog_info(zct,"444"); + // } + // zlog_info(zct,"456"); + // Json::StyledStreamWriter streamWriter; + // std::ofstream out_file(UPGRADE_CONFIG); + // streamWriter.write(out_file, root); + // out_file.close(); return 0; } diff --git a/threadfunc/thread_func.cpp b/threadfunc/thread_func.cpp index 8ac8ba1..8496e14 100644 --- a/threadfunc/thread_func.cpp +++ b/threadfunc/thread_func.cpp @@ -20,6 +20,9 @@ #include "utility/udp_scan.hpp" #include "utility/search_dev.hpp" #include "localserver/local_server.hpp" +#include "localserver/cmt_server.hpp" +#include + extern zlog_category_t *zct; extern zlog_category_t *zbt; @@ -37,16 +40,12 @@ void StartCgiServer() { } } -void StartTCPServer() { - zlog_info(zbt, "start deal TCP"); - - while (1) { - TcpCgi::startTCPServer(); - sleep(10); - } +std::unique_ptr g_mgr_server; +void StartCMTServer() { + boost::asio::io_service io_service; + g_mgr_server = std::make_unique(io_service, 10000); io_service.run(); } - void RunLED() { while (1) { gpio_set(GlobalConfig::GPIO_G.runLed, 1); diff --git a/threadfunc/thread_func.hpp b/threadfunc/thread_func.hpp index 5052a1f..a9c7b58 100644 --- a/threadfunc/thread_func.hpp +++ b/threadfunc/thread_func.hpp @@ -17,7 +17,7 @@ void CheckThread(); //循环检测线程 void StartMqttClient(); //启动mqtt服务 void SearchThread(); //组播功能, 提供发现设备功能 void StartCgiServer(); //启动cgi处理服务端 -void StartTCPServer(); +void StartCMTServer(); void HeartRep(); void UartStart(); // void TestUart(); diff --git a/utility/tcp_cgi.cpp b/utility/tcp_cgi.cpp index 3a4a814..6f042ad 100644 --- a/utility/tcp_cgi.cpp +++ b/utility/tcp_cgi.cpp @@ -1,6 +1,6 @@ #include "tcp_cgi.hpp" #include "localserver/local_server.hpp" -#include "localserver/cmt_protocol.h" +#include "localserver/cmt_server.hpp" #include extern zlog_category_t *zct; @@ -61,108 +61,108 @@ void TcpCgi::startCgiServer() { } void TcpCgi::startTCPServer() { - int listenfd, connfd; - int mw_optval = 1; + // int listenfd, connfd; + // int mw_optval = 1; - struct sockaddr_in servaddr; - char buff[40960]; - int n; - if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - zlog_error(zbt, "create socket error: %s(errno: %d)", strerror(errno), errno); - return; - } - setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&mw_optval, sizeof(mw_optval)); + // struct sockaddr_in servaddr; + // char buff[40960]; + // int n; + // if ((listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + // zlog_error(zbt, "create socket error: %s(errno: %d)", strerror(errno), errno); + // return; + // } + // setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, (char *)&mw_optval, sizeof(mw_optval)); - memset(&servaddr, 0, sizeof(servaddr)); - servaddr.sin_family = AF_INET; - servaddr.sin_addr.s_addr = htonl(INADDR_ANY); - servaddr.sin_port = htons(10000); + // memset(&servaddr, 0, sizeof(servaddr)); + // servaddr.sin_family = AF_INET; + // servaddr.sin_addr.s_addr = htonl(INADDR_ANY); + // servaddr.sin_port = htons(10000); - if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) { - zlog_error(zbt, "bind socket error: %s(errno: %d)", strerror(errno), errno); - return; - } + // if (bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)) == -1) { + // zlog_error(zbt, "bind socket error: %s(errno: %d)", strerror(errno), errno); + // return; + // } - if (listen(listenfd, 10) == -1) { - zlog_error(zbt, "listen socket error: %s(errno: %d)", strerror(errno), errno); - return; - } - char *send_all_data = NULL; - send_all_data = (char*)malloc(384000); // 96000*4 = 384000 - if (send_all_data == NULL) { - zlog_error(zct,"send_all_data Memory error"); - } + // if (listen(listenfd, 10) == -1) { + // zlog_error(zbt, "listen socket error: %s(errno: %d)", strerror(errno), errno); + // return; + // } + // char *send_all_data = NULL; + // send_all_data = (char*)malloc(384000); // 96000*4 = 384000 + // if (send_all_data == NULL) { + // zlog_error(zct,"send_all_data Memory error"); + // } - while (1) { - if ((connfd = accept(listenfd, (struct sockaddr *)NULL, NULL)) == -1) { - zlog_error(zct, "accept socket error: %s(errno: %d)", strerror(errno), errno); - continue; - } - n = recv(connfd, buff, 40960, 0); - if (n <= 0) { - zlog_info(zct, "recv 0 and will close"); - close(connfd); - } else { - printf("recv len = %d\n", n); - buff[n] = '\0'; - PackgeHead pkg; - pkg.head[0] = 0xAA; - pkg.head[1] = 0x55; - pkg.head[2] = 0xAA; - int recv_data_len = 0; - char send_data[96100] = {0}; - LocalServer::HandleTcp_cmd(buff,send_data,pkg.cmd,recv_data_len); - printf("response len = %d\n", recv_data_len); - int PKG_ONCE_SIZE = 1300; - int package_num = recv_data_len / PKG_ONCE_SIZE + 1; - if (recv_data_len % PKG_ONCE_SIZE == 0) { - package_num -= 1; - } - Search search; - int PKG_HEADER_LEN = sizeof(PackgeHead); - printf("package_num = %d\n", package_num); - if (recv_data_len > PKG_ONCE_SIZE) { - for (int i = 0; i < package_num; ++i) { - if (package_num - 1 == i) { - memcpy(send_all_data , send_data + i * PKG_ONCE_SIZE, recv_data_len - i * PKG_ONCE_SIZE); - send(connfd, send_all_data, recv_data_len - i * PKG_ONCE_SIZE,0); - printf("i = %d,send size = %d\n",i,recv_data_len - i * PKG_ONCE_SIZE + PKG_HEADER_LEN); - }else if(i == 0){ - printf("i = %d,head size = %d,total size = %d\n",i,PKG_HEADER_LEN, PKG_ONCE_SIZE + PKG_HEADER_LEN); - pkg.len = recv_data_len; - memcpy(send_all_data, &pkg, PKG_HEADER_LEN); - memcpy(send_all_data + PKG_HEADER_LEN, send_data + i * PKG_ONCE_SIZE, PKG_ONCE_SIZE); - send(connfd, send_all_data, PKG_ONCE_SIZE + PKG_HEADER_LEN,0); - printf("i = %d,send size = %d\n",i,PKG_ONCE_SIZE + PKG_HEADER_LEN); - memset(send_all_data,0,38400); - } else { - printf("i = %d,package_num = %d\n",i, package_num); - memcpy(send_all_data, send_data + i * PKG_ONCE_SIZE, PKG_ONCE_SIZE); - send(connfd, send_all_data, PKG_ONCE_SIZE,0); - printf("i = %d,send size = %d\n",i,PKG_ONCE_SIZE); - memset(send_all_data,0,38400); - mssleep(50); - } - } - } else { - pkg.len = recv_data_len; - memcpy(send_all_data, &pkg, PKG_HEADER_LEN); - memcpy(send_all_data + PKG_HEADER_LEN, &send_data, recv_data_len); - send(connfd, send_all_data, recv_data_len + PKG_HEADER_LEN,0); - printf("only one pkg, send data len:%d\n", recv_data_len + PKG_HEADER_LEN); + // while (1) { + // if ((connfd = accept(listenfd, (struct sockaddr *)NULL, NULL)) == -1) { + // zlog_error(zct, "accept socket error: %s(errno: %d)", strerror(errno), errno); + // continue; + // } + // n = recv(connfd, buff, 40960, 0); + // if (n <= 0) { + // zlog_info(zct, "recv 0 and will close"); + // close(connfd); + // } else { + // printf("recv len = %d\n", n); + // buff[n] = '\0'; + // PackageHead pkg; + // pkg.head[0] = 0xAA; + // pkg.head[1] = 0x55; + // pkg.head[2] = 0xAA; + // int recv_data_len = 0; + // char send_data[96100] = {0}; + // LocalServer::HandleTcp_cmd(buff,send_data,pkg.cmd,recv_data_len); + // printf("response len = %d\n", recv_data_len); + // int PKG_ONCE_SIZE = 1300; + // int package_num = recv_data_len / PKG_ONCE_SIZE + 1; + // if (recv_data_len % PKG_ONCE_SIZE == 0) { + // package_num -= 1; + // } + // Search search; + // int PKG_HEADER_LEN = sizeof(PackageHead); + // printf("package_num = %d\n", package_num); + // if (recv_data_len > PKG_ONCE_SIZE) { + // for (int i = 0; i < package_num; ++i) { + // if (package_num - 1 == i) { + // memcpy(send_all_data , send_data + i * PKG_ONCE_SIZE, recv_data_len - i * PKG_ONCE_SIZE); + // send(connfd, send_all_data, recv_data_len - i * PKG_ONCE_SIZE,0); + // printf("i = %d,send size = %d\n",i,recv_data_len - i * PKG_ONCE_SIZE + PKG_HEADER_LEN); + // }else if(i == 0){ + // printf("i = %d,head size = %d,total size = %d\n",i,PKG_HEADER_LEN, PKG_ONCE_SIZE + PKG_HEADER_LEN); + // pkg.len = recv_data_len; + // memcpy(send_all_data, &pkg, PKG_HEADER_LEN); + // memcpy(send_all_data + PKG_HEADER_LEN, send_data + i * PKG_ONCE_SIZE, PKG_ONCE_SIZE); + // send(connfd, send_all_data, PKG_ONCE_SIZE + PKG_HEADER_LEN,0); + // printf("i = %d,send size = %d\n",i,PKG_ONCE_SIZE + PKG_HEADER_LEN); + // memset(send_all_data,0,38400); + // } else { + // printf("i = %d,package_num = %d\n",i, package_num); + // memcpy(send_all_data, send_data + i * PKG_ONCE_SIZE, PKG_ONCE_SIZE); + // send(connfd, send_all_data, PKG_ONCE_SIZE,0); + // printf("i = %d,send size = %d\n",i,PKG_ONCE_SIZE); + // memset(send_all_data,0,38400); + // mssleep(50); + // } + // } + // } else { + // pkg.len = recv_data_len; + // memcpy(send_all_data, &pkg, PKG_HEADER_LEN); + // memcpy(send_all_data + PKG_HEADER_LEN, &send_data, recv_data_len); + // send(connfd, send_all_data, recv_data_len + PKG_HEADER_LEN,0); + // printf("only one pkg, send data len:%d\n", recv_data_len + PKG_HEADER_LEN); - } + // } - close(connfd); - } - } - close(listenfd); - if (send_all_data) - { - free(send_all_data); - send_all_data = NULL; - } + // close(connfd); + // } + // } + // close(listenfd); + // if (send_all_data) + // { + // free(send_all_data); + // send_all_data = NULL; + // } return; }