#include "schedule.hpp"

// NOTE(review): the header names of the original eight #include directives were
// lost (only the bare "#include" keywords survived). The list below is
// reconstructed from what this file actually uses -- confirm against the repo,
// in particular the jsoncpp include path.
#include <json/json.h>

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <fstream>
#include <string>
#include <vector>

namespace {

// A sensor stops being offered upgrade slots once it has tried this many times.
constexpr int kMaxUpgradeTryTimes = 10;

// Sensor ids handed out by GetAvailableId() are in [1, kSensorIdLimit).
constexpr int kSensorIdLimit = 128;

// Defaults used when no schedule configuration file exists (all in seconds,
// except the sensor count).
constexpr int kDefaultEigenValueSendInterval = 300;
constexpr int kDefaultWaveFormSendInterval = 7200;
constexpr int kDefaultEigenValueSendDuration = 6;
constexpr int kDefaultWaveFormSendDuration = 50;
constexpr int kDefaultMaxSensorNum = 32;

enum class JsonLoad { kMissing, kInvalid, kOk };

// Opens `path` and parses it as JSON into `root`.
// Returns kMissing when the file cannot be opened, kInvalid when it does not
// parse, kOk otherwise. Prints nothing; callers own the log messages.
JsonLoad LoadJsonFile(const char *path, Json::Value &root) {
    std::ifstream in(path);
    if (!in.good()) {
        return JsonLoad::kMissing;
    }
    Json::Reader reader;
    return reader.parse(in, root, false) ? JsonLoad::kOk : JsonLoad::kInvalid;
}

// Number of waveform slices that fit into the part of an eigen-value interval
// left over after all eigen-value slots. Returns 0 instead of dividing by zero.
int WaveSlicesPerInterval(int rest_duration, int wave_form_send_duration) {
    if (rest_duration <= 0 || wave_form_send_duration <= 0) {
        return 0;
    }
    return rest_duration / wave_form_send_duration;
}

// Length of one waveform slice when the rest duration is spread evenly over
// `slice_num` slices. Returns 0 instead of dividing by zero.
int SecondsPerWaveSlice(int rest_duration, int slice_num) {
    return slice_num > 0 ? rest_duration / slice_num : 0;
}

// Formats a broken-down time as "YYYY-MM-DD hh:mm:ss"; empty string for null.
std::string FormatTm(const std::tm *tm_value) {
    if (tm_value == nullptr) {
        return std::string();
    }
    char str[100] = {0};
    snprintf(str, sizeof(str), "%04d-%02d-%02d %02d:%02d:%02d",
             tm_value->tm_year + 1900, tm_value->tm_mon + 1, tm_value->tm_mday,
             tm_value->tm_hour, tm_value->tm_min, tm_value->tm_sec);
    return std::string(str);
}

}  // namespace

