blob: ce9047e80fdc0a640d75366a70086a7bcb2ca0e7 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v3rpc
16
17import (
18 "context"
19 "sync"
20 "time"
21
22 "go.etcd.io/etcd/etcdserver"
23 "go.etcd.io/etcd/etcdserver/api"
24 "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
25 "go.etcd.io/etcd/pkg/types"
26 "go.etcd.io/etcd/raft"
27
28 "github.com/coreos/pkg/capnslog"
29 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
30 "go.uber.org/zap"
31 "google.golang.org/grpc"
32 "google.golang.org/grpc/metadata"
33 "google.golang.org/grpc/peer"
34)
35
36const (
37 maxNoLeaderCnt = 3
38)
39
40type streamsMap struct {
41 mu sync.Mutex
42 streams map[grpc.ServerStream]struct{}
43}
44
45func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
46 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
47 if !api.IsCapabilityEnabled(api.V3rpcCapability) {
48 return nil, rpctypes.ErrGRPCNotCapable
49 }
50
51 if s.IsMemberExist(s.ID()) && s.IsLearner() && !isRPCSupportedForLearner(req) {
52 return nil, rpctypes.ErrGPRCNotSupportedForLearner
53 }
54
55 md, ok := metadata.FromIncomingContext(ctx)
56 if ok {
57 if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
58 if s.Leader() == types.ID(raft.None) {
59 return nil, rpctypes.ErrGRPCNoLeader
60 }
61 }
62 }
63
64 return handler(ctx, req)
65 }
66}
67
68func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
69 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
70 startTime := time.Now()
71 resp, err := handler(ctx, req)
72 lg := s.Logger()
73 if (lg != nil && lg.Core().Enabled(zap.DebugLevel)) || // using zap logger and debug level is enabled
74 (lg == nil && plog.LevelAt(capnslog.DEBUG)) { // or, using capnslog and debug level is enabled
75 defer logUnaryRequestStats(ctx, lg, info, startTime, req, resp)
76 }
77 return resp, err
78 }
79}
80
81func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
82 duration := time.Since(startTime)
83 remote := "No remote client info."
84 peerInfo, ok := peer.FromContext(ctx)
85 if ok {
86 remote = peerInfo.Addr.String()
87 }
88 responseType := info.FullMethod
89 var reqCount, respCount int64
90 var reqSize, respSize int
91 var reqContent string
92 switch _resp := resp.(type) {
93 case *pb.RangeResponse:
94 _req, ok := req.(*pb.RangeRequest)
95 if ok {
96 reqCount = 0
97 reqSize = _req.Size()
98 reqContent = _req.String()
99 }
100 if _resp != nil {
101 respCount = _resp.GetCount()
102 respSize = _resp.Size()
103 }
104 case *pb.PutResponse:
105 _req, ok := req.(*pb.PutRequest)
106 if ok {
107 reqCount = 1
108 reqSize = _req.Size()
109 reqContent = pb.NewLoggablePutRequest(_req).String()
110 // redact value field from request content, see PR #9821
111 }
112 if _resp != nil {
113 respCount = 0
114 respSize = _resp.Size()
115 }
116 case *pb.DeleteRangeResponse:
117 _req, ok := req.(*pb.DeleteRangeRequest)
118 if ok {
119 reqCount = 0
120 reqSize = _req.Size()
121 reqContent = _req.String()
122 }
123 if _resp != nil {
124 respCount = _resp.GetDeleted()
125 respSize = _resp.Size()
126 }
127 case *pb.TxnResponse:
128 _req, ok := req.(*pb.TxnRequest)
129 if ok && _resp != nil {
130 if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
131 reqCount = int64(len(_req.GetSuccess()))
132 reqSize = 0
133 for _, r := range _req.GetSuccess() {
134 reqSize += r.Size()
135 }
136 } else {
137 reqCount = int64(len(_req.GetFailure()))
138 reqSize = 0
139 for _, r := range _req.GetFailure() {
140 reqSize += r.Size()
141 }
142 }
143 reqContent = pb.NewLoggableTxnRequest(_req).String()
144 // redact value field from request content, see PR #9821
145 }
146 if _resp != nil {
147 respCount = 0
148 respSize = _resp.Size()
149 }
150 default:
151 reqCount = -1
152 reqSize = -1
153 respCount = -1
154 respSize = -1
155 }
156
157 logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
158}
159
160func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
161 reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
162 if lg == nil {
163 plog.Debugf("start time = %v, "+
164 "time spent = %v, "+
165 "remote = %s, "+
166 "response type = %s, "+
167 "request count = %d, "+
168 "request size = %d, "+
169 "response count = %d, "+
170 "response size = %d, "+
171 "request content = %s",
172 startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
173 )
174 } else {
175 lg.Debug("request stats",
176 zap.Time("start time", startTime),
177 zap.Duration("time spent", duration),
178 zap.String("remote", remote),
179 zap.String("response type", responseType),
180 zap.Int64("request count", reqCount),
181 zap.Int("request size", reqSize),
182 zap.Int64("response count", respCount),
183 zap.Int("response size", respSize),
184 zap.String("request content", reqContent),
185 )
186 }
187}
188
189func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
190 smap := monitorLeader(s)
191
192 return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
193 if !api.IsCapabilityEnabled(api.V3rpcCapability) {
194 return rpctypes.ErrGRPCNotCapable
195 }
196
197 if s.IsMemberExist(s.ID()) && s.IsLearner() { // learner does not support stream RPC
198 return rpctypes.ErrGPRCNotSupportedForLearner
199 }
200
201 md, ok := metadata.FromIncomingContext(ss.Context())
202 if ok {
203 if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
204 if s.Leader() == types.ID(raft.None) {
205 return rpctypes.ErrGRPCNoLeader
206 }
207
208 cctx, cancel := context.WithCancel(ss.Context())
209 ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
210
211 smap.mu.Lock()
212 smap.streams[ss] = struct{}{}
213 smap.mu.Unlock()
214
215 defer func() {
216 smap.mu.Lock()
217 delete(smap.streams, ss)
218 smap.mu.Unlock()
219 cancel()
220 }()
221
222 }
223 }
224
225 return handler(srv, ss)
226 }
227}
228
229type serverStreamWithCtx struct {
230 grpc.ServerStream
231 ctx context.Context
232 cancel *context.CancelFunc
233}
234
235func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
236
237func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
238 smap := &streamsMap{
239 streams: make(map[grpc.ServerStream]struct{}),
240 }
241
242 go func() {
243 election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
244 noLeaderCnt := 0
245
246 for {
247 select {
248 case <-s.StopNotify():
249 return
250 case <-time.After(election):
251 if s.Leader() == types.ID(raft.None) {
252 noLeaderCnt++
253 } else {
254 noLeaderCnt = 0
255 }
256
257 // We are more conservative on canceling existing streams. Reconnecting streams
258 // cost much more than just rejecting new requests. So we wait until the member
259 // cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
260 if noLeaderCnt >= maxNoLeaderCnt {
261 smap.mu.Lock()
262 for ss := range smap.streams {
263 if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
264 (*ssWithCtx.cancel)()
265 <-ss.Context().Done()
266 }
267 }
268 smap.streams = make(map[grpc.ServerStream]struct{})
269 smap.mu.Unlock()
270 }
271 }
272 }
273 }()
274
275 return smap
276}