Skip to content

Commit

Permalink
feat(agnocastlib): implement Agnocast-ROS2 functionality (#326)
Browse files Browse the repository at this point in the history
Signed-off-by: veqcc <ryuta.kambe@tier4.jp>
  • Loading branch information
veqcc authored Jan 20, 2025
1 parent d168345 commit 76722b3
Show file tree
Hide file tree
Showing 15 changed files with 177 additions and 148 deletions.
25 changes: 0 additions & 25 deletions src/agnocast_bridge/CMakeLists.txt

This file was deleted.

5 changes: 0 additions & 5 deletions src/agnocast_bridge/launch/bridge.launch.xml

This file was deleted.

18 changes: 0 additions & 18 deletions src/agnocast_bridge/package.xml

This file was deleted.

40 changes: 0 additions & 40 deletions src/agnocast_bridge/src/bridge_node.cpp

This file was deleted.

49 changes: 31 additions & 18 deletions src/agnocastlib/include/agnocast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,34 +28,48 @@ bool ok();

template <typename MessageT>
typename Publisher<MessageT>::SharedPtr create_publisher(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, const std::string & topic_name,
const rclcpp::QoS & qos)
rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos)
{
return std::make_shared<Publisher<MessageT>>(node, topic_name, qos);
return std::make_shared<Publisher<MessageT>>(node, topic_name, qos, true);
}

template <typename MessageT>
typename Publisher<MessageT>::SharedPtr create_publisher(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, const std::string & topic_name,
const size_t qos_history_depth)
rclcpp::Node * node, const std::string & topic_name, const size_t qos_history_depth)
{
return std::make_shared<Publisher<MessageT>>(
node, topic_name, rclcpp::QoS(rclcpp::KeepLast(qos_history_depth)));
node, topic_name, rclcpp::QoS(rclcpp::KeepLast(qos_history_depth)), true);
}

template <typename MessageT>
typename Publisher<MessageT>::SharedPtr create_publisher(
rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos,
const bool do_always_ros2_publish)
{
return std::make_shared<Publisher<MessageT>>(node, topic_name, qos, do_always_ros2_publish);
}

template <typename MessageT>
typename Publisher<MessageT>::SharedPtr create_publisher(
rclcpp::Node * node, const std::string & topic_name, const size_t qos_history_depth,
const bool do_always_ros2_publish)
{
return std::make_shared<Publisher<MessageT>>(
node, topic_name, rclcpp::QoS(rclcpp::KeepLast(qos_history_depth)), do_always_ros2_publish);
}

template <typename MessageT>
typename Subscription<MessageT>::SharedPtr create_subscription(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, const std::string & topic_name,
const rclcpp::QoS & qos, std::function<void(const agnocast::ipc_shared_ptr<MessageT> &)> callback)
rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos,
std::function<void(const agnocast::ipc_shared_ptr<MessageT> &)> callback)
{
const agnocast::SubscriptionOptions options;
return std::make_shared<Subscription<MessageT>>(node, topic_name, qos, callback, options);
}

template <typename MessageT>
typename Subscription<MessageT>::SharedPtr create_subscription(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, const std::string & topic_name,
const size_t qos_history_depth,
rclcpp::Node * node, const std::string & topic_name, const size_t qos_history_depth,
std::function<void(const agnocast::ipc_shared_ptr<MessageT> &)> callback)
{
const agnocast::SubscriptionOptions options;
Expand All @@ -65,17 +79,16 @@ typename Subscription<MessageT>::SharedPtr create_subscription(

template <typename MessageT>
typename Subscription<MessageT>::SharedPtr create_subscription(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, const std::string & topic_name,
const rclcpp::QoS & qos, std::function<void(const agnocast::ipc_shared_ptr<MessageT> &)> callback,
rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos,
std::function<void(const agnocast::ipc_shared_ptr<MessageT> &)> callback,
agnocast::SubscriptionOptions options)
{
return std::make_shared<Subscription<MessageT>>(node, topic_name, qos, callback, options);
}

template <typename MessageT>
typename Subscription<MessageT>::SharedPtr create_subscription(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, const std::string & topic_name,
const size_t qos_history_depth,
rclcpp::Node * node, const std::string & topic_name, const size_t qos_history_depth,
std::function<void(const agnocast::ipc_shared_ptr<MessageT> &)> callback,
agnocast::SubscriptionOptions options)
{
Expand All @@ -85,17 +98,17 @@ typename Subscription<MessageT>::SharedPtr create_subscription(

template <typename MessageT>
typename PollingSubscriber<MessageT>::SharedPtr create_subscription(
const std::string & topic_name, const size_t qos_history_depth)
rclcpp::Node * node, const std::string & topic_name, const size_t qos_history_depth)
{
return std::make_shared<PollingSubscriber<MessageT>>(
topic_name, rclcpp::QoS(rclcpp::KeepLast(qos_history_depth)));
node, topic_name, rclcpp::QoS(rclcpp::KeepLast(qos_history_depth)));
}

template <typename MessageT>
typename PollingSubscriber<MessageT>::SharedPtr create_subscription(
const std::string & topic_name, const rclcpp::QoS & qos)
rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos)
{
return std::make_shared<PollingSubscriber<MessageT>>(topic_name, qos);
return std::make_shared<PollingSubscriber<MessageT>>(node, topic_name, qos);
}

} // namespace agnocast
27 changes: 23 additions & 4 deletions src/agnocastlib/include/agnocast_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,26 @@ class Publisher
// TODO(Koichi98): The mq should be closed when a subscriber unsubscribes the topic, but this is
// not currently implemented.
std::unordered_map<std::string, mqd_t> opened_mqs_;
bool do_always_ros2_publish_; // For transient local.
typename rclcpp::Publisher<MessageT>::SharedPtr ros2_publisher_;

