MyCatソース分析シリーズの結果

41293 ワード

より多くのMyCatソース分析は、MyCatソース分析シリーズをスタンプしてください
結果のマージ
SQL配信プロセスと前後端検証プロセスで紹介するように、ユーザ検証によるバックエンド接続バインドのNIOHandlerはMySQLConnectionHandlerのインスタンスであり、MySQLサービス側が実行結果を返すとMySQLConnecionHandlerに呼び出される.handleData()は、異なるタイプの処理配布に使用されます.
protected void handleData(byte[] data) {
        switch (resultStatus) {
        case RESULT_STATUS_INIT:
            switch (data[4]) {
            case OkPacket.FIELD_COUNT:
                handleOkPacket(data);
                break;
            case ErrorPacket.FIELD_COUNT:
                handleErrorPacket(data);
                break;
            case RequestFilePacket.FIELD_COUNT:
                handleRequestPacket(data);
                break;
            default:
                resultStatus = RESULT_STATUS_HEADER;
                header = data;
                fields = new ArrayList<byte[]>((int) ByteUtil.readLength(data,
                        4));
            }
            break;
        case RESULT_STATUS_HEADER:
            switch (data[4]) {
            case ErrorPacket.FIELD_COUNT:
                resultStatus = RESULT_STATUS_INIT;
                handleErrorPacket(data);
                break;
            case EOFPacket.FIELD_COUNT:
                resultStatus = RESULT_STATUS_FIELD_EOF;
                handleFieldEofPacket(data);
                break;
            default:
                fields.add(data);
            }
            break;
        case RESULT_STATUS_FIELD_EOF:
            switch (data[4]) {
            case ErrorPacket.FIELD_COUNT:
                resultStatus = RESULT_STATUS_INIT;
                handleErrorPacket(data);
                break;
            case EOFPacket.FIELD_COUNT:
                resultStatus = RESULT_STATUS_INIT;
                handleRowEofPacket(data);
                break;
            default:
                handleRowPacket(data);
            }
            break;
        default:
            throw new RuntimeException("unknown status!");
        }
}

上記のコードクリップでは、赤色で表記されるいくつかの方法が最も核心的であり、handleOkPacket()は主にinsert/update/deleteと残りのOKパケットを返す文が返す実行結果に用いられ、handleFieldEofPacket()handleRowPacket()およびhandleRowEofPacket()はselect文が返す実行結果に用いられる.これらの方法の内部の流れは、実際には、その上にバインドされたResponseHandler(SingleNodeHandlerまたはMultiNodeQueryHandler)インスタンスに対応するこれらの方法をそれぞれ呼び出すことである.
1.SingleNodeHandlerに含まれるこれらの方法は、単一ノードの動作を先に見て、以下のように実現される.
public void okResponse(byte[] data, BackendConnection conn) {        
        boolean executeResponse = conn.syncAndExcute();        
        if (executeResponse) {            
            session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(),
                    false);
            endRunning();
            ServerConnection source = session.getSource();
            OkPacket ok = new OkPacket();
            ok.read(data);
            if (rrs.isLoadData()) {
                byte lastPackId = source.getLoadDataInfileHandler()
                        .getLastPackId();
                ok.packetId = ++lastPackId;// OK_PACKET
                source.getLoadDataInfileHandler().clear();
            } else {
                ok.packetId = ++packetId;// OK_PACKET
            }
            ok.serverStatus = source.isAutocommit() ? 2 : 1;

            recycleResources();
            source.setLastInsertId(ok.insertId);
            ok.write(source);
            
            //TODO: add by zhuam
            //      
            QueryResult queryResult = new QueryResult(session.getSource().getUser(), 
                    rrs.getSqlType(), rrs.getStatement(), startTime);
            QueryResultDispatcher.dispatchQuery( queryResult );
 
        }
}

