blob: 79bb6b859ca8f5e3d1c88bc48b41252691842e88 [file] [log] [blame]
khenaidooffe076b2019-01-15 16:08:08 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "fmt"
19 "reflect"
20 "strings"
21 "time"
22
23 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
24 "github.com/coreos/etcd/etcdserver/membership"
25 "github.com/coreos/etcd/pkg/types"
26 "github.com/coreos/etcd/rafthttp"
27 "github.com/golang/protobuf/proto"
28)
29
30// isConnectedToQuorumSince checks whether the local member is connected to the
31// quorum of the cluster since the given time.
32func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool {
33 return numConnectedSince(transport, since, self, members) >= (len(members)/2)+1
34}
35
36// isConnectedSince checks whether the local member is connected to the
37// remote member since the given time.
38func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote types.ID) bool {
39 t := transport.ActiveSince(remote)
40 return !t.IsZero() && t.Before(since)
41}
42
43// isConnectedFullySince checks whether the local member is connected to all
44// members in the cluster since the given time.
45func isConnectedFullySince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool {
46 return numConnectedSince(transport, since, self, members) == len(members)
47}
48
49// numConnectedSince counts how many members are connected to the local member
50// since the given time.
51func numConnectedSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) int {
52 connectedNum := 0
53 for _, m := range members {
54 if m.ID == self || isConnectedSince(transport, since, m.ID) {
55 connectedNum++
56 }
57 }
58 return connectedNum
59}
60
61// longestConnected chooses the member with longest active-since-time.
62// It returns false, if nothing is active.
63func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool) {
64 var longest types.ID
65 var oldest time.Time
66 for _, id := range membs {
67 tm := tp.ActiveSince(id)
68 if tm.IsZero() { // inactive
69 continue
70 }
71
72 if oldest.IsZero() { // first longest candidate
73 oldest = tm
74 longest = id
75 }
76
77 if tm.Before(oldest) {
78 oldest = tm
79 longest = id
80 }
81 }
82 if uint64(longest) == 0 {
83 return longest, false
84 }
85 return longest, true
86}
87
88type notifier struct {
89 c chan struct{}
90 err error
91}
92
93func newNotifier() *notifier {
94 return &notifier{
95 c: make(chan struct{}),
96 }
97}
98
99func (nc *notifier) notify(err error) {
100 nc.err = err
101 close(nc.c)
102}
103
104func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
105 var resp string
106 if !isNil(respMsg) {
107 resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
108 }
109 warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
110}
111
112func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
113 reqStringer := pb.NewLoggableTxnRequest(r)
114 var resp string
115 if !isNil(txnResponse) {
116 var resps []string
117 for _, r := range txnResponse.Responses {
118 switch op := r.Response.(type) {
119 case *pb.ResponseOp_ResponseRange:
120 resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
121 default:
122 // only range responses should be in a read only txn request
123 }
124 }
125 resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
126 }
127 warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
128}
129
130func warnOfExpensiveReadOnlyRangeRequest(now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
131 var resp string
132 if !isNil(rangeResponse) {
133 resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
134 }
135 warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
136}
137
138func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
139 // TODO: add metrics
140 d := time.Since(now)
141 if d > warnApplyDuration {
142 var result string
143 if err != nil {
144 result = fmt.Sprintf("error:%v", err)
145 } else {
146 result = resp
147 }
148 plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
149 slowApplies.Inc()
150 }
151}
152
153func isNil(msg proto.Message) bool {
154 return msg == nil || reflect.ValueOf(msg).IsNil()
155}