【Kafkaソース】ログ処理

9725 ワード

現在、kafkaのログに記録されている内容は比較的多く、具体的な記憶内容はこのブログを参照して、書いたほうがいいです.格納されているコンテンツはまだ多く、格納ファイルが大きい場合、これらのログをどのように処理すればいいかがわかります.次にkafka起動プロセスのソースコードによりkafkaのログ処理プロセスを解析する.

一、入口方法


kafkaServerでscalaのstartメソッドには、次のような呼び出しがあります.
/* start log manager */
logManager = createLogManager(zkUtils.zkClient, brokerState)
logManager.startup()

二、定時任務の総方法


これがログ関連のタイミングタスクを起動したものですが、具体的にはどのような内容がありますか?中に入ってみましょう.
def startup() {
    /* Schedule the cleanup task to delete old logs */
    if(scheduler != null) {
      info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs))
      scheduler.schedule("kafka-log-retention", 
                         cleanupLogs, 
                         delay = InitialTaskDelayMs, 
                         period = retentionCheckMs, 
                         TimeUnit.MILLISECONDS)
      info("Starting log flusher with a default period of %d ms.".format(flushCheckMs))
      scheduler.schedule("kafka-log-flusher", 
                         flushDirtyLogs, 
                         delay = InitialTaskDelayMs, 
                         period = flushCheckMs, 
                         TimeUnit.MILLISECONDS)
      scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointRecoveryPointOffsets,
                         delay = InitialTaskDelayMs,
                         period = flushCheckpointMs,
                         TimeUnit.MILLISECONDS)
    }
    if(cleanerConfig.enableCleaner)
      cleaner.startup()
  }

主に、タスクのタイミング実行を処理するために、タイミングタスクスレッドプールが使用されていることがわかります.具体的には、2つのブロックが含まれています.一部はログのクリーンアップであり、もう一部はログをファイルに書き込むことです.

2.1ログのクリーンアップ


