WLG/mqttclient/mqtt_client.cpp

414 lines
14 KiB
C++
Raw Normal View History

2024-10-23 09:22:06 +08:00
#include "mqtt_client.h"
2024-10-19 16:02:41 +08:00
#include <signal.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>
#include <json/json.h>
#include <sys/types.h>
2024-10-23 19:51:01 +08:00
#include <zlog.h>
2024-10-23 09:22:06 +08:00
#include "common/global.hpp"
2024-10-25 09:35:37 +08:00
#include "localserver/local_server.hpp"
2024-10-23 09:22:06 +08:00
#include "utility/secure.hpp"
2024-10-19 16:02:41 +08:00
2024-10-24 16:01:21 +08:00
extern zlog_category_t *zct;
2024-10-24 20:36:27 +08:00
extern zlog_category_t *zbt;
2024-10-24 16:01:21 +08:00
2024-10-19 16:02:41 +08:00
static char res_topic[100];
static char req_topic[100];
2024-10-25 09:35:37 +08:00
static std::string uptime;
static long long connect_lost_time = 0; // ms
static long long connect_time = 0; // ms
2024-10-19 16:02:41 +08:00
struct mosquitto *mosq = NULL;
static int mid_sent = 0;
struct userdata ud;
2024-10-25 09:35:37 +08:00
connect_callback my_connect_cb = NULL;
disconnect_callback my_disconnect_cb = NULL;
message_callback my_message_cb = NULL;
subscribe_callback my_subscribe_cb = NULL;
log_callback my_log_cb = NULL;
publish_callback my_publish_cb = NULL;
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) {}
void my_connect_callback(struct mosquitto *mosq, void *obj, int result) {
struct userdata *ud;
ud = (struct userdata *)obj;
if (result) {
zlog_error(zct, "%s", mosquitto_connack_string(result));
return;
}
int ret = 0;
for (int i = 0; i < ud->topic_count; i++) {
zlog_info(zct, "mosquitto_subscribe ud->topics[%d]:%s", i, ud->topics[i]);
ret = mosquitto_subscribe(mosq, NULL, ud->topics[i], ud->topic_qos);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), ud->topics[i]);
return;
}
}
ret = mosquitto_subscribe(mosq, NULL, GlobalConfig::Topic_G.mSubData.c_str(), 1);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), GlobalConfig::Topic_G.mSubData.c_str());
return;
}
zlog_info(zct, "mosquitto_subscribe's return value: %d", ret);
char gwTime[32] = {0};
GetTimeNet(gwTime, 0);
connect_time = strtoll(gwTime, NULL, 10);
zlog_info(zct, "connect_time:%lld", connect_time);
long long difftime_ms = connect_time - connect_lost_time;
if (difftime_ms > 20 * 1000) { // 超过20秒判定为连接断开
char reply_string[256] = {0};
std::string startStatus = "0";
if (access(SYSTEMSTART, 0) >= 0) {
startStatus = GetFileContent(SYSTEMSTART, 1);
}
sprintf(reply_string, "{\"dataNodeGatewayNo\":\"%s\",\"softVersion\":\"%s\",\"status\":\"%s\"}", GlobalConfig::MacAddr_G.c_str(), GlobalConfig::Version.c_str(), startStatus.c_str());
Json::Value jsData;
Json::Value jsVal;
Json::FastWriter fw;
jsData["cmd"] = "15";
jsData["dataNodeGatewayNo"] = GlobalConfig::MacAddr_G;
std::string strCmd15 = fw.write(jsData);
std::string instr = std::string(reply_string);
std::string topic = "equipment/state/" + GlobalConfig::MacAddr_G;
ret = data_publish(instr.c_str(), topic.c_str());
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "Publish failed:%d, %s", ret, instr.c_str());
disconnect();
}
}
GlobalConfig::LinkStatus_G = 1;
zlog_info(zct, "Connect to server success.");
char buf[256] = {0};
sprintf(buf, "{\"dataNodeGatewayNo\":\"%s\",\"cmd\":\"12\",\"status\":\"REQ\"}", GlobalConfig::MacAddr_G.c_str());
std::string str = std::string(buf);
std::string runinfo = "本地服务器连接成功";
}
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result) {
disconnect();
zlog_info(zct, "The MQTT connection lost\n");
char gwTime[32] = {0};
GetTimeNet(gwTime, 0);
uptime = std::string(gwTime);
connect_lost_time = strtoll(uptime.c_str(), NULL, 10);
zlog_info(zct, "connect_lost_time:%lld\n", connect_lost_time);
GlobalConfig::LinkStatus_G = 0;
GlobalConfig::LinkCount = GlobalConfig::LinkCount + 1;
#ifdef WIFI_MODULE
char buf[128] = {0};
std::string wpa_state = "";
wpa_state = "/opt/Cidn/wpa_cli status|grep wpa_state | cut -f 2 -d '='";
2024-11-14 19:01:30 +08:00
2024-10-25 09:35:37 +08:00
system_custom(wpa_state.c_str(), buf);
std::string state = std::string(buf);
std::string RSSI_cmd = "";
RSSI_cmd = "/opt/Cidn/wpa_cli signal_poll|grep RSSI | cut -f 2 -d '='";
system_custom(RSSI_cmd.c_str(), buf);
std::string RSSI = std::string(buf);
int iRet = reconnect();
memset(buf, 0, sizeof(buf));
sprintf(buf, "wifi RSSI:%s,state:%s,MQTT reconnect :%d\n", RSSI.c_str(), state.c_str(), iRet);
zlog_info(zct, "%s\n", buf);
#else
#endif
}
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) {
struct userdata *ud;
bool res;
assert(obj);
ud = (struct userdata *)obj;
if (message->retain && ud->no_retain) return;
if (ud->filter_outs) {
for (int i = 0; i < ud->filter_out_count; i++) {
mosquitto_topic_matches_sub(ud->filter_outs[i], message->topic, &res);
if (res) return;
}
}
if (ud->verbose) {
if (message->payloadlen) {
std::string strtopic(message->topic);
zlog_info(zct, "strtopic : %s \n", strtopic.c_str());
LocalServer::HandleFromServer((const char *)message->payload, message->payloadlen, message->topic);
if (ud->eol) {
}
} else {
if (ud->eol) {
zlog_info(zct, "%s (null)\n", message->topic);
}
}
fflush(stdout);
} else {
if (message->payloadlen) {
fwrite(message->payload, 1, message->payloadlen, stdout);
if (ud->eol) {
}
fflush(stdout);
}
}
}
void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) {
int i;
zlog_info(zct, "Subscribed (mid: %d): %d", mid, granted_qos[0]);
for (i = 1; i < qos_count; i++) {
zlog_info(zct, ", %d", granted_qos[i]);
}
}
void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) {
if (level == MOSQ_LOG_ERR) {
// LOG_ERROR("%s\n", str);
} else if (level == MOSQ_LOG_WARNING) {
// LOG_WARN("%s\n", str);
} else if (level == MOSQ_LOG_NOTICE) {
// LOG_INFO("%s\n", str);
}
}
void register_callback(connect_callback connect_c, message_callback message_c, subscribe_callback subscribe_c, log_callback log_c, disconnect_callback disconnect_c, publish_callback publish_c) {
my_connect_cb = connect_c;
my_message_cb = message_c;
my_subscribe_cb = subscribe_c;
my_log_cb = log_c;
my_disconnect_cb = disconnect_c;
my_publish_cb = publish_c;
2024-10-19 16:02:41 +08:00
}
2024-10-22 19:04:25 +08:00
int data_publish(const char *str, const char *topic) {
2024-10-24 20:36:27 +08:00
int ret = -1;
2024-10-22 19:04:25 +08:00
if (mosq != NULL) {
2024-10-19 16:02:41 +08:00
std::string strTopic = std::string(topic);
2024-11-22 15:20:22 +08:00
if (strTopic.find("cmd") != std::string::npos || strTopic.find("configureInfo") != std::string::npos || strTopic.find("status") != std::string::npos) {
zlog_info(zct,"strTopic = %s,pData = %s",strTopic.c_str(),str);
2024-10-19 16:02:41 +08:00
std::string pData = (std::string)str;
char *base64_aes = new char[pData.length() * 2];
memset(base64_aes, 0, pData.length() * 2);
2024-10-24 16:01:21 +08:00
secure::instance().Base64Encode((unsigned char *)pData.c_str(), base64_aes, pData.length());
2024-10-24 20:36:27 +08:00
ret = mosquitto_publish(mosq, &mid_sent, topic, strlen(base64_aes), base64_aes, ud.topic_qos, false);
2024-10-22 19:04:25 +08:00
if (base64_aes != NULL) delete[] base64_aes;
2024-10-24 20:36:27 +08:00
if (ret != MOSQ_ERR_SUCCESS) {
2024-11-14 18:48:45 +08:00
zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
2024-10-24 20:36:27 +08:00
}
2024-10-19 16:02:41 +08:00
} else {
2024-10-24 20:36:27 +08:00
ret = mosquitto_publish(mosq, &mid_sent, topic, strlen(str), str, ud.topic_qos, false);
if (ret != MOSQ_ERR_SUCCESS) {
2024-11-14 18:48:45 +08:00
zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
2024-10-24 20:36:27 +08:00
}
2024-10-19 16:02:41 +08:00
}
2024-11-10 16:43:15 +08:00
if (ret != 0) {
disconnect();
}
}else{
zlog_warn(zct,"mosq == NULL");
2024-10-19 16:02:41 +08:00
}
2024-11-10 16:43:15 +08:00
2024-10-24 20:36:27 +08:00
return ret;
2024-10-19 16:02:41 +08:00
}
2024-10-22 19:04:25 +08:00
int subscribe(const char *topic, int qos) {
2024-10-24 20:36:27 +08:00
int ret = mosquitto_subscribe(mosq, NULL, topic, qos);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
}
return ret;
2024-10-19 16:02:41 +08:00
}
2024-10-22 19:04:25 +08:00
int unsubscribe(const char *topic) {
2024-10-24 20:36:27 +08:00
int ret = mosquitto_unsubscribe(mosq, NULL, topic);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to unsubscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
}
return ret;
2024-10-19 16:02:41 +08:00
}
2024-10-22 19:04:25 +08:00
int disconnect() { return mosquitto_disconnect(mosq); }
2024-10-19 16:02:41 +08:00
2024-10-22 19:04:25 +08:00
int reconnect() { return mosquitto_reconnect(mosq); }
2024-10-25 09:35:37 +08:00
2024-10-22 19:04:25 +08:00
int start_client(const char *boardname, const char *gwid, const char *gwip, const char *gwver, const char *gwcode, std::string &salt) {
2024-10-19 16:02:41 +08:00
char *host;
int port = 51613;
char *bind_address = NULL;
int keepalive = 60;
int rc = 0;
char will_payload[100];
long will_payloadlen = 0;
int will_qos = 0;
bool will_retain = false;
char *will_topic = NULL;
char *psk = NULL;
char *psk_identity = NULL;
mqttsdata_t mqttsdata;
2024-10-22 19:04:25 +08:00
signal(SIGCHLD, SIG_IGN); // signal函数作用为一个信号注册一个信号处理函数。其中SIGCHLD在一个进程终止或者停止时将SIGCHLD信号发送给父进程。而信号处理函数是一个函数指针不过它使用的是预定义函数SIG_IGN(忽视信号)
2024-10-19 16:02:41 +08:00
signal(SIGPIPE, SIG_IGN);
2024-10-25 09:35:37 +08:00
register_callback(my_connect_callback, my_message_callback, my_subscribe_callback, my_log_callback, my_disconnect_callback, my_publish_callback);
2024-10-19 16:02:41 +08:00
2024-10-24 20:36:27 +08:00
int qos = readIntValue("config", "qos", (char *)GlobalConfig::Config_G.c_str());
2024-10-19 16:02:41 +08:00
memset(&mqttsdata, 0, sizeof(mqttsdata_t));
memcpy(mqttsdata.gwid, gwid, strlen(gwid));
salt = std::string(mqttsdata.salt);
memset(&ud, 0, sizeof(struct userdata));
ud.eol = true;
char localip[100];
sprintf(localip, "%s", gwip);
host = localip;
mqttsdata.port = GlobalConfig::ServerPort;
port = GlobalConfig::ServerPort;
#ifdef WIFI_MODULE
mqttsdata.qos = 0;
#else
mqttsdata.qos = qos;
#endif
2024-10-24 20:36:27 +08:00
zlog_info(zct, "mqttsdata.qos = %d", mqttsdata.qos);
2024-10-19 16:02:41 +08:00
std::string username = ReadStrByOpt(SERVERCONFIG, "Server", "UserName");
std::string password = ReadStrByOpt(SERVERCONFIG, "Server", "Password");
2024-10-22 19:04:25 +08:00
if (username == "" || password == "") {
username = "chaos";
password = "HSD272*#xkd";
2024-10-19 16:02:41 +08:00
}
char userName[100];
2024-10-24 20:36:27 +08:00
memset(userName, 0, 100);
2024-10-19 16:02:41 +08:00
sprintf(userName, "%s", username.c_str());
char passWord[100];
2024-10-24 20:36:27 +08:00
memset(passWord, 0, 100);
2024-10-19 16:02:41 +08:00
sprintf(passWord, "%s", password.c_str());
2024-10-24 20:36:27 +08:00
zlog_info(zbt, "userName = %s , passWord = %s", userName, passWord);
2024-10-19 16:02:41 +08:00
ud.username = userName;
ud.password = passWord;
ud.gwid = mqttsdata.gwid;
ud.dataUrl = mqttsdata.dataUrl;
ud.fsUrl = mqttsdata.fsUrl;
will_qos = mqttsdata.qos;
ud.topic_qos = will_qos;
ud.topic_count++;
2024-10-22 19:04:25 +08:00
ud.topics = (char **)realloc(ud.topics, ud.topic_count * sizeof(char *));
2024-10-19 16:02:41 +08:00
if (ud.topics == NULL) {
2024-10-24 20:36:27 +08:00
zlog_error(zbt, "fail to realloc for ud.topics");
2024-10-19 16:02:41 +08:00
return 1;
}
memset(req_topic, 0, 100);
sprintf(req_topic, "dev/cidwcc");
ud.topics[ud.topic_count - 1] = req_topic;
ud.verbose = 1;
2024-10-23 19:51:01 +08:00
2024-10-19 16:02:41 +08:00
memset(res_topic, 0, 100);
sprintf(res_topic, "gw/will");
2024-10-24 20:36:27 +08:00
zlog_info(zct, "res_topic:%s", res_topic);
2024-10-19 16:02:41 +08:00
will_topic = res_topic;
ud.resp_topic = res_topic;
sprintf(will_payload, "{\"cmd\":\"15\",\"gwID\":\"%s\"}", mqttsdata.gwid);
will_payloadlen = strlen(will_payload);
2024-10-24 21:17:01 +08:00
zlog_info(zct, "will_payload:%s,will_payloadlen:%ld", will_payload, will_payloadlen);
2024-10-24 20:36:27 +08:00
mosquitto_lib_init();
mosq = mosquitto_new(NULL, true, &ud);
2024-10-22 19:04:25 +08:00
if (!mosq) {
2024-10-24 20:36:27 +08:00
zlog_error(zct, "fail to mosquitto_new, errno:%d", errno);
free(ud.topics);
2024-10-19 16:02:41 +08:00
mosquitto_lib_cleanup();
return 1;
}
2024-10-24 20:36:27 +08:00
2024-10-25 09:35:37 +08:00
mosquitto_log_callback_set(mosq, my_log_cb);
2024-10-19 16:02:41 +08:00
if (!psk) {
psk = (char *)malloc(32 * sizeof(char));
if (!psk) {
2024-10-24 20:36:27 +08:00
free(ud.topics);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
2024-11-10 16:43:15 +08:00
mosq = NULL;
2024-10-24 16:01:21 +08:00
zlog_error(zct, "Error: No free space 1");
2024-10-19 16:02:41 +08:00
return -1;
}
memset(psk, 0, 32);
2024-10-24 16:01:21 +08:00
snprintf(psk, 32, "%d", getpid());
2024-10-19 16:02:41 +08:00
psk_identity = (char *)malloc(32 * sizeof(char));
if (!psk_identity) {
2024-10-24 20:36:27 +08:00
free(ud.topics);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
2024-11-10 16:43:15 +08:00
mosq = NULL;
2024-10-24 16:01:21 +08:00
zlog_error(zct, "Error: No free space 2");
2024-10-19 16:02:41 +08:00
free(psk);
return -1;
}
strncpy(psk_identity, psk, 32);
}
2024-10-24 20:36:27 +08:00
if (mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)) {
zlog_error(zct, "Error: Problem setting will.");
goto mosq_free;
2024-10-19 16:02:41 +08:00
}
if (ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)) {
2024-10-24 20:36:27 +08:00
zlog_error(zct, "Error: Problem setting username and password.");
goto mosq_free;
2024-10-19 16:02:41 +08:00
}
2024-10-25 09:35:37 +08:00
mosquitto_connect_callback_set(mosq, my_connect_cb);
mosquitto_disconnect_callback_set(mosq, my_disconnect_cb);
mosquitto_message_callback_set(mosq, my_message_cb);
mosquitto_publish_callback_set(mosq, my_publish_cb);
mosquitto_subscribe_callback_set(mosq, my_subscribe_cb);
2024-10-19 16:02:41 +08:00
2024-10-24 20:36:27 +08:00
rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address);
zlog_info(zct, "host = %s port = %d rc = %d", host, port, rc);
2024-10-22 19:04:25 +08:00
if (rc) {
2024-10-24 20:36:27 +08:00
zlog_error(zct, "Unable to connect (%d).", rc);
2024-10-19 16:02:41 +08:00
goto mosq_free;
}
2024-10-24 20:36:27 +08:00
2024-10-19 16:02:41 +08:00
rc = mosquitto_loop_forever(mosq, -1, 1);
2024-10-22 19:04:25 +08:00
if (rc) {
2024-10-23 19:51:01 +08:00
zlog_error(zct, "Error: %s", mosquitto_strerror(rc));
2024-10-19 16:02:41 +08:00
}
2024-10-24 20:36:27 +08:00
2024-10-19 16:02:41 +08:00
mosq_free:
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
2024-11-10 16:43:15 +08:00
mosq = NULL;
2024-10-24 20:36:27 +08:00
if (ud.topics != NULL) {
free(ud.topics);
}
if (psk) {
free(psk);
}
if (psk_identity) {
free(psk_identity);
}
2024-10-19 16:02:41 +08:00
sleep(1);
2024-10-24 16:01:21 +08:00
return 0;
2024-10-19 16:02:41 +08:00
}