From 59f220810320805ab3a80599f763f922f9174733 Mon Sep 17 00:00:00 2001 From: "Yi-Hsiang Fang (Vivid)" <146902905+vividf@users.noreply.github.com> Date: Wed, 19 Feb 2025 15:44:06 +0900 Subject: [PATCH] feat(autoware_pointcloud_preprocessor): reuse collectors to reduce creation of collector and timer (#10074) * feat: reuse collectors Signed-off-by: vividf * chore: remove for-loop to find_if Signed-off-by: vividf * chore: remove set period Signed-off-by: vividf * chore: remove oldest timestamp Signed-off-by: vividf * chore: fix managing collector list logic Signed-off-by: vividf * chore: fix logging Signed-off-by: vividf * feat: change to THROTTLE Signed-off-by: vividf * feat: initialize required number of collectors when the node start Signed-off-by: vividf * chore: fix init collector Signed-off-by: vividf * chore: fix grammar Signed-off-by: vividf --------- Signed-off-by: vividf --- .../concatenate_data/cloud_collector.hpp | 7 +- .../concatenate_and_time_sync_node.hpp | 3 + .../src/concatenate_data/cloud_collector.cpp | 56 ++++++++--- .../combine_cloud_handler.cpp | 3 + .../concatenate_and_time_sync_node.cpp | 96 ++++++++++++++++--- .../test/test_concatenate_node_unit.cpp | 19 ++-- 6 files changed, 145 insertions(+), 39 deletions(-) diff --git a/sensing/autoware_pointcloud_preprocessor/include/autoware/pointcloud_preprocessor/concatenate_data/cloud_collector.hpp b/sensing/autoware_pointcloud_preprocessor/include/autoware/pointcloud_preprocessor/concatenate_data/cloud_collector.hpp index 13604569df9a8..3e63c7ba2264d 100644 --- a/sensing/autoware_pointcloud_preprocessor/include/autoware/pointcloud_preprocessor/concatenate_data/cloud_collector.hpp +++ b/sensing/autoware_pointcloud_preprocessor/include/autoware/pointcloud_preprocessor/concatenate_data/cloud_collector.hpp @@ -49,6 +49,8 @@ struct AdvancedCollectorInfo : public CollectorInfoBase } }; +enum class CollectorStatus { Idle, Processing, Finished }; + class CloudCollector { public: @@ -67,11 +69,12 @@ class CloudCollector std::unordered_map get_topic_to_cloud_map(); - [[nodiscard]] bool concatenate_finished() const; + [[nodiscard]] CollectorStatus get_status(); void set_info(std::shared_ptr collector_info); [[nodiscard]] std::shared_ptr get_info() const; void show_debug_message(); + void reset(); private: std::shared_ptr ros2_parent_node_; @@ -81,9 +84,9 @@ class CloudCollector uint64_t num_of_clouds_; double timeout_sec_; bool debug_mode_; - bool concatenate_finished_{false}; std::mutex concatenate_mutex_; std::shared_ptr collector_info_; + CollectorStatus status_; }; } // namespace autoware::pointcloud_preprocessor diff --git a/sensing/autoware_pointcloud_preprocessor/include/autoware/pointcloud_preprocessor/concatenate_data/concatenate_and_time_sync_node.hpp b/sensing/autoware_pointcloud_preprocessor/include/autoware/pointcloud_preprocessor/concatenate_data/concatenate_and_time_sync_node.hpp index 654593e317ad9..fc422c7d2e54e 100644 --- a/sensing/autoware_pointcloud_preprocessor/include/autoware/pointcloud_preprocessor/concatenate_data/concatenate_and_time_sync_node.hpp +++ b/sensing/autoware_pointcloud_preprocessor/include/autoware/pointcloud_preprocessor/concatenate_data/concatenate_and_time_sync_node.hpp @@ -92,6 +92,8 @@ class PointCloudConcatenateDataSynchronizerComponent : public rclcpp::Node std::list> cloud_collectors_; std::unique_ptr collector_matching_strategy_; std::mutex cloud_collectors_mutex_; + bool init_collector_list_ = false; + static constexpr const int num_of_collectors = 3; // default postfix name for synchronized pointcloud static constexpr const char * default_sync_topic_postfix = "_synchronized"; @@ -119,6 +121,7 @@ class PointCloudConcatenateDataSynchronizerComponent : public rclcpp::Node void check_concat_status(diagnostic_updater::DiagnosticStatusWrapper & stat); std::string replace_sync_topic_name_postfix( const std::string & original_topic_name, const std::string & postfix); + void initialize_collector_list(); }; } // namespace autoware::pointcloud_preprocessor diff --git a/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp b/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp index 63ee23d204788..57ce5c400188a 100644 --- a/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp +++ b/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/cloud_collector.cpp @@ -37,15 +37,19 @@ CloudCollector::CloudCollector( timeout_sec_(timeout_sec), debug_mode_(debug_mode) { + status_ = CollectorStatus::Idle; + const auto period_ns = std::chrono::duration_cast( std::chrono::duration(timeout_sec_)); timer_ = rclcpp::create_timer(ros2_parent_node_, ros2_parent_node_->get_clock(), period_ns, [this]() { std::lock_guard concatenate_lock(concatenate_mutex_); - if (concatenate_finished_) return; + if (status_ == CollectorStatus::Finished) return; concatenate_callback(); }); + + timer_->cancel(); } void CloudCollector::set_info(std::shared_ptr collector_info) @@ -67,17 +71,26 @@ bool CloudCollector::process_pointcloud( const std::string & topic_name, sensor_msgs::msg::PointCloud2::SharedPtr cloud) { std::lock_guard concatenate_lock(concatenate_mutex_); - if (concatenate_finished_) return false; - - // Check if the map already contains an entry for the same topic. This shouldn't happen if the - // parameter 'lidar_timestamp_noise_window' is set correctly. - if (topic_to_cloud_map_.find(topic_name) != topic_to_cloud_map_.end()) { - RCLCPP_WARN_STREAM_THROTTLE( - ros2_parent_node_->get_logger(), *ros2_parent_node_->get_clock(), - std::chrono::milliseconds(10000).count(), - "Topic '" << topic_name - << "' already exists in the collector. Check the timestamp of the pointcloud."); + if (status_ == CollectorStatus::Finished) { + return false; } + + if (status_ == CollectorStatus::Idle) { + // Add first pointcloud to the collector, restart the timer + status_ = CollectorStatus::Processing; + timer_->reset(); + } else if (status_ == CollectorStatus::Processing) { + // Check if the map already contains an entry for the same topic. This shouldn't happen if the + // parameter 'lidar_timestamp_noise_window' is set correctly. + if (topic_to_cloud_map_.find(topic_name) != topic_to_cloud_map_.end()) { + RCLCPP_WARN_STREAM_THROTTLE( + ros2_parent_node_->get_logger(), *ros2_parent_node_->get_clock(), + std::chrono::milliseconds(10000).count(), + "Topic '" << topic_name + << "' already exists in the collector. Check the timestamp of the pointcloud."); + } + } + topic_to_cloud_map_[topic_name] = cloud; if (topic_to_cloud_map_.size() == num_of_clouds_) { concatenate_callback(); @@ -86,9 +99,10 @@ bool CloudCollector::process_pointcloud( return true; } -bool CloudCollector::concatenate_finished() const +CollectorStatus CloudCollector::get_status() { - return concatenate_finished_; + std::lock_guard concatenate_lock(concatenate_mutex_); + return status_; } void CloudCollector::concatenate_callback() @@ -102,10 +116,9 @@ void CloudCollector::concatenate_callback() timer_->cancel(); auto concatenated_cloud_result = concatenate_pointclouds(topic_to_cloud_map_); - ros2_parent_node_->publish_clouds(std::move(concatenated_cloud_result), collector_info_); - concatenate_finished_ = true; + status_ = CollectorStatus::Finished; } ConcatenatedCloudResult CloudCollector::concatenate_pointclouds( @@ -153,4 +166,17 @@ void CloudCollector::show_debug_message() RCLCPP_INFO(ros2_parent_node_->get_logger(), "%s", log_stream.str().c_str()); } +void CloudCollector::reset() +{ + std::lock_guard lock(concatenate_mutex_); + + status_ = CollectorStatus::Idle; // Reset status to Idle + topic_to_cloud_map_.clear(); + collector_info_ = nullptr; + + if (timer_ && !timer_->is_canceled()) { + timer_->cancel(); + } +} + } // namespace autoware::pointcloud_preprocessor diff --git a/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/combine_cloud_handler.cpp b/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/combine_cloud_handler.cpp index d68218314830b..8574e3e8a35ae 100644 --- a/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/combine_cloud_handler.cpp +++ b/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/combine_cloud_handler.cpp @@ -189,7 +189,10 @@ ConcatenatedCloudResult CombineCloudHandler::combine_pointclouds( { ConcatenatedCloudResult concatenate_cloud_result; + if (topic_to_cloud_map.empty()) return concatenate_cloud_result; + std::vector pc_stamps; + pc_stamps.reserve(topic_to_cloud_map.size()); for (const auto & [topic, cloud] : topic_to_cloud_map) { pc_stamps.emplace_back(cloud->header.stamp); } diff --git a/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp b/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp index e168781c647ed..693da939ed196 100644 --- a/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp +++ b/sensing/autoware_pointcloud_preprocessor/src/concatenate_data/concatenate_and_time_sync_node.cpp @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -178,10 +179,26 @@ std::string PointCloudConcatenateDataSynchronizerComponent::replace_sync_topic_n return replaced_topic_name; } +void PointCloudConcatenateDataSynchronizerComponent::initialize_collector_list() +{ + // Initialize collector list + for (size_t i = 0; i < num_of_collectors; ++i) { + cloud_collectors_.emplace_back(std::make_shared( + std::dynamic_pointer_cast(shared_from_this()), + combine_cloud_handler_, params_.input_topics.size(), params_.timeout_sec, + params_.debug_mode)); + } + init_collector_list_ = true; +} + void PointCloudConcatenateDataSynchronizerComponent::cloud_callback( const sensor_msgs::msg::PointCloud2::SharedPtr & input_ptr, const std::string & topic_name) { stop_watch_ptr_->toc("processing_time", true); + if (!init_collector_list_) { + initialize_collector_list(); + } + double cloud_arrival_time = this->get_clock()->now().seconds(); manage_collector_list(); @@ -230,20 +247,30 @@ void PointCloudConcatenateDataSynchronizerComponent::cloud_callback( auto collector = cloud_collector.value(); if (collector) { cloud_collectors_lock.unlock(); - process_success = cloud_collector.value()->process_pointcloud(topic_name, input_ptr); + process_success = collector->process_pointcloud(topic_name, input_ptr); } } + // Didn't find matched collector if (!process_success) { - auto new_cloud_collector = std::make_shared( - std::dynamic_pointer_cast(shared_from_this()), - combine_cloud_handler_, params_.input_topics.size(), params_.timeout_sec, params_.debug_mode); + // Reuse the collector if the status is IDLE + std::shared_ptr selected_collector = nullptr; - cloud_collectors_.push_back(new_cloud_collector); - cloud_collectors_lock.unlock(); + auto it = std::find_if( + cloud_collectors_.begin(), cloud_collectors_.end(), + [](const auto & collector) { return collector->get_status() == CollectorStatus::Idle; }); - collector_matching_strategy_->set_collector_info(new_cloud_collector, matching_params); - (void)new_cloud_collector->process_pointcloud(topic_name, input_ptr); + if (it != cloud_collectors_.end()) { + selected_collector = *it; + } + cloud_collectors_lock.unlock(); + if (selected_collector) { + collector_matching_strategy_->set_collector_info(selected_collector, matching_params); + (void)selected_collector->process_pointcloud(topic_name, input_ptr); + } else { + // Handle case where no suitable collector is found + RCLCPP_WARN(get_logger(), "No available CloudCollector in IDLE state."); + } } } @@ -321,8 +348,7 @@ void PointCloudConcatenateDataSynchronizerComponent::publish_clouds( } } - diagnostic_collector_info_ = collector_info; - + diagnostic_collector_info_ = std::move(collector_info); diagnostic_topic_to_original_stamp_map_ = concatenated_cloud_result.topic_to_original_stamp_map; diagnostic_updater_.force_update(); @@ -347,11 +373,51 @@ void PointCloudConcatenateDataSynchronizerComponent::manage_collector_list() { std::lock_guard cloud_collectors_lock(cloud_collectors_mutex_); - for (auto it = cloud_collectors_.begin(); it != cloud_collectors_.end();) { - if ((*it)->concatenate_finished()) { - it = cloud_collectors_.erase(it); // Erase and move the iterator to the next element - } else { - ++it; // Move to the next element + int num_processing_collectors = 0; + + for (auto & collector : cloud_collectors_) { + if (collector->get_status() == CollectorStatus::Finished) { + collector->reset(); + } + + if (collector->get_status() == CollectorStatus::Processing) { + num_processing_collectors++; + } + } + + if (num_processing_collectors == num_of_collectors) { + auto min_it = cloud_collectors_.end(); + constexpr double k_max_timestamp = std::numeric_limits::max(); + double min_timestamp = k_max_timestamp; + + for (auto it = cloud_collectors_.begin(); it != cloud_collectors_.end(); ++it) { + if ((*it)->get_status() == CollectorStatus::Processing) { + auto info = (*it)->get_info(); + double timestamp = k_max_timestamp; + + if (auto naive_info = std::dynamic_pointer_cast(info)) { + timestamp = naive_info->timestamp; + } else if (auto advanced_info = std::dynamic_pointer_cast(info)) { + timestamp = advanced_info->timestamp; + } else { + continue; + } + + if (timestamp < min_timestamp) { + min_timestamp = timestamp; + min_it = it; + } + } + } + + // Reset the collector with the oldest timestamp if found + if (min_it != cloud_collectors_.end()) { + RCLCPP_WARN_STREAM_THROTTLE( + this->get_logger(), *this->get_clock(), 1000, + "Reset the oldest collector because the number of processing collectors (" + << num_processing_collectors << ") is equal to the limit of (" << num_of_collectors + << ")."); + (*min_it)->reset(); } } } diff --git a/sensing/autoware_pointcloud_preprocessor/test/test_concatenate_node_unit.cpp b/sensing/autoware_pointcloud_preprocessor/test/test_concatenate_node_unit.cpp index bc3c7e9538c84..75bcfdefcd214 100644 --- a/sensing/autoware_pointcloud_preprocessor/test/test_concatenate_node_unit.cpp +++ b/sensing/autoware_pointcloud_preprocessor/test/test_concatenate_node_unit.cpp @@ -485,18 +485,21 @@ TEST_F(ConcatenateCloudTest, TestProcessSingleCloud) auto topic_to_cloud_map = collector_->get_topic_to_cloud_map(); EXPECT_EQ(topic_to_cloud_map["lidar_top"], top_pointcloud_ptr); - EXPECT_FALSE(collector_->concatenate_finished()); + EXPECT_EQ( + collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Processing); concatenate_node_->manage_collector_list(); - EXPECT_FALSE(concatenate_node_->get_cloud_collectors().empty()); + EXPECT_EQ(concatenate_node_->get_cloud_collectors().size(), 1); // Sleep for timeout seconds (200 ms) std::this_thread::sleep_for(std::chrono::milliseconds(200)); rclcpp::spin_some(concatenate_node_); - // Collector should concatenate and publish the pointcloud, also delete itself. - EXPECT_TRUE(collector_->concatenate_finished()); + // Collector should concatenate and publish the pointcloud, and set the status to IDLE + EXPECT_EQ(collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Finished); concatenate_node_->manage_collector_list(); - EXPECT_TRUE(concatenate_node_->get_cloud_collectors().empty()); + EXPECT_EQ( + concatenate_node_->get_cloud_collectors().front()->get_status(), + autoware::pointcloud_preprocessor::CollectorStatus::Idle); } TEST_F(ConcatenateCloudTest, TestProcessMultipleCloud) @@ -524,9 +527,11 @@ TEST_F(ConcatenateCloudTest, TestProcessMultipleCloud) collector_->process_pointcloud("lidar_left", left_pointcloud_ptr); collector_->process_pointcloud("lidar_right", right_pointcloud_ptr); - EXPECT_TRUE(collector_->concatenate_finished()); + EXPECT_EQ(collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Finished); concatenate_node_->manage_collector_list(); - EXPECT_TRUE(concatenate_node_->get_cloud_collectors().empty()); + EXPECT_EQ( + concatenate_node_->get_cloud_collectors().front()->get_status(), + autoware::pointcloud_preprocessor::CollectorStatus::Idle); } int main(int argc, char ** argv)