[VOL-2235] Mocks and interfaces for rw-core

This update consists of mocks that are used by the rw-core
during unit testing.  It also includes interfaces used for unit
tests.

Change-Id: I20ca1455c358113c3aa897acc6355e0ddbc614b7
diff --git a/vendor/go.etcd.io/etcd/client/client.go b/vendor/go.etcd.io/etcd/client/client.go
new file mode 100644
index 0000000..de9ab79
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/client/client.go
@@ -0,0 +1,710 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package client
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"math/rand"
+	"net"
+	"net/http"
+	"net/url"
+	"sort"
+	"strconv"
+	"sync"
+	"time"
+
+	"go.etcd.io/etcd/version"
+)
+
+var (
+	ErrNoEndpoints           = errors.New("client: no endpoints available")
+	ErrTooManyRedirects      = errors.New("client: too many redirects")
+	ErrClusterUnavailable    = errors.New("client: etcd cluster is unavailable or misconfigured")
+	ErrNoLeaderEndpoint      = errors.New("client: no leader endpoint available")
+	errTooManyRedirectChecks = errors.New("client: too many redirect checks")
+
+	// oneShotCtxValue is set on a context using WithValue(&oneShotValue) so
+	// that Do() will not retry a request
+	oneShotCtxValue interface{}
+)
+
+var DefaultRequestTimeout = 5 * time.Second
+
+var DefaultTransport CancelableTransport = &http.Transport{
+	Proxy: http.ProxyFromEnvironment,
+	Dial: (&net.Dialer{
+		Timeout:   30 * time.Second,
+		KeepAlive: 30 * time.Second,
+	}).Dial,
+	TLSHandshakeTimeout: 10 * time.Second,
+}
+
+type EndpointSelectionMode int
+
+const (
+	// EndpointSelectionRandom is the default value of the 'SelectionMode'.
+	// As the name implies, the client object will pick a node from the members
+	// of the cluster in a random fashion. If the cluster has three members, A, B,
+	// and C, the client picks any node from its three members as its request
+	// destination.
+	EndpointSelectionRandom EndpointSelectionMode = iota
+
+	// If 'SelectionMode' is set to 'EndpointSelectionPrioritizeLeader',
+	// requests are sent directly to the cluster leader. This reduces
+	// forwarding roundtrips compared to making requests to etcd followers
+	// who then forward them to the cluster leader. In the event of a leader
+	// failure, however, clients configured this way cannot prioritize among
+	// the remaining etcd followers. Therefore, when a client sets 'SelectionMode'
+	// to 'EndpointSelectionPrioritizeLeader', it must use 'client.AutoSync()' to
+	// maintain its knowledge of current cluster state.
+	//
+	// This mode should be used with Client.AutoSync().
+	EndpointSelectionPrioritizeLeader
+)
+
+type Config struct {
+	// Endpoints defines a set of URLs (schemes, hosts and ports only)
+	// that can be used to communicate with a logical etcd cluster. For
+	// example, a three-node cluster could be provided like so:
+	//
+	// 	Endpoints: []string{
+	//		"http://node1.example.com:2379",
+	//		"http://node2.example.com:2379",
+	//		"http://node3.example.com:2379",
+	//	}
+	//
+	// If multiple endpoints are provided, the Client will attempt to
+	// use them all in the event that one or more of them are unusable.
+	//
+	// If Client.Sync is ever called, the Client may cache an alternate
+	// set of endpoints to continue operation.
+	Endpoints []string
+
+	// Transport is used by the Client to drive HTTP requests. If not
+	// provided, DefaultTransport will be used.
+	Transport CancelableTransport
+
+	// CheckRedirect specifies the policy for handling HTTP redirects.
+	// If CheckRedirect is not nil, the Client calls it before
+	// following an HTTP redirect. The sole argument is the number of
+	// requests that have already been made. If CheckRedirect returns
+	// an error, Client.Do will not make any further requests and return
+	// the error back it to the caller.
+	//
+	// If CheckRedirect is nil, the Client uses its default policy,
+	// which is to stop after 10 consecutive requests.
+	CheckRedirect CheckRedirectFunc
+
+	// Username specifies the user credential to add as an authorization header
+	Username string
+
+	// Password is the password for the specified user to add as an authorization header
+	// to the request.
+	Password string
+
+	// HeaderTimeoutPerRequest specifies the time limit to wait for response
+	// header in a single request made by the Client. The timeout includes
+	// connection time, any redirects, and header wait time.
+	//
+	// For non-watch GET request, server returns the response body immediately.
+	// For PUT/POST/DELETE request, server will attempt to commit request
+	// before responding, which is expected to take `100ms + 2 * RTT`.
+	// For watch request, server returns the header immediately to notify Client
+	// watch start. But if server is behind some kind of proxy, the response
+	// header may be cached at proxy, and Client cannot rely on this behavior.
+	//
+	// Especially, wait request will ignore this timeout.
+	//
+	// One API call may send multiple requests to different etcd servers until it
+	// succeeds. Use context of the API to specify the overall timeout.
+	//
+	// A HeaderTimeoutPerRequest of zero means no timeout.
+	HeaderTimeoutPerRequest time.Duration
+
+	// SelectionMode is an EndpointSelectionMode enum that specifies the
+	// policy for choosing the etcd cluster node to which requests are sent.
+	SelectionMode EndpointSelectionMode
+}
+
+func (cfg *Config) transport() CancelableTransport {
+	if cfg.Transport == nil {
+		return DefaultTransport
+	}
+	return cfg.Transport
+}
+
+func (cfg *Config) checkRedirect() CheckRedirectFunc {
+	if cfg.CheckRedirect == nil {
+		return DefaultCheckRedirect
+	}
+	return cfg.CheckRedirect
+}
+
+// CancelableTransport mimics net/http.Transport, but requires that
+// the object also support request cancellation.
+type CancelableTransport interface {
+	http.RoundTripper
+	CancelRequest(req *http.Request)
+}
+
+type CheckRedirectFunc func(via int) error
+
+// DefaultCheckRedirect follows up to 10 redirects, but no more.
+var DefaultCheckRedirect CheckRedirectFunc = func(via int) error {
+	if via > 10 {
+		return ErrTooManyRedirects
+	}
+	return nil
+}
+
+type Client interface {
+	// Sync updates the internal cache of the etcd cluster's membership.
+	Sync(context.Context) error
+
+	// AutoSync periodically calls Sync() every given interval.
+	// The recommended sync interval is 10 seconds to 1 minute, which does
+	// not bring too much overhead to server and makes client catch up the
+	// cluster change in time.
+	//
+	// The example to use it:
+	//
+	//  for {
+	//      err := client.AutoSync(ctx, 10*time.Second)
+	//      if err == context.DeadlineExceeded || err == context.Canceled {
+	//          break
+	//      }
+	//      log.Print(err)
+	//  }
+	AutoSync(context.Context, time.Duration) error
+
+	// Endpoints returns a copy of the current set of API endpoints used
+	// by Client to resolve HTTP requests. If Sync has ever been called,
+	// this may differ from the initial Endpoints provided in the Config.
+	Endpoints() []string
+
+	// SetEndpoints sets the set of API endpoints used by Client to resolve
+	// HTTP requests. If the given endpoints are not valid, an error will be
+	// returned
+	SetEndpoints(eps []string) error
+
+	// GetVersion retrieves the current etcd server and cluster version
+	GetVersion(ctx context.Context) (*version.Versions, error)
+
+	httpClient
+}
+
+func New(cfg Config) (Client, error) {
+	c := &httpClusterClient{
+		clientFactory: newHTTPClientFactory(cfg.transport(), cfg.checkRedirect(), cfg.HeaderTimeoutPerRequest),
+		rand:          rand.New(rand.NewSource(int64(time.Now().Nanosecond()))),
+		selectionMode: cfg.SelectionMode,
+	}
+	if cfg.Username != "" {
+		c.credentials = &credentials{
+			username: cfg.Username,
+			password: cfg.Password,
+		}
+	}
+	if err := c.SetEndpoints(cfg.Endpoints); err != nil {
+		return nil, err
+	}
+	return c, nil
+}
+
+type httpClient interface {
+	Do(context.Context, httpAction) (*http.Response, []byte, error)
+}
+
+func newHTTPClientFactory(tr CancelableTransport, cr CheckRedirectFunc, headerTimeout time.Duration) httpClientFactory {
+	return func(ep url.URL) httpClient {
+		return &redirectFollowingHTTPClient{
+			checkRedirect: cr,
+			client: &simpleHTTPClient{
+				transport:     tr,
+				endpoint:      ep,
+				headerTimeout: headerTimeout,
+			},
+		}
+	}
+}
+
+type credentials struct {
+	username string
+	password string
+}
+
+type httpClientFactory func(url.URL) httpClient
+
+type httpAction interface {
+	HTTPRequest(url.URL) *http.Request
+}
+
+type httpClusterClient struct {
+	clientFactory httpClientFactory
+	endpoints     []url.URL
+	pinned        int
+	credentials   *credentials
+	sync.RWMutex
+	rand          *rand.Rand
+	selectionMode EndpointSelectionMode
+}
+
+func (c *httpClusterClient) getLeaderEndpoint(ctx context.Context, eps []url.URL) (string, error) {
+	ceps := make([]url.URL, len(eps))
+	copy(ceps, eps)
+
+	// To perform a lookup on the new endpoint list without using the current
+	// client, we'll copy it
+	clientCopy := &httpClusterClient{
+		clientFactory: c.clientFactory,
+		credentials:   c.credentials,
+		rand:          c.rand,
+
+		pinned:    0,
+		endpoints: ceps,
+	}
+
+	mAPI := NewMembersAPI(clientCopy)
+	leader, err := mAPI.Leader(ctx)
+	if err != nil {
+		return "", err
+	}
+	if len(leader.ClientURLs) == 0 {
+		return "", ErrNoLeaderEndpoint
+	}
+
+	return leader.ClientURLs[0], nil // TODO: how to handle multiple client URLs?
+}
+
+func (c *httpClusterClient) parseEndpoints(eps []string) ([]url.URL, error) {
+	if len(eps) == 0 {
+		return []url.URL{}, ErrNoEndpoints
+	}
+
+	neps := make([]url.URL, len(eps))
+	for i, ep := range eps {
+		u, err := url.Parse(ep)
+		if err != nil {
+			return []url.URL{}, err
+		}
+		neps[i] = *u
+	}
+	return neps, nil
+}
+
+func (c *httpClusterClient) SetEndpoints(eps []string) error {
+	neps, err := c.parseEndpoints(eps)
+	if err != nil {
+		return err
+	}
+
+	c.Lock()
+	defer c.Unlock()
+
+	c.endpoints = shuffleEndpoints(c.rand, neps)
+	// We're not doing anything for PrioritizeLeader here. This is
+	// due to not having a context meaning we can't call getLeaderEndpoint
+	// However, if you're using PrioritizeLeader, you've already been told
+	// to regularly call sync, where we do have a ctx, and can figure the
+	// leader. PrioritizeLeader is also quite a loose guarantee, so deal
+	// with it
+	c.pinned = 0
+
+	return nil
+}
+
+func (c *httpClusterClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
+	action := act
+	c.RLock()
+	leps := len(c.endpoints)
+	eps := make([]url.URL, leps)
+	n := copy(eps, c.endpoints)
+	pinned := c.pinned
+
+	if c.credentials != nil {
+		action = &authedAction{
+			act:         act,
+			credentials: *c.credentials,
+		}
+	}
+	c.RUnlock()
+
+	if leps == 0 {
+		return nil, nil, ErrNoEndpoints
+	}
+
+	if leps != n {
+		return nil, nil, errors.New("unable to pick endpoint: copy failed")
+	}
+
+	var resp *http.Response
+	var body []byte
+	var err error
+	cerr := &ClusterError{}
+	isOneShot := ctx.Value(&oneShotCtxValue) != nil
+
+	for i := pinned; i < leps+pinned; i++ {
+		k := i % leps
+		hc := c.clientFactory(eps[k])
+		resp, body, err = hc.Do(ctx, action)
+		if err != nil {
+			cerr.Errors = append(cerr.Errors, err)
+			if err == ctx.Err() {
+				return nil, nil, ctx.Err()
+			}
+			if err == context.Canceled || err == context.DeadlineExceeded {
+				return nil, nil, err
+			}
+		} else if resp.StatusCode/100 == 5 {
+			switch resp.StatusCode {
+			case http.StatusInternalServerError, http.StatusServiceUnavailable:
+				// TODO: make sure this is a no leader response
+				cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s has no leader", eps[k].String()))
+			default:
+				cerr.Errors = append(cerr.Errors, fmt.Errorf("client: etcd member %s returns server error [%s]", eps[k].String(), http.StatusText(resp.StatusCode)))
+			}
+			err = cerr.Errors[0]
+		}
+		if err != nil {
+			if !isOneShot {
+				continue
+			}
+			c.Lock()
+			c.pinned = (k + 1) % leps
+			c.Unlock()
+			return nil, nil, err
+		}
+		if k != pinned {
+			c.Lock()
+			c.pinned = k
+			c.Unlock()
+		}
+		return resp, body, nil
+	}
+
+	return nil, nil, cerr
+}
+
+func (c *httpClusterClient) Endpoints() []string {
+	c.RLock()
+	defer c.RUnlock()
+
+	eps := make([]string, len(c.endpoints))
+	for i, ep := range c.endpoints {
+		eps[i] = ep.String()
+	}
+
+	return eps
+}
+
+func (c *httpClusterClient) Sync(ctx context.Context) error {
+	mAPI := NewMembersAPI(c)
+	ms, err := mAPI.List(ctx)
+	if err != nil {
+		return err
+	}
+
+	var eps []string
+	for _, m := range ms {
+		eps = append(eps, m.ClientURLs...)
+	}
+
+	neps, err := c.parseEndpoints(eps)
+	if err != nil {
+		return err
+	}
+
+	npin := 0
+
+	switch c.selectionMode {
+	case EndpointSelectionRandom:
+		c.RLock()
+		eq := endpointsEqual(c.endpoints, neps)
+		c.RUnlock()
+
+		if eq {
+			return nil
+		}
+		// When items in the endpoint list changes, we choose a new pin
+		neps = shuffleEndpoints(c.rand, neps)
+	case EndpointSelectionPrioritizeLeader:
+		nle, err := c.getLeaderEndpoint(ctx, neps)
+		if err != nil {
+			return ErrNoLeaderEndpoint
+		}
+
+		for i, n := range neps {
+			if n.String() == nle {
+				npin = i
+				break
+			}
+		}
+	default:
+		return fmt.Errorf("invalid endpoint selection mode: %d", c.selectionMode)
+	}
+
+	c.Lock()
+	defer c.Unlock()
+	c.endpoints = neps
+	c.pinned = npin
+
+	return nil
+}
+
+func (c *httpClusterClient) AutoSync(ctx context.Context, interval time.Duration) error {
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+	for {
+		err := c.Sync(ctx)
+		if err != nil {
+			return err
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-ticker.C:
+		}
+	}
+}
+
+func (c *httpClusterClient) GetVersion(ctx context.Context) (*version.Versions, error) {
+	act := &getAction{Prefix: "/version"}
+
+	resp, body, err := c.Do(ctx, act)
+	if err != nil {
+		return nil, err
+	}
+
+	switch resp.StatusCode {
+	case http.StatusOK:
+		if len(body) == 0 {
+			return nil, ErrEmptyBody
+		}
+		var vresp version.Versions
+		if err := json.Unmarshal(body, &vresp); err != nil {
+			return nil, ErrInvalidJSON
+		}
+		return &vresp, nil
+	default:
+		var etcdErr Error
+		if err := json.Unmarshal(body, &etcdErr); err != nil {
+			return nil, ErrInvalidJSON
+		}
+		return nil, etcdErr
+	}
+}
+
+type roundTripResponse struct {
+	resp *http.Response
+	err  error
+}
+
+type simpleHTTPClient struct {
+	transport     CancelableTransport
+	endpoint      url.URL
+	headerTimeout time.Duration
+}
+
+func (c *simpleHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
+	req := act.HTTPRequest(c.endpoint)
+
+	if err := printcURL(req); err != nil {
+		return nil, nil, err
+	}
+
+	isWait := false
+	if req != nil && req.URL != nil {
+		ws := req.URL.Query().Get("wait")
+		if len(ws) != 0 {
+			var err error
+			isWait, err = strconv.ParseBool(ws)
+			if err != nil {
+				return nil, nil, fmt.Errorf("wrong wait value %s (%v for %+v)", ws, err, req)
+			}
+		}
+	}
+
+	var hctx context.Context
+	var hcancel context.CancelFunc
+	if !isWait && c.headerTimeout > 0 {
+		hctx, hcancel = context.WithTimeout(ctx, c.headerTimeout)
+	} else {
+		hctx, hcancel = context.WithCancel(ctx)
+	}
+	defer hcancel()
+
+	reqcancel := requestCanceler(c.transport, req)
+
+	rtchan := make(chan roundTripResponse, 1)
+	go func() {
+		resp, err := c.transport.RoundTrip(req)
+		rtchan <- roundTripResponse{resp: resp, err: err}
+		close(rtchan)
+	}()
+
+	var resp *http.Response
+	var err error
+
+	select {
+	case rtresp := <-rtchan:
+		resp, err = rtresp.resp, rtresp.err
+	case <-hctx.Done():
+		// cancel and wait for request to actually exit before continuing
+		reqcancel()
+		rtresp := <-rtchan
+		resp = rtresp.resp
+		switch {
+		case ctx.Err() != nil:
+			err = ctx.Err()
+		case hctx.Err() != nil:
+			err = fmt.Errorf("client: endpoint %s exceeded header timeout", c.endpoint.String())
+		default:
+			panic("failed to get error from context")
+		}
+	}
+
+	// always check for resp nil-ness to deal with possible
+	// race conditions between channels above
+	defer func() {
+		if resp != nil {
+			resp.Body.Close()
+		}
+	}()
+
+	if err != nil {
+		return nil, nil, err
+	}
+
+	var body []byte
+	done := make(chan struct{})
+	go func() {
+		body, err = ioutil.ReadAll(resp.Body)
+		done <- struct{}{}
+	}()
+
+	select {
+	case <-ctx.Done():
+		resp.Body.Close()
+		<-done
+		return nil, nil, ctx.Err()
+	case <-done:
+	}
+
+	return resp, body, err
+}
+
+type authedAction struct {
+	act         httpAction
+	credentials credentials
+}
+
+func (a *authedAction) HTTPRequest(url url.URL) *http.Request {
+	r := a.act.HTTPRequest(url)
+	r.SetBasicAuth(a.credentials.username, a.credentials.password)
+	return r
+}
+
+type redirectFollowingHTTPClient struct {
+	client        httpClient
+	checkRedirect CheckRedirectFunc
+}
+
+func (r *redirectFollowingHTTPClient) Do(ctx context.Context, act httpAction) (*http.Response, []byte, error) {
+	next := act
+	for i := 0; i < 100; i++ {
+		if i > 0 {
+			if err := r.checkRedirect(i); err != nil {
+				return nil, nil, err
+			}
+		}
+		resp, body, err := r.client.Do(ctx, next)
+		if err != nil {
+			return nil, nil, err
+		}
+		if resp.StatusCode/100 == 3 {
+			hdr := resp.Header.Get("Location")
+			if hdr == "" {
+				return nil, nil, fmt.Errorf("location header not set")
+			}
+			loc, err := url.Parse(hdr)
+			if err != nil {
+				return nil, nil, fmt.Errorf("location header not valid URL: %s", hdr)
+			}
+			next = &redirectedHTTPAction{
+				action:   act,
+				location: *loc,
+			}
+			continue
+		}
+		return resp, body, nil
+	}
+
+	return nil, nil, errTooManyRedirectChecks
+}
+
+type redirectedHTTPAction struct {
+	action   httpAction
+	location url.URL
+}
+
+func (r *redirectedHTTPAction) HTTPRequest(ep url.URL) *http.Request {
+	orig := r.action.HTTPRequest(ep)
+	orig.URL = &r.location
+	return orig
+}
+
+func shuffleEndpoints(r *rand.Rand, eps []url.URL) []url.URL {
+	// copied from Go 1.9<= rand.Rand.Perm
+	n := len(eps)
+	p := make([]int, n)
+	for i := 0; i < n; i++ {
+		j := r.Intn(i + 1)
+		p[i] = p[j]
+		p[j] = i
+	}
+	neps := make([]url.URL, n)
+	for i, k := range p {
+		neps[i] = eps[k]
+	}
+	return neps
+}
+
+func endpointsEqual(left, right []url.URL) bool {
+	if len(left) != len(right) {
+		return false
+	}
+
+	sLeft := make([]string, len(left))
+	sRight := make([]string, len(right))
+	for i, l := range left {
+		sLeft[i] = l.String()
+	}
+	for i, r := range right {
+		sRight[i] = r.String()
+	}
+
+	sort.Strings(sLeft)
+	sort.Strings(sRight)
+	for i := range sLeft {
+		if sLeft[i] != sRight[i] {
+			return false
+		}
+	}
+	return true
+}