TouchDesignerだけで非同期通信する方法


はじめに

TouchDesigner(以下TD) ではpythonスクリプトを実行することができますが、
HTTP通信のような処理に時間のかかるものだと、その処理が終わるまでちょっと固まってしまいます。
このとき、TD全体のタイムラインも止まってしまい描画が更新されない状態になってしまうので、
それを防ぐために非同期で通信する方針にしました。

ただ非同期で値を取ってきたいだけであれば、TDに加えてnodeなどなどを使えば良かったりもするのですが、
環境の都合によりTDだけで完結させたいという状況だったので、以下の方法を検討しました。

  • Web DAT を使う
  • threading モジュールを使う

これらについて、それぞれ説明していきます。

サンプルコード

今回のサンプルを以下にアップしました。
https://github.com/genkitoyama/TD_with_asynchronous_communication

Web DAT を使う

これがいちばん簡単です。
パラメータのAsynchronous Fetch をONにしている状態で、
URLを入力して Fetch ボタンをpulseしてあげれば、
TDのタイムラインは止まらずに裏で(たぶんcurlとか?)通信をしてくれます。
参考: https://docs.derivative.ca/Web_DAT


(上記の例では、国土地理院の標高を求めるAPIを使用しています)

GETの場合はURLに直接入れてしまえばOKで、
POSTの場合は、上側のインレット Input 0 に送りたいデータのtableを繋げた状態で Submit ボタンをpulseすればOKです。(ちなみに下側のインレットにはカスタムHTTPヘッダを入れることができます)

そして返ってきたものの形式に合わせて、XML DATjsonモジュールなどを使って必要な値を引っ張ってくれば完成です。

参考: TouchDesignerでJSONをDAT Tableにパースする

threading モジュールを使う

threadingモジュール は、マルチスレッドで並列処理をしてくれるモジュールです。
TD内部のpython3.5に標準で入っています。

threading.Thread() の引数に、別スレッドで実行したい関数およびその関数の引数を入れることで、別スレッドで実行してくれます。

ここでは、HTTP通信するモジュールとして、requestsを使用しています。
requestsもTD内部のpythonに標準で入っています。
(他にもHTTP通信するモジュール何種類かありますが、お好きなもので良い気がしています)


import threading
import requests

class HttpRequest():
    def __init__(self):
        self.url = op('URL').text
        self.city_id = op('CITY_ID').text

    #非同期で実行したい関数
    def request(self):
        response = requests.get(self.url, params={'city':self.city_id})

    def start_request(self):
        myThread = threading.Thread(target=self.request)
        myThread.start()

こちらは Matthew Ragan大先生の記事を大いに参考にさせていただきました。

欠点と対策

threading モジュールもとても便利なのですが、欠点がいくつかあります。

  • 別スレッドでの処理がいつ終了したかを検知できない
  • 別スレッドの処理中にTDのオペレータを参照できない

別スレッドでの処理がいつ終了したかを検知できない

メインスレッドの方から停止の命令は送れるのですが、
別スレッドでの処理が終了したタイミングを検知することができません。
join()メソッドを使えば、別スレッドの終了を待ち合わせることはできるのですが、
その間メインスレッドが止まってしまうので、結局TDは止まってしまいます。。

なので、「別スレッドでURL叩いて値が返ってきたらこの処理を実行する」的なことが簡単にはできません。

別スレッドの処理中にTDのオペレータを参照できない

これもよく考えれば当然ではあるのですが、別のスレッドで処理を回しているので、
TDのメインスレッドに存在しているオペレータにアクセスすることができません。

なので、「別スレッドでURL叩いて返ってきた結果を、同じ関数内でそのまま Table DAT にパースする」的なことはできません。

(ちなみに、それをするとこんなダイアログが出ます)

対策

なので、結構な力技ですが、値が返ってきたかどうか(≒ 別スレッドの処理が終了したかどうか)を Timer CHOP で一定時間おきに監視することにしました。

詳しくはサンプルを見ていただきたいのですが、ざっくりいうと以下のような流れになっています。

  1. 別スレッドで実行する関数の中に、得られた値をメンバ変数に格納する処理を書いておく
  2. Timer CHOPのcallbackの onDone()の中で、その変数に値が入っているかどうかを確認する
  3. 入っていればタイマーは終了、別の処理へ進む
  4. 入っていなかったら、タイマーを再スタートさせる

上記の例では、livedoor社のWeather Hacksを使用しています。このAPIだと一瞬で結果が返ってくるので、この方法でやってもあまり旨味はないのですが、自分が以前使っていたオリジナルのAPIだと都合により返ってくるまで7,8秒かかるものだったので、この方法を考案しました。

pythonは、以下のように RequestManagerクラスと、HttpRequestクラスを作ってみました。


import http_request
import json

class RequestManager():
    def __init__(self):
        self.http = http_request.HttpRequest()
        self.can_start_request = True           #リクエストをしても良いか
        self.max_confirmation_count = 10        #最大確認カウント数
        self.current_confirmation_count = 0     #現在の確認カウント数
        self.timer = op('timer1')

    def init_timer(self):
        print('init http request timer')

    def start_timer(self):
        #リクエストをしても良いならリクエストを投げる、ダメなら待つ
        if self.can_start_request == True:
            print('start http request timer')
            self.http.start_request()
        else:
            print('restart timer')

    def done_timer(self):
        print('done http request timer')
        #値が返ってきたかを確認する
        output = self.http.get_data()
        #値が入っていなければ確認カウントを増やしてもう一回
        if output is None:
            print('http response is none')
            self.can_start_request = False
            self.current_confirmation_count += 1
            #確認カウントが最大になったら通信失敗と判断
            if self.current_confirmation_count >= self.max_confirmation_count:
                print('http out of request')
                self.timer.par.initialize.pulse()
                self.http = http_request.HttpRequest()
                self.current_confirmation_count = 0
            else:
                self.timer.par.start.pulse()
        else:
        #値が入っていればjsonをパースして格納
            self.can_start_request = True
            out_json = json.loads(output)
            op('out_json')[0,0].val = out_json['forecasts'][0]

            self.current_confirmation_count = 0
            self.timer.par.initialize.pulse()

import threading
import requests

class HttpRequest():
    def __init__(self):
        self.url = op('URL').text
        self.city_id = op('CITY_ID').text
        self.out_data = None
        print('http request init')

    def request(self):
        get_data = {'city':self.city_id}
        response = requests.get(self.url, params=get_data)
        self.out_data = response.text
        print(self.out_data)

    def start_request(self):
        self.out_data = None
        myThread = threading.Thread(target=self.request)
        myThread.start()

    def get_data(self):
        return self.out_data

おわりに

  • TouchDesignerだけで非同期通信する方法について、2種類をまとめました。
  • python初心者なので、特に最後の力技の部分、もっと良い方法を思いついた方などいらっしゃいましたら是非コメントいただけると嬉しいです…!

おまけ

  • 今回、TouchPlayerを使って複数端末でそれぞれ動かしていたのですが、TouchPlayerだとTextportが見れないので、ログを外部ファイルに吐きだしてエラーが出たらそれを確認するという方法をとっていました。
    • (TouchDesignerをReadOnlyで開くという選択肢もあります。)
  • その際、ログを吐き出すモジュールとしてloguruがとても便利だったのでオススメです。

参考にしたリンク集