細かく見るあなたのコード:非同期メッセージ処理

2790 ワード

仕事中に非同期メッセージ処理が必要なビジネスシーンに遭遇することが多く,メッセージの性質によって全く異なる処理方式がある.
1、メッセージが独立していない
独立していないメッセージは通常、順序依存関係であり、メッセージ処理メカニズムは線形キュー処理モードに劣化し、1人の消費者がメッセージを単一スレッドで処理するしかない.
2、メッセージが完全に独立
完全に独立したメッセージは、複数の消費者(スレッド)が同時に処理することができ、最大の同時処理能力を達成することができる.
3、メッセージが完全に独立していない
通常、この場合、同源メッセージ(同じ生産者から)は、異種メッセージの順序に関係なく秩序化されることが要求される.
このシーンのメッセージ処理は比較的複雑であり,同源メッセージの秩序を保証するために,同一ソースのメッセージに固定された消費者スレッドをバインドすることを容易に考えることができ,これは簡単であるが大きな問題がある.
生産者の数が大きい場合、バインドスレッドの数が足りない可能性があります.もちろん、スレッドリソースを多重化し、同じスレッドで複数のメッセージソースをバインドして処理することができます.これにより、メッセージソース間の相互影響というもう一つの問題があります.
次のシーンを考慮します.
生産者P 1が大量のメッセージを生成してキューに入った後に消費スレッドC 1処理に割り当てられる(C 1は処理に時間がかかる場合がある)と、生産者P 2はメッセージを生成し、不幸にも消費スレッドC 1処理にも割り当てられる
すると、生産者P 2のメッセージ処理は、P 1の大量のメッセージによってブロックされ、P 1とP 2との間の相互影響を招き、他の消費スレッドを十分に利用することができず、不均衡を招く.
だから、私たちはこのような問題を避けることを考えなければなりません.消費処理のタイムリー性(できるだけ早く)、隔離性(相互干渉を避ける)、均衡性(同時処理を最大化)を実現する
実装では、スレッド配布モデル(PUSH方式)と考えやすい2つのモードがあり、具体的な方法は通常以下の通りである.
1.ポーリング・キューがメッセージを取り出すグローバル・メッセージ・ディスパッチがあります.
2.メッセージソースに基づいて、適切な消費スレッド処理に送信する.
配布のアルゴリズムメカニズムは、メッセージソースに基づくHashのように簡単であり、複雑であれば、各消費スレッドの現在の負荷に基づいて、待ち行列の長さ、メッセージの複雑さを総合的に分析して配布を選択することができる.
簡単なHashは上記のシーン記述の問題に直面するに違いないが,複雑な配布計算は明らかに面倒で複雑であり,効率も必ずしも良くなく,均衡性の面でも十分なバランスをとることは難しい.
第2のモードはPULL方式を採用し、スレッドは必要に応じて引き取り、具体的なやり方は以下の通りである.
1.メッセージ・ソースは、生成されたメッセージを、ソースに対する一時的なキューに直接入れ(以下に示すように、各セッションは異なるメッセージ・ソースを表す)、sessionをブロックされたキュー通知スレッド処理に配置する
2.複数の消費スレッドが同時にキューをポーリングし、メッセージを争う(1つのスレッドだけが取得されることを保証する
3.キュー・インジケータが他のスレッドによって処理されているかどうかを確認します(実装時にはスレッド・レベルでソース・メッセージに基づく検出同期が必要です)
4.他のスレッドによって処理されていない場合は、同期領域設定処理でステータスを示し、同期領域を終了した後、一時キュー内のメッセージを処理する
5.処理が完了した後、最後に再度同期ゾーン処理指示状態がアイドル状態に入る
次に、消費スレッド処理の流れをコードで説明します.
public void run() {
	try {
		for (AbstractSession s = squeue.take(); s != null; s = squeue.take()) {					
			// first check any worker is processing this session? 
                        // if any other worker thread is processing this event with same session, just ignore it.
			synchronized (s) {
				if (!s.isEventProcessing()) {
					s.setEventProcessing(true);
				} else {
					continue;
				}
			}
					
			// fire events with same session
			fire(s);
					
			// last reset processing flag and quit current thread processing
			s.setEventProcessing(false);
					
			// if remaining events, so re-insert to session queue
			if (s.getEventQueue().size() > 0 && !s.isEventProcessing()) {
				squeue.offer(s);
			}
		}
	} catch (InterruptedException e) {
		LOG.warn(e.getMessage(), e);
	}
}

以上のように,PULLモードによる消費スレッドの実現は比較的簡単になり,できるだけ同時能力を最大化する(スレッド争奪,CPU自動スケジューリング)
等化が保証されています(スレッドがアイドルになったら自動的にメッセージを抽出し、異なるメッセージタイプの処理の複雑さを理解する必要はありません).
異なるソース・メッセージの相互影響を分離します(消費スレッドにフル負荷がない限り、メッセージは常にタイムリーに処理されます).