blob: 3582ee4dae2e61cf4afb4da3a7d2f364a2f4bad2 [file] [log] [blame]
divyadesai81bb7ba2020-03-11 11:45:23 +00001package coordinate
2
3import (
4 "fmt"
5 "math"
6 "sort"
7 "sync"
8 "time"
9
10 "github.com/armon/go-metrics"
11)
12
13// Client manages the estimated network coordinate for a given node, and adjusts
14// it as the node observes round trip times and estimated coordinates from other
15// nodes. The core algorithm is based on Vivaldi, see the documentation for Config
16// for more details.
17type Client struct {
18 // coord is the current estimate of the client's network coordinate.
19 coord *Coordinate
20
21 // origin is a coordinate sitting at the origin.
22 origin *Coordinate
23
24 // config contains the tuning parameters that govern the performance of
25 // the algorithm.
26 config *Config
27
28 // adjustmentIndex is the current index into the adjustmentSamples slice.
29 adjustmentIndex uint
30
31 // adjustment is used to store samples for the adjustment calculation.
32 adjustmentSamples []float64
33
34 // latencyFilterSamples is used to store the last several RTT samples,
35 // keyed by node name. We will use the config's LatencyFilterSamples
36 // value to determine how many samples we keep, per node.
37 latencyFilterSamples map[string][]float64
38
39 // stats is used to record events that occur when updating coordinates.
40 stats ClientStats
41
42 // mutex enables safe concurrent access to the client.
43 mutex sync.RWMutex
44}
45
// ClientStats is used to record events that occur when updating coordinates.
// Client.Stats hands it out by value, so callers receive a snapshot.
type ClientStats struct {
	// Resets counts how many times the local coordinate was discarded and
	// rebuilt from scratch because an update left it in an invalid state.
	Resets int
}
52
53// NewClient creates a new Client and verifies the configuration is valid.
54func NewClient(config *Config) (*Client, error) {
55 if !(config.Dimensionality > 0) {
56 return nil, fmt.Errorf("dimensionality must be >0")
57 }
58
59 return &Client{
60 coord: NewCoordinate(config),
61 origin: NewCoordinate(config),
62 config: config,
63 adjustmentIndex: 0,
64 adjustmentSamples: make([]float64, config.AdjustmentWindowSize),
65 latencyFilterSamples: make(map[string][]float64),
66 }, nil
67}
68
69// GetCoordinate returns a copy of the coordinate for this client.
70func (c *Client) GetCoordinate() *Coordinate {
71 c.mutex.RLock()
72 defer c.mutex.RUnlock()
73
74 return c.coord.Clone()
75}
76
77// SetCoordinate forces the client's coordinate to a known state.
78func (c *Client) SetCoordinate(coord *Coordinate) error {
79 c.mutex.Lock()
80 defer c.mutex.Unlock()
81
82 if err := c.checkCoordinate(coord); err != nil {
83 return err
84 }
85
86 c.coord = coord.Clone()
87 return nil
88}
89
90// ForgetNode removes any client state for the given node.
91func (c *Client) ForgetNode(node string) {
92 c.mutex.Lock()
93 defer c.mutex.Unlock()
94
95 delete(c.latencyFilterSamples, node)
96}
97
98// Stats returns a copy of stats for the client.
99func (c *Client) Stats() ClientStats {
100 c.mutex.Lock()
101 defer c.mutex.Unlock()
102
103 return c.stats
104}
105
106// checkCoordinate returns an error if the coordinate isn't compatible with
107// this client, or if the coordinate itself isn't valid. This assumes the mutex
108// has been locked already.
109func (c *Client) checkCoordinate(coord *Coordinate) error {
110 if !c.coord.IsCompatibleWith(coord) {
111 return fmt.Errorf("dimensions aren't compatible")
112 }
113
114 if !coord.IsValid() {
115 return fmt.Errorf("coordinate is invalid")
116 }
117
118 return nil
119}
120
121// latencyFilter applies a simple moving median filter with a new sample for
122// a node. This assumes that the mutex has been locked already.
123func (c *Client) latencyFilter(node string, rttSeconds float64) float64 {
124 samples, ok := c.latencyFilterSamples[node]
125 if !ok {
126 samples = make([]float64, 0, c.config.LatencyFilterSize)
127 }
128
129 // Add the new sample and trim the list, if needed.
130 samples = append(samples, rttSeconds)
131 if len(samples) > int(c.config.LatencyFilterSize) {
132 samples = samples[1:]
133 }
134 c.latencyFilterSamples[node] = samples
135
136 // Sort a copy of the samples and return the median.
137 sorted := make([]float64, len(samples))
138 copy(sorted, samples)
139 sort.Float64s(sorted)
140 return sorted[len(sorted)/2]
141}
142
143// updateVivialdi updates the Vivaldi portion of the client's coordinate. This
144// assumes that the mutex has been locked already.
145func (c *Client) updateVivaldi(other *Coordinate, rttSeconds float64) {
146 const zeroThreshold = 1.0e-6
147
148 dist := c.coord.DistanceTo(other).Seconds()
149 if rttSeconds < zeroThreshold {
150 rttSeconds = zeroThreshold
151 }
152 wrongness := math.Abs(dist-rttSeconds) / rttSeconds
153
154 totalError := c.coord.Error + other.Error
155 if totalError < zeroThreshold {
156 totalError = zeroThreshold
157 }
158 weight := c.coord.Error / totalError
159
160 c.coord.Error = c.config.VivaldiCE*weight*wrongness + c.coord.Error*(1.0-c.config.VivaldiCE*weight)
161 if c.coord.Error > c.config.VivaldiErrorMax {
162 c.coord.Error = c.config.VivaldiErrorMax
163 }
164
165 delta := c.config.VivaldiCC * weight
166 force := delta * (rttSeconds - dist)
167 c.coord = c.coord.ApplyForce(c.config, force, other)
168}
169
170// updateAdjustment updates the adjustment portion of the client's coordinate, if
171// the feature is enabled. This assumes that the mutex has been locked already.
172func (c *Client) updateAdjustment(other *Coordinate, rttSeconds float64) {
173 if c.config.AdjustmentWindowSize == 0 {
174 return
175 }
176
177 // Note that the existing adjustment factors don't figure in to this
178 // calculation so we use the raw distance here.
179 dist := c.coord.rawDistanceTo(other)
180 c.adjustmentSamples[c.adjustmentIndex] = rttSeconds - dist
181 c.adjustmentIndex = (c.adjustmentIndex + 1) % c.config.AdjustmentWindowSize
182
183 sum := 0.0
184 for _, sample := range c.adjustmentSamples {
185 sum += sample
186 }
187 c.coord.Adjustment = sum / (2.0 * float64(c.config.AdjustmentWindowSize))
188}
189
190// updateGravity applies a small amount of gravity to pull coordinates towards
191// the center of the coordinate system to combat drift. This assumes that the
192// mutex is locked already.
193func (c *Client) updateGravity() {
194 dist := c.origin.DistanceTo(c.coord).Seconds()
195 force := -1.0 * math.Pow(dist/c.config.GravityRho, 2.0)
196 c.coord = c.coord.ApplyForce(c.config, force, c.origin)
197}
198
199// Update takes other, a coordinate for another node, and rtt, a round trip
200// time observation for a ping to that node, and updates the estimated position of
201// the client's coordinate. Returns the updated coordinate.
202func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coordinate, error) {
203 c.mutex.Lock()
204 defer c.mutex.Unlock()
205
206 if err := c.checkCoordinate(other); err != nil {
207 return nil, err
208 }
209
210 // The code down below can handle zero RTTs, which we have seen in
211 // https://github.com/hashicorp/consul/issues/3789, presumably in
212 // environments with coarse-grained monotonic clocks (we are still
213 // trying to pin this down). In any event, this is ok from a code PoV
214 // so we don't need to alert operators with spammy messages. We did
215 // add a counter so this is still observable, though.
216 const maxRTT = 10 * time.Second
217 if rtt < 0 || rtt > maxRTT {
218 return nil, fmt.Errorf("round trip time not in valid range, duration %v is not a positive value less than %v ", rtt, maxRTT)
219 }
220 if rtt == 0 {
221 metrics.IncrCounter([]string{"serf", "coordinate", "zero-rtt"}, 1)
222 }
223
224 rttSeconds := c.latencyFilter(node, rtt.Seconds())
225 c.updateVivaldi(other, rttSeconds)
226 c.updateAdjustment(other, rttSeconds)
227 c.updateGravity()
228 if !c.coord.IsValid() {
229 c.stats.Resets++
230 c.coord = NewCoordinate(c.config)
231 }
232
233 return c.coord.Clone(), nil
234}
235
236// DistanceTo returns the estimated RTT from the client's coordinate to other, the
237// coordinate for another node.
238func (c *Client) DistanceTo(other *Coordinate) time.Duration {
239 c.mutex.RLock()
240 defer c.mutex.RUnlock()
241
242 return c.coord.DistanceTo(other)
243}