// File: wirelessgateway/mqttclient/SH_MqttClient.cpp
// (Repository viewer listing: 376 lines, 12 KiB, C++; earliest revision shown: 2021-09-18 13:45:24 +08:00.)

#include <signal.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>
#include <json/json.h>
#include <sys/types.h>
#include "SH_MqttClient.h"
#include "../common/SH_global.h"
#include "../secure/SH_Secure.hpp"
// Buffer for the response/will topic; start_client() fills it and points
// ud.resp_topic and the will topic at it.
static char res_topic[100];
// Buffer for the request topic; start_client() fills it and stores it in ud.topics.
static char req_topic[100];
// libmosquitto client handle; created in start_client() and reset to NULL on teardown.
struct mosquitto *mosq = NULL;
// Receives the message id that mosquitto_publish() assigns to the last publish.
static int mid_sent = 0;
// User data handed to mosquitto_new(); also supplies the publish QoS (ud.topic_qos).
struct userdata ud;
// Application handlers installed through register_collback() and wired into
// the mosquitto client by start_client().
connect_callback my_connect_callback = NULL;
disconnect_callback my_disconnect_callback = NULL;
message_callback my_message_callback = NULL;
subscribe_callback my_subscribe_callback = NULL;
log_callback my_log_callback = NULL;
publish_callback my_publish_callback = NULL;
/**
 * Records the application's MQTT event handlers in the file-level callback
 * slots so that start_client() can attach them to the mosquitto client.
 *
 * start_client() refuses to run unless the connect, message, subscribe and
 * log handlers are non-NULL.
 *
 * @param connect_c     handler for the connect event
 * @param message_c     handler for incoming messages
 * @param subscribe_c   handler for subscribe acknowledgements
 * @param log_c         handler for library log output
 * @param disconnect_c  handler for the disconnect event
 * @param publish_c     handler for publish completion
 */
void register_collback(connect_callback connect_c,
                       message_callback message_c,
                       subscribe_callback subscribe_c,
                       log_callback log_c,
                       disconnect_callback disconnect_c,
                       publish_callback publish_c)
{
    // Connection life-cycle handlers.
    my_connect_callback    = connect_c;
    my_disconnect_callback = disconnect_c;

    // Traffic handlers.
    my_message_callback    = message_c;
    my_publish_callback    = publish_c;
    my_subscribe_callback  = subscribe_c;

    // Diagnostics.
    my_log_callback        = log_c;
}
int data_publish(const char *str, const char *topic)
{
int iRet = -1;
if(mosq != NULL){
2024-07-09 09:49:42 +08:00
//print_blue("data_publish: %s %s\n", str, topic);
2021-09-18 13:45:24 +08:00
std::string strTopic = std::string(topic);
2024-07-09 09:49:42 +08:00
if (strTopic.find("cmd") != std::string::npos || \
strTopic.find("configureInfo") != std::string::npos ) {
2021-09-18 13:45:24 +08:00
std::string pData = (std::string)str;
char *base64_aes = new char[pData.length() * 2];
memset(base64_aes, 0, pData.length() * 2);
Secure::instance()->Base64Encode((unsigned char*)pData.c_str(), base64_aes, pData.length());
iRet = mosquitto_publish(mosq, &mid_sent, topic, strlen(base64_aes), base64_aes, ud.topic_qos, false);
2024-07-09 09:49:42 +08:00
//char *base64_data = new char[pData.length() * 2];
//memset(base64_data, 0, pData.length() *2);
//Secure::instance()->Base64Decode(base64_aes, (unsigned char*)base64_data);
//std::string pData = std::string(base64_data);
if(base64_aes != NULL)
delete[] base64_aes;
2021-09-18 13:45:24 +08:00
// free(out);
} else {
iRet = mosquitto_publish(mosq, &mid_sent, topic, strlen(str), str, ud.topic_qos, false);
}
}
2024-07-09 09:49:42 +08:00
if(iRet != 0)
{
reconnect();
}
2021-09-18 13:45:24 +08:00
return iRet;
}
/**
 * Publishes one WAVE_CONTAIN record as a raw binary payload of
 * sizeof(WAVE_CONTAIN) bytes on the given topic, after logging a summary
 * of the record.
 *
 * @param str    record to send; must not be NULL
 * @param topic  NUL-terminated topic name; must not be NULL
 * @return 0 on success, -1 when an argument is NULL or no client exists,
 *         otherwise the error code returned by mosquitto_publish().
 */
