2025-01-23 11:13:58 +08:00
# include "schedule.hpp"
# include <iostream>
# include <string>
# include <fstream>
# include <sstream>
# include <chrono>
# include <ctime>
# include <iomanip>
# include <json/json.h>
# include <zlog.h>
# include "short_addr_cfg.hpp"
# include "update_cfg.hpp"
# include "wave_feature_set.hpp"
// zlog categories defined in another translation unit. In this file `zct` receives the
// scheduling messages, while `zbt` receives most configuration-loading/validation and
// upgrade/update bookkeeping messages.
extern zlog_category_t * zct ;
extern zlog_category_t * zbt ;
// Scratch flags filled by wave_feature_set_inst::instance().GetWaveCfg() inside
// CalcNextTimestamp(); any non-zero flag is treated there as "waveform sending enabled".
// NOTE(review): presumably one flag per axis (x / y / z) — confirm against wave_feature_set.hpp.
uint8_t g_x , g_y , g_z ;
2025-05-10 11:43:05 +08:00
int SensorScheduler : : StartSchedule ( int short_addr , int & next_duration , int & next_task_id ) {
2025-01-23 11:13:58 +08:00
int id = 0 ;
auto iter = short_addr_map_ . find ( short_addr ) ;
if ( iter = = short_addr_map_ . end ( ) ) {
id = GetAvailableId ( short_addr ) ;
} else {
id = iter - > second ;
}
current_ts_ = GetLocalTs ( ) ;
CleanIdleOccupiedSet ( current_ts_ ) ;
nth_wave_start_slice_ = ( current_ts_ - start_timestamp_ ) / wave_form_send_interval_ ;
current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_ ;
seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_ ;
nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_ ;
seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_ ;
ts_in_eigen_slice_ = false ;
2025-04-03 10:47:57 +08:00
2025-01-23 11:13:58 +08:00
if ( seconds_in_current_eigen_slice_ < eigen_value_slice_total_seconds_ - 3 ) {
ts_in_eigen_slice_ = true ;
2025-04-03 10:47:57 +08:00
}
2025-01-23 11:13:58 +08:00
if ( ts_in_eigen_slice_ ) {
nth_eigen_slice_ = ( seconds_in_current_eigen_slice_ + 2 ) / eigen_value_send_duration_ ;
2025-04-09 17:59:13 +08:00
if ( nth_eigen_value_slice_ = = 0 ) {
2025-04-03 10:47:57 +08:00
ClearFailureSuccessMap ( ) ;
}
2025-01-23 11:13:58 +08:00
} else {
nth_wave_slice_ = ( seconds_in_current_eigen_slice_ - eigen_value_slice_total_seconds_ + 3 ) / seconds_per_wave_slice_ ;
}
zlog_warn ( zct , " [%d:%x] ts:%ld, current utc:%s, nth eigen_value slice:%d, seconds in eigen slice:%d, eigen slice:%d " ,
id , short_addr , current_ts_ , GetUTCTime ( current_ts_ ) . c_str ( ) , nth_eigen_value_slice_ + 1 , seconds_in_current_eigen_slice_ , ts_in_eigen_slice_ ) ;
if ( ts_in_eigen_slice_ ) {
2025-04-03 10:47:57 +08:00
// if (id == nth_eigen_slice_ + 1) {
2025-01-23 11:13:58 +08:00
// 传感器需要执行上送特征值任务, 如果有配置需要下发的话,下发配置
if ( update_ . count ( id ) ) {
// execute config
zlog_warn ( zct , " [%d:%x] update config in eigen slice " , id , short_addr ) ;
current_request_ = kScheduleConfigSensor ;
return kScheduleConfigSensor ;
} else {
2025-05-10 11:43:05 +08:00
// wave_feature_set_inst::instance().GetFeatureCfg(short_addr, g_x, g_y, g_z);
// if (g_x || g_y || g_z) {
// // 执行上送特征值任务
// zlog_warn(zct, "[%d:%x] send eigen value in eigen slice", id, short_addr);
// current_request_ = kScheduleEigenValue;
// return kScheduleEigenValue;
// } else {
next_duration = GetNextDuration ( short_addr , next_task_id ) ;
2025-01-23 11:13:58 +08:00
zlog_warn ( zct , " [%d:%x] no need for eigen " , id , short_addr ) ;
2025-05-10 11:43:05 +08:00
return kScheduleResultNone ;
// }
2025-01-23 11:13:58 +08:00
}
2025-04-03 10:47:57 +08:00
// } else {
// zlog_warn(zct, "[%d:%x] Invalid request, revive in %d eigen slice", id, short_addr, nth_eigen_slice_ + 1);
// if (id < nth_eigen_slice_ + 1) {
// // 不正确的请求
// long available_ts = current_wave_start_ts_ + (nth_eigen_value_slice_ + 1) * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_;
// next_duration = available_ts - current_ts_;
// zlog_warn(zct, "[%d:%x] wrong time in eigen slice, next feature in next interval send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration);
// } else {
// long available_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_;
// next_duration = available_ts - current_ts_;
// zlog_warn(zct, "[%d:%x] wrong time in eigen slice, next feature in current interval send utc time:[%s], duration:%d", id, short_addr, GetUTCTime(available_ts).c_str(), next_duration);
// }
// return kScheduleWrongTime;
// }
2025-01-23 11:13:58 +08:00
} else {
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1 ;
auto wave_slice_iter = sensor_id_nth_slice_ . find ( id ) ;
if ( wave_slice_iter = = sensor_id_nth_slice_ . end ( ) ) {
zlog_error ( zct , " [%d:%x]invaild id, not find wave slice id, need to check further " , id , short_addr ) ;
return kScheduleUnknownSensor ;
}
if ( nth_wave_slice = = wave_slice_iter - > second ) {
// 需要发送波形, 需要判断是否有配置,升级需求
auto upgrade_iter = upgrade_ . find ( id ) ;
if ( upgrade_iter ! = upgrade_ . end ( ) ) {
if ( upgrade_iter - > second . try_times < 10 ) {
current_request_ = kScheduleUpgrade ;
UpdateUpgradeInfo ( id ) ;
zlog_warn ( zct , " [%d:%x] in wave slice to upgrade now from version:%s to %s, try time:%d " ,
id , short_addr , upgrade_iter - > second . current_sw_version . c_str ( ) ,
upgrade_iter - > second . upgrade_sw_version . c_str ( ) , upgrade_iter - > second . try_times ) ;
return kScheduleUpgrade ;
}
}
2025-05-10 11:43:05 +08:00
// wave_feature_set_inst::instance().GetWaveCfg(short_addr, g_x, g_y, g_z);
// if (g_x || g_y || g_z) {
// zlog_warn(zct, "[%d:%x] it is wave time", id, short_addr);
// current_request_ = kScheduleWaveForm;
// return kScheduleWaveForm;
// } else {
next_duration = GetNextDuration ( short_addr , next_task_id ) ;
2025-01-23 11:13:58 +08:00
zlog_warn ( zct , " [%d:%x] no need for wave " , id , short_addr ) ;
2025-05-10 11:43:05 +08:00
return kScheduleResultNone ;
// }
2025-01-23 11:13:58 +08:00
} else {
if ( slice_sensor_id_ [ nth_wave_slice - 1 ] = = 0 ) {
zlog_warn ( zct , " [%d:%x] in idle time " , id , short_addr ) ;
// idle time
auto upgrade_iter = upgrade_ . find ( id ) ;
if ( upgrade_iter ! = upgrade_ . end ( ) ) {
if ( upgrade_iter - > second . try_times < 10 ) {
current_request_ = kScheduleUpgrade ;
UpdateUpgradeInfo ( id ) ;
zlog_warn ( zct , " [%d:%x] in idle to upgrade now from version:%s to %s, try time:%d " ,
id , short_addr , upgrade_iter - > second . current_sw_version . c_str ( ) ,
upgrade_iter - > second . upgrade_sw_version . c_str ( ) , upgrade_iter - > second . try_times ) ;
return kScheduleUpgrade ;
}
}
2025-05-10 11:43:05 +08:00
// if (RetransferWave(short_addr)) {
// zlog_warn(zct, "[%d:%x] it is retransfer wave time", id, short_addr);
// current_request_ = kScheduleWaveForm;
// return kScheduleWaveForm;
// } else if (MissedWave(short_addr)) {
// zlog_warn(zct, "[%d:%x] it is patch wave time", id, short_addr);
// current_request_ = kScheduleWaveForm;
// patch_set_.erase(short_addr);
// return kScheduleWaveForm;
// }
2025-01-23 11:13:58 +08:00
}
// wrong time to come
long available_ts = current_wave_start_ts_ + ( nth_eigen_value_slice_ + 1 ) * eigen_value_send_interval_ + ( id - 1 ) * eigen_value_send_duration_ ;
next_duration = available_ts - current_ts_ ;
2025-07-23 22:34:01 +08:00
if ( next_duration < 0 | | next_duration > eigen_value_send_interval_ ) {
zlog_warn ( zct , " [Nxt] exception duration: %d " , next_duration ) ;
next_duration = eigen_value_send_interval_ ;
} else if ( next_duration < 15 ) {
zlog_warn ( zct , " [Nxt] exception duration: %d " , next_duration ) ;
next_duration = 15 ;
}
2025-05-10 11:43:05 +08:00
next_task_id = kScheduleEigenValue ;
2025-01-23 11:13:58 +08:00
zlog_warn ( zct , " [%d:%x] wrong time in wave slice, next feature send utc time:[%s], duration:%d " , id , short_addr , GetUTCTime ( available_ts ) . c_str ( ) , next_duration ) ;
2025-05-10 11:43:05 +08:00
return kScheduleResultNone ;
2025-01-23 11:13:58 +08:00
}
}
}
2025-05-10 11:43:05 +08:00
long SensorScheduler : : CalcNextTimestamp ( int id , uint16_t short_addr , int & next_task_id ) {
2025-01-23 11:13:58 +08:00
// current_ts_ = GetLocalTs();
2025-04-03 10:47:57 +08:00
// nth_wave_start_slice_ = (current_ts_ - start_timestamp_) / wave_form_send_interval_;
// current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_;
// seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_;
// nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_;
// seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_;
// ts_in_eigen_slice_ = false;
// if (seconds_in_current_eigen_slice_ < eigen_value_slice_total_seconds_ - 3) {
// ts_in_eigen_slice_ = true;
// }
2025-05-10 11:43:05 +08:00
nth_wave_start_slice_ = ( current_ts_ - start_timestamp_ ) / wave_form_send_interval_ ;
current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_ ;
seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_ ;
nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_ ;
seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_ ;
ts_in_eigen_slice_ = false ;
if ( seconds_in_current_eigen_slice_ < eigen_value_slice_total_seconds_ - 3 ) {
ts_in_eigen_slice_ = true ;
}
2025-04-03 10:47:57 +08:00
2025-01-23 11:13:58 +08:00
if ( ts_in_eigen_slice_ ) {
int forward_wave_slice_num = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ ;
auto wave_slice_iter = sensor_id_nth_slice_ . find ( id ) ;
if ( wave_slice_iter = = sensor_id_nth_slice_ . end ( ) ) {
zlog_error ( zct , " [Nxt] invaild id:%d, not find wave slice id " , id ) ;
long available_ts = current_wave_start_ts_ + eigen_value_send_interval_ + nth_eigen_slice_ * eigen_value_send_duration_ ;
zlog_error ( zct , " [Nxt] [%d] next feature send utc time:[%s] " , id , GetUTCTime ( available_ts ) . c_str ( ) ) ;
2025-05-10 11:43:05 +08:00
next_task_id = kScheduleEigenValue ;
2025-01-23 11:13:58 +08:00
return available_ts ;
}
int wave_slice = wave_slice_iter - > second ; // 从1开始
long send_wave_ts = 0 ;
wave_feature_set_inst : : instance ( ) . GetWaveCfg ( short_addr , g_x , g_y , g_z ) ;
if ( g_x | | g_y | | g_z ) {
if ( wave_slice > forward_wave_slice_num & &
wave_slice < = forward_wave_slice_num + wave_slice_num_per_eigen_interval_ ) {
// 发送波的时间窗也在本次特征值发送间隔中
for ( int i = forward_wave_slice_num ; i < = forward_wave_slice_num + wave_slice_num_per_eigen_interval_ ; + + i ) {
if ( wave_slice - 1 = = i ) {
send_wave_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + eigen_value_slice_total_seconds_ + ( i - forward_wave_slice_num ) * seconds_per_wave_slice_ ;
zlog_warn ( zct , " [Nxt] [%d:%x] send wave time:[%s] " , id , short_addr , GetUTCTime ( send_wave_ts ) . c_str ( ) ) ;
break ;
}
}
}
2025-04-03 10:47:57 +08:00
if ( ! MissedWave ( short_addr ) & & send_wave_ts = = 0 & & success_set_ . count ( short_addr ) = = 0 ) {
// add for patch wave
int nth_wave_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_ + 1 ;
auto wave_slice_iter = sensor_id_nth_slice_ . find ( id ) ;
if ( wave_slice_iter ! = sensor_id_nth_slice_ . end ( ) ) {
if ( nth_wave_slice > wave_slice_iter - > second ) {
if ( success_set_ . count ( short_addr ) = = 0 & & ! RetransferWave ( short_addr ) ) {
2025-04-10 15:50:52 +08:00
zlog_warn ( zct , " [Nxt] [%d:%x] add it to patch set " , id , short_addr ) ;
2025-04-03 10:47:57 +08:00
patch_set_ . insert ( short_addr ) ;
}
}
}
}
2025-01-23 11:13:58 +08:00
}
long available_ts = 0 ;
auto upgrade_iter = upgrade_ . find ( id ) ;
if ( upgrade_iter ! = upgrade_ . end ( ) ) {
if ( upgrade_iter - > second . try_times < 10 ) {
for ( int i = 0 ; i < wave_slice_num_per_eigen_interval_ ; + + i ) {
if ( slice_sensor_id_ [ i + forward_wave_slice_num ] = = 0 ) {
// 判断此空闲位置是否被占用
long current_wave_slice_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + eigen_value_slice_total_seconds_ + i * seconds_per_wave_slice_ ;
if ( free_slice_ocuppied_ . count ( current_wave_slice_ts ) = = 0 ) {
available_ts = current_wave_slice_ts ;
free_slice_ocuppied_ . insert ( available_ts ) ;
zlog_warn ( zct , " [Nxt][%d:%x] %d nth free wave slice will be used to upgrade, utc time:[%s] " , id , short_addr , i + forward_wave_slice_num , GetUTCTime ( available_ts ) . c_str ( ) ) ;
2025-05-10 11:43:05 +08:00
// break;
next_task_id = kScheduleUpgrade ;
return available_ts ;
2025-01-23 11:13:58 +08:00
}
}
}
}
2025-04-03 10:47:57 +08:00
} else {
if ( g_x | | g_y | | g_z ) {
if ( RetransferWave ( short_addr ) ) {
for ( int i = 0 ; i < wave_slice_num_per_eigen_interval_ ; + + i ) {
if ( slice_sensor_id_ [ i + forward_wave_slice_num ] = = 0 ) {
// 判断此空闲位置是否被占用
long current_wave_slice_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + eigen_value_slice_total_seconds_ + i * seconds_per_wave_slice_ ;
if ( free_slice_ocuppied_ . count ( current_wave_slice_ts ) = = 0 ) {
available_ts = current_wave_slice_ts ;
free_slice_ocuppied_ . insert ( available_ts ) ;
zlog_warn ( zct , " [Nxt][%d:%x] %d nth free wave slice will be used to retransfer wave, utc time:[%s] " , id , short_addr , i + forward_wave_slice_num , GetUTCTime ( available_ts ) . c_str ( ) ) ;
2025-05-10 11:43:05 +08:00
// break;
next_task_id = kScheduleWaveForm ;
return available_ts ;
2025-04-03 10:47:57 +08:00
}
}
}
2025-04-11 14:28:07 +08:00
} else if ( MissedWave ( short_addr ) ) {
2025-04-03 10:47:57 +08:00
for ( int i = 0 ; i < wave_slice_num_per_eigen_interval_ ; + + i ) {
if ( slice_sensor_id_ [ i + forward_wave_slice_num ] = = 0 ) {
// 判断此空闲位置是否被占用
long current_wave_slice_ts = current_wave_start_ts_ + nth_eigen_value_slice_ * eigen_value_send_interval_ + eigen_value_slice_total_seconds_ + i * seconds_per_wave_slice_ ;
if ( free_slice_ocuppied_ . count ( current_wave_slice_ts ) = = 0 ) {
available_ts = current_wave_slice_ts ;
free_slice_ocuppied_ . insert ( available_ts ) ;
zlog_warn ( zct , " [Nxt][%d:%x] %d nth free wave slice will be used to patch wave, utc time:[%s] " , id , short_addr , i + forward_wave_slice_num , GetUTCTime ( available_ts ) . c_str ( ) ) ;
2025-05-10 11:43:05 +08:00
// break;
next_task_id = kScheduleWaveForm ;
return available_ts ;
2025-04-03 10:47:57 +08:00
}
}
}
}
}
}
2025-05-10 11:43:05 +08:00
if ( send_wave_ts > 0 ) {
// long min_ts = std::min(send_wave_ts, available_ts);
zlog_warn ( zct , " [Nxt] [%d:%x] next wave send utc time1:%s " , id , short_addr , GetUTCTime ( send_wave_ts ) . c_str ( ) ) ;
next_task_id = kScheduleWaveForm ;
return send_wave_ts ;
// return min_ts;
2025-01-23 11:13:58 +08:00
}
2025-05-10 11:43:05 +08:00
// if (send_wave_ts + available_ts > 0) {
// long max_ts = std::max(send_wave_ts, available_ts);
// zlog_warn(zct, "[Nxt] [%d:%x] next feature send utc time2:%s", id, short_addr, GetUTCTime(max_ts).c_str());
// return max_ts;
// }
2025-01-23 11:13:58 +08:00
}
// 如果是在当前波形时间窗中,不管是空闲时间窗,还是发送波形的时间窗,下一个时间窗是特征值
long available_ts = current_wave_start_ts_ + ( nth_eigen_value_slice_ + 1 ) * eigen_value_send_interval_ + ( id - 1 ) * eigen_value_send_duration_ ;
zlog_warn ( zct , " [Nxt] [%d:%x] next feature send utc time3:[%s] " , id , short_addr , GetUTCTime ( available_ts ) . c_str ( ) ) ;
2025-05-10 11:43:05 +08:00
next_task_id = kScheduleEigenValue ;
2025-01-23 11:13:58 +08:00
return available_ts ;
}
2025-05-10 11:43:05 +08:00
int SensorScheduler : : GetNextDuration ( int short_addr , int & next_task_id ) {
2025-01-23 11:13:58 +08:00
int id = 0 ;
auto iter = short_addr_map_ . find ( short_addr ) ;
if ( iter = = short_addr_map_ . end ( ) ) {
zlog_error ( zct , " cannot find id for short_addr %x " , short_addr ) ;
return 0 ;
} else {
id = iter - > second ;
}
2025-05-10 11:43:05 +08:00
current_ts_ = GetLocalTs ( ) ;
// nth_wave_start_slice_ = (current_ts_ - start_timestamp_) / wave_form_send_interval_;
// current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_;
// seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_;
// nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_;
long next_ts = CalcNextTimestamp ( id , short_addr , next_task_id ) ;
int duration = next_ts - current_ts_ ;
2025-01-23 11:13:58 +08:00
if ( duration < 0 | | duration > eigen_value_send_interval_ ) {
zlog_warn ( zct , " [Nxt] exception duration: %d " , duration ) ;
duration = eigen_value_send_interval_ ;
2025-07-23 22:34:01 +08:00
} else if ( duration < 15 ) {
2025-05-10 11:43:05 +08:00
zlog_warn ( zct , " [Nxt] exception duration: %d " , duration ) ;
2025-07-23 22:34:01 +08:00
duration = 15 ;
2025-01-23 11:13:58 +08:00
}
2025-05-10 11:43:05 +08:00
zlog_warn ( zct , " [Nxt] [%d:%x] next duration: %d, taskid: %d " , id , short_addr , duration , next_task_id ) ;
2025-01-23 11:13:58 +08:00
return duration ;
}
2025-05-10 11:43:05 +08:00
// int SensorScheduler::GetNextDuration(int short_addr) {
// int id = 0;
// auto iter = short_addr_map_.find(short_addr);
// if (iter == short_addr_map_.end()) {
// zlog_error(zct, "cannot find id for short_addr %x", short_addr);
// return 0;
// } else {
// id = iter->second;
// }
// long current_ts = GetLocalTs();
// long next_ts = CalcNextTimestamp(id, short_addr);
// int duration = next_ts - current_ts;
// if (duration < 0 || duration > eigen_value_send_interval_) {
// zlog_warn(zct, "[Nxt] exception duration: %d", duration);
// duration = eigen_value_send_interval_;
// }
// zlog_warn(zct, "[Nxt] [%d:%x] next duration is %d", id, short_addr, duration);
// return duration;
// }
2025-01-23 11:13:58 +08:00
/**
 * Builds the scheduler from SCHEDULE_CONFIG, falling back to UseDefaultConfig()
 * when the file is missing, is not a JSON object, or holds values that cannot be
 * scheduled with. Afterwards the short-address map, the pending upgrades and the
 * pending config updates are loaded from their own files.
 *
 * All values are read into locals and checked before any member is written: an
 * unparsable start timestamp used to throw from std::stol, and a zero
 * wave_form_send_duration (or a zero wave-slice count) used to divide by zero.
 */