// Persists the current schedule parameters and a freshly computed
// slice -> sensor allocation to SCHEDULE_CONFIG.
//
// Allocation rule: walking the waveform slices in order, even-indexed slices go
// to the next sensor id (1, 2, ...) and odd-indexed slices stay idle (0) until
// either all sensors are placed or the idle quota (free_slice_) is used up;
// after that the remaining slices go to whichever kind is still outstanding.
// Rebuilds slice_sensor_id_ (0-based slice -> id) and sensor_id_nth_slice_
// (id -> 1-based slice number).
//
// @param ts          schedule start timestamp, stored as a decimal string.
// @param world_time  human readable form of `ts`.
// @return always 0; a failure to open the output file is only logged so that
//         the in-memory configuration stays usable (best effort, as before).
int SensorScheduler::WriteScheduleCfg(long &ts, std::string &world_time) {
    Json::Value root;
    root["schedule_start_timestamp"] = std::to_string(ts);
    root["schedule_start_time"] = world_time;
    root["eigen_value_send_interval"] = eigen_value_send_interval_;
    root["eigen_value_send_duration"] = eigen_value_send_duration_;
    root["wave_form_send_interval"] = wave_form_send_interval_;
    root["wave_form_send_duration"] = wave_form_send_duration_;
    root["max_sensor_num"] = max_sensor_num_;
    root["available_slice"] = available_slice_;
    root["free_slice"] = free_slice_;
    root["support_modification"] = true;

    // A vector replaces the former `new int[]` that was released with scalar
    // `delete` (undefined behaviour); it is also zero-initialised.
    const int slice_count = std::max(static_cast<int>(available_slice_), 0);
    std::vector<int> slice_allocation(static_cast<std::size_t>(slice_count), 0);

    int assigned_sensors = 0;  // slices already given to a sensor
    int idle_slices = 0;       // slices already left idle
    int next_sensor_id = 1;

    slice_sensor_id_.clear();
    // Drop ids from a previous, possibly larger, configuration.
    sensor_id_nth_slice_.clear();

    for (int i = 0; i < slice_count; ++i) {
        const bool idle_quota_used = idle_slices >= free_slice_;
        if (assigned_sensors < max_sensor_num_ && (i % 2 == 0 || idle_quota_used)) {
            slice_allocation[static_cast<std::size_t>(i)] = next_sensor_id;
            sensor_id_nth_slice_[next_sensor_id] = i + 1;
            slice_sensor_id_.push_back(next_sensor_id);
            ++next_sensor_id;
            ++assigned_sensors;
        } else if (!idle_quota_used) {
            slice_sensor_id_.push_back(0);
            slice_allocation[static_cast<std::size_t>(i)] = 0;
            ++idle_slices;
        }
    }

    Json::Value ids;
    for (const int sensor_id : slice_allocation) {
        ids.append(sensor_id);
    }
    root["id"] = ids;

    std::ofstream out_file(SCHEDULE_CONFIG);
    if (!out_file) {
        printf("fail to open %s for writing\n", SCHEDULE_CONFIG);
    }
    Json::StyledStreamWriter stream_writer;
    stream_writer.write(out_file, root);
    out_file.close();
    return 0;
}

