#include "schedule.hpp"

// NOTE(review): the names of the angle-bracket headers were lost in the copy
// of this file that was reviewed (only bare `#include` tokens survived). The
// list below is what the code in this file visibly needs; confirm it against
// the build (in particular the jsoncpp and zlog include paths).
#include <json/json.h>
#include <zlog.h>

#include <chrono>
#include <cstdint>
#include <cstdio>
#include <ctime>
#include <exception>
#include <fstream>
#include <string>

#include "short_addr_cfg.hpp"
#include "update_cfg.hpp"
#include "wave_feature_set.hpp"

extern zlog_category_t *zct;
extern zlog_category_t *zbt;

// Axis-enable flags filled in by wave_feature_set_inst::GetWaveCfg().
uint8_t g_x, g_y, g_z;

namespace {

// An upgrade is abandoned once it has been attempted this many times.
constexpr int kMaxUpgradeTries = 10;

// Sensor ids handed out by GetAvailableId() are in the range [1, kMaxSensorIds).
constexpr int kMaxSensorIds = 128;

// Fixed offset applied by GetUTCTime() when formatting a timestamp.
constexpr long kDisplayOffsetSeconds = 8L * 3600L;

}  // namespace

/// Decides what a sensor that has just reported in should do right now.
///
/// @param short_addr     Short address of the reporting sensor. Unknown
///                       addresses are registered via GetAvailableId().
/// @param next_duration  Out: seconds until the sensor should report again.
///                       Written on the kScheduleResultNone paths only.
/// @param next_task_id   Out: task expected at the next report (written
///                       together with next_duration).
/// @return kScheduleConfigSensor, kScheduleUpgrade, kScheduleUnknownSensor or
///         kScheduleResultNone.
int SensorScheduler::StartSchedule(int short_addr, int &next_duration, int &next_task_id) {
    const auto known = short_addr_map_.find(short_addr);
    const int id = (known == short_addr_map_.end()) ? GetAvailableId(short_addr) : known->second;

    current_ts_ = GetLocalTs();
    CleanIdleOccupiedSet(current_ts_);

    // Locate the current moment inside the wave interval / eigen interval grid.
    nth_wave_start_slice_ = (current_ts_ - start_timestamp_) / wave_form_send_interval_;
    current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_;
    seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_;
    nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_;
    seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_;

    // The eigen window is treated as ending 3 seconds early.
    ts_in_eigen_slice_ = seconds_in_current_eigen_slice_ < eigen_value_slice_total_seconds_ - 3;

    if (ts_in_eigen_slice_) {
        nth_eigen_slice_ = (seconds_in_current_eigen_slice_ + 2) / eigen_value_send_duration_;
        if (nth_eigen_value_slice_ == 0) {
            // First eigen interval of a wave interval: start with clean bookkeeping.
            ClearFailureSuccessMap();
        }
    } else {
        nth_wave_slice_ = (seconds_in_current_eigen_slice_ - eigen_value_slice_total_seconds_ + 3) /
                          seconds_per_wave_slice_;
    }

    zlog_warn(zct, "[%d:%x] ts:%ld, current utc:%s, nth eigen_value slice:%d, seconds in eigen slice:%d, eigen slice:%d",
              id, short_addr, current_ts_, GetUTCTime(current_ts_).c_str(), nth_eigen_value_slice_ + 1,
              seconds_in_current_eigen_slice_, ts_in_eigen_slice_);

    if (ts_in_eigen_slice_) {
        // A pending configuration update takes priority inside the eigen window.
        if (update_.count(id)) {
            zlog_warn(zct, "[%d:%x] update config in eigen slice", id, short_addr);
            current_request_ = kScheduleConfigSensor;
            return kScheduleConfigSensor;
        }
        // NOTE: the per-sensor window check and the eigen upload dispatch
        // (kScheduleEigenValue) are disabled in this version; the sensor is
        // only told when to come back.
        next_duration = GetNextDuration(short_addr, next_task_id);
        zlog_warn(zct, "[%d:%x] no need for eigen", id, short_addr);
        return kScheduleResultNone;
    }

    // We are in the wave part of the eigen interval.
    const int nth_wave_slice =
        nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1;  // 1-based
    const auto wave_slice_iter = sensor_id_nth_slice_.find(id);
    if (wave_slice_iter == sensor_id_nth_slice_.end()) {
        zlog_error(zct, "[%d:%x]invaild id, not find wave slice id, need to check further", id, short_addr);
        return kScheduleUnknownSensor;
    }

    // Starts a pending upgrade for this sensor if it still has attempts left.
    auto try_start_upgrade = [&](const char *where) -> bool {
        const auto upgrade_iter = upgrade_.find(id);
        if (upgrade_iter == upgrade_.end() || upgrade_iter->second.try_times >= kMaxUpgradeTries) {
            return false;
        }
        current_request_ = kScheduleUpgrade;
        UpdateUpgradeInfo(id);  // bumps try_times before it is logged below
        zlog_warn(zct, "[%d:%x] in %s to upgrade now from version:%s to %s, try time:%d", id, short_addr, where,
                  upgrade_iter->second.current_sw_version.c_str(),
                  upgrade_iter->second.upgrade_sw_version.c_str(), upgrade_iter->second.try_times);
        return true;
    };

    if (nth_wave_slice == wave_slice_iter->second) {
        // This is the sensor's own wave slice; an upgrade, if any, goes first.
        if (try_start_upgrade("wave slice")) {
            return kScheduleUpgrade;
        }
        // NOTE: the wave upload dispatch (kScheduleWaveForm) is disabled in this version.
        next_duration = GetNextDuration(short_addr, next_task_id);
        zlog_warn(zct, "[%d:%x] no need for wave", id, short_addr);
        return kScheduleResultNone;
    }

    // Integer rounding of seconds_per_wave_slice_ can push nth_wave_slice one
    // past the last slice at the very end of an interval, so the table index
    // must be range-checked before use.
    const bool slice_in_table =
        nth_wave_slice >= 1 && static_cast<long>(nth_wave_slice) <= static_cast<long>(slice_sensor_id_.size());
    if (slice_in_table && slice_sensor_id_[nth_wave_slice - 1] == 0) {
        // Unassigned (idle) slice: it may be used for an upgrade.
        zlog_warn(zct, "[%d:%x] in idle time", id, short_addr);
        if (try_start_upgrade("idle")) {
            return kScheduleUpgrade;
        }
        // NOTE: retransfer / patch wave dispatch in idle slices is disabled in this version.
    }

    // Wrong time to report: send the sensor to its eigen slot in the next eigen interval.
    const long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ +
                              (id - 1) * eigen_value_send_duration_;
    next_duration = static_cast<int>(available_ts - current_ts_);
    next_task_id = kScheduleEigenValue;
    zlog_warn(zct, "[%d:%x] wrong time in wave slice, next feature send utc time:[%s], duration:%d", id, short_addr,
              GetUTCTime(available_ts).c_str(), next_duration);
    return kScheduleResultNone;
}

