ROSによるPublicとSubscription

試しにPythonでROS2のプログラミングを行います。

プロジェクトの生成

SSHログイン後次の命令を実行します。

$ source /opt/ros/foxy/setup.bash

次に、ros2_wsフォルダとその中にsrcフォルダを作成します。

$ mkdir -p ~/ros2_ws/src
$ cd ~/ros2_ws/src/

ROSプロジェクトを作成し、そのフォルダに移動します。

$ ros2 pkg create  --build-type ament_python rpi_robot_py
$ cd rpi_robot_py

以下のようなファイル構成になっているはずです。rpirobotpyフォルダの中に自作のPythonプログラムを作っていきます。

├── package.xml
├── resource
│   └── rpi_robot_py
├── rpi_robot_py
│   ├── __init__.py
├── setup.cfg
├── setup.py
└── test
    ├── test_copyright.py
    ├── test_flake8.py
    └── test_pep257.py

Publishノードの作成

「publisher.py」としてrpirobotpyに保存します。

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

# Nodeクラスを継承
class Talker(Node):
    def __init__(self):
        super().__init__("talker")
        self.count = 0
        self.pub = self.create_publisher(String, "chatter")
        self.time_period = 1.0
        self.tmr = self.create_timer(self.time_period, self.callback)

    def callback(self):
        self.count += 1
        msg = String()
        msg.data = "HELLO PUBLISHER! [{0}]".format(self.count)
        self.get_logger().info("Publishing: {0}".format(msg.data))
        self.pub.publish(msg)

def main():
    # rclpy.init()でRCLを初期化
    rclpy.init(args=args)
    # Nodeのインスタンス化
    node = Talker()
    # Node処理をループさせ実行
    rclpy.spin(node)
    # Nodeを破壊
    node.destroy_node()
    # RCLを終了
    rclpy.shutdown()

if __name__ == "__main__":
    main()

Subscribe

Publishされた内容を受け取るプログラムを実装します。

「subscriber.py」としてrpirobotpyに保存します。

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

# Nodeクラスを継承
class Listener(Node):
    def __init__(self):
        super().__init__("listener")
        self.create_subscription(String, "chatter", self.callback)

    def callback(self, msg):
        print(msg.data)


def main():
    # RCLの初期化を実行
    rclpy.init()
    # Listenerクラスのコンスタンス化
    node = Listener()
    # ループに入りnode内の処理を実行させる
    rclpy.spin(node)
    # nodeを破壊
    node.destroy_node()
    # RCLをシャットダウン
    rclpy.shutdown()

if __name__ == "__main__":
    main()

実行の準備

launch

launchプログラムを作成します。

$ cd ~/ros2_ws/src/rpi_robot_py
$ mkdir launch
$ cd launch
$ touch rpi_robot.launch.py

以下のようにコードを書きます。

from launch import LaunchDescription
from launch_ros.actions import Node

def generate_launch_description():
    return LaunchDescription([
        Node(
            package='rpi_robot_py',
            node_executable='publisher',
        ),
        Node(
            package='rpi_robot_py',
            node_executable='listener',
            output='screen'
        )
    ])

setup

setup.pyを以下のように変更します。

from glob import glob
from setuptools import setup

package_name = 'rpi_robot_py'

setup(
    name=package_name,
    version='0.0.0',
    packages=[package_name],
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        ## 'launch/*.launch.py'用に追記
        ('share/' + package_name, glob('launch/*.launch.py'))
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='ns',
    maintainer_email='ns@hoge.com',
    description='sample robot program',
    license='',
    tests_require=['pytest'],
    ## 追記
    entry_points={
        'console_scripts': [
            'publisher = rpi_robot_py.publisher:main',
            'listener = rpi_robot_py.subscriber:main',
        ],
    },
)

build

ビルドします。

$ cd ~/ros2_ws
$ colcon build
$ . install/setup.bash

実行

launchファイルを使って3つのプログラムを全部起動します。

$ ros2 launch rpi_robot_py rpi_robot.launch.py

ZIDO Corp. ALL RIGHTS RESERVED.