手遊びサービス側フレームワークのメッセージスレッドモデル


リクエスト・メッセージ・マッピング・ポリシーの選択
前の記事では,メッセージがminaのioスレッドで直接処理されていることを示した.これには、ビジネス処理に時間がかかるとioスレッドがメッセージを受信する速度が低下し、ioのスループットに深刻な影響を及ぼすという非常に深刻な欠陥があります.
典型的には、プレイヤーのリクエストメッセージを非同期で処理するために、スレッドプールを別途作成する必要があります.
私の前の文章(ゲームサービス側スレッドモデル--ロックなしでプレイヤーリクエストを処理する)では、あるマッピングを通じて、プレイヤーのリクエストを特定のスレッドに配布して処理することができ、同じプレイヤーのリクエストがスレッド同期を必要とすることを避けることができると述べています.
その文章では、プレイヤーのキャラクターidと作業スレッドの総数をモデリングマッピングするマッピング戦略を採用しています.このモデルは実は簡単な戦略です.極端な場合、非常に多くのプレイヤーが同じスレッドに要求する(ログインしたプレイヤーidは負荷等化性を持たない).
どのようなマッピング戦略を採用するかは、ゲームのタイプとのつながりが非常に大きい.
例を挙げると、ゲームのタイプがMMORPGなら(大型マルチプレイオンラインゲーム)、シーンマップは非常に大きく、ゲームの戦闘はサービス側で発生し、pvp同期戦略は状態同期を採用している.このような戦闘案はロック競争を減らすために、同じ地図のすべてのプレイヤーに1つのスレッド上で要求することが多い.特に、戦闘はサービス側で発生し、怪物の行為、シーンタイミングタスクの実行によって、同じスレッド上.したがって、このようなゲームのリクエストメッセージマッピングポリシーは、地図idに関連付けられることが多い.
カジュアルゲームやrpgゲームではあるが、戦闘はクライアント(サービス側は検査のみ)で発生し、マッピング戦略はシーンとは関係なく、負荷のバランスを確保するだけでよい.
本論文で採用したマッピング戦略は,戦闘がサービス側で発生する設計の難しさが非常に大きいため,2つ目である=.
負荷分散を達成するために、クライアント・リンクの作成時に、セッションに自己成長インデックス番号を作成できます.これにより、新しいプレイヤーはポーリングして次の作業スレッドにマッピングされます.
IoHandlerクラスでのsessionCreated(IoSession session)アプローチにおいて,このような論理を追加した.
	@Override 
	public void sessionCreated(IoSession session) { 
		//      ip    
		System.out.println(session.getRemoteAddress().toString()); 
		session.setAttributeIfAbsent(SessionProperties.DISTRIBUTE_KEY,
				SessionManager.INSTANCE.getNextDistributeKey());
	} 
セッションマネージャgetNextDistributeKey()は原子変数の自己成長器である.
非同期メッセージタスクモデルの定義
1.配布可能なタスクインタフェースIDistributeTaskを定義する.java
package com.kingston.net.dispatch;

/**
 *         
 * @author kingston
 */
public interface IDistributeTask {
	
	/**
	 *          
	 * @return
	 */
	int distributeKey();
	
	/**
	 *     
	 * @return
	 */
	String getName();
	
	/**
	 *     
	 */
	void action();
	
	
}

2.AbstractDistributeTask抽象クラスはIDistributeTaskインタフェースのスケルトン実装であり、一部の抽象メソッドを実装する
package com.kingston.net.context;

import com.kingston.net.dispatch.IDistributeTask;

public abstract class AbstractDistributeTask implements IDistributeTask{

	/**          */
	protected int distributeKey;
	
	/**            */
	private long startMillis;
	
	/**            */
	private long endMillis;
	
	
	public String getName() {
		return this.getClass().getSimpleName();
	}
	
	public int distributeKey() {
		return distributeKey;
	}

	public long getStartMillis() {
		return startMillis;
	}

	public void markStartMillis() {
		this.startMillis = System.currentTimeMillis();
	}

