Dependencies for the affinity router and the
affinity routing daemon.
Change-Id: Icda72c3594ef7f8f0bc0c33dc03087a4c25529ca
diff --git a/vendor/github.com/coreos/etcd/proxy/httpproxy/director.go b/vendor/github.com/coreos/etcd/proxy/httpproxy/director.go
new file mode 100644
index 0000000..d414501
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/httpproxy/director.go
@@ -0,0 +1,158 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpproxy
+
+import (
+ "math/rand"
+ "net/url"
+ "sync"
+ "time"
+)
+
+// defaultRefreshInterval is the default value of proxyRefreshIntervalMs
+// (30000 ms), as in etcdmain/config.go.
+const defaultRefreshInterval = 30 * time.Second
+
+// once ensures the "endpoints found" message in newDirector is logged a
+// single time. It is package-level, so it is shared by every director.
+var once sync.Once
+
+// init seeds the global math/rand source from the current time; refresh uses
+// that source to shuffle endpoints.
+func init() {
+ rand.Seed(time.Now().UnixNano())
+}
+
+// newDirector builds a director around urlsFunc, performs one synchronous
+// refresh, and then starts a goroutine that sleeps and refreshes again in an
+// endless loop (the loop has no exit path).
+//
+// The sleep is normally refreshInterval. While no endpoint is available and
+// refreshInterval is defaultRefreshInterval or longer, it is shortened to one
+// second. The first time any endpoint is available, the list is logged once.
+func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
+	d := &director{uf: urlsFunc, failureWait: failureWait}
+	d.refresh()
+
+	go func() {
+		for {
+			avail := d.endpoints()
+			wait := refreshInterval
+
+			if len(avail) == 0 {
+				// Nothing usable yet: with a long configured interval, poll
+				// again after one second so proxy endpoints are not missed
+				// on the first try.
+				if wait >= defaultRefreshInterval {
+					wait = time.Second
+				}
+			} else {
+				once.Do(func() {
+					var found []string
+					for _, e := range avail {
+						found = append(found, e.URL.String())
+					}
+					plog.Infof("endpoints found %q", found)
+				})
+			}
+
+			time.Sleep(wait)
+			d.refresh()
+		}
+	}()
+
+	return d
+}
+
+// director holds the proxy's current list of upstream endpoints together with
+// the inputs needed to rebuild that list.
+type director struct {
+ sync.Mutex // guards ep: held by refresh while rebuilding and by endpoints while filtering
+ ep []*endpoint // endpoints created by the most recent refresh, in shuffled order
+ uf GetProxyURLs // called by refresh to obtain the upstream URL strings
+ failureWait time.Duration // handed to newEndpoint for every endpoint refresh creates
+}
+
+// refresh asks uf for the current upstream URLs, builds a brand-new endpoint
+// for every URL that parses, shuffles the result and installs it as d.ep.
+// URLs that fail to parse are logged and skipped. uf is called before the
+// director lock is taken; everything else happens while holding it.
+func (d *director) refresh() {
+	rawURLs := d.uf()
+
+	d.Lock()
+	defer d.Unlock()
+
+	var fresh []*endpoint
+	for _, raw := range rawURLs {
+		parsed, err := url.Parse(raw)
+		if err != nil {
+			plog.Printf("upstream URL invalid: %v", err)
+			continue
+		}
+		fresh = append(fresh, newEndpoint(*parsed, d.failureWait))
+	}
+
+	// Shuffle so that connections do not get "stuck" on a single endpoint.
+	for i := range fresh {
+		j := rand.Intn(i + 1)
+		fresh[i], fresh[j] = fresh[j], fresh[i]
+	}
+
+	d.ep = fresh
+}
+
+// endpoints returns the endpoints from the most recent refresh that are
+// currently marked Available. The result is always non-nil and may be empty.
+//
+// Each Available flag is read while holding that endpoint's own mutex, because
+// Failed flips the flag under the same mutex; reading it unlocked would be a
+// data race. The director lock is always taken before an endpoint lock, and
+// Failed takes only the endpoint lock, so the two cannot deadlock.
+func (d *director) endpoints() []*endpoint {
+	d.Lock()
+	defer d.Unlock()
+
+	filtered := make([]*endpoint, 0, len(d.ep))
+	for _, ep := range d.ep {
+		ep.Lock()
+		available := ep.Available
+		ep.Unlock()
+
+		if available {
+			filtered = append(filtered, ep)
+		}
+	}
+
+	return filtered
+}
+
+// newEndpoint returns an endpoint for u that starts out Available. Its
+// failFunc is timedUnavailabilityFunc(failureWait).
+func newEndpoint(u url.URL, failureWait time.Duration) *endpoint {
+	return &endpoint{
+		URL:       u,
+		Available: true,
+		failFunc:  timedUnavailabilityFunc(failureWait),
+	}
+}
+
+// endpoint is one upstream URL together with its availability state.
+type endpoint struct {
+ // Held by Failed while it checks and clears Available.
+ sync.Mutex
+
+ // URL is the upstream address; requests are redirected to its scheme and host.
+ URL url.URL
+ // Available starts out true (see newEndpoint) and is set to false by Failed.
+ // The failFunc installed by newEndpoint sets it back to true after a delay.
+ Available bool
+
+ // failFunc is invoked by Failed after the endpoint has been marked
+ // unavailable. Failed tolerates a nil failFunc and only logs in that case.
+ failFunc func(ep *endpoint)
+}
+
+// Failed records a failed attempt to use the endpoint. If the endpoint is
+// currently Available it is marked unavailable, that is logged, and failFunc
+// is run (or, when failFunc is nil, a message is logged instead). If the
+// endpoint is already unavailable, Failed returns without doing anything else.
+func (ep *endpoint) Failed() {
+	ep.Lock()
+	alreadyDown := !ep.Available
+	if !alreadyDown {
+		ep.Available = false
+	}
+	ep.Unlock()
+
+	if alreadyDown {
+		return
+	}
+
+	plog.Printf("marked endpoint %s unavailable", ep.URL.String())
+
+	if ep.failFunc != nil {
+		ep.failFunc(ep)
+		return
+	}
+
+	plog.Printf("no failFunc defined, endpoint %s will be unavailable forever.", ep.URL.String())
+}
+
+// timedUnavailabilityFunc returns a failFunc that schedules the endpoint to be
+// marked Available again once wait has elapsed, so that connectivity to it is
+// retested.
+//
+// time.AfterFunc runs the callback in its own goroutine, so the flag is
+// written while holding the endpoint's mutex, the same mutex Failed holds when
+// it reads and clears Available. The log call is made after the mutex has been
+// released.
+func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) {
+	return func(ep *endpoint) {
+		time.AfterFunc(wait, func() {
+			ep.Lock()
+			ep.Available = true
+			ep.Unlock()
+
+			plog.Printf("marked endpoint %s available, to retest connectivity", ep.URL.String())
+		})
+	}
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/httpproxy/doc.go b/vendor/github.com/coreos/etcd/proxy/httpproxy/doc.go
new file mode 100644
index 0000000..7a45099
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/httpproxy/doc.go
@@ -0,0 +1,18 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package httpproxy implements etcd httpproxy. The etcd proxy acts as a reverse
+// http proxy forwarding client requests to active etcd cluster members, and does
+// not participate in consensus.
+package httpproxy
diff --git a/vendor/github.com/coreos/etcd/proxy/httpproxy/metrics.go b/vendor/github.com/coreos/etcd/proxy/httpproxy/metrics.go
new file mode 100644
index 0000000..f71258c
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/httpproxy/metrics.go
@@ -0,0 +1,88 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpproxy
+
+import (
+ "net/http"
+ "strconv"
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// Prometheus collectors for the proxy. All of them use namespace "etcd" and
+// subsystem "proxy"; they are registered in this file's init.
+var (
+ // requestsIncoming counts requests by HTTP method; it is incremented by
+ // reportIncomingRequest.
+ requestsIncoming = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "proxy",
+ Name: "requests_total",
+ Help: "Counter requests incoming by method.",
+ }, []string{"method"})
+
+ // requestsHandled counts requests by HTTP method and response status
+ // code; it is incremented by reportRequestHandled.
+ // NOTE(review): "authoratitave" in the help text is a typo for
+ // "authoritative"; left untouched because the string is emitted at runtime.
+ requestsHandled = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "proxy",
+ Name: "handled_total",
+ Help: "Counter of requests fully handled (by authoratitave servers)",
+ }, []string{"method", "code"})
+
+ // requestsDropped counts requests by HTTP method and forwardingError
+ // value; it is incremented by reportRequestDropped.
+ requestsDropped = prometheus.NewCounterVec(
+ prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "proxy",
+ Name: "dropped_total",
+ Help: "Counter of requests dropped on the proxy.",
+ }, []string{"method", "proxying_error"})
+
+ // requestsHandlingTime records, per HTTP method, the seconds observed by
+ // reportRequestHandled. The buckets are 13 exponential steps starting at
+ // 0.0005 s with a factor of 2.
+ requestsHandlingTime = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Namespace: "etcd",
+ Subsystem: "proxy",
+ Name: "handling_duration_seconds",
+ Help: "Bucketed histogram of handling time of successful events (non-watches), by method " +
+ "(GET/PUT etc.).",
+ Buckets: prometheus.ExponentialBuckets(0.0005, 2, 13),
+ }, []string{"method"})
+)
+
+// forwardingError names the reason a request was dropped by the proxy. Its
+// string value is used as the "proxying_error" label of requestsDropped.
+type forwardingError string
+
+const (
+ // zeroEndpoints: the director had no available endpoint for the request.
+ zeroEndpoints forwardingError = "zero_endpoints"
+ // failedSendingRequest: the RoundTrip to one endpoint returned an error.
+ failedSendingRequest forwardingError = "failed_sending_request"
+ // failedGettingResponse: none of the endpoints tried produced a response.
+ failedGettingResponse forwardingError = "failed_getting_response"
+)
+
+// init registers the four proxy collectors with Prometheus, in declaration
+// order, via prometheus.MustRegister.
+func init() {
+	collectors := []prometheus.Collector{
+		requestsIncoming,
+		requestsHandled,
+		requestsDropped,
+		requestsHandlingTime,
+	}
+	for _, c := range collectors {
+		prometheus.MustRegister(c)
+	}
+}
+
+// reportIncomingRequest increments the incoming-request counter for the
+// request's HTTP method.
+func reportIncomingRequest(request *http.Request) {
+	counter := requestsIncoming.WithLabelValues(request.Method)
+	counter.Inc()
+}
+
+// reportRequestHandled increments the handled-request counter for the
+// request's method and the response's status code, then observes the seconds
+// elapsed since startTime in the handling-time histogram for that method.
+func reportRequestHandled(request *http.Request, response *http.Response, startTime time.Time) {
+	code := strconv.Itoa(response.StatusCode)
+	requestsHandled.WithLabelValues(request.Method, code).Inc()
+
+	elapsed := time.Since(startTime)
+	requestsHandlingTime.WithLabelValues(request.Method).Observe(elapsed.Seconds())
+}
+
+// reportRequestDropped increments the dropped-request counter for the
+// request's HTTP method and the given forwarding error.
+func reportRequestDropped(request *http.Request, err forwardingError) {
+	reason := string(err)
+	requestsDropped.WithLabelValues(request.Method, reason).Inc()
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/httpproxy/proxy.go b/vendor/github.com/coreos/etcd/proxy/httpproxy/proxy.go
new file mode 100644
index 0000000..3cd3161
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/httpproxy/proxy.go
@@ -0,0 +1,116 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpproxy
+
+import (
+ "encoding/json"
+ "net/http"
+ "strings"
+ "time"
+
+ "golang.org/x/net/http2"
+)
+
+const (
+ // DefaultMaxIdleConnsPerHost indicates the default maximum idle connection
+ // count maintained between the proxy and each member. It is set to 128 so
+ // that the proxy can handle 128 concurrent requests smoothly over the long
+ // term. If the number of concurrent requests exceeds this value, the proxy
+ // has to create a new connection for every request beyond it, which is
+ // costly: creating connections consumes resources and may exhaust
+ // ephemeral ports.
+ DefaultMaxIdleConnsPerHost = 128
+)
+
+// GetProxyURLs is a function which should return the current set of URLs to
+// which client requests should be proxied. This function will be queried
+// periodically by the proxy Handler to refresh the set of available
+// backends: once when the handler's director is created and again on every
+// refresh cycle after that.
+type GetProxyURLs func() []string
+
+// NewHandler creates a new HTTP handler which will proxy requests to an etcd
+// cluster, sending them through the given transport.
+// The handler will periodically update its view of the cluster.
+//
+// When t carries a TLS client configuration, HTTP/2 is enabled on it first; a
+// failure to do so is only logged. The path /v2/config/local/proxy is served
+// by the proxy's own config handler, and "/" is handled by the reverse proxy.
+func NewHandler(t *http.Transport, urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) http.Handler {
+	// Enable http2, see Issue 5033.
+	if t.TLSClientConfig != nil {
+		if err := http2.ConfigureTransport(t); err != nil {
+			plog.Infof("Error enabling Transport HTTP/2 support: %v", err)
+		}
+	}
+
+	proxy := &reverseProxy{
+		director:  newDirector(urlsFunc, failureWait, refreshInterval),
+		transport: t,
+	}
+
+	mux := http.NewServeMux()
+	mux.HandleFunc("/v2/config/local/proxy", proxy.configHandler)
+	mux.Handle("/", proxy)
+	return mux
+}
+
+// NewReadonlyHandler wraps the given HTTP handler to allow only GET requests.
+// A request with any other method is answered with 501 Not Implemented and is
+// not passed on to hdlr.
+func NewReadonlyHandler(hdlr http.Handler) http.Handler {
+	return http.HandlerFunc(readonlyHandlerFunc(hdlr))
+}
+
+// readonlyHandlerFunc returns a handler function that passes GET requests to
+// next and answers every other method with 501 Not Implemented and no body.
+func readonlyHandlerFunc(next http.Handler) func(http.ResponseWriter, *http.Request) {
+	return func(w http.ResponseWriter, req *http.Request) {
+		if req.Method == "GET" {
+			next.ServeHTTP(w, req)
+			return
+		}
+
+		w.WriteHeader(http.StatusNotImplemented)
+	}
+}
+
+// configHandler serves the proxy's own configuration endpoint. Only GET is
+// accepted; for any other method allowMethod writes the error response. The
+// reply is a JSON object whose "endpoints" array lists the URLs of the
+// endpoints the director currently reports as available.
+func (p *reverseProxy) configHandler(w http.ResponseWriter, r *http.Request) {
+	if !allowMethod(w, r.Method, "GET") {
+		return
+	}
+
+	eps := p.director.endpoints()
+	epstr := make([]string, len(eps))
+	for i, e := range eps {
+		epstr[i] = e.URL.String()
+	}
+
+	proxyConfig := struct {
+		Endpoints []string `json:"endpoints"`
+	}{
+		Endpoints: epstr,
+	}
+
+	// A failure here means the reply could not be written to the client, so
+	// there is nothing more to send; log it rather than dropping it silently.
+	if err := json.NewEncoder(w).Encode(proxyConfig); err != nil {
+		plog.Debugf("error writing proxy config (%v) to %s", err, r.RemoteAddr)
+	}
+}
+
+// allowMethod reports whether m is one of the allowed methods ms. When it is
+// not, allowMethod sets the Allow header to the comma-joined list of allowed
+// methods and writes a 405 "Method Not Allowed" error to w before returning
+// false. Nothing is written to w when the method is allowed.
+func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
+	for i := range ms {
+		if ms[i] == m {
+			return true
+		}
+	}
+
+	allowed := strings.Join(ms, ",")
+	w.Header().Set("Allow", allowed)
+	http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+	return false
+}
diff --git a/vendor/github.com/coreos/etcd/proxy/httpproxy/reverse.go b/vendor/github.com/coreos/etcd/proxy/httpproxy/reverse.go
new file mode 100644
index 0000000..2ecff3a
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/httpproxy/reverse.go
@@ -0,0 +1,208 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package httpproxy
+
+import (
+ "bytes"
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net"
+ "net/http"
+ "net/url"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "github.com/coreos/etcd/etcdserver/api/v2http/httptypes"
+ "github.com/coreos/pkg/capnslog"
+)
+
+var (
+	// plog is the package logger used throughout httpproxy.
+	plog = capnslog.NewPackageLogger("github.com/coreos/etcd", "proxy/httpproxy")
+
+	// Hop-by-hop headers. removeSingleHopHeaders deletes these from whatever
+	// header set it is given.
+	// http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
+	// This list of headers borrowed from stdlib httputil.ReverseProxy
+	singleHopHeaders = []string{
+		"Connection",
+		"Keep-Alive",
+		"Proxy-Authenticate",
+		"Proxy-Authorization",
+		"Te",       // canonicalized version of "TE"
+		"Trailer",  // the actual header name; RFC 2616 errata 4522 corrects "Trailers"
+		"Trailers", // kept from the original list for backward compatibility
+		"Transfer-Encoding",
+		"Upgrade",
+	}
+)
+
+// removeSingleHopHeaders deletes every header named in singleHopHeaders from
+// hdrs, modifying the header map in place.
+func removeSingleHopHeaders(hdrs *http.Header) {
+	for i := range singleHopHeaders {
+		hdrs.Del(singleHopHeaders[i])
+	}
+}
+
+// reverseProxy is the http.Handler that forwards client requests to the
+// endpoints supplied by its director.
+type reverseProxy struct {
+ director *director // source of the currently available endpoints
+ transport http.RoundTripper // used to send each proxied request to an endpoint
+}
+
+// ServeHTTP forwards clientreq to one of the currently available endpoints and
+// relays the endpoint's response to rw.
+//
+// The request body is read fully into memory so that it can be replayed for
+// every endpoint that is tried. Endpoints are tried in the order the director
+// returns them; one whose RoundTrip fails is marked Failed and the next one is
+// tried. The client receives:
+//   - 500 if the request body cannot be read,
+//   - 503 if the director has no available endpoint,
+//   - 502 if no endpoint produced a response,
+//   - otherwise the endpoint's status code, its headers minus the hop-by-hop
+//     ones, and its body.
+//
+// When rw implements http.CloseNotifier and the client closes the connection,
+// the outgoing request's context is cancelled. If that is noticed right after
+// a RoundTrip returns, ServeHTTP returns without writing a response.
+func (p *reverseProxy) ServeHTTP(rw http.ResponseWriter, clientreq *http.Request) {
+	reportIncomingRequest(clientreq)
+	// Shallow copy: proxyreq gets its own Header map below, but pointer fields
+	// such as URL are still shared with clientreq.
+	proxyreq := new(http.Request)
+	*proxyreq = *clientreq
+	startTime := time.Now()
+
+	var (
+		proxybody []byte
+		err       error
+	)
+
+	// Buffer the whole body so it can be re-sent to each endpoint tried below.
+	if clientreq.Body != nil {
+		proxybody, err = ioutil.ReadAll(clientreq.Body)
+		if err != nil {
+			msg := fmt.Sprintf("failed to read request body: %v", err)
+			plog.Println(msg)
+			e := httptypes.NewHTTPError(http.StatusInternalServerError, "httpproxy: "+msg)
+			if we := e.WriteTo(rw); we != nil {
+				plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
+			}
+			return
+		}
+	}
+
+	// deep-copy the headers, as these will be modified below
+	proxyreq.Header = make(http.Header)
+	copyHeader(proxyreq.Header, clientreq.Header)
+
+	normalizeRequest(proxyreq)
+	removeSingleHopHeaders(&proxyreq.Header)
+	maybeSetForwardedFor(proxyreq)
+
+	endpoints := p.director.endpoints()
+	if len(endpoints) == 0 {
+		msg := "zero endpoints currently available"
+		reportRequestDropped(clientreq, zeroEndpoints)
+
+		// TODO: limit the rate of the error logging.
+		plog.Println(msg)
+		e := httptypes.NewHTTPError(http.StatusServiceUnavailable, "httpproxy: "+msg)
+		if we := e.WriteTo(rw); we != nil {
+			plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
+		}
+		return
+	}
+
+	// requestClosed is set to 1 by the watcher goroutine below when the client
+	// closes the connection. completeCh is buffered so the deferred send that
+	// stops the watcher never blocks.
+	var requestClosed int32
+	completeCh := make(chan bool, 1)
+	closeNotifier, ok := rw.(http.CloseNotifier)
+	ctx, cancel := context.WithCancel(context.Background())
+	proxyreq = proxyreq.WithContext(ctx)
+	defer cancel()
+	if ok {
+		closeCh := closeNotifier.CloseNotify()
+		go func() {
+			select {
+			case <-closeCh:
+				atomic.StoreInt32(&requestClosed, 1)
+				plog.Printf("client %v closed request prematurely", clientreq.RemoteAddr)
+				cancel()
+			case <-completeCh:
+			}
+		}()
+
+		defer func() {
+			completeCh <- true
+		}()
+	}
+
+	var res *http.Response
+
+	for _, ep := range endpoints {
+		// Each attempt needs a fresh reader over the buffered body.
+		if proxybody != nil {
+			proxyreq.Body = ioutil.NopCloser(bytes.NewBuffer(proxybody))
+		}
+		redirectRequest(proxyreq, ep.URL)
+
+		res, err = p.transport.RoundTrip(proxyreq)
+		if atomic.LoadInt32(&requestClosed) == 1 {
+			// The client is gone. RoundTrip may nevertheless have produced a
+			// response; close its body so the upstream connection is released
+			// rather than leaked.
+			if res != nil && res.Body != nil {
+				res.Body.Close()
+			}
+			return
+		}
+		if err != nil {
+			reportRequestDropped(clientreq, failedSendingRequest)
+			plog.Printf("failed to direct request to %s: %v", ep.URL.String(), err)
+			ep.Failed()
+			continue
+		}
+
+		break
+	}
+
+	if res == nil {
+		// TODO: limit the rate of the error logging.
+		msg := fmt.Sprintf("unable to get response from %d endpoint(s)", len(endpoints))
+		reportRequestDropped(clientreq, failedGettingResponse)
+		plog.Println(msg)
+		e := httptypes.NewHTTPError(http.StatusBadGateway, "httpproxy: "+msg)
+		if we := e.WriteTo(rw); we != nil {
+			plog.Debugf("error writing HTTPError (%v) to %s", we, clientreq.RemoteAddr)
+		}
+		return
+	}
+
+	defer res.Body.Close()
+	reportRequestHandled(clientreq, res, startTime)
+	removeSingleHopHeaders(&res.Header)
+	copyHeader(rw.Header(), res.Header)
+
+	rw.WriteHeader(res.StatusCode)
+	// Best effort: the status line is already sent, so a copy error cannot be
+	// reported to the client.
+	io.Copy(rw, res.Body)
+}
+
+// copyHeader appends every value of every header in src to dst using
+// Header.Add, so values already present in dst are kept and src is not
+// modified.
+func copyHeader(dst, src http.Header) {
+	for name, values := range src {
+		for i := range values {
+			dst.Add(name, values[i])
+		}
+	}
+}
+
+// redirectRequest points req at loc by overwriting the scheme and host of
+// req.URL in place; the rest of the URL is left as it was.
+func redirectRequest(req *http.Request, loc url.URL) {
+	target := req.URL
+	target.Scheme, target.Host = loc.Scheme, loc.Host
+}
+
+// normalizeRequest rewrites req so that it is sent as a persistent HTTP/1.1
+// request: the protocol fields are set to 1.1 and Close is cleared.
+func normalizeRequest(req *http.Request) {
+	req.Proto, req.ProtoMajor, req.ProtoMinor = "HTTP/1.1", 1, 1
+	req.Close = false
+}
+
+// maybeSetForwardedFor records the client's IP, taken from req.RemoteAddr, in
+// the X-Forwarded-For header. If RemoteAddr cannot be split into host and
+// port, the request is left untouched.
+func maybeSetForwardedFor(req *http.Request) {
+	clientIP, _, err := net.SplitHostPort(req.RemoteAddr)
+	if err != nil {
+		return
+	}
+
+	forwarded := clientIP
+	// If we aren't the first proxy, keep the earlier X-Forwarded-For values:
+	// fold them into one comma+space separated list and append our client.
+	if prior, ok := req.Header["X-Forwarded-For"]; ok {
+		forwarded = strings.Join(prior, ", ") + ", " + clientIP
+	}
+
+	req.Header.Set("X-Forwarded-For", forwarded)
+}