From d5216b870861f90667a746f7a332e22d1544b9ac Mon Sep 17 00:00:00 2001 From: zhangsheng Date: Thu, 3 Apr 2025 10:47:57 +0800 Subject: [PATCH] modify wave resend --- platform/platform_init.cpp | 2 +- scheduler/schedule.cpp | 179 ++++++++++++++++++++++++++++++------ scheduler/schedule.hpp | 10 ++ uart/uart.cpp | 14 ++- uart/uart_feature_parse.cpp | 3 +- 5 files changed, 172 insertions(+), 36 deletions(-) diff --git a/platform/platform_init.cpp b/platform/platform_init.cpp index 85f7c4f..5cbd07b 100644 --- a/platform/platform_init.cpp +++ b/platform/platform_init.cpp @@ -14,7 +14,7 @@ int GlobalConfig::LinkStatus_G = 0; int GlobalConfig::LinkCount = 0; int GlobalConfig::net0Status = 1; -std::string GlobalConfig::Version = "5.3"; +std::string GlobalConfig::Version = "5.4"; std::string GlobalConfig::MacAddr_G = ""; std::string GlobalConfig::MacAddr_G2 = ""; std::string GlobalConfig::IpAddr_G = ""; diff --git a/scheduler/schedule.cpp b/scheduler/schedule.cpp index d1fea65..4fa9ee5 100644 --- a/scheduler/schedule.cpp +++ b/scheduler/schedule.cpp @@ -35,19 +35,23 @@ int SensorScheduler::StartSchedule(int short_addr, int &next_duration) { nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_; seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_; ts_in_eigen_slice_ = false; + if (seconds_in_current_eigen_slice_ < eigen_value_slice_total_seconds_ - 3) { ts_in_eigen_slice_ = true; - } + } if (ts_in_eigen_slice_) { nth_eigen_slice_ = (seconds_in_current_eigen_slice_ + 2) / eigen_value_send_duration_; + if (nth_eigen_value_slice_ == 0 && nth_eigen_slice_ == 0) { + ClearFailureSuccessMap(); + } } else { nth_wave_slice_ = (seconds_in_current_eigen_slice_ - eigen_value_slice_total_seconds_ + 3) / seconds_per_wave_slice_; } zlog_warn(zct, "[%d:%x] ts:%ld, current utc:%s, nth eigen_value slice:%d, seconds in eigen slice:%d, eigen slice:%d", id, short_addr, current_ts_, GetUTCTime(current_ts_).c_str(), nth_eigen_value_slice_+1, seconds_in_current_eigen_slice_, ts_in_eigen_slice_); if (ts_in_eigen_slice_) { - if (id == nth_eigen_slice_ + 1) { + // if (id == nth_eigen_slice_ + 1) { // 传感器需要执行上送特征值任务, 如果有配置需要下发的话,下发配置 if (update_.count(id)) { // execute config @@ -67,20 +71,20 @@ int SensorScheduler::StartSchedule(int short_addr, int &next_duration) { return kScheduleWrongTime; } } - } else { - zlog_warn(zct, "[%d:%x] Invalid request, revive in %d eigen slice", id, short_addr, nth_eigen_slice_ + 1); - if (id < nth_eigen_slice_ + 1) { - // 不正确的请求 - long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_; - next_duration = available_ts - current_ts_; - zlog_warn(zct, "[%d:%x] wrong time in eigen slice, next feature in next interval send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration); - } else { - long available_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_; - next_duration = available_ts - current_ts_; - zlog_warn(zct, "[%d:%x] wrong time in eigen slice, next feature in current interval send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration); - } - return kScheduleWrongTime; - } + // } else { + // zlog_warn(zct, "[%d:%x] Invalid request, revive in %d eigen slice", id, short_addr, nth_eigen_slice_ + 1); + // if (id < nth_eigen_slice_ + 1) { + // // 不正确的请求 + // long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_; + // next_duration = available_ts - current_ts_; + // zlog_warn(zct, "[%d:%x] wrong time in eigen slice, next feature in next interval send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration); + // } else { + // long available_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_; + // next_duration = available_ts - current_ts_; + // zlog_warn(zct, "[%d:%x] wrong time in eigen slice, next feature in current interval send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration); + // } + // return kScheduleWrongTime; + // } } else { int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1; auto wave_slice_iter = sensor_id_nth_slice_.find(id); @@ -103,12 +107,12 @@ int SensorScheduler::StartSchedule(int short_addr, int &next_duration) { } } - if (update_.count(id)) { - // execute config - zlog_warn(zct, "[%d:%x] in wave slice to update config", id, short_addr); - current_request_ = kScheduleConfigSensor; - return kScheduleConfigSensor; - } + // if (update_.count(id)) { + // // execute config + // zlog_warn(zct, "[%d:%x] in wave slice to update config", id, short_addr); + // current_request_ = kScheduleConfigSensor; + // return kScheduleConfigSensor; + // } wave_feature_set_inst::instance().GetWaveCfg(short_addr, g_x, g_y, g_z); if (g_x || g_y || g_z) { zlog_warn(zct, "[%d:%x] it is wave time", id, short_addr); @@ -135,12 +139,22 @@ int SensorScheduler::StartSchedule(int short_addr, int &next_duration) { } } - if (update_.count(id)) { - // execute config - zlog_warn(zct, "[%d:%x] in idle time to update config", id, short_addr); - current_request_ = kScheduleConfigSensor; - return kScheduleConfigSensor; + if (RetransferWave(short_addr)) { + zlog_warn(zct, "[%d:%x] it is retransfer wave time", id, short_addr); + current_request_ = kScheduleWaveForm; + return kScheduleWaveForm; + } else if (MissedWave(short_addr)) { + zlog_warn(zct, "[%d:%x] it is patch wave time", id, short_addr); + current_request_ = kScheduleWaveForm; + patch_set_.erase(short_addr); + return kScheduleWaveForm; } + // if (update_.count(id)) { + // // execute config + // zlog_warn(zct, "[%d:%x] in idle time to update config", id, short_addr); + // current_request_ = kScheduleConfigSensor; + // return kScheduleConfigSensor; + // } } // wrong time to come long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_; @@ -153,6 +167,17 @@ int SensorScheduler::StartSchedule(int short_addr, int &next_duration) { long SensorScheduler::CalcNextTimestamp(int id, uint16_t short_addr) { // current_ts_ = GetLocalTs(); + // nth_wave_start_slice_ = (current_ts_ - start_timestamp_) / wave_form_send_interval_; + // current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_; + // seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_; + // nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_; + // seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_; + // ts_in_eigen_slice_ = false; + + // if (seconds_in_current_eigen_slice_ < eigen_value_slice_total_seconds_ - 3) { + // ts_in_eigen_slice_ = true; + // } + if (ts_in_eigen_slice_) { int forward_wave_slice_num = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_; auto wave_slice_iter = sensor_id_nth_slice_.find(id); @@ -177,6 +202,20 @@ long SensorScheduler::CalcNextTimestamp(int id, uint16_t short_addr) { } } } + + if (!MissedWave(short_addr) && send_wave_ts == 0 && success_set_.count(short_addr) == 0) { + // add for patch wave + int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1; + auto wave_slice_iter = sensor_id_nth_slice_.find(id); + if (wave_slice_iter != sensor_id_nth_slice_.end()) { + if (nth_wave_slice > wave_slice_iter->second) { + if (success_set_.count(short_addr) == 0 && !RetransferWave(short_addr)) { + zlog_warn(zct, "[Nxt] [%d] add it to patch set", short_addr); + patch_set_.insert(short_addr); + } + } + } + } } long available_ts = 0; @@ -196,7 +235,37 @@ long SensorScheduler::CalcNextTimestamp(int id, uint16_t short_addr) { } } } - } + } else { + if (g_x || g_y || g_z) { + if (RetransferWave(short_addr)) { + for (int i = 0; i < wave_slice_num_per_eigen_interval_; ++i) { + if (slice_sensor_id_[i+forward_wave_slice_num] == 0) { + // 判断此空闲位置是否被占用 + long current_wave_slice_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + eigen_value_slice_total_seconds_ + i * seconds_per_wave_slice_; + if (free_slice_ocuppied_.count(current_wave_slice_ts) == 0) { + available_ts = current_wave_slice_ts; + free_slice_ocuppied_.insert(available_ts); + zlog_warn(zct, "[Nxt][%d:%x] %d nth free wave slice will be used to retransfer wave, utc time:[%s]", id, short_addr, i+forward_wave_slice_num, GetUTCTime(available_ts).c_str()); + break; + } + } + } + } else if (MissedWave(short_addr)) { + for (int i = 0; i < wave_slice_num_per_eigen_interval_; ++i) { + if (slice_sensor_id_[i+forward_wave_slice_num] == 0) { + // 判断此空闲位置是否被占用 + long current_wave_slice_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + eigen_value_slice_total_seconds_ + i * seconds_per_wave_slice_; + if (free_slice_ocuppied_.count(current_wave_slice_ts) == 0) { + available_ts = current_wave_slice_ts; + free_slice_ocuppied_.insert(available_ts); + zlog_warn(zct, "[Nxt][%d:%x] %d nth free wave slice will be used to patch wave, utc time:[%s]", id, short_addr, i+forward_wave_slice_num, GetUTCTime(available_ts).c_str()); + break; + } + } + } + } + } + } if (send_wave_ts > 0 && available_ts > 0) { long min_ts = std::min(send_wave_ts, available_ts); zlog_warn(zct, "[Nxt] [%d:%x] next feature send utc time1:%s", id, short_addr, GetUTCTime(min_ts).c_str()); @@ -325,7 +394,57 @@ SensorScheduler::SensorScheduler() { UpdateCfg::ReadCfg(update_); } -long SensorScheduler::GetBaseTimestamp(int short_addr) { +int SensorScheduler::WaveError(uint16_t short_addr) { + auto iter = failure_map_.find(short_addr); + if (iter == failure_map_.end()) { + failure_map_[short_addr] = 3; // 重试次数 + zlog_warn(zct, "[WaveError][%x] will try 3 times", short_addr); + return 0; + } + if (iter->second == 0) { + zlog_warn(zct, "[WaveError][%x] no try times", short_addr); + failure_map_.erase(short_addr); + return -1; + } + iter->second = iter->second - 1; + zlog_warn(zct, "[WaveError][%x] remain try %d times", short_addr, iter->second); + return 0; +} + +bool SensorScheduler::RetransferWave(uint16_t short_addr) { + auto iter = failure_map_.find(short_addr); + if (iter != failure_map_.end()) { + return true; + } + return false; +} + +bool SensorScheduler::MissedWave(uint16_t short_addr) { + if (patch_set_.count(short_addr) > 0) { + return true; + } + + return false; +} + +void SensorScheduler::WaveSuccess(uint16_t short_addr) { + success_set_.insert(short_addr); + auto iter = failure_map_.find(short_addr); + if (iter != failure_map_.end()) { + zlog_warn(zct, "[WaveSuccess][%x] try %d times success", short_addr, 4 - iter->second); + failure_map_.erase(short_addr); + return; + } + return; +} + +void SensorScheduler::ClearFailureSuccessMap() { + failure_map_.clear(); + success_set_.clear(); + patch_set_.clear(); +} + +long SensorScheduler::GetBaseTimestamp(int short_addr) { int id = 0; auto iter = short_addr_map_.find(short_addr); if (iter == short_addr_map_.end()) { @@ -334,7 +453,7 @@ long SensorScheduler::GetBaseTimestamp(int short_addr) { } else { id = iter->second; } - return start_timestamp_ + (id - 1) * eigen_value_send_duration_; + return start_timestamp_ + (id - 1) * eigen_value_send_duration_; } int SensorScheduler::GetAvailableId(int short_addr) { diff --git a/scheduler/schedule.hpp b/scheduler/schedule.hpp index a3a959d..dae6c56 100644 --- a/scheduler/schedule.hpp +++ b/scheduler/schedule.hpp @@ -41,6 +41,8 @@ public: int StartSchedule(int short_addr, int &next_duration); int GetNextDuration(int short_addr); + int WaveError(uint16_t short_addr); + void WaveSuccess(uint16_t short_addr); long GetBaseTimestamp(int id); long CalcNextTimestamp(int id, uint16_t short_addr); @@ -124,6 +126,14 @@ private: std::map sensor_id_nth_slice_; // 传感器编号与第几个波形发送窗口对应关系 std::map short_addr_map_; // base_relation.json + // 存储当前2小时内失败与成功的传感器 + std::map failure_map_; + std::unordered_set success_set_; + std::unordered_set patch_set_; // 漏传的补传 + void ClearFailureSuccessMap(); + bool RetransferWave(uint16_t short_addr); + bool MissedWave(uint16_t short_addr); + // 空闲时间戳被占用 std::unordered_set free_slice_ocuppied_; diff --git a/uart/uart.cpp b/uart/uart.cpp index 3de36f9..b8afb7c 100644 --- a/uart/uart.cpp +++ b/uart/uart.cpp @@ -1096,8 +1096,11 @@ int Uart::FindRecvPackage(int bytesRead, char *mUartRecvBuf, char *head) { strChannelID = strMeasurementID + "-Z"; } - sprintf(insertSql, "'%s','%s','%s',%d,'%d','%s' ", strChannelID.c_str(),strShortAddr.c_str(),localtimestamp,bytesRead,0,error_msg.c_str()); - sqlite_db_ctrl::instance().InsertData(" receive_wave_status ", insertSql); + if(scheduler::instance().WaveError(wave_shortAddr) < 0){ + zlog_error(zct, "WaveError error ShortAddr :%s", strShortAddr.c_str()); + sprintf(insertSql, "'%s','%s','%s',%d,'%d','%s' ", strChannelID.c_str(),strShortAddr.c_str(),localtimestamp,bytesRead,0,error_msg.c_str()); + sqlite_db_ctrl::instance().InsertData(" receive_wave_status ", insertSql); + } JsonData jd; jd.JsonCmd_32(strMeasurementID,1,1,strChannelID,error_msg); break; @@ -1138,8 +1141,11 @@ int Uart::FindRecvPackage(int bytesRead, char *mUartRecvBuf, char *head) { error_msg = "Crc error,wave Z"; strChannelID = strMeasurementID + "-Z"; } - sprintf(insertSql, "'%s','%s','%s',%d,'%d','%s' ", strChannelID.c_str(),strShortAddr.c_str(),localtimestamp,bytesRead,0,error_msg.c_str()); - sqlite_db_ctrl::instance().InsertData(" receive_wave_status ", insertSql); + if(scheduler::instance().WaveError(wave_shortAddr) < 0){ + zlog_error(zct, "WaveError error ShortAddr :%s", strShortAddr.c_str()); + sprintf(insertSql, "'%s','%s','%s',%d,'%d','%s' ", strChannelID.c_str(),strShortAddr.c_str(),localtimestamp,bytesRead,0,error_msg.c_str()); + sqlite_db_ctrl::instance().InsertData(" receive_wave_status ", insertSql); + } JsonData jd; jd.JsonCmd_32(strMeasurementID,1,1,strChannelID,error_msg); diff --git a/uart/uart_feature_parse.cpp b/uart/uart_feature_parse.cpp index 9e38e36..03f987d 100644 --- a/uart/uart_feature_parse.cpp +++ b/uart/uart_feature_parse.cpp @@ -12,6 +12,7 @@ #include "minilzo/minilzo.h" #include "jsonparse/communication_cmd.hpp" #include "utility/calculation.hpp" +#include "scheduler/schedule.hpp" extern zlog_category_t *zct; extern zlog_category_t *zbt; @@ -804,7 +805,7 @@ void Uart::DealWave() { sprintf(insertSql, "'%s-Z','%02x%02x','%s',0,'1','%s' ", strMeasurementID.c_str(),(wave_shortAddr >> 8) & 0xFF,wave_shortAddr & 0xFF,localtimestamp,""); sqlite_db_ctrl::instance().InsertData(" receive_wave_status ", insertSql); } - + scheduler::instance().WaveSuccess(wave_shortAddr); } std::string ran = "";