1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "context"
19 "encoding/json"
20 "expvar"
21 "fmt"
22 "math"
23 "math/rand"
24 "net/http"
25 "os"
26 "path"
27 "regexp"
28 "sync"
29 "sync/atomic"
30 "time"
31
32 "go.etcd.io/etcd/auth"
33 "go.etcd.io/etcd/etcdserver/api"
34 "go.etcd.io/etcd/etcdserver/api/membership"
35 "go.etcd.io/etcd/etcdserver/api/rafthttp"
36 "go.etcd.io/etcd/etcdserver/api/snap"
37 "go.etcd.io/etcd/etcdserver/api/v2discovery"
38 "go.etcd.io/etcd/etcdserver/api/v2http/httptypes"
39 stats "go.etcd.io/etcd/etcdserver/api/v2stats"
40 "go.etcd.io/etcd/etcdserver/api/v2store"
41 "go.etcd.io/etcd/etcdserver/api/v3alarm"
42 "go.etcd.io/etcd/etcdserver/api/v3compactor"
43 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
44 "go.etcd.io/etcd/lease"
45 "go.etcd.io/etcd/lease/leasehttp"
46 "go.etcd.io/etcd/mvcc"
47 "go.etcd.io/etcd/mvcc/backend"
48 "go.etcd.io/etcd/pkg/fileutil"
49 "go.etcd.io/etcd/pkg/idutil"
50 "go.etcd.io/etcd/pkg/pbutil"
51 "go.etcd.io/etcd/pkg/runtime"
52 "go.etcd.io/etcd/pkg/schedule"
53 "go.etcd.io/etcd/pkg/types"
54 "go.etcd.io/etcd/pkg/wait"
55 "go.etcd.io/etcd/raft"
56 "go.etcd.io/etcd/raft/raftpb"
57 "go.etcd.io/etcd/version"
58 "go.etcd.io/etcd/wal"
59
60 "github.com/coreos/go-semver/semver"
61 "github.com/coreos/pkg/capnslog"
62 humanize "github.com/dustin/go-humanize"
63 "github.com/prometheus/client_golang/prometheus"
64 "go.uber.org/zap"
65)
66
67const (
68 DefaultSnapshotCount = 100000
69
70 // DefaultSnapshotCatchUpEntries is the number of entries for a slow follower
71 // to catch up after the raft storage entries are compacted.
72 // We expect the follower to have millisecond-level latency with the leader.
73 // The max throughput is around 10K entries/s, so keeping 5K entries is enough
74 // to help a follower catch up. (An illustrative calculation follows this const block.)
75 DefaultSnapshotCatchUpEntries uint64 = 5000
76
77 StoreClusterPrefix = "/0"
78 StoreKeysPrefix = "/1"
79
80 // HealthInterval is the minimum time the cluster should be healthy
81 // before accepting add member requests.
82 HealthInterval = 5 * time.Second
83
84 purgeFileInterval = 30 * time.Second
85 // monitorVersionInterval should be smaller than the timeout
86 // on the connection; otherwise we will not be able to reuse the connection
87 // (since it will time out).
88 monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
89
90 // maxInFlightMsgSnap is the maximum number of in-flight snapshot messages etcdserver allows to have.
91 // This number is more than enough for most clusters with 5 machines.
92 maxInFlightMsgSnap = 16
93
94 releaseDelayAfterSnapshot = 30 * time.Second
95
96 // maxPendingRevokes is the maximum number of outstanding expired lease revocations.
97 maxPendingRevokes = 16
98
99 recommendedMaxRequestBytes = 10 * 1024 * 1024
100
101 readyPercent = 0.9
102)
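
// Illustrative sketch (not part of the original etcd source): a rough sizing
// calculation behind DefaultSnapshotCatchUpEntries above. At the ~10K entries/s
// throughput assumed in that comment, retaining 5,000 entries covers roughly
// 500ms of follower lag, a wide margin over the expected millisecond-level latency.
func exampleSnapshotCatchUpBudget() time.Duration {
	const assumedThroughputPerSec = 10000 // entries/s, per the comment above (an assumption)
	// lag coverage = retained entries / throughput
	coverage := float64(DefaultSnapshotCatchUpEntries) / assumedThroughputPerSec
	return time.Duration(coverage * float64(time.Second)) // ~500ms
}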
103
104var (
105 plog = capnslog.NewPackageLogger("go.etcd.io/etcd", "etcdserver")
106
107 storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
108)
109
110func init() {
111 rand.Seed(time.Now().UnixNano())
112
113 expvar.Publish(
114 "file_descriptor_limit",
115 expvar.Func(
116 func() interface{} {
117 n, _ := runtime.FDLimit()
118 return n
119 },
120 ),
121 )
122}
123
124type Response struct {
125 Term uint64
126 Index uint64
127 Event *v2store.Event
128 Watcher v2store.Watcher
129 Err error
130}
131
132type ServerV2 interface {
133 Server
134 Leader() types.ID
135
136 // Do takes a V2 request and attempts to fulfill it, returning a Response.
137 Do(ctx context.Context, r pb.Request) (Response, error)
138 stats.Stats
139 ClientCertAuthEnabled() bool
140}
141
142type ServerV3 interface {
143 Server
144 RaftStatusGetter
145}
146
147func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }
148
149type Server interface {
150 // AddMember attempts to add a member into the cluster. It will return
151 // ErrIDRemoved if the member ID has been removed from the cluster, or
152 // ErrIDExists if the member ID already exists in the cluster.
153 AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
154 // RemoveMember attempts to remove a member from the cluster. It will
155 // return ErrIDRemoved if the member ID has been removed from the cluster, or
156 // ErrIDNotFound if the member ID is not in the cluster.
157 RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
158 // UpdateMember attempts to update an existing member in the cluster. It will
159 // return ErrIDNotFound if the member ID does not exist.
160 UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
161 // PromoteMember attempts to promote a non-voting node to a voting node. It will
162 // return ErrIDNotFound if the member ID does not exist,
163 // ErrLearnerNotReady if the member is not ready to be promoted,
164 // or ErrMemberNotLearner if the member is not a learner.
165 PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error)
166
167 // ClusterVersion is the cluster-wide minimum major.minor version.
168 // Cluster version is set to the min version that an etcd member is
169 // compatible with when it is first bootstrapped.
170 //
171 // ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
172 //
173 // During a rolling upgrade, the ClusterVersion will be updated
174 // automatically after a sync (every 5 seconds by default).
175 //
176 // The API/raft component can utilize ClusterVersion to determine if
177 // it can accept a client request or a raft RPC.
178 // NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
179 // the leader is etcd 2.0. An etcd 2.0 leader will not update clusterVersion since
180 // this feature was introduced after 2.0.
181 ClusterVersion() *semver.Version
182 Cluster() api.Cluster
183 Alarms() []*pb.AlarmMember
184}
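
// Illustrative sketch (not part of the original etcd source): how a caller might
// drive the member-change methods documented on the Server interface above. The
// 5-second timeout and the pre-built membership.Member are assumptions; Member
// construction is elided.
func exampleAddMember(srv Server, memb membership.Member) error {
	// Bound the proposal; membership changes go through raft and can time out.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	members, err := srv.AddMember(ctx, memb)
	if err != nil {
		// Callers can distinguish the documented sentinel errors here
		// (e.g. an already-existing or removed member ID) and react accordingly.
		return err
	}
	_ = members // the updated membership once the change is applied
	return nil
}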
185
186// EtcdServer is the production implementation of the Server interface.
187type EtcdServer struct {
188 // inflightSnapshots holds the number of snapshots currently in flight.
189 inflightSnapshots int64 // must use atomic operations to access; keep 64-bit aligned.
190 appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
191 committedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
192 term uint64 // must use atomic operations to access; keep 64-bit aligned.
193 lead uint64 // must use atomic operations to access; keep 64-bit aligned.
194
195 // consistIndex is used to hold the offset of the currently executing entry.
196 // It is initialized to 0 before executing any entry.
197 consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
198 r raftNode // uses 64-bit atomics; keep 64-bit aligned.
199
200 readych chan struct{}
201 Cfg ServerConfig
202
203 lgMu *sync.RWMutex
204 lg *zap.Logger
205
206 w wait.Wait
207
208 readMu sync.RWMutex
209 // the read routine notifies the etcd server that it is waiting to read by sending an empty struct to
210 // readwaitc
211 readwaitc chan struct{}
212 // readNotifier is used to notify the read routine that it can process the request
213 // when there is no error
214 readNotifier *notifier
215
216 // stop signals that the run goroutine should shut down.
217 stop chan struct{}
218 // stopping is closed by run goroutine on shutdown.
219 stopping chan struct{}
220 // done is closed when all goroutines from start() complete.
221 done chan struct{}
222 // leaderChanged is used to notify the linearizable read loop to drop the old read requests.
223 leaderChanged chan struct{}
224 leaderChangedMu sync.RWMutex
225
226 errorc chan error
227 id types.ID
228 attributes membership.Attributes
229
230 cluster *membership.RaftCluster
231
232 v2store v2store.Store
233 snapshotter *snap.Snapshotter
234
235 applyV2 ApplierV2
236
237 // applyV3 is the applier with auth and quotas
238 applyV3 applierV3
239 // applyV3Base is the core applier without auth or quotas
240 applyV3Base applierV3
241 applyWait wait.WaitTime
242
243 kv mvcc.ConsistentWatchableKV
244 lessor lease.Lessor
245 bemu sync.Mutex
246 be backend.Backend
247 authStore auth.AuthStore
248 alarmStore *v3alarm.AlarmStore
249
250 stats *stats.ServerStats
251 lstats *stats.LeaderStats
252
253 SyncTicker *time.Ticker
254 // compactor is used to auto-compact the KV.
255 compactor v3compactor.Compactor
256
257 // peerRt is used to send requests (version, lease) to peers.
258 peerRt http.RoundTripper
259 reqIDGen *idutil.Generator
260
261 // forceVersionC is used to force the version monitor loop
262 // to detect the cluster version immediately.
263 forceVersionC chan struct{}
264
265 // wgMu blocks concurrent waitgroup mutation while the server is stopping.
266 wgMu sync.RWMutex
267 // wg is used to wait for the goroutines that depend on the server state
268 // to exit when stopping the server.
269 wg sync.WaitGroup
270
271 // ctx is used for etcd-initiated requests that may need to be canceled
272 // on etcd server shutdown.
273 ctx context.Context
274 cancel context.CancelFunc
275
276 leadTimeMu sync.RWMutex
277 leadElectedTime time.Time
278
279 *AccessController
280}
281
282// NewServer creates a new EtcdServer from the supplied configuration. The
283// configuration is considered static for the lifetime of the EtcdServer.
284func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
285 st := v2store.New(StoreClusterPrefix, StoreKeysPrefix)
286
287 var (
288 w *wal.WAL
289 n raft.Node
290 s *raft.MemoryStorage
291 id types.ID
292 cl *membership.RaftCluster
293 )
294
295 if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
296 if cfg.Logger != nil {
297 cfg.Logger.Warn(
298 "exceeded recommended request limit",
299 zap.Uint("max-request-bytes", cfg.MaxRequestBytes),
300 zap.String("max-request-size", humanize.Bytes(uint64(cfg.MaxRequestBytes))),
301 zap.Int("recommended-request-bytes", recommendedMaxRequestBytes),
302 zap.String("recommended-request-size", humanize.Bytes(uint64(recommendedMaxRequestBytes))),
303 )
304 } else {
305 plog.Warningf("MaxRequestBytes %v exceeds maximum recommended size %v", cfg.MaxRequestBytes, recommendedMaxRequestBytes)
306 }
307 }
308
309 if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
310 return nil, fmt.Errorf("cannot access data directory: %v", terr)
311 }
312
313 haveWAL := wal.Exist(cfg.WALDir())
314
315 if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
316 if cfg.Logger != nil {
317 cfg.Logger.Fatal(
318 "failed to create snapshot directory",
319 zap.String("path", cfg.SnapDir()),
320 zap.Error(err),
321 )
322 } else {
323 plog.Fatalf("create snapshot directory error: %v", err)
324 }
325 }
326 ss := snap.New(cfg.Logger, cfg.SnapDir())
327
328 bepath := cfg.backendPath()
329 beExist := fileutil.Exist(bepath)
330 be := openBackend(cfg)
331
332 defer func() {
333 if err != nil {
334 be.Close()
335 }
336 }()
337
338 prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
339 if err != nil {
340 return nil, err
341 }
342 var (
343 remotes []*membership.Member
344 snapshot *raftpb.Snapshot
345 )
346
347 switch {
348 case !haveWAL && !cfg.NewCluster:
349 if err = cfg.VerifyJoinExisting(); err != nil {
350 return nil, err
351 }
352 cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
353 if err != nil {
354 return nil, err
355 }
356 existingCluster, gerr := GetClusterFromRemotePeers(cfg.Logger, getRemotePeerURLs(cl, cfg.Name), prt)
357 if gerr != nil {
358 return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
359 }
360 if err = membership.ValidateClusterAndAssignIDs(cfg.Logger, cl, existingCluster); err != nil {
361 return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
362 }
363 if !isCompatibleWithCluster(cfg.Logger, cl, cl.MemberByName(cfg.Name).ID, prt) {
364 return nil, fmt.Errorf("incompatible with current running cluster")
365 }
366
367 remotes = existingCluster.Members()
368 cl.SetID(types.ID(0), existingCluster.ID())
369 cl.SetStore(st)
370 cl.SetBackend(be)
371 id, n, s, w = startNode(cfg, cl, nil)
372 cl.SetID(id, existingCluster.ID())
373
374 case !haveWAL && cfg.NewCluster:
375 if err = cfg.VerifyBootstrap(); err != nil {
376 return nil, err
377 }
378 cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
379 if err != nil {
380 return nil, err
381 }
382 m := cl.MemberByName(cfg.Name)
383 if isMemberBootstrapped(cfg.Logger, cl, cfg.Name, prt, cfg.bootstrapTimeout()) {
384 return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
385 }
386 if cfg.ShouldDiscover() {
387 var str string
388 str, err = v2discovery.JoinCluster(cfg.Logger, cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
389 if err != nil {
390 return nil, &DiscoveryError{Op: "join", Err: err}
391 }
392 var urlsmap types.URLsMap
393 urlsmap, err = types.NewURLsMap(str)
394 if err != nil {
395 return nil, err
396 }
397 if checkDuplicateURL(urlsmap) {
398 return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
399 }
400 if cl, err = membership.NewClusterFromURLsMap(cfg.Logger, cfg.InitialClusterToken, urlsmap); err != nil {
401 return nil, err
402 }
403 }
404 cl.SetStore(st)
405 cl.SetBackend(be)
406 id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
407 cl.SetID(id, cl.ID())
408
409 case haveWAL:
410 if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
411 return nil, fmt.Errorf("cannot write to member directory: %v", err)
412 }
413
414 if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
415 return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
416 }
417
418 if cfg.ShouldDiscover() {
419 if cfg.Logger != nil {
420 cfg.Logger.Warn(
421 "discovery token is ignored since cluster already initialized; valid logs are found",
422 zap.String("wal-dir", cfg.WALDir()),
423 )
424 } else {
425 plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
426 }
427 }
428 snapshot, err = ss.Load()
429 if err != nil && err != snap.ErrNoSnapshot {
430 return nil, err
431 }
432 if snapshot != nil {
433 if err = st.Recovery(snapshot.Data); err != nil {
434 if cfg.Logger != nil {
435 cfg.Logger.Panic("failed to recover from snapshot")
436 } else {
437 plog.Panicf("recovered store from snapshot error: %v", err)
438 }
439 }
440
441 if cfg.Logger != nil {
442 cfg.Logger.Info(
443 "recovered v2 store from snapshot",
444 zap.Uint64("snapshot-index", snapshot.Metadata.Index),
445 zap.String("snapshot-size", humanize.Bytes(uint64(snapshot.Size()))),
446 )
447 } else {
448 plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
449 }
450
451 if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
452 if cfg.Logger != nil {
453 cfg.Logger.Panic("failed to recover v3 backend from snapshot", zap.Error(err))
454 } else {
455 plog.Panicf("recovering backend from snapshot error: %v", err)
456 }
457 }
458 if cfg.Logger != nil {
459 s1, s2 := be.Size(), be.SizeInUse()
460 cfg.Logger.Info(
461 "recovered v3 backend from snapshot",
462 zap.Int64("backend-size-bytes", s1),
463 zap.String("backend-size", humanize.Bytes(uint64(s1))),
464 zap.Int64("backend-size-in-use-bytes", s2),
465 zap.String("backend-size-in-use", humanize.Bytes(uint64(s2))),
466 )
467 }
468 }
469
470 if !cfg.ForceNewCluster {
471 id, cl, n, s, w = restartNode(cfg, snapshot)
472 } else {
473 id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
474 }
475
476 cl.SetStore(st)
477 cl.SetBackend(be)
478 cl.Recover(api.UpdateCapability)
479 if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
480 os.RemoveAll(bepath)
481 return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
482 }
483
484 default:
485 return nil, fmt.Errorf("unsupported bootstrap config")
486 }
487
488 if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
489 return nil, fmt.Errorf("cannot access member directory: %v", terr)
490 }
491
492 sstats := stats.NewServerStats(cfg.Name, id.String())
493 lstats := stats.NewLeaderStats(id.String())
494
495 heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
496 srv = &EtcdServer{
497 readych: make(chan struct{}),
498 Cfg: cfg,
499 lgMu: new(sync.RWMutex),
500 lg: cfg.Logger,
501 errorc: make(chan error, 1),
502 v2store: st,
503 snapshotter: ss,
504 r: *newRaftNode(
505 raftNodeConfig{
506 lg: cfg.Logger,
507 isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
508 Node: n,
509 heartbeat: heartbeat,
510 raftStorage: s,
511 storage: NewStorage(w, ss),
512 },
513 ),
514 id: id,
515 attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
516 cluster: cl,
517 stats: sstats,
518 lstats: lstats,
519 SyncTicker: time.NewTicker(500 * time.Millisecond),
520 peerRt: prt,
521 reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
522 forceVersionC: make(chan struct{}),
523 AccessController: &AccessController{CORS: cfg.CORS, HostWhitelist: cfg.HostWhitelist},
524 }
525 serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
526
527 srv.applyV2 = &applierV2store{store: srv.v2store, cluster: srv.cluster}
528
529 srv.be = be
530 minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
531
532 // always recover the lessor before the kv. When we recover the mvcc.KV it will reattach keys to its leases.
533 // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before the lessor recovers.
534 srv.lessor = lease.NewLessor(
535 srv.getLogger(),
536 srv.be,
537 lease.LessorConfig{
538 MinLeaseTTL: int64(math.Ceil(minTTL.Seconds())),
539 CheckpointInterval: cfg.LeaseCheckpointInterval,
540 ExpiredLeasesRetryInterval: srv.Cfg.ReqTimeout(),
541 })
542 srv.kv = mvcc.New(srv.getLogger(), srv.be, srv.lessor, &srv.consistIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
543 if beExist {
544 kvindex := srv.kv.ConsistentIndex()
545 // TODO: remove kvindex != 0 checking when we do not expect users to upgrade
546 // etcd from pre-3.0 release.
547 if snapshot != nil && kvindex < snapshot.Metadata.Index {
548 if kvindex != 0 {
549 return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d)", bepath, kvindex, snapshot.Metadata.Index)
550 }
551 if cfg.Logger != nil {
552 cfg.Logger.Warn(
553 "consistent index was never saved",
554 zap.Uint64("snapshot-index", snapshot.Metadata.Index),
555 )
556 } else {
557 plog.Warningf("consistent index never saved (snapshot index=%d)", snapshot.Metadata.Index)
558 }
559 }
560 }
561 newSrv := srv // since srv == nil in defer if srv is returned as nil
562 defer func() {
563 // closing backend without first closing kv can cause
564 // resumed compactions to fail with closed tx errors
565 if err != nil {
566 newSrv.kv.Close()
567 }
568 }()
569
570 srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
571 tp, err := auth.NewTokenProvider(cfg.Logger, cfg.AuthToken,
572 func(index uint64) <-chan struct{} {
573 return srv.applyWait.Wait(index)
574 },
575 )
576 if err != nil {
577 if cfg.Logger != nil {
578 cfg.Logger.Warn("failed to create token provider", zap.Error(err))
579 } else {
580 plog.Errorf("failed to create token provider: %s", err)
581 }
582 return nil, err
583 }
584 srv.authStore = auth.NewAuthStore(srv.getLogger(), srv.be, tp, int(cfg.BcryptCost))
585 if num := cfg.AutoCompactionRetention; num != 0 {
586 srv.compactor, err = v3compactor.New(cfg.Logger, cfg.AutoCompactionMode, num, srv.kv, srv)
587 if err != nil {
588 return nil, err
589 }
590 srv.compactor.Run()
591 }
592
593 srv.applyV3Base = srv.newApplierV3Backend()
594 if err = srv.restoreAlarms(); err != nil {
595 return nil, err
596 }
597
598 if srv.Cfg.EnableLeaseCheckpoint {
599 // setting checkpointer enables lease checkpoint feature.
600 srv.lessor.SetCheckpointer(func(ctx context.Context, cp *pb.LeaseCheckpointRequest) {
601 srv.raftRequestOnce(ctx, pb.InternalRaftRequest{LeaseCheckpoint: cp})
602 })
603 }
604
605 // TODO: move transport initialization near the definition of remote
606 tr := &rafthttp.Transport{
607 Logger: cfg.Logger,
608 TLSInfo: cfg.PeerTLSInfo,
609 DialTimeout: cfg.peerDialTimeout(),
610 ID: id,
611 URLs: cfg.PeerURLs,
612 ClusterID: cl.ID(),
613 Raft: srv,
614 Snapshotter: ss,
615 ServerStats: sstats,
616 LeaderStats: lstats,
617 ErrorC: srv.errorc,
618 }
619 if err = tr.Start(); err != nil {
620 return nil, err
621 }
622 // add all remotes into transport
623 for _, m := range remotes {
624 if m.ID != id {
625 tr.AddRemote(m.ID, m.PeerURLs)
626 }
627 }
628 for _, m := range cl.Members() {
629 if m.ID != id {
630 tr.AddPeer(m.ID, m.PeerURLs)
631 }
632 }
633 srv.r.transport = tr
634
635 return srv, nil
636}
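
// Illustrative sketch (not part of the original etcd source): the basic lifecycle
// of an EtcdServer built with NewServer above. Populating the ServerConfig is
// elided, and the one-minute readiness budget is an assumption.
func exampleServerLifecycle(cfg ServerConfig) error {
	srv, err := NewServer(cfg)
	if err != nil {
		return err
	}
	srv.Start() // non-blocking; long-running work runs in attached goroutines

	// Wait until the server is ready to serve client requests.
	select {
	case <-srv.ReadyNotify():
	case <-time.After(time.Minute): // assumed readiness budget
		srv.Stop()
		return fmt.Errorf("server was not ready in time")
	}

	// ... serve traffic ...

	srv.Stop()         // transfers leadership first when this member is the leader
	<-srv.StopNotify() // closed once the server has fully stopped
	return nil
}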
637
638func (s *EtcdServer) getLogger() *zap.Logger {
639 s.lgMu.RLock()
640 l := s.lg
641 s.lgMu.RUnlock()
642 return l
643}
644
645func tickToDur(ticks int, tickMs uint) string {
646 return fmt.Sprintf("%v", time.Duration(ticks)*time.Duration(tickMs)*time.Millisecond)
647}
648
649func (s *EtcdServer) adjustTicks() {
650 lg := s.getLogger()
651 clusterN := len(s.cluster.Members())
652
653 // single-node fresh start, or single-node recovers from snapshot
654 if clusterN == 1 {
655 ticks := s.Cfg.ElectionTicks - 1
656 if lg != nil {
657 lg.Info(
658 "started as single-node; fast-forwarding election ticks",
659 zap.String("local-member-id", s.ID().String()),
660 zap.Int("forward-ticks", ticks),
661 zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
662 zap.Int("election-ticks", s.Cfg.ElectionTicks),
663 zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
664 )
665 } else {
666 plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks)
667 }
668 s.r.advanceTicks(ticks)
669 return
670 }
671
672 if !s.Cfg.InitialElectionTickAdvance {
673 if lg != nil {
674 lg.Info("skipping initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
675 }
676 return
677 }
678 if lg != nil {
679 lg.Info("starting initial election tick advance", zap.Int("election-ticks", s.Cfg.ElectionTicks))
680 }
681
682 // retry for up to "rafthttp.ConnReadTimeout" (5 seconds) until a peer
683 // connection is reported; otherwise, if:
684 // 1. all connections failed, or
685 // 2. there are no active peers, or
686 // 3. this is a restarted single node with no snapshot,
687 // then do nothing, because advancing ticks would have no effect
688 waitTime := rafthttp.ConnReadTimeout
689 itv := 50 * time.Millisecond
690 for i := int64(0); i < int64(waitTime/itv); i++ {
691 select {
692 case <-time.After(itv):
693 case <-s.stopping:
694 return
695 }
696
697 peerN := s.r.transport.ActivePeers()
698 if peerN > 1 {
699 // multi-node: peer connection reports have been received;
700 // adjust ticks in case leader messages arrive slowly
701 ticks := s.Cfg.ElectionTicks - 2
702
703 if lg != nil {
704 lg.Info(
705 "initialized peer connections; fast-forwarding election ticks",
706 zap.String("local-member-id", s.ID().String()),
707 zap.Int("forward-ticks", ticks),
708 zap.String("forward-duration", tickToDur(ticks, s.Cfg.TickMs)),
709 zap.Int("election-ticks", s.Cfg.ElectionTicks),
710 zap.String("election-timeout", tickToDur(s.Cfg.ElectionTicks, s.Cfg.TickMs)),
711 zap.Int("active-remote-members", peerN),
712 )
713 } else {
714 plog.Infof("%s initialized peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN)
715 }
716
717 s.r.advanceTicks(ticks)
718 return
719 }
720 }
721}
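
// Illustrative sketch (not part of the original etcd source): what the tick
// fast-forwarding in adjustTicks above means in wall-clock time. The TickMs and
// ElectionTicks values below are example numbers, not values read from this file.
func exampleTickAdvance() {
	tickMs := uint(100) // assumed 100ms heartbeat tick
	electionTicks := 10 // assumed election timeout of 10 ticks (1s)

	// A fresh single-node server advances electionTicks-1 ticks, so an election
	// can fire after roughly one remaining tick (~100ms here).
	fmt.Println("single-node fast-forward:", tickToDur(electionTicks-1, tickMs))

	// A multi-node member with active peers advances electionTicks-2 ticks,
	// leaving two ticks (~200ms here) of headroom for leader heartbeats.
	fmt.Println("multi-node fast-forward:", tickToDur(electionTicks-2, tickMs))
}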
722
723// Start performs any initialization of the Server necessary for it to
724// begin serving requests. It must be called before Do or Process.
725// Start must be non-blocking; any long-running server functionality
726// should be implemented in goroutines.
727func (s *EtcdServer) Start() {
728 s.start()
729 s.goAttach(func() { s.adjustTicks() })
730 s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
731 s.goAttach(s.purgeFile)
732 s.goAttach(func() { monitorFileDescriptor(s.getLogger(), s.stopping) })
733 s.goAttach(s.monitorVersions)
734 s.goAttach(s.linearizableReadLoop)
735 s.goAttach(s.monitorKVHash)
736}
737
738// start prepares and starts server in a new goroutine. It is no longer safe to
739// modify a server's fields after it has been sent to Start.
740// This function is just used for testing.
741func (s *EtcdServer) start() {
742 lg := s.getLogger()
743
744 if s.Cfg.SnapshotCount == 0 {
745 if lg != nil {
746 lg.Info(
747 "updating snapshot-count to default",
748 zap.Uint64("given-snapshot-count", s.Cfg.SnapshotCount),
749 zap.Uint64("updated-snapshot-count", DefaultSnapshotCount),
750 )
751 } else {
752 plog.Infof("set snapshot count to default %d", DefaultSnapshotCount)
753 }
754 s.Cfg.SnapshotCount = DefaultSnapshotCount
755 }
756 if s.Cfg.SnapshotCatchUpEntries == 0 {
757 if lg != nil {
758 lg.Info(
759 "updating snapshot catch-up entries to default",
760 zap.Uint64("given-snapshot-catchup-entries", s.Cfg.SnapshotCatchUpEntries),
761 zap.Uint64("updated-snapshot-catchup-entries", DefaultSnapshotCatchUpEntries),
762 )
763 }
764 s.Cfg.SnapshotCatchUpEntries = DefaultSnapshotCatchUpEntries
765 }
766
767 s.w = wait.New()
768 s.applyWait = wait.NewTimeList()
769 s.done = make(chan struct{})
770 s.stop = make(chan struct{})
771 s.stopping = make(chan struct{})
772 s.ctx, s.cancel = context.WithCancel(context.Background())
773 s.readwaitc = make(chan struct{}, 1)
774 s.readNotifier = newNotifier()
775 s.leaderChanged = make(chan struct{})
776 if s.ClusterVersion() != nil {
777 if lg != nil {
778 lg.Info(
779 "starting etcd server",
780 zap.String("local-member-id", s.ID().String()),
781 zap.String("local-server-version", version.Version),
782 zap.String("cluster-id", s.Cluster().ID().String()),
783 zap.String("cluster-version", version.Cluster(s.ClusterVersion().String())),
784 )
785 } else {
786 plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
787 }
788 membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": s.ClusterVersion().String()}).Set(1)
789 } else {
790 if lg != nil {
791 lg.Info(
792 "starting etcd server",
793 zap.String("local-member-id", s.ID().String()),
794 zap.String("local-server-version", version.Version),
795 zap.String("cluster-version", "to_be_decided"),
796 )
797 } else {
798 plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
799 }
800 }
801
802 // TODO: if this is an empty log, write all peer infos
803 // into the first entry
804 go s.run()
805}
806
807func (s *EtcdServer) purgeFile() {
808 var dberrc, serrc, werrc <-chan error
809 if s.Cfg.MaxSnapFiles > 0 {
810 dberrc = fileutil.PurgeFile(s.getLogger(), s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
811 serrc = fileutil.PurgeFile(s.getLogger(), s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
812 }
813 if s.Cfg.MaxWALFiles > 0 {
814 werrc = fileutil.PurgeFile(s.getLogger(), s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done)
815 }
816
817 lg := s.getLogger()
818 select {
819 case e := <-dberrc:
820 if lg != nil {
821 lg.Fatal("failed to purge snap db file", zap.Error(e))
822 } else {
823 plog.Fatalf("failed to purge snap db file %v", e)
824 }
825 case e := <-serrc:
826 if lg != nil {
827 lg.Fatal("failed to purge snap file", zap.Error(e))
828 } else {
829 plog.Fatalf("failed to purge snap file %v", e)
830 }
831 case e := <-werrc:
832 if lg != nil {
833 lg.Fatal("failed to purge wal file", zap.Error(e))
834 } else {
835 plog.Fatalf("failed to purge wal file %v", e)
836 }
837 case <-s.stopping:
838 return
839 }
840}
841
842func (s *EtcdServer) Cluster() api.Cluster { return s.cluster }
843
844func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
845
846type ServerPeer interface {
847 ServerV2
848 RaftHandler() http.Handler
849 LeaseHandler() http.Handler
850}
851
852func (s *EtcdServer) LeaseHandler() http.Handler {
853 if s.lessor == nil {
854 return nil
855 }
856 return leasehttp.NewHandler(s.lessor, s.ApplyWait)
857}
858
859func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
860
861// Process takes a raft message and applies it to the server's raft state
862// machine, respecting any timeout of the given context.
863func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
864 if s.cluster.IsIDRemoved(types.ID(m.From)) {
865 if lg := s.getLogger(); lg != nil {
866 lg.Warn(
867 "rejected Raft message from removed member",
868 zap.String("local-member-id", s.ID().String()),
869 zap.String("removed-member-id", types.ID(m.From).String()),
870 )
871 } else {
872 plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
873 }
874 return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
875 }
876 if m.Type == raftpb.MsgApp {
877 s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
878 }
879 return s.r.Step(ctx, m)
880}
881
882func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }
883
884func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
885
886// ReportSnapshot reports snapshot sent status to the raft state machine,
887// and clears the used snapshot from the snapshot store.
888func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
889 s.r.ReportSnapshot(id, status)
890}
891
892type etcdProgress struct {
893 confState raftpb.ConfState
894 snapi uint64
895 appliedt uint64
896 appliedi uint64
897}
898
899// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
900// and helps decouple state machine logic from Raft algorithms.
901// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
902type raftReadyHandler struct {
903 getLead func() (lead uint64)
904 updateLead func(lead uint64)
905 updateLeadership func(newLeader bool)
906 updateCommittedIndex func(uint64)
907}
908
909func (s *EtcdServer) run() {
910 lg := s.getLogger()
911
912 sn, err := s.r.raftStorage.Snapshot()
913 if err != nil {
914 if lg != nil {
915 lg.Panic("failed to get snapshot from Raft storage", zap.Error(err))
916 } else {
917 plog.Panicf("get snapshot from raft storage error: %v", err)
918 }
919 }
920
921 // asynchronously accept apply packets, dispatch progress in-order
922 sched := schedule.NewFIFOScheduler()
923
924 var (
925 smu sync.RWMutex
926 syncC <-chan time.Time
927 )
928 setSyncC := func(ch <-chan time.Time) {
929 smu.Lock()
930 syncC = ch
931 smu.Unlock()
932 }
933 getSyncC := func() (ch <-chan time.Time) {
934 smu.RLock()
935 ch = syncC
936 smu.RUnlock()
937 return
938 }
939 rh := &raftReadyHandler{
940 getLead: func() (lead uint64) { return s.getLead() },
941 updateLead: func(lead uint64) { s.setLead(lead) },
942 updateLeadership: func(newLeader bool) {
943 if !s.isLeader() {
944 if s.lessor != nil {
945 s.lessor.Demote()
946 }
947 if s.compactor != nil {
948 s.compactor.Pause()
949 }
950 setSyncC(nil)
951 } else {
952 if newLeader {
953 t := time.Now()
954 s.leadTimeMu.Lock()
955 s.leadElectedTime = t
956 s.leadTimeMu.Unlock()
957 }
958 setSyncC(s.SyncTicker.C)
959 if s.compactor != nil {
960 s.compactor.Resume()
961 }
962 }
963 if newLeader {
964 s.leaderChangedMu.Lock()
965 lc := s.leaderChanged
966 s.leaderChanged = make(chan struct{})
967 close(lc)
968 s.leaderChangedMu.Unlock()
969 }
970 // TODO: remove the nil checking
971 // current test utility does not provide the stats
972 if s.stats != nil {
973 s.stats.BecomeLeader()
974 }
975 },
976 updateCommittedIndex: func(ci uint64) {
977 cci := s.getCommittedIndex()
978 if ci > cci {
979 s.setCommittedIndex(ci)
980 }
981 },
982 }
983 s.r.start(rh)
984
985 ep := etcdProgress{
986 confState: sn.Metadata.ConfState,
987 snapi: sn.Metadata.Index,
988 appliedt: sn.Metadata.Term,
989 appliedi: sn.Metadata.Index,
990 }
991
992 defer func() {
993 s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
994 close(s.stopping)
995 s.wgMu.Unlock()
996 s.cancel()
997
998 sched.Stop()
999
1000 // wait for goroutines before closing raft so the WAL stays open
1001 s.wg.Wait()
1002
1003 s.SyncTicker.Stop()
1004
1005 // must stop raft after the scheduler; etcdserver can leak rafthttp pipelines
1006 // by adding a peer after raft stops the transport
1007 s.r.stop()
1008
1009 // kv, lessor and backend can be nil if running without v3 enabled
1010 // or running unit tests.
1011 if s.lessor != nil {
1012 s.lessor.Stop()
1013 }
1014 if s.kv != nil {
1015 s.kv.Close()
1016 }
1017 if s.authStore != nil {
1018 s.authStore.Close()
1019 }
1020 if s.be != nil {
1021 s.be.Close()
1022 }
1023 if s.compactor != nil {
1024 s.compactor.Stop()
1025 }
1026 close(s.done)
1027 }()
1028
1029 var expiredLeaseC <-chan []*lease.Lease
1030 if s.lessor != nil {
1031 expiredLeaseC = s.lessor.ExpiredLeasesC()
1032 }
1033
1034 for {
1035 select {
1036 case ap := <-s.r.apply():
1037 f := func(context.Context) { s.applyAll(&ep, &ap) }
1038 sched.Schedule(f)
1039 case leases := <-expiredLeaseC:
1040 s.goAttach(func() {
1041 // Increase the throughput of the expired-lease deletion process through parallelization (see the semaphore sketch after this function)
1042 c := make(chan struct{}, maxPendingRevokes)
1043 for _, lease := range leases {
1044 select {
1045 case c <- struct{}{}:
1046 case <-s.stopping:
1047 return
1048 }
1049 lid := lease.ID
1050 s.goAttach(func() {
1051 ctx := s.authStore.WithRoot(s.ctx)
1052 _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
1053 if lerr == nil {
1054 leaseExpired.Inc()
1055 } else {
1056 if lg != nil {
1057 lg.Warn(
1058 "failed to revoke lease",
1059 zap.String("lease-id", fmt.Sprintf("%016x", lid)),
1060 zap.Error(lerr),
1061 )
1062 } else {
1063 plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
1064 }
1065 }
1066
1067 <-c
1068 })
1069 }
1070 })
1071 case err := <-s.errorc:
1072 if lg != nil {
1073 lg.Warn("server error", zap.Error(err))
1074 lg.Warn("data-dir used by this member must be removed")
1075 } else {
1076 plog.Errorf("%s", err)
1077 plog.Infof("the data-dir used by this member must be removed.")
1078 }
1079 return
1080 case <-getSyncC():
1081 if s.v2store.HasTTLKeys() {
1082 s.sync(s.Cfg.ReqTimeout())
1083 }
1084 case <-s.stop:
1085 return
1086 }
1087 }
1088}
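
// Illustrative sketch (not part of the original etcd source): the bounded-concurrency
// pattern used for expired-lease revocation in run() above. A buffered channel of
// capacity maxPendingRevokes acts as a semaphore so at most that many revocations are
// in flight at once. The item type and revoke callback are placeholders.
func exampleBoundedRevoke(leaseIDs []int64, revoke func(int64)) {
	sem := make(chan struct{}, maxPendingRevokes)
	var wg sync.WaitGroup
	for _, id := range leaseIDs {
		sem <- struct{}{} // acquire a slot; blocks once maxPendingRevokes are in flight
		wg.Add(1)
		go func(id int64) {
			defer wg.Done()
			defer func() { <-sem }() // release the slot
			revoke(id)
		}(id)
	}
	wg.Wait()
}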
1089
1090func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
1091 s.applySnapshot(ep, apply)
1092 s.applyEntries(ep, apply)
1093
1094 proposalsApplied.Set(float64(ep.appliedi))
1095 s.applyWait.Trigger(ep.appliedi)
1096
1097 // wait for the raft routine to finish the disk writes before triggering a
1098 // snapshot; otherwise the applied index might be greater than the last index in raft
1099 // storage, since the raft routine might be slower than the apply routine.
1100 <-apply.notifyc
1101
1102 s.triggerSnapshot(ep)
1103 select {
1104 // snapshot requested via send()
1105 case m := <-s.r.msgSnapC:
1106 merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
1107 s.sendMergedSnap(merged)
1108 default:
1109 }
1110}
1111
1112func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
1113 if raft.IsEmptySnap(apply.snapshot) {
1114 return
1115 }
1116 applySnapshotInProgress.Inc()
1117
1118 lg := s.getLogger()
1119 if lg != nil {
1120 lg.Info(
1121 "applying snapshot",
1122 zap.Uint64("current-snapshot-index", ep.snapi),
1123 zap.Uint64("current-applied-index", ep.appliedi),
1124 zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
1125 zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
1126 )
1127 } else {
1128 plog.Infof("applying snapshot at index %d...", ep.snapi)
1129 }
1130 defer func() {
1131 if lg != nil {
1132 lg.Info(
1133 "applied snapshot",
1134 zap.Uint64("current-snapshot-index", ep.snapi),
1135 zap.Uint64("current-applied-index", ep.appliedi),
1136 zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
1137 zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
1138 )
1139 } else {
1140 plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
1141 }
1142 applySnapshotInProgress.Dec()
1143 }()
1144
1145 if apply.snapshot.Metadata.Index <= ep.appliedi {
1146 if lg != nil {
1147 lg.Panic(
1148 "unexpected leader snapshot from outdated index",
1149 zap.Uint64("current-snapshot-index", ep.snapi),
1150 zap.Uint64("current-applied-index", ep.appliedi),
1151 zap.Uint64("incoming-leader-snapshot-index", apply.snapshot.Metadata.Index),
1152 zap.Uint64("incoming-leader-snapshot-term", apply.snapshot.Metadata.Term),
1153 )
1154 } else {
1155 plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
1156 apply.snapshot.Metadata.Index, ep.appliedi)
1157 }
1158 }
1159
1160 // wait for raftNode to persist snapshot onto the disk
1161 <-apply.notifyc
1162
1163 newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
1164 if err != nil {
1165 if lg != nil {
1166 lg.Panic("failed to open snapshot backend", zap.Error(err))
1167 } else {
1168 plog.Panic(err)
1169 }
1170 }
1171
1172 // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
1173 // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
1174 if s.lessor != nil {
1175 if lg != nil {
1176 lg.Info("restoring lease store")
1177 } else {
1178 plog.Info("recovering lessor...")
1179 }
1180
1181 s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
1182
1183 if lg != nil {
1184 lg.Info("restored lease store")
1185 } else {
1186 plog.Info("finished recovering lessor")
1187 }
1188 }
1189
1190 if lg != nil {
1191 lg.Info("restoring mvcc store")
1192 } else {
1193 plog.Info("restoring mvcc store...")
1194 }
1195
1196 if err := s.kv.Restore(newbe); err != nil {
1197 if lg != nil {
1198 lg.Panic("failed to restore mvcc store", zap.Error(err))
1199 } else {
1200 plog.Panicf("restore KV error: %v", err)
1201 }
1202 }
1203
1204 s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())
1205 if lg != nil {
1206 lg.Info("restored mvcc store")
1207 } else {
1208 plog.Info("finished restoring mvcc store")
1209 }
1210
1211 // Closing old backend might block until all the txns
1212 // on the backend are finished.
1213 // We do not want to wait on closing the old backend.
1214 s.bemu.Lock()
1215 oldbe := s.be
1216 go func() {
1217 if lg != nil {
1218 lg.Info("closing old backend file")
1219 } else {
1220 plog.Info("closing old backend...")
1221 }
1222 defer func() {
1223 if lg != nil {
1224 lg.Info("closed old backend file")
1225 } else {
1226 plog.Info("finished closing old backend")
1227 }
1228 }()
1229 if err := oldbe.Close(); err != nil {
1230 if lg != nil {
1231 lg.Panic("failed to close old backend", zap.Error(err))
1232 } else {
1233 plog.Panicf("close backend error: %v", err)
1234 }
1235 }
1236 }()
1237
1238 s.be = newbe
1239 s.bemu.Unlock()
1240
1241 if lg != nil {
1242 lg.Info("restoring alarm store")
1243 } else {
1244 plog.Info("recovering alarms...")
1245 }
1246
1247 if err := s.restoreAlarms(); err != nil {
1248 if lg != nil {
1249 lg.Panic("failed to restore alarm store", zap.Error(err))
1250 } else {
1251 plog.Panicf("restore alarms error: %v", err)
1252 }
1253 }
1254
1255 if lg != nil {
1256 lg.Info("restored alarm store")
1257 } else {
1258 plog.Info("finished recovering alarms")
1259 }
1260
1261 if s.authStore != nil {
1262 if lg != nil {
1263 lg.Info("restoring auth store")
1264 } else {
1265 plog.Info("recovering auth store...")
1266 }
1267
1268 s.authStore.Recover(newbe)
1269
1270 if lg != nil {
1271 lg.Info("restored auth store")
1272 } else {
1273 plog.Info("finished recovering auth store")
1274 }
1275 }
1276
1277 if lg != nil {
1278 lg.Info("restoring v2 store")
1279 } else {
1280 plog.Info("recovering store v2...")
1281 }
1282 if err := s.v2store.Recovery(apply.snapshot.Data); err != nil {
1283 if lg != nil {
1284 lg.Panic("failed to restore v2 store", zap.Error(err))
1285 } else {
1286 plog.Panicf("recovery store error: %v", err)
1287 }
1288 }
1289
1290 if lg != nil {
1291 lg.Info("restored v2 store")
1292 } else {
1293 plog.Info("finished recovering store v2")
1294 }
1295
1296 s.cluster.SetBackend(newbe)
1297
1298 if lg != nil {
1299 lg.Info("restoring cluster configuration")
1300 } else {
1301 plog.Info("recovering cluster configuration...")
1302 }
1303
1304 s.cluster.Recover(api.UpdateCapability)
1305
1306 if lg != nil {
1307 lg.Info("restored cluster configuration")
1308 lg.Info("removing old peers from network")
1309 } else {
1310 plog.Info("finished recovering cluster configuration")
1311 plog.Info("removing old peers from network...")
1312 }
1313
1314 // recover raft transport
1315 s.r.transport.RemoveAllPeers()
1316
1317 if lg != nil {
1318 lg.Info("removed old peers from network")
1319 lg.Info("adding peers from new cluster configuration")
1320 } else {
1321 plog.Info("finished removing old peers from network")
1322 plog.Info("adding peers from new cluster configuration into network...")
1323 }
1324
1325 for _, m := range s.cluster.Members() {
1326 if m.ID == s.ID() {
1327 continue
1328 }
1329 s.r.transport.AddPeer(m.ID, m.PeerURLs)
1330 }
1331
1332 if lg != nil {
1333 lg.Info("added peers from new cluster configuration")
1334 } else {
1335 plog.Info("finished adding peers from new cluster configuration into network...")
1336 }
1337
1338 ep.appliedt = apply.snapshot.Metadata.Term
1339 ep.appliedi = apply.snapshot.Metadata.Index
1340 ep.snapi = ep.appliedi
1341 ep.confState = apply.snapshot.Metadata.ConfState
1342}
1343
1344func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
1345 if len(apply.entries) == 0 {
1346 return
1347 }
1348 firsti := apply.entries[0].Index
1349 if firsti > ep.appliedi+1 {
1350 if lg := s.getLogger(); lg != nil {
1351 lg.Panic(
1352 "unexpected committed entry index",
1353 zap.Uint64("current-applied-index", ep.appliedi),
1354 zap.Uint64("first-committed-entry-index", firsti),
1355 )
1356 } else {
1357 plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, ep.appliedi)
1358 }
1359 }
1360 var ents []raftpb.Entry
1361 if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
1362 ents = apply.entries[ep.appliedi+1-firsti:]
1363 }
1364 if len(ents) == 0 {
1365 return
1366 }
1367 var shouldstop bool
1368 if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
1369 go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
1370 }
1371}
1372
1373func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
1374 if ep.appliedi-ep.snapi <= s.Cfg.SnapshotCount {
1375 return
1376 }
1377
1378 if lg := s.getLogger(); lg != nil {
1379 lg.Info(
1380 "triggering snapshot",
1381 zap.String("local-member-id", s.ID().String()),
1382 zap.Uint64("local-member-applied-index", ep.appliedi),
1383 zap.Uint64("local-member-snapshot-index", ep.snapi),
1384 zap.Uint64("local-member-snapshot-count", s.Cfg.SnapshotCount),
1385 )
1386 } else {
1387 plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
1388 }
1389
1390 s.snapshot(ep.appliedi, ep.confState)
1391 ep.snapi = ep.appliedi
1392}
1393
1394func (s *EtcdServer) hasMultipleVotingMembers() bool {
1395 return s.cluster != nil && len(s.cluster.VotingMemberIDs()) > 1
1396}
1397
1398func (s *EtcdServer) isLeader() bool {
1399 return uint64(s.ID()) == s.Lead()
1400}
1401
1402// MoveLeader transfers the leader to the given transferee.
1403func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
1404 if !s.cluster.IsMemberExist(types.ID(transferee)) || s.cluster.Member(types.ID(transferee)).IsLearner {
1405 return ErrBadLeaderTransferee
1406 }
1407
1408 now := time.Now()
1409 interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
1410
1411 if lg := s.getLogger(); lg != nil {
1412 lg.Info(
1413 "leadership transfer starting",
1414 zap.String("local-member-id", s.ID().String()),
1415 zap.String("current-leader-member-id", types.ID(lead).String()),
1416 zap.String("transferee-member-id", types.ID(transferee).String()),
1417 )
1418 } else {
1419 plog.Infof("%s starts leadership transfer from %s to %s", s.ID(), types.ID(lead), types.ID(transferee))
1420 }
1421
1422 s.r.TransferLeadership(ctx, lead, transferee)
1423 for s.Lead() != transferee {
1424 select {
1425 case <-ctx.Done(): // time out
1426 return ErrTimeoutLeaderTransfer
1427 case <-time.After(interval):
1428 }
1429 }
1430
1431 // TODO: drain all requests, or drop all messages to the old leader
1432 if lg := s.getLogger(); lg != nil {
1433 lg.Info(
1434 "leadership transfer finished",
1435 zap.String("local-member-id", s.ID().String()),
1436 zap.String("old-leader-member-id", types.ID(lead).String()),
1437 zap.String("new-leader-member-id", types.ID(transferee).String()),
1438 zap.Duration("took", time.Since(now)),
1439 )
1440 } else {
1441 plog.Infof("%s finished leadership transfer from %s to %s (took %v)", s.ID(), types.ID(lead), types.ID(transferee), time.Since(now))
1442 }
1443 return nil
1444}
1445
1446// TransferLeadership transfers the leader to the chosen transferee.
1447func (s *EtcdServer) TransferLeadership() error {
1448 if !s.isLeader() {
1449 if lg := s.getLogger(); lg != nil {
1450 lg.Info(
1451 "skipped leadership transfer; local server is not leader",
1452 zap.String("local-member-id", s.ID().String()),
1453 zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
1454 )
1455 } else {
1456 plog.Printf("skipped leadership transfer for stopping non-leader member")
1457 }
1458 return nil
1459 }
1460
1461 if !s.hasMultipleVotingMembers() {
1462 if lg := s.getLogger(); lg != nil {
1463 lg.Info(
1464 "skipped leadership transfer for single voting member cluster",
1465 zap.String("local-member-id", s.ID().String()),
1466 zap.String("current-leader-member-id", types.ID(s.Lead()).String()),
1467 )
1468 } else {
1469 plog.Printf("skipped leadership transfer for single voting member cluster")
1470 }
1471 return nil
1472 }
1473
1474 transferee, ok := longestConnected(s.r.transport, s.cluster.VotingMemberIDs())
1475 if !ok {
1476 return ErrUnhealthy
1477 }
1478
1479 tm := s.Cfg.ReqTimeout()
1480 ctx, cancel := context.WithTimeout(s.ctx, tm)
1481 err := s.MoveLeader(ctx, s.Lead(), uint64(transferee))
1482 cancel()
1483 return err
1484}
1485
1486// HardStop stops the server without coordination with other members in the cluster.
1487func (s *EtcdServer) HardStop() {
1488 select {
1489 case s.stop <- struct{}{}:
1490 case <-s.done:
1491 return
1492 }
1493 <-s.done
1494}
1495
1496// Stop stops the server gracefully, and shuts down the running goroutine.
1497// Stop should be called after a Start(s), otherwise it will block forever.
1498// When stopping leader, Stop transfers its leadership to one of its peers
1499// before stopping the server.
1500// Stop terminates the Server and performs any necessary finalization.
1501// Do and Process cannot be called after Stop has been invoked.
1502func (s *EtcdServer) Stop() {
1503 if err := s.TransferLeadership(); err != nil {
1504 if lg := s.getLogger(); lg != nil {
1505 lg.Warn("leadership transfer failed", zap.String("local-member-id", s.ID().String()), zap.Error(err))
1506 } else {
1507 plog.Warningf("%s failed to transfer leadership (%v)", s.ID(), err)
1508 }
1509 }
1510 s.HardStop()
1511}
1512
1513// ReadyNotify returns a channel that will be closed when the server
1514// is ready to serve client requests
1515func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
1516
1517func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
1518 select {
1519 case <-time.After(d):
1520 case <-s.done:
1521 }
1522 select {
1523 case s.errorc <- err:
1524 default:
1525 }
1526}
1527
1528// StopNotify returns a channel that receives an empty struct
1529// when the server is stopped.
1530func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
1531
1532func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
1533
1534func (s *EtcdServer) LeaderStats() []byte {
1535 lead := s.getLead()
1536 if lead != uint64(s.id) {
1537 return nil
1538 }
1539 return s.lstats.JSON()
1540}
1541
1542func (s *EtcdServer) StoreStats() []byte { return s.v2store.JsonStats() }
1543
1544func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) error {
1545 if s.authStore == nil {
1546 // In the context of an ordinary etcd process, s.authStore will never be nil.
1547 // This branch is for handling cases in server_test.go
1548 return nil
1549 }
1550
1551 // Note that this permission check is done in the API layer,
1552 // so a TOCTOU problem could potentially arise with a schedule like this:
1553 // update membership with user A -> revoke root role of A -> apply the membership change
1554 // in the state machine layer.
1555 // However, both membership changes and role management require the root privilege,
1556 // so careful operation by admins can prevent the problem.
1557 authInfo, err := s.AuthInfoFromCtx(ctx)
1558 if err != nil {
1559 return err
1560 }
1561
1562 return s.AuthStore().IsAdminPermitted(authInfo)
1563}
1564
1565func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1566 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1567 return nil, err
1568 }
1569
1570 // TODO: move Member to protobuf type
1571 b, err := json.Marshal(memb)
1572 if err != nil {
1573 return nil, err
1574 }
1575
1576 // by default StrictReconfigCheck is enabled; reject new members if unhealthy.
1577 if err := s.mayAddMember(memb); err != nil {
1578 return nil, err
1579 }
1580
1581 cc := raftpb.ConfChange{
1582 Type: raftpb.ConfChangeAddNode,
1583 NodeID: uint64(memb.ID),
1584 Context: b,
1585 }
1586
1587 if memb.IsLearner {
1588 cc.Type = raftpb.ConfChangeAddLearnerNode
1589 }
1590
1591 return s.configure(ctx, cc)
1592}
1593
1594func (s *EtcdServer) mayAddMember(memb membership.Member) error {
1595 if !s.Cfg.StrictReconfigCheck {
1596 return nil
1597 }
1598
1599 // protect quorum when adding voting member
1600 if !memb.IsLearner && !s.cluster.IsReadyToAddVotingMember() {
1601 if lg := s.getLogger(); lg != nil {
1602 lg.Warn(
1603 "rejecting member add request; not enough healthy members",
1604 zap.String("local-member-id", s.ID().String()),
1605 zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
1606 zap.Error(ErrNotEnoughStartedMembers),
1607 )
1608 } else {
1609 plog.Warningf("not enough started members, rejecting member add %+v", memb)
1610 }
1611 return ErrNotEnoughStartedMembers
1612 }
1613
1614 if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.VotingMembers()) {
1615 if lg := s.getLogger(); lg != nil {
1616 lg.Warn(
1617 "rejecting member add request; local member has not been connected to all peers, reconfigure breaks active quorum",
1618 zap.String("local-member-id", s.ID().String()),
1619 zap.String("requested-member-add", fmt.Sprintf("%+v", memb)),
1620 zap.Error(ErrUnhealthy),
1621 )
1622 } else {
1623 plog.Warningf("not healthy for reconfigure, rejecting member add %+v", memb)
1624 }
1625 return ErrUnhealthy
1626 }
1627
1628 return nil
1629}
1630
1631func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1632 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1633 return nil, err
1634 }
1635
1636 // by default StrictReconfigCheck is enabled; reject removal if it leads to quorum loss
1637 if err := s.mayRemoveMember(types.ID(id)); err != nil {
1638 return nil, err
1639 }
1640
1641 cc := raftpb.ConfChange{
1642 Type: raftpb.ConfChangeRemoveNode,
1643 NodeID: id,
1644 }
1645 return s.configure(ctx, cc)
1646}
1647
1648// PromoteMember promotes a learner node to a voting node.
1649func (s *EtcdServer) PromoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1650 // only the raft leader has information on whether the to-be-promoted learner node is ready. If the promoteMember call
1651 // fails with ErrNotLeader, forward the request to the leader node via HTTP. If the promoteMember call fails with an error
1652 // other than ErrNotLeader, return the error.
1653 resp, err := s.promoteMember(ctx, id)
1654 if err == nil {
1655 learnerPromoteSucceed.Inc()
1656 return resp, nil
1657 }
1658 if err != ErrNotLeader {
1659 learnerPromoteFailed.WithLabelValues(err.Error()).Inc()
1660 return resp, err
1661 }
1662
1663 cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
1664 defer cancel()
1665 // forward to leader
1666 for cctx.Err() == nil {
1667 leader, err := s.waitLeader(cctx)
1668 if err != nil {
1669 return nil, err
1670 }
1671 for _, url := range leader.PeerURLs {
1672 resp, err := promoteMemberHTTP(cctx, url, id, s.peerRt)
1673 if err == nil {
1674 return resp, nil
1675 }
1676 // If member promotion failed, return early. Otherwise keep retrying.
1677 if err == ErrLearnerNotReady || err == membership.ErrIDNotFound || err == membership.ErrMemberNotLearner {
1678 return nil, err
1679 }
1680 }
1681 }
1682
1683 if cctx.Err() == context.DeadlineExceeded {
1684 return nil, ErrTimeout
1685 }
1686 return nil, ErrCanceled
1687}
1688
1689// promoteMember checks whether the to-be-promoted learner node is ready before sending the promote
1690// request to raft.
1691// The function returns ErrNotLeader if the local node is not the raft leader (and therefore does not
1692// have enough information to determine if the learner node is ready), and returns ErrLearnerNotReady
1693// if the local node is the leader (and therefore has enough information) but has determined that the
1694// learner node is not ready to be promoted.
1695func (s *EtcdServer) promoteMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1696 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1697 return nil, err
1698 }
1699
1700 // check if we can promote this learner.
1701 if err := s.mayPromoteMember(types.ID(id)); err != nil {
1702 return nil, err
1703 }
1704
1705 // build the context for the promote confChange: mark IsLearner as false and IsPromote as true.
1706 promoteChangeContext := membership.ConfigChangeContext{
1707 Member: membership.Member{
1708 ID: types.ID(id),
1709 },
1710 IsPromote: true,
1711 }
1712
1713 b, err := json.Marshal(promoteChangeContext)
1714 if err != nil {
1715 return nil, err
1716 }
1717
1718 cc := raftpb.ConfChange{
1719 Type: raftpb.ConfChangeAddNode,
1720 NodeID: id,
1721 Context: b,
1722 }
1723
1724 return s.configure(ctx, cc)
1725}
1726
1727func (s *EtcdServer) mayPromoteMember(id types.ID) error {
1728 err := s.isLearnerReady(uint64(id))
1729 if err != nil {
1730 return err
1731 }
1732
1733 if !s.Cfg.StrictReconfigCheck {
1734 return nil
1735 }
1736 if !s.cluster.IsReadyToPromoteMember(uint64(id)) {
1737 if lg := s.getLogger(); lg != nil {
1738 lg.Warn(
1739 "rejecting member promote request; not enough healthy members",
1740 zap.String("local-member-id", s.ID().String()),
1741 zap.String("requested-member-remove-id", id.String()),
1742 zap.Error(ErrNotEnoughStartedMembers),
1743 )
1744 } else {
1745 plog.Warningf("not enough started members, rejecting promote member %s", id)
1746 }
1747 return ErrNotEnoughStartedMembers
1748 }
1749
1750 return nil
1751}
1752
1753// isLearnerReady checks whether the learner has caught up with the leader.
1754// Note: it will return nil if the member is not found in the cluster or if the member is not a learner.
1755// These two conditions will be checked later, before the apply phase.
1756func (s *EtcdServer) isLearnerReady(id uint64) error {
1757 rs := s.raftStatus()
1758
1759 // leader's raftStatus.Progress is not nil
1760 if rs.Progress == nil {
1761 return ErrNotLeader
1762 }
1763
1764 var learnerMatch uint64
1765 isFound := false
1766 leaderID := rs.ID
1767 for memberID, progress := range rs.Progress {
1768 if id == memberID {
1769 // check its status
1770 learnerMatch = progress.Match
1771 isFound = true
1772 break
1773 }
1774 }
1775
1776 if isFound {
1777 leaderMatch := rs.Progress[leaderID].Match
1778		// the learner's Match has not caught up with the leader yet
1779 if float64(learnerMatch) < float64(leaderMatch)*readyPercent {
1780 return ErrLearnerNotReady
1781 }
1782 }
1783
1784 return nil
1785}
1786
1787func (s *EtcdServer) mayRemoveMember(id types.ID) error {
1788 if !s.Cfg.StrictReconfigCheck {
1789 return nil
1790 }
1791
1792 isLearner := s.cluster.IsMemberExist(id) && s.cluster.Member(id).IsLearner
1793 // no need to check quorum when removing non-voting member
1794 if isLearner {
1795 return nil
1796 }
1797
1798 if !s.cluster.IsReadyToRemoveVotingMember(uint64(id)) {
1799 if lg := s.getLogger(); lg != nil {
1800 lg.Warn(
1801 "rejecting member remove request; not enough healthy members",
1802 zap.String("local-member-id", s.ID().String()),
1803 zap.String("requested-member-remove-id", id.String()),
1804 zap.Error(ErrNotEnoughStartedMembers),
1805 )
1806 } else {
1807 plog.Warningf("not enough started members, rejecting remove member %s", id)
1808 }
1809 return ErrNotEnoughStartedMembers
1810 }
1811
1812 // downed member is safe to remove since it's not part of the active quorum
1813 if t := s.r.transport.ActiveSince(id); id != s.ID() && t.IsZero() {
1814 return nil
1815 }
1816
1817 // protect quorum if some members are down
1818 m := s.cluster.VotingMembers()
1819 active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m)
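	// After the removal the cluster has len(m)-1 voting members and needs
	// 1+((len(m)-1)/2) of them for quorum; the member being removed is known to be
	// active at this point, so active-1 is the number of connected voters that would remain.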
1820 if (active - 1) < 1+((len(m)-1)/2) {
1821 if lg := s.getLogger(); lg != nil {
1822 lg.Warn(
1823 "rejecting member remove request; local member has not been connected to all peers, reconfigure breaks active quorum",
1824 zap.String("local-member-id", s.ID().String()),
1825 zap.String("requested-member-remove", id.String()),
1826 zap.Int("active-peers", active),
1827 zap.Error(ErrUnhealthy),
1828 )
1829 } else {
1830 plog.Warningf("reconfigure breaks active quorum, rejecting remove member %s", id)
1831 }
1832 return ErrUnhealthy
1833 }
1834
1835 return nil
1836}
1837
1838func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1839 b, merr := json.Marshal(memb)
1840 if merr != nil {
1841 return nil, merr
1842 }
1843
1844 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1845 return nil, err
1846 }
1847 cc := raftpb.ConfChange{
1848 Type: raftpb.ConfChangeUpdateNode,
1849 NodeID: uint64(memb.ID),
1850 Context: b,
1851 }
1852 return s.configure(ctx, cc)
1853}
1854
1855func (s *EtcdServer) setCommittedIndex(v uint64) {
1856 atomic.StoreUint64(&s.committedIndex, v)
1857}
1858
1859func (s *EtcdServer) getCommittedIndex() uint64 {
1860 return atomic.LoadUint64(&s.committedIndex)
1861}
1862
1863func (s *EtcdServer) setAppliedIndex(v uint64) {
1864 atomic.StoreUint64(&s.appliedIndex, v)
1865}
1866
1867func (s *EtcdServer) getAppliedIndex() uint64 {
1868 return atomic.LoadUint64(&s.appliedIndex)
1869}
1870
1871func (s *EtcdServer) setTerm(v uint64) {
1872 atomic.StoreUint64(&s.term, v)
1873}
1874
1875func (s *EtcdServer) getTerm() uint64 {
1876 return atomic.LoadUint64(&s.term)
1877}
1878
1879func (s *EtcdServer) setLead(v uint64) {
1880 atomic.StoreUint64(&s.lead, v)
1881}
1882
1883func (s *EtcdServer) getLead() uint64 {
1884 return atomic.LoadUint64(&s.lead)
1885}
1886
1887func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
1888 s.leaderChangedMu.RLock()
1889 defer s.leaderChangedMu.RUnlock()
1890 return s.leaderChanged
1891}
1892
1893// RaftStatusGetter represents etcd server and Raft progress.
1894type RaftStatusGetter interface {
1895 ID() types.ID
1896 Leader() types.ID
1897 CommittedIndex() uint64
1898 AppliedIndex() uint64
1899 Term() uint64
1900}
1901
1902func (s *EtcdServer) ID() types.ID { return s.id }
1903
1904func (s *EtcdServer) Leader() types.ID { return types.ID(s.getLead()) }
1905
1906func (s *EtcdServer) Lead() uint64 { return s.getLead() }
1907
1908func (s *EtcdServer) CommittedIndex() uint64 { return s.getCommittedIndex() }
1909
1910func (s *EtcdServer) AppliedIndex() uint64 { return s.getAppliedIndex() }
1911
1912func (s *EtcdServer) Term() uint64 { return s.getTerm() }
1913
1914type confChangeResponse struct {
1915 membs []*membership.Member
1916 err error
1917}
1918
1919// configure sends a configuration change through consensus and
1920// then waits for it to be applied to the server. It
1921// will block until the change is performed or there is an error.
1922func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
1923 cc.ID = s.reqIDGen.Next()
1924 ch := s.w.Register(cc.ID)
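	// The conf change is tagged with a server-generated request ID and registered with
	// the wait registry; the apply path triggers this channel with a confChangeResponse
	// once the change has actually been applied (see apply / applyConfChange).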
1925
1926 start := time.Now()
1927 if err := s.r.ProposeConfChange(ctx, cc); err != nil {
1928 s.w.Trigger(cc.ID, nil)
1929 return nil, err
1930 }
1931
1932 select {
1933 case x := <-ch:
1934 if x == nil {
1935 if lg := s.getLogger(); lg != nil {
1936 lg.Panic("failed to configure")
1937 } else {
1938 plog.Panicf("configure trigger value should never be nil")
1939 }
1940 }
1941 resp := x.(*confChangeResponse)
1942 if lg := s.getLogger(); lg != nil {
1943 lg.Info(
1944 "applied a configuration change through raft",
1945 zap.String("local-member-id", s.ID().String()),
1946 zap.String("raft-conf-change", cc.Type.String()),
1947 zap.String("raft-conf-change-node-id", types.ID(cc.NodeID).String()),
1948 )
1949 }
1950 return resp.membs, resp.err
1951
1952 case <-ctx.Done():
1953 s.w.Trigger(cc.ID, nil) // GC wait
1954 return nil, s.parseProposeCtxErr(ctx.Err(), start)
1955
1956 case <-s.stopping:
1957 return nil, ErrStopped
1958 }
1959}
1960
1961// sync proposes a SYNC request and is non-blocking.
1962// This makes no guarantee that the request will be proposed or performed.
1963// The request will be canceled after the given timeout.
1964func (s *EtcdServer) sync(timeout time.Duration) {
1965 req := pb.Request{
1966 Method: "SYNC",
1967 ID: s.reqIDGen.Next(),
1968 Time: time.Now().UnixNano(),
1969 }
1970 data := pbutil.MustMarshal(&req)
1971	// There is no guarantee that the node has a leader when the SYNC request is made,
1972	// so the proposal is made in a separate goroutine.
1973 ctx, cancel := context.WithTimeout(s.ctx, timeout)
1974 s.goAttach(func() {
1975 s.r.Propose(ctx, data)
1976 cancel()
1977 })
1978}
1979
1980// publish registers server information into the cluster. The information
1981// is the JSON representation of this server's member struct, updated with the
1982// static clientURLs of the server.
1983// The function keeps attempting to register until it succeeds,
1984// or its server is stopped.
1985//
1986// Member attributes are encoded with the v2 store and applied through Raft,
1987// but the request does not go through the v2 API endpoint, so even with the v2
1988// client handler disabled (e.g. --enable-v2=false) the cluster can still
1989// process publish requests through rafthttp.
1990// TODO: Deprecate v2 store
1991func (s *EtcdServer) publish(timeout time.Duration) {
1992 b, err := json.Marshal(s.attributes)
1993 if err != nil {
1994 if lg := s.getLogger(); lg != nil {
1995 lg.Panic("failed to marshal JSON", zap.Error(err))
1996 } else {
1997 plog.Panicf("json marshal error: %v", err)
1998 }
1999 return
2000 }
2001 req := pb.Request{
2002 Method: "PUT",
2003 Path: membership.MemberAttributesStorePath(s.id),
2004 Val: string(b),
2005 }
2006
2007 for {
2008 ctx, cancel := context.WithTimeout(s.ctx, timeout)
2009 _, err := s.Do(ctx, req)
2010 cancel()
2011 switch err {
2012 case nil:
2013 close(s.readych)
2014 if lg := s.getLogger(); lg != nil {
2015 lg.Info(
2016 "published local member to cluster through raft",
2017 zap.String("local-member-id", s.ID().String()),
2018 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
2019 zap.String("request-path", req.Path),
2020 zap.String("cluster-id", s.cluster.ID().String()),
2021 zap.Duration("publish-timeout", timeout),
2022 )
2023 } else {
2024 plog.Infof("published %+v to cluster %s", s.attributes, s.cluster.ID())
2025 }
2026 return
2027
2028 case ErrStopped:
2029 if lg := s.getLogger(); lg != nil {
2030 lg.Warn(
2031 "stopped publish because server is stopped",
2032 zap.String("local-member-id", s.ID().String()),
2033 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
2034 zap.Duration("publish-timeout", timeout),
2035 zap.Error(err),
2036 )
2037 } else {
2038 plog.Infof("aborting publish because server is stopped")
2039 }
2040 return
2041
2042 default:
2043 if lg := s.getLogger(); lg != nil {
2044 lg.Warn(
2045 "failed to publish local member to cluster through raft",
2046 zap.String("local-member-id", s.ID().String()),
2047 zap.String("local-member-attributes", fmt.Sprintf("%+v", s.attributes)),
2048 zap.String("request-path", req.Path),
2049 zap.Duration("publish-timeout", timeout),
2050 zap.Error(err),
2051 )
2052 } else {
2053 plog.Errorf("publish error: %v", err)
2054 }
2055 }
2056 }
2057}
2058
2059func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
2060 atomic.AddInt64(&s.inflightSnapshots, 1)
2061
2062 lg := s.getLogger()
2063 fields := []zap.Field{
2064 zap.String("from", s.ID().String()),
2065 zap.String("to", types.ID(merged.To).String()),
2066 zap.Int64("bytes", merged.TotalSize),
2067 zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
2068 }
2069
2070 now := time.Now()
2071 s.r.transport.SendSnapshot(merged)
2072 if lg != nil {
2073 lg.Info("sending merged snapshot", fields...)
2074 }
2075
2076 s.goAttach(func() {
2077 select {
2078 case ok := <-merged.CloseNotify():
2079 // delay releasing inflight snapshot for another 30 seconds to
2080 // block log compaction.
2081			// If the follower still fails to catch up within that window, it is simply
2082			// too slow; the snapshot cycle cannot be avoided anyway.
2083 if ok {
2084 select {
2085 case <-time.After(releaseDelayAfterSnapshot):
2086 case <-s.stopping:
2087 }
2088 }
2089
2090 atomic.AddInt64(&s.inflightSnapshots, -1)
2091
2092 if lg != nil {
2093 lg.Info("sent merged snapshot", append(fields, zap.Duration("took", time.Since(now)))...)
2094 }
2095
2096 case <-s.stopping:
2097 if lg != nil {
2098 lg.Warn("canceled sending merged snapshot; server stopping", fields...)
2099 }
2100 return
2101 }
2102 })
2103}
2104
2105// apply takes entries received from Raft (after they have been committed) and
2106// applies them to the current state of the EtcdServer.
2107// The given entries should not be empty.
2108func (s *EtcdServer) apply(
2109 es []raftpb.Entry,
2110 confState *raftpb.ConfState,
2111) (appliedt uint64, appliedi uint64, shouldStop bool) {
2112 for i := range es {
2113 e := es[i]
2114 switch e.Type {
2115 case raftpb.EntryNormal:
2116 s.applyEntryNormal(&e)
2117 s.setAppliedIndex(e.Index)
2118 s.setTerm(e.Term)
2119
2120 case raftpb.EntryConfChange:
2121			// set the consistent index of the currently executing entry
2122 if e.Index > s.consistIndex.ConsistentIndex() {
2123 s.consistIndex.setConsistentIndex(e.Index)
2124 }
2125 var cc raftpb.ConfChange
2126 pbutil.MustUnmarshal(&cc, e.Data)
2127 removedSelf, err := s.applyConfChange(cc, confState)
2128 s.setAppliedIndex(e.Index)
2129 s.setTerm(e.Term)
2130 shouldStop = shouldStop || removedSelf
2131 s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
2132
2133 default:
2134 if lg := s.getLogger(); lg != nil {
2135 lg.Panic(
2136 "unknown entry type; must be either EntryNormal or EntryConfChange",
2137 zap.String("type", e.Type.String()),
2138 )
2139 } else {
2140 plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
2141 }
2142 }
2143 appliedi, appliedt = e.Index, e.Term
2144 }
2145 return appliedt, appliedi, shouldStop
2146}
2147
2148// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer.
2149func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
2150 shouldApplyV3 := false
2151 if e.Index > s.consistIndex.ConsistentIndex() {
2152		// set the consistent index of the currently executing entry
2153 s.consistIndex.setConsistentIndex(e.Index)
2154 shouldApplyV3 = true
2155 }
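	// Entries whose index is at or below the persisted consistent index have already
	// been applied to the v3 backend before a restart; shouldApplyV3 stays false for
	// them so that the v3 apply below is skipped and stays idempotent.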
2156
2157	// the raft state machine may generate a noop entry on leader confirmation;
2158	// skip it early to avoid potential bugs in the future
2159 if len(e.Data) == 0 {
2160 select {
2161 case s.forceVersionC <- struct{}{}:
2162 default:
2163 }
2164		// promote the lessor when the local member is the leader and has finished
2165		// applying all entries from the last term.
2166 if s.isLeader() {
2167 s.lessor.Promote(s.Cfg.electionTimeout())
2168 }
2169 return
2170 }
2171
2172 var raftReq pb.InternalRaftRequest
2173 if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
2174 var r pb.Request
2175 rp := &r
2176 pbutil.MustUnmarshal(rp, e.Data)
2177 s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
2178 return
2179 }
2180 if raftReq.V2 != nil {
2181 req := (*RequestV2)(raftReq.V2)
2182 s.w.Trigger(req.ID, s.applyV2Request(req))
2183 return
2184 }
2185
2186 // do not re-apply applied entries.
2187 if !shouldApplyV3 {
2188 return
2189 }
2190
2191 id := raftReq.ID
2192 if id == 0 {
2193 id = raftReq.Header.ID
2194 }
2195
2196 var ar *applyResult
2197 needResult := s.w.IsRegistered(id)
2198 if needResult || !noSideEffect(&raftReq) {
2199 if !needResult && raftReq.Txn != nil {
2200 removeNeedlessRangeReqs(raftReq.Txn)
2201 }
2202 ar = s.applyV3.Apply(&raftReq)
2203 }
2204
2205 if ar == nil {
2206 return
2207 }
2208
2209 if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
2210 s.w.Trigger(id, ar)
2211 return
2212 }
2213
2214 if lg := s.getLogger(); lg != nil {
2215 lg.Warn(
2216 "message exceeded backend quota; raising alarm",
2217 zap.Int64("quota-size-bytes", s.Cfg.QuotaBackendBytes),
2218 zap.String("quota-size", humanize.Bytes(uint64(s.Cfg.QuotaBackendBytes))),
2219 zap.Error(ar.err),
2220 )
2221 } else {
2222 plog.Errorf("applying raft message exceeded backend quota")
2223 }
2224
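	// Raising the NOSPACE alarm requires another raft proposal, which cannot be made
	// synchronously from inside the apply path; do it in a tracked goroutine and only
	// trigger the waiter once the alarm request has been processed.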
2225 s.goAttach(func() {
2226 a := &pb.AlarmRequest{
2227 MemberID: uint64(s.ID()),
2228 Action: pb.AlarmRequest_ACTIVATE,
2229 Alarm: pb.AlarmType_NOSPACE,
2230 }
2231 s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
2232 s.w.Trigger(id, ar)
2233 })
2234}
2235
2236// applyConfChange applies a ConfChange to the server. It is only
2237// invoked with a ConfChange that has already passed through Raft
2238func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
2239 if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
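		// The committed entry must still be fed to raft so that its configuration state
		// stays in step with the log, but setting NodeID to raft.None turns it into a
		// no-op membership change.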
2240 cc.NodeID = raft.None
2241 s.r.ApplyConfChange(cc)
2242 return false, err
2243 }
2244
2245 lg := s.getLogger()
2246 *confState = *s.r.ApplyConfChange(cc)
2247 switch cc.Type {
2248 case raftpb.ConfChangeAddNode, raftpb.ConfChangeAddLearnerNode:
2249 confChangeContext := new(membership.ConfigChangeContext)
2250 if err := json.Unmarshal(cc.Context, confChangeContext); err != nil {
2251 if lg != nil {
2252 lg.Panic("failed to unmarshal member", zap.Error(err))
2253 } else {
2254 plog.Panicf("unmarshal member should never fail: %v", err)
2255 }
2256 }
2257 if cc.NodeID != uint64(confChangeContext.Member.ID) {
2258 if lg != nil {
2259 lg.Panic(
2260 "got different member ID",
2261 zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
2262 zap.String("member-id-from-message", confChangeContext.Member.ID.String()),
2263 )
2264 } else {
2265 plog.Panicf("nodeID should always be equal to member ID")
2266 }
2267 }
2268 if confChangeContext.IsPromote {
2269 s.cluster.PromoteMember(confChangeContext.Member.ID)
2270 } else {
2271 s.cluster.AddMember(&confChangeContext.Member)
2272
2273 if confChangeContext.Member.ID != s.id {
2274 s.r.transport.AddPeer(confChangeContext.Member.ID, confChangeContext.PeerURLs)
2275 }
2276 }
2277
2278		// update the isLearner metric when this server's ID matches the ID in the raft member conf change
2279 if confChangeContext.Member.ID == s.id {
2280 if cc.Type == raftpb.ConfChangeAddLearnerNode {
2281 isLearner.Set(1)
2282 } else {
2283 isLearner.Set(0)
2284 }
2285 }
2286
2287 case raftpb.ConfChangeRemoveNode:
2288 id := types.ID(cc.NodeID)
2289 s.cluster.RemoveMember(id)
2290 if id == s.id {
2291 return true, nil
2292 }
2293 s.r.transport.RemovePeer(id)
2294
2295 case raftpb.ConfChangeUpdateNode:
2296 m := new(membership.Member)
2297 if err := json.Unmarshal(cc.Context, m); err != nil {
2298 if lg != nil {
2299 lg.Panic("failed to unmarshal member", zap.Error(err))
2300 } else {
2301 plog.Panicf("unmarshal member should never fail: %v", err)
2302 }
2303 }
2304 if cc.NodeID != uint64(m.ID) {
2305 if lg != nil {
2306 lg.Panic(
2307 "got different member ID",
2308 zap.String("member-id-from-config-change-entry", types.ID(cc.NodeID).String()),
2309 zap.String("member-id-from-message", m.ID.String()),
2310 )
2311 } else {
2312 plog.Panicf("nodeID should always be equal to member ID")
2313 }
2314 }
2315 s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
2316 if m.ID != s.id {
2317 s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
2318 }
2319 }
2320 return false, nil
2321}
2322
2323// TODO: non-blocking snapshot
2324func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
2325 clone := s.v2store.Clone()
2326 // commit kv to write metadata (for example: consistent index) to disk.
2327 // KV().commit() updates the consistent index in backend.
2328	// All operations that update the consistent index must be called sequentially
2329	// from the applyAll function.
2330	// So KV().Commit() cannot run in parallel with apply; it has to be called outside
2331	// the goroutine created below.
2332 s.KV().Commit()
2333
2334 s.goAttach(func() {
2335 lg := s.getLogger()
2336
2337 d, err := clone.SaveNoCopy()
2338 // TODO: current store will never fail to do a snapshot
2339 // what should we do if the store might fail?
2340 if err != nil {
2341 if lg != nil {
2342 lg.Panic("failed to save v2 store", zap.Error(err))
2343 } else {
2344 plog.Panicf("store save should never fail: %v", err)
2345 }
2346 }
2347 snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
2348 if err != nil {
2349		// the snapshot is created asynchronously with respect to raft's progress;
2350		// raft might already have a newer snapshot.
2351 if err == raft.ErrSnapOutOfDate {
2352 return
2353 }
2354 if lg != nil {
2355 lg.Panic("failed to create snapshot", zap.Error(err))
2356 } else {
2357 plog.Panicf("unexpected create snapshot error %v", err)
2358 }
2359 }
2360 // SaveSnap saves the snapshot and releases the locked wal files
2361 // to the snapshot index.
2362 if err = s.r.storage.SaveSnap(snap); err != nil {
2363 if lg != nil {
2364 lg.Panic("failed to save snapshot", zap.Error(err))
2365 } else {
2366 plog.Fatalf("save snapshot error: %v", err)
2367 }
2368 }
2369 if lg != nil {
2370 lg.Info(
2371 "saved snapshot",
2372 zap.Uint64("snapshot-index", snap.Metadata.Index),
2373 )
2374 } else {
2375 plog.Infof("saved snapshot at index %d", snap.Metadata.Index)
2376 }
2377
2378 // When sending a snapshot, etcd will pause compaction.
2379		// After receiving a snapshot, the slow follower needs all the entries right after
2380		// the snapshot to catch up. If compaction is not paused, those entries might already
2381		// have been compacted, which happens when the snapshot takes a long time to send
2382		// and save. Pausing compaction avoids triggering another snapshot-sending cycle.
2383 if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
2384 if lg != nil {
2385 lg.Info("skip compaction since there is an inflight snapshot")
2386 } else {
2387 plog.Infof("skip compaction since there is an inflight snapshot")
2388 }
2389 return
2390 }
2391
2392		// keep some in-memory log entries for slow followers.
2393 compacti := uint64(1)
2394 if snapi > s.Cfg.SnapshotCatchUpEntries {
2395 compacti = snapi - s.Cfg.SnapshotCatchUpEntries
2396 }
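		// As an illustrative example (values are hypothetical): with snapi=12000 and
		// SnapshotCatchUpEntries=2000, entries up to index 10000 are compacted and the
		// most recent 2000 entries stay available for followers to catch up from.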
2397
2398 err = s.r.raftStorage.Compact(compacti)
2399 if err != nil {
2400			// the compaction runs asynchronously with respect to raft's progress;
2401			// the raft log might already have been compacted.
2402 if err == raft.ErrCompacted {
2403 return
2404 }
2405 if lg != nil {
2406 lg.Panic("failed to compact", zap.Error(err))
2407 } else {
2408 plog.Panicf("unexpected compaction error %v", err)
2409 }
2410 }
2411 if lg != nil {
2412 lg.Info(
2413 "compacted Raft logs",
2414 zap.Uint64("compact-index", compacti),
2415 )
2416 } else {
2417 plog.Infof("compacted raft log at %d", compacti)
2418 }
2419 })
2420}
2421
2422// CutPeer drops messages to the specified peer.
2423func (s *EtcdServer) CutPeer(id types.ID) {
2424 tr, ok := s.r.transport.(*rafthttp.Transport)
2425 if ok {
2426 tr.CutPeer(id)
2427 }
2428}
2429
2430// MendPeer recovers the message dropping behavior of the given peer.
2431func (s *EtcdServer) MendPeer(id types.ID) {
2432 tr, ok := s.r.transport.(*rafthttp.Transport)
2433 if ok {
2434 tr.MendPeer(id)
2435 }
2436}
2437
2438func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
2439
2440func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
2441
2442func (s *EtcdServer) ClusterVersion() *semver.Version {
2443 if s.cluster == nil {
2444 return nil
2445 }
2446 return s.cluster.Version()
2447}
2448
2449// monitorVersions checks the member's version every monitorVersionInterval.
2450// It updates the cluster version if all members agree on a higher one.
2451// It logs a message if a member has a higher version than the
2452// local version.
2453func (s *EtcdServer) monitorVersions() {
2454 for {
2455 select {
2456 case <-s.forceVersionC:
2457 case <-time.After(monitorVersionInterval):
2458 case <-s.stopping:
2459 return
2460 }
2461
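		// only the leader decides and proposes cluster-version updates; followers
		// simply wait for the next iteration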
2462 if s.Leader() != s.ID() {
2463 continue
2464 }
2465
2466 v := decideClusterVersion(s.getLogger(), getVersions(s.getLogger(), s.cluster, s.id, s.peerRt))
2467 if v != nil {
2468 // only keep major.minor version for comparison
2469 v = &semver.Version{
2470 Major: v.Major,
2471 Minor: v.Minor,
2472 }
2473 }
2474
2475 // if the current version is nil:
2476 // 1. use the decided version if possible
2477 // 2. or use the min cluster version
2478 if s.cluster.Version() == nil {
2479 verStr := version.MinClusterVersion
2480 if v != nil {
2481 verStr = v.String()
2482 }
2483 s.goAttach(func() { s.updateClusterVersion(verStr) })
2484 continue
2485 }
2486
2487 // update cluster version only if the decided version is greater than
2488 // the current cluster version
2489 if v != nil && s.cluster.Version().LessThan(*v) {
2490 s.goAttach(func() { s.updateClusterVersion(v.String()) })
2491 }
2492 }
2493}
2494
2495func (s *EtcdServer) updateClusterVersion(ver string) {
2496 lg := s.getLogger()
2497
2498 if s.cluster.Version() == nil {
2499 if lg != nil {
2500 lg.Info(
2501 "setting up initial cluster version",
2502 zap.String("cluster-version", version.Cluster(ver)),
2503 )
2504 } else {
2505 plog.Infof("setting up the initial cluster version to %s", version.Cluster(ver))
2506 }
2507 } else {
2508 if lg != nil {
2509 lg.Info(
2510 "updating cluster version",
2511 zap.String("from", version.Cluster(s.cluster.Version().String())),
2512 zap.String("to", version.Cluster(ver)),
2513 )
2514 } else {
2515 plog.Infof("updating the cluster version from %s to %s", version.Cluster(s.cluster.Version().String()), version.Cluster(ver))
2516 }
2517 }
2518
2519 req := pb.Request{
2520 Method: "PUT",
2521 Path: membership.StoreClusterVersionKey(),
2522 Val: ver,
2523 }
2524
2525 ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
2526 _, err := s.Do(ctx, req)
2527 cancel()
2528
2529 switch err {
2530 case nil:
2531 if lg != nil {
2532 lg.Info("cluster version is updated", zap.String("cluster-version", version.Cluster(ver)))
2533 }
2534 return
2535
2536 case ErrStopped:
2537 if lg != nil {
2538 lg.Warn("aborting cluster version update; server is stopped", zap.Error(err))
2539 } else {
2540 plog.Infof("aborting update cluster version because server is stopped")
2541 }
2542 return
2543
2544 default:
2545 if lg != nil {
2546 lg.Warn("failed to update cluster version", zap.Error(err))
2547 } else {
2548 plog.Errorf("error updating cluster version (%v)", err)
2549 }
2550 }
2551}
2552
2553func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
2554 switch err {
2555 case context.Canceled:
2556 return ErrCanceled
2557
2558 case context.DeadlineExceeded:
2559 s.leadTimeMu.RLock()
2560 curLeadElected := s.leadElectedTime
2561 s.leadTimeMu.RUnlock()
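		// The previous leader is presumed lost no earlier than two election timeouts
		// (2 * ElectionTicks * TickMs) before the current leader was elected; a proposal
		// started inside that window most likely timed out because of the leader change.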
2562 prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
2563 if start.After(prevLeadLost) && start.Before(curLeadElected) {
2564 return ErrTimeoutDueToLeaderFail
2565 }
2566 lead := types.ID(s.getLead())
2567 switch lead {
2568 case types.ID(raft.None):
2569 // TODO: return error to specify it happens because the cluster does not have leader now
2570 case s.ID():
2571 if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
2572 return ErrTimeoutDueToConnectionLost
2573 }
2574 default:
2575 if !isConnectedSince(s.r.transport, start, lead) {
2576 return ErrTimeoutDueToConnectionLost
2577 }
2578 }
2579 return ErrTimeout
2580
2581 default:
2582 return err
2583 }
2584}
2585
2586func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
2587func (s *EtcdServer) Backend() backend.Backend {
2588 s.bemu.Lock()
2589 defer s.bemu.Unlock()
2590 return s.be
2591}
2592
2593func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
2594
2595func (s *EtcdServer) restoreAlarms() error {
2596 s.applyV3 = s.newApplierV3()
2597 as, err := v3alarm.NewAlarmStore(s)
2598 if err != nil {
2599 return err
2600 }
2601 s.alarmStore = as
2602 if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
2603 s.applyV3 = newApplierV3Capped(s.applyV3)
2604 }
2605 if len(as.Get(pb.AlarmType_CORRUPT)) > 0 {
2606 s.applyV3 = newApplierV3Corrupt(s.applyV3)
2607 }
2608 return nil
2609}
2610
2611// goAttach runs the given function in a goroutine and tracks it with
2612// the etcdserver waitgroup.
2613func (s *EtcdServer) goAttach(f func()) {
2614 s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
2615 defer s.wgMu.RUnlock()
2616 select {
2617 case <-s.stopping:
2618 if lg := s.getLogger(); lg != nil {
2619 lg.Warn("server has stopped; skipping goAttach")
2620 } else {
2621 plog.Warning("server has stopped (skipping goAttach)")
2622 }
2623 return
2624 default:
2625 }
2626
2627 // now safe to add since waitgroup wait has not started yet
2628 s.wg.Add(1)
2629 go func() {
2630 defer s.wg.Done()
2631 f()
2632 }()
2633}
2634
2635func (s *EtcdServer) Alarms() []*pb.AlarmMember {
2636 return s.alarmStore.Get(pb.AlarmType_NONE)
2637}
2638
2639func (s *EtcdServer) Logger() *zap.Logger {
2640 return s.lg
2641}
2642
2643// IsLearner reports whether the local member is a raft learner.
2644func (s *EtcdServer) IsLearner() bool {
2645 return s.cluster.IsLocalMemberLearner()
2646}
2647
2648// IsMemberExist reports whether the member with the given id exists in the cluster.
2649func (s *EtcdServer) IsMemberExist(id types.ID) bool {
2650 return s.cluster.IsMemberExist(id)
2651}
2652
2653// raftStatus returns the raft status of this etcd node.
2654func (s *EtcdServer) raftStatus() raft.Status {
2655 return s.r.Node.Status()
2656}