/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package watch
18
19import (
20 "fmt"
21 "sync"
22
David Bainbridge86971522019-09-26 22:09:39 +000023 "k8s.io/klog"
Zack Williamse940c7a2019-08-21 14:25:39 -070024
25 "k8s.io/apimachinery/pkg/runtime"
26)
27
28// Interface can be implemented by anything that knows how to watch and report changes.
29type Interface interface {
30 // Stops watching. Will close the channel returned by ResultChan(). Releases
31 // any resources used by the watch.
32 Stop()
33
34 // Returns a chan which will receive all the events. If an error occurs
35 // or Stop() is called, this channel will be closed, in which case the
36 // watch should be completely cleaned up.
37 ResultChan() <-chan Event
38}
39
// EventType defines the possible types of events.
type EventType string

// Event types that may appear in Event.Type.
const (
	// Added reports that an object was created.
	Added EventType = "ADDED"
	// Modified reports that an existing object changed.
	Modified EventType = "MODIFIED"
	// Deleted reports that an object was removed.
	Deleted EventType = "DELETED"
	// Bookmark carries an object in which only the ResourceVersion field is
	// set, marking a point from which a watch can be restarted.
	Bookmark EventType = "BOOKMARK"
	// Error reports a failure of the watch itself.
	Error EventType = "ERROR"
)

