Skip to content

Commit

Permalink
fix: use sequential index for message entries instead of timestamps (#…
Browse files Browse the repository at this point in the history
…355)

Signed-off-by: veqcc <ryuta.kambe@tier4.jp>
  • Loading branch information
veqcc authored Feb 5, 2025
1 parent a2abd31 commit a701d43
Show file tree
Hide file tree
Showing 14 changed files with 188 additions and 205 deletions.
165 changes: 81 additions & 84 deletions kmod/agnocast.c

Large diffs are not rendered by default.

17 changes: 8 additions & 9 deletions kmod/agnocast.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@ union ioctl_subscriber_args {
const char * topic_name;
uint32_t qos_depth;
pid_t subscriber_pid;
uint64_t init_timestamp;
bool is_take_sub;
};
struct
{
topic_local_id_t ret_id;
uint32_t ret_transient_local_num;
uint64_t ret_timestamps[MAX_QOS_DEPTH];
uint64_t ret_last_msg_addrs[MAX_QOS_DEPTH];
int64_t ret_entry_ids[MAX_QOS_DEPTH];
uint64_t ret_entry_addrs[MAX_QOS_DEPTH];
uint32_t ret_publisher_num;
pid_t ret_publisher_pids[MAX_PUBLISHER_NUM];
uint64_t ret_shm_addrs[MAX_PUBLISHER_NUM];
Expand Down Expand Up @@ -53,7 +52,7 @@ union ioctl_update_entry_args {
{
const char * topic_name;
topic_local_id_t subscriber_id;
uint64_t msg_timestamp;
int64_t entry_id;
};
uint64_t ret;
};
Expand All @@ -67,9 +66,9 @@ union ioctl_receive_msg_args {
};
struct
{
uint16_t ret_len;
uint64_t ret_timestamps[MAX_QOS_DEPTH];
uint64_t ret_last_msg_addrs[MAX_QOS_DEPTH];
uint16_t ret_entry_num;
int64_t ret_entry_ids[MAX_QOS_DEPTH];
uint64_t ret_entry_addrs[MAX_QOS_DEPTH];
};
};

