blob: 1ef2493ed45c2d866a1258813b04b84c8f6fc615 [file] [log] [blame]
khenaidoo59ce9dd2019-11-11 13:05:32 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18 "go.etcd.io/etcd/pkg/types"
19 "go.etcd.io/etcd/raft/raftpb"
20
21 "go.uber.org/zap"
22)
23
24type remote struct {
25 lg *zap.Logger
26 localID types.ID
27 id types.ID
28 status *peerStatus
29 pipeline *pipeline
30}
31
32func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
33 picker := newURLPicker(urls)
34 status := newPeerStatus(tr.Logger, tr.ID, id)
35 pipeline := &pipeline{
36 peerID: id,
37 tr: tr,
38 picker: picker,
39 status: status,
40 raft: tr.Raft,
41 errorc: tr.ErrorC,
42 }
43 pipeline.start()
44
45 return &remote{
46 lg: tr.Logger,
47 localID: tr.ID,
48 id: id,
49 status: status,
50 pipeline: pipeline,
51 }
52}
53
54func (g *remote) send(m raftpb.Message) {
55 select {
56 case g.pipeline.msgc <- m:
57 default:
58 if g.status.isActive() {
59 if g.lg != nil {
60 g.lg.Warn(
61 "dropped internal Raft message since sending buffer is full (overloaded network)",
62 zap.String("message-type", m.Type.String()),
63 zap.String("local-member-id", g.localID.String()),
64 zap.String("from", types.ID(m.From).String()),
65 zap.String("remote-peer-id", g.id.String()),
66 zap.Bool("remote-peer-active", g.status.isActive()),
67 )
68 } else {
69 plog.MergeWarningf("dropped internal raft message to %s since sending buffer is full (bad/overloaded network)", g.id)
70 }
71 } else {
72 if g.lg != nil {
73 g.lg.Warn(
74 "dropped Raft message since sending buffer is full (overloaded network)",
75 zap.String("message-type", m.Type.String()),
76 zap.String("local-member-id", g.localID.String()),
77 zap.String("from", types.ID(m.From).String()),
78 zap.String("remote-peer-id", g.id.String()),
79 zap.Bool("remote-peer-active", g.status.isActive()),
80 )
81 } else {
82 plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id)
83 }
84 }
85 sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
86 }
87}
88
89func (g *remote) stop() {
90 g.pipeline.stop()
91}
92
93func (g *remote) Pause() {
94 g.stop()
95}
96
97func (g *remote) Resume() {
98 g.pipeline.start()
99}