/// Computes the timestamp at which the sensor should report next, based on
/// current_ts_ (which the caller must have refreshed).
///
/// @param id            Sensor id (1-based slot number).
/// @param short_addr    Short address of the sensor.
/// @param next_task_id  Out: task to perform at the returned time.
/// @return Absolute timestamp (seconds) of the next report.
long SensorScheduler::CalcNextTimestamp(int id, uint16_t short_addr, int &next_task_id) {
    nth_wave_start_slice_ = (current_ts_ - start_timestamp_) / wave_form_send_interval_;
    current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_;
    seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_;
    nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_;
    seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_;
    ts_in_eigen_slice_ = seconds_in_current_eigen_slice_ < eigen_value_slice_total_seconds_ - 3;

    if (ts_in_eigen_slice_) {
        // Number of wave slices that belong to earlier eigen intervals.
        const int forward_wave_slice_num = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_;

        const auto wave_slice_iter = sensor_id_nth_slice_.find(id);
        if (wave_slice_iter == sensor_id_nth_slice_.end()) {
            zlog_error(zct, "[Nxt] invaild id:%d, not find wave slice id", id);
            const long available_ts =
                current_wave_start_ts_ + eigen_value_send_interval_ + nth_eigen_slice_ * eigen_value_send_duration_;
            zlog_error(zct, "[Nxt] [%d] next feature send utc time:[%s]", id, GetUTCTime(available_ts).c_str());
            next_task_id = kScheduleEigenValue;
            return available_ts;
        }
        const int wave_slice = wave_slice_iter->second;  // 1-based

        long send_wave_ts = 0;
        wave_feature_set_inst::instance().GetWaveCfg(short_addr, g_x, g_y, g_z);
        const bool wave_enabled = g_x || g_y || g_z;

        if (wave_enabled) {
            if (wave_slice > forward_wave_slice_num &&
                wave_slice <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_) {
                // The sensor's wave slice lies inside the current eigen interval.
                const int slice_offset = wave_slice - 1 - forward_wave_slice_num;
                send_wave_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ +
                               eigen_value_slice_total_seconds_ + slice_offset * seconds_per_wave_slice_;
                zlog_warn(zct, "[Nxt] [%d:%x] send wave time:[%s]", id, short_addr,
                          GetUTCTime(send_wave_ts).c_str());
            }

            if (!MissedWave(short_addr) && send_wave_ts == 0 && success_set_.count(short_addr) == 0) {
                // The sensor's slice has already passed without a recorded
                // success: remember it so a free slice can be used to patch.
                const int nth_wave_slice =
                    nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1;
                if (nth_wave_slice > wave_slice && !RetransferWave(short_addr)) {
                    zlog_warn(zct, "[Nxt] [%d:%x] add it to patch set", id, short_addr);
                    patch_set_.insert(short_addr);
                }
            }
        }

        // Reserves the first unassigned, not yet reserved wave slice of the
        // current eigen interval. On success stores its start time in
        // reserved_ts, sets next_task_id and returns true.
        auto reserve_free_slice = [&](const char *purpose, int task, long &reserved_ts) -> bool {
            for (int i = 0; i < wave_slice_num_per_eigen_interval_; ++i) {
                const long slice_index = static_cast<long>(i) + forward_wave_slice_num;
                if (slice_index < 0 || slice_index >= static_cast<long>(slice_sensor_id_.size())) {
                    continue;  // slice table shorter than the schedule grid
                }
                if (slice_sensor_id_[slice_index] != 0) {
                    continue;  // slice belongs to a sensor
                }
                const long slice_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ +
                                      eigen_value_slice_total_seconds_ + i * seconds_per_wave_slice_;
                if (free_slice_ocuppied_.count(slice_ts) != 0) {
                    continue;  // free slice already handed to someone else
                }
                free_slice_ocuppied_.insert(slice_ts);
                zlog_warn(zct, "[Nxt][%d:%x] %d nth free wave slice will be used to %s, utc time:[%s]", id,
                          short_addr, i + forward_wave_slice_num, purpose, GetUTCTime(slice_ts).c_str());
                next_task_id = task;
                reserved_ts = slice_ts;
                return true;
            }
            return false;
        };

        long reserved_ts = 0;
        bool reserved = false;
        const auto upgrade_iter = upgrade_.find(id);
        if (upgrade_iter != upgrade_.end()) {
            if (upgrade_iter->second.try_times < kMaxUpgradeTries) {
                reserved = reserve_free_slice("upgrade", kScheduleUpgrade, reserved_ts);
            }
        } else if (wave_enabled) {
            if (RetransferWave(short_addr)) {
                reserved = reserve_free_slice("retransfer wave", kScheduleWaveForm, reserved_ts);
            } else if (MissedWave(short_addr)) {
                reserved = reserve_free_slice("patch wave", kScheduleWaveForm, reserved_ts);
            }
        }
        if (reserved) {
            return reserved_ts;
        }

        if (send_wave_ts > 0) {
            zlog_warn(zct, "[Nxt] [%d:%x] next wave send utc time1:%s", id, short_addr,
                      GetUTCTime(send_wave_ts).c_str());
            next_task_id = kScheduleWaveForm;
            return send_wave_ts;
        }
    }

    // Currently in the wave part (idle or wave slice), or nothing else to do:
    // the next window is the sensor's eigen slot in the next eigen interval.
    const long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ +
                              (id - 1) * eigen_value_send_duration_;
    zlog_warn(zct, "[Nxt] [%d:%x] next feature send utc time3:[%s]", id, short_addr,
              GetUTCTime(available_ts).c_str());
    next_task_id = kScheduleEigenValue;
    return available_ts;
}

