diff --git a/src/agnocastlib/include/agnocast_mq.hpp b/src/agnocastlib/include/agnocast_mq.hpp index 2a1d97b6..773b9272 100644 --- a/src/agnocastlib/include/agnocast_mq.hpp +++ b/src/agnocastlib/include/agnocast_mq.hpp @@ -7,4 +7,9 @@ struct MqMsgAgnocast { }; +struct MqMsgROS2Publish +{ + bool should_terminate; +}; + } // namespace agnocast diff --git a/src/agnocastlib/include/agnocast_publisher.hpp b/src/agnocastlib/include/agnocast_publisher.hpp index 2ad9f88e..8be10e5f 100644 --- a/src/agnocastlib/include/agnocast_publisher.hpp +++ b/src/agnocastlib/include/agnocast_publisher.hpp @@ -18,6 +18,9 @@ #include #include +#include +#include +#include namespace agnocast { @@ -53,7 +56,14 @@ class Publisher // not currently implemented. std::unordered_map opened_mqs_; PublisherOptions options_; + + // ROS2 publish related variables typename rclcpp::Publisher::SharedPtr ros2_publisher_; + mqd_t ros2_publish_mq_; + std::string ros2_publish_mq_name_; + std::queue> ros2_message_queue_; + std::thread ros2_publish_thread_; + std::mutex ros2_publish_mtx_; public: using SharedPtr = std::shared_ptr>; @@ -81,6 +91,87 @@ class Publisher } id_ = initialize_publisher(publisher_pid_, topic_name_); + + ros2_publish_mq_name_ = create_mq_name_for_ros2_publish(topic_name_, id_); + + const int mq_mode = 0666; + struct mq_attr attr = {}; + attr.mq_flags = 0; + attr.mq_maxmsg = 1; + attr.mq_msgsize = sizeof(MqMsgROS2Publish); + attr.mq_curmsgs = 0; + ros2_publish_mq_ = + mq_open(ros2_publish_mq_name_.c_str(), O_CREAT | O_WRONLY | O_NONBLOCK, mq_mode, &attr); + if (ros2_publish_mq_ == -1) { + RCLCPP_ERROR(logger, "mq_open failed: %s", strerror(errno)); + close(agnocast_fd); + exit(EXIT_FAILURE); + } + + ros2_publish_thread_ = std::thread([this]() { do_ros2_publish(); }); + } + + ~Publisher() + { + MqMsgROS2Publish mq_msg = {}; + mq_msg.should_terminate = true; + if (mq_send(ros2_publish_mq_, reinterpret_cast(&mq_msg), sizeof(mq_msg), 0) == -1) { + RCLCPP_ERROR(logger, "mq_send failed: %s", strerror(errno)); + } + + ros2_publish_thread_.join(); + + if (mq_close(ros2_publish_mq_) == -1) { + RCLCPP_ERROR(logger, "mq_close failed: %s", strerror(errno)); + } + + if (mq_unlink(ros2_publish_mq_name_.c_str()) == -1) { + RCLCPP_ERROR(logger, "mq_unlink failed: %s", strerror(errno)); + } + } + + void do_ros2_publish() + { + mqd_t mq = mq_open(ros2_publish_mq_name_.c_str(), O_RDONLY); + if (mq == -1) { + RCLCPP_ERROR(logger, "mq_open failed: %s", strerror(errno)); + close(agnocast_fd); + exit(EXIT_FAILURE); + } + + while (true) { + MqMsgROS2Publish mq_msg = {}; + auto ret = mq_receive(mq, reinterpret_cast(&mq_msg), sizeof(mq_msg), nullptr); + if (ret == -1) { + RCLCPP_ERROR(logger, "mq_receive failed: %s", strerror(errno)); + close(agnocast_fd); + exit(EXIT_FAILURE); + } + + if (mq_msg.should_terminate) { + break; + } + + while (true) { + ipc_shared_ptr message; + + { + std::scoped_lock lock(ros2_publish_mtx_); + if (ros2_message_queue_.empty()) { + break; + } + + message = std::move(ros2_message_queue_.front()); + ros2_message_queue_.pop(); + } + + ros2_publisher_->publish(*message.get()); + } + } + + if (mq_close(mq) == -1) { + RCLCPP_ERROR(logger, "mq_close failed: %s", strerror(errno)); + } } ipc_shared_ptr borrow_loaned_message() @@ -112,11 +203,24 @@ class Publisher } if (options_.do_always_ros2_publish || ros2_publisher_->get_subscription_count() > 0) { - const MessageT * raw = message.get(); - ros2_publisher_->publish(*raw); + { + std::lock_guard lock(ros2_publish_mtx_); + ros2_message_queue_.push(std::move(message)); + } + + MqMsgROS2Publish mq_msg = {}; + mq_msg.should_terminate = false; + if (mq_send(ros2_publish_mq_, reinterpret_cast(&mq_msg), sizeof(mq_msg), 0) == -1) { + // If it returns EAGAIN, it means mq_send has already been executed, but the ros2 publish + // thread hasn't received it yet. Thus, there's no need to send it again since the + // notification has already been sent. + if (errno != EAGAIN) { + RCLCPP_ERROR(logger, "mq_send failed: %s", strerror(errno)); + } + } + } else { + message.reset(); } - - message.reset(); } uint32_t get_subscription_count() const diff --git a/src/agnocastlib/include/agnocast_utils.hpp b/src/agnocastlib/include/agnocast_utils.hpp index 5f48680b..5e38298c 100644 --- a/src/agnocastlib/include/agnocast_utils.hpp +++ b/src/agnocastlib/include/agnocast_utils.hpp @@ -11,7 +11,10 @@ namespace agnocast extern rclcpp::Logger logger; void validate_ld_preload(); -std::string create_mq_name(const std::string & topic_name, const topic_local_id_t id); +std::string create_mq_name_for_agnocast_publish( + const std::string & topic_name, const topic_local_id_t id); +std::string create_mq_name_for_ros2_publish( + const std::string & topic_name, const topic_local_id_t id); std::string create_shm_name(const pid_t pid); uint64_t agnocast_get_timestamp(); diff --git a/src/agnocastlib/src/agnocast_publisher.cpp b/src/agnocastlib/src/agnocast_publisher.cpp index e9005845..c28d5204 100644 --- a/src/agnocastlib/src/agnocast_publisher.cpp +++ b/src/agnocastlib/src/agnocast_publisher.cpp @@ -69,7 +69,7 @@ union ioctl_publish_args publish_core( for (uint32_t i = 0; i < publish_args.ret_subscriber_num; i++) { const topic_local_id_t subscriber_id = publish_args.ret_subscriber_ids[i]; - const std::string mq_name = create_mq_name(topic_name, subscriber_id); + const std::string mq_name = create_mq_name_for_agnocast_publish(topic_name, subscriber_id); mqd_t mq = 0; if (opened_mqs.find(mq_name) != opened_mqs.end()) { mq = opened_mqs[mq_name]; diff --git a/src/agnocastlib/src/agnocast_subscription.cpp b/src/agnocastlib/src/agnocast_subscription.cpp index 9af7496b..18d9e72c 100644 --- a/src/agnocastlib/src/agnocast_subscription.cpp +++ b/src/agnocastlib/src/agnocast_subscription.cpp @@ -43,7 +43,7 @@ mqd_t open_mq_for_subscription( const std::string & topic_name, const topic_local_id_t subscriber_id, std::pair & mq_subscription) { - std::string mq_name = create_mq_name(topic_name, subscriber_id); + std::string mq_name = create_mq_name_for_agnocast_publish(topic_name, subscriber_id); struct mq_attr attr = {}; attr.mq_flags = 0; // Blocking queue attr.mq_msgsize = sizeof(MqMsgAgnocast); // Maximum message size diff --git a/src/agnocastlib/src/agnocast_utils.cpp b/src/agnocastlib/src/agnocast_utils.cpp index 8bad558b..ce57a874 100644 --- a/src/agnocastlib/src/agnocast_utils.cpp +++ b/src/agnocastlib/src/agnocast_utils.cpp @@ -20,7 +20,8 @@ void validate_ld_preload() } } -std::string create_mq_name(const std::string & topic_name, const topic_local_id_t id) +static std::string create_mq_name( + const std::string & header, const std::string & topic_name, const topic_local_id_t id) { if (topic_name.length() == 0 || topic_name[0] != '/') { RCLCPP_ERROR(logger, "create_mq_name failed"); @@ -30,7 +31,7 @@ std::string create_mq_name(const std::string & topic_name, const topic_local_id_ std::string mq_name = topic_name; mq_name[0] = '@'; - mq_name = "/agnocast" + mq_name + "@" + std::to_string(id); + mq_name = header + mq_name + "@" + std::to_string(id); // As a mq_name, '/' cannot be used for (size_t i = 1; i < mq_name.size(); i++) { @@ -42,6 +43,18 @@ std::string create_mq_name(const std::string & topic_name, const topic_local_id_ return mq_name; } +std::string create_mq_name_for_agnocast_publish( + const std::string & topic_name, const topic_local_id_t id) +{ + return create_mq_name("/agnocast", topic_name, id); +} + +std::string create_mq_name_for_ros2_publish( + const std::string & topic_name, const topic_local_id_t id) +{ + return create_mq_name("/agnocast_to_ros2", topic_name, id); +} + std::string create_shm_name(const pid_t pid) { return "/agnocast@" + std::to_string(pid); diff --git a/src/agnocastlib/test/test_agnocast_utils.cpp b/src/agnocastlib/test/test_agnocast_utils.cpp index c5af6aad..542bf9a0 100644 --- a/src/agnocastlib/test/test_agnocast_utils.cpp +++ b/src/agnocastlib/test/test_agnocast_utils.cpp @@ -4,17 +4,27 @@ TEST(AgnocastUtilsTest, create_mq_name_normal) { - EXPECT_EQ(agnocast::create_mq_name("/dummy", 0), "/agnocast@dummy@0"); + EXPECT_EQ(agnocast::create_mq_name_for_agnocast_publish("/dummy", 0), "/agnocast@dummy@0"); + EXPECT_EQ(agnocast::create_mq_name_for_ros2_publish("/dummy", 0), "/agnocast_to_ros2@dummy@0"); } TEST(AgnocastUtilsTest, create_mq_name_slash_included) { - EXPECT_EQ(agnocast::create_mq_name("/dummy/dummy", 0), "/agnocast@dummy_dummy@0"); + EXPECT_EQ( + agnocast::create_mq_name_for_agnocast_publish("/dummy/dummy", 0), "/agnocast@dummy_dummy@0"); + EXPECT_EQ( + agnocast::create_mq_name_for_ros2_publish("/dummy/dummy", 0), + "/agnocast_to_ros2@dummy_dummy@0"); } TEST(AgnocastUtilsTest, create_mq_name_invalid_topic) { - EXPECT_EXIT(agnocast::create_mq_name("dummy", 0), ::testing::ExitedWithCode(EXIT_FAILURE), ""); + EXPECT_EXIT( + agnocast::create_mq_name_for_agnocast_publish("dummy", 0), + ::testing::ExitedWithCode(EXIT_FAILURE), ""); + EXPECT_EXIT( + agnocast::create_mq_name_for_ros2_publish("dummy", 0), ::testing::ExitedWithCode(EXIT_FAILURE), + ""); } TEST(AgnocastUtilsTest, validate_ld_preload_normal)