	public long getEndMillis() {
		return endMillis;
	}

	public void markEndMillis() {
		this.endMillis = System.currentTimeMillis();
	}
	
}

3.メッセージタスクエンティティ(MessageTask.java)は、AbstractDistributeTaskクラスから継承されるビジネス実行に関するパラメータをカプセル化するために使用されます.
package com.kingston.net.context;

import java.lang.reflect.Method;

import com.kingston.net.Message;

public class MessageTask extends AbstractDistributeTask {
	
	private long playerId;
	/**      */
	private Message message;
	/**       */
	private Object handler;
	
	private Method method;
	/**          */
	private Object[] params;
	
	public static MessageTask valueOf(int distributeKey, Object handler,
			Method method, Object[] params) {
		MessageTask msgTask = new MessageTask();
		msgTask.distributeKey = distributeKey;
		msgTask.handler = handler;
		msgTask.method  = method;
		msgTask.params  = params;
		
		return msgTask;
	}

	@Override
	public void action() {
		try{
			method.invoke(handler, params);
		}catch(Exception e){
			
		}
		
	}

	public long getPlayerId() {
		return playerId;
	}

	public Message getMessage() {
		return message;
	}

	public Object getHandler() {
		return handler;
	}

	public Method getMethod() {
		return method;
	}

	public Object[] getParams() {
		return params;
	}
	
	@Override
	public String toString() {
		return this.getName() + "[" + handler.getClass().getName() + "@" + method.getName() + "]";
	}
	
}

メッセージの生産者消費者モデル
生産者消費者モデルは非同期論理を処理する非常に強力なツールである.メッセージ配信の目的を達成するために、サービス起動時にN個の作業スレッドを初期化し、各作業スレッドには未処理のメッセージリストを保存するためのブロックキューがある.ワークスレッド(消費者)のrun()メソッドは、デッドサイクルでキューヘッダメッセージをポップアップし、ビジネスロジックを実行する
package com.kingston.net.context;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;

/**
 *        
 * @author kingston
 */
public enum TaskHandlerContext {
	
	/**    */
	INSTANCE;
	
	private final int CORE_SIZE = Runtime.getRuntime().availableProcessors();
	/**        */
	private final List workerPool = new ArrayList<>();
	
	private final AtomicBoolean run = new AtomicBoolean(true);
	
	public void initialize() {
		for (int i=0; i taskQueue = new LinkedBlockingQueue<>();
		
		TaskWorker(int index) {
			this.workerIndex = index;
		}

		public void addTask(AbstractDistributeTask task) {
			this.taskQueue.add(task);
		}
		
		@Override
		public void run() {
			//      
			while(run.get()) {
				try {
					AbstractDistributeTask task = taskQueue.take();
					task.markStartMillis();
					task.action();
					task.markEndMillis();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
		}
	}
}

TaskHandlerContextクラスのacceptTask(MessageTask task)メソッドでは、taskのdistributeKey()メソッドで、ワークスレッドグループの指定スレッドを見つけて、対応するプロバイダキューに追加します.消費者スレッドがあれば、対応するメッセージプロバイダエントリはどこですか??
前の記事で私たちのメッセージングディスパッチ(Message Dispatcher)のdispatch(IoSession session,Message)メソッドを振り返ると、論理はこうです.
        try {  
            //    ,  
            cmdExecutor.getMethod().invoke(controller, params);  
        }catch(Exception e) {  
        }  

生産者モデルを採用したら
        int distributeKey = (int)session.getAttribute(SessionProperties.DISTRIBUTE_KEY);
        TaskHandlerContext.INSTANCE.acceptTask(
        		MessageTask.valueOf(distributeKey, controller, cmdExecutor.getMethod(), params));

これで、メッセージスレッドモデルが完了します.
文章予告:次は主にゲームルールを記述する企画構成ライブラリとプレイヤーデータを保存するユーザライブラリの設計を紹介する.
手遊びサービス側オープンソースフレームワークシリーズ完全なコードgithub->>game_server