WLG/mqttclient/mqtt_client.cpp
2024-10-24 21:17:01 +08:00

265 lines
9.1 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "mqtt_client.h"
#include <signal.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>
#include <json/json.h>
#include <sys/types.h>
#include <zlog.h>
#include "common/global.hpp"
#include "utility/secure.hpp"
extern zlog_category_t *zct;
extern zlog_category_t *zbt;
static char res_topic[100];
static char req_topic[100];
struct mosquitto *mosq = NULL;
static int mid_sent = 0;
struct userdata ud;
connect_callback my_connect_callback = NULL;
disconnect_callback my_disconnect_callback = NULL;
message_callback my_message_callback = NULL;
subscribe_callback my_subscribe_callback = NULL;
log_callback my_log_callback = NULL;
publish_callback my_publish_callback = NULL;
void register_collback(connect_callback connect_c, message_callback message_c, subscribe_callback subscribe_c, log_callback log_c, disconnect_callback disconnect_c, publish_callback publish_c) {
my_connect_callback = connect_c;
my_message_callback = message_c;
my_subscribe_callback = subscribe_c;
my_log_callback = log_c;
my_disconnect_callback = disconnect_c;
my_publish_callback = publish_c;
}
int data_publish(const char *str, const char *topic) {
int ret = -1;
if (mosq != NULL) {
std::string strTopic = std::string(topic);
if (strTopic.find("cmd") != std::string::npos || strTopic.find("configureInfo") != std::string::npos) {
std::string pData = (std::string)str;
char *base64_aes = new char[pData.length() * 2];
memset(base64_aes, 0, pData.length() * 2);
secure::instance().Base64Encode((unsigned char *)pData.c_str(), base64_aes, pData.length());
ret = mosquitto_publish(mosq, &mid_sent, topic, strlen(base64_aes), base64_aes, ud.topic_qos, false);
if (base64_aes != NULL) delete[] base64_aes;
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s, str: %s", mosquitto_strerror(ret), topic, str);
}
} else {
ret = mosquitto_publish(mosq, &mid_sent, topic, strlen(str), str, ud.topic_qos, false);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s, str: %s", mosquitto_strerror(ret), topic, str);
}
}
}
if (ret != 0) {
disconnect();
}
return ret;
}
int data_publish_wave(WAVE_CONTAIN *str, const char *topic) {
int ret = -1;
if (mosq != NULL) {
zlog_info(zct, "data_publish:Unit : %s type:%d flag : %d channelid : %s total : %d count : %d number :%d floats:%f floate:%f\n", str->SensorEngineeringUnit, str->type, str->flag, str->channelId, str->total, str->count, str->number, str->waveData[0],
str->waveData[(str->number - 2)]);
ret = mosquitto_publish(mosq, &mid_sent, topic, sizeof(WAVE_CONTAIN), str, ud.topic_qos, false);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
}
}
return ret;
}
int subscribe(const char *topic, int qos) {
int ret = mosquitto_subscribe(mosq, NULL, topic, qos);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
}
return ret;
}
int unsubscribe(const char *topic) {
int ret = mosquitto_unsubscribe(mosq, NULL, topic);
if (ret != MOSQ_ERR_SUCCESS) {
zlog_error(zct, "fail to unsubscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
}
return ret;
}
int disconnect() { return mosquitto_disconnect(mosq); }
int reconnect() { return mosquitto_reconnect(mosq); }
int start_client(const char *boardname, const char *gwid, const char *gwip, const char *gwver, const char *gwcode, std::string &salt) {
char *host;
int port = 51613;
char *bind_address = NULL;
int keepalive = 60;
int rc = 0;
char will_payload[100];
long will_payloadlen = 0;
int will_qos = 0;
bool will_retain = false;
char *will_topic = NULL;
char *psk = NULL;
char *psk_identity = NULL;
mqttsdata_t mqttsdata;
signal(SIGCHLD, SIG_IGN); // signal函数作用为一个信号注册一个信号处理函数。其中SIGCHLD在一个进程终止或者停止时将SIGCHLD信号发送给父进程。而信号处理函数是一个函数指针不过它使用的是预定义函数SIG_IGN(忽视信号)
signal(SIGPIPE, SIG_IGN);
if (my_connect_callback == NULL || my_message_callback == NULL || my_subscribe_callback == NULL || my_log_callback == NULL) {
zlog_error(zct, "please call the register_collback function first.");
return -1;
}
int qos = readIntValue("config", "qos", (char *)GlobalConfig::Config_G.c_str());
memset(&mqttsdata, 0, sizeof(mqttsdata_t));
memcpy(mqttsdata.gwid, gwid, strlen(gwid));
salt = std::string(mqttsdata.salt);
memset(&ud, 0, sizeof(struct userdata));
ud.eol = true;
char localip[100];
sprintf(localip, "%s", gwip);
host = localip;
mqttsdata.port = GlobalConfig::ServerPort;
port = GlobalConfig::ServerPort;
#ifdef WIFI_MODULE
mqttsdata.qos = 0;
#else
mqttsdata.qos = qos;
#endif
zlog_info(zct, "mqttsdata.qos = %d", mqttsdata.qos);
std::string username = ReadStrByOpt(SERVERCONFIG, "Server", "UserName");
std::string password = ReadStrByOpt(SERVERCONFIG, "Server", "Password");
if (username == "" || password == "") {
username = "chaos";
password = "HSD272*#xkd";
}
char userName[100];
memset(userName, 0, 100);
sprintf(userName, "%s", username.c_str());
char passWord[100];
memset(passWord, 0, 100);
sprintf(passWord, "%s", password.c_str());
zlog_info(zbt, "userName = %s , passWord = %s", userName, passWord);
ud.username = userName;
ud.password = passWord;
ud.gwid = mqttsdata.gwid;
ud.dataUrl = mqttsdata.dataUrl;
ud.fsUrl = mqttsdata.fsUrl;
will_qos = mqttsdata.qos;
ud.topic_qos = will_qos;
ud.topic_count++;
ud.topics = (char **)realloc(ud.topics, ud.topic_count * sizeof(char *));
if (ud.topics == NULL) {
zlog_error(zbt, "fail to realloc for ud.topics");
return 1;
}
memset(req_topic, 0, 100);
sprintf(req_topic, "dev/cidwcc");
ud.topics[ud.topic_count - 1] = req_topic;
ud.verbose = 1;
memset(res_topic, 0, 100);
sprintf(res_topic, "gw/will");
zlog_info(zct, "res_topic:%s", res_topic);
will_topic = res_topic;
ud.resp_topic = res_topic;
sprintf(will_payload, "{\"cmd\":\"15\",\"gwID\":\"%s\"}", mqttsdata.gwid);
will_payloadlen = strlen(will_payload);
zlog_info(zct, "will_payload:%s,will_payloadlen:%ld", will_payload, will_payloadlen);
mosquitto_lib_init();
mosq = mosquitto_new(NULL, true, &ud);
if (!mosq) {
zlog_error(zct, "fail to mosquitto_new, errno:%d", errno);
free(ud.topics);
mosquitto_lib_cleanup();
return 1;
}
mosquitto_log_callback_set(mosq, my_log_callback);
if (!psk) {
psk = (char *)malloc(32 * sizeof(char));
if (!psk) {
free(ud.topics);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
zlog_error(zct, "Error: No free space 1");
return -1;
}
memset(psk, 0, 32);
snprintf(psk, 32, "%d", getpid());
psk_identity = (char *)malloc(32 * sizeof(char));
if (!psk_identity) {
free(ud.topics);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
zlog_error(zct, "Error: No free space 2");
free(psk);
return -1;
}
strncpy(psk_identity, psk, 32);
}
if (mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)) {
zlog_error(zct, "Error: Problem setting will.");
goto mosq_free;
}
if (ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)) {
zlog_error(zct, "Error: Problem setting username and password.");
goto mosq_free;
}
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
mosquitto_message_callback_set(mosq, my_message_callback);
mosquitto_publish_callback_set(mosq, my_publish_callback);
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address);
zlog_info(zct, "host = %s port = %d rc = %d", host, port, rc);
if (rc) {
zlog_error(zct, "Unable to connect (%d).", rc);
goto mosq_free;
}
rc = mosquitto_loop_forever(mosq, -1, 1);
if (rc) {
zlog_error(zct, "Error: %s", mosquitto_strerror(rc));
}
mosq_free:
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
if (ud.topics != NULL) {
free(ud.topics);
}
if (psk) {
free(psk);
}
if (psk_identity) {
free(psk_identity);
}
sleep(1);
return 0;
}