SensorScheduler::SensorScheduler() {
    support_modification_ = true;
    bool configed = false;

    std::ifstream schedule_file(SCHEDULE_CONFIG);
    if (schedule_file.good()) {
        zlog_info(zbt, " exist configuration file ");
        Json::Reader reader;
        Json::Value root;
        const bool parsed = reader.parse(schedule_file, root, false);
        schedule_file.close();

        if (!parsed) {
            zlog_error(zbt, " invalid format, fail to parse %s ", SCHEDULE_CONFIG);
        } else if (!root.isObject()) {
            zlog_error(zbt, " invalid format, not an object: %s ", SCHEDULE_CONFIG);
        } else {
            // The start timestamp is stored as a decimal string.
            long start_ts = 0;
            std::istringstream ts_stream(root[" schedule_start_timestamp "].asString());
            const bool ts_ok = !(ts_stream >> start_ts).fail();

            const int eigen_interval = root[" eigen_value_send_interval "].asInt();
            const int eigen_duration = root[" eigen_value_send_duration "].asInt();
            const int wave_interval = root[" wave_form_send_interval "].asInt();
            const int wave_duration = root[" wave_form_send_duration "].asInt();
            const int max_sensor_num = root[" max_sensor_num "].asInt();

            // Seconds of an eigen interval left for wave slices, and how many fit.
            const int rest_duration = eigen_interval - eigen_duration * max_sensor_num;
            const int wave_slices = wave_duration > 0 ? rest_duration / wave_duration : 0;

            if (!ts_ok || eigen_interval <= 0 || wave_interval <= 0 || wave_duration <= 0 || wave_slices <= 0) {
                zlog_error(zbt, "invalid schedule values in %s, fall back to default config", SCHEDULE_CONFIG);
            } else {
                configed = true;
                start_timestamp_ = start_ts;
                start_ts_str_ = root[" schedule_start_time "].asString();

                long current_ts = GetLocalTs();
                if (current_ts < start_timestamp_) {
                    // A start time in the future would make every slice computation
                    // negative: restart the schedule now and persist the new start.
                    zlog_warn(zbt, " current ts: %ld less than start ts: %ld, go to adjust it ", current_ts, start_timestamp_);
                    start_timestamp_ = current_ts;
                    start_ts_str_ = GetUTCTime(current_ts);
                    root[" schedule_start_timestamp "] = std::to_string(start_timestamp_);
                    root[" schedule_start_time "] = start_ts_str_;
                    Json::StyledStreamWriter streamWriter;
                    std::ofstream out_file(SCHEDULE_CONFIG);
                    streamWriter.write(out_file, root);
                    out_file.close();
                }

                eigen_value_send_interval_ = eigen_interval;
                eigen_value_send_duration_ = eigen_duration;
                wave_form_send_interval_ = wave_interval;
                wave_form_send_duration_ = wave_duration;
                max_sensor_num_ = max_sensor_num;
                available_slice_ = root[" available_slice "].asInt();
                free_slice_ = root[" free_slice "].asInt();
                support_modification_ = root[" support_modification "].asBool();

                // "id" lists the owner of every wave slice in order; 0 marks an idle slice.
                const Json::Value ids = root[" id "];
                int slice_no = 0;
                for (const auto &id : ids) {
                    const int sensor_id = id.asInt();
                    slice_sensor_id_.push_back(sensor_id);
                    ++slice_no;
                    if (sensor_id != 0) {
                        sensor_id_nth_slice_[sensor_id] = slice_no;  // 1-based slice number
                    }
                }

                eigen_value_slice_total_seconds_ = eigen_value_send_duration_ * max_sensor_num_;
                wave_slice_num_per_eigen_interval_ = wave_slices;
                seconds_per_wave_slice_ = rest_duration / wave_slice_num_per_eigen_interval_;
            }
        }
    }

    if (!configed) {
        UseDefaultConfig();
    }

    short_addr_map_.clear();
    ShortAddrCfg::ReadCfg(short_addr_map_);
    // read upgrade config file: UPGRADE_CONFIG
    UpgradeCfg::ReadCfg(upgrade_);
    // read config update file: CONFIG_UPDATE
    UpdateCfg::ReadCfg(update_);
}
2025-04-03 10:47:57 +08:00
int SensorScheduler : : WaveError ( uint16_t short_addr ) {
auto iter = failure_map_ . find ( short_addr ) ;
if ( iter = = failure_map_ . end ( ) ) {
failure_map_ [ short_addr ] = 3 ; // 重试次数
zlog_warn ( zct , " [WaveError][%x] will try 3 times " , short_addr ) ;
2025-05-30 18:31:15 +08:00
return 1 ;
2025-04-03 10:47:57 +08:00
}
if ( iter - > second = = 0 ) {
zlog_warn ( zct , " [WaveError][%x] no try times " , short_addr ) ;
failure_map_ . erase ( short_addr ) ;
2025-05-30 18:31:15 +08:00
return 3 ;
2025-04-03 10:47:57 +08:00
}
iter - > second = iter - > second - 1 ;
zlog_warn ( zct , " [WaveError][%x] remain try %d times " , short_addr , iter - > second ) ;
2025-05-30 18:31:15 +08:00
return 3 - iter - > second ;
2025-04-03 10:47:57 +08:00
}
bool SensorScheduler : : RetransferWave ( uint16_t short_addr ) {
auto iter = failure_map_ . find ( short_addr ) ;
if ( iter ! = failure_map_ . end ( ) ) {
return true ;
}
return false ;
}
bool SensorScheduler : : MissedWave ( uint16_t short_addr ) {
if ( patch_set_ . count ( short_addr ) > 0 ) {
return true ;
}
return false ;
}
2025-05-27 15:37:11 +08:00
int SensorScheduler : : WaveSuccess ( uint16_t short_addr ) {
2025-05-10 11:43:05 +08:00
zlog_warn ( zct , " [WaveSuccess][%x] " , short_addr ) ;
2025-04-03 10:47:57 +08:00
success_set_ . insert ( short_addr ) ;
2025-05-10 11:43:05 +08:00
patch_set_ . erase ( short_addr ) ;
2025-04-03 10:47:57 +08:00
auto iter = failure_map_ . find ( short_addr ) ;
if ( iter ! = failure_map_ . end ( ) ) {
zlog_warn ( zct , " [WaveSuccess][%x] try %d times success " , short_addr , 4 - iter - > second ) ;
failure_map_ . erase ( short_addr ) ;
2025-05-27 15:37:11 +08:00
return 4 - iter - > second ;
2025-04-03 10:47:57 +08:00
}
2025-05-27 15:37:11 +08:00
return 0 ;
2025-04-03 10:47:57 +08:00
}
void SensorScheduler : : ClearFailureSuccessMap ( ) {
2025-05-27 15:37:11 +08:00
zlog_warn ( zct , " [ClearFailureSuccessMap] " ) ;
2025-04-03 10:47:57 +08:00
failure_map_ . clear ( ) ;
success_set_ . clear ( ) ;
patch_set_ . clear ( ) ;
}
long SensorScheduler : : GetBaseTimestamp ( int short_addr ) {
2025-01-23 11:13:58 +08:00
int id = 0 ;
auto iter = short_addr_map_ . find ( short_addr ) ;
if ( iter = = short_addr_map_ . end ( ) ) {
zlog_error ( zct , " cannot find id for short_addr %x " , short_addr ) ;
return 0 ;
} else {
id = iter - > second ;
}
2025-04-03 10:47:57 +08:00
return start_timestamp_ + ( id - 1 ) * eigen_value_send_duration_ ;
2025-01-23 11:13:58 +08:00
}
int SensorScheduler : : GetAvailableId ( int short_addr ) {
int max_support_sensor [ 128 ] = { 0 } ;
for ( auto it = short_addr_map_ . begin ( ) ; it ! = short_addr_map_ . end ( ) ; + + it ) {
max_support_sensor [ it - > second ] = 1 ;
}
int available_id = 0 ;
for ( int i = 1 ; i < 128 ; + + i ) {
if ( max_support_sensor [ i ] = = 0 ) {
available_id = i ;
break ;
}
}
zlog_warn ( zct , " [GetAvailableId][%d] short addr : %x " , available_id , short_addr ) ;
short_addr_map_ [ short_addr ] = available_id ;
ShortAddrCfg : : WriteCfg ( short_addr_map_ ) ;
return available_id ;
}
int SensorScheduler : : WriteScheduleCfg ( long & ts , std : : string & world_time ) {
Json : : Value root ;
root [ " schedule_start_timestamp " ] = std : : to_string ( ts ) ;
root [ " schedule_start_time " ] = world_time ;
root [ " eigen_value_send_interval " ] = eigen_value_send_interval_ ;
root [ " eigen_value_send_duration " ] = eigen_value_send_duration_ ;
root [ " wave_form_send_interval " ] = wave_form_send_interval_ ;
root [ " wave_form_send_duration " ] = wave_form_send_duration_ ;
root [ " max_sensor_num " ] = max_sensor_num_ ;
root [ " available_slice " ] = available_slice_ ;
root [ " free_slice " ] = free_slice_ ;
root [ " support_modification " ] = true ;
int * slice_allocation = new int [ available_slice_ ] ;
int slice_index = 0 ; // 有时间片的索引
int no_slice_index = 0 ; // 没有时间片的索引
int k = 1 ;
slice_sensor_id_ . clear ( ) ;
for ( int i = 0 ; i < available_slice_ ; + + i ) {
if ( slice_index < max_sensor_num_ & & ( i % 2 = = 0 | | no_slice_index > = free_slice_ ) ) {
slice_allocation [ i ] = k ;
sensor_id_nth_slice_ [ k ] = i + 1 ;
slice_sensor_id_ . push_back ( k ) ;
k = k + 1 ;
slice_index + + ;
} else if ( no_slice_index < free_slice_ ) {
slice_sensor_id_ . push_back ( 0 ) ;
slice_allocation [ i ] = 0 ;
no_slice_index + + ;
}
}
Json : : Value ids ;
for ( int i = 0 ; i < available_slice_ ; + + i ) {
ids . append ( slice_allocation [ i ] ) ;
}
delete [ ] slice_allocation ;
root [ " id " ] = ids ;
Json : : StyledStreamWriter streamWriter ;
std : : ofstream out_file ( SCHEDULE_CONFIG ) ;
streamWriter . write ( out_file , root ) ;
out_file . close ( ) ;
return 0 ;
}
int SensorScheduler : : Config ( int eigen_value_send_interval , int wave_form_send_interval , int eigen_value_send_duration ,
int wave_form_send_duration , int max_sensor_num , std : : string & error_msg ) {
if ( ! support_modification_ ) {
zlog_warn ( zct , " not support modification " ) ;
return 1 ;
}
int available_slice = 0 ;
int free_slice = 0 ;
int ret = CalcAvailableSlice ( eigen_value_send_interval ,
wave_form_send_interval ,
eigen_value_send_duration ,
wave_form_send_duration ,
max_sensor_num ,
available_slice ,
free_slice ,
error_msg ) ;
if ( ret ! = 0 ) {
return ret ;
}
eigen_value_send_interval_ = eigen_value_send_interval ;
eigen_value_send_duration_ = eigen_value_send_duration ;
wave_form_send_interval_ = wave_form_send_interval ;
wave_form_send_duration_ = wave_form_send_duration ;
max_sensor_num_ = max_sensor_num ;
available_slice_ = available_slice ;
free_slice_ = free_slice ;
support_modification_ = true ;
eigen_value_slice_total_seconds_ = eigen_value_send_duration_ * max_sensor_num_ ;
int rest_duration = eigen_value_send_interval_ - eigen_value_slice_total_seconds_ ;
wave_slice_num_per_eigen_interval_ = rest_duration / wave_form_send_duration_ ;
seconds_per_wave_slice_ = rest_duration / wave_slice_num_per_eigen_interval_ ;
std : : string world_time ;
long current_ts = GetLocalWorldTime ( world_time ) ;
ret = WriteScheduleCfg ( current_ts , world_time ) ;
if ( ret ! = 0 ) {
return ret ;
}
start_timestamp_ = current_ts ;
start_ts_str_ = world_time ;
return 0 ;
}
// 特征值发送间隔300秒, 波形发送间隔为7200秒
// 一次特征值发送时长为6秒, 波形发送时长为60秒
int SensorScheduler : : CalcAvailableSlice ( int eigen_value_send_interval , int wave_form_send_interval ,
int eigen_value_send_duration , int wave_form_send_duration ,
int max_sensor_num , int & available_slice , int & free_slice ,
std : : string & error_msg ) {
if ( max_sensor_num < = 0 ) {
error_msg = " max_sensor_num: " + std : : to_string ( max_sensor_num ) + " must bigger than 0 " ;
zlog_error ( zbt , " %s " , error_msg . c_str ( ) ) ;
return 1 ;
}
if ( max_sensor_num * eigen_value_send_duration > eigen_value_send_interval ) {
error_msg = " invalid max_sensor_num: " + std : : to_string ( max_sensor_num ) +
" * eigen_value_send_duration: " + std : : to_string ( eigen_value_send_duration ) + " > eigen_value_send_interval " + std : : to_string ( eigen_value_send_interval ) ;
zlog_error ( zbt , " %s " , error_msg . c_str ( ) ) ;
return 2 ;
}
if ( max_sensor_num * wave_form_send_duration > wave_form_send_interval ) {
error_msg = " invalid wave_form_send_duration: " + std : : to_string ( wave_form_send_duration ) +
" * max_sensor_num: " + std : : to_string ( max_sensor_num ) + " > wave_form_send_interval: " + std : : to_string ( wave_form_send_interval ) ;
zlog_error ( zbt , " %s " , error_msg . c_str ( ) ) ;
return 3 ;
}
if ( wave_form_send_interval % eigen_value_send_interval ! = 0 ) {
error_msg = " wave_form_send_interval: " + std : : to_string ( wave_form_send_interval ) + " %% eigen_value_send_interval: " + std : : to_string ( eigen_value_send_interval ) +
" != 0 " ;
zlog_error ( zbt , " %s " , error_msg . c_str ( ) ) ;
return 4 ;
}
int total_eigen_value_send_duration = eigen_value_send_duration * max_sensor_num ;
int rest_duration = eigen_value_send_interval - total_eigen_value_send_duration ;
int slice_per_eigen_value_interval = rest_duration / wave_form_send_duration ;
available_slice = wave_form_send_interval / eigen_value_send_interval * slice_per_eigen_value_interval ;
free_slice = available_slice - max_sensor_num ;
if ( free_slice < 0 ) {
error_msg = " invalid config, available slice: " + std : : to_string ( available_slice ) + " , required slice: " + std : : to_string ( max_sensor_num ) ;
zlog_error ( zbt , " %s " , error_msg . c_str ( ) ) ;
return 5 ;
}
return 0 ;
}
int SensorScheduler : : GetScheduleConfig ( int & eigen_value_send_interval , int & wave_form_send_interval ,
int & eigen_value_send_duration , int & wave_form_send_duration ,
int & max_sensor_num ) {
eigen_value_send_interval = eigen_value_send_interval_ ;
wave_form_send_interval = wave_form_send_interval_ ;
eigen_value_send_duration = eigen_value_send_duration_ ;
wave_form_send_duration = wave_form_send_duration_ ;
max_sensor_num = max_sensor_num_ ;
return 0 ;
}
int SensorScheduler : : UpdateSensorConfig ( int short_addr ) {
int id = 0 ;
auto iter = short_addr_map_ . find ( short_addr ) ;
if ( iter = = short_addr_map_ . end ( ) ) {
zlog_info ( zct , " cannot find id for short_addr %d " , short_addr ) ;
return 1 ;
} else {
id = iter - > second ;
}
if ( update_ . count ( id ) > 0 ) {
return 0 ;
}
update_ . insert ( id ) ;
UpdateCfg : : WriteCfg ( update_ ) ;
return 0 ;
}
int SensorScheduler : : UpdateConfigResult ( int short_addr , int result ) {
int id = 0 ;
auto iter = short_addr_map_ . find ( short_addr ) ;
if ( iter = = short_addr_map_ . end ( ) ) {
zlog_info ( zct , " cannot find id for short_addr %x " , short_addr ) ;
return 1 ;
} else {
id = iter - > second ;
}
if ( result ! = 0 ) {
return 0 ;
}
zlog_info ( zbt , " [%d] short addr:%d update successfully " , id , short_addr ) ;
update_ . erase ( id ) ;
UpdateCfg : : WriteCfg ( update_ ) ;
return 0 ;
}
int SensorScheduler : : UpgradeSensor ( int short_addr , std : : string sensor_type , int hw_version ,
std : : string current_sw_version , std : : string upgrade_sw_version ) {
int id = 0 ;
auto iter = short_addr_map_ . find ( short_addr ) ;
if ( iter = = short_addr_map_ . end ( ) ) {
zlog_error ( zct , " cannot find id for short_addr %x " , short_addr ) ;
return 1 ;
} else {
id = iter - > second ;
}
UpgradeInfo info ;
info . try_times = 0 ;
info . sensor_type = sensor_type ;
info . hw_version = hw_version ;
info . current_sw_version = current_sw_version ;
info . upgrade_sw_version = upgrade_sw_version ;
long ts = GetLocalTs ( ) ;
info . submit_time = GetUTCTime ( ts ) ;
upgrade_ [ id ] = info ;
zlog_info ( zbt , " [%d] short addr:%x add upgrade info " , id , short_addr ) ;
UpgradeCfg : : WriteCfg ( upgrade_ ) ;
return 0 ;
}
int SensorScheduler : : UpgradeResult ( int short_addr , int result ) {
int id = 0 ;
auto iter = short_addr_map_ . find ( short_addr ) ;
if ( iter = = short_addr_map_ . end ( ) ) {
zlog_info ( zct , " cannot find id for short_addr %x " , short_addr ) ;
return 1 ;
} else {
id = iter - > second ;
}
if ( result = = kUpgradeSuccess | |
result = = kProductTypeMismatch | |
result = = kZigbeeHWMismatch | |
result = = kUpgradeDoneBefore ) {
upgrade_ . erase ( id ) ;
zlog_info ( zbt , " [%d] short addr:%x upgrade successfully " , id , short_addr ) ;
UpgradeCfg : : WriteCfg ( upgrade_ ) ;
} else {
auto upgrade_iter = upgrade_ . find ( id ) ;
if ( upgrade_iter - > second . try_times > = 10 ) {
zlog_error ( zct , " [%d] short addr:%x upgrade 10 time failure " , id , short_addr ) ;
upgrade_ . erase ( id ) ;
UpgradeCfg : : WriteCfg ( upgrade_ ) ;
// TODO: call interface to write into database
}
}
return 0 ;
}
long SensorScheduler : : GetLocalTs ( ) {
auto now = std : : chrono : : system_clock : : now ( ) ;
auto timestamp = std : : chrono : : duration_cast < std : : chrono : : seconds > ( now . time_since_epoch ( ) ) . count ( ) ;
// zlog_debug(zct, "current timestamp:%lld", timestamp);
return timestamp ;
}
long SensorScheduler : : GetLocalWorldTime ( std : : string & world_time ) {
auto now = std : : chrono : : system_clock : : now ( ) ;
std : : time_t now_c = std : : chrono : : system_clock : : to_time_t ( now ) ;
std : : tm * local_time = std : : localtime ( & now_c ) ;
char str [ 100 ] = { 0 } ;
snprintf ( str , sizeof ( str ) , " %04d-%02d-%02d %02d:%02d:%02d " ,
local_time - > tm_year + 1900 , local_time - > tm_mon + 1 , local_time - > tm_mday , local_time - > tm_hour , local_time - > tm_min , local_time - > tm_sec ) ;
world_time = str ;
auto timestamp = std : : chrono : : duration_cast < std : : chrono : : seconds > ( now . time_since_epoch ( ) ) . count ( ) ;
// zlog_debug(zct, "world time:%s, timestamp:%lld", world_time.c_str(), timestamp);
return timestamp ;
}
std : : string SensorScheduler : : GetUTCTime ( long ts ) {
std : : chrono : : time_point < std : : chrono : : system_clock > timePoint = std : : chrono : : system_clock : : from_time_t ( ts ) ;
std : : time_t utcTime = std : : chrono : : system_clock : : to_time_t ( timePoint ) ;
std : : tm * local_time = std : : gmtime ( & utcTime ) ;
local_time - > tm_hour = local_time - > tm_hour + 8 ;
if ( local_time - > tm_hour > 24 ) {
local_time - > tm_hour - = 24 ;
local_time - > tm_mday + = 1 ;
}
char str [ 100 ] = { 0 } ;
snprintf ( str , sizeof ( str ) , " %04d-%02d-%02d %02d:%02d:%02d " ,
local_time - > tm_year + 1900 , local_time - > tm_mon + 1 , local_time - > tm_mday , local_time - > tm_hour , local_time - > tm_min , local_time - > tm_sec ) ;
std : : string world_time = str ;
return world_time ;
}
void SensorScheduler : : UpdateUpgradeInfo ( int id ) {
auto upgrade_iter = upgrade_ . find ( id ) ;
upgrade_iter - > second . try_times + + ;
long ts = GetLocalTs ( ) ;
upgrade_iter - > second . try_world_time1 . push_back ( GetUTCTime ( ts ) ) ;
UpgradeCfg : : WriteCfg ( upgrade_ ) ;
}
void SensorScheduler : : ModifyScheduleTs ( int diff_ts ) {
zlog_warn ( zbt , " [ModifyScheduleTs] adjust ts:%d, from:%ld to:%ld " , diff_ts , start_timestamp_ , start_timestamp_ + diff_ts ) ;
start_timestamp_ + = diff_ts ;
start_ts_str_ = GetUTCTime ( start_timestamp_ ) ;
std : : ifstream schedule_file ( SCHEDULE_CONFIG ) ;
Json : : Reader reader ;
Json : : Value root ;
if ( ! reader . parse ( schedule_file , root , false ) ) {
zlog_error ( zbt , " [ModifyScheduleTs] invalid format, fail to parse %s " , SCHEDULE_CONFIG ) ;
schedule_file . close ( ) ;
return ;
}
schedule_file . close ( ) ;
root [ " schedule_start_timestamp " ] = std : : to_string ( start_timestamp_ ) ;
root [ " schedule_start_time " ] = start_ts_str_ ;
Json : : StyledStreamWriter streamWriter ;
std : : ofstream out_file ( SCHEDULE_CONFIG ) ;
streamWriter . write ( out_file , root ) ;
out_file . close ( ) ;
}
void SensorScheduler : : ClearScheduleCfg ( int short_addr ) {
zlog_warn ( zbt , " [ClearScheduleCfg] clear all schedule config, short_addr:%x " , short_addr ) ;
if ( short_addr = = 0 ) {
zlog_warn ( zbt , " [ClearScheduleCfg] clear all " ) ;
update_ . clear ( ) ;
upgrade_ . clear ( ) ;
short_addr_map_ . clear ( ) ;
ShortAddrCfg : : ClearCfg ( ) ;
UpdateCfg : : ClearCfg ( ) ;
UpgradeCfg : : ClearCfg ( ) ;
wave_feature_set_inst : : instance ( ) . RemoveAllFeatureCfg ( ) ;
wave_feature_set_inst : : instance ( ) . RemoveAllWaveCfg ( ) ;
} else {
UpdateConfigResult ( short_addr , 0 ) ;
UpgradeResult ( short_addr , kUpgradeSuccess ) ;
short_addr_map_ . erase ( short_addr ) ;
ShortAddrCfg : : WriteCfg ( short_addr_map_ ) ;
wave_feature_set_inst : : instance ( ) . RemoveFeatureCfg ( short_addr ) ;
wave_feature_set_inst : : instance ( ) . RemoveWaveCfg ( short_addr ) ;
}
}
void SensorScheduler : : CleanIdleOccupiedSet ( long ts ) {
if ( free_slice_ocuppied_ . size ( ) > 5 ) {
for ( auto it = free_slice_ocuppied_ . begin ( ) ; it ! = free_slice_ocuppied_ . end ( ) ; ) {
if ( ( * it ) < ts ) {
it = free_slice_ocuppied_ . erase ( it ) ;
} else + + it ;
}
}
2025-04-10 15:50:52 +08:00
}
void SensorScheduler : : UseDefaultConfig ( ) {
zlog_info ( zbt , " use default configuration " ) ;
int eigen_value_send_interval = 300 ;
int wave_form_send_interval = 7200 ;
int eigen_value_send_duration = 6 ;
int wave_form_send_duration = 50 ;
int max_sensor_num = 32 ;
// int eigen_value_send_interval = 120;
// int wave_form_send_interval = 240;
// int eigen_value_send_duration = 6;
// int wave_form_send_duration = 40;
// int max_sensor_num = 4;
std : : string error_msg ;
Config ( eigen_value_send_interval ,
wave_form_send_interval ,
eigen_value_send_duration ,
wave_form_send_duration ,
max_sensor_num ,
error_msg ) ;
}