blob: 7441beedf15efa5d981a1c6fbff255b120f98906 [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package v3rpc
16
17import (
18 "context"
19 "io"
20
21 "go.etcd.io/etcd/etcdserver"
22 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
23 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
24 "go.etcd.io/etcd/lease"
25
26 "go.uber.org/zap"
27)
28
29type LeaseServer struct {
30 lg *zap.Logger
31 hdr header
32 le etcdserver.Lessor
33}
34
35func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
36 return &LeaseServer{lg: s.Cfg.Logger, le: s, hdr: newHeader(s)}
37}
38
39func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
40 resp, err := ls.le.LeaseGrant(ctx, cr)
41
42 if err != nil {
43 return nil, togRPCError(err)
44 }
45 ls.hdr.fill(resp.Header)
46 return resp, nil
47}
48
49func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
50 resp, err := ls.le.LeaseRevoke(ctx, rr)
51 if err != nil {
52 return nil, togRPCError(err)
53 }
54 ls.hdr.fill(resp.Header)
55 return resp, nil
56}
57
58func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
59 resp, err := ls.le.LeaseTimeToLive(ctx, rr)
60 if err != nil && err != lease.ErrLeaseNotFound {
61 return nil, togRPCError(err)
62 }
63 if err == lease.ErrLeaseNotFound {
64 resp = &pb.LeaseTimeToLiveResponse{
65 Header: &pb.ResponseHeader{},
66 ID: rr.ID,
67 TTL: -1,
68 }
69 }
70 ls.hdr.fill(resp.Header)
71 return resp, nil
72}
73
74func (ls *LeaseServer) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
75 resp, err := ls.le.LeaseLeases(ctx, rr)
76 if err != nil && err != lease.ErrLeaseNotFound {
77 return nil, togRPCError(err)
78 }
79 if err == lease.ErrLeaseNotFound {
80 resp = &pb.LeaseLeasesResponse{
81 Header: &pb.ResponseHeader{},
82 Leases: []*pb.LeaseStatus{},
83 }
84 }
85 ls.hdr.fill(resp.Header)
86 return resp, nil
87}
88
89func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) {
90 errc := make(chan error, 1)
91 go func() {
92 errc <- ls.leaseKeepAlive(stream)
93 }()
94 select {
95 case err = <-errc:
96 case <-stream.Context().Done():
97 // the only server-side cancellation is noleader for now.
98 err = stream.Context().Err()
99 if err == context.Canceled {
100 err = rpctypes.ErrGRPCNoLeader
101 }
102 }
103 return err
104}
105
106func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
107 for {
108 req, err := stream.Recv()
109 if err == io.EOF {
110 return nil
111 }
112 if err != nil {
113 if isClientCtxErr(stream.Context().Err(), err) {
114 if ls.lg != nil {
115 ls.lg.Debug("failed to receive lease keepalive request from gRPC stream", zap.Error(err))
116 } else {
117 plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
118 }
119 } else {
120 if ls.lg != nil {
121 ls.lg.Warn("failed to receive lease keepalive request from gRPC stream", zap.Error(err))
122 } else {
123 plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
124 }
125 streamFailures.WithLabelValues("receive", "lease-keepalive").Inc()
126 }
127 return err
128 }
129
130 // Create header before we sent out the renew request.
131 // This can make sure that the revision is strictly smaller or equal to
132 // when the keepalive happened at the local server (when the local server is the leader)
133 // or remote leader.
134 // Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
135 // at rev 4.
136 resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
137 ls.hdr.fill(resp.Header)
138
139 ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
140 if err == lease.ErrLeaseNotFound {
141 err = nil
142 ttl = 0
143 }
144
145 if err != nil {
146 return togRPCError(err)
147 }
148
149 resp.TTL = ttl
150 err = stream.Send(resp)
151 if err != nil {
152 if isClientCtxErr(stream.Context().Err(), err) {
153 if ls.lg != nil {
154 ls.lg.Debug("failed to send lease keepalive response to gRPC stream", zap.Error(err))
155 } else {
156 plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
157 }
158 } else {
159 if ls.lg != nil {
160 ls.lg.Warn("failed to send lease keepalive response to gRPC stream", zap.Error(err))
161 } else {
162 plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
163 }
164 streamFailures.WithLabelValues("send", "lease-keepalive").Inc()
165 }
166 return err
167 }
168 }
169}