// Restores scheduler state from disk.
//
//  1. SCHEDULE_CONFIG: schedule parameters and slice allocation; when the file
//     is missing, the built-in defaults are applied through Config() (which
//     also writes the file).
//  2. BASE_RELATION:   array of {"pan_id", "id"} pairs -> short_addr_map_.
//  3. UPGRADE_CONFIG:  array of pending firmware upgrades -> upgrade_.
//  4. CONFIG_UPDATE:   array of sensor ids awaiting a config push -> update_.
//
// NOTE(review): as in the original, a file that exists but is malformed (or an
// empty BASE_RELATION array) aborts the constructor, so the later files are not
// loaded. That looks unintended but is preserved here -- confirm.
SensorScheduler::SensorScheduler() {
    support_modification_ = true;

    {
        Json::Value root;
        const JsonLoad status = LoadJsonFile(SCHEDULE_CONFIG, root);
        if (status == JsonLoad::kMissing) {
            printf("use default configuration\n");
            Config(kDefaultEigenValueSendInterval, kDefaultWaveFormSendInterval,
                   kDefaultEigenValueSendDuration, kDefaultWaveFormSendDuration,
                   kDefaultMaxSensorNum);
        } else {
            printf("exist configuration file\n");
            if (status == JsonLoad::kInvalid) {
                printf("invalid format, fail to parse %s\n", SCHEDULE_CONFIG);
                return;
            }
            if (!root.isObject()) {
                printf("invalid format, not an object: %s\n", SCHEDULE_CONFIG);
                return;
            }

            // strtol instead of stol: a missing/garbled value must not throw
            // out of the constructor.
            const std::string start_ts_text = root["schedule_start_timestamp"].asString();
            char *parse_end = nullptr;
            start_timestamp_ = std::strtol(start_ts_text.c_str(), &parse_end, 10);
            if (parse_end == start_ts_text.c_str()) {
                printf("invalid schedule_start_timestamp in %s\n", SCHEDULE_CONFIG);
            }
            // The human readable start time lives under "schedule_start_time"
            // (see WriteScheduleCfg / Config); the original re-read the numeric
            // timestamp key here.
            start_ts_str_ = root["schedule_start_time"].asString();

            eigen_value_send_interval_ = root["eigen_value_send_interval"].asInt();
            eigen_value_send_duration_ = root["eigen_value_send_duration"].asInt();
            wave_form_send_interval_ = root["wave_form_send_interval"].asInt();
            wave_form_send_duration_ = root["wave_form_send_duration"].asInt();
            max_sensor_num_ = root["max_sensor_num"].asInt();
            available_slice_ = root["available_slice"].asInt();
            free_slice_ = root["free_slice"].asInt();
            support_modification_ = root["support_modification"].asBool();

            // "id" lists the owner of every waveform slice in order; 0 = idle.
            const Json::Value &ids = root["id"];
            int slice_number = 0;  // 1-based
            for (const auto &id : ids) {
                ++slice_number;
                const int sensor_id = id.asInt();
                slice_sensor_id_.push_back(sensor_id);
                if (sensor_id != 0) {
                    sensor_id_nth_slice_[sensor_id] = slice_number;
                }
            }

            eigen_value_slice_total_seconds_ = eigen_value_send_duration_ * max_sensor_num_;
            const int rest_duration = eigen_value_send_interval_ - eigen_value_slice_total_seconds_;
            wave_slice_num_per_eigen_interval_ = WaveSlicesPerInterval(rest_duration, wave_form_send_duration_);
            seconds_per_wave_slice_ = SecondsPerWaveSlice(rest_duration, wave_slice_num_per_eigen_interval_);
            if (seconds_per_wave_slice_ <= 0) {
                printf("invalid schedule parameters in %s\n", SCHEDULE_CONFIG);
            }
        }
    }

    short_addr_map_.clear();
    {
        Json::Value root;
        const JsonLoad status = LoadJsonFile(BASE_RELATION, root);
        if (status == JsonLoad::kInvalid) {
            printf("invalid format, fail to parse %s\n", BASE_RELATION);
            return;
        }
        if (status == JsonLoad::kOk) {
            if (!root.isArray()) {
                printf("invalid format, not an array: %s\n", BASE_RELATION);
                return;
            }
            if (root.size() == 0) {
                printf("no element in %s\n", BASE_RELATION);
                return;
            }
            for (const auto &item : root) {
                const uint16_t short_addr = item["pan_id"].asInt();
                const int index = item["id"].asInt();
                printf("index:%d, short addr:%d\n", index, short_addr);
                short_addr_map_[short_addr] = index;
            }
        }
    }

    // Pending firmware upgrades.
    {
        Json::Value root;
        const JsonLoad status = LoadJsonFile(UPGRADE_CONFIG, root);
        if (status == JsonLoad::kInvalid) {
            printf("invalid format, fail to parse %s\n", UPGRADE_CONFIG);
            return;
        }
        if (status == JsonLoad::kOk) {
            if (!root.isArray()) {
                printf("invalid format, not an array: %s\n", UPGRADE_CONFIG);
                return;
            }
            if (root.size() > 0) {
                printf("element in %s\n", UPGRADE_CONFIG);
                for (const auto &item : root) {
                    // Fresh record per sensor: the original reused one object,
                    // so try_world_time1 accumulated entries of earlier sensors.
                    UpgradeInfo info;
                    info.try_times = item["try_times"].asInt();
                    info.sensor_type = item["type"].asString();
                    info.hw_version = item["hw_version"].asInt();
                    info.current_sw_version = item["current_sw_version"].asString();
                    info.upgrade_sw_version = item["upgrade_sw_version"].asString();
                    info.submit_time = item["submit_time"].asString();
                    for (const auto &time_item : item["try_world_time"]) {
                        info.try_world_time1.push_back(time_item.asString());
                    }
                    const int sensor_id = item["id"].asInt();
                    upgrade_[sensor_id] = info;
                    printf("id:%d need to upgrade from:%s to %s\n", sensor_id,
                           info.current_sw_version.c_str(), info.upgrade_sw_version.c_str());
                }
            }
        }
    }

    // Sensors waiting for a configuration update.
    {
        Json::Value root;
        const JsonLoad status = LoadJsonFile(CONFIG_UPDATE, root);
        if (status == JsonLoad::kInvalid) {
            printf("invalid format, fail to parse %s\n", CONFIG_UPDATE);
            return;
        }
        if (status == JsonLoad::kOk) {
            if (!root.isArray()) {
                printf("invalid format, not an array: %s\n", CONFIG_UPDATE);
                return;
            }
            if (root.size() > 0) {
                printf("element in %s\n", CONFIG_UPDATE);
                for (const auto &item : root) {
                    update_.insert(item.asInt());
                    printf("sensor id:%d need to update\n", item.asInt());
                }
            }
        }
    }
}

