概述
本来不想看这段代码的,不过最近想解决一个问题,就是 Prometheus 重启之后,报警被会被自动解决,然后再等待一段时间之后,再触发起来。一开始想着自己改代码解决的,思路都想好了,但是,在想的过程中觉得这是个 common case,可以考虑一下社区是否接纳,所以就先看了一些官方的 Issue:
- Prometheus endsAt timeout is too short #5277
- 「Prometheus」没看出啥情况
- Firing -> Resolved -> Firing issue with prometheus + alertmanager #952
- 「Alert Manager」这是一个还没有结论的 bug
- Persist alert ‘for’ state across restarts #422
- Persist alert ‘for’ state across restarts #4061
- 这两个 case 解决了重启之后,报警重置的问题
很明显,这个问题已经被解决了,还是吃了没有跟随社区版本的亏(希望这个坑可以填上),所以,这里顺带看看社区怎么做的(最后发现和自己的想法基本一致)。
加载报警规则
- 加载逻辑所处的代码:
rules/manager.go
- 加载过程
- 加载配置文件:
m.loadGroups(interval, files...)
- 更新旧的配置:更新逻辑
- 从旧的 groups 中删除旧 alert
- 停掉旧 alerting rule
newg.copyState(oldg)
<-m.block
之后运行 newgroup:newg.run(m.opts.Context)
- 加载配置文件:
group copyState
g.evaluationTime = from.evaluationTime
// 上一次执行这个报警规则花费了多少时间g.seriesInPreviousEval[i] = from.seriesInPreviousEval[fi]
ar.active[fp] = a
// 这里包含了 pending 和 firing 的?
group run 做什么
defer close(g.terminated)
... ...
for {
... ...
missed := (time.Since(lastTriggered).Nanoseconds() / g.interval.Nanoseconds()) - 1
if missed > 0 {
iterationsMissed.Add(float64(missed))
iterationsScheduled.Add(float64(missed))
}
lastTriggered = time.Now()
iter()
... ...
}
其中 iter 就是执行报警规则:
iter := func() {
iterationsScheduled.Inc()
start := time.Now()
g.Eval(ctx, start)
iterationDuration.Observe(time.Since(start).Seconds())
g.SetEvaluationTime(time.Since(start))
}
group stop 做什么
就做了这些微小的贡献:
func (g *Group) stop() {
close(g.done)
<-g.terminated
}
重载报警规则
同加载报警规则
执行报警规则
- 注意点:
- 报警规则的查询语句查询出来可能有多条记录,所以需要分别记录多个报警状态
- active map 就是用来记录多条记录报警状态的
- 一些重要的属性
- Value:PromQL 语句最后一次执行时,该时序返回的值
- ActiveAt:
- FiredAt:
- ResolvedAt:义如字段名
- LastSentAt:
执行过程
- 对于每条报警规则,都执行一下:
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
- 每个报警都放置入:
resultFPs := map[uint64]struct{}{}
记录报警:
// Check whether we already have alerting state for the identifying label set. // Update the last value and annotations if so, create a new alert entry otherwise. if alert, ok := r.active[h]; ok && alert.State != StateInactive { alert.Value = smpl.V alert.Annotations = annotations continue } r.active[h] = &Alert{ Labels: lb.Labels(), Annotations: annotations, ActiveAt: ts, State: StatePending, Value: smpl.V, }
Check if any pending alerts should be removed or fire
- 通知报警:g.opts.NotifyFunc(ctx, ar.vector.String(), ar.currentAlerts()…)
- 将返回的结果都保存到 TSDB 中:
app.Add(s.Metric, s.T, s.V)
- s.Metric:报警的 labels
- T:当前时间
- V:触发报警的值
- Series no longer exposed, mark it stale. -> app.Add(s.Metric, ts, Nan)
发送报警
代码在:cmd/prometheus/main.go
func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
var res []*notifier.Alert
for _, alert := range alerts {
if alert.State == rules.StatePending {
continue
}
a := ¬ifier.Alert{
StartsAt: alert.FiredAt,
Labels: alert.Labels,
Annotations: alert.Annotations,
GeneratorURL: externalURL + strutil.TableLinkForExpression(expr),
}
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
}
res = append(res, a)
}
if len(alerts) > 0 {
n.Send(res...)
}
return nil
}
}
再看看真正的发送逻辑:
// notifier/notifier.go
for _, a := range alerts {
lb := labels.NewBuilder(a.Labels)
for ln, lv := range n.opts.ExternalLabels {
if a.Labels.Get(string(ln)) == "" {
lb.Set(string(ln), string(lv))
}
}
a.Labels = lb.Labels()
}
alerts = n.relabelAlerts(alerts)
if d := (len(n.queue) + len(alerts)) - n.opts.QueueCapacity; d > 0 {
n.queue = n.queue[d:] // 这里是不是有 bug?
level.Warn(n.logger).Log("msg", "Alert notification queue full, dropping alerts", "num_dropped", d)
n.metrics.dropped.Add(float64(d))
}
n.queue = append(n.queue, alerts...)
// Notify sending goroutine that there are alerts to be processed.
n.setMore() // ----> 接着就是各种通知方式了
报警解决
清除解决的报警:
// rules/alerting.go
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
... ...
for fp, a := range r.active {
if _, ok := resultFPs[fp]; !ok {
// If the alert was previously firing, keep it around for a given
// retention time so it is reported as resolved to the AlertManager.
if a.State == StatePending || (!a.ResolvedAt.IsZero() && ts.Sub(a.ResolvedAt) > resolvedRetention) {
delete(r.active, fp)
}
if a.State != StateInactive {
a.State = StateInactive
a.ResolvedAt = ts
}
continue
}
添加 EndedAt 字段
// cmd/prometheus/main.go
func sendAlerts(n *notifier.Manager, externalURL string) rules.NotifyFunc {
return func(ctx context.Context, expr string, alerts ...*rules.Alert) error {
... ...
if !alert.ResolvedAt.IsZero() {
a.EndsAt = alert.ResolvedAt
}
重启后延续报警
保存报警状态
// rules/alerting.go
func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
... ...
if r.restored {
vec = append(vec, r.sample(a, ts))
vec = append(vec, r.forStateSample(a, ts, float64(a.ActiveAt.Unix())))
}
... ...
然后被存储进 TSDB
// rules/manager.go
for _, s := range vector {
if _, err := app.Add(s.Metric, s.T, s.V); err != nil {
加载报警状态
// rules/manager.go
func (g *Group) run(ctx context.Context) {
... ...
if g.shouldRestore {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.iterationsMissed.Add(float64(missed))
g.metrics.iterationsScheduled.Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
iter()
}
g.RestoreForState(time.Now())
g.shouldRestore = false
}
这里有个有意思的点就在于,在重加载报警之前,会先执行一遍报警(relabel)规则(iter()
),原因在注释里面也写了:
这背后的原因是,在第一次执行期间(或之前),我们可能没有采集到足够的数据,relabel 规则也不会更新到最新的值,而这些值可能被一些警报依赖。
然后再来看下具体的加载过程,首先一开始就是先定义要回溯的时间,这个就是我们在 prometheus 的启动参数 --rules.alert.for-outage-tolerance
中指定的值:
maxtMS := int64(model.TimeFromUnixNano(ts.UnixNano()))
// We allow restoration only if alerts were active before after certain time.
mint := ts.Add(-g.opts.OutageTolerance)
mintMS := int64(model.TimeFromUnixNano(mint.UnixNano()))
q, err := g.opts.Queryable.Querier(g.opts.Context, mintMS, maxtMS)
然后就是针对每个报警规则(这里是以报警规则为组织单位),然后一一确认报警规则对应的报警的历史状态:
for _, rule := range g.Rules() {
alertHoldDuration := alertRule.HoldDuration()
if alertHoldDuration < g.opts.ForGracePeriod {
alertRule.SetRestored(true)
continue
}
这里体现的就是 prometheus 启动参数中的 --rules.alert.for-grace-period
这个参数,如果报警规则的 for 小于这个时间的话,那么就会忽略该报警规则的重载,直接跳过;
alertRule.ForEachActiveAlert(func(a *Alert) {
这里有意思的是,组织的形式居然是以现在内存中的 active 的报警为组织单位,这也是为什么重载报警规则之前也先执行一次的原因,这样,那些带 for 的报警就会被放入 active 这个内存中,状态虽然是 pending,但是不要担心,很快他们就会变成 firing 了。
这里通过 Alert 构建 label 系列,然后查询对应的持久化报警:
smpl := alertRule.forStateSample(a, time.Now(), 0)
... ...
sset := q.Select(false, nil, matchers...)
... ...
var t int64
var v float64
it := s.Iterator()
for it.Next() {
t, v = it.At()
}
然后就将报警解析出来,然后就计算此时的报警应该是什么状态(Pending/Firing):
downAt := time.Unix(t/1000, 0).UTC()
restoredActiveAt := time.Unix(int64(v), 0).UTC()
timeSpentPending := downAt.Sub(restoredActiveAt)
timeRemainingPending := alertHoldDuration - timeSpentPending
if timeRemainingPending <= 0 {
// 这里就可以直接触发报警了,那么报警的触发时间就是重启之前的触发时间了
} else if timeRemainingPending < g.opts.ForGracePeriod {
// 这里的逻辑比较奇怪,但是代码里面给了一个运算过程,结论就是这个
// 这里我的理解是新的 restoredActiveAt 就是新的 Alert 会成为 pending 状态的时间,如果再加上 alertHoldDuration 就等于 g.opts.ForGracePeriod 了
// 也就是说在启动 g.opts.ForGracePeriod 时间之后刚好处于 firing
restoredActiveAt = ts.Add(g.opts.ForGracePeriod).Add(-alertHoldDuration)
} else {
// 这里其实就是不认为重启期间的时间属于 for 的访问
// 例如 for 是 5m,重启前已经 for 了 2m,重启花了 2m,那么现在重启之后,还是认为 for 了 2m,而不是 2+2 = 4m
downDuration := ts.Sub(downAt)
restoredActiveAt = restoredActiveAt.Add(downDuration)
}
a.ActiveAt = restoredActiveAt
从代码中可以看出,如果是重新加载之后,并不会马上就通知一次 Alert manager,还是需要再等待一个周期,也就是说在重启之后的第二个周期的那次才会再触发一次。