modify wave resend

This commit is contained in:
zhangsheng 2025-04-03 10:47:57 +08:00
parent 1e70299fbe
commit d5216b8708
5 changed files with 172 additions and 36 deletions

View File

@ -14,7 +14,7 @@ int GlobalConfig::LinkStatus_G = 0;
int GlobalConfig::LinkCount = 0; int GlobalConfig::LinkCount = 0;
int GlobalConfig::net0Status = 1; int GlobalConfig::net0Status = 1;
std::string GlobalConfig::Version = "5.3"; std::string GlobalConfig::Version = "5.4";
std::string GlobalConfig::MacAddr_G = ""; std::string GlobalConfig::MacAddr_G = "";
std::string GlobalConfig::MacAddr_G2 = ""; std::string GlobalConfig::MacAddr_G2 = "";
std::string GlobalConfig::IpAddr_G = ""; std::string GlobalConfig::IpAddr_G = "";

View File

@ -35,19 +35,23 @@ int SensorScheduler::StartSchedule(int short_addr, int &next_duration) {
nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_; nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_;
seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_; seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_;
ts_in_eigen_slice_ = false; ts_in_eigen_slice_ = false;
if (seconds_in_current_eigen_slice_ < eigen_value_slice_total_seconds_ - 3) { if (seconds_in_current_eigen_slice_ < eigen_value_slice_total_seconds_ - 3) {
ts_in_eigen_slice_ = true; ts_in_eigen_slice_ = true;
} }
if (ts_in_eigen_slice_) { if (ts_in_eigen_slice_) {
nth_eigen_slice_ = (seconds_in_current_eigen_slice_ + 2) / eigen_value_send_duration_; nth_eigen_slice_ = (seconds_in_current_eigen_slice_ + 2) / eigen_value_send_duration_;
if (nth_eigen_value_slice_ == 0 && nth_eigen_slice_ == 0) {
ClearFailureSuccessMap();
}
} else { } else {
nth_wave_slice_ = (seconds_in_current_eigen_slice_ - eigen_value_slice_total_seconds_ + 3) / seconds_per_wave_slice_; nth_wave_slice_ = (seconds_in_current_eigen_slice_ - eigen_value_slice_total_seconds_ + 3) / seconds_per_wave_slice_;
} }
zlog_warn(zct, "[%d:%x] ts:%ld, current utc:%s, nth eigen_value slice:%d, seconds in eigen slice:%d, eigen slice:%d", zlog_warn(zct, "[%d:%x] ts:%ld, current utc:%s, nth eigen_value slice:%d, seconds in eigen slice:%d, eigen slice:%d",
id, short_addr, current_ts_, GetUTCTime(current_ts_).c_str(), nth_eigen_value_slice_+1, seconds_in_current_eigen_slice_, ts_in_eigen_slice_); id, short_addr, current_ts_, GetUTCTime(current_ts_).c_str(), nth_eigen_value_slice_+1, seconds_in_current_eigen_slice_, ts_in_eigen_slice_);
if (ts_in_eigen_slice_) { if (ts_in_eigen_slice_) {
if (id == nth_eigen_slice_ + 1) { // if (id == nth_eigen_slice_ + 1) {
// 传感器需要执行上送特征值任务, 如果有配置需要下发的话,下发配置 // 传感器需要执行上送特征值任务, 如果有配置需要下发的话,下发配置
if (update_.count(id)) { if (update_.count(id)) {
// execute config // execute config
@ -67,20 +71,20 @@ int SensorScheduler::StartSchedule(int short_addr, int &next_duration) {
return kScheduleWrongTime; return kScheduleWrongTime;
} }
} }
} else { // } else {
zlog_warn(zct, "[%d:%x] Invalid request, revive in %d eigen slice", id, short_addr, nth_eigen_slice_ + 1); // zlog_warn(zct, "[%d:%x] Invalid request, revive in %d eigen slice", id, short_addr, nth_eigen_slice_ + 1);
if (id < nth_eigen_slice_ + 1) { // if (id < nth_eigen_slice_ + 1) {
// 不正确的请求 // // 不正确的请求
long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_; // long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_;
next_duration = available_ts - current_ts_; // next_duration = available_ts - current_ts_;
zlog_warn(zct, "[%d:%x] wrong time in eigen slice, next feature in next interval send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration); // zlog_warn(zct, "[%d:%x] wrong time in eigen slice, next feature in next interval send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration);
} else { // } else {
long available_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_; // long available_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_;
next_duration = available_ts - current_ts_; // next_duration = available_ts - current_ts_;
zlog_warn(zct, "[%d:%x] wrong time in eigen slice, next feature in current interval send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration); // zlog_warn(zct, "[%d:%x] wrong time in eigen slice, next feature in current interval send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration);
} // }
return kScheduleWrongTime; // return kScheduleWrongTime;
} // }
} else { } else {
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1; int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1;
auto wave_slice_iter = sensor_id_nth_slice_.find(id); auto wave_slice_iter = sensor_id_nth_slice_.find(id);
@ -103,12 +107,12 @@ int SensorScheduler::StartSchedule(int short_addr, int &next_duration) {
} }
} }
if (update_.count(id)) { // if (update_.count(id)) {
// execute config // // execute config
zlog_warn(zct, "[%d:%x] in wave slice to update config", id, short_addr); // zlog_warn(zct, "[%d:%x] in wave slice to update config", id, short_addr);
current_request_ = kScheduleConfigSensor; // current_request_ = kScheduleConfigSensor;
return kScheduleConfigSensor; // return kScheduleConfigSensor;
} // }
wave_feature_set_inst::instance().GetWaveCfg(short_addr, g_x, g_y, g_z); wave_feature_set_inst::instance().GetWaveCfg(short_addr, g_x, g_y, g_z);
if (g_x || g_y || g_z) { if (g_x || g_y || g_z) {
zlog_warn(zct, "[%d:%x] it is wave time", id, short_addr); zlog_warn(zct, "[%d:%x] it is wave time", id, short_addr);
@ -135,12 +139,22 @@ int SensorScheduler::StartSchedule(int short_addr, int &next_duration) {
} }
} }
if (update_.count(id)) { if (RetransferWave(short_addr)) {
// execute config zlog_warn(zct, "[%d:%x] it is retransfer wave time", id, short_addr);
zlog_warn(zct, "[%d:%x] in idle time to update config", id, short_addr); current_request_ = kScheduleWaveForm;
current_request_ = kScheduleConfigSensor; return kScheduleWaveForm;
return kScheduleConfigSensor; } else if (MissedWave(short_addr)) {
zlog_warn(zct, "[%d:%x] it is patch wave time", id, short_addr);
current_request_ = kScheduleWaveForm;
patch_set_.erase(short_addr);
return kScheduleWaveForm;
} }
// if (update_.count(id)) {
// // execute config
// zlog_warn(zct, "[%d:%x] in idle time to update config", id, short_addr);
// current_request_ = kScheduleConfigSensor;
// return kScheduleConfigSensor;
// }
} }
// wrong time to come // wrong time to come
long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_; long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_;
@ -153,6 +167,17 @@ int SensorScheduler::StartSchedule(int short_addr, int &next_duration) {
long SensorScheduler::CalcNextTimestamp(int id, uint16_t short_addr) { long SensorScheduler::CalcNextTimestamp(int id, uint16_t short_addr) {
// current_ts_ = GetLocalTs(); // current_ts_ = GetLocalTs();
// nth_wave_start_slice_ = (current_ts_ - start_timestamp_) / wave_form_send_interval_;
// current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_;
// seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_;
// nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_;
// seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_;
// ts_in_eigen_slice_ = false;
// if (seconds_in_current_eigen_slice_ < eigen_value_slice_total_seconds_ - 3) {
// ts_in_eigen_slice_ = true;
// }
if (ts_in_eigen_slice_) { if (ts_in_eigen_slice_) {
int forward_wave_slice_num = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_; int forward_wave_slice_num = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_;
auto wave_slice_iter = sensor_id_nth_slice_.find(id); auto wave_slice_iter = sensor_id_nth_slice_.find(id);
@ -177,6 +202,20 @@ long SensorScheduler::CalcNextTimestamp(int id, uint16_t short_addr) {
} }
} }
} }
if (!MissedWave(short_addr) && send_wave_ts == 0 && success_set_.count(short_addr) == 0) {
// add for patch wave
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1;
auto wave_slice_iter = sensor_id_nth_slice_.find(id);
if (wave_slice_iter != sensor_id_nth_slice_.end()) {
if (nth_wave_slice > wave_slice_iter->second) {
if (success_set_.count(short_addr) == 0 && !RetransferWave(short_addr)) {
zlog_warn(zct, "[Nxt] [%d] add it to patch set", short_addr);
patch_set_.insert(short_addr);
}
}
}
}
} }
long available_ts = 0; long available_ts = 0;
@ -196,6 +235,36 @@ long SensorScheduler::CalcNextTimestamp(int id, uint16_t short_addr) {
} }
} }
} }
} else {
if (g_x || g_y || g_z) {
if (RetransferWave(short_addr)) {
for (int i = 0; i < wave_slice_num_per_eigen_interval_; ++i) {
if (slice_sensor_id_[i+forward_wave_slice_num] == 0) {
// 判断此空闲位置是否被占用
long current_wave_slice_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + eigen_value_slice_total_seconds_ + i * seconds_per_wave_slice_;
if (free_slice_ocuppied_.count(current_wave_slice_ts) == 0) {
available_ts = current_wave_slice_ts;
free_slice_ocuppied_.insert(available_ts);
zlog_warn(zct, "[Nxt][%d:%x] %d nth free wave slice will be used to retransfer wave, utc time:[%s]", id, short_addr, i+forward_wave_slice_num, GetUTCTime(available_ts).c_str());
break;
}
}
}
} else if (MissedWave(short_addr)) {
for (int i = 0; i < wave_slice_num_per_eigen_interval_; ++i) {
if (slice_sensor_id_[i+forward_wave_slice_num] == 0) {
// 判断此空闲位置是否被占用
long current_wave_slice_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + eigen_value_slice_total_seconds_ + i * seconds_per_wave_slice_;
if (free_slice_ocuppied_.count(current_wave_slice_ts) == 0) {
available_ts = current_wave_slice_ts;
free_slice_ocuppied_.insert(available_ts);
zlog_warn(zct, "[Nxt][%d:%x] %d nth free wave slice will be used to patch wave, utc time:[%s]", id, short_addr, i+forward_wave_slice_num, GetUTCTime(available_ts).c_str());
break;
}
}
}
}
}
} }
if (send_wave_ts > 0 && available_ts > 0) { if (send_wave_ts > 0 && available_ts > 0) {
long min_ts = std::min(send_wave_ts, available_ts); long min_ts = std::min(send_wave_ts, available_ts);
@ -325,6 +394,56 @@ SensorScheduler::SensorScheduler() {
UpdateCfg::ReadCfg(update_); UpdateCfg::ReadCfg(update_);
} }
int SensorScheduler::WaveError(uint16_t short_addr) {
auto iter = failure_map_.find(short_addr);
if (iter == failure_map_.end()) {
failure_map_[short_addr] = 3; // 重试次数
zlog_warn(zct, "[WaveError][%x] will try 3 times", short_addr);
return 0;
}
if (iter->second == 0) {
zlog_warn(zct, "[WaveError][%x] no try times", short_addr);
failure_map_.erase(short_addr);
return -1;
}
iter->second = iter->second - 1;
zlog_warn(zct, "[WaveError][%x] remain try %d times", short_addr, iter->second);
return 0;
}
bool SensorScheduler::RetransferWave(uint16_t short_addr) {
auto iter = failure_map_.find(short_addr);
if (iter != failure_map_.end()) {
return true;
}
return false;
}
bool SensorScheduler::MissedWave(uint16_t short_addr) {
if (patch_set_.count(short_addr) > 0) {
return true;
}
return false;
}
void SensorScheduler::WaveSuccess(uint16_t short_addr) {
success_set_.insert(short_addr);
auto iter = failure_map_.find(short_addr);
if (iter != failure_map_.end()) {
zlog_warn(zct, "[WaveSuccess][%x] try %d times success", short_addr, 4 - iter->second);
failure_map_.erase(short_addr);
return;
}
return;
}
void SensorScheduler::ClearFailureSuccessMap() {
failure_map_.clear();
success_set_.clear();
patch_set_.clear();
}
long SensorScheduler::GetBaseTimestamp(int short_addr) { long SensorScheduler::GetBaseTimestamp(int short_addr) {
int id = 0; int id = 0;
auto iter = short_addr_map_.find(short_addr); auto iter = short_addr_map_.find(short_addr);

View File

@ -41,6 +41,8 @@ public:
int StartSchedule(int short_addr, int &next_duration); int StartSchedule(int short_addr, int &next_duration);
int GetNextDuration(int short_addr); int GetNextDuration(int short_addr);
int WaveError(uint16_t short_addr);
void WaveSuccess(uint16_t short_addr);
long GetBaseTimestamp(int id); long GetBaseTimestamp(int id);
long CalcNextTimestamp(int id, uint16_t short_addr); long CalcNextTimestamp(int id, uint16_t short_addr);
@ -124,6 +126,14 @@ private:
std::map<int, int> sensor_id_nth_slice_; // 传感器编号与第几个波形发送窗口对应关系 std::map<int, int> sensor_id_nth_slice_; // 传感器编号与第几个波形发送窗口对应关系
std::map<uint16_t, int> short_addr_map_; // base_relation.json std::map<uint16_t, int> short_addr_map_; // base_relation.json
// 存储当前2小时内失败与成功的传感器
std::map<uint16_t, int> failure_map_;
std::unordered_set<uint16_t> success_set_;
std::unordered_set<uint16_t> patch_set_; // 漏传的补传
void ClearFailureSuccessMap();
bool RetransferWave(uint16_t short_addr);
bool MissedWave(uint16_t short_addr);
// 空闲时间戳被占用 // 空闲时间戳被占用
std::unordered_set<long> free_slice_ocuppied_; std::unordered_set<long> free_slice_ocuppied_;

View File

@ -1096,8 +1096,11 @@ int Uart::FindRecvPackage(int bytesRead, char *mUartRecvBuf, char *head) {
strChannelID = strMeasurementID + "-Z"; strChannelID = strMeasurementID + "-Z";
} }
if(scheduler::instance().WaveError(wave_shortAddr) < 0){
zlog_error(zct, "WaveError error ShortAddr :%s", strShortAddr.c_str());
sprintf(insertSql, "'%s','%s','%s',%d,'%d','%s' ", strChannelID.c_str(),strShortAddr.c_str(),localtimestamp,bytesRead,0,error_msg.c_str()); sprintf(insertSql, "'%s','%s','%s',%d,'%d','%s' ", strChannelID.c_str(),strShortAddr.c_str(),localtimestamp,bytesRead,0,error_msg.c_str());
sqlite_db_ctrl::instance().InsertData(" receive_wave_status ", insertSql); sqlite_db_ctrl::instance().InsertData(" receive_wave_status ", insertSql);
}
JsonData jd; JsonData jd;
jd.JsonCmd_32(strMeasurementID,1,1,strChannelID,error_msg); jd.JsonCmd_32(strMeasurementID,1,1,strChannelID,error_msg);
break; break;
@ -1138,8 +1141,11 @@ int Uart::FindRecvPackage(int bytesRead, char *mUartRecvBuf, char *head) {
error_msg = "Crc error,wave Z"; error_msg = "Crc error,wave Z";
strChannelID = strMeasurementID + "-Z"; strChannelID = strMeasurementID + "-Z";
} }
if(scheduler::instance().WaveError(wave_shortAddr) < 0){
zlog_error(zct, "WaveError error ShortAddr :%s", strShortAddr.c_str());
sprintf(insertSql, "'%s','%s','%s',%d,'%d','%s' ", strChannelID.c_str(),strShortAddr.c_str(),localtimestamp,bytesRead,0,error_msg.c_str()); sprintf(insertSql, "'%s','%s','%s',%d,'%d','%s' ", strChannelID.c_str(),strShortAddr.c_str(),localtimestamp,bytesRead,0,error_msg.c_str());
sqlite_db_ctrl::instance().InsertData(" receive_wave_status ", insertSql); sqlite_db_ctrl::instance().InsertData(" receive_wave_status ", insertSql);
}
JsonData jd; JsonData jd;
jd.JsonCmd_32(strMeasurementID,1,1,strChannelID,error_msg); jd.JsonCmd_32(strMeasurementID,1,1,strChannelID,error_msg);

View File

@ -12,6 +12,7 @@
#include "minilzo/minilzo.h" #include "minilzo/minilzo.h"
#include "jsonparse/communication_cmd.hpp" #include "jsonparse/communication_cmd.hpp"
#include "utility/calculation.hpp" #include "utility/calculation.hpp"
#include "scheduler/schedule.hpp"
extern zlog_category_t *zct; extern zlog_category_t *zct;
extern zlog_category_t *zbt; extern zlog_category_t *zbt;
@ -804,7 +805,7 @@ void Uart::DealWave() {
sprintf(insertSql, "'%s-Z','%02x%02x','%s',0,'1','%s' ", strMeasurementID.c_str(),(wave_shortAddr >> 8) & 0xFF,wave_shortAddr & 0xFF,localtimestamp,""); sprintf(insertSql, "'%s-Z','%02x%02x','%s',0,'1','%s' ", strMeasurementID.c_str(),(wave_shortAddr >> 8) & 0xFF,wave_shortAddr & 0xFF,localtimestamp,"");
sqlite_db_ctrl::instance().InsertData(" receive_wave_status ", insertSql); sqlite_db_ctrl::instance().InsertData(" receive_wave_status ", insertSql);
} }
scheduler::instance().WaveSuccess(wave_shortAddr);
} }
std::string ran = ""; std::string ran = "";