#include #include #include #include #include #include #include #include #include #include #include #include #include #include "SH_MqttClient.h" #include "../common/SH_global.h" #include "../secure/SH_Secure.hpp" static char res_topic[100]; static char req_topic[100]; struct mosquitto *mosq = NULL; static int mid_sent = 0; struct userdata ud; connect_callback my_connect_callback = NULL; disconnect_callback my_disconnect_callback = NULL; message_callback my_message_callback = NULL; subscribe_callback my_subscribe_callback = NULL; log_callback my_log_callback = NULL; publish_callback my_publish_callback = NULL; void register_collback(connect_callback connect_c, message_callback message_c, subscribe_callback subscribe_c, log_callback log_c, disconnect_callback disconnect_c, publish_callback publish_c) { my_connect_callback = connect_c; my_message_callback = message_c; my_subscribe_callback = subscribe_c; my_log_callback = log_c; my_disconnect_callback = disconnect_c; my_publish_callback = publish_c; } int data_publish(const char *str, const char *topic) { int iRet = -1; if(mosq != NULL){ //print_blue("data_publish: %s %s\n", str, topic); std::string strTopic = std::string(topic); if (strTopic.find("cmd") != std::string::npos || \ strTopic.find("configureInfo") != std::string::npos ) { std::string pData = (std::string)str; char *base64_aes = new char[pData.length() * 2]; memset(base64_aes, 0, pData.length() * 2); Secure::instance()->Base64Encode((unsigned char*)pData.c_str(), base64_aes, pData.length()); iRet = mosquitto_publish(mosq, &mid_sent, topic, strlen(base64_aes), base64_aes, ud.topic_qos, false); //char *base64_data = new char[pData.length() * 2]; //memset(base64_data, 0, pData.length() *2); //Secure::instance()->Base64Decode(base64_aes, (unsigned char*)base64_data); //std::string pData = std::string(base64_data); if(base64_aes != NULL) delete[] base64_aes; // free(out); } else { iRet = mosquitto_publish(mosq, &mid_sent, topic, strlen(str), str, ud.topic_qos, false); } } if(iRet != 0) { reconnect(); } return iRet; } int data_publish_wave(WAVE_CONTAIN *str, const char *topic) { int iRet = -1; if(mosq != NULL){ print_blue("data_publish:Unit : %s type:%d flag : %d channelid : %s total : %d count : %d number :%d floats:%f floate:%f\n",str->SensorEngineeringUnit, str->type, str->flag, str->channelId, str->total, str->count,str->number,str->waveData[0],str->waveData[(str->number-2)]); iRet = mosquitto_publish(mosq, &mid_sent, topic, sizeof(WAVE_CONTAIN), str, ud.topic_qos, false); } return iRet; } int subscribe(const char* topic, int qos) { int iret = mosquitto_subscribe(mosq, NULL, topic, qos); return iret; } int unsubscribe(const char* topic) { int iret = mosquitto_unsubscribe(mosq, NULL, topic); return iret; } int disconnect() { return mosquitto_disconnect(mosq); } int reconnect() { return mosquitto_reconnect(mosq); } int start_client(const char *boardname, const char *gwid, const char* gwip, const char *gwver, const char *gwcode, std::string &salt) { char *id = NULL; char *id_prefix = NULL; char *host; int port = 51613; char *bind_address = NULL; int keepalive = 60; bool clean_session = true; bool debug = true; int rc = 0; char hostname[256]; char err[1024]; int len; char will_payload[100]; long will_payloadlen = 0; int will_qos = 0; bool will_retain = false; char *will_topic = NULL; bool insecure = false; char *tls_version = NULL; char *psk = NULL; char *psk_identity = NULL; char *ciphers = NULL; bool use_srv = false; mqttsdata_t mqttsdata; signal(SIGCHLD, SIG_IGN);//signal函数作用:为一个信号注册一个信号处理函数。其中SIGCHLD:在一个进程终止或者停止时,将SIGCHLD信号发送给父进程。而信号处理函数是一个函数指针,不过它使用的是预定义函数SIG_IGN(忽视信号) signal(SIGPIPE, SIG_IGN); int reconnect = 0; if (my_connect_callback == NULL || my_message_callback == NULL || my_subscribe_callback == NULL || my_log_callback == NULL) { print_info("please call the register_collback function first.\n"); return -1; } int qos = readIntValue( "config", "qos",(char*)GlobalConfig::Config_G.c_str()); mosquitto_lib_init();//使用mosquitto库函数前需进行初始化 memset(&mqttsdata, 0, sizeof(mqttsdata_t)); memcpy(mqttsdata.gwid, gwid, strlen(gwid)); // mqttsdata.username = "admin"; // mqttsdata.password = "password"; salt = std::string(mqttsdata.salt); memset(&ud, 0, sizeof(struct userdata)); ud.eol = true; char localip[100]; sprintf(localip, "%s", gwip); host = localip; mqttsdata.port = GlobalConfig::ServerPort; port = GlobalConfig::ServerPort; #ifdef WIFI_MODULE mqttsdata.qos = 0; #else mqttsdata.qos = qos; #endif print_info("mqttsdata.qos = %d\n",mqttsdata.qos); // std::string username = "root"; // std::string password = "xom!*283#@XHG"; std::string username = ReadStrByOpt(SERVERCONFIG, "Server", "UserName"); std::string password = ReadStrByOpt(SERVERCONFIG, "Server", "Password"); if(username == "" || password == ""){ username = "chaos"; password = "HSD272*#xkd"; } char userName[100]; sprintf(userName, "%s", username.c_str()); char passWord[100]; sprintf(passWord, "%s", password.c_str()); print_info("userName = %s , passWord = %s\n",userName,passWord); ud.username = userName; ud.password = passWord; ud.gwid = mqttsdata.gwid; ud.dataUrl = mqttsdata.dataUrl; ud.fsUrl = mqttsdata.fsUrl; will_qos = mqttsdata.qos; ud.topic_qos = will_qos; ud.topic_count++; ud.topics = (char**)realloc(ud.topics, ud.topic_count*sizeof(char *)); if (ud.topics == NULL) { return 1; } memset(req_topic, 0, 100); sprintf(req_topic, "dev/cidwcc"); ud.topics[ud.topic_count - 1] = req_topic; ud.verbose = 1; //fprintf(stderr, "topic = %s!!!!!!!!!!!\n", ud.topics[ud.topic_count - 1]); memset(res_topic, 0, 100); sprintf(res_topic, "gw/will"); print_info("res_topic:%s\n", res_topic); will_topic = res_topic; ud.resp_topic = res_topic; sprintf(will_payload, "{\"cmd\":\"15\",\"gwID\":\"%s\"}", mqttsdata.gwid); will_payloadlen = strlen(will_payload); print_info("will_payload:%s,will_payloadlen:%ld\n", will_payload, will_payloadlen); if (clean_session == false && (id_prefix || !id)){ if (!ud.quiet) fprintf(stderr, "Error: You must provide a client id if you are using the -c option.\n"); return 1; } if (ud.topic_count == 0){ fprintf(stderr, "Error: You must specify a topic to subscribe to.\n"); return 1; } if (will_payload && !will_topic){ fprintf(stderr, "Error: Will payload given, but no will topic given.\n"); return 1; } if (will_retain && !will_topic){ fprintf(stderr, "Error: Will retain given, but no will topic given.\n"); return 1; } if (ud.password && !ud.username){ if (!ud.quiet) fprintf(stderr, "Warning: Not using password since username not set.\n"); } if (psk && !psk_identity){ if (!ud.quiet) fprintf(stderr, "Error: --psk-identity required if --psk used.\n"); return 1; } if (id_prefix){ id = (char*)malloc(strlen(id_prefix) + 10); if (!id){ if (!ud.quiet) fprintf(stderr, "Error: Out of memory.\n"); mosquitto_lib_cleanup(); return 1; } snprintf(id, strlen(id_prefix) + 10, "%s%d", id_prefix, getpid()); } else if (!id){ hostname[0] = '\0'; gethostname(hostname, 256); hostname[255] = '\0'; len = strlen("mosqsub/-") + 6 + strlen(hostname); id = (char*)malloc(len); if (!id){ if (!ud.quiet) fprintf(stderr, "Error: Out of memory.\n"); mosquitto_lib_cleanup(); return 1; } snprintf(id, len, "mosqsub/%d-%s", getpid(), hostname); if (strlen(id) > MOSQ_MQTT_ID_MAX_LENGTH){ /* Enforce maximum client id length of 23 characters */ id[MOSQ_MQTT_ID_MAX_LENGTH] = '\0'; } } print_info("id:%s\n", id); mosq = mosquitto_new(id, clean_session, &ud); if (!mosq){ switch (errno){ case ENOMEM: if (!ud.quiet) fprintf(stderr, "Error: Out of memory.\n"); break; case EINVAL: if (!ud.quiet) fprintf(stderr, "Error: Invalid id and/or clean_session .\n"); break; } mosquitto_lib_cleanup(); return 1; } if (debug){ mosquitto_log_callback_set(mosq, my_log_callback); } if (!psk) { psk = (char *)malloc(32 * sizeof(char)); if (!psk) { fprintf(stderr, "Error: No free space %d\n", __LINE__); return -1; } memset(psk, 0, 32); snprintf(psk, 32, "%ld", getpid()); psk_identity = (char *)malloc(32 * sizeof(char)); if (!psk_identity) { fprintf(stderr, "Error: No free space %d\n", __LINE__); free(psk); return -1; } strncpy(psk_identity, psk, 32); } if (will_topic && mosquitto_will_set(mosq, will_topic, will_payloadlen, will_payload, will_qos, will_retain)) { if (!ud.quiet) fprintf(stderr, "Error: Problem setting will.\n"); mosquitto_lib_cleanup(); free(psk); free(psk_identity); return 1; } if (ud.username && mosquitto_username_pw_set(mosq, ud.username, ud.password)) { if (!ud.quiet) fprintf(stderr, "Error: Problem setting username and password.\n"); mosquitto_lib_cleanup(); free(psk); free(psk_identity); return 1; } // if (insecure && mosquitto_tls_insecure_set(mosq, false)){ // if (!ud.quiet) fprintf(stderr, "Error: Problem setting TLS insecure option.\n"); // mosquitto_lib_cleanup(); // free(psk); // free(psk_identity); // return 1; // } // if (psk && mosquitto_tls_psk_set(mosq, psk, psk_identity, NULL)){ // if (!ud.quiet) fprintf(stderr, "Error: Problem setting TLS-PSK options.\n"); // mosquitto_lib_cleanup(); // return 1; // } // if (tls_version && mosquitto_tls_opts_set(mosq, 0, tls_version, ciphers)){ // if (!ud.quiet) fprintf(stderr, "Error: Problem setting TLS options.\n"); // mosquitto_lib_cleanup(); // return 1; // } mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_disconnect_callback_set(mosq, my_disconnect_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_publish_callback_set(mosq, my_publish_callback); if (debug){ mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); } if (use_srv){ rc = mosquitto_connect_srv(mosq, host, keepalive, bind_address); } else{ rc = mosquitto_connect_bind(mosq, host, port, keepalive, bind_address); } print_info("host = %s port = %d rc = %d\n",host,port,rc); if (rc){ if (!ud.quiet){ if (rc == MOSQ_ERR_ERRNO){ //strerror_r(errno, err, 1024); //fprintf(stderr, "Error_: %s\n", err); //print_info("%s",stderr); } else{ fprintf(stderr, "Unable to connect (%d).\n", rc); print_info("%s",stderr); } } goto mosq_free; } rc = mosquitto_loop_forever(mosq, -1, 1); if (rc) { //fprintf(stderr, "Error: %s\n", mosquitto_strerror(rc)); //print_info("%s",stderr); } mosq_free: mosquitto_destroy(mosq); mosquitto_lib_cleanup(); free(ud.topics); free(id); free(psk); free(psk_identity); ud.topics = NULL; id = NULL; mosq = NULL; sleep(1); }