Jaegerソース解析--All in Oneモード

78530 ワード

文書ディレクトリ

  • 概要
  • コード解析
  • 入口
  • 準備
  • storageFactory
  • strategyStoreFactory
  • 初期化構成
  • 起動
  • Agent
  • を起動
  • Collector
  • を起動
  • Query
  • を起動
  • 参照文書
  • 概要


    JaegerのAll-in-oneモードは、Jaeger UI、collector、query、agent、これらのコンポーネントを含むローカルサービスを迅速に起動するために主に使用されます.このモードでのストレージデータはメモリに格納されます.
    All-in-oneモードのjaegerを起動する最も簡単な方法はDockerミラーを使用して起動することです.
    $ docker run -d --name jaeger \
      -e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
      -p 5775:5775/udp \
      -p 6831:6831/udp \
      -p 6832:6832/udp \
      -p 5778:5778 \
      -p 16686:16686 \
      -p 14268:14268 \
      -p 9411:9411 \
      jaegertracing/all-in-one:1.8
    

    次の表は、Jaeger All-in-one露出ポートのリストです.
    ポート
    プロトコル
    コンポーネント
    機能
    5775
    UDP
    agent zipkin.thrift compact thriftプロトコルを受け入れる(古いクライアントのみ使用)
    6831
    UDP
    agent jaeger.thrift compact thriftプロトコルを受け入れる
    6832
    UDP
    agent jaeger.thrift binary thriftプロトコルを受け入れる
    5778
    HTTP
    agent
    サービス構成
    16686
    HTTP
    query
    サービスフロントエンド
    14268
    HTTP
    collector jaeger.thriftプロトコルをクライアントから直接受信
    9411
    HTTP
    collector
    Zipkin対応サービス(オプション)

    コード解析


    このブログで使用されているコードはv 190バージョンです.

    いりぐち


    All-in-oneの入り口はcmd/all-in-one/main.goにあります.実際にはすべてのコンポーネントの入り口がcmdというパッケージの下にあります.
    All-in-oneはagent queryとcollectorの3つのコンポーネントを起動する必要があり、いずれもエントリの起動関数で実現されます.具体的なコードを見てみましょう.

    準備作業

    var signalsChannel = make(chan os.Signal)
    signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
    

    まず、システム割り込み(os.Interrupt)およびシステムのkill命令(syscall.SIGTERM)を受信するための信号を受信するためのchannelを作成する.
    if os.Getenv(storage.SpanStorageTypeEnvVar) == "" {
    	os.Setenv(storage.SpanStorageTypeEnvVar, "memory") // other storage types default to SpanStorage
    }
    

    環境変数を設定します.
    storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
    
    strategyStoreFactory, err := ss.NewFactory(ss.FactoryConfigFromEnv())
    

    上の行はストレージファクトリを初期化し、次はサンプリングポリシーのファクトリです.次に、この2つの工場がどのように初期化されたのか、そして彼らの役割が何なのかを見てみましょう.
    storageFactory storage.NewFactoryの入参はstorage.FactoryConfigFromEnvAndCLIで、まずこの方法を見てみましょう.
    func FactoryConfigFromEnvAndCLI(args []string, log io.Writer) FactoryConfig {
    	//  span 
    	spanStorageType := os.Getenv(SpanStorageTypeEnvVar)
    	if spanStorageType == "" {
    		//  , 
    		spanStorageType = spanStorageTypeFromArgs(args, log)
    	}
    	if spanStorageType == "" {
    		//  cassandraStorageType
    		spanStorageType = cassandraStorageType
    	}
    	//  
    	spanWriterTypes := strings.Split(spanStorageType, ",")
    	//  , 
    	if len(spanWriterTypes) > 1 {
    		fmt.Fprintf(log,
    			"WARNING: multiple span storage types have been specified. "+
    				"Only the first type (%s) will be used for reading and archiving.

    "
    , spanWriterTypes[0], ) } // dependency depStorageType := os.Getenv(DependencyStorageTypeEnvVar) if depStorageType == "" { depStorageType = spanWriterTypes[0] } return FactoryConfig{ SpanWriterTypes: spanWriterTypes, SpanReaderType: spanWriterTypes[0], DependenciesStorageType: depStorageType, } }

    上記のコードから分かるように、この方法の役割は、環境変数またはコマンドラインからストレージ構成を読み出すことであり、現在は次の2つのタイプのストレージがある.
    //  spans 
    SpanStorageTypeEnvVar = "SPAN_STORAGE_TYPE"
    
    //  dependencies 
    DependencyStorageTypeEnvVar = "DEPENDENCY_STORAGE_TYPE"
    

    spanにはリンクトラッキングのデータが格納され,dependencyには依存するデータが格納される.Dependenciesは、Jaeger UIの上にあるメニューのDependenciesの欄です.
    我々のシナリオでは、FactoryConfigFromEnvAndCLIメソッドの戻り値は、memoryタイプのストレージである.
    次にNewFactoryの方法を見てみましょう.
    func NewFactory(config FactoryConfig) (*Factory, error) {
    	f := &Factory{FactoryConfig: config}
    	uniqueTypes := map[string]struct{}{
    		f.SpanReaderType:          {},
    		f.DependenciesStorageType: {},
    	}
    	//  
    	for _, storageType := range f.SpanWriterTypes {
    		uniqueTypes[storageType] = struct{}{}
    	}
    	f.factories = make(map[string]storage.Factory)
    	//  
    	for t := range uniqueTypes {
    		ff, err := f.getFactoryOfType(t)
    		if err != nil {
    			return nil, err
    		}
    		f.factories[t] = ff
    	}
    	return f, nil
    }
    

    上記のコードから分かるように、NewFactoryメソッドは、どのストレージタイプが必要かを判断し、それぞれインスタンス化するプロセスである.
    strategyStoreFactory
    ここでのNewFactoryの方法には同様にFactoryConfigFromEnvを参照する方法があり、まずこの方法を見てみましょう.
    func FactoryConfigFromEnv() FactoryConfig {
    	strategyStoreType := os.Getenv(SamplingTypeEnvVar)
    	if strategyStoreType == "" {
    		strategyStoreType = staticStrategyStoreType
    	}
    	return FactoryConfig{
    		StrategyStoreType: strategyStoreType,
    	}
    }
    

    上記のコードから分かるように、FactoryConfigFromEnvメソッドは、環境変数からサンプリングタイプを取得する役割を果たす.サンプリングタイプは2種類あります.
    staticStrategyStoreType   = "static"
    adaptiveStrategyStoreType = "adaptive"
    

    デフォルトのサンプリングタイプはstaticです.NewFactoryメソッドも上記のメソッドと同様に、構成されたサンプリングのタイプを取得してインスタンス化し、ここでのサンプリングタイプはstaticのみを実現し、adaptiveタイプのファクトリを実現していないことに注目すべきである.
    func NewFactory(config FactoryConfig) (*Factory, error) {
    	f := &Factory{FactoryConfig: config}
    	uniqueTypes := map[string]struct{}{
    		f.StrategyStoreType: {},
    	}
    	f.factories = make(map[string]strategystore.Factory)
    	for t := range uniqueTypes {
    		ff, err := f.getFactoryOfType(t)
    		if err != nil {
    			return nil, err
    		}
    		f.factories[t] = ff
    	}
    	return f, nil
    }
    

    特筆すべきは、ここのfは本当の工場ではなく、f.factoriesの中に存在していることだ.
    構成の初期化
    v := viper.New()
    

    ここでviperインスタンスを新規作成し、ストレージ構成として機能します.viperはGoアプリケーションの完全な構成ソリューションです.
    次に、command構成に関する次のコードをスキップし、後ろの部分を直接見ます.
    flags.SetDefaultHealthCheckPort(collector.CollectorDefaultHealthCheckHTTPPort)
    
    config.AddFlags(
    	v,
    	command,
    	flags.AddConfigFileFlag,
    	flags.AddFlags,
    	storageFactory.AddFlags,
    	agentApp.AddFlags,
    	agentRep.AddFlags,
    	agentTchanRep.AddFlags,
    	agentGrpcRep.AddFlags,
    	collector.AddFlags,
    	queryApp.AddFlags,
    	pMetrics.AddFlags,
    	strategyStoreFactory.AddFlags,
    )
    

    ここではデフォルトの健康診断のポートが設定され、AddFlagsメソッドでは、以前に新規に作成したviperにデフォルトのパラメータの一部が書き込まれます.
    構成パラメータを書き込むと、準備作業も完了します.次は起動段階です.

    開始


    起動方式はcobraで構成されたコマンドラインを用い,RunEメソッドを直接見る.
    sFlags := new(flags.SharedFlags).InitFromViper(v)
    logger, err := sFlags.NewLogger(zap.NewProductionConfig())
    

    ログ構成を初期化し、デフォルトのログレベルはinfoです.
    hc, err := sFlags.NewHealthCheck(logger)
    

    健康診断インタフェースを監視します.
    mBldr := new(pMetrics.Builder).InitFromViper(v)
    //  Prometheus 
    rootMetricsFactory, err := mBldr.CreateMetricsFactory("")
    //  , Prometheus 
    metricsFactory := rootMetricsFactory.Namespace(metrics.NSOptions{Name: "jaeger", Tags: nil})
    

    metricsは関連する構成を収集し、デフォルトのmetricsサービスはPrometheusです.
    storageFactory.InitFromViper(v)
    if err := storageFactory.Initialize(metricsFactory, logger); err != nil {
    	logger.Fatal("Failed to init storage factory", zap.Error(err))
    }
    

    初期化ストレージファクトリ構成は、spanのmemoryReader、dependencyのWriterインタフェースを実現する、初期化された構造体を返す.
    spanReader, err := storageFactory.CreateSpanReader()
    
    spanWriter, err := storageFactory.CreateSpanWriter()
    
    dependencyReader, err := storageFactory.CreateDependencyReader()
    

    ここでは、上述したspanのReaderReader、dependencyのWriterインターフェースの3つのインターフェースをインスタンス化する.この3つのインタフェースがどのようにインスタンス化されているかを具体的に見てみましょう.(memoryベースの実装のみ)
    func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
    	factory, ok := f.factories[f.SpanReaderType]
    	if !ok {
    		return nil, fmt.Errorf("No %s backend registered for span store", f.SpanReaderType)
    	}
    	return factory.CreateSpanReader()
    }
    
    func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
    	return f.store, nil
    }
    

    上記のコードから分かるように、Readerメソッドは、reader関連インタフェースが実装されているため、CreateSpanReader構造を直接返します.
    func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
    	var writers []spanstore.Writer
    	for _, storageType := range f.SpanWriterTypes {
    		factory, ok := f.factories[storageType]
    		if !ok {
    			return nil, fmt.Errorf("No %s backend registered for span store", storageType)
    		}
    		writer, err := factory.CreateSpanWriter()
    		if err != nil {
    			return nil, err
    		}
    		writers = append(writers, writer)
    	}
    	if len(f.SpanWriterTypes) == 1 {
    		return writers[0], nil
    	}
    	return spanstore.NewCompositeWriter(writers...), nil
    }
    
    func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
    	return f.store, nil
    }
    

    SpanWriterのロジックはreaderのロジックとほぼ同じで、違いはwriterが複数ある可能性があるので、スライスに入れる必要があります.
    func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
    	factory, ok := f.factories[f.DependenciesStorageType]
    	if !ok {
    		return nil, fmt.Errorf("No %s backend registered for span store", f.DependenciesStorageType)
    	}
    	return factory.CreateDependencyReader()
    }
    
    func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
    	return f.store, nil
    }
    

    DependencyReaderとSpanReaderは論理的に一致している.
    サンプリングポリシーファクトリのインスタンス化を次に示します.
    strategyStoreFactory.InitFromViper(v)
    strategyStore := initSamplingStrategyStore(strategyStoreFactory, metricsFactory, logger)
    

    まずstoreの方法を見てみましょう.
    func (f *Factory) InitFromViper(v *viper.Viper) {
    	for _, factory := range f.factories {
    		if conf, ok := factory.(plugin.Configurable); ok {
    			conf.InitFromViper(v)
    		}
    	}
    }
    
    // static/factory.go
    func (f *Factory) InitFromViper(v *viper.Viper) {
    	f.options.InitFromViper(v)
    }
    
    func (opts *Options) InitFromViper(v *viper.Viper) *Options {
    	opts.StrategiesFile = v.GetString(samplingStrategiesFile)
    	return opts
    }
    

    まず,以前に格納されたすべての工場を巡り,InitFromViperメソッドの実装に進む.ここではInitFromViperという実装方法に入ります.そしてstatic/factory.goに対応する値を取り出し、samplingStrategiesFile変数に入れる.ここでこの値は空の文字列です.
    次はoptsメソッドです.
    func initSamplingStrategyStore(
    	samplingStrategyStoreFactory *ss.Factory,
    	metricsFactory metrics.Factory,
    	logger *zap.Logger,
    ) strategystore.StrategyStore {
    	if err := samplingStrategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
    		logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
    	}
    	strategyStore, err := samplingStrategyStoreFactory.CreateStrategyStore()
    	if err != nil {
    		logger.Fatal("Failed to create sampling strategy store", zap.Error(err))
    	}
    	return strategyStore
    }
    

    上のコードは2つのことをして、initSamplingStrategyStore方法を呼び出して、この方法はここでloggerをfactor内の変数に割り当てるだけで、議論に深く入りません.もう1つの方法Initializeは、静的ポリシーを用いてこれらのポリシーを格納するサンプリングポリシーストレージを新規作成することである.以下、いくつかのステップジャンプを省略して、CreateStrategyStoreコードに直接入ります.
    func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
    	h := &strategyStore{
    		logger:            logger,
    		serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
    	}
    	strategies, err := loadStrategies(options.StrategiesFile)
    	if err != nil {
    		return nil, err
    	}
    	h.parseStrategies(strategies)
    	return h, nil
    }
    

    まず、'strategy_store.goのデータ構造が新設され、strategyStoreはmap構造の変数であり、特定のサービスのサンプリングポリシーが格納されている.mapのkeyはサービスの名前です.serviceStrategiesメソッドでは、viperで構成されたポリシーパスに基づいてファイルが読み出されます.ここではパスが空なので、このメソッドは直接戻り、返される数値は空です.
    次のloadStrategiesメソッドでは、パラメータが空であるため、サンプリングポリシーはデフォルトパラメータに割り当てられ、返されます.
    defaultStrategy = sampling.SamplingStrategyResponse{
    	StrategyType: sampling.SamplingStrategyType_PROBABILISTIC,
    	ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{
    		SamplingRate: defaultSamplingProbability,
    	},
    }
    

    デフォルトのサンプリングポリシーは確率サンプリングであり、使用するサンプリング確率は0.001です.
    次にviperからパラメータを取得し、以下の初期化の準備をします.
    // agent 
    aOpts := new(agentApp.Builder).InitFromViper(v)
    // agent query , grpc tchannel
    repOpts := new(agentRep.Options).InitFromViper(v)
    //  tchannel 
    tchannelRepOpts := agentTchanRep.NewBuilder().InitFromViper(v, logger)
    //  grpc 
    grpcRepOpts := new(agentGrpcRep.Options).InitFromViper(v)
    // collector 
    cOpts := new(collector.CollectorOptions).InitFromViper(v)
    // query 
    qOpts := new(queryApp.QueryOptions).InitFromViper(v)
    

    次に、この3つのサービスをそれぞれ起動します.
    startAgent(aOpts, repOpts, tchannelRepOpts, grpcRepOpts, cOpts, logger, metricsFactory)
    grpcServer := startCollector(cOpts, spanWriter, logger, metricsFactory, strategyStore, hc)
    startQuery(qOpts, spanReader, dependencyReader, logger, rootMetricsFactory, metricsFactory, mBldr, hc, archiveOptions(storageFactory, logger))
    

    これらの起動の過程をそれぞれ見てみましょう.
    エージェントの起動
    func startAgent(
    	b *agentApp.Builder,
    	repOpts *agentRep.Options,
    	tchanRep *agentTchanRep.Builder,
    	grpcRepOpts *agentGrpcRep.Options,
    	cOpts *collector.CollectorOptions,
    	logger *zap.Logger,
    	baseFactory metrics.Factory,
    ) {
    	//  metrics 
    	metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})
    
    	cp, err := createCollectorProxy(cOpts, repOpts, tchanRep, grpcRepOpts, logger, metricsFactory)
    	if err != nil {
    		logger.Fatal("Could not create collector proxy", zap.Error(err))
    	}
    
    	agent, err := b.CreateAgent(cp, logger, baseFactory)
    	if err != nil {
    		logger.Fatal("Unable to initialize Jaeger Agent", zap.Error(err))
    	}
    
    	logger.Info("Starting agent")
    	if err := agent.Run(); err != nil {
    		logger.Fatal("Failed to run the agent", zap.Error(err))
    	}
    }
    
    func createCollectorProxy(
    	cOpts *collector.CollectorOptions,
    	repOpts *agentRep.Options,
    	tchanRepOpts *agentTchanRep.Builder,
    	grpcRepOpts *agentGrpcRep.Options,
    	logger *zap.Logger,
    	mFactory metrics.Factory,
    ) (agentApp.CollectorProxy, error) {
    	switch repOpts.ReporterType {
    	case agentRep.GRPC:
    		grpcRepOpts.CollectorHostPort = append(grpcRepOpts.CollectorHostPort, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort))
    		return agentGrpcRep.NewCollectorProxy(grpcRepOpts, mFactory, logger)
    	case agentRep.TCHANNEL:
    		tchanRepOpts.CollectorHostPorts = append(tchanRepOpts.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort))
    		return agentTchanRep.NewCollectorProxy(tchanRepOpts, mFactory, logger)
    	default:
    		return nil, errors.New(fmt.Sprintf("unknown reporter type %s", string(repOpts.ReporterType)))
    	}
    }
    

    Agentサービスを開始する前に、まずparseStrategiesメソッドを呼び出してCollectorのエージェントを作成し、Collectorにデータを報告します.この方法では,通信プロトコルのタイプに応じて異なるエージェントを作成する.
    次に、作成したCollectorエージェントを含むエージェントインスタンスを作成するためにcreateCollectorProxyメソッドを呼び出します.Agentインスタンスの作成はAgentモジュールの範囲に属するため、この方法は後で詳細に展開されます.
    上記の手順にエラーがない場合は、Agentを起動します.
    Collectorの起動
    Collectorを起動するコードが長いので、次は段階的に分解します.
    	spanBuilder, err := collector.NewSpanHandlerBuilder(
    		cOpts,
    		spanWriter,
    		basic.Options.LoggerOption(logger),
    		basic.Options.MetricsFactoryOption(metricsFactory),
    	)
    	zipkinSpansHandler, jaegerBatchesHandler, grpcHandler := spanBuilder.BuildHandlers()
    

    新しいspanBuilderを作成し、handlerを3つ作成しました.ここで、CreateAgentおよびzipkinSpansHandlerはTChanCollectorインターフェースを実現し、Tchan RPCの呼び出しを処理することができる.
    	{
    		ch, err := tchannel.NewChannel("jaeger-collector", &tchannel.ChannelOptions{})
    		server := thrift.NewServer(ch)
    		server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler))
    		server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler))
    		//  handler 
    		server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore)))
    		portStr := ":" + strconv.Itoa(cOpts.CollectorPort)
    		listener, err := net.Listen("tcp", portStr)
    		logger.Info("Starting jaeger-collector TChannel server", zap.Int("port", cOpts.CollectorPort))
    		//  tchan 
    		ch.Serve(listener)
    	}
    

    上記のコードは、jaegerBatchesHandlerおよびzipkinSpansHandlerを処理できるtchannelを起動することです.
    次にGRPCサービスを開始する.
    func startGRPCServer(
    	port int,
    	handler *collectorApp.GRPCHandler,
    	samplingStore strategystore.StrategyStore,
    	logger *zap.Logger,
    ) (*grpc.Server, error) {
    	server := grpc.NewServer()
    	//  grpc handler
    	_, err := grpcserver.StartGRPCCollector(port, server, handler, samplingStore, logger, func(err error) {
    		logger.Fatal("gRPC collector failed", zap.Error(err))
    	})
    	if err != nil {
    		return nil, err
    	}
    	return server, err
    }
    

    後のコードはZipkin Http Apiを処理するために使用され、ZipkinサービスはCollecortに直接データを報告することができ、ここでは展開されていません.
    ここで、Agentコンポーネントの起動が完了し、Tchannel、GRPC、Zipkinを専門に処理するHttpサービスの3つのサービスが開始されました.
    Queryの起動
    	tracer, closer, err := jaegerClientConfig.Configuration{
    		Sampler: &jaegerClientConfig.SamplerConfig{
    			Type:  "const",
    			Param: 1.0,
    		},
    		RPCMetrics: true,
    	}.New(
    		"jaeger-query",
    		jaegerClientConfig.Metrics(rootFactory),
    		jaegerClientConfig.Logger(jaegerClientZapLog.NewLogger(logger)),
    	)
    	opentracing.SetGlobalTracer(tracer)
    

    上記のコードは、まずtracerを作成し、自身の情報を収集し、Agentコンポーネントに報告します.
    spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query", Tags: nil}))
    

    次に、jaegerBatchesHandlerにmetrics関連の方法をいくつか追加し、実行中の指標情報を収集する.ここではアクセサリーのデザインモードを使いました.
    handlerOpts = append(handlerOpts, queryApp.HandlerOptions.Logger(logger), queryApp.HandlerOptions.Tracer(tracer))
    apiHandler := queryApp.NewAPIHandler(
    	spanReader,
    	depReader,
    	handlerOpts...)
    
    r := mux.NewRouter()
    if qOpts.BasePath != "/" {
    	r = r.PathPrefix(qOpts.BasePath).Subrouter()
    }
    //   url
    apiHandler.RegisterRoutes(r)
    queryApp.RegisterStaticHandler(r, logger, qOpts)
    //  metrics handler
    if h := metricsBuilder.Handler(); h != nil {
    		logger.Info("Registering metrics handler with jaeger-query HTTP server", zap.String("route", metricsBuilder.HTTPRoute))
    		r.Handle(metricsBuilder.HTTPRoute, h)
    	}
    

    上記のコードでは、spanReaderはすべての内蔵APIの集合に相当し、転送トラフィックの機能も受けている.
    go func() {
    		defer closer.Close()
    		if err := http.ListenAndServe(portStr, recoveryHandler(r)); err != nil {
    			logger.Fatal("Could not launch jaeger-query service", zap.Error(err))
    		}
    		hc.Set(healthcheck.Unavailable)
    	}()
    

    最後にコラボレーションを開始し、httpサービスを実行します.
    3つのコンポーネントが起動するとjaeger all-in-oneが正常に起動します.agentコンポーネントにデータが報告されている場合は、Webページを開くと追跡が表示されます.

    リファレンスドキュメント


    Golangのsignal SIGKILLとSIGTERM、SIGINT