Reactor Pattern(一)


Reactor Patternは、サービス要求を処理するために1つまたは複数のサービスハンドラに同時に送信されるイベント設計モードであり、要求が到着すると、サービスハンドラは多重割り当てポリシーを使用し、これらの要求を関連する要求ハンドラに同期的に送信する.
 
こうぞう
構造的にはReactor設計パターンには4つの要素がある.
リソース、システムは出力を提供したり、入力されたりすることができます.
同期イベントマルチプレクサは、すべてのリソースをロックされていないリソースで同期操作を開始できる場合、同期イベントマルチプレクサがこのリソースをアダプタに送信するイベントループ方式を適用します.
アダプタは、要求ハンドラの登録とログアウトを処理し、同期イベントマルチプレクサから関連する要求ハンドラにリソースを送信します.
要求ハンドラ、定義された実際の処理要求を適用するプログラム、および関連するリソース.
メリット
定義的には、すべてのreactorシステムは単一スレッドモードですが、マルチスレッド環境にも適用できます.
この設計モードは、アプリケーションビジネスコードとreactorモードの実装コードを完全に分割し、これは、ビジネスコンポーネントがモジュール化され、再利用可能であることを意味する.同時に、リクエストハンドラを同期的に呼び出すため、システムに複雑なマルチスレッドを追加しない場合、reactor設計モードは粗粒度の同時実行を簡略化することができる.
制限
リバース制御のため、Reactorモードは他のプロセス設計モードに比べて非常にデバッグしにくい.さらに、リクエストハンドラを同期的に呼び出すだけで、reactorモードは、特にSMPハードウェア上で最大同時量を制限する.Reactorの伸縮性は,同期呼び出し要求ハンドラのみならず,マルチプレクサにも制限される.以前のバージョンのUnixのselectとpoll呼び出しには最大の記述子があり、この記述子が設定されすぎるとパフォーマンスの問題が発生します.最近,Solarisの/dev/poll,Linuxのepoll,BSDシステムに基づくkqueue/keventのようなより伸縮性のあるシステムが設計され,これらのシステムは高記述子の場合の高性能を実現している.
ReactorモードとJavaネットワークマルチスレッド
Reactorモードの設計目的はサーバ環境におけるマルチスレッドの問題を解決するためであり、直JDK 1.4後、JavaはNIOとSelectorモードのServerとConsumerネットワークプログラミングモードを積極的に普及させたが、Selectorモードのプログラミングは伝統的なStreamモードが優雅に見えず、ここではまず伝統的なStreamネットワークIOプログラミング方式でreactorモードを実現した.
前述の説明に従って、猫の虎を描いて、Dispatcherを設計して、Demishplexer、RequestHanderはそれぞれ配布器、多重配布器と要求処理プログラムに対応して、資源については、この例では抽象的に1つの配列になって、配列の中の内容はrequestを処理する時に使う資源を代表して、実際の応用の中で、資源は1つのデータ接続で、RPC接続などかもしれません.このモードでは、リソースが初期化されると、任意のコードは配列内のコンテンツを変更することはできません.すなわち、resourceを変更することはできません.マルチプレクサは、現在の配列の要素に基づいて、Dispatcherに「リソース」を配布するかどうかを決定します.クライアントリクエストタスクをシミュレートするrequesterもあります.
 
簡単にするために、この例では直接配布器でリソースを初期化します.配布器には2つの重要な方法があり、accept方法はrequestを受け入れ、dispatcherにリソースがあるかどうかを判断し、returnResource方法はタスクハンドラの実行が完了した後に配布器にリソースを返す.
 
class DemultiPlexer {
   // initialize resources   
 private ArrayBlockingQueue<Integer> resources = new ArrayBlockingQueue<Integer>(2);
	{
		resources.add(1);
		resources.add(2);
	}

	private ArrayList<Integer> requests = new ArrayList<Integer>(5);