// Placeholder: intended to read the schedule file; currently does nothing and
// returns 0 (loading happens in the constructor).
int SensorScheduler::Init() {
    return 0;
}

// Computes the timestamp at which sensor `id` should contact the scheduler next.
// Relies on the timing state (ts_in_eigen_slice_, nth_eigen_value_slice_,
// nth_eigen_slice_, current_wave_start_ts_) left behind by the latest
// StartSchedule() call.
//
// When the current time is in the eigen-value phase, two candidates within the
// current eigen-value interval are considered:
//   * the sensor's own waveform slice, if it falls into this interval;
//   * for a sensor with a pending upgrade (fewer than kMaxUpgradeTryTimes
//     attempts): the first idle slice of this interval not yet reserved, which
//     is then recorded in free_slice_ocuppied_.
// The earlier of the available candidates is returned. Otherwise the result is
// the sensor's eigen-value slot in the next eigen-value interval.
//
// All timestamps are based on the start of the *current eigen-value interval*;
// the original based them on the start of the waveform period, which produced
// timestamps in the past for every interval but the first.
long SensorScheduler::CalcNextTimestamp(int id) {
    const long eigen_interval_start =
        current_wave_start_ts_ + static_cast<long>(nth_eigen_value_slice_) * eigen_value_send_interval_;

    if (ts_in_eigen_slice_) {
        // 0-based index of the first waveform slice of this eigen interval.
        const int first_slice = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_;

        const auto slice_iter = sensor_id_nth_slice_.find(id);
        if (slice_iter == sensor_id_nth_slice_.end()) {
            printf("[Error] invaild id:%d, not find wave slice id\n", id);
            const long available_ts =
                eigen_interval_start + eigen_value_send_interval_ + nth_eigen_slice_ * eigen_value_send_duration_;
            printf("[Error] [%d] next feature send utc time:[%s]\n", id, GetUTCTime(available_ts).c_str());
            return available_ts;
        }

        const int own_slice = slice_iter->second;  // 1-based
        long send_wave_ts = 0;
        if (own_slice > first_slice && own_slice <= first_slice + wave_slice_num_per_eigen_interval_) {
            // The sensor's waveform window lies inside this eigen interval.
            send_wave_ts = eigen_interval_start + eigen_value_slice_total_seconds_ +
                           (own_slice - 1 - first_slice) * seconds_per_wave_slice_;
            printf("[%d] send wave time:[%s]\n", id, GetUTCTime(send_wave_ts).c_str());
        }

        long upgrade_ts = 0;
        const auto upgrade_iter = upgrade_.find(id);
        if (upgrade_iter != upgrade_.end() && upgrade_iter->second.try_times < kMaxUpgradeTryTimes) {
            for (int i = 0; i < wave_slice_num_per_eigen_interval_; ++i) {
                const int slice_index = first_slice + i;
                if (slice_index < 0 || static_cast<std::size_t>(slice_index) >= slice_sensor_id_.size()) {
                    break;  // allocation table shorter than the configuration implies
                }
                if (slice_sensor_id_[slice_index] != 0) {
                    continue;  // slice belongs to a sensor
                }
                // Idle slice: use it unless another sensor already reserved it.
                const long slice_ts =
                    eigen_interval_start + eigen_value_slice_total_seconds_ + i * seconds_per_wave_slice_;
                if (free_slice_ocuppied_.count(slice_ts) == 0) {
                    upgrade_ts = slice_ts;
                    free_slice_ocuppied_.insert(upgrade_ts);
                    printf("[%d] %d nth free wave slice will be used to upgrade, utc time:[%s]\n", id,
                           slice_index, GetUTCTime(upgrade_ts).c_str());
                    break;
                }
            }
        }

        if (send_wave_ts > 0 && upgrade_ts > 0) {
            const long min_ts = std::min(send_wave_ts, upgrade_ts);
            printf("[%d] will use nearest time:%s\n", id, GetUTCTime(min_ts).c_str());
            return min_ts;
        }
        if (send_wave_ts + upgrade_ts > 0) {
            // Exactly one candidate is set; max() picks it.
            const long max_ts = std::max(send_wave_ts, upgrade_ts);
            printf("[%d] will use vaild time:%s\n", id, GetUTCTime(max_ts).c_str());
            return max_ts;
        }
    }

    // In the waveform phase (idle or not), or with no candidate above, the next
    // window is the sensor's eigen-value slot in the following interval.
    const long available_ts =
        eigen_interval_start + eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_;
    printf("[%d] next feature send utc time:[%s]\n", id, GetUTCTime(available_ts).c_str());
    return available_ts;
}

