試しにPythonでROS2のプログラミングを行います。
プロジェクトの生成
SSHログイン後次の命令を実行します。
$ source /opt/ros/foxy/setup.bash
次に、ros2_wsフォルダとその中にsrcフォルダを作成します。
$ mkdir -p ~/ros2_ws/src
$ cd ~/ros2_ws/src/
ROSプロジェクトを作成し、そのフォルダに移動します。
$ ros2 pkg create --build-type ament_python rpi_robot_py
$ cd rpi_robot_py
以下のようなファイル構成になっているはずです。rpirobotpyフォルダの中に自作のPythonプログラムを作っていきます。
├── package.xml
├── resource
│ └── rpi_robot_py
├── rpi_robot_py
│ ├── __init__.py
├── setup.cfg
├── setup.py
└── test
├── test_copyright.py
├── test_flake8.py
└── test_pep257.py
Publishノードの作成
「publisher.py」としてrpirobotpyに保存します。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
# Nodeクラスを継承
class Talker(Node):
def __init__(self):
super().__init__("talker")
self.count = 0
self.pub = self.create_publisher(String, "chatter")
self.time_period = 1.0
self.tmr = self.create_timer(self.time_period, self.callback)
def callback(self):
self.count += 1
msg = String()
msg.data = "HELLO PUBLISHER! [{0}]".format(self.count)
self.get_logger().info("Publishing: {0}".format(msg.data))
self.pub.publish(msg)
def main():
# rclpy.init()でRCLを初期化
rclpy.init(args=args)
# Nodeのインスタンス化
node = Talker()
# Node処理をループさせ実行
rclpy.spin(node)
# Nodeを破壊
node.destroy_node()
# RCLを終了
rclpy.shutdown()
if __name__ == "__main__":
main()
Subscribe
Publishされた内容を受け取るプログラムを実装します。
「subscriber.py」としてrpirobotpyに保存します。
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
# Nodeクラスを継承
class Listener(Node):
def __init__(self):
super().__init__("listener")
self.create_subscription(String, "chatter", self.callback)
def callback(self, msg):
print(msg.data)
def main():
# RCLの初期化を実行
rclpy.init()
# Listenerクラスのコンスタンス化
node = Listener()
# ループに入りnode内の処理を実行させる
rclpy.spin(node)
# nodeを破壊
node.destroy_node()
# RCLをシャットダウン
rclpy.shutdown()
if __name__ == "__main__":
main()
実行の準備
launch
launchプログラムを作成します。
$ cd ~/ros2_ws/src/rpi_robot_py
$ mkdir launch
$ cd launch
$ touch rpi_robot.launch.py
以下のようにコードを書きます。
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(
package='rpi_robot_py',
node_executable='publisher',
),
Node(
package='rpi_robot_py',
node_executable='listener',
output='screen'
)
])
setup
setup.pyを以下のように変更します。
from glob import glob
from setuptools import setup
package_name = 'rpi_robot_py'
setup(
name=package_name,
version='0.0.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
## 'launch/*.launch.py'用に追記
('share/' + package_name, glob('launch/*.launch.py'))
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='ns',
maintainer_email='ns@hoge.com',
description='sample robot program',
license='',
tests_require=['pytest'],
## 追記
entry_points={
'console_scripts': [
'publisher = rpi_robot_py.publisher:main',
'listener = rpi_robot_py.subscriber:main',
],
},
)
build
ビルドします。
$ cd ~/ros2_ws
$ colcon build
$ . install/setup.bash
実行
launchファイルを使って3つのプログラムを全部起動します。
$ ros2 launch rpi_robot_py rpi_robot.launch.py