blob: 62c81f45af81745178c27035d740da5479842159 [file] [log] [blame]
Takahiro Suzukid7bf8202020-12-17 20:21:59 +09001// Copyright 2019 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package tracker
16
17import (
18 "fmt"
19 "sort"
20 "strings"
21)
22
// Progress represents a follower's progress in the view of the leader. The
// leader maintains progresses of all followers, and sends entries to each
// follower based on its progress.
//
// NB(tbg): Progress is basically a state machine whose transitions are mostly
// strewn around `*raft.raft`. Additionally, some fields are only used when in a
// certain State. All of this isn't ideal.
type Progress struct {
	// Match is the highest log index known to have been acknowledged by the
	// follower; MaybeUpdate only ever raises it. Next is the index of the next
	// entry the leader intends to send: the Become* transitions reset it to
	// Match+1 (BecomeProbe uses PendingSnapshot+1 instead when leaving
	// StateSnapshot and that is larger), and OptimisticUpdate moves it past
	// entries that are already in flight.
	Match, Next uint64
	// State defines how the leader should interact with the follower.
	//
	// When in StateProbe, leader sends at most one replication message
	// per heartbeat interval. It also probes actual progress of the follower.
	//
	// When in StateReplicate, leader optimistically increases next
	// to the latest entry sent after sending replication message. This is
	// an optimized state for fast replicating log entries to the follower.
	//
	// When in StateSnapshot, leader should have sent out snapshot
	// before and stops sending any replication message.
	State StateType

	// PendingSnapshot is used in StateSnapshot.
	// If there is a pending snapshot, PendingSnapshot is set to the index of
	// the snapshot. While it is set, the replication process of this Progress
	// is paused: raft will not resend the snapshot until the pending one is
	// reported to have failed. ResetState clears it back to 0.
	PendingSnapshot uint64

	// RecentActive is true if the progress is recently active. Receiving any messages
	// from the corresponding follower indicates the progress is active.
	// RecentActive can be reset to false after an election timeout.
	//
	// TODO(tbg): the leader should always have this set to true.
	RecentActive bool

	// ProbeSent is used while this follower is in StateProbe. When ProbeSent is
	// true, raft should pause sending replication message to this peer until
	// ProbeSent is reset. See ProbeAcked() and IsPaused().
	ProbeSent bool

	// Inflights is a sliding window for the inflight messages.
	// Each inflight message contains one or more log entries.
	// The size of each message is bounded by MaxSizePerMsg in the raft config
	// (NOTE(review): that setting looks like a byte-size limit rather than an
	// entry count — confirm against the raft Config documentation).
	// Thus inflight effectively limits both the number of inflight messages
	// and the bandwidth each Progress can use.
	// When inflights is Full, no more message should be sent (see IsPaused).
	// When a leader sends out a message, the index of the last
	// entry should be added to inflights. The index MUST be added
	// into inflights in order.
	// When a leader receives a reply, the previous inflights should
	// be freed by calling inflights.FreeLE with the index of the last
	// received entry.
	Inflights *Inflights

	// IsLearner is true if this progress is tracked for a learner.
	IsLearner bool
}
81
82// ResetState moves the Progress into the specified State, resetting ProbeSent,
83// PendingSnapshot, and Inflights.
84func (pr *Progress) ResetState(state StateType) {
85 pr.ProbeSent = false
86 pr.PendingSnapshot = 0
87 pr.State = state
88 pr.Inflights.reset()
89}
90
// max returns the larger of a and b.
func max(a, b uint64) uint64 {
	if b > a {
		return b
	}
	return a
}
97
// min returns the smaller of a and b.
func min(a, b uint64) uint64 {
	if b < a {
		return b
	}
	return a
}
104
105// ProbeAcked is called when this peer has accepted an append. It resets
106// ProbeSent to signal that additional append messages should be sent without
107// further delay.
108func (pr *Progress) ProbeAcked() {
109 pr.ProbeSent = false
110}
111
112// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
113// optionally and if larger, the index of the pending snapshot.
114func (pr *Progress) BecomeProbe() {
115 // If the original state is StateSnapshot, progress knows that
116 // the pending snapshot has been sent to this peer successfully, then
117 // probes from pendingSnapshot + 1.
118 if pr.State == StateSnapshot {
119 pendingSnapshot := pr.PendingSnapshot
120 pr.ResetState(StateProbe)
121 pr.Next = max(pr.Match+1, pendingSnapshot+1)
122 } else {
123 pr.ResetState(StateProbe)
124 pr.Next = pr.Match + 1
125 }
126}
127
128// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
129func (pr *Progress) BecomeReplicate() {
130 pr.ResetState(StateReplicate)
131 pr.Next = pr.Match + 1
132}
133
134// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
135// snapshot index.
136func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
137 pr.ResetState(StateSnapshot)
138 pr.PendingSnapshot = snapshoti
139}
140
141// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
142// index acked by it. The method returns false if the given n index comes from
143// an outdated message. Otherwise it updates the progress and returns true.
144func (pr *Progress) MaybeUpdate(n uint64) bool {
145 var updated bool
146 if pr.Match < n {
147 pr.Match = n
148 updated = true
149 pr.ProbeAcked()
150 }
151 if pr.Next < n+1 {
152 pr.Next = n + 1
153 }
154 return updated
155}
156
157// OptimisticUpdate signals that appends all the way up to and including index n
158// are in-flight. As a result, Next is increased to n+1.
159func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
160
161// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
162// arguments are the index the follower rejected to append to its log, and its
163// last index.
164//
165// Rejections can happen spuriously as messages are sent out of order or
166// duplicated. In such cases, the rejection pertains to an index that the
167// Progress already knows were previously acknowledged, and false is returned
168// without changing the Progress.
169//
170// If the rejection is genuine, Next is lowered sensibly, and the Progress is
171// cleared for sending log entries.
172func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
173 if pr.State == StateReplicate {
174 // The rejection must be stale if the progress has matched and "rejected"
175 // is smaller than "match".
176 if rejected <= pr.Match {
177 return false
178 }
179 // Directly decrease next to match + 1.
180 //
181 // TODO(tbg): why not use last if it's larger?
182 pr.Next = pr.Match + 1
183 return true
184 }
185
186 // The rejection must be stale if "rejected" does not match next - 1. This
187 // is because non-replicating followers are probed one entry at a time.
188 if pr.Next-1 != rejected {
189 return false
190 }
191
192 if pr.Next = min(rejected, last+1); pr.Next < 1 {
193 pr.Next = 1
194 }
195 pr.ProbeSent = false
196 return true
197}
198
199// IsPaused returns whether sending log entries to this node has been throttled.
200// This is done when a node has rejected recent MsgApps, is currently waiting
201// for a snapshot, or has reached the MaxInflightMsgs limit. In normal
202// operation, this is false. A throttled node will be contacted less frequently
203// until it has reached a state in which it's able to accept a steady stream of
204// log entries again.
205func (pr *Progress) IsPaused() bool {
206 switch pr.State {
207 case StateProbe:
208 return pr.ProbeSent
209 case StateReplicate:
210 return pr.Inflights.Full()
211 case StateSnapshot:
212 return true
213 default:
214 panic("unexpected state")
215 }
216}
217
218func (pr *Progress) String() string {
219 var buf strings.Builder
220 fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next)
221 if pr.IsLearner {
222 fmt.Fprint(&buf, " learner")
223 }
224 if pr.IsPaused() {
225 fmt.Fprint(&buf, " paused")
226 }
227 if pr.PendingSnapshot > 0 {
228 fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot)
229 }
230 if !pr.RecentActive {
231 fmt.Fprintf(&buf, " inactive")
232 }
233 if n := pr.Inflights.Count(); n > 0 {
234 fmt.Fprintf(&buf, " inflight=%d", n)
235 if pr.Inflights.Full() {
236 fmt.Fprint(&buf, "[full]")
237 }
238 }
239 return buf.String()
240}
241
242// ProgressMap is a map of *Progress.
243type ProgressMap map[uint64]*Progress
244
245// String prints the ProgressMap in sorted key order, one Progress per line.
246func (m ProgressMap) String() string {
247 ids := make([]uint64, 0, len(m))
248 for k := range m {
249 ids = append(ids, k)
250 }
251 sort.Slice(ids, func(i, j int) bool {
252 return ids[i] < ids[j]
253 })
254 var buf strings.Builder
255 for _, id := range ids {
256 fmt.Fprintf(&buf, "%d: %s\n", id, m[id])
257 }
258 return buf.String()
259}