/// Returns the number of seconds the sensor should wait before reporting again.
///
/// @param short_addr    Short address of the sensor.
/// @param next_task_id  Out: task to perform at the next report.
/// @return 0 if the address is unknown; otherwise the wait in seconds.
///         Negative or over-long waits are replaced by one eigen interval,
///         waits below 9 seconds by 10 seconds.
int SensorScheduler::GetNextDuration(int short_addr, int &next_task_id) {
    const auto entry = short_addr_map_.find(short_addr);
    if (entry == short_addr_map_.end()) {
        zlog_error(zct, "cannot find id for short_addr %x", short_addr);
        return 0;
    }
    const int id = entry->second;

    current_ts_ = GetLocalTs();
    const long next_ts = CalcNextTimestamp(id, short_addr, next_task_id);

    int duration = static_cast<int>(next_ts - current_ts_);
    if (duration < 0 || duration > eigen_value_send_interval_) {
        zlog_warn(zct, "[Nxt] exception duration: %d", duration);
        duration = eigen_value_send_interval_;
    } else if (duration < 9) {
        zlog_warn(zct, "[Nxt] exception duration: %d", duration);
        duration = 10;
    }
    zlog_warn(zct, "[Nxt] [%d:%x] next duration: %d, taskid: %d", id, short_addr, duration, next_task_id);
    return duration;
}

