blob: 67e916dba9e56b3245d93596e86b04414449c33d [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package leasehttp
16
17import (
18 "bytes"
19 "context"
20 "errors"
21 "fmt"
22 "io/ioutil"
23 "net/http"
24 "time"
25
26 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
27 "go.etcd.io/etcd/lease"
28 "go.etcd.io/etcd/lease/leasepb"
29 "go.etcd.io/etcd/pkg/httputil"
30)
31
32var (
33 LeasePrefix = "/leases"
34 LeaseInternalPrefix = "/leases/internal"
35 applyTimeout = time.Second
36 ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out")
37)
38
39// NewHandler returns an http Handler for lease renewals
40func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
41 return &leaseHandler{l, waitch}
42}
43
44type leaseHandler struct {
45 l lease.Lessor
46 waitch func() <-chan struct{}
47}
48
49func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
50 if r.Method != "POST" {
51 http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
52 return
53 }
54
55 defer r.Body.Close()
56 b, err := ioutil.ReadAll(r.Body)
57 if err != nil {
58 http.Error(w, "error reading body", http.StatusBadRequest)
59 return
60 }
61
62 var v []byte
63 switch r.URL.Path {
64 case LeasePrefix:
65 lreq := pb.LeaseKeepAliveRequest{}
66 if uerr := lreq.Unmarshal(b); uerr != nil {
67 http.Error(w, "error unmarshalling request", http.StatusBadRequest)
68 return
69 }
70 select {
71 case <-h.waitch():
72 case <-time.After(applyTimeout):
73 http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
74 return
75 }
76 ttl, rerr := h.l.Renew(lease.LeaseID(lreq.ID))
77 if rerr != nil {
78 if rerr == lease.ErrLeaseNotFound {
79 http.Error(w, rerr.Error(), http.StatusNotFound)
80 return
81 }
82
83 http.Error(w, rerr.Error(), http.StatusBadRequest)
84 return
85 }
86 // TODO: fill out ResponseHeader
87 resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl}
88 v, err = resp.Marshal()
89 if err != nil {
90 http.Error(w, err.Error(), http.StatusInternalServerError)
91 return
92 }
93
94 case LeaseInternalPrefix:
95 lreq := leasepb.LeaseInternalRequest{}
96 if lerr := lreq.Unmarshal(b); lerr != nil {
97 http.Error(w, "error unmarshalling request", http.StatusBadRequest)
98 return
99 }
100 select {
101 case <-h.waitch():
102 case <-time.After(applyTimeout):
103 http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
104 return
105 }
106 l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
107 if l == nil {
108 http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
109 return
110 }
111 // TODO: fill out ResponseHeader
112 resp := &leasepb.LeaseInternalResponse{
113 LeaseTimeToLiveResponse: &pb.LeaseTimeToLiveResponse{
114 Header: &pb.ResponseHeader{},
115 ID: lreq.LeaseTimeToLiveRequest.ID,
116 TTL: int64(l.Remaining().Seconds()),
117 GrantedTTL: l.TTL(),
118 },
119 }
120 if lreq.LeaseTimeToLiveRequest.Keys {
121 ks := l.Keys()
122 kbs := make([][]byte, len(ks))
123 for i := range ks {
124 kbs[i] = []byte(ks[i])
125 }
126 resp.LeaseTimeToLiveResponse.Keys = kbs
127 }
128
129 v, err = resp.Marshal()
130 if err != nil {
131 http.Error(w, err.Error(), http.StatusInternalServerError)
132 return
133 }
134
135 default:
136 http.Error(w, fmt.Sprintf("unknown request path %q", r.URL.Path), http.StatusBadRequest)
137 return
138 }
139
140 w.Header().Set("Content-Type", "application/protobuf")
141 w.Write(v)
142}
143
144// RenewHTTP renews a lease at a given primary server.
145// TODO: Batch request in future?
146func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
147 // will post lreq protobuf to leader
148 lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
149 if err != nil {
150 return -1, err
151 }
152
153 cc := &http.Client{Transport: rt}
154 req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
155 if err != nil {
156 return -1, err
157 }
158 req.Header.Set("Content-Type", "application/protobuf")
159 req.Cancel = ctx.Done()
160
161 resp, err := cc.Do(req)
162 if err != nil {
163 return -1, err
164 }
165 b, err := readResponse(resp)
166 if err != nil {
167 return -1, err
168 }
169
170 if resp.StatusCode == http.StatusRequestTimeout {
171 return -1, ErrLeaseHTTPTimeout
172 }
173
174 if resp.StatusCode == http.StatusNotFound {
175 return -1, lease.ErrLeaseNotFound
176 }
177
178 if resp.StatusCode != http.StatusOK {
179 return -1, fmt.Errorf("lease: unknown error(%s)", string(b))
180 }
181
182 lresp := &pb.LeaseKeepAliveResponse{}
183 if err := lresp.Unmarshal(b); err != nil {
184 return -1, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
185 }
186 if lresp.ID != int64(id) {
187 return -1, fmt.Errorf("lease: renew id mismatch")
188 }
189 return lresp.TTL, nil
190}
191
192// TimeToLiveHTTP retrieves lease information of the given lease ID.
193func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) {
194 // will post lreq protobuf to leader
195 lreq, err := (&leasepb.LeaseInternalRequest{
196 LeaseTimeToLiveRequest: &pb.LeaseTimeToLiveRequest{
197 ID: int64(id),
198 Keys: keys,
199 },
200 }).Marshal()
201 if err != nil {
202 return nil, err
203 }
204
205 req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
206 if err != nil {
207 return nil, err
208 }
209 req.Header.Set("Content-Type", "application/protobuf")
210
211 req = req.WithContext(ctx)
212
213 cc := &http.Client{Transport: rt}
214 var b []byte
215 // buffer errc channel so that errc don't block inside the go routinue
216 resp, err := cc.Do(req)
217 if err != nil {
218 return nil, err
219 }
220 b, err = readResponse(resp)
221 if err != nil {
222 return nil, err
223 }
224 if resp.StatusCode == http.StatusRequestTimeout {
225 return nil, ErrLeaseHTTPTimeout
226 }
227 if resp.StatusCode == http.StatusNotFound {
228 return nil, lease.ErrLeaseNotFound
229 }
230 if resp.StatusCode != http.StatusOK {
231 return nil, fmt.Errorf("lease: unknown error(%s)", string(b))
232 }
233
234 lresp := &leasepb.LeaseInternalResponse{}
235 if err := lresp.Unmarshal(b); err != nil {
236 return nil, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
237 }
238 if lresp.LeaseTimeToLiveResponse.ID != int64(id) {
239 return nil, fmt.Errorf("lease: renew id mismatch")
240 }
241 return lresp, nil
242}
243
244func readResponse(resp *http.Response) (b []byte, err error) {
245 b, err = ioutil.ReadAll(resp.Body)
246 httputil.GracefulClose(resp)
247 return
248}