blob: 4f0b1572ef68f0f2fee96579bad69e71dcb007e3 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package membership
16
17import (
18 "bytes"
19 "context"
20 "crypto/sha1"
21 "encoding/binary"
22 "encoding/json"
23 "fmt"
24 "path"
25 "sort"
26 "strings"
27 "sync"
28 "time"
29
30 "github.com/coreos/etcd/mvcc/backend"
31 "github.com/coreos/etcd/pkg/netutil"
32 "github.com/coreos/etcd/pkg/types"
33 "github.com/coreos/etcd/raft"
34 "github.com/coreos/etcd/raft/raftpb"
35 "github.com/coreos/etcd/store"
36 "github.com/coreos/etcd/version"
37
38 "github.com/coreos/go-semver/semver"
39)
40
// RaftCluster is a list of Members that belong to the same raft cluster.
//
// The membership mutators defined on this type write each change through to
// the v2 store and/or the backend when those are attached (non-nil), and then
// update the in-memory maps below.
type RaftCluster struct {
	// id is the cluster ID; genID and SetID write it, ID reads it (no locking).
	id types.ID
	// token is handed to NewMember by NewClusterFromURLsMap.
	token string

	// store and be are optional persistence targets; each is skipped when nil.
	store store.Store
	be    backend.Backend

	sync.Mutex // guards the fields below
	version    *semver.Version
	members    map[types.ID]*Member
	// removed contains the ids of removed members in the cluster.
	// removed id cannot be reused.
	removed map[types.ID]bool
}
56
57func NewClusterFromURLsMap(token string, urlsmap types.URLsMap) (*RaftCluster, error) {
58 c := NewCluster(token)
59 for name, urls := range urlsmap {
60 m := NewMember(name, urls, token, nil)
61 if _, ok := c.members[m.ID]; ok {
62 return nil, fmt.Errorf("member exists with identical ID %v", m)
63 }
64 if uint64(m.ID) == raft.None {
65 return nil, fmt.Errorf("cannot use %x as member id", raft.None)
66 }
67 c.members[m.ID] = m
68 }
69 c.genID()
70 return c, nil
71}
72
73func NewClusterFromMembers(token string, id types.ID, membs []*Member) *RaftCluster {
74 c := NewCluster(token)
75 c.id = id
76 for _, m := range membs {
77 c.members[m.ID] = m
78 }
79 return c
80}
81
82func NewCluster(token string) *RaftCluster {
83 return &RaftCluster{
84 token: token,
85 members: make(map[types.ID]*Member),
86 removed: make(map[types.ID]bool),
87 }
88}
89
90func (c *RaftCluster) ID() types.ID { return c.id }
91
92func (c *RaftCluster) Members() []*Member {
93 c.Lock()
94 defer c.Unlock()
95 var ms MembersByID
96 for _, m := range c.members {
97 ms = append(ms, m.Clone())
98 }
99 sort.Sort(ms)
100 return []*Member(ms)
101}
102
103func (c *RaftCluster) Member(id types.ID) *Member {
104 c.Lock()
105 defer c.Unlock()
106 return c.members[id].Clone()
107}
108
109// MemberByName returns a Member with the given name if exists.
110// If more than one member has the given name, it will panic.
111func (c *RaftCluster) MemberByName(name string) *Member {
112 c.Lock()
113 defer c.Unlock()
114 var memb *Member
115 for _, m := range c.members {
116 if m.Name == name {
117 if memb != nil {
118 plog.Panicf("two members with the given name %q exist", name)
119 }
120 memb = m
121 }
122 }
123 return memb.Clone()
124}
125
126func (c *RaftCluster) MemberIDs() []types.ID {
127 c.Lock()
128 defer c.Unlock()
129 var ids []types.ID
130 for _, m := range c.members {
131 ids = append(ids, m.ID)
132 }
133 sort.Sort(types.IDSlice(ids))
134 return ids
135}
136
137func (c *RaftCluster) IsIDRemoved(id types.ID) bool {
138 c.Lock()
139 defer c.Unlock()
140 return c.removed[id]
141}
142
143// PeerURLs returns a list of all peer addresses.
144// The returned list is sorted in ascending lexicographical order.
145func (c *RaftCluster) PeerURLs() []string {
146 c.Lock()
147 defer c.Unlock()
148 urls := make([]string, 0)
149 for _, p := range c.members {
150 urls = append(urls, p.PeerURLs...)
151 }
152 sort.Strings(urls)
153 return urls
154}
155
156// ClientURLs returns a list of all client addresses.
157// The returned list is sorted in ascending lexicographical order.
158func (c *RaftCluster) ClientURLs() []string {
159 c.Lock()
160 defer c.Unlock()
161 urls := make([]string, 0)
162 for _, p := range c.members {
163 urls = append(urls, p.ClientURLs...)
164 }
165 sort.Strings(urls)
166 return urls
167}
168
169func (c *RaftCluster) String() string {
170 c.Lock()
171 defer c.Unlock()
172 b := &bytes.Buffer{}
173 fmt.Fprintf(b, "{ClusterID:%s ", c.id)
174 var ms []string
175 for _, m := range c.members {
176 ms = append(ms, fmt.Sprintf("%+v", m))
177 }
178 fmt.Fprintf(b, "Members:[%s] ", strings.Join(ms, " "))
179 var ids []string
180 for id := range c.removed {
181 ids = append(ids, id.String())
182 }
183 fmt.Fprintf(b, "RemovedMemberIDs:[%s]}", strings.Join(ids, " "))
184 return b.String()
185}
186
187func (c *RaftCluster) genID() {
188 mIDs := c.MemberIDs()
189 b := make([]byte, 8*len(mIDs))
190 for i, id := range mIDs {
191 binary.BigEndian.PutUint64(b[8*i:], uint64(id))
192 }
193 hash := sha1.Sum(b)
194 c.id = types.ID(binary.BigEndian.Uint64(hash[:8]))
195}
196
197func (c *RaftCluster) SetID(id types.ID) { c.id = id }
198
199func (c *RaftCluster) SetStore(st store.Store) { c.store = st }
200
201func (c *RaftCluster) SetBackend(be backend.Backend) {
202 c.be = be
203 mustCreateBackendBuckets(c.be)
204}
205
206func (c *RaftCluster) Recover(onSet func(*semver.Version)) {
207 c.Lock()
208 defer c.Unlock()
209
210 c.members, c.removed = membersFromStore(c.store)
211 c.version = clusterVersionFromStore(c.store)
212 mustDetectDowngrade(c.version)
213 onSet(c.version)
214
215 for _, m := range c.members {
216 plog.Infof("added member %s %v to cluster %s from store", m.ID, m.PeerURLs, c.id)
217 }
218 if c.version != nil {
219 plog.Infof("set the cluster version to %v from store", version.Cluster(c.version.String()))
220 }
221}
222
223// ValidateConfigurationChange takes a proposed ConfChange and
224// ensures that it is still valid.
225func (c *RaftCluster) ValidateConfigurationChange(cc raftpb.ConfChange) error {
226 members, removed := membersFromStore(c.store)
227 id := types.ID(cc.NodeID)
228 if removed[id] {
229 return ErrIDRemoved
230 }
231 switch cc.Type {
232 case raftpb.ConfChangeAddNode:
233 if members[id] != nil {
234 return ErrIDExists
235 }
236 urls := make(map[string]bool)
237 for _, m := range members {
238 for _, u := range m.PeerURLs {
239 urls[u] = true
240 }
241 }
242 m := new(Member)
243 if err := json.Unmarshal(cc.Context, m); err != nil {
244 plog.Panicf("unmarshal member should never fail: %v", err)
245 }
246 for _, u := range m.PeerURLs {
247 if urls[u] {
248 return ErrPeerURLexists
249 }
250 }
251 case raftpb.ConfChangeRemoveNode:
252 if members[id] == nil {
253 return ErrIDNotFound
254 }
255 case raftpb.ConfChangeUpdateNode:
256 if members[id] == nil {
257 return ErrIDNotFound
258 }
259 urls := make(map[string]bool)
260 for _, m := range members {
261 if m.ID == id {
262 continue
263 }
264 for _, u := range m.PeerURLs {
265 urls[u] = true
266 }
267 }
268 m := new(Member)
269 if err := json.Unmarshal(cc.Context, m); err != nil {
270 plog.Panicf("unmarshal member should never fail: %v", err)
271 }
272 for _, u := range m.PeerURLs {
273 if urls[u] {
274 return ErrPeerURLexists
275 }
276 }
277 default:
278 plog.Panicf("ConfChange type should be either AddNode, RemoveNode or UpdateNode")
279 }
280 return nil
281}
282
283// AddMember adds a new Member into the cluster, and saves the given member's
284// raftAttributes into the store. The given member should have empty attributes.
285// A Member with a matching id must not exist.
286func (c *RaftCluster) AddMember(m *Member) {
287 c.Lock()
288 defer c.Unlock()
289 if c.store != nil {
290 mustSaveMemberToStore(c.store, m)
291 }
292 if c.be != nil {
293 mustSaveMemberToBackend(c.be, m)
294 }
295
296 c.members[m.ID] = m
297
298 plog.Infof("added member %s %v to cluster %s", m.ID, m.PeerURLs, c.id)
299}
300
301// RemoveMember removes a member from the store.
302// The given id MUST exist, or the function panics.
303func (c *RaftCluster) RemoveMember(id types.ID) {
304 c.Lock()
305 defer c.Unlock()
306 if c.store != nil {
307 mustDeleteMemberFromStore(c.store, id)
308 }
309 if c.be != nil {
310 mustDeleteMemberFromBackend(c.be, id)
311 }
312
313 delete(c.members, id)
314 c.removed[id] = true
315
316 plog.Infof("removed member %s from cluster %s", id, c.id)
317}
318
319func (c *RaftCluster) UpdateAttributes(id types.ID, attr Attributes) {
320 c.Lock()
321 defer c.Unlock()
322 if m, ok := c.members[id]; ok {
323 m.Attributes = attr
324 if c.store != nil {
325 mustUpdateMemberAttrInStore(c.store, m)
326 }
327 if c.be != nil {
328 mustSaveMemberToBackend(c.be, m)
329 }
330 return
331 }
332 _, ok := c.removed[id]
333 if !ok {
334 plog.Panicf("error updating attributes of unknown member %s", id)
335 }
336 plog.Warningf("skipped updating attributes of removed member %s", id)
337}
338
339func (c *RaftCluster) UpdateRaftAttributes(id types.ID, raftAttr RaftAttributes) {
340 c.Lock()
341 defer c.Unlock()
342
343 c.members[id].RaftAttributes = raftAttr
344 if c.store != nil {
345 mustUpdateMemberInStore(c.store, c.members[id])
346 }
347 if c.be != nil {
348 mustSaveMemberToBackend(c.be, c.members[id])
349 }
350
351 plog.Noticef("updated member %s %v in cluster %s", id, raftAttr.PeerURLs, c.id)
352}
353
354func (c *RaftCluster) Version() *semver.Version {
355 c.Lock()
356 defer c.Unlock()
357 if c.version == nil {
358 return nil
359 }
360 return semver.Must(semver.NewVersion(c.version.String()))
361}
362
363func (c *RaftCluster) SetVersion(ver *semver.Version, onSet func(*semver.Version)) {
364 c.Lock()
365 defer c.Unlock()
366 if c.version != nil {
367 plog.Noticef("updated the cluster version from %v to %v", version.Cluster(c.version.String()), version.Cluster(ver.String()))
368 } else {
369 plog.Noticef("set the initial cluster version to %v", version.Cluster(ver.String()))
370 }
371 c.version = ver
372 mustDetectDowngrade(c.version)
373 if c.store != nil {
374 mustSaveClusterVersionToStore(c.store, ver)
375 }
376 if c.be != nil {
377 mustSaveClusterVersionToBackend(c.be, ver)
378 }
379 onSet(ver)
380}
381
382func (c *RaftCluster) IsReadyToAddNewMember() bool {
383 nmembers := 1
384 nstarted := 0
385
386 for _, member := range c.members {
387 if member.IsStarted() {
388 nstarted++
389 }
390 nmembers++
391 }
392
393 if nstarted == 1 && nmembers == 2 {
394 // a case of adding a new node to 1-member cluster for restoring cluster data
395 // https://github.com/coreos/etcd/blob/master/Documentation/v2/admin_guide.md#restoring-the-cluster
396
397 plog.Debugf("The number of started member is 1. This cluster can accept add member request.")
398 return true
399 }
400
401 nquorum := nmembers/2 + 1
402 if nstarted < nquorum {
403 plog.Warningf("Reject add member request: the number of started member (%d) will be less than the quorum number of the cluster (%d)", nstarted, nquorum)
404 return false
405 }
406
407 return true
408}
409
410func (c *RaftCluster) IsReadyToRemoveMember(id uint64) bool {
411 nmembers := 0
412 nstarted := 0
413
414 for _, member := range c.members {
415 if uint64(member.ID) == id {
416 continue
417 }
418
419 if member.IsStarted() {
420 nstarted++
421 }
422 nmembers++
423 }
424
425 nquorum := nmembers/2 + 1
426 if nstarted < nquorum {
427 plog.Warningf("Reject remove member request: the number of started member (%d) will be less than the quorum number of the cluster (%d)", nstarted, nquorum)
428 return false
429 }
430
431 return true
432}
433
434func membersFromStore(st store.Store) (map[types.ID]*Member, map[types.ID]bool) {
435 members := make(map[types.ID]*Member)
436 removed := make(map[types.ID]bool)
437 e, err := st.Get(StoreMembersPrefix, true, true)
438 if err != nil {
439 if isKeyNotFound(err) {
440 return members, removed
441 }
442 plog.Panicf("get storeMembers should never fail: %v", err)
443 }
444 for _, n := range e.Node.Nodes {
445 var m *Member
446 m, err = nodeToMember(n)
447 if err != nil {
448 plog.Panicf("nodeToMember should never fail: %v", err)
449 }
450 members[m.ID] = m
451 }
452
453 e, err = st.Get(storeRemovedMembersPrefix, true, true)
454 if err != nil {
455 if isKeyNotFound(err) {
456 return members, removed
457 }
458 plog.Panicf("get storeRemovedMembers should never fail: %v", err)
459 }
460 for _, n := range e.Node.Nodes {
461 removed[MustParseMemberIDFromKey(n.Key)] = true
462 }
463 return members, removed
464}
465
466func clusterVersionFromStore(st store.Store) *semver.Version {
467 e, err := st.Get(path.Join(storePrefix, "version"), false, false)
468 if err != nil {
469 if isKeyNotFound(err) {
470 return nil
471 }
472 plog.Panicf("unexpected error (%v) when getting cluster version from store", err)
473 }
474 return semver.Must(semver.NewVersion(*e.Node.Value))
475}
476
477// ValidateClusterAndAssignIDs validates the local cluster by matching the PeerURLs
478// with the existing cluster. If the validation succeeds, it assigns the IDs
479// from the existing cluster to the local cluster.
480// If the validation fails, an error will be returned.
481func ValidateClusterAndAssignIDs(local *RaftCluster, existing *RaftCluster) error {
482 ems := existing.Members()
483 lms := local.Members()
484 if len(ems) != len(lms) {
485 return fmt.Errorf("member count is unequal")
486 }
487 sort.Sort(MembersByPeerURLs(ems))
488 sort.Sort(MembersByPeerURLs(lms))
489
490 ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
491 defer cancel()
492 for i := range ems {
493 if ok, err := netutil.URLStringsEqual(ctx, ems[i].PeerURLs, lms[i].PeerURLs); !ok {
494 return fmt.Errorf("unmatched member while checking PeerURLs (%v)", err)
495 }
496 lms[i].ID = ems[i].ID
497 }
498 local.members = make(map[types.ID]*Member)
499 for _, m := range lms {
500 local.members[m.ID] = m
501 }
502 return nil
503}
504
505func mustDetectDowngrade(cv *semver.Version) {
506 lv := semver.Must(semver.NewVersion(version.Version))
507 // only keep major.minor version for comparison against cluster version
508 lv = &semver.Version{Major: lv.Major, Minor: lv.Minor}
509 if cv != nil && lv.LessThan(*cv) {
510 plog.Fatalf("cluster cannot be downgraded (current version: %s is lower than determined cluster version: %s).", version.Version, version.Cluster(cv.String()))
511 }
512}