// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| 14 | |
| 15 | package grpcproxy |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "sync" |
| 20 | |
| 21 | "github.com/coreos/etcd/clientv3" |
| 22 | "github.com/coreos/etcd/etcdserver/api/v3rpc" |
| 23 | "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes" |
| 24 | pb "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| 25 | |
| 26 | "google.golang.org/grpc" |
| 27 | "google.golang.org/grpc/metadata" |
| 28 | ) |
| 29 | |
| 30 | type watchProxy struct { |
| 31 | cw clientv3.Watcher |
| 32 | ctx context.Context |
| 33 | |
| 34 | leader *leader |
| 35 | |
| 36 | ranges *watchRanges |
| 37 | |
| 38 | // mu protects adding outstanding watch servers through wg. |
| 39 | mu sync.Mutex |
| 40 | |
| 41 | // wg waits until all outstanding watch servers quit. |
| 42 | wg sync.WaitGroup |
| 43 | |
| 44 | // kv is used for permission checking |
| 45 | kv clientv3.KV |
| 46 | } |
| 47 | |
| 48 | func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { |
| 49 | cctx, cancel := context.WithCancel(c.Ctx()) |
| 50 | wp := &watchProxy{ |
| 51 | cw: c.Watcher, |
| 52 | ctx: cctx, |
| 53 | leader: newLeader(c.Ctx(), c.Watcher), |
| 54 | |
| 55 | kv: c.KV, // for permission checking |
| 56 | } |
| 57 | wp.ranges = newWatchRanges(wp) |
| 58 | ch := make(chan struct{}) |
| 59 | go func() { |
| 60 | defer close(ch) |
| 61 | <-wp.leader.stopNotify() |
| 62 | wp.mu.Lock() |
| 63 | select { |
| 64 | case <-wp.ctx.Done(): |
| 65 | case <-wp.leader.disconnectNotify(): |
| 66 | cancel() |
| 67 | } |
| 68 | <-wp.ctx.Done() |
| 69 | wp.mu.Unlock() |
| 70 | wp.wg.Wait() |
| 71 | wp.ranges.stop() |
| 72 | }() |
| 73 | return wp, ch |
| 74 | } |
| 75 | |
| 76 | func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { |
| 77 | wp.mu.Lock() |
| 78 | select { |
| 79 | case <-wp.ctx.Done(): |
| 80 | wp.mu.Unlock() |
| 81 | select { |
| 82 | case <-wp.leader.disconnectNotify(): |
| 83 | return grpc.ErrClientConnClosing |
| 84 | default: |
| 85 | return wp.ctx.Err() |
| 86 | } |
| 87 | default: |
| 88 | wp.wg.Add(1) |
| 89 | } |
| 90 | wp.mu.Unlock() |
| 91 | |
| 92 | ctx, cancel := context.WithCancel(stream.Context()) |
| 93 | wps := &watchProxyStream{ |
| 94 | ranges: wp.ranges, |
| 95 | watchers: make(map[int64]*watcher), |
| 96 | stream: stream, |
| 97 | watchCh: make(chan *pb.WatchResponse, 1024), |
| 98 | ctx: ctx, |
| 99 | cancel: cancel, |
| 100 | kv: wp.kv, |
| 101 | } |
| 102 | |
| 103 | var lostLeaderC <-chan struct{} |
| 104 | if md, ok := metadata.FromOutgoingContext(stream.Context()); ok { |
| 105 | v := md[rpctypes.MetadataRequireLeaderKey] |
| 106 | if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader { |
| 107 | lostLeaderC = wp.leader.lostNotify() |
| 108 | // if leader is known to be lost at creation time, avoid |
| 109 | // letting events through at all |
| 110 | select { |
| 111 | case <-lostLeaderC: |
| 112 | wp.wg.Done() |
| 113 | return rpctypes.ErrNoLeader |
| 114 | default: |
| 115 | } |
| 116 | } |
| 117 | } |
| 118 | |
| 119 | // post to stopc => terminate server stream; can't use a waitgroup |
| 120 | // since all goroutines will only terminate after Watch() exits. |
| 121 | stopc := make(chan struct{}, 3) |
| 122 | go func() { |
| 123 | defer func() { stopc <- struct{}{} }() |
| 124 | wps.recvLoop() |
| 125 | }() |
| 126 | go func() { |
| 127 | defer func() { stopc <- struct{}{} }() |
| 128 | wps.sendLoop() |
| 129 | }() |
| 130 | // tear down watch if leader goes down or entire watch proxy is terminated |
| 131 | go func() { |
| 132 | defer func() { stopc <- struct{}{} }() |
| 133 | select { |
| 134 | case <-lostLeaderC: |
| 135 | case <-ctx.Done(): |
| 136 | case <-wp.ctx.Done(): |
| 137 | } |
| 138 | }() |
| 139 | |
| 140 | <-stopc |
| 141 | cancel() |
| 142 | |
| 143 | // recv/send may only shutdown after function exits; |
| 144 | // goroutine notifies proxy that stream is through |
| 145 | go func() { |
| 146 | <-stopc |
| 147 | <-stopc |
| 148 | wps.close() |
| 149 | wp.wg.Done() |
| 150 | }() |
| 151 | |
| 152 | select { |
| 153 | case <-lostLeaderC: |
| 154 | return rpctypes.ErrNoLeader |
| 155 | case <-wp.leader.disconnectNotify(): |
| 156 | return grpc.ErrClientConnClosing |
| 157 | default: |
| 158 | return wps.ctx.Err() |
| 159 | } |
| 160 | } |
| 161 | |
| 162 | // watchProxyStream forwards etcd watch events to a proxied client stream. |
| 163 | type watchProxyStream struct { |
| 164 | ranges *watchRanges |
| 165 | |
| 166 | // mu protects watchers and nextWatcherID |
| 167 | mu sync.Mutex |
| 168 | // watchers receive events from watch broadcast. |
| 169 | watchers map[int64]*watcher |
| 170 | // nextWatcherID is the id to assign the next watcher on this stream. |
| 171 | nextWatcherID int64 |
| 172 | |
| 173 | stream pb.Watch_WatchServer |
| 174 | |
| 175 | // watchCh receives watch responses from the watchers. |
| 176 | watchCh chan *pb.WatchResponse |
| 177 | |
| 178 | ctx context.Context |
| 179 | cancel context.CancelFunc |
| 180 | |
| 181 | // kv is used for permission checking |
| 182 | kv clientv3.KV |
| 183 | } |
| 184 | |
| 185 | func (wps *watchProxyStream) close() { |
| 186 | var wg sync.WaitGroup |
| 187 | wps.cancel() |
| 188 | wps.mu.Lock() |
| 189 | wg.Add(len(wps.watchers)) |
| 190 | for _, wpsw := range wps.watchers { |
| 191 | go func(w *watcher) { |
| 192 | wps.ranges.delete(w) |
| 193 | wg.Done() |
| 194 | }(wpsw) |
| 195 | } |
| 196 | wps.watchers = nil |
| 197 | wps.mu.Unlock() |
| 198 | |
| 199 | wg.Wait() |
| 200 | |
| 201 | close(wps.watchCh) |
| 202 | } |
| 203 | |
| 204 | func (wps *watchProxyStream) checkPermissionForWatch(key, rangeEnd []byte) error { |
| 205 | if len(key) == 0 { |
| 206 | // If the length of the key is 0, we need to obtain full range. |
| 207 | // look at clientv3.WithPrefix() |
| 208 | key = []byte{0} |
| 209 | rangeEnd = []byte{0} |
| 210 | } |
| 211 | req := &pb.RangeRequest{ |
| 212 | Serializable: true, |
| 213 | Key: key, |
| 214 | RangeEnd: rangeEnd, |
| 215 | CountOnly: true, |
| 216 | Limit: 1, |
| 217 | } |
| 218 | _, err := wps.kv.Do(wps.ctx, RangeRequestToOp(req)) |
| 219 | return err |
| 220 | } |
| 221 | |
| 222 | func (wps *watchProxyStream) recvLoop() error { |
| 223 | for { |
| 224 | req, err := wps.stream.Recv() |
| 225 | if err != nil { |
| 226 | return err |
| 227 | } |
| 228 | switch uv := req.RequestUnion.(type) { |
| 229 | case *pb.WatchRequest_CreateRequest: |
| 230 | cr := uv.CreateRequest |
| 231 | |
| 232 | if err = wps.checkPermissionForWatch(cr.Key, cr.RangeEnd); err != nil && err == rpctypes.ErrPermissionDenied { |
| 233 | // Return WatchResponse which is caused by permission checking if and only if |
| 234 | // the error is permission denied. For other errors (e.g. timeout or connection closed), |
| 235 | // the permission checking mechanism should do nothing for preserving error code. |
| 236 | wps.watchCh <- &pb.WatchResponse{Header: &pb.ResponseHeader{}, WatchId: -1, Created: true, Canceled: true} |
| 237 | continue |
| 238 | } |
| 239 | |
| 240 | w := &watcher{ |
| 241 | wr: watchRange{string(cr.Key), string(cr.RangeEnd)}, |
| 242 | id: wps.nextWatcherID, |
| 243 | wps: wps, |
| 244 | |
| 245 | nextrev: cr.StartRevision, |
| 246 | progress: cr.ProgressNotify, |
| 247 | prevKV: cr.PrevKv, |
| 248 | filters: v3rpc.FiltersFromRequest(cr), |
| 249 | } |
| 250 | if !w.wr.valid() { |
| 251 | w.post(&pb.WatchResponse{WatchId: -1, Created: true, Canceled: true}) |
| 252 | continue |
| 253 | } |
| 254 | wps.nextWatcherID++ |
| 255 | w.nextrev = cr.StartRevision |
| 256 | wps.watchers[w.id] = w |
| 257 | wps.ranges.add(w) |
| 258 | case *pb.WatchRequest_CancelRequest: |
| 259 | wps.delete(uv.CancelRequest.WatchId) |
| 260 | default: |
| 261 | panic("not implemented") |
| 262 | } |
| 263 | } |
| 264 | } |
| 265 | |
| 266 | func (wps *watchProxyStream) sendLoop() { |
| 267 | for { |
| 268 | select { |
| 269 | case wresp, ok := <-wps.watchCh: |
| 270 | if !ok { |
| 271 | return |
| 272 | } |
| 273 | if err := wps.stream.Send(wresp); err != nil { |
| 274 | return |
| 275 | } |
| 276 | case <-wps.ctx.Done(): |
| 277 | return |
| 278 | } |
| 279 | } |
| 280 | } |
| 281 | |
| 282 | func (wps *watchProxyStream) delete(id int64) { |
| 283 | wps.mu.Lock() |
| 284 | defer wps.mu.Unlock() |
| 285 | |
| 286 | w, ok := wps.watchers[id] |
| 287 | if !ok { |
| 288 | return |
| 289 | } |
| 290 | wps.ranges.delete(w) |
| 291 | delete(wps.watchers, id) |
| 292 | resp := &pb.WatchResponse{ |
| 293 | Header: &w.lastHeader, |
| 294 | WatchId: id, |
| 295 | Canceled: true, |
| 296 | } |
| 297 | wps.watchCh <- resp |
| 298 | } |