/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package watch
18
19import (
20 "fmt"
21 "sync"
22
23 "k8s.io/klog/v2"
24
25 "k8s.io/apimachinery/pkg/runtime"
26)
27
28// Interface can be implemented by anything that knows how to watch and report changes.
29type Interface interface {
30 // Stops watching. Will close the channel returned by ResultChan(). Releases
31 // any resources used by the watch.
32 Stop()
33
34 // Returns a chan which will receive all the events. If an error occurs
35 // or Stop() is called, the implementation will close this channel and
36 // release any resources used by the watch.
37 ResultChan() <-chan Event
38}
39
// EventType defines the possible types of events.
type EventType string

// The event types that an Event can carry in its Type field.
const (
	// Added marks an event whose object is the new state of an added object.
	Added EventType = "ADDED"
	// Modified marks an event whose object is the new state of a changed object.
	Modified EventType = "MODIFIED"
	// Deleted marks an event whose object is the state immediately before deletion.
	Deleted EventType = "DELETED"
	// Bookmark marks an event whose object has only its ResourceVersion set.
	Bookmark EventType = "BOOKMARK"
	// Error marks an event that reports an error condition.
	Error EventType = "ERROR"
)
50
// DefaultChanSize is the buffer size of the result channel created for a
// RaceFreeFakeWatcher, both at construction and on Reset.
var DefaultChanSize int32 = 100
54
55// Event represents a single event to a watched resource.
56// +k8s:deepcopy-gen=true
57type Event struct {
58 Type EventType
59
60 // Object is:
61 // * If Type is Added or Modified: the new state of the object.
62 // * If Type is Deleted: the state of the object immediately before deletion.
63 // * If Type is Bookmark: the object (instance of a type being watched) where
64 // only ResourceVersion field is set. On successful restart of watch from a
65 // bookmark resourceVersion, client is guaranteed to not get repeat event
66 // nor miss any events.
67 // * If Type is Error: *api.Status is recommended; other types may make sense
68 // depending on context.
69 Object runtime.Object
70}
71
72type emptyWatch chan Event
73
74// NewEmptyWatch returns a watch interface that returns no results and is closed.
75// May be used in certain error conditions where no information is available but
76// an error is not warranted.
77func NewEmptyWatch() Interface {
78 ch := make(chan Event)
79 close(ch)
80 return emptyWatch(ch)
81}
82
83// Stop implements Interface
84func (w emptyWatch) Stop() {
85}
86
87// ResultChan implements Interface
88func (w emptyWatch) ResultChan() <-chan Event {
89 return chan Event(w)
90}
91
92// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
93type FakeWatcher struct {
94 result chan Event
95 stopped bool
96 sync.Mutex
97}
98
99func NewFake() *FakeWatcher {
100 return &FakeWatcher{
101 result: make(chan Event),
102 }
103}
104
105func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
106 return &FakeWatcher{
107 result: make(chan Event, size),
108 }
109}
110
111// Stop implements Interface.Stop().
112func (f *FakeWatcher) Stop() {
113 f.Lock()
114 defer f.Unlock()
115 if !f.stopped {
116 klog.V(4).Infof("Stopping fake watcher.")
117 close(f.result)
118 f.stopped = true
119 }
120}
121
122func (f *FakeWatcher) IsStopped() bool {
123 f.Lock()
124 defer f.Unlock()
125 return f.stopped
126}
127
128// Reset prepares the watcher to be reused.
129func (f *FakeWatcher) Reset() {
130 f.Lock()
131 defer f.Unlock()
132 f.stopped = false
133 f.result = make(chan Event)
134}
135
136func (f *FakeWatcher) ResultChan() <-chan Event {
137 return f.result
138}
139
140// Add sends an add event.
141func (f *FakeWatcher) Add(obj runtime.Object) {
142 f.result <- Event{Added, obj}
143}
144
145// Modify sends a modify event.
146func (f *FakeWatcher) Modify(obj runtime.Object) {
147 f.result <- Event{Modified, obj}
148}
149
150// Delete sends a delete event.
151func (f *FakeWatcher) Delete(lastValue runtime.Object) {
152 f.result <- Event{Deleted, lastValue}
153}
154
155// Error sends an Error event.
156func (f *FakeWatcher) Error(errValue runtime.Object) {
157 f.result <- Event{Error, errValue}
158}
159
160// Action sends an event of the requested type, for table-based testing.
161func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
162 f.result <- Event{action, obj}
163}
164
165// RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
166type RaceFreeFakeWatcher struct {
167 result chan Event
168 Stopped bool
169 sync.Mutex
170}
171
172func NewRaceFreeFake() *RaceFreeFakeWatcher {
173 return &RaceFreeFakeWatcher{
174 result: make(chan Event, DefaultChanSize),
175 }
176}
177
178// Stop implements Interface.Stop().
179func (f *RaceFreeFakeWatcher) Stop() {
180 f.Lock()
181 defer f.Unlock()
182 if !f.Stopped {
183 klog.V(4).Infof("Stopping fake watcher.")
184 close(f.result)
185 f.Stopped = true
186 }
187}
188
189func (f *RaceFreeFakeWatcher) IsStopped() bool {
190 f.Lock()
191 defer f.Unlock()
192 return f.Stopped
193}
194
195// Reset prepares the watcher to be reused.
196func (f *RaceFreeFakeWatcher) Reset() {
197 f.Lock()
198 defer f.Unlock()
199 f.Stopped = false
200 f.result = make(chan Event, DefaultChanSize)
201}
202
203func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event {
204 f.Lock()
205 defer f.Unlock()
206 return f.result
207}
208
209// Add sends an add event.
210func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) {
211 f.Lock()
212 defer f.Unlock()
213 if !f.Stopped {
214 select {
215 case f.result <- Event{Added, obj}:
216 return
217 default:
218 panic(fmt.Errorf("channel full"))
219 }
220 }
221}
222
223// Modify sends a modify event.
224func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) {
225 f.Lock()
226 defer f.Unlock()
227 if !f.Stopped {
228 select {
229 case f.result <- Event{Modified, obj}:
230 return
231 default:
232 panic(fmt.Errorf("channel full"))
233 }
234 }
235}
236
237// Delete sends a delete event.
238func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) {
239 f.Lock()
240 defer f.Unlock()
241 if !f.Stopped {
242 select {
243 case f.result <- Event{Deleted, lastValue}:
244 return
245 default:
246 panic(fmt.Errorf("channel full"))
247 }
248 }
249}
250
251// Error sends an Error event.
252func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) {
253 f.Lock()
254 defer f.Unlock()
255 if !f.Stopped {
256 select {
257 case f.result <- Event{Error, errValue}:
258 return
259 default:
260 panic(fmt.Errorf("channel full"))
261 }
262 }
263}
264
265// Action sends an event of the requested type, for table-based testing.
266func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
267 f.Lock()
268 defer f.Unlock()
269 if !f.Stopped {
270 select {
271 case f.result <- Event{action, obj}:
272 return
273 default:
274 panic(fmt.Errorf("channel full"))
275 }
276 }
277}
278
279// ProxyWatcher lets you wrap your channel in watch Interface. Threadsafe.
280type ProxyWatcher struct {
281 result chan Event
282 stopCh chan struct{}
283
284 mutex sync.Mutex
285 stopped bool
286}
287
288var _ Interface = &ProxyWatcher{}
289
290// NewProxyWatcher creates new ProxyWatcher by wrapping a channel
291func NewProxyWatcher(ch chan Event) *ProxyWatcher {
292 return &ProxyWatcher{
293 result: ch,
294 stopCh: make(chan struct{}),
295 stopped: false,
296 }
297}
298
299// Stop implements Interface
300func (pw *ProxyWatcher) Stop() {
301 pw.mutex.Lock()
302 defer pw.mutex.Unlock()
303 if !pw.stopped {
304 pw.stopped = true
305 close(pw.stopCh)
306 }
307}
308
309// Stopping returns true if Stop() has been called
310func (pw *ProxyWatcher) Stopping() bool {
311 pw.mutex.Lock()
312 defer pw.mutex.Unlock()
313 return pw.stopped
314}
315
316// ResultChan implements Interface
317func (pw *ProxyWatcher) ResultChan() <-chan Event {
318 return pw.result
319}
320
321// StopChan returns stop channel
322func (pw *ProxyWatcher) StopChan() <-chan struct{} {
323 return pw.stopCh
324}