blob: 2846d62a5d41d6bd9490da0352180f57397dc78b [file] [log] [blame]
khenaidoo59ce9dd2019-11-11 13:05:32 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package mvcc
16
17import (
18 "bytes"
19 "errors"
20 "sync"
21
22 "go.etcd.io/etcd/mvcc/mvccpb"
23)
24
25// AutoWatchID is the watcher ID passed in WatchStream.Watch when no
26// user-provided ID is available. If pass, an ID will automatically be assigned.
27const AutoWatchID WatchID = 0
28
29var (
30 ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
31 ErrEmptyWatcherRange = errors.New("mvcc: watcher range is empty")
32 ErrWatcherDuplicateID = errors.New("mvcc: duplicate watch ID provided on the WatchStream")
33)
34
35type WatchID int64
36
37// FilterFunc returns true if the given event should be filtered out.
38type FilterFunc func(e mvccpb.Event) bool
39
40type WatchStream interface {
41 // Watch creates a watcher. The watcher watches the events happening or
42 // happened on the given key or range [key, end) from the given startRev.
43 //
44 // The whole event history can be watched unless compacted.
45 // If "startRev" <=0, watch observes events after currentRev.
46 //
47 // The returned "id" is the ID of this watcher. It appears as WatchID
48 // in events that are sent to the created watcher through stream channel.
49 // The watch ID is used when it's not equal to AutoWatchID. Otherwise,
50 // an auto-generated watch ID is returned.
51 Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error)
52
53 // Chan returns a chan. All watch response will be sent to the returned chan.
54 Chan() <-chan WatchResponse
55
56 // RequestProgress requests the progress of the watcher with given ID. The response
57 // will only be sent if the watcher is currently synced.
58 // The responses will be sent through the WatchRespone Chan attached
59 // with this stream to ensure correct ordering.
60 // The responses contains no events. The revision in the response is the progress
61 // of the watchers since the watcher is currently synced.
62 RequestProgress(id WatchID)
63
64 // Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
65 // returned.
66 Cancel(id WatchID) error
67
68 // Close closes Chan and release all related resources.
69 Close()
70
71 // Rev returns the current revision of the KV the stream watches on.
72 Rev() int64
73}
74
75type WatchResponse struct {
76 // WatchID is the WatchID of the watcher this response sent to.
77 WatchID WatchID
78
79 // Events contains all the events that needs to send.
80 Events []mvccpb.Event
81
82 // Revision is the revision of the KV when the watchResponse is created.
83 // For a normal response, the revision should be the same as the last
84 // modified revision inside Events. For a delayed response to a unsynced
85 // watcher, the revision is greater than the last modified revision
86 // inside Events.
87 Revision int64
88
89 // CompactRevision is set when the watcher is cancelled due to compaction.
90 CompactRevision int64
91}
92
93// watchStream contains a collection of watchers that share
94// one streaming chan to send out watched events and other control events.
95type watchStream struct {
96 watchable watchable
97 ch chan WatchResponse
98
99 mu sync.Mutex // guards fields below it
100 // nextID is the ID pre-allocated for next new watcher in this stream
101 nextID WatchID
102 closed bool
103 cancels map[WatchID]cancelFunc
104 watchers map[WatchID]*watcher
105}
106
107// Watch creates a new watcher in the stream and returns its WatchID.
108func (ws *watchStream) Watch(id WatchID, key, end []byte, startRev int64, fcs ...FilterFunc) (WatchID, error) {
109 // prevent wrong range where key >= end lexicographically
110 // watch request with 'WithFromKey' has empty-byte range end
111 if len(end) != 0 && bytes.Compare(key, end) != -1 {
112 return -1, ErrEmptyWatcherRange
113 }
114
115 ws.mu.Lock()
116 defer ws.mu.Unlock()
117 if ws.closed {
118 return -1, ErrEmptyWatcherRange
119 }
120
121 if id == AutoWatchID {
122 for ws.watchers[ws.nextID] != nil {
123 ws.nextID++
124 }
125 id = ws.nextID
126 ws.nextID++
127 } else if _, ok := ws.watchers[id]; ok {
128 return -1, ErrWatcherDuplicateID
129 }
130
131 w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
132
133 ws.cancels[id] = c
134 ws.watchers[id] = w
135 return id, nil
136}
137
138func (ws *watchStream) Chan() <-chan WatchResponse {
139 return ws.ch
140}
141
142func (ws *watchStream) Cancel(id WatchID) error {
143 ws.mu.Lock()
144 cancel, ok := ws.cancels[id]
145 w := ws.watchers[id]
146 ok = ok && !ws.closed
147 ws.mu.Unlock()
148
149 if !ok {
150 return ErrWatcherNotExist
151 }
152 cancel()
153
154 ws.mu.Lock()
155 // The watch isn't removed until cancel so that if Close() is called,
156 // it will wait for the cancel. Otherwise, Close() could close the
157 // watch channel while the store is still posting events.
158 if ww := ws.watchers[id]; ww == w {
159 delete(ws.cancels, id)
160 delete(ws.watchers, id)
161 }
162 ws.mu.Unlock()
163
164 return nil
165}
166
167func (ws *watchStream) Close() {
168 ws.mu.Lock()
169 defer ws.mu.Unlock()
170
171 for _, cancel := range ws.cancels {
172 cancel()
173 }
174 ws.closed = true
175 close(ws.ch)
176 watchStreamGauge.Dec()
177}
178
179func (ws *watchStream) Rev() int64 {
180 ws.mu.Lock()
181 defer ws.mu.Unlock()
182 return ws.watchable.rev()
183}
184
185func (ws *watchStream) RequestProgress(id WatchID) {
186 ws.mu.Lock()
187 w, ok := ws.watchers[id]
188 ws.mu.Unlock()
189 if !ok {
190 return
191 }
192 ws.watchable.progress(w)
193}