khenaidoo | ffe076b | 2019-01-15 16:08:08 -0500 | [diff] [blame^] | 1 | // Copyright 2015 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package httpproxy |
| 16 | |
| 17 | import ( |
| 18 | "math/rand" |
| 19 | "net/url" |
| 20 | "sync" |
| 21 | "time" |
| 22 | ) |
| 23 | |
| 24 | // defaultRefreshInterval is the default proxyRefreshIntervalMs value |
| 25 | // as in etcdmain/config.go. |
| 26 | const defaultRefreshInterval = 30000 * time.Millisecond |
| 27 | |
| 28 | var once sync.Once |
| 29 | |
| 30 | func init() { |
| 31 | rand.Seed(time.Now().UnixNano()) |
| 32 | } |
| 33 | |
| 34 | func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director { |
| 35 | d := &director{ |
| 36 | uf: urlsFunc, |
| 37 | failureWait: failureWait, |
| 38 | } |
| 39 | d.refresh() |
| 40 | go func() { |
| 41 | // In order to prevent missing proxy endpoints in the first try: |
| 42 | // when given refresh interval of defaultRefreshInterval or greater |
| 43 | // and whenever there is no available proxy endpoints, |
| 44 | // give 1-second refreshInterval. |
| 45 | for { |
| 46 | es := d.endpoints() |
| 47 | ri := refreshInterval |
| 48 | if ri >= defaultRefreshInterval { |
| 49 | if len(es) == 0 { |
| 50 | ri = time.Second |
| 51 | } |
| 52 | } |
| 53 | if len(es) > 0 { |
| 54 | once.Do(func() { |
| 55 | var sl []string |
| 56 | for _, e := range es { |
| 57 | sl = append(sl, e.URL.String()) |
| 58 | } |
| 59 | plog.Infof("endpoints found %q", sl) |
| 60 | }) |
| 61 | } |
| 62 | time.Sleep(ri) |
| 63 | d.refresh() |
| 64 | } |
| 65 | }() |
| 66 | return d |
| 67 | } |
| 68 | |
| 69 | type director struct { |
| 70 | sync.Mutex |
| 71 | ep []*endpoint |
| 72 | uf GetProxyURLs |
| 73 | failureWait time.Duration |
| 74 | } |
| 75 | |
| 76 | func (d *director) refresh() { |
| 77 | urls := d.uf() |
| 78 | d.Lock() |
| 79 | defer d.Unlock() |
| 80 | var endpoints []*endpoint |
| 81 | for _, u := range urls { |
| 82 | uu, err := url.Parse(u) |
| 83 | if err != nil { |
| 84 | plog.Printf("upstream URL invalid: %v", err) |
| 85 | continue |
| 86 | } |
| 87 | endpoints = append(endpoints, newEndpoint(*uu, d.failureWait)) |
| 88 | } |
| 89 | |
| 90 | // shuffle array to avoid connections being "stuck" to a single endpoint |
| 91 | for i := range endpoints { |
| 92 | j := rand.Intn(i + 1) |
| 93 | endpoints[i], endpoints[j] = endpoints[j], endpoints[i] |
| 94 | } |
| 95 | |
| 96 | d.ep = endpoints |
| 97 | } |
| 98 | |
| 99 | func (d *director) endpoints() []*endpoint { |
| 100 | d.Lock() |
| 101 | defer d.Unlock() |
| 102 | filtered := make([]*endpoint, 0) |
| 103 | for _, ep := range d.ep { |
| 104 | if ep.Available { |
| 105 | filtered = append(filtered, ep) |
| 106 | } |
| 107 | } |
| 108 | |
| 109 | return filtered |
| 110 | } |
| 111 | |
| 112 | func newEndpoint(u url.URL, failureWait time.Duration) *endpoint { |
| 113 | ep := endpoint{ |
| 114 | URL: u, |
| 115 | Available: true, |
| 116 | failFunc: timedUnavailabilityFunc(failureWait), |
| 117 | } |
| 118 | |
| 119 | return &ep |
| 120 | } |
| 121 | |
| 122 | type endpoint struct { |
| 123 | sync.Mutex |
| 124 | |
| 125 | URL url.URL |
| 126 | Available bool |
| 127 | |
| 128 | failFunc func(ep *endpoint) |
| 129 | } |
| 130 | |
| 131 | func (ep *endpoint) Failed() { |
| 132 | ep.Lock() |
| 133 | if !ep.Available { |
| 134 | ep.Unlock() |
| 135 | return |
| 136 | } |
| 137 | |
| 138 | ep.Available = false |
| 139 | ep.Unlock() |
| 140 | |
| 141 | plog.Printf("marked endpoint %s unavailable", ep.URL.String()) |
| 142 | |
| 143 | if ep.failFunc == nil { |
| 144 | plog.Printf("no failFunc defined, endpoint %s will be unavailable forever.", ep.URL.String()) |
| 145 | return |
| 146 | } |
| 147 | |
| 148 | ep.failFunc(ep) |
| 149 | } |
| 150 | |
| 151 | func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) { |
| 152 | return func(ep *endpoint) { |
| 153 | time.AfterFunc(wait, func() { |
| 154 | ep.Available = true |
| 155 | plog.Printf("marked endpoint %s available, to retest connectivity", ep.URL.String()) |
| 156 | }) |
| 157 | } |
| 158 | } |