| // Copyright 2016 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package grpcproxy |
| |
| import ( |
| "context" |
| "sync" |
| |
| "github.com/coreos/etcd/clientv3" |
| pb "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| ) |
| |
| // watchBroadcast broadcasts a server watcher to many client watchers. |
| type watchBroadcast struct { |
| // cancel stops the underlying etcd server watcher and closes ch. |
| cancel context.CancelFunc |
| donec chan struct{} |
| |
| // mu protects rev and receivers. |
| mu sync.RWMutex |
| // nextrev is the minimum expected next revision of the watcher on ch. |
| nextrev int64 |
| // receivers contains all the client-side watchers to serve. |
| receivers map[*watcher]struct{} |
| // responses counts the number of responses |
| responses int |
| } |
| |
| func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast { |
| cctx, cancel := context.WithCancel(wp.ctx) |
| wb := &watchBroadcast{ |
| cancel: cancel, |
| nextrev: w.nextrev, |
| receivers: make(map[*watcher]struct{}), |
| donec: make(chan struct{}), |
| } |
| wb.add(w) |
| go func() { |
| defer close(wb.donec) |
| |
| opts := []clientv3.OpOption{ |
| clientv3.WithRange(w.wr.end), |
| clientv3.WithProgressNotify(), |
| clientv3.WithRev(wb.nextrev), |
| clientv3.WithPrevKV(), |
| clientv3.WithCreatedNotify(), |
| } |
| |
| cctx = withClientAuthToken(cctx, w.wps.stream.Context()) |
| |
| wch := wp.cw.Watch(cctx, w.wr.key, opts...) |
| |
| for wr := range wch { |
| wb.bcast(wr) |
| update(wb) |
| } |
| }() |
| return wb |
| } |
| |
| func (wb *watchBroadcast) bcast(wr clientv3.WatchResponse) { |
| wb.mu.Lock() |
| defer wb.mu.Unlock() |
| // watchers start on the given revision, if any; ignore header rev on create |
| if wb.responses > 0 || wb.nextrev == 0 { |
| wb.nextrev = wr.Header.Revision + 1 |
| } |
| wb.responses++ |
| for r := range wb.receivers { |
| r.send(wr) |
| } |
| if len(wb.receivers) > 0 { |
| eventsCoalescing.Add(float64(len(wb.receivers) - 1)) |
| } |
| } |
| |
| // add puts a watcher into receiving a broadcast if its revision at least |
| // meets the broadcast revision. Returns true if added. |
| func (wb *watchBroadcast) add(w *watcher) bool { |
| wb.mu.Lock() |
| defer wb.mu.Unlock() |
| if wb.nextrev > w.nextrev || (wb.nextrev == 0 && w.nextrev != 0) { |
| // wb is too far ahead, w will miss events |
| // or wb is being established with a current watcher |
| return false |
| } |
| if wb.responses == 0 { |
| // Newly created; create event will be sent by etcd. |
| wb.receivers[w] = struct{}{} |
| return true |
| } |
| // already sent by etcd; emulate create event |
| ok := w.post(&pb.WatchResponse{ |
| Header: &pb.ResponseHeader{ |
| // todo: fill in ClusterId |
| // todo: fill in MemberId: |
| Revision: w.nextrev, |
| // todo: fill in RaftTerm: |
| }, |
| WatchId: w.id, |
| Created: true, |
| }) |
| if !ok { |
| return false |
| } |
| wb.receivers[w] = struct{}{} |
| watchersCoalescing.Inc() |
| |
| return true |
| } |
| func (wb *watchBroadcast) delete(w *watcher) { |
| wb.mu.Lock() |
| defer wb.mu.Unlock() |
| if _, ok := wb.receivers[w]; !ok { |
| panic("deleting missing watcher from broadcast") |
| } |
| delete(wb.receivers, w) |
| if len(wb.receivers) > 0 { |
| // do not dec the only left watcher for coalescing. |
| watchersCoalescing.Dec() |
| } |
| } |
| |
| func (wb *watchBroadcast) size() int { |
| wb.mu.RLock() |
| defer wb.mu.RUnlock() |
| return len(wb.receivers) |
| } |
| |
| func (wb *watchBroadcast) empty() bool { return wb.size() == 0 } |
| |
| func (wb *watchBroadcast) stop() { |
| if !wb.empty() { |
| // do not dec the only left watcher for coalescing. |
| watchersCoalescing.Sub(float64(wb.size() - 1)) |
| } |
| |
| wb.cancel() |
| <-wb.donec |
| } |