prometheusは、アラートポリシーをどのように評価し、alertmanagerにアラートメッセージをプッシュしますか?


目次
init rule manager
rule managerステータス更新
update
load groups
group key
run new group
評価ルールと送信
評価
送信
QueryFunc 
exec query 
 
NotifyFunc 
初期化notify manager
notify managerの実行
notifier run
send all
sendOne
 
prometheus branch: master
1.rule managerの初期化
prometheusのevaluation_によるとinterval、ルールファイル、およびexternal_labelsが更新します.1.ルール・ファイルを解析し、各ルール・ファイルをループしてgroupsを読み込みます.ルールファイルのグループを読み込む
2.group内のルールをループし、各ルールを解析し、ルールにintervalが設定されている場合、group intervalはルールのintervalに値を取り、そうでない場合はevaluation_を直接取ります.interval.
3.グループ(group)フォーマット:ルールファイル名;group名、例えばxxが存在する.yamlのルールファイルgroup nameがyyである場合、最終的なパケットはxxである.yaml;yy
4.得られたグループをループし、manegrの中のこのグループを削除します.新旧グループが等しいかどうかを判断し、等しい場合はスキップし、stopの古いグループをスキップし、古いグループの状態を新しいグループにコピーします.
5.ルール・ファイル名とgroup nameおよびevaluation_に従って、新しいgroupを実行します.intervalはevalTimestampを得,evalTimestampに基づいてアラーム規則を評価する.evaluationInterval間隔で実行し、実行ごとにmissed=((now-evalTimestamp)/evaluationInterval)-1を計算し、evalTimestamp=(evalTimestamp+missed+1)*evaluationIntervalをリセットします.これはアラームルール評価Evealに直接影響します.
6.アラームルールを評価しgroupを評価するルールを送信

init rule manager


コード位置:github.com/prometheus/prometheus/cmd/prometheus/main.go
queryEngine = promql.NewEngine(opts)

ruleManager = rules.NewManager(&rules.ManagerOptions{
	Appendable:      fanoutStorage,
	TSDB:            localStorage,
	QueryFunc:       rules.EngineQueryFunc(queryEngine, fanoutStorage),
	NotifyFunc:      sendAlerts(notifierManager, cfg.web.ExternalURL.String()),
	Context:         ctxRule,
	ExternalURL:     cfg.web.ExternalURL,
	Registerer:      prometheus.DefaultRegisterer,
	Logger:          log.With(logger, "component", "rule manager"),
	OutageTolerance: time.Duration(cfg.outageTolerance),
	ForGracePeriod:  time.Duration(cfg.forGracePeriod),
	ResendDelay:     time.Duration(cfg.resendDelay),
})

rule managerステータス更新


コード位置:github.com/prometheus/prometheus/cmd/prometheus/main.go
func(cfg *config.Config) error {
	// Get all rule files matching the configuration paths.
	var files []string
	for _, pat := range cfg.RuleFiles {
		fs, err := filepath.Glob(pat)
		if err != nil {
			// The only error can be a bad pattern.
			return errors.Wrapf(err, "error retrieving rule files for %s", pat)
		}
		files = append(files, fs...)
	}
	return ruleManager.Update(
		time.Duration(cfg.GlobalConfig.EvaluationInterval),
		files,
		cfg.GlobalConfig.ExternalLabels,
	)
}

update