public:
using SharedPtr = std::shared_ptr<Publisher<MessageT>>;

Publisher(
[[maybe_unused]] rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, /* for CARET */
const std::string & topic_name, const rclcpp::QoS & qos)
: topic_name_(topic_name), publisher_pid_(getpid()), qos_(qos)
rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos,
const bool do_always_ros2_publish)
: topic_name_(node->get_node_topics_interface()->resolve_topic_name(topic_name)),
publisher_pid_(getpid()),
qos_(qos),
ros2_publisher_(node->create_publisher<MessageT>(topic_name_, qos))
{
if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
do_always_ros2_publish_ = do_always_ros2_publish;
} else {
do_always_ros2_publish_ = false;
}

initialize_publisher(publisher_pid_, topic_name_);
}

Expand Down Expand Up @@ -82,11 +93,19 @@ class Publisher
exit(EXIT_FAILURE);
}

if (do_always_ros2_publish_ || ros2_publisher_->get_subscription_count() > 0) {
const MessageT * raw = message.get();
ros2_publisher_->publish(*raw);
}

publish_core(topic_name_, publisher_pid_, message.get_timestamp(), opened_mqs_);
message.reset();
}

uint32_t get_subscription_count() const { return get_subscription_count_core(topic_name_); }
uint32_t get_subscription_count() const
{
return ros2_publisher_->get_subscription_count() + get_subscription_count_core(topic_name_);
}
};

} // namespace agnocast
24 changes: 13 additions & 11 deletions src/agnocastlib/include/agnocast_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ class SubscriptionBase
union ioctl_subscriber_args initialize(bool is_take_sub);

public:
SubscriptionBase(const pid_t subscriber_pid, std::string topic_name, const rclcpp::QoS & qos);
SubscriptionBase(
rclcpp::Node * node, const pid_t subscriber_pid, const std::string & topic_name,
const rclcpp::QoS & qos);
};

template <typename MessageT>
Expand All @@ -66,17 +68,17 @@ class Subscription : public SubscriptionBase
using SharedPtr = std::shared_ptr<Subscription<MessageT>>;

Subscription(
rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node, const std::string & topic_name,
const rclcpp::QoS & qos,
rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos,
std::function<void(const agnocast::ipc_shared_ptr<MessageT> &)> callback,
agnocast::SubscriptionOptions options)
: SubscriptionBase(getpid(), topic_name, qos)
: SubscriptionBase(node, getpid(), topic_name, qos)
{
union ioctl_subscriber_args subscriber_args = initialize(false);
mqd_t mq = open_mq_for_subscription(topic_name, subscriber_pid_, mq_subscription);
rclcpp::CallbackGroup::SharedPtr callback_group = get_valid_callback_group(node, options);
mqd_t mq = open_mq_for_subscription(topic_name_, subscriber_pid_, mq_subscription);
auto node_base = node->get_node_base_interface();
rclcpp::CallbackGroup::SharedPtr callback_group = get_valid_callback_group(node_base, options);
agnocast::register_callback(
callback, topic_name, static_cast<uint32_t>(qos.depth()), mq, callback_group);
callback, topic_name_, static_cast<uint32_t>(qos.depth()), mq, callback_group);

// If there are messages available and the transient local is enabled, immediately call the
// callback.
Expand All @@ -101,8 +103,8 @@ class TakeSubscription : public SubscriptionBase
public:
using SharedPtr = std::shared_ptr<TakeSubscription<MessageT>>;

TakeSubscription(const std::string & topic_name, const rclcpp::QoS & qos)
: SubscriptionBase(getpid(), topic_name, qos)
TakeSubscription(rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos)
: SubscriptionBase(node, getpid(), topic_name, qos)
{
if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
RCLCPP_WARN(
Expand Down Expand Up @@ -145,10 +147,10 @@ class PollingSubscriber
using SharedPtr = std::shared_ptr<PollingSubscriber<MessageT>>;

explicit PollingSubscriber(
const std::string & topic_name, const rclcpp::QoS & qos = rclcpp::QoS{1})
rclcpp::Node * node, const std::string & topic_name, const rclcpp::QoS & qos = rclcpp::QoS{1})
: data_(agnocast::ipc_shared_ptr<MessageT>())
{
subscriber_ = std::make_shared<TakeSubscription<MessageT>>(topic_name, qos);
subscriber_ = std::make_shared<TakeSubscription<MessageT>>(node, topic_name, qos);
};

const agnocast::ipc_shared_ptr<MessageT> takeData()
Expand Down
1 change: 1 addition & 0 deletions src/agnocastlib/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

<depend>rclcpp</depend>
<depend>rclcpp_components</depend>
<depend>std_msgs</depend>

<test_depend>ament_cmake_gmock</test_depend>
<test_depend>ament_lint_auto</test_depend>
Expand Down
7 changes: 5 additions & 2 deletions src/agnocastlib/src/agnocast_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ mqd_t mq_new_publisher = -1;
extern std::vector<std::thread> threads;

SubscriptionBase::SubscriptionBase(
const pid_t subscriber_pid, std::string topic_name, const rclcpp::QoS & qos)
: subscriber_pid_(subscriber_pid), topic_name_(std::move(topic_name)), qos_(qos)
rclcpp::Node * node, const pid_t subscriber_pid, const std::string & topic_name,
const rclcpp::QoS & qos)
: subscriber_pid_(subscriber_pid),
topic_name_(node->get_node_topics_interface()->resolve_topic_name(topic_name)),
qos_(qos)
{
validate_ld_preload();
}
Expand Down
Loading

0 comments on commit 76722b3

Please sign in to comment.