Nioではクライアントが1回データを送信し、サービス側は複数回readableイベントを通じて完全に読み取ることができる.


最近、c/sプロジェクトを開発しています.主な機能は、クライアントがデータを収集してサービス側に送信することです.
クライアントとサービス側のインタラクションの概要は次のとおりです.
クライアントは1回のデータを送信し、サービス側readは1回のデータを解析して入庫する.
まず問題を説明します.後で詳細な設計とコードが貼られます.
デバッグ環境:
クライアントとサービス側は同じpcマシン(パケット損失の可能性を排除)
質問:
このプロジェクトはデバッグ時に
たまたま問題が発生しました.
次のようになります.
クライアント
一度に9793 Bのデータを送信し、サービス側nioはreactorがトリガした1回のreadableイベントでchannel中のストリームデータを読み出し、このreadableイベントのストリームデータの読み出しが完了すると、総データ量は4380 Bであり、クライアントが送信した9793 Bより小さく、残りのデータは次の1回のreadableイベントで読み出されることが分かった.
従って、クライアントが送信するパケットの1つにより、サービス側は2回または複数回に分けて受信し、そのパケットの後続の処理に異常が発生する.
最初に設計した時、このような状況を全く考慮していなかったため、誰が似たような問題にぶつかったことも発見されず、今コードを変更するのは面倒で、英雄豪傑の皆さんが分析を手伝ってほしいと思っています.
このプロジェクトの概略設計と作業過程は以下の通りである.
クライアント:
クライアントはnioを使用しないで、ただ普通のsocket接続サービス端を使用します(違いがあるかどうか分からないので、問題の原因はここにありますか?);
クライアントは収集したデータを、サービス側にタイミングよく送信し、送信する.
データ量の大きさは固定されていない.
送信部分のコードは次のとおりです.

    public void send2server(int command, byte[] msg) throws IOException
    {
        if (out != null)
        {
            msg = MessageFilter.addHeader(command, msg);
            _log.info("     :" + msg.length);
            //msg byte[]  
            out.write(msg);
            out.flush();
        }
        else
        {
            throw new IOException("     ");
        }
    }
 

サービス:
サービス側はnioによってreadableイベントをトリガする
チャンネルのデータを一度に読み込み、解析してライブラリに入れます.
サービス側nioの部分コードは以下の通りです.

                        try
                        {
                            //   IO  
                                 if (key.isAcceptable())
                                accept(key);
                            else if (key.isReadable())
                            {
                                _log.debug("   IO");

                                //              
                                Reader2.processKey(key); //               
                                SocketChannel sc = (SocketChannel) key.channel();
                                //      
                                if (sc.socket().getKeepAlive())
                                {
                                    key.interestOps(key.interestOps()
                                            & ~SelectionKey.OP_READ);
                                    _log.debug("     ");
                                }
                                else
                                    key.cancel();
                            }
                            else if (key.isWritable())
                            {
                                _log.debug("   IO");
                                SocketChannel sc = (SocketChannel) key.channel();
                                Writer.processRequest(key); //                  
                                     //      
                                if (sc.socket().getKeepAlive())
                                    key.interestOps(SelectionKey.OP_READ);
                                else
                                    key.cancel();
                            }
                        }
                        catch (Exception e)
                        {
                            key.cancel();
                            _log.info("  key  ,         ");
                            e.printStackTrace();
                        }

//この問題に関するキーコードはこの文
Reader2.processKey(key);//リード・サービス・スレッドの発行クライアント・データの読み出し
Reader 2クラスコードは次のとおりです.

package com.gdtec.nmt.nioserver.io;

import java.io.IOException;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.LinkedList;
import java.util.List;

import org.apache.log4j.Logger;

import com.gdtec.nmt.filter.MessageFilter;
import com.gdtec.nmt.nioserver.InterestInfo;
import com.gdtec.nmt.nioserver.Notifier;
import com.gdtec.nmt.nioserver.Request;
import com.gdtec.nmt.nioserver.Server;
import com.gdtec.nmt.pojo.ClientInfo;
import com.gdtec.nmt.pojo.MessagePackage;


