| // Copyright 2016 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package grpcproxy |
| |
| import ( |
| "sync" |
| ) |
| |
| type watchBroadcasts struct { |
| wp *watchProxy |
| |
| // mu protects bcasts and watchers from the coalesce loop. |
| mu sync.Mutex |
| bcasts map[*watchBroadcast]struct{} |
| watchers map[*watcher]*watchBroadcast |
| |
| updatec chan *watchBroadcast |
| donec chan struct{} |
| } |
| |
| // maxCoalesceRecievers prevents a popular watchBroadcast from being coalseced. |
| const maxCoalesceReceivers = 5 |
| |
| func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts { |
| wbs := &watchBroadcasts{ |
| wp: wp, |
| bcasts: make(map[*watchBroadcast]struct{}), |
| watchers: make(map[*watcher]*watchBroadcast), |
| updatec: make(chan *watchBroadcast, 1), |
| donec: make(chan struct{}), |
| } |
| go func() { |
| defer close(wbs.donec) |
| for wb := range wbs.updatec { |
| wbs.coalesce(wb) |
| } |
| }() |
| return wbs |
| } |
| |
| func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) { |
| if wb.size() >= maxCoalesceReceivers { |
| return |
| } |
| wbs.mu.Lock() |
| for wbswb := range wbs.bcasts { |
| if wbswb == wb { |
| continue |
| } |
| wb.mu.Lock() |
| wbswb.mu.Lock() |
| // 1. check if wbswb is behind wb so it won't skip any events in wb |
| // 2. ensure wbswb started; nextrev == 0 may mean wbswb is waiting |
| // for a current watcher and expects a create event from the server. |
| if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 { |
| for w := range wb.receivers { |
| wbswb.receivers[w] = struct{}{} |
| wbs.watchers[w] = wbswb |
| } |
| wb.receivers = nil |
| } |
| wbswb.mu.Unlock() |
| wb.mu.Unlock() |
| if wb.empty() { |
| delete(wbs.bcasts, wb) |
| wb.stop() |
| break |
| } |
| } |
| wbs.mu.Unlock() |
| } |
| |
| func (wbs *watchBroadcasts) add(w *watcher) { |
| wbs.mu.Lock() |
| defer wbs.mu.Unlock() |
| // find fitting bcast |
| for wb := range wbs.bcasts { |
| if wb.add(w) { |
| wbs.watchers[w] = wb |
| return |
| } |
| } |
| // no fit; create a bcast |
| wb := newWatchBroadcast(wbs.wp, w, wbs.update) |
| wbs.watchers[w] = wb |
| wbs.bcasts[wb] = struct{}{} |
| } |
| |
| // delete removes a watcher and returns the number of remaining watchers. |
| func (wbs *watchBroadcasts) delete(w *watcher) int { |
| wbs.mu.Lock() |
| defer wbs.mu.Unlock() |
| |
| wb, ok := wbs.watchers[w] |
| if !ok { |
| panic("deleting missing watcher from broadcasts") |
| } |
| delete(wbs.watchers, w) |
| wb.delete(w) |
| if wb.empty() { |
| delete(wbs.bcasts, wb) |
| wb.stop() |
| } |
| return len(wbs.bcasts) |
| } |
| |
| func (wbs *watchBroadcasts) stop() { |
| wbs.mu.Lock() |
| for wb := range wbs.bcasts { |
| wb.stop() |
| } |
| wbs.bcasts = nil |
| close(wbs.updatec) |
| wbs.mu.Unlock() |
| <-wbs.donec |
| } |
| |
| func (wbs *watchBroadcasts) update(wb *watchBroadcast) { |
| select { |
| case wbs.updatec <- wb: |
| default: |
| } |
| } |