// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15// Package alarm manages health status alarms in etcd.
16package alarm
17
18import (
19 "sync"
20
21 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
22 "github.com/coreos/etcd/mvcc/backend"
23 "github.com/coreos/etcd/pkg/types"
24 "github.com/coreos/pkg/capnslog"
25)
26
var (
	// alarmBucketName is the name of the backend bucket that holds the
	// persisted alarms. Each alarm is stored as a key (the marshaled
	// pb.AlarmMember) with a nil value.
	alarmBucketName = []byte("alarm")
	// plog is the package-level logger; this file uses it to report
	// marshaling failures via Panicf.
	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "alarm")
)
31
// BackendGetter supplies the backend.Backend that the AlarmStore reads
// alarms from and writes alarms to.
type BackendGetter interface {
	// Backend returns the backend to use. AlarmStore calls it on every
	// operation that touches persisted state rather than caching the result.
	Backend() backend.Backend
}
35
// alarmSet maps a member ID to the alarm record held for that member.
type alarmSet map[types.ID]*pb.AlarmMember
37
// AlarmStore persists alarms to the backend and keeps an in-memory index of
// the alarms that are currently active.
type AlarmStore struct {
	// mu is held by Activate, Deactivate and Get while they access types.
	mu sync.Mutex
	// types holds the active alarms, grouped first by alarm type and then
	// by member ID.
	types map[pb.AlarmType]alarmSet

	// bg provides the backend in which alarms are stored.
	bg BackendGetter
}
45
46func NewAlarmStore(bg BackendGetter) (*AlarmStore, error) {
47 ret := &AlarmStore{types: make(map[pb.AlarmType]alarmSet), bg: bg}
48 err := ret.restore()
49 return ret, err
50}
51
52func (a *AlarmStore) Activate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
53 a.mu.Lock()
54 defer a.mu.Unlock()
55
56 newAlarm := &pb.AlarmMember{MemberID: uint64(id), Alarm: at}
57 if m := a.addToMap(newAlarm); m != newAlarm {
58 return m
59 }
60
61 v, err := newAlarm.Marshal()
62 if err != nil {
63 plog.Panicf("failed to marshal alarm member")
64 }
65
66 b := a.bg.Backend()
67 b.BatchTx().Lock()
68 b.BatchTx().UnsafePut(alarmBucketName, v, nil)
69 b.BatchTx().Unlock()
70
71 return newAlarm
72}
73
74func (a *AlarmStore) Deactivate(id types.ID, at pb.AlarmType) *pb.AlarmMember {
75 a.mu.Lock()
76 defer a.mu.Unlock()
77
78 t := a.types[at]
79 if t == nil {
80 t = make(alarmSet)
81 a.types[at] = t
82 }
83 m := t[id]
84 if m == nil {
85 return nil
86 }
87
88 delete(t, id)
89
90 v, err := m.Marshal()
91 if err != nil {
92 plog.Panicf("failed to marshal alarm member")
93 }
94
95 b := a.bg.Backend()
96 b.BatchTx().Lock()
97 b.BatchTx().UnsafeDelete(alarmBucketName, v)
98 b.BatchTx().Unlock()
99
100 return m
101}
102
103func (a *AlarmStore) Get(at pb.AlarmType) (ret []*pb.AlarmMember) {
104 a.mu.Lock()
105 defer a.mu.Unlock()
106 if at == pb.AlarmType_NONE {
107 for _, t := range a.types {
108 for _, m := range t {
109 ret = append(ret, m)
110 }
111 }
112 return ret
113 }
114 for _, m := range a.types[at] {
115 ret = append(ret, m)
116 }
117 return ret
118}
119
120func (a *AlarmStore) restore() error {
121 b := a.bg.Backend()
122 tx := b.BatchTx()
123
124 tx.Lock()
125 tx.UnsafeCreateBucket(alarmBucketName)
126 err := tx.UnsafeForEach(alarmBucketName, func(k, v []byte) error {
127 var m pb.AlarmMember
128 if err := m.Unmarshal(k); err != nil {
129 return err
130 }
131 a.addToMap(&m)
132 return nil
133 })
134 tx.Unlock()
135
136 b.ForceCommit()
137 return err
138}
139
140func (a *AlarmStore) addToMap(newAlarm *pb.AlarmMember) *pb.AlarmMember {
141 t := a.types[newAlarm.Alarm]
142 if t == nil {
143 t = make(alarmSet)
144 a.types[newAlarm.Alarm] = t
145 }
146 m := t[types.ID(newAlarm.MemberID)]
147 if m != nil {
148 return m
149 }
150 t[types.ID(newAlarm.MemberID)] = newAlarm
151 return newAlarm
152}