Skip to content

Commit

Permalink
fix(autoware_pointcloud_preprocessor): fix potential double unlock in…
Browse files Browse the repository at this point in the history
… concatenate node (autowarefoundation#10082)

* feat: reuse collectors

Signed-off-by: vividf <yihsiang.fang@tier4.jp>

* fix: potential double unlock

Signed-off-by: vividf <yihsiang.fang@tier4.jp>

* style(pre-commit): autofix

* chore: remove mutex

Signed-off-by: vividf <yihsiang.fang@tier4.jp>

* chore: reset the processing cloud only if needed

Signed-off-by: vividf <yihsiang.fang@tier4.jp>

* chore: fix grammar

Signed-off-by: vividf <yihsiang.fang@tier4.jp>

---------

Signed-off-by: vividf <yihsiang.fang@tier4.jp>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: SakodaShintaro <shintaro.sakoda@tier4.jp>
  • Loading branch information
3 people committed Mar 4, 2025
1 parent 64eaa47 commit 024d36a
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 TIER IV, Inc.
// Copyright 2025 TIER IV, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,7 +59,7 @@ class CloudCollector
std::shared_ptr<CombineCloudHandler> & combine_cloud_handler, int num_of_clouds,
double timeout_sec, bool debug_mode);
bool topic_exists(const std::string & topic_name);
bool process_pointcloud(
void process_pointcloud(
const std::string & topic_name, sensor_msgs::msg::PointCloud2::SharedPtr cloud);
void concatenate_callback();

Expand All @@ -69,7 +69,7 @@ class CloudCollector
std::unordered_map<std::string, sensor_msgs::msg::PointCloud2::SharedPtr>
get_topic_to_cloud_map();

[[nodiscard]] CollectorStatus get_status();
[[nodiscard]] CollectorStatus get_status() const;

void set_info(std::shared_ptr<CollectorInfoBase> collector_info);
[[nodiscard]] std::shared_ptr<CollectorInfoBase> get_info() const;
Expand All @@ -84,7 +84,6 @@ class CloudCollector
uint64_t num_of_clouds_;
double timeout_sec_;
bool debug_mode_;
std::mutex concatenate_mutex_;
std::shared_ptr<CollectorInfoBase> collector_info_;
CollectorStatus status_;
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 TIER IV, Inc.
// Copyright 2025 TIER IV, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 TIER IV, Inc.
// Copyright 2025 TIER IV, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2023 TIER IV, Inc.
// Copyright 2025 TIER IV, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,7 +16,6 @@

#include <list>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
Expand Down Expand Up @@ -91,7 +90,6 @@ class PointCloudConcatenateDataSynchronizerComponent : public rclcpp::Node
std::shared_ptr<CombineCloudHandler> combine_cloud_handler_;
std::list<std::shared_ptr<CloudCollector>> cloud_collectors_;
std::unique_ptr<CollectorMatchingStrategy> collector_matching_strategy_;
std::mutex cloud_collectors_mutex_;
bool init_collector_list_ = false;
static constexpr const int num_of_collectors = 3;

Expand Down Expand Up @@ -122,6 +120,7 @@ class PointCloudConcatenateDataSynchronizerComponent : public rclcpp::Node
std::string replace_sync_topic_name_postfix(
const std::string & original_topic_name, const std::string & postfix);
void initialize_collector_list();
std::list<std::shared_ptr<CloudCollector>>::iterator find_and_reset_oldest_collector();
};

} // namespace autoware::pointcloud_preprocessor
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ CloudCollector::CloudCollector(

timer_ =
rclcpp::create_timer(ros2_parent_node_, ros2_parent_node_->get_clock(), period_ns, [this]() {
std::lock_guard<std::mutex> concatenate_lock(concatenate_mutex_);
if (status_ == CollectorStatus::Finished) return;
concatenate_callback();
});
Expand All @@ -67,14 +66,9 @@ bool CloudCollector::topic_exists(const std::string & topic_name)
return topic_to_cloud_map_.find(topic_name) != topic_to_cloud_map_.end();
}

