filebeatソース分析サービス起動
18993 ワード
ソース分析を開始する前にfilebeatとは何ですか?beatsは有名なELKログ分析キットの一部です.その前身はlogstash-forwarderであり、ログを収集してバックエンドに転送するために使用される(logstash、elasticsearch、redis、kafkaなど).filebeatはbeatsプロジェクトのbeatsであり、ログファイルの新規コンテンツの収集を担当します.現在のコードブランチは最新の6.xのコード.まず、filebeatがk 8 sのログを収集した例です.
filebeatが起動すると、このプロファイルがロードされます.まとめたインタフェースを見てみましょうgo
これは各beatが実現する必要がある2つのインタフェースであり、もちろんfilebeatも例外ではない、filebeat/beater/filebeat.goこの中はfilebeatの具体的な実現で、紙幅が限られているので、省略して貼り付けます
上のコードはstartとstopの関数を省略して紹介して、stopの日は簡単で、1つの閉じる総スイッチで、言いません.このstart法を詳しく分析すると、filebeat全体の最も核心的な場所です.filebeatは特定のプログラムを収集するログ、例えばredis、nginxなどをサポートし、これらはmoduleによってサポートされているので、プログラムの開始時にelasticsearchにこれらの関連するpipeline、ingestがあるかどうかを確認し、
登録方法を詳しく見る
上のコードは主にesとpipelineを確認し、次に起動を見てregistrarを作成します.registrarとは何ですか.
実は登録ログの読み込みの進捗状況で、offsetを記録することで、次は私が切り取ったregistryファイルです.
このjsonファイルにはコンテナと対応するoffsetが保存されており、filebeatが再起動すると作業を継続できます.
次にcrawlerを作成します.これはログ収集を担当します.
configを通過する.Prospectors、crawlerはどの目標を採集するかを知っています.そして通過
収集タスクを開始します.次に、特定の起動タスクの場所、filebeat/crawler/crawlerを見てみましょう.go
ここに来てgoの中
このprospectorer.RunはUDP/stdIN/LOG/REDIS/DOCKERから直接ログを取得することをサポートするインタフェースで、logのfilebeat/prospector/log/prospectorを見てみましょう.go
ここでファイルを収集する必要がある場合は、収集タスクを作成します.
startHarvesterログ収集を開始するか、同じ方法で開始するか、createHarvesterを作成してharvesterを開始します.Start(h). この中はfor死サイクルで実行されます
このようにバッチで読み込みます.起動プログラムの内容はまずここまでです.詳細は後述する.
filebeat.prospectors:
- type: log
paths:
- /var/lib/docker/containers/*/*-json.log
- /var/log/filelog/containers/*/*/*/*.log
processors:
- add_docker_metadata:
host: "unix:///var/run/docker.sock"
- add_fields:
fields:
log: '{message}'
- decode_json_fields:
when:
regexp:
message: "{*}"
fields: ["message"]
overwrite_keys: true
target: ""
- drop_fields:
fields: ["docker.container.labels.annotation.io.kubernetes.container.terminationMessagePath", "docker.container.labels.annotation.io.kubernetes.container.hash", "docker.container.labels.annotation.io.kubernetes.container.terminationMessagePolicy", "docker.container.labels.annotation.io.kubernetes.pod.terminationGracePeriod", "beat.version", "docker.container.labels.annotation.io.kubernetes.container.ports", "docker.container.labels.io.kubernetes.container.terminationMessagePath", "docker.container.labels.io.kubernetes.container.restartCount", "docker.container.labels.io.kubernetes.container.ports", "docker.container.labels.io.kubernetes.container.hash", "docker.container.labels.io.kubernetes.pod.terminationGracePeriod", "docker.container.labels.annotation.io.kubernetes.container.restartCount", "message"]
- parse_level:
levels: ["fatal", "error", "warn", "info", "debug"]
field: "log"
logging.level: info
setup.template.enabled: true
setup.template.name: "filebeat-%{+yyyy.MM.dd}"
setup.template.pattern: "filebeat-*"
setup.template.fields: "${path.config}/fields.yml"
setup.template.overwrite: true
setup.template.settings:
index:
analysis:
analyzer:
enncloud_analyzer:
filter: ["standard", "lowercase", "stop"]
char_filter: ["my_filter"]
type: custom
tokenizer: standard
char_filter:
my_filter:
type: mapping
mappings: ["-=>_"]
output:
elasticsearch:
hosts: ["paasdev.enncloud.cn:9200"]
index: "filebeat-%{+yyyy.MM.dd}"
filebeatが起動すると、このプロファイルがロードされます.まとめたインタフェースを見てみましょうgo
type Beater interface {
// The main event loop. This method should block until signalled to stop by an
// invocation of the Stop() method.
Run(b *Beat) error
// Stop is invoked to signal that the Run method should finish its execution.
// It will be invoked at most once.
Stop()
}
これは各beatが実現する必要がある2つのインタフェースであり、もちろんfilebeatも例外ではない、filebeat/beater/filebeat.goこの中はfilebeatの具体的な実現で、紙幅が限られているので、省略して貼り付けます
// Run allows the beater to be run as a beat.
func (fb *Filebeat) Run(b *beat.Beat) error {
var err error
config := fb.config
if !fb.moduleRegistry.Empty() {
err = fb.loadModulesPipelines(b)
if err != nil {
return err
}
}
// Setup registrar to persist state
registrar, err := registrar.New(config.RegistryFile, config.RegistryFlush, finishedLogger)
if err != nil {
logp.Err("Could not init registrar: %v", err)
return err
}
err = b.Publisher.SetACKHandler(beat.PipelineACKHandler{
ACKEvents: newEventACKer(registrarChannel).ackEvents,
})
if err != nil {
logp.Err("Failed to install the registry with the publisher pipeline: %v", err)
return err
}
crawler, err := crawler.New(
channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create,
config.Prospectors,
b.Info.Version,
fb.done,
*once)
if err != nil {
logp.Err("Could not init crawler: %v", err)
return err
}
err = registrar.Start()
if err != nil {
return fmt.Errorf("Could not start registrar: %v", err)
}
var pipelineLoaderFactory fileset.PipelineLoaderFactory
if b.Config.Output.Name() == "elasticsearch" {
pipelineLoaderFactory = newPipelineLoaderFactory(b.Config.Output.Config())
} else {
logp.Warn(pipelinesWarning)
}
err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, pipelineLoaderFactory)
if err != nil {
crawler.Stop()
return err
}
var adiscover *autodiscover.Autodiscover
if fb.config.Autodiscover != nil {
adapter := NewAutodiscoverAdapter(crawler.ProspectorsFactory, crawler.ModulesFactory)
adiscover, err = autodiscover.NewAutodiscover("filebeat", adapter, config.Autodiscover)
if err != nil {
return err
}
}
adiscover.Start()
return nil
}
// Stop is called on exit to stop the crawling, spooling and registration processes.
func (fb *Filebeat) Stop() {
logp.Info("Stopping filebeat")
// Stop Filebeat
close(fb.done)
}
上のコードはstartとstopの関数を省略して紹介して、stopの日は簡単で、1つの閉じる総スイッチで、言いません.このstart法を詳しく分析すると、filebeat全体の最も核心的な場所です.filebeatは特定のプログラムを収集するログ、例えばredis、nginxなどをサポートし、これらはmoduleによってサポートされているので、プログラムの開始時にelasticsearchにこれらの関連するpipeline、ingestがあるかどうかを確認し、
if !fb.moduleRegistry.Empty() {
err = fb.loadModulesPipelines(b)
if err != nil {
return err
}
}
登録方法を詳しく見る
func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
if b.Config.Output.Name() != "elasticsearch" {
logp.Warn(pipelinesWarning)
return nil
}
// , es , es pipeline
callback := func(esClient *elasticsearch.Client) error {
return fb.moduleRegistry.LoadPipelines(esClient)
}
elasticsearch.RegisterConnectCallback(callback)
return nil
}
上のコードは主にesとpipelineを確認し、次に起動を見てregistrarを作成します.registrarとは何ですか.
registrar, err := registrar.New(config.RegistryFile, config.RegistryFlush, finishedLogger)
if err != nil {
logp.Err("Could not init registrar: %v", err)
return err
}
実は登録ログの読み込みの進捗状況で、offsetを記録することで、次は私が切り取ったregistryファイルです.
{"source":"/var/lib/docker/containers/e892ad615535e877c8af5856bd27631937d050d00b4ca55554bec41e3391685f/e892ad615535e877c8af5856bd27631937d050d00b4ca55554bec41e3391685f-json.log","offset":0,"timestamp":"2017-11-29T17:01:28.645203497Z","ttl":-1,"type":"log","FileStateOS":{"inode":526963,"device":64769}}
このjsonファイルにはコンテナと対応するoffsetが保存されており、filebeatが再起動すると作業を継続できます.
次にcrawlerを作成します.これはログ収集を担当します.
crawler, err := crawler.New(
channel.NewOutletFactory(outDone, b.Publisher, wgEvents).Create,
config.Prospectors,
b.Info.Version,
fb.done,
*once)
configを通過する.Prospectors、crawlerはどの目標を採集するかを知っています.そして通過
err = crawler.Start(registrar, config.ConfigProspector, config.ConfigModules, pipelineLoaderFactory)
if err != nil {
crawler.Stop()
return err
}
収集タスクを開始します.次に、特定の起動タスクの場所、filebeat/crawler/crawlerを見てみましょう.go
for _, prospectorConfig := range c.prospectorConfigs {
err := c.startProspector(prospectorConfig, r.GetStates())
if err != nil {
return err
}
}
ここに来てgoの中
func (p *Prospector) Run() {
// Initial prospector run
p.prospectorer.Run()
// Shuts down after the first complete run of all prospectors
if p.Once {
return
}
for {
select {
case .done:
logp.Info("Prospector ticker stopped")
return
case .After(p.config.ScanFrequency):
logp.Debug("prospector", "Run prospector")
p.prospectorer.Run()
}
}
}
このprospectorer.RunはUDP/stdIN/LOG/REDIS/DOCKERから直接ログを取得することをサポートするインタフェースで、logのfilebeat/prospector/log/prospectorを見てみましょう.go
func (p *Prospector) Run() {
...
p.scan()
...
}
ここでファイルを収集する必要がある場合は、収集タスクを作成します.
if lastState.IsEmpty() {
logp.Debug("prospector", "Start harvester for new file: %s", newState.Source)
err := p.startHarvester(newState, 0)
if err != nil {
logp.Err("Harvester could not be started on new file: %s, Err: %s", newState.Source, err)
}
} else {
p.harvestExistingFile(newState, lastState)
}
startHarvesterログ収集を開始するか、同じ方法で開始するか、createHarvesterを作成してharvesterを開始します.Start(h). この中はfor死サイクルで実行されます
message, err := h.reader.Next()
このようにバッチで読み込みます.起動プログラムの内容はまずここまでです.詳細は後述する.