[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/coreos/etcd/lease/doc.go b/vendor/github.com/coreos/etcd/lease/doc.go
new file mode 100644
index 0000000..a74eaf7
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/lease/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package lease provides an interface and implementation for time-limited leases over arbitrary resources.
+package lease
diff --git a/vendor/github.com/coreos/etcd/lease/leasehttp/doc.go b/vendor/github.com/coreos/etcd/lease/leasehttp/doc.go
new file mode 100644
index 0000000..8177a37
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/lease/leasehttp/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package leasehttp serves lease renewals made through HTTP requests.
+package leasehttp
diff --git a/vendor/github.com/coreos/etcd/lease/leasehttp/http.go b/vendor/github.com/coreos/etcd/lease/leasehttp/http.go
new file mode 100644
index 0000000..ac2e788
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/lease/leasehttp/http.go
@@ -0,0 +1,247 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package leasehttp
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "time"
+
+ pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ "github.com/coreos/etcd/lease"
+ "github.com/coreos/etcd/lease/leasepb"
+ "github.com/coreos/etcd/pkg/httputil"
+)
+
+var (
+ LeasePrefix = "/leases" // request path that ServeHTTP handles as a lease keep-alive (renew)
+ LeaseInternalPrefix = "/leases/internal" // request path that ServeHTTP handles as a lease time-to-live lookup
+ applyTimeout = time.Second // longest ServeHTTP waits on waitch() before replying 408 Request Timeout
+ ErrLeaseHTTPTimeout = errors.New("waiting for node to catch up its applied index has timed out") // body of the 408 reply; returned by RenewHTTP/TimeToLiveHTTP when they receive a 408
+)
+
+// NewHandler builds the http.Handler that serves lease requests with lessor l, gated on the channel returned by waitch.
+func NewHandler(l lease.Lessor, waitch func() <-chan struct{}) http.Handler {
+	return &leaseHandler{l: l, waitch: waitch}
+}
+
+type leaseHandler struct {
+ l lease.Lessor // lessor on which ServeHTTP calls Renew and Lookup
+ waitch func() <-chan struct{} // yields the channel ServeHTTP waits on (for at most applyTimeout) before using l
+}
+
+// ServeHTTP answers POSTs of protobuf-encoded requests: LeasePrefix renews a lease and
+// LeaseInternalPrefix looks up its time-to-live; any other method or path gets an error status.
+func (h *leaseHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	if r.Method != "POST" {
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return
+	}
+	b, err := ioutil.ReadAll(r.Body)
+	if err != nil {
+		http.Error(w, "error reading body", http.StatusBadRequest)
+		return
+	}
+
+	var v []byte
+	switch r.URL.Path {
+	case LeasePrefix:
+		lreq := pb.LeaseKeepAliveRequest{}
+		if err := lreq.Unmarshal(b); err != nil {
+			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
+			return
+		}
+		select {
+		case <-h.waitch():
+		case <-time.After(applyTimeout):
+			http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
+			return
+		}
+		ttl, err := h.l.Renew(lease.LeaseID(lreq.ID))
+		if err != nil {
+			if err == lease.ErrLeaseNotFound {
+				http.Error(w, err.Error(), http.StatusNotFound)
+				return
+			}
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+		// TODO: fill out ResponseHeader
+		resp := &pb.LeaseKeepAliveResponse{ID: lreq.ID, TTL: ttl}
+		v, err = resp.Marshal()
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+	case LeaseInternalPrefix:
+		lreq := leasepb.LeaseInternalRequest{}
+		// A body without the nested request unmarshals cleanly but leaves the pointer nil; reject it rather than panic below.
+		if err := lreq.Unmarshal(b); err != nil || lreq.LeaseTimeToLiveRequest == nil {
+			http.Error(w, "error unmarshalling request", http.StatusBadRequest)
+			return
+		}
+		select {
+		case <-h.waitch():
+		case <-time.After(applyTimeout):
+			http.Error(w, ErrLeaseHTTPTimeout.Error(), http.StatusRequestTimeout)
+			return
+		}
+		l := h.l.Lookup(lease.LeaseID(lreq.LeaseTimeToLiveRequest.ID))
+		if l == nil {
+			http.Error(w, lease.ErrLeaseNotFound.Error(), http.StatusNotFound)
+			return
+		}
+		// TODO: fill out ResponseHeader
+		resp := &leasepb.LeaseInternalResponse{
+			LeaseTimeToLiveResponse: &pb.LeaseTimeToLiveResponse{
+				Header:     &pb.ResponseHeader{},
+				ID:         lreq.LeaseTimeToLiveRequest.ID,
+				TTL:        int64(l.Remaining().Seconds()),
+				GrantedTTL: l.TTL(),
+			},
+		}
+		if lreq.LeaseTimeToLiveRequest.Keys {
+			ks := l.Keys()
+			kbs := make([][]byte, len(ks))
+			for i := range ks {
+				kbs[i] = []byte(ks[i])
+			}
+			resp.LeaseTimeToLiveResponse.Keys = kbs
+		}
+		v, err = resp.Marshal()
+		if err != nil {
+			http.Error(w, err.Error(), http.StatusInternalServerError)
+			return
+		}
+
+	default:
+		http.Error(w, fmt.Sprintf("unknown request path %q", r.URL.Path), http.StatusBadRequest)
+		return
+	}
+
+	w.Header().Set("Content-Type", "application/protobuf")
+	w.Write(v)
+}
+
+// RenewHTTP renews lease id by POSTing a LeaseKeepAliveRequest to url through rt and
+// returns the TTL from the reply. On failure it returns -1 with ErrLeaseHTTPTimeout (408),
+// lease.ErrLeaseNotFound (404) or a descriptive error; ctx cancels the in-flight request.
+// TODO: Batch request in future?
+func RenewHTTP(ctx context.Context, id lease.LeaseID, url string, rt http.RoundTripper) (int64, error) {
+	// will post lreq protobuf to leader
+	lreq, err := (&pb.LeaseKeepAliveRequest{ID: int64(id)}).Marshal()
+	if err != nil {
+		return -1, err
+	}
+
+	cc := &http.Client{Transport: rt}
+	req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
+	if err != nil {
+		return -1, err
+	}
+	req.Header.Set("Content-Type", "application/protobuf")
+	req = req.WithContext(ctx) // Request.Cancel is deprecated; the context carries cancellation, as in TimeToLiveHTTP
+
+	resp, err := cc.Do(req)
+	if err != nil {
+		return -1, err
+	}
+	b, err := readResponse(resp)
+	if err != nil {
+		return -1, err
+	}
+
+	if resp.StatusCode == http.StatusRequestTimeout {
+		return -1, ErrLeaseHTTPTimeout
+	}
+	if resp.StatusCode == http.StatusNotFound {
+		return -1, lease.ErrLeaseNotFound
+	}
+	if resp.StatusCode != http.StatusOK {
+		return -1, fmt.Errorf("lease: unknown error(%s)", string(b))
+	}
+
+	lresp := &pb.LeaseKeepAliveResponse{}
+	if err := lresp.Unmarshal(b); err != nil {
+		return -1, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
+	}
+	if lresp.ID != int64(id) {
+		return -1, fmt.Errorf("lease: renew id mismatch")
+	}
+	return lresp.TTL, nil
+}
+
+// TimeToLiveHTTP retrieves lease information of the given lease ID by POSTing a
+// LeaseInternalRequest to url through rt; keys sets the request's Keys flag.
+// It returns ErrLeaseHTTPTimeout on a 408 reply and lease.ErrLeaseNotFound on a 404.
+func TimeToLiveHTTP(ctx context.Context, id lease.LeaseID, keys bool, url string, rt http.RoundTripper) (*leasepb.LeaseInternalResponse, error) {
+	// will post lreq protobuf to leader
+	lreq, err := (&leasepb.LeaseInternalRequest{
+		LeaseTimeToLiveRequest: &pb.LeaseTimeToLiveRequest{
+			ID:   int64(id),
+			Keys: keys,
+		},
+	}).Marshal()
+	if err != nil {
+		return nil, err
+	}
+
+	req, err := http.NewRequest("POST", url, bytes.NewReader(lreq))
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Set("Content-Type", "application/protobuf")
+	req = req.WithContext(ctx)
+
+	cc := &http.Client{Transport: rt}
+	resp, err := cc.Do(req)
+	if err != nil {
+		return nil, err
+	}
+	b, err := readResponse(resp)
+	if err != nil {
+		return nil, err
+	}
+	if resp.StatusCode == http.StatusRequestTimeout {
+		return nil, ErrLeaseHTTPTimeout
+	}
+	if resp.StatusCode == http.StatusNotFound {
+		return nil, lease.ErrLeaseNotFound
+	}
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("lease: unknown error(%s)", string(b))
+	}
+
+	lresp := &leasepb.LeaseInternalResponse{}
+	if err := lresp.Unmarshal(b); err != nil {
+		return nil, fmt.Errorf(`lease: %v. data = "%s"`, err, string(b))
+	}
+	// A 200 reply whose body omits the nested response decodes to a nil pointer; treat it as a mismatch, not a panic.
+	if r := lresp.LeaseTimeToLiveResponse; r == nil || r.ID != int64(id) {
+		return nil, fmt.Errorf("lease: renew id mismatch")
+	}
+	return lresp, nil
+}
+
+// readResponse reads the whole response body and then hands resp to httputil.GracefulClose.
+func readResponse(resp *http.Response) (b []byte, err error) {
+	defer httputil.GracefulClose(resp)
+	return ioutil.ReadAll(resp.Body)
+}
diff --git a/vendor/github.com/coreos/etcd/lease/leasepb/lease.pb.go b/vendor/github.com/coreos/etcd/lease/leasepb/lease.pb.go
new file mode 100644
index 0000000..433f0aa
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/lease/leasepb/lease.pb.go
@@ -0,0 +1,739 @@
+// Code generated by protoc-gen-gogo. DO NOT EDIT.
+// source: lease.proto
+
+package leasepb
+
+import (
+ fmt "fmt"
+ io "io"
+ math "math"
+ math_bits "math/bits"
+
+ etcdserverpb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+ _ "github.com/gogo/protobuf/gogoproto"
+ proto "github.com/golang/protobuf/proto"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type Lease struct {
+ ID int64 `protobuf:"varint,1,opt,name=ID,proto3" json:"ID,omitempty"`
+ TTL int64 `protobuf:"varint,2,opt,name=TTL,proto3" json:"TTL,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *Lease) Reset() { *m = Lease{} }
+func (m *Lease) String() string { return proto.CompactTextString(m) }
+func (*Lease) ProtoMessage() {}
+func (*Lease) Descriptor() ([]byte, []int) {
+ return fileDescriptor_3dd57e402472b33a, []int{0}
+}
+func (m *Lease) XXX_Unmarshal(b []byte) error {
+ return m.Unmarshal(b)
+}
+func (m *Lease) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ if deterministic {
+ return xxx_messageInfo_Lease.Marshal(b, m, deterministic)
+ } else {
+ b = b[:cap(b)]
+ n, err := m.MarshalToSizedBuffer(b)
+ if err != nil {
+ return nil, err
+ }
+ return b[:n], nil
+ }
+}
+func (m *Lease) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_Lease.Merge(m, src)
+}
+func (m *Lease) XXX_Size() int {
+ return m.Size()
+}
+func (m *Lease) XXX_DiscardUnknown() {
+ xxx_messageInfo_Lease.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_Lease proto.InternalMessageInfo
+
+type LeaseInternalRequest struct {
+ LeaseTimeToLiveRequest *etcdserverpb.LeaseTimeToLiveRequest `protobuf:"bytes,1,opt,name=LeaseTimeToLiveRequest,proto3" json:"LeaseTimeToLiveRequest,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *LeaseInternalRequest) Reset() { *m = LeaseInternalRequest{} }
+func (m *LeaseInternalRequest) String() string { return proto.CompactTextString(m) }
+func (*LeaseInternalRequest) ProtoMessage() {}
+func (*LeaseInternalRequest) Descriptor() ([]byte, []int) {
+ return fileDescriptor_3dd57e402472b33a, []int{1}
+}
+func (m *LeaseInternalRequest) XXX_Unmarshal(b []byte) error {
+ return m.Unmarshal(b)
+}
+func (m *LeaseInternalRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ if deterministic {
+ return xxx_messageInfo_LeaseInternalRequest.Marshal(b, m, deterministic)
+ } else {
+ b = b[:cap(b)]
+ n, err := m.MarshalToSizedBuffer(b)
+ if err != nil {
+ return nil, err
+ }
+ return b[:n], nil
+ }
+}
+func (m *LeaseInternalRequest) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_LeaseInternalRequest.Merge(m, src)
+}
+func (m *LeaseInternalRequest) XXX_Size() int {
+ return m.Size()
+}
+func (m *LeaseInternalRequest) XXX_DiscardUnknown() {
+ xxx_messageInfo_LeaseInternalRequest.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_LeaseInternalRequest proto.InternalMessageInfo
+
+type LeaseInternalResponse struct {
+ LeaseTimeToLiveResponse *etcdserverpb.LeaseTimeToLiveResponse `protobuf:"bytes,1,opt,name=LeaseTimeToLiveResponse,proto3" json:"LeaseTimeToLiveResponse,omitempty"`
+ XXX_NoUnkeyedLiteral struct{} `json:"-"`
+ XXX_unrecognized []byte `json:"-"`
+ XXX_sizecache int32 `json:"-"`
+}
+
+func (m *LeaseInternalResponse) Reset() { *m = LeaseInternalResponse{} }
+func (m *LeaseInternalResponse) String() string { return proto.CompactTextString(m) }
+func (*LeaseInternalResponse) ProtoMessage() {}
+func (*LeaseInternalResponse) Descriptor() ([]byte, []int) {
+ return fileDescriptor_3dd57e402472b33a, []int{2}
+}
+func (m *LeaseInternalResponse) XXX_Unmarshal(b []byte) error {
+ return m.Unmarshal(b)
+}
+func (m *LeaseInternalResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
+ if deterministic {
+ return xxx_messageInfo_LeaseInternalResponse.Marshal(b, m, deterministic)
+ } else {
+ b = b[:cap(b)]
+ n, err := m.MarshalToSizedBuffer(b)
+ if err != nil {
+ return nil, err
+ }
+ return b[:n], nil
+ }
+}
+func (m *LeaseInternalResponse) XXX_Merge(src proto.Message) {
+ xxx_messageInfo_LeaseInternalResponse.Merge(m, src)
+}
+func (m *LeaseInternalResponse) XXX_Size() int {
+ return m.Size()
+}
+func (m *LeaseInternalResponse) XXX_DiscardUnknown() {
+ xxx_messageInfo_LeaseInternalResponse.DiscardUnknown(m)
+}
+
+var xxx_messageInfo_LeaseInternalResponse proto.InternalMessageInfo
+
+func init() {
+ proto.RegisterType((*Lease)(nil), "leasepb.Lease")
+ proto.RegisterType((*LeaseInternalRequest)(nil), "leasepb.LeaseInternalRequest")
+ proto.RegisterType((*LeaseInternalResponse)(nil), "leasepb.LeaseInternalResponse")
+}
+
+func init() { proto.RegisterFile("lease.proto", fileDescriptor_3dd57e402472b33a) }
+
+var fileDescriptor_3dd57e402472b33a = []byte{
+ // 233 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xe2, 0xce, 0x49, 0x4d, 0x2c,
+ 0x4e, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x07, 0x73, 0x0a, 0x92, 0xa4, 0x44, 0xd2,
+ 0xf3, 0xd3, 0xf3, 0xc1, 0x62, 0xfa, 0x20, 0x16, 0x44, 0x5a, 0x4a, 0x2d, 0xb5, 0x24, 0x39, 0x45,
+ 0x1f, 0x44, 0x14, 0xa7, 0x16, 0x95, 0xa5, 0x16, 0x21, 0x31, 0x0b, 0x92, 0xf4, 0x8b, 0x0a, 0x92,
+ 0x21, 0xea, 0x94, 0x34, 0xb9, 0x58, 0x7d, 0x40, 0x06, 0x09, 0xf1, 0x71, 0x31, 0x79, 0xba, 0x48,
+ 0x30, 0x2a, 0x30, 0x6a, 0x30, 0x07, 0x31, 0x79, 0xba, 0x08, 0x09, 0x70, 0x31, 0x87, 0x84, 0xf8,
+ 0x48, 0x30, 0x81, 0x05, 0x40, 0x4c, 0xa5, 0x12, 0x2e, 0x11, 0xb0, 0x52, 0xcf, 0xbc, 0x92, 0xd4,
+ 0xa2, 0xbc, 0xc4, 0x9c, 0xa0, 0xd4, 0xc2, 0xd2, 0xd4, 0xe2, 0x12, 0xa1, 0x18, 0x2e, 0x31, 0xb0,
+ 0x78, 0x48, 0x66, 0x6e, 0x6a, 0x48, 0xbe, 0x4f, 0x66, 0x59, 0x2a, 0x54, 0x06, 0x6c, 0x1a, 0xb7,
+ 0x91, 0x8a, 0x1e, 0xb2, 0xdd, 0x7a, 0xd8, 0xd5, 0x06, 0xe1, 0x30, 0x43, 0xa9, 0x82, 0x4b, 0x14,
+ 0xcd, 0xd6, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0xa1, 0x78, 0x2e, 0x71, 0x0c, 0x2d, 0x10, 0x29,
+ 0xa8, 0xbd, 0xaa, 0x04, 0xec, 0x85, 0x28, 0x0e, 0xc2, 0x65, 0x8a, 0x93, 0xc4, 0x89, 0x87, 0x72,
+ 0x0c, 0x17, 0x1e, 0xca, 0x31, 0x9c, 0x78, 0x24, 0xc7, 0x78, 0xe1, 0x91, 0x1c, 0xe3, 0x83, 0x47,
+ 0x72, 0x8c, 0x33, 0x1e, 0xcb, 0x31, 0x24, 0xb1, 0x81, 0xc3, 0xce, 0x18, 0x10, 0x00, 0x00, 0xff,
+ 0xff, 0x9f, 0xf2, 0x42, 0xe0, 0x91, 0x01, 0x00, 0x00,
+}
+
+func (m *Lease) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalToSizedBuffer(dAtA[:size])
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *Lease) MarshalTo(dAtA []byte) (int, error) {
+ size := m.Size()
+ return m.MarshalToSizedBuffer(dAtA[:size])
+}
+
+func (m *Lease) MarshalToSizedBuffer(dAtA []byte) (int, error) {
+ i := len(dAtA)
+ _ = i
+ var l int
+ _ = l
+ if m.XXX_unrecognized != nil {
+ i -= len(m.XXX_unrecognized)
+ copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ if m.TTL != 0 {
+ i = encodeVarintLease(dAtA, i, uint64(m.TTL))
+ i--
+ dAtA[i] = 0x10
+ }
+ if m.ID != 0 {
+ i = encodeVarintLease(dAtA, i, uint64(m.ID))
+ i--
+ dAtA[i] = 0x8
+ }
+ return len(dAtA) - i, nil
+}
+
+func (m *LeaseInternalRequest) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalToSizedBuffer(dAtA[:size])
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *LeaseInternalRequest) MarshalTo(dAtA []byte) (int, error) {
+ size := m.Size()
+ return m.MarshalToSizedBuffer(dAtA[:size])
+}
+
+func (m *LeaseInternalRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
+ i := len(dAtA)
+ _ = i
+ var l int
+ _ = l
+ if m.XXX_unrecognized != nil {
+ i -= len(m.XXX_unrecognized)
+ copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ if m.LeaseTimeToLiveRequest != nil {
+ {
+ size, err := m.LeaseTimeToLiveRequest.MarshalToSizedBuffer(dAtA[:i])
+ if err != nil {
+ return 0, err
+ }
+ i -= size
+ i = encodeVarintLease(dAtA, i, uint64(size))
+ }
+ i--
+ dAtA[i] = 0xa
+ }
+ return len(dAtA) - i, nil
+}
+
+func (m *LeaseInternalResponse) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalToSizedBuffer(dAtA[:size])
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *LeaseInternalResponse) MarshalTo(dAtA []byte) (int, error) {
+ size := m.Size()
+ return m.MarshalToSizedBuffer(dAtA[:size])
+}
+
+func (m *LeaseInternalResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) {
+ i := len(dAtA)
+ _ = i
+ var l int
+ _ = l
+ if m.XXX_unrecognized != nil {
+ i -= len(m.XXX_unrecognized)
+ copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ if m.LeaseTimeToLiveResponse != nil {
+ {
+ size, err := m.LeaseTimeToLiveResponse.MarshalToSizedBuffer(dAtA[:i])
+ if err != nil {
+ return 0, err
+ }
+ i -= size
+ i = encodeVarintLease(dAtA, i, uint64(size))
+ }
+ i--
+ dAtA[i] = 0xa
+ }
+ return len(dAtA) - i, nil
+}
+
+func encodeVarintLease(dAtA []byte, offset int, v uint64) int {
+ offset -= sovLease(v)
+ base := offset
+ for v >= 1<<7 {
+ dAtA[offset] = uint8(v&0x7f | 0x80)
+ v >>= 7
+ offset++
+ }
+ dAtA[offset] = uint8(v)
+ return base
+}
+func (m *Lease) Size() (n int) {
+ if m == nil {
+ return 0
+ }
+ var l int
+ _ = l
+ if m.ID != 0 {
+ n += 1 + sovLease(uint64(m.ID))
+ }
+ if m.TTL != 0 {
+ n += 1 + sovLease(uint64(m.TTL))
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *LeaseInternalRequest) Size() (n int) {
+ if m == nil {
+ return 0
+ }
+ var l int
+ _ = l
+ if m.LeaseTimeToLiveRequest != nil {
+ l = m.LeaseTimeToLiveRequest.Size()
+ n += 1 + l + sovLease(uint64(l))
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *LeaseInternalResponse) Size() (n int) {
+ if m == nil {
+ return 0
+ }
+ var l int
+ _ = l
+ if m.LeaseTimeToLiveResponse != nil {
+ l = m.LeaseTimeToLiveResponse.Size()
+ n += 1 + l + sovLease(uint64(l))
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func sovLease(x uint64) (n int) {
+ return (math_bits.Len64(x|1) + 6) / 7
+}
+func sozLease(x uint64) (n int) {
+ return sovLease(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *Lease) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowLease
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: Lease: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: Lease: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
+ }
+ m.ID = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowLease
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.ID |= int64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 2:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field TTL", wireType)
+ }
+ m.TTL = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowLease
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.TTL |= int64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ default:
+ iNdEx = preIndex
+ skippy, err := skipLease(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthLease
+ }
+ if (iNdEx + skippy) < 0 {
+ return ErrInvalidLengthLease
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *LeaseInternalRequest) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowLease
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: LeaseInternalRequest: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: LeaseInternalRequest: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field LeaseTimeToLiveRequest", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowLease
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return ErrInvalidLengthLease
+ }
+ postIndex := iNdEx + msglen
+ if postIndex < 0 {
+ return ErrInvalidLengthLease
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ if m.LeaseTimeToLiveRequest == nil {
+ m.LeaseTimeToLiveRequest = &etcdserverpb.LeaseTimeToLiveRequest{}
+ }
+ if err := m.LeaseTimeToLiveRequest.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := skipLease(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthLease
+ }
+ if (iNdEx + skippy) < 0 {
+ return ErrInvalidLengthLease
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *LeaseInternalResponse) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowLease
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= uint64(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: LeaseInternalResponse: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: LeaseInternalResponse: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field LeaseTimeToLiveResponse", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowLease
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= int(b&0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return ErrInvalidLengthLease
+ }
+ postIndex := iNdEx + msglen
+ if postIndex < 0 {
+ return ErrInvalidLengthLease
+ }
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ if m.LeaseTimeToLiveResponse == nil {
+ m.LeaseTimeToLiveResponse = &etcdserverpb.LeaseTimeToLiveResponse{}
+ }
+ if err := m.LeaseTimeToLiveResponse.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := skipLease(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthLease
+ }
+ if (iNdEx + skippy) < 0 {
+ return ErrInvalidLengthLease
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func skipLease(dAtA []byte) (n int, err error) {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowLease
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ wireType := int(wire & 0x7)
+ switch wireType {
+ case 0:
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowLease
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ iNdEx++
+ if dAtA[iNdEx-1] < 0x80 {
+ break
+ }
+ }
+ return iNdEx, nil
+ case 1:
+ iNdEx += 8
+ return iNdEx, nil
+ case 2:
+ var length int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowLease
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ length |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if length < 0 {
+ return 0, ErrInvalidLengthLease
+ }
+ iNdEx += length
+ if iNdEx < 0 {
+ return 0, ErrInvalidLengthLease
+ }
+ return iNdEx, nil
+ case 3:
+ for {
+ var innerWire uint64
+ var start int = iNdEx
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowLease
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ innerWire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ innerWireType := int(innerWire & 0x7)
+ if innerWireType == 4 {
+ break
+ }
+ next, err := skipLease(dAtA[start:])
+ if err != nil {
+ return 0, err
+ }
+ iNdEx = start + next
+ if iNdEx < 0 {
+ return 0, ErrInvalidLengthLease
+ }
+ }
+ return iNdEx, nil
+ case 4:
+ return iNdEx, nil
+ case 5:
+ iNdEx += 4
+ return iNdEx, nil
+ default:
+ return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
+ }
+ }
+ panic("unreachable")
+}
+
+var (
+ ErrInvalidLengthLease = fmt.Errorf("proto: negative length found during unmarshaling")
+ ErrIntOverflowLease = fmt.Errorf("proto: integer overflow")
+)
diff --git a/vendor/github.com/coreos/etcd/lease/leasepb/lease.proto b/vendor/github.com/coreos/etcd/lease/leasepb/lease.proto
new file mode 100644
index 0000000..be414b9
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/lease/leasepb/lease.proto
@@ -0,0 +1,24 @@
+syntax = "proto3";
+package leasepb;
+
+import "gogoproto/gogo.proto";
+import "etcd/etcdserver/etcdserverpb/rpc.proto";
+
+option (gogoproto.marshaler_all) = true;
+option (gogoproto.sizer_all) = true;
+option (gogoproto.unmarshaler_all) = true;
+option (gogoproto.goproto_getters_all) = false;
+option (gogoproto.goproto_enum_prefix_all) = false;
+
+message Lease {
+ int64 ID = 1;
+ int64 TTL = 2;
+}
+
+message LeaseInternalRequest {
+ etcdserverpb.LeaseTimeToLiveRequest LeaseTimeToLiveRequest = 1;
+}
+
+message LeaseInternalResponse {
+ etcdserverpb.LeaseTimeToLiveResponse LeaseTimeToLiveResponse = 1;
+}
diff --git a/vendor/github.com/coreos/etcd/lease/lessor.go b/vendor/github.com/coreos/etcd/lease/lessor.go
new file mode 100644
index 0000000..43f0503
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/lease/lessor.go
@@ -0,0 +1,680 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package lease
+
+import (
+ "encoding/binary"
+ "errors"
+ "math"
+ "sort"
+ "sync"
+ "time"
+
+ "github.com/coreos/etcd/lease/leasepb"
+ "github.com/coreos/etcd/mvcc/backend"
+)
+
+// NoLease is a special LeaseID representing the absence of a lease.
+const NoLease = LeaseID(0)
+
+// MaxLeaseTTL is the maximum lease TTL value
+const MaxLeaseTTL = 9000000000
+
+var (
+ forever = time.Time{}
+
+ leaseBucketName = []byte("lease")
+
+ // maximum number of leases to revoke per second; configurable for tests
+ leaseRevokeRate = 1000
+
+ ErrNotPrimary = errors.New("not a primary lessor")
+ ErrLeaseNotFound = errors.New("lease not found")
+ ErrLeaseExists = errors.New("lease already exists")
+ ErrLeaseTTLTooLarge = errors.New("too large lease TTL")
+)
+
+// TxnDelete is a TxnWrite that only permits deletes. Defined here
+// to avoid circular dependency with mvcc.
+type TxnDelete interface {
+ DeleteRange(key, end []byte) (n, rev int64)
+ End()
+}
+
+// RangeDeleter is a TxnDelete constructor.
+type RangeDeleter func() TxnDelete
+
+type LeaseID int64
+
+// Lessor owns leases. It can grant, revoke, renew and modify leases for lessee.
+type Lessor interface {
+ // SetRangeDeleter lets the lessor create TxnDeletes to the store.
+ // Lessor deletes the items in the revoked or expired lease by creating
+ // new TxnDeletes.
+ SetRangeDeleter(rd RangeDeleter)
+
+ // Grant grants a lease that expires at least after TTL seconds.
+ Grant(id LeaseID, ttl int64) (*Lease, error)
+ // Revoke revokes a lease with given ID. The item attached to the
+ // given lease will be removed. If the ID does not exist, an error
+ // will be returned.
+ Revoke(id LeaseID) error
+
+ // Attach attaches given leaseItem to the lease with given LeaseID.
+ // If the lease does not exist, an error will be returned.
+ Attach(id LeaseID, items []LeaseItem) error
+
+ // GetLease returns LeaseID for given item.
+ // If no lease found, NoLease value will be returned.
+ GetLease(item LeaseItem) LeaseID
+
+ // Detach detaches given leaseItem from the lease with given LeaseID.
+ // If the lease does not exist, an error will be returned.
+ Detach(id LeaseID, items []LeaseItem) error
+
+ // Promote promotes the lessor to be the primary lessor. Primary lessor manages
+ // the expiration and renew of leases.
+ // A newly promoted lessor renews the TTL of all leases to extend + previous TTL.
+ Promote(extend time.Duration)
+
+ // Demote demotes the lessor from being the primary lessor.
+ Demote()
+
+ // Renew renews a lease with given ID. It returns the renewed TTL. If the ID does not exist,
+ // an error will be returned.
+ Renew(id LeaseID) (int64, error)
+
+ // Lookup gives the lease at a given lease id, if any
+ Lookup(id LeaseID) *Lease
+
+ // Leases lists all leases.
+ Leases() []*Lease
+
+ // ExpiredLeasesC returns a chan that is used to receive expired leases.
+ ExpiredLeasesC() <-chan []*Lease
+
+ // Recover recovers the lessor state from the given backend and RangeDeleter.
+ Recover(b backend.Backend, rd RangeDeleter)
+
+ // Stop stops the lessor from managing leases. The behavior of calling Stop multiple
+ // times is undefined.
+ Stop()
+}
+
+// lessor implements Lessor interface.
+// TODO: use clockwork for testability.
+type lessor struct {
+ mu sync.Mutex
+
+ // demotec is set when the lessor is the primary.
+ // demotec will be closed if the lessor is demoted.
+ demotec chan struct{}
+
+ // TODO: probably this should be a heap with a secondary
+ // id index.
+ // Now it is O(N) to loop over the leases to find expired ones.
+ // We want to make Grant, Revoke, and findExpiredLeases all O(logN) and
+ // Renew O(1).
+ // findExpiredLeases and Renew should be the most frequent operations.
+ leaseMap map[LeaseID]*Lease
+
+ itemMap map[LeaseItem]LeaseID
+
+ // When a lease expires, the lessor will delete the
+ // leased range (or key) by the RangeDeleter.
+ rd RangeDeleter
+
+ // backend to persist leases. We only persist lease ID and expiry for now.
+ // The leased items can be recovered by iterating all the keys in kv.
+ b backend.Backend
+
+ // minLeaseTTL is the minimum lease TTL that can be granted for a lease. Any
+ // requests for shorter TTLs are extended to the minimum TTL.
+ minLeaseTTL int64
+
+ expiredC chan []*Lease
+ // stopC is a channel whose closure indicates that the lessor should be stopped.
+ stopC chan struct{}
+ // doneC is a channel whose closure indicates that the lessor is stopped.
+ doneC chan struct{}
+}
+
+func NewLessor(b backend.Backend, minLeaseTTL int64) Lessor {
+ return newLessor(b, minLeaseTTL)
+}
+
+func newLessor(b backend.Backend, minLeaseTTL int64) *lessor {
+ l := &lessor{
+ leaseMap: make(map[LeaseID]*Lease),
+ itemMap: make(map[LeaseItem]LeaseID),
+ b: b,
+ minLeaseTTL: minLeaseTTL,
+ // expiredC is a small buffered chan to avoid unnecessary blocking.
+ expiredC: make(chan []*Lease, 16),
+ stopC: make(chan struct{}),
+ doneC: make(chan struct{}),
+ }
+ l.initAndRecover()
+
+ go l.runLoop()
+
+ return l
+}
+
+// isPrimary indicates if this lessor is the primary lessor. The primary
+// lessor manages lease expiration and renew.
+//
+// in etcd, raft leader is the primary. Thus there might be two primary
+// leaders at the same time (raft allows concurrent leader but with different term)
+// for at most a leader election timeout.
+// The old primary leader cannot affect the correctness since its proposal has a
+// smaller term and will not be committed.
+//
+// TODO: raft followers do not forward lease management proposals. There might be a
+// very small window (normally within a second, depending on go scheduling) in which
+// a raft follower is the primary between the raft leader demotion and lessor demotion.
+// Usually this should not be a problem. Lease should not be that sensitive to timing.
+func (le *lessor) isPrimary() bool {
+ return le.demotec != nil
+}
+
+func (le *lessor) SetRangeDeleter(rd RangeDeleter) {
+ le.mu.Lock()
+ defer le.mu.Unlock()
+
+ le.rd = rd
+}
+
+func (le *lessor) Grant(id LeaseID, ttl int64) (*Lease, error) {
+ if id == NoLease {
+ return nil, ErrLeaseNotFound
+ }
+
+ if ttl > MaxLeaseTTL {
+ return nil, ErrLeaseTTLTooLarge
+ }
+
+ // TODO: when lessor is under high load, it should give out lease
+ // with longer TTL to reduce renew load.
+ l := &Lease{
+ ID: id,
+ ttl: ttl,
+ itemSet: make(map[LeaseItem]struct{}),
+ revokec: make(chan struct{}),
+ }
+
+ le.mu.Lock()
+ defer le.mu.Unlock()
+
+ if _, ok := le.leaseMap[id]; ok {
+ return nil, ErrLeaseExists
+ }
+
+ if l.ttl < le.minLeaseTTL {
+ l.ttl = le.minLeaseTTL
+ }
+
+ if le.isPrimary() {
+ l.refresh(0)
+ } else {
+ l.forever()
+ }
+
+ le.leaseMap[id] = l
+ l.persistTo(le.b)
+
+ return l, nil
+}
+
+func (le *lessor) Revoke(id LeaseID) error {
+ le.mu.Lock()
+
+ l := le.leaseMap[id]
+ if l == nil {
+ le.mu.Unlock()
+ return ErrLeaseNotFound
+ }
+ defer close(l.revokec)
+ // unlock before doing external work
+ le.mu.Unlock()
+
+ if le.rd == nil {
+ return nil
+ }
+
+ txn := le.rd()
+
+ // sort keys so deletes are in same order among all members,
+ // otherwise the backend hashes will be different
+ keys := l.Keys()
+ sort.StringSlice(keys).Sort()
+ for _, key := range keys {
+ txn.DeleteRange([]byte(key), nil)
+ }
+
+ le.mu.Lock()
+ defer le.mu.Unlock()
+ delete(le.leaseMap, l.ID)
+ // lease deletion needs to be in the same backend transaction with the
+ // kv deletion. Or we might end up with not executing the revoke or not
+ // deleting the keys if etcdserver fails in between.
+ le.b.BatchTx().UnsafeDelete(leaseBucketName, int64ToBytes(int64(l.ID)))
+
+ txn.End()
+ return nil
+}
+
+// Renew renews an existing lease. If the given lease does not exist or
+// has expired, an error will be returned.
+func (le *lessor) Renew(id LeaseID) (int64, error) {
+ le.mu.Lock()
+
+ unlock := func() { le.mu.Unlock() }
+ defer func() { unlock() }()
+
+ if !le.isPrimary() {
+ // forward renew request to primary instead of returning error.
+ return -1, ErrNotPrimary
+ }
+
+ demotec := le.demotec
+
+ l := le.leaseMap[id]
+ if l == nil {
+ return -1, ErrLeaseNotFound
+ }
+
+ if l.expired() {
+ le.mu.Unlock()
+ unlock = func() {}
+ select {
+ // An expired lease might be pending for revoking or going through
+ // quorum to be revoked. To be accurate, renew request must wait for the
+ // deletion to complete.
+ case <-l.revokec:
+ return -1, ErrLeaseNotFound
+ // The expired lease might fail to be revoked if the primary changes.
+ // The caller will retry on ErrNotPrimary.
+ case <-demotec:
+ return -1, ErrNotPrimary
+ case <-le.stopC:
+ return -1, ErrNotPrimary
+ }
+ }
+
+ l.refresh(0)
+ return l.ttl, nil
+}
+
+func (le *lessor) Lookup(id LeaseID) *Lease {
+ le.mu.Lock()
+ defer le.mu.Unlock()
+ return le.leaseMap[id]
+}
+
+func (le *lessor) unsafeLeases() []*Lease {
+ leases := make([]*Lease, 0, len(le.leaseMap))
+ for _, l := range le.leaseMap {
+ leases = append(leases, l)
+ }
+ sort.Sort(leasesByExpiry(leases))
+ return leases
+}
+
+func (le *lessor) Leases() []*Lease {
+ le.mu.Lock()
+ ls := le.unsafeLeases()
+ le.mu.Unlock()
+ return ls
+}
+
+func (le *lessor) Promote(extend time.Duration) {
+ le.mu.Lock()
+ defer le.mu.Unlock()
+
+ le.demotec = make(chan struct{})
+
+ // refresh the expiries of all leases.
+ for _, l := range le.leaseMap {
+ l.refresh(extend)
+ }
+
+ if len(le.leaseMap) < leaseRevokeRate {
+ // no possibility of lease pile-up
+ return
+ }
+
+ // adjust expiries in case of overlap
+ leases := le.unsafeLeases()
+
+ baseWindow := leases[0].Remaining()
+ nextWindow := baseWindow + time.Second
+ expires := 0
+ // have fewer expires than the total revoke rate so piled up leases
+ // don't consume the entire revoke limit
+ targetExpiresPerSecond := (3 * leaseRevokeRate) / 4
+ for _, l := range leases {
+ remaining := l.Remaining()
+ if remaining > nextWindow {
+ baseWindow = remaining
+ nextWindow = baseWindow + time.Second
+ expires = 1
+ continue
+ }
+ expires++
+ if expires <= targetExpiresPerSecond {
+ continue
+ }
+ rateDelay := float64(time.Second) * (float64(expires) / float64(targetExpiresPerSecond))
+ // If leases are extended by n seconds, leases n seconds ahead of the
+ // base window should be extended by only one second.
+ rateDelay -= float64(remaining - baseWindow)
+ delay := time.Duration(rateDelay)
+ nextWindow = baseWindow + delay
+ l.refresh(delay + extend)
+ }
+}
+
+type leasesByExpiry []*Lease
+
+func (le leasesByExpiry) Len() int { return len(le) }
+func (le leasesByExpiry) Less(i, j int) bool { return le[i].Remaining() < le[j].Remaining() }
+func (le leasesByExpiry) Swap(i, j int) { le[i], le[j] = le[j], le[i] }
+
+func (le *lessor) Demote() {
+ le.mu.Lock()
+ defer le.mu.Unlock()
+
+ // set the expiries of all leases to forever
+ for _, l := range le.leaseMap {
+ l.forever()
+ }
+
+ if le.demotec != nil {
+ close(le.demotec)
+ le.demotec = nil
+ }
+}
+
+// Attach attaches items to the lease with given ID. When the lease
+// expires, the attached items will be automatically removed.
+// If the given lease does not exist, an error will be returned.
+func (le *lessor) Attach(id LeaseID, items []LeaseItem) error {
+ le.mu.Lock()
+ defer le.mu.Unlock()
+
+ l := le.leaseMap[id]
+ if l == nil {
+ return ErrLeaseNotFound
+ }
+
+ l.mu.Lock()
+ for _, it := range items {
+ l.itemSet[it] = struct{}{}
+ le.itemMap[it] = id
+ }
+ l.mu.Unlock()
+ return nil
+}
+
+func (le *lessor) GetLease(item LeaseItem) LeaseID {
+ le.mu.Lock()
+ id := le.itemMap[item]
+ le.mu.Unlock()
+ return id
+}
+
+// Detach detaches items from the lease with given ID.
+// If the given lease does not exist, an error will be returned.
+func (le *lessor) Detach(id LeaseID, items []LeaseItem) error {
+ le.mu.Lock()
+ defer le.mu.Unlock()
+
+ l := le.leaseMap[id]
+ if l == nil {
+ return ErrLeaseNotFound
+ }
+
+ l.mu.Lock()
+ for _, it := range items {
+ delete(l.itemSet, it)
+ delete(le.itemMap, it)
+ }
+ l.mu.Unlock()
+ return nil
+}
+
+func (le *lessor) Recover(b backend.Backend, rd RangeDeleter) {
+ le.mu.Lock()
+ defer le.mu.Unlock()
+
+ le.b = b
+ le.rd = rd
+ le.leaseMap = make(map[LeaseID]*Lease)
+ le.itemMap = make(map[LeaseItem]LeaseID)
+ le.initAndRecover()
+}
+
+func (le *lessor) ExpiredLeasesC() <-chan []*Lease {
+ return le.expiredC
+}
+
+func (le *lessor) Stop() {
+ close(le.stopC)
+ <-le.doneC
+}
+
+func (le *lessor) runLoop() {
+ defer close(le.doneC)
+
+ for {
+ var ls []*Lease
+
+ // rate limit
+ revokeLimit := leaseRevokeRate / 2
+
+ le.mu.Lock()
+ if le.isPrimary() {
+ ls = le.findExpiredLeases(revokeLimit)
+ }
+ le.mu.Unlock()
+
+ if len(ls) != 0 {
+ select {
+ case <-le.stopC:
+ return
+ case le.expiredC <- ls:
+ default:
+ // the receiver of expiredC is probably busy handling
+ // other stuff
+ // let's try this next time after 500ms
+ }
+ }
+
+ select {
+ case <-time.After(500 * time.Millisecond):
+ case <-le.stopC:
+ return
+ }
+ }
+}
+
+// findExpiredLeases loops leases in the leaseMap until reaching expired limit
+// and returns the expired leases that needed to be revoked.
+func (le *lessor) findExpiredLeases(limit int) []*Lease {
+ leases := make([]*Lease, 0, 16)
+
+ for _, l := range le.leaseMap {
+ // TODO: probably should change to <= 100-500 millisecond to
+ // make up committing latency.
+ if l.expired() {
+ leases = append(leases, l)
+
+ // reach expired limit
+ if len(leases) == limit {
+ break
+ }
+ }
+ }
+
+ return leases
+}
+
+func (le *lessor) initAndRecover() {
+ tx := le.b.BatchTx()
+ tx.Lock()
+
+ tx.UnsafeCreateBucket(leaseBucketName)
+ _, vs := tx.UnsafeRange(leaseBucketName, int64ToBytes(0), int64ToBytes(math.MaxInt64), 0)
+ // TODO: copy vs and do decoding outside tx lock if lock contention becomes an issue.
+ for i := range vs {
+ var lpb leasepb.Lease
+ err := lpb.Unmarshal(vs[i])
+ if err != nil {
+ tx.Unlock()
+ panic("failed to unmarshal lease proto item")
+ }
+ ID := LeaseID(lpb.ID)
+ if lpb.TTL < le.minLeaseTTL {
+ lpb.TTL = le.minLeaseTTL
+ }
+ le.leaseMap[ID] = &Lease{
+ ID: ID,
+ ttl: lpb.TTL,
+ // itemSet will be filled in when recover key-value pairs
+ // set expiry to forever, refresh when promoted
+ itemSet: make(map[LeaseItem]struct{}),
+ expiry: forever,
+ revokec: make(chan struct{}),
+ }
+ }
+ tx.Unlock()
+
+ le.b.ForceCommit()
+}
+
+type Lease struct {
+ ID LeaseID
+ ttl int64 // time to live in seconds
+ // expiryMu protects concurrent accesses to expiry
+ expiryMu sync.RWMutex
+ // expiry is time when lease should expire. no expiration when expiry.IsZero() is true
+ expiry time.Time
+
+ // mu protects concurrent accesses to itemSet
+ mu sync.RWMutex
+ itemSet map[LeaseItem]struct{}
+ revokec chan struct{}
+}
+
+func (l *Lease) expired() bool {
+ return l.Remaining() <= 0
+}
+
+func (l *Lease) persistTo(b backend.Backend) {
+ key := int64ToBytes(int64(l.ID))
+
+ lpb := leasepb.Lease{ID: int64(l.ID), TTL: int64(l.ttl)}
+ val, err := lpb.Marshal()
+ if err != nil {
+ panic("failed to marshal lease proto item")
+ }
+
+ b.BatchTx().Lock()
+ b.BatchTx().UnsafePut(leaseBucketName, key, val)
+ b.BatchTx().Unlock()
+}
+
+// TTL returns the TTL of the Lease.
+func (l *Lease) TTL() int64 {
+ return l.ttl
+}
+
+// refresh refreshes the expiry of the lease.
+func (l *Lease) refresh(extend time.Duration) {
+ newExpiry := time.Now().Add(extend + time.Duration(l.ttl)*time.Second)
+ l.expiryMu.Lock()
+ defer l.expiryMu.Unlock()
+ l.expiry = newExpiry
+}
+
+// forever sets the expiry of lease to be forever.
+func (l *Lease) forever() {
+ l.expiryMu.Lock()
+ defer l.expiryMu.Unlock()
+ l.expiry = forever
+}
+
+// Keys returns all the keys attached to the lease.
+func (l *Lease) Keys() []string {
+ l.mu.RLock()
+ keys := make([]string, 0, len(l.itemSet))
+ for k := range l.itemSet {
+ keys = append(keys, k.Key)
+ }
+ l.mu.RUnlock()
+ return keys
+}
+
+// Remaining returns the remaining time of the lease.
+func (l *Lease) Remaining() time.Duration {
+ l.expiryMu.RLock()
+ defer l.expiryMu.RUnlock()
+ if l.expiry.IsZero() {
+ return time.Duration(math.MaxInt64)
+ }
+ return time.Until(l.expiry)
+}
+
+type LeaseItem struct {
+ Key string
+}
+
+func int64ToBytes(n int64) []byte {
+ bytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(bytes, uint64(n))
+ return bytes
+}
+
+// FakeLessor is a fake implementation of Lessor interface.
+// Used for testing only.
+type FakeLessor struct{}
+
+func (fl *FakeLessor) SetRangeDeleter(dr RangeDeleter) {}
+
+func (fl *FakeLessor) Grant(id LeaseID, ttl int64) (*Lease, error) { return nil, nil }
+
+func (fl *FakeLessor) Revoke(id LeaseID) error { return nil }
+
+func (fl *FakeLessor) Attach(id LeaseID, items []LeaseItem) error { return nil }
+
+func (fl *FakeLessor) GetLease(item LeaseItem) LeaseID { return 0 }
+func (fl *FakeLessor) Detach(id LeaseID, items []LeaseItem) error { return nil }
+
+func (fl *FakeLessor) Promote(extend time.Duration) {}
+
+func (fl *FakeLessor) Demote() {}
+
+func (fl *FakeLessor) Renew(id LeaseID) (int64, error) { return 10, nil }
+
+func (fl *FakeLessor) Lookup(id LeaseID) *Lease { return nil }
+
+func (fl *FakeLessor) Leases() []*Lease { return nil }
+
+func (fl *FakeLessor) ExpiredLeasesC() <-chan []*Lease { return nil }
+
+func (fl *FakeLessor) Recover(b backend.Backend, rd RangeDeleter) {}
+
+func (fl *FakeLessor) Stop() {}