[VOL-2193] Create mocks for Kafka Client and Etcd
This commit consists of:
1) A kafka client mock that implements the kafka client interface
under voltha-lib-go/pkg/kafka/client.go
2) An embedded Etcd server that runs in-process and represents an
Etcd server.
Change-Id: I52a36132568e08c596bb4136918bebcb654a3b99
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/coder.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/coder.go
new file mode 100644
index 0000000..12c3e44
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/coder.go
@@ -0,0 +1,27 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import "go.etcd.io/etcd/raft/raftpb"
+
+// encoder is implemented by the stream codecs in this package that
+// serialize raft messages onto an output stream.
+type encoder interface {
+ // encode encodes the given message to an output stream.
+ encode(m *raftpb.Message) error
+}
+
+// decoder is implemented by the stream codecs in this package that
+// read raft messages back from an input stream.
+type decoder interface {
+ // decode decodes the message from an input stream.
+ decode() (raftpb.Message, error)
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/doc.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/doc.go
new file mode 100644
index 0000000..a9486a8
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/doc.go
@@ -0,0 +1,16 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Package rafthttp implements the HTTP transport layer for the etcd/raft package.
+package rafthttp
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/http.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/http.go
new file mode 100644
index 0000000..d0e0c81
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/http.go
@@ -0,0 +1,577 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "path"
+ "strings"
+ "time"
+
+ "go.etcd.io/etcd/etcdserver/api/snap"
+ pioutil "go.etcd.io/etcd/pkg/ioutil"
+ "go.etcd.io/etcd/pkg/types"
+ "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/version"
+
+ humanize "github.com/dustin/go-humanize"
+ "go.uber.org/zap"
+)
+
+const (
+ // connReadLimitByte limits the number of bytes
+ // a single read can read out.
+ //
+ // 64KB should be large enough to avoid becoming a
+ // throughput bottleneck, and small enough to avoid
+ // causing a read timeout.
+ connReadLimitByte = 64 * 1024
+)
+
+var (
+ // URL path prefixes of the rafthttp endpoints. The probing, stream and
+ // snapshot prefixes are all derived from RaftPrefix.
+ RaftPrefix = "/raft"
+ ProbingPrefix = path.Join(RaftPrefix, "probing")
+ RaftStreamPrefix = path.Join(RaftPrefix, "stream")
+ RaftSnapshotPrefix = path.Join(RaftPrefix, "snapshot")
+
+ // Errors returned by checkClusterCompatibilityFromHeader.
+ errIncompatibleVersion = errors.New("incompatible version")
+ errClusterIDMismatch = errors.New("cluster ID mismatch")
+)
+
+// peerGetter looks up a Peer by member ID. streamHandler treats a nil
+// result as "sender not found".
+type peerGetter interface {
+ Get(id types.ID) Peer
+}
+
+// writerToResponse is implemented by errors that can render themselves
+// into an HTTP response. The handlers in this file check for it on errors
+// returned by Raft.Process.
+type writerToResponse interface {
+ WriteTo(w http.ResponseWriter)
+}
+
+// pipelineHandler serves raft messages that arrive as individual POST
+// requests.
+type pipelineHandler struct {
+ lg *zap.Logger // structured logger; when nil the handler logs through plog
+ localID types.ID // ID of the local member, used in log fields
+ tr Transporter // passed to addRemoteFromRequest for each request
+ r Raft // receives every successfully decoded message via Process
+ cid types.ID // local cluster ID, echoed in X-Etcd-Cluster-ID and compared with the request's
+}
+
+// newPipelineHandler builds the http.Handler that serves pipeline raft
+// messages (the handler meant for RaftPrefix).
+//
+// Every request body is decoded into a raft message and handed to r
+// for processing.
+func newPipelineHandler(t *Transport, r Raft, cid types.ID) http.Handler {
+	h := new(pipelineHandler)
+	h.lg = t.Logger
+	h.localID = t.ID
+	h.tr = t
+	h.r = r
+	h.cid = cid
+	return h
+}
+
+// ServeHTTP handles one pipeline request. Only POST is accepted. Once the
+// version / cluster-ID check on the request headers passes, the body is
+// read, unmarshalled into a raftpb.Message and handed to Raft.Process.
+// On success the response is 204 No Content.
+func (h *pipelineHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "POST" {
+ w.Header().Set("Allow", "POST")
+ http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // The local cluster ID is set on every response past this point,
+ // including the error responses below.
+ w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
+
+ if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
+ http.Error(w, err.Error(), http.StatusPreconditionFailed)
+ return
+ }
+
+ addRemoteFromRequest(h.tr, r)
+
+ // Limit the data size that could be read from the request body, which ensures that read from
+ // connection will not time out accidentally due to possible blocking in underlying implementation.
+ limitedr := pioutil.NewLimitedBufferReader(r.Body, connReadLimitByte)
+ b, err := ioutil.ReadAll(limitedr)
+ if err != nil {
+ if h.lg != nil {
+ h.lg.Warn(
+ "failed to read Raft message",
+ zap.String("local-member-id", h.localID.String()),
+ zap.Error(err),
+ )
+ } else {
+ plog.Errorf("failed to read raft message (%v)", err)
+ }
+ http.Error(w, "error reading raft message", http.StatusBadRequest)
+ recvFailures.WithLabelValues(r.RemoteAddr).Inc()
+ return
+ }
+
+ var m raftpb.Message
+ if err := m.Unmarshal(b); err != nil {
+ if h.lg != nil {
+ h.lg.Warn(
+ "failed to unmarshal Raft message",
+ zap.String("local-member-id", h.localID.String()),
+ zap.Error(err),
+ )
+ } else {
+ plog.Errorf("failed to unmarshal raft message (%v)", err)
+ }
+ http.Error(w, "error unmarshalling raft message", http.StatusBadRequest)
+ recvFailures.WithLabelValues(r.RemoteAddr).Inc()
+ return
+ }
+
+ // Account the payload size against the sender ID carried in the message.
+ receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(len(b)))
+
+ if err := h.r.Process(context.TODO(), m); err != nil {
+ switch v := err.(type) {
+ // Errors implementing writerToResponse render their own HTTP response.
+ case writerToResponse:
+ v.WriteTo(w)
+ default:
+ if h.lg != nil {
+ h.lg.Warn(
+ "failed to process Raft message",
+ zap.String("local-member-id", h.localID.String()),
+ zap.Error(err),
+ )
+ } else {
+ plog.Warningf("failed to process raft message (%v)", err)
+ }
+ http.Error(w, "error processing raft message", http.StatusInternalServerError)
+ // Unchecked type assertion: this itself panics if w does not
+ // implement http.Flusher.
+ w.(http.Flusher).Flush()
+ // disconnect the http stream
+ panic(err)
+ }
+ return
+ }
+
+ // Write StatusNoContent header after the message has been processed by
+ // raft, which facilitates the client to report MsgSnap status.
+ w.WriteHeader(http.StatusNoContent)
+}
+
+// snapshotHandler serves POST requests that carry a snapshot message
+// followed by the database snapshot payload.
+type snapshotHandler struct {
+ lg *zap.Logger // structured logger; when nil the handler logs through plog
+ tr Transporter // passed to addRemoteFromRequest for each request
+ r Raft // receives the decoded snapshot message via Process
+ snapshotter *snap.Snapshotter // persists the incoming database payload (SaveDBFrom)
+
+ localID types.ID // ID of the local member, used in log fields
+ cid types.ID // local cluster ID, echoed in X-Etcd-Cluster-ID and compared with the request's
+}
+
+// newSnapshotHandler builds the http.Handler that receives snapshot
+// messages and stores their database payload through snapshotter.
+func newSnapshotHandler(t *Transport, r Raft, snapshotter *snap.Snapshotter, cid types.ID) http.Handler {
+	h := &snapshotHandler{lg: t.Logger, tr: t, r: r}
+	h.snapshotter = snapshotter
+	h.localID = t.ID
+	h.cid = cid
+	return h
+}
+
+// unknownSnapshotSender is the metric label used for snapshot receive
+// failures that occur before the sender ID has been decoded from the
+// request body.
+const unknownSnapshotSender = "UNKNOWN_SNAPSHOT_SENDER"
+
+// ServeHTTP serves HTTP request to receive and process snapshot message.
+//
+// The request body is expected to hold a length-prefixed raft message of
+// type MsgSnap followed by the database snapshot, which is saved through
+// the snapshotter before the message is handed to Raft.Process. On success
+// the response is 204 No Content.
+//
+// If request sender dies without closing underlying TCP connection,
+// the handler will keep waiting for the request body until TCP keepalive
+// finds out that the connection is broken after several minutes.
+// This is acceptable because
+// 1. snapshot messages sent through other TCP connections could still be
+// received and processed.
+// 2. this case should happen rarely, so no further optimization is done.
+func (h *snapshotHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ start := time.Now()
+
+ if r.Method != "POST" {
+ w.Header().Set("Allow", "POST")
+ http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+ snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
+ return
+ }
+
+ w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
+
+ if err := checkClusterCompatibilityFromHeader(h.lg, h.localID, r.Header, h.cid); err != nil {
+ http.Error(w, err.Error(), http.StatusPreconditionFailed)
+ snapshotReceiveFailures.WithLabelValues(unknownSnapshotSender).Inc()
+ return
+ }
+
+ addRemoteFromRequest(h.tr, r)
+
+ dec := &messageDecoder{r: r.Body}
+ // let snapshots be very large since they can exceed 512MB for large installations
+ // NOTE(review): the limit only bounds the length prefix; decodeLimit
+ // allocates a buffer of the declared length before reading the payload,
+ // so a limit of 1<<63 leaves that allocation effectively unbounded.
+ m, err := dec.decodeLimit(uint64(1 << 63))
+ // from is derived before the error check so that it can be used in the
+ // failure log and metrics below.
+ from := types.ID(m.From).String()
+ if err != nil {
+ msg := fmt.Sprintf("failed to decode raft message (%v)", err)
+ if h.lg != nil {
+ h.lg.Warn(
+ "failed to decode Raft message",
+ zap.String("local-member-id", h.localID.String()),
+ zap.String("remote-snapshot-sender-id", from),
+ zap.Error(err),
+ )
+ } else {
+ plog.Error(msg)
+ }
+ http.Error(w, msg, http.StatusBadRequest)
+ recvFailures.WithLabelValues(r.RemoteAddr).Inc()
+ snapshotReceiveFailures.WithLabelValues(from).Inc()
+ return
+ }
+
+ msgSize := m.Size()
+ receivedBytes.WithLabelValues(from).Add(float64(msgSize))
+
+ // Only MsgSnap is valid on the snapshot endpoint.
+ if m.Type != raftpb.MsgSnap {
+ if h.lg != nil {
+ h.lg.Warn(
+ "unexpected Raft message type",
+ zap.String("local-member-id", h.localID.String()),
+ zap.String("remote-snapshot-sender-id", from),
+ zap.String("message-type", m.Type.String()),
+ )
+ } else {
+ plog.Errorf("unexpected raft message type %s on snapshot path", m.Type)
+ }
+ http.Error(w, "wrong raft message type", http.StatusBadRequest)
+ snapshotReceiveFailures.WithLabelValues(from).Inc()
+ return
+ }
+
+ // Track the in-flight receive for the rest of the request.
+ snapshotReceiveInflights.WithLabelValues(from).Inc()
+ defer func() {
+ snapshotReceiveInflights.WithLabelValues(from).Dec()
+ }()
+
+ if h.lg != nil {
+ h.lg.Info(
+ "receiving database snapshot",
+ zap.String("local-member-id", h.localID.String()),
+ zap.String("remote-snapshot-sender-id", from),
+ zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
+ zap.Int("incoming-snapshot-message-size-bytes", msgSize),
+ zap.String("incoming-snapshot-message-size", humanize.Bytes(uint64(msgSize))),
+ )
+ } else {
+ plog.Infof("receiving database snapshot [index:%d, from %s] ...", m.Snapshot.Metadata.Index, types.ID(m.From))
+ }
+
+ // save incoming database snapshot.
+ // The remainder of the request body (everything after the decoded
+ // message) is passed to the snapshotter.
+ n, err := h.snapshotter.SaveDBFrom(r.Body, m.Snapshot.Metadata.Index)
+ if err != nil {
+ msg := fmt.Sprintf("failed to save KV snapshot (%v)", err)
+ if h.lg != nil {
+ h.lg.Warn(
+ "failed to save incoming database snapshot",
+ zap.String("local-member-id", h.localID.String()),
+ zap.String("remote-snapshot-sender-id", from),
+ zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
+ zap.Error(err),
+ )
+ } else {
+ plog.Error(msg)
+ }
+ http.Error(w, msg, http.StatusInternalServerError)
+ snapshotReceiveFailures.WithLabelValues(from).Inc()
+ return
+ }
+
+ receivedBytes.WithLabelValues(from).Add(float64(n))
+
+ if h.lg != nil {
+ h.lg.Info(
+ "received and saved database snapshot",
+ zap.String("local-member-id", h.localID.String()),
+ zap.String("remote-snapshot-sender-id", from),
+ zap.Uint64("incoming-snapshot-index", m.Snapshot.Metadata.Index),
+ zap.Int64("incoming-snapshot-size-bytes", n),
+ zap.String("incoming-snapshot-size", humanize.Bytes(uint64(n))),
+ )
+ } else {
+ plog.Infof("received and saved database snapshot [index: %d, from: %s] successfully", m.Snapshot.Metadata.Index, types.ID(m.From))
+ }
+
+ if err := h.r.Process(context.TODO(), m); err != nil {
+ switch v := err.(type) {
+ // Process may return writerToResponse error when doing some
+ // additional checks before calling raft.Node.Step.
+ case writerToResponse:
+ v.WriteTo(w)
+ default:
+ msg := fmt.Sprintf("failed to process raft message (%v)", err)
+ if h.lg != nil {
+ h.lg.Warn(
+ "failed to process Raft message",
+ zap.String("local-member-id", h.localID.String()),
+ zap.String("remote-snapshot-sender-id", from),
+ zap.Error(err),
+ )
+ } else {
+ plog.Error(msg)
+ }
+ http.Error(w, msg, http.StatusInternalServerError)
+ snapshotReceiveFailures.WithLabelValues(from).Inc()
+ }
+ return
+ }
+
+ // Write StatusNoContent header after the message has been processed by
+ // raft, which facilitates the client to report MsgSnap status.
+ w.WriteHeader(http.StatusNoContent)
+
+ // Success metrics: receive count and total handling time since start.
+ snapshotReceive.WithLabelValues(from).Inc()
+ snapshotReceiveSeconds.WithLabelValues(from).Observe(time.Since(start).Seconds())
+}
+
+// streamHandler serves the long-lived GET requests through which a remote
+// peer opens a message stream.
+type streamHandler struct {
+ lg *zap.Logger // structured logger; when nil the handler logs through plog
+ tr *Transport // supplies the local member ID and AddRemote
+ peerGetter peerGetter // resolves the sender ID from the URL path to a Peer
+ r Raft // consulted via IsIDRemoved to reject removed members
+ id types.ID // ID the request's X-Raft-To header must match
+ cid types.ID // local cluster ID, echoed in X-Etcd-Cluster-ID and compared with the request's
+}
+
+// newStreamHandler builds the http.Handler that accepts incoming stream
+// requests, looks the sender up through pg and attaches the connection
+// to the matching peer.
+func newStreamHandler(t *Transport, pg peerGetter, r Raft, id, cid types.ID) http.Handler {
+	h := &streamHandler{lg: t.Logger, tr: t}
+	h.peerGetter = pg
+	h.r = r
+	h.id = id
+	h.cid = cid
+	return h
+}
+
+// ServeHTTP accepts a stream request from a remote peer. Only GET is
+// accepted. The directory part of the URL path selects the stream type and
+// the last path element is the sender's member ID. After all checks pass
+// the handler replies 200, attaches the response writer to the sender's
+// Peer as an outgoing connection, and blocks until that connection is
+// closed.
+func (h *streamHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ if r.Method != "GET" {
+ w.Header().Set("Allow", "GET")
+ http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
+ return
+ }
+
+ w.Header().Set("X-Server-Version", version.Version)
+ w.Header().Set("X-Etcd-Cluster-ID", h.cid.String())
+
+ if err := checkClusterCompatibilityFromHeader(h.lg, h.tr.ID, r.Header, h.cid); err != nil {
+ http.Error(w, err.Error(), http.StatusPreconditionFailed)
+ return
+ }
+
+ // Map the request path's directory onto one of the known stream types.
+ var t streamType
+ switch path.Dir(r.URL.Path) {
+ case streamTypeMsgAppV2.endpoint():
+ t = streamTypeMsgAppV2
+ case streamTypeMessage.endpoint():
+ t = streamTypeMessage
+ default:
+ if h.lg != nil {
+ h.lg.Debug(
+ "ignored unexpected streaming request path",
+ zap.String("local-member-id", h.tr.ID.String()),
+ zap.String("remote-peer-id-stream-handler", h.id.String()),
+ zap.String("path", r.URL.Path),
+ )
+ } else {
+ plog.Debugf("ignored unexpected streaming request path %s", r.URL.Path)
+ }
+ http.Error(w, "invalid path", http.StatusNotFound)
+ return
+ }
+
+ // The last path element is the sender's member ID.
+ fromStr := path.Base(r.URL.Path)
+ from, err := types.IDFromString(fromStr)
+ if err != nil {
+ if h.lg != nil {
+ h.lg.Warn(
+ "failed to parse path into ID",
+ zap.String("local-member-id", h.tr.ID.String()),
+ zap.String("remote-peer-id-stream-handler", h.id.String()),
+ zap.String("path", fromStr),
+ zap.Error(err),
+ )
+ } else {
+ plog.Errorf("failed to parse from %s into ID (%v)", fromStr, err)
+ }
+ http.Error(w, "invalid from", http.StatusNotFound)
+ return
+ }
+ if h.r.IsIDRemoved(uint64(from)) {
+ if h.lg != nil {
+ h.lg.Warn(
+ "rejected stream from remote peer because it was removed",
+ zap.String("local-member-id", h.tr.ID.String()),
+ zap.String("remote-peer-id-stream-handler", h.id.String()),
+ zap.String("remote-peer-id-from", from.String()),
+ )
+ } else {
+ plog.Warningf("rejected the stream from peer %s since it was removed", from)
+ }
+ http.Error(w, "removed member", http.StatusGone)
+ return
+ }
+ p := h.peerGetter.Get(from)
+ if p == nil {
+ // This may happen in following cases:
+ // 1. user starts a remote peer that belongs to a different cluster
+ // with the same cluster ID.
+ // 2. local etcd falls behind of the cluster, and cannot recognize
+ // the members that joined after its current progress.
+ if urls := r.Header.Get("X-PeerURLs"); urls != "" {
+ h.tr.AddRemote(from, strings.Split(urls, ","))
+ }
+ if h.lg != nil {
+ h.lg.Warn(
+ "failed to find remote peer in cluster",
+ zap.String("local-member-id", h.tr.ID.String()),
+ zap.String("remote-peer-id-stream-handler", h.id.String()),
+ zap.String("remote-peer-id-from", from.String()),
+ zap.String("cluster-id", h.cid.String()),
+ )
+ } else {
+ plog.Errorf("failed to find member %s in cluster %s", from, h.cid)
+ }
+ http.Error(w, "error sender not found", http.StatusNotFound)
+ return
+ }
+
+ // The request must be addressed to the ID this handler was created for.
+ wto := h.id.String()
+ if gto := r.Header.Get("X-Raft-To"); gto != wto {
+ if h.lg != nil {
+ h.lg.Warn(
+ "ignored streaming request; ID mismatch",
+ zap.String("local-member-id", h.tr.ID.String()),
+ zap.String("remote-peer-id-stream-handler", h.id.String()),
+ zap.String("remote-peer-id-header", gto),
+ zap.String("remote-peer-id-from", from.String()),
+ zap.String("cluster-id", h.cid.String()),
+ )
+ } else {
+ plog.Errorf("streaming request ignored (ID mismatch got %s want %s)", gto, wto)
+ }
+ http.Error(w, "to field mismatch", http.StatusPreconditionFailed)
+ return
+ }
+
+ // Send the 200 status line immediately so the stream is established.
+ // The type assertion is unchecked and panics if w does not implement
+ // http.Flusher.
+ w.WriteHeader(http.StatusOK)
+ w.(http.Flusher).Flush()
+
+ c := newCloseNotifier()
+ conn := &outgoingConn{
+ t: t,
+ Writer: w,
+ Flusher: w.(http.Flusher),
+ Closer: c,
+ localID: h.tr.ID,
+ peerID: h.id,
+ }
+ p.attachOutgoingConn(conn)
+ // Returning from ServeHTTP would end the response, so block here until
+ // the connection's Closer (c) is closed.
+ <-c.closeNotify()
+}
+
+// checkClusterCompatibilityFromHeader checks the cluster compatibility of
+// the local member from the given header.
+// It checks whether the version of local member is compatible with
+// the versions in the header, and whether the cluster ID of local member
+// matches the one in the header.
+func checkClusterCompatibilityFromHeader(lg *zap.Logger, localID types.ID, header http.Header, cid types.ID) error {
+ remoteName := header.Get("X-Server-From")
+
+ remoteServer := serverVersion(header)
+ remoteVs := ""
+ if remoteServer != nil {
+ remoteVs = remoteServer.String()
+ }
+
+ remoteMinClusterVer := minClusterVersion(header)
+ remoteMinClusterVs := ""
+ if remoteMinClusterVer != nil {
+ remoteMinClusterVs = remoteMinClusterVer.String()
+ }
+
+ localServer, localMinCluster, err := checkVersionCompatibility(remoteName, remoteServer, remoteMinClusterVer)
+
+ localVs := ""
+ if localServer != nil {
+ localVs = localServer.String()
+ }
+ localMinClusterVs := ""
+ if localMinCluster != nil {
+ localMinClusterVs = localMinCluster.String()
+ }
+
+ if err != nil {
+ if lg != nil {
+ lg.Warn(
+ "failed to check version compatibility",
+ zap.String("local-member-id", localID.String()),
+ zap.String("local-member-cluster-id", cid.String()),
+ zap.String("local-member-server-version", localVs),
+ zap.String("local-member-server-minimum-cluster-version", localMinClusterVs),
+ zap.String("remote-peer-server-name", remoteName),
+ zap.String("remote-peer-server-version", remoteVs),
+ zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs),
+ zap.Error(err),
+ )
+ } else {
+ plog.Errorf("request version incompatibility (%v)", err)
+ }
+ return errIncompatibleVersion
+ }
+ if gcid := header.Get("X-Etcd-Cluster-ID"); gcid != cid.String() {
+ if lg != nil {
+ lg.Warn(
+ "request cluster ID mismatch",
+ zap.String("local-member-id", localID.String()),
+ zap.String("local-member-cluster-id", cid.String()),
+ zap.String("local-member-server-version", localVs),
+ zap.String("local-member-server-minimum-cluster-version", localMinClusterVs),
+ zap.String("remote-peer-server-name", remoteName),
+ zap.String("remote-peer-server-version", remoteVs),
+ zap.String("remote-peer-server-minimum-cluster-version", remoteMinClusterVs),
+ zap.String("remote-peer-cluster-id", gcid),
+ )
+ } else {
+ plog.Errorf("request cluster ID mismatch (got %s want %s)", gcid, cid)
+ }
+ return errClusterIDMismatch
+ }
+ return nil
+}
+
+// closeNotifier signals closure through a channel: Close closes done, and
+// closeNotify exposes done for receivers to wait on.
+type closeNotifier struct {
+ done chan struct{}
+}
+
+// newCloseNotifier returns a closeNotifier whose done channel is open.
+func newCloseNotifier() *closeNotifier {
+	n := new(closeNotifier)
+	n.done = make(chan struct{})
+	return n
+}
+
+// Close closes the done channel, releasing everything waiting on
+// closeNotify. It always returns nil. Closing an already closed channel
+// panics, so Close must be called at most once.
+func (n *closeNotifier) Close() error {
+ close(n.done)
+ return nil
+}
+
+// closeNotify returns a receive-only view of the done channel, which is
+// closed by Close.
+func (n *closeNotifier) closeNotify() <-chan struct{} { return n.done }
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/metrics.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/metrics.go
new file mode 100644
index 0000000..02fff84
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/metrics.go
@@ -0,0 +1,186 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import "github.com/prometheus/client_golang/prometheus"
+
+// Prometheus collectors of the rafthttp package. All of them live under
+// the "etcd" namespace and "network" subsystem and are registered in init.
+var (
+ // activePeers is a gauge of active peer connections, labeled by
+ // "Local" and "Remote".
+ activePeers = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "active_peers",
+ Help: "The current number of active peer connections.",
+ },
+ []string{"Local", "Remote"},
+ )
+
+ // disconnectedPeers counts peer disconnections, labeled by "Local"
+ // and "Remote".
+ disconnectedPeers = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "disconnected_peers_total",
+ Help: "The total number of disconnected peers.",
+ },
+ []string{"Local", "Remote"},
+ )
+
+ // sentBytes counts bytes sent, labeled by destination ("To").
+ sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "peer_sent_bytes_total",
+ Help: "The total number of bytes sent to peers.",
+ },
+ []string{"To"},
+ )
+
+ // receivedBytes counts bytes received, labeled by sender ("From").
+ receivedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "peer_received_bytes_total",
+ Help: "The total number of bytes received from peers.",
+ },
+ []string{"From"},
+ )
+
+ // sentFailures counts send failures, labeled by destination ("To").
+ sentFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "peer_sent_failures_total",
+ Help: "The total number of send failures from peers.",
+ },
+ []string{"To"},
+ )
+
+ // recvFailures counts receive failures, labeled by sender ("From").
+ recvFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "peer_received_failures_total",
+ Help: "The total number of receive failures from peers.",
+ },
+ []string{"From"},
+ )
+
+ // snapshotSend counts successful snapshot sends, labeled by "To".
+ snapshotSend = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "snapshot_send_success",
+ Help: "Total number of successful snapshot sends",
+ },
+ []string{"To"},
+ )
+
+ // snapshotSendInflights is a gauge of snapshot sends in progress,
+ // labeled by "To".
+ snapshotSendInflights = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "snapshot_send_inflights_total",
+ Help: "Total number of inflight snapshot sends",
+ },
+ []string{"To"},
+ )
+
+ // snapshotSendFailures counts failed snapshot sends, labeled by "To".
+ snapshotSendFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "snapshot_send_failures",
+ Help: "Total number of snapshot send failures",
+ },
+ []string{"To"},
+ )
+
+ // snapshotSendSeconds is a histogram of snapshot send durations in
+ // seconds, labeled by "To".
+ snapshotSendSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "snapshot_send_total_duration_seconds",
+ Help: "Total latency distributions of v3 snapshot sends",
+
+ // lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+ // highest bucket start of 0.1 sec * 2^9 == 51.2 sec
+ Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
+ },
+ []string{"To"},
+ )
+
+ // snapshotReceive counts successful snapshot receives, labeled by
+ // "From".
+ snapshotReceive = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "snapshot_receive_success",
+ Help: "Total number of successful snapshot receives",
+ },
+ []string{"From"},
+ )
+
+ // snapshotReceiveInflights is a gauge of snapshot receives in
+ // progress, labeled by "From".
+ snapshotReceiveInflights = prometheus.NewGaugeVec(prometheus.GaugeOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "snapshot_receive_inflights_total",
+ Help: "Total number of inflight snapshot receives",
+ },
+ []string{"From"},
+ )
+
+ // snapshotReceiveFailures counts failed snapshot receives, labeled by
+ // "From".
+ snapshotReceiveFailures = prometheus.NewCounterVec(prometheus.CounterOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "snapshot_receive_failures",
+ Help: "Total number of snapshot receive failures",
+ },
+ []string{"From"},
+ )
+
+ // snapshotReceiveSeconds is a histogram of snapshot receive durations
+ // in seconds, labeled by "From".
+ snapshotReceiveSeconds = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "snapshot_receive_total_duration_seconds",
+ Help: "Total latency distributions of v3 snapshot receives",
+
+ // lowest bucket start of upper bound 0.1 sec (100 ms) with factor 2
+ // highest bucket start of 0.1 sec * 2^9 == 51.2 sec
+ Buckets: prometheus.ExponentialBuckets(0.1, 2, 10),
+ },
+ []string{"From"},
+ )
+
+ // rttSec is a histogram of peer round-trip times in seconds, labeled
+ // by "To".
+ rttSec = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ Namespace: "etcd",
+ Subsystem: "network",
+ Name: "peer_round_trip_time_seconds",
+ Help: "Round-Trip-Time histogram between peers",
+
+ // lowest bucket start of upper bound 0.0001 sec (0.1 ms) with factor 2
+ // highest bucket start of 0.0001 sec * 2^15 == 3.2768 sec
+ Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16),
+ },
+ []string{"To"},
+ )
+)
+
+// init registers every rafthttp collector with the default Prometheus
+// registry, one at a time and in declaration order. MustRegister panics
+// if a registration fails.
+func init() {
+	collectors := []prometheus.Collector{
+		// peer connection and traffic metrics
+		activePeers,
+		disconnectedPeers,
+		sentBytes,
+		receivedBytes,
+		sentFailures,
+		recvFailures,
+
+		// snapshot send / receive metrics
+		snapshotSend,
+		snapshotSendInflights,
+		snapshotSendFailures,
+		snapshotSendSeconds,
+		snapshotReceive,
+		snapshotReceiveInflights,
+		snapshotReceiveFailures,
+		snapshotReceiveSeconds,
+
+		// round-trip time
+		rttSec,
+	}
+	for _, c := range collectors {
+		prometheus.MustRegister(c)
+	}
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msg_codec.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msg_codec.go
new file mode 100644
index 0000000..2417d22
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msg_codec.go
@@ -0,0 +1,68 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "encoding/binary"
+ "errors"
+ "io"
+
+ "go.etcd.io/etcd/pkg/pbutil"
+ "go.etcd.io/etcd/raft/raftpb"
+)
+
+// messageEncoder is an encoder that can encode all kinds of messages.
+// It MUST be used with a paired messageDecoder.
+type messageEncoder struct {
+ w io.Writer // destination of the size header and the marshalled message
+}
+
+// encode writes m as an 8-byte big-endian size header followed by the
+// marshalled message bytes. The first write error, if any, is returned.
+func (enc *messageEncoder) encode(m *raftpb.Message) error {
+	size := uint64(m.Size())
+	if err := binary.Write(enc.w, binary.BigEndian, size); err != nil {
+		return err
+	}
+	payload := pbutil.MustMarshal(m)
+	if _, err := enc.w.Write(payload); err != nil {
+		return err
+	}
+	return nil
+}
+
+// messageDecoder is a decoder that can decode all kinds of messages.
+// It reads the length-prefixed format written by messageEncoder.
+type messageDecoder struct {
+ r io.Reader // source of the size header and the marshalled message
+}
+
+var (
+ // readBytesLimit is the maximum message size accepted by
+ // messageDecoder.decode.
+ readBytesLimit uint64 = 512 * 1024 * 1024 // 512 MB
+ // ErrExceedSizeLimit is returned by the decoder when the length
+ // declared on the wire is larger than the permitted limit.
+ ErrExceedSizeLimit = errors.New("rafthttp: error limit exceeded")
+)
+
+// decode reads one message from the stream, rejecting any message whose
+// declared size exceeds readBytesLimit.
+func (dec *messageDecoder) decode() (raftpb.Message, error) {
+ return dec.decodeLimit(readBytesLimit)
+}
+
+// decodeLimit reads one length-prefixed raft message from the stream.
+//
+// The wire format is an 8-byte big-endian length followed by that many
+// bytes of marshalled raftpb.Message. ErrExceedSizeLimit is returned when
+// the declared length is larger than numBytes, or too large to be used as
+// a slice length on this platform. Read and unmarshal errors are returned
+// unchanged.
+func (dec *messageDecoder) decodeLimit(numBytes uint64) (raftpb.Message, error) {
+	// maxInt is the largest value of type int. A length above it would turn
+	// negative in int(l) below and make would panic; callers can pass a
+	// numBytes as large as 1<<63, so the limit check alone does not rule
+	// that out.
+	const maxInt = uint64(^uint(0) >> 1)
+
+	var m raftpb.Message
+	var l uint64
+	if err := binary.Read(dec.r, binary.BigEndian, &l); err != nil {
+		return m, err
+	}
+	if l > numBytes || l > maxInt {
+		return m, ErrExceedSizeLimit
+	}
+	buf := make([]byte, int(l))
+	if _, err := io.ReadFull(dec.r, buf); err != nil {
+		return m, err
+	}
+	return m, m.Unmarshal(buf)
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msgappv2_codec.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msgappv2_codec.go
new file mode 100644
index 0000000..1fa36de
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/msgappv2_codec.go
@@ -0,0 +1,248 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+ "time"
+
+ stats "go.etcd.io/etcd/etcdserver/api/v2stats"
+ "go.etcd.io/etcd/pkg/pbutil"
+ "go.etcd.io/etcd/pkg/types"
+ "go.etcd.io/etcd/raft/raftpb"
+)
+
+const (
+ // One-byte type tags that prefix every frame on a msgappv2 stream.
+ msgTypeLinkHeartbeat uint8 = 0
+ msgTypeAppEntries uint8 = 1
+ msgTypeApp uint8 = 2
+
+ // msgAppV2BufSize is the size of the reusable scratch buffer held by
+ // the encoder and decoder; entries smaller than this are (un)marshalled
+ // through it instead of a fresh allocation.
+ msgAppV2BufSize = 1024 * 1024
+)
+
+// msgappv2 stream sends three types of message: linkHeartbeatMessage,
+// AppEntries and MsgApp. AppEntries is the MsgApp that is sent in
+// replicate state in raft, whose index and term are fully predictable.
+//
+// Data format of linkHeartbeatMessage:
+// | offset | bytes | description |
+// +--------+-------+-------------+
+// | 0 | 1 | \x00 |
+//
+// Data format of AppEntries:
+// | offset | bytes | description |
+// +--------+-------+-------------+
+// | 0 | 1 | \x01 |
+// | 1 | 8 | length of entries |
+// | 9 | 8 | length of first entry |
+// | 17 | n1 | first entry |
+// ...
+// | x | 8 | length of k-th entry data |
+// | x+8 | nk | k-th entry data |
+// | x+8+nk | 8 | commit index |
+//
+// Data format of MsgApp:
+// | offset | bytes | description |
+// +--------+-------+-------------+
+// | 0 | 1 | \x02 |
+// | 1 | 8 | length of encoded message |
+// | 9 | n | encoded message |
+type msgAppV2Encoder struct {
+ w io.Writer // destination stream
+ fs *stats.FollowerStats // Succ is called with the encode duration after AppEntries and MsgApp frames
+
+ // term and index are updated after each MsgApp frame and index is
+ // advanced per entry of an AppEntries frame; encode compares them with
+ // the next message to decide whether the AppEntries form can be used.
+ term uint64
+ index uint64
+ buf []byte // scratch buffer for marshalling entries smaller than msgAppV2BufSize
+ uint64buf []byte // 8-byte scratch for big-endian lengths and the commit index
+ uint8buf []byte // 1-byte scratch for the frame type tag
+}
+
+// newMsgAppV2Encoder returns an encoder that writes msgappv2 frames to w
+// and reports successful sends to fs. The scratch buffers are allocated
+// once here; term and index start at zero.
+func newMsgAppV2Encoder(w io.Writer, fs *stats.FollowerStats) *msgAppV2Encoder {
+	enc := &msgAppV2Encoder{w: w, fs: fs}
+	enc.buf = make([]byte, msgAppV2BufSize)
+	enc.uint64buf = make([]byte, 8)
+	enc.uint8buf = make([]byte, 1)
+	return enc
+}
+
+// encode writes m to the stream in one of the three msgappv2 frame
+// formats and returns the first write error encountered:
+// - a link heartbeat is sent as the single type byte;
+// - a message that continues exactly from the tracked index/term is sent
+// as AppEntries (entries plus commit index only);
+// - anything else is sent as a full MsgApp, after which the tracked term
+// and index are reset from the message.
+func (enc *msgAppV2Encoder) encode(m *raftpb.Message) error {
+ start := time.Now()
+ switch {
+ case isLinkHeartbeatMessage(m):
+ enc.uint8buf[0] = msgTypeLinkHeartbeat
+ if _, err := enc.w.Write(enc.uint8buf); err != nil {
+ return err
+ }
+ // Compact form: index and term match what was sent last, so the
+ // receiver can reconstruct them from its own tracked state.
+ case enc.index == m.Index && enc.term == m.LogTerm && m.LogTerm == m.Term:
+ enc.uint8buf[0] = msgTypeAppEntries
+ if _, err := enc.w.Write(enc.uint8buf); err != nil {
+ return err
+ }
+ // write length of entries
+ binary.BigEndian.PutUint64(enc.uint64buf, uint64(len(m.Entries)))
+ if _, err := enc.w.Write(enc.uint64buf); err != nil {
+ return err
+ }
+ for i := 0; i < len(m.Entries); i++ {
+ // write length of entry
+ binary.BigEndian.PutUint64(enc.uint64buf, uint64(m.Entries[i].Size()))
+ if _, err := enc.w.Write(enc.uint64buf); err != nil {
+ return err
+ }
+ // Entries smaller than the scratch buffer are marshalled into it;
+ // larger ones are marshalled into a fresh slice by MustMarshal.
+ if n := m.Entries[i].Size(); n < msgAppV2BufSize {
+ if _, err := m.Entries[i].MarshalTo(enc.buf); err != nil {
+ return err
+ }
+ if _, err := enc.w.Write(enc.buf[:n]); err != nil {
+ return err
+ }
+ } else {
+ if _, err := enc.w.Write(pbutil.MustMarshal(&m.Entries[i])); err != nil {
+ return err
+ }
+ }
+ // Advance the tracked index once per entry written.
+ enc.index++
+ }
+ // write commit index
+ binary.BigEndian.PutUint64(enc.uint64buf, m.Commit)
+ if _, err := enc.w.Write(enc.uint64buf); err != nil {
+ return err
+ }
+ enc.fs.Succ(time.Since(start))
+ default:
+ if err := binary.Write(enc.w, binary.BigEndian, msgTypeApp); err != nil {
+ return err
+ }
+ // write size of message
+ if err := binary.Write(enc.w, binary.BigEndian, uint64(m.Size())); err != nil {
+ return err
+ }
+ // write message
+ if _, err := enc.w.Write(pbutil.MustMarshal(m)); err != nil {
+ return err
+ }
+
+ // Resynchronise the tracked state: term from the message, index from
+ // its last entry when it carries any, otherwise from m.Index.
+ enc.term = m.Term
+ enc.index = m.Index
+ if l := len(m.Entries); l > 0 {
+ enc.index = m.Entries[l-1].Index
+ }
+ enc.fs.Succ(time.Since(start))
+ }
+ return nil
+}
+
+// msgAppV2Decoder reads the frames written by msgAppV2Encoder and turns
+// them back into raft messages.
+type msgAppV2Decoder struct {
+ r io.Reader // source stream
+ local, remote types.ID // used as To / From of messages rebuilt from AppEntries frames
+
+ // term and index mirror the encoder's tracked state: they are reset
+ // from each MsgApp frame and index is advanced per decoded entry.
+ term uint64
+ index uint64
+ buf []byte // scratch buffer for entries smaller than msgAppV2BufSize
+ uint64buf []byte // 8-byte scratch for big-endian lengths and the commit index
+ uint8buf []byte // 1-byte scratch for the frame type tag
+}
+
+// newMsgAppV2Decoder returns a decoder that reads msgappv2 frames from r.
+// local and remote are recorded for rebuilding messages from AppEntries
+// frames. The scratch buffers are allocated once here; term and index
+// start at zero.
+func newMsgAppV2Decoder(r io.Reader, local, remote types.ID) *msgAppV2Decoder {
+	dec := &msgAppV2Decoder{r: r, local: local, remote: remote}
+	dec.buf = make([]byte, msgAppV2BufSize)
+	dec.uint64buf = make([]byte, 8)
+	dec.uint8buf = make([]byte, 1)
+	return dec
+}
+
+// decode reads one frame from the stream and returns the raft message it
+// represents. The first byte selects the frame type: a link heartbeat
+// yields linkHeartbeatMessage, AppEntries is rebuilt into a MsgApp using
+// the decoder's tracked term/index, and MsgApp is unmarshalled in full and
+// resets the tracked state. Read errors are returned as-is; an unknown
+// type byte yields a formatted error.
+func (dec *msgAppV2Decoder) decode() (raftpb.Message, error) {
+ var (
+ m raftpb.Message
+ typ uint8
+ )
+ if _, err := io.ReadFull(dec.r, dec.uint8buf); err != nil {
+ return m, err
+ }
+ typ = dec.uint8buf[0]
+ switch typ {
+ case msgTypeLinkHeartbeat:
+ return linkHeartbeatMessage, nil
+ case msgTypeAppEntries:
+ // The compact frame omits these fields; fill them in from the
+ // decoder's own state.
+ m = raftpb.Message{
+ Type: raftpb.MsgApp,
+ From: uint64(dec.remote),
+ To: uint64(dec.local),
+ Term: dec.term,
+ LogTerm: dec.term,
+ Index: dec.index,
+ }
+
+ // decode entries
+ if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
+ return m, err
+ }
+ // NOTE(review): l and size below come straight off the wire and are
+ // used as allocation sizes without an upper bound.
+ l := binary.BigEndian.Uint64(dec.uint64buf)
+ m.Entries = make([]raftpb.Entry, int(l))
+ for i := 0; i < int(l); i++ {
+ if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
+ return m, err
+ }
+ size := binary.BigEndian.Uint64(dec.uint64buf)
+ var buf []byte
+ // Small entries reuse the scratch buffer; larger ones get their own.
+ if size < msgAppV2BufSize {
+ buf = dec.buf[:size]
+ if _, err := io.ReadFull(dec.r, buf); err != nil {
+ return m, err
+ }
+ } else {
+ buf = make([]byte, int(size))
+ if _, err := io.ReadFull(dec.r, buf); err != nil {
+ return m, err
+ }
+ }
+ dec.index++
+ // 1 alloc
+ // NOTE(review): the Must prefix suggests this panics on malformed
+ // input rather than returning an error — confirm in pbutil.
+ pbutil.MustUnmarshal(&m.Entries[i], buf)
+ }
+ // decode commit index
+ if _, err := io.ReadFull(dec.r, dec.uint64buf); err != nil {
+ return m, err
+ }
+ m.Commit = binary.BigEndian.Uint64(dec.uint64buf)
+ case msgTypeApp:
+ var size uint64
+ if err := binary.Read(dec.r, binary.BigEndian, &size); err != nil {
+ return m, err
+ }
+ buf := make([]byte, int(size))
+ if _, err := io.ReadFull(dec.r, buf); err != nil {
+ return m, err
+ }
+ pbutil.MustUnmarshal(&m, buf)
+
+ // Resynchronise the tracked state: term from the message, index from
+ // its last entry when it carries any, otherwise from m.Index.
+ dec.term = m.Term
+ dec.index = m.Index
+ if l := len(m.Entries); l > 0 {
+ dec.index = m.Entries[l-1].Index
+ }
+ default:
+ return m, fmt.Errorf("failed to parse type %d in msgappv2 stream", typ)
+ }
+ return m, nil
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer.go
new file mode 100644
index 0000000..8130c4a
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer.go
@@ -0,0 +1,374 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "go.etcd.io/etcd/etcdserver/api/snap"
+ stats "go.etcd.io/etcd/etcdserver/api/v2stats"
+ "go.etcd.io/etcd/pkg/types"
+ "go.etcd.io/etcd/raft"
+ "go.etcd.io/etcd/raft/raftpb"
+
+ "go.uber.org/zap"
+ "golang.org/x/time/rate"
+)
+
+const (
+ // ConnReadTimeout and ConnWriteTimeout are the i/o timeouts set on each connection the rafthttp pkg creates.
+ // A 5-second timeout is good enough for recycling bad connections. Otherwise we would have to wait for
+ // tcp keepalive to detect a bad connection, which takes minutes.
+ // For long term streaming connections, rafthttp pkg sends application level linkHeartbeatMessage
+ // to keep the connection alive.
+ // For short term pipeline connections, the connection MUST be killed to avoid it being
+ // put back to http pkg connection pool.
+ ConnReadTimeout = 5 * time.Second
+ ConnWriteTimeout = 5 * time.Second
+
+ recvBufSize = 4096 // capacity of each peer's recvc channel
+ // maxPendingProposals holds the proposals during one leader election process.
+ // Generally one leader election takes at most 1 sec. It should have
+ // 0-2 election conflicts, and each one takes 0.5 sec.
+ // We assume the number of concurrent proposers is smaller than 4096.
+ // One client blocks on its proposal for at least 1 sec, so 4096 is enough
+ // to hold all proposals.
+ maxPendingProposals = 4096
+
+ streamAppV2 = "streamMsgAppV2" // name pick reports for the msgAppV2 stream writer
+ streamMsg = "streamMsg" // name pick reports for the general stream writer
+ pipelineMsg = "pipeline" // name pick reports for the pipeline; also the failure source used by pipeline.handle
+ sendSnap = "sendMsgSnap" // failure source used by snapshotSender.send
+)
+
+// Peer describes the operations available on a remote raft peer.
+type Peer interface {
+ // send sends the message to the remote peer. The function is non-blocking
+ // and has no promise that the message will be received by the remote.
+ // When it fails to send the message out, it will report the status to the
+ // underlying raft.
+ send(m raftpb.Message)
+
+ // sendSnap sends the merged snapshot message to the remote peer; it behaves like send.
+ sendSnap(m snap.Message)
+
+ // update updates the urls of remote peer.
+ update(urls types.URLs)
+
+ // attachOutgoingConn attaches the outgoing connection to the peer for
+ // stream usage. After the call, the ownership of the outgoing
+ // connection hands over to the peer. The peer will close the connection
+ // when it is no longer used.
+ attachOutgoingConn(conn *outgoingConn)
+ // activeSince returns the time that the connection with the
+ // peer becomes active.
+ activeSince() time.Time
+ // stop performs any necessary finalization and terminates the peer
+ // gracefully.
+ stop()
+}
+
+// peer is the representative of a remote raft node. Local raft node sends
+// messages to the remote through peer.
+// Each peer has two underlying mechanisms to send out a message: stream and
+// pipeline.
+// A stream is a receiver initialized long-polling connection, which
+// is always open to transfer messages. Besides general stream, peer also has
+// an optimized stream for sending msgApp since msgApp accounts for a large part
+// of all messages. Only raft leader uses the optimized stream to send msgApp
+// to the remote follower node.
+// A pipeline is a series of http clients that send http requests to the remote.
+// It is only used when the stream has not been established.
+type peer struct {
+ lg *zap.Logger // structured logger; when nil the package-level plog is used instead
+
+ localID types.ID // id of the local member, copied from the Transport
+ // id of the remote raft peer node
+ id types.ID
+
+ r Raft // receives processed messages and unreachable/snapshot reports
+
+ status *peerStatus // active/inactive state shared with the writers, pipeline, snapshot sender and readers
+
+ picker *urlPicker // url picker shared with the pipeline, snapshot sender and readers
+
+ msgAppV2Writer *streamWriter // preferred by pick for MsgApp messages
+ writer *streamWriter // general stream writer
+ pipeline *pipeline // used for MsgSnap and when neither stream writer offers a channel
+ snapSender *snapshotSender // snapshot sender to send v3 snapshot messages
+ msgAppV2Reader *streamReader
+ msgAppReader *streamReader
+
+ recvc chan raftpb.Message // drained by a goroutine started in startPeer
+ propc chan raftpb.Message // drained separately from recvc because Process may block on proposals
+
+ mu sync.Mutex // guards paused
+ paused bool // when true, send discards messages
+
+ cancel context.CancelFunc // cancel pending works in go routine created by peer.
+ stopc chan struct{} // closed by stop to end the processing goroutines
+}
+
+// startPeer creates the peer for peerID and brings up everything it needs: the
+// pipeline, both stream writers, the snapshot sender, both stream readers and
+// two goroutines that feed received messages into t.Raft. Messages from recvc
+// and propc are processed on separate goroutines; a failed Process call is
+// logged through t.Logger when it is set and through plog otherwise.
+func startPeer(t *Transport, urls types.URLs, peerID types.ID, fs *stats.FollowerStats) *peer {
+	if t.Logger != nil {
+		t.Logger.Info("starting remote peer", zap.String("remote-peer-id", peerID.String()))
+	} else {
+		plog.Infof("starting peer %s...", peerID)
+	}
+	defer func() {
+		if t.Logger != nil {
+			t.Logger.Info("started remote peer", zap.String("remote-peer-id", peerID.String()))
+		} else {
+			plog.Infof("started peer %s", peerID)
+		}
+	}()
+
+	status := newPeerStatus(t.Logger, t.ID, peerID)
+	picker := newURLPicker(urls)
+	errorc := t.ErrorC
+	r := t.Raft
+	pipeline := &pipeline{
+		peerID:        peerID,
+		tr:            t,
+		picker:        picker,
+		status:        status,
+		followerStats: fs,
+		raft:          r,
+		errorc:        errorc,
+	}
+	pipeline.start()
+
+	p := &peer{
+		lg:             t.Logger,
+		localID:        t.ID,
+		id:             peerID,
+		r:              r,
+		status:         status,
+		picker:         picker,
+		msgAppV2Writer: startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
+		writer:         startStreamWriter(t.Logger, t.ID, peerID, status, fs, r),
+		pipeline:       pipeline,
+		snapSender:     newSnapshotSender(t, picker, peerID, status),
+		recvc:          make(chan raftpb.Message, recvBufSize),
+		propc:          make(chan raftpb.Message, maxPendingProposals),
+		stopc:          make(chan struct{}),
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	p.cancel = cancel
+
+	// process hands every message arriving on msgc to raft until the peer is
+	// stopped. Both receive loops share it so that a failed Process call is
+	// reported the same way for proposals as for other messages: through
+	// t.Logger when it is configured, through plog otherwise.
+	process := func(msgc <-chan raftpb.Message) {
+		for {
+			select {
+			case mm := <-msgc:
+				if err := r.Process(ctx, mm); err != nil {
+					if t.Logger != nil {
+						t.Logger.Warn("failed to process Raft message", zap.Error(err))
+					} else {
+						plog.Warningf("failed to process raft message (%v)", err)
+					}
+				}
+			case <-p.stopc:
+				return
+			}
+		}
+	}
+	go process(p.recvc)
+
+	// r.Process might block for processing proposal when there is no leader.
+	// Thus propc must be put into a separate routine with recvc to avoid blocking
+	// processing other raft messages.
+	go process(p.propc)
+
+	p.msgAppV2Reader = &streamReader{
+		lg:     t.Logger,
+		peerID: peerID,
+		typ:    streamTypeMsgAppV2,
+		tr:     t,
+		picker: picker,
+		status: status,
+		recvc:  p.recvc,
+		propc:  p.propc,
+		rl:     rate.NewLimiter(t.DialRetryFrequency, 1),
+	}
+	p.msgAppReader = &streamReader{
+		lg:     t.Logger,
+		peerID: peerID,
+		typ:    streamTypeMessage,
+		tr:     t,
+		picker: picker,
+		status: status,
+		recvc:  p.recvc,
+		propc:  p.propc,
+		rl:     rate.NewLimiter(t.DialRetryFrequency, 1),
+	}
+
+	p.msgAppV2Reader.start()
+	p.msgAppReader.start()
+
+	return p
+}
+
+// send hands m to the channel chosen by pick without blocking. While the
+// peer is paused the message is discarded silently. When the chosen channel
+// is full the message is dropped instead: raft is told that the remote is
+// unreachable (and, for MsgSnap, that the snapshot failed), the drop is
+// logged and the sentFailures counter for m.To is incremented. Nothing is
+// returned to the caller in either case.
+func (p *peer) send(m raftpb.Message) {
+	p.mu.Lock()
+	paused := p.paused
+	p.mu.Unlock()
+
+	if paused {
+		return
+	}
+
+	writec, name := p.pick(m)
+	select {
+	case writec <- m:
+	default:
+		p.r.ReportUnreachable(m.To)
+		if isMsgSnap(m) {
+			p.r.ReportSnapshot(m.To, raft.SnapshotFailure)
+		}
+
+		// Sample the status exactly once: the peer can change state between
+		// two isActive calls, and the "remote-peer-active" field must describe
+		// the same observation that selects the plog level below.
+		active := p.status.isActive()
+		switch {
+		case p.lg != nil:
+			p.lg.Warn(
+				"dropped internal Raft message since sending buffer is full (overloaded network)",
+				zap.String("message-type", m.Type.String()),
+				zap.String("local-member-id", p.localID.String()),
+				zap.String("from", types.ID(m.From).String()),
+				zap.String("remote-peer-id", p.id.String()),
+				zap.Bool("remote-peer-active", active),
+			)
+		case active:
+			plog.MergeWarningf("dropped internal raft message to %s since %s's sending buffer is full (bad/overloaded network)", p.id, name)
+		default:
+			// the peer is inactive: without a zap logger this is only reported at debug level.
+			plog.Debugf("dropped %s to %s since %s's sending buffer is full", m.Type, p.id, name)
+		}
+		sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
+	}
+}
+
+func (p *peer) sendSnap(m snap.Message) {
+ go p.snapSender.send(m) // sent from a new goroutine, so sendSnap itself returns immediately
+}
+
+func (p *peer) update(urls types.URLs) {
+ p.picker.update(urls) // forwards the urls to the picker shared with the pipeline, snapshot sender and stream readers
+}
+
+func (p *peer) attachOutgoingConn(conn *outgoingConn) { // hands conn to the stream writer matching conn.t
+ var ok bool
+ switch conn.t {
+ case streamTypeMsgAppV2:
+ ok = p.msgAppV2Writer.attach(conn)
+ case streamTypeMessage:
+ ok = p.writer.attach(conn)
+ default: // any other stream type is treated as a programming error
+ if p.lg != nil {
+ p.lg.Panic("unknown stream type", zap.String("type", conn.t.String()))
+ } else {
+ plog.Panicf("unhandled stream type %s", conn.t)
+ }
+ }
+ if !ok { // the writer did not take the connection, so close it here
+ conn.Close()
+ }
+}
+
+func (p *peer) activeSince() time.Time { return p.status.activeSince() } // delegates to the peer's status
+
+// Pause pauses the peer: while paused, send discards every message it is
+// given without reporting an error, and both stream readers are paused.
+func (p *peer) Pause() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ p.paused = true
+ p.msgAppReader.pause()
+ p.msgAppV2Reader.pause()
+}
+
+// Resume resumes a paused peer: send accepts messages again and both stream readers are resumed.
+func (p *peer) Resume() {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ p.paused = false
+ p.msgAppReader.resume()
+ p.msgAppV2Reader.resume()
+}
+
+// stop shuts the peer down: it closes stopc, cancels the processing context and
+// then stops the stream writers, the pipeline, the snapshot sender and the readers.
+func (p *peer) stop() {
+	if p.lg == nil {
+		plog.Infof("stopping peer %s...", p.id)
+	} else {
+		p.lg.Info("stopping remote peer", zap.String("remote-peer-id", p.id.String()))
+	}
+	defer func() {
+		if p.lg == nil {
+			plog.Infof("stopped peer %s", p.id)
+		} else {
+			p.lg.Info("stopped remote peer", zap.String("remote-peer-id", p.id.String()))
+		}
+	}()
+	close(p.stopc)
+	p.cancel()
+	p.msgAppV2Writer.stop()
+	p.writer.stop()
+	p.pipeline.stop()
+	p.snapSender.stop()
+	p.msgAppV2Reader.stop()
+	p.msgAppReader.stop()
+}
+
+// pick chooses the channel m should be written to and returns it together
+// with the name of the chosen sender (pipelineMsg, streamAppV2 or streamMsg).
+func (p *peer) pick(m raftpb.Message) (writec chan<- raftpb.Message, picked string) {
+	// MsgSnap can be huge (e.g. 1G) and would block a stream for a long time, so it always uses the pipeline.
+	if isMsgSnap(m) {
+		return p.pipeline.msgc, pipelineMsg
+	}
+	if c, ok := p.msgAppV2Writer.writec(); ok && isMsgApp(m) {
+		return c, streamAppV2
+	}
+	if c, ok := p.writer.writec(); ok {
+		return c, streamMsg
+	}
+	return p.pipeline.msgc, pipelineMsg
+}
+
+func isMsgApp(m raftpb.Message) bool { return m.Type == raftpb.MsgApp } // reports whether m is a MsgApp
+
+func isMsgSnap(m raftpb.Message) bool { return m.Type == raftpb.MsgSnap } // reports whether m is a MsgSnap
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer_status.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer_status.go
new file mode 100644
index 0000000..66149ff
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/peer_status.go
@@ -0,0 +1,96 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "go.etcd.io/etcd/pkg/types"
+
+ "go.uber.org/zap"
+)
+
+type failureType struct { // names the sender that failed and what it was doing; used in deactivate's message
+ source string // sender name, e.g. pipelineMsg or sendSnap
+ action string // attempted operation, e.g. "write" or "post"
+}
+
+type peerStatus struct { // tracks whether a remote peer is currently considered active
+ lg *zap.Logger // when nil, plog is used
+ local types.ID // local member id, used as a metrics label
+ id types.ID // remote peer id
+ mu sync.Mutex // protect variables below
+ active bool
+ since time.Time // time of the last transition to active; zero while inactive
+}
+
+func newPeerStatus(lg *zap.Logger, local, id types.ID) *peerStatus {
+ return &peerStatus{lg: lg, local: local, id: id} // starts inactive with a zero since
+}
+
+// activate marks the peer active; only a call made while inactive logs, stamps since and bumps activePeers.
+func (s *peerStatus) activate() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.active {
+		return
+	}
+	if s.lg == nil {
+		plog.Infof("peer %s became active", s.id)
+	} else {
+		s.lg.Info("peer became active", zap.String("peer-id", s.id.String()))
+	}
+	s.active, s.since = true, time.Now()
+	activePeers.WithLabelValues(s.local.String(), s.id.String()).Inc()
+}
+
+// deactivate marks the peer inactive after the given failure. A call made while active logs the failure, clears
+// since and updates the activePeers/disconnectedPeers metrics; further calls only log at debug level (zap only).
+func (s *peerStatus) deactivate(failure failureType, reason string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	msg := fmt.Sprintf("failed to %s %s on %s (%s)", failure.action, s.id, failure.source, reason)
+	if s.active {
+		if s.lg != nil {
+			s.lg.Warn("peer became inactive (message send to peer failed)", zap.String("peer-id", s.id.String()), zap.Error(errors.New(msg)))
+		} else {
+			// msg embeds reason verbatim, so pass it as an argument: used as the format string, any '%' in it would be misread.
+			plog.Errorf("%s", msg)
+			plog.Infof("peer %s became inactive (message send to peer failed)", s.id)
+		}
+		s.active, s.since = false, time.Time{}
+		activePeers.WithLabelValues(s.local.String(), s.id.String()).Dec()
+		disconnectedPeers.WithLabelValues(s.local.String(), s.id.String()).Inc()
+		return
+	}
+	if s.lg != nil {
+		s.lg.Debug("peer deactivated again", zap.String("peer-id", s.id.String()), zap.Error(errors.New(msg)))
+	}
+}
+
+func (s *peerStatus) isActive() bool { // reports the current active flag, read under mu
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.active
+}
+
+func (s *peerStatus) activeSince() time.Time { // returns since, which is the zero time while the peer is inactive
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ return s.since
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/pipeline.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/pipeline.go
new file mode 100644
index 0000000..70f9257
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/pipeline.go
@@ -0,0 +1,180 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "io/ioutil"
+ "sync"
+ "time"
+
+ stats "go.etcd.io/etcd/etcdserver/api/v2stats"
+ "go.etcd.io/etcd/pkg/pbutil"
+ "go.etcd.io/etcd/pkg/types"
+ "go.etcd.io/etcd/raft"
+ "go.etcd.io/etcd/raft/raftpb"
+
+ "go.uber.org/zap"
+)
+
+const (
+ connPerPipeline = 4 // number of handle goroutines started per pipeline
+ // pipelineBufSize is the size of pipeline buffer, which helps hold the
+ // temporary network latency.
+ // The size ensures that pipeline does not drop messages when the network
+ // is out of work for less than 1 second in good path.
+ pipelineBufSize = 64
+)
+
+var errStopped = errors.New("stopped") // returned by snapshotSender.post when its stopc is closed
+
+type pipeline struct { // sends raft messages to one remote peer as individual POST requests
+ peerID types.ID
+
+ tr *Transport
+ picker *urlPicker
+ status *peerStatus
+ raft Raft
+ errorc chan error // receives critical errors such as errMemberRemoved via reportCriticalError
+ // deprecate when we deprecate v2 API
+ followerStats *stats.FollowerStats
+
+ msgc chan raftpb.Message // buffered with pipelineBufSize; consumed by the handle goroutines
+ // wait for the handling routines
+ wg sync.WaitGroup
+ stopc chan struct{} // closed by stop
+}
+
+// start allocates the stop and message channels, launches connPerPipeline handle goroutines and logs the start.
+func (p *pipeline) start() {
+	p.stopc = make(chan struct{})
+	p.msgc = make(chan raftpb.Message, pipelineBufSize)
+	p.wg.Add(connPerPipeline)
+	for n := connPerPipeline; n > 0; n-- {
+		go p.handle()
+	}
+	if p.tr == nil || p.tr.Logger == nil {
+		plog.Infof("started HTTP pipelining with peer %s", p.peerID)
+		return
+	}
+	p.tr.Logger.Info(
+		"started HTTP pipelining with remote peer",
+		zap.String("local-member-id", p.tr.ID.String()),
+		zap.String("remote-peer-id", p.peerID.String()),
+	)
+}
+
+// stop closes stopc, waits for every handle goroutine to return and then logs that pipelining stopped.
+func (p *pipeline) stop() {
+	close(p.stopc)
+	p.wg.Wait()
+	if p.tr == nil || p.tr.Logger == nil {
+		plog.Infof("stopped HTTP pipelining with peer %s", p.peerID)
+		return
+	}
+	p.tr.Logger.Info(
+		"stopped HTTP pipelining with remote peer",
+		zap.String("local-member-id", p.tr.ID.String()),
+		zap.String("remote-peer-id", p.peerID.String()),
+	)
+}
+
+// handle is the body of one pipeline worker. It posts every message taken
+// from msgc and reports the outcome to the peer status, the follower stats,
+// raft and the metrics; it returns once stopc is closed.
+func (p *pipeline) handle() {
+	defer p.wg.Done()
+	for {
+		select {
+		case <-p.stopc:
+			return
+		case m := <-p.msgc:
+			began := time.Now()
+			err := p.post(pbutil.MustMarshal(&m))
+			elapsed := time.Since(began)
+			trackStats := m.Type == raftpb.MsgApp && p.followerStats != nil
+			if err == nil {
+				p.status.activate()
+				if trackStats {
+					p.followerStats.Succ(elapsed)
+				}
+				if isMsgSnap(m) {
+					p.raft.ReportSnapshot(m.To, raft.SnapshotFinish)
+				}
+				sentBytes.WithLabelValues(types.ID(m.To).String()).Add(float64(m.Size()))
+				continue
+			}
+			p.status.deactivate(failureType{source: pipelineMsg, action: "write"}, err.Error())
+			if trackStats {
+				p.followerStats.Fail()
+			}
+			p.raft.ReportUnreachable(m.To)
+			if isMsgSnap(m) {
+				p.raft.ReportSnapshot(m.To, raft.SnapshotFailure)
+			}
+			sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
+		}
+	}
+}
+
+// post POSTs a data payload to the url chosen by the picker. It returns nil if the POST succeeds and the
+// response passes checkPostResponse; on any failure it marks the url unreachable and returns the error.
+func (p *pipeline) post(data []byte) (err error) {
+	u := p.picker.pick()
+	req := createPostRequest(u, RaftPrefix, bytes.NewBuffer(data), "application/protobuf", p.tr.URLs, p.tr.ID, p.tr.ClusterID)
+
+	done := make(chan struct{}, 1)
+	ctx, cancel := context.WithCancel(context.Background())
+	// Release the context on every return path, not only when the pipeline is stopped mid-request.
+	// It runs after the deferred Body.Close below, and calling cancel more than once is harmless.
+	defer cancel()
+	req = req.WithContext(ctx)
+	go func() {
+		select {
+		case <-done:
+		case <-p.stopc:
+			waitSchedule()
+			cancel()
+		}
+	}()
+
+	resp, err := p.tr.pipelineRt.RoundTrip(req)
+	done <- struct{}{}
+	if err != nil {
+		p.picker.unreachable(u)
+		return err
+	}
+	defer resp.Body.Close()
+	b, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		p.picker.unreachable(u)
+		return err
+	}
+	if err = checkPostResponse(resp, b, req, p.peerID); err != nil {
+		p.picker.unreachable(u)
+		// errMemberRemoved is a critical error since a removed member should
+		// always be stopped. So we use reportCriticalError to report it to errorc.
+		if err == errMemberRemoved {
+			reportCriticalError(err, p.errorc)
+		}
+		return err
+	}
+	return nil
+}
+
+// waitSchedule sleeps for one millisecond to give other goroutines a chance to be scheduled.
+func waitSchedule() { time.Sleep(time.Millisecond) }
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/probing_status.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/probing_status.go
new file mode 100644
index 0000000..474d9a0
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/probing_status.go
@@ -0,0 +1,104 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "time"
+
+ "github.com/prometheus/client_golang/prometheus"
+ "github.com/xiang90/probing"
+ "go.uber.org/zap"
+)
+
+const (
+ // RoundTripperNameRaftMessage is the name of the round-tripper that sends
+ // all Raft messages other than "snap.Message".
+ RoundTripperNameRaftMessage = "ROUND_TRIPPER_RAFT_MESSAGE"
+ // RoundTripperNameSnapshot is the name of the round-tripper that sends merged snapshot messages.
+ RoundTripperNameSnapshot = "ROUND_TRIPPER_SNAPSHOT"
+)
+
+var (
+ // proberInterval must be shorter than the read timeout;
+ // otherwise the connection will time out.
+ proberInterval = ConnReadTimeout - time.Second
+ statusMonitoringInterval = 30 * time.Second // delay before the next status check after a healthy one
+ statusErrorInterval = 5 * time.Second // delay before the first check and after an unhealthy one
+)
+
+// addPeerToProber registers the peer's probing endpoints (every url in us with ProbingPrefix appended) with p under
+// id and starts a goroutine monitoring the status. If the status cannot be fetched, the error is logged and nothing is monitored.
+func addPeerToProber(lg *zap.Logger, p probing.Prober, id string, us []string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
+	hus := make([]string, len(us))
+	for i := range us {
+		hus[i] = us[i] + ProbingPrefix
+	}
+	p.AddHTTP(id, proberInterval, hus)
+
+	s, err := p.Status(id)
+	if err != nil {
+		if lg != nil {
+			lg.Warn("failed to add peer into prober", zap.String("remote-peer-id", id), zap.Error(err))
+		} else {
+			plog.Errorf("failed to add peer %s into prober (%v)", id, err)
+		}
+		return
+	}
+	go monitorProbingStatus(lg, s, id, roundTripperName, rttSecProm)
+}
+
+// monitorProbingStatus checks s once per interval until s.StopNotify fires. Each round it warns when the peer is
+// unhealthy or its clock differs by more than one second, and observes s.SRTT() in rttSecProm under the id label.
+func monitorProbingStatus(lg *zap.Logger, s probing.Status, id string, roundTripperName string, rttSecProm *prometheus.HistogramVec) {
+	interval := statusErrorInterval // set the first interval short to log error early.
+	for {
+		select {
+		case <-s.StopNotify():
+			return
+		case <-time.After(interval):
+		}
+		if s.Health() {
+			interval = statusMonitoringInterval
+		} else {
+			interval = statusErrorInterval
+			if lg != nil {
+				lg.Warn(
+					"prober detected unhealthy status",
+					zap.String("round-tripper-name", roundTripperName),
+					zap.String("remote-peer-id", id),
+					zap.Duration("rtt", s.SRTT()),
+					zap.Error(s.Err()),
+				)
+			} else {
+				plog.Warningf("health check for peer %s could not connect: %v", id, s.Err())
+			}
+		}
+		if s.ClockDiff() > time.Second {
+			if lg != nil {
+				lg.Warn(
+					"prober found high clock drift",
+					zap.String("round-tripper-name", roundTripperName),
+					zap.String("remote-peer-id", id),
+					zap.Duration("clock-drift", s.ClockDiff()),
+					zap.Duration("rtt", s.SRTT()),
+					zap.Error(s.Err()),
+				)
+			} else {
+				plog.Warningf("the clock difference against peer %s is too high [%v > %v]", id, s.ClockDiff(), time.Second)
+			}
+		}
+		rttSecProm.WithLabelValues(id).Observe(s.SRTT().Seconds())
+	}
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/remote.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/remote.go
new file mode 100644
index 0000000..1ef2493
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/remote.go
@@ -0,0 +1,99 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "go.etcd.io/etcd/pkg/types"
+ "go.etcd.io/etcd/raft/raftpb"
+
+ "go.uber.org/zap"
+)
+
+type remote struct { // a remote member that is only reached through a pipeline
+ lg *zap.Logger // when nil, plog is used
+ localID types.ID
+ id types.ID // id of the remote member
+ status *peerStatus
+ pipeline *pipeline
+}
+
+// startRemote builds the remote for id, whose only sender is a pipeline, and starts that pipeline before returning.
+func startRemote(tr *Transport, urls types.URLs, id types.ID) *remote {
+	picker := newURLPicker(urls)
+	status := newPeerStatus(tr.Logger, tr.ID, id)
+	rm := &remote{
+		lg:      tr.Logger,
+		localID: tr.ID,
+		id:      id,
+		status:  status,
+		pipeline: &pipeline{
+			peerID: id,
+			tr:     tr,
+			picker: picker,
+			status: status,
+			raft:   tr.Raft,
+			errorc: tr.ErrorC,
+		},
+	}
+	rm.pipeline.start()
+	return rm
+}
+
+// send queues m on the pipeline without blocking. When the pipeline buffer is
+// full the message is dropped, the drop is logged and the sentFailures
+// counter for m.To is incremented.
+func (g *remote) send(m raftpb.Message) {
+	select {
+	case g.pipeline.msgc <- m:
+		return
+	default:
+	}
+
+	// Sample the status once so the message chosen below and the logged
+	// "remote-peer-active" field cannot disagree.
+	active := g.status.isActive()
+	switch {
+	case g.lg != nil:
+		msg := "dropped Raft message since sending buffer is full (overloaded network)"
+		if active {
+			msg = "dropped internal Raft message since sending buffer is full (overloaded network)"
+		}
+		g.lg.Warn(
+			msg,
+			zap.String("message-type", m.Type.String()),
+			zap.String("local-member-id", g.localID.String()),
+			zap.String("from", types.ID(m.From).String()),
+			zap.String("remote-peer-id", g.id.String()),
+			zap.Bool("remote-peer-active", active),
+		)
+	case active:
+		plog.MergeWarningf("dropped internal raft message to %s since sending buffer is full (bad/overloaded network)", g.id)
+	default:
+		plog.Debugf("dropped %s to %s since sending buffer is full", m.Type, g.id)
+	}
+	sentFailures.WithLabelValues(types.ID(m.To).String()).Inc()
+}
+
+func (g *remote) stop() {
+ g.pipeline.stop() // stopping the remote only stops its pipeline
+}
+
+func (g *remote) Pause() {
+ g.stop() // pausing is implemented as stopping the pipeline
+}
+
+func (g *remote) Resume() {
+ g.pipeline.start() // start allocates fresh channels and launches new handle goroutines
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/snapshot_sender.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/snapshot_sender.go
new file mode 100644
index 0000000..62efb0c
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/snapshot_sender.go
@@ -0,0 +1,207 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "bytes"
+ "context"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "time"
+
+ "go.etcd.io/etcd/etcdserver/api/snap"
+ "go.etcd.io/etcd/pkg/httputil"
+ pioutil "go.etcd.io/etcd/pkg/ioutil"
+ "go.etcd.io/etcd/pkg/types"
+ "go.etcd.io/etcd/raft"
+
+ "github.com/dustin/go-humanize"
+ "go.uber.org/zap"
+)
+
+var (
+ // snapResponseReadTimeout is the delay after which post closes a snapshot response whose body is still being read
+ snapResponseReadTimeout = 5 * time.Second
+)
+
+type snapshotSender struct { // posts merged snapshot messages to one remote peer
+ from, to types.ID // local member id and remote peer id
+ cid types.ID // cluster id, copied from the Transport
+
+ tr *Transport
+ picker *urlPicker
+ status *peerStatus
+ r Raft // receives unreachable and snapshot status reports
+ errorc chan error // receives errMemberRemoved via reportCriticalError
+
+ stopc chan struct{} // closed by stop
+}
+
+func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *peerStatus) *snapshotSender {
+ return &snapshotSender{ // ids, raft and the error channel are all taken from the Transport
+ from: tr.ID,
+ to: to,
+ cid: tr.ClusterID,
+ tr: tr,
+ picker: picker,
+ status: status,
+ r: tr.Raft,
+ errorc: tr.ErrorC,
+ stopc: make(chan struct{}),
+ }
+}
+
+func (s *snapshotSender) stop() { close(s.stopc) } // unblocks post calls that are waiting on stopc
+
+// send posts the merged snapshot to the remote peer and reports the outcome.
+// On failure it marks the url unreachable, deactivates the peer status, tells
+// raft the peer is unreachable and the snapshot failed, and bumps the failure
+// metrics. On success it activates the status, reports SnapshotFinish and
+// records the sent bytes and the elapsed time.
+func (s *snapshotSender) send(merged snap.Message) {
+	began := time.Now()
+	msg := merged.Message
+	to := types.ID(msg.To).String()
+
+	body := createSnapBody(s.tr.Logger, merged)
+	defer body.Close()
+
+	u := s.picker.pick()
+	req := createPostRequest(u, RaftSnapshotPrefix, body, "application/octet-stream", s.tr.URLs, s.from, s.cid)
+	if s.tr.Logger == nil {
+		plog.Infof("start to send database snapshot [index: %d, to %s]...", msg.Snapshot.Metadata.Index, types.ID(msg.To))
+	} else {
+		s.tr.Logger.Info(
+			"sending database snapshot",
+			zap.Uint64("snapshot-index", msg.Snapshot.Metadata.Index),
+			zap.String("remote-peer-id", to),
+			zap.Int64("bytes", merged.TotalSize),
+			zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
+		)
+	}
+
+	snapshotSendInflights.WithLabelValues(to).Inc()
+	defer func() {
+		snapshotSendInflights.WithLabelValues(to).Dec()
+	}()
+
+	err := s.post(req)
+	defer merged.CloseWithError(err)
+	if err != nil {
+		if s.tr.Logger == nil {
+			plog.Warningf("database snapshot [index: %d, to: %s] failed to be sent out (%v)", msg.Snapshot.Metadata.Index, types.ID(msg.To), err)
+		} else {
+			s.tr.Logger.Warn(
+				"failed to send database snapshot",
+				zap.Uint64("snapshot-index", msg.Snapshot.Metadata.Index),
+				zap.String("remote-peer-id", to),
+				zap.Int64("bytes", merged.TotalSize),
+				zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
+				zap.Error(err),
+			)
+		}
+		// errMemberRemoved is critical because a removed member should always
+		// be stopped, so it is forwarded to errorc through reportCriticalError.
+		if err == errMemberRemoved {
+			reportCriticalError(err, s.errorc)
+		}
+		s.picker.unreachable(u)
+		s.status.deactivate(failureType{source: sendSnap, action: "post"}, err.Error())
+		s.r.ReportUnreachable(msg.To)
+		// Report SnapshotFailure to the raft state machine. Once it knows
+		// about the failure it pauses for a while and then retries with a
+		// new snapshot message.
+		s.r.ReportSnapshot(msg.To, raft.SnapshotFailure)
+		sentFailures.WithLabelValues(to).Inc()
+		snapshotSendFailures.WithLabelValues(to).Inc()
+		return
+	}
+
+	s.status.activate()
+	s.r.ReportSnapshot(msg.To, raft.SnapshotFinish)
+	if s.tr.Logger == nil {
+		plog.Infof("database snapshot [index: %d, to: %s] sent out successfully", msg.Snapshot.Metadata.Index, types.ID(msg.To))
+	} else {
+		s.tr.Logger.Info(
+			"sent database snapshot",
+			zap.Uint64("snapshot-index", msg.Snapshot.Metadata.Index),
+			zap.String("remote-peer-id", to),
+			zap.Int64("bytes", merged.TotalSize),
+			zap.String("size", humanize.Bytes(uint64(merged.TotalSize))),
+		)
+	}
+	sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
+	snapshotSend.WithLabelValues(to).Inc()
+	snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(began).Seconds())
+}
+
+// post sends req through the transport's pipeline round-tripper and waits for either the response
+// or a stop. It returns nil when the request is sent out and processed successfully.
+func (s *snapshotSender) post(req *http.Request) (err error) {
+ ctx, cancel := context.WithCancel(context.Background())
+ req = req.WithContext(ctx)
+ defer cancel() // cancels the request context on every return, including the errStopped path
+
+ type responseAndError struct {
+ resp *http.Response
+ body []byte
+ err error
+ }
+ result := make(chan responseAndError, 1) // buffered so the goroutine can deliver even after post has returned
+
+ go func() {
+ resp, err := s.tr.pipelineRt.RoundTrip(req)
+ if err != nil {
+ result <- responseAndError{resp, nil, err}
+ return
+ }
+
+ // close the response body when the timeout fires.
+ // this prevents reading the body forever when the other side dies right after
+ // it successfully receives the request body.
+ time.AfterFunc(snapResponseReadTimeout, func() { httputil.GracefulClose(resp) })
+ body, err := ioutil.ReadAll(resp.Body)
+ result <- responseAndError{resp, body, err}
+ }()
+
+ select {
+ case <-s.stopc:
+ return errStopped
+ case r := <-result:
+ if r.err != nil {
+ return r.err
+ }
+ return checkPostResponse(r.resp, r.body, req, s.to)
+ }
+}
+
+// createSnapBody returns a reader that yields the encoded raft message followed by the contents of
+// merged.ReadCloser; closing the result closes merged.ReadCloser. An encoding failure panics.
+func createSnapBody(lg *zap.Logger, merged snap.Message) io.ReadCloser {
+	header := new(bytes.Buffer)
+	// encode raft message
+	if err := (&messageEncoder{w: header}).encode(&merged.Message); err != nil {
+		if lg == nil {
+			plog.Panicf("encode message error (%v)", err)
+		} else {
+			lg.Panic("failed to encode message", zap.Error(err))
+		}
+	}
+	return &pioutil.ReaderAndCloser{
+		Reader: io.MultiReader(header, merged.ReadCloser),
+		Closer: merged.ReadCloser,
+	}
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/stream.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/stream.go
new file mode 100644
index 0000000..dcb2223
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/stream.go
@@ -0,0 +1,746 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "context"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "net/http"
+ "path"
+ "strings"
+ "sync"
+ "time"
+
+ stats "go.etcd.io/etcd/etcdserver/api/v2stats"
+ "go.etcd.io/etcd/pkg/httputil"
+ "go.etcd.io/etcd/pkg/transport"
+ "go.etcd.io/etcd/pkg/types"
+ "go.etcd.io/etcd/raft/raftpb"
+ "go.etcd.io/etcd/version"
+
+ "github.com/coreos/go-semver/semver"
+ "go.uber.org/zap"
+ "golang.org/x/time/rate"
+)
+
+const (
+ // Known stream types; each selects the encoder/decoder pair used on a
+ // stream connection and the endpoint the stream is served at.
+ streamTypeMessage streamType = "message"
+ streamTypeMsgAppV2 streamType = "msgappv2"
+
+ // streamBufSize is the capacity of a stream writer's message channel.
+ streamBufSize = 4096
+)
+
+var (
+ // errUnsupportedStreamType is returned by streamReader.dial when the
+ // remote's version is older than the local one and does not list the
+ // requested stream type in supportedStream.
+ errUnsupportedStreamType = fmt.Errorf("unsupported stream type")
+
+ // supportedStream lists, per server version, the stream types that version
+ // supports; it is consulted by checkStreamSupport.
+ // the key is in string format "major.minor.patch"
+ supportedStream = map[string][]streamType{
+ "2.0.0": {},
+ "2.1.0": {streamTypeMsgAppV2, streamTypeMessage},
+ "2.2.0": {streamTypeMsgAppV2, streamTypeMessage},
+ "2.3.0": {streamTypeMsgAppV2, streamTypeMessage},
+ "3.0.0": {streamTypeMsgAppV2, streamTypeMessage},
+ "3.1.0": {streamTypeMsgAppV2, streamTypeMessage},
+ "3.2.0": {streamTypeMsgAppV2, streamTypeMessage},
+ "3.3.0": {streamTypeMsgAppV2, streamTypeMessage},
+ }
+)
+
+// streamType names the kind of stream carried by a connection; see the
+// streamType* constants for the known values.
+type streamType string
+
+func (t streamType) endpoint() string {
+ switch t {
+ case streamTypeMsgAppV2:
+ return path.Join(RaftStreamPrefix, "msgapp")
+ case streamTypeMessage:
+ return path.Join(RaftStreamPrefix, "message")
+ default:
+ plog.Panicf("unhandled stream type %v", t)
+ return ""
+ }
+}
+
+// String returns a human-readable name for t, or "unknown stream" when t is
+// not one of the known stream types.
+func (t streamType) String() string {
+	if t == streamTypeMsgAppV2 {
+		return "stream MsgApp v2"
+	}
+	if t == streamTypeMessage {
+		return "stream Message"
+	}
+	return "unknown stream"
+}
+
+var (
+ // linkHeartbeatMessage is a special message used as the heartbeat message
+ // of the link layer. It never conflicts with messages from raft because
+ // raft doesn't send out messages without From and To fields.
+ linkHeartbeatMessage = raftpb.Message{Type: raftpb.MsgHeartbeat}
+)
+
+// isLinkHeartbeatMessage reports whether m is a link-layer heartbeat, i.e. a
+// MsgHeartbeat whose From and To fields are both zero.
+func isLinkHeartbeatMessage(m *raftpb.Message) bool {
+	if m.Type != raftpb.MsgHeartbeat {
+		return false
+	}
+	return m.From == 0 && m.To == 0
+}
+
+// outgoingConn bundles everything a streamWriter needs to use an outgoing
+// stream connection: the stream type, the means to write, flush and close it,
+// and the IDs of both ends.
+type outgoingConn struct {
+ t streamType // selects the encoder the writer uses on this connection
+ io.Writer
+ http.Flusher
+ io.Closer
+
+ localID types.ID
+ peerID types.ID
+}
+
+// streamWriter writes messages to the attached outgoingConn.
+type streamWriter struct {
+ lg *zap.Logger // optional; the package logger plog is used when nil
+
+ localID types.ID
+ peerID types.ID
+
+ status *peerStatus // activated on attach, deactivated on write/heartbeat failure
+ fs *stats.FollowerStats // handed to the MsgApp v2 encoder
+ r Raft // notified through ReportUnreachable when messages cannot be delivered
+
+ mu sync.Mutex // guards closer, working and msgc
+ closer io.Closer // closes the currently attached connection
+ working bool // true while a connection is attached
+
+ msgc chan raftpb.Message // buffered messages waiting to be written; replaced on close
+ connc chan *outgoingConn // hands a new connection to run
+ stopc chan struct{} // closed by stop to terminate run
+ done chan struct{} // closed by run when it has terminated
+}
+
+// startStreamWriter creates a streamWriter and starts a long-running goroutine
+// that accepts messages and writes them to the attached outgoing connection.
+func startStreamWriter(lg *zap.Logger, local, id types.ID, status *peerStatus, fs *stats.FollowerStats, r Raft) *streamWriter {
+	sw := new(streamWriter)
+	sw.lg = lg
+	sw.localID, sw.peerID = local, id
+	sw.status, sw.fs, sw.r = status, fs, r
+
+	// Only the message channel is buffered; the others are used purely for
+	// hand-off and signalling.
+	sw.msgc = make(chan raftpb.Message, streamBufSize)
+	sw.connc = make(chan *outgoingConn)
+	sw.stopc = make(chan struct{})
+	sw.done = make(chan struct{})
+
+	go sw.run()
+	return sw
+}
+
+// run is the event loop of the stream writer. It reacts to four events:
+//
+//   - a heartbeat tick: a link heartbeat is encoded and flushed;
+//   - a queued message: it is encoded, and flushed once the queue is drained
+//     or enough messages have been batched;
+//   - a newly attached connection: any previous connection is closed and the
+//     encoder, flusher and closer are switched over to the new one;
+//   - stop: the connection is closed, done is closed, and run returns.
+//
+// Until a connection is attached, and again after a write or heartbeat
+// failure, the local heartbeatc and msgc channels are nil, which disables
+// their select cases.
+func (cw *streamWriter) run() {
+	var (
+		msgc       chan raftpb.Message
+		heartbeatc <-chan time.Time
+		t          streamType
+		enc        encoder
+		flusher    http.Flusher
+		batched    int
+	)
+	tickc := time.NewTicker(ConnReadTimeout / 3)
+	defer tickc.Stop()
+	unflushed := 0
+
+	if cw.lg != nil {
+		cw.lg.Info(
+			"started stream writer with remote peer",
+			zap.String("local-member-id", cw.localID.String()),
+			zap.String("remote-peer-id", cw.peerID.String()),
+		)
+	} else {
+		plog.Infof("started streaming with peer %s (writer)", cw.peerID)
+	}
+
+	for {
+		select {
+		case <-heartbeatc:
+			err := enc.encode(&linkHeartbeatMessage)
+			unflushed += linkHeartbeatMessage.Size()
+			if err == nil {
+				flusher.Flush()
+				batched = 0
+				sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
+				unflushed = 0
+				continue
+			}
+
+			cw.status.deactivate(failureType{source: t.String(), action: "heartbeat"}, err.Error())
+
+			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
+			cw.close()
+			if cw.lg != nil {
+				cw.lg.Warn(
+					"lost TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("local-member-id", cw.localID.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+			}
+			// Stop heartbeating and consuming messages until a new
+			// connection is attached.
+			heartbeatc, msgc = nil, nil
+
+		case m := <-msgc:
+			err := enc.encode(&m)
+			if err == nil {
+				unflushed += m.Size()
+
+				// Flush when nothing else is queued, or when the batch has
+				// grown past half of the buffer size.
+				if len(msgc) == 0 || batched > streamBufSize/2 {
+					flusher.Flush()
+					sentBytes.WithLabelValues(cw.peerID.String()).Add(float64(unflushed))
+					unflushed = 0
+					batched = 0
+				} else {
+					batched++
+				}
+
+				continue
+			}
+
+			cw.status.deactivate(failureType{source: t.String(), action: "write"}, err.Error())
+			cw.close()
+			if cw.lg != nil {
+				cw.lg.Warn(
+					"lost TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("local-member-id", cw.localID.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Warningf("lost the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+			}
+			heartbeatc, msgc = nil, nil
+			cw.r.ReportUnreachable(m.To)
+			sentFailures.WithLabelValues(cw.peerID.String()).Inc()
+
+		case conn := <-cw.connc:
+			cw.mu.Lock()
+			// Replace whatever connection was attached before.
+			closed := cw.closeUnlocked()
+			t = conn.t
+			switch conn.t {
+			case streamTypeMsgAppV2:
+				enc = newMsgAppV2Encoder(conn.Writer, cw.fs)
+			case streamTypeMessage:
+				enc = &messageEncoder{w: conn.Writer}
+			default:
+				plog.Panicf("unhandled stream type %s", conn.t)
+			}
+			if cw.lg != nil {
+				cw.lg.Info(
+					"set message encoder",
+					zap.String("from", conn.localID.String()),
+					zap.String("to", conn.peerID.String()),
+					zap.String("stream-type", t.String()),
+				)
+			}
+			flusher = conn.Flusher
+			unflushed = 0
+			cw.status.activate()
+			cw.closer = conn.Closer
+			cw.working = true
+			cw.mu.Unlock()
+
+			if closed {
+				if cw.lg != nil {
+					cw.lg.Warn(
+						"closed TCP streaming connection with remote peer",
+						zap.String("stream-writer-type", t.String()),
+						zap.String("local-member-id", cw.localID.String()),
+						zap.String("remote-peer-id", cw.peerID.String()),
+					)
+				} else {
+					plog.Warningf("closed an existing TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+				}
+			}
+			// A successfully established connection is an informational
+			// event: log it at Info level, as the plog branch below and the
+			// stream reader already do.
+			if cw.lg != nil {
+				cw.lg.Info(
+					"established TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("local-member-id", cw.localID.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Infof("established a TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+			}
+			// cw.msgc is read after closeUnlocked, which may have replaced it.
+			heartbeatc, msgc = tickc.C, cw.msgc
+
+		case <-cw.stopc:
+			if cw.close() {
+				if cw.lg != nil {
+					cw.lg.Warn(
+						"closed TCP streaming connection with remote peer",
+						zap.String("stream-writer-type", t.String()),
+						zap.String("remote-peer-id", cw.peerID.String()),
+					)
+				} else {
+					plog.Infof("closed the TCP streaming connection with peer %s (%s writer)", cw.peerID, t)
+				}
+			}
+			if cw.lg != nil {
+				cw.lg.Warn(
+					"stopped TCP streaming connection with remote peer",
+					zap.String("stream-writer-type", t.String()),
+					zap.String("remote-peer-id", cw.peerID.String()),
+				)
+			} else {
+				plog.Infof("stopped streaming with peer %s (writer)", cw.peerID)
+			}
+			close(cw.done)
+			return
+		}
+	}
+}
+
+// writec returns the channel on which messages for this writer are queued,
+// together with a flag telling whether a connection is currently attached.
+// Both values are read under the writer's mutex.
+func (cw *streamWriter) writec() (chan<- raftpb.Message, bool) {
+	cw.mu.Lock()
+	msgc, working := cw.msgc, cw.working
+	cw.mu.Unlock()
+	return msgc, working
+}
+
+// close closes the attached connection, if any, while holding the writer's
+// mutex. It reports whether a connection was actually closed.
+func (cw *streamWriter) close() bool {
+ cw.mu.Lock()
+ defer cw.mu.Unlock()
+ return cw.closeUnlocked()
+}
+
+// closeUnlocked closes the attached connection; the caller must hold cw.mu.
+// It returns false without doing anything when no connection is attached.
+// Otherwise it closes the connection (a close error is only logged), reports
+// the peer as unreachable if messages are still queued, replaces the message
+// channel with a fresh one, marks the writer as not working and returns true.
+func (cw *streamWriter) closeUnlocked() bool {
+	if !cw.working {
+		return false
+	}
+
+	err := cw.closer.Close()
+	switch {
+	case err == nil:
+		// closed cleanly
+	case cw.lg != nil:
+		cw.lg.Warn(
+			"failed to close connection with remote peer",
+			zap.String("remote-peer-id", cw.peerID.String()),
+			zap.Error(err),
+		)
+	default:
+		plog.Errorf("peer %s (writer) connection close error: %v", cw.peerID, err)
+	}
+
+	// Messages still sitting in the old channel will never be written.
+	if pending := len(cw.msgc); pending > 0 {
+		cw.r.ReportUnreachable(uint64(cw.peerID))
+	}
+	cw.msgc = make(chan raftpb.Message, streamBufSize)
+	cw.working = false
+	return true
+}
+
+// attach hands conn over to the writer's run loop. It blocks until the loop
+// accepts the connection (returning true) or the writer has terminated
+// (returning false).
+func (cw *streamWriter) attach(conn *outgoingConn) bool {
+ select {
+ case cw.connc <- conn:
+ return true
+ case <-cw.done:
+ return false
+ }
+}
+
+// stop signals the run loop to terminate and waits until it has done so.
+// It closes stopc, so it must not be called more than once.
+func (cw *streamWriter) stop() {
+ close(cw.stopc)
+ <-cw.done
+}
+
+// streamReader is a long-running go-routine that dials to the remote stream
+// endpoint and reads messages from the response body returned.
+type streamReader struct {
+ lg *zap.Logger // optional; the package logger plog is used when nil
+
+ peerID types.ID
+ typ streamType // type of stream this reader dials and decodes
+
+ tr *Transport
+ picker *urlPicker // supplies the peer URL to dial
+ status *peerStatus
+ recvc chan<- raftpb.Message // receives decoded messages other than MsgProp
+ propc chan<- raftpb.Message // receives decoded MsgProp messages
+
+ rl *rate.Limiter // alters the frequency of dial retrial attempts
+
+ errorc chan<- error // defaults to tr.ErrorC when left nil before start
+
+ mu sync.Mutex // held around accesses to paused and closer
+ paused bool // while true, decoded messages are discarded
+ closer io.Closer // body of the stream currently being decoded, if any
+
+ ctx context.Context // cancelled by stop
+ cancel context.CancelFunc
+ done chan struct{} // closed by run when it has terminated
+}
+
+// start fills in the defaults that were not set by the creator (errorc and
+// the cancellable context) and launches the reader's run loop in a new
+// goroutine.
+func (cr *streamReader) start() {
+ cr.done = make(chan struct{})
+ if cr.errorc == nil {
+ cr.errorc = cr.tr.ErrorC
+ }
+ if cr.ctx == nil {
+ cr.ctx, cr.cancel = context.WithCancel(context.Background())
+ }
+ go cr.run()
+}
+
+// run is the reader's main loop: it dials the remote stream endpoint, decodes
+// messages from the connection until it breaks, waits on the rate limiter and
+// starts over. The loop ends, closing cr.done, once cr.ctx has been cancelled.
+func (cr *streamReader) run() {
+ t := cr.typ
+
+ if cr.lg != nil {
+ cr.lg.Info(
+ "started stream reader with remote peer",
+ zap.String("stream-reader-type", t.String()),
+ zap.String("local-member-id", cr.tr.ID.String()),
+ zap.String("remote-peer-id", cr.peerID.String()),
+ )
+ } else {
+ plog.Infof("started streaming with peer %s (%s reader)", cr.peerID, t)
+ }
+
+ for {
+ rc, err := cr.dial(t)
+ if err != nil {
+ // An unsupported stream type does not mark the peer as inactive.
+ if err != errUnsupportedStreamType {
+ cr.status.deactivate(failureType{source: t.String(), action: "dial"}, err.Error())
+ }
+ } else {
+ cr.status.activate()
+ if cr.lg != nil {
+ cr.lg.Info(
+ "established TCP streaming connection with remote peer",
+ zap.String("stream-reader-type", cr.typ.String()),
+ zap.String("local-member-id", cr.tr.ID.String()),
+ zap.String("remote-peer-id", cr.peerID.String()),
+ )
+ } else {
+ plog.Infof("established a TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
+ }
+ // decodeLoop only returns with a non-nil error.
+ err = cr.decodeLoop(rc, t)
+ if cr.lg != nil {
+ cr.lg.Warn(
+ "lost TCP streaming connection with remote peer",
+ zap.String("stream-reader-type", cr.typ.String()),
+ zap.String("local-member-id", cr.tr.ID.String()),
+ zap.String("remote-peer-id", cr.peerID.String()),
+ zap.Error(err),
+ )
+ } else {
+ plog.Warningf("lost the TCP streaming connection with peer %s (%s reader)", cr.peerID, cr.typ)
+ }
+ // EOF and closed-connection errors leave the peer status untouched;
+ // any other read error deactivates it.
+ switch {
+ // all data is read out
+ case err == io.EOF:
+ // connection is closed by the remote
+ case transport.IsClosedConnError(err):
+ default:
+ cr.status.deactivate(failureType{source: t.String(), action: "read"}, err.Error())
+ }
+ }
+ // Wait for a while before new dial attempt
+ err = cr.rl.Wait(cr.ctx)
+ // A cancelled context means the reader was stopped; it takes precedence
+ // over any error returned by the rate limiter.
+ if cr.ctx.Err() != nil {
+ if cr.lg != nil {
+ cr.lg.Info(
+ "stopped stream reader with remote peer",
+ zap.String("stream-reader-type", t.String()),
+ zap.String("local-member-id", cr.tr.ID.String()),
+ zap.String("remote-peer-id", cr.peerID.String()),
+ )
+ } else {
+ plog.Infof("stopped streaming with peer %s (%s reader)", cr.peerID, t)
+ }
+ close(cr.done)
+ return
+ }
+ if err != nil {
+ if cr.lg != nil {
+ cr.lg.Warn(
+ "rate limit on stream reader with remote peer",
+ zap.String("stream-reader-type", t.String()),
+ zap.String("local-member-id", cr.tr.ID.String()),
+ zap.String("remote-peer-id", cr.peerID.String()),
+ zap.Error(err),
+ )
+ } else {
+ plog.Errorf("streaming with peer %s (%s reader) rate limiter error: %v", cr.peerID, t, err)
+ }
+ }
+ }
+}
+
+// decodeLoop decodes messages of stream type t from rc and forwards them to
+// the reader's receive channels until decoding fails. It always returns a
+// non-nil error: the decode error, or, when the reader has already been
+// stopped, the error from closing rc (io.EOF if that close succeeds).
+//
+// Messages are discarded while the reader is paused, link-layer heartbeats are
+// never forwarded, and a message is dropped (and counted in recvFailures) when
+// the target channel cannot accept it immediately.
+func (cr *streamReader) decodeLoop(rc io.ReadCloser, t streamType) error {
+ var dec decoder
+ cr.mu.Lock()
+ switch t {
+ case streamTypeMsgAppV2:
+ dec = newMsgAppV2Decoder(rc, cr.tr.ID, cr.peerID)
+ case streamTypeMessage:
+ dec = &messageDecoder{r: rc}
+ default:
+ if cr.lg != nil {
+ cr.lg.Panic("unknown stream type", zap.String("type", t.String()))
+ } else {
+ plog.Panicf("unhandled stream type %s", t)
+ }
+ }
+ // Register rc as the reader's closer only if the reader has not been
+ // stopped; the check and the registration both happen under cr.mu.
+ select {
+ case <-cr.ctx.Done():
+ cr.mu.Unlock()
+ if err := rc.Close(); err != nil {
+ return err
+ }
+ return io.EOF
+ default:
+ cr.closer = rc
+ }
+ cr.mu.Unlock()
+
+ // gofail: labelRaftDropHeartbeat:
+ for {
+ m, err := dec.decode()
+ if err != nil {
+ cr.mu.Lock()
+ cr.close()
+ cr.mu.Unlock()
+ return err
+ }
+
+ // gofail-go: var raftDropHeartbeat struct{}
+ // continue labelRaftDropHeartbeat
+ receivedBytes.WithLabelValues(types.ID(m.From).String()).Add(float64(m.Size()))
+
+ cr.mu.Lock()
+ paused := cr.paused
+ cr.mu.Unlock()
+
+ if paused {
+ continue
+ }
+
+ if isLinkHeartbeatMessage(&m) {
+ // raft is not interested in link layer
+ // heartbeat message, so we should ignore
+ // it.
+ continue
+ }
+
+ // Proposals go to their own channel; everything else to recvc.
+ recvc := cr.recvc
+ if m.Type == raftpb.MsgProp {
+ recvc = cr.propc
+ }
+
+ // Non-blocking send: a full channel results in the message being dropped.
+ select {
+ case recvc <- m:
+ default:
+ if cr.status.isActive() {
+ if cr.lg != nil {
+ cr.lg.Warn(
+ "dropped internal Raft message since receiving buffer is full (overloaded network)",
+ zap.String("message-type", m.Type.String()),
+ zap.String("local-member-id", cr.tr.ID.String()),
+ zap.String("from", types.ID(m.From).String()),
+ zap.String("remote-peer-id", types.ID(m.To).String()),
+ zap.Bool("remote-peer-active", cr.status.isActive()),
+ )
+ } else {
+ plog.MergeWarningf("dropped internal raft message from %s since receiving buffer is full (overloaded network)", types.ID(m.From))
+ }
+ } else {
+ if cr.lg != nil {
+ cr.lg.Warn(
+ "dropped Raft message since receiving buffer is full (overloaded network)",
+ zap.String("message-type", m.Type.String()),
+ zap.String("local-member-id", cr.tr.ID.String()),
+ zap.String("from", types.ID(m.From).String()),
+ zap.String("remote-peer-id", types.ID(m.To).String()),
+ zap.Bool("remote-peer-active", cr.status.isActive()),
+ )
+ } else {
+ plog.Debugf("dropped %s from %s since receiving buffer is full", m.Type, types.ID(m.From))
+ }
+ }
+ recvFailures.WithLabelValues(types.ID(m.From).String()).Inc()
+ }
+ }
+}
+
+// stop cancels the reader's context, closes the stream that is currently
+// being decoded (if any) and waits for the run loop to terminate.
+func (cr *streamReader) stop() {
+ cr.mu.Lock()
+ cr.cancel()
+ cr.close()
+ cr.mu.Unlock()
+ <-cr.done
+}
+
+// dial opens a stream of type t to the remote peer by issuing a GET request to
+// the peer URL currently selected by the picker.
+//
+// On success (HTTP 200) it returns the response body, which the caller must
+// close. On every failure after the request has been built the picked URL is
+// marked unreachable, the response (if any) is closed, and an error is
+// returned:
+//
+//   - errUnsupportedStreamType when the remote runs an older version that does
+//     not support t;
+//   - errMemberRemoved on HTTP 410, which is also reported on cr.errorc;
+//   - errIncompatibleVersion or errClusterIDMismatch on HTTP 412 with the
+//     corresponding body;
+//   - a descriptive error for any other status or transport failure.
+func (cr *streamReader) dial(t streamType) (io.ReadCloser, error) {
+	u := cr.picker.pick()
+	uu := u
+	uu.Path = path.Join(t.endpoint(), cr.tr.ID.String())
+
+	if cr.lg != nil {
+		cr.lg.Debug(
+			"dial stream reader",
+			zap.String("from", cr.tr.ID.String()),
+			zap.String("to", cr.peerID.String()),
+			zap.String("address", uu.String()),
+		)
+	}
+	req, err := http.NewRequest("GET", uu.String(), nil)
+	if err != nil {
+		cr.picker.unreachable(u)
+		return nil, fmt.Errorf("failed to make http request to %v (%v)", u, err)
+	}
+	req.Header.Set("X-Server-From", cr.tr.ID.String())
+	req.Header.Set("X-Server-Version", version.Version)
+	req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
+	req.Header.Set("X-Etcd-Cluster-ID", cr.tr.ClusterID.String())
+	req.Header.Set("X-Raft-To", cr.peerID.String())
+
+	setPeerURLsHeader(req, cr.tr.URLs)
+
+	// Bind the request to the reader's context so that stop aborts it.
+	req = req.WithContext(cr.ctx)
+
+	// Do not start a round trip when the reader has already been stopped.
+	cr.mu.Lock()
+	select {
+	case <-cr.ctx.Done():
+		cr.mu.Unlock()
+		return nil, fmt.Errorf("stream reader is stopped")
+	default:
+	}
+	cr.mu.Unlock()
+
+	resp, err := cr.tr.streamRt.RoundTrip(req)
+	if err != nil {
+		cr.picker.unreachable(u)
+		return nil, err
+	}
+
+	rv := serverVersion(resp.Header)
+	lv := semver.Must(semver.NewVersion(version.Version))
+	if compareMajorMinorVersion(rv, lv) == -1 && !checkStreamSupport(rv, t) {
+		httputil.GracefulClose(resp)
+		cr.picker.unreachable(u)
+		return nil, errUnsupportedStreamType
+	}
+
+	switch resp.StatusCode {
+	case http.StatusGone:
+		httputil.GracefulClose(resp)
+		cr.picker.unreachable(u)
+		reportCriticalError(errMemberRemoved, cr.errorc)
+		return nil, errMemberRemoved
+
+	case http.StatusOK:
+		return resp.Body, nil
+
+	case http.StatusNotFound:
+		httputil.GracefulClose(resp)
+		cr.picker.unreachable(u)
+		return nil, fmt.Errorf("peer %s failed to find local node %s", cr.peerID, cr.tr.ID)
+
+	case http.StatusPreconditionFailed:
+		b, err := ioutil.ReadAll(resp.Body)
+		// Close the response on both the success and the failure path so a
+		// failed read does not leak the body and its connection.
+		httputil.GracefulClose(resp)
+		cr.picker.unreachable(u)
+		if err != nil {
+			return nil, err
+		}
+
+		switch strings.TrimSuffix(string(b), "\n") {
+		case errIncompatibleVersion.Error():
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"request sent was ignored by remote peer due to server version incompatibility",
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.Error(errIncompatibleVersion),
+				)
+			} else {
+				plog.Errorf("request sent was ignored by peer %s (server version incompatible)", cr.peerID)
+			}
+			return nil, errIncompatibleVersion
+
+		case errClusterIDMismatch.Error():
+			if cr.lg != nil {
+				cr.lg.Warn(
+					"request sent was ignored by remote peer due to cluster ID mismatch",
+					zap.String("remote-peer-id", cr.peerID.String()),
+					zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")),
+					zap.String("local-member-id", cr.tr.ID.String()),
+					zap.String("local-member-cluster-id", cr.tr.ClusterID.String()),
+					zap.Error(errClusterIDMismatch),
+				)
+			} else {
+				plog.Errorf("request sent was ignored (cluster ID mismatch: peer[%s]=%s, local=%s)",
+					cr.peerID, resp.Header.Get("X-Etcd-Cluster-ID"), cr.tr.ClusterID)
+			}
+			return nil, errClusterIDMismatch
+
+		default:
+			return nil, fmt.Errorf("unhandled error %q when precondition failed", string(b))
+		}
+
+	default:
+		httputil.GracefulClose(resp)
+		cr.picker.unreachable(u)
+		return nil, fmt.Errorf("unhandled http status %d", resp.StatusCode)
+	}
+}
+
+// close closes the stream registered in cr.closer, if any, and clears the
+// field. A close error is only logged. The callers in this file invoke it
+// with cr.mu held.
+func (cr *streamReader) close() {
+	if cr.closer == nil {
+		return
+	}
+
+	err := cr.closer.Close()
+	cr.closer = nil
+
+	switch {
+	case err == nil:
+		// closed cleanly
+	case cr.lg != nil:
+		cr.lg.Warn(
+			"failed to close remote peer connection",
+			zap.String("local-member-id", cr.tr.ID.String()),
+			zap.String("remote-peer-id", cr.peerID.String()),
+			zap.Error(err),
+		)
+	default:
+		plog.Errorf("peer %s (reader) connection close error: %v", cr.peerID, err)
+	}
+}
+
+// pause makes the reader discard every message it decodes until resume is
+// called.
+func (cr *streamReader) pause() {
+ cr.mu.Lock()
+ defer cr.mu.Unlock()
+ cr.paused = true
+}
+
+// resume undoes pause: decoded messages are forwarded again.
+func (cr *streamReader) resume() {
+ cr.mu.Lock()
+ defer cr.mu.Unlock()
+ cr.paused = false
+}
+
+// checkStreamSupport reports whether stream type t is listed in
+// supportedStream for the major.minor release of v (the patch level of v is
+// ignored). Versions missing from the table support nothing.
+func checkStreamSupport(v *semver.Version, t streamType) bool {
+	key := (&semver.Version{Major: v.Major, Minor: v.Minor}).String()
+	supported := supportedStream[key]
+	for i := range supported {
+		if supported[i] == t {
+			return true
+		}
+	}
+	return false
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/transport.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/transport.go
new file mode 100644
index 0000000..7191c3d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/transport.go
@@ -0,0 +1,467 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "context"
+ "net/http"
+ "sync"
+ "time"
+
+ "go.etcd.io/etcd/etcdserver/api/snap"
+ stats "go.etcd.io/etcd/etcdserver/api/v2stats"
+ "go.etcd.io/etcd/pkg/logutil"
+ "go.etcd.io/etcd/pkg/transport"
+ "go.etcd.io/etcd/pkg/types"
+ "go.etcd.io/etcd/raft"
+ "go.etcd.io/etcd/raft/raftpb"
+
+ "github.com/coreos/pkg/capnslog"
+ "github.com/xiang90/probing"
+ "go.uber.org/zap"
+ "golang.org/x/time/rate"
+)
+
+// plog is the package-level logger; it is the fallback used throughout this
+// package whenever no *zap.Logger has been configured.
+var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("go.etcd.io/etcd", "rafthttp"))
+
+// Raft is the view of the raft state machine that the transport depends on:
+// the Transport forwards received messages to it and reports peer status back
+// to it.
+type Raft interface {
+ // Process hands a message to the state machine.
+ Process(ctx context.Context, m raftpb.Message) error
+ // IsIDRemoved reports whether the member with the given id has been
+ // removed (presumably from the cluster; confirm against the implementation).
+ IsIDRemoved(id uint64) bool
+ // ReportUnreachable tells the state machine that messages could not be
+ // delivered to the member with the given id.
+ ReportUnreachable(id uint64)
+ // ReportSnapshot reports the outcome of sending a snapshot to the member
+ // with the given id.
+ ReportSnapshot(id uint64, status raft.SnapshotStatus)
+}
+
+// Transporter is the interface through which raft messages and snapshots are
+// exchanged with remote peers. Transport is the implementation in this
+// package.
+type Transporter interface {
+ // Start starts the given Transporter.
+ // Start MUST be called before calling other functions in the interface.
+ Start() error
+ // Handler returns the HTTP handler of the transporter.
+ // A transporter HTTP handler handles the HTTP requests
+ // from remote peers.
+ // The handler MUST be used to handle RaftPrefix(/raft)
+ // endpoint.
+ Handler() http.Handler
+ // Send sends out the given messages to the remote peers.
+ // Each message has a To field, which is an id that maps
+ // to an existing peer in the transport.
+ // If the id cannot be found in the transport, the message
+ // will be ignored.
+ Send(m []raftpb.Message)
+ // SendSnapshot sends out the given snapshot message to a remote peer.
+ // The behavior of SendSnapshot is similar to Send.
+ SendSnapshot(m snap.Message)
+ // AddRemote adds a remote with given peer urls into the transport.
+ // A remote helps newly joined member to catch up the progress of cluster,
+ // and will not be used after that.
+ // It is the caller's responsibility to ensure the urls are all valid,
+ // or it panics.
+ AddRemote(id types.ID, urls []string)
+ // AddPeer adds a peer with given peer urls into the transport.
+ // It is the caller's responsibility to ensure the urls are all valid,
+ // or it panics.
+ // Peer urls are used to connect to the remote peer.
+ AddPeer(id types.ID, urls []string)
+ // RemovePeer removes the peer with given id.
+ RemovePeer(id types.ID)
+ // RemoveAllPeers removes all the existing peers in the transport.
+ RemoveAllPeers()
+ // UpdatePeer updates the peer urls of the peer with the given id.
+ // It is the caller's responsibility to ensure the urls are all valid,
+ // or it panics.
+ UpdatePeer(id types.ID, urls []string)
+ // ActiveSince returns the time that the connection with the peer
+ // of the given id becomes active.
+ // If the connection is active since peer was added, it returns the adding time.
+ // If the connection is currently inactive, it returns zero time.
+ ActiveSince(id types.ID) time.Time
+ // ActivePeers returns the number of active peers.
+ ActivePeers() int
+ // Stop closes the connections and stops the transporter.
+ Stop()
+}
+
+// Transport implements Transporter interface. It provides the functionality
+// to send raft messages to peers, and receive raft messages from peers.
+// User should call Handler method to get a handler to serve requests
+// received from peerURLs.
+// User needs to call Start before calling other functions, and call
+// Stop when the Transport is no longer used.
+type Transport struct {
+ // Logger is optional; the package logger plog is used when it is nil.
+ Logger *zap.Logger
+
+ DialTimeout time.Duration // maximum duration before timing out dial of the request
+ // DialRetryFrequency defines the frequency of streamReader dial retrial attempts;
+ // a distinct rate limiter is created per every peer (default value: 10 events/sec)
+ DialRetryFrequency rate.Limit
+
+ TLSInfo transport.TLSInfo // TLS information used when creating connection
+
+ ID types.ID // local member ID
+ URLs types.URLs // local peer URLs
+ ClusterID types.ID // raft cluster ID for request validation
+ Raft Raft // raft state machine, to which the Transport forwards received messages and reports status
+ Snapshotter *snap.Snapshotter
+ ServerStats *stats.ServerStats // used to record general transportation statistics
+ // used to record transportation statistics with followers when
+ // performing as leader in raft protocol
+ LeaderStats *stats.LeaderStats
+ // ErrorC is used to report detected critical errors, e.g.,
+ // the member has been permanently removed from the cluster
+ // When an error is received from ErrorC, user should stop raft state
+ // machine and thus stop the Transport.
+ ErrorC chan error
+
+ // The fields below are initialized by Start.
+ streamRt http.RoundTripper // roundTripper used by streams
+ pipelineRt http.RoundTripper // roundTripper used by pipelines
+
+ mu sync.RWMutex // protect the remote and peer map
+ remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up
+ peers map[types.ID]Peer // peers map
+
+ pipelineProber probing.Prober
+ streamProber probing.Prober
+}
+
+// Start initializes the transport: it creates the stream and pipeline round
+// trippers from TLSInfo and DialTimeout, the empty remote and peer maps and
+// the two probers, and applies the default DialRetryFrequency when none was
+// set. It returns an error if either round tripper cannot be created.
+func (t *Transport) Start() error {
+ var err error
+ t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout)
+ if err != nil {
+ return err
+ }
+ t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout)
+ if err != nil {
+ return err
+ }
+ t.remotes = make(map[types.ID]*remote)
+ t.peers = make(map[types.ID]Peer)
+ t.pipelineProber = probing.NewProber(t.pipelineRt)
+ t.streamProber = probing.NewProber(t.streamRt)
+
+ // If client didn't provide dial retry frequency, use the default
+ // (100ms backoff between attempts to create a new stream),
+ // so it doesn't bring too much overhead when retry.
+ if t.DialRetryFrequency == 0 {
+ t.DialRetryFrequency = rate.Every(100 * time.Millisecond)
+ }
+ return nil
+}
+
+// Handler returns an HTTP handler that routes peer requests to the pipeline,
+// stream, snapshot and probing handlers registered under RaftPrefix,
+// RaftStreamPrefix+"/", RaftSnapshotPrefix and ProbingPrefix respectively.
+// A new mux and new handlers are built on every call.
+func (t *Transport) Handler() http.Handler {
+ pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID)
+ streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID)
+ snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID)
+ mux := http.NewServeMux()
+ mux.Handle(RaftPrefix, pipelineHandler)
+ mux.Handle(RaftStreamPrefix+"/", streamHandler)
+ mux.Handle(RaftSnapshotPrefix, snapHandler)
+ mux.Handle(ProbingPrefix, probing.NewHandler())
+ return mux
+}
+
+// Get returns the peer registered under id, or nil if there is none.
+func (t *Transport) Get(id types.ID) Peer {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+ return t.peers[id]
+}
+
+// Send dispatches each message to the peer, or failing that the remote,
+// registered under the message's To id. Messages addressed to id 0 are
+// skipped, and messages for an unknown id are dropped with a debug log entry.
+// MsgApp messages sent to a peer are also recorded in ServerStats.
+func (t *Transport) Send(msgs []raftpb.Message) {
+ for _, m := range msgs {
+ if m.To == 0 {
+ // ignore intentionally dropped message
+ continue
+ }
+ to := types.ID(m.To)
+
+ // Look up both maps under the read lock, but send outside of it.
+ t.mu.RLock()
+ p, pok := t.peers[to]
+ g, rok := t.remotes[to]
+ t.mu.RUnlock()
+
+ if pok {
+ if m.Type == raftpb.MsgApp {
+ t.ServerStats.SendAppendReq(m.Size())
+ }
+ p.send(m)
+ continue
+ }
+
+ if rok {
+ g.send(m)
+ continue
+ }
+
+ if t.Logger != nil {
+ t.Logger.Debug(
+ "ignored message send request; unknown remote peer target",
+ zap.String("type", m.Type.String()),
+ zap.String("unknown-target-peer-id", to.String()),
+ )
+ } else {
+ plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to)
+ }
+ }
+}
+
+// Stop stops every remote and peer, clears both probers and closes the idle
+// connections of the round trippers (when they are *http.Transport values).
+// Afterwards the peer and remote maps are nil, which AddRemote and AddPeer
+// use to detect a stopped transport.
+func (t *Transport) Stop() {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ for _, r := range t.remotes {
+ r.stop()
+ }
+ for _, p := range t.peers {
+ p.stop()
+ }
+ t.pipelineProber.RemoveAll()
+ t.streamProber.RemoveAll()
+ if tr, ok := t.streamRt.(*http.Transport); ok {
+ tr.CloseIdleConnections()
+ }
+ if tr, ok := t.pipelineRt.(*http.Transport); ok {
+ tr.CloseIdleConnections()
+ }
+ t.peers = nil
+ t.remotes = nil
+}
+
+// CutPeer drops messages to the specified peer.
+// It pauses the peer and/or the remote registered under id; an unknown id is
+// a no-op. The peer must implement Pausable, otherwise the type assertion
+// panics.
+func (t *Transport) CutPeer(id types.ID) {
+ t.mu.RLock()
+ p, pok := t.peers[id]
+ g, gok := t.remotes[id]
+ t.mu.RUnlock()
+
+ if pok {
+ p.(Pausable).Pause()
+ }
+ if gok {
+ g.Pause()
+ }
+}
+
+// MendPeer recovers the message dropping behavior of the given peer.
+// It is the counterpart of CutPeer: it resumes the peer and/or the remote
+// registered under id; an unknown id is a no-op. The peer must implement
+// Pausable, otherwise the type assertion panics.
+func (t *Transport) MendPeer(id types.ID) {
+ t.mu.RLock()
+ p, pok := t.peers[id]
+ g, gok := t.remotes[id]
+ t.mu.RUnlock()
+
+ if pok {
+ p.(Pausable).Resume()
+ }
+ if gok {
+ g.Resume()
+ }
+}
+
+// AddRemote registers and starts a remote for id with the given URLs.
+// The call is a no-op when the transport has been stopped or when id is
+// already known as a peer or a remote. Invalid URLs are reported through the
+// logger's panic-level call.
+func (t *Transport) AddRemote(id types.ID, us []string) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ if t.remotes == nil {
+ // there's no clean way to shutdown the golang http server
+ // (see: https://github.com/golang/go/issues/4674) before
+ // stopping the transport; ignore any new connections.
+ return
+ }
+ if _, ok := t.peers[id]; ok {
+ return
+ }
+ if _, ok := t.remotes[id]; ok {
+ return
+ }
+ urls, err := types.NewURLs(us)
+ if err != nil {
+ if t.Logger != nil {
+ t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
+ } else {
+ plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+ }
+ }
+ t.remotes[id] = startRemote(t, urls, id)
+
+ if t.Logger != nil {
+ t.Logger.Info(
+ "added new remote peer",
+ zap.String("local-member-id", t.ID.String()),
+ zap.String("remote-peer-id", id.String()),
+ zap.Strings("remote-peer-urls", us),
+ )
+ }
+}
+
+// AddPeer registers and starts a peer for id with the given URLs and adds it
+// to both probers. It is a no-op when id is already a peer. Unlike AddRemote,
+// it panics when the transport has been stopped. Invalid URLs are reported
+// through the logger's panic-level call.
+func (t *Transport) AddPeer(id types.ID, us []string) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+
+ if t.peers == nil {
+ panic("transport stopped")
+ }
+ if _, ok := t.peers[id]; ok {
+ return
+ }
+ urls, err := types.NewURLs(us)
+ if err != nil {
+ if t.Logger != nil {
+ t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
+ } else {
+ plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+ }
+ }
+ fs := t.LeaderStats.Follower(id.String())
+ t.peers[id] = startPeer(t, urls, id, fs)
+ addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
+ addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
+
+ if t.Logger != nil {
+ t.Logger.Info(
+ "added remote peer",
+ zap.String("local-member-id", t.ID.String()),
+ zap.String("remote-peer-id", id.String()),
+ zap.Strings("remote-peer-urls", us),
+ )
+ } else {
+ plog.Infof("added peer %s", id)
+ }
+}
+
+// RemovePeer stops and removes the peer registered under id. Removing an
+// unknown id is reported through the logger's panic-level call (see
+// removePeer).
+func (t *Transport) RemovePeer(id types.ID) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ t.removePeer(id)
+}
+
+// RemoveAllPeers stops and removes every peer of the transport. Remotes are
+// left untouched.
+func (t *Transport) RemoveAllPeers() {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ // removePeer deletes from t.peers; deleting entries while ranging over a
+ // map is permitted in Go.
+ for id := range t.peers {
+ t.removePeer(id)
+ }
+}
+
+// removePeer stops the peer registered under id and removes every trace of
+// it: the peers map entry, its follower statistics and its prober entries.
+// An unknown id is reported through the logger's panic-level call.
+// the caller of this function must have the peers mutex.
+func (t *Transport) removePeer(id types.ID) {
+ if peer, ok := t.peers[id]; ok {
+ peer.stop()
+ } else {
+ if t.Logger != nil {
+ t.Logger.Panic("unexpected removal of unknown remote peer", zap.String("remote-peer-id", id.String()))
+ } else {
+ plog.Panicf("unexpected removal of unknown peer '%d'", id)
+ }
+ }
+ delete(t.peers, id)
+ delete(t.LeaderStats.Followers, id.String())
+ t.pipelineProber.Remove(id.String())
+ t.streamProber.Remove(id.String())
+
+ if t.Logger != nil {
+ t.Logger.Info(
+ "removed remote peer",
+ zap.String("local-member-id", t.ID.String()),
+ zap.String("removed-remote-peer-id", id.String()),
+ )
+ } else {
+ plog.Infof("removed peer %s", id)
+ }
+}
+
+// UpdatePeer replaces the URLs of the peer registered under id and
+// re-registers the peer with both probers under the new URLs. An unknown id
+// is silently ignored. Invalid URLs are reported through the logger's
+// panic-level call.
+func (t *Transport) UpdatePeer(id types.ID, us []string) {
+ t.mu.Lock()
+ defer t.mu.Unlock()
+ // TODO: return error or just panic?
+ if _, ok := t.peers[id]; !ok {
+ return
+ }
+ urls, err := types.NewURLs(us)
+ if err != nil {
+ if t.Logger != nil {
+ t.Logger.Panic("failed NewURLs", zap.Strings("urls", us), zap.Error(err))
+ } else {
+ plog.Panicf("newURLs %+v should never fail: %+v", us, err)
+ }
+ }
+ t.peers[id].update(urls)
+
+ t.pipelineProber.Remove(id.String())
+ addPeerToProber(t.Logger, t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rttSec)
+ t.streamProber.Remove(id.String())
+ addPeerToProber(t.Logger, t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rttSec)
+
+ if t.Logger != nil {
+ t.Logger.Info(
+ "updated remote peer",
+ zap.String("local-member-id", t.ID.String()),
+ zap.String("updated-remote-peer-id", id.String()),
+ zap.Strings("updated-remote-peer-urls", us),
+ )
+ } else {
+ plog.Infof("updated peer %s", id)
+ }
+}
+
+// ActiveSince returns the activeSince time of the peer registered under id,
+// or the zero time when no such peer exists.
+func (t *Transport) ActiveSince(id types.ID) time.Time {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
+	p, ok := t.peers[id]
+	if !ok {
+		return time.Time{}
+	}
+	return p.activeSince()
+}
+
+// SendSnapshot hands the snapshot message to the peer registered under m.To.
+// When no such peer exists, the message is closed with errMemberNotFound.
+func (t *Transport) SendSnapshot(m snap.Message) {
+	// The peers map is only read here, so the shared lock is sufficient; this
+	// matches Send and Get and avoids blocking concurrent readers.
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+	p := t.peers[types.ID(m.To)]
+	if p == nil {
+		m.CloseWithError(errMemberNotFound)
+		return
+	}
+	p.sendSnap(m)
+}
+
+// Pausable is a testing interface for pausing transport traffic.
+type Pausable interface {
+ // Pause suspends traffic until Resume is called.
+ Pause()
+ // Resume re-enables traffic after a Pause.
+ Resume()
+}
+
+// Pause pauses every peer of the transport. Each peer must implement
+// Pausable, otherwise the type assertion panics. Remotes are not affected.
+func (t *Transport) Pause() {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+ for _, p := range t.peers {
+ p.(Pausable).Pause()
+ }
+}
+
+// Resume resumes every peer of the transport. Each peer must implement
+// Pausable, otherwise the type assertion panics. Remotes are not affected.
+func (t *Transport) Resume() {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+ for _, p := range t.peers {
+ p.(Pausable).Resume()
+ }
+}
+
+// ActivePeers returns the number of peers whose connection is currently
+// active, i.e. whose activeSince time is non-zero. Remotes are not counted.
+func (t *Transport) ActivePeers() (cnt int) {
+ t.mu.RLock()
+ defer t.mu.RUnlock()
+ for _, p := range t.peers {
+ if !p.activeSince().IsZero() {
+ cnt++
+ }
+ }
+ return cnt
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/urlpick.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/urlpick.go
new file mode 100644
index 0000000..61ef468
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/urlpick.go
@@ -0,0 +1,57 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "net/url"
+ "sync"
+
+ "go.etcd.io/etcd/pkg/types"
+)
+
+type urlPicker struct {
+ mu sync.Mutex // guards urls and picked
+ urls types.URLs
+ picked int
+}
+
+func newURLPicker(urls types.URLs) *urlPicker {
+ return &urlPicker{
+ urls: urls,
+ }
+}
+
+func (p *urlPicker) update(urls types.URLs) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ p.urls = urls
+ p.picked = 0
+}
+
+func (p *urlPicker) pick() url.URL {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return p.urls[p.picked]
+}
+
+// unreachable notifies the picker that the given url is unreachable,
+// so that it moves on to the next of its urls if that url was the picked one.
+func (p *urlPicker) unreachable(u url.URL) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if u == p.urls[p.picked] {
+ p.picked = (p.picked + 1) % len(p.urls)
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/util.go b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/util.go
new file mode 100644
index 0000000..2093864
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/etcdserver/api/rafthttp/util.go
@@ -0,0 +1,190 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package rafthttp
+
+import (
+ "fmt"
+ "io"
+ "net"
+ "net/http"
+ "net/url"
+ "strings"
+ "time"
+
+ "go.etcd.io/etcd/pkg/transport"
+ "go.etcd.io/etcd/pkg/types"
+ "go.etcd.io/etcd/version"
+
+ "github.com/coreos/go-semver/semver"
+)
+
+var (
+ errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
+ errMemberNotFound = fmt.Errorf("member not found")
+)
+
+// NewListener returns a listener for raft message transfer between peers.
+// It uses a timeout listener to identify broken streams promptly.
+func NewListener(u url.URL, tlsinfo *transport.TLSInfo) (net.Listener, error) {
+ return transport.NewTimeoutListener(u.Host, u.Scheme, tlsinfo, ConnReadTimeout, ConnWriteTimeout)
+}
+
+// NewRoundTripper returns a roundTripper used to send requests
+// to rafthttp listener of remote peers.
+func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
+ // It uses timeout transport to pair with remote timeout listeners.
+ // It sets no read/write timeout, because message in requests may
+ // take long time to write out before reading out the response.
+ return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0)
+}
+
+// newStreamRoundTripper returns a roundTripper used to send stream requests
+// to the rafthttp listener of remote peers.
+// A read/write timeout is set for the stream roundTripper so that a broken
+// connection is detected promptly, which minimizes the number of messages
+// sent on a broken connection.
+func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
+ return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout)
+}
+
+// createPostRequest creates a HTTP POST request that sends raft message.
+func createPostRequest(u url.URL, path string, body io.Reader, ct string, urls types.URLs, from, cid types.ID) *http.Request {
+ uu := u
+ uu.Path = path
+ req, err := http.NewRequest("POST", uu.String(), body)
+ if err != nil {
+ plog.Panicf("unexpected new request error (%v)", err)
+ }
+ req.Header.Set("Content-Type", ct)
+ req.Header.Set("X-Server-From", from.String())
+ req.Header.Set("X-Server-Version", version.Version)
+ req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
+ req.Header.Set("X-Etcd-Cluster-ID", cid.String())
+ setPeerURLsHeader(req, urls)
+
+ return req
+}
+
+// checkPostResponse checks the response of the HTTP POST request that sends
+// raft message.
+func checkPostResponse(resp *http.Response, body []byte, req *http.Request, to types.ID) error {
+ switch resp.StatusCode {
+ case http.StatusPreconditionFailed:
+ switch strings.TrimSuffix(string(body), "\n") {
+ case errIncompatibleVersion.Error():
+ plog.Errorf("request sent was ignored by peer %s (server version incompatible)", to)
+ return errIncompatibleVersion
+ case errClusterIDMismatch.Error():
+ plog.Errorf("request sent was ignored (cluster ID mismatch: remote[%s]=%s, local=%s)",
+ to, resp.Header.Get("X-Etcd-Cluster-ID"), req.Header.Get("X-Etcd-Cluster-ID"))
+ return errClusterIDMismatch
+ default:
+ return fmt.Errorf("unhandled error %q when precondition failed", string(body))
+ }
+ case http.StatusForbidden:
+ return errMemberRemoved
+ case http.StatusNoContent:
+ return nil
+ default:
+ return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
+ }
+}
+
+// reportCriticalError reports the given error by sending it into
+// the given error channel.
+// If the send would block (e.g. the error channel is filled up), it drops
+// the error instead of waiting, because the fact that an error has happened
+// is already reported, which is good enough.
+func reportCriticalError(err error, errc chan<- error) {
+ select {
+ case errc <- err:
+ default:
+ }
+}
+
+// compareMajorMinorVersion returns an integer comparing two versions based on
+// their major and minor version. The result will be 0 if a==b, -1 if a < b,
+// and 1 if a > b.
+func compareMajorMinorVersion(a, b *semver.Version) int {
+ na := &semver.Version{Major: a.Major, Minor: a.Minor}
+ nb := &semver.Version{Major: b.Major, Minor: b.Minor}
+ switch {
+ case na.LessThan(*nb):
+ return -1
+ case nb.LessThan(*na):
+ return 1
+ default:
+ return 0
+ }
+}
+
+// serverVersion returns the server version from the given header.
+func serverVersion(h http.Header) *semver.Version {
+ verStr := h.Get("X-Server-Version")
+ // backward compatibility with etcd 2.0
+ if verStr == "" {
+ verStr = "2.0.0"
+ }
+ return semver.Must(semver.NewVersion(verStr))
+}
+
+// minClusterVersion returns the min cluster version from the given header.
+func minClusterVersion(h http.Header) *semver.Version {
+ verStr := h.Get("X-Min-Cluster-Version")
+ // backward compatibility with etcd 2.0
+ if verStr == "" {
+ verStr = "2.0.0"
+ }
+ return semver.Must(semver.NewVersion(verStr))
+}
+
+// checkVersionCompatibility checks whether the given version is compatible
+// with the local version.
+func checkVersionCompatibility(name string, server, minCluster *semver.Version) (
+ localServer *semver.Version,
+ localMinCluster *semver.Version,
+ err error) {
+ localServer = semver.Must(semver.NewVersion(version.Version))
+ localMinCluster = semver.Must(semver.NewVersion(version.MinClusterVersion))
+ if compareMajorMinorVersion(server, localMinCluster) == -1 {
+ return localServer, localMinCluster, fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer)
+ }
+ if compareMajorMinorVersion(minCluster, localServer) == 1 {
+ return localServer, localMinCluster, fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer)
+ }
+ return localServer, localMinCluster, nil
+}
+
+// setPeerURLsHeader reports local urls for peer discovery
+func setPeerURLsHeader(req *http.Request, urls types.URLs) {
+ if urls == nil {
+ // often not set in unit tests
+ return
+ }
+ peerURLs := make([]string, urls.Len())
+ for i := range urls {
+ peerURLs[i] = urls[i].String()
+ }
+ req.Header.Set("X-PeerURLs", strings.Join(peerURLs, ","))
+}
+
+// addRemoteFromRequest adds a remote peer according to an http request header
+func addRemoteFromRequest(tr Transporter, r *http.Request) {
+ if from, err := types.IDFromString(r.Header.Get("X-Server-From")); err == nil {
+ if urls := r.Header.Get("X-PeerURLs"); urls != "" {
+ tr.AddRemote(from, strings.Split(urls, ","))
+ }
+ }
+}