Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(perception_reproducer)Beta/v0.19.1.suport new reproducer #1562

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion perception/multi_object_tracker/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<depend>tier4_autoware_utils</depend>
<depend>tier4_perception_msgs</depend>
<depend>unique_identifier_msgs</depend>

<depend>diagnostic_updater</depend>
<test_depend>ament_lint_auto</test_depend>
<test_depend>autoware_lint_common</test_depend>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
#!/usr/bin/env python3

# Copyright 2023 TIER IV, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from subprocess import CalledProcessError
from subprocess import check_output
import time

from autoware_auto_perception_msgs.msg import DetectedObjects
from autoware_auto_perception_msgs.msg import PredictedObjects
from autoware_auto_perception_msgs.msg import TrackedObjects
from autoware_auto_perception_msgs.msg import TrafficSignalArray as AutoTrafficSignalArray
from autoware_perception_msgs.msg import TrafficSignalArray
from geometry_msgs.msg import PoseWithCovarianceStamped
from nav_msgs.msg import Odometry
import psutil
from rclpy.node import Node
from rclpy.serialization import deserialize_message
from rosbag2_py import StorageFilter
from rosidl_runtime_py.utilities import get_message
from sensor_msgs.msg import PointCloud2
from utils import open_reader


class PerceptionReplayerCommon(Node):
def __init__(self, args, name):
super().__init__(name)
self.args = args

self.ego_pose = None
self.rosbag_objects_data = []
self.rosbag_ego_odom_data = []
self.rosbag_traffic_signals_data = []
self.is_auto_traffic_signals = False

# subscriber
self.sub_odom = self.create_subscription(
Odometry, "/localization/kinematic_state", self.on_odom, 1
)

# publisher
if self.args.detected_object:
self.objects_pub = self.create_publisher(
DetectedObjects, "/perception/object_recognition/detection/objects", 1
)
elif self.args.tracked_object:
self.objects_pub = self.create_publisher(
TrackedObjects, "/perception/object_recognition/tracking/objects", 1
)
else:
self.objects_pub = self.create_publisher(
PredictedObjects, "/perception/object_recognition/objects", 1
)

self.pointcloud_pub = self.create_publisher(
PointCloud2, "/perception/obstacle_segmentation/pointcloud", 1
)
self.recorded_ego_pub_as_initialpose = self.create_publisher(
PoseWithCovarianceStamped, "/initialpose", 1
)

self.recorded_ego_pub = self.create_publisher(
Odometry, "/perception_reproducer/rosbag_ego_odom", 1
)

# load rosbag
print("Stared loading rosbag")
if os.path.isdir(args.bag):
for bag_file in sorted(os.listdir(args.bag)):
if bag_file.endswith(self.args.rosbag_format):
self.load_rosbag(args.bag + "/" + bag_file)
else:
self.load_rosbag(args.bag)
print("Ended loading rosbag")

# temporary support old auto msgs
if self.is_auto_traffic_signals:
self.auto_traffic_signals_pub = self.create_publisher(
AutoTrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
)
else:
self.traffic_signals_pub = self.create_publisher(
TrafficSignalArray, "/perception/traffic_light_recognition/traffic_signals", 1
)

# wait for ready to publish/subscribe
time.sleep(1.0)

def on_odom(self, odom):
self.ego_pose = odom.pose.pose

def load_rosbag(self, rosbag2_path: str):
reader = open_reader(str(rosbag2_path))

topic_types = reader.get_all_topics_and_types()
# Create a map for quicker lookup
type_map = {topic_types[i].name: topic_types[i].type for i in range(len(topic_types))}

objects_topic = (
"/perception/object_recognition/detection/objects"
if self.args.detected_object
else "/perception/object_recognition/tracking/objects"
if self.args.tracked_object
else "/perception/object_recognition/objects"
)
ego_odom_topic = "/localization/kinematic_state"
traffic_signals_topic = "/perception/traffic_light_recognition/traffic_signals"
topic_filter = StorageFilter(topics=[objects_topic, ego_odom_topic, traffic_signals_topic])
reader.set_filter(topic_filter)

while reader.has_next():
(topic, data, stamp) = reader.read_next()
msg_type = get_message(type_map[topic])
msg = deserialize_message(data, msg_type)
# import pdb; pdb.set_trace()
print(type(msg))
if topic == objects_topic:
self.rosbag_objects_data.append((stamp, msg))
if topic == ego_odom_topic:
self.rosbag_ego_odom_data.append((stamp, msg))
if topic == traffic_signals_topic:
self.rosbag_traffic_signals_data.append((stamp, msg))
self.is_auto_traffic_signals = (
"autoware_auto_perception_msgs" in type(msg).__module__
)

def kill_online_perception_node(self):
# kill node if required
kill_process_name = None
if self.args.detected_object:
kill_process_name = "dummy_perception_publisher_node"
elif self.args.tracked_object:
kill_process_name = "multi_object_tracker"
else:
kill_process_name = "map_based_prediction"
if kill_process_name:
try:
pid = check_output(["pidof", kill_process_name])
process = psutil.Process(int(pid[:-1]))
process.terminate()
except CalledProcessError:
pass

