blob: fbc2ba3ed8b92fdccbbd49c9413bb115a8608c95 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v3rpc
16
17import (
18 "context"
19 "strings"
20 "sync"
21 "time"
22
23 "github.com/coreos/etcd/etcdserver"
24 "github.com/coreos/etcd/etcdserver/api"
25 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
26 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
27 "github.com/coreos/etcd/pkg/types"
28 "github.com/coreos/etcd/raft"
29 "go.uber.org/zap"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/metadata"
32 "google.golang.org/grpc/peer"
33)
34
35const (
36 maxNoLeaderCnt = 3
37)
38
39type streamsMap struct {
40 mu sync.Mutex
41 streams map[grpc.ServerStream]struct{}
42}
43
44func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
45 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
46 if !api.IsCapabilityEnabled(api.V3rpcCapability) {
47 return nil, rpctypes.ErrGRPCNotCapable
48 }
49
50 md, ok := metadata.FromIncomingContext(ctx)
51 if ok {
52 ver, vs := "unknown", metadataGet(md, rpctypes.MetadataClientAPIVersionKey)
53 if len(vs) > 0 {
54 ver = vs[0]
55 }
56 clientRequests.WithLabelValues("unary", ver).Inc()
57
58 if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
59 if s.Leader() == types.ID(raft.None) {
60 return nil, rpctypes.ErrGRPCNoLeader
61 }
62 }
63 }
64
65 return handler(ctx, req)
66 }
67}
68
69func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
70 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
71 startTime := time.Now()
72 resp, err := handler(ctx, req)
73 defer logUnaryRequestStats(ctx, nil, info, startTime, req, resp)
74 return resp, err
75 }
76}
77
78func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
79 duration := time.Since(startTime)
80 remote := "No remote client info."
81 peerInfo, ok := peer.FromContext(ctx)
82 if ok {
83 remote = peerInfo.Addr.String()
84 }
85 var responseType string = info.FullMethod
86 var reqCount, respCount int64
87 var reqSize, respSize int
88 var reqContent string
89 switch _resp := resp.(type) {
90 case *pb.RangeResponse:
91 _req, ok := req.(*pb.RangeRequest)
92 if ok {
93 reqCount = 0
94 reqSize = _req.Size()
95 reqContent = _req.String()
96 }
97 if _resp != nil {
98 respCount = _resp.GetCount()
99 respSize = _resp.Size()
100 }
101 case *pb.PutResponse:
102 _req, ok := req.(*pb.PutRequest)
103 if ok {
104 reqCount = 1
105 reqSize = _req.Size()
106 reqContent = pb.NewLoggablePutRequest(_req).String()
107 // redact value field from request content, see PR #9821
108 }
109 if _resp != nil {
110 respCount = 0
111 respSize = _resp.Size()
112 }
113 case *pb.DeleteRangeResponse:
114 _req, ok := req.(*pb.DeleteRangeRequest)
115 if ok {
116 reqCount = 0
117 reqSize = _req.Size()
118 reqContent = _req.String()
119 }
120 if _resp != nil {
121 respCount = _resp.GetDeleted()
122 respSize = _resp.Size()
123 }
124 case *pb.TxnResponse:
125 _req, ok := req.(*pb.TxnRequest)
126 if ok && _resp != nil {
127 if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
128 reqCount = int64(len(_req.GetSuccess()))
129 reqSize = 0
130 for _, r := range _req.GetSuccess() {
131 reqSize += r.Size()
132 }
133 } else {
134 reqCount = int64(len(_req.GetFailure()))
135 reqSize = 0
136 for _, r := range _req.GetFailure() {
137 reqSize += r.Size()
138 }
139 }
140 reqContent = pb.NewLoggableTxnRequest(_req).String()
141 // redact value field from request content, see PR #9821
142 }
143 if _resp != nil {
144 respCount = 0
145 respSize = _resp.Size()
146 }
147 default:
148 reqCount = -1
149 reqSize = -1
150 respCount = -1
151 respSize = -1
152 }
153
154 logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
155}
156
157func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
158 reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
159 if lg == nil {
160 plog.Debugf("start time = %v, "+
161 "time spent = %v, "+
162 "remote = %s, "+
163 "response type = %s, "+
164 "request count = %d, "+
165 "request size = %d, "+
166 "response count = %d, "+
167 "response size = %d, "+
168 "request content = %s",
169 startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
170 )
171 } else {
172 lg.Debug("request stats",
173 zap.Time("start time", startTime),
174 zap.Duration("time spent", duration),
175 zap.String("remote", remote),
176 zap.String("response type", responseType),
177 zap.Int64("request count", reqCount),
178 zap.Int("request size", reqSize),
179 zap.Int64("response count", respCount),
180 zap.Int("response size", respSize),
181 zap.String("request content", reqContent),
182 )
183 }
184}
185
186func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
187 smap := monitorLeader(s)
188
189 return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
190 if !api.IsCapabilityEnabled(api.V3rpcCapability) {
191 return rpctypes.ErrGRPCNotCapable
192 }
193
194 md, ok := metadata.FromIncomingContext(ss.Context())
195 if ok {
196 ver, vs := "unknown", metadataGet(md, rpctypes.MetadataClientAPIVersionKey)
197 if len(vs) > 0 {
198 ver = vs[0]
199 }
200 clientRequests.WithLabelValues("stream", ver).Inc()
201
202 if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
203 if s.Leader() == types.ID(raft.None) {
204 return rpctypes.ErrGRPCNoLeader
205 }
206
207 cctx, cancel := context.WithCancel(ss.Context())
208 ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
209
210 smap.mu.Lock()
211 smap.streams[ss] = struct{}{}
212 smap.mu.Unlock()
213
214 defer func() {
215 smap.mu.Lock()
216 delete(smap.streams, ss)
217 smap.mu.Unlock()
218 cancel()
219 }()
220 }
221 }
222
223 return handler(srv, ss)
224 }
225}
226
227type serverStreamWithCtx struct {
228 grpc.ServerStream
229 ctx context.Context
230 cancel *context.CancelFunc
231}
232
233func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
234
235func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
236 smap := &streamsMap{
237 streams: make(map[grpc.ServerStream]struct{}),
238 }
239
240 go func() {
241 election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
242 noLeaderCnt := 0
243
244 for {
245 select {
246 case <-s.StopNotify():
247 return
248 case <-time.After(election):
249 if s.Leader() == types.ID(raft.None) {
250 noLeaderCnt++
251 } else {
252 noLeaderCnt = 0
253 }
254
255 // We are more conservative on canceling existing streams. Reconnecting streams
256 // cost much more than just rejecting new requests. So we wait until the member
257 // cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
258 if noLeaderCnt >= maxNoLeaderCnt {
259 smap.mu.Lock()
260 for ss := range smap.streams {
261 if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
262 (*ssWithCtx.cancel)()
263 <-ss.Context().Done()
264 }
265 }
266 smap.streams = make(map[grpc.ServerStream]struct{})
267 smap.mu.Unlock()
268 }
269 }
270 }
271 }()
272
273 return smap
274}
275
276func metadataGet(md metadata.MD, k string) []string {
277 k = strings.ToLower(k)
278 return md[k]
279}