まずcleanupLogsです.これは構成、logに関連しています.retention.check.interval.ms、すなわち、ログクリーンアップを実行する時間がどれくらいかかりますか.具体的な方法を見てみましょう.
/**
   * Delete any eligible logs. Return the number of segments deleted.
   */
  def cleanupLogs() {
    debug("Beginning log cleanup...")
    var total = 0
    val startMs = time.milliseconds
    for(log 

これはもう一つの構成にも関連しています.cleanup.policy、つまりクリーンアップの戦略は、現在いくつかあります.一つはcompact、つまりログ圧縮で、ログファイルをクリーンアップしません.もう一つはdelete、つまり削除です.これは主に2つの方法があります.それぞれ見てみましょう.

2.1.1期限切れログのクリーンアップ

  /**
   * Runs through the log removing segments older than a certain age
   */
  private def cleanupExpiredSegments(log: Log): Int = {
    if (log.config.retentionMs < 0)
      return 0
    val startMs = time.milliseconds
    log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
  }

これはまた1つの構成に関連しています:retention.ms,このパラメータはログが保存された時間を表す.0より小さい場合は、無効にならないことを示し、削除するという説もありません.
もちろん、ファイルの変更時間と現在の時間の差が設定したログの保存時間より大きい場合は、削除アクションを実行します.具体的な削除方法は次のとおりです.
  /**
   * Delete any log segments matching the given predicate function,
   * starting with the oldest segment and moving forward until a segment doesn't match.
   * @param predicate A function that takes in a single log segment and returns true iff it is deletable
   * @return The number of segments deleted
   */
  def deleteOldSegments(predicate: LogSegment => Boolean): Int = {
    lock synchronized {
      //find any segments that match the user-supplied predicate UNLESS it is the final segment
      //and it is empty (since we would just end up re-creating it)
      val lastEntry = segments.lastEntry
      val deletable =
        if (lastEntry == null) Seq.empty
        else logSegments.takeWhile(s => predicate(s) && (s.baseOffset != lastEntry.getValue.baseOffset || s.size > 0))
      val numToDelete = deletable.size
      if (numToDelete > 0) {
        // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
        if (segments.size == numToDelete)
          roll()
        // remove the segments for lookups
        deletable.foreach(deleteSegment(_))
      }
      numToDelete
    }
  }

このロジックは、転送されたpredicateに基づいて、削除された要求に合致するログを判断し、deletableに入れ、最後にdeletableを巡り、削除操作を行う.
  private def deleteSegment(segment: LogSegment) {
    info("Scheduling log segment %d for log %s for deletion.".format(segment.baseOffset, name))
    lock synchronized {
      segments.remove(segment.baseOffset)
      asyncDeleteSegment(segment)
    }
  }
  
    private def asyncDeleteSegment(segment: LogSegment) {
    segment.changeFileSuffixes("", Log.DeletedFileSuffix)
    def deleteSeg() {
      info("Deleting segment %d from log %s.".format(segment.baseOffset, name))
      segment.delete()
    }
    scheduler.schedule("delete-file", deleteSeg, delay = config.fileDeleteDelayMs)
  }

これはファイルを非同期で削除するプロセスで、file.delete.delay.ms.ログ・ファイルを削除する頻度を示します.削除のプロセスは、ログの接尾辞を先に変更することです.delete、そしてタイミングで削除します.

2.1.2大きなログのクリーンアップ

  /**
   *  Runs through the log removing segments until the size of the log
   *  is at least logRetentionSize bytes in size
   */
  private def cleanupSegmentsToMaintainSize(log: Log): Int = {
    if(log.config.retentionSize < 0 || log.size < log.config.retentionSize)
      return 0
    var diff = log.size - log.config.retentionSize
    def shouldDelete(segment: LogSegment) = {
      if(diff - segment.size >= 0) {
        diff -= segment.size
        true
      } else {
        false
      }
    }
    log.deleteOldSegments(shouldDelete)
  }

このコードは比較的明確で、ログのサイズがretentionより大きい場合.bytesは、削除するようにマークされ、呼び出される方法は同じであり、deleteOldSegmentsでもあります.余計なことは言わない.

2.2ハードディスク(HDD)へのログの書き込み


これには2つのタイミングタスクがあります.
scheduler.schedule("kafka-log-flusher", 
                         flushDirtyLogs, 
                         delay = InitialTaskDelayMs, 
                         period = flushCheckMs, 
                         TimeUnit.MILLISECONDS)
scheduler.schedule("kafka-recovery-point-checkpoint",
                         checkpointRecoveryPointOffsets,
                         delay = InitialTaskDelayMs,
                         period = flushCheckpointMs,
                         TimeUnit.MILLISECONDS)

2つの構成に関連します.
  • log.flush.scheduler.interval.ms:ハードディスク(HDD)に固化する必要があるかどうかを確認する間隔
  • log.flush.offset.checkpoint.interval.ms:データ復旧のために、最終的にハードディスクを硬化する時点を制御する
  • を修正する必要はありません.
    私たちはそれぞれ次の2つの任務を見て何をしましたか.

    2.2.1 flushDirtyLogs

      /**
       * Flush any log which has exceeded its flush interval and has unwritten messages.
       */
      private def flushDirtyLogs() = {
        debug("Checking for dirty logs to flush...")
    
        for ((topicAndPartition, log) = log.config.flushMs)
              log.flush
          } catch {
            case e: Throwable =>
              error("Error flushing topic " + topicAndPartition.topic, e)
          }
        }
      }
    

    この方法の目的は、ログをハードディスク(HDD)にリフレッシュし、データが失われないようにすることです.
    この設計はflush.ms.ログのリフレッシュ時間と現在の時間の差が構成の値より大きい場合、flush操作が実行されます.
    /**
       * Flush all log segments
       */
      def flush(): Unit = flush(this.logEndOffset)
    
      /**
       * Flush log segments for all offsets up to offset-1
       * @param offset The offset to flush up to (non-inclusive); the new recovery point
       */
      def flush(offset: Long) : Unit = {
        if (offset <= this.recoveryPoint)
          return
        debug("Flushing log '" + name + " up to offset " + offset + ", last flushed: " + lastFlushTime + " current time: " +
              time.milliseconds + " unflushed = " + unflushedMessages)
        for(segment  this.recoveryPoint) {
            this.recoveryPoint = offset
            lastflushedTime.set(time.milliseconds)
          }
        }
      }
      
        /**
       * Flush this log segment to disk
       */
      @threadsafe
      def flush() {
        LogFlushStats.logFlushTimer.time {
          log.flush()
          index.flush()
        }
      }
    

    現在のsegmentの最後のoffset、すなわちlogEndOffsetを見つけ、flushメソッドを呼び出してログファイルにリフレッシュします.まず、現在のoffsetがrecoveryPointより小さいかどうかを判断します.つまり、ハードディスクにリフレッシュする必要がある最初のoffsetです.小さい場合は、直接戻ります.そうしないとflush操作を続行します.
    ログ内のrecoveryPointからoffsetまでのすべてのログをログファイルにリフレッシュし、segmentを呼び出す.flush()メソッド上.logファイルとindexファイルをリフレッシュします.

    2.2.2 checkpointRecoveryPointOffsets

      /**
       * Write out the current recovery point for all logs to a text file in the log directory 
       * to avoid recovering the whole log on startup.
       */
      def checkpointRecoveryPointOffsets() {
        this.logDirs.foreach(checkpointLogsInDir)
      }
      
        /**
       * Make a checkpoint for all logs in provided directory.
       */
      private def checkpointLogsInDir(dir: File): Unit = {
        val recoveryPoints = this.logsByDir.get(dir.toString)
        if (recoveryPoints.isDefined) {
          this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
        }
      }
    

    これは主にリカバリポイントのデータをファイルに書くために使用されます.ファイル名はrecovery-point-offset-checkpointです.内容は次のとおりです.
  • の最初の行は、現在のバージョンversion
  • です.
  • の2行目は、すべてのオフセット量の数字と、各topicとpartitionの組合せの数
  • である.
  • 以降はすべてのtopicとpartitionの組み合わせを巡回し、各行に示す内容は:topic partition offset
  • しかし、この書き込みファイルは直接ターゲットファイルに書き込むのではなく、一時ファイルを1つ書いてからターゲットファイルに移動します.

    三、まとめ


    以上、kafkaのログ処理のいくつかのソースコードについてまとめました.構成項目は次のとおりです.
  • log.retention.check.interval.ms
  • cleanup.policy
  • retention.ms
  • file.delete.delay.ms
  • retention.bytes
  • log.flush.scheduler.interval.ms
  • log.flush.offset.checkpoint.interval.ms
  • flush.ms

  • 他にもいくつかの構成があるかもしれませんが、これは関連していません.もちろん、これらのパラメータがどのように構成されてこそ、パフォーマンスを最適化することができ、テストと探索を継続する必要があります.現在はデフォルトのパラメータに依存して構成するしかありません.これは明らかに十分ではありません.