// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package grpcproxy

import (
	"context"
	"io"
	"sync"
	"sync/atomic"
	"time"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

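// leaseProxy forwards Lease RPCs from proxy clients to an etcd backend and
// watches the backend leader so that in-flight keepalive streams can be torn
// down when the leader is lost.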
type leaseProxy struct {
	// leaseClient handles requests from LeaseGrant() that carry a lease ID.
	leaseClient pb.LeaseClient

	lessor clientv3.Lease

	ctx context.Context

	leader *leader

	// mu protects registering outstanding leaseProxyStreams with wg.
	mu sync.RWMutex

	// wg waits until all outstanding leaseProxyStreams quit.
	wg sync.WaitGroup
}

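// NewLeaseProxy returns a pb.LeaseServer backed by the client c, along with a
// channel that is closed once the proxy and all of its outstanding keepalive
// streams have shut down.
//
// A minimal wiring sketch (assuming an existing *clientv3.Client and
// *grpc.Server; the variable names are illustrative):
//
//	leaseServer, donec := grpcproxy.NewLeaseProxy(client)
//	pb.RegisterLeaseServer(srv, leaseServer)
//	// ... serve ...
//	<-donec // wait for outstanding keepalive streams to drain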
func NewLeaseProxy(c *clientv3.Client) (pb.LeaseServer, <-chan struct{}) {
	cctx, cancel := context.WithCancel(c.Ctx())
	lp := &leaseProxy{
		leaseClient: pb.NewLeaseClient(c.ActiveConnection()),
		lessor:      c.Lease,
		ctx:         cctx,
		leader:      newLeader(c.Ctx(), c.Watcher),
	}
	ch := make(chan struct{})
	go func() {
		defer close(ch)
		<-lp.leader.stopNotify()
		lp.mu.Lock()
		select {
		case <-lp.ctx.Done():
		case <-lp.leader.disconnectNotify():
			cancel()
		}
		<-lp.ctx.Done()
		lp.mu.Unlock()
		lp.wg.Wait()
	}()
	return lp, ch
}

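// LeaseGrant bypasses the clientv3 lessor and forwards the raw request to the
// backend lease client, since the request may carry a client-chosen lease ID
// that must be preserved.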
func (lp *leaseProxy) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
	rp, err := lp.leaseClient.LeaseGrant(ctx, cr, grpc.FailFast(false))
	if err != nil {
		return nil, err
	}
	lp.leader.gotLeader()
	return rp, nil
}

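// LeaseRevoke revokes the given lease through the clientv3 lessor; a
// successful round trip also tells the leader tracker a leader is reachable.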
func (lp *leaseProxy) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
	r, err := lp.lessor.Revoke(ctx, clientv3.LeaseID(rr.ID))
	if err != nil {
		return nil, err
	}
	lp.leader.gotLeader()
	return (*pb.LeaseRevokeResponse)(r), nil
}

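// LeaseTimeToLive returns the remaining and granted TTLs for a lease,
// attaching the lease's keys when the request asks for them.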
func (lp *leaseProxy) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
	var (
		r   *clientv3.LeaseTimeToLiveResponse
		err error
	)
	if rr.Keys {
		r, err = lp.lessor.TimeToLive(ctx, clientv3.LeaseID(rr.ID), clientv3.WithAttachedKeys())
	} else {
		r, err = lp.lessor.TimeToLive(ctx, clientv3.LeaseID(rr.ID))
	}
	if err != nil {
		return nil, err
	}
	rp := &pb.LeaseTimeToLiveResponse{
		Header:     r.ResponseHeader,
		ID:         int64(r.ID),
		TTL:        r.TTL,
		GrantedTTL: r.GrantedTTL,
		Keys:       r.Keys,
	}
	return rp, err
}

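// LeaseLeases lists the IDs of all leases known to the backend.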
func (lp *leaseProxy) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
	r, err := lp.lessor.Leases(ctx)
	if err != nil {
		return nil, err
	}
	leases := make([]*pb.LeaseStatus, len(r.Leases))
	for i := range r.Leases {
		leases[i] = &pb.LeaseStatus{ID: int64(r.Leases[i].ID)}
	}
	rp := &pb.LeaseLeasesResponse{
		Header: r.ResponseHeader,
		Leases: leases,
	}
	return rp, err
}

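// LeaseKeepAlive proxies a bidirectional keepalive stream: a recv loop reads
// client requests, a send loop writes backend responses, and a third
// goroutine tears the stream down when the client, the proxy, or the backend
// leader goes away.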
func (lp *leaseProxy) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
	lp.mu.Lock()
	select {
	case <-lp.ctx.Done():
		lp.mu.Unlock()
		return lp.ctx.Err()
	default:
		lp.wg.Add(1)
	}
	lp.mu.Unlock()

	ctx, cancel := context.WithCancel(stream.Context())
	lps := leaseProxyStream{
		stream:          stream,
		lessor:          lp.lessor,
		keepAliveLeases: make(map[int64]*atomicCounter),
		respc:           make(chan *pb.LeaseKeepAliveResponse),
		ctx:             ctx,
		cancel:          cancel,
	}

	errc := make(chan error, 2)

	var lostLeaderC <-chan struct{}
	if md, ok := metadata.FromOutgoingContext(stream.Context()); ok {
		v := md[rpctypes.MetadataRequireLeaderKey]
		if len(v) > 0 && v[0] == rpctypes.MetadataHasLeader {
			lostLeaderC = lp.leader.lostNotify()
			// if the leader is known to be lost at creation time, avoid
			// letting any events through at all
			select {
			case <-lostLeaderC:
				lp.wg.Done()
				return rpctypes.ErrNoLeader
			default:
			}
		}
	}
	stopc := make(chan struct{}, 3)
	go func() {
		defer func() { stopc <- struct{}{} }()
		if err := lps.recvLoop(); err != nil {
			errc <- err
		}
	}()

	go func() {
		defer func() { stopc <- struct{}{} }()
		if err := lps.sendLoop(); err != nil {
			errc <- err
		}
	}()

	// tears down the LeaseKeepAlive stream if the leader goes down or the
	// entire leaseProxy is terminated.
	go func() {
		defer func() { stopc <- struct{}{} }()
		select {
		case <-lostLeaderC:
		case <-ctx.Done():
		case <-lp.ctx.Done():
		}
	}()

	var err error
	select {
	case <-stopc:
		stopc <- struct{}{}
	case err = <-errc:
	}
	cancel()

	// recv/send may only shut down after this function exits;
	// this goroutine notifies the lease proxy that the stream is through
	go func() {
		<-stopc
		<-stopc
		<-stopc
		lps.close()
		close(errc)
		lp.wg.Done()
	}()

	select {
	case <-lostLeaderC:
		return rpctypes.ErrNoLeader
	case <-lp.leader.disconnectNotify():
		return grpc.ErrClientConnClosing
	default:
		if err != nil {
			return err
		}
		return ctx.Err()
	}
}

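// leaseProxyStream holds the per-stream state for one LeaseKeepAlive call:
// one keepAliveLoop per lease ID plus a shared response channel that
// sendLoop drains onto the client stream.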
type leaseProxyStream struct {
	stream pb.Lease_LeaseKeepAliveServer

	lessor clientv3.Lease
	// wg tracks keepAliveLoop goroutines
	wg sync.WaitGroup
	// mu protects keepAliveLeases
	mu sync.RWMutex
	// keepAliveLeases tracks, per lease, how many outstanding keepalive
	// requests still need responses.
	keepAliveLeases map[int64]*atomicCounter
	// respc receives lease keepalive responses from the etcd backend
	respc chan *pb.LeaseKeepAliveResponse

	ctx    context.Context
	cancel context.CancelFunc
}

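// recvLoop reads keepalive requests off the client stream, starting a
// keepAliveLoop the first time a lease ID is seen and counting how many
// responses each lease still owes the client.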
func (lps *leaseProxyStream) recvLoop() error {
	for {
		rr, err := lps.stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		lps.mu.Lock()
		neededResps, ok := lps.keepAliveLeases[rr.ID]
		if !ok {
			neededResps = &atomicCounter{}
			lps.keepAliveLeases[rr.ID] = neededResps
			lps.wg.Add(1)
			go func() {
				defer lps.wg.Done()
				if err := lps.keepAliveLoop(rr.ID, neededResps); err != nil {
					lps.cancel()
				}
			}()
		}
		neededResps.add(1)
		lps.mu.Unlock()
	}
}

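// keepAliveLoop relays keepalive responses for a single lease from the
// backend to the client. If the backend channel closes while responses are
// still owed (for instance, because the lease expired), it falls back to a
// TimeToLive call so the client still receives a final response for the
// lease.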
func (lps *leaseProxyStream) keepAliveLoop(leaseID int64, neededResps *atomicCounter) error {
	cctx, ccancel := context.WithCancel(lps.ctx)
	defer ccancel()
	respc, err := lps.lessor.KeepAlive(cctx, clientv3.LeaseID(leaseID))
	if err != nil {
		return err
	}
	// ticker fires when the loop hasn't received a keepalive within the TTL
	var ticker <-chan time.Time
	for {
		select {
		case <-ticker:
			lps.mu.Lock()
			// if there are outstanding keepAlive reqs at the moment the ticker fires,
			// don't close keepAliveLoop(); let it continue processing the KeepAlive reqs.
			if neededResps.get() > 0 {
				lps.mu.Unlock()
				ticker = nil
				continue
			}
			delete(lps.keepAliveLeases, leaseID)
			lps.mu.Unlock()
			return nil
		case rp, ok := <-respc:
			if !ok {
				lps.mu.Lock()
				delete(lps.keepAliveLeases, leaseID)
				lps.mu.Unlock()
				if neededResps.get() == 0 {
					return nil
				}
				ttlResp, err := lps.lessor.TimeToLive(cctx, clientv3.LeaseID(leaseID))
				if err != nil {
					return err
				}
				r := &pb.LeaseKeepAliveResponse{
					Header: ttlResp.ResponseHeader,
					ID:     int64(ttlResp.ID),
					TTL:    ttlResp.TTL,
				}
				for neededResps.get() > 0 {
					select {
					case lps.respc <- r:
						neededResps.add(-1)
					case <-lps.ctx.Done():
						return nil
					}
				}
				return nil
			}
			if neededResps.get() == 0 {
				continue
			}
			ticker = time.After(time.Duration(rp.TTL) * time.Second)
			r := &pb.LeaseKeepAliveResponse{
				Header: rp.ResponseHeader,
				ID:     int64(rp.ID),
				TTL:    rp.TTL,
			}
			lps.replyToClient(r, neededResps)
		}
	}
}

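// replyToClient sends r once per outstanding request, giving up after 500ms
// so a slow client cannot stall the keepalive loop indefinitely.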
func (lps *leaseProxyStream) replyToClient(r *pb.LeaseKeepAliveResponse, neededResps *atomicCounter) {
	timer := time.After(500 * time.Millisecond)
	for neededResps.get() > 0 {
		select {
		case lps.respc <- r:
			neededResps.add(-1)
		case <-timer:
			return
		case <-lps.ctx.Done():
			return
		}
	}
}

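// sendLoop drains the shared response channel onto the client stream.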
func (lps *leaseProxyStream) sendLoop() error {
	for {
		select {
		case lrp, ok := <-lps.respc:
			if !ok {
				return nil
			}
			if err := lps.stream.Send(lrp); err != nil {
				return err
			}
		case <-lps.ctx.Done():
			return lps.ctx.Err()
		}
	}
}

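// close stops every keepAliveLoop goroutine before closing the response
// channel they send on.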
func (lps *leaseProxyStream) close() {
	lps.cancel()
	lps.wg.Wait()
	// only close the respc channel after all keepAliveLoop() goroutines have
	// finished; this ensures they don't send a response on a closed channel
	close(lps.respc)
}

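// atomicCounter is a goroutine-safe counter used to track outstanding
// keepalive responses per lease.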
type atomicCounter struct {
	counter int64
}

func (ac *atomicCounter) add(delta int64) {
	atomic.AddInt64(&ac.counter, delta)
}

func (ac *atomicCounter) get() int64 {
	return atomic.LoadInt64(&ac.counter)
}