본 코드는 이전 Detection 프로그래밍과는 다른 방식의, 사전에 촬영한 비디오 영상으로 물체 검출을 실행하는 코드로,
image_tools패키지의cam2imagepublisher를 이용하지 않고 영상을 이용하도록 한 코드입니다.
# 현재 예시 영상은 MS-COCO 데이터 셋 내 임의의 영상을 비디오로 만들어 제공합니다.
gdown <https://drive.google.com/uc?id=**1STqlqdXEVVZ9h3o8Ozvbz7GLT5zYn-U5**&export=download>
# ros2_ws/src/ssd_detection/ssd_detection/video_publisher.py
$ cd ~/ros2_ws/src/ssd_detection/ssd_detection/
$ gedit video_publisher.py
# ros2_ws/src/ssd_detection/ssd_detection/video_publisher.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from sensor_msgs.msg import Image
import os
import time
import cv2
from cv_bridge import CvBridge
class ImagePublisher(Node):
def __init__(self):
super().__init__('image_publisher')
self.publisher = self.create_publisher(
Image, # Topic msg type (sensor_msgs.msg.Image)
'image', # Topic msg name
QoSProfile(depth=10)) # Set QoS (Quality of Service)
self.timer = self.create_timer(1, self.time_callback) # Calls the time_callback every 1 second (second, callback)
self.video_path = 'src/ssd_detection/video/coco_video.mp4' # Set path video in
if not os.path.isfile(self.video_path):
self.get_logger().info(f'No where video. please check the path: {self.video_path}')
# cv2.VideoCapture: <https://docs.opencv.org/3.4/d8/dfe/classcv_1_1VideoCapture.html#a949d90b766ba42a6a93fe23a67785951:~:text=%E2%97%86-,VideoCapture(),-%5B3/5%5D>
self.cap = cv2.VideoCapture(self.video_path)
self.bridge = CvBridge() # Image(message) ↔ Image(OpenCV)
self.n_frame = 0 # Measure frames
def time_callback(self):
ret, frame = self.cap.read() # Read frame (opencv format) (read success/fail, frame)
if ret:
fra = self.bridge.cv2_to_imgmsg(frame, 'bgr8') # Image(OpenCV) -> Image(message)
self.publisher.publish(fra) # publish to subscriber
self.n_frame += 1
self.get_logger().info(f'Publishing images: [{self.n_frame}]')
else:
self.get_logger().info(f'End of video. Fail to read frame!')
def main(args=None):
rclpy.init(args=args) # Initialize the ROS2 python client library
node = ImagePublisher() # Create ImagePublisher named node
try:
rclpy.spin(node) # Keep the node active and process callbacks as they come in
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt (SIGINT)') # Log a message if interrupted by keyboard (e.g. Ctrl + C)
finally:
node.destroy_node() # Destroy the node
rclpy.shutdown() # Shutdown the system
if __name__ == '__main__':
main()
setup.py 내 entry_points에 다음의 내용을 추가해야 합니다. (video_publisher로 지정되는 이름은 변경해도 됩니다.)entry_points={
'console_scripts': [
'detection_example = ssd_detection.ssd_detection:main',
'video_publisher = ssd_detection.video_publisher:main',
],
이제 코드를 실행해봅시다!
실행 과정( ros2 run 실행 전에 수행해야 하는 코드)
먼저 실행을 위한 경로로 이동하여 ROS2 실행 환경을 실행합니다.
$ cd ~/ros2_ws
$ source /opt/ros/foxy/setup.bash
그 다음에 빌드를 수행합니다.
$ colcon build --symlink-install --packages-select ssd_detection
마지막으로 로컬에 위치한 패키지의 환경 변수를 설정하기 위해서 setup file을 source 합니다.
$ source install/local_setup.bash
그럼 이제 실행해볼까요!
# 터미널1 (ros2_ws)
$ ros2 run ssd_detection video_publisher