ROSの勉強 第8弾:少し発展的なアクション


#プログラミング ROS< 少し発展的なアクション >

はじめに

1つの参考書に沿って,ROS(Robot Operating System)を難なく扱えるようになることが目的である.その第8弾として,少し発展的なアクションを扱う.

環境

仮想環境
ソフト VMware Workstation 15
実装RAM 2 GB
OS Ubuntu 64 ビット
isoファイル ubuntu-mate-20.04.1-desktop-amd64.iso
コンピュータ
デバイス MSI
プロセッサ Intel(R) Core(TM) i5-7300HQ CPU @ 2.50GHz 2.50GHz
実装RAM 8.00 GB (7.89 GB 使用可能)
OS Windows (Windows 10 Home, バージョン:1909)
ROS
Distribution noetic
プログラミング言語 Python 3.8.5

基礎的なアクションとの違い

後にソースコードは示すが,ここで扱うアクションではフィードバックの機能や強制終了,処理中断のようなアクションの特有ともいえるものまで扱い,前回よりももう少し発展したタイマーの作成を例として行う.

実装

今回は,サーバーとクライアントの両方をともに示すため,ソースコードは2つある.なお,ソースコード内でプログラムの説明は詳細に記述されている.

ソースコード①(サーバー)
fancy_action_server.py
#! /usr/bin/env python3

import rospy

import time         #タイマー作成時に必要となる
import actionlib    #SimpleActionServerクラスを提供 → アクションのサーバー作成に必要と解釈
from action_lesson.msg import TimerAction, TimerGoal, TimerResult, TimerFeedback    #Timer.actionファイルから自動生成したメッセージクラスで必要なものを読み込む


def do_timer(goal):
    """新しいゴール(目標)を受け取ったときに呼び出される関数"""

    start_time = time.time()
    updates_count = 0           #更新回数を格納する変数を0で初期化

    #60秒を超えるタイマーは使ってほしくない → 打ち切る(強制終了)
    if goal.time_to_wait.to_sec() > 60:
        """タイマーに制限を持たせるための処理"""
        result = TimerResult()  #TimerResultの型のリザルトメッセージ(result)を作成 ←これを作成しないと,定義ファイル内のリザルトの部分にアクセスできない
        result.time_elapsed = rospy.Duration.from_sec(time.time() - start_time) #「現在時刻-開始時刻」をDuration(ROSで扱う期間)に変換してtime_elapsedフィールドに代入
        result.updates_sent = updates_count     #更新回数を記録
        server.set_aborted(result, "Timer aborted due to too-long wait")    #set_aborted()関数により,強制終了
        return

    #一度にsleepを使って目標時間の経過を待つのでなくwhileを使って刻むように時間を経過させることで,途中干渉を可能としている
    while (time.time() - start_time) < goal.time_to_wait.to_sec():

        if server.is_preempt_requested():
            """中断リクエストに対応する処理"""
            result = TimerResult()  #TimerResultの型のリザルトメッセージ(result)を作成 ←これを作成しないと,定義ファイル内のリザルトの部分にアクセスできない
            result.time_elapsed = rospy.Duration.from_sec(time.time() - start_time) #「現在時刻-開始時刻」をDuration(ROSで扱う期間)に変換してtime_elapsedフィールドに代入
            result.updates_sent = updates_count     #更新回数を記録
            server.set_preempted(result, "Timer preempted") #set_preempted()関数により,強制終了(リクエストがあれば中断)
            return

        #状況をfeedbackとして記録
        feedback = TimerFeedback()  #TimerFeedbackの型のフィードバックメッセージ(feedback)を作成 ←これを作成しないと,定義ファイル内のフィードバックの部分にアクセスできない
        feedback.time_elapsed = rospy.Duration.from_sec(time.time() - start_time)   #「現在時刻-開始時刻」をDuration(ROSで扱う期間)に変換してフィードバックのtime_elapsedフィールドに代入
        feedback.time_remaining = goal.time_to_wait - feedback.time_elapsed  #目標時間 - 経過時間で残り時間を計算し,フィードバックのtime_remainingに格納
        server.publish_feedback(feedback)   #クライアントに送る
        updates_count += 1  #更新回数を加算

        time.sleep(1.0) #1.0秒刻み(本当のタイマーとしては正確ではないが,あくまでもテスト)    

    result = TimerResult()  #TimerResultの型のリザルトメッセージ(result)を作成 ←これを作成しないと,定義ファイル内のリザルトの部分にアクセスできない
    result.time_elapsed = rospy.Duration.from_sec(time.time() - start_time) #「現在時刻-開始時刻」をDuration(ROSで扱う期間)に変換してtime_elapsedフィールドに代入
    result.updates_sent = updates_count     #今回は0として,更新回数は考えないこととする.
    server.set_succeeded(result, "Timer completed successsfully")   #resultを引数として,set_succeeded()関数を呼び出すことで,SimpleActionServerにゴールしたことを伝える.