def binary_search(self, data, timestamp):
if data[-1][0] < timestamp:
return data[-1][1]
elif data[0][0] > timestamp:
return data[0][1]

low, high = 0, len(data) - 1

while low <= high:
mid = (low + high) // 2
if data[mid][0] < timestamp:
low = mid + 1
elif data[mid][0] > timestamp:
high = mid - 1
else:
return data[mid][1]

# Return the next timestamp's data if available
if low < len(data):
return data[low][1]
return None

def find_topics_by_timestamp(self, timestamp):
objects_data = self.binary_search(self.rosbag_objects_data, timestamp)
traffic_signals_data = self.binary_search(self.rosbag_traffic_signals_data, timestamp)
return objects_data, traffic_signals_data

def find_ego_odom_by_timestamp(self, timestamp):
return self.binary_search(self.rosbag_ego_odom_data, timestamp)
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#!/usr/bin/env python3

# Copyright 2023 TIER IV, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import math
import time

from geometry_msgs.msg import Quaternion
import numpy as np
import rosbag2_py
from sensor_msgs.msg import PointCloud2
from sensor_msgs.msg import PointField
from tf_transformations import euler_from_quaternion
from tf_transformations import quaternion_from_euler


def get_rosbag_options(path, serialization_format="cdr"):
storage_options = rosbag2_py.StorageOptions(uri=path, storage_id="sqlite3")

converter_options = rosbag2_py.ConverterOptions(
input_serialization_format=serialization_format,
output_serialization_format=serialization_format,
)

return storage_options, converter_options


def open_reader(path: str):
storage_options, converter_options = get_rosbag_options(path)
reader = rosbag2_py.SequentialReader()
reader.open(storage_options, converter_options)
return reader


def calc_squared_distance(p1, p2):
return math.sqrt((p1.x - p2.x) ** 2 + (p1.y - p2.y) ** 2)


def create_empty_pointcloud(timestamp):
pointcloud_msg = PointCloud2()
pointcloud_msg.header.stamp = timestamp
pointcloud_msg.header.frame_id = "map"
pointcloud_msg.height = 1
pointcloud_msg.is_dense = True
pointcloud_msg.point_step = 16
field_name_vec = ["x", "y", "z"]
offset_vec = [0, 4, 8]
for field_name, offset in zip(field_name_vec, offset_vec):
field = PointField()
field.name = field_name
field.offset = offset
field.datatype = 7
field.count = 1
pointcloud_msg.fields.append(field)
return pointcloud_msg


def get_yaw_from_quaternion(orientation):
orientation_list = [
orientation.x,
orientation.y,
orientation.z,
orientation.w,
]
return euler_from_quaternion(orientation_list)[2]


def get_quaternion_from_yaw(yaw):
q = quaternion_from_euler(0, 0, yaw)
orientation = Quaternion()
orientation.x = q[0]
orientation.y = q[1]
orientation.z = q[2]
orientation.w = q[3]
return orientation


def translate_objects_coordinate(ego_pose, log_ego_pose, objects_msg):
log_ego_yaw = get_yaw_from_quaternion(log_ego_pose.orientation)
log_ego_pose_trans_mat = np.array(
[
[
math.cos(log_ego_yaw),
-math.sin(log_ego_yaw),
log_ego_pose.position.x,
],
[math.sin(log_ego_yaw), math.cos(log_ego_yaw), log_ego_pose.position.y],
[0.0, 0.0, 1.0],
]
)

ego_yaw = get_yaw_from_quaternion(ego_pose.orientation)
ego_pose_trans_mat = np.array(
[
[math.cos(ego_yaw), -math.sin(ego_yaw), ego_pose.position.x],
[math.sin(ego_yaw), math.cos(ego_yaw), ego_pose.position.y],
[0.0, 0.0, 1.0],
]
)

for o in objects_msg.objects:
log_object_pose = o.kinematics.pose_with_covariance.pose
log_object_yaw = get_yaw_from_quaternion(log_object_pose.orientation)
log_object_pos_vec = np.array([log_object_pose.position.x, log_object_pose.position.y, 1.0])

# translate object pose from ego pose in log to ego pose in simulation
object_pos_vec = np.linalg.inv(ego_pose_trans_mat).dot(
log_ego_pose_trans_mat.dot(log_object_pos_vec.T)
)

object_pose = o.kinematics.pose_with_covariance.pose
object_pose.position.x = object_pos_vec[0]
object_pose.position.y = object_pos_vec[1]
object_pose.orientation = get_quaternion_from_yaw(log_object_yaw + log_ego_yaw - ego_yaw)


class StopWatch:
def __init__(self, verbose):
# A dictionary to store the starting times
self.start_times = {}
self.verbose = verbose

def tic(self, name):
"""Store the current time with the given name."""
self.start_times[name] = time.perf_counter()

def toc(self, name):
"""Print the elapsed time since the last call to tic() with the same name."""
if name not in self.start_times:
print(f"No start time found for {name}!")
return

elapsed_time = (
time.perf_counter() - self.start_times[name]
) * 1000 # Convert to milliseconds
if self.verbose:
print(f"Time for {name}: {elapsed_time: .2f} ms")

# Reset the starting time for the name
del self.start_times[name]
Loading
Loading