#include "schedule.hpp" #include #include #include #include #include #include #include #include #include #include #include "short_addr_cfg.hpp" #include "update_cfg.hpp" #include "wave_feature_set.hpp" extern zlog_category_t *zct; extern zlog_category_t *zbt; uint8_t g_x, g_y, g_z; int SensorScheduler::StartSchedule(uint16_t short_addr, int &next_duration, bool &z, int &next_task_id) { int id = 0; auto iter = short_addr_map_.find(short_addr); if (iter == short_addr_map_.end()) { id = GetAvailableId(short_addr); } else { id = iter->second; } current_ts_ = GetLocalTs(); CleanIdleOccupiedSet(current_ts_); nth_wave_start_slice_ = (current_ts_ - start_timestamp_) / wave_form_send_interval_; current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_; seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_; nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_; seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_; ts_in_eigen_slice_ = false; if (seconds_in_current_eigen_slice_ < 60 - 3) { ts_in_eigen_slice_ = true; } if (ts_in_eigen_slice_) { // nth_eigen_slice_ = (seconds_in_current_eigen_slice_ + 2) / eigen_value_send_duration_; if (nth_eigen_value_slice_ == 0) { ClearFailureSuccessMap(); } } else { if (seconds_per_wave_slice_ == 0) { seconds_per_wave_slice_ = 60; } nth_wave_slice_ = (seconds_in_current_eigen_slice_ - 60 + 3) / seconds_per_wave_slice_; } zlog_debug(zbt, "[%d:%x] ts:%ld, current utc:%s, nth eigen_value slice:%d, seconds in eigen slice:%d, eigen slice:%d", id, short_addr, current_ts_, GetUTCTime(current_ts_).c_str(), nth_eigen_value_slice_+1, seconds_in_current_eigen_slice_, ts_in_eigen_slice_); if (ts_in_eigen_slice_) { // 传感器需要执行上送特征值任务, 如果有配置需要下发的话,下发配置 if (update_.count(id)) { // execute config zlog_debug(zbt, "[%d:%x] update config in eigen slice", id, short_addr); current_request_ = kScheduleConfigSensor; return kScheduleConfigSensor; } else { wave_feature_set_inst::instance().GetFeatureCfg(short_addr, g_x, g_y, g_z); // if (g_x || g_y || g_z) { // // 执行上送特征值任务 // zlog_debug(zbt, "[%d:%x] send eigen value in eigen slice", id, short_addr); // current_request_ = kScheduleEigenValue; // return kScheduleEigenValue; // } else { // next_duration = GetNextDuration(short_addr, z, next_task_id); // zlog_warn(zbt, "[%d:%x] no need for eigen", id, short_addr); return kScheduleResultNone; // } } } return kScheduleResultNone; // else { // if (current_schedule_status_ == kScheduleStatusDebug) { // if (debug_list_.count(short_addr) == 0) { // next_duration = GetDebugUpgradeNextDuration(short_addr); // next_task_id = kScheduleEigenValue; // zlog_debug(zbt, "[%d:%x] not in debug list", id, short_addr); // return kScheduleWrongTime; // } else { // // z wave // int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1; // if (debug_slice_sensor_id_[nth_wave_slice] == short_addr) { // current_request_ = kScheduleWaveForm; // z = true; // return kScheduleWaveForm; // } else { // // 当前特征值间隔内是否存在此传感器的波形区间 // for (int i = nth_wave_slice + 1; i <= (nth_eigen_value_slice_+1) * wave_slice_num_per_eigen_interval_; ++i) { // if (debug_slice_sensor_id_[i] == short_addr) { // long nxt_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-nth_wave_slice)*60; // next_duration = nxt_ts - current_ts_; // if (next_duration < 10) { // zlog_debug(zbt, "[%d:%x] [Nxt] debug exception duration:%d, adjust to 25", id, short_addr,next_duration); // next_duration = 25; // } else if (next_duration > eigen_value_send_interval_) { // zlog_debug(zbt, "[%d:%x] [Nxt] debug exception duration:%d, adjust to 120", id, short_addr,next_duration); // next_duration = 120; // } // z = true; // next_task_id = kScheduleWaveForm; // return kScheduleWrongTime; // } // } // next_duration = GetDebugUpgradeNextDuration(short_addr); // next_task_id = kScheduleEigenValue; // zlog_debug(zbt, "[%d:%x] debug wrong time", id, short_addr); // return kScheduleWrongTime; // } // } // return 0; // } else if (current_schedule_status_ == kScheduleStatusUpgrade) { // if (upgrade_list_.count(short_addr) == 0) { // next_duration = GetDebugUpgradeNextDuration(short_addr); // next_task_id = kScheduleEigenValue; // zlog_debug(zbt, "[%d:%x] not in upgrade list", id, short_addr); // return kScheduleWrongTime; // } else { // int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1; // if (upgrade_slice_sensor_id_[nth_wave_slice] == short_addr) { // current_request_ = kScheduleUpgrade; // // upgrade_list_.erase(short_addr); // return kScheduleUpgrade; // } else { // next_duration = GetDebugUpgradeNextDuration(short_addr); // next_task_id = kScheduleEigenValue; // zlog_debug(zbt, "[%d:%x] in wrong time", id, short_addr); // return kScheduleWrongTime; // } // } // } // int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1; // 从1开始编号 // auto wave_slice_iter = sensor_id_nth_slice_.find(id); // if (wave_slice_iter == sensor_id_nth_slice_.end()) { // zlog_error(zbt, "[%d:%x] invaild id, not find wave slice id, need to check further", id, short_addr); // return kScheduleUnknownSensor; // } // wave_feature_set_inst::instance().GetWaveCfg(short_addr, g_x, g_y, g_z); // if (nth_wave_slice == wave_slice_iter->second.first) { // Z轴 // if (g_z) { // zlog_debug(zbt, "[%d:%x] it is wave z time", id, short_addr); // current_request_ = kScheduleWaveForm; // z = true; // return kScheduleWaveForm; // } else { // next_duration = GetNextDuration(short_addr, z, next_task_id); // zlog_debug(zbt, "[%d:%x] no need for wave", id, short_addr); // return kScheduleWrongTime; // } // } else if (nth_wave_slice == wave_slice_iter->second.second) { // XY轴 // if (g_x || g_y) { // zlog_debug(zbt, "[%d:%x] it is wave xy time", id, short_addr); // current_request_ = kScheduleWaveForm; // z = false; // return kScheduleWaveForm; // } else { // next_duration = GetNextDuration(short_addr, z, next_task_id); // zlog_debug(zbt, "[%d:%x] no need for wave", id, short_addr); // return kScheduleWrongTime; // } // } // else { // if (slice_sensor_id_[nth_wave_slice] == 0) { // idle time // zlog_debug(zbt, "[%d:%x] in idle time", id, short_addr); // if (trigger_wave_record_.find(short_addr) != trigger_wave_record_.end()) { // auto iter = trigger_wave_record_.find(short_addr); // if (iter->second.first != 0) { // current_request_ = kScheduleWaveForm; // z = true; // zlog_debug(zbt, "[%d:%x] trigger z wave time", id, short_addr); // iter->second.first = 0; // WriteTriggerWaveRecord(); // return kScheduleWaveForm; // } else if (iter->second.second != 0) { // current_request_ = kScheduleWaveForm; // z = false; // iter->second.second = 0; // WriteTriggerWaveRecord(); // zlog_debug(zbt, "[%d:%x] trigger xy wave time", id, short_addr); // return kScheduleWaveForm; // } // } // if (ZRetransferWave(short_addr)) { // zlog_debug(zbt, "[%d:%x] z retransfer wave time", id, short_addr); // current_request_ = kScheduleWaveForm; // z = true; // return kScheduleWaveForm; // } else if (ZMissedWave(short_addr)) { // zlog_debug(zbt, "[%d:%x] z patch wave time", id, short_addr); // current_request_ = kScheduleWaveForm; // z = true; // z_patch_set_.erase(short_addr); // return kScheduleWaveForm; // } else if (XYRetransferWave(short_addr)) { // zlog_debug(zbt, "[%d:%x] xy retransfer wave time", id, short_addr); // current_request_ = kScheduleWaveForm; // z = false; // return kScheduleWaveForm; // } else if (XYMissedWave(short_addr)) { // zlog_debug(zbt, "[%d:%x] xy patch wave time", id, short_addr); // current_request_ = kScheduleWaveForm; // xy_patch_set_.erase(short_addr); // z = false; // return kScheduleWaveForm; // } // } // // wrong time to come // int eigen_send_ts = (id - 1) * 2; // if (eigen_send_ts > 57) { // eigen_send_ts = eigen_send_ts % 57; // } // long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ + eigen_send_ts; // next_duration = available_ts - current_ts_; // if (next_duration < 10 || next_duration > eigen_value_send_interval_) { // zlog_debug(zbt, "[%d:%x] invalid next duration:%d, adjust to 120", id, short_addr, next_duration); // next_duration = 120; // } // next_task_id = kScheduleEigenValue; // zlog_debug(zbt, "[%d:%x] wrong time in wave slice, next feature send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration); // return kScheduleWrongTime; // } // } } long SensorScheduler::CalcNextTimestamp(int id, uint16_t short_addr, bool &z, int& next_task_id) { z = false; if (ts_in_eigen_slice_) { if (current_schedule_status_ == kScheduleStatusDebug) { if (debug_list_.count(short_addr) == 0) { next_task_id = kScheduleEigenValue; return GetDebugUpgradeNextTS(short_addr); } else { // 计算发送波形是否在后面的波形时间窗口中 int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + 1; for (int i = nth_wave_slice; i <= nth_wave_slice + wave_slice_num_per_eigen_interval_; ++i) { if (debug_slice_sensor_id_[i] == short_addr) { long nxt_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-nth_wave_slice)*60; next_task_id = kScheduleWaveForm; z = true; zlog_debug(zbt, "[%d:%x] debug wave", id, short_addr); return nxt_ts; } } zlog_debug(zbt, "[%d:%x] check wave_slice_num_per_eigen_interval_:%d, nth_wave_slice:%d", id, short_addr, wave_slice_num_per_eigen_interval_, nth_wave_slice); next_task_id = kScheduleEigenValue; return GetDebugUpgradeNextTS(short_addr); } } else if (current_schedule_status_ == kScheduleStatusUpgrade) { if (upgrade_list_.count(short_addr) == 0) { next_task_id = kScheduleEigenValue; return GetDebugUpgradeNextTS(short_addr); } else { // 计算升级是否在后面的波形时间窗口中 // int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + 1; // for (int i = nth_wave_slice; i <= nth_wave_slice + wave_slice_num_per_eigen_interval_; ++i) { // if (upgrade_slice_sensor_id_[i] == short_addr) { // long nxt_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-nth_wave_slice)*60; // next_task_id = kScheduleUpgrade; // return nxt_ts; // } // } next_task_id = kScheduleEigenValue; return GetDebugUpgradeNextTS(short_addr); } } int forward_wave_slice_num = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_; auto wave_slice_iter = sensor_id_nth_slice_.find(id); if (wave_slice_iter == sensor_id_nth_slice_.end()) { zlog_warn(zbt, "[Nxt] invaild id:%d, not find wave slice id", id); long available_ts = current_wave_start_ts_ + eigen_value_send_interval_ + nth_eigen_slice_ * eigen_value_send_duration_; zlog_warn(zbt, "[Nxt] [%d] next feature send utc time:[%s]", id, GetUTCTime(available_ts).c_str()); return available_ts; } int first_wave_slice = wave_slice_iter->second.first; int second_wave_slice = wave_slice_iter->second.second; long send_wave_ts = 0; long available_ts = 0; next_task_id = kScheduleWaveForm; wave_feature_set_inst::instance().GetWaveCfg(short_addr, g_x, g_y, g_z); if (g_x || g_y || g_z) { if (g_z && first_wave_slice > forward_wave_slice_num && first_wave_slice <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_) { // 发送波的时间窗也在本次特征值发送间隔中 for (int i = forward_wave_slice_num+1; i <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_; ++i) { if (first_wave_slice == i) { send_wave_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i - forward_wave_slice_num - 1) * 60; next_task_id = kScheduleWaveForm; zlog_debug(zbt, "[Nxt] [%d:%x] send z wave time:[%s], task id:%d", id, short_addr, GetUTCTime(send_wave_ts).c_str(), next_task_id); z = true; break; } } } else if ((g_x || g_y) && second_wave_slice > forward_wave_slice_num && second_wave_slice <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_) { // 发送波的时间窗也在本次特征值发送间隔中 for (int i = forward_wave_slice_num+1; i <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_; ++i) { if (second_wave_slice == i) { send_wave_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i - forward_wave_slice_num - 1) * 60; next_task_id = kScheduleWaveForm; zlog_debug(zbt, "[Nxt] [%d:%x] send xy wave time:[%s], task id:%d", id, short_addr, GetUTCTime(send_wave_ts).c_str(), next_task_id); z = false; break; } } } if (send_wave_ts == 0) { if (trigger_wave_record_.find(short_addr) != trigger_wave_record_.end()) { auto iter = trigger_wave_record_.find(short_addr); if (iter->second.first != 0 || iter->second.second != 0) { for (int i = forward_wave_slice_num+1; i <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_; ++i) { if (slice_sensor_id_[i] == 0 || short_addr_map_.find(slice_sensor_id_[i]) == short_addr_map_.end()) { send_wave_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i - forward_wave_slice_num - 1) * 60; if (free_slice_ocuppied_.count(send_wave_ts) == 0) { available_ts = send_wave_ts; free_slice_ocuppied_.insert(available_ts); next_task_id = kScheduleWaveForm; if (iter->second.first != 0) { z = true; iter->second.first = 0; } else { z = false; iter->second.second = 0; } WriteTriggerWaveRecord(); zlog_debug(zbt, "[Nxt][%d:%x] %d nth free wave slice will be used to trigger z : %d wave, utc time:[%s]", id, short_addr, i+forward_wave_slice_num, z, GetUTCTime(available_ts).c_str()); break; } else { send_wave_ts = 0; } break; } } } } } if (send_wave_ts == 0) { if (g_z && !ZMissedWave(short_addr) && z_success_set_.count(short_addr) == 0) { // add for patch wave int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1; auto wave_slice_iter = sensor_id_nth_slice_.find(id); if (wave_slice_iter != sensor_id_nth_slice_.end()) { if (nth_wave_slice > wave_slice_iter->second.first) { if (z_success_set_.count(short_addr) == 0 && !ZRetransferWave(short_addr)) { zlog_debug(zbt, "[Nxt] [%d:%x] add z to patch set", id, short_addr); z_patch_set_.insert(short_addr); z = true; } } } } else if ((g_x || g_y) && !XYMissedWave(short_addr) && xy_success_set_.count(short_addr) == 0) { // add for patch wave int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1; auto wave_slice_iter = sensor_id_nth_slice_.find(id); if (wave_slice_iter != sensor_id_nth_slice_.end()) { if (nth_wave_slice > wave_slice_iter->second.second) { if (xy_success_set_.count(short_addr) == 0 && !XYRetransferWave(short_addr)) { zlog_debug(zbt, "[Nxt] [%d:%x] add xy to patch set", id, short_addr); xy_patch_set_.insert(short_addr); z = false; } } } } if (ZRetransferWave(short_addr) || XYRetransferWave(short_addr) || ZMissedWave(short_addr) || XYMissedWave(short_addr)) { for (int i = 1; i <= wave_slice_num_per_eigen_interval_; ++i) { if (slice_sensor_id_[i+forward_wave_slice_num] == 0 || short_addr_map_.find(slice_sensor_id_[i+forward_wave_slice_num]) == short_addr_map_.end()) { // 判断此空闲位置是否被占用 long current_wave_slice_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-1) * seconds_per_wave_slice_; if (free_slice_ocuppied_.count(current_wave_slice_ts) == 0) { available_ts = current_wave_slice_ts; free_slice_ocuppied_.insert(available_ts); if (ZRetransferWave(short_addr) || ZMissedWave(short_addr)) { if (ZRetransferWave(short_addr)) { zlog_debug(zbt, "[Nxt] [%d:%x] retransfer z wave", id, short_addr); } else { zlog_debug(zbt, "[Nxt] [%d:%x] missed z wave", id, short_addr); } z = true; } else { if (XYRetransferWave(short_addr)) { zlog_debug(zbt, "[Nxt] [%d:%x] retransfer xy wave ", id, short_addr); } else { zlog_debug(zbt, "[Nxt] [%d:%x] missed xy wave ", id, short_addr); } z = false; } zlog_debug(zbt, "[Nxt][%d:%x] %d nth free wave slice will be used to retransfer or patch z:%d wave, utc time:[%s]", id, short_addr, i+forward_wave_slice_num, z, GetUTCTime(available_ts).c_str()); break; } } } } } } if (send_wave_ts > 0 && available_ts > 0) { long min_ts = std::min(send_wave_ts, available_ts); zlog_debug(zbt, "[Nxt] [%d:%x] next feature send utc time1:%s", id, short_addr, GetUTCTime(min_ts).c_str()); return min_ts; } if (send_wave_ts + available_ts > 0) { long max_ts = std::max(send_wave_ts, available_ts); zlog_debug(zbt, "[Nxt] [%d:%x] next feature send utc time2:%s", id, short_addr, GetUTCTime(max_ts).c_str()); return max_ts; } // for (int i = forward_wave_slice_num+1; i <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_; ++i) { // if (slice_sensor_id_[i] == 0) { // idle time // send_wave_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i - forward_wave_slice_num - 1) * 60; // if (free_slice_ocuppied_.count(send_wave_ts) != 0) { // continue; // } // zlog_debug(zbt, "[%d:%x] in idle time for trigger wave", id, short_addr); // if (trigger_wave_record_.find(short_addr) != trigger_wave_record_.end()) { // auto iter = trigger_wave_record_.find(short_addr); // if (iter->second.first != 0) { // current_request_ = kScheduleWaveForm; // z = true; // zlog_debug(zbt, "[%d:%x] trigger z wave time", id, short_addr); // iter->second.first = 0; // next_task_id = kScheduleWaveForm; // free_slice_ocuppied_.insert(send_wave_ts); // WriteTriggerWaveRecord(); // return send_wave_ts; // } else if (iter->second.second != 0) { // current_request_ = kScheduleWaveForm; // z = false; // iter->second.second = 0; // WriteTriggerWaveRecord(); // next_task_id = kScheduleWaveForm; // zlog_debug(zbt, "[%d:%x] trigger xy wave time", id, short_addr); // free_slice_ocuppied_.insert(send_wave_ts); // return send_wave_ts; // } // } else { // break; // } // } // } } else { if (current_schedule_status_ == kScheduleStatusDebug) { if (debug_list_.count(short_addr) == 0) { next_task_id = kScheduleEigenValue; return GetDebugUpgradeNextTS(short_addr); } else { // 计算发送波形是否在后面的波形时间窗口中 // int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 2; // for (int i = nth_wave_slice; i <= nth_wave_slice + wave_slice_num_per_eigen_interval_; ++i) { // if (debug_slice_sensor_id_[i] == short_addr) { // long nxt_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_-1)*60; // next_task_id = kScheduleWaveForm; // z = true; // return nxt_ts; // } // } next_task_id = kScheduleEigenValue; return GetDebugUpgradeNextTS(short_addr); } } else if (current_schedule_status_ == kScheduleStatusUpgrade) { if (upgrade_list_.count(short_addr) == 0) { next_task_id = kScheduleEigenValue; return GetDebugUpgradeNextDuration(short_addr); } else { // 计算升级是否在后面的波形时间窗口中 int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + 2; for (int i = nth_wave_slice; i <= nth_wave_slice + wave_slice_num_per_eigen_interval_; ++i) { if (upgrade_slice_sensor_id_[i] == short_addr) { long nxt_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_-1)*60; next_task_id = kScheduleUpgrade; return nxt_ts; } } next_task_id = kScheduleEigenValue; } } } // 如果是在当前波形时间窗中,不管是空闲时间窗,还是发送波形的时间窗,下一个时间窗是特征值 int eigen_send_ts = (id - 1) * 4; if (eigen_send_ts > 57) { eigen_send_ts = eigen_send_ts % 57; } next_task_id = kScheduleEigenValue; long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1)* eigen_value_send_interval_ + eigen_send_ts; zlog_debug(zbt, "[Nxt] [%d:%x] next feature send utc time3:[%s], task id:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_task_id); return available_ts; } int SensorScheduler::GetNextDuration(uint16_t short_addr, bool &z, int &next_task_id) { int id = 0; auto iter = short_addr_map_.find(short_addr); if (iter == short_addr_map_.end()) { zlog_error(zbt, "cannot find id for short_addr %x", short_addr); next_task_id = kScheduleEigenValue; return 0; } else { id = iter->second; } current_ts_ = GetLocalTs(); CleanIdleOccupiedSet(current_ts_); nth_wave_start_slice_ = (current_ts_ - start_timestamp_) / wave_form_send_interval_; current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_; seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_; nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_; seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_; ts_in_eigen_slice_ = false; if (seconds_in_current_eigen_slice_ < 57) { ts_in_eigen_slice_ = true; } // if (ts_in_eigen_slice_) { // // nth_eigen_slice_ = (seconds_in_current_eigen_slice_ + 2) / eigen_value_send_duration_; // if (nth_eigen_value_slice_ == 0) { // ClearFailureSuccessMap(); // } // } zlog_debug(zbt, "[%d:%x] ts:%ld, current utc:%s, nth eigen_value slice:%d, seconds in eigen slice:%d, eigen slice:%d", id, short_addr, current_ts_, GetUTCTime(current_ts_).c_str(), nth_eigen_value_slice_+1, seconds_in_current_eigen_slice_, ts_in_eigen_slice_); // long current_ts = GetLocalTs(); long next_ts = CalcNextTimestamp(id, short_addr, z, next_task_id); int duration = next_ts - current_ts_; if (duration < 10) { zlog_debug(zbt, "[%d:%x] [Nxt] exception duration:%d, adjust to 25", id, short_addr,duration); duration = 25; return duration; } else if (duration > eigen_value_send_interval_) { zlog_debug(zbt, "[%d:%x] [Nxt] exception duration:%d, adjust to 120", id, short_addr,duration); duration = 120; return duration; } zlog_debug(zbt, "[Nxt] [%d:%x] next duration is %d,next_task_id = %d,z = %d,ScheduleStatus = %d", id, short_addr, duration,next_task_id,z,GetScheduleStatus()); return duration; } int SensorScheduler::TriggerWave(uint16_t short_addr, uint8_t z, uint8_t xy) { zlog_debug(zbt, "[%x] trigger wave z:%d, xy:%d", short_addr, z, xy); std::pair p = {z, xy}; trigger_wave_record_[short_addr] = p; return 0; } // 仅返回下一次特征值的时间,用于Debug/Upgrade模式 int SensorScheduler::GetDebugUpgradeNextDuration(uint16_t short_addr) { int id = 0; auto iter = short_addr_map_.find(short_addr); if (iter == short_addr_map_.end()) { zlog_error(zbt, "cannot find id for short_addr %x", short_addr); return 0; } else { id = iter->second; } long current_ts = GetLocalTs(); int eigen_send_ts = (id - 1) * 2; if (eigen_send_ts > 57) { eigen_send_ts = eigen_send_ts % 57; } long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1)* eigen_value_send_interval_ + eigen_send_ts; zlog_debug(zbt, "[Nxt] [%d:%x] next feature send utc time4:[%s]", id, short_addr, GetUTCTime(available_ts).c_str()); int duration = available_ts - current_ts; if (duration < 10) { zlog_debug(zbt, "[%d:%x] [Nxt] exception duration:%d, adjust to 25", id, short_addr,duration); duration = 25; return duration; } else if (duration > eigen_value_send_interval_) { zlog_debug(zbt, "[%d:%x] [Nxt] exception duration:%d, adjust to 120", id, short_addr,duration); duration = 120; return duration; } zlog_debug(zbt, "[Nxt] [%d:%x] next duration is %d", id, short_addr, duration); return duration; } long SensorScheduler::GetDebugUpgradeNextTS(uint16_t short_addr) { int id = 0; auto iter = short_addr_map_.find(short_addr); if (iter == short_addr_map_.end()) { zlog_warn(zbt, "cannot find id for short_addr %x", short_addr); return 0; } else { id = iter->second; } // long current_ts = GetLocalTs(); int eigen_send_ts = (id - 1) * 2; if (eigen_send_ts > 57) { eigen_send_ts = eigen_send_ts % 57; } long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1)* eigen_value_send_interval_ + eigen_send_ts; zlog_debug(zbt, "[Nxt] [%d:%x] next feature send utc time4:[%s]", id, short_addr, GetUTCTime(available_ts).c_str()); return available_ts; } SensorScheduler::SensorScheduler() { ReadTriggerWaveRecord(); wave_resend_num_ = 3; current_schedule_status_ = get_schedule_status(); slice_sensor_id_ = NULL; zlog_debug(zbt, "current schedule status:%s", get_status_desc(current_schedule_status_).c_str()); std::ifstream schedule_file(SCHEDULE_CONFIG); bool configed = false; if (schedule_file.good()) { zlog_info(zbt, "exist configuration file"); Json::Reader reader; Json::Value root; if (!reader.parse(schedule_file, root, false)) { zlog_error(zbt, "invalid format, fail to parse %s", SCHEDULE_CONFIG); schedule_file.close(); goto init_config; } schedule_file.close(); if (!root.isObject()) { zlog_error(zbt, "invalid format, not an object: %s", SCHEDULE_CONFIG); goto init_config; } configed = true; start_timestamp_ = std::stol(root["schedule_start_timestamp"].asString()); start_ts_str_ = root["schedule_start_time"].asString(); long current_ts = GetLocalTs(); if (current_ts < start_timestamp_) { zlog_warn(zbt, "current ts: %ld less than start ts: %ld, go to adjust it", current_ts, start_timestamp_); start_timestamp_ = current_ts; start_ts_str_ = GetUTCTime(current_ts); root["schedule_start_timestamp"] = std::to_string(start_timestamp_); root["schedule_start_time"] = start_ts_str_; Json::StyledStreamWriter streamWriter; std::ofstream out_file(SCHEDULE_CONFIG); streamWriter.write(out_file, root); out_file.close(); } eigen_value_send_interval_ = root["eigen_value_send_interval"].asInt(); // eigen_value_send_duration_ = root["eigen_value_send_duration"].asInt(); wave_form_send_interval_ = root["wave_form_send_interval"].asInt(); wave_form_send_duration_ = 60; //root["wave_form_send_duration"].asInt(); max_sensor_num_ = root["max_sensor_num"].asInt(); available_slice_ = root["available_slice"].asInt(); free_slice_ = root["free_slice"].asInt(); slice_sensor_id_ = new int[available_slice_+1]; slice_is_z_wave_ = new uint8_t[available_slice_+1]; for (int i = 0; i <= available_slice_; ++i) { slice_sensor_id_[i] = 0; } int xy_wave_start_id = 0; if (max_sensor_num_ <= 4) { xy_wave_start_id = 5; // 防止z,xy在时间上排得太近 } else { xy_wave_start_id = max_sensor_num_ + 1; } if (max_sensor_num_ == 2 && free_slice_ == 0) { xy_wave_start_id = 2; } if (max_sensor_num_ == 1 && free_slice_ == 0) { if (available_slice_ == 2) { xy_wave_start_id = 1; } else if (available_slice_ == 4) { xy_wave_start_id = 2; } } for (int i = 1; i <= max_sensor_num_; ++i) { sensor_id_nth_slice_[i] = { i, i + xy_wave_start_id }; slice_sensor_id_[i] = i; slice_sensor_id_[i+xy_wave_start_id] = i; slice_is_z_wave_[i] = true; slice_is_z_wave_[i + xy_wave_start_id] = false; } eigen_value_slice_total_seconds_ = 60; // eigen_value_send_duration_ * max_sensor_num_; int rest_duration = eigen_value_send_interval_ - eigen_value_slice_total_seconds_; wave_slice_num_per_eigen_interval_ = rest_duration / wave_form_send_duration_; seconds_per_wave_slice_ = rest_duration / wave_slice_num_per_eigen_interval_; } init_config: if (!configed) { UseDefaultConfig(); } short_addr_map_.clear(); ShortAddrCfg::ReadCfg(short_addr_map_); // read upgrade config file: UPGRADE_CONFIG UpgradeCfg::ReadCfg(upgrade_); // read config update file: CONFIG_UPDATE UpdateCfg::ReadCfg(update_); if (current_schedule_status_ == kScheduleStatusDebug) { std::ifstream debug_schedule_file(DEBUG_SCHEDULE_CONFIG); if (debug_schedule_file.good()) { Json::Reader reader; Json::Value root; if (!reader.parse(debug_schedule_file, root, false)) { zlog_error(zbt, "invalid format, fail to parse %s", DEBUG_SCHEDULE_CONFIG); zlog_warn(zct, "due to invalid debug file, status change to normal"); SetScheduleStatus(kScheduleStatusNormal); return; } debug_schedule_file.close(); if (!root.isArray()) { zlog_error(zbt, "invalid format, not an object: %s", DEBUG_SCHEDULE_CONFIG); zlog_warn(zct, "due to invalid debug file, status change to normal"); SetScheduleStatus(kScheduleStatusNormal); return; } else { debug_slice_sensor_id_ = new uint16_t[available_slice_+1]; for (int i = 0; i <= available_slice_; ++i) { debug_slice_sensor_id_[i] = 0; } int i = 1; for (const auto& value : root) { std::cout << value.asInt() << std::endl; // 转换并输出每个整数 zlog_debug(zbt, "[%d] debug sensor:%x", value.asInt()); debug_slice_sensor_id_[i] = value.asInt(); if (debug_slice_sensor_id_[i] != 0) { debug_list_.insert(debug_slice_sensor_id_[i]); } ++i; } if (i == 1) { zlog_warn(zbt, "due to empty debug file, status change to normal"); if (debug_slice_sensor_id_ != NULL) { delete []debug_slice_sensor_id_; debug_slice_sensor_id_ = NULL; } SetScheduleStatus(kScheduleStatusNormal); return; } } } else { zlog_warn(zbt, "due to no debug file, status change to normal"); SetScheduleStatus(kScheduleStatusNormal); return; } } else if (current_schedule_status_ == kScheduleStatusUpgrade) { std::ifstream upgrade_schedule_file(UPGRADE_SCHEDULE_CONFIG); if (upgrade_schedule_file.good()) { Json::Reader reader; Json::Value root; if (!reader.parse(upgrade_schedule_file, root, false)) { zlog_error(zbt, "invalid format, fail to parse %s", UPGRADE_SCHEDULE_CONFIG); zlog_warn(zbt, "due to invalid upgrade file, status change to normal"); SetScheduleStatus(kScheduleStatusNormal); return; } upgrade_schedule_file.close(); if (!root.isArray()) { zlog_error(zbt, "invalid format, not an object: %s", UPGRADE_SCHEDULE_CONFIG); zlog_warn(zbt, "due to invalid upgrade file, status change to normal"); SetScheduleStatus(kScheduleStatusNormal); return; } else { upgrade_slice_sensor_id_ = new uint16_t[available_slice_+1]; for (int i = 0; i <= available_slice_; ++i) { upgrade_slice_sensor_id_[i] = 0; } int i = 1; for (const auto& value : root) { std::cout << value.asInt() << std::endl; // 转换并输出每个整数 zlog_debug(zbt, "[%d] upgrade sensor:%x", value.asInt()); upgrade_slice_sensor_id_[i] = value.asInt(); if (upgrade_slice_sensor_id_[i] > 0) { upgrade_list_.insert(upgrade_slice_sensor_id_[i]); } ++i; } if (i == 1) { zlog_warn(zbt, "due to empty upgrade file, status change to normal"); if (upgrade_slice_sensor_id_ != NULL) { delete []upgrade_slice_sensor_id_; upgrade_slice_sensor_id_ = NULL; } SetScheduleStatus(kScheduleStatusNormal); return; } } } else { zlog_warn(zbt, "due to no upgrade file, status change to normal"); SetScheduleStatus(kScheduleStatusNormal); return; } } zlog_warn(zbt,"init end "); } int SensorScheduler::WaveError(uint16_t short_addr, bool z) { if (wave_resend_num_ <= 0) { zlog_debug(zbt, "[WaveError][%x] no config to resend wave", short_addr); return -1; } if (z) { auto iter = z_failure_map_.find(short_addr); if (iter == z_failure_map_.end()) { z_failure_map_[short_addr] = wave_resend_num_; // 重试次数 zlog_debug(zbt, "[WaveError][%x] z will try %d times", short_addr, wave_resend_num_); return 0; } if (iter->second == 0) { zlog_warn(zbt, "[WaveError][%x] z no try times", short_addr); z_failure_map_.erase(short_addr); return -1; } iter->second = iter->second - 1; zlog_debug(zbt, "[WaveError][%x] z remain try %d times", short_addr, iter->second); } else { auto iter = xy_failure_map_.find(short_addr); if (iter == xy_failure_map_.end()) { xy_failure_map_[short_addr] = wave_resend_num_; // 重试次数 zlog_debug(zbt, "[WaveError][%x] xy will try %d times", short_addr, wave_resend_num_); return 0; } if (iter->second == 0) { zlog_debug(zct, "[WaveError][%x] xy no try times", short_addr); xy_failure_map_.erase(short_addr); return -1; } iter->second = iter->second - 1; zlog_debug(zbt, "[WaveError][%x] xy remain try %d times", short_addr, iter->second); } return 0; } bool SensorScheduler::ZRetransferWave(uint16_t short_addr) { auto iter = z_failure_map_.find(short_addr); if (iter != z_failure_map_.end()) { return true; } return false; } bool SensorScheduler::XYRetransferWave(uint16_t short_addr) { auto iter = xy_failure_map_.find(short_addr); if (iter != xy_failure_map_.end()) { return true; } return false; } bool SensorScheduler::ZMissedWave(uint16_t short_addr) { if (z_patch_set_.count(short_addr) > 0) { return true; } return false; } bool SensorScheduler::XYMissedWave(uint16_t short_addr) { if (xy_patch_set_.count(short_addr) > 0) { return true; } return false; } void SensorScheduler::WaveSuccess(uint16_t short_addr, bool z) { zlog_debug(zbt, "[%x] wave z:%d success", short_addr, z); if (z) { z_success_set_.insert(short_addr); z_patch_set_.erase(short_addr); auto iter = z_failure_map_.find(short_addr); if (iter != z_failure_map_.end()) { zlog_debug(zbt, "[WaveSuccess][%x] try %d times success", short_addr, 4 - iter->second); z_failure_map_.erase(short_addr); return; } } else { xy_success_set_.insert(short_addr); xy_patch_set_.erase(short_addr); auto iter = xy_failure_map_.find(short_addr); if (iter != xy_failure_map_.end()) { zlog_debug(zbt, "[WaveSuccess][%x] try %d times success", short_addr, 4 - iter->second); xy_failure_map_.erase(short_addr); return; } } return; } void SensorScheduler::ClearFailureSuccessMap() { z_failure_map_.clear(); z_success_set_.clear(); z_patch_set_.clear(); xy_failure_map_.clear(); xy_success_set_.clear(); xy_patch_set_.clear(); } int SensorScheduler::GetAvailableId(uint16_t short_addr) { int max_support_sensor[96] = {0}; // 5分钟里面有4分钟用于传输波形,2小时有96个 for (auto it = short_addr_map_.begin(); it != short_addr_map_.end(); ++it) { max_support_sensor[it->second] = 1; } int available_id = 0; for (int i = 1; i < 96; ++i) { if (max_support_sensor[i] == 0) { available_id = i; break; } } zlog_warn(zbt, "[GetAvailableId][%d] short addr : %x", available_id, short_addr); short_addr_map_[short_addr] = available_id; ShortAddrCfg::WriteCfg(short_addr_map_); return available_id; } int SensorScheduler::WriteScheduleCfg(long &ts, std::string &world_time) { Json::Value root; root["schedule_start_timestamp"] = std::to_string(ts); root["schedule_start_time"] = world_time; root["eigen_value_send_interval"] = eigen_value_send_interval_; root["wave_form_send_interval"] = wave_form_send_interval_; root["max_sensor_num"] = max_sensor_num_; root["available_slice"] = available_slice_; root["free_slice"] = free_slice_; if (slice_sensor_id_ != NULL) { delete [] slice_sensor_id_; slice_sensor_id_ = NULL; } if (slice_is_z_wave_ != NULL) { delete [] slice_is_z_wave_; slice_is_z_wave_ = NULL; } slice_sensor_id_ = new int[available_slice_+1]; slice_is_z_wave_ = new uint8_t[available_slice_+1]; for (int i = 0; i <= available_slice_; ++i) { slice_sensor_id_[i] = 0; } int xy_wave_start_id = 0; if (max_sensor_num_ <= 4) { xy_wave_start_id = 5; // 防止z,xy在时间上排得太近 } else { xy_wave_start_id = max_sensor_num_ + 1; } if (max_sensor_num_ == 2 && free_slice_ == 0) { xy_wave_start_id = 2; } if (max_sensor_num_ == 1 && free_slice_ == 0) { if (available_slice_ == 2) { xy_wave_start_id = 1; } else if (available_slice_ == 4) { xy_wave_start_id = 2; } } for (int i = 1; i <= max_sensor_num_; ++i) { sensor_id_nth_slice_[i] = { i, i + xy_wave_start_id }; slice_sensor_id_[i] = i; slice_sensor_id_[i + xy_wave_start_id] = i; slice_is_z_wave_[i] = true; slice_is_z_wave_[i + xy_wave_start_id] = false; } Json::StyledStreamWriter streamWriter; std::ofstream out_file(SCHEDULE_CONFIG); streamWriter.write(out_file, root); out_file.close(); return 0; } int SensorScheduler::Config(int eigen_value_send_interval, int wave_form_send_interval, int &max_sensor_num, int wave_resend_num, std::string &error_msg) { int available_slice = 0; int free_slice = 0; int ret = CalcAvailableSlice(eigen_value_send_interval, wave_form_send_interval, max_sensor_num, available_slice, free_slice, error_msg); if (ret != 0) { return ret; } eigen_value_send_interval_ = eigen_value_send_interval; eigen_value_send_duration_ = 2; wave_form_send_interval_ = wave_form_send_interval; wave_form_send_duration_ = 60; max_sensor_num_ = max_sensor_num; wave_resend_num_ = wave_resend_num; available_slice_ = available_slice; free_slice_ = free_slice; eigen_value_slice_total_seconds_ = 60; int rest_duration = eigen_value_send_interval_ - eigen_value_slice_total_seconds_; wave_slice_num_per_eigen_interval_ = rest_duration / wave_form_send_duration_; seconds_per_wave_slice_ = rest_duration / wave_slice_num_per_eigen_interval_; std::string world_time; long current_ts = GetLocalWorldTime(world_time); ret = WriteScheduleCfg(current_ts, world_time); if (ret != 0) { return ret; } start_timestamp_ = current_ts; start_ts_str_ = world_time; return 0; } // 特征值发送间隔300秒,波形发送间隔为7200秒 // 一次特征值发送时长为2秒,波形发送时长为60秒,所有特征值在特征值发送间隔的第1分钟中的第3秒至第57秒全部完成 int SensorScheduler::CalcAvailableSlice(int eigen_value_send_interval, int wave_form_send_interval, int &max_sensor_num, int &available_slice, int &free_slice, std::string &error_msg) { // if (max_sensor_num <= 0) { // error_msg = "max_sensor_num:" + std::to_string(max_sensor_num) + " must bigger than 0"; // zlog_error(zbt, "%s", error_msg.c_str()); // return 1; // } if (2 > eigen_value_send_interval) { error_msg = "invalid max_sensor_num:" + std::to_string(max_sensor_num) + " * eigen_value_send_duration:" + std::to_string(2) + " > eigen_value_send_interval" + std::to_string(eigen_value_send_interval); zlog_error(zbt, "%s", error_msg.c_str()); return 2; } // if (max_sensor_num * 60 * 2 > wave_form_send_interval) { // xy, z分开发送 // error_msg = "invalid wave_form_send_duration:" + std::to_string(60) + // "* 2 * max_sensor_num:" + std::to_string(max_sensor_num) + " > wave_form_send_interval:" + std::to_string(wave_form_send_interval); // zlog_error(zbt, "%s", error_msg.c_str()); // return 3; // } if (wave_form_send_interval % eigen_value_send_interval != 0) { error_msg = "wave_form_send_interval:" + std::to_string(wave_form_send_interval) + " %% eigen_value_send_interval:" + std::to_string(eigen_value_send_interval) + " != 0"; zlog_error(zbt, "%s", error_msg.c_str()); return 4; } int total_eigen_value_send_duration = 60; int rest_duration = eigen_value_send_interval - total_eigen_value_send_duration; int slice_per_eigen_value_interval = rest_duration / 60; available_slice = wave_form_send_interval / eigen_value_send_interval * slice_per_eigen_value_interval; // free_slice = available_slice - max_sensor_num * 2; max_sensor_num = available_slice / 2; // if (free_slice < 0) { // error_msg = "invalid config, available slice:" + std::to_string(available_slice) + ", required slice:" + std::to_string(max_sensor_num*2); // zlog_error(zbt, "%s", error_msg.c_str()); // return 5; // } if (available_slice <= 0) { error_msg = "invalid config, available slice:" + std::to_string(available_slice) + ", required slice:" + std::to_string(max_sensor_num*2); zlog_error(zbt, "%s", error_msg.c_str()); return 5; } return 0; } int SensorScheduler::GetScheduleConfig(int &eigen_value_send_interval, int &wave_form_send_interval, int &wave_resend_num, int &max_sensor_num) { eigen_value_send_interval = eigen_value_send_interval_; wave_form_send_interval = wave_form_send_interval_; wave_resend_num = wave_resend_num_; max_sensor_num = max_sensor_num_; return 0; } int SensorScheduler::UpdateSensorConfig(uint16_t short_addr) { int id = 0; auto iter = short_addr_map_.find(short_addr); if (iter == short_addr_map_.end()) { zlog_warn(zbt, "cannot find id for short_addr %d", short_addr); return 1; } else { id = iter->second; } if (update_.count(id) > 0) { return 0; } update_.insert(id); UpdateCfg::WriteCfg(update_); return 0; } int SensorScheduler::UpdateConfigResult(uint16_t short_addr, int result) { int id = 0; auto iter = short_addr_map_.find(short_addr); if (iter == short_addr_map_.end()) { zlog_warn(zbt, "cannot find id for short_addr %x", short_addr); return 1; } else { id = iter->second; } if (result != 0) { return 0; } zlog_info(zbt, "[%d] short addr:%d update successfully", id, short_addr); update_.erase(id); UpdateCfg::WriteCfg(update_); return 0; } int SensorScheduler::UpgradeSensor(std::vector ¶m_list) { if (param_list.size() == 0) { zlog_warn(zbt, "upgrade list is empty, do nothing"); return 1; } zlog_debug(zbt, "current status:%d", current_schedule_status_); if (current_schedule_status_ == kScheduleStatusUpgrade) { std::unordered_set tmp_set; for (auto item : param_list) { tmp_set.insert(item.short_addr); } if (tmp_set == upgrade_list_) { zlog_warn(zct, "upgrade list and mode are same, do nothing"); return 0; } } else { upgrade_list_.clear(); } for (auto item : param_list) { upgrade_list_.insert(item.short_addr); } int upgrade_num = 0; int id = 0; long ts = GetLocalTs(); for (auto item : param_list) { auto iter = short_addr_map_.find(item.short_addr); if (iter == short_addr_map_.end()) { zlog_error(zbt, "cannot find id for short_addr %x", item.short_addr); continue; } else { id = iter->second; } UpgradeInfo info; info.try_times = 0; info.sensor_type = item.sensor_type; info.hw_version = item.hw_version; info.current_sw_version = item.current_sw_version; info.upgrade_sw_version = item.upgrade_sw_version; info.submit_time = GetUTCTime(ts); upgrade_[id] = info; zlog_info(zbt, "[%d] short addr:%x add upgrade info", id, item.short_addr); ++upgrade_num; } if (upgrade_num > 0) { UpgradeCfg::WriteCfg(upgrade_); GenerateUpgradeSchedule(); current_schedule_status_ = kScheduleStatusUpgrade; set_schedule_status(current_schedule_status_); } else { return 1; } return 0; } int SensorScheduler::CancelUpgradeSensor(std::vector short_addr_list) { if (short_addr_list.size() == 0) { return 0; } int cancel_num = 0; for (auto short_addr : short_addr_list) { upgrade_list_.erase(short_addr); auto iter = short_addr_map_.find(short_addr); if (iter == short_addr_map_.end()) { zlog_info(zbt, "cannot find id for short_addr %x", short_addr); continue; } else { int id = iter->second; upgrade_.erase(id); ++cancel_num; } } if (cancel_num > 0) { if (upgrade_list_.size() == 0) { current_schedule_status_ = kScheduleStatusNormal; set_schedule_status(current_schedule_status_); UpgradeCfg::ClearCfg(); } else { GenerateUpgradeSchedule(); UpgradeCfg::WriteCfg(upgrade_); } } return 0; } int SensorScheduler::UpgradeResult(uint16_t short_addr, int result) { zlog_info(zbt, "[%x] upgrade result:%d", short_addr, result); int id = 0; auto iter = short_addr_map_.find(short_addr); if (iter == short_addr_map_.end()) { zlog_info(zbt, "cannot find id for short_addr %x", short_addr); return 1; } else { id = iter->second; } if (result == kUpgradeSuccess || result == kProductTypeMismatch || result == kZigbeeHWMismatch || result == kUpgradeDoneBefore) { upgrade_list_.erase(short_addr); upgrade_.erase(id); zlog_info(zbt, "[%d] short addr:%x upgrade successfully", id, short_addr); if (upgrade_list_.size() == 0) { zlog_info(zbt, "no upgrade sensor, go to normal status"); current_schedule_status_ = kScheduleStatusNormal; set_schedule_status(current_schedule_status_); UpgradeCfg::ClearCfg(); } else { UpgradeCfg::WriteCfg(upgrade_); GenerateUpgradeSchedule(); } } else { auto upgrade_iter = upgrade_.find(id); if (upgrade_iter->second.try_times >= 10 /*wave_resend_num_*/) { zlog_warn(zbt, "[%d] short addr:%x upgrade %d time failure", id, short_addr, wave_resend_num_); upgrade_list_.erase(short_addr); upgrade_.erase(id); if (upgrade_list_.size() == 0) { zlog_info(zbt, "no upgrade sensor, go to normal status"); current_schedule_status_ = kScheduleStatusNormal; set_schedule_status(current_schedule_status_); UpgradeCfg::ClearCfg(); } else { UpgradeCfg::WriteCfg(upgrade_); GenerateUpgradeSchedule(); } } else { UpdateUpgradeInfo(id); } } return 0; } void SensorScheduler::UpdateUpgradeInfo(int id) { auto upgrade_iter = upgrade_.find(id); upgrade_iter->second.try_times++; zlog_debug(zbt, "[%d] try_times:%d", id, upgrade_iter->second.try_times); long ts = GetLocalTs(); upgrade_iter->second.try_world_time1.push_back(GetUTCTime(ts)); UpgradeCfg::WriteCfg(upgrade_); } void SensorScheduler::ModifyScheduleTs(int diff_ts) { zlog_warn(zbt, "[ModifyScheduleTs] adjust ts:%d, from:%ld to:%ld", diff_ts, start_timestamp_, start_timestamp_ + diff_ts); start_timestamp_ += diff_ts; start_ts_str_ = GetUTCTime(start_timestamp_); std::ifstream schedule_file(SCHEDULE_CONFIG); Json::Reader reader; Json::Value root; if (!reader.parse(schedule_file, root, false)) { zlog_error(zbt, "[ModifyScheduleTs] invalid format, fail to parse %s", SCHEDULE_CONFIG); schedule_file.close(); return; } schedule_file.close(); root["schedule_start_timestamp"] = std::to_string(start_timestamp_); root["schedule_start_time"] = start_ts_str_; Json::StyledStreamWriter streamWriter; std::ofstream out_file(SCHEDULE_CONFIG); streamWriter.write(out_file, root); out_file.close(); } void SensorScheduler::ClearScheduleCfg(uint16_t short_addr) { zlog_warn(zbt, "[ClearScheduleCfg] clear all schedule config, short_addr:%x", short_addr); if (short_addr == 0) { zlog_warn(zbt, "[ClearScheduleCfg] clear all"); update_.clear(); upgrade_.clear(); short_addr_map_.clear(); ShortAddrCfg::ClearCfg(); UpdateCfg::ClearCfg(); UpgradeCfg::ClearCfg(); wave_feature_set_inst::instance().RemoveAllFeatureCfg(); wave_feature_set_inst::instance().RemoveAllWaveCfg(); } else { UpdateConfigResult(short_addr, 0); UpgradeResult(short_addr, kUpgradeSuccess); short_addr_map_.erase(short_addr); ShortAddrCfg::WriteCfg(short_addr_map_); wave_feature_set_inst::instance().RemoveFeatureCfg(short_addr); wave_feature_set_inst::instance().RemoveWaveCfg(short_addr); } } void SensorScheduler::CleanIdleOccupiedSet(long ts) { if (free_slice_ocuppied_.size() > 5) { for (auto it = free_slice_ocuppied_.begin(); it != free_slice_ocuppied_.end();) { if ((*it) < ts) { it = free_slice_ocuppied_.erase(it); } else ++it; } } } void SensorScheduler::UseDefaultConfig() { zlog_info(zbt, "use default configuration"); int eigen_value_send_interval = 300; int wave_form_send_interval = 7200; int eigen_value_send_duration = 6; int wave_form_send_duration = 50; int max_sensor_num = 32; int wave_resend_num = 3; // int eigen_value_send_interval = 300; // int wave_form_send_interval = 1200; // int eigen_value_send_duration = 2; // 固定的 // int wave_form_send_duration = 60; // 固定的 // int max_sensor_num = 4; // int wave_resend_num = 3; std::string error_msg; Config(eigen_value_send_interval, wave_form_send_interval, max_sensor_num, wave_resend_num, error_msg); } void SensorScheduler::SetScheduleStatus(ScheduleStatus status) { if (status != current_schedule_status_) { zlog_warn(zbt, "schedule status from:%d to %d", current_schedule_status_, status); current_schedule_status_ = status; set_schedule_status(current_schedule_status_); } else { zlog_warn(zbt, "schedule status not change:%d", status); } } ScheduleStatus SensorScheduler::GetScheduleStatus() { return current_schedule_status_; } void SensorScheduler::GenerateDebugSchedule(std::vector short_addr_list) { int debug_size = short_addr_list.size(); debug_list_.clear(); for (auto item : short_addr_list) { debug_list_.insert(item); } // 时间还是用正常模式的,因为特征值还是正常的,所以只把原来波形调度的地方换成调试的传感器 // // 1 根据传感器列表,找到传感器编号,例如:1,2,3 // 2 这是第几个5分钟,从这个5分钟开始,放波形,如果这是2小时中的最后一个5分钟,要考虑越界的问题 // 3 如果是1到2个传感器,隔2分钟(两个波形间隔)测试一次; 如果是大于2个传感器,一直连排就行了 // 进入调试模式 long current_ts = GetLocalTs(); long nth_wave_start_slice = (current_ts - start_timestamp_) / wave_form_send_interval_; long current_wave_start_ts = nth_wave_start_slice * wave_form_send_interval_ + start_timestamp_; long seconds_in_current_wave_slice = current_ts - current_wave_start_ts; long nth_eigen_value_slice = seconds_in_current_wave_slice / eigen_value_send_interval_; long seconds_in_current_eigen_slice = seconds_in_current_wave_slice % eigen_value_send_interval_; int previous_wave_slice = wave_slice_num_per_eigen_interval_ * (nth_eigen_value_slice + 1); if (previous_wave_slice == available_slice_) { previous_wave_slice = 0; } debug_slice_sensor_id_ = new uint16_t[available_slice_+1]; for (int i = 0; i <= available_slice_; ++i) { debug_slice_sensor_id_[i] = 0; } if (debug_size == 1) { // 只有一个传感器的话,两分钟一次波形 zlog_debug(zbt, "one sensor debug"); int j = 0; for (int i = previous_wave_slice+1; i < previous_wave_slice+1+available_slice_; i = i + 3) { j = i % available_slice_; if (j == 0) { j = i; } debug_slice_sensor_id_[j] = short_addr_list[0]; } } else { zlog_debug(zbt, "%d sensor debug", debug_list_.size()); int j = 0; int k = 0; for (int i = previous_wave_slice+1; i < previous_wave_slice+1+available_slice_; ++i) { j = i % available_slice_; if (j == 0) { j = i; } debug_slice_sensor_id_[j] = short_addr_list[k%debug_size]; ++k; } } Json::Value root; for (int i = 1; i <= available_slice_; ++i) { root.append(debug_slice_sensor_id_[i]); } Json::StyledStreamWriter streamWriter; std::ofstream out_file(DEBUG_SCHEDULE_CONFIG); streamWriter.write(out_file, root); out_file.close(); // 从下一个5分钟开始排,因为只要错过了这个5分钟的第1分钟时间,传感器都将无法调度 // 下一个5分钟是第几个5分钟间隔,波形窗口是第多少个 // 对于特征值后的波形,如果特征值时间离波形时间间隔小于10秒,那就再加20秒上去,这样保证传感器至少休息20秒 current_schedule_status_ = kScheduleStatusDebug; set_schedule_status(current_schedule_status_); } int SensorScheduler::OpenDebugMode(std::vector short_addr_list) { if (short_addr_list.size() == 0) { zlog_warn(zbt, "debug list is empty, do nothing"); return 1; } // 给进入调试模式的传感器分配波形时间片 if (current_schedule_status_ == kScheduleStatusDebug) { std::unordered_set tmp_set; for (auto item : short_addr_list) { tmp_set.insert(item); } if (tmp_set == debug_list_) { zlog_warn(zbt, "debug list and mode are same, do nothing"); return 0; } // 重新分配z轴波形时间片 GenerateDebugSchedule(short_addr_list); return 0; } GenerateDebugSchedule(short_addr_list); return 0; } int SensorScheduler::CloseDebugMode() { if (current_schedule_status_ != kScheduleStatusDebug) { return 0; } if (debug_slice_sensor_id_ != NULL) { delete []debug_slice_sensor_id_; debug_slice_sensor_id_ = NULL; } current_schedule_status_ = kScheduleStatusNormal; set_schedule_status(current_schedule_status_); return 0; } void SensorScheduler::GenerateUpgradeSchedule() { zlog_debug(zbt, "GenerateUpgradeSchedule start"); if (upgrade_list_.size() == 0) { return; } std::vector short_addr_list; for (auto item : upgrade_list_) { short_addr_list.push_back(item); } int upgrade_size = short_addr_list.size(); // 时间还是用正常模式的,因为特征值还是正常的,所以只把原来波形调度的地方换成升级的传感器 // // 1 根据传感器列表,找到传感器编号,例如:1,2,3 // 2 这是第几个5分钟,从这个5分钟开始,放波形,如果这是2小时中的最后一个5分钟,要考虑越界的问题 // 3 如果是1到2个传感器,隔2分钟(两个波形间隔)测试一次; 如果是大于2个传感器,一直连排就行了 long current_ts = GetLocalTs(); long nth_wave_start_slice = abs(current_ts - start_timestamp_) / wave_form_send_interval_; zlog_debug(zbt, "current ts: %ld, start ts:%ld, wave_form_send_interval_:%d", current_ts, start_timestamp_, wave_form_send_interval_); long current_wave_start_ts = nth_wave_start_slice * wave_form_send_interval_ + start_timestamp_; long seconds_in_current_wave_slice = current_ts - current_wave_start_ts; long nth_eigen_value_slice = seconds_in_current_wave_slice / eigen_value_send_interval_; long seconds_in_current_eigen_slice = seconds_in_current_wave_slice % eigen_value_send_interval_; int previous_wave_slice = wave_slice_num_per_eigen_interval_ * (nth_eigen_value_slice + 1); zlog_debug(zbt, "seconds_in_current_wave_slice:%ld, nth_eigen_value_slice:%ld, seconds_in_current_eigen_slice:%ld, previous_wave_slice: %d", seconds_in_current_wave_slice, nth_eigen_value_slice, seconds_in_current_eigen_slice, previous_wave_slice); // if (previous_wave_slice < 0) { // zlog_error(zbt, "previous_wave_slice: %d", previous_wave_slice); // } if (previous_wave_slice == available_slice_) { previous_wave_slice = 0; } if (upgrade_slice_sensor_id_ != NULL) { delete [] upgrade_slice_sensor_id_; upgrade_slice_sensor_id_ = NULL; } upgrade_slice_sensor_id_ = new uint16_t[available_slice_+1]; for (int i = 0; i <= available_slice_; ++i) { upgrade_slice_sensor_id_[i] = 0; } if (upgrade_size == 1) { // 只有一个传感器的话,两分钟一次波形 zlog_debug(zbt, "upgrade_size 1:%x", short_addr_list[0]); int j = 0; // int k = 0; for (int i = previous_wave_slice+1; i < previous_wave_slice+1+available_slice_; i=i+2) { j = i % available_slice_; if (j == 0) { j = i; } zlog_debug(zbt, "i = %d, j = %d", i, j); // k = k % upgrade_size; upgrade_slice_sensor_id_[j] = short_addr_list[0]; } } else { zlog_debug(zbt, "upgrade_size %d", upgrade_size); int j = 0; int k = 0; for (int i = previous_wave_slice+1; i < previous_wave_slice+1+available_slice_; ++i) { j = i % available_slice_; if (j == 0) { j = i; } k = k % upgrade_size; upgrade_slice_sensor_id_[j] = short_addr_list[k]; ++k; } } Json::Value root; for (int i = 1; i <= available_slice_; ++i) { root.append(upgrade_slice_sensor_id_[i]); } Json::StyledStreamWriter streamWriter; std::ofstream out_file(UPGRADE_SCHEDULE_CONFIG); streamWriter.write(out_file, root); out_file.close(); zlog_debug(zbt, "GenerateUpgradeSchedule end"); } int SensorScheduler::CloseUpgradeMode() { if (current_schedule_status_ != kScheduleStatusUpgrade) { return 0; } if (upgrade_slice_sensor_id_ != NULL) { delete []upgrade_slice_sensor_id_; upgrade_slice_sensor_id_ = NULL; } upgrade_list_.clear(); std::string clear_cfg_file_cmd = "rm -f " + std::string(UPGRADE_SCHEDULE_CONFIG); system(clear_cfg_file_cmd.c_str()); current_schedule_status_ = kScheduleStatusNormal; set_schedule_status(current_schedule_status_); return 0; } long SensorScheduler::GetLocalTs() { auto now = std::chrono::system_clock::now(); auto timestamp = std::chrono::duration_cast(now.time_since_epoch()).count(); // zlog_debug(zct, "current timestamp:%lld", timestamp); return timestamp; } long SensorScheduler::GetLocalWorldTime(std::string &world_time) { auto now = std::chrono::system_clock::now(); std::time_t now_c = std::chrono::system_clock::to_time_t(now); std::tm *local_time = std::localtime(&now_c); char str[100] = {0}; snprintf(str, sizeof(str), "%04d-%02d-%02d %02d:%02d:%02d", local_time->tm_year + 1900, local_time->tm_mon+1, local_time->tm_mday, local_time->tm_hour, local_time->tm_min, local_time->tm_sec); world_time = str; auto timestamp = std::chrono::duration_cast(now.time_since_epoch()).count(); // zlog_debug(zct, "world time:%s, timestamp:%lld", world_time.c_str(), timestamp); return timestamp; } std::string SensorScheduler::GetUTCTime(long ts) { std::chrono::time_point timePoint = std::chrono::system_clock::from_time_t(ts); std::time_t utcTime = std::chrono::system_clock::to_time_t(timePoint); std::tm* local_time = std::gmtime(&utcTime); local_time->tm_hour = local_time->tm_hour + 8; if (local_time->tm_hour > 24) { local_time->tm_hour -= 24; local_time->tm_mday += 1; } char str[100] = {0}; snprintf(str, sizeof(str), "%04d-%02d-%02d %02d:%02d:%02d", local_time->tm_year + 1900, local_time->tm_mon+1, local_time->tm_mday, local_time->tm_hour, local_time->tm_min, local_time->tm_sec); std::string world_time = str; return world_time; } void SensorScheduler::WriteTriggerWaveRecord() { Json::Value root; for (auto it = trigger_wave_record_.begin(); it != trigger_wave_record_.end(); ) { // 检查 pair 中两个值是否都是 0 if (it->second.first == 0 && it->second.second == 0) { // 使用 erase 方法删除当前元素 it = trigger_wave_record_.erase(it); // erase 返回指向下一个元素的迭代器 } else { ++it; // 继续迭代 } } for (const auto& entry : trigger_wave_record_) { uint16_t key = entry.first; const auto& value = entry.second; // if (value.first == 0 && value.second == 0) { // continue; // } Json::Value item; item["first"] = value.first; item["second"] = value.second; root[std::to_string(key)] = item; } Json::StyledStreamWriter writer; std::ofstream outFile(TRIGGER_WAVE_CONFIG); if (!outFile.is_open()) { zlog_warn(zbt, "Failed to open %s file", TRIGGER_WAVE_CONFIG); return; } writer.write(outFile, root); outFile.close(); } void SensorScheduler::ReadTriggerWaveRecord() { Json::Value root; std::ifstream inFile(TRIGGER_WAVE_CONFIG); if (!inFile.is_open()) { return; } Json::Reader reader; if (!reader.parse(inFile, root, false)) { inFile.close(); return; } inFile.close(); for (const auto& key : root.getMemberNames()) { uint16_t uint16Key = static_cast(std::stoi(key)); auto value = root[key]; uint8_t first = static_cast(value["first"].asUInt()); uint8_t second = static_cast(value["second"].asUInt()); trigger_wave_record_[uint16Key] = std::make_pair(first, second); } }