blob: d414501335dd18283af45f38c29950a0c8cec21d [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package httpproxy
16
17import (
18 "math/rand"
19 "net/url"
20 "sync"
21 "time"
22)
23
// defaultRefreshInterval is the default proxyRefreshIntervalMs value
// as in etcdmain/config.go, i.e. 30 seconds.
const defaultRefreshInterval = 30 * time.Second

// once guards the "endpoints found" log line emitted by the refresh goroutine
// started in newDirector. Because it is package-level, that line is logged at
// most once per process, no matter how many directors are created.
var once sync.Once
29
// init seeds the global math/rand source from the wall clock. The endpoint
// shuffle in director.refresh draws from that source via rand.Intn.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
33
34func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
35 d := &director{
36 uf: urlsFunc,
37 failureWait: failureWait,
38 }
39 d.refresh()
40 go func() {
41 // In order to prevent missing proxy endpoints in the first try:
42 // when given refresh interval of defaultRefreshInterval or greater
43 // and whenever there is no available proxy endpoints,
44 // give 1-second refreshInterval.
45 for {
46 es := d.endpoints()
47 ri := refreshInterval
48 if ri >= defaultRefreshInterval {
49 if len(es) == 0 {
50 ri = time.Second
51 }
52 }
53 if len(es) > 0 {
54 once.Do(func() {
55 var sl []string
56 for _, e := range es {
57 sl = append(sl, e.URL.String())
58 }
59 plog.Infof("endpoints found %q", sl)
60 })
61 }
62 time.Sleep(ri)
63 d.refresh()
64 }
65 }()
66 return d
67}
68
69type director struct {
70 sync.Mutex
71 ep []*endpoint
72 uf GetProxyURLs
73 failureWait time.Duration
74}
75
76func (d *director) refresh() {
77 urls := d.uf()
78 d.Lock()
79 defer d.Unlock()
80 var endpoints []*endpoint
81 for _, u := range urls {
82 uu, err := url.Parse(u)
83 if err != nil {
84 plog.Printf("upstream URL invalid: %v", err)
85 continue
86 }
87 endpoints = append(endpoints, newEndpoint(*uu, d.failureWait))
88 }
89
90 // shuffle array to avoid connections being "stuck" to a single endpoint
91 for i := range endpoints {
92 j := rand.Intn(i + 1)
93 endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
94 }
95
96 d.ep = endpoints
97}
98
99func (d *director) endpoints() []*endpoint {
100 d.Lock()
101 defer d.Unlock()
102 filtered := make([]*endpoint, 0)
103 for _, ep := range d.ep {
104 if ep.Available {
105 filtered = append(filtered, ep)
106 }
107 }
108
109 return filtered
110}
111
112func newEndpoint(u url.URL, failureWait time.Duration) *endpoint {
113 ep := endpoint{
114 URL: u,
115 Available: true,
116 failFunc: timedUnavailabilityFunc(failureWait),
117 }
118
119 return &ep
120}
121
// endpoint is one upstream target together with its availability flag. The
// embedded mutex is held by Failed while it inspects and clears Available.
type endpoint struct {
	sync.Mutex

	// URL is the upstream address this endpoint represents.
	URL url.URL
	// Available is set to true by newEndpoint and cleared by Failed.
	Available bool

	// failFunc, when non-nil, is invoked by Failed after the endpoint has been
	// marked unavailable. When nil, Failed logs that the endpoint will stay
	// unavailable forever.
	failFunc func(ep *endpoint)
}
130
131func (ep *endpoint) Failed() {
132 ep.Lock()
133 if !ep.Available {
134 ep.Unlock()
135 return
136 }
137
138 ep.Available = false
139 ep.Unlock()
140
141 plog.Printf("marked endpoint %s unavailable", ep.URL.String())
142
143 if ep.failFunc == nil {
144 plog.Printf("no failFunc defined, endpoint %s will be unavailable forever.", ep.URL.String())
145 return
146 }
147
148 ep.failFunc(ep)
149}
150
151func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) {
152 return func(ep *endpoint) {
153 time.AfterFunc(wait, func() {
154 ep.Available = true
155 plog.Printf("marked endpoint %s available, to retest connectivity", ep.URL.String())
156 })
157 }
158}