blob: 4a3b2f1dfd3e27531b464203dc4eec67bf97e33f [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package raft
16
17import (
18 "context"
19 "errors"
20
21 pb "go.etcd.io/etcd/raft/raftpb"
22)
23
// SnapshotStatus is the outcome of a snapshot transfer as reported to
// Node.ReportSnapshot.
type SnapshotStatus int

const (
	// SnapshotFinish reports that the snapshot was delivered successfully.
	SnapshotFinish SnapshotStatus = 1
	// SnapshotFailure reports that the snapshot could not be delivered;
	// ReportSnapshot forwards it to raft as a rejected MsgSnapStatus.
	SnapshotFailure SnapshotStatus = 2
)
30
31var (
32 emptyState = pb.HardState{}
33
34 // ErrStopped is returned by methods on Nodes that have been stopped.
35 ErrStopped = errors.New("raft: stopped")
36)
37
38// SoftState provides state that is useful for logging and debugging.
39// The state is volatile and does not need to be persisted to the WAL.
40type SoftState struct {
41 Lead uint64 // must use atomic operations to access; keep 64-bit aligned.
42 RaftState StateType
43}
44
45func (a *SoftState) equal(b *SoftState) bool {
46 return a.Lead == b.Lead && a.RaftState == b.RaftState
47}
48
49// Ready encapsulates the entries and messages that are ready to read,
50// be saved to stable storage, committed or sent to other peers.
51// All fields in Ready are read-only.
52type Ready struct {
53 // The current volatile state of a Node.
54 // SoftState will be nil if there is no update.
55 // It is not required to consume or store SoftState.
56 *SoftState
57
58 // The current state of a Node to be saved to stable storage BEFORE
59 // Messages are sent.
60 // HardState will be equal to empty state if there is no update.
61 pb.HardState
62
63 // ReadStates can be used for node to serve linearizable read requests locally
64 // when its applied index is greater than the index in ReadState.
65 // Note that the readState will be returned when raft receives msgReadIndex.
66 // The returned is only valid for the request that requested to read.
67 ReadStates []ReadState
68
69 // Entries specifies entries to be saved to stable storage BEFORE
70 // Messages are sent.
71 Entries []pb.Entry
72
73 // Snapshot specifies the snapshot to be saved to stable storage.
74 Snapshot pb.Snapshot
75
76 // CommittedEntries specifies entries to be committed to a
77 // store/state-machine. These have previously been committed to stable
78 // store.
79 CommittedEntries []pb.Entry
80
81 // Messages specifies outbound messages to be sent AFTER Entries are
82 // committed to stable storage.
83 // If it contains a MsgSnap message, the application MUST report back to raft
84 // when the snapshot has been received or has failed by calling ReportSnapshot.
85 Messages []pb.Message
86
87 // MustSync indicates whether the HardState and Entries must be synchronously
88 // written to disk or if an asynchronous write is permissible.
89 MustSync bool
90}
91
92func isHardStateEqual(a, b pb.HardState) bool {
93 return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
94}
95
96// IsEmptyHardState returns true if the given HardState is empty.
97func IsEmptyHardState(st pb.HardState) bool {
98 return isHardStateEqual(st, emptyState)
99}
100
101// IsEmptySnap returns true if the given Snapshot is empty.
102func IsEmptySnap(sp pb.Snapshot) bool {
103 return sp.Metadata.Index == 0
104}
105
106func (rd Ready) containsUpdates() bool {
107 return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
108 !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
109 len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
110}
111
112// appliedCursor extracts from the Ready the highest index the client has
113// applied (once the Ready is confirmed via Advance). If no information is
114// contained in the Ready, returns zero.
115func (rd Ready) appliedCursor() uint64 {
116 if n := len(rd.CommittedEntries); n > 0 {
117 return rd.CommittedEntries[n-1].Index
118 }
119 if index := rd.Snapshot.Metadata.Index; index > 0 {
120 return index
121 }
122 return 0
123}
124
125// Node represents a node in a raft cluster.
126type Node interface {
127 // Tick increments the internal logical clock for the Node by a single tick. Election
128 // timeouts and heartbeat timeouts are in units of ticks.
129 Tick()
130 // Campaign causes the Node to transition to candidate state and start campaigning to become leader.
131 Campaign(ctx context.Context) error
132 // Propose proposes that data be appended to the log. Note that proposals can be lost without
133 // notice, therefore it is user's job to ensure proposal retries.
134 Propose(ctx context.Context, data []byte) error
135 // ProposeConfChange proposes config change.
136 // At most one ConfChange can be in the process of going through consensus.
137 // Application needs to call ApplyConfChange when applying EntryConfChange type entry.
138 ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
139 // Step advances the state machine using the given message. ctx.Err() will be returned, if any.
140 Step(ctx context.Context, msg pb.Message) error
141
142 // Ready returns a channel that returns the current point-in-time state.
143 // Users of the Node must call Advance after retrieving the state returned by Ready.
144 //
145 // NOTE: No committed entries from the next Ready may be applied until all committed entries
146 // and snapshots from the previous one have finished.
147 Ready() <-chan Ready
148
149 // Advance notifies the Node that the application has saved progress up to the last Ready.
150 // It prepares the node to return the next available Ready.
151 //
152 // The application should generally call Advance after it applies the entries in last Ready.
153 //
154 // However, as an optimization, the application may call Advance while it is applying the
155 // commands. For example. when the last Ready contains a snapshot, the application might take
156 // a long time to apply the snapshot data. To continue receiving Ready without blocking raft
157 // progress, it can call Advance before finishing applying the last ready.
158 Advance()
159 // ApplyConfChange applies config change to the local node.
160 // Returns an opaque ConfState protobuf which must be recorded
161 // in snapshots. Will never return nil; it returns a pointer only
162 // to match MemoryStorage.Compact.
163 ApplyConfChange(cc pb.ConfChange) *pb.ConfState
164
165 // TransferLeadership attempts to transfer leadership to the given transferee.
166 TransferLeadership(ctx context.Context, lead, transferee uint64)
167
168 // ReadIndex request a read state. The read state will be set in the ready.
169 // Read state has a read index. Once the application advances further than the read
170 // index, any linearizable read requests issued before the read request can be
171 // processed safely. The read state will have the same rctx attached.
172 ReadIndex(ctx context.Context, rctx []byte) error
173
174 // Status returns the current status of the raft state machine.
175 Status() Status
176 // ReportUnreachable reports the given node is not reachable for the last send.
177 ReportUnreachable(id uint64)
178 // ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower
179 // who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure.
180 // Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a
181 // snapshot (for e.g., while streaming it from leader to follower), should be reported to the
182 // leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft
183 // log probes until the follower can apply the snapshot and advance its state. If the follower
184 // can't do that, for e.g., due to a crash, it could end up in a limbo, never getting any
185 // updates from the leader. Therefore, it is crucial that the application ensures that any
186 // failure in snapshot sending is caught and reported back to the leader; so it can resume raft
187 // log probing in the follower.
188 ReportSnapshot(id uint64, status SnapshotStatus)
189 // Stop performs any necessary termination of the Node.
190 Stop()
191}
192
// Peer describes one initial member of a raft cluster, as passed to
// StartNode.
type Peer struct {
	// ID is the raft node ID of the peer.
	ID uint64
	// Context is opaque data that StartNode copies into the Context of the
	// ConfChangeAddNode entry it creates for this peer.
	Context []byte
}
197
198// StartNode returns a new Node given configuration and a list of raft peers.
199// It appends a ConfChangeAddNode entry for each given peer to the initial log.
200func StartNode(c *Config, peers []Peer) Node {
201 r := newRaft(c)
202 // become the follower at term 1 and apply initial configuration
203 // entries of term 1
204 r.becomeFollower(1, None)
205 for _, peer := range peers {
206 cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
207 d, err := cc.Marshal()
208 if err != nil {
209 panic("unexpected marshal error")
210 }
Abhilash S.L3b494632019-07-16 15:51:09 +0530211 // TODO(tbg): this should append the ConfChange for the own node first
212 // and also call applyConfChange below for that node first. Otherwise
213 // we have a Raft group (for a little while) that doesn't have itself
214 // in its config, which is bad.
215 // This whole way of setting things up is rickety. The app should just
216 // populate the initial ConfState appropriately and then all of this
217 // goes away.
218 e := pb.Entry{
219 Type: pb.EntryConfChange,
220 Term: 1,
221 Index: r.raftLog.lastIndex() + 1,
222 Data: d,
223 }
William Kurkianea869482019-04-09 15:16:11 -0400224 r.raftLog.append(e)
225 }
226 // Mark these initial entries as committed.
227 // TODO(bdarnell): These entries are still unstable; do we need to preserve
228 // the invariant that committed < unstable?
229 r.raftLog.committed = r.raftLog.lastIndex()
230 // Now apply them, mainly so that the application can call Campaign
231 // immediately after StartNode in tests. Note that these nodes will
232 // be added to raft twice: here and when the application's Ready
233 // loop calls ApplyConfChange. The calls to addNode must come after
234 // all calls to raftLog.append so progress.next is set after these
235 // bootstrapping entries (it is an error if we try to append these
236 // entries since they have already been committed).
237 // We do not set raftLog.applied so the application will be able
238 // to observe all conf changes via Ready.CommittedEntries.
239 for _, peer := range peers {
Abhilash S.L3b494632019-07-16 15:51:09 +0530240 r.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode})
William Kurkianea869482019-04-09 15:16:11 -0400241 }
242
243 n := newNode()
244 n.logger = c.Logger
245 go n.run(r)
246 return &n
247}
248
249// RestartNode is similar to StartNode but does not take a list of peers.
250// The current membership of the cluster will be restored from the Storage.
251// If the caller has an existing state machine, pass in the last log index that
252// has been applied to it; otherwise use zero.
253func RestartNode(c *Config) Node {
254 r := newRaft(c)
255
256 n := newNode()
257 n.logger = c.Logger
258 go n.run(r)
259 return &n
260}
261
262type msgWithResult struct {
263 m pb.Message
264 result chan error
265}
266
267// node is the canonical implementation of the Node interface
268type node struct {
269 propc chan msgWithResult
270 recvc chan pb.Message
271 confc chan pb.ConfChange
272 confstatec chan pb.ConfState
273 readyc chan Ready
274 advancec chan struct{}
275 tickc chan struct{}
276 done chan struct{}
277 stop chan struct{}
278 status chan chan Status
279
280 logger Logger
281}
282
283func newNode() node {
284 return node{
285 propc: make(chan msgWithResult),
286 recvc: make(chan pb.Message),
287 confc: make(chan pb.ConfChange),
288 confstatec: make(chan pb.ConfState),
289 readyc: make(chan Ready),
290 advancec: make(chan struct{}),
291 // make tickc a buffered chan, so raft node can buffer some ticks when the node
292 // is busy processing raft messages. Raft node will resume process buffered
293 // ticks when it becomes idle.
294 tickc: make(chan struct{}, 128),
295 done: make(chan struct{}),
296 stop: make(chan struct{}),
297 status: make(chan chan Status),
298 }
299}
300
301func (n *node) Stop() {
302 select {
303 case n.stop <- struct{}{}:
304 // Not already stopped, so trigger it
305 case <-n.done:
306 // Node has already been stopped - no need to do anything
307 return
308 }
309 // Block until the stop has been acknowledged by run()
310 <-n.done
311}
312
313func (n *node) run(r *raft) {
314 var propc chan msgWithResult
315 var readyc chan Ready
316 var advancec chan struct{}
317 var prevLastUnstablei, prevLastUnstablet uint64
318 var havePrevLastUnstablei bool
319 var prevSnapi uint64
320 var applyingToI uint64
321 var rd Ready
322
323 lead := None
324 prevSoftSt := r.softState()
325 prevHardSt := emptyState
326
327 for {
328 if advancec != nil {
329 readyc = nil
330 } else {
331 rd = newReady(r, prevSoftSt, prevHardSt)
332 if rd.containsUpdates() {
333 readyc = n.readyc
334 } else {
335 readyc = nil
336 }
337 }
338
339 if lead != r.lead {
340 if r.hasLeader() {
341 if lead == None {
342 r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
343 } else {
344 r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
345 }
346 propc = n.propc
347 } else {
348 r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
349 propc = nil
350 }
351 lead = r.lead
352 }
353
354 select {
355 // TODO: maybe buffer the config propose if there exists one (the way
356 // described in raft dissertation)
357 // Currently it is dropped in Step silently.
358 case pm := <-propc:
359 m := pm.m
360 m.From = r.id
361 err := r.Step(m)
362 if pm.result != nil {
363 pm.result <- err
364 close(pm.result)
365 }
366 case m := <-n.recvc:
367 // filter out response message from unknown From.
Abhilash S.L3b494632019-07-16 15:51:09 +0530368 if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
William Kurkianea869482019-04-09 15:16:11 -0400369 r.Step(m)
370 }
371 case cc := <-n.confc:
Abhilash S.L3b494632019-07-16 15:51:09 +0530372 cs := r.applyConfChange(cc)
373 if _, ok := r.prs.Progress[r.id]; !ok {
William Kurkianea869482019-04-09 15:16:11 -0400374 // block incoming proposal when local node is
375 // removed
376 if cc.NodeID == r.id {
377 propc = nil
378 }
William Kurkianea869482019-04-09 15:16:11 -0400379 }
380 select {
Abhilash S.L3b494632019-07-16 15:51:09 +0530381 case n.confstatec <- cs:
William Kurkianea869482019-04-09 15:16:11 -0400382 case <-n.done:
383 }
384 case <-n.tickc:
385 r.tick()
386 case readyc <- rd:
387 if rd.SoftState != nil {
388 prevSoftSt = rd.SoftState
389 }
390 if len(rd.Entries) > 0 {
391 prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
392 prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
393 havePrevLastUnstablei = true
394 }
395 if !IsEmptyHardState(rd.HardState) {
396 prevHardSt = rd.HardState
397 }
398 if !IsEmptySnap(rd.Snapshot) {
399 prevSnapi = rd.Snapshot.Metadata.Index
400 }
401 if index := rd.appliedCursor(); index != 0 {
402 applyingToI = index
403 }
404
405 r.msgs = nil
406 r.readStates = nil
407 r.reduceUncommittedSize(rd.CommittedEntries)
408 advancec = n.advancec
409 case <-advancec:
410 if applyingToI != 0 {
411 r.raftLog.appliedTo(applyingToI)
412 applyingToI = 0
413 }
414 if havePrevLastUnstablei {
415 r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
416 havePrevLastUnstablei = false
417 }
418 r.raftLog.stableSnapTo(prevSnapi)
419 advancec = nil
420 case c := <-n.status:
421 c <- getStatus(r)
422 case <-n.stop:
423 close(n.done)
424 return
425 }
426 }
427}
428
429// Tick increments the internal logical clock for this Node. Election timeouts
430// and heartbeat timeouts are in units of ticks.
431func (n *node) Tick() {
432 select {
433 case n.tickc <- struct{}{}:
434 case <-n.done:
435 default:
436 n.logger.Warningf("A tick missed to fire. Node blocks too long!")
437 }
438}
439
440func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) }
441
442func (n *node) Propose(ctx context.Context, data []byte) error {
443 return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
444}
445
446func (n *node) Step(ctx context.Context, m pb.Message) error {
447 // ignore unexpected local messages receiving over network
448 if IsLocalMsg(m.Type) {
449 // TODO: return an error?
450 return nil
451 }
452 return n.step(ctx, m)
453}
454
455func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
456 data, err := cc.Marshal()
457 if err != nil {
458 return err
459 }
460 return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
461}
462
463func (n *node) step(ctx context.Context, m pb.Message) error {
464 return n.stepWithWaitOption(ctx, m, false)
465}
466
467func (n *node) stepWait(ctx context.Context, m pb.Message) error {
468 return n.stepWithWaitOption(ctx, m, true)
469}
470
471// Step advances the state machine using msgs. The ctx.Err() will be returned,
472// if any.
473func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
474 if m.Type != pb.MsgProp {
475 select {
476 case n.recvc <- m:
477 return nil
478 case <-ctx.Done():
479 return ctx.Err()
480 case <-n.done:
481 return ErrStopped
482 }
483 }
484 ch := n.propc
485 pm := msgWithResult{m: m}
486 if wait {
487 pm.result = make(chan error, 1)
488 }
489 select {
490 case ch <- pm:
491 if !wait {
492 return nil
493 }
494 case <-ctx.Done():
495 return ctx.Err()
496 case <-n.done:
497 return ErrStopped
498 }
499 select {
500 case err := <-pm.result:
501 if err != nil {
502 return err
503 }
504 case <-ctx.Done():
505 return ctx.Err()
506 case <-n.done:
507 return ErrStopped
508 }
509 return nil
510}
511
512func (n *node) Ready() <-chan Ready { return n.readyc }
513
514func (n *node) Advance() {
515 select {
516 case n.advancec <- struct{}{}:
517 case <-n.done:
518 }
519}
520
521func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
522 var cs pb.ConfState
523 select {
524 case n.confc <- cc:
525 case <-n.done:
526 }
527 select {
528 case cs = <-n.confstatec:
529 case <-n.done:
530 }
531 return &cs
532}
533
534func (n *node) Status() Status {
535 c := make(chan Status)
536 select {
537 case n.status <- c:
538 return <-c
539 case <-n.done:
540 return Status{}
541 }
542}
543
544func (n *node) ReportUnreachable(id uint64) {
545 select {
546 case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}:
547 case <-n.done:
548 }
549}
550
551func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
552 rej := status == SnapshotFailure
553
554 select {
555 case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}:
556 case <-n.done:
557 }
558}
559
560func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
561 select {
562 // manually set 'from' and 'to', so that leader can voluntarily transfers its leadership
563 case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
564 case <-n.done:
565 case <-ctx.Done():
566 }
567}
568
569func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
570 return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
571}
572
573func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
574 rd := Ready{
575 Entries: r.raftLog.unstableEntries(),
576 CommittedEntries: r.raftLog.nextEnts(),
577 Messages: r.msgs,
578 }
579 if softSt := r.softState(); !softSt.equal(prevSoftSt) {
580 rd.SoftState = softSt
581 }
582 if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
583 rd.HardState = hardSt
584 }
585 if r.raftLog.unstable.snapshot != nil {
586 rd.Snapshot = *r.raftLog.unstable.snapshot
587 }
588 if len(r.readStates) != 0 {
589 rd.ReadStates = r.readStates
590 }
591 rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
592 return rd
593}
594
595// MustSync returns true if the hard state and count of Raft entries indicate
596// that a synchronous write to persistent storage is required.
597func MustSync(st, prevst pb.HardState, entsnum int) bool {
598 // Persistent state on all servers:
599 // (Updated on stable storage before responding to RPCs)
600 // currentTerm
601 // votedFor
602 // log entries[]
603 return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
604}