int data_publish_wave(WAVE_CONTAIN *str, const char *topic)
{
    int iRet = -1;

    // Reject invalid arguments instead of dereferencing NULL below.
    if (str == NULL || topic == NULL) {
        return iRet;
    }

    if (mosq != NULL) {
        // The log line shows the sample at index number - 2. Clamp that index
        // to 0 so a record with fewer than two samples cannot index before
        // the start of waveData.
        const int tailIndex = (str->number >= 2) ? static_cast<int>(str->number) - 2 : 0;

        print_blue("data_publish:Unit : %s type:%d flag : %d channelid : %s total : %d count : %d number :%d floats:%f floate:%f\n",str->SensorEngineeringUnit, str->type, str->flag, str->channelId, str->total, str->count,str->number,str->waveData[0],str->waveData[tailIndex]);

        iRet = mosquitto_publish(mosq, &mid_sent, topic, sizeof(WAVE_CONTAIN), str, ud.topic_qos, false);
    }
    return iRet;
}
/**
 * Subscribes the file-level mosquitto client to a topic.
 *
 * @param topic  topic (or topic filter) to subscribe to
 * @param qos    requested quality-of-service level
 * @return the result code of mosquitto_subscribe()
 */
int subscribe(const char* topic, int qos)
{
    // No message id is requested, hence the NULL second argument.
    return mosquitto_subscribe(mosq, NULL, topic, qos);
}
/**
 * Cancels the file-level mosquitto client's subscription to a topic.
 *
 * @param topic  topic (or topic filter) to unsubscribe from
 * @return the result code of mosquitto_unsubscribe()
 */
int unsubscribe(const char* topic)
{
    // No message id is requested, hence the NULL second argument.
    return mosquitto_unsubscribe(mosq, NULL, topic);
}
/**
 * Disconnects the file-level mosquitto client from the broker.
 *
 * @return the result code of mosquitto_disconnect()
 */
int disconnect()
{
    const int rc = mosquitto_disconnect(mosq);
    return rc;
}
// Blame annotation from the repository viewer: the lines below were last changed 2024-07-09 09:49:42 +08:00.
/**
 * Asks the file-level mosquitto client to reconnect to the broker.
 *
 * @return the result code of mosquitto_reconnect()
 */