/// Loads the schedule from SCHEDULE_CONFIG, falling back to the built-in
/// default configuration when the file is missing, unparsable or contains
/// unusable values, then loads the short-address, upgrade and update tables.
SensorScheduler::SensorScheduler() {
    support_modification_ = true;
    bool configed = false;

    std::ifstream schedule_file(SCHEDULE_CONFIG);
    if (schedule_file.good()) {
        zlog_info(zbt, "exist configuration file");
        Json::Reader reader;
        Json::Value root;
        const bool parsed = reader.parse(schedule_file, root, false);
        schedule_file.close();

        if (!parsed) {
            zlog_error(zbt, "invalid format, fail to parse %s", SCHEDULE_CONFIG);
        } else if (!root.isObject()) {
            zlog_error(zbt, "invalid format, not an object: %s", SCHEDULE_CONFIG);
        } else {
            try {
                // std::stol throws on a missing / non-numeric timestamp and the
                // Json::Value accessors may throw on a type mismatch.
                start_timestamp_ = std::stol(root["schedule_start_timestamp"].asString());
                start_ts_str_ = root["schedule_start_time"].asString();

                const long current_ts = GetLocalTs();
                if (current_ts < start_timestamp_) {
                    // A start time in the future would make the slot arithmetic
                    // negative: restart the schedule from now and persist that.
                    zlog_warn(zbt, "current ts: %ld less than start ts: %ld, go to adjust it", current_ts,
                              start_timestamp_);
                    start_timestamp_ = current_ts;
                    start_ts_str_ = GetUTCTime(current_ts);
                    root["schedule_start_timestamp"] = std::to_string(start_timestamp_);
                    root["schedule_start_time"] = start_ts_str_;
                    Json::StyledStreamWriter streamWriter;
                    std::ofstream out_file(SCHEDULE_CONFIG);
                    streamWriter.write(out_file, root);
                    out_file.close();
                }

                eigen_value_send_interval_ = root["eigen_value_send_interval"].asInt();
                eigen_value_send_duration_ = root["eigen_value_send_duration"].asInt();
                wave_form_send_interval_ = root["wave_form_send_interval"].asInt();
                wave_form_send_duration_ = root["wave_form_send_duration"].asInt();
                max_sensor_num_ = root["max_sensor_num"].asInt();
                available_slice_ = root["available_slice"].asInt();
                free_slice_ = root["free_slice"].asInt();
                support_modification_ = root["support_modification"].asBool();

                // "id" lists, per wave slice, the sensor id that owns it (0 = free).
                const Json::Value ids = root["id"];
                int slice_number = 0;
                for (const auto &entry : ids) {
                    const int sensor_id = entry.asInt();
                    slice_sensor_id_.push_back(sensor_id);
                    ++slice_number;
                    if (sensor_id != 0) {
                        sensor_id_nth_slice_[sensor_id] = slice_number;  // 1-based
                    }
                }

                // These values are used as divisors all over the scheduler, so
                // reject anything that is not strictly positive.
                const bool positive = eigen_value_send_interval_ > 0 && eigen_value_send_duration_ > 0 &&
                                      wave_form_send_interval_ > 0 && wave_form_send_duration_ > 0 &&
                                      max_sensor_num_ > 0;
                if (positive) {
                    eigen_value_slice_total_seconds_ = eigen_value_send_duration_ * max_sensor_num_;
                    const int rest_duration = eigen_value_send_interval_ - eigen_value_slice_total_seconds_;
                    wave_slice_num_per_eigen_interval_ = rest_duration / wave_form_send_duration_;
                    if (wave_slice_num_per_eigen_interval_ > 0) {
                        seconds_per_wave_slice_ = rest_duration / wave_slice_num_per_eigen_interval_;
                        configed = true;
                    }
                }
                if (!configed) {
                    zlog_error(zbt, "invalid schedule values in %s, use default instead", SCHEDULE_CONFIG);
                }
            } catch (const std::exception &e) {
                zlog_error(zbt, "invalid content in %s: %s", SCHEDULE_CONFIG, e.what());
            }
        }
    }

    if (!configed) {
        // Config() refuses to run when modification is disabled; a rejected
        // file must not be able to block the fallback.
        support_modification_ = true;
        UseDefaultConfig();
    }

    short_addr_map_.clear();
    ShortAddrCfg::ReadCfg(short_addr_map_);
    // read upgrade config file: UPGRADE_CONFIG
    UpgradeCfg::ReadCfg(upgrade_);
    // read config update file: CONFIG_UPDATE
    UpdateCfg::ReadCfg(update_);
}

/// Records a failed wave transfer for the sensor.
///
/// @return 0 while retries remain (the first failure grants 3 retries),
///         -1 once they are used up (the entry is then dropped).
int SensorScheduler::WaveError(uint16_t short_addr) {
    const auto entry = failure_map_.find(short_addr);
    if (entry == failure_map_.end()) {
        failure_map_[short_addr] = 3;  // number of retries
        zlog_warn(zct, "[WaveError][%x] will try 3 times", short_addr);
        return 0;
    }
    if (entry->second == 0) {
        zlog_warn(zct, "[WaveError][%x] no try times", short_addr);
        failure_map_.erase(entry);
        return -1;
    }
    --entry->second;
    zlog_warn(zct, "[WaveError][%x] remain try %d times", short_addr, entry->second);
    return 0;
}

/// True when the sensor has a failed wave transfer that may still be retried.
bool SensorScheduler::RetransferWave(uint16_t short_addr) {
    return failure_map_.find(short_addr) != failure_map_.end();
}

/// True when the sensor is in the patch set.
bool SensorScheduler::MissedWave(uint16_t short_addr) {
    return patch_set_.count(short_addr) > 0;
}

/// Records a successful wave transfer and forgets any pending retries.
void SensorScheduler::WaveSuccess(uint16_t short_addr) {
    zlog_warn(zct, "[WaveSuccess][%x]", short_addr);
    success_set_.insert(short_addr);
    const auto entry = failure_map_.find(short_addr);
    if (entry != failure_map_.end()) {
        zlog_warn(zct, "[WaveSuccess][%x] try %d times success", short_addr, 4 - entry->second);
        failure_map_.erase(entry);
    }
}

/// Drops all per-wave-interval bookkeeping (failures, successes, patch set).
void SensorScheduler::ClearFailureSuccessMap() {
    failure_map_.clear();
    success_set_.clear();
    patch_set_.clear();
}

