SimpleHttpOperatorで日本語を使いたい

9221 ワード

tl;dr

  • SimpleHttpOperatorでは、デフォルトではdataパラメータで日本語送れないよ
  • encodeすると送れるようになるよ
  • マクロ(Jinjaテンプレート)が使えなくなるので、必要な場合はpre_execute経由すると良いよ

SimpleHttpOperatorとは

AirflowのOperatorの一つで名前の通りHTTP(S)のリクエストを行います。パラメータとしては、

  • method(GETとかPOSTとか)
  • data(POSTのbody、GETのURL param)
  • HTTPヘッダー
  • responseの扱い(ログに残すか、内容をチェックしTask成功の判断)
  • 認証

設定することができます

dataパラメータで日本語を送りたい

前述の通り、SimpleHttpOperatorではdataパラメータでリクエストに使うデータ(POSTのbody、GETのクエリパラメータ)を送ることができます。しかし、素直な実装では日本語の内容を送るとエラーになります。

例えば、以下のDAGを実行すると、

from datetime import datetime
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
with DAG(
    'simple_http_dag2',
    description='SimpleHttpOperator',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    default_args={'retries': 1},
) as dag:
    SimpleHttpOperator(
        task_id='en_post',
        http_conn_id='',
        endpoint='https://httpbin.org/post',
        data='{"a": "b"}',
        log_response=True,
        headers={"Content-Type": "application/json"},
    )
    SimpleHttpOperator(
        task_id='ja_post',
        http_conn_id='',
        endpoint='https://httpbin.org/post',
        data='{"a": "日本語"}',
        log_response=True,
        headers={"Content-Type": "application/json"},
    )

英語のdataを送るTask Instanceは成功します(以下Task Instanceのログの抜粋)。

2022-05-06, 22:44:21 UTC] {http.py:129} INFO - Sending 'POST' to url: https://httpbin.org/post
[2022-05-06, 22:44:22 UTC] {http.py:106} INFO - {
  "args": {}, 
  "data": "{\"a\": \"b\"}", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Content-Length": "10", 
    "Content-Type": "application/json", 
    "Host": "httpbin.org", 
    "User-Agent": "python-requests/2.27.1", 
    "X-Amzn-Trace-Id": "Root=1-6275a4c4-0902cdb667985a0364483f6b"
  }, 
  "json": {
    "a": "b"
  }, 
  "origin": "60.113.53.109", 
  "url": "https://httpbin.org/post"
}

日本語のdataを送るTask Instanceは失敗します。

UnicodeEncodeError: 'latin-1' codec can't encode characters in position 7-9: Body ('日本語') is not valid Latin-1. Use body.encode('utf-8') if you want to send it encoded in UTF-8.

エラーの原因

SimpleHttpOperatorは、HttpHookを経由してrequests.Requestを呼び出します。

            req = requests.Request(self.method, url, data=data, headers=headers, **request_kwargs)

requests.Requestのdataは、日本語の場合bytesにencoodeして渡す必要があります(例えばこのQita記事を参考)。SimpleHttpOperator・HttpHookでは引数をそのまま(※)渡しているため、dataパラメータに日本語を設定するとエラーになるわけです。

※正確にはマクロ(Jinja2テンプレート)を展開して

回避策1:encodeして渡す

Taskのdataパラメータをencodeするのが自明な回避策で、これで動きます。ただし、一点落とし穴があります(後述)。

前述のDAGを変えて実行してみます

    # Task定義以外は省略
    SimpleHttpOperator(
        task_id='ja_post',
        http_conn_id='',
        endpoint='https://httpbin.org/post',
	# encodeを追加
        data='{"a": "日本語"}'.encode('utf-8'),
        log_response=True,
        headers={"Content-Type": "application/json"},
    )

動いたようです

[2022-05-06, 23:04:19 UTC] {http.py:129} INFO - Sending 'POST' to url: https://httpbin.org/post
[2022-05-06, 23:04:20 UTC] {http.py:106} INFO - {
  "args": {}, 
  "data": "{\"a\": \"\u65e5\u672c\u8a9e\"}", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Content-Length": "18", 
    "Content-Type": "application/json", 
    "Host": "httpbin.org", 
    "User-Agent": "python-requests/2.27.1", 
    "X-Amzn-Trace-Id": "Root=1-6275a972-3a0acaac0c3d666a4e35ddbd"
  }, 
  "json": {
    "a": "\u65e5\u672c\u8a9e"
  }, 
  "origin": "60.113.53.109", 
  "url": "https://httpbin.org/post"
}