public void fieldEofResponse(byte[] header, List<byte[]> fields,
            byte[] eof, BackendConnection conn) {
              
            //TODO: add by zhuam
            //      
            QueryResult queryResult = new QueryResult(session.getSource().getUser(), 
                    rrs.getSqlType(), rrs.getStatement(), startTime);
            QueryResultDispatcher.dispatchQuery( queryResult );

            header[3] = ++packetId;
            ServerConnection source = session.getSource();
            buffer = source.writeToBuffer(header, allocBuffer());
            for (int i = 0, len = fields.size(); i < len; ++i)
            {
                byte[] field = fields.get(i);
                field[3] = ++packetId;
                buffer = source.writeToBuffer(field, buffer);
            }
            eof[3] = ++packetId;
            buffer = source.writeToBuffer(eof, buffer);
            
            if (isDefaultNodeShowTable) {
                for (String name : shardingTablesSet) {
                    RowDataPacket row = new RowDataPacket(1);
                    row.add(StringUtil.encode(name.toLowerCase(), source.getCharset()));
                    row.packetId = ++packetId;
                    buffer = row.write(buffer, source, true);
                }
            }
}

public void rowResponse(byte[] row, BackendConnection conn) {
        if(isDefaultNodeShowTable)
        {
            RowDataPacket rowDataPacket =new RowDataPacket(1);
            rowDataPacket.read(row);
            String table=  StringUtil.decode(rowDataPacket.fieldValues.get(0),conn.getCharset());
            if(shardingTablesSet.contains(table.toUpperCase())) return;
        }

            row[3] = ++packetId;
            buffer = session.getSource().writeToBuffer(row, allocBuffer());
}

public void rowEofResponse(byte[] eof, BackendConnection conn) {
        ServerConnection source = session.getSource();
        conn.recordSql(source.getHost(), source.getSchema(),
                node.getStatement());

        //                     
        if (!rrs.isCallStatement()) {
            session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(),
                    false);
            endRunning();
        }

        eof[3] = ++packetId;
        buffer = source.writeToBuffer(eof, allocBuffer());
        source.write(buffer);
}

okResponse()メソッドでは、まずconn.syncAndExcute()が呼び出されます.このプロシージャは、SQLのダウンロード中に以前に説明したように、既存の接続設定を変更する必要がある場合に、これらの変更が正常に返されるのを待っていないことを説明します.ここで判断します.
public boolean syncAndExcute() {
        StatusSync sync = this.statusSync;
        if (sync == null) {
            return true;
        } else {
            boolean executed = sync.synAndExecuted(this);
            if (executed) {
                statusSync = null;
            }
            return executed;
        }
}

ここではStatusSyncをさらに順番に呼び出しました.synAndExecuted()およびupdateConnectionInfo()メソッド:
public boolean synAndExecuted(MySQLConnection conn) {
            int remains = synCmdCount.decrementAndGet();
            if (remains == 0) {// syn command finished
                this.updateConnectionInfo(conn);
                conn.metaDataSyned = true;
                return false;
            } else if (remains < 0) {
                return true;
            }
            return false;
}

private void updateConnectionInfo(MySQLConnection conn)
{
            conn.xaStatus = (xaStarted == true) ? 1 : 0;
            if (schema != null) {
                conn.schema = schema;
                conn.oldSchema = conn.schema;
            }
            if (charsetIndex != null) {
                conn.setCharset(CharsetUtil.getCharset(charsetIndex));
            }
            if (txtIsolation != null) {
                conn.txIsolation = txtIsolation;
            }
            if (autocommit != null) {
                conn.autocommit = autocommit;
            }
}