/// Returns the schedule start time shifted to the sensor's eigen slot, or 0
/// when the short address is unknown.
long SensorScheduler::GetBaseTimestamp(int short_addr) {
    const auto entry = short_addr_map_.find(short_addr);
    if (entry == short_addr_map_.end()) {
        zlog_error(zct, "cannot find id for short_addr %x", short_addr);
        return 0;
    }
    const int id = entry->second;
    return start_timestamp_ + (id - 1) * eigen_value_send_duration_;
}

/// Assigns the lowest unused id in [1, 127] to short_addr, persists the
/// mapping and returns the id. When every id is taken the address is mapped
/// to id 0 (unchanged historical behaviour) and an error is logged.
int SensorScheduler::GetAvailableId(int short_addr) {
    bool id_in_use[kMaxSensorIds] = {false};
    for (const auto &mapping : short_addr_map_) {
        // Ids come from a config file; ignore anything outside the table.
        if (mapping.second >= 0 && mapping.second < kMaxSensorIds) {
            id_in_use[mapping.second] = true;
        }
    }

    int available_id = 0;
    for (int candidate = 1; candidate < kMaxSensorIds; ++candidate) {
        if (!id_in_use[candidate]) {
            available_id = candidate;
            break;
        }
    }
    if (available_id == 0) {
        zlog_error(zct, "[GetAvailableId] no free id left for short addr : %x", short_addr);
    }

    zlog_warn(zct, "[GetAvailableId][%d] short addr : %x", available_id, short_addr);
    short_addr_map_[short_addr] = available_id;
    ShortAddrCfg::WriteCfg(short_addr_map_);
    return available_id;
}

/// Rebuilds the slice allocation tables from the current member settings and
/// writes the whole schedule to SCHEDULE_CONFIG.
///
/// @param ts          Schedule start timestamp to store (only read).
/// @param world_time  Human-readable form of ts to store (only read).
/// @return Always 0.
int SensorScheduler::WriteScheduleCfg(long &ts, std::string &world_time) {
    Json::Value root;
    root["schedule_start_timestamp"] = std::to_string(ts);
    root["schedule_start_time"] = world_time;
    root["eigen_value_send_interval"] = eigen_value_send_interval_;
    root["eigen_value_send_duration"] = eigen_value_send_duration_;
    root["wave_form_send_interval"] = wave_form_send_interval_;
    root["wave_form_send_duration"] = wave_form_send_duration_;
    root["max_sensor_num"] = max_sensor_num_;
    root["available_slice"] = available_slice_;
    root["free_slice"] = free_slice_;
    root["support_modification"] = true;

    // Both tables are rebuilt from scratch so that ids from a previous, larger
    // configuration do not keep pointing at slices that no longer exist.
    slice_sensor_id_.clear();
    sensor_id_nth_slice_.clear();

    Json::Value ids;
    int assigned_count = 0;  // slices given to a sensor so far
    int free_count = 0;      // slices left free so far
    for (int i = 0; i < available_slice_; ++i) {
        int owner = 0;  // 0 marks a free slice
        // Sensors take the even positions while free slices remain to
        // interleave, and every remaining position afterwards.
        if (assigned_count < max_sensor_num_ && (i % 2 == 0 || free_count >= free_slice_)) {
            owner = ++assigned_count;
            sensor_id_nth_slice_[owner] = i + 1;  // 1-based slice number
        } else if (free_count < free_slice_) {
            ++free_count;
        }
        slice_sensor_id_.push_back(owner);
        ids.append(owner);
    }
    root["id"] = ids;

    std::ofstream out_file(SCHEDULE_CONFIG);
    if (!out_file.is_open()) {
        // Kept non-fatal: callers rely on the in-memory schedule being set up
        // even when the file cannot be persisted.
        zlog_error(zbt, "fail to open %s for writing", SCHEDULE_CONFIG);
    }
    Json::StyledStreamWriter streamWriter;
    streamWriter.write(out_file, root);
    out_file.close();
    return 0;
}

/// Validates and applies a new schedule, restarts it from the current time and
/// persists it.
///
/// @param error_msg  Out: description of the problem when validation fails.
/// @return 0 on success, 1 when modification is not supported, otherwise the
///         error code of CalcAvailableSlice() / WriteScheduleCfg().
int SensorScheduler::Config(int eigen_value_send_interval, int wave_form_send_interval,
                            int eigen_value_send_duration, int wave_form_send_duration, int max_sensor_num,
                            std::string &error_msg) {
    if (!support_modification_) {
        zlog_warn(zct, "not support modification");
        return 1;
    }

    int available_slice = 0;
    int free_slice = 0;
    int ret = CalcAvailableSlice(eigen_value_send_interval, wave_form_send_interval, eigen_value_send_duration,
                                 wave_form_send_duration, max_sensor_num, available_slice, free_slice, error_msg);
    if (ret != 0) {
        return ret;
    }

    eigen_value_send_interval_ = eigen_value_send_interval;
    eigen_value_send_duration_ = eigen_value_send_duration;
    wave_form_send_interval_ = wave_form_send_interval;
    wave_form_send_duration_ = wave_form_send_duration;
    max_sensor_num_ = max_sensor_num;
    available_slice_ = available_slice;
    free_slice_ = free_slice;
    support_modification_ = true;

    // CalcAvailableSlice() succeeded, so at least one wave slice fits into the
    // remainder of an eigen interval and the divisions below are safe.
    eigen_value_slice_total_seconds_ = eigen_value_send_duration_ * max_sensor_num_;
    const int rest_duration = eigen_value_send_interval_ - eigen_value_slice_total_seconds_;
    wave_slice_num_per_eigen_interval_ = rest_duration / wave_form_send_duration_;
    seconds_per_wave_slice_ = rest_duration / wave_slice_num_per_eigen_interval_;

    std::string world_time;
    long current_ts = GetLocalWorldTime(world_time);
    ret = WriteScheduleCfg(current_ts, world_time);
    if (ret != 0) {
        return ret;
    }
    start_timestamp_ = current_ts;
    start_ts_str_ = world_time;
    return 0;
}

