blob: 2ec2c3a64bd43e9e1f3a23fdd7affe1cb8219384 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package raft
16
17import (
18 "context"
19 "errors"
20
21 pb "go.etcd.io/etcd/raft/raftpb"
22)
23
// SnapshotStatus is the outcome an application reports to raft through
// Node.ReportSnapshot after it has tried to send a snapshot.
type SnapshotStatus int

// Possible SnapshotStatus values. Numbering starts at 1, so the zero value
// of SnapshotStatus is neither of them.
const (
	// SnapshotFinish (1) reports that the snapshot was handled successfully.
	SnapshotFinish SnapshotStatus = iota + 1
	// SnapshotFailure (2) reports that sending or applying the snapshot failed.
	SnapshotFailure
)
30
31var (
32 emptyState = pb.HardState{}
33
34 // ErrStopped is returned by methods on Nodes that have been stopped.
35 ErrStopped = errors.New("raft: stopped")
36)
37
38// SoftState provides state that is useful for logging and debugging.
39// The state is volatile and does not need to be persisted to the WAL.
40type SoftState struct {
41 Lead uint64 // must use atomic operations to access; keep 64-bit aligned.
42 RaftState StateType
43}
44
45func (a *SoftState) equal(b *SoftState) bool {
46 return a.Lead == b.Lead && a.RaftState == b.RaftState
47}
48
49// Ready encapsulates the entries and messages that are ready to read,
50// be saved to stable storage, committed or sent to other peers.
51// All fields in Ready are read-only.
52type Ready struct {
53 // The current volatile state of a Node.
54 // SoftState will be nil if there is no update.
55 // It is not required to consume or store SoftState.
56 *SoftState
57
58 // The current state of a Node to be saved to stable storage BEFORE
59 // Messages are sent.
60 // HardState will be equal to empty state if there is no update.
61 pb.HardState
62
63 // ReadStates can be used for node to serve linearizable read requests locally
64 // when its applied index is greater than the index in ReadState.
65 // Note that the readState will be returned when raft receives msgReadIndex.
66 // The returned is only valid for the request that requested to read.
67 ReadStates []ReadState
68
69 // Entries specifies entries to be saved to stable storage BEFORE
70 // Messages are sent.
71 Entries []pb.Entry
72
73 // Snapshot specifies the snapshot to be saved to stable storage.
74 Snapshot pb.Snapshot
75
76 // CommittedEntries specifies entries to be committed to a
77 // store/state-machine. These have previously been committed to stable
78 // store.
79 CommittedEntries []pb.Entry
80
81 // Messages specifies outbound messages to be sent AFTER Entries are
82 // committed to stable storage.
83 // If it contains a MsgSnap message, the application MUST report back to raft
84 // when the snapshot has been received or has failed by calling ReportSnapshot.
85 Messages []pb.Message
86
87 // MustSync indicates whether the HardState and Entries must be synchronously
88 // written to disk or if an asynchronous write is permissible.
89 MustSync bool
90}
91
92func isHardStateEqual(a, b pb.HardState) bool {
93 return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
94}
95
96// IsEmptyHardState returns true if the given HardState is empty.
97func IsEmptyHardState(st pb.HardState) bool {
98 return isHardStateEqual(st, emptyState)
99}
100
101// IsEmptySnap returns true if the given Snapshot is empty.
102func IsEmptySnap(sp pb.Snapshot) bool {
103 return sp.Metadata.Index == 0
104}
105
106func (rd Ready) containsUpdates() bool {
107 return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
108 !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
109 len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
110}
111
112// appliedCursor extracts from the Ready the highest index the client has
113// applied (once the Ready is confirmed via Advance). If no information is
114// contained in the Ready, returns zero.
115func (rd Ready) appliedCursor() uint64 {
116 if n := len(rd.CommittedEntries); n > 0 {
117 return rd.CommittedEntries[n-1].Index
118 }
119 if index := rd.Snapshot.Metadata.Index; index > 0 {
120 return index
121 }
122 return 0
123}
124
125// Node represents a node in a raft cluster.
126type Node interface {
127 // Tick increments the internal logical clock for the Node by a single tick. Election
128 // timeouts and heartbeat timeouts are in units of ticks.
129 Tick()
130 // Campaign causes the Node to transition to candidate state and start campaigning to become leader.
131 Campaign(ctx context.Context) error
132 // Propose proposes that data be appended to the log. Note that proposals can be lost without
133 // notice, therefore it is user's job to ensure proposal retries.
134 Propose(ctx context.Context, data []byte) error
135 // ProposeConfChange proposes config change.
136 // At most one ConfChange can be in the process of going through consensus.
137 // Application needs to call ApplyConfChange when applying EntryConfChange type entry.
138 ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
139 // Step advances the state machine using the given message. ctx.Err() will be returned, if any.
140 Step(ctx context.Context, msg pb.Message) error
141
142 // Ready returns a channel that returns the current point-in-time state.
143 // Users of the Node must call Advance after retrieving the state returned by Ready.
144 //
145 // NOTE: No committed entries from the next Ready may be applied until all committed entries
146 // and snapshots from the previous one have finished.
147 Ready() <-chan Ready
148
149 // Advance notifies the Node that the application has saved progress up to the last Ready.
150 // It prepares the node to return the next available Ready.
151 //
152 // The application should generally call Advance after it applies the entries in last Ready.
153 //
154 // However, as an optimization, the application may call Advance while it is applying the
155 // commands. For example. when the last Ready contains a snapshot, the application might take
156 // a long time to apply the snapshot data. To continue receiving Ready without blocking raft
157 // progress, it can call Advance before finishing applying the last ready.
158 Advance()
159 // ApplyConfChange applies config change to the local node.
160 // Returns an opaque ConfState protobuf which must be recorded
161 // in snapshots. Will never return nil; it returns a pointer only
162 // to match MemoryStorage.Compact.
163 ApplyConfChange(cc pb.ConfChange) *pb.ConfState
164
165 // TransferLeadership attempts to transfer leadership to the given transferee.
166 TransferLeadership(ctx context.Context, lead, transferee uint64)
167
168 // ReadIndex request a read state. The read state will be set in the ready.
169 // Read state has a read index. Once the application advances further than the read
170 // index, any linearizable read requests issued before the read request can be
171 // processed safely. The read state will have the same rctx attached.
172 ReadIndex(ctx context.Context, rctx []byte) error
173
174 // Status returns the current status of the raft state machine.
175 Status() Status
176 // ReportUnreachable reports the given node is not reachable for the last send.
177 ReportUnreachable(id uint64)
178 // ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower
179 // who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure.
180 // Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a
181 // snapshot (for e.g., while streaming it from leader to follower), should be reported to the
182 // leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft
183 // log probes until the follower can apply the snapshot and advance its state. If the follower
184 // can't do that, for e.g., due to a crash, it could end up in a limbo, never getting any
185 // updates from the leader. Therefore, it is crucial that the application ensures that any
186 // failure in snapshot sending is caught and reported back to the leader; so it can resume raft
187 // log probing in the follower.
188 ReportSnapshot(id uint64, status SnapshotStatus)
189 // Stop performs any necessary termination of the Node.
190 Stop()
191}
192
// Peer describes one initial cluster member handed to StartNode.
type Peer struct {
	// ID is used as the NodeID of the bootstrap ConfChangeAddNode entry.
	ID uint64
	// Context is copied verbatim into that ConfChange's Context field.
	Context []byte
}
197
198// StartNode returns a new Node given configuration and a list of raft peers.
199// It appends a ConfChangeAddNode entry for each given peer to the initial log.
200func StartNode(c *Config, peers []Peer) Node {
201 r := newRaft(c)
202 // become the follower at term 1 and apply initial configuration
203 // entries of term 1
204 r.becomeFollower(1, None)
205 for _, peer := range peers {
206 cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
207 d, err := cc.Marshal()
208 if err != nil {
209 panic("unexpected marshal error")
210 }
211 e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
212 r.raftLog.append(e)
213 }
214 // Mark these initial entries as committed.
215 // TODO(bdarnell): These entries are still unstable; do we need to preserve
216 // the invariant that committed < unstable?
217 r.raftLog.committed = r.raftLog.lastIndex()
218 // Now apply them, mainly so that the application can call Campaign
219 // immediately after StartNode in tests. Note that these nodes will
220 // be added to raft twice: here and when the application's Ready
221 // loop calls ApplyConfChange. The calls to addNode must come after
222 // all calls to raftLog.append so progress.next is set after these
223 // bootstrapping entries (it is an error if we try to append these
224 // entries since they have already been committed).
225 // We do not set raftLog.applied so the application will be able
226 // to observe all conf changes via Ready.CommittedEntries.
227 for _, peer := range peers {
228 r.addNode(peer.ID)
229 }
230
231 n := newNode()
232 n.logger = c.Logger
233 go n.run(r)
234 return &n
235}
236
237// RestartNode is similar to StartNode but does not take a list of peers.
238// The current membership of the cluster will be restored from the Storage.
239// If the caller has an existing state machine, pass in the last log index that
240// has been applied to it; otherwise use zero.
241func RestartNode(c *Config) Node {
242 r := newRaft(c)
243
244 n := newNode()
245 n.logger = c.Logger
246 go n.run(r)
247 return &n
248}
249
250type msgWithResult struct {
251 m pb.Message
252 result chan error
253}
254
255// node is the canonical implementation of the Node interface
256type node struct {
257 propc chan msgWithResult
258 recvc chan pb.Message
259 confc chan pb.ConfChange
260 confstatec chan pb.ConfState
261 readyc chan Ready
262 advancec chan struct{}
263 tickc chan struct{}
264 done chan struct{}
265 stop chan struct{}
266 status chan chan Status
267
268 logger Logger
269}
270
271func newNode() node {
272 return node{
273 propc: make(chan msgWithResult),
274 recvc: make(chan pb.Message),
275 confc: make(chan pb.ConfChange),
276 confstatec: make(chan pb.ConfState),
277 readyc: make(chan Ready),
278 advancec: make(chan struct{}),
279 // make tickc a buffered chan, so raft node can buffer some ticks when the node
280 // is busy processing raft messages. Raft node will resume process buffered
281 // ticks when it becomes idle.
282 tickc: make(chan struct{}, 128),
283 done: make(chan struct{}),
284 stop: make(chan struct{}),
285 status: make(chan chan Status),
286 }
287}
288
289func (n *node) Stop() {
290 select {
291 case n.stop <- struct{}{}:
292 // Not already stopped, so trigger it
293 case <-n.done:
294 // Node has already been stopped - no need to do anything
295 return
296 }
297 // Block until the stop has been acknowledged by run()
298 <-n.done
299}
300
301func (n *node) run(r *raft) {
302 var propc chan msgWithResult
303 var readyc chan Ready
304 var advancec chan struct{}
305 var prevLastUnstablei, prevLastUnstablet uint64
306 var havePrevLastUnstablei bool
307 var prevSnapi uint64
308 var applyingToI uint64
309 var rd Ready
310
311 lead := None
312 prevSoftSt := r.softState()
313 prevHardSt := emptyState
314
315 for {
316 if advancec != nil {
317 readyc = nil
318 } else {
319 rd = newReady(r, prevSoftSt, prevHardSt)
320 if rd.containsUpdates() {
321 readyc = n.readyc
322 } else {
323 readyc = nil
324 }
325 }
326
327 if lead != r.lead {
328 if r.hasLeader() {
329 if lead == None {
330 r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
331 } else {
332 r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
333 }
334 propc = n.propc
335 } else {
336 r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
337 propc = nil
338 }
339 lead = r.lead
340 }
341
342 select {
343 // TODO: maybe buffer the config propose if there exists one (the way
344 // described in raft dissertation)
345 // Currently it is dropped in Step silently.
346 case pm := <-propc:
347 m := pm.m
348 m.From = r.id
349 err := r.Step(m)
350 if pm.result != nil {
351 pm.result <- err
352 close(pm.result)
353 }
354 case m := <-n.recvc:
355 // filter out response message from unknown From.
356 if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
357 r.Step(m)
358 }
359 case cc := <-n.confc:
360 if cc.NodeID == None {
361 select {
362 case n.confstatec <- pb.ConfState{
363 Nodes: r.nodes(),
364 Learners: r.learnerNodes()}:
365 case <-n.done:
366 }
367 break
368 }
369 switch cc.Type {
370 case pb.ConfChangeAddNode:
371 r.addNode(cc.NodeID)
372 case pb.ConfChangeAddLearnerNode:
373 r.addLearner(cc.NodeID)
374 case pb.ConfChangeRemoveNode:
375 // block incoming proposal when local node is
376 // removed
377 if cc.NodeID == r.id {
378 propc = nil
379 }
380 r.removeNode(cc.NodeID)
381 case pb.ConfChangeUpdateNode:
382 default:
383 panic("unexpected conf type")
384 }
385 select {
386 case n.confstatec <- pb.ConfState{
387 Nodes: r.nodes(),
388 Learners: r.learnerNodes()}:
389 case <-n.done:
390 }
391 case <-n.tickc:
392 r.tick()
393 case readyc <- rd:
394 if rd.SoftState != nil {
395 prevSoftSt = rd.SoftState
396 }
397 if len(rd.Entries) > 0 {
398 prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
399 prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
400 havePrevLastUnstablei = true
401 }
402 if !IsEmptyHardState(rd.HardState) {
403 prevHardSt = rd.HardState
404 }
405 if !IsEmptySnap(rd.Snapshot) {
406 prevSnapi = rd.Snapshot.Metadata.Index
407 }
408 if index := rd.appliedCursor(); index != 0 {
409 applyingToI = index
410 }
411
412 r.msgs = nil
413 r.readStates = nil
414 r.reduceUncommittedSize(rd.CommittedEntries)
415 advancec = n.advancec
416 case <-advancec:
417 if applyingToI != 0 {
418 r.raftLog.appliedTo(applyingToI)
419 applyingToI = 0
420 }
421 if havePrevLastUnstablei {
422 r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
423 havePrevLastUnstablei = false
424 }
425 r.raftLog.stableSnapTo(prevSnapi)
426 advancec = nil
427 case c := <-n.status:
428 c <- getStatus(r)
429 case <-n.stop:
430 close(n.done)
431 return
432 }
433 }
434}
435
436// Tick increments the internal logical clock for this Node. Election timeouts
437// and heartbeat timeouts are in units of ticks.
438func (n *node) Tick() {
439 select {
440 case n.tickc <- struct{}{}:
441 case <-n.done:
442 default:
443 n.logger.Warningf("A tick missed to fire. Node blocks too long!")
444 }
445}
446
447func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) }
448
449func (n *node) Propose(ctx context.Context, data []byte) error {
450 return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
451}
452
453func (n *node) Step(ctx context.Context, m pb.Message) error {
454 // ignore unexpected local messages receiving over network
455 if IsLocalMsg(m.Type) {
456 // TODO: return an error?
457 return nil
458 }
459 return n.step(ctx, m)
460}
461
462func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
463 data, err := cc.Marshal()
464 if err != nil {
465 return err
466 }
467 return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
468}
469
470func (n *node) step(ctx context.Context, m pb.Message) error {
471 return n.stepWithWaitOption(ctx, m, false)
472}
473
474func (n *node) stepWait(ctx context.Context, m pb.Message) error {
475 return n.stepWithWaitOption(ctx, m, true)
476}
477
478// Step advances the state machine using msgs. The ctx.Err() will be returned,
479// if any.
480func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
481 if m.Type != pb.MsgProp {
482 select {
483 case n.recvc <- m:
484 return nil
485 case <-ctx.Done():
486 return ctx.Err()
487 case <-n.done:
488 return ErrStopped
489 }
490 }
491 ch := n.propc
492 pm := msgWithResult{m: m}
493 if wait {
494 pm.result = make(chan error, 1)
495 }
496 select {
497 case ch <- pm:
498 if !wait {
499 return nil
500 }
501 case <-ctx.Done():
502 return ctx.Err()
503 case <-n.done:
504 return ErrStopped
505 }
506 select {
507 case err := <-pm.result:
508 if err != nil {
509 return err
510 }
511 case <-ctx.Done():
512 return ctx.Err()
513 case <-n.done:
514 return ErrStopped
515 }
516 return nil
517}
518
519func (n *node) Ready() <-chan Ready { return n.readyc }
520
521func (n *node) Advance() {
522 select {
523 case n.advancec <- struct{}{}:
524 case <-n.done:
525 }
526}
527
528func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
529 var cs pb.ConfState
530 select {
531 case n.confc <- cc:
532 case <-n.done:
533 }
534 select {
535 case cs = <-n.confstatec:
536 case <-n.done:
537 }
538 return &cs
539}
540
541func (n *node) Status() Status {
542 c := make(chan Status)
543 select {
544 case n.status <- c:
545 return <-c
546 case <-n.done:
547 return Status{}
548 }
549}
550
551func (n *node) ReportUnreachable(id uint64) {
552 select {
553 case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}:
554 case <-n.done:
555 }
556}
557
558func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
559 rej := status == SnapshotFailure
560
561 select {
562 case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}:
563 case <-n.done:
564 }
565}
566
567func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
568 select {
569 // manually set 'from' and 'to', so that leader can voluntarily transfers its leadership
570 case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
571 case <-n.done:
572 case <-ctx.Done():
573 }
574}
575
576func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
577 return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
578}
579
580func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
581 rd := Ready{
582 Entries: r.raftLog.unstableEntries(),
583 CommittedEntries: r.raftLog.nextEnts(),
584 Messages: r.msgs,
585 }
586 if softSt := r.softState(); !softSt.equal(prevSoftSt) {
587 rd.SoftState = softSt
588 }
589 if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
590 rd.HardState = hardSt
591 }
592 if r.raftLog.unstable.snapshot != nil {
593 rd.Snapshot = *r.raftLog.unstable.snapshot
594 }
595 if len(r.readStates) != 0 {
596 rd.ReadStates = r.readStates
597 }
598 rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
599 return rd
600}
601
602// MustSync returns true if the hard state and count of Raft entries indicate
603// that a synchronous write to persistent storage is required.
604func MustSync(st, prevst pb.HardState, entsnum int) bool {
605 // Persistent state on all servers:
606 // (Updated on stable storage before responding to RPCs)
607 // currentTerm
608 // votedFor
609 // log entries[]
610 return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
611}