現在の接続が必要な接続とデータベース名と文字セットで異なる場合、同期する必要がある数は2です.この2つの変更が成功した場合、2つのOKパケット(すなわち、SingleNodeHandler.okResponse()が2回トリガーされ、synAndExecuted()で受信したOKパケット数synCmdCountについて判断し、すべて受信するとupdateConnectionInfo()が呼び出されます.接続のこれらの設定を新しい値に設定します.同期プロセスが完了すると、SQL文の実行が返される結果処理フェーズ(select/insert/update/delete)に進みます.
1.1 insert/update/delete
  • okResponse():dataバイト配列を読み出し、OkPacketを構成し、okを呼び出す.write(source)は、フロントエンド接続FrontendConnectionのライトバッファキューwriteQueueに結果を書き込み、アプリケーションに真に送信するのは、対応するNIOSocketWRによってライトキューからByteBufferが読み出されて返される.

  • 1.2 select
  • fieldEofResponse():メタデータが戻るとトリガーされ、ヘッダーとメタデータの内容が順次バッファに書き込まれる.
  • rowResponse():行データが戻ったときにトリガーされ、行データがバッファに書き込まれる.
  • rowEofResponse():行終了フラグが戻るとトリガーされ、EOFフラグがバッファに書き込まれ、最後にsourceが呼び出される.write(buffer)は、フロントエンド接続の書き込みバッファキューにバッファを入れ、NIOSocketWRがアプリケーションに送信するのを待つ.

  • 2.マルチノード操作の結果のマージとリターンプロセスを見てみましょう.MultiNodeQueryHandlerはこのプロセスの実行を担当します.
    マルチノード操作とシングルノード操作の違いは、次の点です.
    1)複数のMySQLノードからそれぞれ送信された結果データを受信し、得られたすべての結果を簡単にマージする必要がある場合がある(順序は不確定であり、FIFOを満たす).
    2)集約関数、group by、order by、limitに関連する場合は、すべての結果を集計する必要があります.
    1つ目の場合、insert/update/deleteとselectの違いは次のとおりです.
  • insert/update/delete:この3つの文はいずれもOKパッケージを返し、最も核心的なaffectedRowsが含まれているため、MySQLノードから送信されたaffectedRowsを得るたびにそれを累積し、最後のノードのデータを受け取った後(decrementOkCountBy()方法で判断)、結果をフロントエンドに返す.
  • select:MySQLノードごとにメタデータ、行データ1、行データ2...、行データn、行終了フラグビット(EOF)は、メタデータと行終了フラグビットの各ノードの戻りは同じであるが、MultiNodeQueryHandlerはすべてのデータのマージを担当するため、実際にはメタデータ1部、すべてのノードの行データ1部、および行ビームフラグビット1部しか必要としないため、fieldEofResponse()メソッドではbooleanタイプのfieldsReturned判定により最初に受信したメタデータパケットを取得し、その後はすべて破棄し、rowEofResponse()メソッドでdecrementCountBy()メソッドですべてのノードのEOFパケットが受信されたか否かを判断し、最後のEOF書き込みバッファをフロントエンドに戻す.

  • 2つ目のケースでは、insert/update/deleteは1つ目のケースと完全に一致していますが、selectの処理はM u l t i NodeQueryHandlerに含まれるDataMergeServiceインスタンスが結果セットの集計を担当します(limitは実際にはM u l t i NodeQueryHandlerで実現されています).まず、DataMergeServiceに含まれるコア変数を見てみましょう.
    private int fieldCount;   //    
        private RouteResultset rrs;   //     
        private RowDataSorter sorter;   //    
        private RowDataPacketGrouper grouper;  //    
        private volatile boolean hasOrderBy = false;
        private MultiNodeQueryHandler multiQueryHandler;
        public PackWraper END_FLAG_PACK = new PackWraper();  //    
        private AtomicInteger areadyAdd = new AtomicInteger();
        private List<RowDataPacket> result = new Vector<RowDataPacket>();  //       
        private static Logger LOGGER = Logger.getLogger(DataMergeService.class);
        private BlockingQueue<PackWraper> packs = new LinkedBlockingQueue<PackWraper>();  //         
        private ConcurrentHashMap<String, Boolean> canDiscard = new ConcurrentHashMap<String, Boolean>();  //         map

    その中で最も重要なのは、ソートに使用されるsorterとパケットに使用されるgrouper、および集計結果を最後に格納するロー・パケット・キューresultです.DataMergeServiceのコアの方法は、次のように実現されます.
    public void onRowMetaData(Map<String, ColMeta> columToIndx, int fieldCount) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("field metadata inf:" + columToIndx.entrySet());
            }
            int[] groupColumnIndexs = null;
            this.fieldCount = fieldCount;
            if (rrs.getGroupByCols() != null) {
                groupColumnIndexs = toColumnIndex(rrs.getGroupByCols(), columToIndx);
            }
    
            if (rrs.getHavingCols() != null) {
                ColMeta colMeta = columToIndx.get(rrs.getHavingCols().getLeft()
                        .toUpperCase());
                if (colMeta != null) {
                    rrs.getHavingCols().setColMeta(colMeta);
                }
            }
    
            if (rrs.isHasAggrColumn()) {
                List<MergeCol> mergCols = new LinkedList<MergeCol>();
                Map<String, Integer> mergeColsMap = rrs.getMergeCols();
                if (mergeColsMap != null) {
                    for (Map.Entry<String, Integer> mergEntry : mergeColsMap
                            .entrySet()) {
                        String colName = mergEntry.getKey().toUpperCase();
                        int type = mergEntry.getValue();
                        if (MergeCol.MERGE_AVG == type) {
                            ColMeta sumColMeta = columToIndx.get(colName + "SUM");
                            ColMeta countColMeta = columToIndx.get(colName
                                    + "COUNT");
                            if (sumColMeta != null && countColMeta != null) {
                                ColMeta colMeta = new ColMeta(sumColMeta.colIndex,
                                        countColMeta.colIndex,
                                        sumColMeta.getColType());
                                mergCols.add(new MergeCol(colMeta, mergEntry
                                        .getValue()));
                            }
                        } else {
                            ColMeta colMeta = columToIndx.get(colName);
                            mergCols.add(new MergeCol(colMeta, mergEntry.getValue()));
                        }
                    }
                }
                // add no alias merg column
                for (Map.Entry<String, ColMeta> fieldEntry : columToIndx.entrySet()) {
                    String colName = fieldEntry.getKey();
                    int result = MergeCol.tryParseAggCol(colName);
                    if (result != MergeCol.MERGE_UNSUPPORT
                            && result != MergeCol.MERGE_NOMERGE) {
                        mergCols.add(new MergeCol(fieldEntry.getValue(), result));
                    }
                }
                grouper = new RowDataPacketGrouper(groupColumnIndexs,
                        mergCols.toArray(new MergeCol[mergCols.size()]), rrs.getHavingCols());
            }
            if (rrs.getOrderByCols() != null) {
                LinkedHashMap<String, Integer> orders = rrs.getOrderByCols();
                OrderCol[] orderCols = new OrderCol[orders.size()];
                int i = 0;
                for (Map.Entry<String, Integer> entry : orders.entrySet()) {
                    String key = StringUtil.removeBackquote(entry.getKey()
                            .toUpperCase());
                    ColMeta colMeta = columToIndx.get(key);
                    if (colMeta == null) {
                        throw new java.lang.IllegalArgumentException(
                                "all columns in order by clause should be in the selected column list!"
                                        + entry.getKey());
                    }
                    orderCols[i++] = new OrderCol(colMeta, entry.getValue());
                }
                // sorter = new RowDataPacketSorter(orderCols);
                RowDataSorter tmp = new RowDataSorter(orderCols);
                tmp.setLimit(rrs.getLimitStart(), rrs.getLimitSize());
                hasOrderBy = true;
                sorter = tmp;
            } else {
                hasOrderBy = false;
            }
            MycatServer.getInstance().getBusinessExecutor().execute(this);
    }
    
    public boolean onNewRecord(String dataNode, byte[] rowData) {
            //          ,  mysql           ,
            //                  ,          
            if (canDiscard.size() == rrs.getNodes().length) {
                LOGGER.error("now we output to client");
                packs.add(END_FLAG_PACK);
                return true;
            }
            if (canDiscard.get(dataNode) != null) {
                return true;
            }
            PackWraper data = new PackWraper();
            data.node = dataNode;
            data.data = rowData;
            packs.add(data);
            areadyAdd.getAndIncrement();
            return false;
    }
    
    public void run() {
            int warningCount = 0;
            EOFPacket eofp = new EOFPacket();
            ByteBuffer eof = ByteBuffer.allocate(9);
            BufferUtil.writeUB3(eof, eofp.calcPacketSize());
            eof.put(eofp.packetId);
            eof.put(eofp.fieldCount);
            BufferUtil.writeUB2(eof, warningCount);
            BufferUtil.writeUB2(eof, eofp.status);
            ServerConnection source = multiQueryHandler.getSession().getSource();
    
            while (!Thread.interrupted()) {
                try {
                    PackWraper pack = packs.take();
                    if (pack == END_FLAG_PACK) {
                        break; }
                    RowDataPacket row = new RowDataPacket(fieldCount);
                    row.read(pack.data);
                    if (grouper != null) {
                        grouper.addRow(row);
                    } else if (sorter != null) {
                        if (!sorter.addRow(row)) {
                            canDiscard.put(pack.node, true);
                        }
                    } else { result.add(row); }
                } catch (Exception e) {
                    LOGGER.error("Merge multi data error", e);
                }
            }
            byte[] array = eof.array();
            multiQueryHandler.outputMergeResult(source, array, getResults(array));
    }
    
    private List<RowDataPacket> getResults(byte[] eof) {
            List<RowDataPacket> tmpResult = result;
            if (this.grouper != null) {
                tmpResult = grouper.getResult();
                grouper = null;
            }
            if (sorter != null) {
                //   grouper      
                if (tmpResult != null) {
                    Iterator<RowDataPacket> itor = tmpResult.iterator();
                    while (itor.hasNext()) {
                        sorter.addRow(itor.next());
                        itor.remove();
                    }
                }
                tmpResult = sorter.getSortedResult();
                sorter = null;
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("prepare mpp merge result for " + rrs.getStatement());
            }
    
            return tmpResult;
    }

    次に、これらの方法について説明します.
  • onRowMetaData():MultiNodeQueryHandler.FieldEofResponse()で呼び出され、grouperとsorterが初期化され、スレッドプールを使用してrun()メソッドが呼び出されます.
  • onNewRecord():MultiNodeQueryHandler.rowResponse()で呼び出され、まずcanDiscardの長さが次のノード数に等しいかどうかを判断し、後続のすべてのノードを説明するデータが破棄され、パッケージにEND_を入れるFLAG_PACK終了run()のループ(もう1つのより一般的な終了ループは、MultiNodeQueryHandler.rowEofResponse()ですべてのノードのEOFパケットを受信した後にトリガーされる)であり、現在のノードがcanDiscardキューにおいても同様にノードの後続データを無視している場合、ノード名と行データをPackWraperインスタンスにカプセル化し、packsに入れる.
  • run():コアはループであり、packsからPackWraperインスタンスがブロックされるたびに読み込まれ、END_であることが判明した場合、RowDataPacketインスタンスが生成される.FLAG_PACKは終了し、if-else判断:
  • を行う
    1)グループ化が必要な場合はgrouperを呼び出す.addRow()は行を追加します.パケットはソートよりも優先されるため、パケットの要件があると、ソートはすべてのパケット動作が完了するまで待たなければなりません(getResults()).
    2)逆にソートが必要な場合はsorterを呼び出す.addRow()は行を追加しようとしますが、参加が成功しないと、ノードの後続のデータが成功することはできません(ここでのソートはMaxHeapの最上位を構築することによって実現され、スタックがいっぱいになると要素の淘汰が実行され、各ノードが返すデータが内部秩序に満ちているため)、ノードをcanDiscardに入れて後続のデータを無視します.
    3)逆に、ローをresultに直接追加します.
    ループが終了すると、MultiNodeQueryHandlerが呼び出される.outputMergeResult()は、まずgetResults()を呼び出して、パケットデータ/パケットソートデータ/ソートデータ/一般データ、MultiNodeQueryHandlerを取得する.outputMergeResult()は、limitを実行するために、処理された結果セットをバッファに順次書き込み、最後にフロントエンドに戻ります.具体的には、次のようになります.
    public void outputMergeResult(final ServerConnection source,
                final byte[] eof, List<RowDataPacket> results) {
            try {
                lock.lock();
                ByteBuffer buffer = session.getSource().allocate();
                final RouteResultset rrs = this.dataMergeSvr.getRrs();
    
                //   limit  
                int start = rrs.getLimitStart();
                int end = start + rrs.getLimitSize();
    
                            if (start < 0)
                    start = 0;
    
                if (rrs.getLimitSize() < 0)
                    end = results.size();
                                
                if (end > results.size())
                    end = results.size();
    
                for (int i = start; i < end; i++) {
                    RowDataPacket row = results.get(i);
                    row.packetId = ++packetId;
                    buffer = row.write(buffer, source, true);
                }
    
                eof[3] = ++packetId;
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("last packet id:" + packetId);
                }
                source.write(source.writeToBuffer(eof, buffer));
    
            } catch (Exception e) {
                handleDataProcessException(e);
            } finally {
                lock.unlock();
                dataMergeSvr.clear();
            }
    }

     
    オリジナルの成果を尊重するために、転載する必要がある場合は、本文の出典を明記してください.
    http://www.cnblogs.com/fernandolee24/p/5243258.htmlああ、どうもありがとう