[VOL-2235] Mocks and interfaces for rw-core

This update consists of mocks that are used by the rw-core
during unit testing.  It also includes interfaces used for unit
tests.

Change-Id: I20ca1455c358113c3aa897acc6355e0ddbc614b7
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/coder.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/coder.go
new file mode 100644
index 0000000..12c3e44
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/coder.go
@@ -0,0 +1,27 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import "go.etcd.io/etcd/raft/raftpb"
+
+type encoder interface { // encoder is the writing half of a raft message codec.
+	// encode encodes the message m and writes it to an output stream.
+	encode(m *raftpb.Message) error
+}
+
+type decoder interface { // decoder is the reading half of a raft message codec.
+	// decode reads the next message from an input stream and decodes it.
+	decode() (raftpb.Message, error)
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/doc.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/doc.go
new file mode 100644
index 0000000..a9486a8
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package rafthttp implements the HTTP transport layer for the etcd/raft package.
+package rafthttp
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/http.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/http.go
new file mode 100644
index 0000000..d0e0c81
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/http.go
@@ -0,0 +1,577 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"path"
+	"strings"
+	"time"
+
+	"go.etcd.io/etcd/etcdserver/api/snap"
+	pioutil "go.etcd.io/etcd/pkg/ioutil"
+	"go.etcd.io/etcd/pkg/types"
+	"go.etcd.io/etcd/raft/raftpb"
+	"go.etcd.io/etcd/version"
+
+	humanize "github.com/dustin/go-humanize"
+	"go.uber.org/zap"
+)
+
+const (
+	// connReadLimitByte caps the number of bytes that
+	// a single read from a connection may return.
+	//
+	// 64KB is intended to be large enough to avoid a
+	// throughput bottleneck, yet small enough that a
+	// single read does not cause a read timeout.
+	connReadLimitByte = 64 * 1024
+)
+
+var (
+	RaftPrefix         = "/raft"                           // root of the URL paths built below
+	ProbingPrefix      = path.Join(RaftPrefix, "probing")  // initially "/raft/probing"
+	RaftStreamPrefix   = path.Join(RaftPrefix, "stream")   // initially "/raft/stream"
+	RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot") // initially "/raft/snapshot"
+
+	errIncompatibleVersion = errors.New("incompatible version") // returned by checkClusterCompatibilityFromHeader when the version check fails
+	errClusterIDMismatch   = errors.New("cluster ID mismatch")  // returned by checkClusterCompatibilityFromHeader when X-Etcd-Cluster-ID differs
+)
+
+type peerGetter interface { // peerGetter looks up a Peer by member ID.
+	Get(id types.ID) Peer // streamHandler treats a nil result as "sender not found"
+}
+
+type writerToResponse interface { // errors from Raft.Process that implement this write their own HTTP response.
+	WriteTo(w http.ResponseWriter) // writes the error's response to w
+}
+
+type pipelineHandler struct { // pipelineHandler is the http.Handler returned by newPipelineHandler.
+	lg      *zap.Logger // structured logger; when nil the handler logs through plog instead
+	localID types.ID    // ID of the local member, reported as "local-member-id" in logs
+	tr      Transporter // passed to addRemoteFromRequest for each accepted request
+	r       Raft        // receives each decoded message via Process
+	cid     types.ID    // cluster ID, sent back in X-Etcd-Cluster-ID and checked against the request
+}
+
+// newPipelineHandler builds the http.Handler that receives raft messages
+// sent through the pipeline on RaftPrefix.
+//
+// The handler reads the raft message out of the request body and
+// forwards it to r for processing; t supplies the logger and local ID.
+func newPipelineHandler(t *Transport, r Raft, cid types.ID) http.Handler {
+	return &pipelineHandler{
+		lg:      t.Logger,
+		localID: t.ID,
+		tr:      t,
+		r:       r,
+		cid:     cid,
+	}
+}
+
+func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // ServeHTTP reads one raft message from a POST body and passes it to h.r.Process.
+	if r.Method != "POST" {
+		w.Header().Set("Allow", "POST")
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
+
+	if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
+		http.Error(w, err.Error(), http.StatusPreconditionFailed)
+		return
+	}
+
+	addRemoteFromRequest(h.tr, r)
+
+	// Limit the amount of data read from the request body at a time, so that a read from the
+	// connection does not time out accidentally because the underlying implementation blocks.
+	limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)
+	b, err := ioutil.ReadAll(limitedr)
+	if err != nil {
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to read Raft message",
+				zap.String("local-member-id", h.localID.String()),
+				zap.Error(err),
+			)
+		} else {
+			plog.Errorf("failed to read raft message (%v)", err)
+		}
+		http.Error(w, "error reading raft message", http.StatusBadRequest)
+		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
+		return
+	}
+
+	var m raftpb.Message
+	if err := m.Unmarshal(b); err != nil {
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to unmarshal Raft message",
+				zap.String("local-member-id", h.localID.String()),
+				zap.Error(err),
+			)
+		} else {
+			plog.Errorf("failed to unmarshal raft message (%v)", err)
+		}
+		http.Error(w, "error unmarshalling raft message", http.StatusBadRequest)
+		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
+		return
+	}
+
+	receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b))) // counted per sender ID taken from the message
+
+	if err := h.r.Process(context.TODO(), m); err != nil {
+		switch v := err.(type) {
+		case writerToResponse:
+			v.WriteTo(w) // the error knows how to render its own response
+		default:
+			if h.lg != nil {
+				h.lg.Warn(
+					"failed to process Raft message",
+					zap.String("local-member-id", h.localID.String()),
+					zap.Error(err),
+				)
+			} else {
+				plog.Warningf("failed to process raft message (%v)", err)
+			}
+			http.Error(w, "error processing raft message", http.StatusInternalServerError)
+			w.(http.Flusher).Flush() // NOTE(review): unchecked type assertion; panics if w is not an http.Flusher
+			// Panic deliberately in order to disconnect the HTTP stream.
+			panic(err)
+		}
+		return
+	}
+
+	// Write the StatusNoContent header only after raft has processed the
+	// message, which lets the client report the MsgSnap status.
+	w.WriteHeader(http.StatusNoContent)
+}
+
+type snapshotHandler struct { // snapshotHandler is the http.Handler returned by newSnapshotHandler.
+	lg          *zap.Logger       // structured logger; when nil the handler logs through plog instead
+	tr          Transporter       // passed to addRemoteFromRequest for each accepted request
+	r           Raft              // receives the snapshot message via Process
+	snapshotter *snap.Snapshotter // saves the incoming database snapshot via SaveDBFrom
+
+	localID types.ID // ID of the local member, reported as "local-member-id" in logs
+	cid     types.ID // cluster ID, sent back in X-Etcd-Cluster-ID and checked against the request
+}
+
+func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler { // builds a snapshotHandler; t supplies the logger and local ID.
+	return &snapshotHandler{
+		lg:          t.Logger,
+		tr:          t,
+		r:           r,
+		snapshotter: snapshotter,
+		localID:     t.ID,
+		cid:         cid,
+	}
+}
+
+const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER" // failure-metric label used before the sender ID has been decoded
+
+// ServeHTTP handles an HTTP request that delivers a snapshot message and
+// its database snapshot, saves the snapshot, and passes the message to raft.
+// If the sender dies without closing the underlying TCP connection,
+// the handler keeps waiting for the request body until TCP keepalive
+// discovers, after several minutes, that the connection is broken.
+// This is acceptable because:
+// 1. snapshot messages sent through other TCP connections can still be
+// received and processed.
+// 2. this case should happen rarely, so no further optimization is done.
+func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	start := time.Now()
+
+	if r.Method != "POST" {
+		w.Header().Set("Allow", "POST")
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
+		return
+	}
+
+	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
+
+	if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
+		http.Error(w, err.Error(), http.StatusPreconditionFailed)
+		snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
+		return
+	}
+
+	addRemoteFromRequest(h.tr, r)
+
+	dec := &messageDecoder{r: r.Body}
+	// Allow very large snapshot messages, since they can exceed 512MB for large installations.
+	m, err := dec.decodeLimit(uint64(1 << 63)) // the limit is 2^63 bytes, i.e. effectively unbounded
+	from := types.ID(m.From).String() // NOTE(review): computed before err is checked; on decode failure m may not be fully populated
+	if err != nil {
+		msg := fmt.Sprintf("failed to decode raft message (%v)", err)
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to decode Raft message",
+				zap.String("local-member-id", h.localID.String()),
+				zap.String("remote-snapshot-sender-id", from),
+				zap.Error(err),
+			)
+		} else {
+			plog.Error(msg)
+		}
+		http.Error(w, msg, http.StatusBadRequest)
+		recvFailures.WithLabelValues(r.RemoteAddr).Inc()
+		snapshotReceiveFailures.WithLabelValues(from).Inc()
+		return
+	}
+
+	msgSize := m.Size()
+	receivedBytes.WithLabelValues(from).Add(float64(msgSize))
+
+	if m.Type != raftpb.MsgSnap { // only snapshot messages are accepted on this path
+		if h.lg != nil {
+			h.lg.Warn(
+				"unexpected Raft message type",
+				zap.String("local-member-id", h.localID.String()),
+				zap.String("remote-snapshot-sender-id", from),
+				zap.String("message-type", m.Type.String()),
+			)
+		} else {
+			plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
+		}
+		http.Error(w, "wrong raft message type", http.StatusBadRequest)
+		snapshotReceiveFailures.WithLabelValues(from).Inc()
+		return
+	}
+
+	snapshotReceiveInflights.WithLabelValues(from).Inc()
+	defer func() { // the in-flight gauge is decremented on every return path below
+		snapshotReceiveInflights.WithLabelValues(from).Dec()
+	}()
+
+	if h.lg != nil {
+		h.lg.Info(
+			"receiving database snapshot",
+			zap.String("local-member-id", h.localID.String()),
+			zap.String("remote-snapshot-sender-id", from),
+			zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
+			zap.Int("incoming-snapshot-message-size-bytes", msgSize),
+			zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
+		)
+	} else {
+		plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From))
+	}
+
+	// Save the incoming database snapshot, read from the rest of the request body.
+	n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index)
+	if err != nil {
+		msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to save incoming database snapshot",
+				zap.String("local-member-id", h.localID.String()),
+				zap.String("remote-snapshot-sender-id", from),
+				zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
+				zap.Error(err),
+			)
+		} else {
+			plog.Error(msg)
+		}
+		http.Error(w, msg, http.StatusInternalServerError)
+		snapshotReceiveFailures.WithLabelValues(from).Inc()
+		return
+	}
+
+	receivedBytes.WithLabelValues(from).Add(float64(n)) // n is the value returned by SaveDBFrom
+
+	if h.lg != nil {
+		h.lg.Info(
+			"received and saved database snapshot",
+			zap.String("local-member-id", h.localID.String()),
+			zap.String("remote-snapshot-sender-id", from),
+			zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
+			zap.Int64("incoming-snapshot-size-bytes", n),
+			zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
+		)
+	} else {
+		plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
+	}
+
+	if err := h.r.Process(context.TODO(), m); err != nil {
+		switch v := err.(type) {
+		// Process may return a writerToResponse error when it performs
+		// additional checks before calling raft.Node.Step.
+		case writerToResponse:
+			v.WriteTo(w)
+		default:
+			msg := fmt.Sprintf("failed to process raft message (%v)", err)
+			if h.lg != nil {
+				h.lg.Warn(
+					"failed to process Raft message",
+					zap.String("local-member-id", h.localID.String()),
+					zap.String("remote-snapshot-sender-id", from),
+					zap.Error(err),
+				)
+			} else {
+				plog.Error(msg)
+			}
+			http.Error(w, msg, http.StatusInternalServerError)
+			snapshotReceiveFailures.WithLabelValues(from).Inc()
+		}
+		return
+	}
+
+	// Write the StatusNoContent header only after raft has processed the
+	// message, which lets the client report the MsgSnap status.
+	w.WriteHeader(http.StatusNoContent)
+
+	snapshotReceive.WithLabelValues(from).Inc()
+	snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
+}
+
+type streamHandler struct { // streamHandler is the http.Handler returned by newStreamHandler.
+	lg         *zap.Logger // structured logger; when nil the handler logs through plog instead
+	tr         *Transport  // supplies the local member ID (tr.ID) and AddRemote
+	peerGetter peerGetter  // looks up the Peer for the sender ID parsed from the URL path
+	r          Raft        // consulted via IsIDRemoved to reject removed members
+	id         types.ID    // compared with the X-Raft-To header; also the peerID of the outgoing conn
+	cid        types.ID    // cluster ID, sent back in X-Etcd-Cluster-ID and checked against the request
+}
+
+func newStreamHandler(t *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler { // builds a streamHandler; t supplies the logger.
+	return &streamHandler{
+		lg:         t.Logger,
+		tr:         t,
+		peerGetter: pg,
+		r:          r,
+		id:         id,
+		cid:        cid,
+	}
+}
+
+func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { // ServeHTTP validates a GET stream request and attaches the connection to the sender's Peer.
+	if r.Method != "GET" {
+		w.Header().Set("Allow", "GET")
+		http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+		return
+	}
+
+	w.Header().Set("X-Server-Version", version.Version)
+	w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
+
+	if err := checkClusterCompatibilityFromHeader(h.lg, h.tr.ID, r.Header, h.cid); err != nil {
+		http.Error(w, err.Error(), http.StatusPreconditionFailed)
+		return
+	}
+
+	var t streamType
+	switch path.Dir(r.URL.Path) { // the directory part of the path selects the stream type
+	case streamTypeMsgAppV2.endpoint():
+		t = streamTypeMsgAppV2
+	case streamTypeMessage.endpoint():
+		t = streamTypeMessage
+	default:
+		if h.lg != nil {
+			h.lg.Debug(
+				"ignored unexpected streaming request path",
+				zap.String("local-member-id", h.tr.ID.String()),
+				zap.String("remote-peer-id-stream-handler", h.id.String()),
+				zap.String("path", r.URL.Path),
+			)
+		} else {
+			plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
+		}
+		http.Error(w, "invalid path", http.StatusNotFound)
+		return
+	}
+
+	fromStr := path.Base(r.URL.Path) // the last path element carries the sender's ID
+	from, err := types.IDFromString(fromStr)
+	if err != nil {
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to parse path into ID",
+				zap.String("local-member-id", h.tr.ID.String()),
+				zap.String("remote-peer-id-stream-handler", h.id.String()),
+				zap.String("path", fromStr),
+				zap.Error(err),
+			)
+		} else {
+			plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
+		}
+		http.Error(w, "invalid from", http.StatusNotFound)
+		return
+	}
+	if h.r.IsIDRemoved(uint64(from)) {
+		if h.lg != nil {
+			h.lg.Warn(
+				"rejected stream from remote peer because it was removed",
+				zap.String("local-member-id", h.tr.ID.String()),
+				zap.String("remote-peer-id-stream-handler", h.id.String()),
+				zap.String("remote-peer-id-from", from.String()),
+			)
+		} else {
+			plog.Warningf("rejected the stream from peer %s since it was removed", from)
+		}
+		http.Error(w, "removed member", http.StatusGone)
+		return
+	}
+	p := h.peerGetter.Get(from)
+	if p == nil {
+		// This may happen in the following cases:
+		// 1. the user starts a remote peer that belongs to a different cluster
+		// with the same cluster ID.
+		// 2. the local etcd has fallen behind the cluster and cannot recognize
+		// the members that joined after its current progress.
+		if urls := r.Header.Get("X-PeerURLs"); urls != "" {
+			h.tr.AddRemote(from, strings.Split(urls, ","))
+		}
+		if h.lg != nil {
+			h.lg.Warn(
+				"failed to find remote peer in cluster",
+				zap.String("local-member-id", h.tr.ID.String()),
+				zap.String("remote-peer-id-stream-handler", h.id.String()),
+				zap.String("remote-peer-id-from", from.String()),
+				zap.String("cluster-id", h.cid.String()),
+			)
+		} else {
+			plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
+		}
+		http.Error(w, "error sender not found", http.StatusNotFound)
+		return
+	}
+
+	wto := h.id.String() // the "to" ID this handler expects
+	if gto := r.Header.Get("X-Raft-To"); gto != wto {
+		if h.lg != nil {
+			h.lg.Warn(
+				"ignored streaming request; ID mismatch",
+				zap.String("local-member-id", h.tr.ID.String()),
+				zap.String("remote-peer-id-stream-handler", h.id.String()),
+				zap.String("remote-peer-id-header", gto),
+				zap.String("remote-peer-id-from", from.String()),
+				zap.String("cluster-id", h.cid.String()),
+			)
+		} else {
+			plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
+		}
+		http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+	w.(http.Flusher).Flush() // NOTE(review): unchecked type assertion; panics if w is not an http.Flusher
+
+	c := newCloseNotifier()
+	conn := &outgoingConn{
+		t:       t,
+		Writer:  w,
+		Flusher: w.(http.Flusher),
+		Closer:  c,
+		localID: h.tr.ID,
+		peerID:  h.id,
+	}
+	p.attachOutgoingConn(conn)
+	<-c.closeNotify() // block until c.Close is called, keeping the response open meanwhile
+}
+
+// checkClusterCompatibilityFromHeader checks whether the local member is
+// compatible with the remote peer described by the given header.
+// It returns errIncompatibleVersion if checkVersionCompatibility rejects
+// the versions in the header, errClusterIDMismatch if the header's
+// X-Etcd-Cluster-ID differs from cid, and nil otherwise.
+func checkClusterCompatibilityFromHeader(lg *zap.Logger, localID types.ID, header http.Header, cid types.ID) error {
+	remoteName := header.Get("X-Server-From")
+
+	remoteServer := serverVersion(header)
+	remoteVs := "" // string forms are kept only for logging; "" when the version is nil
+	if remoteServer != nil {
+		remoteVs = remoteServer.String()
+	}
+
+	remoteMinClusterVer := minClusterVersion(header)
+	remoteMinClusterVs := ""
+	if remoteMinClusterVer != nil {
+		remoteMinClusterVs = remoteMinClusterVer.String()
+	}
+
+	localServer, localMinCluster, err := checkVersionCompatibility(remoteName, remoteServer, remoteMinClusterVer) // err is examined below, after the local versions are formatted for logging
+
+	localVs := ""
+	if localServer != nil {
+		localVs = localServer.String()
+	}
+	localMinClusterVs := ""
+	if localMinCluster != nil {
+		localMinClusterVs = localMinCluster.String()
+	}
+
+	if err != nil {
+		if lg != nil {
+			lg.Warn(
+				"failed to check version compatibility",
+				zap.String("local-member-id", localID.String()),
+				zap.String("local-member-cluster-id", cid.String()),
+				zap.String("local-member-server-version", localVs),
+				zap.String("local-member-server-minimum-cluster-version", localMinClusterVs),
+				zap.String("remote-peer-server-name", remoteName),
+				zap.String("remote-peer-server-version", remoteVs),
+				zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs),
+				zap.Error(err),
+			)
+		} else {
+			plog.Errorf("request version incompatibility (%v)", err)
+		}
+		return errIncompatibleVersion // the underlying err is logged above but not returned
+	}
+	if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() {
+		if lg != nil {
+			lg.Warn(
+				"request cluster ID mismatch",
+				zap.String("local-member-id", localID.String()),
+				zap.String("local-member-cluster-id", cid.String()),
+				zap.String("local-member-server-version", localVs),
+				zap.String("local-member-server-minimum-cluster-version", localMinClusterVs),
+				zap.String("remote-peer-server-name", remoteName),
+				zap.String("remote-peer-server-version", remoteVs),
+				zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs),
+				zap.String("remote-peer-cluster-id", gcid),
+			)
+		} else {
+			plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid)
+		}
+		return errClusterIDMismatch
+	}
+	return nil
+}
+
+type closeNotifier struct { // closeNotifier signals through a channel that Close has been called.
+	done chan struct{} // closed by Close; exposed receive-only through closeNotify
+}
+
+func newCloseNotifier() *closeNotifier { // newCloseNotifier returns a closeNotifier with an open done channel.
+	return &closeNotifier{
+		done: make(chan struct{}),
+	}
+}
+
+func (n *closeNotifier) Close() error { // Close closes the done channel and always returns nil.
+	close(n.done) // NOTE(review): a second call panics, since closing a closed channel panics
+	return nil
+}
+
+func (n *closeNotifier) closeNotify() <-chan struct{} { return n.done } // closeNotify returns a channel that is closed once Close is called.
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/metrics.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/metrics.go
new file mode 100644
index 0000000..02fff84
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/metrics.go
@@ -0,0 +1,186 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import "github.com/prometheus/client_golang/prometheus"
+
+var ( // Prometheus collectors for peer networking; each one is registered in init.
+	activePeers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "active_peers",
+		Help:      "The current number of active peer connections.",
+	},
+		[]string{"Local", "Remote"},
+	)
+
+	disconnectedPeers = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "disconnected_peers_total",
+		Help:      "The total number of disconnected peers.",
+	},
+		[]string{"Local", "Remote"},
+	)
+
+	sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "peer_sent_bytes_total",
+		Help:      "The total number of bytes sent to peers.",
+	},
+		[]string{"To"},
+	)
+
+	receivedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "peer_received_bytes_total",
+		Help:      "The total number of bytes received from peers.",
+	},
+		[]string{"From"},
+	)
+
+	sentFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "peer_sent_failures_total",
+		Help:      "The total number of send failures from peers.",
+	},
+		[]string{"To"},
+	)
+
+	recvFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "peer_received_failures_total",
+		Help:      "The total number of receive failures from peers.",
+	},
+		[]string{"From"},
+	)
+
+	snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_send_success",
+		Help:      "Total number of successful snapshot sends",
+	},
+		[]string{"To"},
+	)
+
+	snapshotSendInflights = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_send_inflights_total",
+		Help:      "Total number of inflight snapshot sends",
+	},
+		[]string{"To"},
+	)
+
+	snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_send_failures",
+		Help:      "Total number of snapshot send failures",
+	},
+		[]string{"To"},
+	)
+
+	snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_send_total_duration_seconds",
+		Help:      "Total latency distributions of v3 snapshot sends",
+
+		// the lowest bucket has an upper bound of 0.1 sec (100 ms); each next bound doubles
+		// the highest finite upper bound is 0.1 sec * 2^9 == 51.2 sec
+		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
+	},
+		[]string{"To"},
+	)
+
+	snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_receive_success",
+		Help:      "Total number of successful snapshot receives",
+	},
+		[]string{"From"},
+	)
+
+	snapshotReceiveInflights = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_receive_inflights_total",
+		Help:      "Total number of inflight snapshot receives",
+	},
+		[]string{"From"},
+	)
+
+	snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_receive_failures",
+		Help:      "Total number of snapshot receive failures",
+	},
+		[]string{"From"},
+	)
+
+	snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "snapshot_receive_total_duration_seconds",
+		Help:      "Total latency distributions of v3 snapshot receives",
+
+		// the lowest bucket has an upper bound of 0.1 sec (100 ms); each next bound doubles
+		// the highest finite upper bound is 0.1 sec * 2^9 == 51.2 sec
+		Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
+	},
+		[]string{"From"},
+	)
+
+	rttSec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+		Namespace: "etcd",
+		Subsystem: "network",
+		Name:      "peer_round_trip_time_seconds",
+		Help:      "Round-Trip-Time histogram between peers",
+
+		// the lowest bucket has an upper bound of 0.0001 sec (0.1 ms); each next bound doubles
+		// the highest finite upper bound is 0.0001 sec * 2^15 == 3.2768 sec
+		Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16),
+	},
+		[]string{"To"},
+	)
+)
+
+func init() { // init registers every collector of this package; MustRegister panics if a registration fails.
+	prometheus.MustRegister(activePeers)
+	prometheus.MustRegister(disconnectedPeers)
+	prometheus.MustRegister(sentBytes)
+	prometheus.MustRegister(receivedBytes)
+	prometheus.MustRegister(sentFailures)
+	prometheus.MustRegister(recvFailures)
+
+	prometheus.MustRegister(snapshotSend)
+	prometheus.MustRegister(snapshotSendInflights)
+	prometheus.MustRegister(snapshotSendFailures)
+	prometheus.MustRegister(snapshotSendSeconds)
+	prometheus.MustRegister(snapshotReceive)
+	prometheus.MustRegister(snapshotReceiveInflights)
+	prometheus.MustRegister(snapshotReceiveFailures)
+	prometheus.MustRegister(snapshotReceiveSeconds)
+
+	prometheus.MustRegister(rttSec)
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msg_codec.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msg_codec.go
new file mode 100644
index 0000000..2417d22
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msg_codec.go
@@ -0,0 +1,68 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"encoding/binary"
+	"errors"
+	"io"
+
+	"go.etcd.io/etcd/pkg/pbutil"
+	"go.etcd.io/etcd/raft/raftpb"
+)
+
+// messageEncoder is an encoder that can encode all kinds of messages.
+// It MUST be used with a paired messageDecoder.
+type messageEncoder struct {
+	w io.Writer // destination of the length prefix and the marshaled message
+}
+
+func (enc *messageEncoder) encode(m *raftpb.Message) error { // encode writes m as a length prefix followed by the marshaled message.
+	if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil { // 8-byte big-endian length prefix
+		return err
+	}
+	_, err := enc.w.Write(pbutil.MustMarshal(m)) // the number of bytes written is discarded; only the error is reported
+	return err
+}
+
+// messageDecoder is a decoder that can decode all kinds of messages.
+type messageDecoder struct {
+	r io.Reader // source of the length prefix and the marshaled message
+}
+
+var (
+	readBytesLimit     uint64 = 512 * 1024 * 1024 // 512 MB; the limit that decode passes to decodeLimit
+	ErrExceedSizeLimit        = errors.New("rafthttp: error limit exceeded") // returned by decodeLimit when the length prefix exceeds the limit
+)
+
+func (dec *messageDecoder) decode() (raftpb.Message, error) { // decode reads one message, limited to readBytesLimit bytes.
+	return dec.decodeLimit(readBytesLimit)
+}
+
+func (dec *messageDecoder) decodeLimit(numBytes uint64) (raftpb.Message, error) { // decodeLimit reads one length-prefixed message, rejecting lengths above numBytes.
+	var m raftpb.Message
+	var l uint64
+	if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil { // 8-byte big-endian length prefix
+		return m, err
+	}
+	if l > numBytes {
+		return m, ErrExceedSizeLimit
+	}
+	buf := make([]byte, int(l)) // NOTE(review): l comes from the stream; with a large numBytes this allocation can be very large
+	if _, err := io.ReadFull(dec.r, buf); err != nil {
+		return m, err
+	}
+	return m, m.Unmarshal(buf)
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msgappv2_codec.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msgappv2_codec.go
new file mode 100644
index 0000000..1fa36de
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msgappv2_codec.go
@@ -0,0 +1,248 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"encoding/binary"
+	"fmt"
+	"io"
+	"time"
+
+	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
+	"go.etcd.io/etcd/pkg/pbutil"
+	"go.etcd.io/etcd/pkg/types"
+	"go.etcd.io/etcd/raft/raftpb"
+)
+
+const (
+	msgTypeLinkHeartbeat uint8 = 0
+	msgTypeAppEntries    uint8 = 1
+	msgTypeApp           uint8 = 2
+
+	msgAppV2BufSize = 1024 * 1024
+)
+
+// msgappv2 stream sends three types of message: linkHeartbeatMessage,
+// AppEntries and MsgApp. AppEntries is the MsgApp that is sent in
+// replicate state in raft, whose index and term are fully predictable.
+//
+// Data format of linkHeartbeatMessage:
+// | offset | bytes | description |
+// +--------+-------+-------------+
+// | 0      | 1     | \x00        |
+//
+// Data format of AppEntries:
+// | offset | bytes | description |
+// +--------+-------+-------------+
+// | 0      | 1     | \x01        |
+// | 1      | 8     | number of entries |
+// | 9      | 8     | length of first entry |
+// | 17     | n1    | first entry |
+// ...
+// | x      | 8     | length of k-th entry data |
+// | x+8    | nk    | k-th entry data |
+// | x+8+nk | 8     | commit index |
+//
+// Data format of MsgApp:
+// | offset | bytes | description |
+// +--------+-------+-------------+
+// | 0      | 1     | \x02        |
+// | 1      | 8     | length of encoded message |
+// | 9      | n     | encoded message |
+type msgAppV2Encoder struct {
+	w  io.Writer
+	fs *stats.FollowerStats
+
+	term      uint64
+	index     uint64
+	buf       []byte
+	uint64buf []byte
+	uint8buf  []byte
+}
+
+func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder {
+	return &msgAppV2Encoder{
+		w:         w,
+		fs:        fs,
+		buf:       make([]byte, msgAppV2BufSize),
+		uint64buf: make([]byte, 8),
+		uint8buf:  make([]byte, 1),
+	}
+}
+
+func (enc *msgAppV2Encoder) encode(m *raftpb.Message) error {
+	start := time.Now()
+	switch {
+	case isLinkHeartbeatMessage(m):
+		enc.uint8buf[0] = msgTypeLinkHeartbeat
+		if _, err := enc.w.Write(enc.uint8buf); err != nil {
+			return err
+		}
+	case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term:
+		enc.uint8buf[0] = msgTypeAppEntries
+		if _, err := enc.w.Write(enc.uint8buf); err != nil {
+			return err
+		}
+		// write length of entries
+		binary.BigEndian.PutUint64(enc.uint64buf, uint64(len(m.Entries)))
+		if _, err := enc.w.Write(enc.uint64buf); err != nil {
+			return err
+		}
+		for i := 0; i < len(m.Entries); i++ {
+			// write length of entry
+			binary.BigEndian.PutUint64(enc.uint64buf, uint64(m.Entries[i].Size()))
+			if _, err := enc.w.Write(enc.uint64buf); err != nil {
+				return err
+			}
+			if n := m.Entries[i].Size(); n < msgAppV2BufSize {
+				if _, err := m.Entries[i].MarshalTo(enc.buf); err != nil {
+					return err
+				}
+				if _, err := enc.w.Write(enc.buf[:n]); err != nil {
+					return err
+				}
+			} else {
+				if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil {
+					return err
+				}
+			}
+			enc.index++
+		}
+		// write commit index
+		binary.BigEndian.PutUint64(enc.uint64buf, m.Commit)
+		if _, err := enc.w.Write(enc.uint64buf); err != nil {
+			return err
+		}
+		enc.fs.Succ(time.Since(start))
+	default:
+		if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil {
+			return err
+		}
+		// write size of message
+		if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
+			return err
+		}
+		// write message
+		if _, err := enc.w.Write(pbutil.MustMarshal(m)); err != nil {
+			return err
+		}
+
+		enc.term = m.Term
+		enc.index = m.Index
+		if l := len(m.Entries); l > 0 {
+			enc.index = m.Entries[l-1].Index
+		}
+		enc.fs.Succ(time.Since(start))
+	}
+	return nil
+}
+
+type msgAppV2Decoder struct {
+	r             io.Reader
+	local, remote types.ID
+
+	term      uint64
+	index     uint64
+	buf       []byte
+	uint64buf []byte
+	uint8buf  []byte
+}
+
+func newMsgAppV2Decoder(r io.Reader, local, remote types.ID) *msgAppV2Decoder {
+	return &msgAppV2Decoder{
+		r:         r,
+		local:     local,
+		remote:    remote,
+		buf:       make([]byte, msgAppV2BufSize),
+		uint64buf: make([]byte, 8),
+		uint8buf:  make([]byte, 1),
+	}
+}
+
+func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) {
+	var (
+		m   raftpb.Message
+		typ uint8
+	)
+	if _, err := io.ReadFull(dec.r, dec.uint8buf); err != nil {
+		return m, err
+	}
+	typ = dec.uint8buf[0]
+	switch typ {
+	case msgTypeLinkHeartbeat:
+		return linkHeartbeatMessage, nil
+	case msgTypeAppEntries:
+		m = raftpb.Message{
+			Type:    raftpb.MsgApp,
+			From:    uint64(dec.remote),
+			To:      uint64(dec.local),
+			Term:    dec.term,
+			LogTerm: dec.term,
+			Index:   dec.index,
+		}
+
+		// decode entries
+		if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
+			return m, err
+		}
+		l := binary.BigEndian.Uint64(dec.uint64buf)
+		m.Entries = make([]raftpb.Entry, int(l))
+		for i := 0; i < int(l); i++ {
+			if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
+				return m, err
+			}
+			size := binary.BigEndian.Uint64(dec.uint64buf)
+			var buf []byte
+			if size < msgAppV2BufSize {
+				buf = dec.buf[:size]
+				if _, err := io.ReadFull(dec.r, buf); err != nil {
+					return m, err
+				}
+			} else {
+				buf = make([]byte, int(size))
+				if _, err := io.ReadFull(dec.r, buf); err != nil {
+					return m, err
+				}
+			}
+			dec.index++
+			// 1 alloc
+			pbutil.MustUnmarshal(&m.Entries[i], buf)
+		}
+		// decode commit index
+		if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
+			return m, err
+		}
+		m.Commit = binary.BigEndian.Uint64(dec.uint64buf)
+	case msgTypeApp:
+		var size uint64
+		if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
+			return m, err
+		}
+		buf := make([]byte, int(size))
+		if _, err := io.ReadFull(dec.r, buf); err != nil {
+			return m, err
+		}
+		pbutil.MustUnmarshal(&m, buf)
+
+		dec.term = m.Term
+		dec.index = m.Index
+		if l := len(m.Entries); l > 0 {
+			dec.index = m.Entries[l-1].Index
+		}
+	default:
+		return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ)
+	}
+	return m, nil
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer.go
new file mode 100644
index 0000000..8130c4a
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer.go
@@ -0,0 +1,374 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"go.etcd.io/etcd/etcdserver/api/snap"
+	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
+	"go.etcd.io/etcd/pkg/types"
+	"go.etcd.io/etcd/raft"
+	"go.etcd.io/etcd/raft/raftpb"
+
+	"go.uber.org/zap"
+	"golang.org/x/time/rate"
+)
+
+const (
+	// ConnReadTimeout and ConnWriteTimeout are the i/o timeout set on each connection rafthttp pkg creates.
+	// A 5-second timeout is good enough for recycling bad connections. Otherwise we would have to
+	// wait for TCP keepalive to detect a bad connection, which takes minutes.
+	// For long term streaming connections, rafthttp pkg sends application level linkHeartbeatMessage
+	// to keep the connection alive.
+	// For short term pipeline connections, the connection MUST be killed to avoid it being
+	// put back to http pkg connection pool.
+	ConnReadTimeout  = 5 * time.Second
+	ConnWriteTimeout = 5 * time.Second
+
+	recvBufSize = 4096
+	// maxPendingProposals holds the proposals during one leader election process.
+	// Generally one leader election takes at most 1 sec. It should have
+	// 0-2 election conflicts, and each one takes 0.5 sec.
+	// We assume the number of concurrent proposers is smaller than 4096.
+	// One client blocks on its proposal for at least 1 sec, so 4096 is enough
+	// to hold all proposals.
+	maxPendingProposals = 4096
+
+	streamAppV2 = "streamMsgAppV2"
+	streamMsg   = "streamMsg"
+	pipelineMsg = "pipeline"
+	sendSnap    = "sendMsgSnap"
+)
+
+type Peer interface {
+	// send sends the message to the remote peer. The function is non-blocking
+	// and makes no promise that the message will be received by the remote.
+	// When it fails to send the message out, it reports the status to the
+	// underlying raft.
+	send(m raftpb.Message)
+
+	// sendSnap sends the merged snapshot message to the remote peer. Its behavior
+	// is similar to send.
+	sendSnap(m snap.Message)
+
+	// update updates the urls of remote peer.
+	update(urls types.URLs)
+
+	// attachOutgoingConn attaches the outgoing connection to the peer for
+	// stream usage. After the call, ownership of the outgoing connection
+	// is handed over to the peer. The peer will close the connection
+	// when it is no longer used.
+	attachOutgoingConn(conn *outgoingConn)
+	// activeSince returns the time that the connection with the
+	// peer becomes active.
+	activeSince() time.Time
+	// stop performs any necessary finalization and terminates the peer
+	// gracefully.
+	stop()
+}
+
+// peer is the representative of a remote raft node. Local raft node sends
+// messages to the remote through peer.
+// Each peer has two underlying mechanisms to send out a message: stream and
+// pipeline.
+// A stream is a receiver-initialized long-polling connection, which
+// is always open to transfer messages. Besides the general stream, peer also has
+// an optimized stream for sending msgApp, since msgApp accounts for a large part
+// of all messages. Only the raft leader uses the optimized stream to send msgApp
+// to the remote follower node.
+// A pipeline is a series of http clients that send http requests to the remote.
+// It is only used when the stream has not been established.
+type peer struct {
+	lg *zap.Logger
+
+	localID types.ID
+	// id of the remote raft peer node
+	id types.ID
+
+	r Raft
+
+	status *peerStatus
+
+	picker *urlPicker
+
+	msgAppV2Writer *streamWriter
+	writer         *streamWriter
+	pipeline       *pipeline
+	snapSender     *snapshotSender // snapshot sender to send v3 snapshot messages
+	msgAppV2Reader *streamReader
+	msgAppReader   *streamReader
+
+	recvc chan raftpb.Message
+	propc chan raftpb.Message
+
+	mu     sync.Mutex
+	paused bool
+
+	cancel context.CancelFunc // cancel pending works in go routine created by peer.
+	stopc  chan struct{}
+}
+
+func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
+	if t.Logger != nil {
+		t.Logger.Info("starting remote peer", zap.String("remote-peer-id", peerID.String()))
+	} else {
+		plog.Infof("starting peer %s...", peerID)
+	}
+	defer func() {
+		if t.Logger != nil {
+			t.Logger.Info("started remote peer", zap.String("remote-peer-id", peerID.String()))
+		} else {
+			plog.Infof("started peer %s", peerID)
+		}
+	}()
+
+	status := newPeerStatus(t.Logger, t.ID, peerID)
+	picker := newURLPicker(urls)
+	errorc := t.ErrorC
+	r := t.Raft
+	pipeline := &pipeline{
+		peerID:        peerID,
+		tr:            t,
+		picker:        picker,
+		status:        status,
+		followerStats: fs,
+		raft:          r,
+		errorc:        errorc,
+	}
+	pipeline.start()
+
+	p := &peer{
+		lg:             t.Logger,
+		localID:        t.ID,
+		id:             peerID,
+		r:              r,
+		status:         status,
+		picker:         picker,
+		msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
+		writer:         startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
+		pipeline:       pipeline,
+		snapSender:     newSnapshotSender(t, picker, peerID, status),
+		recvc:          make(chan raftpb.Message, recvBufSize),
+		propc:          make(chan raftpb.Message, maxPendingProposals),
+		stopc:          make(chan struct{}),
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	p.cancel = cancel
+	go func() {
+		for {
+			select {
+			case mm := <-p.recvc:
+				if err := r.Process(ctx, mm); err != nil {
+					if t.Logger != nil {
+						t.Logger.Warn("failed to process Raft message", zap.Error(err))
+					} else {
+						plog.Warningf("failed to process raft message (%v)", err)
+					}
+				}
+			case <-p.stopc:
+				return
+			}
+		}
+	}()
+
+	// r.Process might block while processing a proposal when there is no leader.
+	// Thus propc must be handled in a goroutine separate from the recvc one, so that
+	// a blocked proposal does not stall the processing of other raft messages.
+	go func() {
+		for {
+			select {
+			case mm := <-p.propc:
+				if err := r.Process(ctx, mm); err != nil {
+					plog.Warningf("failed to process raft message (%v)", err)
+				}
+			case <-p.stopc:
+				return
+			}
+		}
+	}()
+
+	p.msgAppV2Reader = &streamReader{
+		lg:     t.Logger,
+		peerID: peerID,
+		typ:    streamTypeMsgAppV2,
+		tr:     t,
+		picker: picker,
+		status: status,
+		recvc:  p.recvc,
+		propc:  p.propc,
+		rl:     rate.NewLimiter(t.DialRetryFrequency, 1),
+	}
+	p.msgAppReader = &streamReader{
+		lg:     t.Logger,
+		peerID: peerID,
+		typ:    streamTypeMessage,
+		tr:     t,
+		picker: picker,
+		status: status,
+		recvc:  p.recvc,
+		propc:  p.propc,
+		rl:     rate.NewLimiter(t.DialRetryFrequency, 1),
+	}
+
+	p.msgAppV2Reader.start()
+	p.msgAppReader.start()
+
+	return p
+}
+
+func (p *peer) send(m raftpb.Message) {
+	p.mu.Lock()
+	paused := p.paused
+	p.mu.Unlock()
+
+	if paused {
+		return
+	}
+
+	writec, name := p.pick(m)
+	select {
+	case writec <- m:
+	default:
+		p.r.ReportUnreachable(m.To)
+		if isMsgSnap(m) {
+			p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
+		}
+		if p.status.isActive() {
+			if p.lg != nil {
+				p.lg.Warn(
+					"dropped internal Raft message since sending buffer is full (overloaded network)",
+					zap.String("message-type", m.Type.String()),
+					zap.String("local-member-id", p.localID.String()),
+					zap.String("from", types.ID(m.From).String()),
+					zap.String("remote-peer-id", p.id.String()),
+					zap.Bool("remote-peer-active", p.status.isActive()),
+				)
+			} else {
+				plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
+			}
+		} else {
+			if p.lg != nil {
+				p.lg.Warn(
+					"dropped internal Raft message since sending buffer is full (overloaded network)",
+					zap.String("message-type", m.Type.String()),
+					zap.String("local-member-id", p.localID.String()),
+					zap.String("from", types.ID(m.From).String()),
+					zap.String("remote-peer-id", p.id.String()),
+					zap.Bool("remote-peer-active", p.status.isActive()),
+				)
+			} else {
+				plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
+			}
+		}
+		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
+	}
+}
+
+func (p *peer) sendSnap(m snap.Message) {
+	go p.snapSender.send(m)
+}
+
+func (p *peer) update(urls types.URLs) {
+	p.picker.update(urls)
+}
+
+func (p *peer) attachOutgoingConn(conn *outgoingConn) {
+	var ok bool
+	switch conn.t {
+	case streamTypeMsgAppV2:
+		ok = p.msgAppV2Writer.attach(conn)
+	case streamTypeMessage:
+		ok = p.writer.attach(conn)
+	default:
+		if p.lg != nil {
+			p.lg.Panic("unknown stream type", zap.String("type", conn.t.String()))
+		} else {
+			plog.Panicf("unhandled stream type %s", conn.t)
+		}
+	}
+	if !ok {
+		conn.Close()
+	}
+}
+
+func (p *peer) activeSince() time.Time { return p.status.activeSince() }
+
+// Pause pauses the peer. The peer will simply drop all incoming
+// messages without returning an error.
+func (p *peer) Pause() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.paused = true
+	p.msgAppReader.pause()
+	p.msgAppV2Reader.pause()
+}
+
+// Resume resumes a paused peer.
+func (p *peer) Resume() {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.paused = false
+	p.msgAppReader.resume()
+	p.msgAppV2Reader.resume()
+}
+
+func (p *peer) stop() {
+	if p.lg != nil {
+		p.lg.Info("stopping remote peer", zap.String("remote-peer-id", p.id.String()))
+	} else {
+		plog.Infof("stopping peer %s...", p.id)
+	}
+
+	defer func() {
+		if p.lg != nil {
+			p.lg.Info("stopped remote peer", zap.String("remote-peer-id", p.id.String()))
+		} else {
+			plog.Infof("stopped peer %s", p.id)
+		}
+	}()
+
+	close(p.stopc)
+	p.cancel()
+	p.msgAppV2Writer.stop()
+	p.writer.stop()
+	p.pipeline.stop()
+	p.snapSender.stop()
+	p.msgAppV2Reader.stop()
+	p.msgAppReader.stop()
+}
+
+// pick picks a chan for sending the given message. It returns the picked chan
+// together with its string name.
+func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
+	var ok bool
+	// Considering MsgSnap may have a big size, e.g., 1G, and will block
+	// stream for a long time, only use one of the N pipelines to send MsgSnap.
+	if isMsgSnap(m) {
+		return p.pipeline.msgc, pipelineMsg
+	} else if writec, ok = p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
+		return writec, streamAppV2
+	} else if writec, ok = p.writer.writec(); ok {
+		return writec, streamMsg
+	}
+	return p.pipeline.msgc, pipelineMsg
+}
+
+func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp }
+
+func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap }
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer_status.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer_status.go
new file mode 100644
index 0000000..66149ff
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer_status.go
@@ -0,0 +1,96 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	"go.etcd.io/etcd/pkg/types"
+
+	"go.uber.org/zap"
+)
+
+type failureType struct {
+	source string
+	action string
+}
+
+type peerStatus struct {
+	lg     *zap.Logger
+	local  types.ID
+	id     types.ID
+	mu     sync.Mutex // protect variables below
+	active bool
+	since  time.Time
+}
+
+func newPeerStatus(lg *zap.Logger, local, id types.ID) *peerStatus {
+	return &peerStatus{lg: lg, local: local, id: id}
+}
+
+func (s *peerStatus) activate() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if !s.active {
+		if s.lg != nil {
+			s.lg.Info("peer became active", zap.String("peer-id", s.id.String()))
+		} else {
+			plog.Infof("peer %s became active", s.id)
+		}
+		s.active = true
+		s.since = time.Now()
+
+		activePeers.WithLabelValues(s.local.String(), s.id.String()).Inc()
+	}
+}
+
+func (s *peerStatus) deactivate(failure failureType, reason string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
+	if s.active {
+		if s.lg != nil {
+			s.lg.Warn("peer became inactive (message send to peer failed)", zap.String("peer-id", s.id.String()), zap.Error(errors.New(msg)))
+		} else {
+			plog.Errorf("%s", msg) // msg embeds the caller-supplied reason; pass it as an argument, never as the format string
+			plog.Infof("peer %s became inactive (message send to peer failed)", s.id)
+		}
+		s.active = false
+		s.since = time.Time{}
+
+		activePeers.WithLabelValues(s.local.String(), s.id.String()).Dec()
+		disconnectedPeers.WithLabelValues(s.local.String(), s.id.String()).Inc()
+		return
+	}
+
+	if s.lg != nil {
+		s.lg.Debug("peer deactivated again", zap.String("peer-id", s.id.String()), zap.Error(errors.New(msg)))
+	}
+}
+
+func (s *peerStatus) isActive() bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.active
+}
+
+func (s *peerStatus) activeSince() time.Time {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.since
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/pipeline.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/pipeline.go
new file mode 100644
index 0000000..70f9257
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/pipeline.go
@@ -0,0 +1,180 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"io/ioutil"
+	"sync"
+	"time"
+
+	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
+	"go.etcd.io/etcd/pkg/pbutil"
+	"go.etcd.io/etcd/pkg/types"
+	"go.etcd.io/etcd/raft"
+	"go.etcd.io/etcd/raft/raftpb"
+
+	"go.uber.org/zap"
+)
+
+const (
+	connPerPipeline = 4
+	// pipelineBufSize is the size of the pipeline buffer, which helps absorb
+	// temporary network latency.
+	// The size ensures that the pipeline does not drop messages when the network
+	// is unavailable for less than 1 second in the good path.
+	pipelineBufSize = 64
+)
+
+var errStopped = errors.New("stopped")
+
+type pipeline struct {
+	peerID types.ID
+
+	tr     *Transport
+	picker *urlPicker
+	status *peerStatus
+	raft   Raft
+	errorc chan error
+	// deprecate when we deprecate the v2 API
+	followerStats *stats.FollowerStats
+
+	msgc chan raftpb.Message
+	// wait for the handling routines
+	wg    sync.WaitGroup
+	stopc chan struct{}
+}
+
+func (p *pipeline) start() {
+	p.stopc = make(chan struct{})
+	p.msgc = make(chan raftpb.Message, pipelineBufSize)
+	p.wg.Add(connPerPipeline)
+	for i := 0; i < connPerPipeline; i++ {
+		go p.handle()
+	}
+
+	if p.tr != nil && p.tr.Logger != nil {
+		p.tr.Logger.Info(
+			"started HTTP pipelining with remote peer",
+			zap.String("local-member-id", p.tr.ID.String()),
+			zap.String("remote-peer-id", p.peerID.String()),
+		)
+	} else {
+		plog.Infof("started HTTP pipelining with peer %s", p.peerID)
+	}
+}
+
+func (p *pipeline) stop() {
+	close(p.stopc)
+	p.wg.Wait()
+
+	if p.tr != nil && p.tr.Logger != nil {
+		p.tr.Logger.Info(
+			"stopped HTTP pipelining with remote peer",
+			zap.String("local-member-id", p.tr.ID.String()),
+			zap.String("remote-peer-id", p.peerID.String()),
+		)
+	} else {
+		plog.Infof("stopped HTTP pipelining with peer %s", p.peerID)
+	}
+}
+
+func (p *pipeline) handle() {
+	defer p.wg.Done()
+
+	for {
+		select {
+		case m := <-p.msgc:
+			start := time.Now()
+			err := p.post(pbutil.MustMarshal(&m))
+			end := time.Now()
+
+			if err != nil {
+				p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
+
+				if m.Type == raftpb.MsgApp && p.followerStats != nil {
+					p.followerStats.Fail()
+				}
+				p.raft.ReportUnreachable(m.To)
+				if isMsgSnap(m) {
+					p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
+				}
+				sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
+				continue
+			}
+
+			p.status.activate()
+			if m.Type == raftpb.MsgApp && p.followerStats != nil {
+				p.followerStats.Succ(end.Sub(start))
+			}
+			if isMsgSnap(m) {
+				p.raft.ReportSnapshot(m.To, raft.SnapshotFinish)
+			}
+			sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
+		case <-p.stopc:
+			return
+		}
+	}
+}
+
+// post POSTs a data payload to a url. Returns nil if the POST succeeds,
+// error on any failure.
+func (p *pipeline) post(data []byte) (err error) {
+	u := p.picker.pick()
+	req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
+
+	done := make(chan struct{}, 1)
+	ctx, cancel := context.WithCancel(context.Background())
+	req = req.WithContext(ctx)
+	go func() {
+		select {
+		case <-done:
+		case <-p.stopc:
+			waitSchedule()
+			cancel()
+		}
+	}()
+
+	resp, err := p.tr.pipelineRt.RoundTrip(req)
+	done <- struct{}{}
+	if err != nil {
+		p.picker.unreachable(u)
+		return err
+	}
+	defer resp.Body.Close()
+	b, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		p.picker.unreachable(u)
+		return err
+	}
+
+	err = checkPostResponse(resp, b, req, p.peerID)
+	if err != nil {
+		p.picker.unreachable(u)
+		// errMemberRemoved is a critical error since a removed member should
+		// always be stopped. So we use reportCriticalError to report it to errorc.
+		if err == errMemberRemoved {
+			reportCriticalError(err, p.errorc)
+		}
+		return err
+	}
+
+	return nil
+}
+
+// waitSchedule sleeps for a short while so that other goroutines can be scheduled.
+func waitSchedule() { time.Sleep(time.Millisecond) }
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/probing_status.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/probing_status.go
new file mode 100644
index 0000000..474d9a0
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/probing_status.go
@@ -0,0 +1,104 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"time"
+
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/xiang90/probing"
+	"go.uber.org/zap"
+)
+
+const (
+	// RoundTripperNameRaftMessage is the name of the round-tripper that sends
+	// all Raft messages other than "snap.Message".
+	RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
+	// RoundTripperNameSnapshot is the name of round-tripper that sends merged snapshot message.
+	RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
+)
+
+var (
+	// proberInterval must be shorter than the read timeout;
+	// otherwise the connection will time out.
+	proberInterval           = ConnReadTimeout - time.Second
+	statusMonitoringInterval = 30 * time.Second
+	statusErrorInterval      = 5 * time.Second
+)
+
+func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
+	hus := make([]string, len(us))
+	for i := range us {
+		hus[i] = us[i] + ProbingPrefix
+	}
+
+	p.AddHTTP(id, proberInterval, hus)
+
+	s, err := p.Status(id)
+	if err != nil {
+		if lg != nil {
+			lg.Warn("failed to add peer into prober", zap.String("remote-peer-id", id))
+		} else {
+			plog.Errorf("failed to add peer %s into prober", id)
+		}
+		return
+	}
+
+	go monitorProbingStatus(lg, s, id, roundTripperName, rttSecProm)
+}
+
+func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
+	// set the first interval short to log error early.
+	interval := statusErrorInterval
+	for {
+		select {
+		case <-time.After(interval):
+			if !s.Health() {
+				if lg != nil {
+					lg.Warn(
+						"prober detected unhealthy status",
+						zap.String("round-tripper-name", roundTripperName),
+						zap.String("remote-peer-id", id),
+						zap.Duration("rtt", s.SRTT()),
+						zap.Error(s.Err()),
+					)
+				} else {
+					plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
+				}
+				interval = statusErrorInterval
+			} else {
+				interval = statusMonitoringInterval
+			}
+			if s.ClockDiff() > time.Second {
+				if lg != nil {
+					lg.Warn(
+						"prober found high clock drift",
+						zap.String("round-tripper-name", roundTripperName),
+						zap.String("remote-peer-id", id),
+						zap.Duration("clock-drift", s.ClockDiff()),
+						zap.Duration("rtt", s.SRTT()),
+						zap.Error(s.Err()),
+					)
+				} else {
+					plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
+				}
+			}
+			rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
+
+		case <-s.StopNotify():
+			return
+		}
+	}
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/remote.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/remote.go
new file mode 100644
index 0000000..1ef2493
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/remote.go
@@ -0,0 +1,99 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"go.etcd.io/etcd/pkg/types"
+	"go.etcd.io/etcd/raft/raftpb"
+
+	"go.uber.org/zap"
+)
+
+type remote struct {
+	lg       *zap.Logger
+	localID  types.ID
+	id       types.ID
+	status   *peerStatus
+	pipeline *pipeline
+}
+
+func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
+	picker := newURLPicker(urls)
+	status := newPeerStatus(tr.Logger, tr.ID, id)
+	pipeline := &pipeline{
+		peerID: id,
+		tr:     tr,
+		picker: picker,
+		status: status,
+		raft:   tr.Raft,
+		errorc: tr.ErrorC,
+	}
+	pipeline.start()
+
+	return &remote{
+		lg:       tr.Logger,
+		localID:  tr.ID,
+		id:       id,
+		status:   status,
+		pipeline: pipeline,
+	}
+}
+
+func (g *remote) send(m raftpb.Message) {
+	select {
+	case g.pipeline.msgc <- m:
+	default:
+		if g.status.isActive() {
+			if g.lg != nil {
+				g.lg.Warn(
+					"dropped internal Raft message since sending buffer is full (overloaded network)",
+					zap.String("message-type", m.Type.String()),
+					zap.String("local-member-id", g.localID.String()),
+					zap.String("from", types.ID(m.From).String()),
+					zap.String("remote-peer-id", g.id.String()),
+					zap.Bool("remote-peer-active", g.status.isActive()),
+				)
+			} else {
+				plog.MergeWarningf("dropped internal raft message to %s since sending buffer is full (bad/overloaded network)", g.id)
+			}
+		} else {
+			if g.lg != nil {
+				g.lg.Warn(
+					"dropped Raft message since sending buffer is full (overloaded network)",
+					zap.String("message-type", m.Type.String()),
+					zap.String("local-member-id", g.localID.String()),
+					zap.String("from", types.ID(m.From).String()),
+					zap.String("remote-peer-id", g.id.String()),
+					zap.Bool("remote-peer-active", g.status.isActive()),
+				)
+			} else {
+				plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id)
+			}
+		}
+		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
+	}
+}
+
+func (g *remote) stop() {
+	g.pipeline.stop()
+}
+
+func (g *remote) Pause() {
+	g.stop()
+}
+
+func (g *remote) Resume() {
+	g.pipeline.start()
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/snapshot_sender.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/snapshot_sender.go
new file mode 100644
index 0000000..62efb0c
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/snapshot_sender.go
@@ -0,0 +1,207 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"time"
+
+	"go.etcd.io/etcd/etcdserver/api/snap"
+	"go.etcd.io/etcd/pkg/httputil"
+	pioutil "go.etcd.io/etcd/pkg/ioutil"
+	"go.etcd.io/etcd/pkg/types"
+	"go.etcd.io/etcd/raft"
+
+	"github.com/dustin/go-humanize"
+	"go.uber.org/zap"
+)
+
+var (
+	// snapResponseReadTimeout is the delay after which post closes a snapshot response whose body is being read.
+	snapResponseReadTimeout = 5 * time.Second
+)
+
+type snapshotSender struct {
+	from, to types.ID // local member ID (tr.ID) and destination peer ID
+	cid      types.ID // cluster ID (tr.ClusterID)
+
+	tr     *Transport  // transport this sender belongs to; supplies logger, URLs and pipelineRt
+	picker *urlPicker  // chooses the peer URL to post to and records unreachable ones
+	status *peerStatus // activated on a successful send, deactivated on failure
+	r      Raft        // told about unreachable peers and snapshot outcomes
+	errorc chan error  // receives critical errors (errMemberRemoved)
+
+	stopc chan struct{} // closed by stop; makes a post in progress return errStopped
+}
+
+// newSnapshotSender returns a sender that posts snapshots from tr's local member to peer to.
+// Raft, cluster ID and the error channel are taken from tr; stopc starts out open.
+func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *peerStatus) *snapshotSender {
+	sender := new(snapshotSender)
+	sender.from, sender.to, sender.cid = tr.ID, to, tr.ClusterID
+	sender.tr = tr
+	sender.picker = picker
+	sender.status = status
+	sender.r = tr.Raft
+	sender.errorc = tr.ErrorC
+	sender.stopc = make(chan struct{})
+	return sender
+}
+
+func (s *snapshotSender) stop() { close(s.stopc) } // stop wakes any post waiting on stopc; closing an already closed channel panics, so call it once
+
+func (s *snapshotSender) send(merged snap.Message) { // send posts merged to the peer and reports the outcome to raft, peer status and metrics
+	start := time.Now() // used for the snapshotSendSeconds observation on success
+
+	m := merged.Message
+	to := types.ID(m.To).String()
+
+	body := createSnapBody(s.tr.Logger, merged)
+	defer body.Close()
+
+	u := s.picker.pick() // remembered so this URL can be marked unreachable on failure
+	req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.tr.URLs, s.from, s.cid)
+
+	if s.tr.Logger != nil {
+		s.tr.Logger.Info(
+			"sending database snapshot",
+			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
+			zap.String("remote-peer-id", to),
+			zap.Int64("bytes", merged.TotalSize),
+			zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
+		)
+	} else {
+		plog.Infof("start to send database snapshot [index: %d, to %s]...", m.Snapshot.Metadata.Index, types.ID(m.To))
+	}
+
+	snapshotSendInflights.WithLabelValues(to).Inc() // in-flight gauge; decremented by the deferred func when send returns
+	defer func() {
+		snapshotSendInflights.WithLabelValues(to).Dec()
+	}()
+
+	err := s.post(req)
+	defer merged.CloseWithError(err) // deferred-call arguments are evaluated now, so the result of post is what gets passed
+	if err != nil {
+		if s.tr.Logger != nil {
+			s.tr.Logger.Warn(
+				"failed to send database snapshot",
+				zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
+				zap.String("remote-peer-id", to),
+				zap.Int64("bytes", merged.TotalSize),
+				zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
+				zap.Error(err),
+			)
+		} else {
+			plog.Warningf("database snapshot [index: %d, to: %s] failed to be sent out (%v)", m.Snapshot.Metadata.Index, types.ID(m.To), err)
+		}
+
+		// errMemberRemoved is a critical error since a removed member should
+		// always be stopped. So we use reportCriticalError to report it to errorc.
+		if err == errMemberRemoved {
+			reportCriticalError(err, s.errorc)
+		}
+
+		s.picker.unreachable(u)
+		s.status.deactivate(failureType{source: sendSnap, action: "post"}, err.Error())
+		s.r.ReportUnreachable(m.To)
+		// report SnapshotFailure to raft state machine. After raft state
+		// machine knows about it, it would pause a while and retry sending
+		// new snapshot message.
+		s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
+		sentFailures.WithLabelValues(to).Inc()
+		snapshotSendFailures.WithLabelValues(to).Inc()
+		return
+	}
+	s.status.activate() // success path: the peer answered, so mark it active and tell raft the snapshot finished
+	s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
+
+	if s.tr.Logger != nil {
+		s.tr.Logger.Info(
+			"sent database snapshot",
+			zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
+			zap.String("remote-peer-id", to),
+			zap.Int64("bytes", merged.TotalSize),
+			zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
+		)
+	} else {
+		plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", m.Snapshot.Metadata.Index, types.ID(m.To))
+	}
+
+	sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
+	snapshotSend.WithLabelValues(to).Inc()
+	snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
+}
+
+// post sends req through the transport's pipeline round tripper and waits for the full response.
+// It returns nil once checkPostResponse accepts the response, or errStopped if stopc is closed first.
+func (s *snapshotSender) post(req *http.Request) (err error) {
+	ctx, cancel := context.WithCancel(context.Background())
+	req = req.WithContext(ctx)
+	defer cancel() // cancels the request context whenever post returns, including the stop path
+
+	type responseAndError struct {
+		resp *http.Response
+		body []byte
+		err  error
+	}
+	result := make(chan responseAndError, 1) // buffered so the goroutine can deliver its result even after post has returned
+
+	go func() {
+		resp, err := s.tr.pipelineRt.RoundTrip(req)
+		if err != nil {
+			result <- responseAndError{resp, nil, err}
+			return
+		}
+
+		// close the response body once the timeout elapses.
+		// this prevents reading the body forever when the other side dies right after
+		// it has successfully received the request body.
+		time.AfterFunc(snapResponseReadTimeout, func() { httputil.GracefulClose(resp) })
+		body, err := ioutil.ReadAll(resp.Body)
+		result <- responseAndError{resp, body, err}
+	}()
+
+	select {
+	case <-s.stopc: // sender stopped while the request was in flight
+		return errStopped
+	case r := <-result:
+		if r.err != nil {
+			return r.err
+		}
+		return checkPostResponse(r.resp, r.body, req, s.to)
+	}
+}
+
+func createSnapBody(lg *zap.Logger, merged snap.Message) io.ReadCloser {
+	buf := new(bytes.Buffer)
+	enc := &messageEncoder{w: buf}
+	// encode raft message
+	if err := enc.encode(&merged.Message); err != nil {
+		if lg != nil {
+			lg.Panic("failed to encode message", zap.Error(err))
+		} else {
+			plog.Panicf("encode message error (%v)", err)
+		}
+	}
+
+	return &pioutil.ReaderAndCloser{
+		Reader: io.MultiReader(buf, merged.ReadCloser),
+		Closer: merged.ReadCloser,
+	}
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/stream.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/stream.go
new file mode 100644
index 0000000..dcb2223
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/stream.go
@@ -0,0 +1,746 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"net/http"
+	"path"
+	"strings"
+	"sync"
+	"time"
+
+	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
+	"go.etcd.io/etcd/pkg/httputil"
+	"go.etcd.io/etcd/pkg/transport"
+	"go.etcd.io/etcd/pkg/types"
+	"go.etcd.io/etcd/raft/raftpb"
+	"go.etcd.io/etcd/version"
+
+	"github.com/coreos/go-semver/semver"
+	"go.uber.org/zap"
+	"golang.org/x/time/rate"
+)
+
+const (
+	streamTypeMessage  streamType = "message"  // endpoint RaftStreamPrefix/message; written with messageEncoder
+	streamTypeMsgAppV2 streamType = "msgappv2" // endpoint RaftStreamPrefix/msgapp; written with the msgappv2 encoder
+
+	streamBufSize = 4096 // capacity of each streamWriter.msgc buffer
+)
+
+var (
+	errUnsupportedStreamType = fmt.Errorf("unsupported stream type") // returned by dial when an older peer does not support the requested stream type
+
+	// supportedStream lists the stream types each release supports; keys are "major.minor.patch" strings with patch 0.
+	supportedStream = map[string][]streamType{
+		"2.0.0": {},
+		"2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
+		"2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
+		"2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
+		"3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
+		"3.1.0": {streamTypeMsgAppV2, streamTypeMessage},
+		"3.2.0": {streamTypeMsgAppV2, streamTypeMessage},
+		"3.3.0": {streamTypeMsgAppV2, streamTypeMessage},
+	}
+)
+
+type streamType string // streamType names a kind of peer stream; the known values are the streamType* constants
+
+// endpoint returns the URL path prefix serving stream type t; it panics via plog on an unknown type.
+func (t streamType) endpoint() string {
+	if t == streamTypeMsgAppV2 {
+		return path.Join(RaftStreamPrefix, "msgapp")
+	}
+	if t == streamTypeMessage {
+		return path.Join(RaftStreamPrefix, "message")
+	}
+	plog.Panicf("unhandled stream type %v", t)
+	return ""
+}
+
+// String returns the human-readable label of t used in log output.
+func (t streamType) String() string {
+	if t == streamTypeMsgAppV2 {
+		return "stream MsgApp v2"
+	}
+	if t == streamTypeMessage {
+		return "stream Message"
+	}
+	return "unknown stream"
+}
+
+var (
+	// linkHeartbeatMessage is a special message used as heartbeat message in
+	// link layer. It never conflicts with messages from raft because raft
+	// doesn't send out messages without From and To fields.
+	linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat} // From and To are deliberately left at zero
+)
+
+func isLinkHeartbeatMessage(m *raftpb.Message) bool {
+	return m.Type == raftpb.MsgHeartbeat && m.From == 0 && m.To == 0 // a heartbeat with neither sender nor receiver set
+}
+
+type outgoingConn struct {
+	t streamType // stream type; selects the encoder in streamWriter.run
+	io.Writer    // destination for encoded messages
+	http.Flusher // flushed after heartbeats and message batches
+	io.Closer    // closed when the writer drops or replaces this connection
+
+	localID types.ID // logged as "from" when the encoder is set
+	peerID  types.ID // logged as "to" when the encoder is set
+}
+
+// streamWriter writes messages to the attached outgoingConn.
+type streamWriter struct {
+	lg *zap.Logger // structured logger; when nil, plog is used instead
+
+	localID types.ID // ID of the local member (used in logs)
+	peerID  types.ID // ID of the remote peer this writer serves
+
+	status *peerStatus          // activated on attach, deactivated on write or heartbeat failure
+	fs     *stats.FollowerStats // handed to the msgappv2 encoder
+	r      Raft                 // notified via ReportUnreachable when messages cannot be delivered
+
+	mu      sync.Mutex // guards closer, working and the replacement of msgc
+	closer  io.Closer  // closes the currently attached connection
+	working bool       // true while a connection is attached
+
+	msgc  chan raftpb.Message // buffered outgoing messages; replaced on every close
+	connc chan *outgoingConn  // delivers newly attached connections to run
+	stopc chan struct{}       // closed by stop to end run
+	done  chan struct{}       // closed by run when it exits
+}
+
+// startStreamWriter creates a streamWriter and starts a long-running goroutine that accepts
+// messages and writes them to the attached outgoing connection.
+func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
+	w := &streamWriter{
+		lg: lg,
+
+		localID: local,
+		peerID:  id,
+
+		status: status,
+		fs:     fs,
+		r:      r,
+		msgc:   make(chan raftpb.Message, streamBufSize),
+		connc:  make(chan *outgoingConn), // unbuffered: attach blocks until run receives the connection
+		stopc:  make(chan struct{}),
+		done:   make(chan struct{}),
+	}
+	go w.run() // run exits when stop closes stopc
+	return w
+}
+
+func (cw *streamWriter) run() { // run is the writer loop: it encodes heartbeats and messages onto the attached connection until stopc is closed
+	var (
+		msgc       chan raftpb.Message // nil until a connection is attached, which disables the message case below
+		heartbeatc <-chan time.Time    // nil until a connection is attached, which disables the heartbeat case below
+		t          streamType
+		enc        encoder
+		flusher    http.Flusher
+		batched    int
+	)
+	tickc := time.NewTicker(ConnReadTimeout / 3) // heartbeat interval is a third of ConnReadTimeout
+	defer tickc.Stop()
+	unflushed := 0 // bytes encoded since the last flush; added to sentBytes on flush
+
+	if cw.lg != nil {
+		cw.lg.Info(
+			"started stream writer with remote peer",
+			zap.String("local-member-id", cw.localID.String()),
+			zap.String("remote-peer-id", cw.peerID.String()),
+		)
+	} else {
+		plog.Infof("started streaming with peer %s (writer)", cw.peerID)
+	}
+
+	for {
+		select {
+		case <-heartbeatc:
+			err := enc.encode(&linkHeartbeatMessage)
+			unflushed += linkHeartbeatMessage.Size()
+			if err == nil {
+				flusher.Flush()
+				batched = 0
+				sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
+				unflushed = 0
+				continue
+			}
+
+			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
+
+			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
+			cw.close()
+			if cw.lg != nil {
+				cw.lg.Warn(
+					"lost TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("local-member-id", cw.localID.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+			}
+			heartbeatc, msgc = nil, nil // stop selecting on both until a new connection is attached
+
+		case m := <-msgc:
+			err := enc.encode(&m)
+			if err == nil {
+				unflushed += m.Size()
+
+				if len(msgc) == 0 || batched > streamBufSize/2 { // flush when the queue is drained or the batch has grown large
+					flusher.Flush()
+					sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
+					unflushed = 0
+					batched = 0
+				} else {
+					batched++
+				}
+
+				continue
+			}
+
+			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
+			cw.close()
+			if cw.lg != nil {
+				cw.lg.Warn(
+					"lost TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("local-member-id", cw.localID.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+			}
+			heartbeatc, msgc = nil, nil // stop selecting on both until a new connection is attached
+			cw.r.ReportUnreachable(m.To)
+			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
+
+		case conn := <-cw.connc:
+			cw.mu.Lock()
+			closed := cw.closeUnlocked() // drop any previously attached connection before adopting the new one
+			t = conn.t
+			switch conn.t {
+			case streamTypeMsgAppV2:
+				enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
+			case streamTypeMessage:
+				enc = &messageEncoder{w: conn.Writer}
+			default:
+				plog.Panicf("unhandled stream type %s", conn.t)
+			}
+			if cw.lg != nil {
+				cw.lg.Info(
+					"set message encoder",
+					zap.String("from", conn.localID.String()),
+					zap.String("to", conn.peerID.String()),
+					zap.String("stream-type", t.String()),
+				)
+			}
+			flusher = conn.Flusher
+			unflushed = 0
+			cw.status.activate()
+			cw.closer = conn.Closer
+			cw.working = true
+			cw.mu.Unlock()
+
+			if closed {
+				if cw.lg != nil {
+					cw.lg.Warn(
+						"closed TCP streaming connection with remote peer",
+						zap.String("stream-writer-type", t.String()),
+						zap.String("local-member-id", cw.localID.String()),
+						zap.String("remote-peer-id", cw.peerID.String()),
+					)
+				} else {
+					plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+				}
+			}
+			if cw.lg != nil {
+				cw.lg.Warn(
+					"established TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("local-member-id", cw.localID.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+			}
+			heartbeatc, msgc = tickc.C, cw.msgc // re-enable both cases; cw.msgc may have been replaced by closeUnlocked
+
+		case <-cw.stopc:
+			if cw.close() {
+				if cw.lg != nil {
+					cw.lg.Warn(
+						"closed TCP streaming connection with remote peer",
+						zap.String("stream-writer-type", t.String()),
+						zap.String("remote-peer-id", cw.peerID.String()),
+					)
+				} else {
+					plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+				}
+			}
+			if cw.lg != nil {
+				cw.lg.Warn(
+					"stopped TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
+			}
+			close(cw.done) // lets stop and attach observe that run has exited
+			return
+		}
+	}
+}
+
+func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) { // writec returns the current message channel and whether a connection is attached
+	cw.mu.Lock() // msgc and working are changed under mu by run and closeUnlocked
+	defer cw.mu.Unlock()
+	return cw.msgc, cw.working
+}
+
+func (cw *streamWriter) close() bool { // close is closeUnlocked with mu held; it reports whether a connection was actually closed
+	cw.mu.Lock()
+	defer cw.mu.Unlock()
+	return cw.closeUnlocked()
+}
+
+func (cw *streamWriter) closeUnlocked() bool { // closeUnlocked closes the attached connection, if any; callers in this file hold cw.mu
+	if !cw.working { // nothing attached, nothing to close
+		return false
+	}
+	if err := cw.closer.Close(); err != nil { // a close error is only logged; teardown continues
+		if cw.lg != nil {
+			cw.lg.Warn(
+				"failed to close connection with remote peer",
+				zap.String("remote-peer-id", cw.peerID.String()),
+				zap.Error(err),
+			)
+		} else {
+			plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err)
+		}
+	}
+	if len(cw.msgc) > 0 { // messages still queued are about to be discarded, so tell raft the peer is unreachable
+		cw.r.ReportUnreachable(uint64(cw.peerID))
+	}
+	cw.msgc = make(chan raftpb.Message, streamBufSize) // fresh buffer; the old channel and its pending messages are dropped
+	cw.working = false
+	return true
+}
+
+func (cw *streamWriter) attach(conn *outgoingConn) bool { // attach hands conn to run; it reports false if the writer has already stopped
+	select {
+	case cw.connc <- conn: // run received the connection
+		return true
+	case <-cw.done: // run has exited, so nobody will ever receive conn
+		return false
+	}
+}
+
+func (cw *streamWriter) stop() {
+	close(cw.stopc) // ask run to exit
+	<-cw.done // wait until run has closed done
+}
+
+// streamReader is a long-running go-routine that dials to the remote stream
+// endpoint and reads messages from the response body returned.
+type streamReader struct {
+	lg *zap.Logger // structured logger; when nil, plog is used instead
+
+	peerID types.ID   // remote peer to dial
+	typ    streamType // stream type requested from the peer
+
+	tr     *Transport            // supplies local ID, cluster ID, URLs, streamRt and ErrorC
+	picker *urlPicker            // chooses the peer URL to dial and records unreachable ones
+	status *peerStatus           // activated after a successful dial, deactivated on dial or read failure
+	recvc  chan<- raftpb.Message // destination for decoded messages other than MsgProp
+	propc  chan<- raftpb.Message // destination for decoded MsgProp messages
+
+	rl *rate.Limiter // alters the frequency of dial retrial attempts
+
+	errorc chan<- error // receives critical errors; start defaults it to tr.ErrorC
+
+	mu     sync.Mutex // guards paused and closer
+	paused bool       // while true, decoded messages are discarded
+	closer io.Closer  // body of the currently open stream; nil when none is open
+
+	ctx    context.Context    // its cancellation ends run; created by start when nil
+	cancel context.CancelFunc // called by stop
+	done   chan struct{}      // closed by run when it exits
+}
+
+func (cr *streamReader) start() { // start fills in defaults and launches run in its own goroutine
+	cr.done = make(chan struct{})
+	if cr.errorc == nil { // fall back to the transport's error channel
+		cr.errorc = cr.tr.ErrorC
+	}
+	if cr.ctx == nil { // create a cancellable context only when the caller did not set one
+		cr.ctx, cr.cancel = context.WithCancel(context.Background())
+	}
+	go cr.run()
+}
+
+func (cr *streamReader) run() { // run dials the peer and decodes its stream in a loop, redialing at the limiter's pace until ctx is done
+	t := cr.typ
+
+	if cr.lg != nil {
+		cr.lg.Info(
+			"started stream reader with remote peer",
+			zap.String("stream-reader-type", t.String()),
+			zap.String("local-member-id", cr.tr.ID.String()),
+			zap.String("remote-peer-id", cr.peerID.String()),
+		)
+	} else {
+		plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
+	}
+
+	for {
+		rc, err := cr.dial(t)
+		if err != nil {
+			if err != errUnsupportedStreamType { // an unsupported stream type does not mark the peer inactive
+				cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
+			}
+		} else {
+			cr.status.activate()
+			if cr.lg != nil {
+				cr.lg.Info(
+					"established TCP streaming connection with remote peer",
+					zap.String("stream-reader-type", cr.typ.String()),
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+				)
+			} else {
+				plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
+			}
+			err = cr.decodeLoop(rc, t) // blocks until the stream ends; always returns a non-nil error
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"lost TCP streaming connection with remote peer",
+					zap.String("stream-reader-type", cr.typ.String()),
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.Error(err),
+				)
+			} else {
+				plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
+			}
+			switch {
+			// all data is read out
+			case err == io.EOF:
+			// connection is closed by the remote
+			case transport.IsClosedConnError(err):
+			default: // any other read error marks the peer inactive
+				cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
+			}
+		}
+		// Wait for a while before new dial attempt
+		err = cr.rl.Wait(cr.ctx)
+		if cr.ctx.Err() != nil { // the context is done: announce the exit and leave the loop
+			if cr.lg != nil {
+				cr.lg.Info(
+					"stopped stream reader with remote peer",
+					zap.String("stream-reader-type", t.String()),
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+				)
+			} else {
+				plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
+			}
+			close(cr.done) // unblocks stop
+			return
+		}
+		if err != nil { // limiter failed although the context is still live: log it and retry
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"rate limit on stream reader with remote peer",
+					zap.String("stream-reader-type", t.String()),
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.Error(err),
+				)
+			} else {
+				plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
+			}
+		}
+	}
+}
+
+func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error { // decodeLoop reads messages from rc and forwards them until decoding fails; the returned error is never nil
+	var dec decoder
+	cr.mu.Lock() // held while the decoder is chosen and rc is published as cr.closer
+	switch t {
+	case streamTypeMsgAppV2:
+		dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
+	case streamTypeMessage:
+		dec = &messageDecoder{r: rc}
+	default:
+		if cr.lg != nil {
+			cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
+		} else {
+			plog.Panicf("unhandled stream type %s", t)
+		}
+	}
+	select {
+	case <-cr.ctx.Done(): // already stopped: close the body instead of reading from it
+		cr.mu.Unlock()
+		if err := rc.Close(); err != nil {
+			return err
+		}
+		return io.EOF // run treats io.EOF as a clean end of stream
+	default:
+		cr.closer = rc // lets stop close the body while decode is blocked on it
+	}
+	cr.mu.Unlock()
+
+	// gofail: labelRaftDropHeartbeat:
+	for {
+		m, err := dec.decode()
+		if err != nil {
+			cr.mu.Lock()
+			cr.close()
+			cr.mu.Unlock()
+			return err
+		}
+
+		// gofail-go: var raftDropHeartbeat struct{}
+		// continue labelRaftDropHeartbeat
+		receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
+
+		cr.mu.Lock()
+		paused := cr.paused
+		cr.mu.Unlock()
+
+		if paused { // while paused, messages are read and counted but not delivered
+			continue
+		}
+
+		if isLinkHeartbeatMessage(&m) {
+			// raft is not interested in link layer
+			// heartbeat message, so we should ignore
+			// it.
+			continue
+		}
+
+		recvc := cr.recvc
+		if m.Type == raftpb.MsgProp { // proposals are delivered on their own channel
+			recvc = cr.propc
+		}
+
+		select {
+		case recvc <- m:
+		default: // receiver not ready: drop the message rather than block the read loop
+			if cr.status.isActive() {
+				if cr.lg != nil {
+					cr.lg.Warn(
+						"dropped internal Raft message since receiving buffer is full (overloaded network)",
+						zap.String("message-type", m.Type.String()),
+						zap.String("local-member-id", cr.tr.ID.String()),
+						zap.String("from", types.ID(m.From).String()),
+						zap.String("remote-peer-id", types.ID(m.To).String()),
+						zap.Bool("remote-peer-active", cr.status.isActive()),
+					)
+				} else {
+					plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
+				}
+			} else {
+				if cr.lg != nil {
+					cr.lg.Warn(
+						"dropped Raft message since receiving buffer is full (overloaded network)",
+						zap.String("message-type", m.Type.String()),
+						zap.String("local-member-id", cr.tr.ID.String()),
+						zap.String("from", types.ID(m.From).String()),
+						zap.String("remote-peer-id", types.ID(m.To).String()),
+						zap.Bool("remote-peer-active", cr.status.isActive()),
+					)
+				} else {
+					plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
+				}
+			}
+			recvFailures.WithLabelValues(types.ID(m.From).String()).Inc() // every dropped message counts as a receive failure for its sender
+		}
+	}
+}
+
+func (cr *streamReader) stop() { // stop cancels the reader, closes any open stream and waits for run to exit
+	cr.mu.Lock()
+	cr.cancel() // makes dial, decodeLoop and the rate limiter wait observe cancellation
+	cr.close() // closes the open body, if any
+	cr.mu.Unlock()
+	<-cr.done // run closes done just before returning
+}
+
+// dial requests a stream of type t from the peer and returns the response body on 200 OK.
+// On any other outcome it returns an error, closing the response first if one was received.
+func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
+	u := cr.picker.pick()
+	uu := u
+	uu.Path = path.Join(t.endpoint(), cr.tr.ID.String())
+
+	if cr.lg != nil {
+		cr.lg.Debug(
+			"dial stream reader",
+			zap.String("from", cr.tr.ID.String()),
+			zap.String("to", cr.peerID.String()),
+			zap.String("address", uu.String()),
+		)
+	}
+	req, err := http.NewRequest("GET", uu.String(), nil)
+	if err != nil {
+		cr.picker.unreachable(u)
+		return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err)
+	}
+	req.Header.Set("X-Server-From", cr.tr.ID.String())
+	req.Header.Set("X-Server-Version", version.Version)
+	req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
+	req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
+	req.Header.Set("X-Raft-To", cr.peerID.String())
+	setPeerURLsHeader(req, cr.tr.URLs)
+	req = req.WithContext(cr.ctx) // cancelling cr.ctx aborts the request
+
+	cr.mu.Lock()
+	select {
+	case <-cr.ctx.Done(): // already stopped: do not start a new request
+		cr.mu.Unlock()
+		return nil, fmt.Errorf("stream reader is stopped")
+	default:
+	}
+	cr.mu.Unlock()
+
+	resp, err := cr.tr.streamRt.RoundTrip(req)
+	if err != nil {
+		cr.picker.unreachable(u)
+		return nil, err
+	}
+
+	rv := serverVersion(resp.Header)
+	lv := semver.Must(semver.NewVersion(version.Version))
+	if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) { // older peer that does not list this stream type
+		httputil.GracefulClose(resp)
+		cr.picker.unreachable(u)
+		return nil, errUnsupportedStreamType
+	}
+
+	switch resp.StatusCode {
+	case http.StatusGone: // the local member was removed from the cluster: report it as critical
+		httputil.GracefulClose(resp)
+		cr.picker.unreachable(u)
+		reportCriticalError(errMemberRemoved, cr.errorc)
+		return nil, errMemberRemoved
+
+	case http.StatusOK: // the caller takes ownership of the body and must close it
+		return resp.Body, nil
+
+	case http.StatusNotFound:
+		httputil.GracefulClose(resp)
+		cr.picker.unreachable(u)
+		return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID)
+
+	case http.StatusPreconditionFailed:
+		b, err := ioutil.ReadAll(resp.Body)
+		// Release the response whether or not the read succeeded, so a failed read cannot leak the body.
+		httputil.GracefulClose(resp)
+		cr.picker.unreachable(u)
+		if err != nil {
+			return nil, err
+		}
+
+		switch strings.TrimSuffix(string(b), "\n") {
+		case errIncompatibleVersion.Error():
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"request sent was ignored by remote peer due to server version incompatibility",
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.Error(errIncompatibleVersion),
+				)
+			} else {
+				plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID)
+			}
+			return nil, errIncompatibleVersion
+
+		case errClusterIDMismatch.Error():
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"request sent was ignored by remote peer due to cluster ID mismatch",
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")),
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("local-member-cluster-id", cr.tr.ClusterID.String()),
+					zap.Error(errClusterIDMismatch),
+				)
+			} else {
+				plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)",
+					cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID)
+			}
+			return nil, errClusterIDMismatch
+
+		default:
+			return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
+		}
+
+	default:
+		httputil.GracefulClose(resp)
+		cr.picker.unreachable(u)
+		return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
+	}
+}
+
+func (cr *streamReader) close() { // close closes the open stream body, if any, and clears cr.closer; callers in this file hold cr.mu
+	if cr.closer != nil {
+		if err := cr.closer.Close(); err != nil { // a close error is only logged
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"failed to close remote peer connection",
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.Error(err),
+				)
+			} else {
+				plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err)
+			}
+		}
+	}
+	cr.closer = nil // cleared even when Close failed, so the body is closed at most once through this path
+}
+
+func (cr *streamReader) pause() { // pause makes decodeLoop discard the messages it reads
+	cr.mu.Lock()
+	defer cr.mu.Unlock()
+	cr.paused = true
+}
+
+func (cr *streamReader) resume() { // resume lets decodeLoop deliver messages again
+	cr.mu.Lock()
+	defer cr.mu.Unlock()
+	cr.paused = false
+}
+
+// checkStreamSupport reports whether stream type t is listed in supportedStream
+// for the release of v. Only the major and minor numbers of v are used for the
+// lookup; an unknown release supports nothing.
+func checkStreamSupport(v *semver.Version, t streamType) bool {
+	release := semver.Version{Major: v.Major, Minor: v.Minor}
+	found := false
+	for _, supported := range supportedStream[release.String()] {
+		found = found || supported == t
+	}
+	return found
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/transport.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/transport.go
new file mode 100644
index 0000000..7191c3d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/transport.go
@@ -0,0 +1,467 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"context"
+	"net/http"
+	"sync"
+	"time"
+
+	"go.etcd.io/etcd/etcdserver/api/snap"
+	stats "go.etcd.io/etcd/etcdserver/api/v2stats"
+	"go.etcd.io/etcd/pkg/logutil"
+	"go.etcd.io/etcd/pkg/transport"
+	"go.etcd.io/etcd/pkg/types"
+	"go.etcd.io/etcd/raft"
+	"go.etcd.io/etcd/raft/raftpb"
+
+	"github.com/coreos/pkg/capnslog"
+	"github.com/xiang90/probing"
+	"go.uber.org/zap"
+	"golang.org/x/time/rate"
+)
+
+var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("go.etcd.io/etcd", "rafthttp")) // package logger, used where no zap logger is set
+
+type Raft interface { // the raft state machine that Transport forwards received messages to and reports status to
+	Process(ctx context.Context, m raftpb.Message) error
+	IsIDRemoved(id uint64) bool
+	ReportUnreachable(id uint64)
+	ReportSnapshot(id uint64, status raft.SnapshotStatus)
+}
+
+type Transporter interface {
+	// Start starts the given Transporter.
+	// Start MUST be called before calling any other function in the interface.
+	Start() error
+	// Handler returns the HTTP handler of the transporter.
+	// A transporter HTTP handler handles the HTTP requests
+	// coming from remote peers.
+	// The handler MUST be used to handle the RaftPrefix(/raft)
+	// endpoint.
+	Handler() http.Handler
+	// Send sends out the given messages to the remote peers.
+	// Each message has a To field, which is an id that maps
+	// to an existing peer in the transport.
+	// If the id cannot be found in the transport, the message
+	// is ignored.
+	Send(m []raftpb.Message)
+	// SendSnapshot sends out the given snapshot message to a remote peer.
+	// The behavior of SendSnapshot is similar to that of Send.
+	SendSnapshot(m snap.Message)
+	// AddRemote adds a remote with the given peer urls into the transport.
+	// A remote helps a newly joined member catch up with the cluster's progress,
+	// and is not used after that.
+	// It is the caller's responsibility to ensure the urls are all valid,
+	// or it panics.
+	AddRemote(id types.ID, urls []string)
+	// AddPeer adds a peer with the given peer urls into the transport.
+	// It is the caller's responsibility to ensure the urls are all valid,
+	// or it panics.
+	// Peer urls are used to connect to the remote peer.
+	AddPeer(id types.ID, urls []string)
+	// RemovePeer removes the peer with the given id.
+	RemovePeer(id types.ID)
+	// RemoveAllPeers removes all the existing peers in the transport.
+	RemoveAllPeers()
+	// UpdatePeer updates the peer urls of the peer with the given id.
+	// It is the caller's responsibility to ensure the urls are all valid,
+	// or it panics.
+	UpdatePeer(id types.ID, urls []string)
+	// ActiveSince returns the time at which the connection with the peer
+	// of the given id became active.
+	// If the connection has been active since the peer was added, it returns the adding time.
+	// If the connection is currently inactive, it returns the zero time.
+	ActiveSince(id types.ID) time.Time
+	// ActivePeers returns the number of active peers.
+	ActivePeers() int
+	// Stop closes the connections and stops the transporter.
+	Stop()
+}
+
+// Transport implements the Transporter interface. It provides the functionality
+// to send raft messages to peers, and to receive raft messages from peers.
+// Callers should use the Handler method to get a handler that serves the
+// requests received on the peer URLs.
+// Start must be called before calling any other method, and Stop should be
+// called when the Transport is no longer used.
+type Transport struct {
+	Logger *zap.Logger // optional zap logger; when nil, most methods fall back to the package-level plog
+
+	DialTimeout time.Duration // maximum duration before timing out dial of the request
+	// DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
+	// a distinct rate limiter is created for every peer (default value: 10 events/sec)
+	DialRetryFrequency rate.Limit
+
+	TLSInfo transport.TLSInfo // TLS information used when creating connection
+
+	ID          types.ID   // local member ID
+	URLs        types.URLs // local peer URLs
+	ClusterID   types.ID   // raft cluster ID for request validation
+	Raft        Raft       // raft state machine, to which the Transport forwards received messages and reports status
+	Snapshotter *snap.Snapshotter
+	ServerStats *stats.ServerStats // used to record general transportation statistics
+	// LeaderStats records transportation statistics with followers when
+	// this member is acting as the leader in the raft protocol.
+	LeaderStats *stats.LeaderStats
+	// ErrorC is used to report detected critical errors, e.g.,
+	// the member has been permanently removed from the cluster.
+	// When an error is received from ErrorC, the user should stop the raft
+	// state machine and thus stop the Transport.
+	ErrorC chan error
+
+	streamRt   http.RoundTripper // roundTripper used by streams
+	pipelineRt http.RoundTripper // roundTripper used by pipelines
+
+	mu      sync.RWMutex         // protect the remote and peer map
+	remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
+	peers   map[types.ID]Peer    // peers map
+
+	pipelineProber probing.Prober // created in Start on top of pipelineRt
+	streamProber   probing.Prober // created in Start on top of streamRt
+}
+
+// Start creates the stream and pipeline round trippers, the peer and remote
+// maps and both probers; it returns the first round-tripper creation error.
+func (t *Transport) Start() error {
+	var err error
+	if t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout); err != nil {
+		return err
+	}
+	if t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout); err != nil {
+		return err
+	}
+	t.peers = make(map[types.ID]Peer)
+	t.remotes = make(map[types.ID]*remote)
+	t.streamProber = probing.NewProber(t.streamRt)
+	t.pipelineProber = probing.NewProber(t.pipelineRt)
+
+	// A zero DialRetryFrequency means the caller did not set one: default to
+	// one new-stream attempt every 100ms (10 events/sec), so that retrying
+	// does not bring too much overhead.
+	if t.DialRetryFrequency == 0 {
+		t.DialRetryFrequency = rate.Every(100 * time.Millisecond)
+	}
+	return nil
+}
+
+// Handler returns a new ServeMux that routes the pipeline, stream, snapshot
+// and probing URL prefixes to their respective handlers. The mux and the
+// handlers are built afresh on every call.
+func (t *Transport) Handler() http.Handler {
+	router := http.NewServeMux()
+	router.Handle(RaftPrefix, newPipelineHandler(t, t.Raft, t.ClusterID))
+	router.Handle(RaftStreamPrefix+"/", newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID))
+	router.Handle(RaftSnapshotPrefix, newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID))
+	router.Handle(ProbingPrefix, probing.NewHandler())
+	return router
+}
+
+func (t *Transport) Get(id types.ID) Peer {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	return t.peers[id] // nil Peer when id is not in the peers map
+}
+
+// Send hands each message to the peer registered under its To id or, failing
+// that, to the remote with that id. Messages with To == 0 are skipped and
+// messages for unknown ids are logged at debug level and dropped.
+func (t *Transport) Send(msgs []raftpb.Message) {
+	for _, msg := range msgs {
+		// To == 0 marks an intentionally dropped message.
+		if msg.To == 0 {
+			continue
+		}
+		target := types.ID(msg.To)
+
+		// Both maps are consulted under one read lock; sending happens after it is released.
+		t.mu.RLock()
+		peer, isPeer := t.peers[target]
+		rem, isRemote := t.remotes[target]
+		t.mu.RUnlock()
+
+		switch {
+		case isPeer:
+			// Append requests are additionally recorded in the server stats.
+			if msg.Type == raftpb.MsgApp {
+				t.ServerStats.SendAppendReq(msg.Size())
+			}
+			peer.send(msg)
+		case isRemote:
+			rem.send(msg)
+		case t.Logger != nil:
+			t.Logger.Debug(
+				"ignored message send request; unknown remote peer target",
+				zap.String("type", msg.Type.String()),
+				zap.String("unknown-target-peer-id", target.String()),
+			)
+		default:
+			plog.Debugf("ignored message %s (sent to unknown peer %s)", msg.Type, target)
+		}
+	}
+}
+
+// Stop stops every remote and peer, empties both probers, closes the idle
+// connections of round trippers that are *http.Transport, and nils both maps.
+func (t *Transport) Stop() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	for _, rem := range t.remotes {
+		rem.stop()
+	}
+	for _, peer := range t.peers {
+		peer.stop()
+	}
+	t.pipelineProber.RemoveAll()
+	t.streamProber.RemoveAll()
+	for _, rt := range []http.RoundTripper{t.streamRt, t.pipelineRt} {
+		if ht, ok := rt.(*http.Transport); ok {
+			ht.CloseIdleConnections()
+		}
+	}
+	t.peers, t.remotes = nil, nil
+}
+
+// CutPeer pauses the peer and/or the remote registered under id, if any.
+func (t *Transport) CutPeer(id types.ID) {
+	t.mu.RLock()
+	peer, havePeer := t.peers[id]
+	rem, haveRemote := t.remotes[id]
+	t.mu.RUnlock()
+	// The peer must implement Pausable; the unchecked assertion panics otherwise.
+	if havePeer {
+		peer.(Pausable).Pause()
+	}
+	if haveRemote {
+		rem.Pause()
+	}
+}
+
+// MendPeer resumes the peer and/or the remote registered under id, if any.
+// The peer must implement Pausable; the unchecked assertion panics otherwise.
+func (t *Transport) MendPeer(id types.ID) {
+	t.mu.RLock()
+	peer, havePeer := t.peers[id]
+	rem, haveRemote := t.remotes[id]
+	t.mu.RUnlock()
+	if havePeer {
+		peer.(Pausable).Resume()
+	}
+	if haveRemote {
+		rem.Resume()
+	}
+}
+
+// AddRemote starts a remote for id and stores it in t.remotes. The call is a
+// no-op when the remotes map is nil or when id is already a peer or a remote.
+func (t *Transport) AddRemote(id types.ID, us []string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if t.remotes == nil {
+		// there is no clean way to shut down the golang http server before stopping
+		// the transport (see: https://github.com/golang/go/issues/4674); ignore new connections.
+		return
+	}
+	_, isPeer := t.peers[id]
+	_, isRemote := t.remotes[id]
+	if isPeer || isRemote {
+		return
+	}
+	urls, err := types.NewURLs(us)
+	switch {
+	case err == nil: // urls parsed fine
+	case t.Logger != nil:
+		t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
+	default:
+		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+	}
+	t.remotes[id] = startRemote(t, urls, id)
+
+	if t.Logger != nil {
+		t.Logger.Info(
+			"added new remote peer",
+			zap.String("local-member-id", t.ID.String()),
+			zap.String("remote-peer-id", id.String()),
+			zap.Strings("remote-peer-urls", us),
+		)
+	}
+}
+
+// AddPeer starts a peer for id and adds it to both probers. It is a no-op when
+// id is already a peer, and panics when t.peers is nil or the urls are invalid.
+func (t *Transport) AddPeer(id types.ID, us []string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if t.peers == nil {
+		panic("transport stopped")
+	}
+	if _, exists := t.peers[id]; exists {
+		return
+	}
+	urls, err := types.NewURLs(us)
+	switch {
+	case err == nil: // urls parsed fine
+	case t.Logger != nil:
+		t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
+	default:
+		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+	}
+	t.peers[id] = startPeer(t, urls, id, t.LeaderStats.Follower(id.String()))
+	addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
+	addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
+
+	if t.Logger != nil {
+		t.Logger.Info(
+			"added remote peer",
+			zap.String("local-member-id", t.ID.String()),
+			zap.String("remote-peer-id", id.String()),
+			zap.Strings("remote-peer-urls", us),
+		)
+	} else {
+		plog.Infof("added peer %s", id)
+	}
+}
+
+func (t *Transport) RemovePeer(id types.ID) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	t.removePeer(id) // panics when id is not a known peer
+}
+
+func (t *Transport) RemoveAllPeers() {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	for id := range t.peers { // removePeer deletes from t.peers; deleting while ranging is allowed in Go
+		t.removePeer(id)
+	}
+}
+
+// removePeer stops and forgets the peer for id; the caller must hold t.mu.
+func (t *Transport) removePeer(id types.ID) {
+	peer, known := t.peers[id]
+	switch {
+	case known:
+		peer.stop()
+	case t.Logger != nil:
+		t.Logger.Panic("unexpected removal of unknown remote peer", zap.String("remote-peer-id", id.String()))
+	default:
+		plog.Panicf("unexpected removal of unknown peer '%d'", id)
+	}
+	delete(t.peers, id)
+	delete(t.LeaderStats.Followers, id.String())
+	t.pipelineProber.Remove(id.String())
+	t.streamProber.Remove(id.String())
+
+	if t.Logger != nil {
+		t.Logger.Info(
+			"removed remote peer",
+			zap.String("local-member-id", t.ID.String()),
+			zap.String("removed-remote-peer-id", id.String()),
+		)
+	} else {
+		plog.Infof("removed peer %s", id)
+	}
+}
+
+// UpdatePeer gives a known peer new urls and re-adds it to both probers.
+func (t *Transport) UpdatePeer(id types.ID, us []string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	// TODO: return error or just panic?
+	if _, known := t.peers[id]; !known {
+		return // unknown peer: nothing to update
+	}
+	urls, err := types.NewURLs(us)
+	switch {
+	case err == nil: // urls parsed fine
+	case t.Logger != nil:
+		t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
+	default:
+		plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+	}
+	t.peers[id].update(urls)
+	t.pipelineProber.Remove(id.String())
+	addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
+	t.streamProber.Remove(id.String())
+	addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
+
+	if t.Logger != nil {
+		t.Logger.Info(
+			"updated remote peer",
+			zap.String("local-member-id", t.ID.String()),
+			zap.String("updated-remote-peer-id", id.String()),
+			zap.Strings("updated-remote-peer-urls", us),
+		)
+	} else {
+		plog.Infof("updated peer %s", id)
+	}
+}
+
+func (t *Transport) ActiveSince(id types.ID) time.Time {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	if peer, known := t.peers[id]; known {
+		return peer.activeSince()
+	}
+	return time.Time{} // id is not a peer: report the zero time
+}
+
+func (t *Transport) SendSnapshot(m snap.Message) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+	if peer := t.peers[types.ID(m.To)]; peer != nil {
+		peer.sendSnap(m)
+		return
+	}
+	// No peer is registered under m.To: fail the snapshot message instead of sending it.
+	m.CloseWithError(errMemberNotFound)
+}
+
+// Pausable is a testing interface for pausing and resuming transport traffic.
+type Pausable interface {
+	Pause()  // invoked on peers by Transport.CutPeer and Transport.Pause
+	Resume() // invoked on peers by Transport.MendPeer and Transport.Resume
+}
+
+func (t *Transport) Pause() {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	for _, p := range t.peers { // only peers are visited; remotes are left alone
+		p.(Pausable).Pause() // unchecked assertion: panics if a peer is not Pausable
+	}
+}
+
+func (t *Transport) Resume() {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	for _, p := range t.peers { // only peers are visited; remotes are left alone
+		p.(Pausable).Resume() // unchecked assertion: panics if a peer is not Pausable
+	}
+}
+
+// ActivePeers returns the number of peers whose activeSince time is
+// non-zero, i.e. the peers that currently report an active connection.
+// The peers map is read under the read lock.
+func (t *Transport) ActivePeers() (cnt int) {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	for _, peer := range t.peers {
+		if since := peer.activeSince(); !since.IsZero() {
+			cnt++
+		}
+	}
+	return cnt
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/urlpick.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/urlpick.go
new file mode 100644
index 0000000..61ef468
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/urlpick.go
@@ -0,0 +1,57 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"net/url"
+	"sync"
+
+	"go.etcd.io/etcd/pkg/types"
+)
+
+type urlPicker struct {
+	mu     sync.Mutex // guards urls and picked
+	urls   types.URLs // candidate urls to pick from
+	picked int        // index into urls of the url returned by pick
+}
+
+// newURLPicker returns a picker over urls; picked is left at its zero value,
+// so the first url is the one initially picked.
+func newURLPicker(urls types.URLs) *urlPicker {
+	return &urlPicker{urls: urls}
+}
+
+// update swaps in a new url list and restarts picking from its first entry.
+func (p *urlPicker) update(urls types.URLs) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.urls, p.picked = urls, 0
+}
+
+func (p *urlPicker) pick() url.URL {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return p.urls[p.picked] // index-out-of-range panic if urls is empty
+}
+
+// unreachable tells the picker that u could not be reached. If u equals the
+// url currently picked, the picker advances to the next url, wrapping around.
+func (p *urlPicker) unreachable(u url.URL) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if current := p.urls[p.picked]; u == current {
+		p.picked = (p.picked + 1) % len(p.urls)
+	}
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/util.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/util.go
new file mode 100644
index 0000000..2093864
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/util.go
@@ -0,0 +1,190 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+	"fmt"
+	"io"
+	"net"
+	"net/http"
+	"net/url"
+	"strings"
+	"time"
+
+	"go.etcd.io/etcd/pkg/transport"
+	"go.etcd.io/etcd/pkg/types"
+	"go.etcd.io/etcd/version"
+
+	"github.com/coreos/go-semver/semver"
+)
+
+var (
+	errMemberRemoved  = fmt.Errorf("the member has been permanently removed from the cluster")
+	errMemberNotFound = fmt.Errorf("member not found") // used by Transport.SendSnapshot when the target is not a peer
+)
+
+// NewListener returns a listener for raft message transfer between peers,
+// built from u's host and scheme. It is a timeout listener so that broken streams are noticed promptly.
+func NewListener(u url.URL, tlsinfo *transport.TLSInfo) (net.Listener, error) {
+	return transport.NewTimeoutListener(u.Host, u.Scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout)
+}
+
+// NewRoundTripper returns a roundTripper used to send requests
+// to the rafthttp listener of remote peers.
+func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
+	// A timeout transport is used to pair with the remote timeout listeners.
+	// No read/write timeout is set, because the message in a request may
+	// take a long time to write out before the response can be read.
+	return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0)
+}
+
+// newStreamRoundTripper returns a roundTripper used to send stream requests
+// to the rafthttp listener of remote peers.
+// Unlike NewRoundTripper, it sets ConnReadTimeout and ConnWriteTimeout so
+// that a broken connection is found out promptly, which minimizes the
+// number of messages sent on it.
+func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
+	return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout)
+}
+
+// createPostRequest builds a POST request for path on u that carries body, with the
+// content type, sender, version, cluster ID and peer URL headers set; it panics on failure.
+func createPostRequest(u url.URL, path string, body io.Reader, ct string, urls types.URLs, from, cid types.ID) *http.Request {
+	u.Path = path // u was passed by value, so the caller's URL is left untouched
+	req, err := http.NewRequest("POST", u.String(), body)
+	if err != nil {
+		plog.Panicf("unexpected new request error (%v)", err)
+	}
+	hdr := req.Header
+	hdr.Set("Content-Type", ct)
+	hdr.Set("X-Server-From", from.String())
+	hdr.Set("X-Server-Version", version.Version)
+	hdr.Set("X-Min-Cluster-Version", version.MinClusterVersion)
+	hdr.Set("X-Etcd-Cluster-ID", cid.String())
+	setPeerURLsHeader(req, urls)
+	return req
+}
+
+// checkPostResponse maps the response to a raft-message POST onto an error:
+// 204 is success, 403 yields errMemberRemoved, 412 is matched against the known
+// version and cluster ID errors, and any other status is reported as unexpected.
+func checkPostResponse(resp *http.Response, body []byte, req *http.Request, to types.ID) error {
+	switch resp.StatusCode {
+	case http.StatusNoContent:
+		return nil
+	case http.StatusForbidden:
+		return errMemberRemoved
+	case http.StatusPreconditionFailed:
+		// The body is compared with the known error texts after dropping one trailing newline.
+		switch strings.TrimSuffix(string(body), "\n") {
+		case errIncompatibleVersion.Error():
+			plog.Errorf("request sent was ignored by peer %s (server version incompatible)", to)
+			return errIncompatibleVersion
+		case errClusterIDMismatch.Error():
+			plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
+				to, resp.Header.Get("X-Etcd-Cluster-ID"), req.Header.Get("X-Etcd-Cluster-ID"))
+			return errClusterIDMismatch
+		}
+		return fmt.Errorf("unhandled error %q when precondition failed", string(body))
+	}
+	return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
+}
+
+// reportCriticalError reports the given error by attempting a non-blocking
+// send into the given error channel.
+// If the send cannot proceed immediately (e.g. the channel is full), the
+// error is dropped: the fact that an error has happened is already
+// reported, which is good enough.
+func reportCriticalError(err error, errc chan<- error) {
+	select {
+	case errc <- err:
+	default:
+	}
+}
+
+// compareMajorMinorVersion compares a and b by major and minor version only,
+// ignoring every other version field. It returns -1 if a < b, 1 if a > b and
+// 0 if they are equal.
+func compareMajorMinorVersion(a, b *semver.Version) int {
+	lhs := &semver.Version{Major: a.Major, Minor: a.Minor}
+	rhs := &semver.Version{Major: b.Major, Minor: b.Minor}
+	if lhs.LessThan(*rhs) {
+		return -1
+	}
+	if rhs.LessThan(*lhs) {
+		return 1
+	}
+	// Neither side is smaller: equal on major.minor.
+	return 0
+}
+
+// serverVersion parses the X-Server-Version header of h. An empty or missing
+// header is read as "2.0.0" for backward compatibility with etcd 2.0.
+func serverVersion(h http.Header) *semver.Version {
+	raw := h.Get("X-Server-Version")
+	if raw == "" {
+		raw = "2.0.0"
+	}
+	return semver.Must(semver.NewVersion(raw)) // Must panics if raw does not parse
+}
+
+// minClusterVersion parses the X-Min-Cluster-Version header of h. An empty or
+// missing header is read as "2.0.0" for backward compatibility with etcd 2.0.
+func minClusterVersion(h http.Header) *semver.Version {
+	raw := h.Get("X-Min-Cluster-Version")
+	if raw == "" {
+		raw = "2.0.0"
+	}
+	return semver.Must(semver.NewVersion(raw)) // Must panics if raw does not parse
+}
+
+// checkVersionCompatibility compares the remote server and min-cluster versions with the local
+// ones on major.minor; it always returns the local versions, plus an error on a mismatch.
+func checkVersionCompatibility(name string, server, minCluster *semver.Version) (
+	localServer *semver.Version,
+	localMinCluster *semver.Version,
+	err error) {
+	localServer = semver.Must(semver.NewVersion(version.Version))
+	localMinCluster = semver.Must(semver.NewVersion(version.MinClusterVersion))
+	switch {
+	case compareMajorMinorVersion(server, localMinCluster) < 0:
+		err = fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer)
+	case compareMajorMinorVersion(minCluster, localServer) > 0:
+		err = fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer)
+	}
+	return localServer, localMinCluster, err
+}
+
+// setPeerURLsHeader advertises the local peer urls for peer discovery by setting
+// the X-PeerURLs header of req to their comma-separated string forms.
+func setPeerURLsHeader(req *http.Request, urls types.URLs) {
+	if urls == nil {
+		return // often not set in unit tests
+	}
+	joined := make([]string, 0, urls.Len())
+	for i := range urls {
+		joined = append(joined, urls[i].String())
+	}
+	req.Header.Set("X-PeerURLs", strings.Join(joined, ","))
+}
+
+// addRemoteFromRequest adds the sender of r as a remote on tr when r carries a
+// parsable X-Server-From id and a non-empty, comma-separated X-PeerURLs header.
+func addRemoteFromRequest(tr Transporter, r *http.Request) {
+	from, err := types.IDFromString(r.Header.Get("X-Server-From"))
+	if urls := r.Header.Get("X-PeerURLs"); err == nil && urls != "" {
+		tr.AddRemote(from, strings.Split(urls, ","))
+	}
+}