#include "mqtt_client.h"

// NOTE(review): the header names of the system #include directives were not
// visible when this file was reviewed. The list below is reconstructed from
// what the code in this file actually uses -- confirm it against the original
// include list and restore anything that is missing.
#include <assert.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <string>
#include <vector>

#include <mosquitto.h>
#include <zlog.h>

#include "common/global.hpp"
#include "localserver/local_server.hpp"
#include "utility/secure.hpp"

extern zlog_category_t *zct;
extern zlog_category_t *zbt;

static char res_topic[100];
static char req_topic[100];
static std::string uptime;
static long long connect_lost_time = 0;  // ms
static long long connect_time = 0;       // ms

struct mosquitto *mosq = NULL;
static int mid_sent = 0;
struct userdata ud;

connect_callback my_connect_cb = NULL;
disconnect_callback my_disconnect_cb = NULL;
message_callback my_message_cb = NULL;
subscribe_callback my_subscribe_cb = NULL;
log_callback my_log_cb = NULL;
publish_callback my_publish_cb = NULL;

/// Publish-complete callback. Intentionally does nothing.
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) {}

/// Connect callback.
///
/// On a successful CONNACK it subscribes to every topic listed in the
/// `userdata` passed as `obj` and to GlobalConfig::Topic_G.mSubData (QoS 1),
/// records the connect time, and -- when more than 20 s elapsed since the last
/// recorded disconnect -- publishes a status message on
/// "equipment/state/<MacAddr>". Finally it sets GlobalConfig::LinkStatus_G = 1.
///
/// @param mosq    client instance the callback fired for
/// @param obj     pointer to the `struct userdata` given to mosquitto_new()
/// @param result  CONNACK result code; non-zero means the connect was refused
void my_connect_callback(struct mosquitto *mosq, void *obj, int result) {
    struct userdata *cfg = (struct userdata *)obj;

    if (result) {
        zlog_error(zct, "%s", mosquitto_connack_string(result));
        return;
    }

    int ret = 0;
    for (int i = 0; i < cfg->topic_count; i++) {
        zlog_info(zct, "mosquitto_subscribe ud->topics[%d]:%s", i, cfg->topics[i]);
        ret = mosquitto_subscribe(mosq, NULL, cfg->topics[i], cfg->topic_qos);
        if (ret != MOSQ_ERR_SUCCESS) {
            zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), cfg->topics[i]);
            return;
        }
    }

    ret = mosquitto_subscribe(mosq, NULL, GlobalConfig::Topic_G.mSubData.c_str(), 1);
    if (ret != MOSQ_ERR_SUCCESS) {
        zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), GlobalConfig::Topic_G.mSubData.c_str());
        return;
    }
    zlog_info(zct, "mosquitto_subscribe's return value: %d", ret);

    char gwTime[32] = {0};
    GetTimeNet(gwTime, 0);
    connect_time = strtoll(gwTime, NULL, 10);
    zlog_info(zct, "connect_time:%lld", connect_time);

    long long difftime_ms = connect_time - connect_lost_time;
    if (difftime_ms > 20 * 1000) {  // more than 20 seconds: treat the link as having been down
        std::string startStatus = "0";
        if (access(SYSTEMSTART, 0) >= 0) {
            startStatus = GetFileContent(SYSTEMSTART, 1);
        }

        // Build the JSON in a std::string: the previous fixed 256-byte sprintf
        // buffer could overflow because the status text comes from a file.
        std::string instr = "{\"dataNodeGatewayNo\":\"";
        instr += GlobalConfig::MacAddr_G.c_str();
        instr += "\",\"softVersion\":\"";
        instr += GlobalConfig::Version.c_str();
        instr += "\",\"status\":\"";
        instr += startStatus.c_str();
        instr += "\"}";

        std::string topic = "equipment/state/" + GlobalConfig::MacAddr_G;
        ret = data_publish(instr.c_str(), topic.c_str());
        if (ret != MOSQ_ERR_SUCCESS) {
            zlog_error(zct, "Publish failed:%d, %s", ret, instr.c_str());
            disconnect();
        }
    }

    GlobalConfig::LinkStatus_G = 1;
    zlog_info(zct, "Connect to server success.");
}

