1// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "context"
19 "encoding/json"
20 "expvar"
21 "fmt"
22 "math"
23 "math/rand"
24 "net/http"
25 "os"
26 "path"
27 "regexp"
28 "sync"
29 "sync/atomic"
30 "time"
31
32 "github.com/coreos/etcd/alarm"
33 "github.com/coreos/etcd/auth"
34 "github.com/coreos/etcd/compactor"
35 "github.com/coreos/etcd/discovery"
36 "github.com/coreos/etcd/etcdserver/api"
37 "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
38 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
39 "github.com/coreos/etcd/etcdserver/membership"
40 "github.com/coreos/etcd/etcdserver/stats"
41 "github.com/coreos/etcd/lease"
42 "github.com/coreos/etcd/lease/leasehttp"
43 "github.com/coreos/etcd/mvcc"
44 "github.com/coreos/etcd/mvcc/backend"
45 "github.com/coreos/etcd/pkg/fileutil"
46 "github.com/coreos/etcd/pkg/idutil"
47 "github.com/coreos/etcd/pkg/pbutil"
48 "github.com/coreos/etcd/pkg/runtime"
49 "github.com/coreos/etcd/pkg/schedule"
50 "github.com/coreos/etcd/pkg/types"
51 "github.com/coreos/etcd/pkg/wait"
52 "github.com/coreos/etcd/raft"
53 "github.com/coreos/etcd/raft/raftpb"
54 "github.com/coreos/etcd/rafthttp"
55 "github.com/coreos/etcd/snap"
56 "github.com/coreos/etcd/store"
57 "github.com/coreos/etcd/version"
58 "github.com/coreos/etcd/wal"
59
60 "github.com/coreos/go-semver/semver"
61 "github.com/coreos/pkg/capnslog"
62 "github.com/prometheus/client_golang/prometheus"
63)
64
65const (
66 DefaultSnapCount = 100000
67
68 StoreClusterPrefix = "/0"
69 StoreKeysPrefix = "/1"
70
71 // HealthInterval is the minimum time the cluster should be healthy
72 // before accepting add member requests.
73 HealthInterval = 5 * time.Second
74
75 purgeFileInterval = 30 * time.Second
76	// monitorVersionInterval should be smaller than the connection timeout;
77	// otherwise the connection cannot be reused because it will already have
78	// timed out.
79 monitorVersionInterval = rafthttp.ConnWriteTimeout - time.Second
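	// (For illustration only: with rafthttp.ConnWriteTimeout at its usual 5
	// seconds, this works out to a 4-second monitoring interval.)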
80
81	// maxInFlightMsgSnap is the maximum number of in-flight snapshot messages
82	// the etcd server allows. This number is more than enough for most clusters with 5 machines.
83 maxInFlightMsgSnap = 16
84
85 releaseDelayAfterSnapshot = 30 * time.Second
86
87 // maxPendingRevokes is the maximum number of outstanding expired lease revocations.
88 maxPendingRevokes = 16
89
90 recommendedMaxRequestBytes = 10 * 1024 * 1024
91)
92
93var (
94 plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "etcdserver")
95
96 storeMemberAttributeRegexp = regexp.MustCompile(path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes"))
97)
98
99func init() {
100 rand.Seed(time.Now().UnixNano())
101
102 expvar.Publish(
103 "file_descriptor_limit",
104 expvar.Func(
105 func() interface{} {
106 n, _ := runtime.FDLimit()
107 return n
108 },
109 ),
110 )
111}
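// For illustration only: because the limit is published via expvar, a process
// that also serves expvar's standard /debug/vars HTTP handler reports it in
// that JSON output under the key "file_descriptor_limit".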
112
113type Response struct {
114 Term uint64
115 Index uint64
116 Event *store.Event
117 Watcher store.Watcher
118 Err error
119}
120
121type ServerV2 interface {
122 Server
123 // Do takes a V2 request and attempts to fulfill it, returning a Response.
124 Do(ctx context.Context, r pb.Request) (Response, error)
125 stats.Stats
126 ClientCertAuthEnabled() bool
127}
128
129type ServerV3 interface {
130 Server
131 ID() types.ID
132 RaftTimer
133}
134
135func (s *EtcdServer) ClientCertAuthEnabled() bool { return s.Cfg.ClientCertAuthEnabled }
136
137type Server interface {
138 // Leader returns the ID of the leader Server.
139 Leader() types.ID
140
141 // AddMember attempts to add a member into the cluster. It will return
142 // ErrIDRemoved if member ID is removed from the cluster, or return
143 // ErrIDExists if member ID exists in the cluster.
144 AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error)
145 // RemoveMember attempts to remove a member from the cluster. It will
146 // return ErrIDRemoved if member ID is removed from the cluster, or return
147 // ErrIDNotFound if member ID is not in the cluster.
148 RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error)
149 // UpdateMember attempts to update an existing member in the cluster. It will
150 // return ErrIDNotFound if the member ID does not exist.
151 UpdateMember(ctx context.Context, updateMemb membership.Member) ([]*membership.Member, error)
152
153 // ClusterVersion is the cluster-wide minimum major.minor version.
154	// The cluster version is set to the minimum version that an etcd member is
155	// compatible with when the cluster is first bootstrapped.
156	//
157	// ClusterVersion is nil until the cluster is bootstrapped (has a quorum).
158	//
159	// During a rolling upgrade, the ClusterVersion is updated
160	// automatically after a sync (every 5 seconds by default).
161	//
162	// The API/raft component can use ClusterVersion to determine whether
163	// it can accept a client request or a raft RPC.
164	// NOTE: ClusterVersion might be nil when etcd 2.1 works with etcd 2.0 and
165	// the leader is etcd 2.0. An etcd 2.0 leader will not update clusterVersion since
166	// this feature was introduced after 2.0.
167 ClusterVersion() *semver.Version
168 Cluster() api.Cluster
169 Alarms() []*pb.AlarmMember
170}
171
172// EtcdServer is the production implementation of the Server interface
173type EtcdServer struct {
174	// inflightSnapshots holds the number of snapshots currently in flight.
175 inflightSnapshots int64 // must use atomic operations to access; keep 64-bit aligned.
176 appliedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
177 committedIndex uint64 // must use atomic operations to access; keep 64-bit aligned.
178	// consistIndex holds the offset of the currently executing entry.
179	// It is initialized to 0 before any entry is executed.
180 consistIndex consistentIndex // must use atomic operations to access; keep 64-bit aligned.
181 r raftNode // uses 64-bit atomics; keep 64-bit aligned.
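	// The atomically accessed fields above are kept at the start of the struct,
	// presumably because, per the sync/atomic documentation, only the first word
	// of an allocated struct is guaranteed to be 64-bit aligned on 32-bit
	// platforms; a misaligned 64-bit atomic access would panic there.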
182
183 readych chan struct{}
184 Cfg ServerConfig
185
186 w wait.Wait
187
188 readMu sync.RWMutex
189	// the read routine notifies the etcd server that it is waiting to read by
190	// sending an empty struct to readwaitc
191 readwaitc chan struct{}
192 // readNotifier is used to notify the read routine that it can process the request
193 // when there is no error
194 readNotifier *notifier
195
196	// stop signals the run goroutine should shut down.
197 stop chan struct{}
198 // stopping is closed by run goroutine on shutdown.
199 stopping chan struct{}
200 // done is closed when all goroutines from start() complete.
201 done chan struct{}
202
203 errorc chan error
204 id types.ID
205 attributes membership.Attributes
206
207 cluster *membership.RaftCluster
208
209 store store.Store
210 snapshotter *snap.Snapshotter
211
212 applyV2 ApplierV2
213
214 // applyV3 is the applier with auth and quotas
215 applyV3 applierV3
216 // applyV3Base is the core applier without auth or quotas
217 applyV3Base applierV3
218 applyWait wait.WaitTime
219
220 kv mvcc.ConsistentWatchableKV
221 lessor lease.Lessor
222 bemu sync.Mutex
223 be backend.Backend
224 authStore auth.AuthStore
225 alarmStore *alarm.AlarmStore
226
227 stats *stats.ServerStats
228 lstats *stats.LeaderStats
229
230 SyncTicker *time.Ticker
231 // compactor is used to auto-compact the KV.
232 compactor compactor.Compactor
233
234 // peerRt used to send requests (version, lease) to peers.
235 peerRt http.RoundTripper
236 reqIDGen *idutil.Generator
237
238 // forceVersionC is used to force the version monitor loop
239 // to detect the cluster version immediately.
240 forceVersionC chan struct{}
241
242	// wgMu blocks concurrent waitgroup mutation while the server is stopping
243	wgMu sync.RWMutex
244	// wg is used to wait for the goroutines that depend on the server state
245 // to exit when stopping the server.
246 wg sync.WaitGroup
247
248 // ctx is used for etcd-initiated requests that may need to be canceled
249 // on etcd server shutdown.
250 ctx context.Context
251 cancel context.CancelFunc
252
253 leadTimeMu sync.RWMutex
254 leadElectedTime time.Time
255}
256
257// NewServer creates a new EtcdServer from the supplied configuration. The
258// configuration is considered static for the lifetime of the EtcdServer.
259func NewServer(cfg ServerConfig) (srv *EtcdServer, err error) {
260 st := store.New(StoreClusterPrefix, StoreKeysPrefix)
261
262 var (
263 w *wal.WAL
264 n raft.Node
265 s *raft.MemoryStorage
266 id types.ID
267 cl *membership.RaftCluster
268 )
269
270 if cfg.MaxRequestBytes > recommendedMaxRequestBytes {
271 plog.Warningf("MaxRequestBytes %v exceeds maximum recommended size %v", cfg.MaxRequestBytes, recommendedMaxRequestBytes)
272 }
273
274 if terr := fileutil.TouchDirAll(cfg.DataDir); terr != nil {
275 return nil, fmt.Errorf("cannot access data directory: %v", terr)
276 }
277
278 haveWAL := wal.Exist(cfg.WALDir())
279
280 if err = fileutil.TouchDirAll(cfg.SnapDir()); err != nil {
281 plog.Fatalf("create snapshot directory error: %v", err)
282 }
283 ss := snap.New(cfg.SnapDir())
284
285 bepath := cfg.backendPath()
286 beExist := fileutil.Exist(bepath)
287 be := openBackend(cfg)
288
289 defer func() {
290 if err != nil {
291 be.Close()
292 }
293 }()
294
295 prt, err := rafthttp.NewRoundTripper(cfg.PeerTLSInfo, cfg.peerDialTimeout())
296 if err != nil {
297 return nil, err
298 }
299 var (
300 remotes []*membership.Member
301 snapshot *raftpb.Snapshot
302 )
303
304 switch {
305 case !haveWAL && !cfg.NewCluster:
306 if err = cfg.VerifyJoinExisting(); err != nil {
307 return nil, err
308 }
309 cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
310 if err != nil {
311 return nil, err
312 }
313 existingCluster, gerr := GetClusterFromRemotePeers(getRemotePeerURLs(cl, cfg.Name), prt)
314 if gerr != nil {
315 return nil, fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
316 }
317 if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
318 return nil, fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
319 }
320 if !isCompatibleWithCluster(cl, cl.MemberByName(cfg.Name).ID, prt) {
321 return nil, fmt.Errorf("incompatible with current running cluster")
322 }
323
324 remotes = existingCluster.Members()
325 cl.SetID(existingCluster.ID())
326 cl.SetStore(st)
327 cl.SetBackend(be)
328 cfg.Print()
329 id, n, s, w = startNode(cfg, cl, nil)
330 case !haveWAL && cfg.NewCluster:
331 if err = cfg.VerifyBootstrap(); err != nil {
332 return nil, err
333 }
334 cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialPeerURLsMap)
335 if err != nil {
336 return nil, err
337 }
338 m := cl.MemberByName(cfg.Name)
339 if isMemberBootstrapped(cl, cfg.Name, prt, cfg.bootstrapTimeout()) {
340 return nil, fmt.Errorf("member %s has already been bootstrapped", m.ID)
341 }
342 if cfg.ShouldDiscover() {
343 var str string
344 str, err = discovery.JoinCluster(cfg.DiscoveryURL, cfg.DiscoveryProxy, m.ID, cfg.InitialPeerURLsMap.String())
345 if err != nil {
346 return nil, &DiscoveryError{Op: "join", Err: err}
347 }
348 var urlsmap types.URLsMap
349 urlsmap, err = types.NewURLsMap(str)
350 if err != nil {
351 return nil, err
352 }
353 if checkDuplicateURL(urlsmap) {
354 return nil, fmt.Errorf("discovery cluster %s has duplicate url", urlsmap)
355 }
356 if cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, urlsmap); err != nil {
357 return nil, err
358 }
359 }
360 cl.SetStore(st)
361 cl.SetBackend(be)
362 cfg.PrintWithInitial()
363 id, n, s, w = startNode(cfg, cl, cl.MemberIDs())
364 case haveWAL:
365 if err = fileutil.IsDirWriteable(cfg.MemberDir()); err != nil {
366 return nil, fmt.Errorf("cannot write to member directory: %v", err)
367 }
368
369 if err = fileutil.IsDirWriteable(cfg.WALDir()); err != nil {
370 return nil, fmt.Errorf("cannot write to WAL directory: %v", err)
371 }
372
373 if cfg.ShouldDiscover() {
374 plog.Warningf("discovery token ignored since a cluster has already been initialized. Valid log found at %q", cfg.WALDir())
375 }
376 snapshot, err = ss.Load()
377 if err != nil && err != snap.ErrNoSnapshot {
378 return nil, err
379 }
380 if snapshot != nil {
381 if err = st.Recovery(snapshot.Data); err != nil {
382 plog.Panicf("recovered store from snapshot error: %v", err)
383 }
384 plog.Infof("recovered store from snapshot at index %d", snapshot.Metadata.Index)
385 if be, err = recoverSnapshotBackend(cfg, be, *snapshot); err != nil {
386 plog.Panicf("recovering backend from snapshot error: %v", err)
387 }
388 }
389 cfg.Print()
390 if !cfg.ForceNewCluster {
391 id, cl, n, s, w = restartNode(cfg, snapshot)
392 } else {
393 id, cl, n, s, w = restartAsStandaloneNode(cfg, snapshot)
394 }
395 cl.SetStore(st)
396 cl.SetBackend(be)
397 cl.Recover(api.UpdateCapability)
398 if cl.Version() != nil && !cl.Version().LessThan(semver.Version{Major: 3}) && !beExist {
399 os.RemoveAll(bepath)
400 return nil, fmt.Errorf("database file (%v) of the backend is missing", bepath)
401 }
402 default:
403 return nil, fmt.Errorf("unsupported bootstrap config")
404 }
405
406 if terr := fileutil.TouchDirAll(cfg.MemberDir()); terr != nil {
407 return nil, fmt.Errorf("cannot access member directory: %v", terr)
408 }
409
410 sstats := stats.NewServerStats(cfg.Name, id.String())
411 lstats := stats.NewLeaderStats(id.String())
412
413 heartbeat := time.Duration(cfg.TickMs) * time.Millisecond
414 srv = &EtcdServer{
415 readych: make(chan struct{}),
416 Cfg: cfg,
417 errorc: make(chan error, 1),
418 store: st,
419 snapshotter: ss,
420 r: *newRaftNode(
421 raftNodeConfig{
422 isIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
423 Node: n,
424 heartbeat: heartbeat,
425 raftStorage: s,
426 storage: NewStorage(w, ss),
427 },
428 ),
429 id: id,
430 attributes: membership.Attributes{Name: cfg.Name, ClientURLs: cfg.ClientURLs.StringSlice()},
431 cluster: cl,
432 stats: sstats,
433 lstats: lstats,
434 SyncTicker: time.NewTicker(500 * time.Millisecond),
435 peerRt: prt,
436 reqIDGen: idutil.NewGenerator(uint16(id), time.Now()),
437 forceVersionC: make(chan struct{}),
438 }
439 serverID.With(prometheus.Labels{"server_id": id.String()}).Set(1)
440
441 srv.applyV2 = &applierV2store{store: srv.store, cluster: srv.cluster}
442
443 srv.be = be
444 minTTL := time.Duration((3*cfg.ElectionTicks)/2) * heartbeat
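	// For illustration, assuming the common defaults of TickMs=100 and
	// ElectionTicks=10 (a 1s election timeout): heartbeat is 100ms and minTTL
	// works out to (3*10)/2 * 100ms = 1.5s, which math.Ceil below rounds up to
	// 2 (seconds) before it is handed to the lessor.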
445
446 // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
447 // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
448 srv.lessor = lease.NewLessor(srv.be, int64(math.Ceil(minTTL.Seconds())))
449 srv.kv = mvcc.New(srv.be, srv.lessor, &srv.consistIndex)
450 if beExist {
451 kvindex := srv.kv.ConsistentIndex()
452 // TODO: remove kvindex != 0 checking when we do not expect users to upgrade
453 // etcd from pre-3.0 release.
454 if snapshot != nil && kvindex < snapshot.Metadata.Index {
455 if kvindex != 0 {
456 return nil, fmt.Errorf("database file (%v index %d) does not match with snapshot (index %d).", bepath, kvindex, snapshot.Metadata.Index)
457 }
458 plog.Warningf("consistent index never saved (snapshot index=%d)", snapshot.Metadata.Index)
459 }
460 }
461 newSrv := srv // since srv == nil in defer if srv is returned as nil
462 defer func() {
463 // closing backend without first closing kv can cause
464 // resumed compactions to fail with closed tx errors
465 if err != nil {
466 newSrv.kv.Close()
467 }
468 }()
469
470 srv.consistIndex.setConsistentIndex(srv.kv.ConsistentIndex())
471 tp, err := auth.NewTokenProvider(cfg.AuthToken,
472 func(index uint64) <-chan struct{} {
473 return srv.applyWait.Wait(index)
474 },
475 )
476 if err != nil {
477 plog.Errorf("failed to create token provider: %s", err)
478 return nil, err
479 }
480 srv.authStore = auth.NewAuthStore(srv.be, tp)
481 if num := cfg.AutoCompactionRetention; num != 0 {
482 srv.compactor, err = compactor.New(cfg.AutoCompactionMode, num, srv.kv, srv)
483 if err != nil {
484 return nil, err
485 }
486 srv.compactor.Run()
487 }
488
489 srv.applyV3Base = srv.newApplierV3Backend()
490 if err = srv.restoreAlarms(); err != nil {
491 return nil, err
492 }
493
494 // TODO: move transport initialization near the definition of remote
495 tr := &rafthttp.Transport{
496 TLSInfo: cfg.PeerTLSInfo,
497 DialTimeout: cfg.peerDialTimeout(),
498 ID: id,
499 URLs: cfg.PeerURLs,
500 ClusterID: cl.ID(),
501 Raft: srv,
502 Snapshotter: ss,
503 ServerStats: sstats,
504 LeaderStats: lstats,
505 ErrorC: srv.errorc,
506 }
507 if err = tr.Start(); err != nil {
508 return nil, err
509 }
510 // add all remotes into transport
511 for _, m := range remotes {
512 if m.ID != id {
513 tr.AddRemote(m.ID, m.PeerURLs)
514 }
515 }
516 for _, m := range cl.Members() {
517 if m.ID != id {
518 tr.AddPeer(m.ID, m.PeerURLs)
519 }
520 }
521 srv.r.transport = tr
522
523 return srv, nil
524}
525
526func (s *EtcdServer) adjustTicks() {
527 clusterN := len(s.cluster.Members())
528
529 // single-node fresh start, or single-node recovers from snapshot
530 if clusterN == 1 {
531 ticks := s.Cfg.ElectionTicks - 1
532 plog.Infof("%s as single-node; fast-forwarding %d ticks (election ticks %d)", s.ID(), ticks, s.Cfg.ElectionTicks)
533 s.r.advanceTicks(ticks)
534 return
535 }
536
537 if !s.Cfg.InitialElectionTickAdvance {
538 plog.Infof("skipping initial election tick advance (election tick %d)", s.Cfg.ElectionTicks)
539 return
540 }
541
542	// retry for up to "rafthttp.ConnReadTimeout" (5 seconds), waiting for a peer
543	// connection to report in; if instead:
544	// 1. all connections failed, or
545	// 2. there are no active peers, or
546	// 3. this is a restarted single node with no snapshot,
547	// then do nothing, because advancing ticks would have no effect
548 waitTime := rafthttp.ConnReadTimeout
549 itv := 50 * time.Millisecond
550 for i := int64(0); i < int64(waitTime/itv); i++ {
551 select {
552 case <-time.After(itv):
553 case <-s.stopping:
554 return
555 }
556
557 peerN := s.r.transport.ActivePeers()
558 if peerN > 1 {
559	// a multi-node cluster member has received peer connection reports;
560	// advance fewer ticks to leave headroom in case leader messages arrive slowly
561	ticks := s.Cfg.ElectionTicks - 2
562	plog.Infof("%s initialized peer connection; fast-forwarding %d ticks (election ticks %d) with %d active peer(s)", s.ID(), ticks, s.Cfg.ElectionTicks, peerN)
563 s.r.advanceTicks(ticks)
564 return
565 }
566 }
567}
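// As a worked example (assuming the common configuration of TickMs=100 and
// ElectionTicks=10): a fresh single-node server advances ElectionTicks-1 = 9
// ticks above, so it reaches its election timeout after only a few more 100ms
// ticks instead of counting up from zero; a member of a multi-node cluster
// that sees more than one active peer advances ElectionTicks-2 = 8 ticks,
// leaving two ticks of headroom for a slow leader heartbeat.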
568
569// Start performs any initialization of the Server necessary for it to
570// begin serving requests. It must be called before Do or Process.
571// Start must be non-blocking; any long-running server functionality
572// should be implemented in goroutines.
573func (s *EtcdServer) Start() {
574 s.start()
575 s.goAttach(func() { s.adjustTicks() })
576 s.goAttach(func() { s.publish(s.Cfg.ReqTimeout()) })
577 s.goAttach(s.purgeFile)
578 s.goAttach(func() { monitorFileDescriptor(s.stopping) })
579 s.goAttach(s.monitorVersions)
580 s.goAttach(s.linearizableReadLoop)
581 s.goAttach(s.monitorKVHash)
582}
583
584// start prepares and starts the server in a new goroutine. It is no longer safe to
585// modify a server's fields after it has been sent to Start.
586// This function is just used for testing.
587func (s *EtcdServer) start() {
588 if s.Cfg.SnapCount == 0 {
589 plog.Infof("set snapshot count to default %d", DefaultSnapCount)
590 s.Cfg.SnapCount = DefaultSnapCount
591 }
592 s.w = wait.New()
593 s.applyWait = wait.NewTimeList()
594 s.done = make(chan struct{})
595 s.stop = make(chan struct{})
596 s.stopping = make(chan struct{})
597 s.ctx, s.cancel = context.WithCancel(context.Background())
598 s.readwaitc = make(chan struct{}, 1)
599 s.readNotifier = newNotifier()
600 if s.ClusterVersion() != nil {
601 plog.Infof("starting server... [version: %v, cluster version: %v]", version.Version, version.Cluster(s.ClusterVersion().String()))
602 } else {
603 plog.Infof("starting server... [version: %v, cluster version: to_be_decided]", version.Version)
604 }
605 // TODO: if this is an empty log, writes all peer infos
606 // into the first entry
607 go s.run()
608}
609
610func (s *EtcdServer) purgeFile() {
611 var dberrc, serrc, werrc <-chan error
612 if s.Cfg.MaxSnapFiles > 0 {
613 dberrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap.db", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
614 serrc = fileutil.PurgeFile(s.Cfg.SnapDir(), "snap", s.Cfg.MaxSnapFiles, purgeFileInterval, s.done)
615 }
616 if s.Cfg.MaxWALFiles > 0 {
617 werrc = fileutil.PurgeFile(s.Cfg.WALDir(), "wal", s.Cfg.MaxWALFiles, purgeFileInterval, s.done)
618 }
619 select {
620 case e := <-dberrc:
621 plog.Fatalf("failed to purge snap db file %v", e)
622 case e := <-serrc:
623 plog.Fatalf("failed to purge snap file %v", e)
624 case e := <-werrc:
625 plog.Fatalf("failed to purge wal file %v", e)
626 case <-s.stopping:
627 return
628 }
629}
630
631func (s *EtcdServer) ID() types.ID { return s.id }
632
633func (s *EtcdServer) Cluster() api.Cluster { return s.cluster }
634
635func (s *EtcdServer) ApplyWait() <-chan struct{} { return s.applyWait.Wait(s.getCommittedIndex()) }
636
637type ServerPeer interface {
638 ServerV2
639 RaftHandler() http.Handler
640 LeaseHandler() http.Handler
641}
642
643func (s *EtcdServer) LeaseHandler() http.Handler {
644 if s.lessor == nil {
645 return nil
646 }
647 return leasehttp.NewHandler(s.lessor, s.ApplyWait)
648}
649
650func (s *EtcdServer) RaftHandler() http.Handler { return s.r.transport.Handler() }
651
652// Process takes a raft message and applies it to the server's raft state
653// machine, respecting any timeout of the given context.
654func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
655 if s.cluster.IsIDRemoved(types.ID(m.From)) {
656 plog.Warningf("reject message from removed member %s", types.ID(m.From).String())
657 return httptypes.NewHTTPError(http.StatusForbidden, "cannot process message from removed member")
658 }
659 if m.Type == raftpb.MsgApp {
660 s.stats.RecvAppendReq(types.ID(m.From).String(), m.Size())
661 }
662 return s.r.Step(ctx, m)
663}
664
665func (s *EtcdServer) IsIDRemoved(id uint64) bool { return s.cluster.IsIDRemoved(types.ID(id)) }
666
667func (s *EtcdServer) ReportUnreachable(id uint64) { s.r.ReportUnreachable(id) }
668
669// ReportSnapshot reports snapshot sent status to the raft state machine,
670// and clears the used snapshot from the snapshot store.
671func (s *EtcdServer) ReportSnapshot(id uint64, status raft.SnapshotStatus) {
672 s.r.ReportSnapshot(id, status)
673}
674
675type etcdProgress struct {
676 confState raftpb.ConfState
677 snapi uint64
678 appliedt uint64
679 appliedi uint64
680}
681
682// raftReadyHandler contains a set of EtcdServer operations to be called by raftNode,
683// and helps decouple state machine logic from Raft algorithms.
684// TODO: add a state machine interface to apply the commit entries and do snapshot/recover
685type raftReadyHandler struct {
686 updateLeadership func(newLeader bool)
687 updateCommittedIndex func(uint64)
688}
689
690func (s *EtcdServer) run() {
691 sn, err := s.r.raftStorage.Snapshot()
692 if err != nil {
693 plog.Panicf("get snapshot from raft storage error: %v", err)
694 }
695
696 // asynchronously accept apply packets, dispatch progress in-order
697 sched := schedule.NewFIFOScheduler()
698
699 var (
700 smu sync.RWMutex
701 syncC <-chan time.Time
702 )
703 setSyncC := func(ch <-chan time.Time) {
704 smu.Lock()
705 syncC = ch
706 smu.Unlock()
707 }
708 getSyncC := func() (ch <-chan time.Time) {
709 smu.RLock()
710 ch = syncC
711 smu.RUnlock()
712 return
713 }
714 rh := &raftReadyHandler{
715 updateLeadership: func(newLeader bool) {
716 if !s.isLeader() {
717 if s.lessor != nil {
718 s.lessor.Demote()
719 }
720 if s.compactor != nil {
721 s.compactor.Pause()
722 }
723 setSyncC(nil)
724 } else {
725 if newLeader {
726 t := time.Now()
727 s.leadTimeMu.Lock()
728 s.leadElectedTime = t
729 s.leadTimeMu.Unlock()
730 }
731 setSyncC(s.SyncTicker.C)
732 if s.compactor != nil {
733 s.compactor.Resume()
734 }
735 }
736
737 // TODO: remove the nil checking
738 // current test utility does not provide the stats
739 if s.stats != nil {
740 s.stats.BecomeLeader()
741 }
742 },
743 updateCommittedIndex: func(ci uint64) {
744 cci := s.getCommittedIndex()
745 if ci > cci {
746 s.setCommittedIndex(ci)
747 }
748 },
749 }
750 s.r.start(rh)
751
752 ep := etcdProgress{
753 confState: sn.Metadata.ConfState,
754 snapi: sn.Metadata.Index,
755 appliedt: sn.Metadata.Term,
756 appliedi: sn.Metadata.Index,
757 }
758
759 defer func() {
760 s.wgMu.Lock() // block concurrent waitgroup adds in goAttach while stopping
761 close(s.stopping)
762 s.wgMu.Unlock()
763 s.cancel()
764
765 sched.Stop()
766
767	// wait for goroutines before closing raft so wal stays open
768 s.wg.Wait()
769
770 s.SyncTicker.Stop()
771
772	// must stop raft after the scheduler; etcdserver can leak rafthttp pipelines
773	// by adding a peer after raft stops the transport
774 s.r.stop()
775
776 // kv, lessor and backend can be nil if running without v3 enabled
777 // or running unit tests.
778 if s.lessor != nil {
779 s.lessor.Stop()
780 }
781 if s.kv != nil {
782 s.kv.Close()
783 }
784 if s.authStore != nil {
785 s.authStore.Close()
786 }
787 if s.be != nil {
788 s.be.Close()
789 }
790 if s.compactor != nil {
791 s.compactor.Stop()
792 }
793 close(s.done)
794 }()
795
796 var expiredLeaseC <-chan []*lease.Lease
797 if s.lessor != nil {
798 expiredLeaseC = s.lessor.ExpiredLeasesC()
799 }
800
801 for {
802 select {
803 case ap := <-s.r.apply():
804 f := func(context.Context) { s.applyAll(&ep, &ap) }
805 sched.Schedule(f)
806 case leases := <-expiredLeaseC:
807 s.goAttach(func() {
808	// Increase throughput of the expired-lease deletion process through parallelization
809 c := make(chan struct{}, maxPendingRevokes)
810 for _, lease := range leases {
811 select {
812 case c <- struct{}{}:
813 case <-s.stopping:
814 return
815 }
816 lid := lease.ID
817 s.goAttach(func() {
818 ctx := s.authStore.WithRoot(s.ctx)
819 _, lerr := s.LeaseRevoke(ctx, &pb.LeaseRevokeRequest{ID: int64(lid)})
820 if lerr == nil {
821 leaseExpired.Inc()
822 } else {
823 plog.Warningf("failed to revoke %016x (%q)", lid, lerr.Error())
824 }
825
826 <-c
827 })
828 }
829 })
830 case err := <-s.errorc:
831 plog.Errorf("%s", err)
832 plog.Infof("the data-dir used by this member must be removed.")
833 return
834 case <-getSyncC():
835 if s.store.HasTTLKeys() {
836 s.sync(s.Cfg.ReqTimeout())
837 }
838 case <-s.stop:
839 return
840 }
841 }
842}
843
844func (s *EtcdServer) applyAll(ep *etcdProgress, apply *apply) {
845 s.applySnapshot(ep, apply)
846 s.applyEntries(ep, apply)
847
848 proposalsApplied.Set(float64(ep.appliedi))
849 s.applyWait.Trigger(ep.appliedi)
850 // wait for the raft routine to finish the disk writes before triggering a
851	// snapshot; otherwise the applied index might be greater than the last index in raft
852	// storage, since the raft routine might be slower than the apply routine.
853 <-apply.notifyc
854
855 s.triggerSnapshot(ep)
856 select {
857 // snapshot requested via send()
858 case m := <-s.r.msgSnapC:
859 merged := s.createMergedSnapshotMessage(m, ep.appliedt, ep.appliedi, ep.confState)
860 s.sendMergedSnap(merged)
861 default:
862 }
863}
864
865func (s *EtcdServer) applySnapshot(ep *etcdProgress, apply *apply) {
866 if raft.IsEmptySnap(apply.snapshot) {
867 return
868 }
869
870 plog.Infof("applying snapshot at index %d...", ep.snapi)
871 defer plog.Infof("finished applying incoming snapshot at index %d", ep.snapi)
872
873 if apply.snapshot.Metadata.Index <= ep.appliedi {
874 plog.Panicf("snapshot index [%d] should > appliedi[%d] + 1",
875 apply.snapshot.Metadata.Index, ep.appliedi)
876 }
877
878 // wait for raftNode to persist snapshot onto the disk
879 <-apply.notifyc
880
881 newbe, err := openSnapshotBackend(s.Cfg, s.snapshotter, apply.snapshot)
882 if err != nil {
883 plog.Panic(err)
884 }
885
886 // always recover lessor before kv. When we recover the mvcc.KV it will reattach keys to its leases.
887 // If we recover mvcc.KV first, it will attach the keys to the wrong lessor before it recovers.
888 if s.lessor != nil {
889 plog.Info("recovering lessor...")
890 s.lessor.Recover(newbe, func() lease.TxnDelete { return s.kv.Write() })
891 plog.Info("finished recovering lessor")
892 }
893
894 plog.Info("restoring mvcc store...")
895
896 if err := s.kv.Restore(newbe); err != nil {
897 plog.Panicf("restore KV error: %v", err)
898 }
899 s.consistIndex.setConsistentIndex(s.kv.ConsistentIndex())
900
901 plog.Info("finished restoring mvcc store")
902
903 // Closing old backend might block until all the txns
904 // on the backend are finished.
905 // We do not want to wait on closing the old backend.
906 s.bemu.Lock()
907 oldbe := s.be
908 go func() {
909 plog.Info("closing old backend...")
910 defer plog.Info("finished closing old backend")
911
912 if err := oldbe.Close(); err != nil {
913 plog.Panicf("close backend error: %v", err)
914 }
915 }()
916
917 s.be = newbe
918 s.bemu.Unlock()
919
920 plog.Info("recovering alarms...")
921 if err := s.restoreAlarms(); err != nil {
922 plog.Panicf("restore alarms error: %v", err)
923 }
924 plog.Info("finished recovering alarms")
925
926 if s.authStore != nil {
927 plog.Info("recovering auth store...")
928 s.authStore.Recover(newbe)
929 plog.Info("finished recovering auth store")
930 }
931
932 plog.Info("recovering store v2...")
933 if err := s.store.Recovery(apply.snapshot.Data); err != nil {
934 plog.Panicf("recovery store error: %v", err)
935 }
936 plog.Info("finished recovering store v2")
937
938 s.cluster.SetBackend(s.be)
939 plog.Info("recovering cluster configuration...")
940 s.cluster.Recover(api.UpdateCapability)
941 plog.Info("finished recovering cluster configuration")
942
943 plog.Info("removing old peers from network...")
944 // recover raft transport
945 s.r.transport.RemoveAllPeers()
946 plog.Info("finished removing old peers from network")
947
948 plog.Info("adding peers from new cluster configuration into network...")
949 for _, m := range s.cluster.Members() {
950 if m.ID == s.ID() {
951 continue
952 }
953 s.r.transport.AddPeer(m.ID, m.PeerURLs)
954 }
955 plog.Info("finished adding peers from new cluster configuration into network...")
956
957 ep.appliedt = apply.snapshot.Metadata.Term
958 ep.appliedi = apply.snapshot.Metadata.Index
959 ep.snapi = ep.appliedi
960 ep.confState = apply.snapshot.Metadata.ConfState
961}
962
963func (s *EtcdServer) applyEntries(ep *etcdProgress, apply *apply) {
964 if len(apply.entries) == 0 {
965 return
966 }
967 firsti := apply.entries[0].Index
968 if firsti > ep.appliedi+1 {
969 plog.Panicf("first index of committed entry[%d] should <= appliedi[%d] + 1", firsti, ep.appliedi)
970 }
971 var ents []raftpb.Entry
972 if ep.appliedi+1-firsti < uint64(len(apply.entries)) {
973 ents = apply.entries[ep.appliedi+1-firsti:]
974 }
975 if len(ents) == 0 {
976 return
977 }
978 var shouldstop bool
979 if ep.appliedt, ep.appliedi, shouldstop = s.apply(ents, &ep.confState); shouldstop {
980 go s.stopWithDelay(10*100*time.Millisecond, fmt.Errorf("the member has been permanently removed from the cluster"))
981 }
982}
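// For example, if ep.appliedi is 10 and apply.entries covers raft indexes 8
// through 12, then firsti is 8 and the slicing above skips the first
// 10+1-8 = 3 entries, so only indexes 11 and 12 are applied; entries at or
// below the already-applied index are never re-applied.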
983
984func (s *EtcdServer) triggerSnapshot(ep *etcdProgress) {
985 if ep.appliedi-ep.snapi <= s.Cfg.SnapCount {
986 return
987 }
988
989 plog.Infof("start to snapshot (applied: %d, lastsnap: %d)", ep.appliedi, ep.snapi)
990 s.snapshot(ep.appliedi, ep.confState)
991 ep.snapi = ep.appliedi
992}
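// With the default SnapCount of 100000 (DefaultSnapCount above), a snapshot is
// therefore taken once more than 100000 entries have been applied since the
// last snapshot index.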
993
994func (s *EtcdServer) isMultiNode() bool {
995 return s.cluster != nil && len(s.cluster.MemberIDs()) > 1
996}
997
998func (s *EtcdServer) isLeader() bool {
999 return uint64(s.ID()) == s.Lead()
1000}
1001
1002// MoveLeader transfers the leader to the given transferee.
1003func (s *EtcdServer) MoveLeader(ctx context.Context, lead, transferee uint64) error {
1004 now := time.Now()
1005 interval := time.Duration(s.Cfg.TickMs) * time.Millisecond
1006
1007 plog.Infof("%s starts leadership transfer from %s to %s", s.ID(), types.ID(lead), types.ID(transferee))
1008 s.r.TransferLeadership(ctx, lead, transferee)
1009 for s.Lead() != transferee {
1010 select {
1011 case <-ctx.Done(): // time out
1012 return ErrTimeoutLeaderTransfer
1013 case <-time.After(interval):
1014 }
1015 }
1016
1017 // TODO: drain all requests, or drop all messages to the old leader
1018
1019 plog.Infof("%s finished leadership transfer from %s to %s (took %v)", s.ID(), types.ID(lead), types.ID(transferee), time.Since(now))
1020 return nil
1021}
1022
1023// TransferLeadership transfers the leader to the chosen transferee.
1024func (s *EtcdServer) TransferLeadership() error {
1025 if !s.isLeader() {
1026 plog.Printf("skipped leadership transfer for stopping non-leader member")
1027 return nil
1028 }
1029
1030 if !s.isMultiNode() {
1031 plog.Printf("skipped leadership transfer for single member cluster")
1032 return nil
1033 }
1034
1035 transferee, ok := longestConnected(s.r.transport, s.cluster.MemberIDs())
1036 if !ok {
1037 return ErrUnhealthy
1038 }
1039
1040 tm := s.Cfg.ReqTimeout()
1041 ctx, cancel := context.WithTimeout(s.ctx, tm)
1042 err := s.MoveLeader(ctx, s.Lead(), uint64(transferee))
1043 cancel()
1044 return err
1045}
1046
1047// HardStop stops the server without coordination with other members in the cluster.
1048func (s *EtcdServer) HardStop() {
1049 select {
1050 case s.stop <- struct{}{}:
1051 case <-s.done:
1052 return
1053 }
1054 <-s.done
1055}
1056
1057// Stop stops the server gracefully, and shuts down the running goroutine.
1058// Stop should be called after Start(s); otherwise it will block forever.
1059// When stopping leader, Stop transfers its leadership to one of its peers
1060// before stopping the server.
1061// Stop terminates the Server and performs any necessary finalization.
1062// Do and Process cannot be called after Stop has been invoked.
1063func (s *EtcdServer) Stop() {
1064 if err := s.TransferLeadership(); err != nil {
1065 plog.Warningf("%s failed to transfer leadership (%v)", s.ID(), err)
1066 }
1067 s.HardStop()
1068}
1069
1070// ReadyNotify returns a channel that will be closed when the server
1071// is ready to serve client requests
1072func (s *EtcdServer) ReadyNotify() <-chan struct{} { return s.readych }
1073
1074func (s *EtcdServer) stopWithDelay(d time.Duration, err error) {
1075 select {
1076 case <-time.After(d):
1077 case <-s.done:
1078 }
1079 select {
1080 case s.errorc <- err:
1081 default:
1082 }
1083}
1084
1085// StopNotify returns a channel that receives an empty struct
1086// when the server is stopped.
1087func (s *EtcdServer) StopNotify() <-chan struct{} { return s.done }
1088
1089func (s *EtcdServer) SelfStats() []byte { return s.stats.JSON() }
1090
1091func (s *EtcdServer) LeaderStats() []byte {
1092 lead := atomic.LoadUint64(&s.r.lead)
1093 if lead != uint64(s.id) {
1094 return nil
1095 }
1096 return s.lstats.JSON()
1097}
1098
1099func (s *EtcdServer) StoreStats() []byte { return s.store.JsonStats() }
1100
1101func (s *EtcdServer) checkMembershipOperationPermission(ctx context.Context) error {
1102 if s.authStore == nil {
1103 // In the context of ordinary etcd process, s.authStore will never be nil.
1104 // This branch is for handling cases in server_test.go
1105 return nil
1106 }
1107
1108 // Note that this permission check is done in the API layer,
1109	// so a TOCTOU problem could potentially arise with a schedule like this:
1110	// update membership with user A -> revoke root role of A -> apply the membership change
1111	// in the state machine layer.
1112	// However, both membership changes and role management require the root privilege,
1113	// so careful operation by admins can prevent the problem.
1114 authInfo, err := s.AuthInfoFromCtx(ctx)
1115 if err != nil {
1116 return err
1117 }
1118
1119 return s.AuthStore().IsAdminPermitted(authInfo)
1120}
1121
1122func (s *EtcdServer) AddMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1123 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1124 return nil, err
1125 }
1126
1127 if s.Cfg.StrictReconfigCheck {
1128 // by default StrictReconfigCheck is enabled; reject new members if unhealthy
1129 if !s.cluster.IsReadyToAddNewMember() {
1130 plog.Warningf("not enough started members, rejecting member add %+v", memb)
1131 return nil, ErrNotEnoughStartedMembers
1132 }
1133 if !isConnectedFullySince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), s.cluster.Members()) {
1134 plog.Warningf("not healthy for reconfigure, rejecting member add %+v", memb)
1135 return nil, ErrUnhealthy
1136 }
1137 }
1138
1139 // TODO: move Member to protobuf type
1140 b, err := json.Marshal(memb)
1141 if err != nil {
1142 return nil, err
1143 }
1144 cc := raftpb.ConfChange{
1145 Type: raftpb.ConfChangeAddNode,
1146 NodeID: uint64(memb.ID),
1147 Context: b,
1148 }
1149 return s.configure(ctx, cc)
1150}
1151
1152func (s *EtcdServer) RemoveMember(ctx context.Context, id uint64) ([]*membership.Member, error) {
1153 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1154 return nil, err
1155 }
1156
1157	// by default StrictReconfigCheck is enabled; reject removal if it leads to quorum loss
1158 if err := s.mayRemoveMember(types.ID(id)); err != nil {
1159 return nil, err
1160 }
1161
1162 cc := raftpb.ConfChange{
1163 Type: raftpb.ConfChangeRemoveNode,
1164 NodeID: id,
1165 }
1166 return s.configure(ctx, cc)
1167}
1168
1169func (s *EtcdServer) mayRemoveMember(id types.ID) error {
1170 if !s.Cfg.StrictReconfigCheck {
1171 return nil
1172 }
1173
1174 if !s.cluster.IsReadyToRemoveMember(uint64(id)) {
1175 plog.Warningf("not enough started members, rejecting remove member %s", id)
1176 return ErrNotEnoughStartedMembers
1177 }
1178
1179 // downed member is safe to remove since it's not part of the active quorum
1180 if t := s.r.transport.ActiveSince(id); id != s.ID() && t.IsZero() {
1181 return nil
1182 }
1183
1184 // protect quorum if some members are down
1185 m := s.cluster.Members()
1186 active := numConnectedSince(s.r.transport, time.Now().Add(-HealthInterval), s.ID(), m)
1187 if (active - 1) < 1+((len(m)-1)/2) {
1188 plog.Warningf("reconfigure breaks active quorum, rejecting remove member %s", id)
1189 return ErrUnhealthy
1190 }
1191
1192 return nil
1193}
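// Worked example of the quorum check above: with 5 members and 4 of them
// active within HealthInterval, removing one leaves active-1 = 3, which still
// meets the quorum 1+((5-1)/2) = 3 of the remaining members, so the removal is
// allowed; with only 3 of 5 active, 2 < 3 and the request is rejected with
// ErrUnhealthy.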
1194
1195func (s *EtcdServer) UpdateMember(ctx context.Context, memb membership.Member) ([]*membership.Member, error) {
1196 b, merr := json.Marshal(memb)
1197 if merr != nil {
1198 return nil, merr
1199 }
1200
1201 if err := s.checkMembershipOperationPermission(ctx); err != nil {
1202 return nil, err
1203 }
1204 cc := raftpb.ConfChange{
1205 Type: raftpb.ConfChangeUpdateNode,
1206 NodeID: uint64(memb.ID),
1207 Context: b,
1208 }
1209 return s.configure(ctx, cc)
1210}
1211
1212// Implement the RaftTimer interface
1213
1214func (s *EtcdServer) Index() uint64 { return atomic.LoadUint64(&s.r.index) }
1215
1216func (s *EtcdServer) Term() uint64 { return atomic.LoadUint64(&s.r.term) }
1217
1218// Lead is only for testing purposes.
1219// TODO: add Raft server interface to expose raft related info:
1220// Index, Term, Lead, Committed, Applied, LastIndex, etc.
1221func (s *EtcdServer) Lead() uint64 { return atomic.LoadUint64(&s.r.lead) }
1222
1223func (s *EtcdServer) Leader() types.ID { return types.ID(s.Lead()) }
1224
1225type confChangeResponse struct {
1226 membs []*membership.Member
1227 err error
1228}
1229
1230// configure sends a configuration change through consensus and
1231// then waits for it to be applied to the server. It
1232// will block until the change is performed or there is an error.
1233func (s *EtcdServer) configure(ctx context.Context, cc raftpb.ConfChange) ([]*membership.Member, error) {
1234 cc.ID = s.reqIDGen.Next()
1235 ch := s.w.Register(cc.ID)
1236 start := time.Now()
1237 if err := s.r.ProposeConfChange(ctx, cc); err != nil {
1238 s.w.Trigger(cc.ID, nil)
1239 return nil, err
1240 }
1241 select {
1242 case x := <-ch:
1243 if x == nil {
1244 plog.Panicf("configure trigger value should never be nil")
1245 }
1246 resp := x.(*confChangeResponse)
1247 return resp.membs, resp.err
1248 case <-ctx.Done():
1249 s.w.Trigger(cc.ID, nil) // GC wait
1250 return nil, s.parseProposeCtxErr(ctx.Err(), start)
1251 case <-s.stopping:
1252 return nil, ErrStopped
1253 }
1254}
1255
1256// sync proposes a SYNC request and is non-blocking.
1257// This makes no guarantee that the request will be proposed or performed.
1258// The request will be canceled after the given timeout.
1259func (s *EtcdServer) sync(timeout time.Duration) {
1260 req := pb.Request{
1261 Method: "SYNC",
1262 ID: s.reqIDGen.Next(),
1263 Time: time.Now().UnixNano(),
1264 }
1265 data := pbutil.MustMarshal(&req)
1266	// There is no guarantee that the node has a leader when making the SYNC request,
1267	// so use a goroutine to propose.
1268 ctx, cancel := context.WithTimeout(s.ctx, timeout)
1269 s.goAttach(func() {
1270 s.r.Propose(ctx, data)
1271 cancel()
1272 })
1273}
1274
1275// publish registers server information into the cluster. The information
1276// is the JSON representation of this server's member struct, updated with the
1277// static clientURLs of the server.
1278// The function keeps attempting to register until it succeeds,
1279// or its server is stopped.
1280func (s *EtcdServer) publish(timeout time.Duration) {
1281 b, err := json.Marshal(s.attributes)
1282 if err != nil {
1283 plog.Panicf("json marshal error: %v", err)
1284 return
1285 }
1286 req := pb.Request{
1287 Method: "PUT",
1288 Path: membership.MemberAttributesStorePath(s.id),
1289 Val: string(b),
1290 }
1291
1292 for {
1293 ctx, cancel := context.WithTimeout(s.ctx, timeout)
1294 _, err := s.Do(ctx, req)
1295 cancel()
1296 switch err {
1297 case nil:
1298 close(s.readych)
1299 plog.Infof("published %+v to cluster %s", s.attributes, s.cluster.ID())
1300 return
1301 case ErrStopped:
1302 plog.Infof("aborting publish because server is stopped")
1303 return
1304 default:
1305 plog.Errorf("publish error: %v", err)
1306 }
1307 }
1308}
1309
1310func (s *EtcdServer) sendMergedSnap(merged snap.Message) {
1311 atomic.AddInt64(&s.inflightSnapshots, 1)
1312
1313 s.r.transport.SendSnapshot(merged)
1314 s.goAttach(func() {
1315 select {
1316 case ok := <-merged.CloseNotify():
1317 // delay releasing inflight snapshot for another 30 seconds to
1318 // block log compaction.
1319 // If the follower still fails to catch up, it is probably just too slow
1320 // to catch up. We cannot avoid the snapshot cycle anyway.
1321 if ok {
1322 select {
1323 case <-time.After(releaseDelayAfterSnapshot):
1324 case <-s.stopping:
1325 }
1326 }
1327 atomic.AddInt64(&s.inflightSnapshots, -1)
1328 case <-s.stopping:
1329 return
1330 }
1331 })
1332}
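// Note that a successfully delivered snapshot keeps inflightSnapshots non-zero
// for another releaseDelayAfterSnapshot (30 seconds); snapshot() below skips
// raft log compaction while that counter is non-zero, giving the slow follower
// a window to catch up from the retained log entries.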
1333
1334// apply takes entries received from Raft (after they have been committed) and
1335// applies them to the current state of the EtcdServer.
1336// The given entries should not be empty.
1337func (s *EtcdServer) apply(es []raftpb.Entry, confState *raftpb.ConfState) (appliedt uint64, appliedi uint64, shouldStop bool) {
1338 for i := range es {
1339 e := es[i]
1340 switch e.Type {
1341 case raftpb.EntryNormal:
1342 s.applyEntryNormal(&e)
1343 case raftpb.EntryConfChange:
1344	// set the consistent index of the currently executing entry
1345 if e.Index > s.consistIndex.ConsistentIndex() {
1346 s.consistIndex.setConsistentIndex(e.Index)
1347 }
1348 var cc raftpb.ConfChange
1349 pbutil.MustUnmarshal(&cc, e.Data)
1350 removedSelf, err := s.applyConfChange(cc, confState)
1351 s.setAppliedIndex(e.Index)
1352 shouldStop = shouldStop || removedSelf
1353 s.w.Trigger(cc.ID, &confChangeResponse{s.cluster.Members(), err})
1354 default:
1355 plog.Panicf("entry type should be either EntryNormal or EntryConfChange")
1356 }
1357 atomic.StoreUint64(&s.r.index, e.Index)
1358 atomic.StoreUint64(&s.r.term, e.Term)
1359 appliedt = e.Term
1360 appliedi = e.Index
1361 }
1362 return appliedt, appliedi, shouldStop
1363}
1364
1365// applyEntryNormal applies an EntryNormal type raftpb request to the EtcdServer
1366func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry) {
1367 shouldApplyV3 := false
1368 if e.Index > s.consistIndex.ConsistentIndex() {
1369	// set the consistent index of the currently executing entry
1370 s.consistIndex.setConsistentIndex(e.Index)
1371 shouldApplyV3 = true
1372 }
1373 defer s.setAppliedIndex(e.Index)
1374
1375	// the raft state machine may generate a no-op entry during leader confirmation;
1376	// skip it in advance to avoid potential bugs in the future
1377 if len(e.Data) == 0 {
1378 select {
1379 case s.forceVersionC <- struct{}{}:
1380 default:
1381 }
1382 // promote lessor when the local member is leader and finished
1383 // applying all entries from the last term.
1384 if s.isLeader() {
1385 s.lessor.Promote(s.Cfg.electionTimeout())
1386 }
1387 return
1388 }
1389
1390 var raftReq pb.InternalRaftRequest
1391 if !pbutil.MaybeUnmarshal(&raftReq, e.Data) { // backward compatible
1392 var r pb.Request
1393 rp := &r
1394 pbutil.MustUnmarshal(rp, e.Data)
1395 s.w.Trigger(r.ID, s.applyV2Request((*RequestV2)(rp)))
1396 return
1397 }
1398 if raftReq.V2 != nil {
1399 req := (*RequestV2)(raftReq.V2)
1400 s.w.Trigger(req.ID, s.applyV2Request(req))
1401 return
1402 }
1403
1404 // do not re-apply applied entries.
1405 if !shouldApplyV3 {
1406 return
1407 }
1408
1409 id := raftReq.ID
1410 if id == 0 {
1411 id = raftReq.Header.ID
1412 }
1413
1414 var ar *applyResult
1415 needResult := s.w.IsRegistered(id)
1416 if needResult || !noSideEffect(&raftReq) {
1417 if !needResult && raftReq.Txn != nil {
1418 removeNeedlessRangeReqs(raftReq.Txn)
1419 }
1420 ar = s.applyV3.Apply(&raftReq)
1421 }
1422
1423 if ar == nil {
1424 return
1425 }
1426
1427 if ar.err != ErrNoSpace || len(s.alarmStore.Get(pb.AlarmType_NOSPACE)) > 0 {
1428 s.w.Trigger(id, ar)
1429 return
1430 }
1431
1432 plog.Errorf("applying raft message exceeded backend quota")
1433 s.goAttach(func() {
1434 a := &pb.AlarmRequest{
1435 MemberID: uint64(s.ID()),
1436 Action: pb.AlarmRequest_ACTIVATE,
1437 Alarm: pb.AlarmType_NOSPACE,
1438 }
1439 s.raftRequest(s.ctx, pb.InternalRaftRequest{Alarm: a})
1440 s.w.Trigger(id, ar)
1441 })
1442}
1443
1444// applyConfChange applies a ConfChange to the server. It is only
1445// invoked with a ConfChange that has already passed through Raft
1446func (s *EtcdServer) applyConfChange(cc raftpb.ConfChange, confState *raftpb.ConfState) (bool, error) {
1447 if err := s.cluster.ValidateConfigurationChange(cc); err != nil {
1448 cc.NodeID = raft.None
1449 s.r.ApplyConfChange(cc)
1450 return false, err
1451 }
1452 *confState = *s.r.ApplyConfChange(cc)
1453 switch cc.Type {
1454 case raftpb.ConfChangeAddNode:
1455 m := new(membership.Member)
1456 if err := json.Unmarshal(cc.Context, m); err != nil {
1457 plog.Panicf("unmarshal member should never fail: %v", err)
1458 }
1459 if cc.NodeID != uint64(m.ID) {
1460 plog.Panicf("nodeID should always be equal to member ID")
1461 }
1462 s.cluster.AddMember(m)
1463 if m.ID != s.id {
1464 s.r.transport.AddPeer(m.ID, m.PeerURLs)
1465 }
1466 case raftpb.ConfChangeRemoveNode:
1467 id := types.ID(cc.NodeID)
1468 s.cluster.RemoveMember(id)
1469 if id == s.id {
1470 return true, nil
1471 }
1472 s.r.transport.RemovePeer(id)
1473 case raftpb.ConfChangeUpdateNode:
1474 m := new(membership.Member)
1475 if err := json.Unmarshal(cc.Context, m); err != nil {
1476 plog.Panicf("unmarshal member should never fail: %v", err)
1477 }
1478 if cc.NodeID != uint64(m.ID) {
1479 plog.Panicf("nodeID should always be equal to member ID")
1480 }
1481 s.cluster.UpdateRaftAttributes(m.ID, m.RaftAttributes)
1482 if m.ID != s.id {
1483 s.r.transport.UpdatePeer(m.ID, m.PeerURLs)
1484 }
1485 }
1486 return false, nil
1487}
1488
1489// TODO: non-blocking snapshot
1490func (s *EtcdServer) snapshot(snapi uint64, confState raftpb.ConfState) {
1491 clone := s.store.Clone()
1492 // commit kv to write metadata (for example: consistent index) to disk.
1493	// KV().commit() updates the consistent index in the backend.
1494	// All operations that update the consistent index must be called sequentially
1495	// from the applyAll function.
1496	// So KV().Commit() cannot run in parallel with apply. It has to be called outside
1497	// the goroutine created below.
1498 s.KV().Commit()
1499
1500 s.goAttach(func() {
1501 d, err := clone.SaveNoCopy()
1502 // TODO: current store will never fail to do a snapshot
1503 // what should we do if the store might fail?
1504 if err != nil {
1505 plog.Panicf("store save should never fail: %v", err)
1506 }
1507 snap, err := s.r.raftStorage.CreateSnapshot(snapi, &confState, d)
1508 if err != nil {
1509 // the snapshot was done asynchronously with the progress of raft.
1510	// raft might already have a newer snapshot.
1511 if err == raft.ErrSnapOutOfDate {
1512 return
1513 }
1514 plog.Panicf("unexpected create snapshot error %v", err)
1515 }
1516 // SaveSnap saves the snapshot and releases the locked wal files
1517 // to the snapshot index.
1518 if err = s.r.storage.SaveSnap(snap); err != nil {
1519 plog.Fatalf("save snapshot error: %v", err)
1520 }
1521 plog.Infof("saved snapshot at index %d", snap.Metadata.Index)
1522
1523 // When sending a snapshot, etcd will pause compaction.
1524	// After receiving a snapshot, the slow follower needs to get all the entries right after
1525	// the snapshot was sent in order to catch up. If we do not pause compaction, the log entries
1526	// right after the snapshot might already be compacted. This happens when the snapshot takes
1527	// a long time to send and save. Pausing compaction avoids triggering a snapshot-sending cycle.
1528 if atomic.LoadInt64(&s.inflightSnapshots) != 0 {
1529 plog.Infof("skip compaction since there is an inflight snapshot")
1530 return
1531 }
1532
1533 // keep some in memory log entries for slow followers.
1534 compacti := uint64(1)
1535 if snapi > numberOfCatchUpEntries {
1536 compacti = snapi - numberOfCatchUpEntries
1537 }
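		// For example, if snapi is 100000 and numberOfCatchUpEntries (defined
		// elsewhere in this package) were 5000, everything below raft index
		// 95000 would be compacted away while the most recent 5000 entries
		// stay in memory for slow followers.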
1538 err = s.r.raftStorage.Compact(compacti)
1539 if err != nil {
1540 // the compaction was done asynchronously with the progress of raft.
1541	// the raft log might already have been compacted.
1542 if err == raft.ErrCompacted {
1543 return
1544 }
1545 plog.Panicf("unexpected compaction error %v", err)
1546 }
1547 plog.Infof("compacted raft log at %d", compacti)
1548 })
1549}
1550
1551// CutPeer drops messages to the specified peer.
1552func (s *EtcdServer) CutPeer(id types.ID) {
1553 tr, ok := s.r.transport.(*rafthttp.Transport)
1554 if ok {
1555 tr.CutPeer(id)
1556 }
1557}
1558
1559// MendPeer recovers the message dropping behavior of the given peer.
1560func (s *EtcdServer) MendPeer(id types.ID) {
1561 tr, ok := s.r.transport.(*rafthttp.Transport)
1562 if ok {
1563 tr.MendPeer(id)
1564 }
1565}
1566
1567func (s *EtcdServer) PauseSending() { s.r.pauseSending() }
1568
1569func (s *EtcdServer) ResumeSending() { s.r.resumeSending() }
1570
1571func (s *EtcdServer) ClusterVersion() *semver.Version {
1572 if s.cluster == nil {
1573 return nil
1574 }
1575 return s.cluster.Version()
1576}
1577
1578// monitorVersions checks the member's version every monitorVersionInterval.
1579// It updates the cluster version if all members agree on a higher one.
1580// It logs a message if there is a member with a higher version than the
1581// local version.
1582func (s *EtcdServer) monitorVersions() {
1583 for {
1584 select {
1585 case <-s.forceVersionC:
1586 case <-time.After(monitorVersionInterval):
1587 case <-s.stopping:
1588 return
1589 }
1590
1591 if s.Leader() != s.ID() {
1592 continue
1593 }
1594
1595 v := decideClusterVersion(getVersions(s.cluster, s.id, s.peerRt))
1596 if v != nil {
1597 // only keep major.minor version for comparison
1598 v = &semver.Version{
1599 Major: v.Major,
1600 Minor: v.Minor,
1601 }
1602 }
1603
1604 // if the current version is nil:
1605 // 1. use the decided version if possible
1606 // 2. or use the min cluster version
1607 if s.cluster.Version() == nil {
1608 verStr := version.MinClusterVersion
1609 if v != nil {
1610 verStr = v.String()
1611 }
1612 s.goAttach(func() { s.updateClusterVersion(verStr) })
1613 continue
1614 }
1615
1616 // update cluster version only if the decided version is greater than
1617 // the current cluster version
1618 if v != nil && s.cluster.Version().LessThan(*v) {
1619 s.goAttach(func() { s.updateClusterVersion(v.String()) })
1620 }
1621 }
1622}
1623
1624func (s *EtcdServer) updateClusterVersion(ver string) {
1625 if s.cluster.Version() == nil {
1626 plog.Infof("setting up the initial cluster version to %s", version.Cluster(ver))
1627 } else {
1628 plog.Infof("updating the cluster version from %s to %s", version.Cluster(s.cluster.Version().String()), version.Cluster(ver))
1629 }
1630 req := pb.Request{
1631 Method: "PUT",
1632 Path: membership.StoreClusterVersionKey(),
1633 Val: ver,
1634 }
1635 ctx, cancel := context.WithTimeout(s.ctx, s.Cfg.ReqTimeout())
1636 _, err := s.Do(ctx, req)
1637 cancel()
1638 switch err {
1639 case nil:
1640 return
1641 case ErrStopped:
1642 plog.Infof("aborting update cluster version because server is stopped")
1643 return
1644 default:
1645 plog.Errorf("error updating cluster version (%v)", err)
1646 }
1647}
1648
1649func (s *EtcdServer) parseProposeCtxErr(err error, start time.Time) error {
1650 switch err {
1651 case context.Canceled:
1652 return ErrCanceled
1653 case context.DeadlineExceeded:
1654 s.leadTimeMu.RLock()
1655 curLeadElected := s.leadElectedTime
1656 s.leadTimeMu.RUnlock()
1657 prevLeadLost := curLeadElected.Add(-2 * time.Duration(s.Cfg.ElectionTicks) * time.Duration(s.Cfg.TickMs) * time.Millisecond)
1658 if start.After(prevLeadLost) && start.Before(curLeadElected) {
1659 return ErrTimeoutDueToLeaderFail
1660 }
1661
1662 lead := types.ID(atomic.LoadUint64(&s.r.lead))
1663 switch lead {
1664 case types.ID(raft.None):
1665 // TODO: return error to specify it happens because the cluster does not have leader now
1666 case s.ID():
1667 if !isConnectedToQuorumSince(s.r.transport, start, s.ID(), s.cluster.Members()) {
1668 return ErrTimeoutDueToConnectionLost
1669 }
1670 default:
1671 if !isConnectedSince(s.r.transport, start, lead) {
1672 return ErrTimeoutDueToConnectionLost
1673 }
1674 }
1675
1676 return ErrTimeout
1677 default:
1678 return err
1679 }
1680}
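// As a rough illustration (assuming TickMs=100 and ElectionTicks=10): the
// window above treats any proposal started within 2*10*100ms = 2s before the
// current leader was elected as having timed out because of a leader change,
// returning ErrTimeoutDueToLeaderFail instead of the generic ErrTimeout.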
1681
1682func (s *EtcdServer) KV() mvcc.ConsistentWatchableKV { return s.kv }
1683func (s *EtcdServer) Backend() backend.Backend {
1684 s.bemu.Lock()
1685 defer s.bemu.Unlock()
1686 return s.be
1687}
1688
1689func (s *EtcdServer) AuthStore() auth.AuthStore { return s.authStore }
1690
1691func (s *EtcdServer) restoreAlarms() error {
1692 s.applyV3 = s.newApplierV3()
1693 as, err := alarm.NewAlarmStore(s)
1694 if err != nil {
1695 return err
1696 }
1697 s.alarmStore = as
1698 if len(as.Get(pb.AlarmType_NOSPACE)) > 0 {
1699 s.applyV3 = newApplierV3Capped(s.applyV3)
1700 }
1701 if len(as.Get(pb.AlarmType_CORRUPT)) > 0 {
1702 s.applyV3 = newApplierV3Corrupt(s.applyV3)
1703 }
1704 return nil
1705}
1706
1707func (s *EtcdServer) getAppliedIndex() uint64 {
1708 return atomic.LoadUint64(&s.appliedIndex)
1709}
1710
1711func (s *EtcdServer) setAppliedIndex(v uint64) {
1712 atomic.StoreUint64(&s.appliedIndex, v)
1713}
1714
1715func (s *EtcdServer) getCommittedIndex() uint64 {
1716 return atomic.LoadUint64(&s.committedIndex)
1717}
1718
1719func (s *EtcdServer) setCommittedIndex(v uint64) {
1720 atomic.StoreUint64(&s.committedIndex, v)
1721}
1722
1723// goAttach runs the given function in a new goroutine and tracks it with
1724// the etcdserver waitgroup.
1725func (s *EtcdServer) goAttach(f func()) {
1726 s.wgMu.RLock() // this blocks with ongoing close(s.stopping)
1727 defer s.wgMu.RUnlock()
1728 select {
1729 case <-s.stopping:
1730 plog.Warning("server has stopped (skipping goAttach)")
1731 return
1732 default:
1733 }
1734
1735 // now safe to add since waitgroup wait has not started yet
1736 s.wg.Add(1)
1737 go func() {
1738 defer s.wg.Done()
1739 f()
1740 }()
1741}
1742
1743func (s *EtcdServer) Alarms() []*pb.AlarmMember {
1744 return s.alarmStore.Get(pb.AlarmType_NONE)
1745}