Netty転送大ファイルインスタンス

9193 ワード

最近またnettyを用いたネットワーク通信のプログラミング開発が必要になってきた.そこでいくつかの問題に遭遇して、多くの資料を探して記録しました.クライアントがサービス側にコマンドを送信し、サービス側が受信した後、コマンドの中のいくつかの情報に基づいてサーバ上のいくつかのファイルを読み取り、ファイルの内容(ファイルの内容はデータベースの1行1行のデータと似ており、行で格納され、各フィールド値はtで分割され、各データは1行)をクライアント処理に送信します(ここでのサンプルはデータを取得してから行でファイルに保存します).
1、クライアントサービス側のコード
cmdLog = getSearchCmd();  
        ChannelFactory factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),  
                Executors.newCachedThreadPool());  
        ClientBootstrap bootstrap = new ClientBootstrap(factory);  
        final ClientBufferHandler clientHandler =  new ClientBufferHandler(cmdLog, getEncoding());  
        final DelimiterBasedFrameDecoder clientDecoder = new DelimiterBasedFrameDecoder(Integer.MAX_VALUE, false, true, ChannelBuffers.copiedBuffer("\r
", Charset.defaultCharset())); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { @Override public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(clientDecoder, clientHandler); } }); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("keepAlive", true); ChannelFuture future = bootstrap.connect(new InetSocketAddress(serverHost, serverPort)); future.awaitUninterruptibly(); if (!future.isSuccess()) { future.getCause().printStackTrace(); } future.getChannel().getCloseFuture().awaitUninterruptibly(); factory.releaseExternalResources();

cmdLogはクライアントが送信するコマンドであり、getEncoding()は、各サービス側が読み出すファイルが異なる符号化である可能性があるため、クライアント側から転送された後にこれによって符号化される.handlerが2つありますが、以下で紹介しますが、他は一般的なのでここではあまり言いません.
2、サービス側コード
 
ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool()  
                ,Executors.newCachedThreadPool());  
        ServerBootstrap bootstrap = new ServerBootstrap(factory);  
        bootstrap.setPipelineFactory(new ChannelPipelineFactory()  
        {  
            public ChannelPipeline getPipeline()  
            {  
                ChannelPipeline pipeline = Channels.pipeline(  
                        new ServerDecoderHandler(),  
                        new FileSearchHandler());  
                return pipeline;  
            }  
        });  
        bootstrap.setOption("child.tcpNoDelay", true);  
        bootstrap.setOption("child.keepAlive", true);  
        bootstrap.bind(new InetSocketAddress(8027));

サービス側のコードも簡単で、後でhandlerについて詳しく説明します.
 
3、まずクライアントのhandlerを見て、1つはClientBufferHandlerで、これはコマンドを送信してサービス側の応答handlerを受信するために使用されます.
 
ClientBufferHandler extends SimpleChannelHandler
private static final String testPath = "F:/ test/test";  
 
private String cmd;  
private String encoding;  
public ClientBufferHandler(String cmd, String encoding)  
    {  
        this.cmd = cmd;  
        this.encoding = encoding;  
    }  
 
@Override  
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception  
    {  
        int cmdLength = cmd.getBytes().length;  
        ChannelBuffer cmdBuffer = ChannelBuffers.buffer(cmdLength+4);  
        cmdBuffer.writeInt(cmdLength);  
        cmdBuffer.writeBytes(cmd.getBytes());  
        e.getChannel().write(cmdBuffer);  
    }  
@Override  
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception  
    {  
ChannelBuffer buf = (ChannelBuffer) e.getMessage();  
        if(!buf.readable())  
        {  
            return;  
        }  
 
        FileHelper.writeFile(testPath, buf.toString(Charset.forName(encoding)));

channelConnectedメソッドを書き換え、コマンドを送信します.ここでは、送信コマンドの前に4バイトのコマンド長を付けて、サービス側がすべてのコマンド情報を一度に受信することを保証します.
 
MessageReceivedメソッドを書き換え,受信サービス側が取得した情報をファイルに格納する.FileHelper.writeFileは文字列を追加してファイルに書き込むツールですが、私は解放しません.実現方法はたくさんあります.
 
これに先立ってもう1つのデコーダがあり、サービス側から送られてきたデータはすべて行ごとに送られている(各行の末尾はr)ため、nettyが提供した1つのデコーダDelimiterBasedFrameDecoderを用いて受信したデータをセパレータで分割し、構造方法はクライアント側のコードを参照してください.取得したデータが毎回完全なローであることを保証します.ここに感謝しますhttp://blog.163.com/linfenliang@126/blog/static/1278571952012182103807/提供されるnettyのパケット、パケット、粘着パケット処理メカニズム.
 
4、それからサービス側のhandlerです.
 
まず、ServerDecoderHandlerデコーダは、完全なコマンドを読み取り、コマンドの前の4バイトをコマンドの長さを識別するために使用するコンテンツを失うことを保証します.
 
public class ServerDecoderHandler extends FrameDecoder  
{  
@Override  
    protected Object decode(ChannelHandlerContext ctx, Channel c, ChannelBuffer buf) throws Exception  
    {  
        int length = 4;  
        if(buf.readableBytes() < length)  
        {  
            return null;  
        }  
        byte[] header = new byte[length];  
        buf.markReaderIndex();  
        buf.readBytes(header);  
        int cmdLength = (header[0] & 0xFF) << 24 | (header[1] & 0xFF) << 16 | (header[2] & 0xFF) << 8 | (header[3] & 0xFF);  
        if (cmdLength != 0)  
        {  
            if (buf.readableBytes() < cmdLength)  
            {  
                buf.resetReaderIndex();  
                return null;  
            }  
            length += cmdLength;  
        }  
        buf.resetReaderIndex();  
        buf.readerIndex(4);  
        return buf.readBytes(cmdLength);  
    }  
}

この部分のコードの内容は簡単で説明しません.
 
そしてFileSearchHandlerのコードです.
 
public class CdrFileSearchHandler extends SimpleChannelUpstreamHandler  
{  
@Override  
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception  
    {  
        ChannelBuffer buf = (ChannelBuffer) e.getMessage();  
        String cmd = buf.toString(Charset.defaultCharset());  
        logger.info("    :\r
" + cmd); // Channel ch = e.getChannel(); ChannelFuture f = null; final BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(getPath(cmd)))); String line = ""; while (line != null) { line = reader.readLine(); if (line != null) { ChannelBuffer returnBuf = ChannelBuffers.dynamicBuffer(); returnBuf.writeBytes((line + "\r
").getBytes()); f = ch.write(returnBuf); } } if(line == null) { f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { reader.close(); Channel ch = future.getChannel(); ch.close(); } }); } }

この部分のコードも実はとても简単で、コマンドを取得して、コマンドによってファイル(この部分のコードは省略しました)を选んで、行ごとにファイルを読んで、それからrをプラスしてそれから书いて送信します.lineがnullでない場合はデータが読み出され、nullである場合はデータが読み出されていないことを示し、ループを飛び出し、リスナーを追加し、送信が完了すると各種リンクが閉じられる.△netty自体がmessageReceivedを複数回呼び出しているため、最後のデータを送信したときに接続を閉じる必要があると判断した.
 
このようなコンテンツの送信方法は、ファイルの内容と比較して小さい場合にのみ適用できますが、youyuの一般的なhandlerは同期して実行され、ファイルの内容が大きいと、ファイルの読み取りに時間がかかるため、Workerスレッドが他の要求を処理できず、パフォーマンスに影響が高くなり、メモリオーバーフローなどの問題が発生します.
 
このときnettyが提供するhandler,ExecutionHandler,ExecutionHandlerを使用する必要があります.ExecutionHandlerは、このような状況のために設計されています.これは、タスクを非同期で処理するメカニズムを提供し、その後handler処理をスレッドプールにタスクとして送信し、直接戻ります.ExecutionHandlerは、他のHandlerとは独立したものではなく、すべてのHandlerが共有して使用されています.OrderedMemoryAwareThreadPoolExecutorスレッドプールを使用して、同じチャネル上のイベントの優先順位を保証します.
サービス側のコードでは、次のようにコードを変更する必要があります.
 
bootstrap.setPipelineFactory(new ChannelPipelineFactory()  
        {  
            public ChannelPipeline getPipeline()  
            {  
                ChannelPipeline pipeline = Channels.pipeline(  
                        new ExecutionHandler(new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)),  
                        new ServerDecoderHandler(),  
                        new CdrFileSearchHandler());  
                return pipeline;  
            }  
        });

ExecutionHandlerのhandlerを1つ追加すると処理できます.
 
 
注意ExecutionHandlerは必ず異なるpipeline間で共有してください.その役割は、ExecutionHandlerが自分で管理しているスレッドプールからスレッドを自動的に取り出して、その後ろに並んでいるビジネスロジックhandlerを処理することです.ワークスレッドはExecutionHandlerを通過した後に終了し、ChannelFactoryのワークスレッドプールによって回収されます.
その構造方法はExecutionHandler(Executor executor)であり、executorがExecutionHandler内部で管理されているスレッドプールであることは明らかである.Nettyでは、2つのスレッドプールが追加されました.jboss.netty.handler.executionパッケージの下.MemoryAwareThreadPoolExecutorは、jvmが過剰なスレッドによってメモリオーバーフローエラーを起こさないことを保証します.OrderedMemoryAwareThreadPoolExecutorは、前のスレッドプールのサブクラスであり、メモリオーバーフローがないことを保証するほか、channel eventの処理順序も保証します.詳細については、APIドキュメントを参照してください.
 
まとめてみると、これらのものを書くのに1日かかりましたが、実は多くの場合、大きなファイルをテストしている問題で、ネット上の多くのソース分析の文章は、とても浅く、本当にすべてのbufferがどのように使うか、すべてのhandlerの使い方が少ないと言っています.ソースを詳しく話す文章の多くも写しているが、いいものがたくさんある.だから、何か問題があったら、もっと資料を探して、自分で探しても解決できるはずです.(実はapiの上のものは少なくないので、時間があればnettyのソースコードをよく見てnettyの開発に役立つはずです)
 
最後にここで使用するバージョンは3.6です.