WLG/scheduler/schedule.cpp
2026-02-13 15:52:50 +08:00

1568 lines
69 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#include "schedule.hpp"
#include <iostream>
#include <string>
#include <fstream>
#include <sstream>
#include <chrono>
#include <ctime>
#include <set>
#include <iomanip>
#include <json/json.h>
#include <zlog.h>
#include "short_addr_cfg.hpp"
#include "update_cfg.hpp"
#include "wave_feature_set.hpp"
extern zlog_category_t *zct;
extern zlog_category_t *zbt;
uint8_t g_x, g_y, g_z;
// Decides what the sensor identified by short_addr should do right now.
//
// The sensor id is looked up in short_addr_map_ (or allocated through GetAvailableId()
// when unknown), the cached time-slice members are refreshed from GetLocalTs(), and one
// of the kSchedule* codes is returned.
//
// Out-params (only written on some return paths, so callers should pre-initialise them):
//   next_duration - seconds until the sensor should contact the scheduler again.
//   z             - true for a Z-axis wave, false for an XY wave.
//   next_task_id  - task the sensor is expected to perform at its next contact.
int SensorScheduler::StartSchedule(uint16_t short_addr, int &next_duration, bool &z, int &next_task_id) {
int id = 0;
// Resolve the sensor id; an unknown short address gets a fresh id.
auto iter = short_addr_map_.find(short_addr);
if (iter == short_addr_map_.end()) {
id = GetAvailableId(short_addr);
} else {
id = iter->second;
}
current_ts_ = GetLocalTs();
CleanIdleOccupiedSet(current_ts_);
// Locate "now" inside the schedule: which wave cycle, which eigen interval inside that
// cycle, and how many seconds into that eigen interval.
nth_wave_start_slice_ = (current_ts_ - start_timestamp_) / wave_form_send_interval_;
current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_;
seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_;
nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_;
seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_;
// The first 57 (60 - 3) seconds of every eigen interval count as the eigen-value window;
// everything after that belongs to the wave slices.
ts_in_eigen_slice_ = false;
if (seconds_in_current_eigen_slice_ < 60 - 3) {
ts_in_eigen_slice_ = true;
}
if (ts_in_eigen_slice_) {
nth_eigen_slice_ = (seconds_in_current_eigen_slice_ + 2) / eigen_value_send_duration_;
// The first eigen interval of a wave cycle resets the retry/success/patch bookkeeping.
if (nth_eigen_value_slice_ == 0) {
ClearFailureSuccessMap();
}
} else {
// Zero-based index of the current wave slice inside this eigen interval.
nth_wave_slice_ = (seconds_in_current_eigen_slice_ - 60 + 3) / seconds_per_wave_slice_;
}
zlog_debug(zbt, "[%d:%x] ts:%ld, current utc:%s, nth eigen_value slice:%d, seconds in eigen slice:%d, eigen slice:%d",
id, short_addr, current_ts_, GetUTCTime(current_ts_).c_str(), nth_eigen_value_slice_+1, seconds_in_current_eigen_slice_, ts_in_eigen_slice_);
if (ts_in_eigen_slice_) {
// The sensor is due to upload eigen values; if a configuration update is pending for it, push that first.
if (update_.count(id)) {
// execute config
zlog_debug(zbt, "[%d:%x] update config in eigen slice", id, short_addr);
current_request_ = kScheduleConfigSensor;
return kScheduleConfigSensor;
} else {
// The feature configuration is still loaded into g_x/g_y/g_z, but the check that used
// it is commented out below, so this path always answers kScheduleResultNone.
wave_feature_set_inst::instance().GetFeatureCfg(short_addr, g_x, g_y, g_z);
// if (g_x || g_y || g_z) {
// // run the eigen-value upload task
// zlog_debug(zbt, "[%d:%x] send eigen value in eigen slice", id, short_addr);
// current_request_ = kScheduleEigenValue;
// return kScheduleEigenValue;
// } else {
next_duration = GetNextDuration(short_addr, z, next_task_id);
zlog_warn(zbt, "[%d:%x] no need for eigen", id, short_addr);
return kScheduleResultNone;
// }
}
} else {
// ---- Wave window, debug mode: only sensors in debug_list_ are served. ----
if (current_schedule_status_ == kScheduleStatusDebug) {
if (debug_list_.count(short_addr) == 0) {
next_duration = GetDebugUpgradeNextDuration(short_addr);
next_task_id = kScheduleEigenValue;
zlog_debug(zbt, "[%d:%x] not in debug list", id, short_addr);
return kScheduleWrongTime;
} else {
// z wave
// Absolute (1-based) index of the current wave slice within the wave cycle.
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1;
if (debug_slice_sensor_id_[nth_wave_slice] == short_addr) {
current_request_ = kScheduleWaveForm;
z = true;
return kScheduleWaveForm;
} else {
// Does this sensor own a later wave slice within the current eigen interval?
for (int i = nth_wave_slice + 1; i <= (nth_eigen_value_slice_+1) * wave_slice_num_per_eigen_interval_; ++i) {
if (debug_slice_sensor_id_[i] == short_addr) {
long nxt_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-nth_wave_slice)*60;
next_duration = nxt_ts - current_ts_;
// Clamp implausible waits: too short becomes 25 s, longer than an eigen interval becomes 120 s.
if (next_duration < 10) {
zlog_debug(zbt, "[%d:%x] [Nxt] debug exception duration:%d, adjust to 25", id, short_addr,next_duration);
next_duration = 25;
} else if (next_duration > eigen_value_send_interval_) {
zlog_debug(zbt, "[%d:%x] [Nxt] debug exception duration:%d, adjust to 120", id, short_addr,next_duration);
next_duration = 120;
}
z = true;
next_task_id = kScheduleWaveForm;
return kScheduleWrongTime;
}
}
next_duration = GetDebugUpgradeNextDuration(short_addr);
next_task_id = kScheduleEigenValue;
zlog_debug(zbt, "[%d:%x] debug wrong time", id, short_addr);
return kScheduleWrongTime;
}
}
// NOTE(review): unreachable - every branch of the if/else above returns.
return 0;
// ---- Wave window, upgrade mode: only sensors in upgrade_list_ are served. ----
} else if (current_schedule_status_ == kScheduleStatusUpgrade) {
if (upgrade_list_.count(short_addr) == 0) {
next_duration = GetDebugUpgradeNextDuration(short_addr);
next_task_id = kScheduleEigenValue;
zlog_debug(zbt, "[%d:%x] not in upgrade list", id, short_addr);
return kScheduleWrongTime;
} else {
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1;
if (upgrade_slice_sensor_id_[nth_wave_slice] == short_addr) {
current_request_ = kScheduleUpgrade;
// upgrade_list_.erase(short_addr);
return kScheduleUpgrade;
} else {
next_duration = GetDebugUpgradeNextDuration(short_addr);
next_task_id = kScheduleEigenValue;
zlog_debug(zbt, "[%d:%x] in wrong time", id, short_addr);
return kScheduleWrongTime;
}
}
}
// ---- Wave window, normal mode. ----
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1; // numbered from 1
auto wave_slice_iter = sensor_id_nth_slice_.find(id);
if (wave_slice_iter == sensor_id_nth_slice_.end()) {
zlog_error(zbt, "[%d:%x] invaild id, not find wave slice id, need to check further", id, short_addr);
return kScheduleUnknownSensor;
}
// Per-axis wave enable flags for this sensor are loaded into the globals g_x/g_y/g_z.
wave_feature_set_inst::instance().GetWaveCfg(short_addr, g_x, g_y, g_z);
if (nth_wave_slice == wave_slice_iter->second.first) { // Z axis
if (g_z) {
zlog_debug(zbt, "[%d:%x] it is wave z time", id, short_addr);
current_request_ = kScheduleWaveForm;
z = true;
return kScheduleWaveForm;
} else {
next_duration = GetNextDuration(short_addr, z, next_task_id);
zlog_debug(zbt, "[%d:%x] no need for wave", id, short_addr);
return kScheduleWrongTime;
}
} else if (nth_wave_slice == wave_slice_iter->second.second) { // XY axes
if (g_x || g_y) {
zlog_debug(zbt, "[%d:%x] it is wave xy time", id, short_addr);
current_request_ = kScheduleWaveForm;
z = false;
return kScheduleWaveForm;
} else {
next_duration = GetNextDuration(short_addr, z, next_task_id);
zlog_debug(zbt, "[%d:%x] no need for wave", id, short_addr);
return kScheduleWrongTime;
}
}
else {
// Not one of this sensor's own slices. A slice nobody owns (owner id 0) may be borrowed,
// in priority order, for: a triggered wave, a Z retransfer, a Z patch, an XY retransfer,
// an XY patch.
if (slice_sensor_id_[nth_wave_slice] == 0) { // idle time
zlog_debug(zbt, "[%d:%x] in idle time", id, short_addr);
if (trigger_wave_record_.find(short_addr) != trigger_wave_record_.end()) {
auto iter = trigger_wave_record_.find(short_addr);
// A pending Z trigger is consumed before a pending XY trigger; the record is persisted
// as soon as the flag is cleared.
if (iter->second.first != 0) {
current_request_ = kScheduleWaveForm;
z = true;
zlog_debug(zbt, "[%d:%x] trigger z wave time", id, short_addr);
iter->second.first = 0;
WriteTriggerWaveRecord();
return kScheduleWaveForm;
} else if (iter->second.second != 0) {
current_request_ = kScheduleWaveForm;
z = false;
iter->second.second = 0;
WriteTriggerWaveRecord();
zlog_debug(zbt, "[%d:%x] trigger xy wave time", id, short_addr);
return kScheduleWaveForm;
}
}
if (ZRetransferWave(short_addr)) {
zlog_debug(zbt, "[%d:%x] z retransfer wave time", id, short_addr);
current_request_ = kScheduleWaveForm;
z = true;
return kScheduleWaveForm;
} else if (ZMissedWave(short_addr)) {
zlog_debug(zbt, "[%d:%x] z patch wave time", id, short_addr);
current_request_ = kScheduleWaveForm;
z = true;
z_patch_set_.erase(short_addr);
return kScheduleWaveForm;
} else if (XYRetransferWave(short_addr)) {
zlog_debug(zbt, "[%d:%x] xy retransfer wave time", id, short_addr);
current_request_ = kScheduleWaveForm;
z = false;
return kScheduleWaveForm;
} else if (XYMissedWave(short_addr)) {
zlog_debug(zbt, "[%d:%x] xy patch wave time", id, short_addr);
current_request_ = kScheduleWaveForm;
xy_patch_set_.erase(short_addr);
z = false;
return kScheduleWaveForm;
}
}
// wrong time to come
// Send the sensor to its eigen slot in the next eigen interval: ids are spaced 2 seconds
// apart and offsets above 57 wrap modulo 57.
int eigen_send_ts = (id - 1) * 2;
if (eigen_send_ts > 57) {
eigen_send_ts = eigen_send_ts % 57;
}
long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ + eigen_send_ts;
next_duration = available_ts - current_ts_;
if (next_duration < 10 || next_duration > eigen_value_send_interval_) {
zlog_debug(zbt, "[%d:%x] invalid next duration:%d, adjust to 120", id, short_addr, next_duration);
next_duration = 120;
}
next_task_id = kScheduleEigenValue;
zlog_debug(zbt, "[%d:%x] wrong time in wave slice, next feature send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration);
return kScheduleWrongTime;
}
}
}
long SensorScheduler::CalcNextTimestamp(int id, uint16_t short_addr, bool &z, int& next_task_id) {
z = false;
if (ts_in_eigen_slice_) {
if (current_schedule_status_ == kScheduleStatusDebug) {
if (debug_list_.count(short_addr) == 0) {
next_task_id = kScheduleEigenValue;
return GetDebugUpgradeNextTS(short_addr);
} else {
// 计算发送波形是否在后面的波形时间窗口中
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + 1;
for (int i = nth_wave_slice; i <= nth_wave_slice + wave_slice_num_per_eigen_interval_; ++i) {
if (debug_slice_sensor_id_[i] == short_addr) {
long nxt_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-nth_wave_slice)*60;
next_task_id = kScheduleWaveForm;
z = true;
return nxt_ts;
}
}
next_task_id = kScheduleEigenValue;
return GetDebugUpgradeNextTS(short_addr);
}
} else if (current_schedule_status_ == kScheduleStatusUpgrade) {
if (upgrade_list_.count(short_addr) == 0) {
next_task_id = kScheduleEigenValue;
return GetDebugUpgradeNextTS(short_addr);
} else {
// 计算升级是否在后面的波形时间窗口中
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + 1;
for (int i = nth_wave_slice; i <= nth_wave_slice + wave_slice_num_per_eigen_interval_; ++i) {
if (upgrade_slice_sensor_id_[i] == short_addr) {
long nxt_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-nth_wave_slice)*60;
next_task_id = kScheduleUpgrade;
return nxt_ts;
}
}
next_task_id = kScheduleEigenValue;
return GetDebugUpgradeNextTS(short_addr);
}
}
int forward_wave_slice_num = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_;
auto wave_slice_iter = sensor_id_nth_slice_.find(id);
if (wave_slice_iter == sensor_id_nth_slice_.end()) {
zlog_warn(zbt, "[Nxt] invaild id:%d, not find wave slice id", id);
long available_ts = current_wave_start_ts_ + eigen_value_send_interval_ + nth_eigen_slice_ * eigen_value_send_duration_;
zlog_warn(zbt, "[Nxt] [%d] next feature send utc time:[%s]", id, GetUTCTime(available_ts).c_str());
return available_ts;
}
int first_wave_slice = wave_slice_iter->second.first;
int second_wave_slice = wave_slice_iter->second.second;
long send_wave_ts = 0;
long available_ts = 0;
next_task_id = kScheduleWaveForm;
wave_feature_set_inst::instance().GetWaveCfg(short_addr, g_x, g_y, g_z);
if (g_x || g_y || g_z) {
if (g_z && first_wave_slice > forward_wave_slice_num &&
first_wave_slice <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_) {
// 发送波的时间窗也在本次特征值发送间隔中
for (int i = forward_wave_slice_num+1; i <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_; ++i) {
if (first_wave_slice == i) {
send_wave_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i - forward_wave_slice_num - 1) * 60;
next_task_id = kScheduleWaveForm;
zlog_debug(zbt, "[Nxt] [%d:%x] send z wave time:[%s], task id:%d", id, short_addr, GetUTCTime(send_wave_ts).c_str(), next_task_id);
z = true;
break;
}
}
} else if ((g_x || g_y) && second_wave_slice > forward_wave_slice_num &&
second_wave_slice <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_) {
// 发送波的时间窗也在本次特征值发送间隔中
for (int i = forward_wave_slice_num+1; i <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_; ++i) {
if (second_wave_slice == i) {
send_wave_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i - forward_wave_slice_num - 1) * 60;
next_task_id = kScheduleWaveForm;
zlog_debug(zbt, "[Nxt] [%d:%x] send xy wave time:[%s], task id:%d", id, short_addr, GetUTCTime(send_wave_ts).c_str(), next_task_id);
z = false;
break;
}
}
}
if ((g_z || g_x || g_y) && send_wave_ts == 0) {
if (trigger_wave_record_.find(short_addr) != trigger_wave_record_.end()) {
auto iter = trigger_wave_record_.find(short_addr);
if (iter->second.first != 0 || iter->second.second != 0) {
for (int i = forward_wave_slice_num+1; i <= forward_wave_slice_num + wave_slice_num_per_eigen_interval_; ++i) {
if (slice_sensor_id_[i] == 0) {
send_wave_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i - forward_wave_slice_num - 1) * 60;
if (free_slice_ocuppied_.count(send_wave_ts) == 0) {
available_ts = send_wave_ts;
free_slice_ocuppied_.insert(available_ts);
next_task_id = kScheduleWaveForm;
if (iter->second.first != 0) {
z = true;
} else {
z = false;
}
zlog_debug(zbt, "[Nxt][%d:%x] %d nth free wave slice will be used to trigger z : %d wave, utc time:[%s]", id, short_addr, i+forward_wave_slice_num, z, GetUTCTime(available_ts).c_str());
break;
} else {
send_wave_ts = 0;
}
break;
}
}
}
}
}
if (send_wave_ts == 0) {
if (g_z && !ZMissedWave(short_addr) && z_success_set_.count(short_addr) == 0) {
// add for patch wave
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1;
auto wave_slice_iter = sensor_id_nth_slice_.find(id);
if (wave_slice_iter != sensor_id_nth_slice_.end()) {
if (nth_wave_slice > wave_slice_iter->second.first) {
if (z_success_set_.count(short_addr) == 0 && !ZRetransferWave(short_addr)) {
zlog_debug(zbt, "[Nxt] [%d:%x] add z to patch set", id, short_addr);
z_patch_set_.insert(short_addr);
z = true;
}
}
}
} else if ((g_x || g_y) && !XYMissedWave(short_addr) && xy_success_set_.count(short_addr) == 0) {
// add for patch wave
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1;
auto wave_slice_iter = sensor_id_nth_slice_.find(id);
if (wave_slice_iter != sensor_id_nth_slice_.end()) {
if (nth_wave_slice > wave_slice_iter->second.second) {
if (xy_success_set_.count(short_addr) == 0 && !XYRetransferWave(short_addr)) {
zlog_debug(zbt, "[Nxt] [%d:%x] add xy to patch set", id, short_addr);
xy_patch_set_.insert(short_addr);
z = false;
}
}
}
}
if (ZRetransferWave(short_addr) || XYRetransferWave(short_addr) || ZMissedWave(short_addr) || XYMissedWave(short_addr)) {
for (int i = 1; i <= wave_slice_num_per_eigen_interval_; ++i) {
if (slice_sensor_id_[i+forward_wave_slice_num] == 0) {
// 判断此空闲位置是否被占用
long current_wave_slice_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-1) * seconds_per_wave_slice_;
if (free_slice_ocuppied_.count(current_wave_slice_ts) == 0) {
available_ts = current_wave_slice_ts;
free_slice_ocuppied_.insert(available_ts);
if (ZRetransferWave(short_addr) || ZMissedWave(short_addr)) {
z = true;
} else {
z = false;
}
zlog_debug(zbt, "[Nxt][%d:%x] %d nth free wave slice will be used to retransfer or patch wave, utc time:[%s]", id, short_addr, i+forward_wave_slice_num, GetUTCTime(available_ts).c_str());
break;
}
}
}
}
}
}
if (send_wave_ts > 0 && available_ts > 0) {
long min_ts = std::min(send_wave_ts, available_ts);
zlog_debug(zbt, "[Nxt] [%d:%x] next feature send utc time1:%s", id, short_addr, GetUTCTime(min_ts).c_str());
return min_ts;
}
if (send_wave_ts + available_ts > 0) {
long max_ts = std::max(send_wave_ts, available_ts);
zlog_debug(zbt, "[Nxt] [%d:%x] next feature send utc time2:%s", id, short_addr, GetUTCTime(max_ts).c_str());
return max_ts;
}
} else {
if (current_schedule_status_ == kScheduleStatusDebug) {
if (debug_list_.count(short_addr) == 0) {
next_task_id = kScheduleEigenValue;
return GetDebugUpgradeNextTS(short_addr);
} else {
// 计算发送波形是否在后面的波形时间窗口中
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 2;
for (int i = nth_wave_slice; i <= nth_wave_slice + wave_slice_num_per_eigen_interval_; ++i) {
if (debug_slice_sensor_id_[i] == short_addr) {
long nxt_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_-1)*60;
next_task_id = kScheduleWaveForm;
z = true;
return nxt_ts;
}
}
next_task_id = kScheduleEigenValue;
return GetDebugUpgradeNextTS(short_addr);
}
} else if (current_schedule_status_ == kScheduleStatusUpgrade) {
if (upgrade_list_.count(short_addr) == 0) {
next_task_id = kScheduleEigenValue;
return GetDebugUpgradeNextDuration(short_addr);
} else {
// 计算升级是否在后面的波形时间窗口中
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + 2;
for (int i = nth_wave_slice; i <= nth_wave_slice + wave_slice_num_per_eigen_interval_; ++i) {
if (upgrade_slice_sensor_id_[i] == short_addr) {
long nxt_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + 60 + (i-nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_-1)*60;
next_task_id = kScheduleUpgrade;
return nxt_ts;
}
}
next_task_id = kScheduleEigenValue;
}
}
}
// 如果是在当前波形时间窗中,不管是空闲时间窗,还是发送波形的时间窗,下一个时间窗是特征值
int eigen_send_ts = (id - 1) * 2;
if (eigen_send_ts > 57) {
eigen_send_ts = eigen_send_ts % 57;
}
next_task_id = kScheduleEigenValue;
long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1)* eigen_value_send_interval_ + eigen_send_ts;
zlog_debug(zbt, "[Nxt] [%d:%x] next feature send utc time3:[%s], task id:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_task_id);
return available_ts;
}
// Returns how many seconds the sensor should wait before contacting the scheduler again.
// The cached time-slice members are refreshed from GetLocalTs() first; z and next_task_id
// are filled by CalcNextTimestamp(). An unknown short address yields 0 with next_task_id
// set to kScheduleEigenValue.
int SensorScheduler::GetNextDuration(uint16_t short_addr, bool &z, int &next_task_id) {
    auto addr_entry = short_addr_map_.find(short_addr);
    if (addr_entry == short_addr_map_.end()) {
        zlog_error(zbt, "cannot find id for short_addr %x", short_addr);
        next_task_id = kScheduleEigenValue;
        return 0;
    }
    int id = addr_entry->second;

    // Re-derive the position of "now" inside the wave cycle and the eigen interval.
    current_ts_ = GetLocalTs();
    nth_wave_start_slice_ = (current_ts_ - start_timestamp_) / wave_form_send_interval_;
    current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_;
    seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_;
    nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_;
    seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_;
    // The first 57 seconds of an eigen interval form the eigen-value window.
    ts_in_eigen_slice_ = (seconds_in_current_eigen_slice_ < 60 - 3);

    long next_ts = CalcNextTimestamp(id, short_addr, z, next_task_id);
    int duration = next_ts - current_ts_;

    // Implausible waits are replaced by fixed fallbacks.
    if (duration < 10) {
        zlog_debug(zbt, "[%d:%x] [Nxt] exception duration:%d, adjust to 25", id, short_addr,duration);
        return 25;
    }
    if (duration > eigen_value_send_interval_) {
        zlog_debug(zbt, "[%d:%x] [Nxt] exception duration:%d, adjust to 120", id, short_addr,duration);
        return 120;
    }
    zlog_debug(zbt, "[Nxt] [%d:%x] next duration is %d", id, short_addr, duration);
    return duration;
}
// Records an on-demand wave request for a sensor: `z` and `xy` are stored as the pair
// (z, xy) under short_addr in trigger_wave_record_, replacing any previous record.
// Always returns 0.
int SensorScheduler::TriggerWave(uint16_t short_addr, uint8_t z, uint8_t xy) {
    zlog_debug(zbt, "[%x] trigger wave z:%d, xy:%d", short_addr, z, xy);
    trigger_wave_record_[short_addr] = std::pair<uint8_t, uint8_t>(z, xy);
    return 0;
}
// Debug/Upgrade mode helper: returns only the wait (in seconds) until the sensor's next
// eigen-value slot. Returns 0 when short_addr has no id in short_addr_map_.
int SensorScheduler::GetDebugUpgradeNextDuration(uint16_t short_addr) {
    auto addr_entry = short_addr_map_.find(short_addr);
    if (addr_entry == short_addr_map_.end()) {
        zlog_error(zbt, "cannot find id for short_addr %x", short_addr);
        return 0;
    }
    int id = addr_entry->second;

    long now_ts = GetLocalTs();
    // Ids are spaced 2 seconds apart; offsets above 57 wrap modulo 57.
    int slot_offset = (id - 1) * 2;
    if (slot_offset > 57) {
        slot_offset %= 57;
    }
    long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1)* eigen_value_send_interval_ + slot_offset;
    zlog_debug(zbt, "[Nxt] [%d:%x] next feature send utc time4:[%s]", id, short_addr, GetUTCTime(available_ts).c_str());

    int duration = available_ts - now_ts;
    // Implausible waits are replaced by fixed fallbacks.
    if (duration < 10) {
        zlog_debug(zbt, "[%d:%x] [Nxt] exception duration:%d, adjust to 25", id, short_addr,duration);
        return 25;
    }
    if (duration > eigen_value_send_interval_) {
        zlog_debug(zbt, "[%d:%x] [Nxt] exception duration:%d, adjust to 120", id, short_addr,duration);
        return 120;
    }
    zlog_debug(zbt, "[Nxt] [%d:%x] next duration is %d", id, short_addr, duration);
    return duration;
}
// Debug/Upgrade mode helper: returns the absolute timestamp of the sensor's eigen-value
// slot in the next eigen interval, computed from the cached current_wave_start_ts_ and
// nth_eigen_value_slice_. Returns 0 when short_addr has no id in short_addr_map_.
long SensorScheduler::GetDebugUpgradeNextTS(uint16_t short_addr) {
    auto addr_entry = short_addr_map_.find(short_addr);
    if (addr_entry == short_addr_map_.end()) {
        zlog_warn(zbt, "cannot find id for short_addr %x", short_addr);
        return 0;
    }
    int id = addr_entry->second;

    // Ids are spaced 2 seconds apart; offsets above 57 wrap modulo 57.
    int slot_offset = (id - 1) * 2;
    if (slot_offset > 57) {
        slot_offset %= 57;
    }
    long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1)* eigen_value_send_interval_ + slot_offset;
    zlog_debug(zbt, "[Nxt] [%d:%x] next feature send utc time4:[%s]", id, short_addr, GetUTCTime(available_ts).c_str());
    return available_ts;
}
// Builds the scheduler state from the persisted configuration files.
//
// 1. Loads the trigger-wave record and the current schedule status.
// 2. Reads SCHEDULE_CONFIG; when the file is missing, unparsable, not a JSON object, or
//    carries values that would make the slice arithmetic divide by zero, falls back to
//    UseDefaultConfig().
// 3. Loads the short-address map and the upgrade / config-update records.
// 4. In debug or upgrade status, loads the per-slice sensor list from the matching JSON
//    array file; any problem with that file switches the status back to normal.
//
// NOTE(review): std::stol below throws if "schedule_start_timestamp" is absent or not
// numeric; that is not handled here.
SensorScheduler::SensorScheduler() {
    ReadTriggerWaveRecord();
    wave_resend_num_ = 3;
    current_schedule_status_ = get_schedule_status();
    // All slice tables start out empty so that later "!= NULL" checks and delete[] are safe.
    slice_sensor_id_ = NULL;
    slice_is_z_wave_ = NULL;
    debug_slice_sensor_id_ = NULL;
    upgrade_slice_sensor_id_ = NULL;
    zlog_debug(zbt, "current schedule status:%s", get_status_desc(current_schedule_status_).c_str());
    std::ifstream schedule_file(SCHEDULE_CONFIG);
    bool configed = false;
    if (schedule_file.good()) {
        zlog_info(zbt, "exist configuration file");
        Json::Reader reader;
        Json::Value root;
        if (!reader.parse(schedule_file, root, false)) {
            zlog_error(zbt, "invalid format, fail to parse %s", SCHEDULE_CONFIG);
            schedule_file.close();
            goto init_config;
        }
        schedule_file.close();
        if (!root.isObject()) {
            zlog_error(zbt, "invalid format, not an object: %s", SCHEDULE_CONFIG);
            goto init_config;
        }
        start_timestamp_ = std::stol(root["schedule_start_timestamp"].asString());
        start_ts_str_ = root["schedule_start_time"].asString();
        long current_ts = GetLocalTs();
        if (current_ts < start_timestamp_) {
            // The clock is behind the recorded schedule start: restart the schedule from now
            // and persist the new start.
            zlog_warn(zbt, "current ts: %ld less than start ts: %ld, go to adjust it", current_ts, start_timestamp_);
            start_timestamp_ = current_ts;
            start_ts_str_ = GetUTCTime(current_ts);
            root["schedule_start_timestamp"] = std::to_string(start_timestamp_);
            root["schedule_start_time"] = start_ts_str_;
            Json::StyledStreamWriter streamWriter;
            std::ofstream out_file(SCHEDULE_CONFIG);
            streamWriter.write(out_file, root);
            out_file.close();
        }
        eigen_value_send_interval_ = root["eigen_value_send_interval"].asInt();
        // eigen_value_send_duration_ = root["eigen_value_send_duration"].asInt();
        wave_form_send_interval_ = root["wave_form_send_interval"].asInt();
        wave_form_send_duration_ = 60; //root["wave_form_send_duration"].asInt();
        max_sensor_num_ = root["max_sensor_num"].asInt();
        available_slice_ = root["available_slice"].asInt();
        free_slice_ = root["free_slice"].asInt();

        eigen_value_slice_total_seconds_ = 60; // eigen_value_send_duration_ * max_sensor_num_;
        int rest_duration = eigen_value_send_interval_ - eigen_value_slice_total_seconds_;
        wave_slice_num_per_eigen_interval_ = rest_duration / wave_form_send_duration_;
        // Reject values that would divide by zero (here or in the schedule arithmetic) or
        // request a negative array size; the default configuration is used instead.
        if (wave_slice_num_per_eigen_interval_ <= 0 || wave_form_send_interval_ <= 0 || available_slice_ < 0) {
            zlog_error(zbt, "invalid schedule values in %s: eigen interval:%d, wave interval:%d, available slice:%d",
                       SCHEDULE_CONFIG, eigen_value_send_interval_, wave_form_send_interval_, available_slice_);
            goto init_config;
        }
        seconds_per_wave_slice_ = rest_duration / wave_slice_num_per_eigen_interval_;

        // Slice tables are 1-based; "()" zero-initialises every entry (0 = idle slice).
        slice_sensor_id_ = new int[available_slice_+1]();
        slice_is_z_wave_ = new uint8_t[available_slice_+1]();
        int xy_wave_start_id = 0;
        if (max_sensor_num_ <= 4) {
            xy_wave_start_id = 5; // keeps the Z and XY slices of one sensor from being too close in time
        } else {
            xy_wave_start_id = max_sensor_num_ + 1;
        }
        // Sensor i sends Z in slice i and XY in slice i + xy_wave_start_id.
        for (int i = 1; i <= max_sensor_num_; ++i) {
            sensor_id_nth_slice_[i] = { i, i + xy_wave_start_id };
            if (i <= available_slice_) {
                slice_sensor_id_[i] = i;
                slice_is_z_wave_[i] = true;
            } else {
                zlog_error(zbt, "z slice %d of sensor id %d exceeds available slice %d", i, i, available_slice_);
            }
            if (i + xy_wave_start_id <= available_slice_) {
                slice_sensor_id_[i+xy_wave_start_id] = i;
                slice_is_z_wave_[i + xy_wave_start_id] = false;
            } else {
                zlog_error(zbt, "xy slice %d of sensor id %d exceeds available slice %d", i + xy_wave_start_id, i, available_slice_);
            }
        }
        configed = true;
    }
init_config:
    if (!configed) {
        UseDefaultConfig();
    }
    short_addr_map_.clear();
    ShortAddrCfg::ReadCfg(short_addr_map_);
    // read upgrade config file: UPGRADE_CONFIG
    UpgradeCfg::ReadCfg(upgrade_);
    // read config update file: CONFIG_UPDATE
    UpdateCfg::ReadCfg(update_);
    if (current_schedule_status_ == kScheduleStatusDebug) {
        std::ifstream debug_schedule_file(DEBUG_SCHEDULE_CONFIG);
        if (debug_schedule_file.good()) {
            Json::Reader reader;
            Json::Value root;
            if (!reader.parse(debug_schedule_file, root, false)) {
                zlog_error(zbt, "invalid format, fail to parse %s", DEBUG_SCHEDULE_CONFIG);
                zlog_warn(zct, "due to invalid debug file, status change to normal");
                SetScheduleStatus(kScheduleStatusNormal);
                return;
            }
            debug_schedule_file.close();
            if (!root.isArray()) {
                zlog_error(zbt, "invalid format, not an array: %s", DEBUG_SCHEDULE_CONFIG);
                zlog_warn(zct, "due to invalid debug file, status change to normal");
                SetScheduleStatus(kScheduleStatusNormal);
                return;
            } else {
                debug_slice_sensor_id_ = new uint16_t[available_slice_+1]();
                // Array element k (0-based) is the sensor that owns wave slice k + 1.
                int i = 1;
                for (const auto& value : root) {
                    if (i > available_slice_) {
                        zlog_warn(zbt, "debug schedule has more entries than available slice %d, ignore the rest", available_slice_);
                        break;
                    }
                    std::cout << value.asInt() << std::endl; // echo each entry
                    zlog_debug(zbt, "[%d] debug sensor:%x", i, value.asInt());
                    debug_slice_sensor_id_[i] = value.asInt();
                    ++i;
                }
                if (i == 1) {
                    zlog_warn(zbt, "due to empty debug file, status change to normal");
                    delete []debug_slice_sensor_id_;
                    debug_slice_sensor_id_ = NULL;
                    SetScheduleStatus(kScheduleStatusNormal);
                    return;
                }
            }
        } else {
            zlog_warn(zbt, "due to no debug file, status change to normal");
            SetScheduleStatus(kScheduleStatusNormal);
            return;
        }
    } else if (current_schedule_status_ == kScheduleStatusUpgrade) {
        std::ifstream upgrade_schedule_file(UPGRADE_SCHEDULE_CONFIG);
        if (upgrade_schedule_file.good()) {
            Json::Reader reader;
            Json::Value root;
            if (!reader.parse(upgrade_schedule_file, root, false)) {
                zlog_error(zbt, "invalid format, fail to parse %s", UPGRADE_SCHEDULE_CONFIG);
                zlog_warn(zbt, "due to invalid upgrade file, status change to normal");
                SetScheduleStatus(kScheduleStatusNormal);
                return;
            }
            upgrade_schedule_file.close();
            if (!root.isArray()) {
                zlog_error(zbt, "invalid format, not an array: %s", UPGRADE_SCHEDULE_CONFIG);
                zlog_warn(zbt, "due to invalid upgrade file, status change to normal");
                SetScheduleStatus(kScheduleStatusNormal);
                return;
            } else {
                upgrade_slice_sensor_id_ = new uint16_t[available_slice_+1]();
                // Array element k (0-based) is the sensor that owns wave slice k + 1; non-zero
                // entries are also collected into upgrade_list_.
                int i = 1;
                for (const auto& value : root) {
                    if (i > available_slice_) {
                        zlog_warn(zbt, "upgrade schedule has more entries than available slice %d, ignore the rest", available_slice_);
                        break;
                    }
                    std::cout << value.asInt() << std::endl; // echo each entry
                    zlog_debug(zbt, "[%d] upgrade sensor:%x", i, value.asInt());
                    upgrade_slice_sensor_id_[i] = value.asInt();
                    if (upgrade_slice_sensor_id_[i] > 0) {
                        upgrade_list_.insert(upgrade_slice_sensor_id_[i]);
                    }
                    ++i;
                }
                if (i == 1) {
                    zlog_warn(zbt, "due to empty upgrade file, status change to normal");
                    delete []upgrade_slice_sensor_id_;
                    upgrade_slice_sensor_id_ = NULL;
                    SetScheduleStatus(kScheduleStatusNormal);
                    return;
                }
            }
        } else {
            zlog_warn(zbt, "due to no upgrade file, status change to normal");
            SetScheduleStatus(kScheduleStatusNormal);
            return;
        }
    }
}
// Records a failed wave transfer for a sensor and manages its retry budget.
//
//   short_addr - short address of the sensor.
//   z          - true for the Z-axis wave, false for the XY wave.
//
// The first failure stores wave_resend_num_ as the remaining retry count; each further
// failure decrements it. Returns 0 while a retry is still allowed, and -1 when resending
// is disabled (wave_resend_num_ <= 0) or the budget is exhausted, in which case the
// sensor's entry is removed.
int SensorScheduler::WaveError(uint16_t short_addr, bool z) {
    if (wave_resend_num_ <= 0) {
        zlog_debug(zbt, "[WaveError][%x] no config to resend wave", short_addr);
        return -1;
    }
    if (z) {
        auto iter = z_failure_map_.find(short_addr);
        if (iter == z_failure_map_.end()) {
            z_failure_map_[short_addr] = wave_resend_num_; // remaining retries
            zlog_debug(zbt, "[WaveError][%x] z will try %d times", short_addr, wave_resend_num_);
            return 0;
        }
        if (iter->second == 0) {
            zlog_warn(zbt, "[WaveError][%x] z no try times", short_addr);
            z_failure_map_.erase(short_addr);
            return -1;
        }
        iter->second = iter->second - 1;
        zlog_debug(zbt, "[WaveError][%x] z remain try %d times", short_addr, iter->second);
    } else {
        auto iter = xy_failure_map_.find(short_addr);
        if (iter == xy_failure_map_.end()) {
            xy_failure_map_[short_addr] = wave_resend_num_; // remaining retries
            zlog_debug(zbt, "[WaveError][%x] xy will try %d times", short_addr, wave_resend_num_);
            return 0;
        }
        if (iter->second == 0) {
            // Same category and level as the Z branch, so exhausted retries for both axes
            // end up in the same log.
            zlog_warn(zbt, "[WaveError][%x] xy no try times", short_addr);
            xy_failure_map_.erase(short_addr);
            return -1;
        }
        iter->second = iter->second - 1;
        zlog_debug(zbt, "[WaveError][%x] xy remain try %d times", short_addr, iter->second);
    }
    return 0;
}
// True when the sensor has an entry in z_failure_map_, i.e. a Z-wave retransfer is pending.
bool SensorScheduler::ZRetransferWave(uint16_t short_addr) {
    return z_failure_map_.find(short_addr) != z_failure_map_.end();
}
// True when the sensor has an entry in xy_failure_map_, i.e. an XY-wave retransfer is pending.
bool SensorScheduler::XYRetransferWave(uint16_t short_addr) {
    return xy_failure_map_.find(short_addr) != xy_failure_map_.end();
}
// True when the sensor is in z_patch_set_, i.e. its Z wave is queued to be patched.
bool SensorScheduler::ZMissedWave(uint16_t short_addr) {
    const bool queued = z_patch_set_.count(short_addr) > 0;
    return queued;
}
// True when the sensor is in xy_patch_set_, i.e. its XY wave is queued to be patched.
bool SensorScheduler::XYMissedWave(uint16_t short_addr) {
    const bool queued = xy_patch_set_.count(short_addr) > 0;
    return queued;
}
void SensorScheduler::WaveSuccess(uint16_t short_addr, bool z) {
zlog_debug(zbt, "[%x] wave z:%d success", short_addr, z);
if (z) {
z_success_set_.insert(short_addr);
auto iter = z_failure_map_.find(short_addr);
if (iter != z_failure_map_.end()) {
zlog_debug(zbt, "[WaveSuccess][%x] try %d times success", short_addr, 4 - iter->second);
z_failure_map_.erase(short_addr);
return;
}
} else {
xy_success_set_.insert(short_addr);
auto iter = xy_failure_map_.find(short_addr);
if (iter != xy_failure_map_.end()) {
zlog_debug(zbt, "[WaveSuccess][%x] try %d times success", short_addr, 4 - iter->second);
xy_failure_map_.erase(short_addr);
return;
}
}
return;
}
// Empties all per-cycle wave bookkeeping: retry maps, success sets and patch sets for
// both the Z and the XY waves.
void SensorScheduler::ClearFailureSuccessMap() {
    // Retry budgets.
    z_failure_map_.clear();
    xy_failure_map_.clear();
    // Completed transfers.
    z_success_set_.clear();
    xy_success_set_.clear();
    // Queued patches.
    z_patch_set_.clear();
    xy_patch_set_.clear();
}
int SensorScheduler::GetAvailableId(uint16_t short_addr) {
int max_support_sensor[96] = {0}; // 5分钟里面有4分钟用于传输波形2小时有96个
for (auto it = short_addr_map_.begin(); it != short_addr_map_.end(); ++it) {
max_support_sensor[it->second] = 1;
}
int available_id = 0;
for (int i = 1; i < 96; ++i) {
if (max_support_sensor[i] == 0) {
available_id = i;
break;
}
}
zlog_warn(zbt, "[GetAvailableId][%d] short addr : %x", available_id, short_addr);
short_addr_map_[short_addr] = available_id;
ShortAddrCfg::WriteCfg(short_addr_map_);
return available_id;
}
int SensorScheduler::WriteScheduleCfg(long &ts, std::string &world_time) {
Json::Value root;
root["schedule_start_timestamp"] = std::to_string(ts);
root["schedule_start_time"] = world_time;
root["eigen_value_send_interval"] = eigen_value_send_interval_;
root["wave_form_send_interval"] = wave_form_send_interval_;
root["max_sensor_num"] = max_sensor_num_;
root["available_slice"] = available_slice_;
root["free_slice"] = free_slice_;
if (slice_sensor_id_ != NULL) {
delete [] slice_sensor_id_;
slice_sensor_id_ = NULL;
}
if (slice_is_z_wave_ != NULL) {
delete [] slice_is_z_wave_;
slice_is_z_wave_ = NULL;
}
slice_sensor_id_ = new int[available_slice_+1];
slice_is_z_wave_ = new uint8_t[available_slice_+1];
for (int i = 0; i <= available_slice_; ++i) {
slice_sensor_id_[i] = 0;
}
int xy_wave_start_id = 0;
if (max_sensor_num_ <= 4) {
xy_wave_start_id = 5; // 防止z,xy在时间上排得太近
} else {
xy_wave_start_id = max_sensor_num_ + 1;
}
for (int i = 1; i <= max_sensor_num_; ++i) {
sensor_id_nth_slice_[i] = { i, i + xy_wave_start_id };
slice_sensor_id_[i] = i;
slice_sensor_id_[i + xy_wave_start_id] = i;
slice_is_z_wave_[i] = true;
slice_is_z_wave_[i + xy_wave_start_id] = false;
}
Json::StyledStreamWriter streamWriter;
std::ofstream out_file(SCHEDULE_CONFIG);
streamWriter.write(out_file, root);
out_file.close();
return 0;
}
int SensorScheduler::Config(int eigen_value_send_interval, int wave_form_send_interval, int max_sensor_num,
int wave_resend_num, std::string &error_msg) {
int available_slice = 0;
int free_slice = 0;
int ret = CalcAvailableSlice(eigen_value_send_interval,
wave_form_send_interval,
max_sensor_num,
available_slice,
free_slice,
error_msg);
if (ret != 0) {
return ret;
}
eigen_value_send_interval_ = eigen_value_send_interval;
eigen_value_send_duration_ = 2;
wave_form_send_interval_ = wave_form_send_interval;
wave_form_send_duration_ = 60;
max_sensor_num_ = max_sensor_num;
wave_resend_num_ = wave_resend_num;
available_slice_ = available_slice;
free_slice_ = free_slice;
eigen_value_slice_total_seconds_ = 60;
int rest_duration = eigen_value_send_interval_ - eigen_value_slice_total_seconds_;
wave_slice_num_per_eigen_interval_ = rest_duration / wave_form_send_duration_;
seconds_per_wave_slice_ = rest_duration / wave_slice_num_per_eigen_interval_;
std::string world_time;
long current_ts = GetLocalWorldTime(world_time);
ret = WriteScheduleCfg(current_ts, world_time);
if (ret != 0) {
return ret;
}
start_timestamp_ = current_ts;
start_ts_str_ = world_time;
return 0;
}
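// Eigen values are sent every 300 seconds; waveforms are sent every 7200 seconds.
// One eigen-value transmission lasts 2 seconds and one waveform transmission lasts 60 seconds; all eigen values
// are completed between second 3 and second 57 of the first minute of each eigen-value interval.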
// Checks a schedule configuration and computes how many waveform slices it offers.
//
// available_slice: number of 60-second waveform slices in one waveform interval
//                  (the first 60 seconds of every eigen-value interval are excluded).
// free_slice:      available_slice minus the two slices (z and xy) needed per sensor.
// error_msg:       human-readable reason, set only when a non-zero code is returned.
//
// Returns 0 when the configuration is usable, otherwise:
//   1  max_sensor_num is not positive
//   2  eigen_value_send_interval is shorter than one eigen-value transmission
//   3  the waveform interval cannot hold two waveform transmissions per sensor
//   4  wave_form_send_interval is not a multiple of eigen_value_send_interval
//   5  fewer slices are available than required
int SensorScheduler::CalcAvailableSlice(int eigen_value_send_interval, int wave_form_send_interval,
                                        int max_sensor_num, int &available_slice, int &free_slice,
                                        std::string &error_msg) {
    const int kEigenSendSeconds = 2;         // duration of one eigen-value transmission
    const int kWaveSendSeconds = 60;         // duration of one waveform transmission
    const int kEigenWindowSeconds = 60;      // part of each eigen interval kept for eigen values

    if (max_sensor_num <= 0) {
        error_msg = "max_sensor_num:" + std::to_string(max_sensor_num) + " must bigger than 0";
        zlog_error(zbt, "%s", error_msg.c_str());
        return 1;
    }
    // NOTE(review): the message mentions max_sensor_num, but the check itself only compares a single
    // transmission against the interval - confirm which one is intended.
    if (kEigenSendSeconds > eigen_value_send_interval) {
        error_msg = "invalid max_sensor_num:" + std::to_string(max_sensor_num) +
                    " * eigen_value_send_duration:" + std::to_string(kEigenSendSeconds) +
                    " > eigen_value_send_interval:" + std::to_string(eigen_value_send_interval);
        zlog_error(zbt, "%s", error_msg.c_str());
        return 2;
    }
    if (max_sensor_num * kWaveSendSeconds * 2 > wave_form_send_interval) { // xy and z are sent separately
        error_msg = "invalid wave_form_send_duration:" + std::to_string(kWaveSendSeconds) +
                    "* 2 * max_sensor_num:" + std::to_string(max_sensor_num) +
                    " > wave_form_send_interval:" + std::to_string(wave_form_send_interval);
        zlog_error(zbt, "%s", error_msg.c_str());
        return 3;
    }
    if (wave_form_send_interval % eigen_value_send_interval != 0) {
        // The text is logged through "%s" and handed back to the caller, so a single '%' is correct here.
        error_msg = "wave_form_send_interval:" + std::to_string(wave_form_send_interval) +
                    " % eigen_value_send_interval:" + std::to_string(eigen_value_send_interval) + " != 0";
        zlog_error(zbt, "%s", error_msg.c_str());
        return 4;
    }

    const int rest_duration = eigen_value_send_interval - kEigenWindowSeconds;
    const int slice_per_eigen_value_interval = rest_duration / kWaveSendSeconds;
    available_slice = wave_form_send_interval / eigen_value_send_interval * slice_per_eigen_value_interval;
    const int required_slice = max_sensor_num * 2;
    free_slice = available_slice - required_slice;
    if (free_slice < 0) {
        error_msg = "invalid config, available slice:" + std::to_string(available_slice) +
                    ", required slice:" + std::to_string(required_slice);
        zlog_error(zbt, "%s", error_msg.c_str());
        return 5;
    }
    return 0;
}
int SensorScheduler::GetScheduleConfig(int &eigen_value_send_interval, int &wave_form_send_interval, int &wave_resend_num,
int &max_sensor_num) {
eigen_value_send_interval = eigen_value_send_interval_;
wave_form_send_interval = wave_form_send_interval_;
wave_resend_num = wave_resend_num_;
max_sensor_num = max_sensor_num_;
return 0;
}
// Marks the sensor with the given short address as having a pending configuration
// update and persists the pending set through UpdateCfg::WriteCfg.
// Returns 1 when the short address has no id, otherwise 0 (also when it was already pending).
int SensorScheduler::UpdateSensorConfig(uint16_t short_addr) {
    const auto entry = short_addr_map_.find(short_addr);
    if (entry == short_addr_map_.end()) {
        zlog_warn(zbt, "cannot find id for short_addr %d", short_addr);
        return 1;
    }

    const int sensor_id = entry->second;
    if (update_.count(sensor_id) == 0) {
        // Only touch the persisted file when the pending set actually changes.
        update_.insert(sensor_id);
        UpdateCfg::WriteCfg(update_);
    }
    return 0;
}
// Handles the outcome of a configuration update for one sensor.
// A result of 0 removes the sensor from the pending set and persists the set;
// any other result leaves the pending set untouched.
// Returns 1 when the short address has no id, otherwise 0.
int SensorScheduler::UpdateConfigResult(uint16_t short_addr, int result) {
    const auto entry = short_addr_map_.find(short_addr);
    if (entry == short_addr_map_.end()) {
        zlog_warn(zbt, "cannot find id for short_addr %x", short_addr);
        return 1;
    }

    if (result == 0) {
        const int sensor_id = entry->second;
        zlog_info(zbt, "[%d] short addr:%d update successfully", sensor_id, short_addr);
        update_.erase(sensor_id);
        UpdateCfg::WriteCfg(update_);
    }
    return 0;
}
int SensorScheduler::UpgradeSensor(std::vector<UpgradeParameter> &param_list) {
if (param_list.size() == 0) {
zlog_warn(zbt, "upgrade list is empty, do nothing");
return 1;
}
zlog_debug(zbt, "current status:%d", current_schedule_status_);
if (current_schedule_status_ == kScheduleStatusUpgrade) {
std::unordered_set<uint16_t> tmp_set;
for (auto item : param_list) {
tmp_set.insert(item.short_addr);
}
if (tmp_set == upgrade_list_) {
zlog_warn(zct, "upgrade list and mode are same, do nothing");
return 0;
}
} else {
upgrade_list_.clear();
}
for (auto item : param_list) {
upgrade_list_.insert(item.short_addr);
}
int upgrade_num = 0;
int id = 0;
long ts = GetLocalTs();
for (auto item : param_list) {
auto iter = short_addr_map_.find(item.short_addr);
if (iter == short_addr_map_.end()) {
zlog_error(zbt, "cannot find id for short_addr %x", item.short_addr);
continue;
} else {
id = iter->second;
}
UpgradeInfo info;
info.try_times = 0;
info.sensor_type = item.sensor_type;
info.hw_version = item.hw_version;
info.current_sw_version = item.current_sw_version;
info.upgrade_sw_version = item.upgrade_sw_version;
info.submit_time = GetUTCTime(ts);
upgrade_[id] = info;
zlog_info(zbt, "[%d] short addr:%x add upgrade info", id, item.short_addr);
++upgrade_num;
}
if (upgrade_num > 0) {
UpgradeCfg::WriteCfg(upgrade_);
GenerateUpgradeSchedule();
current_schedule_status_ = kScheduleStatusUpgrade;
set_schedule_status(current_schedule_status_);
} else {
return 1;
}
return 0;
}
int SensorScheduler::CancelUpgradeSensor(std::vector<uint16_t> short_addr_list) {
if (short_addr_list.size() == 0) {
return 0;
}
int cancel_num = 0;
for (auto short_addr : short_addr_list) {
upgrade_list_.erase(short_addr);
auto iter = short_addr_map_.find(short_addr);
if (iter == short_addr_map_.end()) {
zlog_info(zbt, "cannot find id for short_addr %x", short_addr);
continue;
} else {
int id = iter->second;
upgrade_.erase(id);
++cancel_num;
}
}
if (cancel_num > 0) {
if (upgrade_list_.size() == 0) {
current_schedule_status_ = kScheduleStatusNormal;
set_schedule_status(current_schedule_status_);
UpgradeCfg::ClearCfg();
} else {
GenerateUpgradeSchedule();
UpgradeCfg::WriteCfg(upgrade_);
}
}
return 0;
}
int SensorScheduler::UpgradeResult(uint16_t short_addr, int result) {
zlog_info(zbt, "[%x] upgrade result:%d", short_addr, result);
int id = 0;
auto iter = short_addr_map_.find(short_addr);
if (iter == short_addr_map_.end()) {
zlog_info(zbt, "cannot find id for short_addr %x", short_addr);
return 1;
} else {
id = iter->second;
}
if (result == kUpgradeSuccess ||
result == kProductTypeMismatch ||
result == kZigbeeHWMismatch ||
result == kUpgradeDoneBefore) {
upgrade_list_.erase(short_addr);
upgrade_.erase(id);
zlog_info(zbt, "[%d] short addr:%x upgrade successfully", id, short_addr);
if (upgrade_list_.size() == 0) {
zlog_info(zbt, "no upgrade sensor, go to normal status");
current_schedule_status_ = kScheduleStatusNormal;
set_schedule_status(current_schedule_status_);
UpgradeCfg::ClearCfg();
} else {
UpgradeCfg::WriteCfg(upgrade_);
GenerateUpgradeSchedule();
}
} else {
auto upgrade_iter = upgrade_.find(id);
if (upgrade_iter->second.try_times >= 10 /*wave_resend_num_*/) {
zlog_warn(zbt, "[%d] short addr:%x upgrade %d time failure", id, short_addr, wave_resend_num_);
upgrade_list_.erase(short_addr);
upgrade_.erase(id);
if (upgrade_list_.size() == 0) {
zlog_info(zbt, "no upgrade sensor, go to normal status");
current_schedule_status_ = kScheduleStatusNormal;
set_schedule_status(current_schedule_status_);
UpgradeCfg::ClearCfg();
} else {
UpgradeCfg::WriteCfg(upgrade_);
GenerateUpgradeSchedule();
}
} else {
UpdateUpgradeInfo(id);
}
}
return 0;
}
void SensorScheduler::UpdateUpgradeInfo(int id) {
auto upgrade_iter = upgrade_.find(id);
upgrade_iter->second.try_times++;
zlog_debug(zbt, "[%d] try_times:%d", id, upgrade_iter->second.try_times);
long ts = GetLocalTs();
upgrade_iter->second.try_world_time1.push_back(GetUTCTime(ts));
UpgradeCfg::WriteCfg(upgrade_);
}
// Shifts the schedule start timestamp by diff_ts seconds, refreshes its textual
// form and rewrites both values in SCHEDULE_CONFIG.
// The in-memory values are updated even when the existing file cannot be parsed;
// in that case the file is left untouched.
void SensorScheduler::ModifyScheduleTs(int diff_ts) {
    const auto shifted_ts = start_timestamp_ + diff_ts;
    zlog_warn(zbt, "[ModifyScheduleTs] adjust ts:%d, from:%ld to:%ld", diff_ts, start_timestamp_, shifted_ts);
    start_timestamp_ = shifted_ts;
    start_ts_str_ = GetUTCTime(start_timestamp_);

    // Load the existing document so that all other keys are preserved.
    Json::Value root;
    {
        std::ifstream schedule_file(SCHEDULE_CONFIG);
        Json::Reader reader;
        const bool parsed = reader.parse(schedule_file, root, false);
        schedule_file.close();
        if (!parsed) {
            zlog_error(zbt, "[ModifyScheduleTs] invalid format, fail to parse %s", SCHEDULE_CONFIG);
            return;
        }
    }

    root["schedule_start_time"] = start_ts_str_;
    root["schedule_start_timestamp"] = std::to_string(start_timestamp_);

    std::ofstream out_file(SCHEDULE_CONFIG);
    Json::StyledStreamWriter streamWriter;
    streamWriter.write(out_file, root);
    out_file.close();
}
// Removes stored schedule state.
// short_addr == 0 wipes everything: pending updates, pending upgrades, the short
// address map, their persisted files and all feature/wave configurations.
// Any other value clears the state of that single sensor only.
void SensorScheduler::ClearScheduleCfg(uint16_t short_addr) {
    zlog_warn(zbt, "[ClearScheduleCfg] clear all schedule config, short_addr:%x", short_addr);

    if (short_addr != 0) {
        // Reuse the regular result handlers with a "success" outcome so that the
        // sensor is dropped from the pending update and upgrade bookkeeping.
        UpdateConfigResult(short_addr, 0);
        UpgradeResult(short_addr, kUpgradeSuccess);
        short_addr_map_.erase(short_addr);
        ShortAddrCfg::WriteCfg(short_addr_map_);
        wave_feature_set_inst::instance().RemoveFeatureCfg(short_addr);
        wave_feature_set_inst::instance().RemoveWaveCfg(short_addr);
        return;
    }

    zlog_warn(zbt, "[ClearScheduleCfg] clear all");
    update_.clear();
    upgrade_.clear();
    short_addr_map_.clear();
    ShortAddrCfg::ClearCfg();
    UpdateCfg::ClearCfg();
    UpgradeCfg::ClearCfg();
    wave_feature_set_inst::instance().RemoveAllFeatureCfg();
    wave_feature_set_inst::instance().RemoveAllWaveCfg();
}
// Drops every entry of free_slice_ocuppied_ that is smaller than ts.
// The sweep only runs once the container holds more than five entries.
void SensorScheduler::CleanIdleOccupiedSet(long ts) {
    if (free_slice_ocuppied_.size() <= 5) {
        return;
    }
    auto cursor = free_slice_ocuppied_.begin();
    while (cursor != free_slice_ocuppied_.end()) {
        if ((*cursor) < ts) {
            // erase() hands back the iterator following the removed element.
            cursor = free_slice_ocuppied_.erase(cursor);
        } else {
            ++cursor;
        }
    }
}
void SensorScheduler::UseDefaultConfig() {
zlog_info(zbt, "use default configuration");
int eigen_value_send_interval = 300;
int wave_form_send_interval = 7200;
int eigen_value_send_duration = 6;
int wave_form_send_duration = 50;
int max_sensor_num = 32;
int wave_resend_num = 3;
// int eigen_value_send_interval = 120;
// int wave_form_send_interval = 240;
// int eigen_value_send_duration = 2; // 固定的
// int wave_form_send_duration = 60; // 固定的
// int max_sensor_num = 4;
std::string error_msg;
Config(eigen_value_send_interval,
wave_form_send_interval,
max_sensor_num, wave_resend_num,
error_msg);
}
// Switches the scheduler to `status` and forwards the new value to
// set_schedule_status(). A request for the current status is only logged.
void SensorScheduler::SetScheduleStatus(ScheduleStatus status) {
    if (status == current_schedule_status_) {
        zlog_warn(zbt, "schedule status not change:%d", status);
        return;
    }
    zlog_warn(zbt, "schedule status from:%d to %d", current_schedule_status_, status);
    current_schedule_status_ = status;
    set_schedule_status(current_schedule_status_);
}
// Returns the status the scheduler is currently in.
ScheduleStatus SensorScheduler::GetScheduleStatus() {
    return this->current_schedule_status_;
}
void SensorScheduler::GenerateDebugSchedule(std::vector<uint16_t> short_addr_list) {
int debug_size = short_addr_list.size();
debug_list_.clear();
for (auto item : short_addr_list) {
debug_list_.insert(item);
}
// 时间还是用正常模式的,因为特征值还是正常的,所以只把原来波形调度的地方换成调试的传感器
//
// 1 根据传感器列表找到传感器编号例如1,2,3
// 2 这是第几个5分钟从这个5分钟开始放波形如果这是2小时中的最后一个5分钟要考虑越界的问题
// 3 如果是1到2个传感器隔2分钟(两个波形间隔)测试一次; 如果是大于2个传感器一直连排就行了
// 进入调试模式
long current_ts = GetLocalTs();
long nth_wave_start_slice = (current_ts - start_timestamp_) / wave_form_send_interval_;
long current_wave_start_ts = nth_wave_start_slice * wave_form_send_interval_ + start_timestamp_;
long seconds_in_current_wave_slice = current_ts - current_wave_start_ts;
long nth_eigen_value_slice = seconds_in_current_wave_slice / eigen_value_send_interval_;
long seconds_in_current_eigen_slice = seconds_in_current_wave_slice % eigen_value_send_interval_;
int previous_wave_slice = wave_slice_num_per_eigen_interval_ * (nth_eigen_value_slice + 1);
if (previous_wave_slice == available_slice_) {
previous_wave_slice = 0;
}
debug_slice_sensor_id_ = new uint16_t[available_slice_+1];
for (int i = 0; i <= available_slice_; ++i) {
debug_slice_sensor_id_[i] = 0;
}
if (debug_size == 1) { // 只有一个传感器的话,两分钟一次波形
int j = 0;
for (int i = previous_wave_slice+1; i < previous_wave_slice+1+available_slice_; i = i + 3) {
j = i % available_slice_;
if (j == 0) {
j = i;
}
debug_slice_sensor_id_[j] = short_addr_list[0];
}
} else {
int j = 0;
int k = 0;
for (int i = previous_wave_slice+1; i < previous_wave_slice+1+available_slice_; ++i) {
j = i % available_slice_;
if (j == 0) {
j = i;
}
debug_slice_sensor_id_[j] = short_addr_list[k%debug_size];
++k;
}
}
Json::Value root;
for (int i = 1; i <= available_slice_; ++i) {
root.append(debug_slice_sensor_id_[i]);
}
Json::StyledStreamWriter streamWriter;
std::ofstream out_file(DEBUG_SCHEDULE_CONFIG);
streamWriter.write(out_file, root);
out_file.close();
// 从下一个5分钟开始排因为只要错过了这个5分钟的第1分钟时间传感器都将无法调度
// 下一个5分钟是第几个5分钟间隔波形窗口是第多少个
// 对于特征值后的波形如果特征值时间离波形时间间隔小于10秒那就再加20秒上去这样保证传感器至少休息20秒
current_schedule_status_ = kScheduleStatusDebug;
set_schedule_status(current_schedule_status_);
}
// Puts the given sensors into debug status by generating a debug waveform schedule.
// Returns 1 for an empty list; returns 0 otherwise, including the case where the
// scheduler is already debugging exactly this set of sensors and nothing is regenerated.
int SensorScheduler::OpenDebugMode(std::vector<uint16_t> short_addr_list) {
    if (short_addr_list.empty()) {
        zlog_warn(zbt, "debug list is empty, do nothing");
        return 1;
    }

    if (current_schedule_status_ == kScheduleStatusDebug) {
        const std::unordered_set<uint16_t> requested(short_addr_list.begin(), short_addr_list.end());
        if (requested == debug_list_) {
            zlog_warn(zbt, "debug list and mode are same, do nothing");
            return 0;
        }
    }

    // Either debug status is entered for the first time or the sensor set changed:
    // (re)assign the waveform slices.
    GenerateDebugSchedule(short_addr_list);
    return 0;
}
// Leaves debug status: frees the debug slice table and returns to normal status.
// Does nothing when the scheduler is not in debug status. Always returns 0.
int SensorScheduler::CloseDebugMode() {
    if (current_schedule_status_ == kScheduleStatusDebug) {
        if (debug_slice_sensor_id_ != NULL) {
            delete []debug_slice_sensor_id_;
            debug_slice_sensor_id_ = NULL;
        }
        current_schedule_status_ = kScheduleStatusNormal;
        set_schedule_status(current_schedule_status_);
    }
    return 0;
}
void SensorScheduler::GenerateUpgradeSchedule() {
zlog_debug(zbt, "GenerateUpgradeSchedule start");
if (upgrade_list_.size() == 0) {
return;
}
std::vector<uint16_t> short_addr_list;
for (auto item : upgrade_list_) {
short_addr_list.push_back(item);
}
int upgrade_size = short_addr_list.size();
// 时间还是用正常模式的,因为特征值还是正常的,所以只把原来波形调度的地方换成升级的传感器
//
// 1 根据传感器列表找到传感器编号例如1,2,3
// 2 这是第几个5分钟从这个5分钟开始放波形如果这是2小时中的最后一个5分钟要考虑越界的问题
// 3 如果是1到2个传感器隔2分钟(两个波形间隔)测试一次; 如果是大于2个传感器一直连排就行了
long current_ts = GetLocalTs();
long nth_wave_start_slice = abs(current_ts - start_timestamp_) / wave_form_send_interval_;
zlog_debug(zbt, "current ts: %ld, start ts:%ld, wave_form_send_interval_:%d", current_ts, start_timestamp_, wave_form_send_interval_);
long current_wave_start_ts = nth_wave_start_slice * wave_form_send_interval_ + start_timestamp_;
long seconds_in_current_wave_slice = current_ts - current_wave_start_ts;
long nth_eigen_value_slice = seconds_in_current_wave_slice / eigen_value_send_interval_;
long seconds_in_current_eigen_slice = seconds_in_current_wave_slice % eigen_value_send_interval_;
int previous_wave_slice = wave_slice_num_per_eigen_interval_ * (nth_eigen_value_slice + 1);
zlog_debug(zbt, "seconds_in_current_wave_slice:%ld, nth_eigen_value_slice:%ld, seconds_in_current_eigen_slice:%ld, previous_wave_slice: %d",
seconds_in_current_wave_slice, nth_eigen_value_slice, seconds_in_current_eigen_slice, previous_wave_slice);
// if (previous_wave_slice < 0) {
// zlog_error(zbt, "previous_wave_slice: %d", previous_wave_slice);
// }
if (previous_wave_slice == available_slice_) {
previous_wave_slice = 0;
}
if (upgrade_slice_sensor_id_ != NULL) {
delete [] upgrade_slice_sensor_id_;
upgrade_slice_sensor_id_ = NULL;
}
upgrade_slice_sensor_id_ = new uint16_t[available_slice_+1];
for (int i = 0; i <= available_slice_; ++i) {
upgrade_slice_sensor_id_[i] = 0;
}
if (upgrade_size == 1) { // 只有一个传感器的话,两分钟一次波形
zlog_debug(zbt, "upgrade_size 1:%x", short_addr_list[0]);
int j = 0;
// int k = 0;
for (int i = previous_wave_slice+1; i < previous_wave_slice+1+available_slice_; i=i+2) {
j = i % available_slice_;
if (j == 0) {
j = i;
}
zlog_debug(zbt, "i = %d, j = %d", i, j);
// k = k % upgrade_size;
upgrade_slice_sensor_id_[j] = short_addr_list[0];
}
} else {
zlog_debug(zbt, "upgrade_size %d", upgrade_size);
int j = 0;
int k = 0;
for (int i = previous_wave_slice+1; i < previous_wave_slice+1+available_slice_; ++i) {
j = i % available_slice_;
if (j == 0) {
j = i;
}
k = k % upgrade_size;
upgrade_slice_sensor_id_[j] = short_addr_list[k];
++k;
}
}
Json::Value root;
for (int i = 1; i <= available_slice_; ++i) {
root.append(upgrade_slice_sensor_id_[i]);
}
Json::StyledStreamWriter streamWriter;
std::ofstream out_file(UPGRADE_SCHEDULE_CONFIG);
streamWriter.write(out_file, root);
out_file.close();
zlog_debug(zbt, "GenerateUpgradeSchedule end");
}
// Leaves upgrade status: frees the upgrade slice table, forgets the upgrade list,
// deletes UPGRADE_SCHEDULE_CONFIG and returns to normal status.
// Does nothing when the scheduler is not in upgrade status. Always returns 0.
int SensorScheduler::CloseUpgradeMode() {
    if (current_schedule_status_ == kScheduleStatusUpgrade) {
        if (upgrade_slice_sensor_id_ != NULL) {
            delete []upgrade_slice_sensor_id_;
            upgrade_slice_sensor_id_ = NULL;
        }
        upgrade_list_.clear();

        // The persisted schedule is removed through the shell.
        const std::string clear_cfg_file_cmd = std::string("rm -f ") + std::string(UPGRADE_SCHEDULE_CONFIG);
        system(clear_cfg_file_cmd.c_str());

        current_schedule_status_ = kScheduleStatusNormal;
        set_schedule_status(current_schedule_status_);
    }
    return 0;
}
// Returns the current std::chrono::system_clock time as whole seconds since its epoch.
long SensorScheduler::GetLocalTs() {
    using std::chrono::duration_cast;
    using std::chrono::seconds;
    using std::chrono::system_clock;

    const auto since_epoch = system_clock::now().time_since_epoch();
    return duration_cast<seconds>(since_epoch).count();
}
// Samples the system clock once and reports it in two forms:
// world_time receives the local time formatted as "YYYY-MM-DD HH:MM:SS",
// the return value is the same instant as whole seconds since the clock's epoch.
long SensorScheduler::GetLocalWorldTime(std::string &world_time) {
    const auto sampled = std::chrono::system_clock::now();

    const std::time_t as_time_t = std::chrono::system_clock::to_time_t(sampled);
    const std::tm *fields = std::localtime(&as_time_t);

    char text[100] = {0};
    snprintf(text, sizeof(text), "%04d-%02d-%02d %02d:%02d:%02d",
             fields->tm_year + 1900, fields->tm_mon+1, fields->tm_mday,
             fields->tm_hour, fields->tm_min, fields->tm_sec);
    world_time = text;

    return std::chrono::duration_cast<std::chrono::seconds>(sampled.time_since_epoch()).count();
}
// Formats the timestamp `ts` (seconds since the epoch) as "YYYY-MM-DD HH:MM:SS"
// in the fixed UTC+8 zone. Despite the name, the result is NOT plain UTC.
//
// The eight-hour offset is added to the timestamp before it is broken down, so
// day, month and year roll over correctly (for example 16:00 UTC on the last day
// of a month becomes 00:00 on the first day of the next month).
// Returns an empty string when the timestamp cannot be converted.
std::string SensorScheduler::GetUTCTime(long ts) {
    const long kUtcPlus8Seconds = 8L * 60L * 60L;
    const std::time_t shifted = static_cast<std::time_t>(ts + kUtcPlus8Seconds);
    const std::tm *local_time = std::gmtime(&shifted);
    if (local_time == NULL) {
        return std::string();
    }
    char str[100] = {0};
    snprintf(str, sizeof(str), "%04d-%02d-%02d %02d:%02d:%02d",
             local_time->tm_year + 1900, local_time->tm_mon+1, local_time->tm_mday,
             local_time->tm_hour, local_time->tm_min, local_time->tm_sec);
    return std::string(str);
}
// Persists trigger_wave_record_ to TRIGGER_WAVE_CONFIG as a JSON object keyed by the
// decimal record key, each entry holding the pair as "first" and "second".
// Entries whose two values are both 0 are removed from trigger_wave_record_ first
// and therefore never written.
void SensorScheduler::WriteTriggerWaveRecord() {
    Json::Value root;
    auto it = trigger_wave_record_.begin();
    while (it != trigger_wave_record_.end()) {
        const auto& counters = it->second;
        if (counters.first == 0 && counters.second == 0) {
            // erase() returns the iterator of the following element.
            it = trigger_wave_record_.erase(it);
            continue;
        }
        const uint16_t key = it->first;
        Json::Value item;
        item["first"] = counters.first;
        item["second"] = counters.second;
        root[std::to_string(key)] = item;
        ++it;
    }

    std::ofstream outFile(TRIGGER_WAVE_CONFIG);
    if (!outFile.is_open()) {
        zlog_warn(zbt, "Failed to open %s file", TRIGGER_WAVE_CONFIG);
        return;
    }
    Json::StyledStreamWriter writer;
    writer.write(outFile, root);
    outFile.close();
}
void SensorScheduler::ReadTriggerWaveRecord() {
Json::Value root;
std::ifstream inFile(TRIGGER_WAVE_CONFIG);
if (!inFile.is_open()) {
return;
}
Json::Reader reader;
if (!reader.parse(inFile, root, false)) {
inFile.close();
return;
}
inFile.close();
for (const auto& key : root.getMemberNames()) {
uint16_t uint16Key = static_cast<uint16_t>(std::stoi(key));
auto value = root[key];
uint8_t first = static_cast<uint8_t>(value["first"].asUInt());
uint8_t second = static_cast<uint8_t>(value["second"].asUInt());
trigger_wave_record_[uint16Key] = std::make_pair(first, second);
}
}