Expand All @@ -78,12 +77,12 @@ union ioctl_publish_args {
{
const char * topic_name;
topic_local_id_t publisher_id;
uint64_t msg_timestamp;
uint32_t qos_depth;
uint64_t msg_virtual_address;
};
struct
{
int64_t ret_entry_id;
uint32_t ret_subscriber_num;
topic_local_id_t ret_subscriber_ids[MAX_SUBSCRIBER_NUM];
uint32_t ret_released_num;
Expand All @@ -102,7 +101,7 @@ union ioctl_take_msg_args {
struct
{
uint64_t ret_addr;
uint64_t ret_timestamp;
int64_t ret_entry_id;
};
};

Expand Down
6 changes: 3 additions & 3 deletions src/agnocastlib/include/agnocast_callback_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ uint32_t register_callback(

auto message_creator = [](
const void * ptr, const std::string & topic_name,
const topic_local_id_t subscriber_id, const uint64_t timestamp) {
const topic_local_id_t subscriber_id, const int64_t entry_id) {
return std::make_unique<TypedMessagePtr<MessageType>>(agnocast::ipc_shared_ptr<MessageType>(
const_cast<MessageType *>(static_cast<const MessageType *>(ptr)), topic_name, subscriber_id,
timestamp));
entry_id));
};

uint32_t callback_info_id = next_callback_info_id.fetch_add(1);
Expand All @@ -131,7 +131,7 @@ uint32_t register_callback(
}

std::shared_ptr<std::function<void()>> create_callable(
const void * ptr, const topic_local_id_t subscriber_id, const uint64_t timestamp,
const void * ptr, const topic_local_id_t subscriber_id, const int64_t entry_id,
const uint32_t callback_info_id);

} // namespace agnocast
17 changes: 8 additions & 9 deletions src/agnocastlib/include/agnocast_ioctl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,14 @@ union ioctl_subscriber_args {
const char * topic_name;
uint32_t qos_depth;
pid_t subscriber_pid;
uint64_t init_timestamp;
bool is_take_sub;
};
struct
{
topic_local_id_t ret_id;
uint32_t ret_transient_local_num;
uint64_t ret_timestamps[MAX_QOS_DEPTH];
uint64_t ret_last_msg_addrs[MAX_QOS_DEPTH];
int64_t ret_entry_ids[MAX_QOS_DEPTH];
uint64_t ret_entry_addrs[MAX_QOS_DEPTH];
uint32_t ret_publisher_num;
pid_t ret_publisher_pids[MAX_PUBLISHER_NUM];
uint64_t ret_shm_addrs[MAX_PUBLISHER_NUM];
Expand Down Expand Up @@ -67,7 +66,7 @@ union ioctl_update_entry_args {
{
const char * topic_name;
topic_local_id_t subscriber_id;
uint64_t msg_timestamp;
int64_t entry_id;
};
uint64_t ret;
};
Expand All @@ -84,9 +83,9 @@ union ioctl_receive_msg_args {
};
struct
{
uint16_t ret_len;
uint64_t ret_timestamps[MAX_QOS_DEPTH];
uint64_t ret_last_msg_addrs[MAX_QOS_DEPTH];
uint16_t ret_entry_num;
int64_t ret_entry_ids[MAX_QOS_DEPTH];
uint64_t ret_entry_addrs[MAX_QOS_DEPTH];
};
};
#pragma GCC diagnostic pop
Expand All @@ -98,12 +97,12 @@ union ioctl_publish_args {
{
const char * topic_name;
topic_local_id_t publisher_id;
uint64_t msg_timestamp;
uint32_t qos_depth;
uint64_t msg_virtual_address;
};
struct
{
int64_t ret_entry_id;
uint32_t ret_subscriber_num;
topic_local_id_t ret_subscriber_ids[MAX_SUBSCRIBER_NUM];
uint32_t ret_released_num;
Expand All @@ -125,7 +124,7 @@ union ioctl_take_msg_args {
struct
{
uint64_t ret_addr;
uint64_t ret_timestamp;
int64_t ret_entry_id;
};
};
#pragma GCC diagnostic pop
Expand Down
29 changes: 13 additions & 16 deletions src/agnocastlib/include/agnocast_publisher.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@ namespace agnocast

// These are cut out of the class for information hiding.
topic_local_id_t initialize_publisher(const pid_t publisher_pid, const std::string & topic_name);
std::vector<uint64_t> publish_core(
const std::string & topic_name, const topic_local_id_t publisher_id, const uint64_t timestamp,
const uint32_t qos_depth, const uint64_t msg_virtual_address,
std::unordered_map<std::string, mqd_t> & opened_mqs);
union ioctl_publish_args publish_core(
const std::string & topic_name, const topic_local_id_t publisher_id, const uint32_t qos_depth,
const uint64_t msg_virtual_address, std::unordered_map<std::string, mqd_t> & opened_mqs);
uint32_t get_subscription_count_core(const std::string & topic_name);
void increment_borrowed_publisher_num();
void decrement_borrowed_publisher_num();
Expand Down Expand Up @@ -85,32 +84,30 @@ class Publisher

void publish(ipc_shared_ptr<MessageT> && message)
{
const uint64_t timestamp = agnocast::agnocast_get_timestamp();

#ifdef TRACETOOLS_LTTNG_ENABLED
TRACEPOINT(
agnocast_publish, static_cast<const void *>(this), static_cast<const void *>(message.get()),
timestamp);
#endif

if (!message || topic_name_ != message.get_topic_name()) {
RCLCPP_ERROR(logger, "Invalid message to publish.");
close(agnocast_fd);
exit(EXIT_FAILURE);
}

std::vector<uint64_t> released_addrs = publish_core(
topic_name_, id_, timestamp, static_cast<uint32_t>(qos_.depth()),
const union ioctl_publish_args publish_args = publish_core(
topic_name_, id_, static_cast<uint32_t>(qos_.depth()),
reinterpret_cast<uint64_t>(message.get()), opened_mqs_);

#ifdef TRACETOOLS_LTTNG_ENABLED
TRACEPOINT(
agnocast_publish, static_cast<const void *>(this), static_cast<const void *>(message.get()),
publish_args.ret_entry_id);
#endif

// We need to decrement borrowed_publisher_num before ros2_publish, otherwise the buffers used
// for ROS2 serialization will also use shared memory. Since we don't need to store any
// additional data in shared memory after the agnocast publish operation, here is the ideal
// point to decrement.
decrement_borrowed_publisher_num();

for (uint64_t addr : released_addrs) {
MessageT * release_ptr = reinterpret_cast<MessageT *>(addr);
for (uint32_t i = 0; i < publish_args.ret_released_num; i++) {
MessageT * release_ptr = reinterpret_cast<MessageT *>(publish_args.ret_released_addrs[i]);
delete release_ptr;
}

Expand Down
22 changes: 11 additions & 11 deletions src/agnocastlib/include/agnocast_smart_pointer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ namespace agnocast

// These are cut out of the class for information hiding.
void decrement_rc(
const std::string & topic_name, const topic_local_id_t subscriber_id, const uint64_t timestamp);
const std::string & topic_name, const topic_local_id_t subscriber_id, const int64_t entry_id);
void increment_rc_core(
const std::string & topic_name, const topic_local_id_t subscriber_id, const uint64_t timestamp);
const std::string & topic_name, const topic_local_id_t subscriber_id, const int64_t entry_id);

extern int agnocast_fd;

Expand All @@ -32,14 +32,14 @@ class ipc_shared_ptr
T * ptr_ = nullptr;
std::string topic_name_;
topic_local_id_t subscriber_id_ = -1;
uint64_t timestamp_ = 0;
int64_t entry_id_ = -1;
bool is_created_by_sub_ = false;

void increment_rc() const
{
if (!is_created_by_sub_) return;

increment_rc_core(topic_name_, subscriber_id_, timestamp_);
increment_rc_core(topic_name_, subscriber_id_, entry_id_);
}

// Unimplemented operators. If these are called, a compile error is raised.
Expand All @@ -51,7 +51,7 @@ class ipc_shared_ptr
using element_type = T;

const std::string get_topic_name() const { return topic_name_; }
uint64_t get_timestamp() const { return timestamp_; }
int64_t get_entry_id() const { return entry_id_; }

ipc_shared_ptr() = default;

Expand All @@ -62,11 +62,11 @@ class ipc_shared_ptr

explicit ipc_shared_ptr(
T * ptr, const std::string & topic_name, const topic_local_id_t subscriber_id,
const uint64_t timestamp)
const int64_t entry_id)
: ptr_(ptr),
topic_name_(topic_name),
subscriber_id_(subscriber_id),
timestamp_(timestamp),
entry_id_(entry_id),
is_created_by_sub_(true)
{
}
Expand All @@ -77,7 +77,7 @@ class ipc_shared_ptr
: ptr_(r.ptr_),
topic_name_(r.topic_name_),
subscriber_id_(r.subscriber_id_),
timestamp_(r.timestamp_),
entry_id_(r.entry_id_),
is_created_by_sub_(r.is_created_by_sub_)
{
if (ptr_ != nullptr && !is_created_by_sub_) {
Expand All @@ -95,7 +95,7 @@ class ipc_shared_ptr
: ptr_(r.ptr_),
topic_name_(r.topic_name_),
subscriber_id_(r.subscriber_id_),
timestamp_(r.timestamp_),
entry_id_(r.entry_id_),
is_created_by_sub_(r.is_created_by_sub_)
{
r.ptr_ = nullptr;
Expand All @@ -108,7 +108,7 @@ class ipc_shared_ptr
ptr_ = r.ptr_;
topic_name_ = r.topic_name_;
subscriber_id_ = r.subscriber_id_;
timestamp_ = r.timestamp_;
entry_id_ = r.entry_id_;
is_created_by_sub_ = r.is_created_by_sub_;

r.ptr_ = nullptr;
Expand All @@ -129,7 +129,7 @@ class ipc_shared_ptr
if (ptr_ == nullptr) return;

if (is_created_by_sub_) {
decrement_rc(topic_name_, subscriber_id_, timestamp_);
decrement_rc(topic_name_, subscriber_id_, entry_id_);
}
ptr_ = nullptr;
}
Expand Down
8 changes: 4 additions & 4 deletions src/agnocastlib/include/agnocast_subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,9 @@ class Subscription : public SubscriptionBase
if (qos_.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
// old messages first
for (int i = subscriber_args.ret_transient_local_num - 1; i >= 0; i--) {
MessageT * ptr = reinterpret_cast<MessageT *>(subscriber_args.ret_last_msg_addrs[i]);
MessageT * ptr = reinterpret_cast<MessageT *>(subscriber_args.ret_entry_addrs[i]);
agnocast::ipc_shared_ptr<MessageT> agnocast_ptr = agnocast::ipc_shared_ptr<MessageT>(
ptr, topic_name_, id_, subscriber_args.ret_timestamps[i]);
ptr, topic_name_, id_, subscriber_args.ret_entry_ids[i]);
callback(agnocast_ptr);
}
}
Expand Down Expand Up @@ -157,11 +157,11 @@ class TakeSubscription : public SubscriptionBase
#ifdef TRACETOOLS_LTTNG_ENABLED
TRACEPOINT(
agnocast_take, static_cast<void *>(this), reinterpret_cast<void *>(take_args.ret_addr),
take_args.ret_timestamp);
take_args.ret_entry_id);
#endif

MessageT * ptr = reinterpret_cast<MessageT *>(take_args.ret_addr);
return agnocast::ipc_shared_ptr<MessageT>(ptr, topic_name_, id_, take_args.ret_timestamp);
return agnocast::ipc_shared_ptr<MessageT>(ptr, topic_name_, id_, take_args.ret_entry_id);
}
};