// Picks the lowest sensor id in [1, kSensorIdLimit) not present in
// short_addr_map_, rewrites BASE_RELATION with the new pair first followed by
// the existing ones, and records the mapping in memory.
//
// @return the assigned id.
// NOTE(review): when every id is taken this returns 0 and still persists
// {id: 0, pan_id}; behaviour kept from the original -- confirm whether callers
// expect that.
int SensorScheduler::GetAvailableId(int pan_id) {
    bool id_in_use[kSensorIdLimit] = {false};
    for (const auto &entry : short_addr_map_) {
        const int used_id = entry.second;
        // Ids come from a file; ignore anything that would index out of range.
        if (used_id >= 0 && used_id < kSensorIdLimit) {
            id_in_use[used_id] = true;
        }
    }

    int available_id = 0;
    for (int candidate = 1; candidate < kSensorIdLimit; ++candidate) {
        if (!id_in_use[candidate]) {
            available_id = candidate;
            break;
        }
    }
    printf("[GetAvailableId][%d] pan id : %d\n", available_id, pan_id);

    Json::Value root;
    Json::Value new_item;
    new_item["id"] = available_id;
    new_item["pan_id"] = pan_id;
    root.append(new_item);
    for (const auto &entry : short_addr_map_) {
        Json::Value item;
        item["id"] = entry.second;
        item["pan_id"] = entry.first;
        root.append(item);
    }

    std::ofstream out_file(BASE_RELATION);
    if (!out_file) {
        printf("fail to open %s for writing\n", BASE_RELATION);
    }
    Json::StyledStreamWriter stream_writer;
    stream_writer.write(out_file, root);
    out_file.close();

    short_addr_map_[pan_id] = available_id;
    return available_id;
}