/**
 * 

Title:

*

Description:

* @author zhuhongzheng * @version 1.0 */ public class Reader2 extends Thread { private static Logger _log = Logger.getLogger(Reader2.class); private static List requestsPool = new LinkedList(); private static Notifier notifier = Notifier.getNotifier(); public Reader2() { } public void run() { while (true) { try { Request request; synchronized (requestsPool) { while (requestsPool.isEmpty()) { requestsPool.wait(); } request = requestsPool.remove(0); } // processRequest(request); } catch (Exception e) { _log.info(" !", e); } } } private static int BUFFER_SIZE = 20480; /** * * @param sc * @return * @throws IOException */ private static byte[] readInput(SocketChannel sc) throws IOException { ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE); int off = 0; int r = 0; byte[] data = new byte[BUFFER_SIZE]; // , , read 4380B , while , // ? while ((r = sc.read(buffer)) > 0) { _log.debug(" :" + r); if ((off + r) > data.length) { data = grow(data, BUFFER_SIZE * 2); // System.out.println(" :" + data.length); _log.debug(" :" + data.length); } byte[] buf = buffer.array(); System.arraycopy(buf, 0, data, off, r); off += r; buffer.clear(); _log.debug(" :" + off); } String memoryMsg = ", freeMemory=" + (Runtime.getRuntime().freeMemory() / (1024) + "k,totalMemory=" + (Runtime.getRuntime().totalMemory() / (1024)) + "k"); _log.info(" :" + off + memoryMsg); byte[] req = new byte[off]; System.arraycopy(data, 0, req, 0, off); return req; } /** * * @param request Request */ public static void processRequest(Request request) { try { // byte[] received = request.getDataInputByte(); _log.debug(" :" + received.length); //System.out.println("reader :" + new String(received, "GBK")); MessagePackage msgPackage = MessageFilter.decodeMessage(received); // String clientData = readInput(sc); String clientData = null; // , if (msgPackage == null || null == msgPackage.getBody()) { throw new Exception(" , "); } clientData = msgPackage.getBody(); request.setDataInput(clientData); request.setDataInputByte(new byte[0]); request.setParameter("command", msgPackage.getCommand()); request.setParameter("bodyLength", msgPackage.getBodyLength()); // onRead notifier.fireOnRead(request); } catch (Exception e) { _log.error(e); } try { //SocketChannel accept request SocketChannel sc = request.getSc(); if (sc == null) return; // ,read write selector, channel write if (sc.socket().getKeepAlive()) { Server.registerInterest(new InterestInfo(sc, SelectionKey.OP_WRITE, request)); } } catch (SocketException e) { _log.error(" !", e); } } public static void processKey(SelectionKey key) { SocketChannel sc = (SocketChannel) key.channel(); Request request = (Request) key.attachment(); try { byte[] received = readInput(sc); if(received.length==0) { // TODO: 。。 ; return ; } _log.debug(" :" + received.length); request.setDataInputByte(received); put2RequestPool(request); } catch (IOException e1) { key.cancel(); _log.error(e1); ClientInfo client = (ClientInfo) request.getParameter("ClientInfo"); Server.clientConnectionError(client); } catch (Exception e) { _log.error(e); } } /** * , , */ public static void put2RequestPool(Request request) { synchronized (requestsPool) { requestsPool.add(requestsPool.size(), request); requestsPool.notifyAll(); } } /** * * @param src byte[] * @param size int * @return byte[] */ public static byte[] grow(byte[] src, int size) { byte[] tmp = new byte[src.length + size]; System.arraycopy(src, 0, tmp, 0, src.length); return tmp; } }

//この問題に関連するキーコードは、後で私が貼ったログから分かるように、ここでは4380 Bのデータしかreadされておらず、このwhileループは、一度しか行われていません.
//何解?
        while ((r = sc.read(buffer)) > 0)
ログの内容:
クライアントのログの一部:
参照
2010-11-16 18:53:10[INFO]ClientMain.JAva 369■Thread-2■:送信バイト数:9793
2010-11-16 18:53:10[INFO]ClientWriter.JAva 166■Thread-2■:バッファがいっぱい、データ送信、バッファクリア
サービス側ログ:
参照
2010-11-16 18:51:16[INFO]Reader2.JAva 121■main■:発見データ:9600
2010-11-16 18:51:16[INFO]Reader2.JAva 121■main■:発見データ:4441
2010-11-16 18:51:16[INFO]Reader2.JAva 138■main■:共読:14041,freeMemory=2936 k,totalMemory=5056 k
2010-11-16 18:51:16[INFO]ResultsHandler.JAva 271■Thread-11■:111個保存
2010-11-16 18:53:10[INFO]Reader2.JAva 121■main■:発見データ:4380
2010-11-16 18:53:10[INFO]Reader2.JAva 138■main■:共読:4380,freeMemory=2090 k,totalMemory=5056 k
2010-11-16 18:53:10[ERROR]ResultsHandler.JAva 280■Thread-11■:保存タスク結果に異常が発生:
java.sql.BatchUpdateException:'127.0'付近に構文エラーがあります.
at com.microsoft.sqlserver.jdbc.SQLServerStatement.executeBatch(SQLServerStatement.java:1693)
at com.gdtec.nmt.nioserver.Handler.ResultsHandler.saveTasks(ResultsHandler.java:269)
at com.gdtec.nmt.nioserver.Handler.ResultsHandler.onRead(ResultsHandler.java:105)
at com.gdtec.nmt.nioserver.Notifier.fireOnRead(Notifier.java:71)
at com.gdtec.nmt.nioserver.io.Reader2.processRequest(Reader2.java:175)
at com.gdtec.nmt.nioserver.io.Reader2.run(Reader2.java:51)
2010-11-16 18:53:10[INFO]Reader2.JAva 121■main■:発見データ:5413
2010-11-16 18:53:10[INFO]Reader2.JAva 138■main■:共読:5413,freeMemory=1056 k,totalMemory=5056 k
2010-11-16 18:53:10[ERROR]Reader2.java 180■Thread-11■: java.lang.NumberFormatException: For input string: "0A4E-D6DF-27D8-C444E3A3EA67'"
2010-11-16 18:53:21[INFO]Reader2.JAva 121■main■:発見データ:4380
2010-11-16 18:53:21[INFO]Reader2.JAva 121■main■:発見データ:4493
2010-11-16 18:53:21[INFO]Reader2.JAva 138■main■:共読:8873,freeMemory=1306 k,totalMemory=5056 k
2010-11-16 18:53:21[INFO]ResultsHandler.JAva 271■Thread-11■:68データ保存
ほとんどのエラーは、サービス側が4380 Bを読み取り専用にしたためであり、ログから見ると、クライアントが9793 Bを送信し、サービス側が初めてreadableを発見したのは4380 Bのみであり、次のreadableを発見したのは5413 Bであり、クライアントが送信した9793 Bにちょうど等しいことから、1回のreadableイベントがioストリームを読み取り終わるとは限らず、複数回に分けてreadableイベントを読み取る可能性があることがわかる.これはまた私にnioの底層の原理を理解していないような気がします.reactorがreadable事件をトリガーする条件は何ですか.