bool CloudCollector::process_pointcloud(
void CloudCollector::process_pointcloud(
const std::string & topic_name, sensor_msgs::msg::PointCloud2::SharedPtr cloud)
{
std::lock_guard<std::mutex> concatenate_lock(concatenate_mutex_);
if (status_ == CollectorStatus::Finished) {
return false;
}

if (status_ == CollectorStatus::Idle) {
// Add first pointcloud to the collector, restart the timer
status_ = CollectorStatus::Processing;
Expand All @@ -95,13 +89,10 @@ bool CloudCollector::process_pointcloud(
if (topic_to_cloud_map_.size() == num_of_clouds_) {
concatenate_callback();
}

return true;
}

CollectorStatus CloudCollector::get_status()
CollectorStatus CloudCollector::get_status() const
{
std::lock_guard<std::mutex> concatenate_lock(concatenate_mutex_);
return status_;
}

Expand Down Expand Up @@ -168,8 +159,6 @@ void CloudCollector::show_debug_message()

void CloudCollector::reset()
{
std::lock_guard<std::mutex> lock(concatenate_mutex_);

status_ = CollectorStatus::Idle; // Reset status to Idle
topic_to_cloud_map_.clear();
collector_info_ = nullptr;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,9 @@ void PointCloudConcatenateDataSynchronizerComponent::cloud_callback(
this->get_logger(), *this->get_clock(), 1000, "Empty sensor points!");
}

// protect cloud collectors list
std::unique_lock<std::mutex> cloud_collectors_lock(cloud_collectors_mutex_);
std::shared_ptr<CloudCollector> selected_collector = nullptr;

// For each callback, check whether there is a exist collector that matches this cloud
// For each callback, check whether there is an existing collector that matches this cloud
std::optional<std::shared_ptr<CloudCollector>> cloud_collector = std::nullopt;
MatchingParams matching_params;
matching_params.topic_name = topic_name;
Expand All @@ -242,36 +241,36 @@ void PointCloudConcatenateDataSynchronizerComponent::cloud_callback(
collector_matching_strategy_->match_cloud_to_collector(cloud_collectors_, matching_params);
}

bool process_success = false;
if (cloud_collector.has_value()) {
auto collector = cloud_collector.value();
if (collector) {
cloud_collectors_lock.unlock();
process_success = collector->process_pointcloud(topic_name, input_ptr);
}
if (cloud_collector.has_value() && cloud_collector.value()) {
selected_collector = cloud_collector.value();
}

// Didn't find matched collector
if (!process_success) {
// Reuse the collector if the status is IDLE
std::shared_ptr<CloudCollector> selected_collector = nullptr;

if (!selected_collector || selected_collector->get_status() == CollectorStatus::Finished) {
// Reuse a collector if one is in the IDLE state
auto it = std::find_if(
cloud_collectors_.begin(), cloud_collectors_.end(),
[](const auto & collector) { return collector->get_status() == CollectorStatus::Idle; });

if (it != cloud_collectors_.end()) {
selected_collector = *it;
} else {
auto oldest_it = find_and_reset_oldest_collector();
if (oldest_it != cloud_collectors_.end()) {
selected_collector = *oldest_it;
} else {
// Handle case where no suitable collector is found
RCLCPP_ERROR(get_logger(), "No available CloudCollector in IDLE state.");
return;
}
}
cloud_collectors_lock.unlock();

if (selected_collector) {
collector_matching_strategy_->set_collector_info(selected_collector, matching_params);
(void)selected_collector->process_pointcloud(topic_name, input_ptr);
} else {
// Handle case where no suitable collector is found
RCLCPP_WARN(get_logger(), "No available CloudCollector in IDLE state.");
}
}

selected_collector->process_pointcloud(topic_name, input_ptr);
}

void PointCloudConcatenateDataSynchronizerComponent::twist_callback(
Expand Down Expand Up @@ -371,55 +370,51 @@ void PointCloudConcatenateDataSynchronizerComponent::publish_clouds(

void PointCloudConcatenateDataSynchronizerComponent::manage_collector_list()
{
std::lock_guard<std::mutex> cloud_collectors_lock(cloud_collectors_mutex_);

int num_processing_collectors = 0;

for (auto & collector : cloud_collectors_) {
if (collector->get_status() == CollectorStatus::Finished) {
collector->reset();
}

if (collector->get_status() == CollectorStatus::Processing) {
num_processing_collectors++;
}
}
}

if (num_processing_collectors == num_of_collectors) {
auto min_it = cloud_collectors_.end();
constexpr double k_max_timestamp = std::numeric_limits<double>::max();
double min_timestamp = k_max_timestamp;

for (auto it = cloud_collectors_.begin(); it != cloud_collectors_.end(); ++it) {
if ((*it)->get_status() == CollectorStatus::Processing) {
auto info = (*it)->get_info();
double timestamp = k_max_timestamp;

if (auto naive_info = std::dynamic_pointer_cast<NaiveCollectorInfo>(info)) {
timestamp = naive_info->timestamp;
} else if (auto advanced_info = std::dynamic_pointer_cast<AdvancedCollectorInfo>(info)) {
timestamp = advanced_info->timestamp;
} else {
continue;
}
std::list<std::shared_ptr<CloudCollector>>::iterator
PointCloudConcatenateDataSynchronizerComponent::find_and_reset_oldest_collector()
{
auto min_it = cloud_collectors_.end();
constexpr double k_max_timestamp = std::numeric_limits<double>::max();
double min_timestamp = k_max_timestamp;

for (auto it = cloud_collectors_.begin(); it != cloud_collectors_.end(); ++it) {
if ((*it)->get_status() == CollectorStatus::Processing) {
auto info = (*it)->get_info();
double timestamp = k_max_timestamp;

if (auto naive_info = std::dynamic_pointer_cast<NaiveCollectorInfo>(info)) {
timestamp = naive_info->timestamp;
} else if (auto advanced_info = std::dynamic_pointer_cast<AdvancedCollectorInfo>(info)) {
timestamp = advanced_info->timestamp;
} else {
continue;
}

if (timestamp < min_timestamp) {
min_timestamp = timestamp;
min_it = it;
}
if (timestamp < min_timestamp) {
min_timestamp = timestamp;
min_it = it;
}
}
}

// Reset the collector with the oldest timestamp if found
if (min_it != cloud_collectors_.end()) {
RCLCPP_WARN_STREAM_THROTTLE(
this->get_logger(), *this->get_clock(), 1000,
"Reset the oldest collector because the number of processing collectors ("
<< num_processing_collectors << ") is equal to the limit of (" << num_of_collectors
<< ").");
(*min_it)->reset();
}
// Reset the processing collector with the oldest timestamp if found
if (min_it != cloud_collectors_.end()) {
RCLCPP_WARN_STREAM_THROTTLE(
this->get_logger(), *this->get_clock(), 1000,
"Reset the oldest collector because the number of processing collectors is equal to the "
"limit of ("
<< num_of_collectors << ").");
(*min_it)->reset();
}

return min_it;
}

std::string PointCloudConcatenateDataSynchronizerComponent::format_timestamp(double timestamp)
Expand Down

0 comments on commit 024d36a

Please sign in to comment.