ActiveMQソース分析のKahaDBインデックスストレージ

7258 ワード

前編ではKahaDBメッセージの格納メカニズムを解析したが,次にKahaDBのインデックス格納メカニズムを解析し,インデックス格納に関連するファイルに*がある.data,*.redo,*.free.BrokerがProducerから送信されたメッセージデータを受信するとメッセージが格納され、Producerがトランザクションコマンドを送信すると、Brokerは先ほど保存したメッセージに対応するインデックスを生成してKahaDBに格納し、メッセージの読み取り効率を向上させる.
Brokerが受信したトランザクション情報は次のとおりです.
TransactionInfo {commandId = 7, responseRequired = true, 
type = 2, connectionId = ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1, 
transactionId = TX:ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1:1}

次に、KahaDBTransactionStoreクラスのcommitメソッドでトランザクションコミット操作を実行します.まず、トランザクションtxidに基づいて対応するトランザクション情報を取得します.
local_transaction_id {
  connection_id: ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1
  transaction_id: 1
}

次に、KahaCommitCommandオブジェクトを作成し、トランザクション情報をカプセル化し、DataFileAppenderクラスのstoreItemメソッドを呼び出して、メッセージ・ストレージ・プロセスと同様にトランザクション情報をディスクに保存します.ここでは再説明しません.MessageDatabaseのupdateIndexインデックス・ストレージ・セクションの論理に重点を置きます.
long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
        StoredDestination sd = getStoredDestination(command.getDestination(), tx);

        // Skip adding the message to the index if this is a topic and there are
        // no subscriptions.
        if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
            return -1;
        }

        // Add the message.
        int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
        long id = sd.orderIndex.getNextMessageId();
        Long previous = sd.locationIndex.put(tx, location, id);
        if (previous == null) {
            previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
            if (previous == null) {
                incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
                sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
                if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
                    addAckLocationForNewMessage(tx, command.getDestination(), sd, id);
                }
                metadata.lastUpdate = location;
                LOG.info("metadata.lastUpdate is:" + location);
            } else {

                MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
                if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
                    // If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
                    LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
                }
                sd.messageIdIndex.put(tx, command.getMessageId(), previous);
                sd.locationIndex.remove(tx, location);
                id = -1;
            }
        } else {
            // restore the previous value.. Looks like this was a redo of a previously
            // added message. We don't want to assign it a new id as the other indexes would
            // be wrong..
            sd.locationIndex.put(tx, location, previous);
            // ensure sequence is not broken
            sd.orderIndex.revertNextMessageId();
            metadata.lastUpdate = location;
        }
        // record this id in any event, initial send or recovery
        metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());

       return id;
    }

updateIndexメソッドでは、まずStoredDestinationイメージ情報を取得します.このオブジェクトは、他のインデックスを作成するエントリです.StoredDestinationのorderIndexを使用して、現在のメッセージに対応する順序Idが7のように、この時点で保存されているメッセージに対応する順序Idを作成します.次に、StoredDestinationオブジェクトのlocationIndexインデックスを使用して、対応するlocation、すなわち特定の格納場所を保存します.KahaDBインデックスストレージ構造はB+ツリーであるため、インデックスを作成する際にインデックスストレージに対応するノードを先に取得する必要があります.
synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
        assertLoaded();
        return getRoot(tx).put(tx, key, value);
 }
private BTreeNode getRoot(Transaction tx) throws IOException {
        return loadNode(tx, pageId, null);
}
BTreeNode loadNode(Transaction tx, long pageId, BTreeNode parent) throws IOException {
        Page> page = tx.load(pageId, marshaller);
        BTreeNode node = page.get();
        node.setPage(page);
        node.setParent(parent);
        return node;
}

pageIdに従って対応するpage値を取得し、異なるインデックスは異なるpageに格納され、pageとPageクラスは対応する.Pageクラスはディスク上のpageに対応するストレージクラスであり、データストレージの最小ユニットであり、Pageの唯一の識別はpageIdである.locationインデックスに基づいて見つかったpageIdが5の場合、位置インデックスがこのpageに格納されていることを示します.
BTreeNodeはインデックスノードを表し,keysとvaluesの2つの重要な属性を持つ.ここでkeysはインデックスに対応するkey値を格納し、valueは対応する順序Idである.locationインデックスなどのkeysとvaluesの内容は次のとおりです.
keys:[1:10779, 1:27152, 1:105640, 1:117641, 1:124686, 1:170367]
values:[1, 2, 3, 4, 5, 6]
locationインデックスのkey値は、メッセージが実際に格納されているオフセット量の位置に対応します.
インデックスに格納するノード情報を取得した後、新たに保存したメッセージインデックス情報を対応するノードに格納する必要があり、BTreeNodeのputメソッドはこの機能を実現するために使用される.putメソッドでは、まずkeysに現在保存するkey値がないことを二分検索で確認し、次に現在保存するkeyとvalueを前のkeysとvaluesに追加します.今回保存したkeysとvaluesの内容は次のとおりです.
keys:[1:10779, 1:27152, 1:105640, 1:117641, 1:124686, 1:170367, 1:201484]
values:[1, 2, 3, 4, 5, 6, 7]
最後にTransactionのstoreを介してインデックス情報BTreeNodeをディスクに格納する.
同様に、messageIdIndexおよびorderIndexストレージのメカニズムはlocationIndexと一致します.内
MessageIdIndex情報はpageId=6のpageに保存され、対応するkeysとvaluesは:
keys:[ID:jiangzhiqiangdeMacBook-Pro.local-49633-1556954797292-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-49923-1556959139334-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-49943-1556959422344-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-51539-1556891457996-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-51580-1556892301557-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-52983-1556976736145-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1:1:1:1]
values:[3, 4, 5, 1, 2, 6, 7]
orderIndex情報はpageId=2のpageに保存され、対応するkeysとvaluesは:
keys:[1, 2, 3, 4, 5, 6, 7]
values:[[ID:jiangzhiqiangdeMacBook-Pro.local-51539-1556891457996-1:1:1:1:1,1:10779], [ID:jiangzhiqiangdeMacBook-Pro.local-51580-1556892301557-1:1:1:1:1,1:27152], [ID:jiangzhiqiangdeMacBook-Pro.local-49633-1556954797292-1:1:1:1:1,1:105640], [ID:jiangzhiqiangdeMacBook-Pro.local-49923-1556959139334-1:1:1:1:1,1:117641], [ID:jiangzhiqiangdeMacBook-Pro.local-49943-1556959422344-1:1:1:1:1,1:124686], [ID:jiangzhiqiangdeMacBook-Pro.local-52983-1556976736145-1:1:1:1:1,1:170367], [ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1:1:1:1,1:201484]]
updateIndexメソッドにおけるmetadata.lastUpdate = location;この文は特に重視する必要があり、最後のメッセージ記憶に対応するオフセット値を保存するために使用される.例えば、最後に保存したオフセット値が1:201484である場合、metadata.LastUpdateに対応する値は1:201484です.このオフセット量は,KahaDBからメッセージの内容を取得する際に用いられ,消費者部分で詳細に分析される.
メッセージに対応するインデックスを作成することで、KahaDBのメッセージを読み取る際にインデックス情報に基づいて対応するメッセージ格納アドレスを素早く見つけることができ、メッセージの読み取り速度を大幅に向上させることができます.