blob: c2796fc969af0eb907b6b2e07c2bbaf4491c82dd [file] [log] [blame]
Takahiro Suzukid7bf8202020-12-17 20:21:59 +09001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package clientv3
16
17import (
18 "context"
19 "sync"
20 "time"
21
22 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
23 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
24
25 "go.uber.org/zap"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/metadata"
28)
29
30type (
31 LeaseRevokeResponse pb.LeaseRevokeResponse
32 LeaseID int64
33)
34
35// LeaseGrantResponse wraps the protobuf message LeaseGrantResponse.
36type LeaseGrantResponse struct {
37 *pb.ResponseHeader
38 ID LeaseID
39 TTL int64
40 Error string
41}
42
43// LeaseKeepAliveResponse wraps the protobuf message LeaseKeepAliveResponse.
44type LeaseKeepAliveResponse struct {
45 *pb.ResponseHeader
46 ID LeaseID
47 TTL int64
48}
49
50// LeaseTimeToLiveResponse wraps the protobuf message LeaseTimeToLiveResponse.
51type LeaseTimeToLiveResponse struct {
52 *pb.ResponseHeader
53 ID LeaseID `json:"id"`
54
55 // TTL is the remaining TTL in seconds for the lease; the lease will expire in under TTL+1 seconds. Expired lease will return -1.
56 TTL int64 `json:"ttl"`
57
58 // GrantedTTL is the initial granted time in seconds upon lease creation/renewal.
59 GrantedTTL int64 `json:"granted-ttl"`
60
61 // Keys is the list of keys attached to this lease.
62 Keys [][]byte `json:"keys"`
63}
64
65// LeaseStatus represents a lease status.
66type LeaseStatus struct {
67 ID LeaseID `json:"id"`
68 // TODO: TTL int64
69}
70
71// LeaseLeasesResponse wraps the protobuf message LeaseLeasesResponse.
72type LeaseLeasesResponse struct {
73 *pb.ResponseHeader
74 Leases []LeaseStatus `json:"leases"`
75}
76
77const (
78 // defaultTTL is the assumed lease TTL used for the first keepalive
79 // deadline before the actual TTL is known to the client.
80 defaultTTL = 5 * time.Second
81 // NoLease is a lease ID for the absence of a lease.
82 NoLease LeaseID = 0
83
84 // retryConnWait is how long to wait before retrying request due to an error
85 retryConnWait = 500 * time.Millisecond
86)
87
// LeaseResponseChSize is the buffer capacity of every channel returned by
// KeepAlive. Responses that arrive while a channel is full are dropped.
//
// WARNING: DO NOT UPDATE.
// Only for testing purposes.
var LeaseResponseChSize = 16
92
// ErrKeepAliveHalted is returned by KeepAlive once the client's keep alive
// receive loop has stopped, carrying the error that stopped it.
//
// Automatic renewal through KeepAlive is then unavailable; KeepAliveOnce opens
// its own stream per call and keeps working.
type ErrKeepAliveHalted struct {
	Reason error
}

