airflowよくある質問のまとめ

11503 ワード

AirFlow FAQの概要
airflowのよくある問題の調査記録は以下の通りです.
1,airflowはどのようにunpause大量のdag任務を量産します
通常の少量のタスクは、コマンドairflow unpause dag_idコマンドで起動したり、webインタフェースで起動ボタンをクリックしたりすることで実現できますが、タスクが多すぎると、タスクごとに起動するのは面倒です.実はdag情報はデータベースに格納されており、大量にデータベース情報を変更することで、大量にdagタスクを起動する効果を達成することができます.mysqlをsql_alchemy_connとして使用する場合は、airflowデータベースにログインし、テーブルdagのis_を更新するだけです.pausedフィールドが0の場合、dagタスクが開始されます.
例:update dag set is_paused = 0 where dag_id like "benchmark%";2,airflowのschedulerプロセスは1つのタスクを実行した後に偽死状態に保留する
この場合、schedulerスケジューラがタスクを生成したが、パブリッシュできないのが一般的です.ログにはエラーメッセージはありません.
Borker接続依存ライブラリがインストールされていないためかもしれません.redisがbrokerであればpip install apache‐airflow[redis]、rabbitmqがbrokerであればpip install apache-airflow[rabbitmq]、schedulerノードがrabbitmqに正常にアクセスできるかどうかを確認します.
3、定義されたdagファイルが多すぎる場合、airflowのschedulerノードの実行効率は遅い
airflowのschedulerのデフォルトは2つのスレッドで、プロファイルairflow.cfgを変更することで改善できます.
[scheduler]
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
#   2    100
max_threads = 100

4,airflowログレベルの変更
$ vi airflow.cfg
[core]
#logging_level = INFO
logging_level = WARNING

