Dependencies for the affinity router and the
affinity routing daemon.

Change-Id: Icda72c3594ef7f8f0bc0c33dc03087a4c25529ca
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcasts.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcasts.go
new file mode 100644
index 0000000..8fe9e5f
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcasts.go
@@ -0,0 +1,135 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+	"sync"
+)
+
+type watchBroadcasts struct {
+	wp *watchProxy // handed to newWatchBroadcast whenever add creates a broadcast
+
+	// mu protects bcasts and watchers from the coalesce loop.
+	mu       sync.Mutex
+	bcasts   map[*watchBroadcast]struct{} // set of broadcasts currently tracked
+	watchers map[*watcher]*watchBroadcast // broadcast each tracked watcher is assigned to
+
+	updatec chan *watchBroadcast // broadcasts queued for coalescing; closed by stop
+	donec   chan struct{}        // closed when the coalesce goroutine exits
+}
+
+// maxCoalesceReceivers prevents a popular watchBroadcast from being coalesced.
+const maxCoalesceReceivers = 5 // coalesce returns early once wb.size() reaches this value
+
+func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts {
+	wbs := &watchBroadcasts{
+		wp:       wp,
+		bcasts:   make(map[*watchBroadcast]struct{}),
+		watchers: make(map[*watcher]*watchBroadcast),
+		updatec:  make(chan *watchBroadcast, 1), // one slot; update drops sends while it is occupied
+		donec:    make(chan struct{}),
+	}
+	go func() { // coalesce loop: runs until stop closes updatec
+		defer close(wbs.donec) // signals stop that this goroutine has exited
+		for wb := range wbs.updatec {
+			wbs.coalesce(wb)
+		}
+	}()
+	return wbs
+}
+
+func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
+	if wb.size() >= maxCoalesceReceivers { // wb has reached the threshold; leave it on its own
+		return
+	}
+	wbs.mu.Lock()
+	for wbswb := range wbs.bcasts { // wbswb is a candidate broadcast that may absorb wb's receivers
+		if wbswb == wb {
+			continue
+		}
+		wb.mu.Lock()
+		wbswb.mu.Lock()
+		// 1. check if wbswb is behind wb so it won't skip any events in wb
+		// 2. ensure wbswb started; nextrev == 0 may mean wbswb is waiting
+		// for a current watcher and expects a create event from the server.
+		if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 {
+			for w := range wb.receivers { // move each receiver to wbswb and re-point its watchers entry
+				wbswb.receivers[w] = struct{}{}
+				wbs.watchers[w] = wbswb
+			}
+			wb.receivers = nil
+		}
+		wbswb.mu.Unlock()
+		wb.mu.Unlock()
+		if wb.empty() { // NOTE(review): presumably true once receivers were moved above; confirm in watchBroadcast.empty
+			delete(wbs.bcasts, wb)
+			wb.stop()
+			break
+		}
+	}
+	wbs.mu.Unlock()
+}
+
+func (wbs *watchBroadcasts) add(w *watcher) {
+	wbs.mu.Lock()
+	defer wbs.mu.Unlock()
+	// find fitting bcast: the first broadcast whose add(w) reports true takes w
+	for wb := range wbs.bcasts {
+		if wb.add(w) {
+			wbs.watchers[w] = wb
+			return
+		}
+	}
+	// no fit; create a bcast for w and record it in both maps
+	wb := newWatchBroadcast(wbs.wp, w, wbs.update) // wbs.update is passed as a function value (presumably called to request coalescing)
+	wbs.watchers[w] = wb
+	wbs.bcasts[wb] = struct{}{}
+}
+
+// delete removes watcher w and returns the number of remaining broadcasts (len(wbs.bcasts)), not watchers.
+func (wbs *watchBroadcasts) delete(w *watcher) int {
+	wbs.mu.Lock()
+	defer wbs.mu.Unlock()
+
+	wb, ok := wbs.watchers[w]
+	if !ok {
+		panic("deleting missing watcher from broadcasts") // w has no entry in wbs.watchers
+	}
+	delete(wbs.watchers, w)
+	wb.delete(w)
+	if wb.empty() { // wb reports itself empty: untrack it and stop it
+		delete(wbs.bcasts, wb)
+		wb.stop()
+	}
+	return len(wbs.bcasts)
+}
+
+func (wbs *watchBroadcasts) stop() {
+	wbs.mu.Lock()
+	for wb := range wbs.bcasts {
+		wb.stop()
+	}
+	wbs.bcasts = nil
+	close(wbs.updatec) // ends the range loop in the coalesce goroutine
+	wbs.mu.Unlock()
+	<-wbs.donec // wait for the coalesce goroutine to exit; done after releasing mu because coalesce acquires mu
+}
+
+func (wbs *watchBroadcasts) update(wb *watchBroadcast) { // queues wb for the coalesce loop without blocking
+	select {
+	case wbs.updatec <- wb:
+	default: // updatec is full; this request is dropped
+	}
+}