Skip to content

Commit

Permalink
Merge pull request #1845 from tier4/sync-awf-latest
Browse files Browse the repository at this point in the history
chore: sync awf-latest
  • Loading branch information
tier4-autoware-public-bot[bot] authored Feb 19, 2025
2 parents 49fe1bf + 59f2208 commit 342f239
Show file tree
Hide file tree
Showing 6 changed files with 145 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ struct AdvancedCollectorInfo : public CollectorInfoBase
}
};

enum class CollectorStatus { Idle, Processing, Finished };

class CloudCollector
{
public:
Expand All @@ -67,11 +69,12 @@ class CloudCollector
std::unordered_map<std::string, sensor_msgs::msg::PointCloud2::SharedPtr>
get_topic_to_cloud_map();

[[nodiscard]] bool concatenate_finished() const;
[[nodiscard]] CollectorStatus get_status();

void set_info(std::shared_ptr<CollectorInfoBase> collector_info);
[[nodiscard]] std::shared_ptr<CollectorInfoBase> get_info() const;
void show_debug_message();
void reset();

private:
std::shared_ptr<PointCloudConcatenateDataSynchronizerComponent> ros2_parent_node_;
Expand All @@ -81,9 +84,9 @@ class CloudCollector
uint64_t num_of_clouds_;
double timeout_sec_;
bool debug_mode_;
bool concatenate_finished_{false};
std::mutex concatenate_mutex_;
std::shared_ptr<CollectorInfoBase> collector_info_;
CollectorStatus status_;
};

} // namespace autoware::pointcloud_preprocessor
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class PointCloudConcatenateDataSynchronizerComponent : public rclcpp::Node
std::list<std::shared_ptr<CloudCollector>> cloud_collectors_;
std::unique_ptr<CollectorMatchingStrategy> collector_matching_strategy_;
std::mutex cloud_collectors_mutex_;
bool init_collector_list_ = false;
static constexpr const int num_of_collectors = 3;

// default postfix name for synchronized pointcloud
static constexpr const char * default_sync_topic_postfix = "_synchronized";
Expand Down Expand Up @@ -119,6 +121,7 @@ class PointCloudConcatenateDataSynchronizerComponent : public rclcpp::Node
void check_concat_status(diagnostic_updater::DiagnosticStatusWrapper & stat);
std::string replace_sync_topic_name_postfix(
const std::string & original_topic_name, const std::string & postfix);
void initialize_collector_list();
};

} // namespace autoware::pointcloud_preprocessor
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,19 @@ CloudCollector::CloudCollector(
timeout_sec_(timeout_sec),
debug_mode_(debug_mode)
{
status_ = CollectorStatus::Idle;

const auto period_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(timeout_sec_));

timer_ =
rclcpp::create_timer(ros2_parent_node_, ros2_parent_node_->get_clock(), period_ns, [this]() {
std::lock_guard<std::mutex> concatenate_lock(concatenate_mutex_);
if (concatenate_finished_) return;
if (status_ == CollectorStatus::Finished) return;
concatenate_callback();
});

timer_->cancel();
}

