sslobodr | d046be8 | 2019-01-16 10:02:22 -0500 | [diff] [blame] | 1 | /* |
| 2 | Copyright 2014 The Kubernetes Authors. |
| 3 | |
| 4 | Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | you may not use this file except in compliance with the License. |
| 6 | You may obtain a copy of the License at |
| 7 | |
| 8 | http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | Unless required by applicable law or agreed to in writing, software |
| 11 | distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | See the License for the specific language governing permissions and |
| 14 | limitations under the License. |
| 15 | */ |
| 16 | |
| 17 | package watch |
| 18 | |
| 19 | import ( |
| 20 | "sync" |
| 21 | |
| 22 | "k8s.io/apimachinery/pkg/runtime" |
| 23 | "k8s.io/apimachinery/pkg/runtime/schema" |
| 24 | ) |
| 25 | |
| 26 | // FullChannelBehavior controls how the Broadcaster reacts if a watcher's watch |
| 27 | // channel is full. |
| 28 | type FullChannelBehavior int |
| 29 | |
| 30 | const ( |
| 31 | WaitIfChannelFull FullChannelBehavior = iota |
| 32 | DropIfChannelFull |
| 33 | ) |
| 34 | |
| 35 | // Buffer the incoming queue a little bit even though it should rarely ever accumulate |
| 36 | // anything, just in case a few events are received in such a short window that |
| 37 | // Broadcaster can't move them onto the watchers' queues fast enough. |
| 38 | const incomingQueueLength = 25 |
| 39 | |
| 40 | // Broadcaster distributes event notifications among any number of watchers. Every event |
| 41 | // is delivered to every watcher. |
| 42 | type Broadcaster struct { |
| 43 | // TODO: see if this lock is needed now that new watchers go through |
| 44 | // the incoming channel. |
| 45 | lock sync.Mutex |
| 46 | |
| 47 | watchers map[int64]*broadcasterWatcher |
| 48 | nextWatcher int64 |
| 49 | distributing sync.WaitGroup |
| 50 | |
| 51 | incoming chan Event |
| 52 | |
| 53 | // How large to make watcher's channel. |
| 54 | watchQueueLength int |
| 55 | // If one of the watch channels is full, don't wait for it to become empty. |
| 56 | // Instead just deliver it to the watchers that do have space in their |
| 57 | // channels and move on to the next event. |
| 58 | // It's more fair to do this on a per-watcher basis than to do it on the |
| 59 | // "incoming" channel, which would allow one slow watcher to prevent all |
| 60 | // other watchers from getting new events. |
| 61 | fullChannelBehavior FullChannelBehavior |
| 62 | } |
| 63 | |
| 64 | // NewBroadcaster creates a new Broadcaster. queueLength is the maximum number of events to queue per watcher. |
| 65 | // It is guaranteed that events will be distributed in the order in which they occur, |
| 66 | // but the order in which a single event is distributed among all of the watchers is unspecified. |
| 67 | func NewBroadcaster(queueLength int, fullChannelBehavior FullChannelBehavior) *Broadcaster { |
| 68 | m := &Broadcaster{ |
| 69 | watchers: map[int64]*broadcasterWatcher{}, |
| 70 | incoming: make(chan Event, incomingQueueLength), |
| 71 | watchQueueLength: queueLength, |
| 72 | fullChannelBehavior: fullChannelBehavior, |
| 73 | } |
| 74 | m.distributing.Add(1) |
| 75 | go m.loop() |
| 76 | return m |
| 77 | } |
| 78 | |
| 79 | const internalRunFunctionMarker = "internal-do-function" |
| 80 | |
| 81 | // a function type we can shoehorn into the queue. |
| 82 | type functionFakeRuntimeObject func() |
| 83 | |
| 84 | func (obj functionFakeRuntimeObject) GetObjectKind() schema.ObjectKind { |
| 85 | return schema.EmptyObjectKind |
| 86 | } |
| 87 | func (obj functionFakeRuntimeObject) DeepCopyObject() runtime.Object { |
| 88 | if obj == nil { |
| 89 | return nil |
| 90 | } |
| 91 | // funcs are immutable. Hence, just return the original func. |
| 92 | return obj |
| 93 | } |
| 94 | |
| 95 | // Execute f, blocking the incoming queue (and waiting for it to drain first). |
| 96 | // The purpose of this terrible hack is so that watchers added after an event |
| 97 | // won't ever see that event, and will always see any event after they are |
| 98 | // added. |
| 99 | func (b *Broadcaster) blockQueue(f func()) { |
| 100 | var wg sync.WaitGroup |
| 101 | wg.Add(1) |
| 102 | b.incoming <- Event{ |
| 103 | Type: internalRunFunctionMarker, |
| 104 | Object: functionFakeRuntimeObject(func() { |
| 105 | defer wg.Done() |
| 106 | f() |
| 107 | }), |
| 108 | } |
| 109 | wg.Wait() |
| 110 | } |
| 111 | |
| 112 | // Watch adds a new watcher to the list and returns an Interface for it. |
| 113 | // Note: new watchers will only receive new events. They won't get an entire history |
| 114 | // of previous events. |
| 115 | func (m *Broadcaster) Watch() Interface { |
| 116 | var w *broadcasterWatcher |
| 117 | m.blockQueue(func() { |
| 118 | m.lock.Lock() |
| 119 | defer m.lock.Unlock() |
| 120 | id := m.nextWatcher |
| 121 | m.nextWatcher++ |
| 122 | w = &broadcasterWatcher{ |
| 123 | result: make(chan Event, m.watchQueueLength), |
| 124 | stopped: make(chan struct{}), |
| 125 | id: id, |
| 126 | m: m, |
| 127 | } |
| 128 | m.watchers[id] = w |
| 129 | }) |
| 130 | return w |
| 131 | } |
| 132 | |
| 133 | // WatchWithPrefix adds a new watcher to the list and returns an Interface for it. It sends |
| 134 | // queuedEvents down the new watch before beginning to send ordinary events from Broadcaster. |
| 135 | // The returned watch will have a queue length that is at least large enough to accommodate |
| 136 | // all of the items in queuedEvents. |
| 137 | func (m *Broadcaster) WatchWithPrefix(queuedEvents []Event) Interface { |
| 138 | var w *broadcasterWatcher |
| 139 | m.blockQueue(func() { |
| 140 | m.lock.Lock() |
| 141 | defer m.lock.Unlock() |
| 142 | id := m.nextWatcher |
| 143 | m.nextWatcher++ |
| 144 | length := m.watchQueueLength |
| 145 | if n := len(queuedEvents) + 1; n > length { |
| 146 | length = n |
| 147 | } |
| 148 | w = &broadcasterWatcher{ |
| 149 | result: make(chan Event, length), |
| 150 | stopped: make(chan struct{}), |
| 151 | id: id, |
| 152 | m: m, |
| 153 | } |
| 154 | m.watchers[id] = w |
| 155 | for _, e := range queuedEvents { |
| 156 | w.result <- e |
| 157 | } |
| 158 | }) |
| 159 | return w |
| 160 | } |
| 161 | |
| 162 | // stopWatching stops the given watcher and removes it from the list. |
| 163 | func (m *Broadcaster) stopWatching(id int64) { |
| 164 | m.lock.Lock() |
| 165 | defer m.lock.Unlock() |
| 166 | w, ok := m.watchers[id] |
| 167 | if !ok { |
| 168 | // No need to do anything, it's already been removed from the list. |
| 169 | return |
| 170 | } |
| 171 | delete(m.watchers, id) |
| 172 | close(w.result) |
| 173 | } |
| 174 | |
| 175 | // closeAll disconnects all watchers (presumably in response to a Shutdown call). |
| 176 | func (m *Broadcaster) closeAll() { |
| 177 | m.lock.Lock() |
| 178 | defer m.lock.Unlock() |
| 179 | for _, w := range m.watchers { |
| 180 | close(w.result) |
| 181 | } |
| 182 | // Delete everything from the map, since presence/absence in the map is used |
| 183 | // by stopWatching to avoid double-closing the channel. |
| 184 | m.watchers = map[int64]*broadcasterWatcher{} |
| 185 | } |
| 186 | |
| 187 | // Action distributes the given event among all watchers. |
| 188 | func (m *Broadcaster) Action(action EventType, obj runtime.Object) { |
| 189 | m.incoming <- Event{action, obj} |
| 190 | } |
| 191 | |
| 192 | // Shutdown disconnects all watchers (but any queued events will still be distributed). |
| 193 | // You must not call Action or Watch* after calling Shutdown. This call blocks |
| 194 | // until all events have been distributed through the outbound channels. Note |
| 195 | // that since they can be buffered, this means that the watchers might not |
| 196 | // have received the data yet as it can remain sitting in the buffered |
| 197 | // channel. |
| 198 | func (m *Broadcaster) Shutdown() { |
| 199 | close(m.incoming) |
| 200 | m.distributing.Wait() |
| 201 | } |
| 202 | |
| 203 | // loop receives from m.incoming and distributes to all watchers. |
| 204 | func (m *Broadcaster) loop() { |
| 205 | // Deliberately not catching crashes here. Yes, bring down the process if there's a |
| 206 | // bug in watch.Broadcaster. |
| 207 | for event := range m.incoming { |
| 208 | if event.Type == internalRunFunctionMarker { |
| 209 | event.Object.(functionFakeRuntimeObject)() |
| 210 | continue |
| 211 | } |
| 212 | m.distribute(event) |
| 213 | } |
| 214 | m.closeAll() |
| 215 | m.distributing.Done() |
| 216 | } |
| 217 | |
| 218 | // distribute sends event to all watchers. Blocking. |
| 219 | func (m *Broadcaster) distribute(event Event) { |
| 220 | m.lock.Lock() |
| 221 | defer m.lock.Unlock() |
| 222 | if m.fullChannelBehavior == DropIfChannelFull { |
| 223 | for _, w := range m.watchers { |
| 224 | select { |
| 225 | case w.result <- event: |
| 226 | case <-w.stopped: |
| 227 | default: // Don't block if the event can't be queued. |
| 228 | } |
| 229 | } |
| 230 | } else { |
| 231 | for _, w := range m.watchers { |
| 232 | select { |
| 233 | case w.result <- event: |
| 234 | case <-w.stopped: |
| 235 | } |
| 236 | } |
| 237 | } |
| 238 | } |
| 239 | |
| 240 | // broadcasterWatcher handles a single watcher of a broadcaster |
| 241 | type broadcasterWatcher struct { |
| 242 | result chan Event |
| 243 | stopped chan struct{} |
| 244 | stop sync.Once |
| 245 | id int64 |
| 246 | m *Broadcaster |
| 247 | } |
| 248 | |
| 249 | // ResultChan returns a channel to use for waiting on events. |
| 250 | func (mw *broadcasterWatcher) ResultChan() <-chan Event { |
| 251 | return mw.result |
| 252 | } |
| 253 | |
| 254 | // Stop stops watching and removes mw from its list. |
| 255 | func (mw *broadcasterWatcher) Stop() { |
| 256 | mw.stop.Do(func() { |
| 257 | close(mw.stopped) |
| 258 | mw.m.stopWatching(mw.id) |
| 259 | }) |
| 260 | } |