diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 99ebd62c8e..f50ddc5712 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -150,8 +150,8 @@ class SchedulingAgent { }; std::shared_ptr logger_; - mutable std::mutex watchdog_mtx_; // used to protect the set below - std::set scheduled_processors_; // set was chosen to avoid iterator invalidation + mutable std::mutex watchdog_mtx_; // used to protect the vector below + std::vector> scheduled_processors_; std::unique_ptr watchDogTimer_; std::chrono::milliseconds alert_time_; }; diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp index f42ca057f6..90da819715 100644 --- a/libminifi/src/SchedulingAgent.cpp +++ b/libminifi/src/SchedulingAgent.cpp @@ -18,10 +18,12 @@ * limitations under the License. */ #include "SchedulingAgent.h" + #include +#include #include #include -#include + #include "core/Processor.h" #include "utils/gsl.h" @@ -68,18 +70,19 @@ nonstd::expected SchedulingAgent::triggerAndCommit(cor return {}; } - auto schedule_it = scheduled_processors_.end(); - + auto processor_scheduling_info = SchedulingInfo(processor); { std::lock_guard lock(watchdog_mtx_); - schedule_it = scheduled_processors_.emplace(processor).first; + scheduled_processors_.push_back(gsl::make_not_null(&processor_scheduling_info)); } - const auto guard = gsl::finally([this, &schedule_it](){ + const auto guard = gsl::finally([this, &processor_scheduling_info](){ std::lock_guard lock(watchdog_mtx_); - scheduled_processors_.erase(schedule_it); + [[maybe_unused]] const auto erased_scheduling_infos_count = std::erase(scheduled_processors_, gsl::make_not_null(&processor_scheduling_info)); + gsl_Assert(1 == erased_scheduling_infos_count); }); + processor->incrementActiveTasks(); auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); }); @@ -107,16 +110,16 @@ nonstd::expected SchedulingAgent::trigger(core::Proces return false; } - auto schedule_it = scheduled_processors_.end(); - + auto processor_scheduling_info = SchedulingInfo(processor); { std::lock_guard lock(watchdog_mtx_); - schedule_it = scheduled_processors_.emplace(processor).first; + scheduled_processors_.push_back(gsl::make_not_null(&processor_scheduling_info)); } - const auto guard = gsl::finally([this, &schedule_it](){ + const auto guard = gsl::finally([this, &processor_scheduling_info](){ std::lock_guard lock(watchdog_mtx_); - scheduled_processors_.erase(schedule_it); + [[maybe_unused]] const auto erased_scheduling_infos_count = std::erase(scheduled_processors_, gsl::make_not_null(&processor_scheduling_info)); + gsl_Assert(1 == erased_scheduling_infos_count); }); processor->incrementActiveTasks(); @@ -141,11 +144,11 @@ void SchedulingAgent::watchDogFunc() { std::lock_guard lock(watchdog_mtx_); auto now = std::chrono::steady_clock::now(); for (const auto& info : scheduled_processors_) { - auto elapsed = now - info.last_alert_time_; + auto elapsed = now - info->last_alert_time_; if (elapsed > alert_time_) { - int64_t elapsed_ms{ std::chrono::duration_cast(now - info.start_time_).count() }; - logger_->log_warn("{}::onTrigger has been running for {} ms in {}", info.name_, elapsed_ms, info.uuid_); - info.last_alert_time_ = now; + int64_t elapsed_ms{ std::chrono::duration_cast(now - info->start_time_).count() }; + logger_->log_warn("{}::onTrigger has been running for {} ms in {}", info->name_, elapsed_ms, info->uuid_); + info->last_alert_time_ = now; } } }