RocketMQにおけるBrokerのブラシディスクソース分析
65748 ワード
前回のブログの最後にCommitLogのブラシ盤【RocketMQでBrokerのメッセージストレージソース分析】を簡単にご紹介しました
BrokerのCommitLogブラシはスレッドを開始し、バッファの内容をディスク(CommitLogファイル)に書き込み続けます.主に非同期ブラシと同期ブラシに分けられます
非同期ディスクには、①mappedByteBufferにキャッシュする->ディスクに書き込む(同期ディスクを含む)②writeBufferにキャッシュする->fileChannelにキャッシュする->ディスクに書き込む(前述のメモリバイトバッファをオンにする場合)の2つの方法があります.
CommitLogの2種類のブラシモード:
同期と非同期、同期ブラシはGroupCommitService、非同期ブラシはFlushRealTimeService、デフォルトは非同期ブラシ
非同期ブラシディスクを使用するモードでメモリバイトバッファをオンにすると、FlushRealtimeServiceに基づいてCommitRealtimeServiceがオンになります
同期ブラシ:
GroupCommitServiceスレッドの起動:
サイクル中のdoCommitでどんどんブラシをかけていきます
doCommitメソッド:
このうちGroupCommitServiceでは2つのListが管理されています.
GroupCommitRequestにOffsetがカプセル化されています
ここでは、前のブログの最後に述べたhandleDiskFlushメソッドを見る必要があります.
このメソッドの呼び出しはBrokerがProducerからのメッセージを受信し、ByteBufferへの書き込みが完了したときに発生する.
同期ブラシSYNC_FLUSHモードでは、AppendMessageResultからWroteOffsetおよびWroteBytesを取り出してnextOffsetを算出し、このnextOffsetをGroupCommitRequestにカプセル化し、GroupCommitServiceのputRequestメソッドでGroupCommitRequestをrequestsWriteというListに追加します.
リストのadd操作が完了すると、CAS操作でhasNotifiedという原子化されたBoolean値が修正され、waitPointのcountDownで起動操作が行われ、後で役に立つ
ここではシンクロブラシ盤であるため、GroupCommitRequestのwaitForFlushメソッドで、タイムアウト時間内にその記録に対応するブラシ盤の完了を待つ必要があり、非同期ブラシ盤はwakeupメソッドでブラシ盤タスクを起動し、待機していない.これが両者の違いである
doCommitメソッドに戻ると、ここではrequestsReadというリストに対する操作ですが、さっきrequestsWriteというリストに記録を保存していたのはrunメソッドのwaitForRunningメソッドと関係があります.
ここではCAS操作によりhasNotified値を変更し、onWaitEndメソッドを呼び出す.修正に失敗した場合、awaitがブロックに入るため、上記のputRequestメソッドが起動するのを待つ.つまり、Producerが送信したメッセージがキャッシュに成功した後、handleDiskFlushメソッドを呼び出すと、ブラシラインが起動し、もちろんブラシスレッドはタイムアウトインターバルに達した後も起動する
onWaitEndの方法を見てみましょう.
ここでは2つのリストを交換していることがわかります
これは非常に興味深い方法で、JVMに詳しいなら、新生代のレプリケーションアルゴリズムに似ていると思いますか?ブラシスレッドがブロックされるとrequestsWriteにレコードが埋め込まれ、ブラシスレッドが起動して動作すると、まずrequestsWriteとrequestsReadが交換され、このときのレコードはrequestsReadから読み出され、同時にrequestsWriteは空のListになり、メッセージレコードはこの空のListに埋め込まれ、往復する
doCommitメソッドでは、requestsReadが空でない場合、最後にrequestsReadのclearメソッドが呼び出されることがわかり、上記の説を証明しました
ブラシディスクの使い方をよく見てみましょう.
requestsReadを巡ることでGroupCommitRequestパッケージのNextOffsetを得ることができます
ここでflushedWhereは前回のブラシディスクが完了したoffsetを記録するために使用され、前回のブラシディスクの位置がNextOffset以上であれば、NextOffsetの位置からリフレッシュされたことを示し、リフレッシュする必要はありません.そうでなければmappedFileQueueのflushメソッドを呼び出してブラシディスクを行います
MappedFileQueueのflushメソッド:
ここではまずflushedWhereの前回のブラシディスクが完了したoffsetに基づいて、findMappedFileByOffsetの方法でCommitLogファイルのマッピングMappedFileに関するMappedFileとその関連操作を見つけました.私の前のブログで何度も紹介しましたが、もう面倒ではありません.
MappedFileを見つけたら、flushメソッドを呼び出します.
MappedFileのflushメソッド:
まずisAbleToFlushメソッド:
ここで、flushは前回のリフレッシュ完了後の位置を記録し、writeは現在のメッセージコンテンツの書き込み後の位置を記録する.flushLeastPagesが0より大きい場合、
ステージの要件を満たすかどうかを計算できます.OS_PAGE_SIZEは4 K、つまり1ページサイズは4 k
ここは同期ブラシディスクなので、flushLeastPagesは0で、pageの要求ではなく、キャッシュされた内容があればブラシディスクをブラシします.ただし、非同期ディスクではflushLeastPagesは4です.つまり、キャッシュされたメッセージが少なくとも4(page個数)*4 K(pageサイズ)=16 Kの場合にのみ、非同期ディスクはキャッシュをファイルに書き込むことができます.
MappedFileのflushメソッドに戻り、isAbleToFlushで書き込み要求をチェックした後
まずgetReadPositionで現在のメッセージ内容の書き込み後の位置を取得し、同期ブラシディスクであるため、ここでmappedByteBufferのforceメソッドを呼び出し、JDKのNIO操作によりmappedByteBufferキャッシュのデータをCommitLogファイルに書き込み最後にflushedPositionの値を更新する
MappedFileQueueのflushメソッドに戻り、MappedFileのflushが完了した後、flushedWhereの値を更新する必要があります.
この時点でキャッシュ内のデータの永続化が完了し、同期ブラシディスクが終了します.
非同期ブラシ:
①FlushCommitLogService:
flushCommitLogTimed:タイミングブラシintervalを使用するかどうか:ブラシ間隔、デフォルト500 msflushPhysicQueLeastPages:pageサイズ、デフォルト4つのflushPhysicQueThoroughInterval:完全ブラシ間隔、デフォルト10 s
まずlastFlushTimestamp(前回のブラシ時間)+flushPhysicQueThoroughIntervalと現在時間との比較から、一度徹底的なブラシが必要かどうかを判断し、必要に応じてflushPhysicQueLeastPagesを0にする
次にflushCommitLogTimeからflushCommitLogTimeがtrueであると判断し、sleepを使用して500 ms待ち、flushCommitLogTimeがfalseである場合、呼び出しwaitForRunningはタイムアウト時間500 msでブロックされ、その起動条件はhandleDiskFlushにおけるwakeup起動である
最後に、同期ブラシ盤と同様にmappedFileQueueを呼び出すflushメソッドにすぎないが、ここでのflushPhysicQueueLeastPagesは徹底的なリフレッシュを行うか、4 page(16 K)の基準でリフレッシュするかを決定する
②CommitRealTimeServiceこのブラシ方式はFlushCommitLogServiceと連携する必要がある
CommitRealTimeServiceのrunメソッド:
ここの論理はFlushCommitLogServiceと似ていますが、パラメータは少し違います.
interval:コミット間隔、デフォルト200 mscommitDataLeastPages:pageサイズ、デフォルト4つのcommitDataThoroughInterval:コミット完了間隔、デフォルト200 ms
基本的にはFlushCommitLogServiceと似ていますが、mappedFileQueueのcommitメソッドを呼び出しただけです
ここではmappedFileQueueのflushメソッドとよく似ていて、committedWhereでMappedFileを探しています
次に、MappedFileのcommitメソッドを呼び出します.
相変わらずMappedFileのflushメソッドと似ていて、isAbleToCommitでpageをチェックした後にcommit 0メソッドを呼び出します
MappedFileのcommit 0メソッド:
【RocketMQにおけるBrokerのメッセージ格納ソース分析】
では、この方法を使用すると、以前のmappedByteBufferではなくwriteBufferにメッセージがキャッシュされます.ここでは、writeBufferのlastCommittedPosition(前回コミット位置)からwritePos(キャッシュメッセージ終了位置)までのコンテンツがfileChannelと同じ位置にキャッシュされ、ディスクに書き込まれていません.fileChannelにキャッシュされた後、committedPosition値が更新されます.
commitメソッドに戻り、fileCfihannelへのキャッシュが完了すると、committedPositionがfileSizeに達したかどうかをチェックします.つまり、writeBufferの内容がすべてコミットされたかどうかを判断します.
すべてコミットする場合は、writeBuffertransientStorePoolを利用するには、transientStorePoolのreturnBufferメソッドを使用する必要があります.writeBuffertransientStorePoolは、実際には双方向のキューであり、CommitLogによってTransientStorePoolを管理します.
returnBufferメソッド:
ここでbyteBufferが確かに回収されていることがわかります
MappedFileQueueのcommitメソッドに戻ります.
mappedFileのcommitを完了した後、whereとcommittedWhereで本当にfileCfihannelにキャッシュしたかどうかを判断し、resultを確実にキャッシュしたのがfalse!その後committedWhereが更新されresultが返されます
ではCommitRealTimeServiceのrunメソッドに戻ると、commitが完了するとresultは本当にfileCfihannelにキャッシュされた後にflushCommitLogServiceを呼び出すwakeupメソッド、つまりFlushCommitLogServiceのブラシスレッドを起動したと判断します
以前に分析したFlushCommitLogServiceとは異なる唯一の場所は、MappedFileのflushメソッドです.
以前はメモリバイトバッファをオンにしていなかった場合、mappedByteBufferの内容をディスクに書き込んでいたのですが、その時、やっとfileChannelの番になりました
ここで、writeBufferがnullに等しくない場合、またはfileChannelのpositionが0 writeBufferとnullに等しくない場合、TransientStorePoolが回収した後に
メモリバイトバッファをオンにした場合、ディスクへの書き込みは2回キャッシュされていることがわかります
これで、Brokerのメッセージの持続化とブラシディスクのプロセス全体が完了しました.
BrokerのCommitLogブラシはスレッドを開始し、バッファの内容をディスク(CommitLogファイル)に書き込み続けます.主に非同期ブラシと同期ブラシに分けられます
非同期ディスクには、①mappedByteBufferにキャッシュする->ディスクに書き込む(同期ディスクを含む)②writeBufferにキャッシュする->fileChannelにキャッシュする->ディスクに書き込む(前述のメモリバイトバッファをオンにする場合)の2つの方法があります.
CommitLogの2種類のブラシモード:
1 public enum FlushDiskType {
2 SYNC_FLUSH,
3 ASYNC_FLUSH
4 }
同期と非同期、同期ブラシはGroupCommitService、非同期ブラシはFlushRealTimeService、デフォルトは非同期ブラシ
非同期ブラシディスクを使用するモードでメモリバイトバッファをオンにすると、FlushRealtimeServiceに基づいてCommitRealtimeServiceがオンになります
同期ブラシ:
GroupCommitServiceスレッドの起動:
1 public void run() {
2 CommitLog.log.info(this.getServiceName() + " service started");
3
4 while (!this.isStopped()) {
5 try {
6 this.waitForRunning(10);
7 this.doCommit();
8 } catch (Exception e) {
9 CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
10 }
11 }
12
13 // Under normal circumstances shutdown, wait for the arrival of the
14 // request, and then flush
15 try {
16 Thread.sleep(10);
17 } catch (InterruptedException e) {
18 CommitLog.log.warn("GroupCommitService Exception, ", e);
19 }
20
21 synchronized (this) {
22 this.swapRequests();
23 }
24
25 this.doCommit();
26
27 CommitLog.log.info(this.getServiceName() + " service end");
28 }
サイクル中のdoCommitでどんどんブラシをかけていきます
doCommitメソッド:
1 private void doCommit() {
2 synchronized (this.requestsRead) {
3 if (!this.requestsRead.isEmpty()) {
4 for (GroupCommitRequest req : this.requestsRead) {
5 // There may be a message in the next file, so a maximum of
6 // two times the flush
7 boolean flushOK = false;
8 for (int i = 0; i < 2 && !flushOK; i++) {
9 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
10
11 if (!flushOK) {
12 CommitLog.this.mappedFileQueue.flush(0);
13 }
14 }
15
16 req.wakeupCustomer(flushOK);
17 }
18
19 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
20 if (storeTimestamp > 0) {
21 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
22 }
23
24 this.requestsRead.clear();
25 } else {
26 // Because of individual messages is set to not sync flush, it
27 // will come to this process
28 CommitLog.this.mappedFileQueue.flush(0);
29 }
30 }
31 }
このうちGroupCommitServiceでは2つのListが管理されています.
1 private volatile List requestsWrite = new ArrayList();
2 private volatile List requestsRead = new ArrayList();
GroupCommitRequestにOffsetがカプセル化されています
1 private final long nextOffset;
ここでは、前のブログの最後に述べたhandleDiskFlushメソッドを見る必要があります.
1 public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
2 // Synchronization flush
3 if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
4 final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
5 if (messageExt.isWaitStoreMsgOK()) {
6 GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
7 service.putRequest(request);
8 boolean flushOK = request.waitForFlush(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
9 if (!flushOK) {
10 log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
11 + " client address: " + messageExt.getBornHostString());
12 putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
13 }
14 } else {
15 service.wakeup();
16 }
17 }
18 // Asynchronous flush
19 else {
20 if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
21 flushCommitLogService.wakeup();
22 } else {
23 commitLogService.wakeup();
24 }
25 }
26 }
このメソッドの呼び出しはBrokerがProducerからのメッセージを受信し、ByteBufferへの書き込みが完了したときに発生する.
同期ブラシSYNC_FLUSHモードでは、AppendMessageResultからWroteOffsetおよびWroteBytesを取り出してnextOffsetを算出し、このnextOffsetをGroupCommitRequestにカプセル化し、GroupCommitServiceのputRequestメソッドでGroupCommitRequestをrequestsWriteというListに追加します.
1 public synchronized void putRequest(final GroupCommitRequest request) {
2 synchronized (this.requestsWrite) {
3 this.requestsWrite.add(request);
4 }
5 if (hasNotified.compareAndSet(false, true)) {
6 waitPoint.countDown(); // notify
7 }
8 }
リストのadd操作が完了すると、CAS操作でhasNotifiedという原子化されたBoolean値が修正され、waitPointのcountDownで起動操作が行われ、後で役に立つ
ここではシンクロブラシ盤であるため、GroupCommitRequestのwaitForFlushメソッドで、タイムアウト時間内にその記録に対応するブラシ盤の完了を待つ必要があり、非同期ブラシ盤はwakeupメソッドでブラシ盤タスクを起動し、待機していない.これが両者の違いである
doCommitメソッドに戻ると、ここではrequestsReadというリストに対する操作ですが、さっきrequestsWriteというリストに記録を保存していたのはrunメソッドのwaitForRunningメソッドと関係があります.
1 protected void waitForRunning(long interval) {
2 if (hasNotified.compareAndSet(true, false)) {
3 this.onWaitEnd();
4 return;
5 }
6
7 //entry to wait
8 waitPoint.reset();
9
10 try {
11 waitPoint.await(interval, TimeUnit.MILLISECONDS);
12 } catch (InterruptedException e) {
13 log.error("Interrupted", e);
14 } finally {
15 hasNotified.set(false);
16 this.onWaitEnd();
17 }
18 }
ここではCAS操作によりhasNotified値を変更し、onWaitEndメソッドを呼び出す.修正に失敗した場合、awaitがブロックに入るため、上記のputRequestメソッドが起動するのを待つ.つまり、Producerが送信したメッセージがキャッシュに成功した後、handleDiskFlushメソッドを呼び出すと、ブラシラインが起動し、もちろんブラシスレッドはタイムアウトインターバルに達した後も起動する
onWaitEndの方法を見てみましょう.
1 protected void onWaitEnd() {
2 this.swapRequests();
3 }
4
5 private void swapRequests() {
6 List tmp = this.requestsWrite;
7 this.requestsWrite = this.requestsRead;
8 this.requestsRead = tmp;
9 }
ここでは2つのリストを交換していることがわかります
これは非常に興味深い方法で、JVMに詳しいなら、新生代のレプリケーションアルゴリズムに似ていると思いますか?ブラシスレッドがブロックされるとrequestsWriteにレコードが埋め込まれ、ブラシスレッドが起動して動作すると、まずrequestsWriteとrequestsReadが交換され、このときのレコードはrequestsReadから読み出され、同時にrequestsWriteは空のListになり、メッセージレコードはこの空のListに埋め込まれ、往復する
doCommitメソッドでは、requestsReadが空でない場合、最後にrequestsReadのclearメソッドが呼び出されることがわかり、上記の説を証明しました
ブラシディスクの使い方をよく見てみましょう.
1 for (GroupCommitRequest req : this.requestsRead) {
2 // There may be a message in the next file, so a maximum of
3 // two times the flush
4 boolean flushOK = false;
5 for (int i = 0; i < 2 && !flushOK; i++) {
6 flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
7
8 if (!flushOK) {
9 CommitLog.this.mappedFileQueue.flush(0);
10 }
11 }
12
13 req.wakeupCustomer(flushOK);
14 }
requestsReadを巡ることでGroupCommitRequestパッケージのNextOffsetを得ることができます
ここでflushedWhereは前回のブラシディスクが完了したoffsetを記録するために使用され、前回のブラシディスクの位置がNextOffset以上であれば、NextOffsetの位置からリフレッシュされたことを示し、リフレッシュする必要はありません.そうでなければmappedFileQueueのflushメソッドを呼び出してブラシディスクを行います
MappedFileQueueのflushメソッド:
1 public boolean flush(final int flushLeastPages) {
2 boolean result = true;
3 MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);
4 if (mappedFile != null) {
5 long tmpTimeStamp = mappedFile.getStoreTimestamp();
6 int offset = mappedFile.flush(flushLeastPages);
7 long where = mappedFile.getFileFromOffset() + offset;
8 result = where == this.flushedWhere;
9 this.flushedWhere = where;
10 if (0 == flushLeastPages) {
11 this.storeTimestamp = tmpTimeStamp;
12 }
13 }
14
15 return result;
16 }
ここではまずflushedWhereの前回のブラシディスクが完了したoffsetに基づいて、findMappedFileByOffsetの方法でCommitLogファイルのマッピングMappedFileに関するMappedFileとその関連操作を見つけました.私の前のブログで何度も紹介しましたが、もう面倒ではありません.
MappedFileを見つけたら、flushメソッドを呼び出します.
MappedFileのflushメソッド:
1 public int flush(final int flushLeastPages) {
2 if (this.isAbleToFlush(flushLeastPages)) {
3 if (this.hold()) {
4 int value = getReadPosition();
5
6 try {
7 //We only append data to fileChannel or mappedByteBuffer, never both.
8 if (writeBuffer != null || this.fileChannel.position() != 0) {
9 this.fileChannel.force(false);
10 } else {
11 this.mappedByteBuffer.force();
12 }
13 } catch (Throwable e) {
14 log.error("Error occurred when force data to disk.", e);
15 }
16
17 this.flushedPosition.set(value);
18 this.release();
19 } else {
20 log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
21 this.flushedPosition.set(getReadPosition());
22 }
23 }
24 return this.getFlushedPosition();
25 }
まずisAbleToFlushメソッド:
1 private boolean isAbleToFlush(final int flushLeastPages) {
2 int flush = this.flushedPosition.get();
3 int write = getReadPosition();
4
5 if (this.isFull()) {
6 return true;
7 }
8
9 if (flushLeastPages > 0) {
10 return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
11 }
12
13 return write > flush;
14 }
ここで、flushは前回のリフレッシュ完了後の位置を記録し、writeは現在のメッセージコンテンツの書き込み後の位置を記録する.flushLeastPagesが0より大きい場合、
1 return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
ステージの要件を満たすかどうかを計算できます.OS_PAGE_SIZEは4 K、つまり1ページサイズは4 k
ここは同期ブラシディスクなので、flushLeastPagesは0で、pageの要求ではなく、キャッシュされた内容があればブラシディスクをブラシします.ただし、非同期ディスクではflushLeastPagesは4です.つまり、キャッシュされたメッセージが少なくとも4(page個数)*4 K(pageサイズ)=16 Kの場合にのみ、非同期ディスクはキャッシュをファイルに書き込むことができます.
MappedFileのflushメソッドに戻り、isAbleToFlushで書き込み要求をチェックした後
1 int value = getReadPosition();
2 try {
3 //We only append data to fileChannel or mappedByteBuffer, never both.
4 if (writeBuffer != null || this.fileChannel.position() != 0) {
5 this.fileChannel.force(false);
6 } else {
7 this.mappedByteBuffer.force();
8 }
9 } catch (Throwable e) {
10 log.error("Error occurred when force data to disk.", e);
11 }
12
13 this.flushedPosition.set(value);
まずgetReadPositionで現在のメッセージ内容の書き込み後の位置を取得し、同期ブラシディスクであるため、ここでmappedByteBufferのforceメソッドを呼び出し、JDKのNIO操作によりmappedByteBufferキャッシュのデータをCommitLogファイルに書き込み最後にflushedPositionの値を更新する
MappedFileQueueのflushメソッドに戻り、MappedFileのflushが完了した後、flushedWhereの値を更新する必要があります.
この時点でキャッシュ内のデータの永続化が完了し、同期ブラシディスクが終了します.
非同期ブラシ:
①FlushCommitLogService:
1 public void run() {
2 CommitLog.log.info(this.getServiceName() + " service started");
3
4 while (!this.isStopped()) {
5 boolean flushCommitLogTimed = CommitLog.this.defaultMessageStore.getMessageStoreConfig().isFlushCommitLogTimed();
6
7 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushIntervalCommitLog();
8 int flushPhysicQueueLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogLeastPages();
9
10 int flushPhysicQueueThoroughInterval =
11 CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushCommitLogThoroughInterval();
12
13 boolean printFlushProgress = false;
14
15 // Print flush progress
16 long currentTimeMillis = System.currentTimeMillis();
17 if (currentTimeMillis >= (this.lastFlushTimestamp + flushPhysicQueueThoroughInterval)) {
18 this.lastFlushTimestamp = currentTimeMillis;
19 flushPhysicQueueLeastPages = 0;
20 printFlushProgress = (printTimes++ % 10) == 0;
21 }
22
23 try {
24 if (flushCommitLogTimed) {
25 Thread.sleep(interval);
26 } else {
27 this.waitForRunning(interval);
28 }
29
30 if (printFlushProgress) {
31 this.printFlushProgress();
32 }
33
34 long begin = System.currentTimeMillis();
35 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
36 long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
37 if (storeTimestamp > 0) {
38 CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
39 }
40 long past = System.currentTimeMillis() - begin;
41 if (past > 500) {
42 log.info("Flush data to disk costs {} ms", past);
43 }
44 } catch (Throwable e) {
45 CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
46 this.printFlushProgress();
47 }
48 }
49
50 // Normal shutdown, to ensure that all the flush before exit
51 boolean result = false;
52 for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
53 result = CommitLog.this.mappedFileQueue.flush(0);
54 CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
55 }
56
57 this.printFlushProgress();
58
59 CommitLog.log.info(this.getServiceName() + " service end");
60 }
flushCommitLogTimed:タイミングブラシintervalを使用するかどうか:ブラシ間隔、デフォルト500 msflushPhysicQueLeastPages:pageサイズ、デフォルト4つのflushPhysicQueThoroughInterval:完全ブラシ間隔、デフォルト10 s
まずlastFlushTimestamp(前回のブラシ時間)+flushPhysicQueThoroughIntervalと現在時間との比較から、一度徹底的なブラシが必要かどうかを判断し、必要に応じてflushPhysicQueLeastPagesを0にする
次にflushCommitLogTimeからflushCommitLogTimeがtrueであると判断し、sleepを使用して500 ms待ち、flushCommitLogTimeがfalseである場合、呼び出しwaitForRunningはタイムアウト時間500 msでブロックされ、その起動条件はhandleDiskFlushにおけるwakeup起動である
最後に、同期ブラシ盤と同様にmappedFileQueueを呼び出すflushメソッドにすぎないが、ここでのflushPhysicQueueLeastPagesは徹底的なリフレッシュを行うか、4 page(16 K)の基準でリフレッシュするかを決定する
②CommitRealTimeServiceこのブラシ方式はFlushCommitLogServiceと連携する必要がある
CommitRealTimeServiceのrunメソッド:
1 public void run() {
2 CommitLog.log.info(this.getServiceName() + " service started");
3 while (!this.isStopped()) {
4 int interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();
5
6 int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();
7
8 int commitDataThoroughInterval =
9 CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();
10
11 long begin = System.currentTimeMillis();
12 if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {
13 this.lastCommitTimestamp = begin;
14 commitDataLeastPages = 0;
15 }
16
17 try {
18 boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);
19 long end = System.currentTimeMillis();
20 if (!result) {
21 this.lastCommitTimestamp = end; // result = false means some data committed.
22 //now wake up flush thread.
23 flushCommitLogService.wakeup();
24 }
25
26 if (end - begin > 500) {
27 log.info("Commit data to file costs {} ms", end - begin);
28 }
29 this.waitForRunning(interval);
30 } catch (Throwable e) {
31 CommitLog.log.error(this.getServiceName() + " service has exception. ", e);
32 }
33 }
34
35 boolean result = false;
36 for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {
37 result = CommitLog.this.mappedFileQueue.commit(0);
38 CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));
39 }
40 CommitLog.log.info(this.getServiceName() + " service end");
41 }
ここの論理はFlushCommitLogServiceと似ていますが、パラメータは少し違います.
interval:コミット間隔、デフォルト200 mscommitDataLeastPages:pageサイズ、デフォルト4つのcommitDataThoroughInterval:コミット完了間隔、デフォルト200 ms
基本的にはFlushCommitLogServiceと似ていますが、mappedFileQueueのcommitメソッドを呼び出しただけです
1 public boolean commit(final int commitLeastPages) {
2 boolean result = true;
3 MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
4 if (mappedFile != null) {
5 int offset = mappedFile.commit(commitLeastPages);
6 long where = mappedFile.getFileFromOffset() + offset;
7 result = where == this.committedWhere;
8 this.committedWhere = where;
9 }
10
11 return result;
12 }
ここではmappedFileQueueのflushメソッドとよく似ていて、committedWhereでMappedFileを探しています
次に、MappedFileのcommitメソッドを呼び出します.
1 public int commit(final int commitLeastPages) {
2 if (writeBuffer == null) {
3 //no need to commit data to file channel, so just regard wrotePosition as committedPosition.
4 return this.wrotePosition.get();
5 }
6 if (this.isAbleToCommit(commitLeastPages)) {
7 if (this.hold()) {
8 commit0(commitLeastPages);
9 this.release();
10 } else {
11 log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
12 }
13 }
14
15 // All dirty data has been committed to FileChannel.
16 if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
17 this.transientStorePool.returnBuffer(writeBuffer);
18 this.writeBuffer = null;
19 }
20
21 return this.committedPosition.get();
22 }
相変わらずMappedFileのflushメソッドと似ていて、isAbleToCommitでpageをチェックした後にcommit 0メソッドを呼び出します
MappedFileのcommit 0メソッド:
1 protected void commit0(final int commitLeastPages) {
2 int writePos = this.wrotePosition.get();
3 int lastCommittedPosition = this.committedPosition.get();
4
5 if (writePos - this.committedPosition.get() > 0) {
6 try {
7 ByteBuffer byteBuffer = writeBuffer.slice();
8 byteBuffer.position(lastCommittedPosition);
9 byteBuffer.limit(writePos);
10 this.fileChannel.position(lastCommittedPosition);
11 this.fileChannel.write(byteBuffer);
12 this.committedPosition.set(writePos);
13 } catch (Throwable e) {
14 log.error("Error occurred when commit data to FileChannel.", e);
15 }
16 }
17 }
【RocketMQにおけるBrokerのメッセージ格納ソース分析】
では、この方法を使用すると、以前のmappedByteBufferではなくwriteBufferにメッセージがキャッシュされます.ここでは、writeBufferのlastCommittedPosition(前回コミット位置)からwritePos(キャッシュメッセージ終了位置)までのコンテンツがfileChannelと同じ位置にキャッシュされ、ディスクに書き込まれていません.fileChannelにキャッシュされた後、committedPosition値が更新されます.
commitメソッドに戻り、fileCfihannelへのキャッシュが完了すると、committedPositionがfileSizeに達したかどうかをチェックします.つまり、writeBufferの内容がすべてコミットされたかどうかを判断します.
すべてコミットする場合は、writeBuffertransientStorePoolを利用するには、transientStorePoolのreturnBufferメソッドを使用する必要があります.writeBuffertransientStorePoolは、実際には双方向のキューであり、CommitLogによってTransientStorePoolを管理します.
1 public class TransientStorePool {
2 private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
3
4 private final int poolSize;
5 private final int fileSize;
6 private final Deque availableBuffers;
7 private final MessageStoreConfig storeConfig;
8
9 public TransientStorePool(final MessageStoreConfig storeConfig) {
10 this.storeConfig = storeConfig;
11 this.poolSize = storeConfig.getTransientStorePoolSize();
12 this.fileSize = storeConfig.getMapedFileSizeCommitLog();
13 this.availableBuffers = new ConcurrentLinkedDeque<>();
14 }
15 ......
16 }
returnBufferメソッド:
1 public void returnBuffer(ByteBuffer byteBuffer) {
2 byteBuffer.position(0);
3 byteBuffer.limit(fileSize);
4 this.availableBuffers.offerFirst(byteBuffer);
5 }
ここでbyteBufferが確かに回収されていることがわかります
MappedFileQueueのcommitメソッドに戻ります.
1 public boolean commit(final int commitLeastPages) {
2 boolean result = true;
3 MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);
4 if (mappedFile != null) {
5 int offset = mappedFile.commit(commitLeastPages);
6 long where = mappedFile.getFileFromOffset() + offset;
7 result = where == this.committedWhere;
8 this.committedWhere = where;
9 }
10
11 return result;
12 }
mappedFileのcommitを完了した後、whereとcommittedWhereで本当にfileCfihannelにキャッシュしたかどうかを判断し、resultを確実にキャッシュしたのがfalse!その後committedWhereが更新されresultが返されます
ではCommitRealTimeServiceのrunメソッドに戻ると、commitが完了するとresultは本当にfileCfihannelにキャッシュされた後にflushCommitLogServiceを呼び出すwakeupメソッド、つまりFlushCommitLogServiceのブラシスレッドを起動したと判断します
以前に分析したFlushCommitLogServiceとは異なる唯一の場所は、MappedFileのflushメソッドです.
1 if (writeBuffer != null || this.fileChannel.position() != 0) {
2 this.fileChannel.force(false);
3 } else {
4 this.mappedByteBuffer.force();
5 }
以前はメモリバイトバッファをオンにしていなかった場合、mappedByteBufferの内容をディスクに書き込んでいたのですが、その時、やっとfileChannelの番になりました
ここで、writeBufferがnullに等しくない場合、またはfileChannelのpositionが0 writeBufferとnullに等しくない場合、TransientStorePoolが回収した後に
メモリバイトバッファをオンにした場合、ディスクへの書き込みは2回キャッシュされていることがわかります
これで、Brokerのメッセージの持続化とブラシディスクのプロセス全体が完了しました.