Jaegerソース解析--All in Oneモード
78530 ワード
文書ディレクトリ
概要
JaegerのAll-in-oneモードは、Jaeger UI、collector、query、agent、これらのコンポーネントを含むローカルサービスを迅速に起動するために主に使用されます.このモードでのストレージデータはメモリに格納されます.
All-in-oneモードのjaegerを起動する最も簡単な方法はDockerミラーを使用して起動することです.
$ docker run -d --name jaeger \
-e COLLECTOR_ZIPKIN_HTTP_PORT=9411 \
-p 5775:5775/udp \
-p 6831:6831/udp \
-p 6832:6832/udp \
-p 5778:5778 \
-p 16686:16686 \
-p 14268:14268 \
-p 9411:9411 \
jaegertracing/all-in-one:1.8
次の表は、Jaeger All-in-one露出ポートのリストです.
ポート
プロトコル
コンポーネント
機能
5775
UDP
agent
zipkin.thrift
compact thriftプロトコルを受け入れる(古いクライアントのみ使用)6831
UDP
agent
jaeger.thrift
compact thriftプロトコルを受け入れる6832
UDP
agent
jaeger.thrift
binary thriftプロトコルを受け入れる5778
HTTP
agent
サービス構成
16686
HTTP
query
サービスフロントエンド
14268
HTTP
collector
jaeger.thrift
プロトコルをクライアントから直接受信9411
HTTP
collector
Zipkin対応サービス(オプション)
コード解析
このブログで使用されているコードはv 190バージョンです.
いりぐち
All-in-oneの入り口は
cmd/all-in-one/main.go
にあります.実際にはすべてのコンポーネントの入り口がcmdというパッケージの下にあります.All-in-oneはagent queryとcollectorの3つのコンポーネントを起動する必要があり、いずれもエントリの起動関数で実現されます.具体的なコードを見てみましょう.
準備作業
var signalsChannel = make(chan os.Signal)
signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM)
まず、システム割り込み(os.Interrupt)およびシステムのkill命令(syscall.SIGTERM)を受信するための信号を受信するためのchannelを作成する.
if os.Getenv(storage.SpanStorageTypeEnvVar) == "" {
os.Setenv(storage.SpanStorageTypeEnvVar, "memory") // other storage types default to SpanStorage
}
環境変数を設定します.
storageFactory, err := storage.NewFactory(storage.FactoryConfigFromEnvAndCLI(os.Args, os.Stderr))
strategyStoreFactory, err := ss.NewFactory(ss.FactoryConfigFromEnv())
上の行はストレージファクトリを初期化し、次はサンプリングポリシーのファクトリです.次に、この2つの工場がどのように初期化されたのか、そして彼らの役割が何なのかを見てみましょう.
storageFactory
storage.NewFactory
の入参はstorage.FactoryConfigFromEnvAndCLI
で、まずこの方法を見てみましょう.func FactoryConfigFromEnvAndCLI(args []string, log io.Writer) FactoryConfig {
// span
spanStorageType := os.Getenv(SpanStorageTypeEnvVar)
if spanStorageType == "" {
// ,
spanStorageType = spanStorageTypeFromArgs(args, log)
}
if spanStorageType == "" {
// cassandraStorageType
spanStorageType = cassandraStorageType
}
//
spanWriterTypes := strings.Split(spanStorageType, ",")
// ,
if len(spanWriterTypes) > 1 {
fmt.Fprintf(log,
"WARNING: multiple span storage types have been specified. "+
"Only the first type (%s) will be used for reading and archiving.
",
spanWriterTypes[0],
)
}
// dependency
depStorageType := os.Getenv(DependencyStorageTypeEnvVar)
if depStorageType == "" {
depStorageType = spanWriterTypes[0]
}
return FactoryConfig{
SpanWriterTypes: spanWriterTypes,
SpanReaderType: spanWriterTypes[0],
DependenciesStorageType: depStorageType,
}
}
上記のコードから分かるように、この方法の役割は、環境変数またはコマンドラインからストレージ構成を読み出すことであり、現在は次の2つのタイプのストレージがある.
// spans
SpanStorageTypeEnvVar = "SPAN_STORAGE_TYPE"
// dependencies
DependencyStorageTypeEnvVar = "DEPENDENCY_STORAGE_TYPE"
spanにはリンクトラッキングのデータが格納され,dependencyには依存するデータが格納される.Dependenciesは、Jaeger UIの上にあるメニューのDependenciesの欄です.
我々のシナリオでは、
FactoryConfigFromEnvAndCLI
メソッドの戻り値は、memory
タイプのストレージである.次に
NewFactory
の方法を見てみましょう.func NewFactory(config FactoryConfig) (*Factory, error) {
f := &Factory{FactoryConfig: config}
uniqueTypes := map[string]struct{}{
f.SpanReaderType: {},
f.DependenciesStorageType: {},
}
//
for _, storageType := range f.SpanWriterTypes {
uniqueTypes[storageType] = struct{}{}
}
f.factories = make(map[string]storage.Factory)
//
for t := range uniqueTypes {
ff, err := f.getFactoryOfType(t)
if err != nil {
return nil, err
}
f.factories[t] = ff
}
return f, nil
}
上記のコードから分かるように、
NewFactory
メソッドは、どのストレージタイプが必要かを判断し、それぞれインスタンス化するプロセスである.strategyStoreFactory
ここでの
NewFactory
の方法には同様にFactoryConfigFromEnv
を参照する方法があり、まずこの方法を見てみましょう.func FactoryConfigFromEnv() FactoryConfig {
strategyStoreType := os.Getenv(SamplingTypeEnvVar)
if strategyStoreType == "" {
strategyStoreType = staticStrategyStoreType
}
return FactoryConfig{
StrategyStoreType: strategyStoreType,
}
}
上記のコードから分かるように、
FactoryConfigFromEnv
メソッドは、環境変数からサンプリングタイプを取得する役割を果たす.サンプリングタイプは2種類あります.staticStrategyStoreType = "static"
adaptiveStrategyStoreType = "adaptive"
デフォルトのサンプリングタイプはstaticです.
NewFactory
メソッドも上記のメソッドと同様に、構成されたサンプリングのタイプを取得してインスタンス化し、ここでのサンプリングタイプはstaticのみを実現し、adaptiveタイプのファクトリを実現していないことに注目すべきである.func NewFactory(config FactoryConfig) (*Factory, error) {
f := &Factory{FactoryConfig: config}
uniqueTypes := map[string]struct{}{
f.StrategyStoreType: {},
}
f.factories = make(map[string]strategystore.Factory)
for t := range uniqueTypes {
ff, err := f.getFactoryOfType(t)
if err != nil {
return nil, err
}
f.factories[t] = ff
}
return f, nil
}
特筆すべきは、ここのfは本当の工場ではなく、f.factoriesの中に存在していることだ.
構成の初期化
v := viper.New()
ここでviperインスタンスを新規作成し、ストレージ構成として機能します.viperはGoアプリケーションの完全な構成ソリューションです.
次に、command構成に関する次のコードをスキップし、後ろの部分を直接見ます.
flags.SetDefaultHealthCheckPort(collector.CollectorDefaultHealthCheckHTTPPort)
config.AddFlags(
v,
command,
flags.AddConfigFileFlag,
flags.AddFlags,
storageFactory.AddFlags,
agentApp.AddFlags,
agentRep.AddFlags,
agentTchanRep.AddFlags,
agentGrpcRep.AddFlags,
collector.AddFlags,
queryApp.AddFlags,
pMetrics.AddFlags,
strategyStoreFactory.AddFlags,
)
ここではデフォルトの健康診断のポートが設定され、
AddFlags
メソッドでは、以前に新規に作成したviperにデフォルトのパラメータの一部が書き込まれます.構成パラメータを書き込むと、準備作業も完了します.次は起動段階です.
開始
起動方式は
cobra
で構成されたコマンドラインを用い,RunE
メソッドを直接見る.sFlags := new(flags.SharedFlags).InitFromViper(v)
logger, err := sFlags.NewLogger(zap.NewProductionConfig())
ログ構成を初期化し、デフォルトのログレベルはinfoです.
hc, err := sFlags.NewHealthCheck(logger)
健康診断インタフェースを監視します.
mBldr := new(pMetrics.Builder).InitFromViper(v)
// Prometheus
rootMetricsFactory, err := mBldr.CreateMetricsFactory("")
// , Prometheus
metricsFactory := rootMetricsFactory.Namespace(metrics.NSOptions{Name: "jaeger", Tags: nil})
metricsは関連する構成を収集し、デフォルトのmetricsサービスはPrometheusです.
storageFactory.InitFromViper(v)
if err := storageFactory.Initialize(metricsFactory, logger); err != nil {
logger.Fatal("Failed to init storage factory", zap.Error(err))
}
初期化ストレージファクトリ構成は、spanの
memory
、Reader
、dependencyのWriter
インタフェースを実現する、初期化された構造体を返す.spanReader, err := storageFactory.CreateSpanReader()
spanWriter, err := storageFactory.CreateSpanWriter()
dependencyReader, err := storageFactory.CreateDependencyReader()
ここでは、上述したspanの
Reader
、Reader
、dependencyのWriter
インターフェースの3つのインターフェースをインスタンス化する.この3つのインタフェースがどのようにインスタンス化されているかを具体的に見てみましょう.(memoryベースの実装のみ)func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
factory, ok := f.factories[f.SpanReaderType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.SpanReaderType)
}
return factory.CreateSpanReader()
}
func (f *Factory) CreateSpanReader() (spanstore.Reader, error) {
return f.store, nil
}
上記のコードから分かるように、
Reader
メソッドは、reader関連インタフェースが実装されているため、CreateSpanReader
構造を直接返します.func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
var writers []spanstore.Writer
for _, storageType := range f.SpanWriterTypes {
factory, ok := f.factories[storageType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", storageType)
}
writer, err := factory.CreateSpanWriter()
if err != nil {
return nil, err
}
writers = append(writers, writer)
}
if len(f.SpanWriterTypes) == 1 {
return writers[0], nil
}
return spanstore.NewCompositeWriter(writers...), nil
}
func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {
return f.store, nil
}
SpanWriterのロジックはreaderのロジックとほぼ同じで、違いはwriterが複数ある可能性があるので、スライスに入れる必要があります.
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
factory, ok := f.factories[f.DependenciesStorageType]
if !ok {
return nil, fmt.Errorf("No %s backend registered for span store", f.DependenciesStorageType)
}
return factory.CreateDependencyReader()
}
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
return f.store, nil
}
DependencyReaderとSpanReaderは論理的に一致している.
サンプリングポリシーファクトリのインスタンス化を次に示します.
strategyStoreFactory.InitFromViper(v)
strategyStore := initSamplingStrategyStore(strategyStoreFactory, metricsFactory, logger)
まず
store
の方法を見てみましょう.func (f *Factory) InitFromViper(v *viper.Viper) {
for _, factory := range f.factories {
if conf, ok := factory.(plugin.Configurable); ok {
conf.InitFromViper(v)
}
}
}
// static/factory.go
func (f *Factory) InitFromViper(v *viper.Viper) {
f.options.InitFromViper(v)
}
func (opts *Options) InitFromViper(v *viper.Viper) *Options {
opts.StrategiesFile = v.GetString(samplingStrategiesFile)
return opts
}
まず,以前に格納されたすべての工場を巡り,
InitFromViper
メソッドの実装に進む.ここではInitFromViper
という実装方法に入ります.そしてstatic/factory.go
に対応する値を取り出し、samplingStrategiesFile
変数に入れる.ここでこの値は空の文字列です.次は
opts
メソッドです.func initSamplingStrategyStore(
samplingStrategyStoreFactory *ss.Factory,
metricsFactory metrics.Factory,
logger *zap.Logger,
) strategystore.StrategyStore {
if err := samplingStrategyStoreFactory.Initialize(metricsFactory, logger); err != nil {
logger.Fatal("Failed to init sampling strategy store factory", zap.Error(err))
}
strategyStore, err := samplingStrategyStoreFactory.CreateStrategyStore()
if err != nil {
logger.Fatal("Failed to create sampling strategy store", zap.Error(err))
}
return strategyStore
}
上のコードは2つのことをして、
initSamplingStrategyStore
方法を呼び出して、この方法はここでloggerをfactor内の変数に割り当てるだけで、議論に深く入りません.もう1つの方法Initialize
は、静的ポリシーを用いてこれらのポリシーを格納するサンプリングポリシーストレージを新規作成することである.以下、いくつかのステップジャンプを省略して、CreateStrategyStore
コードに直接入ります.func NewStrategyStore(options Options, logger *zap.Logger) (ss.StrategyStore, error) {
h := &strategyStore{
logger: logger,
serviceStrategies: make(map[string]*sampling.SamplingStrategyResponse),
}
strategies, err := loadStrategies(options.StrategiesFile)
if err != nil {
return nil, err
}
h.parseStrategies(strategies)
return h, nil
}
まず、
'strategy_store.go
のデータ構造が新設され、strategyStore
はmap構造の変数であり、特定のサービスのサンプリングポリシーが格納されている.mapのkeyはサービスの名前です.serviceStrategies
メソッドでは、viperで構成されたポリシーパスに基づいてファイルが読み出されます.ここではパスが空なので、このメソッドは直接戻り、返される数値は空です.次の
loadStrategies
メソッドでは、パラメータが空であるため、サンプリングポリシーはデフォルトパラメータに割り当てられ、返されます.defaultStrategy = sampling.SamplingStrategyResponse{
StrategyType: sampling.SamplingStrategyType_PROBABILISTIC,
ProbabilisticSampling: &sampling.ProbabilisticSamplingStrategy{
SamplingRate: defaultSamplingProbability,
},
}
デフォルトのサンプリングポリシーは確率サンプリングであり、使用するサンプリング確率は0.001です.
次にviperからパラメータを取得し、以下の初期化の準備をします.
// agent
aOpts := new(agentApp.Builder).InitFromViper(v)
// agent query , grpc tchannel
repOpts := new(agentRep.Options).InitFromViper(v)
// tchannel
tchannelRepOpts := agentTchanRep.NewBuilder().InitFromViper(v, logger)
// grpc
grpcRepOpts := new(agentGrpcRep.Options).InitFromViper(v)
// collector
cOpts := new(collector.CollectorOptions).InitFromViper(v)
// query
qOpts := new(queryApp.QueryOptions).InitFromViper(v)
次に、この3つのサービスをそれぞれ起動します.
startAgent(aOpts, repOpts, tchannelRepOpts, grpcRepOpts, cOpts, logger, metricsFactory)
grpcServer := startCollector(cOpts, spanWriter, logger, metricsFactory, strategyStore, hc)
startQuery(qOpts, spanReader, dependencyReader, logger, rootMetricsFactory, metricsFactory, mBldr, hc, archiveOptions(storageFactory, logger))
これらの起動の過程をそれぞれ見てみましょう.
エージェントの起動
func startAgent(
b *agentApp.Builder,
repOpts *agentRep.Options,
tchanRep *agentTchanRep.Builder,
grpcRepOpts *agentGrpcRep.Options,
cOpts *collector.CollectorOptions,
logger *zap.Logger,
baseFactory metrics.Factory,
) {
// metrics
metricsFactory := baseFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})
cp, err := createCollectorProxy(cOpts, repOpts, tchanRep, grpcRepOpts, logger, metricsFactory)
if err != nil {
logger.Fatal("Could not create collector proxy", zap.Error(err))
}
agent, err := b.CreateAgent(cp, logger, baseFactory)
if err != nil {
logger.Fatal("Unable to initialize Jaeger Agent", zap.Error(err))
}
logger.Info("Starting agent")
if err := agent.Run(); err != nil {
logger.Fatal("Failed to run the agent", zap.Error(err))
}
}
func createCollectorProxy(
cOpts *collector.CollectorOptions,
repOpts *agentRep.Options,
tchanRepOpts *agentTchanRep.Builder,
grpcRepOpts *agentGrpcRep.Options,
logger *zap.Logger,
mFactory metrics.Factory,
) (agentApp.CollectorProxy, error) {
switch repOpts.ReporterType {
case agentRep.GRPC:
grpcRepOpts.CollectorHostPort = append(grpcRepOpts.CollectorHostPort, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorGRPCPort))
return agentGrpcRep.NewCollectorProxy(grpcRepOpts, mFactory, logger)
case agentRep.TCHANNEL:
tchanRepOpts.CollectorHostPorts = append(tchanRepOpts.CollectorHostPorts, fmt.Sprintf("127.0.0.1:%d", cOpts.CollectorPort))
return agentTchanRep.NewCollectorProxy(tchanRepOpts, mFactory, logger)
default:
return nil, errors.New(fmt.Sprintf("unknown reporter type %s", string(repOpts.ReporterType)))
}
}
Agentサービスを開始する前に、まず
parseStrategies
メソッドを呼び出してCollectorのエージェントを作成し、Collectorにデータを報告します.この方法では,通信プロトコルのタイプに応じて異なるエージェントを作成する.次に、作成したCollectorエージェントを含むエージェントインスタンスを作成するために
createCollectorProxy
メソッドを呼び出します.Agentインスタンスの作成はAgentモジュールの範囲に属するため、この方法は後で詳細に展開されます.上記の手順にエラーがない場合は、Agentを起動します.
Collectorの起動
Collectorを起動するコードが長いので、次は段階的に分解します.
spanBuilder, err := collector.NewSpanHandlerBuilder(
cOpts,
spanWriter,
basic.Options.LoggerOption(logger),
basic.Options.MetricsFactoryOption(metricsFactory),
)
zipkinSpansHandler, jaegerBatchesHandler, grpcHandler := spanBuilder.BuildHandlers()
新しいspanBuilderを作成し、handlerを3つ作成しました.ここで、
CreateAgent
およびzipkinSpansHandler
はTChanCollectorインターフェースを実現し、Tchan RPCの呼び出しを処理することができる. {
ch, err := tchannel.NewChannel("jaeger-collector", &tchannel.ChannelOptions{})
server := thrift.NewServer(ch)
server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler))
server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler))
// handler
server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore)))
portStr := ":" + strconv.Itoa(cOpts.CollectorPort)
listener, err := net.Listen("tcp", portStr)
logger.Info("Starting jaeger-collector TChannel server", zap.Int("port", cOpts.CollectorPort))
// tchan
ch.Serve(listener)
}
上記のコードは、
jaegerBatchesHandler
およびzipkinSpansHandler
を処理できるtchannelを起動することです.次にGRPCサービスを開始する.
func startGRPCServer(
port int,
handler *collectorApp.GRPCHandler,
samplingStore strategystore.StrategyStore,
logger *zap.Logger,
) (*grpc.Server, error) {
server := grpc.NewServer()
// grpc handler
_, err := grpcserver.StartGRPCCollector(port, server, handler, samplingStore, logger, func(err error) {
logger.Fatal("gRPC collector failed", zap.Error(err))
})
if err != nil {
return nil, err
}
return server, err
}
後のコードはZipkin Http Apiを処理するために使用され、ZipkinサービスはCollecortに直接データを報告することができ、ここでは展開されていません.
ここで、Agentコンポーネントの起動が完了し、Tchannel、GRPC、Zipkinを専門に処理するHttpサービスの3つのサービスが開始されました.
Queryの起動
tracer, closer, err := jaegerClientConfig.Configuration{
Sampler: &jaegerClientConfig.SamplerConfig{
Type: "const",
Param: 1.0,
},
RPCMetrics: true,
}.New(
"jaeger-query",
jaegerClientConfig.Metrics(rootFactory),
jaegerClientConfig.Logger(jaegerClientZapLog.NewLogger(logger)),
)
opentracing.SetGlobalTracer(tracer)
上記のコードは、まずtracerを作成し、自身の情報を収集し、Agentコンポーネントに報告します.
spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query", Tags: nil}))
次に、
jaegerBatchesHandler
にmetrics関連の方法をいくつか追加し、実行中の指標情報を収集する.ここではアクセサリーのデザインモードを使いました.handlerOpts = append(handlerOpts, queryApp.HandlerOptions.Logger(logger), queryApp.HandlerOptions.Tracer(tracer))
apiHandler := queryApp.NewAPIHandler(
spanReader,
depReader,
handlerOpts...)
r := mux.NewRouter()
if qOpts.BasePath != "/" {
r = r.PathPrefix(qOpts.BasePath).Subrouter()
}
// url
apiHandler.RegisterRoutes(r)
queryApp.RegisterStaticHandler(r, logger, qOpts)
// metrics handler
if h := metricsBuilder.Handler(); h != nil {
logger.Info("Registering metrics handler with jaeger-query HTTP server", zap.String("route", metricsBuilder.HTTPRoute))
r.Handle(metricsBuilder.HTTPRoute, h)
}
上記のコードでは、
spanReader
はすべての内蔵APIの集合に相当し、転送トラフィックの機能も受けている.go func() {
defer closer.Close()
if err := http.ListenAndServe(portStr, recoveryHandler(r)); err != nil {
logger.Fatal("Could not launch jaeger-query service", zap.Error(err))
}
hc.Set(healthcheck.Unavailable)
}()
最後にコラボレーションを開始し、httpサービスを実行します.
3つのコンポーネントが起動するとjaeger all-in-oneが正常に起動します.agentコンポーネントにデータが報告されている場合は、Webページを開くと追跡が表示されます.
リファレンスドキュメント
Golangのsignal SIGKILLとSIGTERM、SIGINT