From 2e5b88063c4191103eba0f3089deae0b2ac8373a Mon Sep 17 00:00:00 2001 From: pandx Date: Fri, 25 Oct 2024 09:35:37 +0800 Subject: [PATCH] refine mqtt codes. --- mqttclient/mqtt_client.cpp | 211 +++++++++++++++++++++++++++++++++---- mqttclient/mqtt_client.h | 16 +-- threadfunc/thread_func.cpp | 169 +---------------------------- 3 files changed, 197 insertions(+), 199 deletions(-) diff --git a/mqttclient/mqtt_client.cpp b/mqttclient/mqtt_client.cpp index e67ea6c..4fc356b 100644 --- a/mqttclient/mqtt_client.cpp +++ b/mqttclient/mqtt_client.cpp @@ -16,6 +16,7 @@ #include #include "common/global.hpp" +#include "localserver/local_server.hpp" #include "utility/secure.hpp" extern zlog_category_t *zct; @@ -23,23 +24,189 @@ extern zlog_category_t *zbt; static char res_topic[100]; static char req_topic[100]; +static std::string uptime; +static long long connect_lost_time = 0; // ms +static long long connect_time = 0; // ms struct mosquitto *mosq = NULL; static int mid_sent = 0; struct userdata ud; -connect_callback my_connect_callback = NULL; -disconnect_callback my_disconnect_callback = NULL; -message_callback my_message_callback = NULL; -subscribe_callback my_subscribe_callback = NULL; -log_callback my_log_callback = NULL; -publish_callback my_publish_callback = NULL; +connect_callback my_connect_cb = NULL; +disconnect_callback my_disconnect_cb = NULL; +message_callback my_message_cb = NULL; +subscribe_callback my_subscribe_cb = NULL; +log_callback my_log_cb = NULL; +publish_callback my_publish_cb = NULL; -void register_collback(connect_callback connect_c, message_callback message_c, subscribe_callback subscribe_c, log_callback log_c, disconnect_callback disconnect_c, publish_callback publish_c) { - my_connect_callback = connect_c; - my_message_callback = message_c; - my_subscribe_callback = subscribe_c; - my_log_callback = log_c; - my_disconnect_callback = disconnect_c; - my_publish_callback = publish_c; +void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) {} + +void my_connect_callback(struct mosquitto *mosq, void *obj, int result) { + struct userdata *ud; + ud = (struct userdata *)obj; + if (result) { + zlog_error(zct, "%s", mosquitto_connack_string(result)); + return; + } + int ret = 0; + for (int i = 0; i < ud->topic_count; i++) { + zlog_info(zct, "mosquitto_subscribe ud->topics[%d]:%s", i, ud->topics[i]); + ret = mosquitto_subscribe(mosq, NULL, ud->topics[i], ud->topic_qos); + if (ret != MOSQ_ERR_SUCCESS) { + zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), ud->topics[i]); + return; + } + } + + ret = mosquitto_subscribe(mosq, NULL, GlobalConfig::Topic_G.mSubData.c_str(), 1); + if (ret != MOSQ_ERR_SUCCESS) { + zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), GlobalConfig::Topic_G.mSubData.c_str()); + return; + } + zlog_info(zct, "mosquitto_subscribe's return value: %d", ret); + + char gwTime[32] = {0}; + GetTimeNet(gwTime, 0); + connect_time = strtoll(gwTime, NULL, 10); + zlog_info(zct, "connect_time:%lld", connect_time); + long long difftime_ms = connect_time - connect_lost_time; + if (difftime_ms > 20 * 1000) { // 超过20秒,判定为连接断开 + char reply_string[256] = {0}; + std::string startStatus = "0"; + if (access(SYSTEMSTART, 0) >= 0) { + startStatus = GetFileContent(SYSTEMSTART, 1); + } + sprintf(reply_string, "{\"dataNodeGatewayNo\":\"%s\",\"softVersion\":\"%s\",\"status\":\"%s\"}", GlobalConfig::MacAddr_G.c_str(), GlobalConfig::Version.c_str(), startStatus.c_str()); + + Json::Value jsData; + Json::Value jsVal; + Json::FastWriter fw; + jsData["cmd"] = "15"; + jsData["dataNodeGatewayNo"] = GlobalConfig::MacAddr_G; + + std::string strCmd15 = fw.write(jsData); + + std::string instr = std::string(reply_string); + std::string topic = "equipment/state/" + GlobalConfig::MacAddr_G; + ret = data_publish(instr.c_str(), topic.c_str()); + if (ret != MOSQ_ERR_SUCCESS) { + zlog_error(zct, "Publish failed:%d, %s", ret, instr.c_str()); + disconnect(); + } + } + GlobalConfig::LinkStatus_G = 1; + zlog_info(zct, "Connect to server success."); + + char buf[256] = {0}; + sprintf(buf, "{\"dataNodeGatewayNo\":\"%s\",\"cmd\":\"12\",\"status\":\"REQ\"}", GlobalConfig::MacAddr_G.c_str()); + std::string str = std::string(buf); + std::string runinfo = "本地服务器连接成功"; +} + +void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result) { + disconnect(); + zlog_info(zct, "The MQTT connection lost\n"); + char gwTime[32] = {0}; + GetTimeNet(gwTime, 0); + uptime = std::string(gwTime); + connect_lost_time = strtoll(uptime.c_str(), NULL, 10); + zlog_info(zct, "connect_lost_time:%lld\n", connect_lost_time); + GlobalConfig::LinkStatus_G = 0; + GlobalConfig::LinkCount = GlobalConfig::LinkCount + 1; + +#ifdef WIFI_MODULE + char buf[128] = {0}; + std::string wpa_state = ""; +#ifdef G2UL_GATEWAY + wpa_state = "/usr/sbin/wpa_cli status|grep wpa_state | cut -f 2 -d '='"; +#endif + +#ifdef IMX6UL_GATEWAY + wpa_state = "/opt/Cidn/wpa_cli status|grep wpa_state | cut -f 2 -d '='"; +#endif + system_custom(wpa_state.c_str(), buf); + std::string state = std::string(buf); + std::string RSSI_cmd = ""; +#ifdef G2UL_GATEWAY + RSSI_cmd = "/usr/sbin/wpa_cli signal_poll|grep RSSI | cut -f 2 -d '='"; +#endif +#ifdef IMX6UL_GATEWAY + RSSI_cmd = "/opt/Cidn/wpa_cli signal_poll|grep RSSI | cut -f 2 -d '='"; +#endif + system_custom(RSSI_cmd.c_str(), buf); + std::string RSSI = std::string(buf); + + int iRet = reconnect(); + memset(buf, 0, sizeof(buf)); + sprintf(buf, "wifi RSSI:%s,state:%s,MQTT reconnect :%d\n", RSSI.c_str(), state.c_str(), iRet); + + zlog_info(zct, "%s\n", buf); + +#else + +#endif +} + +void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { + struct userdata *ud; + bool res; + assert(obj); + ud = (struct userdata *)obj; + + if (message->retain && ud->no_retain) return; + if (ud->filter_outs) { + for (int i = 0; i < ud->filter_out_count; i++) { + mosquitto_topic_matches_sub(ud->filter_outs[i], message->topic, &res); + if (res) return; + } + } + + if (ud->verbose) { + if (message->payloadlen) { + std::string strtopic(message->topic); + zlog_info(zct, "strtopic : %s \n", strtopic.c_str()); + LocalServer::HandleFromServer((const char *)message->payload, message->payloadlen, message->topic); + if (ud->eol) { + } + } else { + if (ud->eol) { + zlog_info(zct, "%s (null)\n", message->topic); + } + } + fflush(stdout); + } else { + if (message->payloadlen) { + fwrite(message->payload, 1, message->payloadlen, stdout); + if (ud->eol) { + } + fflush(stdout); + } + } +} + +void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) { + int i; + zlog_info(zct, "Subscribed (mid: %d): %d", mid, granted_qos[0]); + for (i = 1; i < qos_count; i++) { + zlog_info(zct, ", %d", granted_qos[i]); + } +} + +void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) { + if (level == MOSQ_LOG_ERR) { + // LOG_ERROR("%s\n", str); + } else if (level == MOSQ_LOG_WARNING) { + // LOG_WARN("%s\n", str); + } else if (level == MOSQ_LOG_NOTICE) { + // LOG_INFO("%s\n", str); + } +} + +void register_callback(connect_callback connect_c, message_callback message_c, subscribe_callback subscribe_c, log_callback log_c, disconnect_callback disconnect_c, publish_callback publish_c) { + my_connect_cb = connect_c; + my_message_cb = message_c; + my_subscribe_cb = subscribe_c; + my_log_cb = log_c; + my_disconnect_cb = disconnect_c; + my_publish_cb = publish_c; } int data_publish(const char *str, const char *topic) { @@ -102,6 +269,7 @@ int unsubscribe(const char *topic) { int disconnect() { return mosquitto_disconnect(mosq); } int reconnect() { return mosquitto_reconnect(mosq); } + int start_client(const char *boardname, const char *gwid, const char *gwip, const char *gwver, const char *gwcode, std::string &salt) { char *host; int port = 51613; @@ -118,10 +286,7 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons mqttsdata_t mqttsdata; signal(SIGCHLD, SIG_IGN); // signal函数作用:为一个信号注册一个信号处理函数。其中SIGCHLD:在一个进程终止或者停止时,将SIGCHLD信号发送给父进程。而信号处理函数是一个函数指针,不过它使用的是预定义函数SIG_IGN(忽视信号) signal(SIGPIPE, SIG_IGN); - if (my_connect_callback == NULL || my_message_callback == NULL || my_subscribe_callback == NULL || my_log_callback == NULL) { - zlog_error(zct, "please call the register_collback function first."); - return -1; - } + register_callback(my_connect_callback, my_message_callback, my_subscribe_callback, my_log_callback, my_disconnect_callback, my_publish_callback); int qos = readIntValue("config", "qos", (char *)GlobalConfig::Config_G.c_str()); memset(&mqttsdata, 0, sizeof(mqttsdata_t)); @@ -195,7 +360,7 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons return 1; } - mosquitto_log_callback_set(mosq, my_log_callback); + mosquitto_log_callback_set(mosq, my_log_cb); if (!psk) { psk = (char *)malloc(32 * sizeof(char)); @@ -229,11 +394,11 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons goto mosq_free; } - mosquitto_connect_callback_set(mosq, my_connect_callback); - mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); - mosquitto_message_callback_set(mosq, my_message_callback); - mosquitto_publish_callback_set(mosq, my_publish_callback); - mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); + mosquitto_connect_callback_set(mosq, my_connect_cb); + mosquitto_disconnect_callback_set(mosq, my_disconnect_cb); + mosquitto_message_callback_set(mosq, my_message_cb); + mosquitto_publish_callback_set(mosq, my_publish_cb); + mosquitto_subscribe_callback_set(mosq, my_subscribe_cb); rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address); zlog_info(zct, "host = %s port = %d rc = %d", host, port, rc); diff --git a/mqttclient/mqtt_client.h b/mqttclient/mqtt_client.h index 0e01ffb..b5237bd 100644 --- a/mqttclient/mqtt_client.h +++ b/mqttclient/mqtt_client.h @@ -49,14 +49,14 @@ typedef void (*log_callback)(struct mosquitto *mosq, void *obj, int level, const typedef void (*publish_callback)(struct mosquitto *mosq, void *obj, int mid); // mqtt注册 -extern void register_collback(connect_callback connect_c, message_callback message_c, subscribe_callback subscribe_c, log_callback log_c, disconnect_callback disconnect_c, publish_callback publish_c); -extern int data_publish(const char *str, const char *topic); -extern int data_publish_wave(WAVE_CONTAIN *str, const char *topic); -extern int disconnect(); -extern int reconnect(); -extern int subscribe(const char *topic, int qos); -extern int unsubscribe(const char *topic); -extern int start_client(const char *boardname, const char *gwid, const char *gwip, const char *gwver, const char *gwcode, std::string &salt); +void register_collback(connect_callback connect_c, message_callback message_c, subscribe_callback subscribe_c, log_callback log_c, disconnect_callback disconnect_c, publish_callback publish_c); +int data_publish(const char *str, const char *topic); +int data_publish_wave(WAVE_CONTAIN *str, const char *topic); +int disconnect(); +int reconnect(); +int subscribe(const char *topic, int qos); +int unsubscribe(const char *topic); +int start_client(const char *boardname, const char *gwid, const char *gwip, const char *gwver, const char *gwcode, std::string &salt); #ifdef __cplusplus } #endif diff --git a/threadfunc/thread_func.cpp b/threadfunc/thread_func.cpp index 33eca14..69e47ff 100644 --- a/threadfunc/thread_func.cpp +++ b/threadfunc/thread_func.cpp @@ -23,9 +23,6 @@ extern zlog_category_t *zct; extern zlog_category_t *zbt; static std::string serverPort; -static std::string uptime; -static long long connect_lost_time = 0; // ms -static long long connect_time = 0; // ms Dial dial; @@ -270,169 +267,6 @@ void WatchDog() { close(fd); } -void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) {} - -void my_connect_callback(struct mosquitto *mosq, void *obj, int result) { - struct userdata *ud; - ud = (struct userdata *)obj; - if (result) { - zlog_error(zct, "%s", mosquitto_connack_string(result)); - return; - } - int ret = 0; - for (int i = 0; i < ud->topic_count; i++) { - zlog_info(zct, "mosquitto_subscribe ud->topics[%d]:%s", i, ud->topics[i]); - ret = mosquitto_subscribe(mosq, NULL, ud->topics[i], ud->topic_qos); - if (ret != MOSQ_ERR_SUCCESS) { - zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), ud->topics[i]); - return; - } - } - - ret = mosquitto_subscribe(mosq, NULL, GlobalConfig::Topic_G.mSubData.c_str(), 1); - if (ret != MOSQ_ERR_SUCCESS) { - zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), GlobalConfig::Topic_G.mSubData.c_str()); - return; - } - zlog_info(zct, "mosquitto_subscribe's return value: %d", ret); - - char gwTime[32] = {0}; - GetTimeNet(gwTime, 0); - connect_time = strtoll(gwTime, NULL, 10); - zlog_info(zct, "connect_time:%lld", connect_time); - long long difftime_ms = connect_time - connect_lost_time; - if (difftime_ms > 20 * 1000) { // 超过20秒,判定为连接断开 - char reply_string[256] = {0}; - std::string startStatus = "0"; - if (access(SYSTEMSTART, 0) >= 0) { - startStatus = GetFileContent(SYSTEMSTART, 1); - } - sprintf(reply_string, "{\"dataNodeGatewayNo\":\"%s\",\"softVersion\":\"%s\",\"status\":\"%s\"}", GlobalConfig::MacAddr_G.c_str(), GlobalConfig::Version.c_str(), startStatus.c_str()); - - Json::Value jsData; - Json::Value jsVal; - Json::FastWriter fw; - jsData["cmd"] = "15"; - jsData["dataNodeGatewayNo"] = GlobalConfig::MacAddr_G; - - std::string strCmd15 = fw.write(jsData); - - std::string instr = std::string(reply_string); - std::string topic = "equipment/state/" + GlobalConfig::MacAddr_G; - ret = data_publish(instr.c_str(), topic.c_str()); - if (ret != MOSQ_ERR_SUCCESS) { - zlog_error(zct, "Publish failed:%d, %s", ret, instr.c_str()); - disconnect(); - } - } - GlobalConfig::LinkStatus_G = 1; - zlog_info(zct, "Connect to server success."); - - char buf[256] = {0}; - sprintf(buf, "{\"dataNodeGatewayNo\":\"%s\",\"cmd\":\"12\",\"status\":\"REQ\"}", GlobalConfig::MacAddr_G.c_str()); - std::string str = std::string(buf); - std::string runinfo = "本地服务器连接成功"; -} - -void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result) { - disconnect(); - zlog_info(zct, "The MQTT connection lost\n"); - char gwTime[32] = {0}; - GetTimeNet(gwTime, 0); - uptime = std::string(gwTime); - connect_lost_time = strtoll(uptime.c_str(), NULL, 10); - zlog_info(zct, "connect_lost_time:%lld\n", connect_lost_time); - GlobalConfig::LinkStatus_G = 0; - GlobalConfig::LinkCount = GlobalConfig::LinkCount + 1; - -#ifdef WIFI_MODULE - char buf[128] = {0}; - std::string wpa_state = ""; -#ifdef G2UL_GATEWAY - wpa_state = "/usr/sbin/wpa_cli status|grep wpa_state | cut -f 2 -d '='"; -#endif - -#ifdef IMX6UL_GATEWAY - wpa_state = "/opt/Cidn/wpa_cli status|grep wpa_state | cut -f 2 -d '='"; -#endif - system_custom(wpa_state.c_str(), buf); - std::string state = std::string(buf); - std::string RSSI_cmd = ""; -#ifdef G2UL_GATEWAY - RSSI_cmd = "/usr/sbin/wpa_cli signal_poll|grep RSSI | cut -f 2 -d '='"; -#endif -#ifdef IMX6UL_GATEWAY - RSSI_cmd = "/opt/Cidn/wpa_cli signal_poll|grep RSSI | cut -f 2 -d '='"; -#endif - system_custom(RSSI_cmd.c_str(), buf); - std::string RSSI = std::string(buf); - - int iRet = reconnect(); - memset(buf, 0, sizeof(buf)); - sprintf(buf, "wifi RSSI:%s,state:%s,MQTT reconnect :%d\n", RSSI.c_str(), state.c_str(), iRet); - - zlog_info(zct, "%s\n", buf); - -#else - -#endif -} - -void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { - struct userdata *ud; - bool res; - assert(obj); - ud = (struct userdata *)obj; - - if (message->retain && ud->no_retain) return; - if (ud->filter_outs) { - for (int i = 0; i < ud->filter_out_count; i++) { - mosquitto_topic_matches_sub(ud->filter_outs[i], message->topic, &res); - if (res) return; - } - } - - if (ud->verbose) { - if (message->payloadlen) { - std::string strtopic(message->topic); - zlog_info(zct, "strtopic : %s \n", strtopic.c_str()); - LocalServer::HandleFromServer((const char *)message->payload, message->payloadlen, message->topic); - if (ud->eol) { - } - } else { - if (ud->eol) { - zlog_info(zct, "%s (null)\n", message->topic); - } - } - fflush(stdout); - } else { - if (message->payloadlen) { - fwrite(message->payload, 1, message->payloadlen, stdout); - if (ud->eol) { - } - fflush(stdout); - } - } -} - -void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) { - int i; - zlog_info(zct, "Subscribed (mid: %d): %d", mid, granted_qos[0]); - for (i = 1; i < qos_count; i++) { - zlog_info(zct, ", %d", granted_qos[i]); - } -} - -void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) { - if (level == MOSQ_LOG_ERR) { - // LOG_ERROR("%s\n", str); - } else if (level == MOSQ_LOG_WARNING) { - // LOG_WARN("%s\n", str); - } else if (level == MOSQ_LOG_NOTICE) { - // LOG_INFO("%s\n", str); - } -} - void StartMqttClient() { zlog_info(zct, "start mqtt \n"); std::string runinfo = "MQTT通信模块启动"; @@ -441,8 +275,7 @@ void StartMqttClient() { if (GlobalConfig::ServerIP.length() > 0) { std::string strEqupNo = GlobalConfig::MacAddr_G; std::string strVersion = GlobalConfig::Version; - std::string salt; - register_collback(my_connect_callback, my_message_callback, my_subscribe_callback, my_log_callback, my_disconnect_callback, my_publish_callback); + std::string salt; start_client(strEqupNo.c_str(), GlobalConfig::MacAddr_G.c_str(), GlobalConfig::ServerIP.c_str(), strVersion.c_str(), "11111111", salt); zlog_error(zct, "fail to connect server"); }