回避策1の問題:マクロ(Jinjaテンプレート)が展開されない

Operatorのパラメータ(※)には、Jinjaテンプレートでマクロを記載することができます。回避策1でdataパラメータをencodeしてbytesで渡すと、マクロが展開されなくなります
実行時に決まる値(XComやAirflow Variables、DAG Runのパラメータ等)を使う場合は、マクロが必要ですので、これは嬉しくない話です。

※正確にはtemplate_fieldsで指定されているパラメータ。SimpleHttpOperatorの場合endpoint、data、headers

例えば下のDAGを実行すると、

    # Task定義以外は省略
    SimpleHttpOperator(
        task_id='macro_post',
        http_conn_id='',
        endpoint='https://httpbin.org/post',
        data='{{ run_id }} 日本語'.encode('utf-8'),
        log_response=True,
    )

エラーにこそなりませんが、httpbinの結果からマクロが展開されずに渡されたことがわかります(dataの部分の{}が残っていることに注目)。

[2022-05-06, 23:04:19 UTC] {http.py:129} INFO - Sending 'POST' to url: https://httpbin.org/post
[2022-05-06, 23:04:20 UTC] {http.py:106} INFO - {
  "args": {}, 
  "data": "{{ run_id }} \u65e5\u672c\u8a9e", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Content-Length": "22", 
    "Host": "httpbin.org", 
    "User-Agent": "python-requests/2.27.1", 
    "X-Amzn-Trace-Id": "Root=1-6275a972-6dfe5d7a561151a72665922e"
  }, 
  "json": null, 
  "origin": "60.113.53.109", 
  "url": "https://httpbin.org/post"
}

マクロ展開されない原因

AbstractOperator(すべてのOperatorの親クラス)のrender_templateメソッドで、template_fieldsで指定されたパラメータを実際のパラメータに変換(render)しますが、この時、パラメータが文字列(str)の場合だけJinjaテンプレートとして読み込みます

(具体的には下の分岐です)

        if isinstance(value, str):
            if any(value.endswith(ext) for ext in self.template_ext):  # A filepath.
                template = jinja_env.get_template(value)
            else:
                template = jinja_env.from_string(value)
            dag = self.get_dag()
            if dag and dag.render_template_as_native_obj:
                return render_template_as_native(template, context)
            return render_template_to_string(template, context)

回避策2:pre_execute+encode

BaseOperator(全てのOperatorの親クラス)には、executeの直前フックできるpre_executeというメソッドがあります。このメソッドを利用することで

  1. マクロ(Jinjaテンプレート)を展開(pre_executeよりさらに前の処理)
  2. strからbytesに変換(pre_executeに記載)
  3. SimpleHttpOperatorの処理を実行(execute)

の順に処理を行い、マクロを展開しつつ日本語を使うことがで可能となります。

    # Task定義以外は省略
    class SimpleHttpOperator2(SimpleHttpOperator):
        def pre_execute(self, context):
            super().pre_execute(context)
            self.data = self.data.encode('utf-8')

    SimpleHttpOperator2(
        task_id='macro_post_with_hook',
        http_conn_id='',
        endpoint='https://httpbin.org/post',
        data='{{ run_id }} 日本語',
        log_response=True,        
    )

マクロ(Jinjaテンプレート)を展開しつつ、日本語を送ることができました。

[2022-05-06, 23:13:19 UTC] {http.py:129} INFO - Sending 'POST' to url: https://httpbin.org/post
[2022-05-06, 23:13:19 UTC] {http.py:106} INFO - {
  "args": {}, 
  "data": "manual__2022-05-06T23:04:19.514089+00:00 \u65e5\u672c\u8a9e", 
  "files": {}, 
  "form": {}, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Content-Length": "50", 
    "Host": "httpbin.org", 
    "User-Agent": "python-requests/2.27.1", 
    "X-Amzn-Trace-Id": "Root=1-6275ab8d-30bf5e2a08357ade66145790"
  }, 
  "json": null, 
  "origin": "60.113.53.109", 
  "url": "https://httpbin.org/post"
}