blob: 70f92575d13ebdd753b6909a2805d24f3dd820c8 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package rafthttp
16
17import (
18 "bytes"
19 "context"
20 "errors"
21 "io/ioutil"
22 "sync"
23 "time"
24
25 stats "go.etcd.io/etcd/etcdserver/api/v2stats"
26 "go.etcd.io/etcd/pkg/pbutil"
27 "go.etcd.io/etcd/pkg/types"
28 "go.etcd.io/etcd/raft"
29 "go.etcd.io/etcd/raft/raftpb"
30
31 "go.uber.org/zap"
32)
33
34const (
35 connPerPipeline = 4
36 // pipelineBufSize is the size of pipeline buffer, which helps hold the
37 // temporary network latency.
38 // The size ensures that pipeline does not drop messages when the network
39 // is out of work for less than 1 second in good path.
40 pipelineBufSize = 64
41)
42
43var errStopped = errors.New("stopped")
44
45type pipeline struct {
46 peerID types.ID
47
48 tr *Transport
49 picker *urlPicker
50 status *peerStatus
51 raft Raft
52 errorc chan error
53 // deprecate when we depercate v2 API
54 followerStats *stats.FollowerStats
55
56 msgc chan raftpb.Message
57 // wait for the handling routines
58 wg sync.WaitGroup
59 stopc chan struct{}
60}
61
62func (p *pipeline) start() {
63 p.stopc = make(chan struct{})
64 p.msgc = make(chan raftpb.Message, pipelineBufSize)
65 p.wg.Add(connPerPipeline)
66 for i := 0; i < connPerPipeline; i++ {
67 go p.handle()
68 }
69
70 if p.tr != nil && p.tr.Logger != nil {
71 p.tr.Logger.Info(
72 "started HTTP pipelining with remote peer",
73 zap.String("local-member-id", p.tr.ID.String()),
74 zap.String("remote-peer-id", p.peerID.String()),
75 )
76 } else {
77 plog.Infof("started HTTP pipelining with peer %s", p.peerID)
78 }
79}
80
81func (p *pipeline) stop() {
82 close(p.stopc)
83 p.wg.Wait()
84
85 if p.tr != nil && p.tr.Logger != nil {
86 p.tr.Logger.Info(
87 "stopped HTTP pipelining with remote peer",
88 zap.String("local-member-id", p.tr.ID.String()),
89 zap.String("remote-peer-id", p.peerID.String()),
90 )
91 } else {
92 plog.Infof("stopped HTTP pipelining with peer %s", p.peerID)
93 }
94}
95
96func (p *pipeline) handle() {
97 defer p.wg.Done()
98
99 for {
100 select {
101 case m := <-p.msgc:
102 start := time.Now()
103 err := p.post(pbutil.MustMarshal(&m))
104 end := time.Now()
105
106 if err != nil {
107 p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
108
109 if m.Type == raftpb.MsgApp && p.followerStats != nil {
110 p.followerStats.Fail()
111 }
112 p.raft.ReportUnreachable(m.To)
113 if isMsgSnap(m) {
114 p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
115 }
116 sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
117 continue
118 }
119
120 p.status.activate()
121 if m.Type == raftpb.MsgApp && p.followerStats != nil {
122 p.followerStats.Succ(end.Sub(start))
123 }
124 if isMsgSnap(m) {
125 p.raft.ReportSnapshot(m.To, raft.SnapshotFinish)
126 }
127 sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
128 case <-p.stopc:
129 return
130 }
131 }
132}
133
134// post POSTs a data payload to a url. Returns nil if the POST succeeds,
135// error on any failure.
136func (p *pipeline) post(data []byte) (err error) {
137 u := p.picker.pick()
138 req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
139
140 done := make(chan struct{}, 1)
141 ctx, cancel := context.WithCancel(context.Background())
142 req = req.WithContext(ctx)
143 go func() {
144 select {
145 case <-done:
146 case <-p.stopc:
147 waitSchedule()
148 cancel()
149 }
150 }()
151
152 resp, err := p.tr.pipelineRt.RoundTrip(req)
153 done <- struct{}{}
154 if err != nil {
155 p.picker.unreachable(u)
156 return err
157 }
158 defer resp.Body.Close()
159 b, err := ioutil.ReadAll(resp.Body)
160 if err != nil {
161 p.picker.unreachable(u)
162 return err
163 }
164
165 err = checkPostResponse(resp, b, req, p.peerID)
166 if err != nil {
167 p.picker.unreachable(u)
168 // errMemberRemoved is a critical error since a removed member should
169 // always be stopped. So we use reportCriticalError to report it to errorc.
170 if err == errMemberRemoved {
171 reportCriticalError(err, p.errorc)
172 }
173 return err
174 }
175
176 return nil
177}
178
179// waitSchedule waits other goroutines to be scheduled for a while
180func waitSchedule() { time.Sleep(time.Millisecond) }