/// Checks a schedule for consistency and computes its slice counts.
///
/// Example: eigen interval 300 s, wave interval 7200 s, eigen send duration
/// 6 s, wave send duration 60 s.
///
/// @param available_slice  Out: wave slices per wave interval.
/// @param free_slice       Out: wave slices not needed by any sensor.
/// @param error_msg        Out: description of the problem on failure.
/// @return 0 on success; 1 bad sensor count, 2 eigen slots exceed the eigen
///         interval, 3 wave slots exceed the wave interval, 4 wave interval is
///         not a multiple of the eigen interval, 5 not enough wave slices,
///         6 non-positive interval or duration.
int SensorScheduler::CalcAvailableSlice(int eigen_value_send_interval, int wave_form_send_interval,
                                        int eigen_value_send_duration, int wave_form_send_duration,
                                        int max_sensor_num, int &available_slice, int &free_slice,
                                        std::string &error_msg) {
    if (max_sensor_num <= 0) {
        error_msg = "max_sensor_num:" + std::to_string(max_sensor_num) + " must bigger than 0";
        zlog_error(zbt, "%s", error_msg.c_str());
        return 1;
    }
    // All four values are used as divisors or moduli below.
    if (eigen_value_send_interval <= 0 || wave_form_send_interval <= 0 || eigen_value_send_duration <= 0 ||
        wave_form_send_duration <= 0) {
        error_msg = "intervals and durations must bigger than 0, eigen_value_send_interval:" +
                    std::to_string(eigen_value_send_interval) +
                    ", wave_form_send_interval:" + std::to_string(wave_form_send_interval) +
                    ", eigen_value_send_duration:" + std::to_string(eigen_value_send_duration) +
                    ", wave_form_send_duration:" + std::to_string(wave_form_send_duration);
        zlog_error(zbt, "%s", error_msg.c_str());
        return 6;
    }
    if (max_sensor_num * eigen_value_send_duration > eigen_value_send_interval) {
        error_msg = "invalid max_sensor_num:" + std::to_string(max_sensor_num) +
                    " * eigen_value_send_duration:" + std::to_string(eigen_value_send_duration) +
                    " > eigen_value_send_interval:" + std::to_string(eigen_value_send_interval);
        zlog_error(zbt, "%s", error_msg.c_str());
        return 2;
    }
    if (max_sensor_num * wave_form_send_duration > wave_form_send_interval) {
        error_msg = "invalid wave_form_send_duration:" + std::to_string(wave_form_send_duration) +
                    " * max_sensor_num:" + std::to_string(max_sensor_num) +
                    " > wave_form_send_interval:" + std::to_string(wave_form_send_interval);
        zlog_error(zbt, "%s", error_msg.c_str());
        return 3;
    }
    if (wave_form_send_interval % eigen_value_send_interval != 0) {
        // The message is logged through "%s", so a single '%' is correct here.
        error_msg = "wave_form_send_interval:" + std::to_string(wave_form_send_interval) +
                    " % eigen_value_send_interval:" + std::to_string(eigen_value_send_interval) + " != 0";
        zlog_error(zbt, "%s", error_msg.c_str());
        return 4;
    }

    const int total_eigen_value_send_duration = eigen_value_send_duration * max_sensor_num;
    const int rest_duration = eigen_value_send_interval - total_eigen_value_send_duration;
    const int slice_per_eigen_value_interval = rest_duration / wave_form_send_duration;
    available_slice = wave_form_send_interval / eigen_value_send_interval * slice_per_eigen_value_interval;
    free_slice = available_slice - max_sensor_num;
    if (free_slice < 0) {
        error_msg = "invalid config, available slice:" + std::to_string(available_slice) +
                    ", required slice:" + std::to_string(max_sensor_num);
        zlog_error(zbt, "%s", error_msg.c_str());
        return 5;
    }
    return 0;
}

/// Copies the active schedule parameters into the output arguments.
/// @return Always 0.
int SensorScheduler::GetScheduleConfig(int &eigen_value_send_interval, int &wave_form_send_interval,
                                       int &eigen_value_send_duration, int &wave_form_send_duration,
                                       int &max_sensor_num) {
    eigen_value_send_interval = eigen_value_send_interval_;
    wave_form_send_interval = wave_form_send_interval_;
    eigen_value_send_duration = eigen_value_send_duration_;
    wave_form_send_duration = wave_form_send_duration_;
    max_sensor_num = max_sensor_num_;
    return 0;
}

