blob: 1613f11a60cca08e7c9e395e13dfecdd7d58505c [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package api
2
3import (
4 "errors"
5 "fmt"
6 "time"
7)
8
9const (
10 // SessionBehaviorRelease is the default behavior and causes
11 // all associated locks to be released on session invalidation.
12 SessionBehaviorRelease = "release"
13
14 // SessionBehaviorDelete is new in Consul 0.5 and changes the
15 // behavior to delete all associated locks on session invalidation.
16 // It can be used in a way similar to Ephemeral Nodes in ZooKeeper.
17 SessionBehaviorDelete = "delete"
18)
19
20var ErrSessionExpired = errors.New("session expired")
21
22// SessionEntry represents a session in consul
23type SessionEntry struct {
24 CreateIndex uint64
25 ID string
26 Name string
27 Node string
28 Checks []string
29 LockDelay time.Duration
30 Behavior string
31 TTL string
32}
33
34// Session can be used to query the Session endpoints
35type Session struct {
36 c *Client
37}
38
39// Session returns a handle to the session endpoints
40func (c *Client) Session() *Session {
41 return &Session{c}
42}
43
44// CreateNoChecks is like Create but is used specifically to create
45// a session with no associated health checks.
46func (s *Session) CreateNoChecks(se *SessionEntry, q *WriteOptions) (string, *WriteMeta, error) {
47 body := make(map[string]interface{})
48 body["Checks"] = []string{}
49 if se != nil {
50 if se.Name != "" {
51 body["Name"] = se.Name
52 }
53 if se.Node != "" {
54 body["Node"] = se.Node
55 }
56 if se.LockDelay != 0 {
57 body["LockDelay"] = durToMsec(se.LockDelay)
58 }
59 if se.Behavior != "" {
60 body["Behavior"] = se.Behavior
61 }
62 if se.TTL != "" {
63 body["TTL"] = se.TTL
64 }
65 }
66 return s.create(body, q)
67
68}
69
70// Create makes a new session. Providing a session entry can
71// customize the session. It can also be nil to use defaults.
72func (s *Session) Create(se *SessionEntry, q *WriteOptions) (string, *WriteMeta, error) {
73 var obj interface{}
74 if se != nil {
75 body := make(map[string]interface{})
76 obj = body
77 if se.Name != "" {
78 body["Name"] = se.Name
79 }
80 if se.Node != "" {
81 body["Node"] = se.Node
82 }
83 if se.LockDelay != 0 {
84 body["LockDelay"] = durToMsec(se.LockDelay)
85 }
86 if len(se.Checks) > 0 {
87 body["Checks"] = se.Checks
88 }
89 if se.Behavior != "" {
90 body["Behavior"] = se.Behavior
91 }
92 if se.TTL != "" {
93 body["TTL"] = se.TTL
94 }
95 }
96 return s.create(obj, q)
97}
98
99func (s *Session) create(obj interface{}, q *WriteOptions) (string, *WriteMeta, error) {
100 var out struct{ ID string }
101 wm, err := s.c.write("/v1/session/create", obj, &out, q)
102 if err != nil {
103 return "", nil, err
104 }
105 return out.ID, wm, nil
106}
107
108// Destroy invalidates a given session
109func (s *Session) Destroy(id string, q *WriteOptions) (*WriteMeta, error) {
110 wm, err := s.c.write("/v1/session/destroy/"+id, nil, nil, q)
111 if err != nil {
112 return nil, err
113 }
114 return wm, nil
115}
116
117// Renew renews the TTL on a given session
118func (s *Session) Renew(id string, q *WriteOptions) (*SessionEntry, *WriteMeta, error) {
119 r := s.c.newRequest("PUT", "/v1/session/renew/"+id)
120 r.setWriteOptions(q)
121 rtt, resp, err := s.c.doRequest(r)
122 if err != nil {
123 return nil, nil, err
124 }
125 defer resp.Body.Close()
126
127 wm := &WriteMeta{RequestTime: rtt}
128
129 if resp.StatusCode == 404 {
130 return nil, wm, nil
131 } else if resp.StatusCode != 200 {
132 return nil, nil, fmt.Errorf("Unexpected response code: %d", resp.StatusCode)
133 }
134
135 var entries []*SessionEntry
136 if err := decodeBody(resp, &entries); err != nil {
137 return nil, nil, fmt.Errorf("Failed to read response: %v", err)
138 }
139 if len(entries) > 0 {
140 return entries[0], wm, nil
141 }
142 return nil, wm, nil
143}
144
145// RenewPeriodic is used to periodically invoke Session.Renew on a
146// session until a doneCh is closed. This is meant to be used in a long running
147// goroutine to ensure a session stays valid.
148func (s *Session) RenewPeriodic(initialTTL string, id string, q *WriteOptions, doneCh <-chan struct{}) error {
149 ctx := q.Context()
150
151 ttl, err := time.ParseDuration(initialTTL)
152 if err != nil {
153 return err
154 }
155
156 waitDur := ttl / 2
157 lastRenewTime := time.Now()
158 var lastErr error
159 for {
160 if time.Since(lastRenewTime) > ttl {
161 return lastErr
162 }
163 select {
164 case <-time.After(waitDur):
165 entry, _, err := s.Renew(id, q)
166 if err != nil {
167 waitDur = time.Second
168 lastErr = err
169 continue
170 }
171 if entry == nil {
172 return ErrSessionExpired
173 }
174
175 // Handle the server updating the TTL
176 ttl, _ = time.ParseDuration(entry.TTL)
177 waitDur = ttl / 2
178 lastRenewTime = time.Now()
179
180 case <-doneCh:
181 // Attempt a session destroy
182 s.Destroy(id, q)
183 return nil
184
185 case <-ctx.Done():
186 // Bail immediately since attempting the destroy would
187 // use the canceled context in q, which would just bail.
188 return ctx.Err()
189 }
190 }
191}
192
193// Info looks up a single session
194func (s *Session) Info(id string, q *QueryOptions) (*SessionEntry, *QueryMeta, error) {
195 var entries []*SessionEntry
196 qm, err := s.c.query("/v1/session/info/"+id, &entries, q)
197 if err != nil {
198 return nil, nil, err
199 }
200 if len(entries) > 0 {
201 return entries[0], qm, nil
202 }
203 return nil, qm, nil
204}
205
206// List gets sessions for a node
207func (s *Session) Node(node string, q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) {
208 var entries []*SessionEntry
209 qm, err := s.c.query("/v1/session/node/"+node, &entries, q)
210 if err != nil {
211 return nil, nil, err
212 }
213 return entries, qm, nil
214}
215
216// List gets all active sessions
217func (s *Session) List(q *QueryOptions) ([]*SessionEntry, *QueryMeta, error) {
218 var entries []*SessionEntry
219 qm, err := s.c.query("/v1/session/list", &entries, q)
220 if err != nil {
221 return nil, nil, err
222 }
223 return entries, qm, nil
224}