Dependencies for the affinity router and the
affinity routing daemon.
Change-Id: Icda72c3594ef7f8f0bc0c33dc03087a4c25529ca
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch.go
new file mode 100644
index 0000000..603095f
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch.go
@@ -0,0 +1,298 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "context"
+ "sync"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc"
+ "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/metadata"
+)
+
+type watchProxy struct {
+ cw clientv3.Watcher
+ ctx context.Context
+
+ leader *leader
+
+ ranges *watchRanges
+
+ // mu protects adding outstanding watch servers through wg.
+ mu sync.Mutex
+
+ // wg waits until all outstanding watch servers quit.
+ wg sync.WaitGroup
+
+ // kv is used for permission checking
+ kv clientv3.KV
+}
+
+func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) {
+ cctx, cancel := context.WithCancel(c.Ctx())
+ wp := &watchProxy{
+ cw: c.Watcher,
+ ctx: cctx,
+ leader: newLeader(c.Ctx(), c.Watcher),
+
+ kv: c.KV, // for permission checking
+ }
+ wp.ranges = newWatchRanges(wp)
+ ch := make(chan struct{})
+ go func() {
+ defer close(ch)
+ <-wp.leader.stopNotify()
+ wp.mu.Lock()
+ select {
+ case <-wp.ctx.Done():
+ case <-wp.leader.disconnectNotify():
+ cancel()
+ }
+ <-wp.ctx.Done()
+ wp.mu.Unlock()
+ wp.wg.Wait()
+ wp.ranges.stop()
+ }()
+ return wp, ch
+}
+
+func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) {
+ wp.mu.Lock()
+ select {
+ case <-wp.ctx.Done():
+ wp.mu.Unlock()
+ select {
+ case <-wp.leader.disconnectNotify():
+ return grpc.ErrClientConnClosing
+ default:
+ return wp.ctx.Err()
+ }
+ default:
+ wp.wg.Add(1)
+ }
+ wp.mu.Unlock()
+
+ ctx, cancel := context.WithCancel(stream.Context())
+ wps := &watchProxyStream{
+ ranges: wp.ranges,
+ watchers: make(map[int64]*watcher),
+ stream: stream,
+ watchCh: make(chan *pb.WatchResponse, 1024),
+ ctx: ctx,
+ cancel: cancel,
+ kv: wp.kv,
+ }
+
+ var lostLeaderC <-chan struct{}
+ if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
+ v := md[rpctypes.MetadataRequireLeaderKey]
+ if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
+ lostLeaderC = wp.leader.lostNotify()
+ // if leader is known to be lost at creation time, avoid
+ // letting events through at all
+ select {
+ case <-lostLeaderC:
+ wp.wg.Done()
+ return rpctypes.ErrNoLeader
+ default:
+ }
+ }
+ }
+
+ // post to stopc => terminate server stream; can't use a waitgroup
+ // since all goroutines will only terminate after Watch() exits.
+ stopc := make(chan struct{}, 3)
+ go func() {
+ defer func() { stopc <- struct{}{} }()
+ wps.recvLoop()
+ }()
+ go func() {
+ defer func() { stopc <- struct{}{} }()
+ wps.sendLoop()
+ }()
+ // tear down watch if leader goes down or entire watch proxy is terminated
+ go func() {
+ defer func() { stopc <- struct{}{} }()
+ select {
+ case <-lostLeaderC:
+ case <-ctx.Done():
+ case <-wp.ctx.Done():
+ }
+ }()
+
+ <-stopc
+ cancel()
+
+ // recv/send may only shutdown after function exits;
+ // goroutine notifies proxy that stream is through
+ go func() {
+ <-stopc
+ <-stopc
+ wps.close()
+ wp.wg.Done()
+ }()
+
+ select {
+ case <-lostLeaderC:
+ return rpctypes.ErrNoLeader
+ case <-wp.leader.disconnectNotify():
+ return grpc.ErrClientConnClosing
+ default:
+ return wps.ctx.Err()
+ }
+}
+
+// watchProxyStream forwards etcd watch events to a proxied client stream.
+type watchProxyStream struct {
+ ranges *watchRanges
+
+ // mu protects watchers and nextWatcherID
+ mu sync.Mutex
+ // watchers receive events from watch broadcast.
+ watchers map[int64]*watcher
+ // nextWatcherID is the id to assign the next watcher on this stream.
+ nextWatcherID int64
+
+ stream pb.Watch_WatchServer
+
+ // watchCh receives watch responses from the watchers.
+ watchCh chan *pb.WatchResponse
+
+ ctx context.Context
+ cancel context.CancelFunc
+
+ // kv is used for permission checking
+ kv clientv3.KV
+}
+
+func (wps *watchProxyStream) close() {
+ var wg sync.WaitGroup
+ wps.cancel()
+ wps.mu.Lock()
+ wg.Add(len(wps.watchers))
+ for _, wpsw := range wps.watchers {
+ go func(w *watcher) {
+ wps.ranges.delete(w)
+ wg.Done()
+ }(wpsw)
+ }
+ wps.watchers = nil
+ wps.mu.Unlock()
+
+ wg.Wait()
+
+ close(wps.watchCh)
+}
+
+func (wps *watchProxyStream) checkPermissionForWatch(key, rangeEnd []byte) error {
+ if len(key) == 0 {
+ // If the length of the key is 0, we need to obtain full range.
+ // look at clientv3.WithPrefix()
+ key = []byte{0}
+ rangeEnd = []byte{0}
+ }
+ req := &pb.RangeRequest{
+ Serializable: true,
+ Key: key,
+ RangeEnd: rangeEnd,
+ CountOnly: true,
+ Limit: 1,
+ }
+ _, err := wps.kv.Do(wps.ctx, RangeRequestToOp(req))
+ return err
+}
+
+func (wps *watchProxyStream) recvLoop() error {
+ for {
+ req, err := wps.stream.Recv()
+ if err != nil {
+ return err
+ }
+ switch uv := req.RequestUnion.(type) {
+ case *pb.WatchRequest_CreateRequest:
+ cr := uv.CreateRequest
+
+ if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied {
+ // Return WatchResponse which is caused by permission checking if and only if
+ // the error is permission denied. For other errors (e.g. timeout or connection closed),
+ // the permission checking mechanism should do nothing for preserving error code.
+ wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true}
+ continue
+ }
+
+ w := &watcher{
+ wr: watchRange{string(cr.Key), string(cr.RangeEnd)},
+ id: wps.nextWatcherID,
+ wps: wps,
+
+ nextrev: cr.StartRevision,
+ progress: cr.ProgressNotify,
+ prevKV: cr.PrevKv,
+ filters: v3rpc.FiltersFromRequest(cr),
+ }
+ if !w.wr.valid() {
+ w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true})
+ continue
+ }
+ wps.nextWatcherID++
+ w.nextrev = cr.StartRevision
+ wps.watchers[w.id] = w
+ wps.ranges.add(w)
+ case *pb.WatchRequest_CancelRequest:
+ wps.delete(uv.CancelRequest.WatchId)
+ default:
+ panic("not implemented")
+ }
+ }
+}
+
+func (wps *watchProxyStream) sendLoop() {
+ for {
+ select {
+ case wresp, ok := <-wps.watchCh:
+ if !ok {
+ return
+ }
+ if err := wps.stream.Send(wresp); err != nil {
+ return
+ }
+ case <-wps.ctx.Done():
+ return
+ }
+ }
+}
+
+func (wps *watchProxyStream) delete(id int64) {
+ wps.mu.Lock()
+ defer wps.mu.Unlock()
+
+ w, ok := wps.watchers[id]
+ if !ok {
+ return
+ }
+ wps.ranges.delete(w)
+ delete(wps.watchers, id)
+ resp := &pb.WatchResponse{
+ Header: &w.lastHeader,
+ WatchId: id,
+ Canceled: true,
+ }
+ wps.watchCh <- resp
+}