event-exportソース解析

5719 ワード

背景:kubernetesでeventはクラスタの実行で遭遇した様々な大きなイベントを記録し、並べ替えに役立ちますが、大量のイベントがetcdに格納されると、大きな性能と容量の圧力をもたらすため、etcdではデフォルトで最近1時間しか保存されません.この時間を変更するとクラスタetcdの圧力が大幅に増加するため、このデータを他の場所に格納する必要があります.しばらくの時間を経て感じたk 8 s-stackdriverというプロジェクトは現在の私のニーズに合っていますが、それはpromethuesに保存されていますが、現在、弊社がpromethuesに与えた資源は十分ではありません.また、保存された時間はデフォルトで15日しかありません.そこでESをストレージとして利用することを考えた.しかし、esストレージスキームは現在公式にサポートされておらず、fluentdを通じてログを収集するだけで効果があるので、こちらではソースコードを修正することでストレージを直接esに変更し、fluentdを借りてログを収集する必要はありません.
 
本文は主にevent-exportソースコードを分析してから、修正過程を話して書きます.
 
プログラムエントリmain.go
sink, err := stackdriver.NewSdSinkFactory().CreateNew(strings.Split(*sinkOpts, " "))
if err != nil {
   glog.Fatalf("Failed to initialize sink: %v", err)
}

説明:sinkオブジェクトを初期化し、sink-optsから渡されたパラメータに基づいて対応するパラメータを設定します.
パラメータリスト(NewSdSinkFactory):
flagSet
flushDelay
maxBufferSize
maxConcurrency
resourceModelVersion
endpoint
解析sinkOptsのパラメータ(CreateNew)
client, err := newKubernetesClient()
if err != nil {
   glog.Fatalf("Failed to initialize kubernetes client: %v", err)
}
eventExporter := newEventExporter(client, sink, *resyncPeriod)

新KubernetesClient初期化go-clientインタフェース
クラスタ内部構成によりk 8 s構成情報を作成し、KUBERNETES_を介してSERVICE_HOSTとKUBERNETES_SERVICE_PORT環境変数方式取得
デフォルトtokenfile rootCAFileは/var/run/secrets/kubernetesにあります.io/serviceaccount/
 
新EventExporter関数の役割は、彼の具体的な実装方法を見ることができます.
func newEventExporter(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration) *eventExporter {
   return &eventExporter{
      sink:    sink,
      watcher: createWatcher(client, sink, resyncPeriod),
   }
}

func createWatcher(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration) watchers.Watcher {
   return events.NewEventWatcher(client, &events.EventWatcherConfig{
      OnList:       sink.OnList,
      ResyncPeriod: resyncPeriod,
      Handler:      sink,
   })
}

新EventExporterは、sinkとwatcherの2つのフィールドを含むeventExporterという構造体を返します.
sink:すべてのパラメータ構成などを含む
watcher:createwatcher関数によるevents resourceの傍受
分解ぶんかい:createWatcher関数createWatcherかんすう
NewEventWatcherはwatcheを必要とするターゲットオブジェクトをeventとし、どのくらい間隔でwatchを1回配置しますか?
events.EventWatcherConfig構造体には3つのフィールドがあります
OnList OnListFunc eventリスト
ResyncPeriod watcherの間隔はデフォルトで1分間に1回です.resyncPeriodパラメータによる設定
Handlerには3つのフィールドがあります.
OnAdd:主な役割は、watchからのデータをsdSinkのlogEntryChannelフィールドにチャネルを介して格納し、promethuesでデータを挿入することです.
OnUpdate:logEntryChannelフィールドを更新し、promethuesのデータを更新します.
 
go func() {
   http.Handle("/metrics", promhttp.Handler())
   glog.Fatalf("Prometheus monitoring failed: %v", http.ListenAndServe(*prometheusEndpoint, nil))
}()

stopCh := newSystemStopChannel()
eventExporter.Run(stopCh)

まずgorouteingによって匿名関数が実行されます.この匿名関数は主にpromethuesの/metricsルーティングを設定し、localhostのローカルポートを傍受します.デフォルトは80です.パラメータprometheusEndpoint設定を起動します.
func newSystemStopChannel() chan struct{} {
   ch := make(chan struct{})
   go func() {
      c := make(chan os.Signal)
      signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
      sig := 

NewSystemStopChannel関数は、gorouteingにより匿名関数を実行する、SIGINT(2)、SIGTERM(9)を受信とReceived signal sigを印刷する.String()terminatingの信号
 
eventExporter.Run(stopCh)関数:
func (e *eventExporter) Run(stopCh 

run関数:
stopCh:newSystemStopChannelから伝達された信号値チャネルを受信する
utils.RunConcurrentlyUntilメソッド
func RunConcurrentlyUntil(stopCh 

e.sinkを同時実行します.Run, e.watcher.Run
syncを通ります.WaitGroupは同期ブロック待ちの問題を解決します.
wg.Addカウンタを1に設定
e.sinkを実行します.Run, e.watcher.Runの2つの関数
実行が完了したらカウンタ設定から1を減算
カウンタが0でないときはWait()がずっと詰まっています
カウンタが0の場合、または終了信号を受信した場合にのみ終了します.
 
e.sink.Run:
eventExporterのsinkフィールドの後に
interface.goの下のSinkインタフェースの下のrun方法
newEventExporter関数でsinkの値がsinkに割り当てられているため、err:=stackdriver.NewSdSinkFactory().CreateNew(strings.Split(*sinkOpts, ""))
返される値createNew
そしてCreateNew関数return newSdSink(writer,clk,config,resourceModelFactory),nil
newSdSinkを表示すると、この関数はsdSink構造体を返します.
だからe.sink.Runは関数所有者がsdSinkであるRunメソッドに相当する.
func (s *sdSink) Run(stopCh = s.config.MaxBufferSize {
            s.flushBuffer()
         } else if len(s.currentBuffer) == 1 {
            s.setTimer()
         }
         break
      case 

runメソッドは、newSystemStopChannelから伝達する信号を受信した場合にのみ終了するforデッドサイクルであり、s.logEntryChannelから伝達されたログをs.currentBufferに格納し、記憶内の個数がs.configより大きい場合には、MaxBufferSizeの値の場合flushBuffer関数を実行
 
func (s *sdSink) flushBuffer() {
   entries := s.currentBuffer
   s.currentBuffer = nil
   s.concurrencyChannel 

flushBufferはまずs.concurrencyChannelの値をクリアします
その後entriesの値をsendEntriesに渡します
sendEntries関数の下でsuccessfullySentEntryCountを介してpromethuesにログを書き込む
その後s.concurrencyChannelから渡された値を受信し、ここでは主にSIGINT、SIGTERMを受信したときにすべてのリクエストが完了してから停止することを解決します.
 
s.currentBufferが1に等しい場合、チャージタイマはデフォルトで5 sで、5 s内にタイムアウトを表すデータがない場合はflushBuffer関数を直接実行します.
 
e.watcher.Run:
func (w *watcher) Run(stopCh 

w.reflector.Run()は、所属者がreflectorの下にあるRun関数にジャンプします
func (r *Reflector) Run() {
   glog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedType, r.resyncPeriod, r.name)
   go wait.Until(func() {
      if err := r.ListAndWatch(wait.NeverStop); err != nil {
         utilruntime.HandleError(err)
      }
   }, r.period, wait.NeverStop)
}

匿名関数を宣言して、この匿名関数を一定時間おきに実行します.
この関数の主な方法はListAndWatch、すなわちeventのログを収集することです.