rospy.init_node('timer_action_server')  #ノードの初期化

server = actionlib.SimpleActionServer('timer', TimerAction, do_timer, False)    #'timer'という名で,TimerActionというアクションの型 do_timerを実行.サーバーの自動起動を無効にするためFalseを指定

server.start()  #サーバー生成後,明示的にサーバーを開始する

rospy.spin()    #ループに入り,到着すべきゴールが送られるのを待つ

前に扱ったアクションサーバーとは処理内容(do_timer関数)のみ変更され,サーバーの組み立て自体の変化はないことが分かる.

ソースコード②(クライアント)
fancy_action_client.py
#! /usr/bin/env python3

import rospy

import time
import actionlib #SimpleActionClientを使うためのパッケージ
from action_lesson.msg import TimerAction, TimerGoal, TimerResult, TimerFeedback    #アクションにアクセスするためのクラス


def feedback_cb(feedback):
    """feedbackのcallback関数"""
    print(f"[Feedback] Time elapsed: {feedback.time_elapsed.to_sec()}") #経過時間の表示
    print(f"[Feedback] Time remaining: {feedback.time_remaining.to_sec()}") #残り時間の表示



rospy.init_node('timer_action_client') #ノードの初期化

#actionlibパッケージにあるSimpleActionClientクラスを使う(アクション利用の最も簡単な方法)
client = actionlib.SimpleActionClient('timer', TimerAction) #サーバー名,型ともに利用するサーバーっと一致させる必要がある

client.wait_for_server()    #サーバーの応答を待つ

goal = TimerGoal()  #goalという変数にTimerGoalのインスタンスを生成(TimerGoalの値にアクセス可能となる)
goal.time_to_wait = rospy.Duration.from_sec(5.0)    #5.0秒をタイマーにセットrospy.Duration.from_secでrosでの期間単位Durationに変換
#goal.time_to_wait = rospy.Duration.from_sec(500.0) #500.0秒をタイマーにセット→60秒を超えているので強制終了される

client.send_goal(goal, feedback_cb = feedback_cb)   #clientとしてgoalをサーバーに送ると同時にfeedback_cb関数を呼び出す

#time.sleep(3.0)    #3秒待つ
#client.cancel_goal()   #中断のリクエストはclient.cancel_goal()で行う

client.wait_for_result()    #サーバーからの結果を待つ

State = client.get_state()                                  #ゴールの状態を数値で取得(PREEMPTED=2, SUCCEEDED=3, ABORTED=4)
Status = client.get_goal_status_text()                      #サーバーで結果を送る際に第2引数に指定した文字列を取得
Time_elapsed = client.get_result().time_elapsed.to_sec()    #取得した結果の経過時間を秒単位に変換してresultに格納
Updates = client.get_result().updates_sent                  #更新回数を取得

print(f"[Result] State: {State}")           #ゴールの状態を表示
print(f"[Result] Status: {Status}")         #ゴールの状況報告を表示
print(f"[Result] Time elapsed: {Time_elapsed}") #経過時間を表示
print(f"[Result] Updates sent: {Updates}")  #更新回数を表示

クライアントもサーバー同様,基本的なクライアント構成は大きく変わらないが,サーバーでいろいろと対応できるようになった分,クライアントもいろいろとできるようになった.

結果

サーバーでは,タイマーを提供しているわけだが,決まりとして60秒を超える目標時間は設定できないようにしている.また,途中で強制中断の要請が入ったときには受け入れるようにしている.したがって,クライアントが受ける結果としては,「正常稼働」「強制終了」「強制中断」の3パターンである.以下にその時の様子をそれぞれ示す.

正常稼働


フィードバックの効果により,逐次状況報告がなされている.

強制終了


そもそも目標時間が規定外であれば,タイマーは作動しないようになっている.

強制中断


ここでは,3秒後中断のリクエストをサーバーに送っているので,3秒経過後にタイマーは終了し,そのときの結果を出力していることが分かる.

感想

今回までで,ROSの基礎中の基礎ともいえるトピック,サービス,アクションについて学んだ.だいぶ慣れてきたと思う.今回の内容もだいぶ理解しやすかった.次回からはこれらのツールを使って,いよいよ実践的な勉強に取り組めそうで,楽しみである.

参考文献

プログラミングROS Pythonによるロボットアプリケーション開発
        Morgan Quigley, Brian Gerkey, William D.Smart 著
                       河田 卓志 監訳
            松田 晃一,福地 正樹,由谷 哲夫 訳
                  オイラリー・ジャパン 発行