// Copyright 2019 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package tracker
16
17import (
18 "fmt"
Devmalya Paulfb990a52019-07-09 10:01:49 -040019 "sort"
Abhilash S.L3b494632019-07-16 15:51:09 +053020 "strings"
21)
22
23// Progress represents a follower’s progress in the view of the leader. Leader
24// maintains progresses of all followers, and sends entries to the follower
25// based on its progress.
26//
27// NB(tbg): Progress is basically a state machine whose transitions are mostly
28// strewn around `*raft.raft`. Additionally, some fields are only used when in a
29// certain State. All of this isn't ideal.
30type Progress struct {
31 Match, Next uint64
32 // State defines how the leader should interact with the follower.
33 //
34 // When in StateProbe, leader sends at most one replication message
35 // per heartbeat interval. It also probes actual progress of the follower.
36 //
37 // When in StateReplicate, leader optimistically increases next
38 // to the latest entry sent after sending replication message. This is
39 // an optimized state for fast replicating log entries to the follower.
40 //
41 // When in StateSnapshot, leader should have sent out snapshot
42 // before and stops sending any replication message.
43 State StateType
44
45 // PendingSnapshot is used in StateSnapshot.
46 // If there is a pending snapshot, the pendingSnapshot will be set to the
47 // index of the snapshot. If pendingSnapshot is set, the replication process of
48 // this Progress will be paused. raft will not resend snapshot until the pending one
49 // is reported to be failed.
50 PendingSnapshot uint64
51
52 // RecentActive is true if the progress is recently active. Receiving any messages
53 // from the corresponding follower indicates the progress is active.
54 // RecentActive can be reset to false after an election timeout.
55 RecentActive bool
56
57 // ProbeSent is used while this follower is in StateProbe. When ProbeSent is
58 // true, raft should pause sending replication message to this peer until
59 // ProbeSent is reset. See ProbeAcked() and IsPaused().
60 ProbeSent bool
61
62 // Inflights is a sliding window for the inflight messages.
63 // Each inflight message contains one or more log entries.
64 // The max number of entries per message is defined in raft config as MaxSizePerMsg.
65 // Thus inflight effectively limits both the number of inflight messages
66 // and the bandwidth each Progress can use.
67 // When inflights is Full, no more message should be sent.
68 // When a leader sends out a message, the index of the last
69 // entry should be added to inflights. The index MUST be added
70 // into inflights in order.
71 // When a leader receives a reply, the previous inflights should
72 // be freed by calling inflights.FreeLE with the index of the last
73 // received entry.
74 Inflights *Inflights
75
76 // IsLearner is true if this progress is tracked for a learner.
77 IsLearner bool
78}
79
80// ResetState moves the Progress into the specified State, resetting ProbeSent,
81// PendingSnapshot, and Inflights.
82func (pr *Progress) ResetState(state StateType) {
83 pr.ProbeSent = false
84 pr.PendingSnapshot = 0
85 pr.State = state
86 pr.Inflights.reset()
87}
88
// max returns the larger of a and b.
//
// This package-level helper takes precedence over the built-in max available
// in newer Go releases; it is kept so the package builds on older toolchains.
func max(a, b uint64) uint64 {
	if a > b {
		return a
	}
	return b
}
95
// min returns the smaller of a and b.
//
// This package-level helper takes precedence over the built-in min available
// in newer Go releases; it is kept so the package builds on older toolchains.
func min(a, b uint64) uint64 {
	if a > b {
		return b
	}
	return a
}
102
103// ProbeAcked is called when this peer has accepted an append. It resets
104// ProbeSent to signal that additional append messages should be sent without
105// further delay.
106func (pr *Progress) ProbeAcked() {
107 pr.ProbeSent = false
108}
109
110// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
111// optionally and if larger, the index of the pending snapshot.
112func (pr *Progress) BecomeProbe() {
113 // If the original state is StateSnapshot, progress knows that
114 // the pending snapshot has been sent to this peer successfully, then
115 // probes from pendingSnapshot + 1.
116 if pr.State == StateSnapshot {
117 pendingSnapshot := pr.PendingSnapshot
118 pr.ResetState(StateProbe)
119 pr.Next = max(pr.Match+1, pendingSnapshot+1)
120 } else {
121 pr.ResetState(StateProbe)
122 pr.Next = pr.Match + 1
123 }
124}
125
126// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
127func (pr *Progress) BecomeReplicate() {
128 pr.ResetState(StateReplicate)
129 pr.Next = pr.Match + 1
130}
131
132// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
133// snapshot index.
134func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
135 pr.ResetState(StateSnapshot)
136 pr.PendingSnapshot = snapshoti
137}
138
139// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
140// index acked by it. The method returns false if the given n index comes from
141// an outdated message. Otherwise it updates the progress and returns true.
142func (pr *Progress) MaybeUpdate(n uint64) bool {
143 var updated bool
144 if pr.Match < n {
145 pr.Match = n
146 updated = true
147 pr.ProbeAcked()
148 }
149 if pr.Next < n+1 {
150 pr.Next = n + 1
151 }
152 return updated
153}
154
155// OptimisticUpdate signals that appends all the way up to and including index n
156// are in-flight. As a result, Next is increased to n+1.
157func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
158
159// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
160// arguments are the index the follower rejected to append to its log, and its
161// last index.
162//
163// Rejections can happen spuriously as messages are sent out of order or
164// duplicated. In such cases, the rejection pertains to an index that the
165// Progress already knows were previously acknowledged, and false is returned
166// without changing the Progress.
167//
168// If the rejection is genuine, Next is lowered sensibly, and the Progress is
169// cleared for sending log entries.
170func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
171 if pr.State == StateReplicate {
172 // The rejection must be stale if the progress has matched and "rejected"
173 // is smaller than "match".
174 if rejected <= pr.Match {
175 return false
176 }
177 // Directly decrease next to match + 1.
178 //
179 // TODO(tbg): why not use last if it's larger?
180 pr.Next = pr.Match + 1
181 return true
182 }
183
184 // The rejection must be stale if "rejected" does not match next - 1. This
185 // is because non-replicating followers are probed one entry at a time.
186 if pr.Next-1 != rejected {
187 return false
188 }
189
190 if pr.Next = min(rejected, last+1); pr.Next < 1 {
191 pr.Next = 1
192 }
193 pr.ProbeSent = false
194 return true
195}
196
197// IsPaused returns whether sending log entries to this node has been throttled.
198// This is done when a node has rejected recent MsgApps, is currently waiting
199// for a snapshot, or has reached the MaxInflightMsgs limit. In normal
200// operation, this is false. A throttled node will be contacted less frequently
201// until it has reached a state in which it's able to accept a steady stream of
202// log entries again.
203func (pr *Progress) IsPaused() bool {
204 switch pr.State {
205 case StateProbe:
206 return pr.ProbeSent
207 case StateReplicate:
208 return pr.Inflights.Full()
209 case StateSnapshot:
210 return true
211 default:
212 panic("unexpected state")
213 }
214}
215
216func (pr *Progress) String() string {
217 var buf strings.Builder
218 fmt.Fprintf(&buf, "%s match=%d next=%d", pr.State, pr.Match, pr.Next)
219 if pr.IsLearner {
220 fmt.Fprint(&buf, " learner")
221 }
222 if pr.IsPaused() {
223 fmt.Fprint(&buf, " paused")
224 }
225 if pr.PendingSnapshot > 0 {
226 fmt.Fprintf(&buf, " pendingSnap=%d", pr.PendingSnapshot)
227 }
228 if !pr.RecentActive {
229 fmt.Fprintf(&buf, " inactive")
230 }
231 if n := pr.Inflights.Count(); n > 0 {
232 fmt.Fprintf(&buf, " inflight=%d", n)
233 if pr.Inflights.Full() {
234 fmt.Fprint(&buf, "[full]")
235 }
236 }
237 return buf.String()
238}
Devmalya Paulfb990a52019-07-09 10:01:49 -0400239
240// ProgressMap is a map of *Progress.
241type ProgressMap map[uint64]*Progress
242
243// String prints the ProgressMap in sorted key order, one Progress per line.
244func (m ProgressMap) String() string {
245 ids := make([]uint64, 0, len(m))
246 for k := range m {
247 ids = append(ids, k)
248 }
249 sort.Slice(ids, func(i, j int) bool {
250 return ids[i] < ids[j]
251 })
252 var buf strings.Builder
253 for _, id := range ids {
254 fmt.Fprintf(&buf, "%d: %s\n", id, m[id])
255 }
256 return buf.String()
257}