コード位置:github.com/prometheus/prometheus/rules/manager.go
// Update the rule manager's state as the config requires. If
// loading the new rules failed the old rule set is restored.
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels) error {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	groups, errs := m.LoadGroups(interval, externalLabels, files...)
    ...

	var wg sync.WaitGroup
	for _, newg := range groups {
		// If there is an old group with the same identifier,
		// check if new group equals with the old group, if yes then skip it.
		// If not equals, stop it and wait for it to finish the current iteration.
		// Then copy it into the new group.
		gn := groupKey(newg.file, newg.name)
		oldg, ok := m.groups[gn]
		delete(m.groups, gn)

		if ok && oldg.Equals(newg) {
			groups[gn] = oldg
			continue
		}

		wg.Add(1)
		go func(newg *Group) {
			if ok {
				oldg.stop()
				newg.CopyState(oldg)
			}
			go func() {
				// Wait with starting evaluation until the rule manager
				// is told to run. This is necessary to avoid running
				// queries against a bootstrapping storage.
				

load groups


コード位置:github.com/prometheus/prometheus/rules/manager.go
// LoadGroups reads groups from a list of files.
func (m *Manager) LoadGroups(
	interval time.Duration, externalLabels labels.Labels, filenames ...string,
) (map[string]*Group, []error) {
	groups := make(map[string]*Group)

	shouldRestore := !m.restored

	for _, fn := range filenames {
		rgs, errs := rulefmt.ParseFile(fn)
		if errs != nil {
			return nil, errs
		}

		for _, rg := range rgs.Groups {
			itv := interval
			if rg.Interval != 0 {
				itv = time.Duration(rg.Interval)
			}

			rules := make([]Rule, 0, len(rg.Rules))
			for _, r := range rg.Rules {
				expr, err := parser.ParseExpr(r.Expr.Value)
				if err != nil {
					return nil, []error{errors.Wrap(err, fn)}
				}

				if r.Alert.Value != "" {
					rules = append(rules, NewAlertingRule(
						r.Alert.Value,
						expr,
						time.Duration(r.For),
						labels.FromMap(r.Labels),
						labels.FromMap(r.Annotations),
						externalLabels,
						m.restored,
						log.With(m.logger, "alert", r.Alert),
					))
					continue
				}
             ...
			}

			groups[groupKey(fn, rg.Name)] = NewGroup(GroupOptions{
				Name:          rg.Name,
				File:          fn,
				Interval:      itv,
				Rules:         rules,
				ShouldRestore: shouldRestore,
				Opts:          m.opts,
				done:          m.done,
			})
		}
	}

	return groups, nil
}

group key

// Group names need not be unique across filenames.
func groupKey(file, name string) string {
	return file + ";" + name
}

run new group


コード位置:github.com/prometheus/prometheus/rules/manager.go
func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels) error {
...
	wg.Add(1)
	go func(newg *Group) {
		if ok {
			oldg.stop()
			newg.CopyState(oldg)
		}
		go func() {
			// Wait with starting evaluation until the rule manager
			// is told to run. This is necessary to avoid running
			// queries against a bootstrapping storage.
			

run 
コード位置:github.com/prometheus/prometheus/rules/manager.go
evalTimestampの最終結果は、アラート・ルール名とgroup nameに依存して、現在の時間より数秒または数十秒遅れます.
time.Untils(evalTimestamp)はevalTimestamp-現在の時間です
evaluationInterval間隔で実行し、実行ごとにmissed=((now-evalTimestamp)/evaluationInterval)-1を計算します.
アラーム規則評価Evalに直接影響するevalTimestamp=(evalTimestamp+missed+1)*evaluationIntervalをリセット
 
func (g *Group) run(ctx context.Context) {
	defer close(g.terminated)

	// Wait an initial amount to have consistently slotted intervals.
	evalTimestamp := g.evalTimestamp().Add(g.interval)
	select {
	case  0 {
				g.metrics.iterationsMissed.Add(float64(missed))
				g.metrics.iterationsScheduled.Add(float64(missed))
			}
			evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
			iter()
		}

		g.RestoreForState(time.Now())
		g.shouldRestore = false
	}

	for {
		select {
		case  0 {
					g.metrics.iterationsMissed.Add(float64(missed))
					g.metrics.iterationsScheduled.Add(float64(missed))
				}
				evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
				iter()
			}
		}
	}
}

evalTimestampの計算


コード位置:github.com/prometheus/prometheus/rules/manager.go
ルールグループのfile nameとguoup nameでhash%evaluation_を取得interval、evaluation_intervalが固定されている場合、file nameまたはgroup nameが長ければ長いほど、得られる時間が大きくなる.
// evalTimestamp returns the immediately preceding consistently slotted evaluation time.
func (g *Group) evalTimestamp() time.Time {
	var (
		offset = int64(g.hash() % uint64(g.interval))
		now    = time.Now().UnixNano()
		adjNow = now - offset
		base   = adjNow - (adjNow % int64(g.interval))
	)

	return time.Unix(0, base+offset).UTC()
}

評価ルールと送信


コード位置:github.com/prometheus/prometheus/rules/manager.go
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
	for i, rule := range g.rules {
	...
       			vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
			if err != nil {
				// Canceled queries are intentional termination of queries. This normally
				// happens on shutdown and thus we skip logging of any errors here.
				if _, ok := err.(promql.ErrQueryCanceled); !ok {
					level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
				}
				g.metrics.evalFailures.WithLabelValues(groupKey(g.File(), g.Name())).Inc()
				return
			}

    ...
}

評価


コード位置:github.com/prometheus/prometheus/rules/alerting.go
alertがinactiveでない場合、inactiveに変更され、resolvedにevaluationIntervalとして割り当てられます.
resolvedRetention=15m

評価はqueryがprometheusのデータをクエリーすることによって行います.ここでQueryFuncを呼び出します.
// Eval evaluates the rule expression and then creates pending alerts and fires
// or removes previously pending alerts accordingly.
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
	res, err := query(ctx, r.vector.String(), ts)
	if err != nil {
		r.SetHealth(HealthBad)
		r.SetLastError(err)
		return nil, err
	}

	r.mtx.Lock()
	defer r.mtx.Unlock()

	// Create pending alerts for any new vector elements in the alert expression
	// or update the expression value for existing elements.
	resultFPs := map[uint64]struct{}{}

	var vec promql.Vector
	var alerts = make(map[uint64]*Alert, len(res))
	for _, smpl := range res {
		// Provide the alert information to the template.
		l := make(map[string]string, len(smpl.Metric))
		for _, lbl := range smpl.Metric {
			l[lbl.Name] = lbl.Value
		}
            ...

		lb := labels.NewBuilder(smpl.Metric).Del(labels.MetricName)

		   ....

		lbs := lb.Labels()
		h := lbs.Hash()
		resultFPs[h] = struct{}{}

		if _, ok := alerts[h]; ok {
			err = fmt.Errorf("vector contains metrics with the same labelset after applying alert labels")
			// We have already acquired the lock above hence using SetHealth and
			// SetLastError will deadlock.
			r.health = HealthBad
			r.lastError = err
			return nil, err
		}

		alerts[h] = &Alert{
			Labels:      lbs,
			Annotations: annotations,
			ActiveAt:    ts,
			State:       StatePending,
			Value:       smpl.V,
		}
	}

	for h, a := range alerts {
		// Check whether we already have alerting state for the identifying label set.
		// Update the last value and annotations if so, create a new alert entry otherwise.
		if alert, ok := r.active[h]; ok && alert.State != StateInactive {
			alert.Value = a.Value
			alert.Annotations = a.Annotations
			continue
		}

		r.active[h] = a
	}

	// Check if any pending alerts should be removed or fire now. Write out alert timeseries.
	for fp, a := range r.active {
		if _, ok := resultFPs[fp]; !ok {
			// If the alert was previously firing, keep it around for a given
			// retention time so it is reported as resolved to the AlertManager.
			if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
				delete(r.active, fp)
			}
			if a.State != StateInactive {
				a.State = StateInactive
				a.ResolvedAt = ts
			}
			continue
		}
       ...
	}

	// We have already acquired the lock above hence using SetHealth and
	// SetLastError will deadlock.
	r.health = HealthGood
	r.lastError = err
	return vec, nil
}

送信


sendAlerts
コード位置:github.com/prometheus/prometheus/rules/alerting.go
// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
  ...
  	if ar, ok := rule.(*AlertingRule); ok {
			ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
		}
  ...
}

NotifyFuncの呼び出し
ここでmanagerのNotifyFuncが呼び出されます.
コードの中のresendDelayはprometheusのrulesに対応しています.alert.resend-delayパラメータ、このパラメータのデフォルトは1 mです.
つまりrulesを修正しなかったらalert.resend-delayの値は、alertのValidUntil値がevalTimestamp+4*1 mに等しい.ValidUntilは最終的にalertのEndsAtフィールドに割り当てられます(現在のalertがresolvedでない場合).参照goのsendAlerts()
コード位置:github.com/prometheus/prometheus/rules/alerting.go
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
	alerts := []*Alert{}
	r.ForEachActiveAlert(func(alert *Alert) {
		if alert.needsSending(ts, resendDelay) {
			alert.LastSentAt = ts
			// Allow for two Eval or Alertmanager send failures.
			delta := resendDelay
			if interval > resendDelay {
				delta = interval
			}
			alert.ValidUntil = ts.Add(4 * delta)
			anew := *alert
			alerts = append(alerts, &anew)
		}
	})
	notifyFunc(ctx, r.vector.String(), alerts...)
}