// Decides what the sensor identified by `pan_id` may do right now.
//
// The sensor id is looked up in short_addr_map_ (a new id is allocated for an
// unknown pan_id). The current time is then located inside the schedule:
// waveform period -> eigen-value interval -> either an eigen-value slot or a
// waveform slice. The timing members computed here are reused by
// CalcNextTimestamp().
//
// @param pan_id         short address reported by the sensor.
// @param next_duration  written only when kScheduleWrongTime is returned:
//                       seconds from now until the sensor's next eigen-value slot.
// @return kScheduleEigenValue / kScheduleWaveForm / kScheduleConfigSensor /
//         kScheduleUpgrade when the sensor may proceed (also stored in
//         current_request_), kScheduleWrongTime when it arrived in a window it
//         does not own, kScheduleUnknownSensor when it has no waveform slice.
//
// NOTE(review): the divisors used below are assumed positive (a validated
// configuration); there is no return code for "scheduler not configured".
int SensorScheduler::StartSchedule(int pan_id, int &next_duration) {
    const auto addr_iter = short_addr_map_.find(pan_id);
    const int id = (addr_iter == short_addr_map_.end()) ? GetAvailableId(pan_id) : addr_iter->second;

    // The original assigned a fixed debug timestamp (1730177342) here and left
    // the real clock commented out.
    current_ts_ = GetLocalTs();

    nth_wave_start_slice_ = (current_ts_ - start_timestamp_) / wave_form_send_interval_;
    current_wave_start_ts_ = nth_wave_start_slice_ * wave_form_send_interval_ + start_timestamp_;
    seconds_in_current_wave_slice_ = current_ts_ - current_wave_start_ts_;
    nth_eigen_value_slice_ = seconds_in_current_wave_slice_ / eigen_value_send_interval_;
    seconds_in_current_eigen_slice_ = seconds_in_current_wave_slice_ % eigen_value_send_interval_;

    // The waveform phase is entered 3 s before the eigen-value slots end, and an
    // eigen-value slot is entered 2 s early -- presumably tolerance for sensors
    // arriving slightly ahead of time; confirm.
    ts_in_eigen_slice_ = seconds_in_current_eigen_slice_ <= eigen_value_slice_total_seconds_ - 3;
    if (ts_in_eigen_slice_) {
        nth_eigen_slice_ = (seconds_in_current_eigen_slice_ + 2) / eigen_value_send_duration_;
    } else {
        nth_wave_slice_ =
            (seconds_in_current_eigen_slice_ - eigen_value_slice_total_seconds_ + 3) / seconds_per_wave_slice_;
    }
    printf("[%d] current utc:%s\n", id, GetUTCTime(current_ts_).c_str());

    // Start of the eigen-value interval we are in. The original measured all
    // "next slot" times from the waveform period start, which is only correct
    // during the first interval of a period.
    const long eigen_interval_start =
        current_wave_start_ts_ + static_cast<long>(nth_eigen_value_slice_) * eigen_value_send_interval_;
    const long next_interval_slot_ts =
        eigen_interval_start + eigen_value_send_interval_ + (id - 1) * eigen_value_send_duration_;

    const bool config_pending = update_.count(id) != 0;

    if (ts_in_eigen_slice_) {
        const int expected_id = nth_eigen_slice_ + 1;
        if (id == expected_id) {
            // The sensor owns this eigen-value slot; a pending config takes priority.
            if (config_pending) {
                printf("[%d] update config in eigen slice\n", id);
                current_request_ = kScheduleConfigSensor;
                return kScheduleConfigSensor;
            }
            printf("[%d] send eigen value in eigen slice\n", id);
            current_request_ = kScheduleEigenValue;
            return kScheduleEigenValue;
        }

        printf("[%d] Invalid request, revive in %d eigen slice\n", id, expected_id);
        if (id < expected_id) {
            // Own slot already passed: wait for the next interval.
            printf("[%d] wrong time in eigen slice, next feature in next interval send utc time:[%s]\n", id,
                   GetUTCTime(next_interval_slot_ts).c_str());
            next_duration = static_cast<int>(next_interval_slot_ts - current_ts_);
        } else {
            // Own slot is still ahead in this interval.
            const long available_ts = eigen_interval_start + (id - 1) * eigen_value_send_duration_;
            printf("[%d] wrong time in eigen slice, next feature in current interval send utc time:[%s]\n", id,
                   GetUTCTime(available_ts).c_str());
            next_duration = static_cast<int>(available_ts - current_ts_);
        }
        return kScheduleWrongTime;
    }

    // Waveform phase. `slice_index` is the 0-based index into slice_sensor_id_.
    const int slice_index = nth_eigen_value_slice_ * wave_slice_num_per_eigen_interval_ + nth_wave_slice_;
    const auto slice_iter = sensor_id_nth_slice_.find(id);
    if (slice_iter == sensor_id_nth_slice_.end()) {
        printf("[%d]invaild id, not find wave slice id, need to check further\n", id);
        return kScheduleUnknownSensor;
    }

    const auto upgrade_iter = upgrade_.find(id);
    const bool upgrade_pending =
        upgrade_iter != upgrade_.end() && upgrade_iter->second.try_times < kMaxUpgradeTryTimes;

    // sensor_id_nth_slice_ stores 1-based slice numbers, hence the +1 (the
    // original compared the 0-based index directly, so a sensor arriving at the
    // time CalcNextTimestamp() gave it was rejected).
    if (slice_index + 1 == slice_iter->second) {
        // Own waveform slice: upgrade first, then config, else send the waveform.
        if (upgrade_pending) {
            current_request_ = kScheduleUpgrade;
            printf("[%d] in wave slice to upgrade now from version:%s to %s, try time:%d\n", id,
                   upgrade_iter->second.current_sw_version.c_str(),
                   upgrade_iter->second.upgrade_sw_version.c_str(), upgrade_iter->second.try_times);
            return kScheduleUpgrade;
        }
        if (config_pending) {
            printf("[%d] in wave slice to update config\n", id);
            current_request_ = kScheduleConfigSensor;
            return kScheduleConfigSensor;
        }
        current_request_ = kScheduleWaveForm;
        return kScheduleWaveForm;
    }

    // Bounds-checked: near the end of an interval the computed index can be one
    // past the last slice.
    const bool slice_is_idle = slice_index >= 0 &&
                               static_cast<std::size_t>(slice_index) < slice_sensor_id_.size() &&
                               slice_sensor_id_[slice_index] == 0;
    if (slice_is_idle) {
        // Idle slice: may be used for an upgrade or a config push.
        if (upgrade_pending) {
            current_request_ = kScheduleUpgrade;
            printf("[%d] in idle to upgrade now from version:%s to %s, try time:%d\n", id,
                   upgrade_iter->second.current_sw_version.c_str(),
                   upgrade_iter->second.upgrade_sw_version.c_str(), upgrade_iter->second.try_times);
            return kScheduleUpgrade;
        }
        if (config_pending) {
            printf("[%d] in idle time to update config\n", id);
            current_request_ = kScheduleConfigSensor;
            return kScheduleConfigSensor;
        }
    }

    // Nothing to do in this slice: send the sensor to its next eigen-value slot.
    printf("[%d] wrong time in wave slice, next feature send utc time:[%s]\n", id,
           GetUTCTime(next_interval_slot_ts).c_str());
    next_duration = static_cast<int>(next_interval_slot_ts - current_ts_);
    return kScheduleWrongTime;
}