/// Marks the sensor as having a pending configuration update and persists the
/// pending set.
/// @return 0 on success (also when already pending), 1 for an unknown address.
int SensorScheduler::UpdateSensorConfig(int short_addr) {
    const auto entry = short_addr_map_.find(short_addr);
    if (entry == short_addr_map_.end()) {
        zlog_info(zct, "cannot find id for short_addr %d", short_addr);
        return 1;
    }
    const int id = entry->second;
    if (update_.count(id) > 0) {
        return 0;  // already pending, nothing to persist
    }
    update_.insert(id);
    UpdateCfg::WriteCfg(update_);
    return 0;
}

/// Handles the outcome of a configuration update; a result of 0 clears the
/// pending entry, any other result leaves it in place.
/// @return 0, or 1 for an unknown address.
int SensorScheduler::UpdateConfigResult(int short_addr, int result) {
    const auto entry = short_addr_map_.find(short_addr);
    if (entry == short_addr_map_.end()) {
        zlog_info(zct, "cannot find id for short_addr %x", short_addr);
        return 1;
    }
    const int id = entry->second;
    if (result != 0) {
        return 0;
    }
    zlog_info(zbt, "[%d] short addr:%d update successfully", id, short_addr);
    update_.erase(id);
    UpdateCfg::WriteCfg(update_);
    return 0;
}

/// Registers (or replaces) a pending firmware upgrade for the sensor and
/// persists the upgrade table.
/// @return 0 on success, 1 for an unknown address.
int SensorScheduler::UpgradeSensor(int short_addr, std::string sensor_type, int hw_version,
                                   std::string current_sw_version, std::string upgrade_sw_version) {
    const auto entry = short_addr_map_.find(short_addr);
    if (entry == short_addr_map_.end()) {
        zlog_error(zct, "cannot find id for short_addr %x", short_addr);
        return 1;
    }
    const int id = entry->second;

    UpgradeInfo info;
    info.try_times = 0;
    info.sensor_type = sensor_type;
    info.hw_version = hw_version;
    info.current_sw_version = current_sw_version;
    info.upgrade_sw_version = upgrade_sw_version;
    info.submit_time = GetUTCTime(GetLocalTs());
    upgrade_[id] = info;

    zlog_info(zbt, "[%d] short addr:%x add upgrade info", id, short_addr);
    UpgradeCfg::WriteCfg(upgrade_);
    return 0;
}

/// Handles the outcome of an upgrade attempt. Terminal results (success,
/// product / hardware mismatch, already done) remove the pending upgrade; any
/// other result removes it only once all attempts are used up.
/// @return 0, or 1 for an unknown address.
int SensorScheduler::UpgradeResult(int short_addr, int result) {
    const auto entry = short_addr_map_.find(short_addr);
    if (entry == short_addr_map_.end()) {
        zlog_info(zct, "cannot find id for short_addr %x", short_addr);
        return 1;
    }
    const int id = entry->second;

    const bool finished = result == kUpgradeSuccess || result == kProductTypeMismatch ||
                          result == kZigbeeHWMismatch || result == kUpgradeDoneBefore;
    if (finished) {
        upgrade_.erase(id);
        zlog_info(zbt, "[%d] short addr:%x upgrade successfully", id, short_addr);
        UpgradeCfg::WriteCfg(upgrade_);
        return 0;
    }

    // A failure may be reported for a sensor without a pending upgrade, so the
    // lookup result must be checked before it is used.
    const auto upgrade_iter = upgrade_.find(id);
    if (upgrade_iter != upgrade_.end() && upgrade_iter->second.try_times >= kMaxUpgradeTries) {
        zlog_error(zct, "[%d] short addr:%x upgrade 10 time failure", id, short_addr);
        upgrade_.erase(upgrade_iter);
        UpgradeCfg::WriteCfg(upgrade_);
        // TODO: call interface to write into database
    }
    return 0;
}

/// Returns the current system time as seconds since the epoch.
// NOTE(review): the duration_cast target type was lost in the reviewed copy;
// seconds is assumed because every interval in this file is in seconds.
long SensorScheduler::GetLocalTs() {
    const auto now = std::chrono::system_clock::now();
    return static_cast<long>(std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count());
}

/// Returns the current time as seconds since the epoch and stores its
/// local-time representation ("YYYY-MM-DD hh:mm:ss") in world_time.
long SensorScheduler::GetLocalWorldTime(std::string &world_time) {
    const auto now = std::chrono::system_clock::now();
    const std::time_t now_c = std::chrono::system_clock::to_time_t(now);

    char str[100] = {0};
    const std::tm *local_time = std::localtime(&now_c);
    if (local_time != nullptr) {  // localtime reports failure with a null pointer
        snprintf(str, sizeof(str), "%04d-%02d-%02d %02d:%02d:%02d", local_time->tm_year + 1900,
                 local_time->tm_mon + 1, local_time->tm_mday, local_time->tm_hour, local_time->tm_min,
                 local_time->tm_sec);
    }
    world_time = str;

    return static_cast<long>(std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count());
}

