Scott Baker | 611f6bd | 2019-10-18 13:45:19 -0700 | [diff] [blame] | 1 | // Copyright 2019 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package tracker |
| 16 | |
// Inflights bounds how many MsgApp messages may be outstanding to a follower
// at once, i.e. sent but not yet acknowledged. Each message is represented by
// the largest index it contains. Callers check Full() before sending, call
// Add() for every append they send, and hand "quota" back via FreeLE() when an
// ack arrives.
//
// The indexes live in a ring buffer: the count live entries begin at position
// start and wrap around at size.
type Inflights struct {
	// start is the position in buffer of the oldest inflight entry.
	start int
	// count is the number of inflight entries currently held in buffer.
	count int

	// size is the maximum number of inflight entries allowed; the buffer is
	// never grown beyond it.
	size int

	// buffer holds, for each inflight message, the index of the last entry
	// contained in that message. It is grown on demand and may therefore be
	// shorter than size.
	buffer []uint64
}
| 35 | |
| 36 | // NewInflights sets up an Inflights that allows up to 'size' inflight messages. |
| 37 | func NewInflights(size int) *Inflights { |
| 38 | return &Inflights{ |
| 39 | size: size, |
| 40 | } |
| 41 | } |
| 42 | |
| 43 | // Clone returns an *Inflights that is identical to but shares no memory with |
| 44 | // the receiver. |
| 45 | func (in *Inflights) Clone() *Inflights { |
| 46 | ins := *in |
| 47 | ins.buffer = append([]uint64(nil), in.buffer...) |
| 48 | return &ins |
| 49 | } |
| 50 | |
| 51 | // Add notifies the Inflights that a new message with the given index is being |
| 52 | // dispatched. Full() must be called prior to Add() to verify that there is room |
| 53 | // for one more message, and consecutive calls to add Add() must provide a |
| 54 | // monotonic sequence of indexes. |
| 55 | func (in *Inflights) Add(inflight uint64) { |
| 56 | if in.Full() { |
| 57 | panic("cannot add into a Full inflights") |
| 58 | } |
| 59 | next := in.start + in.count |
| 60 | size := in.size |
| 61 | if next >= size { |
| 62 | next -= size |
| 63 | } |
| 64 | if next >= len(in.buffer) { |
| 65 | in.grow() |
| 66 | } |
| 67 | in.buffer[next] = inflight |
| 68 | in.count++ |
| 69 | } |
| 70 | |
| 71 | // grow the inflight buffer by doubling up to inflights.size. We grow on demand |
| 72 | // instead of preallocating to inflights.size to handle systems which have |
| 73 | // thousands of Raft groups per process. |
| 74 | func (in *Inflights) grow() { |
| 75 | newSize := len(in.buffer) * 2 |
| 76 | if newSize == 0 { |
| 77 | newSize = 1 |
| 78 | } else if newSize > in.size { |
| 79 | newSize = in.size |
| 80 | } |
| 81 | newBuffer := make([]uint64, newSize) |
| 82 | copy(newBuffer, in.buffer) |
| 83 | in.buffer = newBuffer |
| 84 | } |
| 85 | |
| 86 | // FreeLE frees the inflights smaller or equal to the given `to` flight. |
| 87 | func (in *Inflights) FreeLE(to uint64) { |
| 88 | if in.count == 0 || to < in.buffer[in.start] { |
| 89 | // out of the left side of the window |
| 90 | return |
| 91 | } |
| 92 | |
| 93 | idx := in.start |
| 94 | var i int |
| 95 | for i = 0; i < in.count; i++ { |
| 96 | if to < in.buffer[idx] { // found the first large inflight |
| 97 | break |
| 98 | } |
| 99 | |
| 100 | // increase index and maybe rotate |
| 101 | size := in.size |
| 102 | if idx++; idx >= size { |
| 103 | idx -= size |
| 104 | } |
| 105 | } |
| 106 | // free i inflights and set new start index |
| 107 | in.count -= i |
| 108 | in.start = idx |
| 109 | if in.count == 0 { |
| 110 | // inflights is empty, reset the start index so that we don't grow the |
| 111 | // buffer unnecessarily. |
| 112 | in.start = 0 |
| 113 | } |
| 114 | } |
| 115 | |
| 116 | // FreeFirstOne releases the first inflight. This is a no-op if nothing is |
| 117 | // inflight. |
| 118 | func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) } |
| 119 | |
| 120 | // Full returns true if no more messages can be sent at the moment. |
| 121 | func (in *Inflights) Full() bool { |
| 122 | return in.count == in.size |
| 123 | } |
| 124 | |
| 125 | // Count returns the number of inflight messages. |
| 126 | func (in *Inflights) Count() int { return in.count } |
| 127 | |
| 128 | // reset frees all inflights. |
| 129 | func (in *Inflights) reset() { |
| 130 | in.count = 0 |
| 131 | in.start = 0 |
| 132 | } |