blob: bc0c6322fd1f95e9905ffff8a47569971e2dbc95 [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package mvcc
16
17import (
18 "bytes"
19 "errors"
20 "sync"
21
22 "github.com/coreos/etcd/mvcc/mvccpb"
23)
24
// ErrWatcherNotExist is the error reported when an operation refers to a
// watcher ID that is not registered on the stream.
var ErrWatcherNotExist = errors.New("mvcc: watcher does not exist")
28
// WatchID is the identifier assigned to a watcher created on a WatchStream.
type WatchID int64
30
31// FilterFunc returns true if the given event should be filtered out.
32type FilterFunc func(e mvccpb.Event) bool
33
34type WatchStream interface {
35 // Watch creates a watcher. The watcher watches the events happening or
36 // happened on the given key or range [key, end) from the given startRev.
37 //
38 // The whole event history can be watched unless compacted.
39 // If `startRev` <=0, watch observes events after currentRev.
40 //
41 // The returned `id` is the ID of this watcher. It appears as WatchID
42 // in events that are sent to the created watcher through stream channel.
43 //
44 Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID
45
46 // Chan returns a chan. All watch response will be sent to the returned chan.
47 Chan() <-chan WatchResponse
48
49 // RequestProgress requests the progress of the watcher with given ID. The response
50 // will only be sent if the watcher is currently synced.
51 // The responses will be sent through the WatchRespone Chan attached
52 // with this stream to ensure correct ordering.
53 // The responses contains no events. The revision in the response is the progress
54 // of the watchers since the watcher is currently synced.
55 RequestProgress(id WatchID)
56
57 // Cancel cancels a watcher by giving its ID. If watcher does not exist, an error will be
58 // returned.
59 Cancel(id WatchID) error
60
61 // Close closes Chan and release all related resources.
62 Close()
63
64 // Rev returns the current revision of the KV the stream watches on.
65 Rev() int64
66}
67
68type WatchResponse struct {
69 // WatchID is the WatchID of the watcher this response sent to.
70 WatchID WatchID
71
72 // Events contains all the events that needs to send.
73 Events []mvccpb.Event
74
75 // Revision is the revision of the KV when the watchResponse is created.
76 // For a normal response, the revision should be the same as the last
77 // modified revision inside Events. For a delayed response to a unsynced
78 // watcher, the revision is greater than the last modified revision
79 // inside Events.
80 Revision int64
81
82 // CompactRevision is set when the watcher is cancelled due to compaction.
83 CompactRevision int64
84}
85
86// watchStream contains a collection of watchers that share
87// one streaming chan to send out watched events and other control events.
88type watchStream struct {
89 watchable watchable
90 ch chan WatchResponse
91
92 mu sync.Mutex // guards fields below it
93 // nextID is the ID pre-allocated for next new watcher in this stream
94 nextID WatchID
95 closed bool
96 cancels map[WatchID]cancelFunc
97 watchers map[WatchID]*watcher
98}
99
100// Watch creates a new watcher in the stream and returns its WatchID.
101// TODO: return error if ws is closed?
102func (ws *watchStream) Watch(key, end []byte, startRev int64, fcs ...FilterFunc) WatchID {
103 // prevent wrong range where key >= end lexicographically
104 // watch request with 'WithFromKey' has empty-byte range end
105 if len(end) != 0 && bytes.Compare(key, end) != -1 {
106 return -1
107 }
108
109 ws.mu.Lock()
110 defer ws.mu.Unlock()
111 if ws.closed {
112 return -1
113 }
114
115 id := ws.nextID
116 ws.nextID++
117
118 w, c := ws.watchable.watch(key, end, startRev, id, ws.ch, fcs...)
119
120 ws.cancels[id] = c
121 ws.watchers[id] = w
122 return id
123}
124
125func (ws *watchStream) Chan() <-chan WatchResponse {
126 return ws.ch
127}
128
129func (ws *watchStream) Cancel(id WatchID) error {
130 ws.mu.Lock()
131 cancel, ok := ws.cancels[id]
132 w := ws.watchers[id]
133 ok = ok && !ws.closed
134 ws.mu.Unlock()
135
136 if !ok {
137 return ErrWatcherNotExist
138 }
139 cancel()
140
141 ws.mu.Lock()
142 // The watch isn't removed until cancel so that if Close() is called,
143 // it will wait for the cancel. Otherwise, Close() could close the
144 // watch channel while the store is still posting events.
145 if ww := ws.watchers[id]; ww == w {
146 delete(ws.cancels, id)
147 delete(ws.watchers, id)
148 }
149 ws.mu.Unlock()
150
151 return nil
152}
153
154func (ws *watchStream) Close() {
155 ws.mu.Lock()
156 defer ws.mu.Unlock()
157
158 for _, cancel := range ws.cancels {
159 cancel()
160 }
161 ws.closed = true
162 close(ws.ch)
163 watchStreamGauge.Dec()
164}
165
166func (ws *watchStream) Rev() int64 {
167 ws.mu.Lock()
168 defer ws.mu.Unlock()
169 return ws.watchable.rev()
170}
171
172func (ws *watchStream) RequestProgress(id WatchID) {
173 ws.mu.Lock()
174 w, ok := ws.watchers[id]
175 ws.mu.Unlock()
176 if !ok {
177 return
178 }
179 ws.watchable.progress(w)
180}