/// Formats a timestamp as "YYYY-MM-DD hh:mm:ss" at UTC+8 (despite the name).
/// The offset is applied to the timestamp itself so that day, month and year
/// roll over correctly.
std::string SensorScheduler::GetUTCTime(long ts) {
    const std::time_t shifted = static_cast<std::time_t>(ts + kDisplayOffsetSeconds);
    const std::tm *parts = std::gmtime(&shifted);
    if (parts == nullptr) {
        return std::string();
    }
    char str[100] = {0};
    snprintf(str, sizeof(str), "%04d-%02d-%02d %02d:%02d:%02d", parts->tm_year + 1900, parts->tm_mon + 1,
             parts->tm_mday, parts->tm_hour, parts->tm_min, parts->tm_sec);
    return std::string(str);
}

/// Counts one more upgrade attempt for the sensor, records when it happened
/// and persists the upgrade table. Does nothing for an id without a pending
/// upgrade.
void SensorScheduler::UpdateUpgradeInfo(int id) {
    const auto upgrade_iter = upgrade_.find(id);
    if (upgrade_iter == upgrade_.end()) {
        zlog_error(zct, "[%d] no upgrade info to update", id);
        return;
    }
    upgrade_iter->second.try_times++;
    upgrade_iter->second.try_world_time1.push_back(GetUTCTime(GetLocalTs()));
    UpgradeCfg::WriteCfg(upgrade_);
}

/// Shifts the schedule start by diff_ts seconds and stores the new start in
/// SCHEDULE_CONFIG. The in-memory start is changed even when the file cannot
/// be parsed (the file is then left untouched).
void SensorScheduler::ModifyScheduleTs(int diff_ts) {
    zlog_warn(zbt, "[ModifyScheduleTs] adjust ts:%d, from:%ld to:%ld", diff_ts, start_timestamp_,
              start_timestamp_ + diff_ts);
    start_timestamp_ += diff_ts;
    start_ts_str_ = GetUTCTime(start_timestamp_);

    Json::Value root;
    {
        std::ifstream schedule_file(SCHEDULE_CONFIG);
        Json::Reader reader;
        const bool parsed = reader.parse(schedule_file, root, false);
        schedule_file.close();
        if (!parsed) {
            zlog_error(zbt, "[ModifyScheduleTs] invalid format, fail to parse %s", SCHEDULE_CONFIG);
            return;
        }
    }

    root["schedule_start_timestamp"] = std::to_string(start_timestamp_);
    root["schedule_start_time"] = start_ts_str_;
    Json::StyledStreamWriter streamWriter;
    std::ofstream out_file(SCHEDULE_CONFIG);
    streamWriter.write(out_file, root);
    out_file.close();
}

/// Removes the scheduling state of one sensor, or of every sensor when
/// short_addr is 0.
void SensorScheduler::ClearScheduleCfg(int short_addr) {
    zlog_warn(zbt, "[ClearScheduleCfg] clear all schedule config, short_addr:%x", short_addr);
    if (short_addr != 0) {
        // Reuse the result handlers to drop the pending update / upgrade.
        UpdateConfigResult(short_addr, 0);
        UpgradeResult(short_addr, kUpgradeSuccess);
        short_addr_map_.erase(short_addr);
        ShortAddrCfg::WriteCfg(short_addr_map_);
        wave_feature_set_inst::instance().RemoveFeatureCfg(short_addr);
        wave_feature_set_inst::instance().RemoveWaveCfg(short_addr);
        return;
    }

    zlog_warn(zbt, "[ClearScheduleCfg] clear all");
    update_.clear();
    upgrade_.clear();
    short_addr_map_.clear();
    ShortAddrCfg::ClearCfg();
    UpdateCfg::ClearCfg();
    UpgradeCfg::ClearCfg();
    wave_feature_set_inst::instance().RemoveAllFeatureCfg();
    wave_feature_set_inst::instance().RemoveAllWaveCfg();
}

/// Forgets reserved free slices that start before ts. Nothing is removed while
/// the set holds 5 entries or fewer.
void SensorScheduler::CleanIdleOccupiedSet(long ts) {
    if (free_slice_ocuppied_.size() <= 5) {
        return;
    }
    auto it = free_slice_ocuppied_.begin();
    while (it != free_slice_ocuppied_.end()) {
        if (*it < ts) {
            it = free_slice_ocuppied_.erase(it);
        } else {
            ++it;
        }
    }
}

/// Applies the built-in schedule: eigen values every 300 s (6 s per sensor),
/// waveforms every 7200 s (50 s per slice), 32 sensors.
void SensorScheduler::UseDefaultConfig() {
    zlog_info(zbt, "use default configuration");
    const int eigen_value_send_interval = 300;
    const int wave_form_send_interval = 7200;
    const int eigen_value_send_duration = 6;
    const int wave_form_send_duration = 50;
    const int max_sensor_num = 32;
    std::string error_msg;
    Config(eigen_value_send_interval, wave_form_send_interval, eigen_value_send_duration, wave_form_send_duration,
           max_sensor_num, error_msg);
}