needsSending
コード位置:github.com/prometheus/prometheus/rules/alerting.go
func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
	if a.State == StatePending {
		return false
	}

	// if an alert has been resolved since the last send, resend it
	if a.ResolvedAt.After(a.LastSentAt) {
		return true
	}

	return a.LastSentAt.Add(resendDelay).Before(ts)
}

QueryFunc 


コード位置:github.com/prometheus/prometheus/rules/manager.go
このfuncは最終的にアラームルールを評価する際にprometheus tsdbからmetricをクエリーするために使用されます.
// EngineQueryFunc returns a new query function that executes instant queries against
// the given engine.
// It converts scalar into vector results.
func EngineQueryFunc(engine *promql.Engine, q storage.Queryable) QueryFunc {
	return func(ctx context.Context, qs string, t time.Time) (promql.Vector, error) {
		q, err := engine.NewInstantQuery(q, qs, t)
		if err != nil {
			return nil, err
		}
		res := q.Exec(ctx)
		if res.Err != nil {
			return nil, res.Err
		}
		switch v := res.Value.(type) {
		case promql.Vector:
			return v, nil
		case promql.Scalar:
			return promql.Vector{promql.Sample{
				Point:  promql.Point(v),
				Metric: labels.Labels{},
			}}, nil
		default:
			return nil, errors.New("rule result is not a vector or scalar")
		}
	}
}

