// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package clientv3

import (
	"context"
	"sync"
	"time"

	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"

	"google.golang.org/grpc"
	"google.golang.org/grpc/metadata"
)

type (
	LeaseRevokeResponse pb.LeaseRevokeResponse
	LeaseID             int64
)

// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
type LeaseGrantResponse struct {
	*pb.ResponseHeader
	ID    LeaseID
	TTL   int64
	Error string
}

// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
type LeaseKeepAliveResponse struct {
	*pb.ResponseHeader
	ID  LeaseID
	TTL int64
}

// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
type LeaseTimeToLiveResponse struct {
	*pb.ResponseHeader
	ID LeaseID `json:"id"`

	// TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. An expired lease will return -1.
	TTL int64 `json:"ttl"`

	// GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
	GrantedTTL int64 `json:"granted-ttl"`

	// Keys is the list of keys attached to this lease.
	Keys [][]byte `json:"keys"`
}

// LeaseStatus represents a lease status.
type LeaseStatus struct {
	ID LeaseID `json:"id"`
	// TODO: TTL int64
}

// LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse.
type LeaseLeasesResponse struct {
	*pb.ResponseHeader
	Leases []LeaseStatus `json:"leases"`
}

const (
	// defaultTTL is the assumed lease TTL used for the first keepalive
	// deadline before the actual TTL is known to the client.
	defaultTTL = 5 * time.Second
	// NoLease is a lease ID for the absence of a lease.
	NoLease LeaseID = 0

	// retryConnWait is how long to wait before retrying a request after an error
	retryConnWait = 500 * time.Millisecond
)

// LeaseResponseChSize is the size of the buffer used to store unsent lease responses.
// WARNING: DO NOT UPDATE.
// Only for testing purposes.
var LeaseResponseChSize = 16

// ErrKeepAliveHalted is returned if the client's keep alive loop halts with an unexpected error.
//
// This usually means that automatic lease renewal via KeepAlive is broken, but KeepAliveOnce will still work as expected.
type ErrKeepAliveHalted struct {
	Reason error
}

func (e ErrKeepAliveHalted) Error() string {
	s := "etcdclient: leases keep alive halted"
	if e.Reason != nil {
		s += ": " + e.Reason.Error()
	}
	return s
}

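// Lease is the client-side API for etcd leases: granting and revoking
// leases, querying their remaining time-to-live, and keeping them alive.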
type Lease interface {
	// Grant creates a new lease.
	Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

	// Revoke revokes the given lease.
	Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

	// TimeToLive retrieves the lease information of the given lease ID.
	TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

	// Leases retrieves all leases.
	Leases(ctx context.Context) (*LeaseLeasesResponse, error)

	// KeepAlive keeps the given lease alive forever. If the keepalive responses
	// posted to the channel are not consumed promptly, the lease client will
	// continue sending keep alive requests to the etcd server at least every
	// second until the latest response is consumed. See the usage sketch after
	// this interface.
	//
	// The returned "LeaseKeepAliveResponse" channel closes if the underlying keep
	// alive stream is interrupted in some way the client cannot handle itself, or
	// if the given context "ctx" is canceled or timed out. A "LeaseKeepAliveResponse"
	// received from this closed channel is nil.
	//
	// If the client's keep alive loop halts with an unexpected error (e.g. "etcdserver:
	// no leader") or is canceled by the caller (e.g. context.Canceled), the error
	// is returned. Otherwise, it retries.
	//
	// TODO(v4.0): post errors to last keep alive message before closing
	// (see https://github.com/coreos/etcd/pull/7866)
	KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

	// KeepAliveOnce renews the lease once. The response corresponds to the
	// first message from calling KeepAlive. If the response has a recoverable
	// error, KeepAliveOnce will retry the RPC with a new keep alive message.
	//
	// In most cases, KeepAlive should be used instead of KeepAliveOnce.
	KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

	// Close releases all resources Lease keeps for efficient communication
	// with the etcd server.
	Close() error
}
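
// A minimal usage sketch (assuming an established *Client named "cli", whose
// embedded Lease implementation provides these methods): grant a lease, keep
// it alive, and drain responses until the channel closes.
//
//	resp, err := cli.Grant(ctx, 10) // lease with a 10-second TTL
//	if err != nil {
//		return err
//	}
//	ch, err := cli.KeepAlive(ctx, resp.ID)
//	if err != nil {
//		return err
//	}
//	for ka := range ch {
//		_ = ka.TTL // each response carries the lease's renewed TTL
//	}
//	// the channel closes once ctx is canceled or the keep alive stream
//	// is lost in a way the client cannot recover from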

type lessor struct {
	mu sync.Mutex // guards all fields

	// donec is closed and loopErr is set when recvKeepAliveLoop stops
	donec   chan struct{}
	loopErr error

	remote pb.LeaseClient

	stream       pb.Lease_LeaseKeepAliveClient
	streamCancel context.CancelFunc

	stopCtx    context.Context
	stopCancel context.CancelFunc

	keepAlives map[LeaseID]*keepAlive

	// firstKeepAliveTimeout is the timeout for the first keepalive request
	// before the actual TTL is known to the lease client
	firstKeepAliveTimeout time.Duration

	// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
	firstKeepAliveOnce sync.Once

	callOpts []grpc.CallOption
}

// keepAlive multiplexes a keepalive for a lease over multiple channels
type keepAlive struct {
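	// chs and ctxs are parallel slices: ctxs[i] is the context that
	// registered chs[i]; entries are appended and removed together.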
	chs  []chan<- *LeaseKeepAliveResponse
	ctxs []context.Context
	// deadline is the time the keep alive channels close if no response
	deadline time.Time
	// nextKeepAlive is when to send the next keep alive message
	nextKeepAlive time.Time
	// donec is closed on lease revoke, expiration, or cancel.
	donec chan struct{}
}

func NewLease(c *Client) Lease {
	return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
}

func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
	l := &lessor{
		donec:                 make(chan struct{}),
		keepAlives:            make(map[LeaseID]*keepAlive),
		remote:                remote,
		firstKeepAliveTimeout: keepAliveTimeout,
	}
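	// NewLease passes DialTimeout+1s, so a timeout of exactly one second
	// indicates a client with no dial timeout configured; fall back to the
	// default first-keepalive deadline in that case.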
	if l.firstKeepAliveTimeout == time.Second {
		l.firstKeepAliveTimeout = defaultTTL
	}
	if c != nil {
		l.callOpts = c.callOpts
	}
	reqLeaderCtx := WithRequireLeader(context.Background())
	l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
	return l
}

func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
	r := &pb.LeaseGrantRequest{TTL: ttl}
	resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
	if err == nil {
		gresp := &LeaseGrantResponse{
			ResponseHeader: resp.GetHeader(),
			ID:             LeaseID(resp.ID),
			TTL:            resp.TTL,
			Error:          resp.Error,
		}
		return gresp, nil
	}
	return nil, toErr(ctx, err)
}

func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
	r := &pb.LeaseRevokeRequest{ID: int64(id)}
	resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
	if err == nil {
		return (*LeaseRevokeResponse)(resp), nil
	}
	return nil, toErr(ctx, err)
}

func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
	r := toLeaseTimeToLiveRequest(id, opts...)
	resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
	if err == nil {
		gresp := &LeaseTimeToLiveResponse{
			ResponseHeader: resp.GetHeader(),
			ID:             LeaseID(resp.ID),
			TTL:            resp.TTL,
			GrantedTTL:     resp.GrantedTTL,
			Keys:           resp.Keys,
		}
		return gresp, nil
	}
	return nil, toErr(ctx, err)
}

func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
	resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
	if err == nil {
		leases := make([]LeaseStatus, len(resp.Leases))
		for i := range resp.Leases {
			leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
		}
		return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
	}
	return nil, toErr(ctx, err)
}

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
	ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

	l.mu.Lock()
	// ensure that recvKeepAliveLoop is still running
	select {
	case <-l.donec:
		err := l.loopErr
		l.mu.Unlock()
		close(ch)
		return ch, ErrKeepAliveHalted{Reason: err}
	default:
	}
	ka, ok := l.keepAlives[id]
	if !ok {
		// create fresh keep alive
		ka = &keepAlive{
			chs:           []chan<- *LeaseKeepAliveResponse{ch},
			ctxs:          []context.Context{ctx},
			deadline:      time.Now().Add(l.firstKeepAliveTimeout),
			nextKeepAlive: time.Now(),
			donec:         make(chan struct{}),
		}
		l.keepAlives[id] = ka
	} else {
		// add channel and context to existing keep alive
		ka.ctxs = append(ka.ctxs, ctx)
		ka.chs = append(ka.chs, ch)
	}
	l.mu.Unlock()

	go l.keepAliveCtxCloser(id, ctx, ka.donec)
	l.firstKeepAliveOnce.Do(func() {
		go l.recvKeepAliveLoop()
		go l.deadlineLoop()
	})

	return ch, nil
}

func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
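	// retry the keep alive RPC until it succeeds or fails with an
	// unrecoverable (halting) error such as context cancellation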
	for {
		resp, err := l.keepAliveOnce(ctx, id)
		if err == nil {
			if resp.TTL <= 0 {
				err = rpctypes.ErrLeaseNotFound
			}
			return resp, err
		}
		if isHaltErr(ctx, err) {
			return nil, toErr(ctx, err)
		}
	}
}
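
// A minimal sketch of manual renewal with KeepAliveOnce (assuming a *Client
// named "cli" and a previously granted lease ID "id"): renew on a fixed
// ticker instead of relying on the automatic KeepAlive loop.
//
//	ticker := time.NewTicker(3 * time.Second)
//	defer ticker.Stop()
//	for range ticker.C {
//		if _, err := cli.KeepAliveOnce(ctx, id); err != nil {
//			return err // e.g. rpctypes.ErrLeaseNotFound after expiry
//		}
//	}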

func (l *lessor) Close() error {
	l.stopCancel()
	// close donec for synchronous teardown if the stream goroutines never launched
	l.firstKeepAliveOnce.Do(func() { close(l.donec) })
	<-l.donec
	return nil
}

func (l *lessor) keepAliveCtxCloser(id LeaseID, ctx context.Context, donec <-chan struct{}) {
	select {
	case <-donec:
		return
	case <-l.donec:
		return
	case <-ctx.Done():
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	ka, ok := l.keepAlives[id]
	if !ok {
		return
	}

	// close channel and remove context if still associated with keep alive
	for i, c := range ka.ctxs {
		if c == ctx {
			close(ka.chs[i])
			ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
			ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
			break
		}
	}
	// remove the keepalive if no listeners remain
	if len(ka.chs) == 0 {
		delete(l.keepAlives, id)
	}
}

// closeRequireLeader scans keepAlives for ctxs that have require leader
// and closes the associated channels.
func (l *lessor) closeRequireLeader() {
	l.mu.Lock()
	defer l.mu.Unlock()
	for _, ka := range l.keepAlives {
		reqIdxs := 0
		// find all required leader channels, close, mark as nil
		for i, ctx := range ka.ctxs {
			md, ok := metadata.FromOutgoingContext(ctx)
			if !ok {
				continue
			}
			ks := md[rpctypes.MetadataRequireLeaderKey]
			if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
				continue
			}
			close(ka.chs[i])
			ka.chs[i] = nil
			reqIdxs++
		}
		if reqIdxs == 0 {
			continue
		}
		// remove all channels that required a leader from keepalive
		newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
		newCtxs := make([]context.Context, len(newChs))
		newIdx := 0
		for i := range ka.chs {
			if ka.chs[i] == nil {
				continue
			}
			newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[i]
			newIdx++
		}
		ka.chs, ka.ctxs = newChs, newCtxs
	}
}

func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()

	stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
	if err != nil {
		return nil, toErr(ctx, err)
	}

	err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
	if err != nil {
		return nil, toErr(ctx, err)
	}

	resp, rerr := stream.Recv()
	if rerr != nil {
		return nil, toErr(ctx, rerr)
	}

	karesp := &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}
	return karesp, nil
}

func (l *lessor) recvKeepAliveLoop() (gerr error) {
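	// on exit, record the loop error and close every outstanding keepalive
	// so that registered channels and their waiters are unblocked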
	defer func() {
		l.mu.Lock()
		close(l.donec)
		l.loopErr = gerr
		for _, ka := range l.keepAlives {
			ka.close()
		}
		l.keepAlives = make(map[LeaseID]*keepAlive)
		l.mu.Unlock()
	}()

	for {
		stream, err := l.resetRecv()
		if err != nil {
			if canceledByCaller(l.stopCtx, err) {
				return err
			}
		} else {
			for {
				resp, err := stream.Recv()
				if err != nil {
					if canceledByCaller(l.stopCtx, err) {
						return err
					}

					if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
						l.closeRequireLeader()
					}
					break
				}

				l.recvKeepAlive(resp)
			}
		}

		select {
		case <-time.After(retryConnWait):
			continue
		case <-l.stopCtx.Done():
			return l.stopCtx.Err()
		}
	}
}

// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
	sctx, cancel := context.WithCancel(l.stopCtx)
	stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
	if err != nil {
		cancel()
		return nil, err
	}

	l.mu.Lock()
	defer l.mu.Unlock()
	if l.stream != nil && l.streamCancel != nil {
		l.streamCancel()
	}

	l.streamCancel = cancel
	l.stream = stream

	go l.sendKeepAliveLoop(stream)
	return stream, nil
}

// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
	karesp := &LeaseKeepAliveResponse{
		ResponseHeader: resp.GetHeader(),
		ID:             LeaseID(resp.ID),
		TTL:            resp.TTL,
	}

	l.mu.Lock()
	defer l.mu.Unlock()

	ka, ok := l.keepAlives[karesp.ID]
	if !ok {
		return
	}

	if karesp.TTL <= 0 {
		// lease expired; close all keep alive channels
		delete(l.keepAlives, karesp.ID)
		ka.close()
		return
	}

	// send update to all channels
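	// schedule the next keepalive at a third of the TTL so several renewal
	// attempts can fit within a single lease interval before the deadline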
	nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
	ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
	for _, ch := range ka.chs {
		select {
		case ch <- karesp:
		default:
		}
		// still advance in order to rate-limit keep-alive sends
		ka.nextKeepAlive = nextKeepAlive
	}
}

// deadlineLoop reaps any keep alive channels that have not received a response
// within the lease TTL
func (l *lessor) deadlineLoop() {
	for {
		select {
		case <-time.After(time.Second):
		case <-l.donec:
			return
		}
		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.deadline.Before(now) {
				// waited too long for response; lease may be expired
				ka.close()
				delete(l.keepAlives, id)
			}
		}
		l.mu.Unlock()
	}
}

// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
	for {
		var tosend []LeaseID

		now := time.Now()
		l.mu.Lock()
		for id, ka := range l.keepAlives {
			if ka.nextKeepAlive.Before(now) {
				tosend = append(tosend, id)
			}
		}
		l.mu.Unlock()

		for _, id := range tosend {
			r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
			if err := stream.Send(r); err != nil {
				// TODO do something with this error?
				return
			}
		}

		select {
		case <-time.After(500 * time.Millisecond):
		case <-stream.Context().Done():
			return
		case <-l.donec:
			return
		case <-l.stopCtx.Done():
			return
		}
	}
}

func (ka *keepAlive) close() {
	close(ka.donec)
	for _, ch := range ka.chs {
		close(ch)
	}
}