airflowよくある質問のまとめ
AirFlow FAQの概要
airflowのよくある問題の調査記録は以下の通りです.
1,airflowはどのように
通常の少量のタスクは、コマンド
例:
この場合、schedulerスケジューラがタスクを生成したが、パブリッシュできないのが一般的です.ログにはエラーメッセージはありません.
Borker接続依存ライブラリがインストールされていないためかもしれません.redisがbrokerであれば
3、定義された
airflowのschedulerのデフォルトは2つのスレッドで、プロファイル
4,airflowログレベルの変更
NOTSET < DEBUG < INFO < WARNING < ERROR < CRITICAL
logのレベルをINFOに設定すると、INFOレベル以下のログは出力されず、INFOレベル以上のログは出力されます.つまり、ログレベルが高いほど、印刷されるログは詳細ではありません.デフォルトのログ・レベルはWARNINGです.
注意:
5,AirFlow: jinja2.exceptions.TemplateNotFound
これはairflowがjinja 2をテンプレートエンジンとして使用したためのトラップであり、bashコマンドを使用する場合、末尾にスペースを追加する必要があります. Described here : see below. You need to add a space after the script name in cases where you are directly calling a bash scripts in the
参照リンク:
https://stackoverflow.com/questions/42147514/templatenotfound-error-when-running-simple-airflow-bashoperator
https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
6,AirFlow: Task is not able to be run
タスクがしばらく実行されると突然実行できなくなり、バックグラウンドワークログには次のプロンプトが表示されます.
タスクに対応する実行ログを表示します.
エラー・プロンプトに基づいて、依存タスクのステータスが失敗したことを説明します.この場合、2つの解決策があります. airflow runを使用してtaskを実行する場合は、依存task: を無視することを指定します.コマンドairflow clear dag_を使用idタスククリーンアップ: 7,CELERY: PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue '[email protected]' in vhost ''
celery 4をアップグレードします.x以降rabbitmqを使用してbroker実行タスクに次の異常を放出します.
このエラーの原因は、rabbitmqのクライアントとサービス側のパラメータが一致しないため、パラメータを一致させることが一般的です.
たとえば、x-expiresに対応するceleryの構成はcontrol_であることを示します.queue_expires.したがって、コンフィギュレーションファイルにcontrol_を追加するだけです.queue_expires=Noneでいいです.
celery 3.xにはこの2つの構成はなく、4.xでは、この2つの構成の一貫性を保証する必要があります.そうしないと、上記のような異常が放出されます.
ここで出会った2つのrabbitmqの構成とcelery構成のマッピング関係は次の表の通りです.
rabbitmq
celery4.x
x-expires
control_queue_expires
x-message-ttl
control_queue_ttl
8,CELERY: The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0.Please use RPC backend or a persistent backend
Celeryを4にアップグレードx以降の運転では、次の異常が放出されます.
原因解析:celery 4.0でrabbitmq構成result_backbend方式が変わりました:以前はbrokerと同じでした:result_backend = 'amqp://guest:guest@localhost:5672/'rpc構成:result_backend = 'rpc://'
参照リンク:http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-event_queue_prefix
9,CELERY: ValueError('not enough values to unpack (expected 3, got 0)',)
Windowsでcelery 4を実行する.x次のエラーが発生しました.
celery 4.x Windowsプラットフォームはしばらくサポートされていません.デバッグの目的であれば、celeryのスレッドプールを置き換えることで、Windowsプラットフォームで実行する目的を達成することができます.
参照リンク:
https://stackoverflow.com/questions/45744992/celery-raises-valueerror-not-enough-values-to-unpack
https://blog.csdn.net/qq_30242609/article/details/79047660
10,Airflow: ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'
airflow運転中に以下の異常を放出します.
このエラーには2つの可能性があります. CELERY_RESULT_BACKENDプロパティに構成または構成エラーはありません. celeryバージョンは低すぎます.例えばairflow 1.9.0はcelery 4を使用します.xなので、celeryバージョンをチェックし、バージョンの互換性を維持します.
11,airflow.exceptions.AirflowException dag_id could not be found xxxx. Either the dag did not exist or it failed to parse
ワークログの表示
例外ログの
参照リンク:
https://stackoverflow.com/questions/43235130/airflow-dag-id-could-not-be-found
12,airlfowのREST API呼び出しは
このエラーの原因は、URLに
http://localhost:8080/admin/airflow/run?dag_id=example_hello_world_dag&task_id=sleep_task&execution_date=20180807&ignore_all_deps=true&origin=/admin
13,airflow remote worker log hostname問題workerノードがwebserverと同じマシンに配備されていない場合、webserverからworkerノードログを表示すると、次のエラーが発生します.
*** Log file isn't local. *** Fetching here: http://kaimanas.serveriai.lt:8793/log/.../1.log *** Failed to fetch log file from worker. HTTPConnectionPool(host='kaimanas.serveriai.lt', port=8793): Max retries exceeded with url:/log/.../1.log (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused',)) kaimanas.serveriai.lt(または他)は、workerノードのhostnameではありません.airflowソースnet.を参照してください.py:
def get_hostname(): """ Fetch the hostname using the callable from the config or using `socket.getfqdn` as a fallback. """ # First we attempt to fetch the callable path from the config. try: callable_path = conf.get('core', 'hostname_callable') except AirflowConfigException: callable_path = None
# Then we handle the case when the config is missing or empty. This is the # default behavior. if not callable_path: return socket.getfqdn()
# Since we have a callable path, we try to import and run it next. module_path, attr_name = callable_path.split(':') module = importlib.import_module(module_path)callable=getattr(module,attr_name)return callable()はsocketを呼び出す.getfqdn()は、次のように機能します.
def getfqdn(name):「」.nameに対応する完全に合格したドメイン名を返します.nameが無視されるとローカルホストとして解釈されます.合格したドメイン名を見つけるためにgethostbyaddr()をチェックします.返されるホスト名と、それに伴う別名.使用可能な場合は、最初の名前が選択されます.適格なドメイン名が1つも使用できない場合、gethostname()の戻り値を戻り値「」とします.これにより、workerノードの/etc/hostsのhostnameマッピングを構成し、workerノードのipをホストのhostnameにマッピングすることができます.
10.xxx.xxx.xxx hostnameこの場合、webserverはそのhostnameでworkerノードのログを要求します.
http://hostname.lt:8793/log/...
airflowのよくある問題の調査記録は以下の通りです.
1,airflowはどのように
unpause
大量のdag任務を量産します通常の少量のタスクは、コマンド
airflow unpause dag_id
コマンドで起動したり、webインタフェースで起動ボタンをクリックしたりすることで実現できますが、タスクが多すぎると、タスクごとに起動するのは面倒です.実はdag情報はデータベースに格納されており、大量にデータベース情報を変更することで、大量にdagタスクを起動する効果を達成することができます.mysqlをsql_alchemy_conn
として使用する場合は、airflowデータベースにログインし、テーブルdagのis_を更新するだけです.pausedフィールドが0の場合、dagタスクが開始されます.例:
update dag set is_paused = 0 where dag_id like "benchmark%";
2,airflowのschedulerプロセスは1つのタスクを実行した後に偽死状態に保留するこの場合、schedulerスケジューラがタスクを生成したが、パブリッシュできないのが一般的です.ログにはエラーメッセージはありません.
Borker接続依存ライブラリがインストールされていないためかもしれません.redisがbrokerであれば
pip install apache‐airflow[redis]
、rabbitmqがbrokerであればpip install apache-airflow[rabbitmq]
、schedulerノードがrabbitmqに正常にアクセスできるかどうかを確認します.3、定義された
dag
ファイルが多すぎる場合、airflowのschedulerノードの実行効率は遅いairflowのschedulerのデフォルトは2つのスレッドで、プロファイル
airflow.cfg
を変更することで改善できます.[scheduler]
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
# 2 100
max_threads = 100
4,airflowログレベルの変更
$ vi airflow.cfg
[core]
#logging_level = INFO
logging_level = WARNING
NOTSET < DEBUG < INFO < WARNING < ERROR < CRITICAL
logのレベルをINFOに設定すると、INFOレベル以下のログは出力されず、INFOレベル以上のログは出力されます.つまり、ログレベルが高いほど、印刷されるログは詳細ではありません.デフォルトのログ・レベルはWARNINGです.
注意:
logging_level
をWARNING
以上のレベルに変更すると、ログだけでなく、コマンドライン出力の詳細も同様に影響を受け、指定したレベル以上の情報しか出力されません.したがって、コマンドライン出力情報が不完全で、システムにエラーログ出力がない場合は、ログレベルが高すぎるためです.5,AirFlow: jinja2.exceptions.TemplateNotFound
これはairflowがjinja 2をテンプレートエンジンとして使用したためのトラップであり、bashコマンドを使用する場合、末尾にスペースを追加する必要があります.
bash_command
attribute of BashOperator
- this is because the Airflow tries to apply a Jinja template to it, which will fail. t2 = BashOperator(
task_id='sleep',
bash_command="/home/batcher/test.sh", // This fails with `Jinja template not found` error
#bash_command="/home/batcher/test.sh ", // This works (has a space after)
dag=dag)
参照リンク:
https://stackoverflow.com/questions/42147514/templatenotfound-error-when-running-simple-airflow-bashoperator
https://cwiki.apache.org/confluence/display/AIRFLOW/Common+Pitfalls
6,AirFlow: Task is not able to be run
タスクがしばらく実行されると突然実行できなくなり、バックグラウンドワークログには次のプロンプトが表示されます.
[2018-05-25 17:22:05,068] {jobs.py:2508} INFO - Task is not able to be run
タスクに対応する実行ログを表示します.
cat /home/py/airflow-home/logs/testBashOperator/print_date/2018-05-25T00:00:00/6.log
...
[2018-05-25 17:22:05,067] {models.py:1190} INFO - Dependencies not met for ,
dependency 'Task Instance State' FAILED: Task is in the 'success' state which is not a valid state for execution. The task must be cleared in order to be run.
エラー・プロンプトに基づいて、依存タスクのステータスが失敗したことを説明します.この場合、2つの解決策があります.
$ airflow run -A dag_id task_id execution_date
$ airflow clear -u testBashOperator
celery 4をアップグレードします.x以降rabbitmqを使用してbroker実行タスクに次の異常を放出します.
[2018-06-29 09:32:14,622: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, "PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue 'celery@PQ
SZ-L01395.celery.pidbox' in vhost '/': received the value '10000' of type 'signedint' but current is none", (50, 10), 'Queue.declare')
Traceback (most recent call last):
File "c:\programdata\anaconda3\lib\site-packages\celery\worker\worker.py", line 205, in start
self.blueprint.start(self)
.......
File "c:\programdata\anaconda3\lib\site-packages\amqp\channel.py", line 277, in _on_close
reply_code, reply_text, (class_id, method_id), ChannelError,
amqp.exceptions.PreconditionFailed: Queue.declare: (406) PRECONDITION_FAILED - inequivalent arg 'x-expires' for queue '[email protected]' in vhost '/'
: received the value '10000' of type 'signedint' but current is none
このエラーの原因は、rabbitmqのクライアントとサービス側のパラメータが一致しないため、パラメータを一致させることが一般的です.
たとえば、x-expiresに対応するceleryの構成はcontrol_であることを示します.queue_expires.したがって、コンフィギュレーションファイルにcontrol_を追加するだけです.queue_expires=Noneでいいです.
celery 3.xにはこの2つの構成はなく、4.xでは、この2つの構成の一貫性を保証する必要があります.そうしないと、上記のような異常が放出されます.
ここで出会った2つのrabbitmqの構成とcelery構成のマッピング関係は次の表の通りです.
rabbitmq
celery4.x
x-expires
control_queue_expires
x-message-ttl
control_queue_ttl
8,CELERY: The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0.Please use RPC backend or a persistent backend
Celeryを4にアップグレードx以降の運転では、次の異常が放出されます.
/anaconda/anaconda3/lib/python3.6/site-packages/celery/backends/amqp.py:67: CPendingDeprecationWarning:
The AMQP result backend is scheduled for deprecation in version 4.0 and removal in version v5.0. Please use RPC backend or a persistent backend.
alternative='Please use RPC backend or a persistent backend.')
原因解析:celery 4.0でrabbitmq構成result_backbend方式が変わりました:以前はbrokerと同じでした:result_backend = 'amqp://guest:guest@localhost:5672/'rpc構成:result_backend = 'rpc://'
参照リンク:http://docs.celeryproject.org/en/latest/userguide/configuration.html#std:setting-event_queue_prefix
9,CELERY: ValueError('not enough values to unpack (expected 3, got 0)',)
Windowsでcelery 4を実行する.x次のエラーが発生しました.
[2018-07-02 10:54:17,516: ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)',)
Traceback (most recent call last):
......
tasks, accept, hostname = _loc
ValueError: not enough values to unpack (expected 3, got 0)
celery 4.x Windowsプラットフォームはしばらくサポートされていません.デバッグの目的であれば、celeryのスレッドプールを置き換えることで、Windowsプラットフォームで実行する目的を達成することができます.
pip install eventlet
celery -A worker -l info -P eventlet
参照リンク:
https://stackoverflow.com/questions/45744992/celery-raises-valueerror-not-enough-values-to-unpack
https://blog.csdn.net/qq_30242609/article/details/79047660
10,Airflow: ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'
airflow運転中に以下の異常を放出します.
Traceback (most recent call last):
File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 83, in sync
......
return self._maybe_set_cache(self.backend.get_task_meta(self.id))
File "/anaconda/anaconda3/lib/python3.6/site-packages/celery/backends/base.py", line 307, in get_task_meta
meta = self._get_task_meta_for(task_id)
AttributeError: 'DisabledBackend' object has no attribute '_get_task_meta_for'
[2018-07-04 10:52:14,746] {celery_executor.py:101} ERROR - Error syncing the celery executor, ignoring it:
[2018-07-04 10:52:14,746] {celery_executor.py:102} ERROR - 'DisabledBackend' object has no attribute '_get_task_meta_for'
このエラーには2つの可能性があります.
11,airflow.exceptions.AirflowException dag_id could not be found xxxx. Either the dag did not exist or it failed to parse
ワークログの表示
airflow-worker.err
airflow.exceptions.AirflowException: dag_id could not be found: bmhttp. Either the dag did not exist or it failed to parse.
[2018-07-31 17:37:34,191: ERROR/ForkPoolWorker-6] Task airflow.executors.celery_executor.execute_command[181c78d0-242c-4265-aabe-11d04887f44a] raised unexpected: AirflowException('Celery command failed',)
Traceback (most recent call last):
File "/anaconda/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py", line 52, in execute_command
subprocess.check_call(command, shell=True)
File "/anaconda/anaconda3/lib/python3.6/subprocess.py", line 291, in check_call
raise CalledProcessError(retcode, cmd)
subprocess.CalledProcessError: Command 'airflow run bmhttp get_op1 2018-07-26T06:28:00 --local -sd /home/ignite/airflow/dags/BenchMark01.py' returned non-zero exit status 1.
例外ログの
Command
情報により、スケジューリングノードは、タスクメッセージを生成する際に実行するスクリプトのパス(dsパラメータで指定)も指定していることがわかります.すなわち、スケジューリングノード(scheduler)とワークノード(worker)に対応するdag脚本を同じパスの下に置く必要があります.そうしないと、以上のエラーが発生します.参照リンク:
https://stackoverflow.com/questions/43235130/airflow-dag-id-could-not-be-found
12,airlfowのREST API呼び出しは
Airflow 404 = lots of circles
を返すこのエラーの原因は、URLに
origin
パラメータが提供されていないためであり、このパラメータはリダイレクトのために使用され、例えばairflowの/run
インタフェースが呼び出され、使用可能な例は以下の通りである.http://localhost:8080/admin/airflow/run?dag_id=example_hello_world_dag&task_id=sleep_task&execution_date=20180807&ignore_all_deps=true&origin=/admin
13,airflow remote worker log hostname問題workerノードがwebserverと同じマシンに配備されていない場合、webserverからworkerノードログを表示すると、次のエラーが発生します.
*** Log file isn't local. *** Fetching here: http://kaimanas.serveriai.lt:8793/log/.../1.log *** Failed to fetch log file from worker. HTTPConnectionPool(host='kaimanas.serveriai.lt', port=8793): Max retries exceeded with url:/log/.../1.log (Caused by NewConnectionError(': Failed to establish a new connection: [Errno 111] Connection refused',)) kaimanas.serveriai.lt(または他)は、workerノードのhostnameではありません.airflowソースnet.を参照してください.py:
def get_hostname(): """ Fetch the hostname using the callable from the config or using `socket.getfqdn` as a fallback. """ # First we attempt to fetch the callable path from the config. try: callable_path = conf.get('core', 'hostname_callable') except AirflowConfigException: callable_path = None
# Then we handle the case when the config is missing or empty. This is the # default behavior. if not callable_path: return socket.getfqdn()
# Since we have a callable path, we try to import and run it next. module_path, attr_name = callable_path.split(':') module = importlib.import_module(module_path)callable=getattr(module,attr_name)return callable()はsocketを呼び出す.getfqdn()は、次のように機能します.
def getfqdn(name):「」.nameに対応する完全に合格したドメイン名を返します.nameが無視されるとローカルホストとして解釈されます.合格したドメイン名を見つけるためにgethostbyaddr()をチェックします.返されるホスト名と、それに伴う別名.使用可能な場合は、最初の名前が選択されます.適格なドメイン名が1つも使用できない場合、gethostname()の戻り値を戻り値「」とします.これにより、workerノードの/etc/hostsのhostnameマッピングを構成し、workerノードのipをホストのhostnameにマッピングすることができます.
10.xxx.xxx.xxx hostnameこの場合、webserverはそのhostnameでworkerノードのログを要求します.
http://hostname.lt:8793/log/...