blob: 93bb1cdf7f6bba918e3b30a47a6eed656e8e2f85 [file] [log] [blame]
Zack Williamse940c7a2019-08-21 14:25:39 -07001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package watch
18
19import (
20 "io"
21 "sync"
22
23 "github.com/golang/glog"
24 "k8s.io/apimachinery/pkg/runtime"
25 "k8s.io/apimachinery/pkg/util/net"
26 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
27)
28
29// Decoder allows StreamWatcher to watch any stream for which a Decoder can be written.
30type Decoder interface {
31 // Decode should return the type of event, the decoded object, or an error.
32 // An error will cause StreamWatcher to call Close(). Decode should block until
33 // it has data or an error occurs.
34 Decode() (action EventType, object runtime.Object, err error)
35
36 // Close should close the underlying io.Reader, signalling to the source of
37 // the stream that it is no longer being watched. Close() must cause any
38 // outstanding call to Decode() to return with an error of some sort.
39 Close()
40}
41
42// StreamWatcher turns any stream for which you can write a Decoder interface
43// into a watch.Interface.
44type StreamWatcher struct {
45 sync.Mutex
46 source Decoder
47 result chan Event
48 stopped bool
49}
50
51// NewStreamWatcher creates a StreamWatcher from the given decoder.
52func NewStreamWatcher(d Decoder) *StreamWatcher {
53 sw := &StreamWatcher{
54 source: d,
55 // It's easy for a consumer to add buffering via an extra
56 // goroutine/channel, but impossible for them to remove it,
57 // so nonbuffered is better.
58 result: make(chan Event),
59 }
60 go sw.receive()
61 return sw
62}
63
64// ResultChan implements Interface.
65func (sw *StreamWatcher) ResultChan() <-chan Event {
66 return sw.result
67}
68
69// Stop implements Interface.
70func (sw *StreamWatcher) Stop() {
71 // Call Close() exactly once by locking and setting a flag.
72 sw.Lock()
73 defer sw.Unlock()
74 if !sw.stopped {
75 sw.stopped = true
76 sw.source.Close()
77 }
78}
79
80// stopping returns true if Stop() was called previously.
81func (sw *StreamWatcher) stopping() bool {
82 sw.Lock()
83 defer sw.Unlock()
84 return sw.stopped
85}
86
87// receive reads result from the decoder in a loop and sends down the result channel.
88func (sw *StreamWatcher) receive() {
89 defer close(sw.result)
90 defer sw.Stop()
91 defer utilruntime.HandleCrash()
92 for {
93 action, obj, err := sw.source.Decode()
94 if err != nil {
95 // Ignore expected error.
96 if sw.stopping() {
97 return
98 }
99 switch err {
100 case io.EOF:
101 // watch closed normally
102 case io.ErrUnexpectedEOF:
103 glog.V(1).Infof("Unexpected EOF during watch stream event decoding: %v", err)
104 default:
105 msg := "Unable to decode an event from the watch stream: %v"
106 if net.IsProbableEOF(err) {
107 glog.V(5).Infof(msg, err)
108 } else {
109 glog.Errorf(msg, err)
110 }
111 }
112 return
113 }
114 sw.result <- Event{
115 Type: action,
116 Object: obj,
117 }
118 }
119}