WLG/mqttclient/mqtt_client.cpp

423 lines
14 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "mqtt_client.h"
#include <signal.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>
#include <json/json.h>
#include <sys/types.h>
#include <zlog.h>
#include "common/global.hpp"
#include "localserver/local_server.hpp"
#include "utility/secure.hpp"
extern zlog_category_t *zct;
extern zlog_category_t *zbt;
static char res_topic[100];
static char req_topic[100];
static std::string uptime;
static long long connect_lost_time = 0; // ms
static long long connect_time = 0; // ms
struct mosquitto *mosq = NULL;
static int mid_sent = 0;
struct userdata ud;
connect_callback my_connect_cb = NULL;
disconnect_callback my_disconnect_cb = NULL;
message_callback my_message_cb = NULL;
subscribe_callback my_subscribe_cb = NULL;
log_callback my_log_cb = NULL;
publish_callback my_publish_cb = NULL;
void my_publish_callback(struct mosquitto *mosq, void *obj, int mid) {}
void my_connect_callback(struct mosquitto *mosq, void *obj, int result) {
struct userdata *ud;
ud = (struct userdata *)obj;
if (result) {
zlog_error(zct, "%s", mosquitto_connack_string(result));
return;
}
int ret = 0;
for (int i = 0; i < ud->topic_count; i++) {
zlog_info(zct, "mosquitto_subscribe ud->topics[%d]:%s", i, ud->topics[i]);
ret = mosquitto_subscribe(mosq, NULL, ud->topics[i], ud->topic_qos);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), ud->topics[i]);
return;
}
}
ret = mosquitto_subscribe(mosq, NULL, GlobalConfig::Topic_G.mSubData.c_str(), 1);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), GlobalConfig::Topic_G.mSubData.c_str());
return;
}
zlog_info(zct, "mosquitto_subscribe's return value: %d", ret);
char gwTime[32] = {0};
GetTimeNet(gwTime, 0);
connect_time = strtoll(gwTime, NULL, 10);
zlog_info(zct, "connect_time:%lld", connect_time);
long long difftime_ms = connect_time - connect_lost_time;
if (difftime_ms > 20 * 1000) { // 超过20秒判定为连接断开
char reply_string[256] = {0};
std::string startStatus = "0";
if (access(SYSTEMSTART, 0) >= 0) {
startStatus = GetFileContent(SYSTEMSTART, 1);
}
sprintf(reply_string, "{\"dataNodeGatewayNo\":\"%s\",\"softVersion\":\"%s\",\"status\":\"%s\"}", GlobalConfig::MacAddr_G.c_str(), GlobalConfig::Version.c_str(), startStatus.c_str());
Json::Value jsData;
Json::Value jsVal;
Json::FastWriter fw;
jsData["cmd"] = "15";
jsData["dataNodeGatewayNo"] = GlobalConfig::MacAddr_G;
std::string strCmd15 = fw.write(jsData);
std::string instr = std::string(reply_string);
std::string topic = "equipment/state/" + GlobalConfig::MacAddr_G;
ret = data_publish(instr.c_str(), topic.c_str());
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "Publish failed:%d, %s", ret, instr.c_str());
disconnect();
}
}
GlobalConfig::LinkStatus_G = 1;
zlog_info(zct, "Connect to server success.");
char buf[256] = {0};
sprintf(buf, "{\"dataNodeGatewayNo\":\"%s\",\"cmd\":\"12\",\"status\":\"REQ\"}", GlobalConfig::MacAddr_G.c_str());
std::string str = std::string(buf);
std::string runinfo = "本地服务器连接成功";
}
void my_disconnect_callback(struct mosquitto *mosq, void *obj, int result) {
disconnect();
zlog_info(zct, "The MQTT connection lost\n");
char gwTime[32] = {0};
GetTimeNet(gwTime, 0);
uptime = std::string(gwTime);
connect_lost_time = strtoll(uptime.c_str(), NULL, 10);
zlog_info(zct, "connect_lost_time:%lld\n", connect_lost_time);
GlobalConfig::LinkStatus_G = 0;
GlobalConfig::LinkCount = GlobalConfig::LinkCount + 1;
#ifdef WIFI_MODULE
char buf[128] = {0};
std::string wpa_state = "";
#ifdef G2UL_GATEWAY
wpa_state = "/usr/sbin/wpa_cli status|grep wpa_state | cut -f 2 -d '='";
#endif
#ifdef IMX6UL_GATEWAY
wpa_state = "/opt/Cidn/wpa_cli status|grep wpa_state | cut -f 2 -d '='";
#endif
system_custom(wpa_state.c_str(), buf);
std::string state = std::string(buf);
std::string RSSI_cmd = "";
#ifdef G2UL_GATEWAY
RSSI_cmd = "/usr/sbin/wpa_cli signal_poll|grep RSSI | cut -f 2 -d '='";
#endif
#ifdef IMX6UL_GATEWAY
RSSI_cmd = "/opt/Cidn/wpa_cli signal_poll|grep RSSI | cut -f 2 -d '='";
#endif
system_custom(RSSI_cmd.c_str(), buf);
std::string RSSI = std::string(buf);
int iRet = reconnect();
memset(buf, 0, sizeof(buf));
sprintf(buf, "wifi RSSI:%s,state:%s,MQTT reconnect :%d\n", RSSI.c_str(), state.c_str(), iRet);
zlog_info(zct, "%s\n", buf);
#else
#endif
}
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) {
struct userdata *ud;
bool res;
assert(obj);
ud = (struct userdata *)obj;
if (message->retain && ud->no_retain) return;
if (ud->filter_outs) {
for (int i = 0; i < ud->filter_out_count; i++) {
mosquitto_topic_matches_sub(ud->filter_outs[i], message->topic, &res);
if (res) return;
}
}
if (ud->verbose) {
if (message->payloadlen) {
std::string strtopic(message->topic);
zlog_info(zct, "strtopic : %s \n", strtopic.c_str());
LocalServer::HandleFromServer((const char *)message->payload, message->payloadlen, message->topic);
if (ud->eol) {
}
} else {
if (ud->eol) {
zlog_info(zct, "%s (null)\n", message->topic);
}
}
fflush(stdout);
} else {
if (message->payloadlen) {
fwrite(message->payload, 1, message->payloadlen, stdout);
if (ud->eol) {
}
fflush(stdout);
}
}
}
void my_subscribe_callback(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos) {
int i;
zlog_info(zct, "Subscribed (mid: %d): %d", mid, granted_qos[0]);
for (i = 1; i < qos_count; i++) {
zlog_info(zct, ", %d", granted_qos[i]);
}
}
void my_log_callback(struct mosquitto *mosq, void *obj, int level, const char *str) {
if (level == MOSQ_LOG_ERR) {
// LOG_ERROR("%s\n", str);
} else if (level == MOSQ_LOG_WARNING) {
// LOG_WARN("%s\n", str);
} else if (level == MOSQ_LOG_NOTICE) {
// LOG_INFO("%s\n", str);
}
}
void register_callback(connect_callback connect_c, message_callback message_c, subscribe_callback subscribe_c, log_callback log_c, disconnect_callback disconnect_c, publish_callback publish_c) {
my_connect_cb = connect_c;
my_message_cb = message_c;
my_subscribe_cb = subscribe_c;
my_log_cb = log_c;
my_disconnect_cb = disconnect_c;
my_publish_cb = publish_c;
}
int data_publish(const char *str, const char *topic) {
int ret = -1;
if (mosq != NULL) {
std::string strTopic = std::string(topic);
if (strTopic.find("cmd") != std::string::npos || strTopic.find("configureInfo") != std::string::npos) {
std::string pData = (std::string)str;
char *base64_aes = new char[pData.length() * 2];
memset(base64_aes, 0, pData.length() * 2);
secure::instance().Base64Encode((unsigned char *)pData.c_str(), base64_aes, pData.length());
ret = mosquitto_publish(mosq, &mid_sent, topic, strlen(base64_aes), base64_aes, ud.topic_qos, false);
if (base64_aes != NULL) delete[] base64_aes;
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
}
} else {
ret = mosquitto_publish(mosq, &mid_sent, topic, strlen(str), str, ud.topic_qos, false);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
}
}
if (ret != 0) {
disconnect();
}
}else{
zlog_warn(zct,"mosq == NULL");
}
return ret;
}
int subscribe(const char *topic, int qos) {
int ret = mosquitto_subscribe(mosq, NULL, topic, qos);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
}
return ret;
}
int unsubscribe(const char *topic) {
int ret = mosquitto_unsubscribe(mosq, NULL, topic);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to unsubscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
}
return ret;
}
int disconnect() { return mosquitto_disconnect(mosq); }
int reconnect() { return mosquitto_reconnect(mosq); }
int start_client(const char *boardname, const char *gwid, const char *gwip, const char *gwver, const char *gwcode, std::string &salt) {
char *host;
int port = 51613;
char *bind_address = NULL;
int keepalive = 60;
int rc = 0;
char will_payload[100];
long will_payloadlen = 0;
int will_qos = 0;
bool will_retain = false;
char *will_topic = NULL;
char *psk = NULL;
char *psk_identity = NULL;
mqttsdata_t mqttsdata;
signal(SIGCHLD, SIG_IGN); // signal函数作用为一个信号注册一个信号处理函数。其中SIGCHLD在一个进程终止或者停止时将SIGCHLD信号发送给父进程。而信号处理函数是一个函数指针不过它使用的是预定义函数SIG_IGN(忽视信号)
signal(SIGPIPE, SIG_IGN);
register_callback(my_connect_callback, my_message_callback, my_subscribe_callback, my_log_callback, my_disconnect_callback, my_publish_callback);
int qos = readIntValue("config", "qos", (char *)GlobalConfig::Config_G.c_str());
memset(&mqttsdata, 0, sizeof(mqttsdata_t));
memcpy(mqttsdata.gwid, gwid, strlen(gwid));
salt = std::string(mqttsdata.salt);
memset(&ud, 0, sizeof(struct userdata));
ud.eol = true;
char localip[100];
sprintf(localip, "%s", gwip);
host = localip;
mqttsdata.port = GlobalConfig::ServerPort;
port = GlobalConfig::ServerPort;
#ifdef WIFI_MODULE
mqttsdata.qos = 0;
#else
mqttsdata.qos = qos;
#endif
zlog_info(zct, "mqttsdata.qos = %d", mqttsdata.qos);
std::string username = ReadStrByOpt(SERVERCONFIG, "Server", "UserName");
std::string password = ReadStrByOpt(SERVERCONFIG, "Server", "Password");
if (username == "" || password == "") {
username = "chaos";
password = "HSD272*#xkd";
}
char userName[100];
memset(userName, 0, 100);
sprintf(userName, "%s", username.c_str());
char passWord[100];
memset(passWord, 0, 100);
sprintf(passWord, "%s", password.c_str());
zlog_info(zbt, "userName = %s , passWord = %s", userName, passWord);
ud.username = userName;
ud.password = passWord;
ud.gwid = mqttsdata.gwid;
ud.dataUrl = mqttsdata.dataUrl;
ud.fsUrl = mqttsdata.fsUrl;
will_qos = mqttsdata.qos;
ud.topic_qos = will_qos;
ud.topic_count++;
ud.topics = (char **)realloc(ud.topics, ud.topic_count * sizeof(char *));
if (ud.topics == NULL) {
zlog_error(zbt, "fail to realloc for ud.topics");
return 1;
}
memset(req_topic, 0, 100);
sprintf(req_topic, "dev/cidwcc");
ud.topics[ud.topic_count - 1] = req_topic;
ud.verbose = 1;
memset(res_topic, 0, 100);
sprintf(res_topic, "gw/will");
zlog_info(zct, "res_topic:%s", res_topic);
will_topic = res_topic;
ud.resp_topic = res_topic;
sprintf(will_payload, "{\"cmd\":\"15\",\"gwID\":\"%s\"}", mqttsdata.gwid);
will_payloadlen = strlen(will_payload);
zlog_info(zct, "will_payload:%s,will_payloadlen:%ld", will_payload, will_payloadlen);
mosquitto_lib_init();
mosq = mosquitto_new(NULL, true, &ud);
if (!mosq) {
zlog_error(zct, "fail to mosquitto_new, errno:%d", errno);
free(ud.topics);
mosquitto_lib_cleanup();
return 1;
}
mosquitto_log_callback_set(mosq, my_log_cb);
if (!psk) {
psk = (char *)malloc(32 * sizeof(char));
if (!psk) {
free(ud.topics);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
mosq = NULL;
zlog_error(zct, "Error: No free space 1");
return -1;
}
memset(psk, 0, 32);
snprintf(psk, 32, "%d", getpid());
psk_identity = (char *)malloc(32 * sizeof(char));
if (!psk_identity) {
free(ud.topics);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
mosq = NULL;
zlog_error(zct, "Error: No free space 2");
free(psk);
return -1;
}
strncpy(psk_identity, psk, 32);
}
if (mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)) {
zlog_error(zct, "Error: Problem setting will.");
goto mosq_free;
}
if (ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)) {
zlog_error(zct, "Error: Problem setting username and password.");
goto mosq_free;
}
mosquitto_connect_callback_set(mosq, my_connect_cb);
mosquitto_disconnect_callback_set(mosq, my_disconnect_cb);
mosquitto_message_callback_set(mosq, my_message_cb);
mosquitto_publish_callback_set(mosq, my_publish_cb);
mosquitto_subscribe_callback_set(mosq, my_subscribe_cb);
rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address);
zlog_info(zct, "host = %s port = %d rc = %d", host, port, rc);
if (rc) {
zlog_error(zct, "Unable to connect (%d).", rc);
goto mosq_free;
}
rc = mosquitto_loop_forever(mosq, -1, 1);
if (rc) {
zlog_error(zct, "Error: %s", mosquitto_strerror(rc));
}
mosq_free:
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
mosq = NULL;
if (ud.topics != NULL) {
free(ud.topics);
}
if (psk) {
free(psk);
}
if (psk_identity) {
free(psk_identity);
}
sleep(1);
return 0;
}