refine mqtt client codes.
This commit is contained in:
parent
fcdd364921
commit
82d74b6f08
@ -19,6 +19,7 @@
|
|||||||
#include "utility/secure.hpp"
|
#include "utility/secure.hpp"
|
||||||
|
|
||||||
extern zlog_category_t *zct;
|
extern zlog_category_t *zct;
|
||||||
|
extern zlog_category_t *zbt;
|
||||||
|
|
||||||
static char res_topic[100];
|
static char res_topic[100];
|
||||||
static char req_topic[100];
|
static char req_topic[100];
|
||||||
@ -42,7 +43,7 @@ void register_collback(connect_callback connect_c, message_callback message_c, s
|
|||||||
}
|
}
|
||||||
|
|
||||||
int data_publish(const char *str, const char *topic) {
|
int data_publish(const char *str, const char *topic) {
|
||||||
int iRet = -1;
|
int ret = -1;
|
||||||
if (mosq != NULL) {
|
if (mosq != NULL) {
|
||||||
std::string strTopic = std::string(topic);
|
std::string strTopic = std::string(topic);
|
||||||
if (strTopic.find("cmd") != std::string::npos || strTopic.find("configureInfo") != std::string::npos) {
|
if (strTopic.find("cmd") != std::string::npos || strTopic.find("configureInfo") != std::string::npos) {
|
||||||
@ -50,37 +51,52 @@ int data_publish(const char *str, const char *topic) {
|
|||||||
char *base64_aes = new char[pData.length() * 2];
|
char *base64_aes = new char[pData.length() * 2];
|
||||||
memset(base64_aes, 0, pData.length() * 2);
|
memset(base64_aes, 0, pData.length() * 2);
|
||||||
secure::instance().Base64Encode((unsigned char *)pData.c_str(), base64_aes, pData.length());
|
secure::instance().Base64Encode((unsigned char *)pData.c_str(), base64_aes, pData.length());
|
||||||
iRet = mosquitto_publish(mosq, &mid_sent, topic, strlen(base64_aes), base64_aes, ud.topic_qos, false);
|
ret = mosquitto_publish(mosq, &mid_sent, topic, strlen(base64_aes), base64_aes, ud.topic_qos, false);
|
||||||
|
|
||||||
if (base64_aes != NULL) delete[] base64_aes;
|
if (base64_aes != NULL) delete[] base64_aes;
|
||||||
|
if (ret != MOSQ_ERR_SUCCESS) {
|
||||||
|
zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s, str: %s", mosquitto_strerror(ret), topic, str);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
iRet = mosquitto_publish(mosq, &mid_sent, topic, strlen(str), str, ud.topic_qos, false);
|
ret = mosquitto_publish(mosq, &mid_sent, topic, strlen(str), str, ud.topic_qos, false);
|
||||||
|
if (ret != MOSQ_ERR_SUCCESS) {
|
||||||
|
zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s, str: %s", mosquitto_strerror(ret), topic, str);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (iRet != 0) {
|
}
|
||||||
|
if (ret != 0) {
|
||||||
disconnect();
|
disconnect();
|
||||||
}
|
}
|
||||||
return iRet;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int data_publish_wave(WAVE_CONTAIN *str, const char *topic) {
|
int data_publish_wave(WAVE_CONTAIN *str, const char *topic) {
|
||||||
int iRet = -1;
|
int ret = -1;
|
||||||
if (mosq != NULL) {
|
if (mosq != NULL) {
|
||||||
zlog_info(zct, "data_publish:Unit : %s type:%d flag : %d channelid : %s total : %d count : %d number :%d floats:%f floate:%f\n", str->SensorEngineeringUnit, str->type, str->flag, str->channelId, str->total, str->count, str->number, str->waveData[0],
|
zlog_info(zct, "data_publish:Unit : %s type:%d flag : %d channelid : %s total : %d count : %d number :%d floats:%f floate:%f\n", str->SensorEngineeringUnit, str->type, str->flag, str->channelId, str->total, str->count, str->number, str->waveData[0],
|
||||||
str->waveData[(str->number - 2)]);
|
str->waveData[(str->number - 2)]);
|
||||||
iRet = mosquitto_publish(mosq, &mid_sent, topic, sizeof(WAVE_CONTAIN), str, ud.topic_qos, false);
|
ret = mosquitto_publish(mosq, &mid_sent, topic, sizeof(WAVE_CONTAIN), str, ud.topic_qos, false);
|
||||||
|
if (ret != MOSQ_ERR_SUCCESS) {
|
||||||
|
zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s, str: %s", mosquitto_strerror(ret), topic, str);
|
||||||
}
|
}
|
||||||
return iRet;
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int subscribe(const char *topic, int qos) {
|
int subscribe(const char *topic, int qos) {
|
||||||
int iret = mosquitto_subscribe(mosq, NULL, topic, qos);
|
int ret = mosquitto_subscribe(mosq, NULL, topic, qos);
|
||||||
return iret;
|
if (ret != MOSQ_ERR_SUCCESS) {
|
||||||
|
zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int unsubscribe(const char *topic) {
|
int unsubscribe(const char *topic) {
|
||||||
int iret = mosquitto_unsubscribe(mosq, NULL, topic);
|
int ret = mosquitto_unsubscribe(mosq, NULL, topic);
|
||||||
return iret;
|
if (ret != MOSQ_ERR_SUCCESS) {
|
||||||
|
zlog_error(zct, "fail to unsubscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int disconnect() { return mosquitto_disconnect(mosq); }
|
int disconnect() { return mosquitto_disconnect(mosq); }
|
||||||
@ -94,7 +110,6 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons
|
|||||||
char *bind_address = NULL;
|
char *bind_address = NULL;
|
||||||
int keepalive = 60;
|
int keepalive = 60;
|
||||||
bool clean_session = true;
|
bool clean_session = true;
|
||||||
bool debug = true;
|
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
char hostname[256];
|
char hostname[256];
|
||||||
int len;
|
int len;
|
||||||
@ -105,17 +120,15 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons
|
|||||||
char *will_topic = NULL;
|
char *will_topic = NULL;
|
||||||
char *psk = NULL;
|
char *psk = NULL;
|
||||||
char *psk_identity = NULL;
|
char *psk_identity = NULL;
|
||||||
bool use_srv = false;
|
|
||||||
mqttsdata_t mqttsdata;
|
mqttsdata_t mqttsdata;
|
||||||
signal(SIGCHLD, SIG_IGN); // signal函数作用:为一个信号注册一个信号处理函数。其中SIGCHLD:在一个进程终止或者停止时,将SIGCHLD信号发送给父进程。而信号处理函数是一个函数指针,不过它使用的是预定义函数SIG_IGN(忽视信号)
|
signal(SIGCHLD, SIG_IGN); // signal函数作用:为一个信号注册一个信号处理函数。其中SIGCHLD:在一个进程终止或者停止时,将SIGCHLD信号发送给父进程。而信号处理函数是一个函数指针,不过它使用的是预定义函数SIG_IGN(忽视信号)
|
||||||
signal(SIGPIPE, SIG_IGN);
|
signal(SIGPIPE, SIG_IGN);
|
||||||
if (my_connect_callback == NULL || my_message_callback == NULL || my_subscribe_callback == NULL || my_log_callback == NULL) {
|
if (my_connect_callback == NULL || my_message_callback == NULL || my_subscribe_callback == NULL || my_log_callback == NULL) {
|
||||||
zlog_error(zct,"please call the register_collback function first.");
|
zlog_error(zct, "please call the register_collback function first.");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int qos = readIntValue("config", "qos", (char *)GlobalConfig::Config_G.c_str());
|
int qos = readIntValue("config", "qos", (char *)GlobalConfig::Config_G.c_str());
|
||||||
mosquitto_lib_init(); //使用mosquitto库函数前需进行初始化
|
|
||||||
memset(&mqttsdata, 0, sizeof(mqttsdata_t));
|
memset(&mqttsdata, 0, sizeof(mqttsdata_t));
|
||||||
memcpy(mqttsdata.gwid, gwid, strlen(gwid));
|
memcpy(mqttsdata.gwid, gwid, strlen(gwid));
|
||||||
|
|
||||||
@ -132,7 +145,7 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons
|
|||||||
#else
|
#else
|
||||||
mqttsdata.qos = qos;
|
mqttsdata.qos = qos;
|
||||||
#endif
|
#endif
|
||||||
zlog_info(zct,"mqttsdata.qos = %d", mqttsdata.qos);
|
zlog_info(zct, "mqttsdata.qos = %d", mqttsdata.qos);
|
||||||
|
|
||||||
std::string username = ReadStrByOpt(SERVERCONFIG, "Server", "UserName");
|
std::string username = ReadStrByOpt(SERVERCONFIG, "Server", "UserName");
|
||||||
std::string password = ReadStrByOpt(SERVERCONFIG, "Server", "Password");
|
std::string password = ReadStrByOpt(SERVERCONFIG, "Server", "Password");
|
||||||
@ -142,11 +155,13 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons
|
|||||||
}
|
}
|
||||||
|
|
||||||
char userName[100];
|
char userName[100];
|
||||||
|
memset(userName, 0, 100);
|
||||||
sprintf(userName, "%s", username.c_str());
|
sprintf(userName, "%s", username.c_str());
|
||||||
|
|
||||||
char passWord[100];
|
char passWord[100];
|
||||||
|
memset(passWord, 0, 100);
|
||||||
sprintf(passWord, "%s", password.c_str());
|
sprintf(passWord, "%s", password.c_str());
|
||||||
zlog_info(zct,"userName = %s , passWord = %s", userName, passWord);
|
zlog_info(zbt, "userName = %s , passWord = %s", userName, passWord);
|
||||||
ud.username = userName;
|
ud.username = userName;
|
||||||
ud.password = passWord;
|
ud.password = passWord;
|
||||||
|
|
||||||
@ -158,6 +173,7 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons
|
|||||||
ud.topic_count++;
|
ud.topic_count++;
|
||||||
ud.topics = (char **)realloc(ud.topics, ud.topic_count * sizeof(char *));
|
ud.topics = (char **)realloc(ud.topics, ud.topic_count * sizeof(char *));
|
||||||
if (ud.topics == NULL) {
|
if (ud.topics == NULL) {
|
||||||
|
zlog_error(zbt, "fail to realloc for ud.topics");
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -168,83 +184,34 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons
|
|||||||
|
|
||||||
memset(res_topic, 0, 100);
|
memset(res_topic, 0, 100);
|
||||||
sprintf(res_topic, "gw/will");
|
sprintf(res_topic, "gw/will");
|
||||||
zlog_info(zct,"res_topic:%s", res_topic);
|
zlog_info(zct, "res_topic:%s", res_topic);
|
||||||
will_topic = res_topic;
|
will_topic = res_topic;
|
||||||
ud.resp_topic = res_topic;
|
ud.resp_topic = res_topic;
|
||||||
sprintf(will_payload, "{\"cmd\":\"15\",\"gwID\":\"%s\"}", mqttsdata.gwid);
|
sprintf(will_payload, "{\"cmd\":\"15\",\"gwID\":\"%s\"}", mqttsdata.gwid);
|
||||||
will_payloadlen = strlen(will_payload);
|
will_payloadlen = strlen(will_payload);
|
||||||
zlog_info(zct,"will_payload:%s,will_payloadlen:%ld", will_payload, will_payloadlen);
|
zlog_info(zct, "will_payload:%s,will_payloadlen:%ld", will_payload, will_payloadlen);
|
||||||
if (clean_session == false && (id_prefix || !id)) {
|
|
||||||
if (!ud.quiet) zlog_error(zct, "Error: You must provide a client id if you are using the -c option.");
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ud.topic_count == 0) {
|
|
||||||
zlog_error(zct, "Error: You must specify a topic to subscribe to.");
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
if (!will_topic) {
|
|
||||||
zlog_error(zct, "Error: Will payload given, but no will topic given.");
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
if (will_retain && !will_topic) {
|
|
||||||
zlog_error(zct, "Error: Will retain given, but no will topic given.");
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
if (ud.password && !ud.username) {
|
|
||||||
if (!ud.quiet) zlog_error(zct, "Warning: Not using password since username not set.");
|
|
||||||
}
|
|
||||||
if (psk && !psk_identity) {
|
|
||||||
if (!ud.quiet) zlog_error(zct, "Error: --psk-identity required if --psk used.");
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
if (id_prefix) {
|
|
||||||
id = (char *)malloc(strlen(id_prefix) + 10);
|
|
||||||
if (!id) {
|
|
||||||
if (!ud.quiet) zlog_error(zct, "Error: Out of memory.");
|
|
||||||
mosquitto_lib_cleanup();
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
snprintf(id, strlen(id_prefix) + 10, "%s%d", id_prefix, getpid());
|
|
||||||
} else if (!id) {
|
|
||||||
hostname[0] = '\0';
|
hostname[0] = '\0';
|
||||||
gethostname(hostname, 256);
|
gethostname(hostname, 256);
|
||||||
hostname[255] = '\0';
|
hostname[255] = '\0';
|
||||||
len = strlen("mosqsub/-") + 6 + strlen(hostname);
|
len = strlen("mosqsub/-") + 6 + strlen(hostname);
|
||||||
id = (char *)malloc(len);
|
mosquitto_lib_init();
|
||||||
if (!id) {
|
mosq = mosquitto_new(NULL, true, &ud);
|
||||||
if (!ud.quiet) zlog_error(zct, "Error: Out of memory.\n");
|
if (!mosq) {
|
||||||
|
zlog_error(zct, "fail to mosquitto_new, errno:%d", errno);
|
||||||
|
free(ud.topics);
|
||||||
mosquitto_lib_cleanup();
|
mosquitto_lib_cleanup();
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
snprintf(id, len, "mosqsub/%s-%s", GlobalConfig::ZigbeeInfo_G.PanID.c_str(), hostname);
|
|
||||||
if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) {
|
|
||||||
/* Enforce maximum client id length of 23 characters */
|
|
||||||
id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0';
|
|
||||||
}
|
|
||||||
}
|
|
||||||
zlog_info(zct, "id:%s", id);
|
|
||||||
mosq = mosquitto_new(id, clean_session, &ud);
|
|
||||||
if (!mosq) {
|
|
||||||
switch (errno) {
|
|
||||||
case ENOMEM:
|
|
||||||
if (!ud.quiet) zlog_error(zct, "Error: Out of memory.");
|
|
||||||
break;
|
|
||||||
case EINVAL:
|
|
||||||
if (!ud.quiet) zlog_error(zct, "Error: Invalid id and/or clean_session .");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
mosquitto_lib_cleanup();
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
if (debug) {
|
|
||||||
mosquitto_log_callback_set(mosq, my_log_callback);
|
mosquitto_log_callback_set(mosq, my_log_callback);
|
||||||
}
|
|
||||||
|
|
||||||
if (!psk) {
|
if (!psk) {
|
||||||
psk = (char *)malloc(32 * sizeof(char));
|
psk = (char *)malloc(32 * sizeof(char));
|
||||||
if (!psk) {
|
if (!psk) {
|
||||||
|
free(ud.topics);
|
||||||
|
mosquitto_destroy(mosq);
|
||||||
|
mosquitto_lib_cleanup();
|
||||||
zlog_error(zct, "Error: No free space 1");
|
zlog_error(zct, "Error: No free space 1");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
@ -252,68 +219,55 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons
|
|||||||
snprintf(psk, 32, "%d", getpid());
|
snprintf(psk, 32, "%d", getpid());
|
||||||
psk_identity = (char *)malloc(32 * sizeof(char));
|
psk_identity = (char *)malloc(32 * sizeof(char));
|
||||||
if (!psk_identity) {
|
if (!psk_identity) {
|
||||||
|
free(ud.topics);
|
||||||
|
mosquitto_destroy(mosq);
|
||||||
|
mosquitto_lib_cleanup();
|
||||||
zlog_error(zct, "Error: No free space 2");
|
zlog_error(zct, "Error: No free space 2");
|
||||||
free(psk);
|
free(psk);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
strncpy(psk_identity, psk, 32);
|
strncpy(psk_identity, psk, 32);
|
||||||
}
|
}
|
||||||
if (will_topic && mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)) {
|
if (mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)) {
|
||||||
if (!ud.quiet) zlog_error(zct, "Error: Problem setting will.");
|
zlog_error(zct, "Error: Problem setting will.");
|
||||||
mosquitto_lib_cleanup();
|
goto mosq_free;
|
||||||
free(psk);
|
|
||||||
free(psk_identity);
|
|
||||||
return 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)) {
|
if (ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)) {
|
||||||
if (!ud.quiet) zlog_error(zct, "Error: Problem setting username and password.");
|
zlog_error(zct, "Error: Problem setting username and password.");
|
||||||
mosquitto_lib_cleanup();
|
goto mosq_free;
|
||||||
free(psk);
|
|
||||||
free(psk_identity);
|
|
||||||
return 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mosquitto_connect_callback_set(mosq, my_connect_callback);
|
mosquitto_connect_callback_set(mosq, my_connect_callback);
|
||||||
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
|
mosquitto_disconnect_callback_set(mosq, my_disconnect_callback);
|
||||||
mosquitto_message_callback_set(mosq, my_message_callback);
|
mosquitto_message_callback_set(mosq, my_message_callback);
|
||||||
mosquitto_publish_callback_set(mosq, my_publish_callback);
|
mosquitto_publish_callback_set(mosq, my_publish_callback);
|
||||||
|
|
||||||
if (debug) {
|
|
||||||
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
|
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
|
||||||
}
|
|
||||||
|
|
||||||
if (use_srv) {
|
|
||||||
rc = mosquitto_connect_srv(mosq, host, keepalive, bind_address);
|
|
||||||
} else {
|
|
||||||
rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address);
|
rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address);
|
||||||
}
|
zlog_info(zct, "host = %s port = %d rc = %d", host, port, rc);
|
||||||
zlog_info(zct,"host = %s port = %d rc = %d", host, port, rc);
|
|
||||||
if (rc) {
|
if (rc) {
|
||||||
if (!ud.quiet) {
|
|
||||||
if (rc == MOSQ_ERR_ERRNO) {
|
|
||||||
|
|
||||||
} else {
|
|
||||||
zlog_error(zct, "Unable to connect (%d).", rc);
|
zlog_error(zct, "Unable to connect (%d).", rc);
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
goto mosq_free;
|
goto mosq_free;
|
||||||
}
|
}
|
||||||
|
|
||||||
rc = mosquitto_loop_forever(mosq, -1, 1);
|
rc = mosquitto_loop_forever(mosq, -1, 1);
|
||||||
if (rc) {
|
if (rc) {
|
||||||
zlog_error(zct, "Error: %s", mosquitto_strerror(rc));
|
zlog_error(zct, "Error: %s", mosquitto_strerror(rc));
|
||||||
}
|
}
|
||||||
|
|
||||||
mosq_free:
|
mosq_free:
|
||||||
mosquitto_destroy(mosq);
|
mosquitto_destroy(mosq);
|
||||||
mosquitto_lib_cleanup();
|
mosquitto_lib_cleanup();
|
||||||
|
if (ud.topics != NULL) {
|
||||||
free(ud.topics);
|
free(ud.topics);
|
||||||
free(id);
|
}
|
||||||
|
if (psk) {
|
||||||
free(psk);
|
free(psk);
|
||||||
|
}
|
||||||
|
if (psk_identity) {
|
||||||
free(psk_identity);
|
free(psk_identity);
|
||||||
ud.topics = NULL;
|
}
|
||||||
id = NULL;
|
|
||||||
mosq = NULL;
|
|
||||||
sleep(1);
|
sleep(1);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -447,8 +447,9 @@ void StartMqttClient() {
|
|||||||
std::string salt;
|
std::string salt;
|
||||||
register_collback(my_connect_callback, my_message_callback, my_subscribe_callback, my_log_callback, my_disconnect_callback, my_publish_callback);
|
register_collback(my_connect_callback, my_message_callback, my_subscribe_callback, my_log_callback, my_disconnect_callback, my_publish_callback);
|
||||||
start_client(strEqupNo.c_str(), GlobalConfig::MacAddr_G.c_str(), GlobalConfig::ServerIP.c_str(), strVersion.c_str(), "11111111", salt);
|
start_client(strEqupNo.c_str(), GlobalConfig::MacAddr_G.c_str(), GlobalConfig::ServerIP.c_str(), strVersion.c_str(), "11111111", salt);
|
||||||
|
zlog_error(zct, "fail to connect server");
|
||||||
}
|
}
|
||||||
sleep(3);
|
sleep(10);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user