wirelessgateway/mqttclient/SH_MqttClient.cpp

375 lines
12 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include <signal.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <malloc.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <memory.h>
#include <errno.h>
#include <json/json.h>
#include <sys/types.h>
#include "SH_MqttClient.h"
#include "../common/SH_global.h"
#include "../secure/SH_Secure.hpp"
static char res_topic[100];
static char req_topic[100];
struct mosquitto *mosq = NULL;
static int mid_sent = 0;
struct userdata ud;
connect_callback my_connect_callback = NULL;
disconnect_callback my_disconnect_callback = NULL;
message_callback my_message_callback = NULL;
subscribe_callback my_subscribe_callback = NULL;
log_callback my_log_callback = NULL;
publish_callback my_publish_callback = NULL;
void register_collback(connect_callback connect_c,
message_callback message_c,
subscribe_callback subscribe_c,
log_callback log_c,
disconnect_callback disconnect_c,
publish_callback publish_c)
{
my_connect_callback = connect_c;
my_message_callback = message_c;
my_subscribe_callback = subscribe_c;
my_log_callback = log_c;
my_disconnect_callback = disconnect_c;
my_publish_callback = publish_c;
}
int data_publish(const char *str, const char *topic)
{
int iRet = -1;
if(mosq != NULL){
//print_blue("data_publish: %s %s\n", str, topic);
std::string strTopic = std::string(topic);
if (strTopic.find("cmd") != std::string::npos || \
strTopic.find("configureInfo") != std::string::npos ) {
std::string pData = (std::string)str;
char *base64_aes = new char[pData.length() * 2];
memset(base64_aes, 0, pData.length() * 2);
Secure::instance()->Base64Encode((unsigned char*)pData.c_str(), base64_aes, pData.length());
iRet = mosquitto_publish(mosq, &mid_sent, topic, strlen(base64_aes), base64_aes, ud.topic_qos, false);
//char *base64_data = new char[pData.length() * 2];
//memset(base64_data, 0, pData.length() *2);
//Secure::instance()->Base64Decode(base64_aes, (unsigned char*)base64_data);
//std::string pData = std::string(base64_data);
if(base64_aes != NULL)
delete[] base64_aes;
// free(out);
} else {
iRet = mosquitto_publish(mosq, &mid_sent, topic, strlen(str), str, ud.topic_qos, false);
}
}
if(iRet != 0)
{
disconnect();
}
return iRet;
}
int data_publish_wave(WAVE_CONTAIN *str, const char *topic)
{
int iRet = -1;
if(mosq != NULL){
print_blue("data_publish:Unit : %s type:%d flag : %d channelid : %s total : %d count : %d number :%d floats:%f floate:%f\n",str->SensorEngineeringUnit, str->type, str->flag, str->channelId, str->total, str->count,str->number,str->waveData[0],str->waveData[(str->number-2)]);
iRet = mosquitto_publish(mosq, &mid_sent, topic, sizeof(WAVE_CONTAIN), str, ud.topic_qos, false);
}
return iRet;
}
int subscribe(const char* topic, int qos)
{
int iret = mosquitto_subscribe(mosq, NULL, topic, qos);
return iret;
}
int unsubscribe(const char* topic)
{
int iret = mosquitto_unsubscribe(mosq, NULL, topic);
return iret;
}
int disconnect()
{
return mosquitto_disconnect(mosq);
}
int reconnect()
{
return mosquitto_reconnect(mosq);
}
int start_client(const char *boardname, const char *gwid, const char* gwip, const char *gwver, const char *gwcode, std::string &salt)
{
char *id = NULL;
char *id_prefix = NULL;
char *host;
int port = 51613;
char *bind_address = NULL;
int keepalive = 60;
bool clean_session = true;
bool debug = true;
int rc = 0;
char hostname[256];
char err[1024];
int len;
char will_payload[100];
long will_payloadlen = 0;
int will_qos = 0;
bool will_retain = false;
char *will_topic = NULL;
bool insecure = false;
char *tls_version = NULL;
char *psk = NULL;
char *psk_identity = NULL;
char *ciphers = NULL;
bool use_srv = false;
mqttsdata_t mqttsdata;
signal(SIGCHLD, SIG_IGN);//signal函数作用为一个信号注册一个信号处理函数。其中SIGCHLD在一个进程终止或者停止时将SIGCHLD信号发送给父进程。而信号处理函数是一个函数指针不过它使用的是预定义函数SIG_IGN(忽视信号)
signal(SIGPIPE, SIG_IGN);
int reconnect = 0;
if (my_connect_callback == NULL || my_message_callback == NULL || my_subscribe_callback == NULL || my_log_callback == NULL) {
print_info("please call the register_collback function first.\n");
return -1;
}
int qos = readIntValue( "config", "qos",(char*)GlobalConfig::Config_G.c_str());
mosquitto_lib_init();//使用mosquitto库函数前需进行初始化
memset(&mqttsdata, 0, sizeof(mqttsdata_t));
memcpy(mqttsdata.gwid, gwid, strlen(gwid));
// mqttsdata.username = "admin";
// mqttsdata.password = "password";
salt = std::string(mqttsdata.salt);
memset(&ud, 0, sizeof(struct userdata));
ud.eol = true;
char localip[100];
sprintf(localip, "%s", gwip);
host = localip;
mqttsdata.port = GlobalConfig::ServerPort;
port = GlobalConfig::ServerPort;
#ifdef WIFI_MODULE
mqttsdata.qos = 0;
#else
mqttsdata.qos = qos;
#endif
print_info("mqttsdata.qos = %d\n",mqttsdata.qos);
// std::string username = "root";
// std::string password = "xom!*283#@XHG";
std::string username = ReadStrByOpt(SERVERCONFIG, "Server", "UserName");
std::string password = ReadStrByOpt(SERVERCONFIG, "Server", "Password");
if(username == "" || password == ""){
username = "chaos";
password = "HSD272*#xkd";
}
char userName[100];
sprintf(userName, "%s", username.c_str());
char passWord[100];
sprintf(passWord, "%s", password.c_str());
print_info("userName = %s , passWord = %s\n",userName,passWord);
ud.username = userName;
ud.password = passWord;
ud.gwid = mqttsdata.gwid;
ud.dataUrl = mqttsdata.dataUrl;
ud.fsUrl = mqttsdata.fsUrl;
will_qos = mqttsdata.qos;
ud.topic_qos = will_qos;
ud.topic_count++;
ud.topics = (char**)realloc(ud.topics, ud.topic_count*sizeof(char *));
if (ud.topics == NULL) {
return 1;
}
memset(req_topic, 0, 100);
sprintf(req_topic, "dev/cidwcc");
ud.topics[ud.topic_count - 1] = req_topic;
ud.verbose = 1;
//fprintf(stderr, "topic = %s!!!!!!!!!!!\n", ud.topics[ud.topic_count - 1]);
memset(res_topic, 0, 100);
sprintf(res_topic, "gw/will");
print_info("res_topic:%s\n", res_topic);
will_topic = res_topic;
ud.resp_topic = res_topic;
sprintf(will_payload, "{\"cmd\":\"15\",\"gwID\":\"%s\"}", mqttsdata.gwid);
will_payloadlen = strlen(will_payload);
print_info("will_payload:%s,will_payloadlen:%ld\n", will_payload, will_payloadlen);
if (clean_session == false && (id_prefix || !id)){
if (!ud.quiet) fprintf(stderr, "Error: You must provide a client id if you are using the -c option.\n");
return 1;
}
if (ud.topic_count == 0){
fprintf(stderr, "Error: You must specify a topic to subscribe to.\n");
return 1;
}
if (will_payload && !will_topic){
fprintf(stderr, "Error: Will payload given, but no will topic given.\n");
return 1;
}
if (will_retain && !will_topic){
fprintf(stderr, "Error: Will retain given, but no will topic given.\n");
return 1;
}
if (ud.password && !ud.username){
if (!ud.quiet) fprintf(stderr, "Warning: Not using password since username not set.\n");
}
if (psk && !psk_identity){
if (!ud.quiet) fprintf(stderr, "Error: --psk-identity required if --psk used.\n");
return 1;
}
if (id_prefix){
id = (char*)malloc(strlen(id_prefix) + 10);
if (!id){
if (!ud.quiet) fprintf(stderr, "Error: Out of memory.\n");
mosquitto_lib_cleanup();
return 1;
}
snprintf(id, strlen(id_prefix) + 10, "%s%d", id_prefix, getpid());
} else if (!id){
hostname[0] = '\0';
gethostname(hostname, 256);
hostname[255] = '\0';
len = strlen("mosqsub/-") + 6 + strlen(hostname);
id = (char*)malloc(len);
if (!id){
if (!ud.quiet) fprintf(stderr, "Error: Out of memory.\n");
mosquitto_lib_cleanup();
return 1;
}
snprintf(id, len, "mosqsub/%d-%s", getpid(), hostname);
if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH){
/* Enforce maximum client id length of 23 characters */
id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
}
}
print_info("id:%s\n", id);
mosq = mosquitto_new(id, clean_session, &ud);
if (!mosq){
switch (errno){
case ENOMEM:
if (!ud.quiet) fprintf(stderr, "Error: Out of memory.\n");
break;
case EINVAL:
if (!ud.quiet) fprintf(stderr, "Error: Invalid id and/or clean_session .\n");
break;
}
mosquitto_lib_cleanup();
return 1;
}
if (debug){
mosquitto_log_callback_set(mosq, my_log_callback);
}
if (!psk) {
psk = (char *)malloc(32 * sizeof(char));
if (!psk) {
fprintf(stderr, "Error: No free space %d\n", __LINE__);
return -1;
}
memset(psk, 0, 32);
snprintf(psk, 32, "%ld", getpid());
psk_identity = (char *)malloc(32 * sizeof(char));
if (!psk_identity) {
fprintf(stderr, "Error: No free space %d\n", __LINE__);
free(psk);
return -1;
}
strncpy(psk_identity, psk, 32);
}
if (will_topic && mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)) {
if (!ud.quiet) fprintf(stderr, "Error: Problem setting will.\n");
mosquitto_lib_cleanup();
free(psk);
free(psk_identity);
return 1;
}
if (ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)) {
if (!ud.quiet) fprintf(stderr, "Error: Problem setting username and password.\n");
mosquitto_lib_cleanup();
free(psk);
free(psk_identity);
return 1;
}
// if (insecure && mosquitto_tls_insecure_set(mosq, false)){
// if (!ud.quiet) fprintf(stderr, "Error: Problem setting TLS insecure option.\n");
// mosquitto_lib_cleanup();
// free(psk);
// free(psk_identity);
// return 1;
// }
// if (psk && mosquitto_tls_psk_set(mosq, psk, psk_identity, NULL)){
// if (!ud.quiet) fprintf(stderr, "Error: Problem setting TLS-PSK options.\n");
// mosquitto_lib_cleanup();
// return 1;
// }
// if (tls_version && mosquitto_tls_opts_set(mosq, 0, tls_version, ciphers)){
// if (!ud.quiet) fprintf(stderr, "Error: Problem setting TLS options.\n");
// mosquitto_lib_cleanup();
// return 1;
// }
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
mosquitto_message_callback_set(mosq, my_message_callback);
mosquitto_publish_callback_set(mosq, my_publish_callback);
if (debug){
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
}
if (use_srv){
rc = mosquitto_connect_srv(mosq, host, keepalive, bind_address);
}
else{
rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address);
}
print_info("host = %s port = %d rc = %d\n",host,port,rc);
if (rc){
if (!ud.quiet){
if (rc == MOSQ_ERR_ERRNO){
//strerror_r(errno, err, 1024);
//fprintf(stderr, "Error_: %s\n", err);
//print_info("%s",stderr);
}
else{
fprintf(stderr, "Unable to connect (%d).\n", rc);
print_info("%s",stderr);
}
}
goto mosq_free;
}
rc = mosquitto_loop_forever(mosq, -1, 1);
if (rc)
{
fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc));
print_info("%s",stderr);
}
mosq_free:
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
free(ud.topics);
free(id);
free(psk);
free(psk_identity);
ud.topics = NULL;
id = NULL;
mosq = NULL;
sleep(1);
}