[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/coreos/etcd/etcdserver/raft.go b/vendor/github.com/coreos/etcd/etcdserver/raft.go
new file mode 100644
index 0000000..212dccb
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/etcdserver/raft.go
@@ -0,0 +1,626 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package etcdserver
+
+import (
+	"encoding/json"
+	"expvar"
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/etcdserver/membership"
+	"github.com/coreos/etcd/pkg/contention"
+	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/rafthttp"
+	"github.com/coreos/etcd/wal"
+	"github.com/coreos/etcd/wal/walpb"
+	"github.com/coreos/pkg/capnslog"
+)
+
+const (
+	// Number of entries kept for a slow follower to catch up after
+	// compacting the raft storage entries.
+	// We expect the follower to have millisecond-level latency with the leader.
+	// The max throughput is around 10K. Keeping 5K entries is enough to help
+	// a follower catch up.
+	numberOfCatchUpEntries = 5000
+
+	// The max throughput of etcd will not exceed 100MB/s (100K * 1KB value).
+	// Assuming the RTT is around 10ms, 1MB max size is large enough.
+	maxSizePerMsg = 1 * 1024 * 1024
+	// Never overflow the rafthttp buffer, which is 4096.
+	// TODO: a better const?
+	maxInflightMsgs = 4096 / 8
+)
+
+var (
+	// raftStatusMu protects raftStatus.
+	raftStatusMu sync.Mutex
+	// raftStatus is an indirection for the expvar func interface:
+	// expvar panics when publishing a duplicate name, and
+	// expvar does not support removing a registered name,
+	// so only one func that calls raftStatus is registered (see init)
+	// and raftStatus itself is changed as needed.
+	raftStatus func() raft.Status
+)
+
+// init installs the package logger for the raft library and publishes the
+// "raft.status" expvar. The published func reads raftStatus under
+// raftStatusMu, so it always reflects the most recently started node.
+func init() {
+	raft.SetLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "raft"))
+	expvar.Publish("raft.status", expvar.Func(func() interface{} {
+		raftStatusMu.Lock()
+		defer raftStatusMu.Unlock()
+		// raftStatus stays nil until a node has been started or restarted;
+		// report a nil value instead of panicking on a nil func call.
+		if raftStatus == nil {
+			return nil
+		}
+		return raftStatus()
+	}))
+}
+
+// RaftTimer is implemented by types that can report an index and a term,
+// both as uint64 values.
+type RaftTimer interface {
+	Index() uint64
+	Term() uint64
+}
+
+// apply contains entries, snapshot to be applied. Once
+// an apply is consumed, the entries will be persisted
+// to raft storage concurrently; the application must read
+// from notifyc before assuming the raft messages are stable.
+type apply struct {
+	// entries and snapshot are filled from the Ready's CommittedEntries
+	// and Snapshot in raftNode.start.
+	entries  []raftpb.Entry
+	snapshot raftpb.Snapshot
+	// notifyc synchronizes etcd server applies with the raft node
+	notifyc chan struct{}
+}
+
+// raftNode drives a raft.Node (embedded through raftNodeConfig): it ticks
+// it, consumes its Ready values, persists state and hands committed
+// entries to the server through applyc.
+type raftNode struct {
+	// Cache of the latest raft index and raft term the server has seen.
+	// These three uint64 fields must be the first elements to keep 64-bit
+	// alignment for atomic access to the fields.
+	index uint64
+	term  uint64
+	lead  uint64
+
+	// tickMu serializes calls to Tick (see tick).
+	tickMu *sync.Mutex
+	raftNodeConfig
+
+	// a chan to send/receive snapshot
+	msgSnapC chan raftpb.Message
+
+	// a chan to send out apply
+	applyc chan apply
+
+	// a chan to send out readState
+	readStateC chan raft.ReadState
+
+	// utility
+	ticker *time.Ticker
+	// contention detectors for raft heartbeat message
+	td *contention.TimeoutDetector
+
+	// stopped carries the stop request sent by stop; done is closed by
+	// onStop once shutdown has finished.
+	stopped chan struct{}
+	done    chan struct{}
+}
+
+// raftNodeConfig groups the dependencies handed to newRaftNode. It is
+// embedded in raftNode, so its fields and the raft.Node methods are
+// accessible directly on a raftNode.
+type raftNodeConfig struct {
+	// to check if msg receiver is removed from cluster
+	isIDRemoved func(id uint64) bool
+	raft.Node
+	raftStorage *raft.MemoryStorage
+	storage     Storage
+	heartbeat   time.Duration // for logging
+	// transport specifies the transport to send and receive msgs to members.
+	// Sending messages MUST NOT block. It is okay to drop messages, since
+	// clients should timeout and reissue their messages.
+	// If transport is nil, server will panic.
+	transport rafthttp.Transporter
+}
+
+// newRaftNode builds a raftNode around cfg. The heartbeat contention
+// detector expects a heartbeat to be sent within two heartbeat intervals.
+// When cfg.heartbeat is zero a zero-value Ticker is used instead of a
+// running one; otherwise the ticker fires every cfg.heartbeat.
+func newRaftNode(cfg raftNodeConfig) *raftNode {
+	ticker := &time.Ticker{}
+	if cfg.heartbeat != 0 {
+		ticker = time.NewTicker(cfg.heartbeat)
+	}
+
+	return &raftNode{
+		tickMu:         new(sync.Mutex),
+		raftNodeConfig: cfg,
+		td:             contention.NewTimeoutDetector(2 * cfg.heartbeat),
+		readStateC:     make(chan raft.ReadState, 1),
+		msgSnapC:       make(chan raftpb.Message, maxInFlightMsgSnap),
+		applyc:         make(chan apply),
+		ticker:         ticker,
+		stopped:        make(chan struct{}),
+		done:           make(chan struct{}),
+	}
+}
+
+// tick advances the raft node by one tick while holding tickMu.
+// raft.Node does not have locks in Raft package, so the mutex serializes
+// ticks coming from the run loop and from advanceTicks. The unlock is
+// deferred so the mutex is released even if Tick panics.
+func (r *raftNode) tick() {
+	r.tickMu.Lock()
+	defer r.tickMu.Unlock()
+	r.Tick()
+}
+
+// start prepares and starts raftNode in a new goroutine. It is no longer safe
+// to modify the fields after it has been started.
+//
+// The goroutine loops until a value is received on r.stopped, handling ticker
+// events and raft Ready values. For every Ready it publishes leadership
+// changes through rh, forwards the latest read state, hands the committed
+// entries and snapshot to the applier via r.applyc, persists snapshot, hard
+// state and entries, sends outgoing messages and finally calls Advance.
+// onStop runs when the goroutine returns.
+func (r *raftNode) start(rh *raftReadyHandler) {
+	// upper bound for delivering a read state to readStateC
+	internalTimeout := time.Second
+
+	go func() {
+		defer r.onStop()
+		islead := false
+
+		for {
+			select {
+			case <-r.ticker.C:
+				r.tick()
+			case rd := <-r.Ready():
+				if rd.SoftState != nil {
+					// a leader change is counted only when the new lead is a
+					// real node and differs from the cached one
+					newLeader := rd.SoftState.Lead != raft.None && atomic.LoadUint64(&r.lead) != rd.SoftState.Lead
+					if newLeader {
+						leaderChanges.Inc()
+					}
+
+					if rd.SoftState.Lead == raft.None {
+						hasLeader.Set(0)
+					} else {
+						hasLeader.Set(1)
+					}
+
+					atomic.StoreUint64(&r.lead, rd.SoftState.Lead)
+					islead = rd.RaftState == raft.StateLeader
+					if islead {
+						isLeader.Set(1)
+					} else {
+						isLeader.Set(0)
+					}
+					rh.updateLeadership(newLeader)
+					r.td.Reset()
+				}
+
+				if len(rd.ReadStates) != 0 {
+					// only the most recent read state is forwarded; it is
+					// dropped with a warning if nobody receives it in time
+					select {
+					case r.readStateC <- rd.ReadStates[len(rd.ReadStates)-1]:
+					case <-time.After(internalTimeout):
+						plog.Warningf("timed out sending read state")
+					case <-r.stopped:
+						return
+					}
+				}
+
+				// buffered with capacity 1 so the first notification below
+				// does not block on the applier
+				notifyc := make(chan struct{}, 1)
+				ap := apply{
+					entries:  rd.CommittedEntries,
+					snapshot: rd.Snapshot,
+					notifyc:  notifyc,
+				}
+
+				updateCommittedIndex(&ap, rh)
+
+				// applyc is unbuffered: this blocks until the applier takes
+				// the apply or the node is stopped
+				select {
+				case r.applyc <- ap:
+				case <-r.stopped:
+					return
+				}
+
+				// the leader can write to its disk in parallel with replicating to the followers and them
+				// writing to their disks.
+				// For more details, check raft thesis 10.2.1
+				if islead {
+					// gofail: var raftBeforeLeaderSend struct{}
+					r.transport.Send(r.processMessages(rd.Messages))
+				}
+
+				// Must save the snapshot file and WAL snapshot entry before saving any other entries or hardstate to
+				// ensure that recovery after a snapshot restore is possible.
+				if !raft.IsEmptySnap(rd.Snapshot) {
+					// gofail: var raftBeforeSaveSnap struct{}
+					if err := r.storage.SaveSnap(rd.Snapshot); err != nil {
+						plog.Fatalf("failed to save Raft snapshot %v", err)
+					}
+					// gofail: var raftAfterSaveSnap struct{}
+				}
+
+				// gofail: var raftBeforeSave struct{}
+				if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
+					plog.Fatalf("failed to raft save state and entries %v", err)
+				}
+				if !raft.IsEmptyHardState(rd.HardState) {
+					proposalsCommitted.Set(float64(rd.HardState.Commit))
+				}
+				// gofail: var raftAfterSave struct{}
+
+				if !raft.IsEmptySnap(rd.Snapshot) {
+					// Force WAL to fsync its hard state before Release() releases
+					// old data from the WAL. Otherwise could get an error like:
+					// panic: tocommit(107) is out of range [lastIndex(84)]. Was the raft log corrupted, truncated, or lost?
+					// See https://github.com/etcd-io/etcd/issues/10219 for more details.
+					if err := r.storage.Sync(); err != nil {
+						plog.Fatalf("failed to sync Raft snapshot %v", err)
+					}
+
+					// etcdserver now claim the snapshot has been persisted onto the disk
+					notifyc <- struct{}{}
+
+					// gofail: var raftAfterSaveSnap struct{}
+					r.raftStorage.ApplySnapshot(rd.Snapshot)
+					plog.Infof("raft applied incoming snapshot at index %d", rd.Snapshot.Metadata.Index)
+					// gofail: var raftAfterApplySnap struct{}
+
+					if err := r.storage.Release(rd.Snapshot); err != nil {
+						plog.Fatalf("failed to release Raft wal %v", err)
+					}
+				}
+
+				// mirror the persisted entries into the in-memory raft storage
+				r.raftStorage.Append(rd.Entries)
+
+				if !islead {
+					// finish processing incoming messages before we signal raftdone chan
+					msgs := r.processMessages(rd.Messages)
+
+					// now unblocks 'applyAll' that waits on Raft log disk writes before triggering snapshots
+					notifyc <- struct{}{}
+
+					// Candidate or follower needs to wait for all pending configuration
+					// changes to be applied before sending messages.
+					// Otherwise we might incorrectly count votes (e.g. votes from removed members).
+					// Also slow machine's follower raft-layer could proceed to become the leader
+					// on its own single-node cluster, before apply-layer applies the config change.
+					// We simply wait for ALL pending entries to be applied for now.
+					// We might improve this later on if it causes unnecessary long blocking issues.
+					waitApply := false
+					for _, ent := range rd.CommittedEntries {
+						if ent.Type == raftpb.EntryConfChange {
+							waitApply = true
+							break
+						}
+					}
+					if waitApply {
+						// blocks until 'applyAll' calls 'applyWait.Trigger'
+						// to be in sync with scheduled config-change job
+						// (assume notifyc has cap of 1)
+						select {
+						case notifyc <- struct{}{}:
+						case <-r.stopped:
+							return
+						}
+					}
+
+					// gofail: var raftBeforeFollowerSend struct{}
+					r.transport.Send(msgs)
+				} else {
+					// leader already processed 'MsgSnap' and signaled
+					notifyc <- struct{}{}
+				}
+
+				r.Advance()
+			case <-r.stopped:
+				return
+			}
+		}
+	}()
+}
+
+func updateCommittedIndex(ap *apply, rh *raftReadyHandler) {
+	var ci uint64
+	if len(ap.entries) != 0 {
+		ci = ap.entries[len(ap.entries)-1].Index
+	}
+	if ap.snapshot.Metadata.Index > ci {
+		ci = ap.snapshot.Metadata.Index
+	}
+	if ci != 0 {
+		rh.updateCommittedIndex(ci)
+	}
+}
+
+// processMessages filters ms in place, walking it from the last message to
+// the first, and returns the same slice. A message is cancelled by setting
+// its To field to 0:
+//   - messages addressed to a member reported as removed are cancelled;
+//   - only the last MsgAppResp in ms is kept, earlier ones are cancelled;
+//   - MsgSnap is redirected to msgSnapC (dropped if the channel is full) and
+//     cancelled here;
+//   - MsgHeartbeat is reported to the contention detector, and a warning is
+//     logged when the heartbeat was not sent on time.
+func (r *raftNode) processMessages(ms []raftpb.Message) []raftpb.Message {
+	appRespSeen := false
+	for i := len(ms) - 1; i >= 0; i-- {
+		m := &ms[i]
+		if r.isIDRemoved(m.To) {
+			m.To = 0
+		}
+
+		switch m.Type {
+		case raftpb.MsgAppResp:
+			if appRespSeen {
+				m.To = 0
+			}
+			appRespSeen = true
+
+		case raftpb.MsgSnap:
+			// There are two separate data store: the store for v2, and the KV for v3.
+			// The msgSnap only contains the most recent snapshot of store without KV.
+			// So we need to redirect the msgSnap to etcd server main loop for merging in the
+			// current store snapshot and KV snapshot.
+			select {
+			case r.msgSnapC <- *m:
+			default:
+				// drop msgSnap if the inflight chan is full.
+			}
+			m.To = 0
+
+		case raftpb.MsgHeartbeat:
+			ok, exceed := r.td.Observe(m.To)
+			if !ok {
+				// TODO: limit request rate.
+				plog.Warningf("failed to send out heartbeat on time (exceeded the %v timeout for %v, to %x)", r.heartbeat, exceed, m.To)
+				plog.Warningf("server is likely overloaded")
+				heartbeatSendFailures.Inc()
+			}
+		}
+	}
+	return ms
+}
+
+// apply returns the channel on which the run loop started by start
+// publishes apply values.
+func (r *raftNode) apply() chan apply {
+	return r.applyc
+}
+
+// stop asks the run loop started by start to exit and waits for shutdown
+// to finish. The send on r.stopped is unbuffered, so it blocks until the
+// run loop receives it; the receive on r.done returns once onStop has
+// closed that channel.
+func (r *raftNode) stop() {
+	r.stopped <- struct{}{}
+	<-r.done
+}
+
+// onStop runs when the run loop exits (it is deferred in start). It stops
+// the raft node, the ticker and the transport, closes the storage, and
+// then closes r.done to release callers blocked in stop. A storage close
+// error is reported through plog.Panicf.
+func (r *raftNode) onStop() {
+	r.Stop()
+	r.ticker.Stop()
+	r.transport.Stop()
+	if err := r.storage.Close(); err != nil {
+		plog.Panicf("raft close storage error: %v", err)
+	}
+	close(r.done)
+}
+
+// pauseSending pauses the transport. It is meant for testing and panics
+// if the transport does not implement rafthttp.Pausable.
+func (r *raftNode) pauseSending() {
+	r.transport.(rafthttp.Pausable).Pause()
+}
+
+// resumeSending resumes a transport paused with pauseSending. It panics
+// if the transport does not implement rafthttp.Pausable.
+func (r *raftNode) resumeSending() {
+	r.transport.(rafthttp.Pausable).Resume()
+}
+
+// advanceTicks ticks the Raft node `ticks` times in a row.
+// It can be used to fast-forward election ticks in
+// multi data-center deployments, which speeds up the
+// election process. A non-positive count does nothing.
+func (r *raftNode) advanceTicks(ticks int) {
+	for remaining := ticks; remaining > 0; remaining-- {
+		r.tick()
+	}
+}
+
+// startNode creates a fresh WAL for the member named cfg.Name, builds one
+// raft peer per entry of ids (using the JSON-encoded member as context) and
+// starts a new raft node backed by a new in-memory storage. It returns the
+// member ID, the node, the storage and the WAL, and points raftStatus at
+// the new node. WAL creation and member marshalling failures are reported
+// through plog.Panicf.
+func startNode(cfg ServerConfig, cl *membership.RaftCluster, ids []types.ID) (id types.ID, n raft.Node, s *raft.MemoryStorage, w *wal.WAL) {
+	member := cl.MemberByName(cfg.Name)
+	id = member.ID
+
+	metadata := pbutil.MustMarshal(&pb.Metadata{NodeID: uint64(id), ClusterID: uint64(cl.ID())})
+	var err error
+	w, err = wal.Create(cfg.WALDir(), metadata)
+	if err != nil {
+		plog.Panicf("create wal error: %v", err)
+	}
+
+	peers := make([]raft.Peer, 0, len(ids))
+	for _, peerID := range ids {
+		peerCtx, merr := json.Marshal((*cl).Member(peerID))
+		if merr != nil {
+			plog.Panicf("marshal member should never fail: %v", merr)
+		}
+		peers = append(peers, raft.Peer{ID: uint64(peerID), Context: peerCtx})
+	}
+
+	plog.Infof("starting member %s in cluster %s", id, cl.ID())
+	s = raft.NewMemoryStorage()
+	n = raft.StartNode(&raft.Config{
+		ID:              uint64(id),
+		ElectionTick:    cfg.ElectionTicks,
+		HeartbeatTick:   1,
+		Storage:         s,
+		MaxSizePerMsg:   maxSizePerMsg,
+		MaxInflightMsgs: maxInflightMsgs,
+		CheckQuorum:     true,
+	}, peers)
+
+	raftStatusMu.Lock()
+	raftStatus = n.Status
+	raftStatusMu.Unlock()
+	return id, n, s, w
+}
+
+func restartNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
+	var walsnap walpb.Snapshot
+	if snapshot != nil {
+		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
+	}
+	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
+
+	plog.Infof("restarting member %s in cluster %s at commit index %d", id, cid, st.Commit)
+	cl := membership.NewCluster("")
+	cl.SetID(cid)
+	s := raft.NewMemoryStorage()
+	if snapshot != nil {
+		s.ApplySnapshot(*snapshot)
+	}
+	s.SetHardState(st)
+	s.Append(ents)
+	c := &raft.Config{
+		ID:              uint64(id),
+		ElectionTick:    cfg.ElectionTicks,
+		HeartbeatTick:   1,
+		Storage:         s,
+		MaxSizePerMsg:   maxSizePerMsg,
+		MaxInflightMsgs: maxInflightMsgs,
+		CheckQuorum:     true,
+	}
+
+	n := raft.RestartNode(c)
+	raftStatusMu.Lock()
+	raftStatus = n.Status
+	raftStatusMu.Unlock()
+	return id, cl, n, s, w
+}
+
+// restartAsStandaloneNode restarts the member from the WAL in cfg.WALDir()
+// as a cluster of its own. Entries after the committed index are discarded,
+// configuration-change entries produced by createConfigChangeEnts are
+// appended to the WAL and force-committed, and a raft node is restarted on
+// top of the result. When snapshot is non-nil it bounds the WAL read and is
+// applied to the in-memory storage first.
+//
+// It returns the member ID, a cluster carrying only the cluster ID, the
+// node, the storage and the WAL. A failure to save the appended entries is
+// reported through plog.Fatalf.
+func restartAsStandaloneNode(cfg ServerConfig, snapshot *raftpb.Snapshot) (types.ID, *membership.RaftCluster, raft.Node, *raft.MemoryStorage, *wal.WAL) {
+	var walsnap walpb.Snapshot
+	if snapshot != nil {
+		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
+	}
+	w, id, cid, st, ents := readWAL(cfg.WALDir(), walsnap)
+
+	// discard the previously uncommitted entries
+	for i, ent := range ents {
+		if ent.Index > st.Commit {
+			plog.Infof("discarding %d uncommitted WAL entries ", len(ents)-i)
+			ents = ents[:i]
+			break
+		}
+	}
+
+	// force append the configuration change entries
+	toAppEnts := createConfigChangeEnts(getIDs(snapshot, ents), uint64(id), st.Term, st.Commit)
+	ents = append(ents, toAppEnts...)
+
+	// force commit newly appended entries
+	err := w.Save(raftpb.HardState{}, toAppEnts)
+	if err != nil {
+		plog.Fatalf("%v", err)
+	}
+	if len(ents) != 0 {
+		st.Commit = ents[len(ents)-1].Index
+	}
+
+	plog.Printf("forcing restart of member %s in cluster %s at commit index %d", id, cid, st.Commit)
+	cl := membership.NewCluster("")
+	cl.SetID(cid)
+	s := raft.NewMemoryStorage()
+	if snapshot != nil {
+		s.ApplySnapshot(*snapshot)
+	}
+	s.SetHardState(st)
+	s.Append(ents)
+	c := &raft.Config{
+		ID:              uint64(id),
+		ElectionTick:    cfg.ElectionTicks,
+		HeartbeatTick:   1,
+		Storage:         s,
+		MaxSizePerMsg:   maxSizePerMsg,
+		MaxInflightMsgs: maxInflightMsgs,
+		CheckQuorum:     true,
+	}
+	n := raft.RestartNode(c)
+	// raftStatus is read under raftStatusMu by the expvar func registered in
+	// init, so it must be written under the same lock, as startNode and
+	// restartNode already do.
+	raftStatusMu.Lock()
+	raftStatus = n.Status
+	raftStatusMu.Unlock()
+	return id, cl, n, s, w
+}
+
+// getIDs computes the sorted set of node IDs that results from starting
+// with the nodes recorded in the snapshot's ConfState (if snap is non-nil)
+// and replaying every EntryConfChange entry in ents, in order:
+// - ConfChangeAddNode adds the contained ID to the set.
+// - ConfChangeRemoveNode removes the contained ID from the set.
+// - ConfChangeUpdateNode leaves the set untouched.
+// Any other conf change type is reported through plog.Panicf.
+func getIDs(snap *raftpb.Snapshot, ents []raftpb.Entry) []uint64 {
+	members := make(map[uint64]bool)
+	if snap != nil {
+		for _, nodeID := range snap.Metadata.ConfState.Nodes {
+			members[nodeID] = true
+		}
+	}
+
+	for i := range ents {
+		if ents[i].Type != raftpb.EntryConfChange {
+			continue
+		}
+		var change raftpb.ConfChange
+		pbutil.MustUnmarshal(&change, ents[i].Data)
+		switch change.Type {
+		case raftpb.ConfChangeAddNode:
+			members[change.NodeID] = true
+		case raftpb.ConfChangeRemoveNode:
+			delete(members, change.NodeID)
+		case raftpb.ConfChangeUpdateNode:
+			// membership is unaffected by an update
+		default:
+			plog.Panicf("ConfChange Type should be either ConfChangeAddNode or ConfChangeRemoveNode!")
+		}
+	}
+
+	sorted := make(types.Uint64Slice, 0, len(members))
+	for nodeID := range members {
+		sorted = append(sorted, nodeID)
+	}
+	sort.Sort(sorted)
+	return []uint64(sorted)
+}
+
+// createConfigChangeEnts builds the EntryConfChange entries that turn the
+// membership given by ids into one containing only `self`. Each ID other
+// than `self` gets a ConfChangeRemoveNode entry; `self` is never removed.
+// If `self` is absent from ids, a final ConfChangeAddNode entry adds a
+// default member with that ID. Entries carry the given term and
+// consecutive indexes starting at index+1.
+func createConfigChangeEnts(ids []uint64, self uint64, term, index uint64) []raftpb.Entry {
+	ents := make([]raftpb.Entry, 0)
+	nextIndex := index + 1
+
+	// appendChange wraps cc in a conf-change entry at the next free index.
+	appendChange := func(cc *raftpb.ConfChange) {
+		ents = append(ents, raftpb.Entry{
+			Type:  raftpb.EntryConfChange,
+			Data:  pbutil.MustMarshal(cc),
+			Term:  term,
+			Index: nextIndex,
+		})
+		nextIndex++
+	}
+
+	selfPresent := false
+	for _, id := range ids {
+		if id == self {
+			selfPresent = true
+			continue
+		}
+		appendChange(&raftpb.ConfChange{
+			Type:   raftpb.ConfChangeRemoveNode,
+			NodeID: id,
+		})
+	}
+	if selfPresent {
+		return ents
+	}
+
+	defaultMember := membership.Member{
+		ID:             types.ID(self),
+		RaftAttributes: membership.RaftAttributes{PeerURLs: []string{"http://localhost:2380"}},
+	}
+	memberCtx, err := json.Marshal(defaultMember)
+	if err != nil {
+		plog.Panicf("marshal member should never fail: %v", err)
+	}
+	appendChange(&raftpb.ConfChange{
+		Type:    raftpb.ConfChangeAddNode,
+		NodeID:  self,
+		Context: memberCtx,
+	})
+	return ents
+}