mqtt_bridgeのrostestをcircleciで動かしてみた


概要

最近 ROS を触れてないのですが、2016年に作ったmqtt_bridge のissueやPRを放置してしまっていたので、12月に入ってから改めてテコ入れしはじめました。python3 対応したいんですが、動作確認環境もままならない環境で手を入れるのもあれなんで、取り急ぎはということで rostest 対応して circleci で動かしました。実装内容はこちら

工夫したこと

publisher を安定化

ROS の publisher には、インスタンスを生成した直後に publish しても subscriber にデータが届かない問題(仕様)があります。これだとテストを書くのに困ります。以下のように、publisher 生成後に rosmaster の subscriber 数と、publisher に紐づいた subscriber 数が一致するまで待つロジックを入れることで、解決しました。昔、社内向けに工夫していた実装ですが、bitbucketを掘り起こして見つけました。昔の自分、ありがとう!

def get_publisher(self, topic_path, msg_type, **kwargs):
    pub = rospy.Publisher(topic_path, msg_type, **kwargs)
    num_subs = len(_get_subscribers(topic_path))
    for i in range(10):
        num_cons = pub.get_num_connections()
        if num_cons == num_subs:
            return pub
        time.sleep(0.1)
    raise RuntimeError("failed to get publisher")

def _get_subscribers(self, topic_path):
    ros_master = rosgraph.Master('/rostopic')
    topic_path = rosgraph.names.script_resolve_name('rostopic', topic_path)
    state = ros_master.getSystemState()
    subs = []
    for sub in state[1]:
        if sub[0] == topic_path:
            subs.extend(sub[1])
    return subs

MagicMock で pubsub をテスト

unittest.mock が python2.7 向けに backport された mock を使って pubsub をテストしました。

ros_callback_ping = MagicMock()
rospy.Subscriber("/ping", Bool, ros_callback_ping)
ros_callback_ping.assert_called_once_with(Bool(True))

みたいなコードを書くことで、 subscriber が Bool(True) の値で1回だけ呼び出されることをテストできます。

CircleCI で rostest

現職では CI に CircleCI を使っているのですが、ローカル作業用のコマンド circleci が便利です。
ros の docker ファイルを使ったテストを .circleci/config.yml を書いて、

circleci local execute --job rostest

のように実行すると、ローカルで簡単に rostest を実行できます。

まとめ

久々にROS触ると楽しいですね。次は python3 + noetic 対応です。