| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package httpproxy |
| |
| import ( |
| "math/rand" |
| "net/url" |
| "sync" |
| "time" |
| ) |
| |
// defaultRefreshInterval mirrors the default proxyRefreshIntervalMs
// setting from etcdmain/config.go: 30000 milliseconds, i.e. 30 seconds.
const defaultRefreshInterval = 30 * time.Second
| |
var (
	// once guards the one-time "endpoints found" log line emitted from the
	// refresh goroutine started by newDirector. It is package-level, so it
	// is shared by every director created in the process.
	once sync.Once
)
| |
// init seeds the global math/rand source with the current wall-clock time
// in nanoseconds; refresh uses that source to shuffle endpoints.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
| |
| func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director { |
| d := &director{ |
| uf: urlsFunc, |
| failureWait: failureWait, |
| } |
| d.refresh() |
| go func() { |
| // In order to prevent missing proxy endpoints in the first try: |
| // when given refresh interval of defaultRefreshInterval or greater |
| // and whenever there is no available proxy endpoints, |
| // give 1-second refreshInterval. |
| for { |
| es := d.endpoints() |
| ri := refreshInterval |
| if ri >= defaultRefreshInterval { |
| if len(es) == 0 { |
| ri = time.Second |
| } |
| } |
| if len(es) > 0 { |
| once.Do(func() { |
| var sl []string |
| for _, e := range es { |
| sl = append(sl, e.URL.String()) |
| } |
| plog.Infof("endpoints found %q", sl) |
| }) |
| } |
| time.Sleep(ri) |
| d.refresh() |
| } |
| }() |
| return d |
| } |
| |
| type director struct { |
| sync.Mutex |
| ep []*endpoint |
| uf GetProxyURLs |
| failureWait time.Duration |
| } |
| |
| func (d *director) refresh() { |
| urls := d.uf() |
| d.Lock() |
| defer d.Unlock() |
| var endpoints []*endpoint |
| for _, u := range urls { |
| uu, err := url.Parse(u) |
| if err != nil { |
| plog.Printf("upstream URL invalid: %v", err) |
| continue |
| } |
| endpoints = append(endpoints, newEndpoint(*uu, d.failureWait)) |
| } |
| |
| // shuffle array to avoid connections being "stuck" to a single endpoint |
| for i := range endpoints { |
| j := rand.Intn(i + 1) |
| endpoints[i], endpoints[j] = endpoints[j], endpoints[i] |
| } |
| |
| d.ep = endpoints |
| } |
| |
| func (d *director) endpoints() []*endpoint { |
| d.Lock() |
| defer d.Unlock() |
| filtered := make([]*endpoint, 0) |
| for _, ep := range d.ep { |
| if ep.Available { |
| filtered = append(filtered, ep) |
| } |
| } |
| |
| return filtered |
| } |
| |
| func newEndpoint(u url.URL, failureWait time.Duration) *endpoint { |
| ep := endpoint{ |
| URL: u, |
| Available: true, |
| failFunc: timedUnavailabilityFunc(failureWait), |
| } |
| |
| return &ep |
| } |
| |
// endpoint is a single upstream URL together with its availability state.
// The embedded mutex is taken by Failed when it changes Available.
type endpoint struct {
	sync.Mutex

	// URL is the upstream address.
	URL url.URL
	// Available reports whether the endpoint is currently considered usable.
	Available bool

	// failFunc, when non-nil, is invoked by Failed after the endpoint has
	// been marked unavailable.
	failFunc func(ep *endpoint)
}
| |
| func (ep *endpoint) Failed() { |
| ep.Lock() |
| if !ep.Available { |
| ep.Unlock() |
| return |
| } |
| |
| ep.Available = false |
| ep.Unlock() |
| |
| plog.Printf("marked endpoint %s unavailable", ep.URL.String()) |
| |
| if ep.failFunc == nil { |
| plog.Printf("no failFunc defined, endpoint %s will be unavailable forever.", ep.URL.String()) |
| return |
| } |
| |
| ep.failFunc(ep) |
| } |
| |
| func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) { |
| return func(ep *endpoint) { |
| time.AfterFunc(wait, func() { |
| ep.Available = true |
| plog.Printf("marked endpoint %s available, to retest connectivity", ep.URL.String()) |
| }) |
| } |
| } |