exec query 


式に基づいてprometheusをクエリーします.
コード位置:github.com/prometheus/prometheus/promql/engine.go

// exec executes the query.
//
// At this point per query only one EvalStmt is evaluated. Alert and record
// statements are not handled by the Engine.
func (ng *Engine) exec(ctx context.Context, q *query) (v parser.Value, w storage.Warnings, err error) {
    ...

	defer func() {
		ng.queryLoggerLock.RLock()
		if l := ng.queryLogger; l != nil {
			params := make(map[string]interface{}, 4)
			params["query"] = q.q
			if eq, ok := q.Statement().(*parser.EvalStmt); ok {
				params["start"] = formatDate(eq.Start)
				params["end"] = formatDate(eq.End)
				// The step provided by the user is in seconds.
				params["step"] = int64(eq.Interval / (time.Second / time.Nanosecond))
			}
			f := []interface{}{"params", params}
			if err != nil {
				f = append(f, "error", err)
			}
			f = append(f, "stats", stats.NewQueryStats(q.Stats()))
			if span := opentracing.SpanFromContext(ctx); span != nil {
				if spanCtx, ok := span.Context().(jaeger.SpanContext); ok {
					f = append(f, "spanID", spanCtx.SpanID())
				}
			}
			if origin := ctx.Value(queryOrigin{}); origin != nil {
				for k, v := range origin.(map[string]interface{}) {
					f = append(f, k, v)
				}
			}
       ....
		}
		...
	}()
     ...
	// The base context might already be canceled on the first iteration (e.g. during shutdown).
	if err := contextDone(ctx, env); err != nil {
		return nil, nil, err
	}

	switch s := q.Statement().(type) {
	case *parser.EvalStmt:
		return ng.execEvalStmt(ctx, q, s)
	case parser.TestStmt:
		return nil, nil, s(ctx)
	}

	panic(errors.Errorf("promql.Engine.exec: unhandled statement of type %T", q.Statement()))
}

 


NotifyFunc 


コード位置:github.com/prometheus/promethues/cmd/prometheus/main.go
このfuncは最終的にalertsをAlertManagerにプッシュするために使用されます.
resolved時間が0でない場合、Endsateはresolved時間である、そうでない場合、EndsateはVaildUntilである.つまりevalTimestamp+4*deltaです.
sendAlertsはNotifyFuncを返します.
type sender interface {
	Send(alerts ...*notifier.Alert)
}