/// Disconnect callback.
///
/// Calls disconnect(), records the time of the loss in `uptime` /
/// `connect_lost_time`, clears GlobalConfig::LinkStatus_G and increments
/// GlobalConfig::LinkCount. With WIFI_MODULE defined it additionally queries
/// wpa_cli for the Wi-Fi state and RSSI, calls reconnect() and logs the result.
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result) {
    disconnect();
    zlog_info(zct, "The MQTT connection lost\n");

    char gwTime[32] = {0};
    GetTimeNet(gwTime, 0);
    uptime = std::string(gwTime);
    connect_lost_time = strtoll(uptime.c_str(), NULL, 10);
    zlog_info(zct, "connect_lost_time:%lld\n", connect_lost_time);

    GlobalConfig::LinkStatus_G = 0;
    GlobalConfig::LinkCount = GlobalConfig::LinkCount + 1;

#ifdef WIFI_MODULE
    char buf[128] = {0};

    std::string wpa_state = "/usr/sbin/wpa_cli status|grep wpa_state | cut -f 2 -d '='";
    system_custom(wpa_state.c_str(), buf);
    std::string state = std::string(buf);

    std::string RSSI_cmd = "/usr/sbin/wpa_cli signal_poll|grep RSSI | cut -f 2 -d '='";
    system_custom(RSSI_cmd.c_str(), buf);
    std::string RSSI = std::string(buf);

    int iRet = reconnect();

    // snprintf: RSSI and state together can exceed the 128-byte buffer.
    memset(buf, 0, sizeof(buf));
    snprintf(buf, sizeof(buf), "wifi RSSI:%s,state:%s,MQTT reconnect :%d\n", RSSI.c_str(), state.c_str(), iRet);
    zlog_info(zct, "%s\n", buf);
#endif
}

/// Message callback.
///
/// Drops retained messages when `no_retain` is set and messages whose topic
/// matches one of the `filter_outs` patterns. In verbose mode a non-empty
/// payload is handed to LocalServer::HandleFromServer(); otherwise a non-empty
/// payload is written raw to stdout.
///
/// @param obj      pointer to the `struct userdata` given to mosquitto_new(); must not be NULL
/// @param message  the received message
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) {
    assert(obj);
    struct userdata *cfg = (struct userdata *)obj;

    if (message->retain && cfg->no_retain) {
        return;
    }

    if (cfg->filter_outs) {
        for (int i = 0; i < cfg->filter_out_count; i++) {
            // Initialise the flag so a failed match call cannot leave it indeterminate.
            bool matched = false;
            mosquitto_topic_matches_sub(cfg->filter_outs[i], message->topic, &matched);
            if (matched) {
                return;
            }
        }
    }

    if (cfg->verbose) {
        if (message->payloadlen) {
            std::string strtopic(message->topic);
            zlog_info(zct, "strtopic : %s \n", strtopic.c_str());
            LocalServer::HandleFromServer((const char *)message->payload, message->payloadlen, message->topic);
        } else if (cfg->eol) {
            zlog_info(zct, "%s (null)\n", message->topic);
        }
        fflush(stdout);
    } else if (message->payloadlen) {
        fwrite(message->payload, 1, message->payloadlen, stdout);
        fflush(stdout);
    }
}

/// Subscribe-acknowledged callback: logs the message id and every granted QoS.
///
/// @param mid          message id of the SUBSCRIBE being acknowledged
/// @param qos_count    number of entries in `granted_qos`
/// @param granted_qos  granted QoS per requested topic
void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) {
    // Guard the first element: the array may be empty or absent.
    if (qos_count <= 0 || granted_qos == NULL) {
        zlog_info(zct, "Subscribed (mid: %d)", mid);
        return;
    }

    zlog_info(zct, "Subscribed (mid: %d): %d", mid, granted_qos[0]);
    for (int i = 1; i < qos_count; i++) {
        zlog_info(zct, ", %d", granted_qos[i]);
    }
}

/// Library log callback. Every branch is currently a no-op; the logging calls
/// are commented out.
void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) {
    if (level == MOSQ_LOG_ERR) {
        // LOG_ERROR("%s\n", str);
    } else if (level == MOSQ_LOG_WARNING) {
        // LOG_WARN("%s\n", str);
    } else if (level == MOSQ_LOG_NOTICE) {
        // LOG_INFO("%s\n", str);
    }
}

/// Stores the six callbacks in the file-level `my_*_cb` pointers; start_client()
/// later installs those pointers on the mosquitto instance.
void register_callback(connect_callback connect_c, message_callback message_c, subscribe_callback subscribe_c, log_callback log_c, disconnect_callback disconnect_c, publish_callback publish_c) {
    my_connect_cb = connect_c;
    my_message_cb = message_c;
    my_subscribe_cb = subscribe_c;
    my_log_cb = log_c;
    my_disconnect_cb = disconnect_c;
    my_publish_cb = publish_c;
}

