// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package tracker
16
17import (
18 "fmt"
19 "strings"
20)
21
22// Progress represents a follower’s progress in the view of the leader. Leader
23// maintains progresses of all followers, and sends entries to the follower
24// based on its progress.
25//
26// NB(tbg): Progress is basically a state machine whose transitions are mostly
27// strewn around `*raft.raft`. Additionally, some fields are only used when in a
28// certain State. All of this isn't ideal.
29type Progress struct {
30 Match, Next uint64
31 // State defines how the leader should interact with the follower.
32 //
33 // When in StateProbe, leader sends at most one replication message
34 // per heartbeat interval. It also probes actual progress of the follower.
35 //
36 // When in StateReplicate, leader optimistically increases next
37 // to the latest entry sent after sending replication message. This is
38 // an optimized state for fast replicating log entries to the follower.
39 //
40 // When in StateSnapshot, leader should have sent out snapshot
41 // before and stops sending any replication message.
42 State StateType
43
44 // PendingSnapshot is used in StateSnapshot.
45 // If there is a pending snapshot, the pendingSnapshot will be set to the
46 // index of the snapshot. If pendingSnapshot is set, the replication process of
47 // this Progress will be paused. raft will not resend snapshot until the pending one
48 // is reported to be failed.
49 PendingSnapshot uint64
50
51 // RecentActive is true if the progress is recently active. Receiving any messages
52 // from the corresponding follower indicates the progress is active.
53 // RecentActive can be reset to false after an election timeout.
54 RecentActive bool
55
56 // ProbeSent is used while this follower is in StateProbe. When ProbeSent is
57 // true, raft should pause sending replication message to this peer until
58 // ProbeSent is reset. See ProbeAcked() and IsPaused().
59 ProbeSent bool
60
61 // Inflights is a sliding window for the inflight messages.
62 // Each inflight message contains one or more log entries.
63 // The max number of entries per message is defined in raft config as MaxSizePerMsg.
64 // Thus inflight effectively limits both the number of inflight messages
65 // and the bandwidth each Progress can use.
66 // When inflights is Full, no more message should be sent.
67 // When a leader sends out a message, the index of the last
68 // entry should be added to inflights. The index MUST be added
69 // into inflights in order.
70 // When a leader receives a reply, the previous inflights should
71 // be freed by calling inflights.FreeLE with the index of the last
72 // received entry.
73 Inflights *Inflights
74
75 // IsLearner is true if this progress is tracked for a learner.
76 IsLearner bool
77}
78
79// ResetState moves the Progress into the specified State, resetting ProbeSent,
80// PendingSnapshot, and Inflights.
81func (pr *Progress) ResetState(state StateType) {
82 pr.ProbeSent = false
83 pr.PendingSnapshot = 0
84 pr.State = state
85 pr.Inflights.reset()
86}
87
// max returns the larger of a and b.
func max(a, b uint64) uint64 {
	if b >= a {
		return b
	}
	return a
}
94
// min returns the smaller of a and b.
func min(a, b uint64) uint64 {
	if b < a {
		return b
	}
	return a
}
101
102// ProbeAcked is called when this peer has accepted an append. It resets
103// ProbeSent to signal that additional append messages should be sent without
104// further delay.
105func (pr *Progress) ProbeAcked() {
106 pr.ProbeSent = false
107}
108
109// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
110// optionally and if larger, the index of the pending snapshot.
111func (pr *Progress) BecomeProbe() {
112 // If the original state is StateSnapshot, progress knows that
113 // the pending snapshot has been sent to this peer successfully, then
114 // probes from pendingSnapshot + 1.
115 if pr.State == StateSnapshot {
116 pendingSnapshot := pr.PendingSnapshot
117 pr.ResetState(StateProbe)
118 pr.Next = max(pr.Match+1, pendingSnapshot+1)
119 } else {
120 pr.ResetState(StateProbe)
121 pr.Next = pr.Match + 1
122 }
123}
124
125// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
126func (pr *Progress) BecomeReplicate() {
127 pr.ResetState(StateReplicate)
128 pr.Next = pr.Match + 1
129}
130
131// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
132// snapshot index.
133func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
134 pr.ResetState(StateSnapshot)
135 pr.PendingSnapshot = snapshoti
136}
137
138// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
139// index acked by it. The method returns false if the given n index comes from
140// an outdated message. Otherwise it updates the progress and returns true.
141func (pr *Progress) MaybeUpdate(n uint64) bool {
142 var updated bool
143 if pr.Match < n {
144 pr.Match = n
145 updated = true
146 pr.ProbeAcked()
147 }
148 if pr.Next < n+1 {
149 pr.Next = n + 1
150 }
151 return updated
152}
153
154// OptimisticUpdate signals that appends all the way up to and including index n
155// are in-flight. As a result, Next is increased to n+1.
156func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
157
158// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
159// arguments are the index the follower rejected to append to its log, and its
160// last index.
161//
162// Rejections can happen spuriously as messages are sent out of order or
163// duplicated. In such cases, the rejection pertains to an index that the
164// Progress already knows were previously acknowledged, and false is returned
165// without changing the Progress.
166//
167// If the rejection is genuine, Next is lowered sensibly, and the Progress is
168// cleared for sending log entries.
169func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
170 if pr.State == StateReplicate {
171 // The rejection must be stale if the progress has matched and "rejected"
172 // is smaller than "match".
173 if rejected <= pr.Match {
174 return false
175 }
176 // Directly decrease next to match + 1.
177 //
178 // TODO(tbg): why not use last if it's larger?
179 pr.Next = pr.Match + 1
180 return true
181 }
182
183 // The rejection must be stale if "rejected" does not match next - 1. This
184 // is because non-replicating followers are probed one entry at a time.
185 if pr.Next-1 != rejected {
186 return false
187 }
188
189 if pr.Next = min(rejected, last+1); pr.Next < 1 {
190 pr.Next = 1
191 }
192 pr.ProbeSent = false
193 return true
194}
195
196// IsPaused returns whether sending log entries to this node has been throttled.
197// This is done when a node has rejected recent MsgApps, is currently waiting
198// for a snapshot, or has reached the MaxInflightMsgs limit. In normal
199// operation, this is false. A throttled node will be contacted less frequently
200// until it has reached a state in which it's able to accept a steady stream of
201// log entries again.
202func (pr *Progress) IsPaused() bool {
203 switch pr.State {
204 case StateProbe:
205 return pr.ProbeSent
206 case StateReplicate:
207 return pr.Inflights.Full()
208 case StateSnapshot:
209 return true
210 default:
211 panic("unexpected state")
212 }
213}
214
215func (pr *Progress) String() string {
216 var buf strings.Builder
217 fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next)
218 if pr.IsLearner {
219 fmt.Fprint(&buf, " learner")
220 }
221 if pr.IsPaused() {
222 fmt.Fprint(&buf, " paused")
223 }
224 if pr.PendingSnapshot > 0 {
225 fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot)
226 }
227 if !pr.RecentActive {
228 fmt.Fprintf(&buf, " inactive")
229 }
230 if n := pr.Inflights.Count(); n > 0 {
231 fmt.Fprintf(&buf, " inflight=%d", n)
232 if pr.Inflights.Full() {
233 fmt.Fprint(&buf, "[full]")
234 }
235 }
236 return buf.String()
237}