	/**
	 * 
	 * @param s
	 *            the starting time of get resource action
	 * @param timeout
	 *            , limit the request resource action within a time period in
	 *            second
	 * @return a integer represent a resource requesting
	 */
	public synchronized Integer getResource(long s, int timeout) {
		for (;;) {
			if ((System.currentTimeMillis() - s) / 1000 > timeout) {
				throw new RuntimeException("time out to get resource");
			} else if (resources.size() > 0) {
				return resources.poll();
			}
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	public void returnResource(Integer i) {
		synchronized (resources) {
			resources.add(i);
		}
	}

	/**
	 * accept a request and create a new dispatcher to assign the resource to a
	 * request handler.
	 * 
	 * @param Request
	 */
	public void accept(Integer requestId) {
		requests.add(requestId);
		Integer rid = getResource(System.currentTimeMillis(), 5);
		Dispatcher d = new Dispatcher(this, requestId, rid);
		d.createRequestHandler().start();
	}
}
 
Dispatcherはリソースを取得した後、新しいタスクハンドラの作成を担当します.ここでは直接作成され、実際のアプリケーションでは、タスクハンドラがスレッドプールに配置されて実行される可能性があります.
 
class Dispatcher {
	private Integer requestId;
	private Integer resourceId;
	private DemultiPlexer demultiPlexer;

	public Dispatcher(DemultiPlexer demultiPlexer, Integer requestId, Integer resourceId) {
		super();
		this.requestId = requestId;
		this.resourceId = resourceId;
		this.demultiPlexer = demultiPlexer;
	}

	/**
	 * create a request handler when demultiPlexer has an ideal resource.
	 * 
	 * @return RequestHandler
	 */
	public RequestHandler createRequestHandler() {
		return new RequestHandler(this, requestId, resourceId);
	}

	/**
	 * when request handler complete task, free the resource assigned.
	 */
	public synchronized void freeResource(Integer i) {
		demultiPlexer.returnResource(i);
	}
       
// getters & setters ......

}
 
RequestHandlerこそ本当のタスク実行者です.実行が完了すると、他のタスク実行者がリソースを要求できるように、リソースを返す必要があります.
 
class RequestHandler extends Thread {
	private Integer tid;
	private Integer rid;
	private Dispatcher dispatcher;

	public RequestHandler(Dispatcher dispatcher, Integer tid, Integer rid) {
		this.tid = tid;
		this.rid = rid;
		this.dispatcher = dispatcher;
	}

	@Override
	public void run() {
		System.out.println("the request No.[" + tid + "] is handling the requesting with resource [" + rid + "]");
		try {
			sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("the request No.[" + tid + "] processed the requesting with resource [" + rid + "]");

		// free resource after request was handled
		dispatcher.freeResource(rid);

		System.out.println("return resource: " + rid);
	}
}

 
Requesterクラスはリクエストプログラムであり,このクラスのrunメソッドはsocketに接続した後に独自の識別子requestidが発生し,main関数で10個のrequesterスレッドを起動し,マルチユーザの同時アクセスをシミュレートする.
 
class Requester extends Thread {

	private String address = "localhost";

	private int port = 1220;

	private int requestID;

	@Override
	public void run() {
		try {
			Socket socket = new Socket(address, port);
			socket.getOutputStream().write(requestID);
			sleep(5000);
			socket.close();
		} catch (UnknownHostException e) {
			e.printStackTrace();
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println("No." + requestID + " was sent...");
	}

	public static void main(String args[]) {
		for (int i=0; i< 10 ; i++){
			new Requester(i).start();
		}
	}

}

 
 これらのクラスがあれば、server側を起動してこれらの要求を処理すれば、完全なreactorモードのコードを構成することができます.このクラスはserversocketを起動してrequesterから送られてきたrequestidを読み出し、配布器に渡して処理し、serversocketは30 sアイドルになった後に自動的に閉じ、またこのクラスはshutdownhookを登録し、プログラムが異常に終了したときにserversocketが閉じられることを確保する.
 
public class ReactorPattern {

	DemultiPlexer demultiPlexer = new DemultiPlexer();

	public static void main(String args[]) throws InterruptedException, IOException {
		ReactorPattern reactorPattern = new ReactorPattern();
		ServerSocket server = new ServerSocket(1220);
		Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(server));

		long s = System.currentTimeMillis();
		for (;;) {
			if (System.currentTimeMillis() - s > 30000)
				break;
			Socket socket = server.accept();
			int i = socket.getInputStream().read();
			if (i != -1) {
				reactorPattern.getDemultiPlexer().accept(i);
			}			 
		}
		server.close();
	}
	public DemultiPlexer getDemultiPlexer() {
		return demultiPlexer;
	}
	
}
 
補助クラス
class ShutdownHookThread extends Thread {
	private ServerSocket server;

	public ShutdownHookThread(ServerSocket server) {
		this.server = server;
	}

	@Override
	public void run() {
		try {
			server.close();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			server = null;
		}
	}
}

 
ReactorPatternクラスを実行してからRequesterクラスを実行します.出力は次のとおりです.
 
the request No.[7] is handling the requesting with resource [1]
the request No.[8] is handling the requesting with resource [2]
the request No.[7] processed the requesting with resource [1]
return resource: 1
the request No.[8] processed the requesting with resource [2]
return resource: 2
the request No.[5] is handling the requesting with resource [1]
the request No.[1] is handling the requesting with resource [2]
the request No.[5] processed the requesting with resource [1]
the request No.[1] processed the requesting with resource [2]
return resource: 1
return resource: 2
the request No.[9] is handling the requesting with resource [1]
the request No.[4] is handling the requesting with resource [2]
the request No.[4] processed the requesting with resource [2]
the request No.[9] processed the requesting with resource [1]
return resource: 1
return resource: 2
the request No.[0] is handling the requesting with resource [1]
the request No.[3] is handling the requesting with resource [2]
the request No.[0] processed the requesting with resource [1]
the request No.[3] processed the requesting with resource [2]
return resource: 1
return resource: 2
the request No.[2] is handling the requesting with resource [1]
the request No.[6] is handling the requesting with resource [2]
the request No.[2] processed the requesting with resource [1]
return resource: 1
the request No.[6] processed the requesting with resource [2]
return resource: 2
 
 
小結
 
Reactorモードでは,リソースの配布と返却が同期され,要求を処理する際には非同期であり,実際に使用すると,真のビジネスコードが要求ハンドラに格納される.
 
添付ファイルには完全なコードがあり、みんなで研究を勉強しています.
 
[オリジナルコンテンツ、著作権所有、転載する場合は出典を明記してください]