/// Publishes a NUL-terminated payload on `topic` using the QoS in `ud.topic_qos`.
///
/// When the topic contains "cmd" or "configureInfo" the payload is passed
/// through secure::Base64Encode() first. Any publish failure is logged and
/// followed by disconnect().
///
/// @param str    NUL-terminated payload
/// @param topic  NUL-terminated topic name
/// @return MOSQ_ERR_SUCCESS on success, -1 when no client instance exists,
///         MOSQ_ERR_INVAL when `str` or `topic` is NULL, otherwise the
///         mosquitto_publish() error code.
int data_publish(const char *str, const char *topic) {
    if (mosq == NULL) {
        zlog_warn(zct,"mosq == NULL");
        return -1;
    }
    if (str == NULL || topic == NULL) {
        zlog_error(zct, "data_publish: NULL payload or topic");
        return MOSQ_ERR_INVAL;
    }

    int ret = -1;
    const std::string strTopic(topic);
    const bool needEncode = strTopic.find("cmd") != std::string::npos ||
                            strTopic.find("configureInfo") != std::string::npos;

    if (needEncode) {
        const std::string pData(str);
        // Base64 needs 4 * ceil(n / 3) bytes plus a terminator, which is MORE
        // than 2 * n for very short payloads (n = 1, 2, 4). The extra 8 bytes
        // cover padding and the NUL while never shrinking the old 2 * n size.
        std::vector<char> encoded(pData.length() * 2 + 8, 0);
        secure::instance().Base64Encode((unsigned char *)pData.c_str(), &encoded[0], pData.length());
        encoded[encoded.size() - 1] = '\0';  // strlen() below must always terminate
        ret = mosquitto_publish(mosq, &mid_sent, topic, (int)strlen(&encoded[0]), &encoded[0], ud.topic_qos, false);
    } else {
        ret = mosquitto_publish(mosq, &mid_sent, topic, (int)strlen(str), str, ud.topic_qos, false);
    }

    if (ret != MOSQ_ERR_SUCCESS) {
        zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
        disconnect();
    }
    return ret;
}

/// Subscribes the global client to `topic` with the given QoS; logs on failure.
/// @return the mosquitto_subscribe() result code
int subscribe(const char *topic, int qos) {
    int ret = mosquitto_subscribe(mosq, NULL, topic, qos);
    if (ret != MOSQ_ERR_SUCCESS) {
        zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
    }
    return ret;
}

