// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
| 14 | |
| 15 | package etcdserver |
| 16 | |
| 17 | import ( |
| 18 | "fmt" |
| 19 | "reflect" |
| 20 | "strings" |
| 21 | "time" |
| 22 | |
| 23 | "github.com/golang/protobuf/proto" |
| 24 | "go.etcd.io/etcd/etcdserver/api/membership" |
| 25 | "go.etcd.io/etcd/etcdserver/api/rafthttp" |
| 26 | pb "go.etcd.io/etcd/etcdserver/etcdserverpb" |
| 27 | "go.etcd.io/etcd/pkg/types" |
| 28 | |
| 29 | "go.uber.org/zap" |
| 30 | ) |
| 31 | |
| 32 | // isConnectedToQuorumSince checks whether the local member is connected to the |
| 33 | // quorum of the cluster since the given time. |
| 34 | func isConnectedToQuorumSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool { |
| 35 | return numConnectedSince(transport, since, self, members) >= (len(members)/2)+1 |
| 36 | } |
| 37 | |
| 38 | // isConnectedSince checks whether the local member is connected to the |
| 39 | // remote member since the given time. |
| 40 | func isConnectedSince(transport rafthttp.Transporter, since time.Time, remote types.ID) bool { |
| 41 | t := transport.ActiveSince(remote) |
| 42 | return !t.IsZero() && t.Before(since) |
| 43 | } |
| 44 | |
| 45 | // isConnectedFullySince checks whether the local member is connected to all |
| 46 | // members in the cluster since the given time. |
| 47 | func isConnectedFullySince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) bool { |
| 48 | return numConnectedSince(transport, since, self, members) == len(members) |
| 49 | } |
| 50 | |
| 51 | // numConnectedSince counts how many members are connected to the local member |
| 52 | // since the given time. |
| 53 | func numConnectedSince(transport rafthttp.Transporter, since time.Time, self types.ID, members []*membership.Member) int { |
| 54 | connectedNum := 0 |
| 55 | for _, m := range members { |
| 56 | if m.ID == self || isConnectedSince(transport, since, m.ID) { |
| 57 | connectedNum++ |
| 58 | } |
| 59 | } |
| 60 | return connectedNum |
| 61 | } |
| 62 | |
| 63 | // longestConnected chooses the member with longest active-since-time. |
| 64 | // It returns false, if nothing is active. |
| 65 | func longestConnected(tp rafthttp.Transporter, membs []types.ID) (types.ID, bool) { |
| 66 | var longest types.ID |
| 67 | var oldest time.Time |
| 68 | for _, id := range membs { |
| 69 | tm := tp.ActiveSince(id) |
| 70 | if tm.IsZero() { // inactive |
| 71 | continue |
| 72 | } |
| 73 | |
| 74 | if oldest.IsZero() { // first longest candidate |
| 75 | oldest = tm |
| 76 | longest = id |
| 77 | } |
| 78 | |
| 79 | if tm.Before(oldest) { |
| 80 | oldest = tm |
| 81 | longest = id |
| 82 | } |
| 83 | } |
| 84 | if uint64(longest) == 0 { |
| 85 | return longest, false |
| 86 | } |
| 87 | return longest, true |
| 88 | } |
| 89 | |
// notifier carries a one-shot completion signal together with an error value.
type notifier struct {
	c   chan struct{} // closed by notify to signal completion
	err error         // set by notify before c is closed
}

// newNotifier returns a notifier with an open, unbuffered signal channel and
// a nil error.
func newNotifier() *notifier {
	return &notifier{
		c: make(chan struct{}),
	}
}

// notify stores err and then closes the signal channel, so a receiver that
// observes the channel closed can read err afterwards.
//
// It must be called at most once per notifier: a second call would close an
// already-closed channel, which panics.
func (nc *notifier) notify(err error) {
	nc.err = err
	close(nc.c)
}
| 105 | |
| 106 | func warnOfExpensiveRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, respMsg proto.Message, err error) { |
| 107 | var resp string |
| 108 | if !isNil(respMsg) { |
| 109 | resp = fmt.Sprintf("size:%d", proto.Size(respMsg)) |
| 110 | } |
| 111 | warnOfExpensiveGenericRequest(lg, now, reqStringer, "", resp, err) |
| 112 | } |
| 113 | |
| 114 | func warnOfExpensiveReadOnlyTxnRequest(lg *zap.Logger, now time.Time, r *pb.TxnRequest, txnResponse *pb.TxnResponse, err error) { |
| 115 | reqStringer := pb.NewLoggableTxnRequest(r) |
| 116 | var resp string |
| 117 | if !isNil(txnResponse) { |
| 118 | var resps []string |
| 119 | for _, r := range txnResponse.Responses { |
| 120 | switch op := r.Response.(type) { |
| 121 | case *pb.ResponseOp_ResponseRange: |
| 122 | resps = append(resps, fmt.Sprintf("range_response_count:%d", len(op.ResponseRange.Kvs))) |
| 123 | default: |
| 124 | // only range responses should be in a read only txn request |
| 125 | } |
| 126 | } |
| 127 | resp = fmt.Sprintf("responses:<%s> size:%d", strings.Join(resps, " "), proto.Size(txnResponse)) |
| 128 | } |
| 129 | warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err) |
| 130 | } |
| 131 | |
| 132 | func warnOfExpensiveReadOnlyRangeRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, rangeResponse *pb.RangeResponse, err error) { |
| 133 | var resp string |
| 134 | if !isNil(rangeResponse) { |
| 135 | resp = fmt.Sprintf("range_response_count:%d size:%d", len(rangeResponse.Kvs), proto.Size(rangeResponse)) |
| 136 | } |
| 137 | warnOfExpensiveGenericRequest(lg, now, reqStringer, "read-only range ", resp, err) |
| 138 | } |
| 139 | |
| 140 | func warnOfExpensiveGenericRequest(lg *zap.Logger, now time.Time, reqStringer fmt.Stringer, prefix string, resp string, err error) { |
| 141 | d := time.Since(now) |
| 142 | if d > warnApplyDuration { |
| 143 | if lg != nil { |
| 144 | lg.Warn( |
| 145 | "apply request took too long", |
| 146 | zap.Duration("took", d), |
| 147 | zap.Duration("expected-duration", warnApplyDuration), |
| 148 | zap.String("prefix", prefix), |
| 149 | zap.String("request", reqStringer.String()), |
| 150 | zap.String("response", resp), |
| 151 | zap.Error(err), |
| 152 | ) |
| 153 | } else { |
| 154 | var result string |
| 155 | if err != nil { |
| 156 | result = fmt.Sprintf("error:%v", err) |
| 157 | } else { |
| 158 | result = resp |
| 159 | } |
| 160 | plog.Warningf("%srequest %q with result %q took too long (%v) to execute", prefix, reqStringer.String(), result, d) |
| 161 | } |
| 162 | slowApplies.Inc() |
| 163 | } |
| 164 | } |
| 165 | |
| 166 | func isNil(msg proto.Message) bool { |
| 167 | return msg == nil || reflect.ValueOf(msg).IsNil() |
| 168 | } |