Dependencies for the affinity router and the
affinity routing daemon.

Change-Id: Icda72c3594ef7f8f0bc0c33dc03087a4c25529ca
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch.go
new file mode 100644
index 0000000..603095f
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch.go
@@ -0,0 +1,298 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"context"
+	"sync"
+
+	"github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc"
+	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/metadata"
+)
+
+type watchProxy struct {
+	cw  clientv3.Watcher
+	ctx context.Context
+
+	leader *leader
+
+	ranges *watchRanges
+
+	// mu protects adding outstanding watch servers through wg.
+	mu sync.Mutex
+
+	// wg waits until all outstanding watch servers quit.
+	wg sync.WaitGroup
+
+	// kv is used for permission checking
+	kv clientv3.KV
+}
+
+func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
+	cctx, cancel := context.WithCancel(c.Ctx())
+	wp := &watchProxy{
+		cw:     c.Watcher,
+		ctx:    cctx,
+		leader: newLeader(c.Ctx(), c.Watcher),
+
+		kv: c.KV, // for permission checking
+	}
+	wp.ranges = newWatchRanges(wp)
+	ch := make(chan struct{})
+	go func() {
+		defer close(ch)
+		<-wp.leader.stopNotify()
+		wp.mu.Lock()
+		select {
+		case <-wp.ctx.Done():
+		case <-wp.leader.disconnectNotify():
+			cancel()
+		}
+		<-wp.ctx.Done()
+		wp.mu.Unlock()
+		wp.wg.Wait()
+		wp.ranges.stop()
+	}()
+	return wp, ch
+}
+
+func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
+	wp.mu.Lock()
+	select {
+	case <-wp.ctx.Done():
+		wp.mu.Unlock()
+		select {
+		case <-wp.leader.disconnectNotify():
+			return grpc.ErrClientConnClosing
+		default:
+			return wp.ctx.Err()
+		}
+	default:
+		wp.wg.Add(1)
+	}
+	wp.mu.Unlock()
+
+	ctx, cancel := context.WithCancel(stream.Context())
+	wps := &watchProxyStream{
+		ranges:   wp.ranges,
+		watchers: make(map[int64]*watcher),
+		stream:   stream,
+		watchCh:  make(chan *pb.WatchResponse, 1024),
+		ctx:      ctx,
+		cancel:   cancel,
+		kv:       wp.kv,
+	}
+
+	var lostLeaderC <-chan struct{}
+	if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
+		v := md[rpctypes.MetadataRequireLeaderKey]
+		if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
+			lostLeaderC = wp.leader.lostNotify()
+			// if leader is known to be lost at creation time, avoid
+			// letting events through at all
+			select {
+			case <-lostLeaderC:
+				wp.wg.Done()
+				return rpctypes.ErrNoLeader
+			default:
+			}
+		}
+	}
+
+	// post to stopc => terminate server stream; can't use a waitgroup
+	// since all goroutines will only terminate after Watch() exits.
+	stopc := make(chan struct{}, 3)
+	go func() {
+		defer func() { stopc <- struct{}{} }()
+		wps.recvLoop()
+	}()
+	go func() {
+		defer func() { stopc <- struct{}{} }()
+		wps.sendLoop()
+	}()
+	// tear down watch if leader goes down or entire watch proxy is terminated
+	go func() {
+		defer func() { stopc <- struct{}{} }()
+		select {
+		case <-lostLeaderC:
+		case <-ctx.Done():
+		case <-wp.ctx.Done():
+		}
+	}()
+
+	<-stopc
+	cancel()
+
+	// recv/send may only shutdown after function exits;
+	// goroutine notifies proxy that stream is through
+	go func() {
+		<-stopc
+		<-stopc
+		wps.close()
+		wp.wg.Done()
+	}()
+
+	select {
+	case <-lostLeaderC:
+		return rpctypes.ErrNoLeader
+	case <-wp.leader.disconnectNotify():
+		return grpc.ErrClientConnClosing
+	default:
+		return wps.ctx.Err()
+	}
+}
+
+// watchProxyStream forwards etcd watch events to a proxied client stream.
+type watchProxyStream struct {
+	ranges *watchRanges
+
+	// mu protects watchers and nextWatcherID
+	mu sync.Mutex
+	// watchers receive events from watch broadcast.
+	watchers map[int64]*watcher
+	// nextWatcherID is the id to assign the next watcher on this stream.
+	nextWatcherID int64
+
+	stream pb.Watch_WatchServer
+
+	// watchCh receives watch responses from the watchers.
+	watchCh chan *pb.WatchResponse
+
+	ctx    context.Context
+	cancel context.CancelFunc
+
+	// kv is used for permission checking
+	kv clientv3.KV
+}
+
+func (wps *watchProxyStream) close() {
+	var wg sync.WaitGroup
+	wps.cancel()
+	wps.mu.Lock()
+	wg.Add(len(wps.watchers))
+	for _, wpsw := range wps.watchers {
+		go func(w *watcher) {
+			wps.ranges.delete(w)
+			wg.Done()
+		}(wpsw)
+	}
+	wps.watchers = nil
+	wps.mu.Unlock()
+
+	wg.Wait()
+
+	close(wps.watchCh)
+}
+
+func (wps *watchProxyStream) checkPermissionForWatch(key, rangeEnd []byte) error {
+	if len(key) == 0 {
+		// If the length of the key is 0, we need to obtain full range.
+		// look at clientv3.WithPrefix()
+		key = []byte{0}
+		rangeEnd = []byte{0}
+	}
+	req := &pb.RangeRequest{
+		Serializable: true,
+		Key:          key,
+		RangeEnd:     rangeEnd,
+		CountOnly:    true,
+		Limit:        1,
+	}
+	_, err := wps.kv.Do(wps.ctx, RangeRequestToOp(req))
+	return err
+}
+
+func (wps *watchProxyStream) recvLoop() error {
+	for {
+		req, err := wps.stream.Recv()
+		if err != nil {
+			return err
+		}
+		switch uv := req.RequestUnion.(type) {
+		case *pb.WatchRequest_CreateRequest:
+			cr := uv.CreateRequest
+
+			if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied {
+				// Return WatchResponse which is caused by permission checking if and only if
+				// the error is permission denied. For other errors (e.g. timeout or connection closed),
+				// the permission checking mechanism should do nothing for preserving error code.
+				wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true}
+				continue
+			}
+
+			w := &watcher{
+				wr:  watchRange{string(cr.Key), string(cr.RangeEnd)},
+				id:  wps.nextWatcherID,
+				wps: wps,
+
+				nextrev:  cr.StartRevision,
+				progress: cr.ProgressNotify,
+				prevKV:   cr.PrevKv,
+				filters:  v3rpc.FiltersFromRequest(cr),
+			}
+			if !w.wr.valid() {
+				w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
+				continue
+			}
+			wps.nextWatcherID++
+			w.nextrev = cr.StartRevision
+			wps.watchers[w.id] = w
+			wps.ranges.add(w)
+		case *pb.WatchRequest_CancelRequest:
+			wps.delete(uv.CancelRequest.WatchId)
+		default:
+			panic("not implemented")
+		}
+	}
+}
+
+func (wps *watchProxyStream) sendLoop() {
+	for {
+		select {
+		case wresp, ok := <-wps.watchCh:
+			if !ok {
+				return
+			}
+			if err := wps.stream.Send(wresp); err != nil {
+				return
+			}
+		case <-wps.ctx.Done():
+			return
+		}
+	}
+}
+
+func (wps *watchProxyStream) delete(id int64) {
+	wps.mu.Lock()
+	defer wps.mu.Unlock()
+
+	w, ok := wps.watchers[id]
+	if !ok {
+		return
+	}
+	wps.ranges.delete(w)
+	delete(wps.watchers, id)
+	resp := &pb.WatchResponse{
+		Header:   &w.lastHeader,
+		WatchId:  id,
+		Canceled: true,
+	}
+	wps.watchCh <- resp
+}