// Applies a new schedule configuration, restarts the schedule at the current
// time and persists it via WriteScheduleCfg().
//
// @return 0 on success, 1 when modification is disabled, otherwise the error
//         code of CalcAvailableSlice() / WriteScheduleCfg(). Nothing is changed
//         when validation fails.
int SensorScheduler::Config(int eigen_value_send_interval, int wave_form_send_interval,
                            int eigen_value_send_duration, int wave_form_send_duration,
                            int max_sensor_num) {
    if (!support_modification_) {
        printf("not support modification");
        return 1;
    }

    int available_slice = 0;
    int free_slice = 0;
    int ret = CalcAvailableSlice(eigen_value_send_interval, wave_form_send_interval,
                                 eigen_value_send_duration, wave_form_send_duration, max_sensor_num,
                                 available_slice, free_slice);
    if (ret != 0) {
        return ret;
    }

    eigen_value_send_interval_ = eigen_value_send_interval;
    eigen_value_send_duration_ = eigen_value_send_duration;
    wave_form_send_interval_ = wave_form_send_interval;
    wave_form_send_duration_ = wave_form_send_duration;
    max_sensor_num_ = max_sensor_num;
    available_slice_ = available_slice;
    free_slice_ = free_slice;
    support_modification_ = true;

    eigen_value_slice_total_seconds_ = eigen_value_send_duration_ * max_sensor_num_;
    const int rest_duration = eigen_value_send_interval_ - eigen_value_slice_total_seconds_;
    wave_slice_num_per_eigen_interval_ = WaveSlicesPerInterval(rest_duration, wave_form_send_duration_);
    seconds_per_wave_slice_ = SecondsPerWaveSlice(rest_duration, wave_slice_num_per_eigen_interval_);

    std::string world_time;
    long current_ts = GetLocalWorldTime(world_time);
    ret = WriteScheduleCfg(current_ts, world_time);
    if (ret != 0) {
        return ret;
    }
    start_timestamp_ = current_ts;
    start_ts_str_ = world_time;
    return 0;
}

