// Copyright 2016 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v3rpc
16
17import (
18 "context"
19 "sync"
20 "time"
21
22 "github.com/coreos/etcd/etcdserver"
23 "github.com/coreos/etcd/etcdserver/api"
24 "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
25 "github.com/coreos/etcd/pkg/types"
26 "github.com/coreos/etcd/raft"
27
28 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
29 "go.uber.org/zap"
30 "google.golang.org/grpc"
31 "google.golang.org/grpc/metadata"
32 "google.golang.org/grpc/peer"
33)
34
const (
	// maxNoLeaderCnt is the number of consecutive election-timeout checks
	// during which the member may observe no raft leader before
	// monitorLeader cancels every registered require-leader stream.
	maxNoLeaderCnt = 3
)
38
// streamsMap tracks the gRPC server streams whose clients demanded a leader
// (via the require-leader metadata key) so that monitorLeader can cancel
// them when this member stays leaderless for too long.
type streamsMap struct {
	// mu guards streams.
	mu sync.Mutex
	// streams holds the currently registered require-leader streams,
	// used as a set (struct{} values carry no data).
	streams map[grpc.ServerStream]struct{}
}
43
44func newUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
45 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
46 if !api.IsCapabilityEnabled(api.V3rpcCapability) {
47 return nil, rpctypes.ErrGRPCNotCapable
48 }
49
50 md, ok := metadata.FromIncomingContext(ctx)
51 if ok {
52 if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
53 if s.Leader() == types.ID(raft.None) {
54 return nil, rpctypes.ErrGRPCNoLeader
55 }
56 }
57 }
58
59 return handler(ctx, req)
60 }
61}
62
63func newLogUnaryInterceptor(s *etcdserver.EtcdServer) grpc.UnaryServerInterceptor {
64 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
65 startTime := time.Now()
66 resp, err := handler(ctx, req)
67 defer logUnaryRequestStats(ctx, nil, info, startTime, req, resp)
68 return resp, err
69 }
70}
71
72func logUnaryRequestStats(ctx context.Context, lg *zap.Logger, info *grpc.UnaryServerInfo, startTime time.Time, req interface{}, resp interface{}) {
73 duration := time.Since(startTime)
74 remote := "No remote client info."
75 peerInfo, ok := peer.FromContext(ctx)
76 if ok {
77 remote = peerInfo.Addr.String()
78 }
79 var responseType string = info.FullMethod
80 var reqCount, respCount int64
81 var reqSize, respSize int
82 var reqContent string
83 switch _resp := resp.(type) {
84 case *pb.RangeResponse:
85 _req, ok := req.(*pb.RangeRequest)
86 if ok {
87 reqCount = 0
88 reqSize = _req.Size()
89 reqContent = _req.String()
90 }
91 if _resp != nil {
92 respCount = _resp.GetCount()
93 respSize = _resp.Size()
94 }
95 case *pb.PutResponse:
96 _req, ok := req.(*pb.PutRequest)
97 if ok {
98 reqCount = 1
99 reqSize = _req.Size()
100 reqContent = pb.NewLoggablePutRequest(_req).String()
101 // redact value field from request content, see PR #9821
102 }
103 if _resp != nil {
104 respCount = 0
105 respSize = _resp.Size()
106 }
107 case *pb.DeleteRangeResponse:
108 _req, ok := req.(*pb.DeleteRangeRequest)
109 if ok {
110 reqCount = 0
111 reqSize = _req.Size()
112 reqContent = _req.String()
113 }
114 if _resp != nil {
115 respCount = _resp.GetDeleted()
116 respSize = _resp.Size()
117 }
118 case *pb.TxnResponse:
119 _req, ok := req.(*pb.TxnRequest)
120 if ok && _resp != nil {
121 if _resp.GetSucceeded() { // determine the 'actual' count and size of request based on success or failure
122 reqCount = int64(len(_req.GetSuccess()))
123 reqSize = 0
124 for _, r := range _req.GetSuccess() {
125 reqSize += r.Size()
126 }
127 } else {
128 reqCount = int64(len(_req.GetFailure()))
129 reqSize = 0
130 for _, r := range _req.GetFailure() {
131 reqSize += r.Size()
132 }
133 }
134 reqContent = pb.NewLoggableTxnRequest(_req).String()
135 // redact value field from request content, see PR #9821
136 }
137 if _resp != nil {
138 respCount = 0
139 respSize = _resp.Size()
140 }
141 default:
142 reqCount = -1
143 reqSize = -1
144 respCount = -1
145 respSize = -1
146 }
147
148 logGenericRequestStats(lg, startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent)
149}
150
151func logGenericRequestStats(lg *zap.Logger, startTime time.Time, duration time.Duration, remote string, responseType string,
152 reqCount int64, reqSize int, respCount int64, respSize int, reqContent string) {
153 if lg == nil {
154 plog.Debugf("start time = %v, "+
155 "time spent = %v, "+
156 "remote = %s, "+
157 "response type = %s, "+
158 "request count = %d, "+
159 "request size = %d, "+
160 "response count = %d, "+
161 "response size = %d, "+
162 "request content = %s",
163 startTime, duration, remote, responseType, reqCount, reqSize, respCount, respSize, reqContent,
164 )
165 } else {
166 lg.Debug("request stats",
167 zap.Time("start time", startTime),
168 zap.Duration("time spent", duration),
169 zap.String("remote", remote),
170 zap.String("response type", responseType),
171 zap.Int64("request count", reqCount),
172 zap.Int("request size", reqSize),
173 zap.Int64("response count", respCount),
174 zap.Int("response size", respSize),
175 zap.String("request content", reqContent),
176 )
177 }
178}
179
180func newStreamInterceptor(s *etcdserver.EtcdServer) grpc.StreamServerInterceptor {
181 smap := monitorLeader(s)
182
183 return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
184 if !api.IsCapabilityEnabled(api.V3rpcCapability) {
185 return rpctypes.ErrGRPCNotCapable
186 }
187
188 md, ok := metadata.FromIncomingContext(ss.Context())
189 if ok {
190 if ks := md[rpctypes.MetadataRequireLeaderKey]; len(ks) > 0 && ks[0] == rpctypes.MetadataHasLeader {
191 if s.Leader() == types.ID(raft.None) {
192 return rpctypes.ErrGRPCNoLeader
193 }
194
195 cctx, cancel := context.WithCancel(ss.Context())
196 ss = serverStreamWithCtx{ctx: cctx, cancel: &cancel, ServerStream: ss}
197
198 smap.mu.Lock()
199 smap.streams[ss] = struct{}{}
200 smap.mu.Unlock()
201
202 defer func() {
203 smap.mu.Lock()
204 delete(smap.streams, ss)
205 smap.mu.Unlock()
206 cancel()
207 }()
208
209 }
210 }
211
212 return handler(srv, ss)
213 }
214}
215
// serverStreamWithCtx wraps a grpc.ServerStream so its Context() method
// returns a cancelable context; monitorLeader invokes cancel to tear down
// require-leader streams once the member has been leaderless too long.
type serverStreamWithCtx struct {
	grpc.ServerStream
	// ctx overrides the embedded stream's context (see Context below).
	ctx context.Context
	// cancel cancels ctx. NOTE(review): a plain context.CancelFunc would
	// suffice here; kept as a pointer to match existing callers that take
	// &cancel and dereference it.
	cancel *context.CancelFunc
}
221
222func (ssc serverStreamWithCtx) Context() context.Context { return ssc.ctx }
223
// monitorLeader starts a background goroutine that watches this member's
// raft leadership once per election timeout. After maxNoLeaderCnt
// consecutive leaderless observations it cancels every registered
// require-leader stream and clears the registry. The goroutine exits when
// the server stops (StopNotify). The returned streamsMap is where
// newStreamInterceptor registers streams.
func monitorLeader(s *etcdserver.EtcdServer) *streamsMap {
	smap := &streamsMap{
		streams: make(map[grpc.ServerStream]struct{}),
	}

	go func() {
		// One full election timeout: tick interval times election ticks.
		election := time.Duration(s.Cfg.TickMs) * time.Duration(s.Cfg.ElectionTicks) * time.Millisecond
		noLeaderCnt := 0

		for {
			select {
			case <-s.StopNotify():
				return
			// NOTE(review): time.After allocates a new timer each
			// iteration; fine at election-timeout frequency, but a reused
			// timer/ticker would avoid it.
			case <-time.After(election):
				if s.Leader() == types.ID(raft.None) {
					noLeaderCnt++
				} else {
					noLeaderCnt = 0
				}

				// We are more conservative on canceling existing streams. Reconnecting streams
				// cost much more than just rejecting new requests. So we wait until the member
				// cannot find a leader for maxNoLeaderCnt election timeouts to cancel existing streams.
				if noLeaderCnt >= maxNoLeaderCnt {
					smap.mu.Lock()
					for ss := range smap.streams {
						if ssWithCtx, ok := ss.(serverStreamWithCtx); ok {
							(*ssWithCtx.cancel)()
							// Wait for the stream's context to actually be
							// canceled before moving on. NOTE(review): smap.mu
							// is held across this wait, blocking new stream
							// registration until all cancellations complete —
							// presumably intentional; confirm before changing.
							<-ss.Context().Done()
						}
					}
					// Drop all entries; canceled handlers' deferred deletes
					// will simply miss on the fresh map.
					smap.streams = make(map[grpc.ServerStream]struct{})
					smap.mu.Unlock()
				}
			}
		}
	}()

	return smap
}
263}