// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package raft

import (
	"context"
	"errors"

	pb "go.etcd.io/etcd/raft/raftpb"
)

type SnapshotStatus int

const (
	SnapshotFinish  SnapshotStatus = 1
	SnapshotFailure SnapshotStatus = 2
)

var (
	emptyState = pb.HardState{}

	// ErrStopped is returned by methods on Nodes that have been stopped.
	ErrStopped = errors.New("raft: stopped")
)

// SoftState provides state that is useful for logging and debugging.
// The state is volatile and does not need to be persisted to the WAL.
type SoftState struct {
	Lead      uint64 // must use atomic operations to access; keep 64-bit aligned.
	RaftState StateType
}

func (a *SoftState) equal(b *SoftState) bool {
	return a.Lead == b.Lead && a.RaftState == b.RaftState
}

// Ready encapsulates the entries and messages that are ready to read,
// be saved to stable storage, committed or sent to other peers.
// All fields in Ready are read-only.
type Ready struct {
	// The current volatile state of a Node.
	// SoftState will be nil if there is no update.
	// It is not required to consume or store SoftState.
	*SoftState

	// The current state of a Node to be saved to stable storage BEFORE
	// Messages are sent.
	// HardState will be equal to empty state if there is no update.
	pb.HardState

	// ReadStates can be used by the node to serve linearizable read requests locally
	// when its applied index is greater than the index in ReadState.
	// Note that the readState will be returned when raft receives msgReadIndex.
	// The returned ReadState is only valid for the request that asked to read.
	ReadStates []ReadState

	// Entries specifies entries to be saved to stable storage BEFORE
	// Messages are sent.
	Entries []pb.Entry

	// Snapshot specifies the snapshot to be saved to stable storage.
	Snapshot pb.Snapshot

	// CommittedEntries specifies entries to be committed to a
	// store/state-machine. These have previously been committed to stable
	// store.
	CommittedEntries []pb.Entry

	// Messages specifies outbound messages to be sent AFTER Entries are
	// committed to stable storage.
	// If it contains a MsgSnap message, the application MUST report back to raft
	// when the snapshot has been received or has failed by calling ReportSnapshot.
	Messages []pb.Message

	// MustSync indicates whether the HardState and Entries must be synchronously
	// written to disk or if an asynchronous write is permissible.
	MustSync bool
}
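
// Illustrative sketch (not part of the package API): one common way an
// application consumes a single Ready, assuming its raft log lives in a
// MemoryStorage named storage and that send and process are
// application-provided helpers. The ordering mirrors the field comments
// above: persist before sending, then apply, then Advance.
//
//	if !IsEmptySnap(rd.Snapshot) {
//		storage.ApplySnapshot(rd.Snapshot)
//	}
//	storage.Append(rd.Entries)
//	if !IsEmptyHardState(rd.HardState) {
//		storage.SetHardState(rd.HardState)
//	}
//	send(rd.Messages) // only after entries and HardState are stable
//	for _, ent := range rd.CommittedEntries {
//		process(ent)
//	}
//	n.Advance()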

func isHardStateEqual(a, b pb.HardState) bool {
	return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
}

// IsEmptyHardState returns true if the given HardState is empty.
func IsEmptyHardState(st pb.HardState) bool {
	return isHardStateEqual(st, emptyState)
}

// IsEmptySnap returns true if the given Snapshot is empty.
func IsEmptySnap(sp pb.Snapshot) bool {
	return sp.Metadata.Index == 0
}

func (rd Ready) containsUpdates() bool {
	return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
		!IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
		len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
}

// appliedCursor extracts from the Ready the highest index the client has
// applied (once the Ready is confirmed via Advance). If no information is
// contained in the Ready, returns zero.
func (rd Ready) appliedCursor() uint64 {
	if n := len(rd.CommittedEntries); n > 0 {
		return rd.CommittedEntries[n-1].Index
	}
	if index := rd.Snapshot.Metadata.Index; index > 0 {
		return index
	}
	return 0
}

// Node represents a node in a raft cluster.
type Node interface {
	// Tick increments the internal logical clock for the Node by a single tick. Election
	// timeouts and heartbeat timeouts are in units of ticks.
	Tick()
	// Campaign causes the Node to transition to candidate state and start campaigning to become leader.
	Campaign(ctx context.Context) error
	// Propose proposes that data be appended to the log. Note that proposals can be lost without
	// notice, so it is the user's job to retry proposals as needed.
	Propose(ctx context.Context, data []byte) error
	// ProposeConfChange proposes a configuration change. Like any proposal, the
	// configuration change may be dropped with or without an error being
	// returned. In particular, configuration changes are dropped unless the
	// leader has certainty that there is no prior unapplied configuration
	// change in its log.
	//
	// The method accepts either a pb.ConfChange (deprecated) or pb.ConfChangeV2
	// message. The latter allows arbitrary configuration changes via joint
	// consensus, notably including replacing a voter. Passing a ConfChangeV2
	// message is only allowed if all Nodes participating in the cluster run a
	// version of this library aware of the V2 API. See pb.ConfChangeV2 for
	// usage details and semantics.
	ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error

	// Step advances the state machine using the given message. ctx.Err() will be returned, if any.
	Step(ctx context.Context, msg pb.Message) error

	// Ready returns a channel that returns the current point-in-time state.
	// Users of the Node must call Advance after retrieving the state returned by Ready.
	//
	// NOTE: No committed entries from the next Ready may be applied until all committed entries
	// and snapshots from the previous one have finished.
	Ready() <-chan Ready

	// Advance notifies the Node that the application has saved progress up to the last Ready.
	// It prepares the node to return the next available Ready.
	//
	// The application should generally call Advance after it applies the entries in the last Ready.
	//
	// However, as an optimization, the application may call Advance while it is applying the
	// commands. For example, when the last Ready contains a snapshot, the application might take
	// a long time to apply the snapshot data. To continue receiving Ready without blocking raft
	// progress, it can call Advance before it finishes applying the last Ready.
	Advance()
	// ApplyConfChange applies a config change (previously passed to
	// ProposeConfChange) to the node. This must be called whenever a config
	// change is observed in Ready.CommittedEntries.
	//
	// Returns an opaque non-nil ConfState protobuf which must be recorded in
	// snapshots.
	ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState

	// TransferLeadership attempts to transfer leadership to the given transferee.
	TransferLeadership(ctx context.Context, lead, transferee uint64)

	// ReadIndex requests a read state. The read state will be set in the ready.
	// The read state has a read index. Once the application advances further than the read
	// index, any linearizable read requests issued before the read request can be
	// processed safely. The read state will have the same rctx attached.
	ReadIndex(ctx context.Context, rctx []byte) error

	// Status returns the current status of the raft state machine.
	Status() Status
	// ReportUnreachable reports that the given node is not reachable for the last send.
	ReportUnreachable(id uint64)
	// ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower
	// who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure.
	// Calling ReportSnapshot with SnapshotFinish is a no-op. But any failure in applying a
	// snapshot (e.g., while streaming it from leader to follower) should be reported to the
	// leader with SnapshotFailure. When the leader sends a snapshot to a follower, it pauses any raft
	// log probes until the follower can apply the snapshot and advance its state. If the follower
	// can't do that, e.g., because of a crash, it could end up in limbo, never getting any
	// updates from the leader. Therefore, it is crucial that the application ensures that any
	// failure in snapshot sending is caught and reported back to the leader, so that it can resume
	// raft log probing of the follower.
	ReportSnapshot(id uint64, status SnapshotStatus)
	// Stop performs any necessary termination of the Node.
	Stop()
}
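
// A minimal sketch of the event loop most applications build around a Node,
// adapted from the package documentation. The ticker, done channel and the
// saveToStorage, send, processSnapshot and process helpers are
// application-provided and purely illustrative:
//
//	for {
//		select {
//		case <-ticker.C:
//			n.Tick()
//		case rd := <-n.Ready():
//			saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
//			send(rd.Messages)
//			if !IsEmptySnap(rd.Snapshot) {
//				processSnapshot(rd.Snapshot)
//			}
//			for _, entry := range rd.CommittedEntries {
//				process(entry)
//				if entry.Type == pb.EntryConfChange {
//					var cc pb.ConfChange
//					cc.Unmarshal(entry.Data)
//					n.ApplyConfChange(cc)
//				}
//			}
//			n.Advance()
//		case <-done:
//			return
//		}
//	}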

type Peer struct {
	ID      uint64
	Context []byte
}

// StartNode returns a new Node given configuration and a list of raft peers.
// It appends a ConfChangeAddNode entry for each given peer to the initial log.
//
// Peers must not be zero length; call RestartNode in that case.
func StartNode(c *Config, peers []Peer) Node {
	if len(peers) == 0 {
		panic("no peers given; use RestartNode instead")
	}
	rn, err := NewRawNode(c)
	if err != nil {
		panic(err)
	}
	rn.Bootstrap(peers)

	n := newNode(rn)

	go n.run()
	return &n
}
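
// For example (values illustrative), bootstrapping a fresh three-member
// cluster backed by a MemoryStorage might look like this, following the
// package documentation:
//
//	storage := NewMemoryStorage()
//	c := &Config{
//		ID:              0x01,
//		ElectionTick:    10,
//		HeartbeatTick:   1,
//		Storage:         storage,
//		MaxSizePerMsg:   4096,
//		MaxInflightMsgs: 256,
//	}
//	n := StartNode(c, []Peer{{ID: 0x01}, {ID: 0x02}, {ID: 0x03}})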

// RestartNode is similar to StartNode but does not take a list of peers.
// The current membership of the cluster will be restored from the Storage.
// If the caller has an existing state machine, pass in the last log index that
// has been applied to it; otherwise use zero.
func RestartNode(c *Config) Node {
	rn, err := NewRawNode(c)
	if err != nil {
		panic(err)
	}
	n := newNode(rn)
	go n.run()
	return &n
}
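
// For example (an illustrative sketch following the package documentation),
// restarting a node from previously persisted state, where snapshot, state
// and entries have been recovered by the application from its own durable
// storage:
//
//	storage := NewMemoryStorage()
//
//	// Recover the in-memory storage from persistent snapshot, state and entries.
//	storage.ApplySnapshot(snapshot)
//	storage.SetHardState(state)
//	storage.Append(entries)
//
//	c := &Config{
//		ID:              0x01,
//		ElectionTick:    10,
//		HeartbeatTick:   1,
//		Storage:         storage,
//		MaxSizePerMsg:   4096,
//		MaxInflightMsgs: 256,
//	}
//
//	// Restart raft without peer information.
//	// Peer information is already included in the storage.
//	n := RestartNode(c)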

type msgWithResult struct {
	m      pb.Message
	result chan error
}

// node is the canonical implementation of the Node interface
type node struct {
	propc      chan msgWithResult
	recvc      chan pb.Message
	confc      chan pb.ConfChangeV2
	confstatec chan pb.ConfState
	readyc     chan Ready
	advancec   chan struct{}
	tickc      chan struct{}
	done       chan struct{}
	stop       chan struct{}
	status     chan chan Status

	rn *RawNode
}

func newNode(rn *RawNode) node {
	return node{
		propc:      make(chan msgWithResult),
		recvc:      make(chan pb.Message),
		confc:      make(chan pb.ConfChangeV2),
		confstatec: make(chan pb.ConfState),
		readyc:     make(chan Ready),
		advancec:   make(chan struct{}),
		// make tickc a buffered chan, so the raft node can buffer some ticks when it
		// is busy processing raft messages. The raft node will resume processing
		// buffered ticks when it becomes idle.
		tickc:      make(chan struct{}, 128),
		done:       make(chan struct{}),
		stop:       make(chan struct{}),
		status:     make(chan chan Status),
		rn:         rn,
	}
}

func (n *node) Stop() {
	select {
	case n.stop <- struct{}{}:
		// Not already stopped, so trigger it
	case <-n.done:
		// Node has already been stopped - no need to do anything
		return
	}
	// Block until the stop has been acknowledged by run()
	<-n.done
}

func (n *node) run() {
	var propc chan msgWithResult
	var readyc chan Ready
	var advancec chan struct{}
	var rd Ready

	r := n.rn.raft

	lead := None

	for {
		if advancec != nil {
			readyc = nil
		} else if n.rn.HasReady() {
			// Populate a Ready. Note that this Ready is not guaranteed to
			// actually be handled. We will arm readyc, but there's no guarantee
			// that we will actually send on it. It's possible that we will
			// service another channel instead, loop around, and then populate
			// the Ready again. We could instead force the previous Ready to be
			// handled first, but it's generally good to emit larger Readys plus
			// it simplifies testing (by emitting less frequently and more
			// predictably).
			rd = n.rn.readyWithoutAccept()
			readyc = n.readyc
		}

		if lead != r.lead {
			if r.hasLeader() {
				if lead == None {
					r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
				} else {
					r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
				}
				propc = n.propc
			} else {
				r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
				propc = nil
			}
			lead = r.lead
		}

		select {
		// TODO: maybe buffer the config proposal if one is already in flight (in the
		// way described in the raft dissertation).
		// Currently it is dropped silently in Step.
		case pm := <-propc:
			m := pm.m
			m.From = r.id
			err := r.Step(m)
			if pm.result != nil {
				pm.result <- err
				close(pm.result)
			}
		case m := <-n.recvc:
			// filter out response messages from an unknown From.
			if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
				r.Step(m)
			}
		case cc := <-n.confc:
			_, okBefore := r.prs.Progress[r.id]
			cs := r.applyConfChange(cc)
			// If the node was removed, block incoming proposals. Note that we
			// only do this if the node was in the config before. Nodes may be
			// a member of the group without knowing this (when they're catching
			// up on the log and don't have the latest config) and we don't want
			// to block the proposal channel in that case.
			//
			// NB: propc is reset when the leader changes, which, if we learn
			// about it, sort of implies that we got readded, maybe? This isn't
			// very sound and likely has bugs.
			if _, okAfter := r.prs.Progress[r.id]; okBefore && !okAfter {
				var found bool
				for _, sl := range [][]uint64{cs.Voters, cs.VotersOutgoing} {
					for _, id := range sl {
						if id == r.id {
							found = true
						}
					}
				}
				if !found {
					propc = nil
				}
			}
			select {
			case n.confstatec <- cs:
			case <-n.done:
			}
		case <-n.tickc:
			n.rn.Tick()
		case readyc <- rd:
			n.rn.acceptReady(rd)
			advancec = n.advancec
		case <-advancec:
			n.rn.Advance(rd)
			rd = Ready{}
			advancec = nil
		case c := <-n.status:
			c <- getStatus(r)
		case <-n.stop:
			close(n.done)
			return
		}
	}
}

// Tick increments the internal logical clock for this Node. Election timeouts
// and heartbeat timeouts are in units of ticks.
func (n *node) Tick() {
	select {
	case n.tickc <- struct{}{}:
	case <-n.done:
	default:
		n.rn.raft.logger.Warningf("%x (leader %v) A tick missed to fire. Node blocks too long!", n.rn.raft.id, n.rn.raft.id == n.rn.raft.lead)
	}
}

func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) }

func (n *node) Propose(ctx context.Context, data []byte) error {
	return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
}

func (n *node) Step(ctx context.Context, m pb.Message) error {
	// ignore unexpected local messages received over the network
	if IsLocalMsg(m.Type) {
		// TODO: return an error?
		return nil
	}
	return n.step(ctx, m)
}

func confChangeToMsg(c pb.ConfChangeI) (pb.Message, error) {
	typ, data, err := pb.MarshalConfChange(c)
	if err != nil {
		return pb.Message{}, err
	}
	return pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: typ, Data: data}}}, nil
}

func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChangeI) error {
	msg, err := confChangeToMsg(cc)
	if err != nil {
		return err
	}
	return n.Step(ctx, msg)
}
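
// For example (id and data are illustrative), an application proposes adding a
// node and, once the corresponding EntryConfChange entry shows up in
// Ready.CommittedEntries, applies it via ApplyConfChange:
//
//	cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: id, Context: data}
//	if err := n.ProposeConfChange(ctx, cc); err != nil {
//		// the proposal may have been dropped; the caller decides whether to retry
//	}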

func (n *node) step(ctx context.Context, m pb.Message) error {
	return n.stepWithWaitOption(ctx, m, false)
}

func (n *node) stepWait(ctx context.Context, m pb.Message) error {
	return n.stepWithWaitOption(ctx, m, true)
}

// stepWithWaitOption advances the state machine using the given message and,
// if wait is true, blocks until the proposal has been processed by run().
// ctx.Err() will be returned, if any.
func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
	if m.Type != pb.MsgProp {
		select {
		case n.recvc <- m:
			return nil
		case <-ctx.Done():
			return ctx.Err()
		case <-n.done:
			return ErrStopped
		}
	}
	ch := n.propc
	pm := msgWithResult{m: m}
	if wait {
		pm.result = make(chan error, 1)
	}
	select {
	case ch <- pm:
		if !wait {
			return nil
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	select {
	case err := <-pm.result:
		if err != nil {
			return err
		}
	case <-ctx.Done():
		return ctx.Err()
	case <-n.done:
		return ErrStopped
	}
	return nil
}

func (n *node) Ready() <-chan Ready { return n.readyc }

func (n *node) Advance() {
	select {
	case n.advancec <- struct{}{}:
	case <-n.done:
	}
}

func (n *node) ApplyConfChange(cc pb.ConfChangeI) *pb.ConfState {
	var cs pb.ConfState
	select {
	case n.confc <- cc.AsV2():
	case <-n.done:
	}
	select {
	case cs = <-n.confstatec:
	case <-n.done:
	}
	return &cs
}

func (n *node) Status() Status {
	c := make(chan Status)
	select {
	case n.status <- c:
		return <-c
	case <-n.done:
		return Status{}
	}
}

func (n *node) ReportUnreachable(id uint64) {
	select {
	case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}:
	case <-n.done:
	}
}

func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
	rej := status == SnapshotFailure

	select {
	case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}:
	case <-n.done:
	}
}

func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
	select {
	// manually set 'from' and 'to', so that the leader can voluntarily transfer its leadership
	case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
	case <-n.done:
	case <-ctx.Done():
	}
}

func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
	return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
}
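
// Illustrative sketch of a linearizable read using ReadIndex: the caller
// attaches a unique rctx, waits for the matching ReadState to appear in a
// Ready, and serves the read once its applied index has caught up. The
// appliedIndex variable and the surrounding Ready loop are assumed to be
// maintained by the application:
//
//	rctx := []byte("unique-read-request-id")
//	if err := n.ReadIndex(ctx, rctx); err != nil {
//		return err
//	}
//	// ... later, while handling a Ready:
//	for _, rs := range rd.ReadStates {
//		if bytes.Equal(rs.RequestCtx, rctx) {
//			// wait until appliedIndex >= rs.Index, then serve the read locally
//		}
//	}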

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
	rd := Ready{
		Entries:          r.raftLog.unstableEntries(),
		CommittedEntries: r.raftLog.nextEnts(),
		Messages:         r.msgs,
	}
	if softSt := r.softState(); !softSt.equal(prevSoftSt) {
		rd.SoftState = softSt
	}
	if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
		rd.HardState = hardSt
	}
	if r.raftLog.unstable.snapshot != nil {
		rd.Snapshot = *r.raftLog.unstable.snapshot
	}
	if len(r.readStates) != 0 {
		rd.ReadStates = r.readStates
	}
	rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
	return rd
}

// MustSync returns true if the hard state and count of Raft entries indicate
// that a synchronous write to persistent storage is required.
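//
// For example (values illustrative), a HardState that only advanced its
// Commit index, with no new entries, does not require a synchronous write:
//
//	prev := pb.HardState{Term: 2, Vote: 1, Commit: 3}
//	cur := pb.HardState{Term: 2, Vote: 1, Commit: 5}
//	MustSync(cur, prev, 0) // false: only Commit changed
//	MustSync(cur, prev, 2) // true: two new entries must be made durable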
func MustSync(st, prevst pb.HardState, entsnum int) bool {
	// Persistent state on all servers:
	// (Updated on stable storage before responding to RPCs)
	// currentTerm
	// votedFor
	// log entries[]
	return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
}