| // Copyright 2016 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package v3rpc |
| |
| import ( |
| "context" |
| "io" |
| |
| "go.etcd.io/etcd/etcdserver" |
| "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes" |
| pb "go.etcd.io/etcd/etcdserver/etcdserverpb" |
| "go.etcd.io/etcd/lease" |
| |
| "go.uber.org/zap" |
| ) |
| |
| type LeaseServer struct { |
| lg *zap.Logger |
| hdr header |
| le etcdserver.Lessor |
| } |
| |
| func NewLeaseServer(s *etcdserver.EtcdServer) pb.LeaseServer { |
| return &LeaseServer{lg: s.Cfg.Logger, le: s, hdr: newHeader(s)} |
| } |
| |
| func (ls *LeaseServer) LeaseGrant(ctx context.Context, cr *pb.LeaseGrantRequest) (*pb.LeaseGrantResponse, error) { |
| resp, err := ls.le.LeaseGrant(ctx, cr) |
| |
| if err != nil { |
| return nil, togRPCError(err) |
| } |
| ls.hdr.fill(resp.Header) |
| return resp, nil |
| } |
| |
| func (ls *LeaseServer) LeaseRevoke(ctx context.Context, rr *pb.LeaseRevokeRequest) (*pb.LeaseRevokeResponse, error) { |
| resp, err := ls.le.LeaseRevoke(ctx, rr) |
| if err != nil { |
| return nil, togRPCError(err) |
| } |
| ls.hdr.fill(resp.Header) |
| return resp, nil |
| } |
| |
| func (ls *LeaseServer) LeaseTimeToLive(ctx context.Context, rr *pb.LeaseTimeToLiveRequest) (*pb.LeaseTimeToLiveResponse, error) { |
| resp, err := ls.le.LeaseTimeToLive(ctx, rr) |
| if err != nil && err != lease.ErrLeaseNotFound { |
| return nil, togRPCError(err) |
| } |
| if err == lease.ErrLeaseNotFound { |
| resp = &pb.LeaseTimeToLiveResponse{ |
| Header: &pb.ResponseHeader{}, |
| ID: rr.ID, |
| TTL: -1, |
| } |
| } |
| ls.hdr.fill(resp.Header) |
| return resp, nil |
| } |
| |
| func (ls *LeaseServer) LeaseLeases(ctx context.Context, rr *pb.LeaseLeasesRequest) (*pb.LeaseLeasesResponse, error) { |
| resp, err := ls.le.LeaseLeases(ctx, rr) |
| if err != nil && err != lease.ErrLeaseNotFound { |
| return nil, togRPCError(err) |
| } |
| if err == lease.ErrLeaseNotFound { |
| resp = &pb.LeaseLeasesResponse{ |
| Header: &pb.ResponseHeader{}, |
| Leases: []*pb.LeaseStatus{}, |
| } |
| } |
| ls.hdr.fill(resp.Header) |
| return resp, nil |
| } |
| |
| func (ls *LeaseServer) LeaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) (err error) { |
| errc := make(chan error, 1) |
| go func() { |
| errc <- ls.leaseKeepAlive(stream) |
| }() |
| select { |
| case err = <-errc: |
| case <-stream.Context().Done(): |
| // the only server-side cancellation is noleader for now. |
| err = stream.Context().Err() |
| if err == context.Canceled { |
| err = rpctypes.ErrGRPCNoLeader |
| } |
| } |
| return err |
| } |
| |
| func (ls *LeaseServer) leaseKeepAlive(stream pb.Lease_LeaseKeepAliveServer) error { |
| for { |
| req, err := stream.Recv() |
| if err == io.EOF { |
| return nil |
| } |
| if err != nil { |
| if isClientCtxErr(stream.Context().Err(), err) { |
| if ls.lg != nil { |
| ls.lg.Debug("failed to receive lease keepalive request from gRPC stream", zap.Error(err)) |
| } else { |
| plog.Debugf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error()) |
| } |
| } else { |
| if ls.lg != nil { |
| ls.lg.Warn("failed to receive lease keepalive request from gRPC stream", zap.Error(err)) |
| } else { |
| plog.Warningf("failed to receive lease keepalive request from gRPC stream (%q)", err.Error()) |
| } |
| streamFailures.WithLabelValues("receive", "lease-keepalive").Inc() |
| } |
| return err |
| } |
| |
| // Create header before we sent out the renew request. |
| // This can make sure that the revision is strictly smaller or equal to |
| // when the keepalive happened at the local server (when the local server is the leader) |
| // or remote leader. |
| // Without this, a lease might be revoked at rev 3 but client can see the keepalive succeeded |
| // at rev 4. |
| resp := &pb.LeaseKeepAliveResponse{ID: req.ID, Header: &pb.ResponseHeader{}} |
| ls.hdr.fill(resp.Header) |
| |
| ttl, err := ls.le.LeaseRenew(stream.Context(), lease.LeaseID(req.ID)) |
| if err == lease.ErrLeaseNotFound { |
| err = nil |
| ttl = 0 |
| } |
| |
| if err != nil { |
| return togRPCError(err) |
| } |
| |
| resp.TTL = ttl |
| err = stream.Send(resp) |
| if err != nil { |
| if isClientCtxErr(stream.Context().Err(), err) { |
| if ls.lg != nil { |
| ls.lg.Debug("failed to send lease keepalive response to gRPC stream", zap.Error(err)) |
| } else { |
| plog.Debugf("failed to send lease keepalive response to gRPC stream (%q)", err.Error()) |
| } |
| } else { |
| if ls.lg != nil { |
| ls.lg.Warn("failed to send lease keepalive response to gRPC stream", zap.Error(err)) |
| } else { |
| plog.Warningf("failed to send lease keepalive response to gRPC stream (%q)", err.Error()) |
| } |
| streamFailures.WithLabelValues("send", "lease-keepalive").Inc() |
| } |
| return err |
| } |
| } |
| } |