blob: 281c84b16ef07fc88476f7483ba958e4c377e336 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "context"
19 "encoding/json"
20 "expvar"
21 "fmt"
22 "math"
23 "math/rand"
24 "net/http"
25 "os"
26 "path"
27 "regexp"
28 "sync"
29 "sync/atomic"
30 "time"
31
32 "github.com/coreos/etcd/alarm"
33 "github.com/coreos/etcd/auth"
34 "github.com/coreos/etcd/compactor"
35 "github.com/coreos/etcd/discovery"
36 "github.com/coreos/etcd/etcdserver/api"
37 "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
38 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
39 "github.com/coreos/etcd/etcdserver/membership"
40 "github.com/coreos/etcd/etcdserver/stats"
41 "github.com/coreos/etcd/lease"
42 "github.com/coreos/etcd/lease/leasehttp"
43 "github.com/coreos/etcd/mvcc"
44 "github.com/coreos/etcd/mvcc/backend"
45 "github.com/coreos/etcd/pkg/fileutil"
46 "github.com/coreos/etcd/pkg/idutil"
47 "github.com/coreos/etcd/pkg/pbutil"
48 "github.com/coreos/etcd/pkg/runtime"
49 "github.com/coreos/etcd/pkg/schedule"
50 "github.com/coreos/etcd/pkg/types"
51 "github.com/coreos/etcd/pkg/wait"
52 "github.com/coreos/etcd/raft"
53 "github.com/coreos/etcd/raft/raftpb"
54 "github.com/coreos/etcd/rafthttp"
55 "github.com/coreos/etcd/snap"
56 "github.com/coreos/etcd/store"
57 "github.com/coreos/etcd/version"
58 "github.com/coreos/etcd/wal"
59
60 "github.com/coreos/go-semver/semver"
61 "github.com/coreos/pkg/capnslog"
62 "github.com/prometheus/client_golang/prometheus"
63)
64
65const (
66 DefaultSnapCount = 100000
67
68 StoreClusterPrefix = "/0"
69 StoreKeysPrefix = "/1"
70
71 // HealthInterval is the minimum time the cluster should be healthy
72 // before accepting add member requests.
73 HealthInterval = 5 * time.Second
74
75 purgeFileInterval = 30 * time.Second
76 // monitorVersionInterval should be smaller than the timeout
77 // on the connection. Or we will not be able to reuse the connection
78 // (since it will timeout).
79 monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
80
81 // max number of in-flight snapshot messages etcdserver allows to have
82 // This number is more than enough for most clusters with 5 machines.
83 maxInFlightMsgSnap = 16
84
85 releaseDelayAfterSnapshot = 30 * time.Second
86
87 // maxPendingRevokes is the maximum number of outstanding expired lease revocations.
88 maxPendingRevokes = 16
89
90 recommendedMaxRequestBytes = 10 * 1024 * 1024
91)
92
93var (
94 plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver")
95
96 storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
97)
98
99func init() {
100 rand.Seed(time.Now().UnixNano())
101
102 expvar.Publish(
103 "file_descriptor_limit",
104 expvar.Func(
105 func() interface{} {
106 n, _ := runtime.FDLimit()
107 return n
108 },
109 ),
110 )
111}
112
113type Response struct {
114 Term uint64
115 Index uint64
116 Event *store.Event
117 Watcher store.Watcher
118 Err error
119}
120
121type ServerV2 interface {
122 Server
123 // Do takes a V2 request and attempts to fulfill it, returning a Response.
124 Do(ctx context.Context, r pb.Request) (Response, error)
125 stats.Stats
126 ClientCertAuthEnabled() bool
127}
128
129type ServerV3 interface {
130 Server
131 ID() types.ID
132 RaftTimer
133}
134
135func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }
136
137type Server interface {
138 // Leader returns the ID of the leader Server.
139 Leader() types.ID
140
141 // AddMember attempts to add a member into the cluster. It will return
142 // ErrIDRemoved if member ID is removed from the cluster, or return
143 // ErrIDExists if member ID exists in the cluster.
144 AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
145 // RemoveMember attempts to remove a member from the cluster. It will
146 // return ErrIDRemoved if member ID is removed from the cluster, or return
147 // ErrIDNotFound if member ID is not in the cluster.
148 RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
149 // UpdateMember attempts to update an existing member in the cluster. It will
150 // return ErrIDNotFound if the member ID does not exist.
151 UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
152
153 // ClusterVersion is the cluster-wide minimum major.minor version.
154 // Cluster version is set to the min version that an etcd member is
155 // compatible with when first bootstrap.
156 //
157 // ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
158 //
159 // During a rolling upgrades, the ClusterVersion will be updated
160 // automatically after a sync. (5 second by default)
161 //
162 // The API/raft component can utilize ClusterVersion to determine if
163 // it can accept a client request or a raft RPC.
164 // NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
165 // the leader is etcd 2.0. etcd 2.0 leader will not update clusterVersion since
166 // this feature is introduced post 2.0.
167 ClusterVersion() *semver.Version
168 Cluster() api.Cluster
169 Alarms() []*pb.AlarmMember
170}
171
// EtcdServer is the production implementation of the Server interface.
//
// The leading fields are accessed with sync/atomic 64-bit operations. The Go
// sync/atomic docs only guarantee 64-bit alignment for the first word of an
// allocated struct on 32-bit platforms, so do NOT reorder or insert fields
// before them.
type EtcdServer struct {
	// inflightSnapshots counts the number of snapshots currently in flight.
	inflightSnapshots int64  // must use atomic operations to access; keep 64-bit aligned.
	appliedIndex      uint64 // must use atomic operations to access; keep 64-bit aligned.
	committedIndex    uint64 // must use atomic operations to access; keep 64-bit aligned.
	// consistIndex holds the offset of the currently executing entry.
	// It is initialized to 0 before executing any entry.
	consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
	r            raftNode        // uses 64-bit atomics; keep 64-bit aligned.

	// readych is created in NewServer.
	// NOTE(review): presumably closed once the server is ready — confirm.
	readych chan struct{}
	Cfg     ServerConfig

	// w is created in start().
	w wait.Wait

	readMu sync.RWMutex
	// The read routine notifies the etcd server that it waits for reading by
	// sending an empty struct to readwaitc (buffered with capacity 1).
	readwaitc chan struct{}
	// readNotifier is used to notify the read routine that it can process the
	// request when there is no error.
	readNotifier *notifier

	// stop signals the run goroutine should shutdown.
	stop chan struct{}
	// stopping is closed by run goroutine on shutdown.
	stopping chan struct{}
	// done is closed when all goroutines from start() complete.
	done chan struct{}
	// leaderChanged is replaced and the old channel closed on every leader
	// change (see updateLeadership in run); read it via leaderChangedNotify.
	leaderChanged   chan struct{}
	leaderChangedMu sync.RWMutex

	// errorc (capacity 1) is handed to the peer transport as ErrorC; the run
	// loop exits when it receives from it.
	errorc     chan error
	id         types.ID
	attributes membership.Attributes

	cluster *membership.RaftCluster

	// store is the v2 store.
	store       store.Store
	snapshotter *snap.Snapshotter

	applyV2 ApplierV2

	// applyV3 is the applier with auth and quotas
	applyV3 applierV3
	// applyV3Base is the core applier without auth or quotas
	applyV3Base applierV3
	// applyWait is triggered with each applied index by applyAll.
	applyWait wait.WaitTime

	kv     mvcc.ConsistentWatchableKV
	lessor lease.Lessor
	// bemu guards swapping be when a snapshot backend is installed.
	bemu       sync.Mutex
	be         backend.Backend
	authStore  auth.AuthStore
	alarmStore *alarm.AlarmStore

	stats  *stats.ServerStats
	lstats *stats.LeaderStats

	// SyncTicker drives the v2 TTL sync in the run loop while this member is
	// the leader.
	SyncTicker *time.Ticker
	// compactor is used to auto-compact the KV.
	compactor compactor.Compactor

	// peerRt used to send requests (version, lease) to peers.
	peerRt   http.RoundTripper
	reqIDGen *idutil.Generator

	// forceVersionC is used to force the version monitor loop
	// to detect the cluster version immediately.
	forceVersionC chan struct{}

	// wgMu blocks concurrent waitgroup mutation while server stopping
	wgMu sync.RWMutex
	// wg is used to wait for the go routines that depends on the server state
	// to exit when stopping the server.
	wg sync.WaitGroup

	// ctx is used for etcd-initiated requests that may need to be canceled
	// on etcd server shutdown.
	ctx    context.Context
	cancel context.CancelFunc

	// leadElectedTime records when this member last became the new leader;
	// guarded by leadTimeMu.
	leadTimeMu      sync.RWMutex
	leadElectedTime time.Time
}
258
259// NewServer creates a new EtcdServer from the supplied configuration. The
260// configuration is considered static for the lifetime of the EtcdServer.
261func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
262 st := store.New(StoreClusterPrefix, StoreKeysPrefix)
263
264 var (
265 w *wal.WAL
266 n raft.Node
267 s *raft.MemoryStorage
268 id types.ID
269 cl *membership.RaftCluster
270 )
271
272 if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
273 plog.Warningf("MaxRequestBytes %v exceeds maximum recommended size %v", cfg.MaxRequestBytes, recommendedMaxRequestBytes)
274 }
275
276 if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
277 return nil, fmt.Errorf("cannot access data directory: %v", terr)
278 }
279
280 haveWAL := wal.Exist(cfg.WALDir())
281
282 if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
283 plog.Fatalf("create snapshot directory error: %v", err)
284 }
285 ss := snap.New(cfg.SnapDir())
286
287 bepath := cfg.backendPath()
288 beExist := fileutil.Exist(bepath)
289 be := openBackend(cfg)
290
291 defer func() {
292 if err != nil {
293 be.Close()
294 }
295 }()
296
297 prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
298 if err != nil {
299 return nil, err
300 }
301 var (
302 remotes []*membership.Member
303 snapshot *raftpb.Snapshot
304 )
305
306 switch {
307 case !haveWAL && !cfg.NewCluster:
308 if err = cfg.VerifyJoinExisting(); err != nil {
309 return nil, err
310 }
311 cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
312 if err != nil {
313 return nil, err
314 }
315 existingCluster, gerr := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt)
316 if gerr != nil {
317 return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
318 }
319 if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
320 return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
321 }
322 if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) {
323 return nil, fmt.Errorf("incompatible with current running cluster")
324 }
325
326 remotes = existingCluster.Members()
327 cl.SetID(existingCluster.ID())
328 cl.SetStore(st)
329 cl.SetBackend(be)
330 cfg.Print()
331 id, n, s, w = startNode(cfg, cl, nil)
332 case !haveWAL && cfg.NewCluster:
333 if err = cfg.VerifyBootstrap(); err != nil {
334 return nil, err
335 }
336 cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
337 if err != nil {
338 return nil, err
339 }
340 m := cl.MemberByName(cfg.Name)
341 if isMemberBootstrapped(cl, cfg.Name, prt, cfg.bootstrapTimeout()) {
342 return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
343 }
344 if cfg.ShouldDiscover() {
345 var str string
346 str, err = discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
347 if err != nil {
348 return nil, &DiscoveryError{Op: "join", Err: err}
349 }
350 var urlsmap types.URLsMap
351 urlsmap, err = types.NewURLsMap(str)
352 if err != nil {
353 return nil, err
354 }
355 if checkDuplicateURL(urlsmap) {
356 return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
357 }
358 if cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil {
359 return nil, err
360 }
361 }
362 cl.SetStore(st)
363 cl.SetBackend(be)
364 cfg.PrintWithInitial()
365 id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
366 case haveWAL:
367 if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
368 return nil, fmt.Errorf("cannot write to member directory: %v", err)
369 }
370
371 if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
372 return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
373 }
374
375 if cfg.ShouldDiscover() {
376 plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
377 }
378
379 // Find a snapshot to start/restart a raft node
380 walSnaps, serr := wal.ValidSnapshotEntries(cfg.WALDir())
381 if serr != nil {
382 return nil, serr
383 }
384 // snapshot files can be orphaned if etcd crashes after writing them but before writing the corresponding
385 // wal log entries
386 snapshot, err = ss.LoadNewestAvailable(walSnaps)
387 if err != nil && err != snap.ErrNoSnapshot {
388 return nil, err
389 }
390 if snapshot != nil {
391 if err = st.Recovery(snapshot.Data); err != nil {
392 plog.Panicf("recovered store from snapshot error: %v", err)
393 }
394 plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
395 if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
396 plog.Panicf("recovering backend from snapshot error: %v", err)
397 }
398 }
399 cfg.Print()
400 if !cfg.ForceNewCluster {
401 id, cl, n, s, w = restartNode(cfg, snapshot)
402 } else {
403 id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
404 }
405 cl.SetStore(st)
406 cl.SetBackend(be)
407 cl.Recover(api.UpdateCapability)
408 if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
409 os.RemoveAll(bepath)
410 return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
411 }
412 default:
413 return nil, fmt.Errorf("unsupported bootstrap config")
414 }
415
416 if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
417 return nil, fmt.Errorf("cannot access member directory: %v", terr)
418 }
419
420 sstats := stats.NewServerStats(cfg.Name, id.String())
421 lstats := stats.NewLeaderStats(id.String())
422
423 heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
424 srv = &EtcdServer{
425 readych: make(chan struct{}),
426 Cfg: cfg,
427 errorc: make(chan error, 1),
428 store: st,
429 snapshotter: ss,
430 r: *newRaftNode(
431 raftNodeConfig{
432 isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
433 Node: n,
434 heartbeat: heartbeat,
435 raftStorage: s,
436 storage: NewStorage(w, ss),
437 },
438 ),
439 id: id,
440 attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
441 cluster: cl,
442 stats: sstats,
443 lstats: lstats,
444 SyncTicker: time.NewTicker(500 * time.Millisecond),
445 peerRt: prt,
446 reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
447 forceVersionC: make(chan struct{}),
448 }
449 serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
450
451 srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
452
453 srv.be = be
454 minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
455
456 // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
457 // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
458 srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
459
460 tp, err := auth.NewTokenProvider(cfg.AuthToken,
461 func(index uint64) <-chan struct{} {
462 return srv.applyWait.Wait(index)
463 },
464 time.Duration(cfg.TokenTTL)*time.Second,
465 )
466 if err != nil {
467 plog.Warningf("failed to create token provider,err is %v", err)
468 return nil, err
469 }
470 srv.authStore = auth.NewAuthStore(srv.be, tp)
471
472 srv.kv = mvcc.New(srv.be, srv.lessor, srv.authStore, &srv.consistIndex)
473 if beExist {
474 kvindex := srv.kv.ConsistentIndex()
475 // TODO: remove kvindex != 0 checking when we do not expect users to upgrade
476 // etcd from pre-3.0 release.
477 if snapshot != nil && kvindex < snapshot.Metadata.Index {
478 if kvindex != 0 {
479 return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
480 }
481 plog.Warningf("consistent index never saved (snapshot index=%d)", snapshot.Metadata.Index)
482 }
483 }
484 newSrv := srv // since srv == nil in defer if srv is returned as nil
485 defer func() {
486 // closing backend without first closing kv can cause
487 // resumed compactions to fail with closed tx errors
488 if err != nil {
489 newSrv.kv.Close()
490 }
491 }()
492
493 srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
494 if num := cfg.AutoCompactionRetention; num != 0 {
495 srv.compactor, err = compactor.New(cfg.AutoCompactionMode, num, srv.kv, srv)
496 if err != nil {
497 return nil, err
498 }
499 srv.compactor.Run()
500 }
501
502 srv.applyV3Base = srv.newApplierV3Backend()
503 if err = srv.restoreAlarms(); err != nil {
504 return nil, err
505 }
506
507 // TODO: move transport initialization near the definition of remote
508 tr := &rafthttp.Transport{
509 TLSInfo: cfg.PeerTLSInfo,
510 DialTimeout: cfg.peerDialTimeout(),
511 ID: id,
512 URLs: cfg.PeerURLs,
513 ClusterID: cl.ID(),
514 Raft: srv,
515 Snapshotter: ss,
516 ServerStats: sstats,
517 LeaderStats: lstats,
518 ErrorC: srv.errorc,
519 }
520 if err = tr.Start(); err != nil {
521 return nil, err
522 }
523 // add all remotes into transport
524 for _, m := range remotes {
525 if m.ID != id {
526 tr.AddRemote(m.ID, m.PeerURLs)
527 }
528 }
529 for _, m := range cl.Members() {
530 if m.ID != id {
531 tr.AddPeer(m.ID, m.PeerURLs)
532 }
533 }
534 srv.r.transport = tr
535
536 return srv, nil
537}
538
539func (s *EtcdServer) adjustTicks() {
540 clusterN := len(s.cluster.Members())
541
542 // single-node fresh start, or single-node recovers from snapshot
543 if clusterN == 1 {
544 ticks := s.Cfg.ElectionTicks - 1
545 plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks)
546 s.r.advanceTicks(ticks)
547 return
548 }
549
550 if !s.Cfg.InitialElectionTickAdvance {
551 plog.Infof("skipping initial election tick advance (election tick %d)", s.Cfg.ElectionTicks)
552 return
553 }
554
555 // retry up to "rafthttp.ConnReadTimeout", which is 5-sec
556 // until peer connection reports; otherwise:
557 // 1. all connections failed, or
558 // 2. no active peers, or
559 // 3. restarted single-node with no snapshot
560 // then, do nothing, because advancing ticks would have no effect
561 waitTime := rafthttp.ConnReadTimeout
562 itv := 50 * time.Millisecond
563 for i := int64(0); i < int64(waitTime/itv); i++ {
564 select {
565 case <-time.After(itv):
566 case <-s.stopping:
567 return
568 }
569
570 peerN := s.r.transport.ActivePeers()
571 if peerN > 1 {
572 // multi-node received peer connection reports
573 // adjust ticks, in case slow leader message receive
574 ticks := s.Cfg.ElectionTicks - 2
575 plog.Infof("%s initialzed peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN)
576 s.r.advanceTicks(ticks)
577 return
578 }
579 }
580}
581
582// Start performs any initialization of the Server necessary for it to
583// begin serving requests. It must be called before Do or Process.
584// Start must be non-blocking; any long-running server functionality
585// should be implemented in goroutines.
586func (s *EtcdServer) Start() {
587 s.start()
588 s.goAttach(func() { s.adjustTicks() })
589 s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
590 s.goAttach(s.purgeFile)
591 s.goAttach(func() { monitorFileDescriptor(s.stopping) })
592 s.goAttach(s.monitorVersions)
593 s.goAttach(s.linearizableReadLoop)
594 s.goAttach(s.monitorKVHash)
595}
596
597// start prepares and starts server in a new goroutine. It is no longer safe to
598// modify a server's fields after it has been sent to Start.
599// This function is just used for testing.
600func (s *EtcdServer) start() {
601 if s.Cfg.SnapCount == 0 {
602 plog.Infof("set snapshot count to default %d", DefaultSnapCount)
603 s.Cfg.SnapCount = DefaultSnapCount
604 }
605 s.w = wait.New()
606 s.applyWait = wait.NewTimeList()
607 s.done = make(chan struct{})
608 s.stop = make(chan struct{})
609 s.stopping = make(chan struct{})
610 s.ctx, s.cancel = context.WithCancel(context.Background())
611 s.readwaitc = make(chan struct{}, 1)
612 s.readNotifier = newNotifier()
613 s.leaderChanged = make(chan struct{})
614 if s.ClusterVersion() != nil {
615 plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
616 membership.ClusterVersionMetrics.With(prometheus.Labels{"cluster_version": version.Cluster(s.ClusterVersion().String())}).Set(1)
617 } else {
618 plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
619 }
620 // TODO: if this is an empty log, writes all peer infos
621 // into the first entry
622 go s.run()
623}
624
625func (s *EtcdServer) purgeFile() {
626 var dberrc, serrc, werrc <-chan error
627 var dbdonec, sdonec, wdonec <-chan struct{}
628 if s.Cfg.MaxSnapFiles > 0 {
629 dbdonec, dberrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
630 sdonec, serrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.stopping)
631 }
632 if s.Cfg.MaxWALFiles > 0 {
633 wdonec, werrc = fileutil.PurgeFileWithDoneNotify(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.stopping)
634 }
635 select {
636 case e := <-dberrc:
637 plog.Fatalf("failed to purge snap db file %v", e)
638 case e := <-serrc:
639 plog.Fatalf("failed to purge snap file %v", e)
640 case e := <-werrc:
641 plog.Fatalf("failed to purge wal file %v", e)
642 case <-s.stopping:
643 if dbdonec != nil {
644 <-dbdonec
645 }
646 if sdonec != nil {
647 <-sdonec
648 }
649 if wdonec != nil {
650 <-wdonec
651 }
652 return
653 }
654}
655
656func (s *EtcdServer) ID() types.ID { return s.id }
657
658func (s *EtcdServer) Cluster() api.Cluster { return s.cluster }
659
660func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
661
662type ServerPeer interface {
663 ServerV2
664 RaftHandler() http.Handler
665 LeaseHandler() http.Handler
666}
667
668func (s *EtcdServer) LeaseHandler() http.Handler {
669 if s.lessor == nil {
670 return nil
671 }
672 return leasehttp.NewHandler(s.lessor, s.ApplyWait)
673}
674
675func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
676
677// Process takes a raft message and applies it to the server's raft state
678// machine, respecting any timeout of the given context.
679func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
680 if s.cluster.IsIDRemoved(types.ID(m.From)) {
681 plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
682 return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
683 }
684 if m.Type == raftpb.MsgApp {
685 s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
686 }
687 return s.r.Step(ctx, m)
688}
689
690func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }
691
692func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
693
694// ReportSnapshot reports snapshot sent status to the raft state machine,
695// and clears the used snapshot from the snapshot store.
696func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
697 s.r.ReportSnapshot(id, status)
698}
699
700type etcdProgress struct {
701 confState raftpb.ConfState
702 snapi uint64
703 appliedt uint64
704 appliedi uint64
705}
706
707// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
708// and helps decouple state machine logic from Raft algorithms.
709// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
710type raftReadyHandler struct {
711 updateLeadership func(newLeader bool)
712 updateCommittedIndex func(uint64)
713}
714
// run is the server's main loop. It starts the raft node with callbacks for
// leadership and committed-index changes, then multiplexes:
//   - ready batches from raft (s.r.apply()), scheduled in FIFO order onto
//     applyAll;
//   - expired leases from the lessor, revoked in background goroutines with
//     at most maxPendingRevokes revocations in flight;
//   - errors from the peer transport (s.errorc), which end the loop;
//   - sync ticks (only delivered while leader), which call s.sync when the
//     v2 store has TTL keys;
//   - s.stop, which ends the loop.
//
// When the loop returns, the deferred shutdown closes s.stopping, cancels
// s.ctx, stops the scheduler, waits for goAttach'd goroutines, stops the
// ticker and raft, closes lessor/kv/auth store/backend/compactor, and
// finally closes s.done.
func (s *EtcdServer) run() {
	sn, err := s.r.raftStorage.Snapshot()
	if err != nil {
		plog.Panicf("get snapshot from raft storage error: %v", err)
	}

	// asynchronously accept apply packets, dispatch progress in-order
	sched := schedule.NewFIFOScheduler()

	// syncC stays nil (never fires in the select) unless this member is the
	// leader. It is written via setSyncC from updateLeadership, which raftNode
	// invokes, and read by this loop via getSyncC, hence the mutex.
	var (
		smu   sync.RWMutex
		syncC <-chan time.Time
	)
	setSyncC := func(ch <-chan time.Time) {
		smu.Lock()
		syncC = ch
		smu.Unlock()
	}
	getSyncC := func() (ch <-chan time.Time) {
		smu.RLock()
		ch = syncC
		smu.RUnlock()
		return
	}
	rh := &raftReadyHandler{
		updateLeadership: func(newLeader bool) {
			if !s.isLeader() {
				// follower: stop leader-only duties
				if s.lessor != nil {
					s.lessor.Demote()
				}
				if s.compactor != nil {
					s.compactor.Pause()
				}
				setSyncC(nil)
			} else {
				if newLeader {
					t := time.Now()
					s.leadTimeMu.Lock()
					s.leadElectedTime = t
					s.leadTimeMu.Unlock()
				}
				setSyncC(s.SyncTicker.C)
				if s.compactor != nil {
					s.compactor.Resume()
				}
			}
			if newLeader {
				// Non-blocking send for any receiver currently waiting on the
				// old channel; then swap in a fresh channel and close the old
				// one so every waiter from leaderChangedNotify is released.
				select {
				case s.leaderChanged <- struct{}{}:
				default:
				}
				s.leaderChangedMu.Lock()
				lc := s.leaderChanged
				s.leaderChanged = make(chan struct{})
				s.leaderChangedMu.Unlock()
				close(lc)
			}

			// TODO: remove the nil checking
			// current test utility does not provide the stats
			// NOTE(review): BecomeLeader is called on every leadership update,
			// including when this member is not the leader — confirm intended.
			if s.stats != nil {
				s.stats.BecomeLeader()
			}
		},
		updateCommittedIndex: func(ci uint64) {
			// only move the committed index forward
			cci := s.getCommittedIndex()
			if ci > cci {
				s.setCommittedIndex(ci)
			}
		},
	}
	s.r.start(rh)

	// apply progress starts from the raft storage snapshot
	ep := etcdProgress{
		confState: sn.Metadata.ConfState,
		snapi:     sn.Metadata.Index,
		appliedt:  sn.Metadata.Term,
		appliedi:  sn.Metadata.Index,
	}

	defer func() {
		s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
		close(s.stopping)
		s.wgMu.Unlock()
		s.cancel()

		sched.Stop()

		// wait for goroutines before closing raft so wal stays open
		s.wg.Wait()

		s.SyncTicker.Stop()

		// must stop raft after scheduler-- etcdserver can leak rafthttp pipelines
		// by adding a peer after raft stops the transport
		s.r.stop()

		// kv, lessor and backend can be nil if running without v3 enabled
		// or running unit tests.
		if s.lessor != nil {
			s.lessor.Stop()
		}
		if s.kv != nil {
			s.kv.Close()
		}
		if s.authStore != nil {
			s.authStore.Close()
		}
		if s.be != nil {
			s.be.Close()
		}
		if s.compactor != nil {
			s.compactor.Stop()
		}
		close(s.done)
	}()

	// nil channel (never ready) when there is no lessor
	var expiredLeaseC <-chan []*lease.Lease
	if s.lessor != nil {
		expiredLeaseC = s.lessor.ExpiredLeasesC()
	}

	for {
		select {
		case ap := <-s.r.apply():
			f := func(context.Context) { s.applyAll(&ep, &ap) }
			sched.Schedule(f)
		case leases := <-expiredLeaseC:
			s.goAttach(func() {
				// Increases throughput of expired leases deletion process through parallelization
				// c is a counting semaphore: a slot is taken before each
				// revocation goroutine starts and released when it finishes.
				c := make(chan struct{}, maxPendingRevokes)
				for _, lease := range leases {
					select {
					case c <- struct{}{}:
					case <-s.stopping:
						return
					}
					lid := lease.ID
					s.goAttach(func() {
						ctx := s.authStore.WithRoot(s.ctx)
						_, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
						if lerr == nil {
							leaseExpired.Inc()
						} else {
							plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
						}

						<-c
					})
				}
			})
		case err := <-s.errorc:
			plog.Errorf("%s", err)
			plog.Infof("the data-dir used by this member must be removed.")
			return
		case <-getSyncC():
			if s.store.HasTTLKeys() {
				s.sync(s.Cfg.ReqTimeout())
			}
		case <-s.stop:
			return
		}
	}
}
879
880func (s *EtcdServer) leaderChangedNotify() <-chan struct{} {
881 s.leaderChangedMu.RLock()
882 defer s.leaderChangedMu.RUnlock()
883 return s.leaderChanged
884}
885
// applyAll applies one ready batch from raft. It is scheduled by the FIFO
// scheduler in run, so batches are applied in order. The steps are:
//  1. install an incoming snapshot, if any;
//  2. apply the committed entries;
//  3. publish the applied index (metric and applyWait);
//  4. wait for raft to finish persisting the batch;
//  5. take a local snapshot if one is due;
//  6. send a merged snapshot if one was requested via msgSnapC.
//
// The order matters, see the comment on the notifyc wait.
func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
	s.applySnapshot(ep, apply)
	s.applyEntries(ep, apply)

	proposalsApplied.Set(float64(ep.appliedi))
	s.applyWait.Trigger(ep.appliedi)
	// wait for the raft routine to finish the disk writes before triggering a
	// snapshot. or applied index might be greater than the last index in raft
	// storage, since the raft routine might be slower than apply routine.
	<-apply.notifyc

	s.triggerSnapshot(ep)
	// non-blocking: only send if a snapshot request is already queued
	select {
	// snapshot requested via send()
	case m := <-s.r.msgSnapC:
		merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
		s.sendMergedSnap(merged)
	default:
	}
}
906
907func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
908 if raft.IsEmptySnap(apply.snapshot) {
909 return
910 }
911 applySnapshotInProgress.Inc()
912 plog.Infof("applying snapshot at index %d...", ep.snapi)
913 defer func() {
914 plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
915 applySnapshotInProgress.Dec()
916 }()
917
918 if apply.snapshot.Metadata.Index <= ep.appliedi {
919 plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
920 apply.snapshot.Metadata.Index, ep.appliedi)
921 }
922
923 // wait for raftNode to persist snapshot onto the disk
924 <-apply.notifyc
925
926 newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
927 if err != nil {
928 plog.Panic(err)
929 }
930
931 // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
932 // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
933 if s.lessor != nil {
934 plog.Info("recovering lessor...")
935 s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
936 plog.Info("finished recovering lessor")
937 }
938
939 plog.Info("restoring mvcc store...")
940
941 if err := s.kv.Restore(newbe); err != nil {
942 plog.Panicf("restore KV error: %v", err)
943 }
944 s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())
945
946 plog.Info("finished restoring mvcc store")
947
948 // Closing old backend might block until all the txns
949 // on the backend are finished.
950 // We do not want to wait on closing the old backend.
951 s.bemu.Lock()
952 oldbe := s.be
953 go func() {
954 plog.Info("closing old backend...")
955 defer plog.Info("finished closing old backend")
956
957 if err := oldbe.Close(); err != nil {
958 plog.Panicf("close backend error: %v", err)
959 }
960 }()
961
962 s.be = newbe
963 s.bemu.Unlock()
964
965 plog.Info("recovering alarms...")
966 if err := s.restoreAlarms(); err != nil {
967 plog.Panicf("restore alarms error: %v", err)
968 }
969 plog.Info("finished recovering alarms")
970
971 if s.authStore != nil {
972 plog.Info("recovering auth store...")
973 s.authStore.Recover(newbe)
974 plog.Info("finished recovering auth store")
975 }
976
977 plog.Info("recovering store v2...")
978 if err := s.store.Recovery(apply.snapshot.Data); err != nil {
979 plog.Panicf("recovery store error: %v", err)
980 }
981 plog.Info("finished recovering store v2")
982
983 s.cluster.SetBackend(s.be)
984 plog.Info("recovering cluster configuration...")
985 s.cluster.Recover(api.UpdateCapability)
986 plog.Info("finished recovering cluster configuration")
987
988 plog.Info("removing old peers from network...")
989 // recover raft transport
990 s.r.transport.RemoveAllPeers()
991 plog.Info("finished removing old peers from network")
992
993 plog.Info("adding peers from new cluster configuration into network...")
994 for _, m := range s.cluster.Members() {
995 if m.ID == s.ID() {
996 continue
997 }
998 s.r.transport.AddPeer(m.ID, m.PeerURLs)
999 }
1000 plog.Info("finished adding peers from new cluster configuration into network...")
1001
1002 ep.appliedt = apply.snapshot.Metadata.Term
1003 ep.appliedi = apply.snapshot.Metadata.Index
1004 ep.snapi = ep.appliedi
1005 ep.confState = apply.snapshot.Metadata.ConfState
1006}
1007
1008func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
1009 if len(apply.entries) == 0 {
1010 return
1011 }
1012 firsti := apply.entries[0].Index
1013 if firsti > ep.appliedi+1 {
1014 plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, ep.appliedi)
1015 }
1016 var ents []raftpb.Entry
1017 if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
1018 ents = apply.entries[ep.appliedi+1-firsti:]
1019 }
1020 if len(ents) == 0 {
1021 return
1022 }
1023 var shouldstop bool
1024 if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
1025 go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
1026 }
1027}
1028
1029func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
1030 if ep.appliedi-ep.snapi <= s.Cfg.SnapCount {
1031 return
1032 }
1033
1034 plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
1035 s.snapshot(ep.appliedi, ep.confState)
1036 ep.snapi = ep.appliedi
1037}
1038
1039func (s *EtcdServer) isMultiNode() bool {
1040 return s.cluster != nil && len(s.cluster.MemberIDs()) > 1
1041}
1042
1043func (s *EtcdServer) isLeader() bool {
1044 return uint64(s.ID()) == s.Lead()
1045}
1046
1047// MoveLeader transfers the leader to the given transferee.
1048func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
1049 now := time.Now()
1050 interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
1051
1052 plog.Infof("%s starts leadership transfer from %s to %s", s.ID(), types.ID(lead), types.ID(transferee))
1053 s.r.TransferLeadership(ctx, lead, transferee)
1054 for s.Lead() != transferee {
1055 select {
1056 case <-ctx.Done(): // time out
1057 return ErrTimeoutLeaderTransfer
1058 case <-time.After(interval):
1059 }
1060 }
1061
1062 // TODO: drain all requests, or drop all messages to the old leader
1063
1064 plog.Infof("%s finished leadership transfer from %s to %s (took %v)", s.ID(), types.ID(lead), types.ID(transferee), time.Since(now))
1065 return nil
1066}
1067
1068// TransferLeadership transfers the leader to the chosen transferee.
1069func (s *EtcdServer) TransferLeadership() error {
1070 if !s.isLeader() {
1071 plog.Printf("skipped leadership transfer for stopping non-leader member")
1072 return nil
1073 }
1074
1075 if !s.isMultiNode() {
1076 plog.Printf("skipped leadership transfer for single member cluster")
1077 return nil
1078 }
1079
1080 transferee, ok := longestConnected(s.r.transport, s.cluster.MemberIDs())
1081 if !ok {
1082 return ErrUnhealthy
1083 }
1084
1085 tm := s.Cfg.ReqTimeout()
1086 ctx, cancel := context.WithTimeout(s.ctx, tm)
1087 err := s.MoveLeader(ctx, s.Lead(), uint64(transferee))
1088 cancel()
1089 return err
1090}
1091
1092// HardStop stops the server without coordination with other members in the cluster.
1093func (s *EtcdServer) HardStop() {
1094 select {
1095 case s.stop <- struct{}{}:
1096 case <-s.done:
1097 return
1098 }
1099 <-s.done
1100}
1101
1102// Stop stops the server gracefully, and shuts down the running goroutine.
1103// Stop should be called after a Start(s), otherwise it will block forever.
1104// When stopping leader, Stop transfers its leadership to one of its peers
1105// before stopping the server.
1106// Stop terminates the Server and performs any necessary finalization.
1107// Do and Process cannot be called after Stop has been invoked.
1108func (s *EtcdServer) Stop() {
1109 if err := s.TransferLeadership(); err != nil {
1110 plog.Warningf("%s failed to transfer leadership (%v)", s.ID(), err)
1111 }
1112 s.HardStop()
1113}
1114
1115// ReadyNotify returns a channel that will be closed when the server
1116// is ready to serve client requests
1117func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
1118
1119func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
1120 select {
1121 case <-time.After(d):
1122 case <-s.done:
1123 }
1124 select {
1125 case s.errorc <- err:
1126 default:
1127 }
1128}
1129
1130// StopNotify returns a channel that receives a empty struct
1131// when the server is stopped.
1132func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
1133
1134func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
1135
1136func (s *EtcdServer) LeaderStats() []byte {
1137 lead := atomic.LoadUint64(&s.r.lead)
1138 if lead != uint64(s.id) {
1139 return nil
1140 }
1141 return s.lstats.JSON()
1142}
1143
1144func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
1145
1146func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) error {
1147 if s.authStore == nil {
1148 // In the context of ordinary etcd process, s.authStore will never be nil.
1149 // This branch is for handling cases in server_test.go
1150 return nil
1151 }
1152
1153 // Note that this permission check is done in the API layer,
1154 // so TOCTOU problem can be caused potentially in a schedule like this:
1155 // update membership with user A -> revoke root role of A -> apply membership change
1156 // in the state machine layer
1157 // However, both of membership change and role management requires the root privilege.
1158 // So careful operation by admins can prevent the problem.
1159 authInfo, err := s.AuthInfoFromCtx(ctx)
1160 if err != nil {
1161 return err
1162 }
1163
1164 return s.AuthStore().IsAdminPermitted(authInfo)
1165}
1166
1167func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1168 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1169 return nil, err
1170 }
1171
1172 if s.Cfg.StrictReconfigCheck {
1173 // by default StrictReconfigCheck is enabled; reject new members if unhealthy
1174 if !s.cluster.IsReadyToAddNewMember() {
1175 plog.Warningf("not enough started members, rejecting member add %+v", memb)
1176 return nil, ErrNotEnoughStartedMembers
1177 }
1178 if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.Members()) {
1179 plog.Warningf("not healthy for reconfigure, rejecting member add %+v", memb)
1180 return nil, ErrUnhealthy
1181 }
1182 }
1183
1184 // TODO: move Member to protobuf type
1185 b, err := json.Marshal(memb)
1186 if err != nil {
1187 return nil, err
1188 }
1189 cc := raftpb.ConfChange{
1190 Type: raftpb.ConfChangeAddNode,
1191 NodeID: uint64(memb.ID),
1192 Context: b,
1193 }
1194 return s.configure(ctx, cc)
1195}
1196
1197func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1198 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1199 return nil, err
1200 }
1201
1202 // by default StrictReconfigCheck is enabled; reject removal if leads to quorum loss
1203 if err := s.mayRemoveMember(types.ID(id)); err != nil {
1204 return nil, err
1205 }
1206
1207 cc := raftpb.ConfChange{
1208 Type: raftpb.ConfChangeRemoveNode,
1209 NodeID: id,
1210 }
1211 return s.configure(ctx, cc)
1212}
1213
1214func (s *EtcdServer) mayRemoveMember(id types.ID) error {
1215 if !s.Cfg.StrictReconfigCheck {
1216 return nil
1217 }
1218
1219 if !s.cluster.IsReadyToRemoveMember(uint64(id)) {
1220 plog.Warningf("not enough started members, rejecting remove member %s", id)
1221 return ErrNotEnoughStartedMembers
1222 }
1223
1224 // downed member is safe to remove since it's not part of the active quorum
1225 if t := s.r.transport.ActiveSince(id); id != s.ID() && t.IsZero() {
1226 return nil
1227 }
1228
1229 // protect quorum if some members are down
1230 m := s.cluster.Members()
1231 active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m)
1232 if (active - 1) < 1+((len(m)-1)/2) {
1233 plog.Warningf("reconfigure breaks active quorum, rejecting remove member %s", id)
1234 return ErrUnhealthy
1235 }
1236
1237 return nil
1238}
1239
1240func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1241 b, merr := json.Marshal(memb)
1242 if merr != nil {
1243 return nil, merr
1244 }
1245
1246 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1247 return nil, err
1248 }
1249 cc := raftpb.ConfChange{
1250 Type: raftpb.ConfChangeUpdateNode,
1251 NodeID: uint64(memb.ID),
1252 Context: b,
1253 }
1254 return s.configure(ctx, cc)
1255}
1256
1257// Implement the RaftTimer interface
1258
1259func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.r.index) }
1260
1261func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.r.term) }
1262
1263// Lead is only for testing purposes.
1264// TODO: add Raft server interface to expose raft related info:
1265// Index, Term, Lead, Committed, Applied, LastIndex, etc.
1266func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) }
1267
1268func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
1269
// confChangeResponse is the value apply delivers through s.w to the caller
// blocked in configure once its ConfChange has been applied.
// Field order matters: it is built with a positional composite literal.
type confChangeResponse struct {
	membs []*membership.Member // cluster members after the change was applied
	err   error                // error returned by applyConfChange, if any
}
1274
1275// configure sends a configuration change through consensus and
1276// then waits for it to be applied to the server. It
1277// will block until the change is performed or there is an error.
1278func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
1279 cc.ID = s.reqIDGen.Next()
1280 ch := s.w.Register(cc.ID)
1281 start := time.Now()
1282 if err := s.r.ProposeConfChange(ctx, cc); err != nil {
1283 s.w.Trigger(cc.ID, nil)
1284 return nil, err
1285 }
1286 select {
1287 case x := <-ch:
1288 if x == nil {
1289 plog.Panicf("configure trigger value should never be nil")
1290 }
1291 resp := x.(*confChangeResponse)
1292 return resp.membs, resp.err
1293 case <-ctx.Done():
1294 s.w.Trigger(cc.ID, nil) // GC wait
1295 return nil, s.parseProposeCtxErr(ctx.Err(), start)
1296 case <-s.stopping:
1297 return nil, ErrStopped
1298 }
1299}
1300
1301// sync proposes a SYNC request and is non-blocking.
1302// This makes no guarantee that the request will be proposed or performed.
1303// The request will be canceled after the given timeout.
1304func (s *EtcdServer) sync(timeout time.Duration) {
1305 req := pb.Request{
1306 Method: "SYNC",
1307 ID: s.reqIDGen.Next(),
1308 Time: time.Now().UnixNano(),
1309 }
1310 data := pbutil.MustMarshal(&req)
1311 // There is no promise that node has leader when do SYNC request,
1312 // so it uses goroutine to propose.
1313 ctx, cancel := context.WithTimeout(s.ctx, timeout)
1314 s.goAttach(func() {
1315 s.r.Propose(ctx, data)
1316 cancel()
1317 })
1318}
1319
1320// publish registers server information into the cluster. The information
1321// is the JSON representation of this server's member struct, updated with the
1322// static clientURLs of the server.
1323// The function keeps attempting to register until it succeeds,
1324// or its server is stopped.
1325func (s *EtcdServer) publish(timeout time.Duration) {
1326 b, err := json.Marshal(s.attributes)
1327 if err != nil {
1328 plog.Panicf("json marshal error: %v", err)
1329 return
1330 }
1331 req := pb.Request{
1332 Method: "PUT",
1333 Path: membership.MemberAttributesStorePath(s.id),
1334 Val: string(b),
1335 }
1336
1337 for {
1338 ctx, cancel := context.WithTimeout(s.ctx, timeout)
1339 _, err := s.Do(ctx, req)
1340 cancel()
1341 switch err {
1342 case nil:
1343 close(s.readych)
1344 plog.Infof("published %+v to cluster %s", s.attributes, s.cluster.ID())
1345 return
1346 case ErrStopped:
1347 plog.Infof("aborting publish because server is stopped")
1348 return
1349 default:
1350 plog.Errorf("publish error: %v", err)
1351 }
1352 }
1353}
1354
1355func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
1356 atomic.AddInt64(&s.inflightSnapshots, 1)
1357
1358 s.r.transport.SendSnapshot(merged)
1359 s.goAttach(func() {
1360 select {
1361 case ok := <-merged.CloseNotify():
1362 // delay releasing inflight snapshot for another 30 seconds to
1363 // block log compaction.
1364 // If the follower still fails to catch up, it is probably just too slow
1365 // to catch up. We cannot avoid the snapshot cycle anyway.
1366 if ok {
1367 select {
1368 case <-time.After(releaseDelayAfterSnapshot):
1369 case <-s.stopping:
1370 }
1371 }
1372 atomic.AddInt64(&s.inflightSnapshots, -1)
1373 case <-s.stopping:
1374 return
1375 }
1376 })
1377}
1378
1379// apply takes entries received from Raft (after it has been committed) and
1380// applies them to the current state of the EtcdServer.
1381// The given entries should not be empty.
1382func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
1383 for i := range es {
1384 e := es[i]
1385 switch e.Type {
1386 case raftpb.EntryNormal:
1387 s.applyEntryNormal(&e)
1388 case raftpb.EntryConfChange:
1389 // set the consistent index of current executing entry
1390 if e.Index > s.consistIndex.ConsistentIndex() {
1391 s.consistIndex.setConsistentIndex(e.Index)
1392 }
1393 var cc raftpb.ConfChange
1394 pbutil.MustUnmarshal(&cc, e.Data)
1395 removedSelf, err := s.applyConfChange(cc, confState)
1396 s.setAppliedIndex(e.Index)
1397 shouldStop = shouldStop || removedSelf
1398 s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
1399 default:
1400 plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
1401 }
1402 atomic.StoreUint64(&s.r.index, e.Index)
1403 atomic.StoreUint64(&s.r.term, e.Term)
1404 appliedt = e.Term
1405 appliedi = e.Index
1406 }
1407 return appliedt, appliedi, shouldStop
1408}
1409
// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer.
//
// The consistent index is advanced only when e is newer than it. Only in that
// case is a v3 request applied, so entries already reflected in the backend
// are not re-applied. v2 requests (including legacy plain pb.Request payloads)
// are applied regardless. The applied index is set to e.Index on every return
// path. Results are delivered to waiters registered in s.w under the request ID.
func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
	shouldApplyV3 := false
	if e.Index > s.consistIndex.ConsistentIndex() {
		// set the consistent index of current executing entry
		s.consistIndex.setConsistentIndex(e.Index)
		shouldApplyV3 = true
	}
	// Deferred so the applied index is published no matter which branch returns.
	defer s.setAppliedIndex(e.Index)

	// raft state machine may generate noop entry when leader confirmation.
	// skip it in advance to avoid some potential bug in the future
	if len(e.Data) == 0 {
		// Non-blocking nudge to monitorVersions, which re-evaluates the cluster
		// version when it receives from forceVersionC.
		select {
		case s.forceVersionC <- struct{}{}:
		default:
		}
		// promote lessor when the local member is leader and finished
		// applying all entries from the last term.
		if s.isLeader() {
			s.lessor.Promote(s.Cfg.electionTimeout())
		}
		return
	}

	var raftReq pb.InternalRaftRequest
	if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
		// Not an InternalRaftRequest: decode as a plain v2 pb.Request.
		var r pb.Request
		rp := &r
		pbutil.MustUnmarshal(rp, e.Data)
		s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
		return
	}
	if raftReq.V2 != nil {
		req := (*RequestV2)(raftReq.V2)
		s.w.Trigger(req.ID, s.applyV2Request(req))
		return
	}

	// do not re-apply applied entries.
	if !shouldApplyV3 {
		return
	}

	// Use the top-level ID; fall back to the header ID when it is unset.
	id := raftReq.ID
	if id == 0 {
		id = raftReq.Header.ID
	}

	var ar *applyResult
	// Apply when a local caller is waiting for the result, or when
	// noSideEffect does not classify the request as side-effect free.
	// When nobody waits, removeNeedlessRangeReqs is run on a txn first
	// (presumably dropping range requests whose results would be discarded).
	needResult := s.w.IsRegistered(id)
	if needResult || !noSideEffect(&raftReq) {
		if !needResult && raftReq.Txn != nil {
			removeNeedlessRangeReqs(raftReq.Txn)
		}
		ar = s.applyV3.Apply(&raftReq)
	}

	if ar == nil {
		return
	}

	// Deliver the result now unless it is ErrNoSpace and no NOSPACE alarm is active yet.
	if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
		s.w.Trigger(id, ar)
		return
	}

	// Quota exceeded with no NOSPACE alarm yet: propose activating the alarm
	// through raft in a tracked goroutine, then deliver the result to the waiter.
	plog.Errorf("applying raft message exceeded backend quota")
	s.goAttach(func() {
		a := &pb.AlarmRequest{
			MemberID: uint64(s.ID()),
			Action:   pb.AlarmRequest_ACTIVATE,
			Alarm:    pb.AlarmType_NOSPACE,
		}
		s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
		s.w.Trigger(id, ar)
	})
}
1488
// applyConfChange applies a ConfChange to the server. It is only
// invoked with a ConfChange that has already passed through Raft.
//
// If s.cluster rejects the change, raft is still asked to apply it with
// NodeID set to raft.None (presumably so raft consumes the entry without a
// membership effect; confirm against raft's ApplyConfChange). The validation
// error is then returned and *confState is left untouched. Otherwise
// *confState receives raft's new ConfState, the cluster membership is updated,
// and the transport's peer set is adjusted for members other than this one.
// The returned bool is true only when this member itself was removed.
func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
	if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
		cc.NodeID = raft.None
		s.r.ApplyConfChange(cc)
		return false, err
	}
	*confState = *s.r.ApplyConfChange(cc)
	switch cc.Type {
	case raftpb.ConfChangeAddNode:
		// cc.Context carries the JSON-encoded membership.Member (see AddMember).
		m := new(membership.Member)
		if err := json.Unmarshal(cc.Context, m); err != nil {
			plog.Panicf("unmarshal member should never fail: %v", err)
		}
		if cc.NodeID != uint64(m.ID) {
			plog.Panicf("nodeID should always be equal to member ID")
		}
		s.cluster.AddMember(m)
		// No transport peer is needed for this member itself.
		if m.ID != s.id {
			s.r.transport.AddPeer(m.ID, m.PeerURLs)
		}
	case raftpb.ConfChangeRemoveNode:
		id := types.ID(cc.NodeID)
		s.cluster.RemoveMember(id)
		if id == s.id {
			// This member was removed; applyEntries schedules the stop.
			return true, nil
		}
		s.r.transport.RemovePeer(id)
	case raftpb.ConfChangeUpdateNode:
		// cc.Context carries the JSON-encoded membership.Member (see UpdateMember).
		m := new(membership.Member)
		if err := json.Unmarshal(cc.Context, m); err != nil {
			plog.Panicf("unmarshal member should never fail: %v", err)
		}
		if cc.NodeID != uint64(m.ID) {
			plog.Panicf("nodeID should always be equal to member ID")
		}
		s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
		if m.ID != s.id {
			s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
		}
	}
	return false, nil
}
1533
// snapshot saves a raft snapshot at index snapi with confState. Its data is a
// serialized clone of the v2 store. Afterwards it compacts the in-memory raft log.
//
// Cloning the store and committing the KV happen on the caller's goroutine.
// The remaining work runs in a goroutine started with goAttach:
// serialization, snapshot creation and saving, WAL release, and compaction.
// Compaction is skipped while s.inflightSnapshots is non-zero. Otherwise it
// keeps numberOfCatchUpEntries entries before snapi for slow followers.
//
// TODO: non-blocking snapshot
func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
	clone := s.store.Clone()
	// commit kv to write metadata (for example: consistent index) to disk.
	// KV().commit() updates the consistent index in backend.
	// All operations that update consistent index must be called sequentially
	// from applyAll function.
	// So KV().Commit() cannot run in parallel with apply. It has to be called outside
	// the go routine created below.
	s.KV().Commit()

	s.goAttach(func() {
		d, err := clone.SaveNoCopy()
		// TODO: current store will never fail to do a snapshot
		// what should we do if the store might fail?
		if err != nil {
			plog.Panicf("store save should never fail: %v", err)
		}
		// NOTE(review): this local `snap` shadows the imported snap package for
		// the rest of the closure.
		snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
		if err != nil {
			// the snapshot was done asynchronously with the progress of raft.
			// raft might have already got a newer snapshot.
			if err == raft.ErrSnapOutOfDate {
				return
			}
			plog.Panicf("unexpected create snapshot error %v", err)
		}
		// SaveSnap saves the snapshot and releases the locked wal files
		// to the snapshot index.
		if err = s.r.storage.SaveSnap(snap); err != nil {
			plog.Fatalf("save snapshot error: %v", err)
		}
		plog.Infof("saved snapshot at index %d", snap.Metadata.Index)

		if err = s.r.storage.Release(snap); err != nil {
			plog.Panicf("failed to release wal %v", err)
		}

		// When sending a snapshot, etcd will pause compaction.
		// After receives a snapshot, the slow follower needs to get all the entries right after
		// the snapshot sent to catch up. If we do not pause compaction, the log entries right after
		// the snapshot sent might already be compacted. It happens when the snapshot takes long time
		// to send and save. Pausing compaction avoids triggering a snapshot sending cycle.
		if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
			plog.Infof("skip compaction since there is an inflight snapshot")
			return
		}

		// keep some in memory log entries for slow followers.
		compacti := uint64(1)
		if snapi > numberOfCatchUpEntries {
			compacti = snapi - numberOfCatchUpEntries
		}
		err = s.r.raftStorage.Compact(compacti)
		if err != nil {
			// the compaction was done asynchronously with the progress of raft.
			// raft log might already been compact.
			if err == raft.ErrCompacted {
				return
			}
			plog.Panicf("unexpected compaction error %v", err)
		}
		plog.Infof("compacted raft log at %d", compacti)
	})
}
1599
1600// CutPeer drops messages to the specified peer.
1601func (s *EtcdServer) CutPeer(id types.ID) {
1602 tr, ok := s.r.transport.(*rafthttp.Transport)
1603 if ok {
1604 tr.CutPeer(id)
1605 }
1606}
1607
1608// MendPeer recovers the message dropping behavior of the given peer.
1609func (s *EtcdServer) MendPeer(id types.ID) {
1610 tr, ok := s.r.transport.(*rafthttp.Transport)
1611 if ok {
1612 tr.MendPeer(id)
1613 }
1614}
1615
1616func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
1617
1618func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
1619
1620func (s *EtcdServer) ClusterVersion() *semver.Version {
1621 if s.cluster == nil {
1622 return nil
1623 }
1624 return s.cluster.Version()
1625}
1626
1627// monitorVersions checks the member's version every monitorVersionInterval.
1628// It updates the cluster version if all members agrees on a higher one.
1629// It prints out log if there is a member with a higher version than the
1630// local version.
1631func (s *EtcdServer) monitorVersions() {
1632 for {
1633 select {
1634 case <-s.forceVersionC:
1635 case <-time.After(monitorVersionInterval):
1636 case <-s.stopping:
1637 return
1638 }
1639
1640 if s.Leader() != s.ID() {
1641 continue
1642 }
1643
1644 v := decideClusterVersion(getVersions(s.cluster, s.id, s.peerRt))
1645 if v != nil {
1646 // only keep major.minor version for comparison
1647 v = &semver.Version{
1648 Major: v.Major,
1649 Minor: v.Minor,
1650 }
1651 }
1652
1653 // if the current version is nil:
1654 // 1. use the decided version if possible
1655 // 2. or use the min cluster version
1656 if s.cluster.Version() == nil {
1657 verStr := version.MinClusterVersion
1658 if v != nil {
1659 verStr = v.String()
1660 }
1661 s.goAttach(func() { s.updateClusterVersion(verStr) })
1662 continue
1663 }
1664
1665 // update cluster version only if the decided version is greater than
1666 // the current cluster version
1667 if v != nil && s.cluster.Version().LessThan(*v) {
1668 s.goAttach(func() { s.updateClusterVersion(v.String()) })
1669 }
1670 }
1671}
1672
1673func (s *EtcdServer) updateClusterVersion(ver string) {
1674 if s.cluster.Version() == nil {
1675 plog.Infof("setting up the initial cluster version to %s", version.Cluster(ver))
1676 } else {
1677 plog.Infof("updating the cluster version from %s to %s", version.Cluster(s.cluster.Version().String()), version.Cluster(ver))
1678 }
1679 req := pb.Request{
1680 Method: "PUT",
1681 Path: membership.StoreClusterVersionKey(),
1682 Val: ver,
1683 }
1684 ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
1685 _, err := s.Do(ctx, req)
1686 cancel()
1687 switch err {
1688 case nil:
1689 return
1690 case ErrStopped:
1691 plog.Infof("aborting update cluster version because server is stopped")
1692 return
1693 default:
1694 plog.Errorf("error updating cluster version (%v)", err)
1695 }
1696}
1697
1698func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
1699 switch err {
1700 case context.Canceled:
1701 return ErrCanceled
1702 case context.DeadlineExceeded:
1703 s.leadTimeMu.RLock()
1704 curLeadElected := s.leadElectedTime
1705 s.leadTimeMu.RUnlock()
1706 prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
1707 if start.After(prevLeadLost) && start.Before(curLeadElected) {
1708 return ErrTimeoutDueToLeaderFail
1709 }
1710
1711 lead := types.ID(atomic.LoadUint64(&s.r.lead))
1712 switch lead {
1713 case types.ID(raft.None):
1714 // TODO: return error to specify it happens because the cluster does not have leader now
1715 case s.ID():
1716 if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
1717 return ErrTimeoutDueToConnectionLost
1718 }
1719 default:
1720 if !isConnectedSince(s.r.transport, start, lead) {
1721 return ErrTimeoutDueToConnectionLost
1722 }
1723 }
1724
1725 return ErrTimeout
1726 default:
1727 return err
1728 }
1729}
1730
1731func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
1732func (s *EtcdServer) Backend() backend.Backend {
1733 s.bemu.Lock()
1734 defer s.bemu.Unlock()
1735 return s.be
1736}
1737
1738func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
1739
1740func (s *EtcdServer) restoreAlarms() error {
1741 s.applyV3 = s.newApplierV3()
1742 as, err := alarm.NewAlarmStore(s)
1743 if err != nil {
1744 return err
1745 }
1746 s.alarmStore = as
1747 if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
1748 s.applyV3 = newApplierV3Capped(s.applyV3)
1749 }
1750 if len(as.Get(pb.AlarmType_CORRUPT)) > 0 {
1751 s.applyV3 = newApplierV3Corrupt(s.applyV3)
1752 }
1753 return nil
1754}
1755
1756func (s *EtcdServer) getAppliedIndex() uint64 {
1757 return atomic.LoadUint64(&s.appliedIndex)
1758}
1759
1760func (s *EtcdServer) setAppliedIndex(v uint64) {
1761 atomic.StoreUint64(&s.appliedIndex, v)
1762}
1763
1764func (s *EtcdServer) getCommittedIndex() uint64 {
1765 return atomic.LoadUint64(&s.committedIndex)
1766}
1767
1768func (s *EtcdServer) setCommittedIndex(v uint64) {
1769 atomic.StoreUint64(&s.committedIndex, v)
1770}
1771
// goAttach creates a goroutine on a given function and tracks it using
// the etcdserver waitgroup.
//
// If s.stopping is already closed, f is not run at all and a warning is
// logged. The read lock on s.wgMu is held across the stopping check and
// s.wg.Add. Presumably the code that closes s.stopping takes s.wgMu for
// writing, so no goroutine can be added to s.wg after stopping begins.
func (s *EtcdServer) goAttach(f func()) {
	s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
	defer s.wgMu.RUnlock()
	select {
	case <-s.stopping:
		plog.Warning("server has stopped (skipping goAttach)")
		return
	default:
	}

	// now safe to add since waitgroup wait has not started yet
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		f()
	}()
}
1791
1792func (s *EtcdServer) Alarms() []*pb.AlarmMember {
1793 return s.alarmStore.Get(pb.AlarmType_NONE)
1794}