/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package watch
18
19import (
20 "fmt"
21 "io"
22 "sync"
23
24 "k8s.io/klog/v2"
25
26 "k8s.io/apimachinery/pkg/runtime"
27 "k8s.io/apimachinery/pkg/util/net"
28 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
29)
30
31// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
32type Decoder interface {
33 // Decode should return the type of event, the decoded object, or an error.
34 // An error will cause StreamWatcher to call Close(). Decode should block until
35 // it has data or an error occurs.
36 Decode() (action EventType, object runtime.Object, err error)
37
38 // Close should close the underlying io.Reader, signalling to the source of
39 // the stream that it is no longer being watched. Close() must cause any
40 // outstanding call to Decode() to return with an error of some sort.
41 Close()
42}
43
44// Reporter hides the details of how an error is turned into a runtime.Object for
45// reporting on a watch stream since this package may not import a higher level report.
46type Reporter interface {
47 // AsObject must convert err into a valid runtime.Object for the watch stream.
48 AsObject(err error) runtime.Object
49}
50
51// StreamWatcher turns any stream for which you can write a Decoder interface
52// into a watch.Interface.
53type StreamWatcher struct {
54 sync.Mutex
55 source Decoder
56 reporter Reporter
57 result chan Event
58 stopped bool
59}
60
61// NewStreamWatcher creates a StreamWatcher from the given decoder.
62func NewStreamWatcher(d Decoder, r Reporter) *StreamWatcher {
63 sw := &StreamWatcher{
64 source: d,
65 reporter: r,
66 // It's easy for a consumer to add buffering via an extra
67 // goroutine/channel, but impossible for them to remove it,
68 // so nonbuffered is better.
69 result: make(chan Event),
70 }
71 go sw.receive()
72 return sw
73}
74
75// ResultChan implements Interface.
76func (sw *StreamWatcher) ResultChan() <-chan Event {
77 return sw.result
78}
79
80// Stop implements Interface.
81func (sw *StreamWatcher) Stop() {
82 // Call Close() exactly once by locking and setting a flag.
83 sw.Lock()
84 defer sw.Unlock()
85 if !sw.stopped {
86 sw.stopped = true
87 sw.source.Close()
88 }
89}
90
91// stopping returns true if Stop() was called previously.
92func (sw *StreamWatcher) stopping() bool {
93 sw.Lock()
94 defer sw.Unlock()
95 return sw.stopped
96}
97
98// receive reads result from the decoder in a loop and sends down the result channel.
99func (sw *StreamWatcher) receive() {
100 defer close(sw.result)
101 defer sw.Stop()
102 defer utilruntime.HandleCrash()
103 for {
104 action, obj, err := sw.source.Decode()
105 if err != nil {
106 // Ignore expected error.
107 if sw.stopping() {
108 return
109 }
110 switch err {
111 case io.EOF:
112 // watch closed normally
113 case io.ErrUnexpectedEOF:
114 klog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
115 default:
116 if net.IsProbableEOF(err) || net.IsTimeout(err) {
117 klog.V(5).Infof("Unable to decode an event from the watch stream: %v", err)
118 } else {
119 sw.result <- Event{
120 Type: Error,
121 Object: sw.reporter.AsObject(fmt.Errorf("unable to decode an event from the watch stream: %v", err)),
122 }
123 }
124 }
125 return
126 }
127 sw.result <- Event{
128 Type: action,
129 Object: obj,
130 }
131 }
132}