wirelessgateway/threadfunc/SH_ThreadFunc.cpp

410 lines
13 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <map>
#include <time.h>
#include <math.h>
#include <string.h>
#include <cstdlib>
#include <linux/types.h>
#include <sys/sysinfo.h>
#include <boost/typeof/typeof.hpp>
#include "SH_ThreadFunc.hpp"
#include "../API_log/SH_log.h"
#include "../uart/SH_Uart.hpp"
namespace{
Uart *pUart = Uart::instance();
LocalServer *cidwServer = LocalServer::instance();
}
static std::string serverPort;
static std::string uptime;
static long long connect_lost_time = 0; //ms
static long long connect_time = 0; //ms
void CheckThread()
{
print_info("ENTER CHECK THREAD \n");
std::string runinfo = "系统循环检测模块启动";
int heart_count = 0;
int time_check = 0;
int reset_flag = 0;
int online_check = 0;
int HardStatus = 0;
while (GlobalConfig::QuitFlag_G) {
if (10 == heart_count) {
// StatusPub();
if (GlobalConfig::LinkCount > 30) {
exit(0);
}
std::string ipTemp = IpAddrInit();
if ( 0 != ipTemp.compare(GlobalConfig::IpAddr_G)) {
exit(0);
}
heart_count = 0;
}
if (600 == time_check) {
char buf[256] = {0};
sprintf(buf, "{\"dataNodeGatewayNo\":\"%s\",\"cmd\":\"12\",\"status\":\"REQ\"}",
GlobalConfig::MacAddr_G.c_str());
std::string str = std::string(buf);
data_publish(str.c_str(), GlobalConfig::Topic_G.mPubCmd.c_str());
time_check = 0;
}
if(HardStatus == 3600){//one hour 3600
JsonData jd;
std::string data = jd.JsonCmd_07();
data_publish(data.c_str(), GlobalConfig::Topic_G.mPubStatus.c_str());
HardStatus = 0;
}
if (21600 == online_check) {
JsonData jd;
jd.DataNodeStatusCheck();
Param_29 param;
std::string cmd29 = jd.JsonCmd_29(param);
data_publish(cmd29.c_str(), GlobalConfig::Topic_G.mPubCmd.c_str());
online_check = 0;
}
if ( reset_flag > 7200) {
reset_flag = 0;
//exit(0);
}
reset_flag++;
time_check++;
heart_count++;
online_check++;
HardStatus ++;
if(GlobalConfig::EnterZigBeeWaveTransmittingFlag_G == ENTER_TRANSMITTING_STATUS) {
GlobalConfig::EnterZigBeeWaveTransmittingCnt_G ++;
if(GlobalConfig::EnterZigBeeWaveTransmittingCnt_G >= 180) {
GlobalConfig::EnterZigBeeWaveTransmittingFlag_G = NO_ENTER_TRANSMITTING_STATUS;
GlobalConfig::EnterZigBeeWaveTransmittingCnt_G = 0;
LOG_ERROR("[---- ZigBee error--!] ZigBee PanID is 9999 over time for 3 minutes !\n");
// 重新写入 0x8888
unsigned short shortAddr = 0x8888;
pUart->WriteShortAddr2Zigbee(shortAddr);
// 延时1秒
boost::this_thread::sleep(boost::posix_time::seconds(1));
// 重新读回 GlobalConfig::ZigbeeInfo_G.PanID
pUart->UpdateZigbeeInfoCtrl();
// 延时1秒
boost::this_thread::sleep(boost::posix_time::seconds(1));
std::string str("8888");
if( GlobalConfig::ZigbeeInfo_G.MyAddr.compare(str) == 0 ){
LOG_INFO("[---- ZigBee INFO ----!] ZigBee PanID come back to be 8888 !\n");
}
else {
LOG_ERROR("[---- ZigBee error--!] ZigBee PanID cannot come back to be 8888 !\n");
}
}
}
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
}
}
void StartCgiServer()
{
print_info("start deal cgi\n");
TcpCgi *tcpCgi = TcpCgi::instance();
while (1) {
tcpCgi->startCgiServer();
boost::this_thread::sleep(boost::posix_time::seconds(10));
}
}
void HeartRep()
{
while(1) {
Json::Value jsHeart;
Json::FastWriter fw;
jsHeart["dataNodeGatewayNo"] = GlobalConfig::MacAddr_G;
jsHeart["status"] = "online";
std::string strJson = fw.write(jsHeart);
data_publish(strJson.c_str(), GlobalConfig::Topic_G.mPubHeart.c_str());
boost::this_thread::sleep(boost::posix_time::seconds(10));
}
}
void UartStart()
{
// onReceiveUart cb = (onReceiveUart)&ProtoConvert::HandleFromUart;
// pUart->setCallBack(cb);
pUart->InitUart();
while (GlobalConfig::QuitFlag_G) {
pUart->UpdateZigbeeInfoCtrl();
pUart->ReadFromUart();
pUart->Run();
// pUart->Stop();
boost::this_thread::sleep(boost::posix_time::milliseconds(10));
}
}
void UartStartWave()
{
pUart->DealWaveThread();
}
void StartUdpSys()
{
UdpSys *udpsys = UdpSys::instance();
udpsys->StartConnectSysUdp();
}
static const char* LOCAL_WILL_TOPIC = "up/uart/will";
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid)
{
}
void my_connect_callback(struct mosquitto *mosq, void *obj, int result)
{
struct userdata *ud;
ud = (struct userdata *)obj;
if (!result){
for (int i = 0; i < ud->topic_count; i++){
print_purple("mosquitto_subscribe ud->topics[%d]:%s\n", i, ud->topics[i]);
int iret = mosquitto_subscribe(mosq, NULL, ud->topics[i], ud->topic_qos);
print_purple("mosquitto_subscribe ret:%d\n", iret);
}
int iret = mosquitto_subscribe(mosq, NULL, GlobalConfig::Topic_G.mSubData.c_str(), 1);
print_debug("mosquitto_subscribe's return value: %d\n", iret);
char gwTime[32] = { 0 };
GetTimeNet(gwTime, 0);
connect_time = strtoll(gwTime, NULL, 10);
print_debug("connect_time:%lld\n", connect_time);
long long difftime_ms = connect_time - connect_lost_time;
//if (difftime_ms > 20*1000) { // 超过20秒判定为连接断开
char reply_string[256] = {0};
std::string startStatus = "0";
if (access(SYSTEMSTART, 0) >= 0) {
startStatus = GetFileContent(SYSTEMSTART, 1);
}
sprintf(reply_string, "{\"dataNodeGatewayNo\":\"%s\",\"softVersion\":\"%s\",\"status\":\"%s\"}",
GlobalConfig::MacAddr_G.c_str(), GlobalConfig::Version.c_str(), startStatus.c_str());
Json::Value jsData;
Json::Value jsVal;
Json::FastWriter fw;
jsData["cmd"] = "15";
jsData["dataNodeGatewayNo"] = GlobalConfig::MacAddr_G;
// jsData["cmdBody"] = "";
std::string strCmd15 = fw.write(jsData);
// data_publish(strCmd15.c_str(), GlobalConfig::Topic_G.mPubCmd.c_str());
std::string instr = std::string(reply_string);
std::string topic = "equipment/state/" + GlobalConfig::MacAddr_G;
int ret = data_publish(instr.c_str(), topic.c_str());
system("echo 0 > /CIDW/start");
if (ret != MOSQ_ERR_SUCCESS) {
print_debug("Publish failed:%d, %s\n", ret, instr.c_str());
disconnect();
}
//}
GlobalConfig::LinkStatus_G = 1;
print_debug("Connect to server success.\n");
// std::string cmd20 = JsonCmd_Cgi_20();
// data_publish(cmd20.c_str(), GlobalConfig::Topic_G.mPubCmd.c_str());
char buf[256] = {0};
sprintf(buf, "{\"dataNodeGatewayNo\":\"%s\",\"cmd\":\"12\",\"status\":\"REQ\"}",
GlobalConfig::MacAddr_G.c_str());
std::string str = std::string(buf);
// data_publish(str.c_str(), GlobalConfig::Topic_G.mPubCmd.c_str());
std::string runinfo = "本地服务器连接成功";
}
else{
if (result && !ud->quiet){
fprintf(stderr, "%s\n", mosquitto_connack_string(result));
if(result == 1) {
//Connection Refused: unacceptable protocol version
} else if (result == 2) {
//Connection Refused: identifier rejected
} else if (result == 3) {
//Connection Refused: server unavailable
} else if (result == 4) {
//Connection Refused: bad user name or password
// exit(0);
} else if (result == 5) {
//Connection Refused: not authorized
}
}
}
}
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result)
{
int ret = 0;
ret = disconnect();
// LOG_ERROR("The uart connection lost:%d\n", ret);
print_debug("The uart connection lost\n");
char gwTime[32] = { 0 };
GetTimeNet(gwTime, 0);
uptime = std::string(gwTime);
connect_lost_time = strtoll(uptime.c_str(), NULL, 10);
print_debug("connect_lost_time:%lld\n", connect_lost_time);
GlobalConfig::LinkStatus_G = 0;
GlobalConfig::LinkCount = GlobalConfig::LinkCount + 1;
}
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
struct userdata *ud;
bool res;
assert(obj);
ud = (struct userdata *)obj;
if (message->retain && ud->no_retain) return;
if (ud->filter_outs) {
for (int i = 0; i < ud->filter_out_count; i++) {
mosquitto_topic_matches_sub(ud->filter_outs[i], message->topic, &res);
if (res) return;
}
}
if (ud->verbose) {
if (message->payloadlen) {
std::string strtopic(message->topic);
print_info("strtopic : %s \n", strtopic.c_str());
cidwServer->HandleFromServer((const char *)message->payload,message->payloadlen,message->topic);
if (ud->eol) {
print_brown("\n");
}
} else {
if (ud->eol) {
print_brown("%s (null)\n", message->topic);
}
}
fflush(stdout);
} else {
if (message->payloadlen) {
fwrite(message->payload, 1, message->payloadlen, stdout);
if (ud->eol) {
print_red("\n");
}
fflush(stdout);
}
}
}
void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
{
int i;
struct userdata *ud;
assert(obj);
ud = (struct userdata *)obj;
if (!ud->quiet) print_brown("Subscribed (mid: %d): %d", mid, granted_qos[0]);
for (i = 1; i < qos_count; i++){
if (!ud->quiet) print_brown(", %d", granted_qos[i]);
}
}
void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str)
{
if (level == MOSQ_LOG_ERR) {
//LOG_ERROR("%s\n", str);
} else if (level == MOSQ_LOG_WARNING) {
// LOG_WARN("%s\n", str);
} else if (level == MOSQ_LOG_NOTICE) {
//LOG_INFO("%s\n", str);
}
}
void StartMqttClient()
{
print_info("start mqtt \n");
std::string runinfo = "MQTT通信模块启动";
while (1) {
if (GlobalConfig::ServerIP.length() > 0) {
std::string strEqupNo = GlobalConfig::MacAddr_G;
std::string strVersion = GlobalConfig::Version;
std::string salt;
register_collback(my_connect_callback, my_message_callback, my_subscribe_callback, my_log_callback, my_disconnect_callback, my_publish_callback);
start_client(strEqupNo.c_str(), GlobalConfig::MacAddr_G.c_str(), GlobalConfig::ServerIP.c_str(), strVersion.c_str(), "11111111", salt);
}
boost::this_thread::sleep(boost::posix_time::seconds(3));
}
}
void SearchThread()
{
std::string runinfo = "设备搜索模块启动";
while (GlobalConfig::QuitFlag_G) {
if (GlobalConfig::IpAddr_G.length() > 0 && 0 != GlobalConfig::IpAddr_G.compare("0.0.0.0")) {
boost::asio::io_service io_service;
SearchDev *searchDevObj = new SearchDev(io_service);
searchDevObj->MultiCastRecv();
io_service.run();
delete searchDevObj;
}
boost::this_thread::sleep(boost::posix_time::seconds(5));
print_error("SearchThred restart.\n");
}
}
void RecvUpdateFile()
{
boost::asio::io_service iosev;
boost::asio::ip::tcp::acceptor acceptor(iosev,
boost::asio::ip::tcp::endpoint(
boost::asio::ip::tcp::v4(), 7304));
for(;;) {
boost::asio::ip::tcp::socket socket(iosev);
acceptor.accept(socket);
boost::system::error_code ec;
if(ec) {
print_error("%s\n", boost::system::system_error(ec).what());
//return -1;
}
FILE *fp;
char buffer[1024];
size_t len = 0;
int write_len;
bzero(buffer,1024);
fp = fopen("/tmp/upgrade.tar.gz", "w");
if(NULL == fp ) {
print_info("File:\t Can Not Open To Write\n");
exit(1);
}
while(len = socket.read_some(boost::asio::buffer(buffer),ec)) {
if(len < 0) {
print_info("Receive Data From Server Failed! \n");
break;
}
write_len = fwrite(buffer, sizeof(char), len, fp);
if(write_len < len) {
print_info("File:test Write Failed!\n");
break;
}
bzero(buffer, 1024);
}
print_info("Receive File From Server Finished! \n");
fclose(fp);
Json::Value jsData;
Json::FastWriter fw;
jsData["cmd"] = "03";
jsData["updatefilename"] = "updatefile";
std::string str = fw.write(jsData);
system("/etc/init.d/sysupgrade.sh");
}
}