[VOL-2235] Mocks and interfaces for rw-core
This update consists of mocks that are used by the rw-core
during unit testing. It also includes interfaces used for unit
tests.
Change-Id: I20ca1455c358113c3aa897acc6355e0ddbc614b7
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/pipeline.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/pipeline.go
new file mode 100644
index 0000000..70f9257
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/pipeline.go
@@ -0,0 +1,180 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "io/ioutil"
+ "sync"
+ "time"
+
+ stats "go.etcd.io/etcd/etcdserver/api/v2stats"
+ "go.etcd.io/etcd/pkg/pbutil"
+ "go.etcd.io/etcd/pkg/types"
+ "go.etcd.io/etcd/raft"
+ "go.etcd.io/etcd/raft/raftpb"
+
+ "go.uber.org/zap"
+)
+
+const (
+ connPerPipeline = 4 // number of handle goroutines started per pipeline
+ // pipelineBufSize is the capacity of the pipeline's message buffer, which
+ // absorbs temporary network latency.
+ // The size is chosen so that the pipeline does not drop messages when the
+ // network is unavailable for less than 1 second on the good path.
+ pipelineBufSize = 64
+)
+
+var errStopped = errors.New("stopped") // not referenced in this file; TODO confirm where the package uses it
+
+type pipeline struct { // sends queued raft messages to one peer via HTTP POST
+ peerID types.ID // ID of the remote peer; used in log output and passed to checkPostResponse
+
+ tr *Transport // supplies the logger, local ID, URLs, cluster ID and the pipelineRt round tripper
+ picker *urlPicker // picks the URL for each POST and is told when that URL is unreachable
+ status *peerStatus // activated after a successful post, deactivated after a failed one
+ raft Raft // receives ReportUnreachable and ReportSnapshot calls from handle
+ errorc chan error // passed to reportCriticalError when post sees errMemberRemoved
+ // v2 stats (Fail/Succ recorded for MsgApp only); deprecate when we deprecate v2 API
+ followerStats *stats.FollowerStats
+
+ msgc chan raftpb.Message // outgoing message queue, created by start with capacity pipelineBufSize
+ // wait for the handling routines
+ wg sync.WaitGroup
+ stopc chan struct{} // closed by stop; ends the handle goroutines and cancels in-flight posts
+}
+
+func (p *pipeline) start() {
+	// start creates the pipeline's channels, launches the handle workers and logs it.
+	p.stopc = make(chan struct{})
+	p.msgc = make(chan raftpb.Message, pipelineBufSize)
+	p.wg.Add(connPerPipeline)
+	for remaining := connPerPipeline; remaining > 0; remaining-- {
+		go p.handle()
+	}
+	if p.tr == nil || p.tr.Logger == nil {
+		plog.Infof("started HTTP pipelining with peer %s", p.peerID)
+		return
+	}
+	p.tr.Logger.Info(
+		"started HTTP pipelining with remote peer",
+		zap.String("local-member-id", p.tr.ID.String()),
+		zap.String("remote-peer-id", p.peerID.String()),
+	)
+}
+
+func (p *pipeline) stop() {
+	// stop closes stopc, waits for every handle goroutine to return, then logs it.
+	close(p.stopc)
+	p.wg.Wait()
+	if p.tr == nil || p.tr.Logger == nil {
+		plog.Infof("stopped HTTP pipelining with peer %s", p.peerID)
+		return
+	}
+	p.tr.Logger.Info(
+		"stopped HTTP pipelining with remote peer",
+		zap.String("local-member-id", p.tr.ID.String()),
+		zap.String("remote-peer-id", p.peerID.String()),
+	)
+}
+
+func (p *pipeline) handle() {
+	defer p.wg.Done()
+	// Each worker posts queued messages and reports the result until stopc closes.
+	for {
+		select {
+		case <-p.stopc:
+			return
+		case msg := <-p.msgc:
+			begin := time.Now()
+			postErr := p.post(pbutil.MustMarshal(&msg))
+			elapsed := time.Since(begin)
+			// followerStats is only updated for MsgApp messages and may be nil.
+			trackStats := msg.Type == raftpb.MsgApp && p.followerStats != nil
+			if postErr == nil {
+				p.status.activate()
+				if trackStats {
+					p.followerStats.Succ(elapsed)
+				}
+				if isMsgSnap(msg) {
+					p.raft.ReportSnapshot(msg.To, raft.SnapshotFinish)
+				}
+				sentBytes.WithLabelValues(types.ID(msg.To).String()).Add(float64(msg.Size()))
+				continue
+			}
+			// Failed post: mark the peer inactive and tell raft it is unreachable.
+			p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, postErr.Error())
+			if trackStats {
+				p.followerStats.Fail()
+			}
+			p.raft.ReportUnreachable(msg.To)
+			if isMsgSnap(msg) {
+				p.raft.ReportSnapshot(msg.To, raft.SnapshotFailure)
+			}
+			sentFailures.WithLabelValues(types.ID(msg.To).String()).Inc()
+		}
+	}
+}
+
+// post POSTs data to a picked peer URL. It returns nil on success; on any
+// failure it marks that URL unreachable on the picker and returns the error.
+func (p *pipeline) post(data []byte) (err error) {
+ u := p.picker.pick()
+ req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
+ // A watcher goroutine cancels the request if the pipeline is stopped mid-flight.
+ done := make(chan struct{}, 1)
+ ctx, cancel := context.WithCancel(context.Background())
+ req = req.WithContext(ctx)
+ go func() {
+ select {
+ case <-done: // round trip finished; NOTE(review): cancel is not called on this path — confirm intended
+ case <-p.stopc:
+ waitSchedule() // brief pause (see waitSchedule) before cancelling the request
+ cancel()
+ }
+ }()
+
+ resp, err := p.tr.pipelineRt.RoundTrip(req)
+ done <- struct{}{} // done has capacity 1, so this send does not block even if the watcher left via stopc
+ if err != nil {
+ p.picker.unreachable(u)
+ return err
+ }
+ defer resp.Body.Close()
+ b, err := ioutil.ReadAll(resp.Body) // read the whole body; it is handed to checkPostResponse below
+ if err != nil {
+ p.picker.unreachable(u)
+ return err
+ }
+
+ err = checkPostResponse(resp, b, req, p.peerID)
+ if err != nil {
+ p.picker.unreachable(u)
+ // errMemberRemoved is a critical error since a removed member should
+ // always be stopped. So we use reportCriticalError to report it to errorc.
+ if err == errMemberRemoved {
+ reportCriticalError(err, p.errorc)
+ }
+ return err
+ }
+
+ return nil
+}
+
+// waitSchedule sleeps for one millisecond so that other goroutines can be scheduled.
+func waitSchedule() { time.Sleep(1 * time.Millisecond) }