Skip to content

Commit

Permalink
create receive mq inside thread
Browse files Browse the repository at this point in the history
Signed-off-by: veqcc <ryuta.kambe@tier4.jp>
  • Loading branch information
veqcc committed Feb 14, 2025
1 parent 1217460 commit 782edf4
Showing 1 changed file with 44 additions and 42 deletions.
86 changes: 44 additions & 42 deletions src/agnocastlib/include/agnocast_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,21 @@ class Publisher

ros2_publish_mq_name_ = create_mq_name_for_ros2_publish(topic_name_, id_);

create_ros2_publish_thread();

ros2_publish_mq_ = mq_open(ros2_publish_mq_name_.c_str(), O_WRONLY | O_NONBLOCK);
const int mq_mode = 0666;
struct mq_attr attr = {};
attr.mq_flags = 0;
attr.mq_maxmsg = 1;
attr.mq_msgsize = sizeof(MqMsgROS2Publish);
attr.mq_curmsgs = 0;
ros2_publish_mq_ =
mq_open(ros2_publish_mq_name_.c_str(), O_CREAT | O_WRONLY | O_NONBLOCK, mq_mode, &attr);
if (ros2_publish_mq_ == -1) {
RCLCPP_ERROR(logger, "mq_open failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
}

ros2_publish_thread_ = std::thread([this]() { do_ros2_publish(); });
}

~Publisher()
Expand All @@ -113,63 +120,58 @@ class Publisher
}

ros2_publish_thread_.join();

if (mq_close(ros2_publish_mq_) == -1) {
RCLCPP_ERROR(logger, "mq_close failed: %s", strerror(errno));
}

if (mq_unlink(ros2_publish_mq_name_.c_str()) == -1) {
RCLCPP_ERROR(logger, "mq_unlink failed: %s", strerror(errno));
}
}

void create_ros2_publish_thread()
void do_ros2_publish()
{
const int mq_mode = 0666;
struct mq_attr attr = {};
attr.mq_flags = 0;
attr.mq_maxmsg = 1;
attr.mq_msgsize = sizeof(MqMsgROS2Publish);
attr.mq_curmsgs = 0;
mqd_t mq = mq_open(ros2_publish_mq_name_.c_str(), O_CREAT | O_RDONLY, mq_mode, &attr);
mqd_t mq = mq_open(ros2_publish_mq_name_.c_str(), O_RDONLY);
if (mq == -1) {
RCLCPP_ERROR(logger, "mq_open failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
}

ros2_publish_thread_ = std::thread([this, mq]() {
while (true) {
MqMsgROS2Publish mq_msg = {};
auto ret = mq_receive(mq, reinterpret_cast<char *>(&mq_msg), sizeof(mq_msg), nullptr);
if (ret == -1) {
RCLCPP_ERROR(
logger, "mq_receive for ROS 2 publish notification failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
}

if (mq_msg.should_terminate) {
break;
}
while (true) {
MqMsgROS2Publish mq_msg = {};
auto ret = mq_receive(mq, reinterpret_cast<char *>(&mq_msg), sizeof(mq_msg), nullptr);
if (ret == -1) {
RCLCPP_ERROR(logger, "mq_receive failed: %s", strerror(errno));
close(agnocast_fd);
exit(EXIT_FAILURE);
}

while (true) {
ipc_shared_ptr<MessageT> message;
if (mq_msg.should_terminate) {
break;
}

{
std::scoped_lock lock(ros2_publish_mtx_);
if (ros2_message_queue_.empty()) {
break;
}
while (true) {
ipc_shared_ptr<MessageT> message;

message = std::move(ros2_message_queue_.front());
ros2_message_queue_.pop();
{
std::scoped_lock lock(ros2_publish_mtx_);
if (ros2_message_queue_.empty()) {
break;
}

ros2_publisher_->publish(*message.get());
message = std::move(ros2_message_queue_.front());
ros2_message_queue_.pop();
}
}

if (mq_close(mq) == -1) {
RCLCPP_ERROR(logger, "mq_close failed: %s", strerror(errno));
ros2_publisher_->publish(*message.get());
}
}

if (mq_unlink(ros2_publish_mq_name_.c_str()) == -1) {
RCLCPP_ERROR(logger, "mq_unlink failed: %s", strerror(errno));
}
});
if (mq_close(mq) == -1) {
RCLCPP_ERROR(logger, "mq_close failed: %s", strerror(errno));
}
}

ipc_shared_ptr<MessageT> borrow_loaned_message()
Expand Down

0 comments on commit 782edf4

Please sign in to comment.