MINAソース分析---プロトコル符号化出力インタフェースProtocolEncoderOutputとその実現


以下はプロトコル符号化出力インタフェースProtocolEncoderOutput、
すべてのエンコードされたデータは、インタフェースProtocolEncoderOutputを介してセッションに書き込まれる必要があります.
package org.apache.mina.filter.codec;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.file.FileRegion;
import org.apache.mina.core.future.WriteFuture;

/**
 * Callback for {@link ProtocolEncoder} to generate encoded messages such as
 * {@link IoBuffer}s.  {@link ProtocolEncoder} must call {@link #write(Object)}
 * for each encoded message.
 * 
 */
public interface ProtocolEncoderOutput {
    /**
     * Callback for {@link ProtocolEncoder} to generate an encoded message such
     * as an {@link IoBuffer}. {@link ProtocolEncoder} must call
     * {@link #write(Object)} for each encoded message.
     * ProtocolEncoder ecode     
     * @param encodedMessage the encoded message, typically an {@link IoBuffer}
     *                       or a {@link FileRegion}.
     */
    void write(Object encodedMessage);

    /**
     * Merges all buffers you wrote via {@link #write(Object)} into
     * one {@link IoBuffer} and replaces the old fragmented ones with it.
     * This method is useful when you want to control the way MINA generates
     * network packets.  Please note that this method only works when you
     * called {@link #write(Object)} method with only {@link IoBuffer}s.
     *  IoBuffer 
     * @throws IllegalStateException if you wrote something else than {@link IoBuffer}
     */
    void mergeAll();

    /**
     * Flushes all buffers you wrote via {@link #write(Object)} to
     * the session.  write(Object) 
     * This operation is asynchronous , writeFuture; please wait for
     * the returned {@link WriteFuture} if you want to wait for
     * the buffers flushed.
     *
     * @return null if there is nothing to flush at all.
     */
    WriteFuture flush();
}

インタフェース
ProtocolEncoderOutputの抽象クラス実装で、主にデータのマージ方法を実現しています.
package org.apache.mina.filter.codec;

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.mina.core.buffer.IoBuffer;

/**
 * A {@link ProtocolEncoderOutput} based on queue.
 *  
 */
public abstract class AbstractProtocolEncoderOutput implements
        ProtocolEncoderOutput {
    private final Queue messageQueue = new ConcurrentLinkedQueue();

    private boolean buffersOnly = true;

    public AbstractProtocolEncoderOutput() {
        // Do nothing
    }

    public Queue getMessageQueue() {
        return messageQueue;
    }

    // , 
    public void write(Object encodedMessage) {
        if (encodedMessage instanceof IoBuffer) {
            IoBuffer buf = (IoBuffer) encodedMessage;
            if (buf.hasRemaining()) {
                messageQueue.offer(buf);
            } else {
                throw new IllegalArgumentException(
                        "buf is empty. Forgot to call flip()?");
            }
        } else {
            messageQueue.offer(encodedMessage);
            buffersOnly = false;
        }
    }

    // 
    public void mergeAll() {
        if (!buffersOnly) {// IoBuffer, 
            throw new IllegalStateException(
                    "the encoded message list contains a non-buffer.");
        }
        
        final int size = messageQueue.size();

        if (size < 2) {
            // no need to merge!
            return;
        }

        // Get the size of merged BB
        int sum = 0;
        for (Object b : messageQueue) {
            sum += ((IoBuffer) b).remaining();
        }

        // Allocate a new BB that will contain all fragments
        IoBuffer newBuf = IoBuffer.allocate(sum);

        // and merge all.
        for (; ;) {
            IoBuffer buf = (IoBuffer) messageQueue.poll();
            if (buf == null) {
                break;
            }

            newBuf.put(buf);
        }

        // Push the new buffer finally.
        newBuf.flip();
        messageQueue.add(newBuf);
    }
}

このような主なflushメソッドの実装
    // 
    private static class ProtocolEncoderOutputImpl extends
            AbstractProtocolEncoderOutput {
        private final IoSession session;

        private final NextFilter nextFilter;

        private final WriteRequest writeRequest;

        public ProtocolEncoderOutputImpl(IoSession session,
                NextFilter nextFilter, WriteRequest writeRequest) {
            this.session = session;
            this.nextFilter = nextFilter;
            this.writeRequest = writeRequest;
        }

        public WriteFuture flush() {
            Queue bufferQueue = getMessageQueue();
            WriteFuture future = null;
            
            while (!bufferQueue.isEmpty()) {
                Object encodedMessage = bufferQueue.poll();

                if (encodedMessage == null) {
                    break;
                }

                // Flush only when the buffer has remaining.
                if (!(encodedMessage instanceof IoBuffer) || 
                		((IoBuffer) encodedMessage).hasRemaining()) 
                {
                    future = new DefaultWriteFuture(session);
                    nextFilter.filterWrite(session, new EncodedWriteRequest(encodedMessage,
                            future, writeRequest.getDestination()));
                }
            }

            if (future == null) {
                future = DefaultWriteFuture.newNotWrittenFuture(
                        session, new NothingWrittenException(writeRequest));
            }

            return future;
        }
    }