본 코드는 이전 Detection 프로그래밍과는 다른 방식의, 사전에 촬영한 비디오 영상으로 물체 검출을 실행하는 코드로, image_tools 패키지의 cam2image publisher를 이용하지 않고 영상을 이용하도록 한 코드입니다.

(1) 영상 준비하기

# 현재 예시 영상은 MS-COCO 데이터 셋 내 임의의 영상을 비디오로 만들어 제공합니다.
gdown <https://drive.google.com/uc?id=**1STqlqdXEVVZ9h3o8Ozvbz7GLT5zYn-U5**&export=download>

(2) 비디오 publish를 위한 코드 작성

(2-1) video_publisher.py 생성

# ros2_ws/src/ssd_detection/ssd_detection/video_publisher.py
$ cd ~/ros2_ws/src/ssd_detection/ssd_detection/
$ gedit video_publisher.py

(2-2) video_publisher.py 작성

# ros2_ws/src/ssd_detection/ssd_detection/video_publisher.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile

from sensor_msgs.msg import Image

import os
import time
import cv2
from cv_bridge import CvBridge

class ImagePublisher(Node):
    def __init__(self):
        super().__init__('image_publisher')
        
        self.publisher = self.create_publisher(
            Image, # Topic msg type (sensor_msgs.msg.Image)
            'image', # Topic msg name
            QoSProfile(depth=10)) # Set QoS (Quality of Service)
        
        self.timer = self.create_timer(1, self.time_callback) # Calls the time_callback every 1 second (second, callback)
        self.video_path = 'src/ssd_detection/video/coco_video.mp4' # Set path video in
        if not os.path.isfile(self.video_path):
            self.get_logger().info(f'No where video. please check the path: {self.video_path}')
        
        # cv2.VideoCapture: <https://docs.opencv.org/3.4/d8/dfe/classcv_1_1VideoCapture.html#a949d90b766ba42a6a93fe23a67785951:~:text=%E2%97%86-,VideoCapture(),-%5B3/5%5D>
        self.cap = cv2.VideoCapture(self.video_path)
        
        self.bridge = CvBridge() # Image(message) ↔ Image(OpenCV)
        self.n_frame = 0 # Measure frames
        
        
    def time_callback(self):
        ret, frame = self.cap.read() # Read frame (opencv format) (read success/fail, frame)
        if ret:
            fra = self.bridge.cv2_to_imgmsg(frame, 'bgr8') # Image(OpenCV) -> Image(message) 
            self.publisher.publish(fra) # publish to subscriber
            self.n_frame += 1
            self.get_logger().info(f'Publishing images: [{self.n_frame}]')
        else:
            self.get_logger().info(f'End of video. Fail to read frame!')

					
def main(args=None):
    rclpy.init(args=args)  # Initialize the ROS2 python client library
    node = ImagePublisher()  # Create ImagePublisher named node
    try:
        rclpy.spin(node)  # Keep the node active and process callbacks as they come in
    except KeyboardInterrupt:
        node.get_logger().info('Keyboard Interrupt (SIGINT)')  # Log a message if interrupted by keyboard (e.g. Ctrl + C)
    finally:
        node.destroy_node()  # Destroy the node
        rclpy.shutdown()  # Shutdown the system

if __name__ == '__main__':
    main()

(3) Add an entry point


entry_points={
        'console_scripts': [
        'detection_example = ssd_detection.ssd_detection:main',
        'video_publisher = ssd_detection.video_publisher:main',
        ],

(4) Build and run


# 터미널1 (ros2_ws)
$ ros2 run ssd_detection video_publisher