filebeatソース分析サービス起動


ソース分析を開始する前にfilebeatとは何ですか?beatsは有名なELKログ分析キットの一部です.その前身はlogstash-forwarderであり、ログを収集してバックエンドに転送するために使用される(logstash、elasticsearch、redis、kafkaなど).filebeatはbeatsプロジェクトのbeatsであり、ログファイルの新規コンテンツの収集を担当します.現在のコードブランチは最新の6.xのコード.まず、filebeatがk 8 sのログを収集した例です.
filebeat.prospectors:
- type: log
  paths:
     - /var/lib/docker/containers/*/*-json.log
     - /var/log/filelog/containers/*/*/*/*.log

processors:
- add_docker_metadata:
    host: "unix:///var/run/docker.sock"
- add_fields:
    fields:
      log: '{message}'
- decode_json_fields:
    when:
       regexp:
         message: "{*}"
    fields: ["message"]
    overwrite_keys: true
    target: ""
- drop_fields:
    fields: ["docker.container.labels.annotation.io.kubernetes.container.terminationMessagePath", "docker.container.labels.annotation.io.kubernetes.container.hash", "docker.container.labels.annotation.io.kubernetes.container.terminationMessagePolicy", "docker.container.labels.annotation.io.kubernetes.pod.terminationGracePeriod", "beat.version", "docker.container.labels.annotation.io.kubernetes.container.ports", "docker.container.labels.io.kubernetes.container.terminationMessagePath", "docker.container.labels.io.kubernetes.container.restartCount", "docker.container.labels.io.kubernetes.container.ports", "docker.container.labels.io.kubernetes.container.hash", "docker.container.labels.io.kubernetes.pod.terminationGracePeriod", "docker.container.labels.annotation.io.kubernetes.container.restartCount", "message"]
- parse_level:
    levels: ["fatal", "error", "warn", "info", "debug"]
    field: "log"

logging.level: info
setup.template.enabled: true
setup.template.name: "filebeat-%{+yyyy.MM.dd}"
setup.template.pattern: "filebeat-*"
setup.template.fields: "${path.config}/fields.yml"
setup.template.overwrite: true
setup.template.settings:
  index:
     analysis:
       analyzer:
         enncloud_analyzer:
           filter: ["standard", "lowercase", "stop"]
           char_filter: ["my_filter"]
           type: custom
           tokenizer: standard
       char_filter:
         my_filter:
           type: mapping
           mappings: ["-=>_"]

output:
  elasticsearch:
    hosts: ["paasdev.enncloud.cn:9200"]
    index: "filebeat-%{+yyyy.MM.dd}"

filebeatが起動すると、このプロファイルがロードされます.まとめたインタフェースを見てみましょうgo
type Beater interface {
    // The main event loop. This method should block until signalled to stop by an
    // invocation of the Stop() method.
    Run(b *Beat) error

    // Stop is invoked to signal that the Run method should finish its execution.
    // It will be invoked at most once.
    Stop()
}

これは各beatが実現する必要がある2つのインタフェースであり、もちろんfilebeatも例外ではない、filebeat/beater/filebeat.goこの中はfilebeatの具体的な実現で、紙幅が限られているので、省略して貼り付けます
// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
    var err error
    config := fb.config

    if !fb.moduleRegistry.Empty() {
        err = fb.loadModulesPipelines(b)
        if err != nil {
            return err
        }
    }
    // Setup registrar to persist state
    registrar, err := registrar.New(config.RegistryFile, config.RegistryFlush, finishedLogger)
    if err != nil {
        logp.Err("Could not init registrar: %v", err)
        return err
    }

    err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
        ACKEvents: newEventACKer(registrarChannel).ackEvents,
    })
    if err != nil {
        logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
        return err
    }


    crawler, err := crawler.New(
        channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create,
        config.Prospectors,
        b.Info.Version,
        fb.done,
        *once)
    if err != nil {
        logp.Err("Could not init crawler: %v", err)
        return err
    }


    err = registrar.Start()
    if err != nil {
        return fmt.Errorf("Could not start registrar: %v", err)
    }


    var pipelineLoaderFactory fileset.PipelineLoaderFactory
    if b.Config.Output.Name() == "elasticsearch" {
        pipelineLoaderFactory = newPipelineLoaderFactory(b.Config.Output.Config())
    } else {
        logp.Warn(pipelinesWarning)
    }

    err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, pipelineLoaderFactory)
    if err != nil {
        crawler.Stop()
        return err
    }

    var adiscover *autodiscover.Autodiscover
    if fb.config.Autodiscover != nil {
        adapter := NewAutodiscoverAdapter(crawler.ProspectorsFactory, crawler.ModulesFactory)
        adiscover, err = autodiscover.NewAutodiscover("filebeat", adapter, config.Autodiscover)
        if err != nil {
            return err
        }
    }
    adiscover.Start()

    return nil
}

// Stop is called on exit to stop the crawling, spooling and registration processes.
func (fb *Filebeat) Stop() {
    logp.Info("Stopping filebeat")

    // Stop Filebeat
    close(fb.done)
}

上のコードはstartとstopの関数を省略して紹介して、stopの日は簡単で、1つの閉じる総スイッチで、言いません.このstart法を詳しく分析すると、filebeat全体の最も核心的な場所です.filebeatは特定のプログラムを収集するログ、例えばredis、nginxなどをサポートし、これらはmoduleによってサポートされているので、プログラムの開始時にelasticsearchにこれらの関連するpipeline、ingestがあるかどうかを確認し、
    if !fb.moduleRegistry.Empty() {
        err = fb.loadModulesPipelines(b)
        if err != nil {
            return err
        }
    }

登録方法を詳しく見る
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
    if b.Config.Output.Name() != "elasticsearch" {
        logp.Warn(pipelinesWarning)
        return nil
    }

    //            ,     es       ,     es    pipeline
    callback := func(esClient *elasticsearch.Client) error {
        return fb.moduleRegistry.LoadPipelines(esClient)
    }
    elasticsearch.RegisterConnectCallback(callback)

    return nil
}

上のコードは主にesとpipelineを確認し、次に起動を見てregistrarを作成します.registrarとは何ですか.
registrar, err := registrar.New(config.RegistryFile, config.RegistryFlush, finishedLogger)
    if err != nil {
        logp.Err("Could not init registrar: %v", err)
        return err
    }

実は登録ログの読み込みの進捗状況で、offsetを記録することで、次は私が切り取ったregistryファイルです.
{"source":"/var/lib/docker/containers/e892ad615535e877c8af5856bd27631937d050d00b4ca55554bec41e3391685f/e892ad615535e877c8af5856bd27631937d050d00b4ca55554bec41e3391685f-json.log","offset":0,"timestamp":"2017-11-29T17:01:28.645203497Z","ttl":-1,"type":"log","FileStateOS":{"inode":526963,"device":64769}}

このjsonファイルにはコンテナと対応するoffsetが保存されており、filebeatが再起動すると作業を継続できます.
次にcrawlerを作成します.これはログ収集を担当します.
crawler, err := crawler.New(
        channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create,
        config.Prospectors,
        b.Info.Version,
        fb.done,
        *once)

configを通過する.Prospectors、crawlerはどの目標を採集するかを知っています.そして通過
err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, pipelineLoaderFactory)
    if err != nil {
        crawler.Stop()
        return err
    }

収集タスクを開始します.次に、特定の起動タスクの場所、filebeat/crawler/crawlerを見てみましょう.go
for _, prospectorConfig := range c.prospectorConfigs {
        err := c.startProspector(prospectorConfig, r.GetStates())
        if err != nil {
            return err
        }
    }

ここに来てgoの中
func (p *Prospector) Run() {
    // Initial prospector run
    p.prospectorer.Run()

    // Shuts down after the first complete run of all prospectors
    if p.Once {
        return
    }

    for {
        select {
        case .done:
            logp.Info("Prospector ticker stopped")
            return
        case .After(p.config.ScanFrequency):
            logp.Debug("prospector", "Run prospector")
            p.prospectorer.Run()
        }
    }
}

このprospectorer.RunはUDP/stdIN/LOG/REDIS/DOCKERから直接ログを取得することをサポートするインタフェースで、logのfilebeat/prospector/log/prospectorを見てみましょう.go
func (p *Prospector) Run() {
...
p.scan()
...
}

ここでファイルを収集する必要がある場合は、収集タスクを作成します.
if lastState.IsEmpty() {
    logp.Debug("prospector", "Start harvester for new file: %s", newState.Source)
    err := p.startHarvester(newState, 0)
    if err != nil {
        logp.Err("Harvester could not be started on new file: %s, Err: %s", newState.Source, err)
        }
    } else {
        p.harvestExistingFile(newState, lastState)
}

startHarvesterログ収集を開始するか、同じ方法で開始するか、createHarvesterを作成してharvesterを開始します.Start(h). この中はfor死サイクルで実行されます
message, err := h.reader.Next()

このようにバッチで読み込みます.起動プログラムの内容はまずここまでです.詳細は後述する.