| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package rafthttp |
| |
| import ( |
| "bytes" |
| "context" |
| "errors" |
| "io/ioutil" |
| "sync" |
| "time" |
| |
| stats "go.etcd.io/etcd/etcdserver/api/v2stats" |
| "go.etcd.io/etcd/pkg/pbutil" |
| "go.etcd.io/etcd/pkg/types" |
| "go.etcd.io/etcd/raft" |
| "go.etcd.io/etcd/raft/raftpb" |
| |
| "go.uber.org/zap" |
| ) |
| |
| const ( |
| connPerPipeline = 4 |
| // pipelineBufSize is the size of pipeline buffer, which helps hold the |
| // temporary network latency. |
| // The size ensures that pipeline does not drop messages when the network |
| // is out of work for less than 1 second in good path. |
| pipelineBufSize = 64 |
| ) |
| |
| var errStopped = errors.New("stopped") |
| |
| type pipeline struct { |
| peerID types.ID |
| |
| tr *Transport |
| picker *urlPicker |
| status *peerStatus |
| raft Raft |
| errorc chan error |
| // deprecate when we depercate v2 API |
| followerStats *stats.FollowerStats |
| |
| msgc chan raftpb.Message |
| // wait for the handling routines |
| wg sync.WaitGroup |
| stopc chan struct{} |
| } |
| |
| func (p *pipeline) start() { |
| p.stopc = make(chan struct{}) |
| p.msgc = make(chan raftpb.Message, pipelineBufSize) |
| p.wg.Add(connPerPipeline) |
| for i := 0; i < connPerPipeline; i++ { |
| go p.handle() |
| } |
| |
| if p.tr != nil && p.tr.Logger != nil { |
| p.tr.Logger.Info( |
| "started HTTP pipelining with remote peer", |
| zap.String("local-member-id", p.tr.ID.String()), |
| zap.String("remote-peer-id", p.peerID.String()), |
| ) |
| } else { |
| plog.Infof("started HTTP pipelining with peer %s", p.peerID) |
| } |
| } |
| |
| func (p *pipeline) stop() { |
| close(p.stopc) |
| p.wg.Wait() |
| |
| if p.tr != nil && p.tr.Logger != nil { |
| p.tr.Logger.Info( |
| "stopped HTTP pipelining with remote peer", |
| zap.String("local-member-id", p.tr.ID.String()), |
| zap.String("remote-peer-id", p.peerID.String()), |
| ) |
| } else { |
| plog.Infof("stopped HTTP pipelining with peer %s", p.peerID) |
| } |
| } |
| |
| func (p *pipeline) handle() { |
| defer p.wg.Done() |
| |
| for { |
| select { |
| case m := <-p.msgc: |
| start := time.Now() |
| err := p.post(pbutil.MustMarshal(&m)) |
| end := time.Now() |
| |
| if err != nil { |
| p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error()) |
| |
| if m.Type == raftpb.MsgApp && p.followerStats != nil { |
| p.followerStats.Fail() |
| } |
| p.raft.ReportUnreachable(m.To) |
| if isMsgSnap(m) { |
| p.raft.ReportSnapshot(m.To, raft.SnapshotFailure) |
| } |
| sentFailures.WithLabelValues(types.ID(m.To).String()).Inc() |
| continue |
| } |
| |
| p.status.activate() |
| if m.Type == raftpb.MsgApp && p.followerStats != nil { |
| p.followerStats.Succ(end.Sub(start)) |
| } |
| if isMsgSnap(m) { |
| p.raft.ReportSnapshot(m.To, raft.SnapshotFinish) |
| } |
| sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size())) |
| case <-p.stopc: |
| return |
| } |
| } |
| } |
| |
| // post POSTs a data payload to a url. Returns nil if the POST succeeds, |
| // error on any failure. |
| func (p *pipeline) post(data []byte) (err error) { |
| u := p.picker.pick() |
| req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID) |
| |
| done := make(chan struct{}, 1) |
| ctx, cancel := context.WithCancel(context.Background()) |
| req = req.WithContext(ctx) |
| go func() { |
| select { |
| case <-done: |
| case <-p.stopc: |
| waitSchedule() |
| cancel() |
| } |
| }() |
| |
| resp, err := p.tr.pipelineRt.RoundTrip(req) |
| done <- struct{}{} |
| if err != nil { |
| p.picker.unreachable(u) |
| return err |
| } |
| defer resp.Body.Close() |
| b, err := ioutil.ReadAll(resp.Body) |
| if err != nil { |
| p.picker.unreachable(u) |
| return err |
| } |
| |
| err = checkPostResponse(resp, b, req, p.peerID) |
| if err != nil { |
| p.picker.unreachable(u) |
| // errMemberRemoved is a critical error since a removed member should |
| // always be stopped. So we use reportCriticalError to report it to errorc. |
| if err == errMemberRemoved { |
| reportCriticalError(err, p.errorc) |
| } |
| return err |
| } |
| |
| return nil |
| } |
| |
| // waitSchedule waits other goroutines to be scheduled for a while |
| func waitSchedule() { time.Sleep(time.Millisecond) } |