NOTSET < DEBUG < INFO < WARNING < ERROR < CRITICAL
logのレベルをINFOに設定すると、INFOレベル以下のログは出力されず、INFOレベル以上のログは出力されます.つまり、ログレベルが高いほど、印刷されるログは詳細ではありません.デフォルトのログ・レベルはWARNINGです.
注意:logging_levelWARNING以上のレベルに変更すると、ログだけでなく、コマンドライン出力の詳細も同様に影響を受け、指定したレベル以上の情報しか出力されません.したがって、コマンドライン出力情報が不完全で、システムにエラーログ出力がない場合は、ログレベルが高すぎるためです.
5,AirFlow: jinja2.exceptions.TemplateNotFound
これはairflowがjinja 2をテンプレートエンジンとして使用したためのトラップであり、bashコマンドを使用する場合、末尾にスペースを追加する必要があります.
  • Described here : see below. You need to add a space after the script name in cases where you are directly calling a bash scripts in the  bash_command  attribute of  BashOperator  - this is because the Airflow tries to apply a Jinja template to it, which will fail.
  • t2 = BashOperator(
    task_id='sleep',
    bash_command="/home/batcher/test.sh", // This fails with `Jinja template not found` error
    #bash_command="/home/batcher/test.sh ", // This works (has a space after)
    dag=dag)

    参照リンク:
    https://stackoverflow.com/questions/42147514/templatenotfound-error-when-running-simple-airflow-bashoperator
    https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
    6,AirFlow: Task is not able to be run
    タスクがしばらく実行されると突然実行できなくなり、バックグラウンドワークログには次のプロンプトが表示されます.
    [2018-05-25 17:22:05,068] {jobs.py:2508} INFO - Task is not able to be run

    タスクに対応する実行ログを表示します.
    cat /home/py/airflow-home/logs/testBashOperator/print_date/2018-05-25T00:00:00/6.log
    ...
    [2018-05-25 17:22:05,067] {models.py:1190} INFO - Dependencies not met for , 
    dependency 'Task Instance State' FAILED: Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.

    エラー・プロンプトに基づいて、依存タスクのステータスが失敗したことを説明します.この場合、2つの解決策があります.
  • airflow runを使用してtaskを実行する場合は、依存task:
    $ airflow run -A dag_id task_id execution_date
  • を無視することを指定します.
  • コマンドairflow clear dag_を使用idタスククリーンアップ:
    $ airflow clear -u testBashOperator
  • 7,CELERY: PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue '[email protected]' in vhost ''
    celery 4をアップグレードします.x以降rabbitmqを使用してbroker実行タスクに次の異常を放出します.
    [2018-06-29 09:32:14,622: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, "PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'celery@PQ
    SZ-L01395.celery.pidbox' in vhost '/': received the value '10000' of type 'signedint' but current is none", (50, 10), 'Queue.declare')
    Traceback (most recent call last):
      File "c:\programdata\anaconda3\lib\site-packages\celery\worker\worker.py", line 205, in start
        self.blueprint.start(self)
    .......
      File "c:\programdata\anaconda3\lib\site-packages\amqp\channel.py", line 277, in _on_close
        reply_code, reply_text, (class_id, method_id), ChannelError,
    amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue '[email protected]' in vhost '/'
    : received the value '10000' of type 'signedint' but current is none

    このエラーの原因は、rabbitmqのクライアントとサービス側のパラメータが一致しないため、パラメータを一致させることが一般的です.
    たとえば、x-expiresに対応するceleryの構成はcontrol_であることを示します.queue_expires.したがって、コンフィギュレーションファイルにcontrol_を追加するだけです.queue_expires=Noneでいいです.
    celery 3.xにはこの2つの構成はなく、4.xでは、この2つの構成の一貫性を保証する必要があります.そうしないと、上記のような異常が放出されます.
    ここで出会った2つのrabbitmqの構成とcelery構成のマッピング関係は次の表の通りです.
    rabbitmq
    celery4.x
    x-expires
    control_queue_expires
    x-message-ttl
    control_queue_ttl
    8,CELERY: The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0.Please use RPC backend or a persistent backend
    Celeryを4にアップグレードx以降の運転では、次の異常が放出されます.
    /anaconda/anaconda3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning: 
        The AMQP result backend is scheduled for deprecation in     version 4.0 and removal in version v5.0.     Please use RPC backend or a persistent backend.
      alternative='Please use RPC backend or a persistent backend.')

    原因解析:celery 4.0でrabbitmq構成result_backbend方式が変わりました:以前はbrokerと同じでした:result_backend = 'amqp://guest:guest@localhost:5672/'rpc構成:result_backend = 'rpc://'
    参照リンク:http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-event_queue_prefix
    9,CELERY: ValueError('not enough values to unpack (expected 3, got 0)',)
    Windowsでcelery 4を実行する.x次のエラーが発生しました.
    [2018-07-02 10:54:17,516: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
    Traceback (most recent call last):
        ......
        tasks, accept, hostname = _loc
    ValueError: not enough values to unpack (expected 3, got 0)
    

    celery 4.x Windowsプラットフォームはしばらくサポートされていません.デバッグの目的であれば、celeryのスレッドプールを置き換えることで、Windowsプラットフォームで実行する目的を達成することができます.
    pip install eventlet
    celery -A  worker -l info -P eventlet

    参照リンク:
    https://stackoverflow.com/questions/45744992/celery-raises-valueerror-not-enough-values-to-unpack
    https://blog.csdn.net/qq_30242609/article/details/79047660
    10,Airflow: ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'
    airflow運転中に以下の異常を放出します.
    Traceback (most recent call last):
      File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 83, in sync
    ......
        return self._maybe_set_cache(self.backend.get_task_meta(self.id))
      File "/anaconda/anaconda3/lib/python3.6/site-packages/celery/backends/base.py", line 307, in get_task_meta
        meta = self._get_task_meta_for(task_id)
    AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
    [2018-07-04 10:52:14,746] {celery_executor.py:101} ERROR - Error syncing the celery executor, ignoring it:
    [2018-07-04 10:52:14,746] {celery_executor.py:102} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'

    このエラーには2つの可能性があります.
  • CELERY_RESULT_BACKENDプロパティに構成または構成エラーはありません.
  • celeryバージョンは低すぎます.例えばairflow 1.9.0はcelery 4を使用します.xなので、celeryバージョンをチェックし、バージョンの互換性を維持します.

  • 11,airflow.exceptions.AirflowException dag_id could not be found xxxx. Either the dag did not exist or it failed to parse
    ワークログの表示airflow-worker.err
    airflow.exceptions.AirflowException: dag_id could not be found: bmhttp. Either the dag did not exist or it failed to parse.
    [2018-07-31 17:37:34,191: ERROR/ForkPoolWorker-6] Task airflow.executors.celery_executor.execute_command[181c78d0-242c-4265-aabe-11d04887f44a] raised unexpected: AirflowException('Celery command failed',)
    Traceback (most recent call last):
      File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 52, in execute_command
        subprocess.check_call(command, shell=True)
      File "/anaconda/anaconda3/lib/python3.6/subprocess.py", line 291, in check_call
        raise CalledProcessError(retcode, cmd)
    subprocess.CalledProcessError: Command 'airflow run bmhttp get_op1 2018-07-26T06:28:00 --local -sd /home/ignite/airflow/dags/BenchMark01.py' returned non-zero exit status 1.

    例外ログのCommand情報により、スケジューリングノードは、タスクメッセージを生成する際に実行するスクリプトのパス(dsパラメータで指定)も指定していることがわかります.すなわち、スケジューリングノード(scheduler)とワークノード(worker)に対応するdag脚本を同じパスの下に置く必要があります.そうしないと、以上のエラーが発生します.
    参照リンク:
    https://stackoverflow.com/questions/43235130/airflow-dag-id-could-not-be-found
    12,airlfowのREST API呼び出しはAirflow 404 = lots of circlesを返す
    このエラーの原因は、URLにoriginパラメータが提供されていないためであり、このパラメータはリダイレクトのために使用され、例えばairflowの/runインタフェースが呼び出され、使用可能な例は以下の通りである.
    http://localhost:8080/admin/airflow/run?dag_id=example_hello_world_dag&task_id=sleep_task&execution_date=20180807&ignore_all_deps=true&origin=/admin
    13,airflow remote worker log hostname問題workerノードがwebserverと同じマシンに配備されていない場合、webserverからworkerノードログを表示すると、次のエラーが発生します.
    *** Log file isn't local. *** Fetching here: http://kaimanas.serveriai.lt:8793/log/.../1.log *** Failed to fetch log file from worker. HTTPConnectionPool(host='kaimanas.serveriai.lt', port=8793): Max retries exceeded with url:/log/.../1.log (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused',)) kaimanas.serveriai.lt(または他)は、workerノードのhostnameではありません.airflowソースnet.を参照してください.py:
    def get_hostname():     """    Fetch the hostname using the callable from the config or using     `socket.getfqdn` as a fallback.     """    # First we attempt to fetch the callable path from the config.     try:         callable_path = conf.get('core', 'hostname_callable')     except AirflowConfigException:         callable_path = None
        # Then we handle the case when the config is missing or empty. This is the     # default behavior.     if not callable_path:         return socket.getfqdn()
        # Since we have a callable path, we try to import and run it next.     module_path, attr_name = callable_path.split(':')     module = importlib.import_module(module_path)callable=getattr(module,attr_name)return callable()はsocketを呼び出す.getfqdn()は、次のように機能します.
    def getfqdn(name):「」.nameに対応する完全に合格したドメイン名を返します.nameが無視されるとローカルホストとして解釈されます.合格したドメイン名を見つけるためにgethostbyaddr()をチェックします.返されるホスト名と、それに伴う別名.使用可能な場合は、最初の名前が選択されます.適格なドメイン名が1つも使用できない場合、gethostname()の戻り値を戻り値「」とします.これにより、workerノードの/etc/hostsのhostnameマッピングを構成し、workerノードのipをホストのhostnameにマッピングすることができます.
    10.xxx.xxx.xxx hostnameこの場合、webserverはそのhostnameでworkerノードのログを要求します.
    http://hostname.lt:8793/log/...