ActiveMQソース分析のKahaDBインデックスストレージ
7258 ワード
前編ではKahaDBメッセージの格納メカニズムを解析したが,次にKahaDBのインデックス格納メカニズムを解析し,インデックス格納に関連するファイルに*がある.data,*.redo,*.free.BrokerがProducerから送信されたメッセージデータを受信するとメッセージが格納され、Producerがトランザクションコマンドを送信すると、Brokerは先ほど保存したメッセージに対応するインデックスを生成してKahaDBに格納し、メッセージの読み取り効率を向上させる.
Brokerが受信したトランザクション情報は次のとおりです.
次に、KahaDBTransactionStoreクラスのcommitメソッドでトランザクションコミット操作を実行します.まず、トランザクションtxidに基づいて対応するトランザクション情報を取得します.
次に、KahaCommitCommandオブジェクトを作成し、トランザクション情報をカプセル化し、DataFileAppenderクラスのstoreItemメソッドを呼び出して、メッセージ・ストレージ・プロセスと同様にトランザクション情報をディスクに保存します.ここでは再説明しません.MessageDatabaseのupdateIndexインデックス・ストレージ・セクションの論理に重点を置きます.
updateIndexメソッドでは、まずStoredDestinationイメージ情報を取得します.このオブジェクトは、他のインデックスを作成するエントリです.StoredDestinationのorderIndexを使用して、現在のメッセージに対応する順序Idが7のように、この時点で保存されているメッセージに対応する順序Idを作成します.次に、StoredDestinationオブジェクトのlocationIndexインデックスを使用して、対応するlocation、すなわち特定の格納場所を保存します.KahaDBインデックスストレージ構造はB+ツリーであるため、インデックスを作成する際にインデックスストレージに対応するノードを先に取得する必要があります.
pageIdに従って対応するpage値を取得し、異なるインデックスは異なるpageに格納され、pageとPageクラスは対応する.Pageクラスはディスク上のpageに対応するストレージクラスであり、データストレージの最小ユニットであり、Pageの唯一の識別はpageIdである.locationインデックスに基づいて見つかったpageIdが5の場合、位置インデックスがこのpageに格納されていることを示します.
BTreeNodeはインデックスノードを表し,keysとvaluesの2つの重要な属性を持つ.ここでkeysはインデックスに対応するkey値を格納し、valueは対応する順序Idである.locationインデックスなどのkeysとvaluesの内容は次のとおりです.
keys:[1:10779, 1:27152, 1:105640, 1:117641, 1:124686, 1:170367]
values:[1, 2, 3, 4, 5, 6]
locationインデックスのkey値は、メッセージが実際に格納されているオフセット量の位置に対応します.
インデックスに格納するノード情報を取得した後、新たに保存したメッセージインデックス情報を対応するノードに格納する必要があり、BTreeNodeのputメソッドはこの機能を実現するために使用される.putメソッドでは、まずkeysに現在保存するkey値がないことを二分検索で確認し、次に現在保存するkeyとvalueを前のkeysとvaluesに追加します.今回保存したkeysとvaluesの内容は次のとおりです.
keys:[1:10779, 1:27152, 1:105640, 1:117641, 1:124686, 1:170367, 1:201484]
values:[1, 2, 3, 4, 5, 6, 7]
最後にTransactionのstoreを介してインデックス情報BTreeNodeをディスクに格納する.
同様に、messageIdIndexおよびorderIndexストレージのメカニズムはlocationIndexと一致します.内
MessageIdIndex情報はpageId=6のpageに保存され、対応するkeysとvaluesは:
keys:[ID:jiangzhiqiangdeMacBook-Pro.local-49633-1556954797292-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-49923-1556959139334-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-49943-1556959422344-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-51539-1556891457996-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-51580-1556892301557-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-52983-1556976736145-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1:1:1:1]
values:[3, 4, 5, 1, 2, 6, 7]
orderIndex情報はpageId=2のpageに保存され、対応するkeysとvaluesは:
keys:[1, 2, 3, 4, 5, 6, 7]
values:[[ID:jiangzhiqiangdeMacBook-Pro.local-51539-1556891457996-1:1:1:1:1,1:10779], [ID:jiangzhiqiangdeMacBook-Pro.local-51580-1556892301557-1:1:1:1:1,1:27152], [ID:jiangzhiqiangdeMacBook-Pro.local-49633-1556954797292-1:1:1:1:1,1:105640], [ID:jiangzhiqiangdeMacBook-Pro.local-49923-1556959139334-1:1:1:1:1,1:117641], [ID:jiangzhiqiangdeMacBook-Pro.local-49943-1556959422344-1:1:1:1:1,1:124686], [ID:jiangzhiqiangdeMacBook-Pro.local-52983-1556976736145-1:1:1:1:1,1:170367], [ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1:1:1:1,1:201484]]
updateIndexメソッドにおけるmetadata.lastUpdate = location;この文は特に重視する必要があり、最後のメッセージ記憶に対応するオフセット値を保存するために使用される.例えば、最後に保存したオフセット値が1:201484である場合、metadata.LastUpdateに対応する値は1:201484です.このオフセット量は,KahaDBからメッセージの内容を取得する際に用いられ,消費者部分で詳細に分析される.
メッセージに対応するインデックスを作成することで、KahaDBのメッセージを読み取る際にインデックス情報に基づいて対応するメッセージ格納アドレスを素早く見つけることができ、メッセージの読み取り速度を大幅に向上させることができます.
Brokerが受信したトランザクション情報は次のとおりです.
TransactionInfo {commandId = 7, responseRequired = true,
type = 2, connectionId = ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1,
transactionId = TX:ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1:1}
次に、KahaDBTransactionStoreクラスのcommitメソッドでトランザクションコミット操作を実行します.まず、トランザクションtxidに基づいて対応するトランザクション情報を取得します.
local_transaction_id {
connection_id: ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1
transaction_id: 1
}
次に、KahaCommitCommandオブジェクトを作成し、トランザクション情報をカプセル化し、DataFileAppenderクラスのstoreItemメソッドを呼び出して、メッセージ・ストレージ・プロセスと同様にトランザクション情報をディスクに保存します.ここでは再説明しません.MessageDatabaseのupdateIndexインデックス・ストレージ・セクションの論理に重点を置きます.
long updateIndex(Transaction tx, KahaAddMessageCommand command, Location location) throws IOException {
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
// Skip adding the message to the index if this is a topic and there are
// no subscriptions.
if (sd.subscriptions != null && sd.subscriptions.isEmpty(tx)) {
return -1;
}
// Add the message.
int priority = command.getPrioritySupported() ? command.getPriority() : javax.jms.Message.DEFAULT_PRIORITY;
long id = sd.orderIndex.getNextMessageId();
Long previous = sd.locationIndex.put(tx, location, id);
if (previous == null) {
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
if (previous == null) {
incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
addAckLocationForNewMessage(tx, command.getDestination(), sd, id);
}
metadata.lastUpdate = location;
LOG.info("metadata.lastUpdate is:" + location);
} else {
MessageKeys messageKeys = sd.orderIndex.get(tx, previous);
if (messageKeys != null && messageKeys.location.compareTo(location) < 0) {
// If the message ID is indexed, then the broker asked us to store a duplicate before the message was dispatched and acked, we ignore this add attempt
LOG.warn("Duplicate message add attempt rejected. Destination: {}://{}, Message id: {}", command.getDestination().getType(), command.getDestination().getName(), command.getMessageId());
}
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
sd.locationIndex.remove(tx, location);
id = -1;
}
} else {
// restore the previous value.. Looks like this was a redo of a previously
// added message. We don't want to assign it a new id as the other indexes would
// be wrong..
sd.locationIndex.put(tx, location, previous);
// ensure sequence is not broken
sd.orderIndex.revertNextMessageId();
metadata.lastUpdate = location;
}
// record this id in any event, initial send or recovery
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
return id;
}
updateIndexメソッドでは、まずStoredDestinationイメージ情報を取得します.このオブジェクトは、他のインデックスを作成するエントリです.StoredDestinationのorderIndexを使用して、現在のメッセージに対応する順序Idが7のように、この時点で保存されているメッセージに対応する順序Idを作成します.次に、StoredDestinationオブジェクトのlocationIndexインデックスを使用して、対応するlocation、すなわち特定の格納場所を保存します.KahaDBインデックスストレージ構造はB+ツリーであるため、インデックスを作成する際にインデックスストレージに対応するノードを先に取得する必要があります.
synchronized public Value put(Transaction tx, Key key, Value value) throws IOException {
assertLoaded();
return getRoot(tx).put(tx, key, value);
}
private BTreeNode getRoot(Transaction tx) throws IOException {
return loadNode(tx, pageId, null);
}
BTreeNode loadNode(Transaction tx, long pageId, BTreeNode parent) throws IOException {
Page> page = tx.load(pageId, marshaller);
BTreeNode node = page.get();
node.setPage(page);
node.setParent(parent);
return node;
}
pageIdに従って対応するpage値を取得し、異なるインデックスは異なるpageに格納され、pageとPageクラスは対応する.Pageクラスはディスク上のpageに対応するストレージクラスであり、データストレージの最小ユニットであり、Pageの唯一の識別はpageIdである.locationインデックスに基づいて見つかったpageIdが5の場合、位置インデックスがこのpageに格納されていることを示します.
BTreeNodeはインデックスノードを表し,keysとvaluesの2つの重要な属性を持つ.ここでkeysはインデックスに対応するkey値を格納し、valueは対応する順序Idである.locationインデックスなどのkeysとvaluesの内容は次のとおりです.
keys:[1:10779, 1:27152, 1:105640, 1:117641, 1:124686, 1:170367]
values:[1, 2, 3, 4, 5, 6]
locationインデックスのkey値は、メッセージが実際に格納されているオフセット量の位置に対応します.
インデックスに格納するノード情報を取得した後、新たに保存したメッセージインデックス情報を対応するノードに格納する必要があり、BTreeNodeのputメソッドはこの機能を実現するために使用される.putメソッドでは、まずkeysに現在保存するkey値がないことを二分検索で確認し、次に現在保存するkeyとvalueを前のkeysとvaluesに追加します.今回保存したkeysとvaluesの内容は次のとおりです.
keys:[1:10779, 1:27152, 1:105640, 1:117641, 1:124686, 1:170367, 1:201484]
values:[1, 2, 3, 4, 5, 6, 7]
最後にTransactionのstoreを介してインデックス情報BTreeNodeをディスクに格納する.
同様に、messageIdIndexおよびorderIndexストレージのメカニズムはlocationIndexと一致します.内
MessageIdIndex情報はpageId=6のpageに保存され、対応するkeysとvaluesは:
keys:[ID:jiangzhiqiangdeMacBook-Pro.local-49633-1556954797292-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-49923-1556959139334-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-49943-1556959422344-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-51539-1556891457996-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-51580-1556892301557-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-52983-1556976736145-1:1:1:1:1, ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1:1:1:1]
values:[3, 4, 5, 1, 2, 6, 7]
orderIndex情報はpageId=2のpageに保存され、対応するkeysとvaluesは:
keys:[1, 2, 3, 4, 5, 6, 7]
values:[[ID:jiangzhiqiangdeMacBook-Pro.local-51539-1556891457996-1:1:1:1:1,1:10779], [ID:jiangzhiqiangdeMacBook-Pro.local-51580-1556892301557-1:1:1:1:1,1:27152], [ID:jiangzhiqiangdeMacBook-Pro.local-49633-1556954797292-1:1:1:1:1,1:105640], [ID:jiangzhiqiangdeMacBook-Pro.local-49923-1556959139334-1:1:1:1:1,1:117641], [ID:jiangzhiqiangdeMacBook-Pro.local-49943-1556959422344-1:1:1:1:1,1:124686], [ID:jiangzhiqiangdeMacBook-Pro.local-52983-1556976736145-1:1:1:1:1,1:170367], [ID:jiangzhiqiangdeMacBook-Pro.local-53092-1556977982195-1:1:1:1:1,1:201484]]
updateIndexメソッドにおけるmetadata.lastUpdate = location;この文は特に重視する必要があり、最後のメッセージ記憶に対応するオフセット値を保存するために使用される.例えば、最後に保存したオフセット値が1:201484である場合、metadata.LastUpdateに対応する値は1:201484です.このオフセット量は,KahaDBからメッセージの内容を取得する際に用いられ,消費者部分で詳細に分析される.
メッセージに対応するインデックスを作成することで、KahaDBのメッセージを読み取る際にインデックス情報に基づいて対応するメッセージ格納アドレスを素早く見つけることができ、メッセージの読み取り速度を大幅に向上させることができます.