Expand Down
6 changes: 3 additions & 3 deletions src/agnocastlib/src/agnocast_callback_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ std::atomic<uint32_t> next_callback_info_id;
std::atomic<bool> need_epoll_updates{false};

std::shared_ptr<std::function<void()>> create_callable(
const void * ptr, const topic_local_id_t subscriber_id, const uint64_t timestamp,
const void * ptr, const topic_local_id_t subscriber_id, const int64_t entry_id,
const uint32_t callback_info_id)
{
bool found = false;
Expand All @@ -31,8 +31,8 @@ std::shared_ptr<std::function<void()>> create_callable(
exit(EXIT_FAILURE);
}

return std::make_shared<std::function<void()>>([ptr, subscriber_id, timestamp, info]() {
auto typed_msg = info->message_creator(ptr, info->topic_name, subscriber_id, timestamp);
return std::make_shared<std::function<void()>>([ptr, subscriber_id, entry_id, info]() {
auto typed_msg = info->message_creator(ptr, info->topic_name, subscriber_id, entry_id);
info->callback(*typed_msg);
});
}
Expand Down
8 changes: 4 additions & 4 deletions src/agnocastlib/src/agnocast_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -130,17 +130,17 @@ bool AgnocastExecutor::get_next_agnocast_executables(
exit(EXIT_FAILURE);
}

for (int32_t i = static_cast<int32_t>(receive_args.ret_len) - 1; i >= 0;
for (int32_t i = static_cast<int32_t>(receive_args.ret_entry_num) - 1; i >= 0;
i--) { // older messages first
const auto callable = agnocast::create_callable(
reinterpret_cast<void *>(receive_args.ret_last_msg_addrs[i]), callback_info.subscriber_id,
receive_args.ret_timestamps[i], callback_info_id);
reinterpret_cast<void *>(receive_args.ret_entry_addrs[i]), callback_info.subscriber_id,
receive_args.ret_entry_ids[i], callback_info_id);

#ifdef TRACETOOLS_LTTNG_ENABLED
uint64_t pid_ciid = (static_cast<uint64_t>(my_pid_) << 32) | callback_info_id;
TRACEPOINT(
agnocast_create_callable, static_cast<const void *>(callable.get()),
reinterpret_cast<void *>(receive_args.ret_last_msg_addrs[i]), receive_args.ret_timestamps[i],
reinterpret_cast<void *>(receive_args.ret_entry_addrs[i]), receive_args.ret_entry_ids[i],
pid_ciid);
#endif

Expand Down
14 changes: 4 additions & 10 deletions src/agnocastlib/src/agnocast_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,13 @@ topic_local_id_t initialize_publisher(const pid_t publisher_pid, const std::stri
return pub_args.ret_id;
}

std::vector<uint64_t> publish_core(
const std::string & topic_name, const topic_local_id_t publisher_id, const uint64_t timestamp,
const uint32_t qos_depth, const uint64_t msg_virtual_address,
std::unordered_map<std::string, mqd_t> & opened_mqs)
union ioctl_publish_args publish_core(
const std::string & topic_name, const topic_local_id_t publisher_id, const uint32_t qos_depth,
const uint64_t msg_virtual_address, std::unordered_map<std::string, mqd_t> & opened_mqs)
{
union ioctl_publish_args publish_args = {};
publish_args.topic_name = topic_name.c_str();
publish_args.publisher_id = publisher_id;
publish_args.msg_timestamp = timestamp;
publish_args.qos_depth = qos_depth;
publish_args.msg_virtual_address = msg_virtual_address;
if (ioctl(agnocast_fd, AGNOCAST_PUBLISH_MSG_CMD, &publish_args) < 0) {
Expand Down Expand Up @@ -113,11 +111,7 @@ std::vector<uint64_t> publish_core(
}
}

std::vector<uint64_t> addresses(publish_args.ret_released_num);
std::copy_n(
static_cast<const uint64_t *>(publish_args.ret_released_addrs), publish_args.ret_released_num,
addresses.begin());
return addresses;
return publish_args;
}

uint32_t get_subscription_count_core(const std::string & topic_name)
Expand Down
Loading

0 comments on commit a701d43

Please sign in to comment.