Dependencies for the affinity router and the
affinity routing daemon.
Change-Id: Icda72c3594ef7f8f0bc0c33dc03087a4c25529ca
diff --git a/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcasts.go b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcasts.go
new file mode 100644
index 0000000..8fe9e5f
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/proxy/grpcproxy/watch_broadcasts.go
@@ -0,0 +1,135 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package grpcproxy
+
+import (
+ "sync"
+)
+
+// watchBroadcasts tracks the watchBroadcast set used by the watchProxy wp and feeds the coalesce loop.
+type watchBroadcasts struct {
+ wp *watchProxy
+ // mu protects bcasts and watchers from the coalesce loop.
+ mu sync.Mutex
+ bcasts map[*watchBroadcast]struct{} // set of broadcasts currently tracked
+ watchers map[*watcher]*watchBroadcast // broadcast each added watcher is attached to
+
+ updatec chan *watchBroadcast // broadcasts queued for coalescing; closed by stop
+ donec chan struct{} // closed when the coalesce goroutine exits
+}
+
+// maxCoalesceReceivers keeps a popular watchBroadcast from being coalesced; coalesce skips any broadcast whose size() has reached it.
+const maxCoalesceReceivers = 5
+
+// newWatchBroadcasts builds an empty broadcast set for wp and starts its coalesce goroutine.
+func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts {
+	// updatec holds one pending request so update can hand a broadcast over without blocking.
+	updates, done := make(chan *watchBroadcast, 1), make(chan struct{})
+	set := &watchBroadcasts{wp: wp, updatec: updates, donec: done}
+	set.bcasts = make(map[*watchBroadcast]struct{})
+	set.watchers = make(map[*watcher]*watchBroadcast)
+	// Coalesce every queued broadcast until updatec is closed, then signal exit on donec.
+	go func() {
+		defer close(done)
+		for pending := range updates {
+			set.coalesce(pending)
+		}
+	}()
+	return set
+}
+
+// coalesce tries to hand wb's receivers to another tracked broadcast; once wb is empty it is removed and stopped.
+func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
+ if wb.size() >= maxCoalesceReceivers {
+ return // wb is too popular to coalesce
+ }
+ wbs.mu.Lock()
+ for wbswb := range wbs.bcasts {
+ if wbswb == wb {
+ continue
+ }
+ wb.mu.Lock() // lock order: wb first, then wbswb
+ wbswb.mu.Lock()
+ // Merge only if (1) wbswb is behind wb, so it won't skip any events in wb, and (2) wbswb has
+ // started; nextrev == 0 may mean it waits for a current watcher and a create event from the server.
+ if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 {
+ for w := range wb.receivers {
+ wbswb.receivers[w] = struct{}{}
+ wbs.watchers[w] = wbswb
+ }
+ wb.receivers = nil // every receiver now belongs to wbswb
+ }
+ wbswb.mu.Unlock()
+ wb.mu.Unlock()
+ if wb.empty() {
+ delete(wbs.bcasts, wb)
+ wb.stop()
+ break // wb is gone; no further candidates are needed
+ }
+ }
+ wbs.mu.Unlock()
+}
+
+// add attaches w to the first broadcast that accepts it, or to a newly created one if none does.
+func (wbs *watchBroadcasts) add(w *watcher) {
+	wbs.mu.Lock()
+	defer wbs.mu.Unlock()
+	for candidate := range wbs.bcasts {
+		if !candidate.add(w) {
+			continue
+		}
+		wbs.watchers[w] = candidate
+		return
+	}
+	fresh := newWatchBroadcast(wbs.wp, w, wbs.update) // wbs.update is passed as the new broadcast's callback
+	wbs.bcasts[fresh] = struct{}{}
+	wbs.watchers[w] = fresh
+}
+
+// delete detaches w from its broadcast; a broadcast that then reports empty is untracked and stopped.
+// It returns the number of broadcasts still tracked and panics if w is not in wbs.watchers.
+func (wbs *watchBroadcasts) delete(w *watcher) int {
+	wbs.mu.Lock()
+	defer wbs.mu.Unlock()
+	owner, found := wbs.watchers[w]
+	if !found {
+		panic("deleting missing watcher from broadcasts")
+	}
+	delete(wbs.watchers, w)
+	owner.delete(w)
+	if owner.empty() {
+		delete(wbs.bcasts, owner)
+		owner.stop()
+	}
+	return len(wbs.bcasts)
+}
+
+func (wbs *watchBroadcasts) stop() { // stops every tracked broadcast and shuts down the coalesce goroutine
+ wbs.mu.Lock()
+ for wb := range wbs.bcasts {
+ wb.stop()
+ }
+ wbs.bcasts = nil
+ close(wbs.updatec) // ends the coalesce goroutine's range loop
+ wbs.mu.Unlock()
+ <-wbs.donec // wait, after releasing mu, until the coalesce goroutine has exited
+}
+
+func (wbs *watchBroadcasts) update(wb *watchBroadcast) { // requests a coalesce pass for wb without blocking
+ select {
+ case wbs.updatec <- wb: // NOTE(review): would panic once stop has closed updatec; presumably never called after stop - confirm
+ default: // the send would block (updatec is buffered by one), so this request is dropped
+ }
+}