// DefaultChanSize is the buffer size used for the result channel of a
// RaceFreeFakeWatcher.
const DefaultChanSize int32 = 100
52
53// Event represents a single event to a watched resource.
54// +k8s:deepcopy-gen=true
55type Event struct {
56 Type EventType
57
58 // Object is:
59 // * If Type is Added or Modified: the new state of the object.
60 // * If Type is Deleted: the state of the object immediately before deletion.
David Bainbridge86971522019-09-26 22:09:39 +000061 // * If Type is Bookmark: the object (instance of a type being watched) where
62 // only ResourceVersion field is set. On successful restart of watch from a
63 // bookmark resourceVersion, client is guaranteed to not get repeat event
64 // nor miss any events.
Zack Williamse940c7a2019-08-21 14:25:39 -070065 // * If Type is Error: *api.Status is recommended; other types may make sense
66 // depending on context.
67 Object runtime.Object
68}
69
70type emptyWatch chan Event
71
72// NewEmptyWatch returns a watch interface that returns no results and is closed.
73// May be used in certain error conditions where no information is available but
74// an error is not warranted.
75func NewEmptyWatch() Interface {
76 ch := make(chan Event)
77 close(ch)
78 return emptyWatch(ch)
79}
80
81// Stop implements Interface
82func (w emptyWatch) Stop() {
83}
84
85// ResultChan implements Interface
86func (w emptyWatch) ResultChan() <-chan Event {
87 return chan Event(w)
88}
89
90// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
91type FakeWatcher struct {
92 result chan Event
93 Stopped bool
94 sync.Mutex
95}
96
97func NewFake() *FakeWatcher {
98 return &FakeWatcher{
99 result: make(chan Event),
100 }
101}
102
103func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
104 return &FakeWatcher{
105 result: make(chan Event, size),
106 }
107}
108
109// Stop implements Interface.Stop().
110func (f *FakeWatcher) Stop() {
111 f.Lock()
112 defer f.Unlock()
113 if !f.Stopped {
David Bainbridge86971522019-09-26 22:09:39 +0000114 klog.V(4).Infof("Stopping fake watcher.")
Zack Williamse940c7a2019-08-21 14:25:39 -0700115 close(f.result)
116 f.Stopped = true
117 }
118}
119
120func (f *FakeWatcher) IsStopped() bool {
121 f.Lock()
122 defer f.Unlock()
123 return f.Stopped
124}
125
126// Reset prepares the watcher to be reused.
127func (f *FakeWatcher) Reset() {
128 f.Lock()
129 defer f.Unlock()
130 f.Stopped = false
131 f.result = make(chan Event)
132}
133
134func (f *FakeWatcher) ResultChan() <-chan Event {
135 return f.result
136}
137
138// Add sends an add event.
139func (f *FakeWatcher) Add(obj runtime.Object) {
140 f.result <- Event{Added, obj}
141}
142
143// Modify sends a modify event.
144func (f *FakeWatcher) Modify(obj runtime.Object) {
145 f.result <- Event{Modified, obj}
146}
147
148// Delete sends a delete event.
149func (f *FakeWatcher) Delete(lastValue runtime.Object) {
150 f.result <- Event{Deleted, lastValue}
151}
152
153// Error sends an Error event.
154func (f *FakeWatcher) Error(errValue runtime.Object) {
155 f.result <- Event{Error, errValue}
156}
157
158// Action sends an event of the requested type, for table-based testing.
159func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
160 f.result <- Event{action, obj}
161}
162
163// RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
164type RaceFreeFakeWatcher struct {
165 result chan Event
166 Stopped bool
167 sync.Mutex
168}
169
170func NewRaceFreeFake() *RaceFreeFakeWatcher {
171 return &RaceFreeFakeWatcher{
172 result: make(chan Event, DefaultChanSize),
173 }
174}
175
176// Stop implements Interface.Stop().
177func (f *RaceFreeFakeWatcher) Stop() {
178 f.Lock()
179 defer f.Unlock()
180 if !f.Stopped {
David Bainbridge86971522019-09-26 22:09:39 +0000181 klog.V(4).Infof("Stopping fake watcher.")
Zack Williamse940c7a2019-08-21 14:25:39 -0700182 close(f.result)
183 f.Stopped = true
184 }
185}
186
187func (f *RaceFreeFakeWatcher) IsStopped() bool {
188 f.Lock()
189 defer f.Unlock()
190 return f.Stopped
191}
192
193// Reset prepares the watcher to be reused.
194func (f *RaceFreeFakeWatcher) Reset() {
195 f.Lock()
196 defer f.Unlock()
197 f.Stopped = false
198 f.result = make(chan Event, DefaultChanSize)
199}
200
201func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event {
202 f.Lock()
203 defer f.Unlock()
204 return f.result
205}
206
207// Add sends an add event.
208func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) {
209 f.Lock()
210 defer f.Unlock()
211 if !f.Stopped {
212 select {
213 case f.result <- Event{Added, obj}:
214 return
215 default:
216 panic(fmt.Errorf("channel full"))
217 }
218 }
219}
220
221// Modify sends a modify event.
222func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) {
223 f.Lock()
224 defer f.Unlock()
225 if !f.Stopped {
226 select {
227 case f.result <- Event{Modified, obj}:
228 return
229 default:
230 panic(fmt.Errorf("channel full"))
231 }
232 }
233}
234
235// Delete sends a delete event.
236func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) {
237 f.Lock()
238 defer f.Unlock()
239 if !f.Stopped {
240 select {
241 case f.result <- Event{Deleted, lastValue}:
242 return
243 default:
244 panic(fmt.Errorf("channel full"))
245 }
246 }
247}
248
249// Error sends an Error event.
250func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) {
251 f.Lock()
252 defer f.Unlock()
253 if !f.Stopped {
254 select {
255 case f.result <- Event{Error, errValue}:
256 return
257 default:
258 panic(fmt.Errorf("channel full"))
259 }
260 }
261}
262
263// Action sends an event of the requested type, for table-based testing.
264func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
265 f.Lock()
266 defer f.Unlock()
267 if !f.Stopped {
268 select {
269 case f.result <- Event{action, obj}:
270 return
271 default:
272 panic(fmt.Errorf("channel full"))
273 }
274 }
275}
276
277// ProxyWatcher lets you wrap your channel in watch Interface. Threadsafe.
278type ProxyWatcher struct {
279 result chan Event
280 stopCh chan struct{}
281
282 mutex sync.Mutex
283 stopped bool
284}
285
286var _ Interface = &ProxyWatcher{}
287
288// NewProxyWatcher creates new ProxyWatcher by wrapping a channel
289func NewProxyWatcher(ch chan Event) *ProxyWatcher {
290 return &ProxyWatcher{
291 result: ch,
292 stopCh: make(chan struct{}),
293 stopped: false,
294 }
295}
296
297// Stop implements Interface
298func (pw *ProxyWatcher) Stop() {
299 pw.mutex.Lock()
300 defer pw.mutex.Unlock()
301 if !pw.stopped {
302 pw.stopped = true
303 close(pw.stopCh)
304 }
305}
306
307// Stopping returns true if Stop() has been called
308func (pw *ProxyWatcher) Stopping() bool {
309 pw.mutex.Lock()
310 defer pw.mutex.Unlock()
311 return pw.stopped
312}
313
314// ResultChan implements Interface
315func (pw *ProxyWatcher) ResultChan() <-chan Event {
316 return pw.result
317}
318
319// StopChan returns stop channel
320func (pw *ProxyWatcher) StopChan() <-chan struct{} {
321 return pw.stopCh
322}