blob: eb11d5d0de4ad8b624d73f618a44ac891e92b5a0 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "fmt"
19 "reflect"
20 "strings"
21 "time"
22
23 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
24 "github.com/coreos/etcd/etcdserver/membership"
25 "github.com/coreos/etcd/pkg/types"
26 "github.com/coreos/etcd/rafthttp"
27 "github.com/golang/protobuf/proto"
28)
29
30// isConnectedToQuorumSince checks whether the local member is connected to the
31// quorum of the cluster since the given time.
32func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool {
33 return numConnectedSince(transport, since, self, members) >= (len(members)/2)+1
34}
35
36// isConnectedSince checks whether the local member is connected to the
37// remote member since the given time.
38func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote types.ID) bool {
39 t := transport.ActiveSince(remote)
40 return !t.IsZero() && t.Before(since)
41}
42
43// isConnectedFullySince checks whether the local member is connected to all
44// members in the cluster since the given time.
45func isConnectedFullySince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool {
46 return numConnectedSince(transport, since, self, members) == len(members)
47}
48
49// numConnectedSince counts how many members are connected to the local member
50// since the given time.
51func numConnectedSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) int {
52 connectedNum := 0
53 for _, m := range members {
54 if m.ID == self || isConnectedSince(transport, since, m.ID) {
55 connectedNum++
56 }
57 }
58 return connectedNum
59}
60
61// longestConnected chooses the member with longest active-since-time.
62// It returns false, if nothing is active.
63func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool) {
64 var longest types.ID
65 var oldest time.Time
66 for _, id := range membs {
67 tm := tp.ActiveSince(id)
68 if tm.IsZero() { // inactive
69 continue
70 }
71
72 if oldest.IsZero() { // first longest candidate
73 oldest = tm
74 longest = id
75 }
76
77 if tm.Before(oldest) {
78 oldest = tm
79 longest = id
80 }
81 }
82 if uint64(longest) == 0 {
83 return longest, false
84 }
85 return longest, true
86}
87
// notifier carries a one-time completion signal together with the error that
// accompanied it.
type notifier struct {
	// c is closed by notify to signal completion.
	c chan struct{}
	// err is the result recorded by notify; it is assigned before c is closed.
	err error
}

// newNotifier returns a notifier with an open, unbuffered channel and a nil
// error.
func newNotifier() *notifier {
	n := new(notifier)
	n.c = make(chan struct{})
	return n
}

// notify records err and then closes the channel, unblocking every receiver
// on c. It must be called at most once per notifier: a second call would
// close an already-closed channel and panic.
func (n *notifier) notify(err error) {
	n.err = err
	close(n.c)
}
103
104func warnOfExpensiveRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
105 var resp string
106 if !isNil(respMsg) {
107 resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
108 }
109 warnOfExpensiveGenericRequest(now, reqStringer, "", resp, err)
110}
111
112func warnOfFailedRequest(now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) {
113 var resp string
114 if !isNil(respMsg) {
115 resp = fmt.Sprintf("size:%d", proto.Size(respMsg))
116 }
117 d := time.Since(now)
118 plog.Warningf("failed to apply request,took %v,request %s,resp %s,err is %v", d, reqStringer.String(), resp, err)
119}
120
121func warnOfExpensiveReadOnlyTxnRequest(now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) {
122 reqStringer := pb.NewLoggableTxnRequest(r)
123 var resp string
124 if !isNil(txnResponse) {
125 var resps []string
126 for _, r := range txnResponse.Responses {
127 switch op := r.Response.(type) {
128 case *pb.ResponseOp_ResponseRange:
129 resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs)))
130 default:
131 // only range responses should be in a read only txn request
132 }
133 }
134 resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse))
135 }
136 warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
137}
138
139func warnOfExpensiveReadOnlyRangeRequest(now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) {
140 var resp string
141 if !isNil(rangeResponse) {
142 resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse))
143 }
144 warnOfExpensiveGenericRequest(now, reqStringer, "read-only range ", resp, err)
145}
146
147func warnOfExpensiveGenericRequest(now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) {
148 // TODO: add metrics
149 d := time.Since(now)
150 if d > warnApplyDuration {
151 var result string
152 if err != nil {
153 result = fmt.Sprintf("error:%v", err)
154 } else {
155 result = resp
156 }
157 plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d)
158 slowApplies.Inc()
159 }
160}
161
162func isNil(msg proto.Message) bool {
163 return msg == nil || reflect.ValueOf(msg).IsNil()
164}