Skip to content

Commit

Permalink
fix: use publish timestamp instead of borrow timestamp
Browse files Browse the repository at this point in the history
Signed-off-by: veqcc <ryuta.kambe@tier4.jp>
  • Loading branch information
veqcc committed Feb 4, 2025
1 parent f523eac commit 92df3a3
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 221 deletions.
120 changes: 25 additions & 95 deletions kmod/agnocast.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ struct entry_node
uint64_t msg_virtual_address;
topic_local_id_t referencing_subscriber_ids[MAX_SUBSCRIBER_NUM];
uint8_t subscriber_reference_count[MAX_SUBSCRIBER_NUM];
bool published;
};

DEFINE_HASHTABLE(topic_hashtable, TOPIC_HASH_BITS);
Expand Down Expand Up @@ -431,7 +430,6 @@ static int insert_message_entry(
new_node->referencing_subscriber_ids[i] = -1;
new_node->subscriber_reference_count[i] = 0;
}
new_node->published = false;

rb_link_node(&new_node->node, parent, new);
rb_insert_color(&new_node->node, root);
Expand Down Expand Up @@ -630,8 +628,8 @@ static ssize_t store_topic_name(
// publisher: 9495, 9468,
// subscription: 9534,
// entries:
// time=123456, topic_local_id=0, addr=4398118383424, published=1, referencing=[9534,]
// time=123457, topic_local_id=1, addr=4398051231552, published=1, referencing=[]
// time=123456, topic_local_id=0, addr=4398118383424, referencing=[9534,]
// time=123457, topic_local_id=1, addr=4398051231552, referencing=[]
static ssize_t show_topic_info(struct kobject * kobj, struct kobj_attribute * attr, char * buf)
{
mutex_lock(&global_mutex);
Expand Down Expand Up @@ -685,8 +683,8 @@ static ssize_t show_topic_info(struct kobject * kobj, struct kobj_attribute * at

ret = scnprintf(
buf + used_size, PAGE_SIZE - used_size,
" time=%lld, topic_local_id=%u, addr=%lld, published=%d, referencing=[", en->timestamp,
en->publisher_id, en->msg_virtual_address, en->published);
" time=%lld, topic_local_id=%u, addr=%lld, referencing=[", en->timestamp, en->publisher_id,
en->msg_virtual_address);
used_size += ret;

for (int i = 0; i < MAX_SUBSCRIBER_NUM; i++) {
Expand Down Expand Up @@ -766,16 +764,6 @@ static int subscriber_add(
if (qos_depth <= ioctl_ret->ret_transient_local_num) break;

struct entry_node * en = container_of(node, struct entry_node, node);
/*
* TODO: In the current implementation, the timestamp of the most recently received item is
* stored in sub_info->latest_received_timestamp. If there are older items that haven't been
* published yet, they will be ignored, even on the next RECEIVE. To fix this, the
* implementation should be changed so that items are inserted into the rb_tree only after they
* are published.
*/
if (!en->published) {
continue;
}

if (increment_sub_rc(en, sub_info->id) == -1) {
dev_warn(
Expand Down Expand Up @@ -897,9 +885,9 @@ static int publisher_add(

static uint64_t release_msgs_to_meet_depth(
struct topic_wrapper * wrapper, struct publisher_info * pub_info, const uint32_t qos_depth,
union ioctl_enqueue_and_release_args * ioctl_ret)
union ioctl_publish_args * ioctl_ret)
{
ioctl_ret->ret_len = 0;
ioctl_ret->ret_released_num = 0;

if (pub_info->entries_num <= qos_depth) {
return 0;
Expand Down Expand Up @@ -941,7 +929,7 @@ static uint64_t release_msgs_to_meet_depth(
// that are out of qos_depth to be zero at a specific time. If this happens, as long as the
// publisher's qos_depth is greater than the subscriber's qos_depth, this has little effect on
// system behavior.
while (num_search_entries > 0 && ioctl_ret->ret_len < MAX_RELEASE_NUM) {
while (num_search_entries > 0 && ioctl_ret->ret_released_num < MAX_RELEASE_NUM) {
struct entry_node * en = container_of(node, struct entry_node, node);
node = rb_next(node);
if (!node) {
Expand All @@ -959,8 +947,8 @@ static uint64_t release_msgs_to_meet_depth(
// This is not counted in a Queue size of QoS.
if (is_subscriber_referencing(en)) continue;

ioctl_ret->ret_released_addrs[ioctl_ret->ret_len] = en->msg_virtual_address;
ioctl_ret->ret_len++;
ioctl_ret->ret_released_addrs[ioctl_ret->ret_released_num] = en->msg_virtual_address;
ioctl_ret->ret_released_num++;

rb_erase(&en->node, &wrapper->topic.entries);
kfree(en);
Expand All @@ -977,38 +965,6 @@ static uint64_t release_msgs_to_meet_depth(
return 0;
}

static uint64_t enqueue_and_release(
const char * topic_name, const topic_local_id_t publisher_id, const uint32_t qos_depth,
const uint64_t msg_virtual_address, const uint64_t timestamp,
union ioctl_enqueue_and_release_args * ioctl_ret)
{
struct topic_wrapper * wrapper = find_topic(topic_name);
if (!wrapper) {
dev_warn(
agnocast_device, "Topic (topic_name=%s) not found. (enqueue_and_release)\n", topic_name);
return -1;
}

struct publisher_info * pub_info = find_publisher_info(wrapper, publisher_id);
if (!pub_info) {
dev_warn(
agnocast_device,
"Publisher (id=%d) not found in the topic (topic_name=%s). (enqueue_and_release)\n",
publisher_id, topic_name);
return -1;
}

if (insert_message_entry(wrapper, pub_info, msg_virtual_address, timestamp) == -1) {
return -1;
}

if (release_msgs_to_meet_depth(wrapper, pub_info, qos_depth, ioctl_ret) == -1) {
return -1;
}

return 0;
}

static int receive_and_update(
const char * topic_name, const topic_local_id_t subscriber_id, const uint32_t qos_depth,
union ioctl_receive_msg_args * ioctl_ret)
Expand Down Expand Up @@ -1039,17 +995,6 @@ static int receive_and_update(
break;
}

/*
* TODO: In the current implementation, the timestamp of the most recently received item is
* stored in sub_info->latest_received_timestamp. If there are older items that haven't been
* published yet, they will be ignored, even on the next RECEIVE. To fix this, the
* implementation should be changed so that items are inserted into the rb_tree only after they
* are published.
*/
if (!en->published) {
continue;
}

if (increment_sub_rc(en, subscriber_id) == -1) {
return -1;
}
Expand All @@ -1068,28 +1013,29 @@ static int receive_and_update(
}

static int publish_msg(
const char * topic_name, const uint64_t msg_timestamp, union ioctl_publish_args * ioctl_ret)
const char * topic_name, const topic_local_id_t publisher_id, const uint32_t qos_depth,
const uint64_t msg_virtual_address, const uint64_t timestamp,
union ioctl_publish_args * ioctl_ret)
{
struct topic_wrapper * wrapper = find_topic(topic_name);
if (!wrapper) {
dev_warn(agnocast_device, "Topic (topic_name=%s) not found. (publish_msg)\n", topic_name);
return -1;
}

struct entry_node * en = find_message_entry(wrapper, msg_timestamp);
if (!en) {
struct publisher_info * pub_info = find_publisher_info(wrapper, publisher_id);
if (!pub_info) {
dev_warn(
agnocast_device, "Message entry (topic_name=%s timestamp=%lld) not found. (publish_msg)\n",
topic_name, msg_timestamp);
agnocast_device, "Publisher (id=%d) not found in the topic (topic_name=%s). (publish_msg)\n",
publisher_id, topic_name);
return -1;
}

if (en->published) {
dev_warn(
agnocast_device,
"Tried to publish a message that has already been published. (topic_name=%s "
"timestamp=%lld). (publish_msg)\n",
topic_name, msg_timestamp);
if (insert_message_entry(wrapper, pub_info, msg_virtual_address, timestamp) == -1) {
return -1;
}

if (release_msgs_to_meet_depth(wrapper, pub_info, qos_depth, ioctl_ret) == -1) {
return -1;
}

Expand All @@ -1104,8 +1050,6 @@ static int publish_msg(
}
ioctl_ret->ret_subscriber_num = subscriber_num;

en->published = true;

return 0;
}

Expand Down Expand Up @@ -1144,7 +1088,7 @@ static int take_msg(
if (en->timestamp < sub_info->latest_received_timestamp) {
break; // Never take any messages that are older than the most recently received
}
if (en->published) candidate_en = en;
candidate_en = en;
searched_count++;
node = rb_prev(node);
}
Expand Down Expand Up @@ -1229,22 +1173,6 @@ static long agnocast_ioctl(struct file * file, unsigned int cmd, unsigned long a
if (copy_to_user((union ioctl_publisher_args __user *)arg, &pub_args, sizeof(pub_args)))
goto unlock_mutex_and_return;
break;
case AGNOCAST_ENQUEUE_AND_RELEASE_CMD:
union ioctl_enqueue_and_release_args enqueue_release_args;
if (copy_from_user(
&enqueue_release_args, (union ioctl_enqueue_and_release_args __user *)arg,
sizeof(enqueue_release_args)))
goto unlock_mutex_and_return;
if (copy_from_user(
topic_name_buf, (char __user *)enqueue_release_args.topic_name, sizeof(topic_name_buf)))
goto unlock_mutex_and_return;
ret = enqueue_and_release(
topic_name_buf, enqueue_release_args.publisher_id, enqueue_release_args.qos_depth,
enqueue_release_args.msg_virtual_address, enqueue_release_args.timestamp,
&enqueue_release_args);
if (copy_to_user((uint64_t __user *)arg, &enqueue_release_args, sizeof(enqueue_release_args)))
goto unlock_mutex_and_return;
break;
case AGNOCAST_INCREMENT_RC_CMD:
if (copy_from_user(
&entry_args, (union ioctl_update_entry_args __user *)arg, sizeof(entry_args)))
Expand Down Expand Up @@ -1290,7 +1218,9 @@ static long agnocast_ioctl(struct file * file, unsigned int cmd, unsigned long a
if (copy_from_user(
topic_name_buf, (char __user *)publish_args.topic_name, sizeof(topic_name_buf)))
goto unlock_mutex_and_return;
ret = publish_msg(topic_name_buf, publish_args.msg_timestamp, &publish_args);
ret = publish_msg(
topic_name_buf, publish_args.publisher_id, publish_args.qos_depth,
publish_args.msg_virtual_address, publish_args.msg_timestamp, &publish_args);
if (copy_to_user((union ioctl_publish_args __user *)arg, &publish_args, sizeof(publish_args)))
goto unlock_mutex_and_return;
break;
Expand Down
27 changes: 6 additions & 21 deletions kmod/agnocast.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
// TODO: should be made larger when applied for Autoware
#define MAX_PUBLISHER_NUM 4 // At least 4 is required for sample application
#define MAX_SUBSCRIBER_NUM 8 // At least 6 is required for pointcloud topic in Autoware

#define MAX_QOS_DEPTH 10 // Maximum depth of transient local usage part in Autoware
#define MAX_QOS_DEPTH 10 // Maximum depth of transient local usage part in Autoware
#define MAX_RELEASE_NUM 3 // Max to keep union size equal to 32 bytes

typedef int32_t topic_local_id_t;

Expand Down Expand Up @@ -48,24 +48,6 @@ union ioctl_publisher_args {
};
};

#define MAX_RELEASE_NUM 3 // Max to keep union size equal to 32 bytes

union ioctl_enqueue_and_release_args {
struct
{
const char * topic_name;
topic_local_id_t publisher_id;
uint32_t qos_depth;
uint64_t msg_virtual_address;
uint64_t timestamp;
};
struct
{
uint32_t ret_len;
uint64_t ret_released_addrs[MAX_RELEASE_NUM];
};
};

union ioctl_update_entry_args {
struct
{
Expand Down Expand Up @@ -97,11 +79,15 @@ union ioctl_publish_args {
const char * topic_name;
topic_local_id_t publisher_id;
uint64_t msg_timestamp;
uint32_t qos_depth;
uint64_t msg_virtual_address;
};
struct
{
uint32_t ret_subscriber_num;
topic_local_id_t ret_subscriber_ids[MAX_SUBSCRIBER_NUM];
uint32_t ret_released_num;
uint64_t ret_released_addrs[MAX_RELEASE_NUM];
};
};

Expand Down Expand Up @@ -136,7 +122,6 @@ union ioctl_get_subscriber_num_args {

#define AGNOCAST_SUBSCRIBER_ADD_CMD _IOW('S', 1, union ioctl_subscriber_args)
#define AGNOCAST_PUBLISHER_ADD_CMD _IOW('P', 1, union ioctl_publisher_args)
#define AGNOCAST_ENQUEUE_AND_RELEASE_CMD _IOW('E', 1, union ioctl_enqueue_and_release_args)
#define AGNOCAST_INCREMENT_RC_CMD _IOW('M', 1, union ioctl_update_entry_args)
#define AGNOCAST_DECREMENT_RC_CMD _IOW('M', 2, union ioctl_update_entry_args)
#define AGNOCAST_RECEIVE_MSG_CMD _IOW('M', 3, union ioctl_receive_msg_args)
Expand Down
30 changes: 6 additions & 24 deletions src/agnocastlib/include/agnocast_ioctl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ namespace agnocast
// TODO: should be made larger when applied for Autoware
#define MAX_PUBLISHER_NUM 4 // At least 4 is required for sample application
#define MAX_SUBSCRIBER_NUM 8 // At least 6 is required for pointcloud topic in Autoware

#define MAX_QOS_DEPTH 10 // Maximum depth of transient local usage part in Autoware
#define MAX_QOS_DEPTH 10 // Maximum depth of transient local usage part in Autoware
#define MAX_RELEASE_NUM 3 // Max to keep union size equal to 32 bytes

using topic_local_id_t = int32_t;

Expand Down Expand Up @@ -60,27 +60,6 @@ union ioctl_publisher_args {
};
#pragma GCC diagnostic pop

#define MAX_RELEASE_NUM 3 // Max to keep union size equal to 32 bytes

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"
union ioctl_enqueue_and_release_args {
struct
{
const char * topic_name;
topic_local_id_t publisher_id;
uint32_t qos_depth;
uint64_t msg_virtual_address;
uint64_t timestamp;
};
struct
{
uint32_t ret_len;
uint64_t ret_released_addrs[MAX_RELEASE_NUM];
};
};
#pragma GCC diagnostic pop

#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpedantic"
union ioctl_update_entry_args {
Expand Down Expand Up @@ -120,11 +99,15 @@ union ioctl_publish_args {
const char * topic_name;
topic_local_id_t publisher_id;
uint64_t msg_timestamp;
uint32_t qos_depth;
uint64_t msg_virtual_address;
};
struct
{
uint32_t ret_subscriber_num;
topic_local_id_t ret_subscriber_ids[MAX_SUBSCRIBER_NUM];
uint32_t ret_released_num;
uint64_t ret_released_addrs[MAX_RELEASE_NUM];
};
};
#pragma GCC diagnostic pop
Expand Down Expand Up @@ -166,7 +149,6 @@ union ioctl_get_subscriber_num_args {

#define AGNOCAST_SUBSCRIBER_ADD_CMD _IOW('S', 1, union ioctl_subscriber_args)
#define AGNOCAST_PUBLISHER_ADD_CMD _IOW('P', 1, union ioctl_publisher_args)
#define AGNOCAST_ENQUEUE_AND_RELEASE_CMD _IOW('E', 1, union ioctl_enqueue_and_release_args)
#define AGNOCAST_INCREMENT_RC_CMD _IOW('M', 1, union ioctl_update_entry_args)
#define AGNOCAST_DECREMENT_RC_CMD _IOW('M', 2, union ioctl_update_entry_args)
#define AGNOCAST_RECEIVE_MSG_CMD _IOW('M', 3, union ioctl_receive_msg_args)
Expand Down
Loading

0 comments on commit 92df3a3

Please sign in to comment.