blob: 1a056341ab51794de17b9c4af598398615f501fa [file] [log] [blame]
Takahiro Suzukid7bf8202020-12-17 20:21:59 +09001// Copyright 2019 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package tracker
16
17// Inflights limits the number of MsgApp (represented by the largest index
18// contained within) sent to followers but not yet acknowledged by them. Callers
19// use Full() to check whether more messages can be sent, call Add() whenever
20// they are sending a new append, and release "quota" via FreeLE() whenever an
21// ack is received.
22type Inflights struct {
23 // the starting index in the buffer
24 start int
25 // number of inflights in the buffer
26 count int
27
28 // the size of the buffer
29 size int
30
31 // buffer contains the index of the last entry
32 // inside one message.
33 buffer []uint64
34}
35
36// NewInflights sets up an Inflights that allows up to 'size' inflight messages.
37func NewInflights(size int) *Inflights {
38 return &Inflights{
39 size: size,
40 }
41}
42
43// Clone returns an *Inflights that is identical to but shares no memory with
44// the receiver.
45func (in *Inflights) Clone() *Inflights {
46 ins := *in
47 ins.buffer = append([]uint64(nil), in.buffer...)
48 return &ins
49}
50
51// Add notifies the Inflights that a new message with the given index is being
52// dispatched. Full() must be called prior to Add() to verify that there is room
53// for one more message, and consecutive calls to add Add() must provide a
54// monotonic sequence of indexes.
55func (in *Inflights) Add(inflight uint64) {
56 if in.Full() {
57 panic("cannot add into a Full inflights")
58 }
59 next := in.start + in.count
60 size := in.size
61 if next >= size {
62 next -= size
63 }
64 if next >= len(in.buffer) {
65 in.grow()
66 }
67 in.buffer[next] = inflight
68 in.count++
69}
70
71// grow the inflight buffer by doubling up to inflights.size. We grow on demand
72// instead of preallocating to inflights.size to handle systems which have
73// thousands of Raft groups per process.
74func (in *Inflights) grow() {
75 newSize := len(in.buffer) * 2
76 if newSize == 0 {
77 newSize = 1
78 } else if newSize > in.size {
79 newSize = in.size
80 }
81 newBuffer := make([]uint64, newSize)
82 copy(newBuffer, in.buffer)
83 in.buffer = newBuffer
84}
85
86// FreeLE frees the inflights smaller or equal to the given `to` flight.
87func (in *Inflights) FreeLE(to uint64) {
88 if in.count == 0 || to < in.buffer[in.start] {
89 // out of the left side of the window
90 return
91 }
92
93 idx := in.start
94 var i int
95 for i = 0; i < in.count; i++ {
96 if to < in.buffer[idx] { // found the first large inflight
97 break
98 }
99
100 // increase index and maybe rotate
101 size := in.size
102 if idx++; idx >= size {
103 idx -= size
104 }
105 }
106 // free i inflights and set new start index
107 in.count -= i
108 in.start = idx
109 if in.count == 0 {
110 // inflights is empty, reset the start index so that we don't grow the
111 // buffer unnecessarily.
112 in.start = 0
113 }
114}
115
116// FreeFirstOne releases the first inflight. This is a no-op if nothing is
117// inflight.
118func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
119
120// Full returns true if no more messages can be sent at the moment.
121func (in *Inflights) Full() bool {
122 return in.count == in.size
123}
124
125// Count returns the number of inflight messages.
126func (in *Inflights) Count() int { return in.count }
127
128// reset frees all inflights.
129func (in *Inflights) reset() {
130 in.count = 0
131 in.start = 0
132}