// Validates a schedule configuration and derives the waveform slice counts.
//
// Example from the original comment: eigen values every 300 s, waveforms every
// 7200 s, one eigen-value send lasting 6 s, one waveform send lasting 60 s.
//
// @param available_slice  out: waveform slices per waveform period.
// @param free_slice       out: slices left after one slice per sensor.
// @return 0 on success;
//         1 max_sensor_num <= 0;
//         2 eigen-value interval/duration not positive, or all sensors do not
//           fit into one eigen-value interval;
//         3 waveform interval/duration not positive, or all sensors do not fit
//           into one waveform interval;
//         4 waveform interval is not a multiple of the eigen-value interval;
//         5 fewer waveform slices than sensors.
int SensorScheduler::CalcAvailableSlice(int eigen_value_send_interval, int wave_form_send_interval,
                                        int eigen_value_send_duration, int wave_form_send_duration,
                                        int max_sensor_num, int &available_slice, int &free_slice) {
    if (max_sensor_num <= 0) {
        printf("invalid max_sensor_num:%d\n", max_sensor_num);
        return 1;
    }
    // The positivity checks protect the modulo and divisions below.
    if (eigen_value_send_interval <= 0 || eigen_value_send_duration <= 0 ||
        max_sensor_num * eigen_value_send_duration > eigen_value_send_interval) {
        printf("invalid eigen_value_send_interval:%d and eigen_value_send_duration:%d, max_sensor_num:%d\n",
               eigen_value_send_interval, eigen_value_send_duration, max_sensor_num);
        return 2;
    }
    if (wave_form_send_interval <= 0 || wave_form_send_duration <= 0 ||
        max_sensor_num * wave_form_send_duration > wave_form_send_interval) {
        printf("invalid wave_form_send_interval:%d and wave_form_send_duration:%d, max_sensor_num:%d\n",
               wave_form_send_interval, wave_form_send_duration, max_sensor_num);
        return 3;
    }
    if (wave_form_send_interval % eigen_value_send_interval != 0) {
        printf("invalid eigen_value_send_interval:%d and wave_form_send_interval:%d\n",
               eigen_value_send_interval, wave_form_send_interval);
        return 4;
    }

    const int total_eigen_value_send_duration = eigen_value_send_duration * max_sensor_num;
    const int rest_duration = eigen_value_send_interval - total_eigen_value_send_duration;
    const int slice_per_eigen_value_interval = rest_duration / wave_form_send_duration;
    available_slice = wave_form_send_interval / eigen_value_send_interval * slice_per_eigen_value_interval;
    free_slice = available_slice - max_sensor_num;
    if (free_slice < 0) {
        printf("invalid config, available slice:%d, required slice:%d\n", available_slice, max_sensor_num);
        return 5;
    }
    return 0;
}

// Returns the current system time as whole seconds since the epoch and logs it.
long SensorScheduler::GetLocalTs() {
    const auto now = std::chrono::system_clock::now();
    const auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
    printf("current timestamp:%lld\n", static_cast<long long>(timestamp));
    return static_cast<long>(timestamp);
}

// Returns the current system time as whole seconds since the epoch and stores
// its local-time rendering ("YYYY-MM-DD hh:mm:ss") in `world_time`.
// `world_time` is left empty if the time cannot be converted.
long SensorScheduler::GetLocalWorldTime(std::string &world_time) {
    const auto now = std::chrono::system_clock::now();
    const std::time_t now_c = std::chrono::system_clock::to_time_t(now);
    world_time = FormatTm(std::localtime(&now_c));

    const auto timestamp = std::chrono::duration_cast<std::chrono::seconds>(now.time_since_epoch()).count();
    printf("world time:%s, timestamp:%lld\n", world_time.c_str(), static_cast<long long>(timestamp));
    return static_cast<long>(timestamp);
}

// Renders an epoch timestamp (seconds) as UTC "YYYY-MM-DD hh:mm:ss".
// Returns an empty string if the value cannot be converted.
std::string SensorScheduler::GetUTCTime(long ts) {
    const std::time_t utc_time = static_cast<std::time_t>(ts);
    return FormatTm(std::gmtime(&utc_time));
}