blob: 9e209e21eae005d04e8705f7d2496a0a9a1a0d0a [file] [log] [blame]
Abhilash S.L3b494632019-07-16 15:51:09 +05301// Copyright 2019 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package tracker
16
17// Inflights limits the number of MsgApp (represented by the largest index
18// contained within) sent to followers but not yet acknowledged by them. Callers
19// use Full() to check whether more messages can be sent, call Add() whenever
20// they are sending a new append, and release "quota" via FreeLE() whenever an
21// ack is received.
22type Inflights struct {
23 // the starting index in the buffer
24 start int
25 // number of inflights in the buffer
26 count int
27
28 // the size of the buffer
29 size int
30
31 // buffer contains the index of the last entry
32 // inside one message.
33 buffer []uint64
34}
35
36// NewInflights sets up an Inflights that allows up to 'size' inflight messages.
37func NewInflights(size int) *Inflights {
38 return &Inflights{
39 size: size,
40 }
41}
42
43// Add notifies the Inflights that a new message with the given index is being
44// dispatched. Full() must be called prior to Add() to verify that there is room
45// for one more message, and consecutive calls to add Add() must provide a
46// monotonic sequence of indexes.
47func (in *Inflights) Add(inflight uint64) {
48 if in.Full() {
49 panic("cannot add into a Full inflights")
50 }
51 next := in.start + in.count
52 size := in.size
53 if next >= size {
54 next -= size
55 }
56 if next >= len(in.buffer) {
57 in.grow()
58 }
59 in.buffer[next] = inflight
60 in.count++
61}
62
63// grow the inflight buffer by doubling up to inflights.size. We grow on demand
64// instead of preallocating to inflights.size to handle systems which have
65// thousands of Raft groups per process.
66func (in *Inflights) grow() {
67 newSize := len(in.buffer) * 2
68 if newSize == 0 {
69 newSize = 1
70 } else if newSize > in.size {
71 newSize = in.size
72 }
73 newBuffer := make([]uint64, newSize)
74 copy(newBuffer, in.buffer)
75 in.buffer = newBuffer
76}
77
78// FreeLE frees the inflights smaller or equal to the given `to` flight.
79func (in *Inflights) FreeLE(to uint64) {
80 if in.count == 0 || to < in.buffer[in.start] {
81 // out of the left side of the window
82 return
83 }
84
85 idx := in.start
86 var i int
87 for i = 0; i < in.count; i++ {
88 if to < in.buffer[idx] { // found the first large inflight
89 break
90 }
91
92 // increase index and maybe rotate
93 size := in.size
94 if idx++; idx >= size {
95 idx -= size
96 }
97 }
98 // free i inflights and set new start index
99 in.count -= i
100 in.start = idx
101 if in.count == 0 {
102 // inflights is empty, reset the start index so that we don't grow the
103 // buffer unnecessarily.
104 in.start = 0
105 }
106}
107
108// FreeFirstOne releases the first inflight. This is a no-op if nothing is
109// inflight.
110func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
111
112// Full returns true if no more messages can be sent at the moment.
113func (in *Inflights) Full() bool {
114 return in.count == in.size
115}
116
117// Count returns the number of inflight messages.
118func (in *Inflights) Count() int { return in.count }
119
120// reset frees all inflights.
121func (in *Inflights) reset() {
122 in.count = 0
123 in.start = 0
124}