| // Copyright 2019 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package tracker |
| |
| // Inflights limits the number of MsgApp (represented by the largest index |
| // contained within) sent to followers but not yet acknowledged by them. Callers |
| // use Full() to check whether more messages can be sent, call Add() whenever |
| // they are sending a new append, and release "quota" via FreeLE() whenever an |
| // ack is received. |
| type Inflights struct { |
| // the starting index in the buffer |
| start int |
| // number of inflights in the buffer |
| count int |
| |
| // the size of the buffer |
| size int |
| |
| // buffer contains the index of the last entry |
| // inside one message. |
| buffer []uint64 |
| } |
| |
| // NewInflights sets up an Inflights that allows up to 'size' inflight messages. |
| func NewInflights(size int) *Inflights { |
| return &Inflights{ |
| size: size, |
| } |
| } |
| |
| // Add notifies the Inflights that a new message with the given index is being |
| // dispatched. Full() must be called prior to Add() to verify that there is room |
| // for one more message, and consecutive calls to add Add() must provide a |
| // monotonic sequence of indexes. |
| func (in *Inflights) Add(inflight uint64) { |
| if in.Full() { |
| panic("cannot add into a Full inflights") |
| } |
| next := in.start + in.count |
| size := in.size |
| if next >= size { |
| next -= size |
| } |
| if next >= len(in.buffer) { |
| in.grow() |
| } |
| in.buffer[next] = inflight |
| in.count++ |
| } |
| |
| // grow the inflight buffer by doubling up to inflights.size. We grow on demand |
| // instead of preallocating to inflights.size to handle systems which have |
| // thousands of Raft groups per process. |
| func (in *Inflights) grow() { |
| newSize := len(in.buffer) * 2 |
| if newSize == 0 { |
| newSize = 1 |
| } else if newSize > in.size { |
| newSize = in.size |
| } |
| newBuffer := make([]uint64, newSize) |
| copy(newBuffer, in.buffer) |
| in.buffer = newBuffer |
| } |
| |
| // FreeLE frees the inflights smaller or equal to the given `to` flight. |
| func (in *Inflights) FreeLE(to uint64) { |
| if in.count == 0 || to < in.buffer[in.start] { |
| // out of the left side of the window |
| return |
| } |
| |
| idx := in.start |
| var i int |
| for i = 0; i < in.count; i++ { |
| if to < in.buffer[idx] { // found the first large inflight |
| break |
| } |
| |
| // increase index and maybe rotate |
| size := in.size |
| if idx++; idx >= size { |
| idx -= size |
| } |
| } |
| // free i inflights and set new start index |
| in.count -= i |
| in.start = idx |
| if in.count == 0 { |
| // inflights is empty, reset the start index so that we don't grow the |
| // buffer unnecessarily. |
| in.start = 0 |
| } |
| } |
| |
| // FreeFirstOne releases the first inflight. This is a no-op if nothing is |
| // inflight. |
| func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) } |
| |
| // Full returns true if no more messages can be sent at the moment. |
| func (in *Inflights) Full() bool { |
| return in.count == in.size |
| } |
| |
| // Count returns the number of inflight messages. |
| func (in *Inflights) Count() int { return in.count } |
| |
| // reset frees all inflights. |
| func (in *Inflights) reset() { |
| in.count = 0 |
| in.start = 0 |
| } |