// sendAlerts implements the rules.NotifyFunc for a Notifier.
func sendAlerts(s sender, externalURL string) rules.NotifyFunc {
	return func(ctx context.Context, expr string, alerts ...*rules.Alert) {
		var res []*notifier.Alert

		for _, alert := range alerts {
			a := &notifier.Alert{
				StartsAt:     alert.FiredAt,
				Labels:       alert.Labels,
				Annotations:  alert.Annotations,
				GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
			}
			if !alert.ResolvedAt.IsZero() {
				a.EndsAt = alert.ResolvedAt
			} else {
                // ValidUntil = 4 * rules.alert.resend-delay( 1m)
				a.EndsAt = alert.ValidUntil
			}
			res = append(res, a)
		}

		if len(alerts) > 0 {
			s.Send(res...)
		}
	}
}

初期化notify manager


コード位置:github.com/prometheus/promethues/cmd/prometheus/main.go
notifierManager = notifier.NewManager(&cfg.notifier, log.With(logger, "component", "notifier"))

notify managerの実行


notify manager通知AlertManager
// Notifier.

notifierManager.Run(discoveryManagerNotify.SyncCh())

notifier run


コード位置:github.com/prometheus/prometheus/notifier/notifier.go
// Run dispatches notifications continuously.
func (n *Manager) Run(tsets  0 {
			n.setMore()
		}
	}
}

send all


コード位置:github.com/prometheus/prometheus/notifier/notifier.go 1. alertsの長さを判断し,長さが0であれば説明はすべて送信済みである.
2.prometheusのバージョンv 1またはv 2に基づいてalertsをpayloadに生成します.
3.各AlertManagerをループし、sendOneを呼び出してalrtmanagerにpost要求を送信します.
// sendAll sends the alerts to all configured Alertmanagers concurrently.
// It returns true if the alerts could be sent successfully to at least one Alertmanager.
func (n *Manager) sendAll(alerts ...*Alert) bool {
	if len(alerts) == 0 {
		return true
	}

	begin := time.Now()

	// v1Payload and v2Payload represent 'alerts' marshaled for Alertmanager API
	// v1 or v2. Marshaling happens below. Reference here is for caching between
	// for loop iterations.
   ...
	for _, ams := range amSets {
		var (
			payload []byte
			err     error
		)

		ams.mtx.RLock()

		switch ams.cfg.APIVersion {
		case config.AlertmanagerAPIVersionV1:
			{
			...

				payload = v1Payload
			}
		case config.AlertmanagerAPIVersionV2:
			{
				...
				payload = v2Payload
			}
         ....

		for _, am := range ams.ams {
			wg.Add(1)

			ctx, cancel := context.WithTimeout(n.ctx, time.Duration(ams.cfg.Timeout))
			defer cancel()

			go func(client *http.Client, url string) {
				if err := n.sendOne(ctx, client, url, payload); err != nil {
					level.Error(n.logger).Log("alertmanager", url, "count", len(alerts), "msg", "Error sending alert", "err", err)
					n.metrics.errors.WithLabelValues(url).Inc()
				} else {
					atomic.AddUint64(&numSuccess, 1)
				}
              ....
			}(ams.client, am.url().String())
		}

		ams.mtx.RUnlock()
	}

	wg.Wait()

	return numSuccess > 0
}

sendOne


コード位置:github.com/prometheus/prometheus/notifier/notifier.go postリクエストをalertmanagerに送信
func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []byte) error {
	req, err := http.NewRequest("POST", url, bytes.NewReader(b))
	if err != nil {
		return err
	}
	req.Header.Set("User-Agent", userAgent)
	req.Header.Set("Content-Type", contentTypeJSON)
	resp, err := n.opts.Do(ctx, c, req)
	if err != nil {
		return err
	}
	defer func() {
		io.Copy(ioutil.Discard, resp.Body)
		resp.Body.Close()
	}()

	// Any HTTP status 2xx is OK.
	if resp.StatusCode/100 != 2 {
		return errors.Errorf("bad response status %s", resp.Status)
	}

	return nil
}