session.write型が引き起こす思考---Mina Session.writeプロセス探索.doc


Minaに基づいてネットワーク通信プログラムを開発し、センサデータアクセスの分野で広く応用されています.今日、私は何気なく問題を発見しました.それは私がフロントエンドでsessionしていることです.write(msg)データが出た後も、FilterのEncoderメソッドを経ずにリモートサーバに書き込むことができます.私が送ったデータは複雑な符号化を必要としないので、encoderメソッドもずっと見ていません.今日は自分で書いたフィルタで符号化できないことに気づきました.この問題に対して、私は以前のコードと以前のプロジェクトの関連コードを開き、同僚もsessionです.write(IoBuffer)の後、encoderメソッドにout.write(message);Minaソースコードを追跡することで、sessionが書いたデータ型はIoBuffer形式であり、カスタムフィルタを通過していないことが分かった.次のコードは余計です

@Override
	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
		out.write(message);//IoBuffer , encoder.
	}


次は自分がMinaを追跡してデバッグする過程を記録します.
一、シーン
クライアントは、Time時間ごとにサービス側にハートビートパケットを送信する必要があります.コードは次のとおりです.
session.write(IoBuffer.wrap(「心拍包XXX」.getBytes()));
二、現象
MyFilterのEncoderメソッドencoderは実行しません
public class MyFilter implements ProtocolCodecFactory {
	private ProtocolEncoder encoder = new MyEncoder();
	private ProtocolDecoder decoder = new MyDecoder();

	@Override
	public ProtocolEncoder getEncoder(IoSession session) throws Exception {
		return encoder;
	}


	@Override
	public ProtocolDecoder getDecoder(IoSession session) throws Exception {
		
		return decoder;
	}

}

三、分析
セッションに入るwriteメソッド、IoSessionを実現する.writeメソッドはAbstractIoSessionです.直接呼び出されたのは
public WriteFuture write(Object message) {
        return write(message, null);
}

そしてAbstractIoSession.write(Object message, SocketAddress address)
このメソッドのワークフローは、次のとおりです.
  • WriteFeatureオブジェクトを作成し、値を返す(session.write自体がwriteFeatureを返す)
  • .
  • session.write(message)中のObjectタイプのmessageはwriteRequestにカプセル化する.
  • write動作を開始します.これは主にIoFilterChainが完成します.
  • 具体的なコアコードは以下の通りです.
    // Now, we can write the message. First, create a future
            WriteFuture writeFuture = new DefaultWriteFuture(this);
            WriteRequest writeRequest = new DefaultWriteRequest(message, writeFuture, remoteAddress);
    
            // Then, get the chain and inject the WriteRequest into it
            IoFilterChain filterChain = getFilterChain();
            filterChain.fireFilterWrite(writeRequest);
    

    FireFilterWriteに引き続き追跡すると、IoFilterChainのデフォルト実装クラスDefaultIoFilterChainの重要な方法がわかります.
    public void fireFilterWrite(WriteRequest writeRequest) {
            Entry tail = this.tail;
            callPreviousFilterWrite(tail, session, writeRequest);
    }

    ここではまずDefaultIoFiterChainのデータフォーマットについて説明します.主な属性は以下の通りです.
    
    private final Map<String, Entry> name2entry = new ConcurrentHashMap<String, Entry>();
    
        /** The chain head */
        private final EntryImpl head;
    
        /** The chain tail */
        private final EntryImpl tail;
    

    ここでheadとtailはDefaultIoFilterChain固有の属性であり、name 2 entityは私たちがFilterChainに追加したフィルタです.従ってIoFilterChainは、フィルタをチェーンテーブルで保存する(('tail',prev:'myFilter:ProtocolCodecFilter',next:'null'))、ヘッダとテーブルは固定headとtailであり、対応するFilterも独自である、HeadFilterとTailFilter.
    重要な方法はcallPreviousFilterWrite(tail,session,writeRequest);
    try {
                IoFilter filter = entry.getFilter();
                NextFilter nextFilter = entry.getNextFilter();
                filter.filterWrite(nextFilter, session, writeRequest);
            } catch (Throwable e) {
                writeRequest.getFuture().setException(e);
                fireExceptionCaught(e);
            }
    

    上記の2つのコードセグメントから、IoFilterChainはまずリストからtailを見つけ、tailからfilterを探し、各filterのfilterWrite()メソッドを順番に呼び出すことがわかります.ここでの「逐次呼び出し」とは、tail->headから呼び出される、つまり逆呼び出しFilterを指す.でもfilterが見えたfilterWrite(nextFilter, session, writeRequest);この行のコードの中のパラメータは発見することができて、nextFilter、表面の意味は次のフィルタで、少し誤解して、tailの次のフィルタがnullではないかと感じて、実はさもなくば、filterWriterに入って知ることができます.
    Entry nextEntry = EntryImpl.this.prevEntry;
    callPreviousFilterWrite(nextEntry, session, writeRequest);

    headとtailフィルタを除いて、他のフィルタはどのように動作しますか?ProtocolCodecFilterのfireFilterメソッドを見てみましょう.
    if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
                nextFilter.filterWrite(session, writeRequest);
                return;
            }
    

    ここまで来て、なぜsessionなのか分かりました.write(IoBuffer.wrap()と書くと、自分で定義したフィルタを通すことができなくなり、元々fireFilterでmessageを判断していたのですが、すでにIoBufferタイプのものであればそのままreturnしていました.
    最後に実行したのはHeadFilterのfireFilterメソッドで、内容を直接見ます.
    if (writeRequest.getMessage() instanceof IoBuffer) {
                    IoBuffer buffer = (IoBuffer) writeRequest.getMessage();
                    // I/O processor implementation will call buffer.reset()
                    // it after the write operation is finished, because
                    // the buffer will be specified with messageSent event.
                    buffer.mark();
                    int remaining = buffer.remaining();
                    if (remaining == 0) {
                        // Zero-sized buffer means the internal message
                        // delimiter.
                        s.increaseScheduledWriteMessages();
                    } else {
                        s.increaseScheduledWriteBytes(remaining);
                    }
                } else {
                    s.increaseScheduledWriteMessages();
                }
    
                s.getWriteRequestQueue().offer(s, writeRequest);
                if (!s.isWriteSuspended()) {
                    s.getProcessor().flush(s);
                }

    WriteRequestQueueのデフォルトの実装はjavaです.util.concurrent.ConcurrentLinkedQueueは、受信したセッションオブジェクトを切り捨てます.