// Error implements the error interface, appending Reason's message when set.
func (e ErrKeepAliveHalted) Error() string {
	const msg = "etcdclient: leases keep alive halted"
	if e.Reason == nil {
		return msg
	}
	return msg + ": " + e.Reason.Error()
}
107
108type Lease interface {
109 // Grant creates a new lease.
110 Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
111
112 // Revoke revokes the given lease.
113 Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
114
115 // TimeToLive retrieves the lease information of the given lease ID.
116 TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
117
118 // Leases retrieves all leases.
119 Leases(ctx context.Context) (*LeaseLeasesResponse, error)
120
121 // KeepAlive attempts to keep the given lease alive forever. If the keepalive responses posted
122 // to the channel are not consumed promptly the channel may become full. When full, the lease
123 // client will continue sending keep alive requests to the etcd server, but will drop responses
124 // until there is capacity on the channel to send more responses.
125 //
126 // If client keep alive loop halts with an unexpected error (e.g. "etcdserver: no leader") or
127 // canceled by the caller (e.g. context.Canceled), KeepAlive returns a ErrKeepAliveHalted error
128 // containing the error reason.
129 //
130 // The returned "LeaseKeepAliveResponse" channel closes if underlying keep
131 // alive stream is interrupted in some way the client cannot handle itself;
132 // given context "ctx" is canceled or timed out.
133 //
134 // TODO(v4.0): post errors to last keep alive message before closing
135 // (see https://github.com/etcd-io/etcd/pull/7866)
136 KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
137
138 // KeepAliveOnce renews the lease once. The response corresponds to the
139 // first message from calling KeepAlive. If the response has a recoverable
140 // error, KeepAliveOnce will retry the RPC with a new keep alive message.
141 //
142 // In most of the cases, Keepalive should be used instead of KeepAliveOnce.
143 KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
144
145 // Close releases all resources Lease keeps for efficient communication
146 // with the etcd server.
147 Close() error
148}
149
150type lessor struct {
151 mu sync.Mutex // guards all fields
152
153 // donec is closed and loopErr is set when recvKeepAliveLoop stops
154 donec chan struct{}
155 loopErr error
156
157 remote pb.LeaseClient
158
159 stream pb.Lease_LeaseKeepAliveClient
160 streamCancel context.CancelFunc
161
162 stopCtx context.Context
163 stopCancel context.CancelFunc
164
165 keepAlives map[LeaseID]*keepAlive
166
167 // firstKeepAliveTimeout is the timeout for the first keepalive request
168 // before the actual TTL is known to the lease client
169 firstKeepAliveTimeout time.Duration
170
171 // firstKeepAliveOnce ensures stream starts after first KeepAlive call.
172 firstKeepAliveOnce sync.Once
173
174 callOpts []grpc.CallOption
175
176 lg *zap.Logger
177}
178
179// keepAlive multiplexes a keepalive for a lease over multiple channels
180type keepAlive struct {
181 chs []chan<- *LeaseKeepAliveResponse
182 ctxs []context.Context
183 // deadline is the time the keep alive channels close if no response
184 deadline time.Time
185 // nextKeepAlive is when to send the next keep alive message
186 nextKeepAlive time.Time
187 // donec is closed on lease revoke, expiration, or cancel.
188 donec chan struct{}
189}
190
191func NewLease(c *Client) Lease {
192 return NewLeaseFromLeaseClient(RetryLeaseClient(c), c, c.cfg.DialTimeout+time.Second)
193}
194
195func NewLeaseFromLeaseClient(remote pb.LeaseClient, c *Client, keepAliveTimeout time.Duration) Lease {
196 l := &lessor{
197 donec: make(chan struct{}),
198 keepAlives: make(map[LeaseID]*keepAlive),
199 remote: remote,
200 firstKeepAliveTimeout: keepAliveTimeout,
201 lg: c.lg,
202 }
203 if l.firstKeepAliveTimeout == time.Second {
204 l.firstKeepAliveTimeout = defaultTTL
205 }
206 if c != nil {
207 l.callOpts = c.callOpts
208 }
209 reqLeaderCtx := WithRequireLeader(context.Background())
210 l.stopCtx, l.stopCancel = context.WithCancel(reqLeaderCtx)
211 return l
212}
213
214func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
215 r := &pb.LeaseGrantRequest{TTL: ttl}
216 resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
217 if err == nil {
218 gresp := &LeaseGrantResponse{
219 ResponseHeader: resp.GetHeader(),
220 ID: LeaseID(resp.ID),
221 TTL: resp.TTL,
222 Error: resp.Error,
223 }
224 return gresp, nil
225 }
226 return nil, toErr(ctx, err)
227}
228
229func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
230 r := &pb.LeaseRevokeRequest{ID: int64(id)}
231 resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
232 if err == nil {
233 return (*LeaseRevokeResponse)(resp), nil
234 }
235 return nil, toErr(ctx, err)
236}
237
238func (l *lessor) TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error) {
239 r := toLeaseTimeToLiveRequest(id, opts...)
240 resp, err := l.remote.LeaseTimeToLive(ctx, r, l.callOpts...)
241 if err == nil {
242 gresp := &LeaseTimeToLiveResponse{
243 ResponseHeader: resp.GetHeader(),
244 ID: LeaseID(resp.ID),
245 TTL: resp.TTL,
246 GrantedTTL: resp.GrantedTTL,
247 Keys: resp.Keys,
248 }
249 return gresp, nil
250 }
251 return nil, toErr(ctx, err)
252}
253
254func (l *lessor) Leases(ctx context.Context) (*LeaseLeasesResponse, error) {
255 resp, err := l.remote.LeaseLeases(ctx, &pb.LeaseLeasesRequest{}, l.callOpts...)
256 if err == nil {
257 leases := make([]LeaseStatus, len(resp.Leases))
258 for i := range resp.Leases {
259 leases[i] = LeaseStatus{ID: LeaseID(resp.Leases[i].ID)}
260 }
261 return &LeaseLeasesResponse{ResponseHeader: resp.GetHeader(), Leases: leases}, nil
262 }
263 return nil, toErr(ctx, err)
264}
265
266func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
267 ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
268
269 l.mu.Lock()
270 // ensure that recvKeepAliveLoop is still running
271 select {
272 case <-l.donec:
273 err := l.loopErr
274 l.mu.Unlock()
275 close(ch)
276 return ch, ErrKeepAliveHalted{Reason: err}
277 default:
278 }
279 ka, ok := l.keepAlives[id]
280 if !ok {
281 // create fresh keep alive
282 ka = &keepAlive{
283 chs: []chan<- *LeaseKeepAliveResponse{ch},
284 ctxs: []context.Context{ctx},
285 deadline: time.Now().Add(l.firstKeepAliveTimeout),
286 nextKeepAlive: time.Now(),
287 donec: make(chan struct{}),
288 }
289 l.keepAlives[id] = ka
290 } else {
291 // add channel and context to existing keep alive
292 ka.ctxs = append(ka.ctxs, ctx)
293 ka.chs = append(ka.chs, ch)
294 }
295 l.mu.Unlock()
296
297 go l.keepAliveCtxCloser(ctx, id, ka.donec)
298 l.firstKeepAliveOnce.Do(func() {
299 go l.recvKeepAliveLoop()
300 go l.deadlineLoop()
301 })
302
303 return ch, nil
304}
305
306func (l *lessor) KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
307 for {
308 resp, err := l.keepAliveOnce(ctx, id)
309 if err == nil {
310 if resp.TTL <= 0 {
311 err = rpctypes.ErrLeaseNotFound
312 }
313 return resp, err
314 }
315 if isHaltErr(ctx, err) {
316 return nil, toErr(ctx, err)
317 }
318 }
319}
320
321func (l *lessor) Close() error {
322 l.stopCancel()
323 // close for synchronous teardown if stream goroutines never launched
324 l.firstKeepAliveOnce.Do(func() { close(l.donec) })
325 <-l.donec
326 return nil
327}
328
329func (l *lessor) keepAliveCtxCloser(ctx context.Context, id LeaseID, donec <-chan struct{}) {
330 select {
331 case <-donec:
332 return
333 case <-l.donec:
334 return
335 case <-ctx.Done():
336 }
337
338 l.mu.Lock()
339 defer l.mu.Unlock()
340
341 ka, ok := l.keepAlives[id]
342 if !ok {
343 return
344 }
345
346 // close channel and remove context if still associated with keep alive
347 for i, c := range ka.ctxs {
348 if c == ctx {
349 close(ka.chs[i])
350 ka.ctxs = append(ka.ctxs[:i], ka.ctxs[i+1:]...)
351 ka.chs = append(ka.chs[:i], ka.chs[i+1:]...)
352 break
353 }
354 }
355 // remove if no one more listeners
356 if len(ka.chs) == 0 {
357 delete(l.keepAlives, id)
358 }
359}
360
361// closeRequireLeader scans keepAlives for ctxs that have require leader
362// and closes the associated channels.
363func (l *lessor) closeRequireLeader() {
364 l.mu.Lock()
365 defer l.mu.Unlock()
366 for _, ka := range l.keepAlives {
367 reqIdxs := 0
368 // find all required leader channels, close, mark as nil
369 for i, ctx := range ka.ctxs {
370 md, ok := metadata.FromOutgoingContext(ctx)
371 if !ok {
372 continue
373 }
374 ks := md[rpctypes.MetadataRequireLeaderKey]
375 if len(ks) < 1 || ks[0] != rpctypes.MetadataHasLeader {
376 continue
377 }
378 close(ka.chs[i])
379 ka.chs[i] = nil
380 reqIdxs++
381 }
382 if reqIdxs == 0 {
383 continue
384 }
385 // remove all channels that required a leader from keepalive
386 newChs := make([]chan<- *LeaseKeepAliveResponse, len(ka.chs)-reqIdxs)
387 newCtxs := make([]context.Context, len(newChs))
388 newIdx := 0
389 for i := range ka.chs {
390 if ka.chs[i] == nil {
391 continue
392 }
393 newChs[newIdx], newCtxs[newIdx] = ka.chs[i], ka.ctxs[newIdx]
394 newIdx++
395 }
396 ka.chs, ka.ctxs = newChs, newCtxs
397 }
398}
399
400func (l *lessor) keepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error) {
401 cctx, cancel := context.WithCancel(ctx)
402 defer cancel()
403
404 stream, err := l.remote.LeaseKeepAlive(cctx, l.callOpts...)
405 if err != nil {
406 return nil, toErr(ctx, err)
407 }
408
409 err = stream.Send(&pb.LeaseKeepAliveRequest{ID: int64(id)})
410 if err != nil {
411 return nil, toErr(ctx, err)
412 }
413
414 resp, rerr := stream.Recv()
415 if rerr != nil {
416 return nil, toErr(ctx, rerr)
417 }
418
419 karesp := &LeaseKeepAliveResponse{
420 ResponseHeader: resp.GetHeader(),
421 ID: LeaseID(resp.ID),
422 TTL: resp.TTL,
423 }
424 return karesp, nil
425}
426
427func (l *lessor) recvKeepAliveLoop() (gerr error) {
428 defer func() {
429 l.mu.Lock()
430 close(l.donec)
431 l.loopErr = gerr
432 for _, ka := range l.keepAlives {
433 ka.close()
434 }
435 l.keepAlives = make(map[LeaseID]*keepAlive)
436 l.mu.Unlock()
437 }()
438
439 for {
440 stream, err := l.resetRecv()
441 if err != nil {
442 if canceledByCaller(l.stopCtx, err) {
443 return err
444 }
445 } else {
446 for {
447 resp, err := stream.Recv()
448 if err != nil {
449 if canceledByCaller(l.stopCtx, err) {
450 return err
451 }
452
453 if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
454 l.closeRequireLeader()
455 }
456 break
457 }
458
459 l.recvKeepAlive(resp)
460 }
461 }
462
463 select {
464 case <-time.After(retryConnWait):
465 case <-l.stopCtx.Done():
466 return l.stopCtx.Err()
467 }
468 }
469}
470
471// resetRecv opens a new lease stream and starts sending keep alive requests.
472func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
473 sctx, cancel := context.WithCancel(l.stopCtx)
474 stream, err := l.remote.LeaseKeepAlive(sctx, append(l.callOpts, withMax(0))...)
475 if err != nil {
476 cancel()
477 return nil, err
478 }
479
480 l.mu.Lock()
481 defer l.mu.Unlock()
482 if l.stream != nil && l.streamCancel != nil {
483 l.streamCancel()
484 }
485
486 l.streamCancel = cancel
487 l.stream = stream
488
489 go l.sendKeepAliveLoop(stream)
490 return stream, nil
491}
492
493// recvKeepAlive updates a lease based on its LeaseKeepAliveResponse
494func (l *lessor) recvKeepAlive(resp *pb.LeaseKeepAliveResponse) {
495 karesp := &LeaseKeepAliveResponse{
496 ResponseHeader: resp.GetHeader(),
497 ID: LeaseID(resp.ID),
498 TTL: resp.TTL,
499 }
500
501 l.mu.Lock()
502 defer l.mu.Unlock()
503
504 ka, ok := l.keepAlives[karesp.ID]
505 if !ok {
506 return
507 }
508
509 if karesp.TTL <= 0 {
510 // lease expired; close all keep alive channels
511 delete(l.keepAlives, karesp.ID)
512 ka.close()
513 return
514 }
515
516 // send update to all channels
517 nextKeepAlive := time.Now().Add((time.Duration(karesp.TTL) * time.Second) / 3.0)
518 ka.deadline = time.Now().Add(time.Duration(karesp.TTL) * time.Second)
519 for _, ch := range ka.chs {
520 select {
521 case ch <- karesp:
522 default:
523 if l.lg != nil {
524 l.lg.Warn("lease keepalive response queue is full; dropping response send",
525 zap.Int("queue-size", len(ch)),
526 zap.Int("queue-capacity", cap(ch)),
527 )
528 }
529 }
530 // still advance in order to rate-limit keep-alive sends
531 ka.nextKeepAlive = nextKeepAlive
532 }
533}
534
535// deadlineLoop reaps any keep alive channels that have not received a response
536// within the lease TTL
537func (l *lessor) deadlineLoop() {
538 for {
539 select {
540 case <-time.After(time.Second):
541 case <-l.donec:
542 return
543 }
544 now := time.Now()
545 l.mu.Lock()
546 for id, ka := range l.keepAlives {
547 if ka.deadline.Before(now) {
548 // waited too long for response; lease may be expired
549 ka.close()
550 delete(l.keepAlives, id)
551 }
552 }
553 l.mu.Unlock()
554 }
555}
556
557// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
558func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
559 for {
560 var tosend []LeaseID
561
562 now := time.Now()
563 l.mu.Lock()
564 for id, ka := range l.keepAlives {
565 if ka.nextKeepAlive.Before(now) {
566 tosend = append(tosend, id)
567 }
568 }
569 l.mu.Unlock()
570
571 for _, id := range tosend {
572 r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
573 if err := stream.Send(r); err != nil {
574 // TODO do something with this error?
575 return
576 }
577 }
578
579 select {
580 case <-time.After(retryConnWait):
581 case <-stream.Context().Done():
582 return
583 case <-l.donec:
584 return
585 case <-l.stopCtx.Done():
586 return
587 }
588 }
589}
590
591func (ka *keepAlive) close() {
592 close(ka.donec)
593 for _, ch := range ka.chs {
594 close(ch)
595 }
596}