From 82d74b6f08757236c6b70f40c2adf7cfbf4985d6 Mon Sep 17 00:00:00 2001 From: pandx Date: Thu, 24 Oct 2024 20:36:27 +0800 Subject: [PATCH] refine mqtt client codes. --- mqttclient/mqtt_client.cpp | 196 ++++++++++++++----------------------- threadfunc/thread_func.cpp | 3 +- 2 files changed, 77 insertions(+), 122 deletions(-) diff --git a/mqttclient/mqtt_client.cpp b/mqttclient/mqtt_client.cpp index 2dd1018..36a49d4 100644 --- a/mqttclient/mqtt_client.cpp +++ b/mqttclient/mqtt_client.cpp @@ -19,6 +19,7 @@ #include "utility/secure.hpp" extern zlog_category_t *zct; +extern zlog_category_t *zbt; static char res_topic[100]; static char req_topic[100]; @@ -42,7 +43,7 @@ void register_collback(connect_callback connect_c, message_callback message_c, s } int data_publish(const char *str, const char *topic) { - int iRet = -1; + int ret = -1; if (mosq != NULL) { std::string strTopic = std::string(topic); if (strTopic.find("cmd") != std::string::npos || strTopic.find("configureInfo") != std::string::npos) { @@ -50,37 +51,52 @@ int data_publish(const char *str, const char *topic) { char *base64_aes = new char[pData.length() * 2]; memset(base64_aes, 0, pData.length() * 2); secure::instance().Base64Encode((unsigned char *)pData.c_str(), base64_aes, pData.length()); - iRet = mosquitto_publish(mosq, &mid_sent, topic, strlen(base64_aes), base64_aes, ud.topic_qos, false); + ret = mosquitto_publish(mosq, &mid_sent, topic, strlen(base64_aes), base64_aes, ud.topic_qos, false); if (base64_aes != NULL) delete[] base64_aes; + if (ret != MOSQ_ERR_SUCCESS) { + zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s, str: %s", mosquitto_strerror(ret), topic, str); + } } else { - iRet = mosquitto_publish(mosq, &mid_sent, topic, strlen(str), str, ud.topic_qos, false); + ret = mosquitto_publish(mosq, &mid_sent, topic, strlen(str), str, ud.topic_qos, false); + if (ret != MOSQ_ERR_SUCCESS) { + zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s, str: %s", mosquitto_strerror(ret), topic, str); + } } } - if (iRet != 0) { + if (ret != 0) { disconnect(); } - return iRet; + return ret; } int data_publish_wave(WAVE_CONTAIN *str, const char *topic) { - int iRet = -1; + int ret = -1; if (mosq != NULL) { zlog_info(zct, "data_publish:Unit : %s type:%d flag : %d channelid : %s total : %d count : %d number :%d floats:%f floate:%f\n", str->SensorEngineeringUnit, str->type, str->flag, str->channelId, str->total, str->count, str->number, str->waveData[0], - str->waveData[(str->number - 2)]); - iRet = mosquitto_publish(mosq, &mid_sent, topic, sizeof(WAVE_CONTAIN), str, ud.topic_qos, false); + str->waveData[(str->number - 2)]); + ret = mosquitto_publish(mosq, &mid_sent, topic, sizeof(WAVE_CONTAIN), str, ud.topic_qos, false); + if (ret != MOSQ_ERR_SUCCESS) { + zlog_error(zct, "fail to send mqtt msg, ret: [%s], topic: %s, str: %s", mosquitto_strerror(ret), topic, str); + } } - return iRet; + return ret; } int subscribe(const char *topic, int qos) { - int iret = mosquitto_subscribe(mosq, NULL, topic, qos); - return iret; + int ret = mosquitto_subscribe(mosq, NULL, topic, qos); + if (ret != MOSQ_ERR_SUCCESS) { + zlog_error(zct, "fail to subscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic); + } + return ret; } int unsubscribe(const char *topic) { - int iret = mosquitto_unsubscribe(mosq, NULL, topic); - return iret; + int ret = mosquitto_unsubscribe(mosq, NULL, topic); + if (ret != MOSQ_ERR_SUCCESS) { + zlog_error(zct, "fail to unsubscribe, ret: [%s], topic: %s", mosquitto_strerror(ret), topic); + } + return ret; } int disconnect() { return mosquitto_disconnect(mosq); } @@ -94,7 +110,6 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons char *bind_address = NULL; int keepalive = 60; bool clean_session = true; - bool debug = true; int rc = 0; char hostname[256]; int len; @@ -105,17 +120,15 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons char *will_topic = NULL; char *psk = NULL; char *psk_identity = NULL; - bool use_srv = false; mqttsdata_t mqttsdata; signal(SIGCHLD, SIG_IGN); // signal函数作用:为一个信号注册一个信号处理函数。其中SIGCHLD:在一个进程终止或者停止时,将SIGCHLD信号发送给父进程。而信号处理函数是一个函数指针,不过它使用的是预定义函数SIG_IGN(忽视信号) signal(SIGPIPE, SIG_IGN); if (my_connect_callback == NULL || my_message_callback == NULL || my_subscribe_callback == NULL || my_log_callback == NULL) { - zlog_error(zct,"please call the register_collback function first."); + zlog_error(zct, "please call the register_collback function first."); return -1; } - int qos = readIntValue("config", "qos", (char *)GlobalConfig::Config_G.c_str()); - mosquitto_lib_init(); //使用mosquitto库函数前需进行初始化 + int qos = readIntValue("config", "qos", (char *)GlobalConfig::Config_G.c_str()); memset(&mqttsdata, 0, sizeof(mqttsdata_t)); memcpy(mqttsdata.gwid, gwid, strlen(gwid)); @@ -132,7 +145,7 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons #else mqttsdata.qos = qos; #endif - zlog_info(zct,"mqttsdata.qos = %d", mqttsdata.qos); + zlog_info(zct, "mqttsdata.qos = %d", mqttsdata.qos); std::string username = ReadStrByOpt(SERVERCONFIG, "Server", "UserName"); std::string password = ReadStrByOpt(SERVERCONFIG, "Server", "Password"); @@ -142,11 +155,13 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons } char userName[100]; + memset(userName, 0, 100); sprintf(userName, "%s", username.c_str()); char passWord[100]; + memset(passWord, 0, 100); sprintf(passWord, "%s", password.c_str()); - zlog_info(zct,"userName = %s , passWord = %s", userName, passWord); + zlog_info(zbt, "userName = %s , passWord = %s", userName, passWord); ud.username = userName; ud.password = passWord; @@ -158,6 +173,7 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons ud.topic_count++; ud.topics = (char **)realloc(ud.topics, ud.topic_count * sizeof(char *)); if (ud.topics == NULL) { + zlog_error(zbt, "fail to realloc for ud.topics"); return 1; } @@ -168,83 +184,34 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons memset(res_topic, 0, 100); sprintf(res_topic, "gw/will"); - zlog_info(zct,"res_topic:%s", res_topic); + zlog_info(zct, "res_topic:%s", res_topic); will_topic = res_topic; ud.resp_topic = res_topic; sprintf(will_payload, "{\"cmd\":\"15\",\"gwID\":\"%s\"}", mqttsdata.gwid); will_payloadlen = strlen(will_payload); - zlog_info(zct,"will_payload:%s,will_payloadlen:%ld", will_payload, will_payloadlen); - if (clean_session == false && (id_prefix || !id)) { - if (!ud.quiet) zlog_error(zct, "Error: You must provide a client id if you are using the -c option."); - return 1; - } - - if (ud.topic_count == 0) { - zlog_error(zct, "Error: You must specify a topic to subscribe to."); - return 1; - } - if (!will_topic) { - zlog_error(zct, "Error: Will payload given, but no will topic given."); - return 1; - } - if (will_retain && !will_topic) { - zlog_error(zct, "Error: Will retain given, but no will topic given."); - return 1; - } - if (ud.password && !ud.username) { - if (!ud.quiet) zlog_error(zct, "Warning: Not using password since username not set."); - } - if (psk && !psk_identity) { - if (!ud.quiet) zlog_error(zct, "Error: --psk-identity required if --psk used."); - return 1; - } - if (id_prefix) { - id = (char *)malloc(strlen(id_prefix) + 10); - if (!id) { - if (!ud.quiet) zlog_error(zct, "Error: Out of memory."); - mosquitto_lib_cleanup(); - return 1; - } - snprintf(id, strlen(id_prefix) + 10, "%s%d", id_prefix, getpid()); - } else if (!id) { - hostname[0] = '\0'; - gethostname(hostname, 256); - hostname[255] = '\0'; - len = strlen("mosqsub/-") + 6 + strlen(hostname); - id = (char *)malloc(len); - if (!id) { - if (!ud.quiet) zlog_error(zct, "Error: Out of memory.\n"); - mosquitto_lib_cleanup(); - return 1; - } - - snprintf(id, len, "mosqsub/%s-%s", GlobalConfig::ZigbeeInfo_G.PanID.c_str(), hostname); - if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH) { - /* Enforce maximum client id length of 23 characters */ - id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; - } - } - zlog_info(zct, "id:%s", id); - mosq = mosquitto_new(id, clean_session, &ud); + zlog_info(zct, "will_payload:%s,will_payloadlen:%ld", will_payload, will_payloadlen); + + hostname[0] = '\0'; + gethostname(hostname, 256); + hostname[255] = '\0'; + len = strlen("mosqsub/-") + 6 + strlen(hostname); + mosquitto_lib_init(); + mosq = mosquitto_new(NULL, true, &ud); if (!mosq) { - switch (errno) { - case ENOMEM: - if (!ud.quiet) zlog_error(zct, "Error: Out of memory."); - break; - case EINVAL: - if (!ud.quiet) zlog_error(zct, "Error: Invalid id and/or clean_session ."); - break; - } + zlog_error(zct, "fail to mosquitto_new, errno:%d", errno); + free(ud.topics); mosquitto_lib_cleanup(); return 1; } - if (debug) { - mosquitto_log_callback_set(mosq, my_log_callback); - } + + mosquitto_log_callback_set(mosq, my_log_callback); if (!psk) { psk = (char *)malloc(32 * sizeof(char)); if (!psk) { + free(ud.topics); + mosquitto_destroy(mosq); + mosquitto_lib_cleanup(); zlog_error(zct, "Error: No free space 1"); return -1; } @@ -252,68 +219,55 @@ int start_client(const char *boardname, const char *gwid, const char *gwip, cons snprintf(psk, 32, "%d", getpid()); psk_identity = (char *)malloc(32 * sizeof(char)); if (!psk_identity) { + free(ud.topics); + mosquitto_destroy(mosq); + mosquitto_lib_cleanup(); zlog_error(zct, "Error: No free space 2"); free(psk); return -1; } strncpy(psk_identity, psk, 32); } - if (will_topic && mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)) { - if (!ud.quiet) zlog_error(zct, "Error: Problem setting will."); - mosquitto_lib_cleanup(); - free(psk); - free(psk_identity); - return 1; + if (mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)) { + zlog_error(zct, "Error: Problem setting will."); + goto mosq_free; } if (ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)) { - if (!ud.quiet) zlog_error(zct, "Error: Problem setting username and password."); - mosquitto_lib_cleanup(); - free(psk); - free(psk_identity); - return 1; + zlog_error(zct, "Error: Problem setting username and password."); + goto mosq_free; } mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_publish_callback_set(mosq, my_publish_callback); + mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); - if (debug) { - mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); - } - - if (use_srv) { - rc = mosquitto_connect_srv(mosq, host, keepalive, bind_address); - } else { - rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address); - } - zlog_info(zct,"host = %s port = %d rc = %d", host, port, rc); + rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address); + zlog_info(zct, "host = %s port = %d rc = %d", host, port, rc); if (rc) { - if (!ud.quiet) { - if (rc == MOSQ_ERR_ERRNO) { - - } else { - zlog_error(zct, "Unable to connect (%d).", rc); - - } - } + zlog_error(zct, "Unable to connect (%d).", rc); goto mosq_free; } + rc = mosquitto_loop_forever(mosq, -1, 1); if (rc) { zlog_error(zct, "Error: %s", mosquitto_strerror(rc)); } + mosq_free: mosquitto_destroy(mosq); mosquitto_lib_cleanup(); - free(ud.topics); - free(id); - free(psk); - free(psk_identity); - ud.topics = NULL; - id = NULL; - mosq = NULL; + if (ud.topics != NULL) { + free(ud.topics); + } + if (psk) { + free(psk); + } + if (psk_identity) { + free(psk_identity); + } sleep(1); return 0; } diff --git a/threadfunc/thread_func.cpp b/threadfunc/thread_func.cpp index d788e52..cec3e30 100644 --- a/threadfunc/thread_func.cpp +++ b/threadfunc/thread_func.cpp @@ -447,8 +447,9 @@ void StartMqttClient() { std::string salt; register_collback(my_connect_callback, my_message_callback, my_subscribe_callback, my_log_callback, my_disconnect_callback, my_publish_callback); start_client(strEqupNo.c_str(), GlobalConfig::MacAddr_G.c_str(), GlobalConfig::ServerIP.c_str(), strVersion.c_str(), "11111111", salt); + zlog_error(zct, "fail to connect server"); } - sleep(3); + sleep(10); } }