void CloudCollector::set_info(std::shared_ptr<CollectorInfoBase> collector_info)
Expand All @@ -67,17 +71,26 @@ bool CloudCollector::process_pointcloud(
const std::string & topic_name, sensor_msgs::msg::PointCloud2::SharedPtr cloud)
{
std::lock_guard<std::mutex> concatenate_lock(concatenate_mutex_);
if (concatenate_finished_) return false;

// Check if the map already contains an entry for the same topic. This shouldn't happen if the
// parameter 'lidar_timestamp_noise_window' is set correctly.
if (topic_to_cloud_map_.find(topic_name) != topic_to_cloud_map_.end()) {
RCLCPP_WARN_STREAM_THROTTLE(
ros2_parent_node_->get_logger(), *ros2_parent_node_->get_clock(),
std::chrono::milliseconds(10000).count(),
"Topic '" << topic_name
<< "' already exists in the collector. Check the timestamp of the pointcloud.");
if (status_ == CollectorStatus::Finished) {
return false;
}

if (status_ == CollectorStatus::Idle) {
// Add first pointcloud to the collector, restart the timer
status_ = CollectorStatus::Processing;
timer_->reset();
} else if (status_ == CollectorStatus::Processing) {
// Check if the map already contains an entry for the same topic. This shouldn't happen if the
// parameter 'lidar_timestamp_noise_window' is set correctly.
if (topic_to_cloud_map_.find(topic_name) != topic_to_cloud_map_.end()) {
RCLCPP_WARN_STREAM_THROTTLE(
ros2_parent_node_->get_logger(), *ros2_parent_node_->get_clock(),
std::chrono::milliseconds(10000).count(),
"Topic '" << topic_name
<< "' already exists in the collector. Check the timestamp of the pointcloud.");
}
}

topic_to_cloud_map_[topic_name] = cloud;
if (topic_to_cloud_map_.size() == num_of_clouds_) {
concatenate_callback();
Expand All @@ -86,9 +99,10 @@ bool CloudCollector::process_pointcloud(
return true;
}

bool CloudCollector::concatenate_finished() const
CollectorStatus CloudCollector::get_status()
{
return concatenate_finished_;
std::lock_guard<std::mutex> concatenate_lock(concatenate_mutex_);
return status_;
}

void CloudCollector::concatenate_callback()
Expand All @@ -102,10 +116,9 @@ void CloudCollector::concatenate_callback()
timer_->cancel();

auto concatenated_cloud_result = concatenate_pointclouds(topic_to_cloud_map_);

ros2_parent_node_->publish_clouds(std::move(concatenated_cloud_result), collector_info_);

concatenate_finished_ = true;
status_ = CollectorStatus::Finished;
}

ConcatenatedCloudResult CloudCollector::concatenate_pointclouds(
Expand Down Expand Up @@ -153,4 +166,17 @@ void CloudCollector::show_debug_message()
RCLCPP_INFO(ros2_parent_node_->get_logger(), "%s", log_stream.str().c_str());
}

void CloudCollector::reset()
{
std::lock_guard<std::mutex> lock(concatenate_mutex_);

status_ = CollectorStatus::Idle; // Reset status to Idle
topic_to_cloud_map_.clear();
collector_info_ = nullptr;

if (timer_ && !timer_->is_canceled()) {
timer_->cancel();
}
}

} // namespace autoware::pointcloud_preprocessor
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,10 @@ ConcatenatedCloudResult CombineCloudHandler::combine_pointclouds(
{
ConcatenatedCloudResult concatenate_cloud_result;

if (topic_to_cloud_map.empty()) return concatenate_cloud_result;

std::vector<rclcpp::Time> pc_stamps;
pc_stamps.reserve(topic_to_cloud_map.size());
for (const auto & [topic, cloud] : topic_to_cloud_map) {
pc_stamps.emplace_back(cloud->header.stamp);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <pcl_conversions/pcl_conversions.h>

#include <iomanip>
#include <limits>
#include <list>
#include <memory>
#include <optional>
Expand Down Expand Up @@ -178,10 +179,26 @@ std::string PointCloudConcatenateDataSynchronizerComponent::replace_sync_topic_n
return replaced_topic_name;
}

void PointCloudConcatenateDataSynchronizerComponent::initialize_collector_list()
{
// Initialize collector list
for (size_t i = 0; i < num_of_collectors; ++i) {
cloud_collectors_.emplace_back(std::make_shared<CloudCollector>(
std::dynamic_pointer_cast<PointCloudConcatenateDataSynchronizerComponent>(shared_from_this()),
combine_cloud_handler_, params_.input_topics.size(), params_.timeout_sec,
params_.debug_mode));
}
init_collector_list_ = true;
}

void PointCloudConcatenateDataSynchronizerComponent::cloud_callback(
const sensor_msgs::msg::PointCloud2::SharedPtr & input_ptr, const std::string & topic_name)
{
stop_watch_ptr_->toc("processing_time", true);
if (!init_collector_list_) {
initialize_collector_list();
}

double cloud_arrival_time = this->get_clock()->now().seconds();
manage_collector_list();

Expand Down Expand Up @@ -230,20 +247,30 @@ void PointCloudConcatenateDataSynchronizerComponent::cloud_callback(
auto collector = cloud_collector.value();
if (collector) {
cloud_collectors_lock.unlock();
process_success = cloud_collector.value()->process_pointcloud(topic_name, input_ptr);
process_success = collector->process_pointcloud(topic_name, input_ptr);
}
}

// Didn't find matched collector
if (!process_success) {
auto new_cloud_collector = std::make_shared<CloudCollector>(
std::dynamic_pointer_cast<PointCloudConcatenateDataSynchronizerComponent>(shared_from_this()),
combine_cloud_handler_, params_.input_topics.size(), params_.timeout_sec, params_.debug_mode);
// Reuse the collector if the status is IDLE
std::shared_ptr<CloudCollector> selected_collector = nullptr;

cloud_collectors_.push_back(new_cloud_collector);
cloud_collectors_lock.unlock();
auto it = std::find_if(
cloud_collectors_.begin(), cloud_collectors_.end(),
[](const auto & collector) { return collector->get_status() == CollectorStatus::Idle; });

collector_matching_strategy_->set_collector_info(new_cloud_collector, matching_params);
(void)new_cloud_collector->process_pointcloud(topic_name, input_ptr);
if (it != cloud_collectors_.end()) {
selected_collector = *it;
}
cloud_collectors_lock.unlock();
if (selected_collector) {
collector_matching_strategy_->set_collector_info(selected_collector, matching_params);
(void)selected_collector->process_pointcloud(topic_name, input_ptr);
} else {
// Handle case where no suitable collector is found
RCLCPP_WARN(get_logger(), "No available CloudCollector in IDLE state.");
}
}
}

Expand Down Expand Up @@ -321,8 +348,7 @@ void PointCloudConcatenateDataSynchronizerComponent::publish_clouds(
}
}

diagnostic_collector_info_ = collector_info;

diagnostic_collector_info_ = std::move(collector_info);
diagnostic_topic_to_original_stamp_map_ = concatenated_cloud_result.topic_to_original_stamp_map;
diagnostic_updater_.force_update();

Expand All @@ -347,11 +373,51 @@ void PointCloudConcatenateDataSynchronizerComponent::manage_collector_list()
{
std::lock_guard<std::mutex> cloud_collectors_lock(cloud_collectors_mutex_);

for (auto it = cloud_collectors_.begin(); it != cloud_collectors_.end();) {
if ((*it)->concatenate_finished()) {
it = cloud_collectors_.erase(it); // Erase and move the iterator to the next element
} else {
++it; // Move to the next element
int num_processing_collectors = 0;

for (auto & collector : cloud_collectors_) {
if (collector->get_status() == CollectorStatus::Finished) {
collector->reset();
}

if (collector->get_status() == CollectorStatus::Processing) {
num_processing_collectors++;
}
}

if (num_processing_collectors == num_of_collectors) {
auto min_it = cloud_collectors_.end();
constexpr double k_max_timestamp = std::numeric_limits<double>::max();
double min_timestamp = k_max_timestamp;

for (auto it = cloud_collectors_.begin(); it != cloud_collectors_.end(); ++it) {
if ((*it)->get_status() == CollectorStatus::Processing) {
auto info = (*it)->get_info();
double timestamp = k_max_timestamp;

if (auto naive_info = std::dynamic_pointer_cast<NaiveCollectorInfo>(info)) {
timestamp = naive_info->timestamp;
} else if (auto advanced_info = std::dynamic_pointer_cast<AdvancedCollectorInfo>(info)) {
timestamp = advanced_info->timestamp;
} else {
continue;
}

if (timestamp < min_timestamp) {
min_timestamp = timestamp;
min_it = it;
}
}
}

// Reset the collector with the oldest timestamp if found
if (min_it != cloud_collectors_.end()) {
RCLCPP_WARN_STREAM_THROTTLE(
this->get_logger(), *this->get_clock(), 1000,
"Reset the oldest collector because the number of processing collectors ("
<< num_processing_collectors << ") is equal to the limit of (" << num_of_collectors
<< ").");
(*min_it)->reset();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -485,18 +485,21 @@ TEST_F(ConcatenateCloudTest, TestProcessSingleCloud)

auto topic_to_cloud_map = collector_->get_topic_to_cloud_map();
EXPECT_EQ(topic_to_cloud_map["lidar_top"], top_pointcloud_ptr);
EXPECT_FALSE(collector_->concatenate_finished());
EXPECT_EQ(
collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Processing);
concatenate_node_->manage_collector_list();
EXPECT_FALSE(concatenate_node_->get_cloud_collectors().empty());
EXPECT_EQ(concatenate_node_->get_cloud_collectors().size(), 1);

// Sleep for timeout seconds (200 ms)
std::this_thread::sleep_for(std::chrono::milliseconds(200));
rclcpp::spin_some(concatenate_node_);

// Collector should concatenate and publish the pointcloud, also delete itself.
EXPECT_TRUE(collector_->concatenate_finished());
// Collector should concatenate and publish the pointcloud, and set the status to IDLE
EXPECT_EQ(collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Finished);
concatenate_node_->manage_collector_list();
EXPECT_TRUE(concatenate_node_->get_cloud_collectors().empty());
EXPECT_EQ(
concatenate_node_->get_cloud_collectors().front()->get_status(),
autoware::pointcloud_preprocessor::CollectorStatus::Idle);
}

TEST_F(ConcatenateCloudTest, TestProcessMultipleCloud)
Expand Down Expand Up @@ -524,9 +527,11 @@ TEST_F(ConcatenateCloudTest, TestProcessMultipleCloud)
collector_->process_pointcloud("lidar_left", left_pointcloud_ptr);
collector_->process_pointcloud("lidar_right", right_pointcloud_ptr);

EXPECT_TRUE(collector_->concatenate_finished());
EXPECT_EQ(collector_->get_status(), autoware::pointcloud_preprocessor::CollectorStatus::Finished);
concatenate_node_->manage_collector_list();
EXPECT_TRUE(concatenate_node_->get_cloud_collectors().empty());
EXPECT_EQ(
concatenate_node_->get_cloud_collectors().front()->get_status(),
autoware::pointcloud_preprocessor::CollectorStatus::Idle);
}

int main(int argc, char ** argv)
Expand Down

0 comments on commit 342f239

Please sign in to comment.