/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package watch
18
19import (
20 "fmt"
21 "sync"
22
23 "k8s.io/klog"
24
25 "k8s.io/apimachinery/pkg/runtime"
26)
27
// Interface can be implemented by anything that knows how to watch and report changes.
type Interface interface {
	// Stop stops watching. It will close the channel returned by ResultChan()
	// and release any resources used by the watch.
	Stop()

	// ResultChan returns a chan which will receive all the events. If an error
	// occurs or Stop() is called, this channel will be closed, in which case
	// the watch should be completely cleaned up.
	ResultChan() <-chan Event
}
39
// EventType identifies the kind of change carried by an Event.
type EventType string

// The event types a watch can report.
const (
	Added    EventType = "ADDED"
	Modified EventType = "MODIFIED"
	Deleted  EventType = "DELETED"
	Error    EventType = "ERROR"
)

// DefaultChanSize is the buffer size of the result channel allocated by
// NewRaceFreeFake and RaceFreeFakeWatcher.Reset.
const DefaultChanSize int32 = 100
51
// Event represents a single event to a watched resource.
// +k8s:deepcopy-gen=true
type Event struct {
	// Type is the kind of change being reported; it determines how Object
	// is to be interpreted.
	Type EventType

	// Object is:
	//  * If Type is Added or Modified: the new state of the object.
	//  * If Type is Deleted: the state of the object immediately before deletion.
	//  * If Type is Error: *api.Status is recommended; other types may make sense
	//    depending on context.
	Object runtime.Object
}
64
// emptyWatch is a channel of events that doubles as a watch Interface; it is
// the type handed out by NewEmptyWatch.
type emptyWatch chan Event
66
67// NewEmptyWatch returns a watch interface that returns no results and is closed.
68// May be used in certain error conditions where no information is available but
69// an error is not warranted.
70func NewEmptyWatch() Interface {
71 ch := make(chan Event)
72 close(ch)
73 return emptyWatch(ch)
74}
75
76// Stop implements Interface
77func (w emptyWatch) Stop() {
78}
79
80// ResultChan implements Interface
81func (w emptyWatch) ResultChan() <-chan Event {
82 return chan Event(w)
83}
84
85// FakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
86type FakeWatcher struct {
87 result chan Event
88 Stopped bool
89 sync.Mutex
90}
91
92func NewFake() *FakeWatcher {
93 return &FakeWatcher{
94 result: make(chan Event),
95 }
96}
97
98func NewFakeWithChanSize(size int, blocking bool) *FakeWatcher {
99 return &FakeWatcher{
100 result: make(chan Event, size),
101 }
102}
103
104// Stop implements Interface.Stop().
105func (f *FakeWatcher) Stop() {
106 f.Lock()
107 defer f.Unlock()
108 if !f.Stopped {
109 klog.V(4).Infof("Stopping fake watcher.")
110 close(f.result)
111 f.Stopped = true
112 }
113}
114
115func (f *FakeWatcher) IsStopped() bool {
116 f.Lock()
117 defer f.Unlock()
118 return f.Stopped
119}
120
121// Reset prepares the watcher to be reused.
122func (f *FakeWatcher) Reset() {
123 f.Lock()
124 defer f.Unlock()
125 f.Stopped = false
126 f.result = make(chan Event)
127}
128
129func (f *FakeWatcher) ResultChan() <-chan Event {
130 return f.result
131}
132
133// Add sends an add event.
134func (f *FakeWatcher) Add(obj runtime.Object) {
135 f.result <- Event{Added, obj}
136}
137
138// Modify sends a modify event.
139func (f *FakeWatcher) Modify(obj runtime.Object) {
140 f.result <- Event{Modified, obj}
141}
142
143// Delete sends a delete event.
144func (f *FakeWatcher) Delete(lastValue runtime.Object) {
145 f.result <- Event{Deleted, lastValue}
146}
147
148// Error sends an Error event.
149func (f *FakeWatcher) Error(errValue runtime.Object) {
150 f.result <- Event{Error, errValue}
151}
152
153// Action sends an event of the requested type, for table-based testing.
154func (f *FakeWatcher) Action(action EventType, obj runtime.Object) {
155 f.result <- Event{action, obj}
156}
157
158// RaceFreeFakeWatcher lets you test anything that consumes a watch.Interface; threadsafe.
159type RaceFreeFakeWatcher struct {
160 result chan Event
161 Stopped bool
162 sync.Mutex
163}
164
165func NewRaceFreeFake() *RaceFreeFakeWatcher {
166 return &RaceFreeFakeWatcher{
167 result: make(chan Event, DefaultChanSize),
168 }
169}
170
171// Stop implements Interface.Stop().
172func (f *RaceFreeFakeWatcher) Stop() {
173 f.Lock()
174 defer f.Unlock()
175 if !f.Stopped {
176 klog.V(4).Infof("Stopping fake watcher.")
177 close(f.result)
178 f.Stopped = true
179 }
180}
181
182func (f *RaceFreeFakeWatcher) IsStopped() bool {
183 f.Lock()
184 defer f.Unlock()
185 return f.Stopped
186}
187
188// Reset prepares the watcher to be reused.
189func (f *RaceFreeFakeWatcher) Reset() {
190 f.Lock()
191 defer f.Unlock()
192 f.Stopped = false
193 f.result = make(chan Event, DefaultChanSize)
194}
195
196func (f *RaceFreeFakeWatcher) ResultChan() <-chan Event {
197 f.Lock()
198 defer f.Unlock()
199 return f.result
200}
201
202// Add sends an add event.
203func (f *RaceFreeFakeWatcher) Add(obj runtime.Object) {
204 f.Lock()
205 defer f.Unlock()
206 if !f.Stopped {
207 select {
208 case f.result <- Event{Added, obj}:
209 return
210 default:
211 panic(fmt.Errorf("channel full"))
212 }
213 }
214}
215
216// Modify sends a modify event.
217func (f *RaceFreeFakeWatcher) Modify(obj runtime.Object) {
218 f.Lock()
219 defer f.Unlock()
220 if !f.Stopped {
221 select {
222 case f.result <- Event{Modified, obj}:
223 return
224 default:
225 panic(fmt.Errorf("channel full"))
226 }
227 }
228}
229
230// Delete sends a delete event.
231func (f *RaceFreeFakeWatcher) Delete(lastValue runtime.Object) {
232 f.Lock()
233 defer f.Unlock()
234 if !f.Stopped {
235 select {
236 case f.result <- Event{Deleted, lastValue}:
237 return
238 default:
239 panic(fmt.Errorf("channel full"))
240 }
241 }
242}
243
244// Error sends an Error event.
245func (f *RaceFreeFakeWatcher) Error(errValue runtime.Object) {
246 f.Lock()
247 defer f.Unlock()
248 if !f.Stopped {
249 select {
250 case f.result <- Event{Error, errValue}:
251 return
252 default:
253 panic(fmt.Errorf("channel full"))
254 }
255 }
256}
257
258// Action sends an event of the requested type, for table-based testing.
259func (f *RaceFreeFakeWatcher) Action(action EventType, obj runtime.Object) {
260 f.Lock()
261 defer f.Unlock()
262 if !f.Stopped {
263 select {
264 case f.result <- Event{action, obj}:
265 return
266 default:
267 panic(fmt.Errorf("channel full"))
268 }
269 }
270}
271
272// ProxyWatcher lets you wrap your channel in watch Interface. Threadsafe.
273type ProxyWatcher struct {
274 result chan Event
275 stopCh chan struct{}
276
277 mutex sync.Mutex
278 stopped bool
279}
280
281var _ Interface = &ProxyWatcher{}
282
283// NewProxyWatcher creates new ProxyWatcher by wrapping a channel
284func NewProxyWatcher(ch chan Event) *ProxyWatcher {
285 return &ProxyWatcher{
286 result: ch,
287 stopCh: make(chan struct{}),
288 stopped: false,
289 }
290}
291
292// Stop implements Interface
293func (pw *ProxyWatcher) Stop() {
294 pw.mutex.Lock()
295 defer pw.mutex.Unlock()
296 if !pw.stopped {
297 pw.stopped = true
298 close(pw.stopCh)
299 }
300}
301
302// Stopping returns true if Stop() has been called
303func (pw *ProxyWatcher) Stopping() bool {
304 pw.mutex.Lock()
305 defer pw.mutex.Unlock()
306 return pw.stopped
307}
308
309// ResultChan implements Interface
310func (pw *ProxyWatcher) ResultChan() <-chan Event {
311 return pw.result
312}
313
314// StopChan returns stop channel
315func (pw *ProxyWatcher) StopChan() <-chan struct{} {
316 return pw.stopCh
317}