int reconnect()
{
    const int rc = mosquitto_reconnect(mosq);
    return rc;
}
// Blame annotation from the repository viewer: the lines below were last changed 2021-09-18 13:45:24 +08:00.
int start_client(const char *boardname, const char *gwid, const char* gwip, const char *gwver, const char *gwcode, std::string &salt)
{
char *id = NULL;
char *id_prefix = NULL;
char *host;
int port = 51613;
char *bind_address = NULL;
int keepalive = 60;
bool clean_session = true;
bool debug = true;
int rc = 0;
char hostname[256];
char err[1024];
int len;
char will_payload[100];
long will_payloadlen = 0;
int will_qos = 0;
bool will_retain = false;
char *will_topic = NULL;
bool insecure = false;
char *tls_version = NULL;
char *psk = NULL;
char *psk_identity = NULL;
char *ciphers = NULL;
bool use_srv = false;
mqttsdata_t mqttsdata;
signal(SIGCHLD, SIG_IGN);//signal函数作用为一个信号注册一个信号处理函数。其中SIGCHLD在一个进程终止或者停止时将SIGCHLD信号发送给父进程。而信号处理函数是一个函数指针不过它使用的是预定义函数SIG_IGN(忽视信号)
signal(SIGPIPE, SIG_IGN);
int reconnect = 0;
if (my_connect_callback == NULL || my_message_callback == NULL || my_subscribe_callback == NULL || my_log_callback == NULL) {
2024-07-09 09:49:42 +08:00
print_info("please call the register_collback function first.\n");
2021-09-18 13:45:24 +08:00
return -1;
}
2024-07-09 09:49:42 +08:00
int qos = readIntValue( "config", "qos",(char*)GlobalConfig::Config_G.c_str());
2021-09-18 13:45:24 +08:00
mosquitto_lib_init();//使用mosquitto库函数前需进行初始化
memset(&mqttsdata, 0, sizeof(mqttsdata_t));
memcpy(mqttsdata.gwid, gwid, strlen(gwid));
// mqttsdata.username = "admin";
// mqttsdata.password = "password";
salt = std::string(mqttsdata.salt);
memset(&ud, 0, sizeof(struct userdata));
ud.eol = true;
char localip[100];
sprintf(localip, "%s", gwip);
host = localip;
mqttsdata.port = GlobalConfig::ServerPort;
port = GlobalConfig::ServerPort;
2024-07-09 09:49:42 +08:00
#ifdef WIFI_MODULE
mqttsdata.qos = 0;
#else
mqttsdata.qos = qos;
#endif
print_info("mqttsdata.qos = %d\n",mqttsdata.qos);
2021-09-18 13:45:24 +08:00
// std::string username = "root";
// std::string password = "xom!*283#@XHG";
2024-07-09 09:49:42 +08:00
std::string username = ReadStrByOpt(SERVERCONFIG, "Server", "UserName");
std::string password = ReadStrByOpt(SERVERCONFIG, "Server", "Password");
if(username == "" || password == ""){
username = "chaos";
password = "HSD272*#xkd";
}
2021-09-18 13:45:24 +08:00
char userName[100];
sprintf(userName, "%s", username.c_str());
char passWord[100];
sprintf(passWord, "%s", password.c_str());
2024-07-09 09:49:42 +08:00
print_info("userName = %s , passWord = %s\n",userName,passWord);
2021-09-18 13:45:24 +08:00
ud.username = userName;
ud.password = passWord;
ud.gwid = mqttsdata.gwid;
ud.dataUrl = mqttsdata.dataUrl;
ud.fsUrl = mqttsdata.fsUrl;
will_qos = mqttsdata.qos;
2024-07-09 09:49:42 +08:00
ud.topic_qos = will_qos;
2021-09-18 13:45:24 +08:00
ud.topic_count++;
ud.topics = (char**)realloc(ud.topics, ud.topic_count*sizeof(char *));
if (ud.topics == NULL) {
return 1;
}
memset(req_topic, 0, 100);
sprintf(req_topic, "dev/cidwcc");
ud.topics[ud.topic_count - 1] = req_topic;
ud.verbose = 1;
2024-07-09 09:49:42 +08:00
//fprintf(stderr, "topic = %s!!!!!!!!!!!\n", ud.topics[ud.topic_count - 1]);
2021-09-18 13:45:24 +08:00
memset(res_topic, 0, 100);
sprintf(res_topic, "gw/will");
2024-07-09 09:49:42 +08:00
print_info("res_topic:%s\n", res_topic);
2021-09-18 13:45:24 +08:00
will_topic = res_topic;
ud.resp_topic = res_topic;
sprintf(will_payload, "{\"cmd\":\"15\",\"gwID\":\"%s\"}", mqttsdata.gwid);
will_payloadlen = strlen(will_payload);
2024-07-09 09:49:42 +08:00
print_info("will_payload:%s,will_payloadlen:%ld\n", will_payload, will_payloadlen);
2021-09-18 13:45:24 +08:00
if (clean_session == false && (id_prefix || !id)){
if (!ud.quiet) fprintf(stderr, "Error: You must provide a client id if you are using the -c option.\n");
return 1;
}
if (ud.topic_count == 0){
fprintf(stderr, "Error: You must specify a topic to subscribe to.\n");
return 1;
}
if (will_payload && !will_topic){
fprintf(stderr, "Error: Will payload given, but no will topic given.\n");
return 1;
}
if (will_retain && !will_topic){
fprintf(stderr, "Error: Will retain given, but no will topic given.\n");
return 1;
}
if (ud.password && !ud.username){
if (!ud.quiet) fprintf(stderr, "Warning: Not using password since username not set.\n");
}
if (psk && !psk_identity){
if (!ud.quiet) fprintf(stderr, "Error: --psk-identity required if --psk used.\n");
return 1;
}
if (id_prefix){
id = (char*)malloc(strlen(id_prefix) + 10);
if (!id){
if (!ud.quiet) fprintf(stderr, "Error: Out of memory.\n");
mosquitto_lib_cleanup();
return 1;
}
snprintf(id, strlen(id_prefix) + 10, "%s%d", id_prefix, getpid());
} else if (!id){
hostname[0] = '\0';
gethostname(hostname, 256);
hostname[255] = '\0';
len = strlen("mosqsub/-") + 6 + strlen(hostname);
id = (char*)malloc(len);
if (!id){
if (!ud.quiet) fprintf(stderr, "Error: Out of memory.\n");
mosquitto_lib_cleanup();
return 1;
}
snprintf(id, len, "mosqsub/%d-%s", getpid(), hostname);
if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH){
/* Enforce maximum client id length of 23 characters */
id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
}
}
2024-07-09 09:49:42 +08:00
print_info("id:%s\n", id);
2021-09-18 13:45:24 +08:00
mosq = mosquitto_new(id, clean_session, &ud);
if (!mosq){
switch (errno){
case ENOMEM:
if (!ud.quiet) fprintf(stderr, "Error: Out of memory.\n");
break;
case EINVAL:
if (!ud.quiet) fprintf(stderr, "Error: Invalid id and/or clean_session .\n");
break;
}
mosquitto_lib_cleanup();
return 1;
}
if (debug){
mosquitto_log_callback_set(mosq, my_log_callback);
}
if (!psk) {
psk = (char *)malloc(32 * sizeof(char));
if (!psk) {
fprintf(stderr, "Error: No free space %d\n", __LINE__);
return -1;
}
memset(psk, 0, 32);
snprintf(psk, 32, "%ld", getpid());
psk_identity = (char *)malloc(32 * sizeof(char));
if (!psk_identity) {
fprintf(stderr, "Error: No free space %d\n", __LINE__);
free(psk);
return -1;
}
strncpy(psk_identity, psk, 32);
}
if (will_topic && mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)) {
if (!ud.quiet) fprintf(stderr, "Error: Problem setting will.\n");
mosquitto_lib_cleanup();
free(psk);
free(psk_identity);
return 1;
}
if (ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)) {
if (!ud.quiet) fprintf(stderr, "Error: Problem setting username and password.\n");
mosquitto_lib_cleanup();
free(psk);
free(psk_identity);
return 1;
}
// if (insecure && mosquitto_tls_insecure_set(mosq, false)){
// if (!ud.quiet) fprintf(stderr, "Error: Problem setting TLS insecure option.\n");
// mosquitto_lib_cleanup();
// free(psk);
// free(psk_identity);
// return 1;
// }
// if (psk && mosquitto_tls_psk_set(mosq, psk, psk_identity, NULL)){
// if (!ud.quiet) fprintf(stderr, "Error: Problem setting TLS-PSK options.\n");
// mosquitto_lib_cleanup();
// return 1;
// }
// if (tls_version && mosquitto_tls_opts_set(mosq, 0, tls_version, ciphers)){
// if (!ud.quiet) fprintf(stderr, "Error: Problem setting TLS options.\n");
// mosquitto_lib_cleanup();
// return 1;
// }
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
mosquitto_message_callback_set(mosq, my_message_callback);
mosquitto_publish_callback_set(mosq, my_publish_callback);
if (debug){
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
}
if (use_srv){
rc = mosquitto_connect_srv(mosq, host, keepalive, bind_address);
}
else{
rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address);
}
2024-07-09 09:49:42 +08:00
print_info("host = %s port = %d rc = %d\n",host,port,rc);
2021-09-18 13:45:24 +08:00
if (rc){
if (!ud.quiet){
if (rc == MOSQ_ERR_ERRNO){
2024-07-09 09:49:42 +08:00
//strerror_r(errno, err, 1024);
//fprintf(stderr, "Error_: %s\n", err);
//print_info("%s",stderr);
2021-09-18 13:45:24 +08:00
}
else{
fprintf(stderr, "Unable to connect (%d).\n", rc);
2024-07-09 09:49:42 +08:00
print_info("%s",stderr);
2021-09-18 13:45:24 +08:00
}
}
goto mosq_free;
}
rc = mosquitto_loop_forever(mosq, -1, 1);
if (rc)
{
2024-07-09 09:49:42 +08:00
//fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
//print_info("%s",stderr);
2021-09-18 13:45:24 +08:00
}
mosq_free:
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
free(ud.topics);
free(id);
free(psk);
free(psk_identity);
ud.topics = NULL;
id = NULL;
mosq = NULL;
sleep(1);
}