blob: 8fe9e5f512e52c4ac672565a2fa2150c85a42232 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package grpcproxy
16
17import (
18 "sync"
19)
20
21type watchBroadcasts struct {
22 wp *watchProxy
23
24 // mu protects bcasts and watchers from the coalesce loop.
25 mu sync.Mutex
26 bcasts map[*watchBroadcast]struct{}
27 watchers map[*watcher]*watchBroadcast
28
29 updatec chan *watchBroadcast
30 donec chan struct{}
31}
32
33// maxCoalesceRecievers prevents a popular watchBroadcast from being coalseced.
34const maxCoalesceReceivers = 5
35
36func newWatchBroadcasts(wp *watchProxy) *watchBroadcasts {
37 wbs := &watchBroadcasts{
38 wp: wp,
39 bcasts: make(map[*watchBroadcast]struct{}),
40 watchers: make(map[*watcher]*watchBroadcast),
41 updatec: make(chan *watchBroadcast, 1),
42 donec: make(chan struct{}),
43 }
44 go func() {
45 defer close(wbs.donec)
46 for wb := range wbs.updatec {
47 wbs.coalesce(wb)
48 }
49 }()
50 return wbs
51}
52
53func (wbs *watchBroadcasts) coalesce(wb *watchBroadcast) {
54 if wb.size() >= maxCoalesceReceivers {
55 return
56 }
57 wbs.mu.Lock()
58 for wbswb := range wbs.bcasts {
59 if wbswb == wb {
60 continue
61 }
62 wb.mu.Lock()
63 wbswb.mu.Lock()
64 // 1. check if wbswb is behind wb so it won't skip any events in wb
65 // 2. ensure wbswb started; nextrev == 0 may mean wbswb is waiting
66 // for a current watcher and expects a create event from the server.
67 if wb.nextrev >= wbswb.nextrev && wbswb.responses > 0 {
68 for w := range wb.receivers {
69 wbswb.receivers[w] = struct{}{}
70 wbs.watchers[w] = wbswb
71 }
72 wb.receivers = nil
73 }
74 wbswb.mu.Unlock()
75 wb.mu.Unlock()
76 if wb.empty() {
77 delete(wbs.bcasts, wb)
78 wb.stop()
79 break
80 }
81 }
82 wbs.mu.Unlock()
83}
84
85func (wbs *watchBroadcasts) add(w *watcher) {
86 wbs.mu.Lock()
87 defer wbs.mu.Unlock()
88 // find fitting bcast
89 for wb := range wbs.bcasts {
90 if wb.add(w) {
91 wbs.watchers[w] = wb
92 return
93 }
94 }
95 // no fit; create a bcast
96 wb := newWatchBroadcast(wbs.wp, w, wbs.update)
97 wbs.watchers[w] = wb
98 wbs.bcasts[wb] = struct{}{}
99}
100
101// delete removes a watcher and returns the number of remaining watchers.
102func (wbs *watchBroadcasts) delete(w *watcher) int {
103 wbs.mu.Lock()
104 defer wbs.mu.Unlock()
105
106 wb, ok := wbs.watchers[w]
107 if !ok {
108 panic("deleting missing watcher from broadcasts")
109 }
110 delete(wbs.watchers, w)
111 wb.delete(w)
112 if wb.empty() {
113 delete(wbs.bcasts, wb)
114 wb.stop()
115 }
116 return len(wbs.bcasts)
117}
118
119func (wbs *watchBroadcasts) stop() {
120 wbs.mu.Lock()
121 for wb := range wbs.bcasts {
122 wb.stop()
123 }
124 wbs.bcasts = nil
125 close(wbs.updatec)
126 wbs.mu.Unlock()
127 <-wbs.donec
128}
129
130func (wbs *watchBroadcasts) update(wb *watchBroadcast) {
131 select {
132 case wbs.updatec <- wb:
133 default:
134 }
135}