/// Unsubscribes the global client from `topic`; logs on failure.
/// @return the mosquitto_unsubscribe() result code
int unsubscribe(const char *topic) {
    int ret = mosquitto_unsubscribe(mosq, NULL, topic);
    if (ret != MOSQ_ERR_SUCCESS) {
        zlog_error(zct, "fail to unsubscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
    }
    return ret;
}

/// Thin wrapper around mosquitto_disconnect() on the global client.
int disconnect() { return mosquitto_disconnect(mosq); }

/// Thin wrapper around mosquitto_reconnect() on the global client.
int reconnect() { return mosquitto_reconnect(mosq); }

/// Configures the global client, connects to `gwip`:GlobalConfig::ServerPort
/// and runs mosquitto_loop_forever() until the loop exits, then tears
/// everything down and sleeps one second.
///
/// `boardname`, `gwver` and `gwcode` are not used by this function.
///
/// @param gwid  gateway id; copied (truncated if necessary) into the session data
///              and embedded in the last-will payload
/// @param gwip  broker host name or address
/// @param salt  out: set from the (zero-initialised) session data, i.e. empty
/// @return 1 when an argument is NULL or client setup fails before connecting;
///         0 otherwise -- including when the connect itself fails, which is
///         only logged.
int start_client(const char *boardname, const char *gwid, const char *gwip, const char *gwver, const char *gwcode, std::string &salt) {
    if (gwid == NULL || gwip == NULL) {
        zlog_error(zct, "start_client: gwid or gwip is NULL");
        return 1;
    }

    const int keepalive = 60;
    int port = 51613;
    int rc = 0;
    char will_payload[100];
    long will_payloadlen = 0;
    int will_qos = 0;
    const bool will_retain = false;
    mqttsdata_t mqttsdata;

    // signal() registers a handler for a signal. SIGCHLD is delivered to the
    // parent when a child process terminates or stops; the predefined SIG_IGN
    // handler ignores it. SIGPIPE is ignored the same way.
    signal(SIGCHLD, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);

    register_callback(my_connect_callback, my_message_callback, my_subscribe_callback, my_log_callback, my_disconnect_callback, my_publish_callback);

    int qos = readIntValue("config", "qos", (char *)GlobalConfig::Config_G.c_str());

    memset(&mqttsdata, 0, sizeof(mqttsdata_t));
    // Bounded copy: the struct was just zeroed, so leaving the last byte
    // untouched keeps the id NUL-terminated even when it is truncated.
    size_t gwidLen = strlen(gwid);
    if (gwidLen >= sizeof(mqttsdata.gwid)) {
        gwidLen = sizeof(mqttsdata.gwid) - 1;
    }
    memcpy(mqttsdata.gwid, gwid, gwidLen);
    salt = std::string(mqttsdata.salt);

    memset(&ud, 0, sizeof(struct userdata));
    ud.eol = true;

    char localip[100];
    snprintf(localip, sizeof(localip), "%s", gwip);
    const char *host = localip;

    mqttsdata.port = GlobalConfig::ServerPort;
    port = GlobalConfig::ServerPort;
#ifdef WIFI_MODULE
    mqttsdata.qos = 0;
#else
    mqttsdata.qos = qos;
#endif
    zlog_info(zct, "mqttsdata.qos = %d", mqttsdata.qos);

    std::string username = ReadStrByOpt(SERVERCONFIG, "Server", "UserName");
    std::string password = ReadStrByOpt(SERVERCONFIG, "Server", "Password");
    if (username == "" || password == "") {
        // NOTE(review): hard-coded fallback credentials -- consider moving
        // them out of the source.
        username = "chaos";
        password = "HSD272*#xkd";
    }

    char userName[100];
    snprintf(userName, sizeof(userName), "%s", username.c_str());
    char passWord[100];
    snprintf(passWord, sizeof(passWord), "%s", password.c_str());
    // The password is deliberately not logged.
    zlog_info(zbt, "userName = %s", userName);

    // NOTE(review): several of these pointers refer to locals of this
    // function and dangle once it returns.
    ud.username = userName;
    ud.password = passWord;
    ud.gwid = mqttsdata.gwid;
    ud.dataUrl = mqttsdata.dataUrl;
    ud.fsUrl = mqttsdata.fsUrl;
    will_qos = mqttsdata.qos;
    ud.topic_qos = will_qos;

    ud.topic_count++;
    ud.topics = (char **)realloc(ud.topics, ud.topic_count * sizeof(char *));
    if (ud.topics == NULL) {
        zlog_error(zbt, "fail to realloc for ud.topics");
        return 1;
    }
    snprintf(req_topic, sizeof(req_topic), "dev/cidwcc");
    ud.topics[ud.topic_count - 1] = req_topic;
    ud.verbose = 1;

    snprintf(res_topic, sizeof(res_topic), "gw/will");
    zlog_info(zct, "res_topic:%s", res_topic);
    const char *will_topic = res_topic;
    ud.resp_topic = res_topic;

    snprintf(will_payload, sizeof(will_payload), "{\"cmd\":\"15\",\"gwID\":\"%s\"}", mqttsdata.gwid);
    will_payloadlen = strlen(will_payload);
    zlog_info(zct, "will_payload:%s,will_payloadlen:%ld", will_payload, will_payloadlen);

    mosquitto_lib_init();
    mosq = mosquitto_new(NULL, true, &ud);
    if (!mosq) {
        zlog_error(zct, "fail to mosquitto_new, errno:%d", errno);
        free(ud.topics);
        ud.topics = NULL;
        ud.topic_count = 0;
        mosquitto_lib_cleanup();
        return 1;
    }

    mosquitto_log_callback_set(mosq, my_log_cb);

    bool ready = true;
    if (mosquitto_will_set(mosq, will_topic, (int)will_payloadlen, will_payload, will_qos, will_retain)) {
        zlog_error(zct, "Error: Problem setting will.");
        ready = false;
    }
    if (ready && ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)) {
        zlog_error(zct, "Error: Problem setting username and password.");
        ready = false;
    }

    if (ready) {
        mosquitto_connect_callback_set(mosq, my_connect_cb);
        mosquitto_disconnect_callback_set(mosq, my_disconnect_cb);
        mosquitto_message_callback_set(mosq, my_message_cb);
        mosquitto_publish_callback_set(mosq, my_publish_cb);
        mosquitto_subscribe_callback_set(mosq, my_subscribe_cb);

        rc = mosquitto_connect_bind(mosq, host, port, keepalive, NULL);
        zlog_info(zct, "host = %s port = %d rc = %d", host, port, rc);
        if (rc) {
            zlog_error(zct, "Unable to connect (%d).", rc);
        } else {
            // Blocks here until the network loop exits.
            rc = mosquitto_loop_forever(mosq, -1, 1);
            if (rc) {
                zlog_error(zct, "Error: %s", mosquitto_strerror(rc));
            }
        }
    }

    // Common teardown for every path that created the client.
    mosquitto_destroy(mosq);
    mosquitto_lib_cleanup();
    mosq = NULL;
    free(ud.topics);
    ud.topics = NULL;  // do not leave a dangling pointer in the global
    ud.topic_count = 0;

    sleep(1);
    return 0;
}