blob: 97eb7631067d2747c19d66a3b3d8db5627b0c703 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package concurrency
16
import (
	"context"
	"errors"
	"time"

	v3 "go.etcd.io/etcd/clientv3"
)
23
// defaultSessionTTL is the session lease TTL, in seconds, that applies when
// no positive TTL is configured through WithTTL.
const (
	defaultSessionTTL = 60
)
25
26// Session represents a lease kept alive for the lifetime of a client.
27// Fault-tolerant applications may use sessions to reason about liveness.
28type Session struct {
29 client *v3.Client
30 opts *sessionOptions
31 id v3.LeaseID
32
33 cancel context.CancelFunc
34 donec <-chan struct{}
35}
36
37// NewSession gets the leased session for a client.
38func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
39 ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
40 for _, opt := range opts {
41 opt(ops)
42 }
43
44 id := ops.leaseID
45 if id == v3.NoLease {
46 resp, err := client.Grant(ops.ctx, int64(ops.ttl))
47 if err != nil {
48 return nil, err
49 }
Scott Baker611f6bd2019-10-18 13:45:19 -070050 id = resp.ID
Scott Bakereee8dd82019-09-24 12:52:34 -070051 }
52
53 ctx, cancel := context.WithCancel(ops.ctx)
54 keepAlive, err := client.KeepAlive(ctx, id)
55 if err != nil || keepAlive == nil {
56 cancel()
57 return nil, err
58 }
59
60 donec := make(chan struct{})
61 s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
62
63 // keep the lease alive until client error or cancelled context
64 go func() {
65 defer close(donec)
66 for range keepAlive {
67 // eat messages until keep alive channel closes
68 }
69 }()
70
71 return s, nil
72}
73
74// Client is the etcd client that is attached to the session.
75func (s *Session) Client() *v3.Client {
76 return s.client
77}
78
79// Lease is the lease ID for keys bound to the session.
80func (s *Session) Lease() v3.LeaseID { return s.id }
81
82// Done returns a channel that closes when the lease is orphaned, expires, or
83// is otherwise no longer being refreshed.
84func (s *Session) Done() <-chan struct{} { return s.donec }
85
86// Orphan ends the refresh for the session lease. This is useful
87// in case the state of the client connection is indeterminate (revoke
88// would fail) or when transferring lease ownership.
89func (s *Session) Orphan() {
90 s.cancel()
91 <-s.donec
92}
93
94// Close orphans the session and revokes the session lease.
95func (s *Session) Close() error {
96 s.Orphan()
97 // if revoke takes longer than the ttl, lease is expired anyway
98 ctx, cancel := context.WithTimeout(s.opts.ctx, time.Duration(s.opts.ttl)*time.Second)
99 _, err := s.client.Revoke(ctx, s.id)
100 cancel()
101 return err
102}
103
104type sessionOptions struct {
105 ttl int
106 leaseID v3.LeaseID
107 ctx context.Context
108}
109
110// SessionOption configures Session.
111type SessionOption func(*sessionOptions)
112
113// WithTTL configures the session's TTL in seconds.
114// If TTL is <= 0, the default 60 seconds TTL will be used.
115func WithTTL(ttl int) SessionOption {
116 return func(so *sessionOptions) {
117 if ttl > 0 {
118 so.ttl = ttl
119 }
120 }
121}
122
123// WithLease specifies the existing leaseID to be used for the session.
124// This is useful in process restart scenario, for example, to reclaim
125// leadership from an election prior to restart.
126func WithLease(leaseID v3.LeaseID) SessionOption {
127 return func(so *sessionOptions) {
128 so.leaseID = leaseID
129 }
130}
131
132// WithContext assigns a context to the session instead of defaulting to
133// using the client context. This is useful for canceling NewSession and
134// Close operations immediately without having to close the client. If the
135// context is canceled before Close() completes, the session's lease will be
136// abandoned and left to expire instead of being revoked.
137func WithContext(ctx context.Context) SessionOption {
138 return func(so *sessionOptions) {
139 so.ctx = ctx
140 }
141}