python-gearmanの複数のGearmanServerの問題


============================================================================
オリジナル作品、転載許可.転載の際は必ず元の出典、および本声明をハイパーリンク形式で明記してください.
転入先:http://yunjianfei.iteye.com/blog/
============================================================================
 
問題の説明
 
1.複数のGearmanServerをオンにします.ポート番号はそれぞれ4730、4731です.
2.python-gearmanを使用してclientとworkerを開発し、複数のworker接続4730と4731の2つのサーバを開く
3.client GearmanServerに大量のタスクを送信
4.複数のworkerのうちの1つまたは複数は、1つまたは複数のタスクを受信した後、二度とタスクを受信しない
 
コード実装
注意:
コードを作成する前にpython-gearmanをインストールする必要があります.公式サイトのリンクは以下の通りです.
https://pypi.python.org/pypi/gearman/
 
1.ファイル名:test.bash
 
# test.bash
for i in {0..5}; do
    python gearman_client.py
    sleep 2
done

 
 
 2.ファイル名:gearman_client.py
 
# gearman_client.py
import multiprocessing
import gearman
import traceback

def start_gearman_client(process_id):
    gm_client = gearman.GearmanClient(['127.0.0.1:4730','127.0.0.1:4731'])
    try:
        requests = []
        for gm_job_id in range(500):
            request = gm_client.submit_job(
                    task='do_task',
                    data='%d_%03d' % (process_id,gm_job_id),
                    unique='%d_%03d' % (process_id,gm_job_id),
                    background=False,wait_until_complete=False)
            requests.append(request)
        gm_client.wait_until_jobs_completed(requests)
    except:
        print traceback.format_exc()
    return 0

def main():
    child_processes = []
    for process_id in range(2):
        p = multiprocessing.Process(target=start_gearman_client, args=(process_id,))
        child_processes.append((process_id,p))
        p.start()

    for (pid,child) in child_processes:
        print 'Confirming that child number %d had died' % pid
        child.join()

if __name__ == '__main__':
    main()

 
 
 3.ファイル名:gearman_worker.py
 
# gearman_worker.py
import gearman
import multiprocessing
import time
import traceback
from functools import partial

def do_work(gearman_worker,gearman_job,worker_id):
    try:
        print 'Worker %02d processing %s from port %d: %s' % (worker_id,gearman_job.data,gearman_job.connection.gearman_port,gearman_job.unique)
        time.sleep(0.001)
    except:
        print traceback.format_exc()
    return 'Done by worker %d through port %d' % (worker_id,gearman_job.connection.gearman_port)

def start_gearman_worker(worker_id):
    gm_worker = gearman.GearmanWorker(['127.0.0.1:4730','127.0.0.1:4731'])
    gm_worker.register_task('do_task', partial(do_work,worker_id=worker_id))
    print 'Worker %d start working' % worker_id
    gm_worker.work()

if __name__ == '__main__':
    workers = []
    for pid in range(8):
        worker = multiprocessing.Process(target=start_gearman_worker,args=(pid,))
        workers.append(worker)
        worker.start()
    for worker in workers:
        worker.join()

 
 
問題の再現
1.複数のGearmanServerをオンにします.ポート番号はそれぞれ4730、4731です.
2.コマンドpython gearman_の実行worker.py
3.スクリプトの実行/bin/bash test.bash
4.worker印刷出力のlogを見ると、logの末尾には一部のworkerだけがタスクを受信して実行しており、いくつかのworkerは1回のタスクだけを実行して二度と実行しないことがわかります.(注意:この状況が発生していない場合はtest.bashを何回か実行し、観察します)
 
ここでは、テスト結果の1つを貼ります.
テスト結果
Worker 02 processing 1_467 from port 4731: 1_467
Worker 04 processing 0_484 from port 4731: 0_484
Worker 05 processing 1_468 from port 4731: 1_468
Worker 02 processing 1_469 from port 4731: 1_469
Worker 04 processing 1_470 from port 4731: 1_470
Worker 05 processing 0_486 from port 4731: 0_486
Worker 02 processing 0_487 from port 4731: 0_487
Worker 04 processing 1_474 from port 4731: 1_474
Worker 05 processing 1_473 from port 4731: 1_473
Worker 02 processing 0_490 from port 4731: 0_490
Worker 05 processing 1_476 from port 4731: 1_476
Worker 04 processing 0_491 from port 4731: 0_491
Worker 02 processing 1_478 from port 4731: 1_478
Worker 05 processing 0_492 from port 4731: 0_492
Worker 04 processing 1_479 from port 4731: 1_479
Worker 02 processing 1_484 from port 4731: 1_484
Worker 05 processing 1_485 from port 4731: 1_485
Worker 04 processing 0_497 from port 4731: 0_497
Worker 02 processing 0_498 from port 4731: 0_498
Worker 05 processing 1_489 from port 4731: 1_489
Worker 02 processing 1_492 from port 4731: 1_492
Worker 04 processing 1_493 from port 4731: 1_493
Worker 05 processing 1_495 from port 4731: 1_495
Worker 02 processing 1_498 from port 4731: 1_498
Worker 04 processing 1_499 from port 4731: 1_499
 後ろにはworker 2、4、5だけがタスクを実行しており、他のworkerは動作していないことがわかります.
 
問題の分析と解決
問題分析を開始する前に、GearmanServerとclient、workerの間の大まかなワークフローについて理解しておきます.
python-gearman之多个GearmanServer问题_第1张图片 1.client JobをGearmanServerにコミット
2.GearmanServerはすべてのSleepingのworkerを探し出す
3.GearmanServerはこれらのSleepingのworkerに「noop」コマンドを送信し、workerを起動する
4.workerがGearmanServerにGrab_を送信ジョブのコマンド取得ジョブ
 
上記の手順に従って、workerがJobを取得するには、まずGearmanServerから送信される「NOOP」コマンドを受けなければならないことがわかります.では、workerはタスクを実行しません.以下のいくつかの状況にすぎません.
1.workerとGearmanServerの接続が切断されました
2.GearmanServerでは何らかの理由ですべてのworkerに「NOOP」を送信していない
 
この2点もコードで検証できます.python-gearmanに印刷出力を追加し、workerとGearmanServerの接続が切断されていないことを確認できます.では、GearmanServerがすべてのworkerにNOOPを送信していないためです.
 
これでは、次の2つの解決策があります.
1.GearmanServer(C実装)を参照し、ソースコードを修正し、すべてのworkerに対してNOOPを送信する
2.python-gearmanのコードを変更し、すべてのworkerがGearmanServerからJobを積極的に取得できるようにする
 
第1のシナリオでは、明らかにコストが高く、リスクが大きく、より大きなバグを引き起こす可能性があり、明らかに望ましくない.だから私たちは第2の案を選んで、workerに自発的にGearmanServerに行ってJobを引いて、死などの「NOOP」ではありません.
 
第2の案はどのように実現しますか?ここでは詳しく説明しませんが、すべてコードの中にあります.以下はgithubで発表したソースコードです.python-gearmanの公式バージョンに基づいて修正されました.リンクは以下の通りです.
https://github.com/yunjianfei/python-gearman
 
主な特徴は次のとおりです.
1.TCP keepalive特性が入っている
2.epollを採用し、効率が高い
3.workerはアクティブにタスクをキャプチャする機能を加え、複数のserverからタスクを取得できることを保証する
 
皆さんが積極的に使ってほしいです.バグを見つけたら、連絡してください.ありがとうございます.