ROSを始めよう その5


前回の内容

その4

今回の内容

今回はActionについてみていきましょう。
参考:http://wiki.ros.org/ja/actionlib_tutorials/Tutorials

Action

actionlib

ここまでTopicやServiceを見てきましたが、ロボットの自律移動の指令など、「長時間かかるのでTopicのように非同期通信にしたいけど、Serviceのように成否は知りたい」というときもあると思います。そんなときactionlibというライブラリを用いれば、これが可能となります。仕組みとしては、指令を出すTopicと結果を返すTopicを扱いやすくまとめてくれています。

Action Messageの作成

ActionをするにはAction Messageというものを定義する必要があります。
まず、ros_beginnerの下にactionというディレクトリを作成し、その中に以下のようなファイルを作成しましょう。

GoUntilBumper.action
geometry_msgs/Twist target_vel
int32 timeout_sec
---
bool bumper_hit
---
geometry_msgs/Twist current_vel

一番上がGoalと呼ばれる引数のようなもので、目的の値を表します。
真ん中がResultと呼ばれ、その成否を主に返します。
一番下がFeedbackと呼ばれ、途中経過として返したい情報を発行します。

次に、CMakeLists.txtを変更しましょう。
find_packageの中に、geometry_msgsとactionlib_msgsを足します。
60行目あたりのadd_action_filesをコメントアウトし、actionファイルはGoUntilBumper.actionに変更します。
その下のgenerate_messagesの中に、geometry_msgsとactionlib_msgsを追加します。

最後に~/catkin_wsで$ catkin_makeをしてビルドしましょう。
$ ls devel/share/ros_beginner/msg/とすると、msgファイルがたくさんできているはずです。
これで準備完了です。

ActionServerの作成

bumper_action.py
#!/usr/bin/env python
import rospy
from geometry_msgs.msg import Twist
from kobuki_msgs.msg import BumperEvent
import actionlib
from ros_beginner.msg import GoUntilBumperAction
from ros_beginner.msg import GoUntilBumperResult
from ros_beginner.msg import GoUntilBumperFeedback

class BumperAction(object):
    def __init__(self):
        self._pub = rospy.Publisher('/mobile_base/commands/velocity', Twist,
                                    queue_size=10)
        self._sub = rospy.Subscriber('/mobile_base/events/bumper',
                                     BumperEvent, self.bumper_callback, queue_size=1)
        # Parameterから最大速度を取得
        self._max_vel = rospy.get_param('~max_vel', 0.5)
        # bumper_actionという名前でGoUntilBumperAction型のActionを作成(self.go_until_bumperが実際の動作)
        self._action_server = actionlib.SimpleActionServer(
            'bumper_action', GoUntilBumperAction, 
            execute_cb=self.go_until_bumper, auto_start=False)
        # バンパーが当たったかどうか
        self._hit_bumper = False
        # ActionServerをスタートさせる
        self._action_server.start()

    # バンパーに当たった時の処理、Subscriberに呼ばれる
    def bumper_callback(self, bumper):
        self._hit_bumper = True

    # ActionServerの実体
    def go_until_bumper(self, goal):
        print(goal.target_vel)
        r = rospy.Rate(10.0)
        zero_vel = Twist()
        for i in range(10 * goal.timeout_sec):
            # 外部から停止指令が来ているかどうか
            if self._action_server.is_preempt_requested():
                self._action_server.set_preempted()
                break
            # バンパーにぶつかっているかどうか
            if self._hit_bumper:
                self._pub.publish(zero_vel)
                break
            else:
                # 目標の速度でロボットを動かす
                if goal.target_vel.linear.x > self._max_vel:
                    goal.target_vel.linear.x = self._max_vel
                self._pub.publish(goal.target_vel)
                # 現在の速度を途中経過としてFeedbackで返す
                feedback = GoUntilBumperFeedback(current_vel=goal.target_vel)
                self._action_server.publish_feedback(feedback)
            r.sleep()
        # 実行結果を返す
        result = GoUntilBumperResult(bumper_hit=self._hit_bumper)
        self._action_server.set_succeeded(result)

if __name__ == '__main__':
    rospy.init_node('bumper_action')
    bumper_action = BumperAction()
    # 無限ループでActionServerが呼ばれるのを待つ
    rospy.spin()

ActionClientの作成

bumper_client.py
#!/usr/bin/env python
import rospy
import actionlib
from ros_beginner.msg import GoUntilBumperAction
from ros_beginner.msg import GoUntilBumperGoal

def go_until_bumper():
    # bumper_actionという名前でGoUntilBumperAction型のActionClientを作成
    action_client = actionlib.SimpleActionClient('bumper_action', GoUntilBumperAction)
    # ActionServerの準備を待つ
    action_client.wait_for_server()
    # 目標の値をセットする
    goal = GoUntilBumperGoal()
    goal.target_vel.linear.x = 0.8
    goal.timeout_sec = 10

    # ゴールを送る
    action_client.send_goal(goal)
    # 結果を待つ
    action_client.wait_for_result()
    # 結果を取得
    result = action_client.get_result()
    if result.bumper_hit:
        rospy.loginfo('bumper hit!!')
    else:
        rospy.loginfo('failed')

if __name__ == '__main__':
    try:
        rospy.init_node('bumper_client')
        go_until_bumper()
    except rospy.ROSInterruptException:
        pass

実行

$ roslaunch kobuki_gazebo kobuki_playground.launchでシミュレーターを立ち上げ、
いつものようにchmodで実行可能にした後、
$ rosrun ros_beginner bumper_action.py
$ rosrun ros_beginner bumper_client.pyを実行しましょう。
ロボットの移動が完了すると、client側に結果が表示されるはずです。

Pythonライブラリにする

今の機能をライブラリにしてみましょう。
そのためにはまず、以下の手順で、~/catkin_ws/src/ros_beginner/src/ros_beginner/を作る必要があります。

$ cd ~/catkin_ws/src/ros_beginner
$ mkdir -p src/ros_beginner
スクリプトをディレクトリ内にコピー
$ cp scripts/bumper_action.py src/ros_beginner/
$ cp scripts/bumper_client.py src/ros_beginner/
空のファイル__init.pyを作成(ライブラリとして認識されるために必要)
$ touch src/ros_beginner/__init__.py

次に、以下の~/catkin_ws/src/ros_beginner/setup.pyを作成します。

setup.py
from distutils.core import setup
from catkin_pkg.python_setup import generate_distutils_setup

setup_args = generate_distutils_setup(
    packages=['ros_beginner'],
    package_dir={'': 'src'},
)

setup(**setup_args)

次にCMakeList.txtの23行目あたりのcatkin_python_setup()をコメントアウトしてください。

最後に~/catkin_wsでcatkin_makeをすれば準備完了です。

ライブラリを使ってActionServerプログラムを書くと、

bumper_action_use_lib.py
#!/usr/bin/env python
import rospy
from ros_beginner.bumper_action import BumperAction

if __name__ == '__main__':
    rospy.init_node('bumper_action_use_lib')
    bumper_action = BumperAction()
    rospy.spin()

ActionClientプログラムを書くと、

bumper_client_use_lib.py
#!/usr/bin/env python
import rospy
from ros_beginner.bumper_client import go_until_bumper
rospy.init_node('bumper_client_use_lib')
go_until_bumper()

こんなに分かりやすくなりましたね。

まとめ

今回はActionをみていきました。
次回はROSのGUIについてみていきましょう。

その6