From cdd700dbcffff0ba268bce2c9a2f89e7225e9385 Mon Sep 17 00:00:00 2001
From: vangeliq
Date: Sat, 8 Feb 2025 11:25:37 -0800
Subject: [PATCH] file end change

---
 .../python_package/scripts/utils.py | 514 +++++++++---------
 .../python_package/video_node.py    | 294 +++++-----
 2 files changed, 404 insertions(+), 404 deletions(-)

diff --git a/ros2_ws/src/python_package/python_package/scripts/utils.py b/ros2_ws/src/python_package/python_package/scripts/utils.py
index 67c3479..f57f1b2 100644
--- a/ros2_ws/src/python_package/python_package/scripts/utils.py
+++ b/ros2_ws/src/python_package/python_package/scripts/utils.py
@@ -1,258 +1,258 @@
-import cv2
-import os
-import numpy as np
-from typing import Tuple, List
-import torch
-from ultralytics import YOLO
-from ultralytics.engine.results import Results
-
-# Utility functions for the Python workspace
-# Constants
-ROI_ROWS = 300
-ROI_COLS = 300
-CONTOUR_AREA_THRESHOLD = 500
-POSTPROCESS_OUTPUT_SHAPE = (640, 640)
-
-
-def initialise_model(weights_path=None, precision=None):
-    """
-    Create a YOLO model from the given weights file.
-    """
-    if not os.path.exists(weights_path):
-        raise FileNotFoundError(f"weights file not found at {weights_path}")
-
-    # TODO: do something with the precision
-
-    yolo = YOLO(weights_path)
-    return yolo
-
-
-# Not used for now
-def preprocess(image: np.ndarray):
-    """
-    Resizes the input image to the model input shape.
-    No other pre-processing is needed for YOLO.
-    """
-    resized_up = cv2.resize(image, POSTPROCESS_OUTPUT_SHAPE)
-
-    return resized_up
-
-def run_inference(model, image_array: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
-    # https://docs.ultralytics.com/modes/predict/#inference-arguments
-    """
-    Perform inference on the provided image array using the YOLO model.
-    Note that YOLO handles normalization of the bounding boxes.
-
-    Args:
-        image_array (np.ndarray): The input image array on which inference is to be performed.
-
-    Returns:
-        tuple: A tuple containing:
-            - out_img (np.ndarray): The output image with bounding boxes drawn.
-            - confidences (np.ndarray): Confidence scores for each detected object.
-            - boxes (np.ndarray): Bounding boxes in the format [x1, y1, x2, y2], normalized from 0 to 1.
-    """
-    if model is None:
-        raise RuntimeError("Model was not initialised, inference cannot be done")
-
-    results = model.predict(image_array)
-
-    if results[0] is not None:  # TODO: change this so the message only sends bounding boxes
-        result = results[0]
-        out_img = cv2.cvtColor(result.plot(), cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
-
-        boxes = result.boxes.xyxyn.cpu().numpy()
-        confidences = result.boxes.conf.cpu().numpy()
-        return out_img, confidences, boxes
-
-    return image_array, np.array([]), np.array([])
-
-def postprocess(confidence, bbox_array: np.ndarray, raw_image: np.ndarray, velocity=0):
-    """
-    Postprocesses the bounding boxes to convert them from normalized coordinates (xyxyn) to pixel coordinates (xyxy).
-    Applies color segmentation to refine the bounding boxes and adjusts them to fit within a shifted region of interest (ROI).
-
-    Args:
-        confidence (np.ndarray): 1D array of confidence scores, one per bounding box (length N).
-        bbox_array (np.ndarray): 2D array of bounding boxes in normalized coordinates (xyxyn), shape (N, 4).
-        raw_image (np.ndarray): The original input image.
-        velocity (int, optional): The velocity by which to shift the ROI. Default is 0.
-
-    Returns:
-        list: A list of refined bounding boxes in pixel coordinates (xyxy).
-    """
-
-    detections = _convert_bboxes_to_pixel(bbox_array, raw_image.shape)
-    detections = _object_filter(raw_image, detections)  # color segmentation
-    detections = _verify_object(raw_image, detections, velocity)
-
-    return detections
-
- """ - - detections = _convert_bboxes_to_pixel(bbox_array, raw_image.shape) - detections = _object_filter(raw_image, detections) #color segmentation - detections = _verify_object(raw_image, detections,velocity) - - return detections - -def _convert_bboxes_to_pixel(bbox_array: np.ndarray, image_shape: Tuple[int, int]) -> List[Tuple[int, int, int, int]]: - """ - Converts normalized bounding boxes to pixel coordinates - - Args: - bbox_array (np.ndarray): 2D Array of bounding boxes in normalized coordinates - image_shape (Tuple[int, int]): The shape of the image - - Returns: - List[Tuple[int, int, int, int]]: List of bounding boxes in pixel coordinates. - """ - height, width = image_shape[:2] - detections = [] - for bbox in bbox_array: - x1 = int(bbox[0] * width) - y1 = int(bbox[1] * height) - x2 = int(bbox[2] * width) - y2 = int(bbox[3] * height) - detections.append((x1, y1, x2, y2)) - return detections - -# creates a color segmentation mask and filters small areas within bounding box frame -def _object_filter(image: np.ndarray, bboxes: List[Tuple[int, int, int, int]]) -> List[Tuple[int, int, int, int]]: - """ - Filters objects in an image based on bounding boxes and color thresholds. - - Args: - image (np.ndarray): The input image in which objects are to be filtered. - bboxes (List[Tuple[int, int, int, int]]): A list of bounding boxes. - - Returns: - List[Tuple[int, int, int, int]]: A list of filtered bounding boxes. - """ - detections = [] - for bbox in bboxes: - x1, y1, x2, y2 = bbox - roi = image[y1:y2, x1:x2] - - # Perform color segmentation - hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) - lower_hue = 35 - upper_hue = 80 - lower_mask = hsv[:, :, 0] > lower_hue - upper_mask = hsv[:, :, 0] < upper_hue - saturation_mask = hsv[:, :, 1] > 50 - mask = lower_mask & upper_mask & saturation_mask - - # Check if the mask has significant area - if np.sum(mask) > CONTOUR_AREA_THRESHOLD: - # Refine bounding boxes using contours - gray_image = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) - gray_image = gray_image * mask.astype(np.uint8) - _, thresh = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY) - - contours, _ = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) - for cnt in contours: - area = cv2.contourArea(cnt) - if area > CONTOUR_AREA_THRESHOLD: - x, y, w, h = cv2.boundingRect(cnt) - detections.append((x + x1, y + y1, x + w + x1, y + h + y1)) - return detections - -def _verify_object(raw_image, bboxes, velocity=0): - """ - Adjusts bounding boxes based on the region of interest (ROI) and velocity. - This function takes an image, a list of bounding boxes, and an optional velocity parameter. - It adjusts the bounding boxes to ensure they are within the shifted ROI - Parameters: - raw_image (numpy.ndarray): The input image. - bboxes (list of list of int): A list of bounding boxes, where each bounding box is represented - as [x1, y1, x2, y2]. - velocity (int, optional): The velocity to shift the ROI. Default is 0. - Returns: - List[List[int]]: A list of adjusted bounding boxes that are within the shifted ROI. 
- """ - - velocity = int(velocity) - roi_x1, roi_y1, roi_x2, roi_y2 = _get_roi_coordinates(image=raw_image) - shifted_roi_x1, shifted_roi_y1, shifted_roi_x2, shifted_roi_y2 = roi_x1 - velocity, roi_y1, roi_x2 - velocity, roi_y2 - - adjusted_bboxes = [] - for bbox in bboxes: - x1, y1, x2, y2 = bbox - - # Check if bounding box is within shifted ROI - if not ((x1 < shifted_roi_x1 and x2 < shifted_roi_x1) or - (x1 > shifted_roi_x2 and x2 > shifted_roi_x2) or - (y1 < roi_y1 and y2 < roi_y1) or - (y1 > roi_y2 and y2 > roi_y2)): - x1 = max(shifted_roi_x1, min(x1, shifted_roi_x2)) #set the coordinates to be inside roi - x2 = max(shifted_roi_x1, min(x2, shifted_roi_x2)) - y1 = max(shifted_roi_y1, min(y1, shifted_roi_y2)) - y2 = max(shifted_roi_y1, min(y2, shifted_roi_y2)) - - adjusted_bboxes.append([x1, y1, x2, y2]) - - return adjusted_bboxes - - -def draw_boxes(image: np.ndarray, bboxes: list, with_roi =True, with_roi_shift = True, velocity = 0) -> np.ndarray: - """ - Given array of bounding box tuples and an image, draw the bounding boxes into the image. - If with_roi and with_roi shift is set to true, the ROI areas will also be drawn in. - velocity must be set if roi_shift is True - - Parameters: - image (np.ndarray): The image on which to draw the bounding boxes. - bboxes (list): A list of bounding boxes, where each bounding box is represented as a list of four values [x1, y1, x2, y2]. - - Returns: - np.ndarray: The image with bounding boxes drawn. - """ - velocity = int(velocity) - - if with_roi: - x1,y1,x2,y2 = _get_roi_coordinates(image) - overlay = image.copy() - cv2.rectangle(overlay, (x1, y1), (x2, y2), (0, 128, 255), -1) - alpha = 0.3 # Transparency factor. - image = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0) - cv2.putText(image, 'Original ROI', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 100, 100), 2) - - if with_roi_shift and velocity != 0: - x1_shifted = max(x1 - velocity, 0) - x2_shifted = max(x2 - velocity, 0) - image = cv2.rectangle(image, (x1_shifted, y1), (x2_shifted, y2), (128, 128, 128), 2) - overlay = image.copy() - cv2.rectangle(overlay, (x1_shifted, y1), (x2_shifted, y2), (128, 128, 128), -1) - alpha = 0.5 # Transparency factor - image = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0) - cv2.putText(image, 'Shifted ROI', (x1_shifted, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 100, 100), 2) - - color = tuple(np.random.randint(0, 256, 3).tolist()) # Generate a random color - - for bbox in bboxes: - x1, y1, x2, y2 = map(int, bbox) - - # print(f"Bounding box: ({x1}, {y1}), ({x2}, {y2})") - image = cv2.rectangle(image, (x1, y1), (x2, y2),(255, 0, 0), 2) - - return image - -def _get_roi_coordinates(image:np.ndarray): - """ - Calculate the coordinates of the Region of Interest (ROI) in the given image. - Args: - image (np.ndarray): The input image as a NumPy array. - Returns: - list: A list containing the coordinates of the top-left and bottom-right corners of the ROI - in the format [top_left_x, top_left_y, bottom_right_x, bottom_right_y]. 
- """ - rows, cols = image.shape[:2] - - top_left_y = (int(rows/2) - int(ROI_ROWS/2)) - top_left_x = (int(cols/2) - int(ROI_COLS/2)) - - bottom_right_y = min(rows, top_left_y + ROI_COLS) - bottom_right_x = min(cols, top_left_x + ROI_ROWS) - - return top_left_x, top_left_y, bottom_right_x, bottom_right_y - -def print_info(model): - """ - Prints the model information - """ +import cv2 +import os +import numpy as np +from typing import Tuple, List +import torch +from ultralytics import YOLO +from ultralytics.engine.results import Results + +#utility for the python workspace +# Constants +ROI_ROWS = 300 +ROI_COLS = 300 +CONTOUR_AREA_THRESHOLD = 500 +POSTPROCESS_OUTPUT_SHAPE = (640, 640) + + + +def initialise_model(weights_path=None, precision=None): + """ + create model with the given weights + """ + if not os.path.exists(weights_path): + print(f"weights file not found at {weights_path}") + raise FileNotFoundError(f"weights file not found at {weights_path}") + + #TODO: do something with the precision + + yolo = YOLO(weights_path) + return yolo + + +# Not used for now +def preprocess(image: np.ndarray): + """ + Takes in a numpy array that has been preprocessed + No pre-processing is nededed for YOLO + """ + resized_up = cv2.resize(image, POSTPROCESS_OUTPUT_SHAPE) + + return resized_up + +def run_inference(model, image_array: np.ndarray) -> Results: + # https://docs.ultralytics.com/modes/predict/#inference-arguments + """ + Perform inference on the provided image array using the YOLO model. + Note that YOLO handles normalization of the bounding boxes + + Args: + image_array (np.ndarray): The input image array on which inference is to be performed. + + Returns: + tuple: A tuple containing: + - out_img (np.ndarray): The output image with bounding boxes drawn. + - confidences (list): A list of confidence scores for each detected object. + - boxes (list): A list of bounding boxes in the format [x1, y1, x2, y2] normalized from 0 to 1. + """ + if model is None: + raise Exception("Model was not initialised, inference cannot be done") + + results = model.predict(image_array) + + if results[0] is not None: # todo: change this so the message only sends bb + result = results[0] + out_img = cv2.cvtColor(result.plot(), cv2.COLOR_BGR2RGB) # Convert BGR to RGB + + boxes = result.boxes.xyxyn.cpu().numpy() + confidences = result.boxes.conf.cpu().numpy() + return out_img, confidences, boxes + + return image_array, np.array([]), np.array([]) + +def postprocess(confidence, bbox_array: np.ndarray,raw_image: np.ndarray, velocity=0): + """ + Postprocesses the bounding boxes to convert them from normalized coordinates (xyxyn) to pixel coordinates (xyxy). + Applies color segmentation to refine the bounding boxes and adjusts them to fit within a shifted region of interest (ROI). + + Args: + confidence (list): List of confidence scores for each bounding box. 2D array of shape (N, 4). (N = no. of boxes) + bbox_array (list): List of bounding boxes in normalized coordinates (xyxyn). + raw_image (np.ndarray): The original input image. + velocity (int, optional): The velocity to shift the ROI. Default is 0. + + Returns: + list: A list of refined bounding boxes in pixel coordinates (xyxy). 
+ """ + + detections = _convert_bboxes_to_pixel(bbox_array, raw_image.shape) + detections = _object_filter(raw_image, detections) #color segmentation + detections = _verify_object(raw_image, detections,velocity) + + return detections + +def _convert_bboxes_to_pixel(bbox_array: np.ndarray, image_shape: Tuple[int, int]) -> List[Tuple[int, int, int, int]]: + """ + Converts normalized bounding boxes to pixel coordinates + + Args: + bbox_array (np.ndarray): 2D Array of bounding boxes in normalized coordinates + image_shape (Tuple[int, int]): The shape of the image + + Returns: + List[Tuple[int, int, int, int]]: List of bounding boxes in pixel coordinates. + """ + height, width = image_shape[:2] + detections = [] + for bbox in bbox_array: + x1 = int(bbox[0] * width) + y1 = int(bbox[1] * height) + x2 = int(bbox[2] * width) + y2 = int(bbox[3] * height) + detections.append((x1, y1, x2, y2)) + return detections + +# creates a color segmentation mask and filters small areas within bounding box frame +def _object_filter(image: np.ndarray, bboxes: List[Tuple[int, int, int, int]]) -> List[Tuple[int, int, int, int]]: + """ + Filters objects in an image based on bounding boxes and color thresholds. + + Args: + image (np.ndarray): The input image in which objects are to be filtered. + bboxes (List[Tuple[int, int, int, int]]): A list of bounding boxes. + + Returns: + List[Tuple[int, int, int, int]]: A list of filtered bounding boxes. + """ + detections = [] + for bbox in bboxes: + x1, y1, x2, y2 = bbox + roi = image[y1:y2, x1:x2] + + # Perform color segmentation + hsv = cv2.cvtColor(roi, cv2.COLOR_BGR2HSV) + lower_hue = 35 + upper_hue = 80 + lower_mask = hsv[:, :, 0] > lower_hue + upper_mask = hsv[:, :, 0] < upper_hue + saturation_mask = hsv[:, :, 1] > 50 + mask = lower_mask & upper_mask & saturation_mask + + # Check if the mask has significant area + if np.sum(mask) > CONTOUR_AREA_THRESHOLD: + # Refine bounding boxes using contours + gray_image = cv2.cvtColor(roi, cv2.COLOR_BGR2GRAY) + gray_image = gray_image * mask.astype(np.uint8) + _, thresh = cv2.threshold(gray_image, 0, 255, cv2.THRESH_BINARY) + + contours, _ = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) + for cnt in contours: + area = cv2.contourArea(cnt) + if area > CONTOUR_AREA_THRESHOLD: + x, y, w, h = cv2.boundingRect(cnt) + detections.append((x + x1, y + y1, x + w + x1, y + h + y1)) + return detections + +def _verify_object(raw_image, bboxes, velocity=0): + """ + Adjusts bounding boxes based on the region of interest (ROI) and velocity. + This function takes an image, a list of bounding boxes, and an optional velocity parameter. + It adjusts the bounding boxes to ensure they are within the shifted ROI + Parameters: + raw_image (numpy.ndarray): The input image. + bboxes (list of list of int): A list of bounding boxes, where each bounding box is represented + as [x1, y1, x2, y2]. + velocity (int, optional): The velocity to shift the ROI. Default is 0. + Returns: + List[List[int]]: A list of adjusted bounding boxes that are within the shifted ROI. 
+ """ + + velocity = int(velocity) + roi_x1, roi_y1, roi_x2, roi_y2 = _get_roi_coordinates(image=raw_image) + shifted_roi_x1, shifted_roi_y1, shifted_roi_x2, shifted_roi_y2 = roi_x1 - velocity, roi_y1, roi_x2 - velocity, roi_y2 + + adjusted_bboxes = [] + for bbox in bboxes: + x1, y1, x2, y2 = bbox + + # Check if bounding box is within shifted ROI + if not ((x1 < shifted_roi_x1 and x2 < shifted_roi_x1) or + (x1 > shifted_roi_x2 and x2 > shifted_roi_x2) or + (y1 < roi_y1 and y2 < roi_y1) or + (y1 > roi_y2 and y2 > roi_y2)): + x1 = max(shifted_roi_x1, min(x1, shifted_roi_x2)) #set the coordinates to be inside roi + x2 = max(shifted_roi_x1, min(x2, shifted_roi_x2)) + y1 = max(shifted_roi_y1, min(y1, shifted_roi_y2)) + y2 = max(shifted_roi_y1, min(y2, shifted_roi_y2)) + + adjusted_bboxes.append([x1, y1, x2, y2]) + + return adjusted_bboxes + + +def draw_boxes(image: np.ndarray, bboxes: list, with_roi =True, with_roi_shift = True, velocity = 0) -> np.ndarray: + """ + Given array of bounding box tuples and an image, draw the bounding boxes into the image. + If with_roi and with_roi shift is set to true, the ROI areas will also be drawn in. + velocity must be set if roi_shift is True + + Parameters: + image (np.ndarray): The image on which to draw the bounding boxes. + bboxes (list): A list of bounding boxes, where each bounding box is represented as a list of four values [x1, y1, x2, y2]. + + Returns: + np.ndarray: The image with bounding boxes drawn. + """ + velocity = int(velocity) + + if with_roi: + x1,y1,x2,y2 = _get_roi_coordinates(image) + overlay = image.copy() + cv2.rectangle(overlay, (x1, y1), (x2, y2), (0, 128, 255), -1) + alpha = 0.3 # Transparency factor. + image = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0) + cv2.putText(image, 'Original ROI', (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 100, 100), 2) + + if with_roi_shift and velocity != 0: + x1_shifted = max(x1 - velocity, 0) + x2_shifted = max(x2 - velocity, 0) + image = cv2.rectangle(image, (x1_shifted, y1), (x2_shifted, y2), (128, 128, 128), 2) + overlay = image.copy() + cv2.rectangle(overlay, (x1_shifted, y1), (x2_shifted, y2), (128, 128, 128), -1) + alpha = 0.5 # Transparency factor + image = cv2.addWeighted(overlay, alpha, image, 1 - alpha, 0) + cv2.putText(image, 'Shifted ROI', (x1_shifted, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (100, 100, 100), 2) + + color = tuple(np.random.randint(0, 256, 3).tolist()) # Generate a random color + + for bbox in bboxes: + x1, y1, x2, y2 = map(int, bbox) + + # print(f"Bounding box: ({x1}, {y1}), ({x2}, {y2})") + image = cv2.rectangle(image, (x1, y1), (x2, y2),(255, 0, 0), 2) + + return image + +def _get_roi_coordinates(image:np.ndarray): + """ + Calculate the coordinates of the Region of Interest (ROI) in the given image. + Args: + image (np.ndarray): The input image as a NumPy array. + Returns: + list: A list containing the coordinates of the top-left and bottom-right corners of the ROI + in the format [top_left_x, top_left_y, bottom_right_x, bottom_right_y]. 
+ """ + rows, cols = image.shape[:2] + + top_left_y = (int(rows/2) - int(ROI_ROWS/2)) + top_left_x = (int(cols/2) - int(ROI_COLS/2)) + + bottom_right_y = min(rows, top_left_y + ROI_COLS) + bottom_right_x = min(cols, top_left_x + ROI_ROWS) + + return top_left_x, top_left_y, bottom_right_x, bottom_right_y + +def print_info(model): + """ + Prints the model information + """ print(model.info()) \ No newline at end of file diff --git a/ros2_ws/src/python_package/python_package/video_node.py b/ros2_ws/src/python_package/python_package/video_node.py index c2911e2..e22d8d7 100644 --- a/ros2_ws/src/python_package/python_package/video_node.py +++ b/ros2_ws/src/python_package/python_package/video_node.py @@ -1,148 +1,148 @@ -import time, os -import cv2 -import pycuda.driver as cuda -import cupy as cp -import queue - -import rclpy -from rclpy.time import Time -from rclpy.node import Node -from rclpy.executors import MultiThreadedExecutor -from sensor_msgs.msg import Image -from std_msgs.msg import Header, String -from cv_bridge import CvBridge - -class VideoNode(Node): - def __init__(self): - super().__init__('video_node') - - cuda.init() - device = cuda.Device(0) - self.cuda_driver_context = device.make_context() - self.stream = cuda.Stream() - - self.declare_parameter('video_path', '/home/usr/Desktop/ROS/assets/video.mp4') - self.declare_parameter('loop', 0) # 0 = don't loop, >0 = # of loops, -1 = loop forever - self.declare_parameter('frame_rate', 30) # Desired frame rate for publishing - self.declare_parameter('model_dimensions', (448, 1024)) - self.declare_parameter('shift_constant', 1) - self.declare_parameter('roi_dimensions', [0, 0, 100, 100]) - self.declare_paramter('precision', 'fp32') - - self.video_path = self.get_parameter('video_path').get_parameter_value().string_value - self.loop = self.get_parameter('loop').get_parameter_value().integer_value - self.frame_rate = self.get_parameter('frame_rate').get_parameter_value().integer_value - self.dimensions = tuple(self.get_parameter('model_dimensions').get_parameter_value().integer_array_value) - self.shift_constant = self.get_parameter('shift_constant').get_parameter_value().integer_value - self.roi_dimensions = self.get_parameter('roi_dimensions').get_parameter_value().integer_array_value - self.precision = self.get_parameter('precision').get_parameter_value().string_value - - self.pointer_publisher = self.create_publisher(String, 'preprocessing_done', 10) - self.image_service = self.create_service(Image, 'image_data', self.image_callback) - - self.velocity = [0, 0, 0] - self.image_queue = queue.Queue() - self.bridge = CvBridge() - self.publish_video_frames() - # propagate fp16 option (.fp16 datatype for cupy) - if self.precision == 'fp32': - pass - elif self.precision == 'fp16': - pass - else: - self.get_logger().error(f"Invalid precision: {self.precision}") - - def image_callback(self, response): - self.get_logger().info("Received image request") - if not self.image_queue.empty(): - image_data = self.image_queue.get() # Get the image from the queue - cv_image, velocity = image_data # unpack tuple (image, velocity) - - # Convert OpenCV image to ROS2 Image message using cv_bridge - ros_image = self.bridge.cv2_to_imgmsg(cv_image, encoding='rgb8') - # Create a new header and include velocity in the stamp fields - header = Header() - current_time = self.get_clock().now().to_msg() - header.stamp = current_time # Set timestamp to current time - header.frame_id = str(velocity) # Set frame ID to velocity - - ros_image.header = header # Attach the 
+import time, os
+import cv2
+import pycuda.driver as cuda
+import cupy as cp
+import queue
+
+import rclpy
+from rclpy.time import Time
+from rclpy.node import Node
+from rclpy.executors import MultiThreadedExecutor
+from sensor_msgs.msg import Image
+from std_msgs.msg import Header, String
+from cv_bridge import CvBridge
+
+class VideoNode(Node):
+    def __init__(self):
+        super().__init__('video_node')
+
+        cuda.init()
+        device = cuda.Device(0)
+        self.cuda_driver_context = device.make_context()
+        self.stream = cuda.Stream()
+
+        self.declare_parameter('video_path', '/home/usr/Desktop/ROS/assets/video.mp4')
+        self.declare_parameter('loop', 0)  # 0 = play once, >0 = # of loops, -1 = loop forever
+        self.declare_parameter('frame_rate', 30)  # Desired frame rate for publishing
+        self.declare_parameter('model_dimensions', (448, 1024))
+        self.declare_parameter('shift_constant', 1)
+        self.declare_parameter('roi_dimensions', [0, 0, 100, 100])
+        self.declare_parameter('precision', 'fp32')
+
+        self.video_path = self.get_parameter('video_path').get_parameter_value().string_value
+        self.loop = self.get_parameter('loop').get_parameter_value().integer_value
+        self.frame_rate = self.get_parameter('frame_rate').get_parameter_value().integer_value
+        self.dimensions = tuple(self.get_parameter('model_dimensions').get_parameter_value().integer_array_value)
+        self.shift_constant = self.get_parameter('shift_constant').get_parameter_value().integer_value
+        self.roi_dimensions = self.get_parameter('roi_dimensions').get_parameter_value().integer_array_value
+        self.precision = self.get_parameter('precision').get_parameter_value().string_value
+
+        # propagate fp16 option (.fp16 datatype for cupy); validated here, before the blocking publish loop
+        if self.precision == 'fp32':
+            pass
+        elif self.precision == 'fp16':
+            pass
+        else:
+            self.get_logger().error(f"Invalid precision: {self.precision}")
+
+        self.pointer_publisher = self.create_publisher(String, 'preprocessing_done', 10)
+        # TODO: sensor_msgs Image is a message type; a custom .srv definition is needed for a real service here
+        self.image_service = self.create_service(Image, 'image_data', self.image_callback)
+
+        self.velocity = [0, 0, 0]
+        self.image_queue = queue.Queue()
+        self.bridge = CvBridge()
+        self.publish_video_frames()
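+
+    # Hypothetical launch example (package and executable names assumed):
+    #   ros2 run python_package video_node --ros-args \
+    #       -p video_path:=/path/to/video.mp4 -p loop:=-1 -p precision:=fp16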
+
+    def image_callback(self, request, response):  # rclpy service callbacks receive (request, response)
+        self.get_logger().info("Received image request")
+        if not self.image_queue.empty():
+            image_data = self.image_queue.get()  # Get the image from the queue
+            cv_image, velocity = image_data  # unpack tuple (image, velocity)
+
+            # Convert OpenCV image to ROS2 Image message using cv_bridge
+            ros_image = self.bridge.cv2_to_imgmsg(cv_image, encoding='rgb8')
+            # Create a new header and include velocity in the stamp fields
+            header = Header()
+            current_time = self.get_clock().now().to_msg()
+            header.stamp = current_time  # Set timestamp to current time
+            header.frame_id = str(velocity)  # Set frame ID to velocity
+
+            ros_image.header = header  # Attach the header to the ROS image message
+            response.image = ros_image  # Set the response's image
+            return response
+
+        else:
+            self.get_logger().error("Image queue is empty")
+            return response
+
+    def publish_video_frames(self):
+        if not os.path.exists(self.video_path):
+            self.get_logger().error(f"Video file not found at {self.video_path}")
+            return
+
+        cap = cv2.VideoCapture(self.video_path)
+        if not cap.isOpened():
+            self.get_logger().error(f"Failed to open video: {self.video_path}")
+            return
+
+        loops = 0
+        while rclpy.ok() and (self.loop == -1 or loops < max(self.loop, 1)):
+            while cap.isOpened() and rclpy.ok():
+                ret, frame = cap.read()
+                if not ret:
+                    break
+                self.image_queue.put((frame, self.velocity[0]))
+                self.preprocess_image(frame)
+                time.sleep(1.0 / self.frame_rate)
+
+            loops += 1
+            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)  # restart video for the next pass
+        cap.release()
+
+    def preprocess_image(self, image):
+        tic = time.perf_counter_ns()
+
+        roi_x, roi_y, roi_w, roi_h = self.roi_dimensions
+        shifted_x = roi_x + abs(self.velocity[0]) * self.shift_constant
+
+        gpu_image = cv2.cuda_GpuMat()
+        gpu_image.upload(image)
+
+        gpu_image = cv2.cuda.cvtColor(gpu_image, cv2.COLOR_RGBA2RGB)  # remove alpha channel
+        gpu_image = cv2.cuda_GpuMat(gpu_image, (shifted_x, roi_y, roi_w, roi_h))  # crop to the shifted ROI; GpuMat does not support Python slicing
+        gpu_image = cv2.cuda.resize(gpu_image, self.dimensions)  # resize to model dimensions
+
+        input_data = cp.asarray(gpu_image)  # Now the image is on GPU memory as a CuPy array
+        input_data = input_data.astype(cp.float32) / 255.0  # normalize to [0, 1]
+        input_data = cp.transpose(input_data, (2, 0, 1))  # Transpose from HWC (height, width, channels) to CHW (channels, height, width)
+        input_data = cp.ravel(input_data)  # flatten the array
+
+        d_input_ptr = input_data.data.ptr  # Get device pointer of the CuPy array
+        ipc_handle = cuda.mem_get_ipc_handle(d_input_ptr)  # Create the IPC handle
+        self.last_input = input_data  # keep a reference so the GPU buffer is not freed while consumers hold the handle
+
+        toc = time.perf_counter_ns()
+        self.get_logger().info(f"Preprocessing: {(toc-tic)/1e6} ms")
+
+        # Publish the IPC handle as a string (sending the handle reference as a string)
+        ipc_handle_msg = String()
+        ipc_handle_msg.data = str(ipc_handle.handle)
+        self.pointer_publisher.publish(ipc_handle_msg)
+
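+# For reference, the same layout transform on the CPU with NumPy (hypothetical
+# h x w frame; the node itself keeps the data on the GPU as a CuPy array):
+#   x = frame.astype(np.float32) / 255.0   # HWC uint8 -> float32 in [0, 1]
+#   x = x.transpose(2, 0, 1)               # HWC -> CHW, shape (3, h, w)
+#   x = x.ravel()                          # contiguous 1-D buffer of length 3*h*w
+# A consumer process reopens the published handle via CUDA IPC
+# (cuIpcOpenMemHandle) and reads the buffer without copying it off the GPU.
+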
+def main(args=None):
+    rclpy.init(args=args)
+    video_node = VideoNode()
+    executor = MultiThreadedExecutor(num_threads=1)
+    executor.add_node(video_node)
+
+    try:
+        executor.spin()
+    finally:
+        executor.shutdown()
+        video_node.destroy_node()
+        rclpy.shutdown()
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file