blob: 5b4f2b142281c1bfc96baed156c4377ae1cbde2f [file] [log] [blame]
// Copyright 2016 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package v3rpc
16
17import (
18 "context"
19 "io"
20
21 "github.com/coreos/etcd/etcdserver"
22 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
23 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
24 "github.com/coreos/etcd/lease"
25)
26
27type LeaseServer struct {
28 hdr header
29 le etcdserver.Lessor
30}
31
32func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer {
33 return &LeaseServer{le: s, hdr: newHeader(s)}
34}
35
36func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) {
37 resp, err := ls.le.LeaseGrant(ctx, cr)
38
39 if err != nil {
40 return nil, togRPCError(err)
41 }
42 ls.hdr.fill(resp.Header)
43 return resp, nil
44}
45
46func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) {
47 resp, err := ls.le.LeaseRevoke(ctx, rr)
48 if err != nil {
49 return nil, togRPCError(err)
50 }
51 ls.hdr.fill(resp.Header)
52 return resp, nil
53}
54
55func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) {
56 resp, err := ls.le.LeaseTimeToLive(ctx, rr)
57 if err != nil && err != lease.ErrLeaseNotFound {
58 return nil, togRPCError(err)
59 }
60 if err == lease.ErrLeaseNotFound {
61 resp = &pb.LeaseTimeToLiveResponse{
62 Header: &pb.ResponseHeader{},
63 ID: rr.ID,
64 TTL: -1,
65 }
66 }
67 ls.hdr.fill(resp.Header)
68 return resp, nil
69}
70
71func (ls *LeaseServer) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) {
72 resp, err := ls.le.LeaseLeases(ctx, rr)
73 if err != nil && err != lease.ErrLeaseNotFound {
74 return nil, togRPCError(err)
75 }
76 if err == lease.ErrLeaseNotFound {
77 resp = &pb.LeaseLeasesResponse{
78 Header: &pb.ResponseHeader{},
79 Leases: []*pb.LeaseStatus{},
80 }
81 }
82 ls.hdr.fill(resp.Header)
83 return resp, nil
84}
85
86func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) {
87 errc := make(chan error, 1)
88 go func() {
89 errc <- ls.leaseKeepAlive(stream)
90 }()
91 select {
92 case err = <-errc:
93 case <-stream.Context().Done():
94 // the only server-side cancellation is noleader for now.
95 err = stream.Context().Err()
96 if err == context.Canceled {
97 err = rpctypes.ErrGRPCNoLeader
98 }
99 }
100 return err
101}
102
103func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error {
104 for {
105 req, err := stream.Recv()
106 if err == io.EOF {
107 return nil
108 }
109 if err != nil {
110 if isClientCtxErr(stream.Context().Err(), err) {
111 plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
112 } else {
113 plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error())
114 }
115 return err
116 }
117
118 // Create header before we sent out the renew request.
119 // This can make sure that the revision is strictly smaller or equal to
120 // when the keepalive happened at the local server (when the local server is the leader)
121 // or remote leader.
122 // Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded
123 // at rev 4.
124 resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}}
125 ls.hdr.fill(resp.Header)
126
127 ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID))
128 if err == lease.ErrLeaseNotFound {
129 err = nil
130 ttl = 0
131 }
132
133 if err != nil {
134 return togRPCError(err)
135 }
136
137 resp.TTL = ttl
138 err = stream.Send(resp)
139 if err != nil {
140 if isClientCtxErr(stream.Context().Err(), err) {
141 plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
142 } else {
143 plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error())
144 }
145 return err
146 }
147 }
148}