VOL-1867 move simulated olt from voltha-go to voltha-simolt-adapter
Sourced from voltha-go commit 251a11c0ffe60512318a644cd6ce0dc4e12f4018
Change-Id: I8e7ee4da1fed739b3c461917301d2729a79307f5
diff --git a/vendor/go.etcd.io/etcd/raft/OWNERS b/vendor/go.etcd.io/etcd/raft/OWNERS
new file mode 100644
index 0000000..ab78106
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/OWNERS
@@ -0,0 +1,19 @@
+approvers:
+- heyitsanthony
+- philips
+- fanminshi
+- gyuho
+- mitake
+- jpbetz
+- xiang90
+- bdarnell
+reviewers:
+- heyitsanthony
+- philips
+- fanminshi
+- gyuho
+- mitake
+- jpbetz
+- xiang90
+- bdarnell
+- tschottdorf
diff --git a/vendor/go.etcd.io/etcd/raft/README.md b/vendor/go.etcd.io/etcd/raft/README.md
new file mode 100644
index 0000000..a78e5f7
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/README.md
@@ -0,0 +1,197 @@
+# Raft library
+
+Raft is a protocol with which a cluster of nodes can maintain a replicated state machine.
+The state machine is kept in sync through the use of a replicated log.
+For more details on Raft, see "In Search of an Understandable Consensus Algorithm"
+(https://ramcloud.stanford.edu/raft.pdf) by Diego Ongaro and John Ousterhout.
+
+This Raft library is stable and feature complete. As of 2016, it is **the most widely used** Raft library in production, serving tens of thousands clusters each day. It powers distributed systems such as etcd, Kubernetes, Docker Swarm, Cloud Foundry Diego, CockroachDB, TiDB, Project Calico, Flannel, and more.
+
+Most Raft implementations have a monolithic design, including storage handling, messaging serialization, and network transport. This library instead follows a minimalistic design philosophy by only implementing the core raft algorithm. This minimalism buys flexibility, determinism, and performance.
+
+To keep the codebase small as well as provide flexibility, the library only implements the Raft algorithm; both network and disk IO are left to the user. Library users must implement their own transportation layer for message passing between Raft peers over the wire. Similarly, users must implement their own storage layer to persist the Raft log and state.
+
+In order to easily test the Raft library, its behavior should be deterministic. To achieve this determinism, the library models Raft as a state machine. The state machine takes a `Message` as input. A message can either be a local timer update or a network message sent from a remote peer. The state machine's output is a 3-tuple `{[]Messages, []LogEntries, NextState}` consisting of an array of `Messages`, `log entries`, and `Raft state changes`. For state machines with the same state, the same state machine input should always generate the same state machine output.
+
+A simple example application, _raftexample_, is also available to help illustrate how to use this package in practice: https://github.com/etcd-io/etcd/tree/master/contrib/raftexample
+
+# Features
+
+This raft implementation is a full feature implementation of Raft protocol. Features includes:
+
+- Leader election
+- Log replication
+- Log compaction
+- Membership changes
+- Leadership transfer extension
+- Efficient linearizable read-only queries served by both the leader and followers
+ - leader checks with quorum and bypasses Raft log before processing read-only queries
+ - followers asks leader to get a safe read index before processing read-only queries
+- More efficient lease-based linearizable read-only queries served by both the leader and followers
+ - leader bypasses Raft log and processing read-only queries locally
+ - followers asks leader to get a safe read index before processing read-only queries
+ - this approach relies on the clock of the all the machines in raft group
+
+This raft implementation also includes a few optional enhancements:
+
+- Optimistic pipelining to reduce log replication latency
+- Flow control for log replication
+- Batching Raft messages to reduce synchronized network I/O calls
+- Batching log entries to reduce disk synchronized I/O
+- Writing to leader's disk in parallel
+- Internal proposal redirection from followers to leader
+- Automatic stepping down when the leader loses quorum
+- Protection against unbounded log growth when quorum is lost
+
+## Notable Users
+
+- [cockroachdb](https://github.com/cockroachdb/cockroach) A Scalable, Survivable, Strongly-Consistent SQL Database
+- [dgraph](https://github.com/dgraph-io/dgraph) A Scalable, Distributed, Low Latency, High Throughput Graph Database
+- [etcd](https://github.com/etcd-io/etcd) A distributed reliable key-value store
+- [tikv](https://github.com/pingcap/tikv) A Distributed transactional key value database powered by Rust and Raft
+- [swarmkit](https://github.com/docker/swarmkit) A toolkit for orchestrating distributed systems at any scale.
+- [chain core](https://github.com/chain/chain) Software for operating permissioned, multi-asset blockchain networks
+
+## Usage
+
+The primary object in raft is a Node. Either start a Node from scratch using raft.StartNode or start a Node from some initial state using raft.RestartNode.
+
+To start a three-node cluster
+```go
+ storage := raft.NewMemoryStorage()
+ c := &Config{
+ ID: 0x01,
+ ElectionTick: 10,
+ HeartbeatTick: 1,
+ Storage: storage,
+ MaxSizePerMsg: 4096,
+ MaxInflightMsgs: 256,
+ }
+ // Set peer list to the other nodes in the cluster.
+ // Note that they need to be started separately as well.
+ n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
+```
+
+Start a single node cluster, like so:
+```go
+ // Create storage and config as shown above.
+ // Set peer list to itself, so this node can become the leader of this single-node cluster.
+ peers := []raft.Peer{{ID: 0x01}}
+ n := raft.StartNode(c, peers)
+```
+
+To allow a new node to join this cluster, do not pass in any peers. First, add the node to the existing cluster by calling `ProposeConfChange` on any existing node inside the cluster. Then, start the node with an empty peer list, like so:
+```go
+ // Create storage and config as shown above.
+ n := raft.StartNode(c, nil)
+```
+
+To restart a node from previous state:
+```go
+ storage := raft.NewMemoryStorage()
+
+ // Recover the in-memory storage from persistent snapshot, state and entries.
+ storage.ApplySnapshot(snapshot)
+ storage.SetHardState(state)
+ storage.Append(entries)
+
+ c := &Config{
+ ID: 0x01,
+ ElectionTick: 10,
+ HeartbeatTick: 1,
+ Storage: storage,
+ MaxSizePerMsg: 4096,
+ MaxInflightMsgs: 256,
+ }
+
+ // Restart raft without peer information.
+ // Peer information is already included in the storage.
+ n := raft.RestartNode(c)
+```
+
+After creating a Node, the user has a few responsibilities:
+
+First, read from the Node.Ready() channel and process the updates it contains. These steps may be performed in parallel, except as noted in step 2.
+
+1. Write Entries, HardState and Snapshot to persistent storage in order, i.e. Entries first, then HardState and Snapshot if they are not empty. If persistent storage supports atomic writes then all of them can be written together. Note that when writing an Entry with Index i, any previously-persisted entries with Index >= i must be discarded.
+
+2. Send all Messages to the nodes named in the To field. It is important that no messages be sent until the latest HardState has been persisted to disk, and all Entries written by any previous Ready batch (Messages may be sent while entries from the same batch are being persisted). To reduce the I/O latency, an optimization can be applied to make leader write to disk in parallel with its followers (as explained at section 10.2.1 in Raft thesis). If any Message has type MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be large). Note: Marshalling messages is not thread-safe; it is important to make sure that no new entries are persisted while marshalling. The easiest way to achieve this is to serialise the messages directly inside the main raft loop.
+
+3. Apply Snapshot (if any) and CommittedEntries to the state machine. If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange() to apply it to the node. The configuration change may be cancelled at this point by setting the NodeID field to zero before calling ApplyConfChange (but ApplyConfChange must be called one way or the other, and the decision to cancel must be based solely on the state machine and not external information such as the observed health of the node).
+
+4. Call Node.Advance() to signal readiness for the next batch of updates. This may be done at any time after step 1, although all updates must be processed in the order they were returned by Ready.
+
+Second, all persisted log entries must be made available via an implementation of the Storage interface. The provided MemoryStorage type can be used for this (if repopulating its state upon a restart), or a custom disk-backed implementation can be supplied.
+
+Third, after receiving a message from another node, pass it to Node.Step:
+
+```go
+ func recvRaftRPC(ctx context.Context, m raftpb.Message) {
+ n.Step(ctx, m)
+ }
+```
+
+Finally, call `Node.Tick()` at regular intervals (probably via a `time.Ticker`). Raft has two important timeouts: heartbeat and the election timeout. However, internally to the raft package time is represented by an abstract "tick".
+
+The total state machine handling loop will look something like this:
+
+```go
+ for {
+ select {
+ case <-s.Ticker:
+ n.Tick()
+ case rd := <-s.Node.Ready():
+ saveToStorage(rd.HardState, rd.Entries, rd.Snapshot)
+ send(rd.Messages)
+ if !raft.IsEmptySnap(rd.Snapshot) {
+ processSnapshot(rd.Snapshot)
+ }
+ for _, entry := range rd.CommittedEntries {
+ process(entry)
+ if entry.Type == raftpb.EntryConfChange {
+ var cc raftpb.ConfChange
+ cc.Unmarshal(entry.Data)
+ s.Node.ApplyConfChange(cc)
+ }
+ }
+ s.Node.Advance()
+ case <-s.done:
+ return
+ }
+ }
+```
+
+To propose changes to the state machine from the node to take application data, serialize it into a byte slice and call:
+
+```go
+ n.Propose(ctx, data)
+```
+
+If the proposal is committed, data will appear in committed entries with type raftpb.EntryNormal. There is no guarantee that a proposed command will be committed; the command may have to be reproposed after a timeout.
+
+To add or remove node in a cluster, build ConfChange struct 'cc' and call:
+
+```go
+ n.ProposeConfChange(ctx, cc)
+```
+
+After config change is committed, some committed entry with type raftpb.EntryConfChange will be returned. This must be applied to node through:
+
+```go
+ var cc raftpb.ConfChange
+ cc.Unmarshal(data)
+ n.ApplyConfChange(cc)
+```
+
+Note: An ID represents a unique node in a cluster for all time. A
+given ID MUST be used only once even if the old node has been removed.
+This means that for example IP addresses make poor node IDs since they
+may be reused. Node IDs must be non-zero.
+
+## Implementation notes
+
+This implementation is up to date with the final Raft thesis (https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although this implementation of the membership change protocol differs somewhat from that described in chapter 4. The key invariant that membership changes happen one node at a time is preserved, but in our implementation the membership change takes effect when its entry is applied, not when it is added to the log (so the entry is committed under the old membership instead of the new). This is equivalent in terms of safety, since the old and new configurations are guaranteed to overlap.
+
+To ensure there is no attempt to commit two membership changes at once by matching log positions (which would be unsafe since they should have different quorum requirements), any proposed membership change is simply disallowed while any uncommitted change appears in the leader's log.
+
+This approach introduces a problem when removing a member from a two-member cluster: If one of the members dies before the other one receives the commit of the confchange entry, then the member cannot be removed any more since the cluster cannot make progress. For this reason it is highly recommended to use three or more nodes in every cluster.
diff --git a/vendor/go.etcd.io/etcd/raft/design.md b/vendor/go.etcd.io/etcd/raft/design.md
new file mode 100644
index 0000000..7bc0531
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/design.md
@@ -0,0 +1,57 @@
+## Progress
+
+Progress represents a follower’s progress in the view of the leader. Leader maintains progresses of all followers, and sends `replication message` to the follower based on its progress.
+
+`replication message` is a `msgApp` with log entries.
+
+A progress has two attribute: `match` and `next`. `match` is the index of the highest known matched entry. If leader knows nothing about follower’s replication status, `match` is set to zero. `next` is the index of the first entry that will be replicated to the follower. Leader puts entries from `next` to its latest one in next `replication message`.
+
+A progress is in one of the three state: `probe`, `replicate`, `snapshot`.
+
+```
+ +--------------------------------------------------------+
+ | send snapshot |
+ | |
+ +---------+----------+ +----------v---------+
+ +---> probe | | snapshot |
+ | | max inflight = 1 <----------------------------------+ max inflight = 0 |
+ | +---------+----------+ +--------------------+
+ | | 1. snapshot success
+ | | (next=snapshot.index + 1)
+ | | 2. snapshot failure
+ | | (no change)
+ | | 3. receives msgAppResp(rej=false&&index>lastsnap.index)
+ | | (match=m.index,next=match+1)
+receives msgAppResp(rej=true)
+(next=match+1)| |
+ | |
+ | |
+ | | receives msgAppResp(rej=false&&index>match)
+ | | (match=m.index,next=match+1)
+ | |
+ | |
+ | |
+ | +---------v----------+
+ | | replicate |
+ +---+ max inflight = n |
+ +--------------------+
+```
+
+When the progress of a follower is in `probe` state, leader sends at most one `replication message` per heartbeat interval. The leader sends `replication message` slowly and probing the actual progress of the follower. A `msgHeartbeatResp` or a `msgAppResp` with reject might trigger the sending of the next `replication message`.
+
+When the progress of a follower is in `replicate` state, leader sends `replication message`, then optimistically increases `next` to the latest entry sent. This is an optimized state for fast replicating log entries to the follower.
+
+When the progress of a follower is in `snapshot` state, leader stops sending any `replication message`.
+
+A newly elected leader sets the progresses of all the followers to `probe` state with `match` = 0 and `next` = last index. The leader slowly (at most once per heartbeat) sends `replication message` to the follower and probes its progress.
+
+A progress changes to `replicate` when the follower replies with a non-rejection `msgAppResp`, which implies that it has matched the index sent. At this point, leader starts to stream log entries to the follower fast. The progress will fall back to `probe` when the follower replies a rejection `msgAppResp` or the link layer reports the follower is unreachable. We aggressively reset `next` to `match`+1 since if we receive any `msgAppResp` soon, both `match` and `next` will increase directly to the `index` in `msgAppResp`. (We might end up with sending some duplicate entries when aggressively reset `next` too low. see open question)
+
+A progress changes from `probe` to `snapshot` when the follower falls very far behind and requires a snapshot. After sending `msgSnap`, the leader waits until the success, failure or abortion of the previous snapshot sent. The progress will go back to `probe` after the sending result is applied.
+
+### Flow Control
+
+1. limit the max size of message sent per message. Max should be configurable.
+Lower the cost at probing state as we limit the size per message; lower the penalty when aggressively decreased to a too low `next`
+
+2. limit the # of in flight messages < N when in `replicate` state. N should be configurable. Most implementation will have a sending buffer on top of its actual network transport layer (not blocking raft node). We want to make sure raft does not overflow that buffer, which can cause message dropping and triggering a bunch of unnecessary resending repeatedly.
diff --git a/vendor/go.etcd.io/etcd/raft/doc.go b/vendor/go.etcd.io/etcd/raft/doc.go
new file mode 100644
index 0000000..c30d884
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/doc.go
@@ -0,0 +1,300 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+/*
+Package raft sends and receives messages in the Protocol Buffer format
+defined in the raftpb package.
+
+Raft is a protocol with which a cluster of nodes can maintain a replicated state machine.
+The state machine is kept in sync through the use of a replicated log.
+For more details on Raft, see "In Search of an Understandable Consensus Algorithm"
+(https://ramcloud.stanford.edu/raft.pdf) by Diego Ongaro and John Ousterhout.
+
+A simple example application, _raftexample_, is also available to help illustrate
+how to use this package in practice:
+https://github.com/etcd-io/etcd/tree/master/contrib/raftexample
+
+Usage
+
+The primary object in raft is a Node. You either start a Node from scratch
+using raft.StartNode or start a Node from some initial state using raft.RestartNode.
+
+To start a node from scratch:
+
+ storage := raft.NewMemoryStorage()
+ c := &Config{
+ ID: 0x01,
+ ElectionTick: 10,
+ HeartbeatTick: 1,
+ Storage: storage,
+ MaxSizePerMsg: 4096,
+ MaxInflightMsgs: 256,
+ }
+ n := raft.StartNode(c, []raft.Peer{{ID: 0x02}, {ID: 0x03}})
+
+To restart a node from previous state:
+
+ storage := raft.NewMemoryStorage()
+
+ // recover the in-memory storage from persistent
+ // snapshot, state and entries.
+ storage.ApplySnapshot(snapshot)
+ storage.SetHardState(state)
+ storage.Append(entries)
+
+ c := &Config{
+ ID: 0x01,
+ ElectionTick: 10,
+ HeartbeatTick: 1,
+ Storage: storage,
+ MaxSizePerMsg: 4096,
+ MaxInflightMsgs: 256,
+ }
+
+ // restart raft without peer information.
+ // peer information is already included in the storage.
+ n := raft.RestartNode(c)
+
+Now that you are holding onto a Node you have a few responsibilities:
+
+First, you must read from the Node.Ready() channel and process the updates
+it contains. These steps may be performed in parallel, except as noted in step
+2.
+
+1. Write HardState, Entries, and Snapshot to persistent storage if they are
+not empty. Note that when writing an Entry with Index i, any
+previously-persisted entries with Index >= i must be discarded.
+
+2. Send all Messages to the nodes named in the To field. It is important that
+no messages be sent until the latest HardState has been persisted to disk,
+and all Entries written by any previous Ready batch (Messages may be sent while
+entries from the same batch are being persisted). To reduce the I/O latency, an
+optimization can be applied to make leader write to disk in parallel with its
+followers (as explained at section 10.2.1 in Raft thesis). If any Message has type
+MsgSnap, call Node.ReportSnapshot() after it has been sent (these messages may be
+large).
+
+Note: Marshalling messages is not thread-safe; it is important that you
+make sure that no new entries are persisted while marshalling.
+The easiest way to achieve this is to serialize the messages directly inside
+your main raft loop.
+
+3. Apply Snapshot (if any) and CommittedEntries to the state machine.
+If any committed Entry has Type EntryConfChange, call Node.ApplyConfChange()
+to apply it to the node. The configuration change may be cancelled at this point
+by setting the NodeID field to zero before calling ApplyConfChange
+(but ApplyConfChange must be called one way or the other, and the decision to cancel
+must be based solely on the state machine and not external information such as
+the observed health of the node).
+
+4. Call Node.Advance() to signal readiness for the next batch of updates.
+This may be done at any time after step 1, although all updates must be processed
+in the order they were returned by Ready.
+
+Second, all persisted log entries must be made available via an
+implementation of the Storage interface. The provided MemoryStorage
+type can be used for this (if you repopulate its state upon a
+restart), or you can supply your own disk-backed implementation.
+
+Third, when you receive a message from another node, pass it to Node.Step:
+
+ func recvRaftRPC(ctx context.Context, m raftpb.Message) {
+ n.Step(ctx, m)
+ }
+
+Finally, you need to call Node.Tick() at regular intervals (probably
+via a time.Ticker). Raft has two important timeouts: heartbeat and the
+election timeout. However, internally to the raft package time is
+represented by an abstract "tick".
+
+The total state machine handling loop will look something like this:
+
+ for {
+ select {
+ case <-s.Ticker:
+ n.Tick()
+ case rd := <-s.Node.Ready():
+ saveToStorage(rd.State, rd.Entries, rd.Snapshot)
+ send(rd.Messages)
+ if !raft.IsEmptySnap(rd.Snapshot) {
+ processSnapshot(rd.Snapshot)
+ }
+ for _, entry := range rd.CommittedEntries {
+ process(entry)
+ if entry.Type == raftpb.EntryConfChange {
+ var cc raftpb.ConfChange
+ cc.Unmarshal(entry.Data)
+ s.Node.ApplyConfChange(cc)
+ }
+ }
+ s.Node.Advance()
+ case <-s.done:
+ return
+ }
+ }
+
+To propose changes to the state machine from your node take your application
+data, serialize it into a byte slice and call:
+
+ n.Propose(ctx, data)
+
+If the proposal is committed, data will appear in committed entries with type
+raftpb.EntryNormal. There is no guarantee that a proposed command will be
+committed; you may have to re-propose after a timeout.
+
+To add or remove a node in a cluster, build ConfChange struct 'cc' and call:
+
+ n.ProposeConfChange(ctx, cc)
+
+After config change is committed, some committed entry with type
+raftpb.EntryConfChange will be returned. You must apply it to node through:
+
+ var cc raftpb.ConfChange
+ cc.Unmarshal(data)
+ n.ApplyConfChange(cc)
+
+Note: An ID represents a unique node in a cluster for all time. A
+given ID MUST be used only once even if the old node has been removed.
+This means that for example IP addresses make poor node IDs since they
+may be reused. Node IDs must be non-zero.
+
+Implementation notes
+
+This implementation is up to date with the final Raft thesis
+(https://ramcloud.stanford.edu/~ongaro/thesis.pdf), although our
+implementation of the membership change protocol differs somewhat from
+that described in chapter 4. The key invariant that membership changes
+happen one node at a time is preserved, but in our implementation the
+membership change takes effect when its entry is applied, not when it
+is added to the log (so the entry is committed under the old
+membership instead of the new). This is equivalent in terms of safety,
+since the old and new configurations are guaranteed to overlap.
+
+To ensure that we do not attempt to commit two membership changes at
+once by matching log positions (which would be unsafe since they
+should have different quorum requirements), we simply disallow any
+proposed membership change while any uncommitted change appears in
+the leader's log.
+
+This approach introduces a problem when you try to remove a member
+from a two-member cluster: If one of the members dies before the
+other one receives the commit of the confchange entry, then the member
+cannot be removed any more since the cluster cannot make progress.
+For this reason it is highly recommended to use three or more nodes in
+every cluster.
+
+MessageType
+
+Package raft sends and receives message in Protocol Buffer format (defined
+in raftpb package). Each state (follower, candidate, leader) implements its
+own 'step' method ('stepFollower', 'stepCandidate', 'stepLeader') when
+advancing with the given raftpb.Message. Each step is determined by its
+raftpb.MessageType. Note that every step is checked by one common method
+'Step' that safety-checks the terms of node and incoming message to prevent
+stale log entries:
+
+ 'MsgHup' is used for election. If a node is a follower or candidate, the
+ 'tick' function in 'raft' struct is set as 'tickElection'. If a follower or
+ candidate has not received any heartbeat before the election timeout, it
+ passes 'MsgHup' to its Step method and becomes (or remains) a candidate to
+ start a new election.
+
+ 'MsgBeat' is an internal type that signals the leader to send a heartbeat of
+ the 'MsgHeartbeat' type. If a node is a leader, the 'tick' function in
+ the 'raft' struct is set as 'tickHeartbeat', and triggers the leader to
+ send periodic 'MsgHeartbeat' messages to its followers.
+
+ 'MsgProp' proposes to append data to its log entries. This is a special
+ type to redirect proposals to leader. Therefore, send method overwrites
+ raftpb.Message's term with its HardState's term to avoid attaching its
+ local term to 'MsgProp'. When 'MsgProp' is passed to the leader's 'Step'
+ method, the leader first calls the 'appendEntry' method to append entries
+ to its log, and then calls 'bcastAppend' method to send those entries to
+ its peers. When passed to candidate, 'MsgProp' is dropped. When passed to
+ follower, 'MsgProp' is stored in follower's mailbox(msgs) by the send
+ method. It is stored with sender's ID and later forwarded to leader by
+ rafthttp package.
+
+ 'MsgApp' contains log entries to replicate. A leader calls bcastAppend,
+ which calls sendAppend, which sends soon-to-be-replicated logs in 'MsgApp'
+ type. When 'MsgApp' is passed to candidate's Step method, candidate reverts
+ back to follower, because it indicates that there is a valid leader sending
+ 'MsgApp' messages. Candidate and follower respond to this message in
+ 'MsgAppResp' type.
+
+ 'MsgAppResp' is response to log replication request('MsgApp'). When
+ 'MsgApp' is passed to candidate or follower's Step method, it responds by
+ calling 'handleAppendEntries' method, which sends 'MsgAppResp' to raft
+ mailbox.
+
+ 'MsgVote' requests votes for election. When a node is a follower or
+ candidate and 'MsgHup' is passed to its Step method, then the node calls
+ 'campaign' method to campaign itself to become a leader. Once 'campaign'
+ method is called, the node becomes candidate and sends 'MsgVote' to peers
+ in cluster to request votes. When passed to leader or candidate's Step
+ method and the message's Term is lower than leader's or candidate's,
+ 'MsgVote' will be rejected ('MsgVoteResp' is returned with Reject true).
+ If leader or candidate receives 'MsgVote' with higher term, it will revert
+ back to follower. When 'MsgVote' is passed to follower, it votes for the
+ sender only when sender's last term is greater than MsgVote's term or
+ sender's last term is equal to MsgVote's term but sender's last committed
+ index is greater than or equal to follower's.
+
+ 'MsgVoteResp' contains responses from voting request. When 'MsgVoteResp' is
+ passed to candidate, the candidate calculates how many votes it has won. If
+ it's more than majority (quorum), it becomes leader and calls 'bcastAppend'.
+ If candidate receives majority of votes of denials, it reverts back to
+ follower.
+
+ 'MsgPreVote' and 'MsgPreVoteResp' are used in an optional two-phase election
+ protocol. When Config.PreVote is true, a pre-election is carried out first
+ (using the same rules as a regular election), and no node increases its term
+ number unless the pre-election indicates that the campaigning node would win.
+ This minimizes disruption when a partitioned node rejoins the cluster.
+
+ 'MsgSnap' requests to install a snapshot message. When a node has just
+ become a leader or the leader receives 'MsgProp' message, it calls
+ 'bcastAppend' method, which then calls 'sendAppend' method to each
+ follower. In 'sendAppend', if a leader fails to get term or entries,
+ the leader requests snapshot by sending 'MsgSnap' type message.
+
+ 'MsgSnapStatus' tells the result of snapshot install message. When a
+ follower rejected 'MsgSnap', it indicates the snapshot request with
+ 'MsgSnap' had failed from network issues which causes the network layer
+ to fail to send out snapshots to its followers. Then leader considers
+ follower's progress as probe. When 'MsgSnap' were not rejected, it
+ indicates that the snapshot succeeded and the leader sets follower's
+ progress to probe and resumes its log replication.
+
+ 'MsgHeartbeat' sends heartbeat from leader. When 'MsgHeartbeat' is passed
+ to candidate and message's term is higher than candidate's, the candidate
+ reverts back to follower and updates its committed index from the one in
+ this heartbeat. And it sends the message to its mailbox. When
+ 'MsgHeartbeat' is passed to follower's Step method and message's term is
+ higher than follower's, the follower updates its leaderID with the ID
+ from the message.
+
+ 'MsgHeartbeatResp' is a response to 'MsgHeartbeat'. When 'MsgHeartbeatResp'
+ is passed to leader's Step method, the leader knows which follower
+ responded. And only when the leader's last committed index is greater than
+ follower's Match index, the leader runs 'sendAppend` method.
+
+ 'MsgUnreachable' tells that request(message) wasn't delivered. When
+ 'MsgUnreachable' is passed to leader's Step method, the leader discovers
+ that the follower that sent this 'MsgUnreachable' is not reachable, often
+ indicating 'MsgApp' is lost. When follower's progress state is replicate,
+ the leader sets it back to probe.
+
+*/
+package raft
diff --git a/vendor/go.etcd.io/etcd/raft/log.go b/vendor/go.etcd.io/etcd/raft/log.go
new file mode 100644
index 0000000..03f83e6
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/log.go
@@ -0,0 +1,370 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+ "fmt"
+ "log"
+
+ pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+type raftLog struct {
+ // storage contains all stable entries since the last snapshot.
+ storage Storage
+
+ // unstable contains all unstable entries and snapshot.
+ // they will be saved into storage.
+ unstable unstable
+
+ // committed is the highest log position that is known to be in
+ // stable storage on a quorum of nodes.
+ committed uint64
+ // applied is the highest log position that the application has
+ // been instructed to apply to its state machine.
+ // Invariant: applied <= committed
+ applied uint64
+
+ logger Logger
+
+ // maxNextEntsSize is the maximum number aggregate byte size of the messages
+ // returned from calls to nextEnts.
+ maxNextEntsSize uint64
+}
+
+// newLog returns log using the given storage and default options. It
+// recovers the log to the state that it just commits and applies the
+// latest snapshot.
+func newLog(storage Storage, logger Logger) *raftLog {
+ return newLogWithSize(storage, logger, noLimit)
+}
+
+// newLogWithSize returns a log using the given storage and max
+// message size.
+func newLogWithSize(storage Storage, logger Logger, maxNextEntsSize uint64) *raftLog {
+ if storage == nil {
+ log.Panic("storage must not be nil")
+ }
+ log := &raftLog{
+ storage: storage,
+ logger: logger,
+ maxNextEntsSize: maxNextEntsSize,
+ }
+ firstIndex, err := storage.FirstIndex()
+ if err != nil {
+ panic(err) // TODO(bdarnell)
+ }
+ lastIndex, err := storage.LastIndex()
+ if err != nil {
+ panic(err) // TODO(bdarnell)
+ }
+ log.unstable.offset = lastIndex + 1
+ log.unstable.logger = logger
+ // Initialize our committed and applied pointers to the time of the last compaction.
+ log.committed = firstIndex - 1
+ log.applied = firstIndex - 1
+
+ return log
+}
+
+func (l *raftLog) String() string {
+ return fmt.Sprintf("committed=%d, applied=%d, unstable.offset=%d, len(unstable.Entries)=%d", l.committed, l.applied, l.unstable.offset, len(l.unstable.entries))
+}
+
+// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
+// it returns (last index of new entries, true).
+func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
+ if l.matchTerm(index, logTerm) {
+ lastnewi = index + uint64(len(ents))
+ ci := l.findConflict(ents)
+ switch {
+ case ci == 0:
+ case ci <= l.committed:
+ l.logger.Panicf("entry %d conflict with committed entry [committed(%d)]", ci, l.committed)
+ default:
+ offset := index + 1
+ l.append(ents[ci-offset:]...)
+ }
+ l.commitTo(min(committed, lastnewi))
+ return lastnewi, true
+ }
+ return 0, false
+}
+
+func (l *raftLog) append(ents ...pb.Entry) uint64 {
+ if len(ents) == 0 {
+ return l.lastIndex()
+ }
+ if after := ents[0].Index - 1; after < l.committed {
+ l.logger.Panicf("after(%d) is out of range [committed(%d)]", after, l.committed)
+ }
+ l.unstable.truncateAndAppend(ents)
+ return l.lastIndex()
+}
+
+// findConflict finds the index of the conflict.
+// It returns the first pair of conflicting entries between the existing
+// entries and the given entries, if there are any.
+// If there is no conflicting entries, and the existing entries contains
+// all the given entries, zero will be returned.
+// If there is no conflicting entries, but the given entries contains new
+// entries, the index of the first new entry will be returned.
+// An entry is considered to be conflicting if it has the same index but
+// a different term.
+// The first entry MUST have an index equal to the argument 'from'.
+// The index of the given entries MUST be continuously increasing.
+func (l *raftLog) findConflict(ents []pb.Entry) uint64 {
+ for _, ne := range ents {
+ if !l.matchTerm(ne.Index, ne.Term) {
+ if ne.Index <= l.lastIndex() {
+ l.logger.Infof("found conflict at index %d [existing term: %d, conflicting term: %d]",
+ ne.Index, l.zeroTermOnErrCompacted(l.term(ne.Index)), ne.Term)
+ }
+ return ne.Index
+ }
+ }
+ return 0
+}
+
+func (l *raftLog) unstableEntries() []pb.Entry {
+ if len(l.unstable.entries) == 0 {
+ return nil
+ }
+ return l.unstable.entries
+}
+
+// nextEnts returns all the available entries for execution.
+// If applied is smaller than the index of snapshot, it returns all committed
+// entries after the index of snapshot.
+func (l *raftLog) nextEnts() (ents []pb.Entry) {
+ off := max(l.applied+1, l.firstIndex())
+ if l.committed+1 > off {
+ ents, err := l.slice(off, l.committed+1, l.maxNextEntsSize)
+ if err != nil {
+ l.logger.Panicf("unexpected error when getting unapplied entries (%v)", err)
+ }
+ return ents
+ }
+ return nil
+}
+
+// hasNextEnts returns if there is any available entries for execution. This
+// is a fast check without heavy raftLog.slice() in raftLog.nextEnts().
+func (l *raftLog) hasNextEnts() bool {
+ off := max(l.applied+1, l.firstIndex())
+ return l.committed+1 > off
+}
+
+func (l *raftLog) snapshot() (pb.Snapshot, error) {
+ if l.unstable.snapshot != nil {
+ return *l.unstable.snapshot, nil
+ }
+ return l.storage.Snapshot()
+}
+
+func (l *raftLog) firstIndex() uint64 {
+ if i, ok := l.unstable.maybeFirstIndex(); ok {
+ return i
+ }
+ index, err := l.storage.FirstIndex()
+ if err != nil {
+ panic(err) // TODO(bdarnell)
+ }
+ return index
+}
+
+func (l *raftLog) lastIndex() uint64 {
+ if i, ok := l.unstable.maybeLastIndex(); ok {
+ return i
+ }
+ i, err := l.storage.LastIndex()
+ if err != nil {
+ panic(err) // TODO(bdarnell)
+ }
+ return i
+}
+
+func (l *raftLog) commitTo(tocommit uint64) {
+ // never decrease commit
+ if l.committed < tocommit {
+ if l.lastIndex() < tocommit {
+ l.logger.Panicf("tocommit(%d) is out of range [lastIndex(%d)]. Was the raft log corrupted, truncated, or lost?", tocommit, l.lastIndex())
+ }
+ l.committed = tocommit
+ }
+}
+
+func (l *raftLog) appliedTo(i uint64) {
+ if i == 0 {
+ return
+ }
+ if l.committed < i || i < l.applied {
+ l.logger.Panicf("applied(%d) is out of range [prevApplied(%d), committed(%d)]", i, l.applied, l.committed)
+ }
+ l.applied = i
+}
+
+func (l *raftLog) stableTo(i, t uint64) { l.unstable.stableTo(i, t) }
+
+func (l *raftLog) stableSnapTo(i uint64) { l.unstable.stableSnapTo(i) }
+
+func (l *raftLog) lastTerm() uint64 {
+ t, err := l.term(l.lastIndex())
+ if err != nil {
+ l.logger.Panicf("unexpected error when getting the last term (%v)", err)
+ }
+ return t
+}
+
+func (l *raftLog) term(i uint64) (uint64, error) {
+ // the valid term range is [index of dummy entry, last index]
+ dummyIndex := l.firstIndex() - 1
+ if i < dummyIndex || i > l.lastIndex() {
+ // TODO: return an error instead?
+ return 0, nil
+ }
+
+ if t, ok := l.unstable.maybeTerm(i); ok {
+ return t, nil
+ }
+
+ t, err := l.storage.Term(i)
+ if err == nil {
+ return t, nil
+ }
+ if err == ErrCompacted || err == ErrUnavailable {
+ return 0, err
+ }
+ panic(err) // TODO(bdarnell)
+}
+
+func (l *raftLog) entries(i, maxsize uint64) ([]pb.Entry, error) {
+ if i > l.lastIndex() {
+ return nil, nil
+ }
+ return l.slice(i, l.lastIndex()+1, maxsize)
+}
+
+// allEntries returns all entries in the log.
+func (l *raftLog) allEntries() []pb.Entry {
+ ents, err := l.entries(l.firstIndex(), noLimit)
+ if err == nil {
+ return ents
+ }
+ if err == ErrCompacted { // try again if there was a racing compaction
+ return l.allEntries()
+ }
+ // TODO (xiangli): handle error?
+ panic(err)
+}
+
+// isUpToDate determines if the given (lastIndex,term) log is more up-to-date
+// by comparing the index and term of the last entries in the existing logs.
+// If the logs have last entries with different terms, then the log with the
+// later term is more up-to-date. If the logs end with the same term, then
+// whichever log has the larger lastIndex is more up-to-date. If the logs are
+// the same, the given log is up-to-date.
+func (l *raftLog) isUpToDate(lasti, term uint64) bool {
+ return term > l.lastTerm() || (term == l.lastTerm() && lasti >= l.lastIndex())
+}
+
+func (l *raftLog) matchTerm(i, term uint64) bool {
+ t, err := l.term(i)
+ if err != nil {
+ return false
+ }
+ return t == term
+}
+
+func (l *raftLog) maybeCommit(maxIndex, term uint64) bool {
+ if maxIndex > l.committed && l.zeroTermOnErrCompacted(l.term(maxIndex)) == term {
+ l.commitTo(maxIndex)
+ return true
+ }
+ return false
+}
+
+func (l *raftLog) restore(s pb.Snapshot) {
+ l.logger.Infof("log [%s] starts to restore snapshot [index: %d, term: %d]", l, s.Metadata.Index, s.Metadata.Term)
+ l.committed = s.Metadata.Index
+ l.unstable.restore(s)
+}
+
+// slice returns a slice of log entries from lo through hi-1, inclusive.
+func (l *raftLog) slice(lo, hi, maxSize uint64) ([]pb.Entry, error) {
+ err := l.mustCheckOutOfBounds(lo, hi)
+ if err != nil {
+ return nil, err
+ }
+ if lo == hi {
+ return nil, nil
+ }
+ var ents []pb.Entry
+ if lo < l.unstable.offset {
+ storedEnts, err := l.storage.Entries(lo, min(hi, l.unstable.offset), maxSize)
+ if err == ErrCompacted {
+ return nil, err
+ } else if err == ErrUnavailable {
+ l.logger.Panicf("entries[%d:%d) is unavailable from storage", lo, min(hi, l.unstable.offset))
+ } else if err != nil {
+ panic(err) // TODO(bdarnell)
+ }
+
+ // check if ents has reached the size limitation
+ if uint64(len(storedEnts)) < min(hi, l.unstable.offset)-lo {
+ return storedEnts, nil
+ }
+
+ ents = storedEnts
+ }
+ if hi > l.unstable.offset {
+ unstable := l.unstable.slice(max(lo, l.unstable.offset), hi)
+ if len(ents) > 0 {
+ ents = append([]pb.Entry{}, ents...)
+ ents = append(ents, unstable...)
+ } else {
+ ents = unstable
+ }
+ }
+ return limitSize(ents, maxSize), nil
+}
+
+// l.firstIndex <= lo <= hi <= l.firstIndex + len(l.entries)
+func (l *raftLog) mustCheckOutOfBounds(lo, hi uint64) error {
+ if lo > hi {
+ l.logger.Panicf("invalid slice %d > %d", lo, hi)
+ }
+ fi := l.firstIndex()
+ if lo < fi {
+ return ErrCompacted
+ }
+
+ length := l.lastIndex() + 1 - fi
+ if lo < fi || hi > fi+length {
+ l.logger.Panicf("slice[%d,%d) out of bound [%d,%d]", lo, hi, fi, l.lastIndex())
+ }
+ return nil
+}
+
+func (l *raftLog) zeroTermOnErrCompacted(t uint64, err error) uint64 {
+ if err == nil {
+ return t
+ }
+ if err == ErrCompacted {
+ return 0
+ }
+ l.logger.Panicf("unexpected error (%v)", err)
+ return 0
+}
diff --git a/vendor/go.etcd.io/etcd/raft/log_unstable.go b/vendor/go.etcd.io/etcd/raft/log_unstable.go
new file mode 100644
index 0000000..1005bf6
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/log_unstable.go
@@ -0,0 +1,159 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import pb "go.etcd.io/etcd/raft/raftpb"
+
+// unstable.entries[i] has raft log position i+unstable.offset.
+// Note that unstable.offset may be less than the highest log
+// position in storage; this means that the next write to storage
+// might need to truncate the log before persisting unstable.entries.
+type unstable struct {
+ // the incoming unstable snapshot, if any.
+ snapshot *pb.Snapshot
+ // all entries that have not yet been written to storage.
+ entries []pb.Entry
+ offset uint64
+
+ logger Logger
+}
+
+// maybeFirstIndex returns the index of the first possible entry in entries
+// if it has a snapshot.
+func (u *unstable) maybeFirstIndex() (uint64, bool) {
+ if u.snapshot != nil {
+ return u.snapshot.Metadata.Index + 1, true
+ }
+ return 0, false
+}
+
+// maybeLastIndex returns the last index if it has at least one
+// unstable entry or snapshot.
+func (u *unstable) maybeLastIndex() (uint64, bool) {
+ if l := len(u.entries); l != 0 {
+ return u.offset + uint64(l) - 1, true
+ }
+ if u.snapshot != nil {
+ return u.snapshot.Metadata.Index, true
+ }
+ return 0, false
+}
+
+// maybeTerm returns the term of the entry at index i, if there
+// is any.
+func (u *unstable) maybeTerm(i uint64) (uint64, bool) {
+ if i < u.offset {
+ if u.snapshot == nil {
+ return 0, false
+ }
+ if u.snapshot.Metadata.Index == i {
+ return u.snapshot.Metadata.Term, true
+ }
+ return 0, false
+ }
+
+ last, ok := u.maybeLastIndex()
+ if !ok {
+ return 0, false
+ }
+ if i > last {
+ return 0, false
+ }
+ return u.entries[i-u.offset].Term, true
+}
+
+func (u *unstable) stableTo(i, t uint64) {
+ gt, ok := u.maybeTerm(i)
+ if !ok {
+ return
+ }
+ // if i < offset, term is matched with the snapshot
+ // only update the unstable entries if term is matched with
+ // an unstable entry.
+ if gt == t && i >= u.offset {
+ u.entries = u.entries[i+1-u.offset:]
+ u.offset = i + 1
+ u.shrinkEntriesArray()
+ }
+}
+
+// shrinkEntriesArray discards the underlying array used by the entries slice
+// if most of it isn't being used. This avoids holding references to a bunch of
+// potentially large entries that aren't needed anymore. Simply clearing the
+// entries wouldn't be safe because clients might still be using them.
+func (u *unstable) shrinkEntriesArray() {
+ // We replace the array if we're using less than half of the space in
+ // it. This number is fairly arbitrary, chosen as an attempt to balance
+ // memory usage vs number of allocations. It could probably be improved
+ // with some focused tuning.
+ const lenMultiple = 2
+ if len(u.entries) == 0 {
+ u.entries = nil
+ } else if len(u.entries)*lenMultiple < cap(u.entries) {
+ newEntries := make([]pb.Entry, len(u.entries))
+ copy(newEntries, u.entries)
+ u.entries = newEntries
+ }
+}
+
+func (u *unstable) stableSnapTo(i uint64) {
+ if u.snapshot != nil && u.snapshot.Metadata.Index == i {
+ u.snapshot = nil
+ }
+}
+
+func (u *unstable) restore(s pb.Snapshot) {
+ u.offset = s.Metadata.Index + 1
+ u.entries = nil
+ u.snapshot = &s
+}
+
+func (u *unstable) truncateAndAppend(ents []pb.Entry) {
+ after := ents[0].Index
+ switch {
+ case after == u.offset+uint64(len(u.entries)):
+ // after is the next index in the u.entries
+ // directly append
+ u.entries = append(u.entries, ents...)
+ case after <= u.offset:
+ u.logger.Infof("replace the unstable entries from index %d", after)
+ // The log is being truncated to before our current offset
+ // portion, so set the offset and replace the entries
+ u.offset = after
+ u.entries = ents
+ default:
+ // truncate to after and copy to u.entries
+ // then append
+ u.logger.Infof("truncate the unstable entries before index %d", after)
+ u.entries = append([]pb.Entry{}, u.slice(u.offset, after)...)
+ u.entries = append(u.entries, ents...)
+ }
+}
+
+func (u *unstable) slice(lo uint64, hi uint64) []pb.Entry {
+ u.mustCheckOutOfBounds(lo, hi)
+ return u.entries[lo-u.offset : hi-u.offset]
+}
+
+// u.offset <= lo <= hi <= u.offset+len(u.entries)
+func (u *unstable) mustCheckOutOfBounds(lo, hi uint64) {
+ if lo > hi {
+ u.logger.Panicf("invalid unstable.slice %d > %d", lo, hi)
+ }
+ upper := u.offset + uint64(len(u.entries))
+ if lo < u.offset || hi > upper {
+ u.logger.Panicf("unstable.slice[%d,%d) out of bound [%d,%d]", lo, hi, u.offset, upper)
+ }
+}
diff --git a/vendor/go.etcd.io/etcd/raft/logger.go b/vendor/go.etcd.io/etcd/raft/logger.go
new file mode 100644
index 0000000..426a77d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/logger.go
@@ -0,0 +1,126 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+ "fmt"
+ "io/ioutil"
+ "log"
+ "os"
+)
+
+type Logger interface {
+ Debug(v ...interface{})
+ Debugf(format string, v ...interface{})
+
+ Error(v ...interface{})
+ Errorf(format string, v ...interface{})
+
+ Info(v ...interface{})
+ Infof(format string, v ...interface{})
+
+ Warning(v ...interface{})
+ Warningf(format string, v ...interface{})
+
+ Fatal(v ...interface{})
+ Fatalf(format string, v ...interface{})
+
+ Panic(v ...interface{})
+ Panicf(format string, v ...interface{})
+}
+
+func SetLogger(l Logger) { raftLogger = l }
+
+var (
+ defaultLogger = &DefaultLogger{Logger: log.New(os.Stderr, "raft", log.LstdFlags)}
+ discardLogger = &DefaultLogger{Logger: log.New(ioutil.Discard, "", 0)}
+ raftLogger = Logger(defaultLogger)
+)
+
+const (
+ calldepth = 2
+)
+
+// DefaultLogger is a default implementation of the Logger interface.
+type DefaultLogger struct {
+ *log.Logger
+ debug bool
+}
+
+func (l *DefaultLogger) EnableTimestamps() {
+ l.SetFlags(l.Flags() | log.Ldate | log.Ltime)
+}
+
+func (l *DefaultLogger) EnableDebug() {
+ l.debug = true
+}
+
+func (l *DefaultLogger) Debug(v ...interface{}) {
+ if l.debug {
+ l.Output(calldepth, header("DEBUG", fmt.Sprint(v...)))
+ }
+}
+
+func (l *DefaultLogger) Debugf(format string, v ...interface{}) {
+ if l.debug {
+ l.Output(calldepth, header("DEBUG", fmt.Sprintf(format, v...)))
+ }
+}
+
+func (l *DefaultLogger) Info(v ...interface{}) {
+ l.Output(calldepth, header("INFO", fmt.Sprint(v...)))
+}
+
+func (l *DefaultLogger) Infof(format string, v ...interface{}) {
+ l.Output(calldepth, header("INFO", fmt.Sprintf(format, v...)))
+}
+
+func (l *DefaultLogger) Error(v ...interface{}) {
+ l.Output(calldepth, header("ERROR", fmt.Sprint(v...)))
+}
+
+func (l *DefaultLogger) Errorf(format string, v ...interface{}) {
+ l.Output(calldepth, header("ERROR", fmt.Sprintf(format, v...)))
+}
+
+func (l *DefaultLogger) Warning(v ...interface{}) {
+ l.Output(calldepth, header("WARN", fmt.Sprint(v...)))
+}
+
+func (l *DefaultLogger) Warningf(format string, v ...interface{}) {
+ l.Output(calldepth, header("WARN", fmt.Sprintf(format, v...)))
+}
+
+func (l *DefaultLogger) Fatal(v ...interface{}) {
+ l.Output(calldepth, header("FATAL", fmt.Sprint(v...)))
+ os.Exit(1)
+}
+
+func (l *DefaultLogger) Fatalf(format string, v ...interface{}) {
+ l.Output(calldepth, header("FATAL", fmt.Sprintf(format, v...)))
+ os.Exit(1)
+}
+
+func (l *DefaultLogger) Panic(v ...interface{}) {
+ l.Logger.Panic(v...)
+}
+
+func (l *DefaultLogger) Panicf(format string, v ...interface{}) {
+ l.Logger.Panicf(format, v...)
+}
+
+func header(lvl, msg string) string {
+ return fmt.Sprintf("%s: %s", lvl, msg)
+}
diff --git a/vendor/go.etcd.io/etcd/raft/node.go b/vendor/go.etcd.io/etcd/raft/node.go
new file mode 100644
index 0000000..2ec2c3a
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/node.go
@@ -0,0 +1,611 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+ "context"
+ "errors"
+
+ pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+type SnapshotStatus int
+
+const (
+ SnapshotFinish SnapshotStatus = 1
+ SnapshotFailure SnapshotStatus = 2
+)
+
+var (
+ emptyState = pb.HardState{}
+
+ // ErrStopped is returned by methods on Nodes that have been stopped.
+ ErrStopped = errors.New("raft: stopped")
+)
+
+// SoftState provides state that is useful for logging and debugging.
+// The state is volatile and does not need to be persisted to the WAL.
+type SoftState struct {
+ Lead uint64 // must use atomic operations to access; keep 64-bit aligned.
+ RaftState StateType
+}
+
+func (a *SoftState) equal(b *SoftState) bool {
+ return a.Lead == b.Lead && a.RaftState == b.RaftState
+}
+
+// Ready encapsulates the entries and messages that are ready to read,
+// be saved to stable storage, committed or sent to other peers.
+// All fields in Ready are read-only.
+type Ready struct {
+ // The current volatile state of a Node.
+ // SoftState will be nil if there is no update.
+ // It is not required to consume or store SoftState.
+ *SoftState
+
+ // The current state of a Node to be saved to stable storage BEFORE
+ // Messages are sent.
+ // HardState will be equal to empty state if there is no update.
+ pb.HardState
+
+ // ReadStates can be used for node to serve linearizable read requests locally
+ // when its applied index is greater than the index in ReadState.
+ // Note that the readState will be returned when raft receives msgReadIndex.
+ // The returned is only valid for the request that requested to read.
+ ReadStates []ReadState
+
+ // Entries specifies entries to be saved to stable storage BEFORE
+ // Messages are sent.
+ Entries []pb.Entry
+
+ // Snapshot specifies the snapshot to be saved to stable storage.
+ Snapshot pb.Snapshot
+
+ // CommittedEntries specifies entries to be committed to a
+ // store/state-machine. These have previously been committed to stable
+ // store.
+ CommittedEntries []pb.Entry
+
+ // Messages specifies outbound messages to be sent AFTER Entries are
+ // committed to stable storage.
+ // If it contains a MsgSnap message, the application MUST report back to raft
+ // when the snapshot has been received or has failed by calling ReportSnapshot.
+ Messages []pb.Message
+
+ // MustSync indicates whether the HardState and Entries must be synchronously
+ // written to disk or if an asynchronous write is permissible.
+ MustSync bool
+}
+
+func isHardStateEqual(a, b pb.HardState) bool {
+ return a.Term == b.Term && a.Vote == b.Vote && a.Commit == b.Commit
+}
+
+// IsEmptyHardState returns true if the given HardState is empty.
+func IsEmptyHardState(st pb.HardState) bool {
+ return isHardStateEqual(st, emptyState)
+}
+
+// IsEmptySnap returns true if the given Snapshot is empty.
+func IsEmptySnap(sp pb.Snapshot) bool {
+ return sp.Metadata.Index == 0
+}
+
+func (rd Ready) containsUpdates() bool {
+ return rd.SoftState != nil || !IsEmptyHardState(rd.HardState) ||
+ !IsEmptySnap(rd.Snapshot) || len(rd.Entries) > 0 ||
+ len(rd.CommittedEntries) > 0 || len(rd.Messages) > 0 || len(rd.ReadStates) != 0
+}
+
+// appliedCursor extracts from the Ready the highest index the client has
+// applied (once the Ready is confirmed via Advance). If no information is
+// contained in the Ready, returns zero.
+func (rd Ready) appliedCursor() uint64 {
+ if n := len(rd.CommittedEntries); n > 0 {
+ return rd.CommittedEntries[n-1].Index
+ }
+ if index := rd.Snapshot.Metadata.Index; index > 0 {
+ return index
+ }
+ return 0
+}
+
+// Node represents a node in a raft cluster.
+type Node interface {
+ // Tick increments the internal logical clock for the Node by a single tick. Election
+ // timeouts and heartbeat timeouts are in units of ticks.
+ Tick()
+ // Campaign causes the Node to transition to candidate state and start campaigning to become leader.
+ Campaign(ctx context.Context) error
+ // Propose proposes that data be appended to the log. Note that proposals can be lost without
+ // notice, therefore it is user's job to ensure proposal retries.
+ Propose(ctx context.Context, data []byte) error
+ // ProposeConfChange proposes config change.
+ // At most one ConfChange can be in the process of going through consensus.
+ // Application needs to call ApplyConfChange when applying EntryConfChange type entry.
+ ProposeConfChange(ctx context.Context, cc pb.ConfChange) error
+ // Step advances the state machine using the given message. ctx.Err() will be returned, if any.
+ Step(ctx context.Context, msg pb.Message) error
+
+ // Ready returns a channel that returns the current point-in-time state.
+ // Users of the Node must call Advance after retrieving the state returned by Ready.
+ //
+ // NOTE: No committed entries from the next Ready may be applied until all committed entries
+ // and snapshots from the previous one have finished.
+ Ready() <-chan Ready
+
+ // Advance notifies the Node that the application has saved progress up to the last Ready.
+ // It prepares the node to return the next available Ready.
+ //
+ // The application should generally call Advance after it applies the entries in last Ready.
+ //
+ // However, as an optimization, the application may call Advance while it is applying the
+ // commands. For example. when the last Ready contains a snapshot, the application might take
+ // a long time to apply the snapshot data. To continue receiving Ready without blocking raft
+ // progress, it can call Advance before finishing applying the last ready.
+ Advance()
+ // ApplyConfChange applies config change to the local node.
+ // Returns an opaque ConfState protobuf which must be recorded
+ // in snapshots. Will never return nil; it returns a pointer only
+ // to match MemoryStorage.Compact.
+ ApplyConfChange(cc pb.ConfChange) *pb.ConfState
+
+ // TransferLeadership attempts to transfer leadership to the given transferee.
+ TransferLeadership(ctx context.Context, lead, transferee uint64)
+
+ // ReadIndex request a read state. The read state will be set in the ready.
+ // Read state has a read index. Once the application advances further than the read
+ // index, any linearizable read requests issued before the read request can be
+ // processed safely. The read state will have the same rctx attached.
+ ReadIndex(ctx context.Context, rctx []byte) error
+
+ // Status returns the current status of the raft state machine.
+ Status() Status
+ // ReportUnreachable reports the given node is not reachable for the last send.
+ ReportUnreachable(id uint64)
+ // ReportSnapshot reports the status of the sent snapshot. The id is the raft ID of the follower
+ // who is meant to receive the snapshot, and the status is SnapshotFinish or SnapshotFailure.
+ // Calling ReportSnapshot with SnapshotFinish is a no-op. But, any failure in applying a
+ // snapshot (for e.g., while streaming it from leader to follower), should be reported to the
+ // leader with SnapshotFailure. When leader sends a snapshot to a follower, it pauses any raft
+ // log probes until the follower can apply the snapshot and advance its state. If the follower
+ // can't do that, for e.g., due to a crash, it could end up in a limbo, never getting any
+ // updates from the leader. Therefore, it is crucial that the application ensures that any
+ // failure in snapshot sending is caught and reported back to the leader; so it can resume raft
+ // log probing in the follower.
+ ReportSnapshot(id uint64, status SnapshotStatus)
+ // Stop performs any necessary termination of the Node.
+ Stop()
+}
+
+type Peer struct {
+ ID uint64
+ Context []byte
+}
+
+// StartNode returns a new Node given configuration and a list of raft peers.
+// It appends a ConfChangeAddNode entry for each given peer to the initial log.
+func StartNode(c *Config, peers []Peer) Node {
+ r := newRaft(c)
+ // become the follower at term 1 and apply initial configuration
+ // entries of term 1
+ r.becomeFollower(1, None)
+ for _, peer := range peers {
+ cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
+ d, err := cc.Marshal()
+ if err != nil {
+ panic("unexpected marshal error")
+ }
+ e := pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: r.raftLog.lastIndex() + 1, Data: d}
+ r.raftLog.append(e)
+ }
+ // Mark these initial entries as committed.
+ // TODO(bdarnell): These entries are still unstable; do we need to preserve
+ // the invariant that committed < unstable?
+ r.raftLog.committed = r.raftLog.lastIndex()
+ // Now apply them, mainly so that the application can call Campaign
+ // immediately after StartNode in tests. Note that these nodes will
+ // be added to raft twice: here and when the application's Ready
+ // loop calls ApplyConfChange. The calls to addNode must come after
+ // all calls to raftLog.append so progress.next is set after these
+ // bootstrapping entries (it is an error if we try to append these
+ // entries since they have already been committed).
+ // We do not set raftLog.applied so the application will be able
+ // to observe all conf changes via Ready.CommittedEntries.
+ for _, peer := range peers {
+ r.addNode(peer.ID)
+ }
+
+ n := newNode()
+ n.logger = c.Logger
+ go n.run(r)
+ return &n
+}
+
+// RestartNode is similar to StartNode but does not take a list of peers.
+// The current membership of the cluster will be restored from the Storage.
+// If the caller has an existing state machine, pass in the last log index that
+// has been applied to it; otherwise use zero.
+func RestartNode(c *Config) Node {
+ r := newRaft(c)
+
+ n := newNode()
+ n.logger = c.Logger
+ go n.run(r)
+ return &n
+}
+
+type msgWithResult struct {
+ m pb.Message
+ result chan error
+}
+
+// node is the canonical implementation of the Node interface
+type node struct {
+ propc chan msgWithResult
+ recvc chan pb.Message
+ confc chan pb.ConfChange
+ confstatec chan pb.ConfState
+ readyc chan Ready
+ advancec chan struct{}
+ tickc chan struct{}
+ done chan struct{}
+ stop chan struct{}
+ status chan chan Status
+
+ logger Logger
+}
+
+func newNode() node {
+ return node{
+ propc: make(chan msgWithResult),
+ recvc: make(chan pb.Message),
+ confc: make(chan pb.ConfChange),
+ confstatec: make(chan pb.ConfState),
+ readyc: make(chan Ready),
+ advancec: make(chan struct{}),
+ // make tickc a buffered chan, so raft node can buffer some ticks when the node
+ // is busy processing raft messages. Raft node will resume process buffered
+ // ticks when it becomes idle.
+ tickc: make(chan struct{}, 128),
+ done: make(chan struct{}),
+ stop: make(chan struct{}),
+ status: make(chan chan Status),
+ }
+}
+
+func (n *node) Stop() {
+ select {
+ case n.stop <- struct{}{}:
+ // Not already stopped, so trigger it
+ case <-n.done:
+ // Node has already been stopped - no need to do anything
+ return
+ }
+ // Block until the stop has been acknowledged by run()
+ <-n.done
+}
+
+func (n *node) run(r *raft) {
+ var propc chan msgWithResult
+ var readyc chan Ready
+ var advancec chan struct{}
+ var prevLastUnstablei, prevLastUnstablet uint64
+ var havePrevLastUnstablei bool
+ var prevSnapi uint64
+ var applyingToI uint64
+ var rd Ready
+
+ lead := None
+ prevSoftSt := r.softState()
+ prevHardSt := emptyState
+
+ for {
+ if advancec != nil {
+ readyc = nil
+ } else {
+ rd = newReady(r, prevSoftSt, prevHardSt)
+ if rd.containsUpdates() {
+ readyc = n.readyc
+ } else {
+ readyc = nil
+ }
+ }
+
+ if lead != r.lead {
+ if r.hasLeader() {
+ if lead == None {
+ r.logger.Infof("raft.node: %x elected leader %x at term %d", r.id, r.lead, r.Term)
+ } else {
+ r.logger.Infof("raft.node: %x changed leader from %x to %x at term %d", r.id, lead, r.lead, r.Term)
+ }
+ propc = n.propc
+ } else {
+ r.logger.Infof("raft.node: %x lost leader %x at term %d", r.id, lead, r.Term)
+ propc = nil
+ }
+ lead = r.lead
+ }
+
+ select {
+ // TODO: maybe buffer the config propose if there exists one (the way
+ // described in raft dissertation)
+ // Currently it is dropped in Step silently.
+ case pm := <-propc:
+ m := pm.m
+ m.From = r.id
+ err := r.Step(m)
+ if pm.result != nil {
+ pm.result <- err
+ close(pm.result)
+ }
+ case m := <-n.recvc:
+ // filter out response message from unknown From.
+ if pr := r.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
+ r.Step(m)
+ }
+ case cc := <-n.confc:
+ if cc.NodeID == None {
+ select {
+ case n.confstatec <- pb.ConfState{
+ Nodes: r.nodes(),
+ Learners: r.learnerNodes()}:
+ case <-n.done:
+ }
+ break
+ }
+ switch cc.Type {
+ case pb.ConfChangeAddNode:
+ r.addNode(cc.NodeID)
+ case pb.ConfChangeAddLearnerNode:
+ r.addLearner(cc.NodeID)
+ case pb.ConfChangeRemoveNode:
+ // block incoming proposal when local node is
+ // removed
+ if cc.NodeID == r.id {
+ propc = nil
+ }
+ r.removeNode(cc.NodeID)
+ case pb.ConfChangeUpdateNode:
+ default:
+ panic("unexpected conf type")
+ }
+ select {
+ case n.confstatec <- pb.ConfState{
+ Nodes: r.nodes(),
+ Learners: r.learnerNodes()}:
+ case <-n.done:
+ }
+ case <-n.tickc:
+ r.tick()
+ case readyc <- rd:
+ if rd.SoftState != nil {
+ prevSoftSt = rd.SoftState
+ }
+ if len(rd.Entries) > 0 {
+ prevLastUnstablei = rd.Entries[len(rd.Entries)-1].Index
+ prevLastUnstablet = rd.Entries[len(rd.Entries)-1].Term
+ havePrevLastUnstablei = true
+ }
+ if !IsEmptyHardState(rd.HardState) {
+ prevHardSt = rd.HardState
+ }
+ if !IsEmptySnap(rd.Snapshot) {
+ prevSnapi = rd.Snapshot.Metadata.Index
+ }
+ if index := rd.appliedCursor(); index != 0 {
+ applyingToI = index
+ }
+
+ r.msgs = nil
+ r.readStates = nil
+ r.reduceUncommittedSize(rd.CommittedEntries)
+ advancec = n.advancec
+ case <-advancec:
+ if applyingToI != 0 {
+ r.raftLog.appliedTo(applyingToI)
+ applyingToI = 0
+ }
+ if havePrevLastUnstablei {
+ r.raftLog.stableTo(prevLastUnstablei, prevLastUnstablet)
+ havePrevLastUnstablei = false
+ }
+ r.raftLog.stableSnapTo(prevSnapi)
+ advancec = nil
+ case c := <-n.status:
+ c <- getStatus(r)
+ case <-n.stop:
+ close(n.done)
+ return
+ }
+ }
+}
+
+// Tick increments the internal logical clock for this Node. Election timeouts
+// and heartbeat timeouts are in units of ticks.
+func (n *node) Tick() {
+ select {
+ case n.tickc <- struct{}{}:
+ case <-n.done:
+ default:
+ n.logger.Warningf("A tick missed to fire. Node blocks too long!")
+ }
+}
+
+func (n *node) Campaign(ctx context.Context) error { return n.step(ctx, pb.Message{Type: pb.MsgHup}) }
+
+func (n *node) Propose(ctx context.Context, data []byte) error {
+ return n.stepWait(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Data: data}}})
+}
+
+func (n *node) Step(ctx context.Context, m pb.Message) error {
+ // ignore unexpected local messages receiving over network
+ if IsLocalMsg(m.Type) {
+ // TODO: return an error?
+ return nil
+ }
+ return n.step(ctx, m)
+}
+
+func (n *node) ProposeConfChange(ctx context.Context, cc pb.ConfChange) error {
+ data, err := cc.Marshal()
+ if err != nil {
+ return err
+ }
+ return n.Step(ctx, pb.Message{Type: pb.MsgProp, Entries: []pb.Entry{{Type: pb.EntryConfChange, Data: data}}})
+}
+
+func (n *node) step(ctx context.Context, m pb.Message) error {
+ return n.stepWithWaitOption(ctx, m, false)
+}
+
+func (n *node) stepWait(ctx context.Context, m pb.Message) error {
+ return n.stepWithWaitOption(ctx, m, true)
+}
+
+// Step advances the state machine using msgs. The ctx.Err() will be returned,
+// if any.
+func (n *node) stepWithWaitOption(ctx context.Context, m pb.Message, wait bool) error {
+ if m.Type != pb.MsgProp {
+ select {
+ case n.recvc <- m:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-n.done:
+ return ErrStopped
+ }
+ }
+ ch := n.propc
+ pm := msgWithResult{m: m}
+ if wait {
+ pm.result = make(chan error, 1)
+ }
+ select {
+ case ch <- pm:
+ if !wait {
+ return nil
+ }
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-n.done:
+ return ErrStopped
+ }
+ select {
+ case err := <-pm.result:
+ if err != nil {
+ return err
+ }
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-n.done:
+ return ErrStopped
+ }
+ return nil
+}
+
+func (n *node) Ready() <-chan Ready { return n.readyc }
+
+func (n *node) Advance() {
+ select {
+ case n.advancec <- struct{}{}:
+ case <-n.done:
+ }
+}
+
+func (n *node) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
+ var cs pb.ConfState
+ select {
+ case n.confc <- cc:
+ case <-n.done:
+ }
+ select {
+ case cs = <-n.confstatec:
+ case <-n.done:
+ }
+ return &cs
+}
+
+func (n *node) Status() Status {
+ c := make(chan Status)
+ select {
+ case n.status <- c:
+ return <-c
+ case <-n.done:
+ return Status{}
+ }
+}
+
+func (n *node) ReportUnreachable(id uint64) {
+ select {
+ case n.recvc <- pb.Message{Type: pb.MsgUnreachable, From: id}:
+ case <-n.done:
+ }
+}
+
+func (n *node) ReportSnapshot(id uint64, status SnapshotStatus) {
+ rej := status == SnapshotFailure
+
+ select {
+ case n.recvc <- pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej}:
+ case <-n.done:
+ }
+}
+
+func (n *node) TransferLeadership(ctx context.Context, lead, transferee uint64) {
+ select {
+ // manually set 'from' and 'to', so that leader can voluntarily transfers its leadership
+ case n.recvc <- pb.Message{Type: pb.MsgTransferLeader, From: transferee, To: lead}:
+ case <-n.done:
+ case <-ctx.Done():
+ }
+}
+
+func (n *node) ReadIndex(ctx context.Context, rctx []byte) error {
+ return n.step(ctx, pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
+}
+
+func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
+ rd := Ready{
+ Entries: r.raftLog.unstableEntries(),
+ CommittedEntries: r.raftLog.nextEnts(),
+ Messages: r.msgs,
+ }
+ if softSt := r.softState(); !softSt.equal(prevSoftSt) {
+ rd.SoftState = softSt
+ }
+ if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
+ rd.HardState = hardSt
+ }
+ if r.raftLog.unstable.snapshot != nil {
+ rd.Snapshot = *r.raftLog.unstable.snapshot
+ }
+ if len(r.readStates) != 0 {
+ rd.ReadStates = r.readStates
+ }
+ rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
+ return rd
+}
+
+// MustSync returns true if the hard state and count of Raft entries indicate
+// that a synchronous write to persistent storage is required.
+func MustSync(st, prevst pb.HardState, entsnum int) bool {
+ // Persistent state on all servers:
+ // (Updated on stable storage before responding to RPCs)
+ // currentTerm
+ // votedFor
+ // log entries[]
+ return entsnum != 0 || st.Vote != prevst.Vote || st.Term != prevst.Term
+}
diff --git a/vendor/go.etcd.io/etcd/raft/progress.go b/vendor/go.etcd.io/etcd/raft/progress.go
new file mode 100644
index 0000000..ef3787d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/progress.go
@@ -0,0 +1,284 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import "fmt"
+
+const (
+ ProgressStateProbe ProgressStateType = iota
+ ProgressStateReplicate
+ ProgressStateSnapshot
+)
+
+type ProgressStateType uint64
+
+var prstmap = [...]string{
+ "ProgressStateProbe",
+ "ProgressStateReplicate",
+ "ProgressStateSnapshot",
+}
+
+func (st ProgressStateType) String() string { return prstmap[uint64(st)] }
+
+// Progress represents a follower’s progress in the view of the leader. Leader maintains
+// progresses of all followers, and sends entries to the follower based on its progress.
+type Progress struct {
+ Match, Next uint64
+ // State defines how the leader should interact with the follower.
+ //
+ // When in ProgressStateProbe, leader sends at most one replication message
+ // per heartbeat interval. It also probes actual progress of the follower.
+ //
+ // When in ProgressStateReplicate, leader optimistically increases next
+ // to the latest entry sent after sending replication message. This is
+ // an optimized state for fast replicating log entries to the follower.
+ //
+ // When in ProgressStateSnapshot, leader should have sent out snapshot
+ // before and stops sending any replication message.
+ State ProgressStateType
+
+ // Paused is used in ProgressStateProbe.
+ // When Paused is true, raft should pause sending replication message to this peer.
+ Paused bool
+ // PendingSnapshot is used in ProgressStateSnapshot.
+ // If there is a pending snapshot, the pendingSnapshot will be set to the
+ // index of the snapshot. If pendingSnapshot is set, the replication process of
+ // this Progress will be paused. raft will not resend snapshot until the pending one
+ // is reported to be failed.
+ PendingSnapshot uint64
+
+ // RecentActive is true if the progress is recently active. Receiving any messages
+ // from the corresponding follower indicates the progress is active.
+ // RecentActive can be reset to false after an election timeout.
+ RecentActive bool
+
+ // inflights is a sliding window for the inflight messages.
+ // Each inflight message contains one or more log entries.
+ // The max number of entries per message is defined in raft config as MaxSizePerMsg.
+ // Thus inflight effectively limits both the number of inflight messages
+ // and the bandwidth each Progress can use.
+ // When inflights is full, no more message should be sent.
+ // When a leader sends out a message, the index of the last
+ // entry should be added to inflights. The index MUST be added
+ // into inflights in order.
+ // When a leader receives a reply, the previous inflights should
+ // be freed by calling inflights.freeTo with the index of the last
+ // received entry.
+ ins *inflights
+
+ // IsLearner is true if this progress is tracked for a learner.
+ IsLearner bool
+}
+
+func (pr *Progress) resetState(state ProgressStateType) {
+ pr.Paused = false
+ pr.PendingSnapshot = 0
+ pr.State = state
+ pr.ins.reset()
+}
+
+func (pr *Progress) becomeProbe() {
+ // If the original state is ProgressStateSnapshot, progress knows that
+ // the pending snapshot has been sent to this peer successfully, then
+ // probes from pendingSnapshot + 1.
+ if pr.State == ProgressStateSnapshot {
+ pendingSnapshot := pr.PendingSnapshot
+ pr.resetState(ProgressStateProbe)
+ pr.Next = max(pr.Match+1, pendingSnapshot+1)
+ } else {
+ pr.resetState(ProgressStateProbe)
+ pr.Next = pr.Match + 1
+ }
+}
+
+func (pr *Progress) becomeReplicate() {
+ pr.resetState(ProgressStateReplicate)
+ pr.Next = pr.Match + 1
+}
+
+func (pr *Progress) becomeSnapshot(snapshoti uint64) {
+ pr.resetState(ProgressStateSnapshot)
+ pr.PendingSnapshot = snapshoti
+}
+
+// maybeUpdate returns false if the given n index comes from an outdated message.
+// Otherwise it updates the progress and returns true.
+func (pr *Progress) maybeUpdate(n uint64) bool {
+ var updated bool
+ if pr.Match < n {
+ pr.Match = n
+ updated = true
+ pr.resume()
+ }
+ if pr.Next < n+1 {
+ pr.Next = n + 1
+ }
+ return updated
+}
+
+func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
+
+// maybeDecrTo returns false if the given to index comes from an out of order message.
+// Otherwise it decreases the progress next index to min(rejected, last) and returns true.
+func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
+ if pr.State == ProgressStateReplicate {
+ // the rejection must be stale if the progress has matched and "rejected"
+ // is smaller than "match".
+ if rejected <= pr.Match {
+ return false
+ }
+ // directly decrease next to match + 1
+ pr.Next = pr.Match + 1
+ return true
+ }
+
+ // the rejection must be stale if "rejected" does not match next - 1
+ if pr.Next-1 != rejected {
+ return false
+ }
+
+ if pr.Next = min(rejected, last+1); pr.Next < 1 {
+ pr.Next = 1
+ }
+ pr.resume()
+ return true
+}
+
+func (pr *Progress) pause() { pr.Paused = true }
+func (pr *Progress) resume() { pr.Paused = false }
+
+// IsPaused returns whether sending log entries to this node has been
+// paused. A node may be paused because it has rejected recent
+// MsgApps, is currently waiting for a snapshot, or has reached the
+// MaxInflightMsgs limit.
+func (pr *Progress) IsPaused() bool {
+ switch pr.State {
+ case ProgressStateProbe:
+ return pr.Paused
+ case ProgressStateReplicate:
+ return pr.ins.full()
+ case ProgressStateSnapshot:
+ return true
+ default:
+ panic("unexpected state")
+ }
+}
+
+func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 }
+
+// needSnapshotAbort returns true if snapshot progress's Match
+// is equal or higher than the pendingSnapshot.
+func (pr *Progress) needSnapshotAbort() bool {
+ return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot
+}
+
+func (pr *Progress) String() string {
+ return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d", pr.Next, pr.Match, pr.State, pr.IsPaused(), pr.PendingSnapshot)
+}
+
+type inflights struct {
+ // the starting index in the buffer
+ start int
+ // number of inflights in the buffer
+ count int
+
+ // the size of the buffer
+ size int
+
+ // buffer contains the index of the last entry
+ // inside one message.
+ buffer []uint64
+}
+
+func newInflights(size int) *inflights {
+ return &inflights{
+ size: size,
+ }
+}
+
+// add adds an inflight into inflights
+func (in *inflights) add(inflight uint64) {
+ if in.full() {
+ panic("cannot add into a full inflights")
+ }
+ next := in.start + in.count
+ size := in.size
+ if next >= size {
+ next -= size
+ }
+ if next >= len(in.buffer) {
+ in.growBuf()
+ }
+ in.buffer[next] = inflight
+ in.count++
+}
+
+// grow the inflight buffer by doubling up to inflights.size. We grow on demand
+// instead of preallocating to inflights.size to handle systems which have
+// thousands of Raft groups per process.
+func (in *inflights) growBuf() {
+ newSize := len(in.buffer) * 2
+ if newSize == 0 {
+ newSize = 1
+ } else if newSize > in.size {
+ newSize = in.size
+ }
+ newBuffer := make([]uint64, newSize)
+ copy(newBuffer, in.buffer)
+ in.buffer = newBuffer
+}
+
+// freeTo frees the inflights smaller or equal to the given `to` flight.
+func (in *inflights) freeTo(to uint64) {
+ if in.count == 0 || to < in.buffer[in.start] {
+ // out of the left side of the window
+ return
+ }
+
+ idx := in.start
+ var i int
+ for i = 0; i < in.count; i++ {
+ if to < in.buffer[idx] { // found the first large inflight
+ break
+ }
+
+ // increase index and maybe rotate
+ size := in.size
+ if idx++; idx >= size {
+ idx -= size
+ }
+ }
+ // free i inflights and set new start index
+ in.count -= i
+ in.start = idx
+ if in.count == 0 {
+ // inflights is empty, reset the start index so that we don't grow the
+ // buffer unnecessarily.
+ in.start = 0
+ }
+}
+
+func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) }
+
+// full returns true if the inflights is full.
+func (in *inflights) full() bool {
+ return in.count == in.size
+}
+
+// resets frees all inflights.
+func (in *inflights) reset() {
+ in.count = 0
+ in.start = 0
+}
diff --git a/vendor/go.etcd.io/etcd/raft/raft.go b/vendor/go.etcd.io/etcd/raft/raft.go
new file mode 100644
index 0000000..e1e6a16
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/raft.go
@@ -0,0 +1,1575 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+ "bytes"
+ "errors"
+ "fmt"
+ "math"
+ "math/rand"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+
+ pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+// None is a placeholder node ID used when there is no leader.
+const None uint64 = 0
+const noLimit = math.MaxUint64
+
+// Possible values for StateType.
+const (
+ StateFollower StateType = iota
+ StateCandidate
+ StateLeader
+ StatePreCandidate
+ numStates
+)
+
+type ReadOnlyOption int
+
+const (
+ // ReadOnlySafe guarantees the linearizability of the read only request by
+ // communicating with the quorum. It is the default and suggested option.
+ ReadOnlySafe ReadOnlyOption = iota
+ // ReadOnlyLeaseBased ensures linearizability of the read only request by
+ // relying on the leader lease. It can be affected by clock drift.
+ // If the clock drift is unbounded, leader might keep the lease longer than it
+ // should (clock can move backward/pause without any bound). ReadIndex is not safe
+ // in that case.
+ ReadOnlyLeaseBased
+)
+
+// Possible values for CampaignType
+const (
+ // campaignPreElection represents the first phase of a normal election when
+ // Config.PreVote is true.
+ campaignPreElection CampaignType = "CampaignPreElection"
+ // campaignElection represents a normal (time-based) election (the second phase
+ // of the election when Config.PreVote is true).
+ campaignElection CampaignType = "CampaignElection"
+ // campaignTransfer represents the type of leader transfer
+ campaignTransfer CampaignType = "CampaignTransfer"
+)
+
+// ErrProposalDropped is returned when the proposal is ignored by some cases,
+// so that the proposer can be notified and fail fast.
+var ErrProposalDropped = errors.New("raft proposal dropped")
+
+// lockedRand is a small wrapper around rand.Rand to provide
+// synchronization among multiple raft groups. Only the methods needed
+// by the code are exposed (e.g. Intn).
+type lockedRand struct {
+ mu sync.Mutex
+ rand *rand.Rand
+}
+
+func (r *lockedRand) Intn(n int) int {
+ r.mu.Lock()
+ v := r.rand.Intn(n)
+ r.mu.Unlock()
+ return v
+}
+
+var globalRand = &lockedRand{
+ rand: rand.New(rand.NewSource(time.Now().UnixNano())),
+}
+
+// CampaignType represents the type of campaigning
+// the reason we use the type of string instead of uint64
+// is because it's simpler to compare and fill in raft entries
+type CampaignType string
+
+// StateType represents the role of a node in a cluster.
+type StateType uint64
+
+var stmap = [...]string{
+ "StateFollower",
+ "StateCandidate",
+ "StateLeader",
+ "StatePreCandidate",
+}
+
+func (st StateType) String() string {
+ return stmap[uint64(st)]
+}
+
+// Config contains the parameters to start a raft.
+type Config struct {
+ // ID is the identity of the local raft. ID cannot be 0.
+ ID uint64
+
+ // peers contains the IDs of all nodes (including self) in the raft cluster. It
+ // should only be set when starting a new raft cluster. Restarting raft from
+ // previous configuration will panic if peers is set. peer is private and only
+ // used for testing right now.
+ peers []uint64
+
+ // learners contains the IDs of all learner nodes (including self if the
+ // local node is a learner) in the raft cluster. learners only receives
+ // entries from the leader node. It does not vote or promote itself.
+ learners []uint64
+
+ // ElectionTick is the number of Node.Tick invocations that must pass between
+ // elections. That is, if a follower does not receive any message from the
+ // leader of current term before ElectionTick has elapsed, it will become
+ // candidate and start an election. ElectionTick must be greater than
+ // HeartbeatTick. We suggest ElectionTick = 10 * HeartbeatTick to avoid
+ // unnecessary leader switching.
+ ElectionTick int
+ // HeartbeatTick is the number of Node.Tick invocations that must pass between
+ // heartbeats. That is, a leader sends heartbeat messages to maintain its
+ // leadership every HeartbeatTick ticks.
+ HeartbeatTick int
+
+ // Storage is the storage for raft. raft generates entries and states to be
+ // stored in storage. raft reads the persisted entries and states out of
+ // Storage when it needs. raft reads out the previous state and configuration
+ // out of storage when restarting.
+ Storage Storage
+ // Applied is the last applied index. It should only be set when restarting
+ // raft. raft will not return entries to the application smaller or equal to
+ // Applied. If Applied is unset when restarting, raft might return previous
+ // applied entries. This is a very application dependent configuration.
+ Applied uint64
+
+ // MaxSizePerMsg limits the max byte size of each append message. Smaller
+ // value lowers the raft recovery cost(initial probing and message lost
+ // during normal operation). On the other side, it might affect the
+ // throughput during normal replication. Note: math.MaxUint64 for unlimited,
+ // 0 for at most one entry per message.
+ MaxSizePerMsg uint64
+ // MaxCommittedSizePerReady limits the size of the committed entries which
+ // can be applied.
+ MaxCommittedSizePerReady uint64
+ // MaxUncommittedEntriesSize limits the aggregate byte size of the
+ // uncommitted entries that may be appended to a leader's log. Once this
+ // limit is exceeded, proposals will begin to return ErrProposalDropped
+ // errors. Note: 0 for no limit.
+ MaxUncommittedEntriesSize uint64
+ // MaxInflightMsgs limits the max number of in-flight append messages during
+ // optimistic replication phase. The application transportation layer usually
+ // has its own sending buffer over TCP/UDP. Setting MaxInflightMsgs to avoid
+ // overflowing that sending buffer. TODO (xiangli): feedback to application to
+ // limit the proposal rate?
+ MaxInflightMsgs int
+
+ // CheckQuorum specifies if the leader should check quorum activity. Leader
+ // steps down when quorum is not active for an electionTimeout.
+ CheckQuorum bool
+
+ // PreVote enables the Pre-Vote algorithm described in raft thesis section
+ // 9.6. This prevents disruption when a node that has been partitioned away
+ // rejoins the cluster.
+ PreVote bool
+
+ // ReadOnlyOption specifies how the read only request is processed.
+ //
+ // ReadOnlySafe guarantees the linearizability of the read only request by
+ // communicating with the quorum. It is the default and suggested option.
+ //
+ // ReadOnlyLeaseBased ensures linearizability of the read only request by
+ // relying on the leader lease. It can be affected by clock drift.
+ // If the clock drift is unbounded, leader might keep the lease longer than it
+ // should (clock can move backward/pause without any bound). ReadIndex is not safe
+ // in that case.
+ // CheckQuorum MUST be enabled if ReadOnlyOption is ReadOnlyLeaseBased.
+ ReadOnlyOption ReadOnlyOption
+
+ // Logger is the logger used for raft log. For multinode which can host
+ // multiple raft group, each raft group can have its own logger
+ Logger Logger
+
+ // DisableProposalForwarding set to true means that followers will drop
+ // proposals, rather than forwarding them to the leader. One use case for
+ // this feature would be in a situation where the Raft leader is used to
+ // compute the data of a proposal, for example, adding a timestamp from a
+ // hybrid logical clock to data in a monotonically increasing way. Forwarding
+ // should be disabled to prevent a follower with an inaccurate hybrid
+ // logical clock from assigning the timestamp and then forwarding the data
+ // to the leader.
+ DisableProposalForwarding bool
+}
+
+func (c *Config) validate() error {
+ if c.ID == None {
+ return errors.New("cannot use none as id")
+ }
+
+ if c.HeartbeatTick <= 0 {
+ return errors.New("heartbeat tick must be greater than 0")
+ }
+
+ if c.ElectionTick <= c.HeartbeatTick {
+ return errors.New("election tick must be greater than heartbeat tick")
+ }
+
+ if c.Storage == nil {
+ return errors.New("storage cannot be nil")
+ }
+
+ if c.MaxUncommittedEntriesSize == 0 {
+ c.MaxUncommittedEntriesSize = noLimit
+ }
+
+ // default MaxCommittedSizePerReady to MaxSizePerMsg because they were
+ // previously the same parameter.
+ if c.MaxCommittedSizePerReady == 0 {
+ c.MaxCommittedSizePerReady = c.MaxSizePerMsg
+ }
+
+ if c.MaxInflightMsgs <= 0 {
+ return errors.New("max inflight messages must be greater than 0")
+ }
+
+ if c.Logger == nil {
+ c.Logger = raftLogger
+ }
+
+ if c.ReadOnlyOption == ReadOnlyLeaseBased && !c.CheckQuorum {
+ return errors.New("CheckQuorum must be enabled when ReadOnlyOption is ReadOnlyLeaseBased")
+ }
+
+ return nil
+}
+
+type raft struct {
+ id uint64
+
+ Term uint64
+ Vote uint64
+
+ readStates []ReadState
+
+ // the log
+ raftLog *raftLog
+
+ maxMsgSize uint64
+ maxUncommittedSize uint64
+ maxInflight int
+ prs map[uint64]*Progress
+ learnerPrs map[uint64]*Progress
+ matchBuf uint64Slice
+
+ state StateType
+
+ // isLearner is true if the local raft node is a learner.
+ isLearner bool
+
+ votes map[uint64]bool
+
+ msgs []pb.Message
+
+ // the leader id
+ lead uint64
+ // leadTransferee is id of the leader transfer target when its value is not zero.
+ // Follow the procedure defined in raft thesis 3.10.
+ leadTransferee uint64
+ // Only one conf change may be pending (in the log, but not yet
+ // applied) at a time. This is enforced via pendingConfIndex, which
+ // is set to a value >= the log index of the latest pending
+ // configuration change (if any). Config changes are only allowed to
+ // be proposed if the leader's applied index is greater than this
+ // value.
+ pendingConfIndex uint64
+ // an estimate of the size of the uncommitted tail of the Raft log. Used to
+ // prevent unbounded log growth. Only maintained by the leader. Reset on
+ // term changes.
+ uncommittedSize uint64
+
+ readOnly *readOnly
+
+ // number of ticks since it reached last electionTimeout when it is leader
+ // or candidate.
+ // number of ticks since it reached last electionTimeout or received a
+ // valid message from current leader when it is a follower.
+ electionElapsed int
+
+ // number of ticks since it reached last heartbeatTimeout.
+ // only leader keeps heartbeatElapsed.
+ heartbeatElapsed int
+
+ checkQuorum bool
+ preVote bool
+
+ heartbeatTimeout int
+ electionTimeout int
+ // randomizedElectionTimeout is a random number between
+ // [electiontimeout, 2 * electiontimeout - 1]. It gets reset
+ // when raft changes its state to follower or candidate.
+ randomizedElectionTimeout int
+ disableProposalForwarding bool
+
+ tick func()
+ step stepFunc
+
+ logger Logger
+}
+
+func newRaft(c *Config) *raft {
+ if err := c.validate(); err != nil {
+ panic(err.Error())
+ }
+ raftlog := newLogWithSize(c.Storage, c.Logger, c.MaxCommittedSizePerReady)
+ hs, cs, err := c.Storage.InitialState()
+ if err != nil {
+ panic(err) // TODO(bdarnell)
+ }
+ peers := c.peers
+ learners := c.learners
+ if len(cs.Nodes) > 0 || len(cs.Learners) > 0 {
+ if len(peers) > 0 || len(learners) > 0 {
+ // TODO(bdarnell): the peers argument is always nil except in
+ // tests; the argument should be removed and these tests should be
+ // updated to specify their nodes through a snapshot.
+ panic("cannot specify both newRaft(peers, learners) and ConfState.(Nodes, Learners)")
+ }
+ peers = cs.Nodes
+ learners = cs.Learners
+ }
+ r := &raft{
+ id: c.ID,
+ lead: None,
+ isLearner: false,
+ raftLog: raftlog,
+ maxMsgSize: c.MaxSizePerMsg,
+ maxInflight: c.MaxInflightMsgs,
+ maxUncommittedSize: c.MaxUncommittedEntriesSize,
+ prs: make(map[uint64]*Progress),
+ learnerPrs: make(map[uint64]*Progress),
+ electionTimeout: c.ElectionTick,
+ heartbeatTimeout: c.HeartbeatTick,
+ logger: c.Logger,
+ checkQuorum: c.CheckQuorum,
+ preVote: c.PreVote,
+ readOnly: newReadOnly(c.ReadOnlyOption),
+ disableProposalForwarding: c.DisableProposalForwarding,
+ }
+ for _, p := range peers {
+ r.prs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight)}
+ }
+ for _, p := range learners {
+ if _, ok := r.prs[p]; ok {
+ panic(fmt.Sprintf("node %x is in both learner and peer list", p))
+ }
+ r.learnerPrs[p] = &Progress{Next: 1, ins: newInflights(r.maxInflight), IsLearner: true}
+ if r.id == p {
+ r.isLearner = true
+ }
+ }
+
+ if !isHardStateEqual(hs, emptyState) {
+ r.loadState(hs)
+ }
+ if c.Applied > 0 {
+ raftlog.appliedTo(c.Applied)
+ }
+ r.becomeFollower(r.Term, None)
+
+ var nodesStrs []string
+ for _, n := range r.nodes() {
+ nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
+ }
+
+ r.logger.Infof("newRaft %x [peers: [%s], term: %d, commit: %d, applied: %d, lastindex: %d, lastterm: %d]",
+ r.id, strings.Join(nodesStrs, ","), r.Term, r.raftLog.committed, r.raftLog.applied, r.raftLog.lastIndex(), r.raftLog.lastTerm())
+ return r
+}
+
+func (r *raft) hasLeader() bool { return r.lead != None }
+
+func (r *raft) softState() *SoftState { return &SoftState{Lead: r.lead, RaftState: r.state} }
+
+func (r *raft) hardState() pb.HardState {
+ return pb.HardState{
+ Term: r.Term,
+ Vote: r.Vote,
+ Commit: r.raftLog.committed,
+ }
+}
+
+func (r *raft) quorum() int { return len(r.prs)/2 + 1 }
+
+func (r *raft) nodes() []uint64 {
+ nodes := make([]uint64, 0, len(r.prs))
+ for id := range r.prs {
+ nodes = append(nodes, id)
+ }
+ sort.Sort(uint64Slice(nodes))
+ return nodes
+}
+
+func (r *raft) learnerNodes() []uint64 {
+ nodes := make([]uint64, 0, len(r.learnerPrs))
+ for id := range r.learnerPrs {
+ nodes = append(nodes, id)
+ }
+ sort.Sort(uint64Slice(nodes))
+ return nodes
+}
+
+// send persists state to stable storage and then sends to its mailbox.
+func (r *raft) send(m pb.Message) {
+ m.From = r.id
+ if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
+ if m.Term == 0 {
+ // All {pre-,}campaign messages need to have the term set when
+ // sending.
+ // - MsgVote: m.Term is the term the node is campaigning for,
+ // non-zero as we increment the term when campaigning.
+ // - MsgVoteResp: m.Term is the new r.Term if the MsgVote was
+ // granted, non-zero for the same reason MsgVote is
+ // - MsgPreVote: m.Term is the term the node will campaign,
+ // non-zero as we use m.Term to indicate the next term we'll be
+ // campaigning for
+ // - MsgPreVoteResp: m.Term is the term received in the original
+ // MsgPreVote if the pre-vote was granted, non-zero for the
+ // same reasons MsgPreVote is
+ panic(fmt.Sprintf("term should be set when sending %s", m.Type))
+ }
+ } else {
+ if m.Term != 0 {
+ panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
+ }
+ // do not attach term to MsgProp, MsgReadIndex
+ // proposals are a way to forward to the leader and
+ // should be treated as local message.
+ // MsgReadIndex is also forwarded to leader.
+ if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
+ m.Term = r.Term
+ }
+ }
+ r.msgs = append(r.msgs, m)
+}
+
+func (r *raft) getProgress(id uint64) *Progress {
+ if pr, ok := r.prs[id]; ok {
+ return pr
+ }
+
+ return r.learnerPrs[id]
+}
+
+// sendAppend sends an append RPC with new entries (if any) and the
+// current commit index to the given peer.
+func (r *raft) sendAppend(to uint64) {
+ r.maybeSendAppend(to, true)
+}
+
+// maybeSendAppend sends an append RPC with new entries to the given peer,
+// if necessary. Returns true if a message was sent. The sendIfEmpty
+// argument controls whether messages with no entries will be sent
+// ("empty" messages are useful to convey updated Commit indexes, but
+// are undesirable when we're sending multiple messages in a batch).
+func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
+ pr := r.getProgress(to)
+ if pr.IsPaused() {
+ return false
+ }
+ m := pb.Message{}
+ m.To = to
+
+ term, errt := r.raftLog.term(pr.Next - 1)
+ ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
+ if len(ents) == 0 && !sendIfEmpty {
+ return false
+ }
+
+ if errt != nil || erre != nil { // send snapshot if we failed to get term or entries
+ if !pr.RecentActive {
+ r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
+ return false
+ }
+
+ m.Type = pb.MsgSnap
+ snapshot, err := r.raftLog.snapshot()
+ if err != nil {
+ if err == ErrSnapshotTemporarilyUnavailable {
+ r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
+ return false
+ }
+ panic(err) // TODO(bdarnell)
+ }
+ if IsEmptySnap(snapshot) {
+ panic("need non-empty snapshot")
+ }
+ m.Snapshot = snapshot
+ sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
+ r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
+ r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
+ pr.becomeSnapshot(sindex)
+ r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
+ } else {
+ m.Type = pb.MsgApp
+ m.Index = pr.Next - 1
+ m.LogTerm = term
+ m.Entries = ents
+ m.Commit = r.raftLog.committed
+ if n := len(m.Entries); n != 0 {
+ switch pr.State {
+ // optimistically increase the next when in ProgressStateReplicate
+ case ProgressStateReplicate:
+ last := m.Entries[n-1].Index
+ pr.optimisticUpdate(last)
+ pr.ins.add(last)
+ case ProgressStateProbe:
+ pr.pause()
+ default:
+ r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
+ }
+ }
+ }
+ r.send(m)
+ return true
+}
+
+// sendHeartbeat sends a heartbeat RPC to the given peer.
+func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
+ // Attach the commit as min(to.matched, r.committed).
+ // When the leader sends out heartbeat message,
+ // the receiver(follower) might not be matched with the leader
+ // or it might not have all the committed entries.
+ // The leader MUST NOT forward the follower's commit to
+ // an unmatched index.
+ commit := min(r.getProgress(to).Match, r.raftLog.committed)
+ m := pb.Message{
+ To: to,
+ Type: pb.MsgHeartbeat,
+ Commit: commit,
+ Context: ctx,
+ }
+
+ r.send(m)
+}
+
+func (r *raft) forEachProgress(f func(id uint64, pr *Progress)) {
+ for id, pr := range r.prs {
+ f(id, pr)
+ }
+
+ for id, pr := range r.learnerPrs {
+ f(id, pr)
+ }
+}
+
+// bcastAppend sends RPC, with entries to all peers that are not up-to-date
+// according to the progress recorded in r.prs.
+func (r *raft) bcastAppend() {
+ r.forEachProgress(func(id uint64, _ *Progress) {
+ if id == r.id {
+ return
+ }
+
+ r.sendAppend(id)
+ })
+}
+
+// bcastHeartbeat sends RPC, without entries to all the peers.
+func (r *raft) bcastHeartbeat() {
+ lastCtx := r.readOnly.lastPendingRequestCtx()
+ if len(lastCtx) == 0 {
+ r.bcastHeartbeatWithCtx(nil)
+ } else {
+ r.bcastHeartbeatWithCtx([]byte(lastCtx))
+ }
+}
+
+func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
+ r.forEachProgress(func(id uint64, _ *Progress) {
+ if id == r.id {
+ return
+ }
+ r.sendHeartbeat(id, ctx)
+ })
+}
+
+// maybeCommit attempts to advance the commit index. Returns true if
+// the commit index changed (in which case the caller should call
+// r.bcastAppend).
+func (r *raft) maybeCommit() bool {
+ // Preserving matchBuf across calls is an optimization
+ // used to avoid allocating a new slice on each call.
+ if cap(r.matchBuf) < len(r.prs) {
+ r.matchBuf = make(uint64Slice, len(r.prs))
+ }
+ mis := r.matchBuf[:len(r.prs)]
+ idx := 0
+ for _, p := range r.prs {
+ mis[idx] = p.Match
+ idx++
+ }
+ sort.Sort(mis)
+ mci := mis[len(mis)-r.quorum()]
+ return r.raftLog.maybeCommit(mci, r.Term)
+}
+
+func (r *raft) reset(term uint64) {
+ if r.Term != term {
+ r.Term = term
+ r.Vote = None
+ }
+ r.lead = None
+
+ r.electionElapsed = 0
+ r.heartbeatElapsed = 0
+ r.resetRandomizedElectionTimeout()
+
+ r.abortLeaderTransfer()
+
+ r.votes = make(map[uint64]bool)
+ r.forEachProgress(func(id uint64, pr *Progress) {
+ *pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.maxInflight), IsLearner: pr.IsLearner}
+ if id == r.id {
+ pr.Match = r.raftLog.lastIndex()
+ }
+ })
+
+ r.pendingConfIndex = 0
+ r.uncommittedSize = 0
+ r.readOnly = newReadOnly(r.readOnly.option)
+}
+
+func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
+ li := r.raftLog.lastIndex()
+ for i := range es {
+ es[i].Term = r.Term
+ es[i].Index = li + 1 + uint64(i)
+ }
+ // Track the size of this uncommitted proposal.
+ if !r.increaseUncommittedSize(es) {
+ r.logger.Debugf(
+ "%x appending new entries to log would exceed uncommitted entry size limit; dropping proposal",
+ r.id,
+ )
+ // Drop the proposal.
+ return false
+ }
+ // use latest "last" index after truncate/append
+ li = r.raftLog.append(es...)
+ r.getProgress(r.id).maybeUpdate(li)
+ // Regardless of maybeCommit's return, our caller will call bcastAppend.
+ r.maybeCommit()
+ return true
+}
+
+// tickElection is run by followers and candidates after r.electionTimeout.
+func (r *raft) tickElection() {
+ r.electionElapsed++
+
+ if r.promotable() && r.pastElectionTimeout() {
+ r.electionElapsed = 0
+ r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
+ }
+}
+
+// tickHeartbeat is run by leaders to send a MsgBeat after r.heartbeatTimeout.
+func (r *raft) tickHeartbeat() {
+ r.heartbeatElapsed++
+ r.electionElapsed++
+
+ if r.electionElapsed >= r.electionTimeout {
+ r.electionElapsed = 0
+ if r.checkQuorum {
+ r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
+ }
+ // If current leader cannot transfer leadership in electionTimeout, it becomes leader again.
+ if r.state == StateLeader && r.leadTransferee != None {
+ r.abortLeaderTransfer()
+ }
+ }
+
+ if r.state != StateLeader {
+ return
+ }
+
+ if r.heartbeatElapsed >= r.heartbeatTimeout {
+ r.heartbeatElapsed = 0
+ r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
+ }
+}
+
+func (r *raft) becomeFollower(term uint64, lead uint64) {
+ r.step = stepFollower
+ r.reset(term)
+ r.tick = r.tickElection
+ r.lead = lead
+ r.state = StateFollower
+ r.logger.Infof("%x became follower at term %d", r.id, r.Term)
+}
+
+func (r *raft) becomeCandidate() {
+ // TODO(xiangli) remove the panic when the raft implementation is stable
+ if r.state == StateLeader {
+ panic("invalid transition [leader -> candidate]")
+ }
+ r.step = stepCandidate
+ r.reset(r.Term + 1)
+ r.tick = r.tickElection
+ r.Vote = r.id
+ r.state = StateCandidate
+ r.logger.Infof("%x became candidate at term %d", r.id, r.Term)
+}
+
+func (r *raft) becomePreCandidate() {
+ // TODO(xiangli) remove the panic when the raft implementation is stable
+ if r.state == StateLeader {
+ panic("invalid transition [leader -> pre-candidate]")
+ }
+ // Becoming a pre-candidate changes our step functions and state,
+ // but doesn't change anything else. In particular it does not increase
+ // r.Term or change r.Vote.
+ r.step = stepCandidate
+ r.votes = make(map[uint64]bool)
+ r.tick = r.tickElection
+ r.lead = None
+ r.state = StatePreCandidate
+ r.logger.Infof("%x became pre-candidate at term %d", r.id, r.Term)
+}
+
+func (r *raft) becomeLeader() {
+ // TODO(xiangli) remove the panic when the raft implementation is stable
+ if r.state == StateFollower {
+ panic("invalid transition [follower -> leader]")
+ }
+ r.step = stepLeader
+ r.reset(r.Term)
+ r.tick = r.tickHeartbeat
+ r.lead = r.id
+ r.state = StateLeader
+ // Followers enter replicate mode when they've been successfully probed
+ // (perhaps after having received a snapshot as a result). The leader is
+ // trivially in this state. Note that r.reset() has initialized this
+ // progress with the last index already.
+ r.prs[r.id].becomeReplicate()
+
+ // Conservatively set the pendingConfIndex to the last index in the
+ // log. There may or may not be a pending config change, but it's
+ // safe to delay any future proposals until we commit all our
+ // pending log entries, and scanning the entire tail of the log
+ // could be expensive.
+ r.pendingConfIndex = r.raftLog.lastIndex()
+
+ emptyEnt := pb.Entry{Data: nil}
+ if !r.appendEntry(emptyEnt) {
+ // This won't happen because we just called reset() above.
+ r.logger.Panic("empty entry was dropped")
+ }
+ // As a special case, don't count the initial empty entry towards the
+ // uncommitted log quota. This is because we want to preserve the
+ // behavior of allowing one entry larger than quota if the current
+ // usage is zero.
+ r.reduceUncommittedSize([]pb.Entry{emptyEnt})
+ r.logger.Infof("%x became leader at term %d", r.id, r.Term)
+}
+
+func (r *raft) campaign(t CampaignType) {
+ var term uint64
+ var voteMsg pb.MessageType
+ if t == campaignPreElection {
+ r.becomePreCandidate()
+ voteMsg = pb.MsgPreVote
+ // PreVote RPCs are sent for the next term before we've incremented r.Term.
+ term = r.Term + 1
+ } else {
+ r.becomeCandidate()
+ voteMsg = pb.MsgVote
+ term = r.Term
+ }
+ if r.quorum() == r.poll(r.id, voteRespMsgType(voteMsg), true) {
+ // We won the election after voting for ourselves (which must mean that
+ // this is a single-node cluster). Advance to the next state.
+ if t == campaignPreElection {
+ r.campaign(campaignElection)
+ } else {
+ r.becomeLeader()
+ }
+ return
+ }
+ for id := range r.prs {
+ if id == r.id {
+ continue
+ }
+ r.logger.Infof("%x [logterm: %d, index: %d] sent %s request to %x at term %d",
+ r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), voteMsg, id, r.Term)
+
+ var ctx []byte
+ if t == campaignTransfer {
+ ctx = []byte(t)
+ }
+ r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
+ }
+}
+
+func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int) {
+ if v {
+ r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
+ } else {
+ r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
+ }
+ if _, ok := r.votes[id]; !ok {
+ r.votes[id] = v
+ }
+ for _, vv := range r.votes {
+ if vv {
+ granted++
+ }
+ }
+ return granted
+}
+
+func (r *raft) Step(m pb.Message) error {
+ // Handle the message term, which may result in our stepping down to a follower.
+ switch {
+ case m.Term == 0:
+ // local message
+ case m.Term > r.Term:
+ if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
+ force := bytes.Equal(m.Context, []byte(campaignTransfer))
+ inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
+ if !force && inLease {
+ // If a server receives a RequestVote request within the minimum election timeout
+ // of hearing from a current leader, it does not update its term or grant its vote
+ r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: lease is not expired (remaining ticks: %d)",
+ r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term, r.electionTimeout-r.electionElapsed)
+ return nil
+ }
+ }
+ switch {
+ case m.Type == pb.MsgPreVote:
+ // Never change our term in response to a PreVote
+ case m.Type == pb.MsgPreVoteResp && !m.Reject:
+ // We send pre-vote requests with a term in our future. If the
+ // pre-vote is granted, we will increment our term when we get a
+ // quorum. If it is not, the term comes from the node that
+ // rejected our vote so we should become a follower at the new
+ // term.
+ default:
+ r.logger.Infof("%x [term: %d] received a %s message with higher term from %x [term: %d]",
+ r.id, r.Term, m.Type, m.From, m.Term)
+ if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
+ r.becomeFollower(m.Term, m.From)
+ } else {
+ r.becomeFollower(m.Term, None)
+ }
+ }
+
+ case m.Term < r.Term:
+ if (r.checkQuorum || r.preVote) && (m.Type == pb.MsgHeartbeat || m.Type == pb.MsgApp) {
+ // We have received messages from a leader at a lower term. It is possible
+ // that these messages were simply delayed in the network, but this could
+ // also mean that this node has advanced its term number during a network
+ // partition, and it is now unable to either win an election or to rejoin
+ // the majority on the old term. If checkQuorum is false, this will be
+ // handled by incrementing term numbers in response to MsgVote with a
+ // higher term, but if checkQuorum is true we may not advance the term on
+ // MsgVote and must generate other messages to advance the term. The net
+ // result of these two features is to minimize the disruption caused by
+ // nodes that have been removed from the cluster's configuration: a
+ // removed node will send MsgVotes (or MsgPreVotes) which will be ignored,
+ // but it will not receive MsgApp or MsgHeartbeat, so it will not create
+ // disruptive term increases, by notifying leader of this node's activeness.
+ // The above comments also true for Pre-Vote
+ //
+ // When follower gets isolated, it soon starts an election ending
+ // up with a higher term than leader, although it won't receive enough
+ // votes to win the election. When it regains connectivity, this response
+ // with "pb.MsgAppResp" of higher term would force leader to step down.
+ // However, this disruption is inevitable to free this stuck node with
+ // fresh election. This can be prevented with Pre-Vote phase.
+ r.send(pb.Message{To: m.From, Type: pb.MsgAppResp})
+ } else if m.Type == pb.MsgPreVote {
+ // Before Pre-Vote enable, there may have candidate with higher term,
+ // but less log. After update to Pre-Vote, the cluster may deadlock if
+ // we drop messages with a lower term.
+ r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
+ r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
+ r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgPreVoteResp, Reject: true})
+ } else {
+ // ignore other cases
+ r.logger.Infof("%x [term: %d] ignored a %s message with lower term from %x [term: %d]",
+ r.id, r.Term, m.Type, m.From, m.Term)
+ }
+ return nil
+ }
+
+ switch m.Type {
+ case pb.MsgHup:
+ if r.state != StateLeader {
+ ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
+ if err != nil {
+ r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
+ }
+ if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
+ r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
+ return nil
+ }
+
+ r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
+ if r.preVote {
+ r.campaign(campaignPreElection)
+ } else {
+ r.campaign(campaignElection)
+ }
+ } else {
+ r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
+ }
+
+ case pb.MsgVote, pb.MsgPreVote:
+ if r.isLearner {
+ // TODO: learner may need to vote, in case of node down when confchange.
+ r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] ignored %s from %x [logterm: %d, index: %d] at term %d: learner can not vote",
+ r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
+ return nil
+ }
+ // We can vote if this is a repeat of a vote we've already cast...
+ canVote := r.Vote == m.From ||
+ // ...we haven't voted and we don't think there's a leader yet in this term...
+ (r.Vote == None && r.lead == None) ||
+ // ...or this is a PreVote for a future term...
+ (m.Type == pb.MsgPreVote && m.Term > r.Term)
+ // ...and we believe the candidate is up to date.
+ if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
+ r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] cast %s for %x [logterm: %d, index: %d] at term %d",
+ r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
+ // When responding to Msg{Pre,}Vote messages we include the term
+ // from the message, not the local term. To see why consider the
+ // case where a single node was previously partitioned away and
+ // it's local term is now of date. If we include the local term
+ // (recall that for pre-votes we don't update the local term), the
+ // (pre-)campaigning node on the other end will proceed to ignore
+ // the message (it ignores all out of date messages).
+ // The term in the original message and current local term are the
+ // same in the case of regular votes, but different for pre-votes.
+ r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
+ if m.Type == pb.MsgVote {
+ // Only record real votes.
+ r.electionElapsed = 0
+ r.Vote = m.From
+ }
+ } else {
+ r.logger.Infof("%x [logterm: %d, index: %d, vote: %x] rejected %s from %x [logterm: %d, index: %d] at term %d",
+ r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.Type, m.From, m.LogTerm, m.Index, r.Term)
+ r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
+ }
+
+ default:
+ err := r.step(r, m)
+ if err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+type stepFunc func(r *raft, m pb.Message) error
+
+func stepLeader(r *raft, m pb.Message) error {
+ // These message types do not require any progress for m.From.
+ switch m.Type {
+ case pb.MsgBeat:
+ r.bcastHeartbeat()
+ return nil
+ case pb.MsgCheckQuorum:
+ if !r.checkQuorumActive() {
+ r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
+ r.becomeFollower(r.Term, None)
+ }
+ return nil
+ case pb.MsgProp:
+ if len(m.Entries) == 0 {
+ r.logger.Panicf("%x stepped empty MsgProp", r.id)
+ }
+ if _, ok := r.prs[r.id]; !ok {
+ // If we are not currently a member of the range (i.e. this node
+ // was removed from the configuration while serving as leader),
+ // drop any new proposals.
+ return ErrProposalDropped
+ }
+ if r.leadTransferee != None {
+ r.logger.Debugf("%x [term %d] transfer leadership to %x is in progress; dropping proposal", r.id, r.Term, r.leadTransferee)
+ return ErrProposalDropped
+ }
+
+ for i, e := range m.Entries {
+ if e.Type == pb.EntryConfChange {
+ if r.pendingConfIndex > r.raftLog.applied {
+ r.logger.Infof("propose conf %s ignored since pending unapplied configuration [index %d, applied %d]",
+ e.String(), r.pendingConfIndex, r.raftLog.applied)
+ m.Entries[i] = pb.Entry{Type: pb.EntryNormal}
+ } else {
+ r.pendingConfIndex = r.raftLog.lastIndex() + uint64(i) + 1
+ }
+ }
+ }
+
+ if !r.appendEntry(m.Entries...) {
+ return ErrProposalDropped
+ }
+ r.bcastAppend()
+ return nil
+ case pb.MsgReadIndex:
+ if r.quorum() > 1 {
+ if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
+ // Reject read only request when this leader has not committed any log entry at its term.
+ return nil
+ }
+
+ // thinking: use an interally defined context instead of the user given context.
+ // We can express this in terms of the term and index instead of a user-supplied value.
+ // This would allow multiple reads to piggyback on the same message.
+ switch r.readOnly.option {
+ case ReadOnlySafe:
+ r.readOnly.addRequest(r.raftLog.committed, m)
+ r.bcastHeartbeatWithCtx(m.Entries[0].Data)
+ case ReadOnlyLeaseBased:
+ ri := r.raftLog.committed
+ if m.From == None || m.From == r.id { // from local member
+ r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+ } else {
+ r.send(pb.Message{To: m.From, Type: pb.MsgReadIndexResp, Index: ri, Entries: m.Entries})
+ }
+ }
+ } else {
+ r.readStates = append(r.readStates, ReadState{Index: r.raftLog.committed, RequestCtx: m.Entries[0].Data})
+ }
+
+ return nil
+ }
+
+ // All other message types require a progress for m.From (pr).
+ pr := r.getProgress(m.From)
+ if pr == nil {
+ r.logger.Debugf("%x no progress available for %x", r.id, m.From)
+ return nil
+ }
+ switch m.Type {
+ case pb.MsgAppResp:
+ pr.RecentActive = true
+
+ if m.Reject {
+ r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
+ r.id, m.RejectHint, m.From, m.Index)
+ if pr.maybeDecrTo(m.Index, m.RejectHint) {
+ r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
+ if pr.State == ProgressStateReplicate {
+ pr.becomeProbe()
+ }
+ r.sendAppend(m.From)
+ }
+ } else {
+ oldPaused := pr.IsPaused()
+ if pr.maybeUpdate(m.Index) {
+ switch {
+ case pr.State == ProgressStateProbe:
+ pr.becomeReplicate()
+ case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
+ r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+ // Transition back to replicating state via probing state
+ // (which takes the snapshot into account). If we didn't
+ // move to replicating state, that would only happen with
+ // the next round of appends (but there may not be a next
+ // round for a while, exposing an inconsistent RaftStatus).
+ pr.becomeProbe()
+ pr.becomeReplicate()
+ case pr.State == ProgressStateReplicate:
+ pr.ins.freeTo(m.Index)
+ }
+
+ if r.maybeCommit() {
+ r.bcastAppend()
+ } else if oldPaused {
+ // If we were paused before, this node may be missing the
+ // latest commit index, so send it.
+ r.sendAppend(m.From)
+ }
+ // We've updated flow control information above, which may
+ // allow us to send multiple (size-limited) in-flight messages
+ // at once (such as when transitioning from probe to
+ // replicate, or when freeTo() covers multiple messages). If
+ // we have more entries to send, send as many messages as we
+ // can (without sending empty messages for the commit index)
+ for r.maybeSendAppend(m.From, false) {
+ }
+ // Transfer leadership is in progress.
+ if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
+ r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
+ r.sendTimeoutNow(m.From)
+ }
+ }
+ }
+ case pb.MsgHeartbeatResp:
+ pr.RecentActive = true
+ pr.resume()
+
+ // free one slot for the full inflights window to allow progress.
+ if pr.State == ProgressStateReplicate && pr.ins.full() {
+ pr.ins.freeFirstOne()
+ }
+ if pr.Match < r.raftLog.lastIndex() {
+ r.sendAppend(m.From)
+ }
+
+ if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
+ return nil
+ }
+
+ ackCount := r.readOnly.recvAck(m)
+ if ackCount < r.quorum() {
+ return nil
+ }
+
+ rss := r.readOnly.advance(m)
+ for _, rs := range rss {
+ req := rs.req
+ if req.From == None || req.From == r.id { // from local member
+ r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
+ } else {
+ r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
+ }
+ }
+ case pb.MsgSnapStatus:
+ if pr.State != ProgressStateSnapshot {
+ return nil
+ }
+ if !m.Reject {
+ pr.becomeProbe()
+ r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+ } else {
+ pr.snapshotFailure()
+ pr.becomeProbe()
+ r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+ }
+ // If snapshot finish, wait for the msgAppResp from the remote node before sending
+ // out the next msgApp.
+ // If snapshot failure, wait for a heartbeat interval before next try
+ pr.pause()
+ case pb.MsgUnreachable:
+ // During optimistic replication, if the remote becomes unreachable,
+ // there is huge probability that a MsgApp is lost.
+ if pr.State == ProgressStateReplicate {
+ pr.becomeProbe()
+ }
+ r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
+ case pb.MsgTransferLeader:
+ if pr.IsLearner {
+ r.logger.Debugf("%x is learner. Ignored transferring leadership", r.id)
+ return nil
+ }
+ leadTransferee := m.From
+ lastLeadTransferee := r.leadTransferee
+ if lastLeadTransferee != None {
+ if lastLeadTransferee == leadTransferee {
+ r.logger.Infof("%x [term %d] transfer leadership to %x is in progress, ignores request to same node %x",
+ r.id, r.Term, leadTransferee, leadTransferee)
+ return nil
+ }
+ r.abortLeaderTransfer()
+ r.logger.Infof("%x [term %d] abort previous transferring leadership to %x", r.id, r.Term, lastLeadTransferee)
+ }
+ if leadTransferee == r.id {
+ r.logger.Debugf("%x is already leader. Ignored transferring leadership to self", r.id)
+ return nil
+ }
+ // Transfer leadership to third party.
+ r.logger.Infof("%x [term %d] starts to transfer leadership to %x", r.id, r.Term, leadTransferee)
+ // Transfer leadership should be finished in one electionTimeout, so reset r.electionElapsed.
+ r.electionElapsed = 0
+ r.leadTransferee = leadTransferee
+ if pr.Match == r.raftLog.lastIndex() {
+ r.sendTimeoutNow(leadTransferee)
+ r.logger.Infof("%x sends MsgTimeoutNow to %x immediately as %x already has up-to-date log", r.id, leadTransferee, leadTransferee)
+ } else {
+ r.sendAppend(leadTransferee)
+ }
+ }
+ return nil
+}
+
+// stepCandidate is shared by StateCandidate and StatePreCandidate; the difference is
+// whether they respond to MsgVoteResp or MsgPreVoteResp.
+func stepCandidate(r *raft, m pb.Message) error {
+ // Only handle vote responses corresponding to our candidacy (while in
+ // StateCandidate, we may get stale MsgPreVoteResp messages in this term from
+ // our pre-candidate state).
+ var myVoteRespType pb.MessageType
+ if r.state == StatePreCandidate {
+ myVoteRespType = pb.MsgPreVoteResp
+ } else {
+ myVoteRespType = pb.MsgVoteResp
+ }
+ switch m.Type {
+ case pb.MsgProp:
+ r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
+ return ErrProposalDropped
+ case pb.MsgApp:
+ r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
+ r.handleAppendEntries(m)
+ case pb.MsgHeartbeat:
+ r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
+ r.handleHeartbeat(m)
+ case pb.MsgSnap:
+ r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
+ r.handleSnapshot(m)
+ case myVoteRespType:
+ gr := r.poll(m.From, m.Type, !m.Reject)
+ r.logger.Infof("%x [quorum:%d] has received %d %s votes and %d vote rejections", r.id, r.quorum(), gr, m.Type, len(r.votes)-gr)
+ switch r.quorum() {
+ case gr:
+ if r.state == StatePreCandidate {
+ r.campaign(campaignElection)
+ } else {
+ r.becomeLeader()
+ r.bcastAppend()
+ }
+ case len(r.votes) - gr:
+ // pb.MsgPreVoteResp contains future term of pre-candidate
+ // m.Term > r.Term; reuse r.Term
+ r.becomeFollower(r.Term, None)
+ }
+ case pb.MsgTimeoutNow:
+ r.logger.Debugf("%x [term %d state %v] ignored MsgTimeoutNow from %x", r.id, r.Term, r.state, m.From)
+ }
+ return nil
+}
+
+func stepFollower(r *raft, m pb.Message) error {
+ switch m.Type {
+ case pb.MsgProp:
+ if r.lead == None {
+ r.logger.Infof("%x no leader at term %d; dropping proposal", r.id, r.Term)
+ return ErrProposalDropped
+ } else if r.disableProposalForwarding {
+ r.logger.Infof("%x not forwarding to leader %x at term %d; dropping proposal", r.id, r.lead, r.Term)
+ return ErrProposalDropped
+ }
+ m.To = r.lead
+ r.send(m)
+ case pb.MsgApp:
+ r.electionElapsed = 0
+ r.lead = m.From
+ r.handleAppendEntries(m)
+ case pb.MsgHeartbeat:
+ r.electionElapsed = 0
+ r.lead = m.From
+ r.handleHeartbeat(m)
+ case pb.MsgSnap:
+ r.electionElapsed = 0
+ r.lead = m.From
+ r.handleSnapshot(m)
+ case pb.MsgTransferLeader:
+ if r.lead == None {
+ r.logger.Infof("%x no leader at term %d; dropping leader transfer msg", r.id, r.Term)
+ return nil
+ }
+ m.To = r.lead
+ r.send(m)
+ case pb.MsgTimeoutNow:
+ if r.promotable() {
+ r.logger.Infof("%x [term %d] received MsgTimeoutNow from %x and starts an election to get leadership.", r.id, r.Term, m.From)
+ // Leadership transfers never use pre-vote even if r.preVote is true; we
+ // know we are not recovering from a partition so there is no need for the
+ // extra round trip.
+ r.campaign(campaignTransfer)
+ } else {
+ r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
+ }
+ case pb.MsgReadIndex:
+ if r.lead == None {
+ r.logger.Infof("%x no leader at term %d; dropping index reading msg", r.id, r.Term)
+ return nil
+ }
+ m.To = r.lead
+ r.send(m)
+ case pb.MsgReadIndexResp:
+ if len(m.Entries) != 1 {
+ r.logger.Errorf("%x invalid format of MsgReadIndexResp from %x, entries count: %d", r.id, m.From, len(m.Entries))
+ return nil
+ }
+ r.readStates = append(r.readStates, ReadState{Index: m.Index, RequestCtx: m.Entries[0].Data})
+ }
+ return nil
+}
+
+func (r *raft) handleAppendEntries(m pb.Message) {
+ if m.Index < r.raftLog.committed {
+ r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
+ return
+ }
+
+ if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
+ r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
+ } else {
+ r.logger.Debugf("%x [logterm: %d, index: %d] rejected msgApp [logterm: %d, index: %d] from %x",
+ r.id, r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(m.Index)), m.Index, m.LogTerm, m.Index, m.From)
+ r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
+ }
+}
+
+func (r *raft) handleHeartbeat(m pb.Message) {
+ r.raftLog.commitTo(m.Commit)
+ r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
+}
+
+func (r *raft) handleSnapshot(m pb.Message) {
+ sindex, sterm := m.Snapshot.Metadata.Index, m.Snapshot.Metadata.Term
+ if r.restore(m.Snapshot) {
+ r.logger.Infof("%x [commit: %d] restored snapshot [index: %d, term: %d]",
+ r.id, r.raftLog.committed, sindex, sterm)
+ r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
+ } else {
+ r.logger.Infof("%x [commit: %d] ignored snapshot [index: %d, term: %d]",
+ r.id, r.raftLog.committed, sindex, sterm)
+ r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
+ }
+}
+
+// restore recovers the state machine from a snapshot. It restores the log and the
+// configuration of state machine.
+func (r *raft) restore(s pb.Snapshot) bool {
+ if s.Metadata.Index <= r.raftLog.committed {
+ return false
+ }
+ if r.raftLog.matchTerm(s.Metadata.Index, s.Metadata.Term) {
+ r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] fast-forwarded commit to snapshot [index: %d, term: %d]",
+ r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
+ r.raftLog.commitTo(s.Metadata.Index)
+ return false
+ }
+
+ // The normal peer can't become learner.
+ if !r.isLearner {
+ for _, id := range s.Metadata.ConfState.Learners {
+ if id == r.id {
+ r.logger.Errorf("%x can't become learner when restores snapshot [index: %d, term: %d]", r.id, s.Metadata.Index, s.Metadata.Term)
+ return false
+ }
+ }
+ }
+
+ r.logger.Infof("%x [commit: %d, lastindex: %d, lastterm: %d] starts to restore snapshot [index: %d, term: %d]",
+ r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
+
+ r.raftLog.restore(s)
+ r.prs = make(map[uint64]*Progress)
+ r.learnerPrs = make(map[uint64]*Progress)
+ r.restoreNode(s.Metadata.ConfState.Nodes, false)
+ r.restoreNode(s.Metadata.ConfState.Learners, true)
+ return true
+}
+
+func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
+ for _, n := range nodes {
+ match, next := uint64(0), r.raftLog.lastIndex()+1
+ if n == r.id {
+ match = next - 1
+ r.isLearner = isLearner
+ }
+ r.setProgress(n, match, next, isLearner)
+ r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.getProgress(n))
+ }
+}
+
+// promotable indicates whether state machine can be promoted to leader,
+// which is true when its own id is in progress list.
+func (r *raft) promotable() bool {
+ _, ok := r.prs[r.id]
+ return ok
+}
+
+func (r *raft) addNode(id uint64) {
+ r.addNodeOrLearnerNode(id, false)
+}
+
+func (r *raft) addLearner(id uint64) {
+ r.addNodeOrLearnerNode(id, true)
+}
+
+func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
+ pr := r.getProgress(id)
+ if pr == nil {
+ r.setProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
+ } else {
+ if isLearner && !pr.IsLearner {
+ // can only change Learner to Voter
+ r.logger.Infof("%x ignored addLearner: do not support changing %x from raft peer to learner.", r.id, id)
+ return
+ }
+
+ if isLearner == pr.IsLearner {
+ // Ignore any redundant addNode calls (which can happen because the
+ // initial bootstrapping entries are applied twice).
+ return
+ }
+
+ // change Learner to Voter, use origin Learner progress
+ delete(r.learnerPrs, id)
+ pr.IsLearner = false
+ r.prs[id] = pr
+ }
+
+ if r.id == id {
+ r.isLearner = isLearner
+ }
+
+ // When a node is first added, we should mark it as recently active.
+ // Otherwise, CheckQuorum may cause us to step down if it is invoked
+ // before the added node has a chance to communicate with us.
+ pr = r.getProgress(id)
+ pr.RecentActive = true
+}
+
+func (r *raft) removeNode(id uint64) {
+ r.delProgress(id)
+
+ // do not try to commit or abort transferring if there is no nodes in the cluster.
+ if len(r.prs) == 0 && len(r.learnerPrs) == 0 {
+ return
+ }
+
+ // The quorum size is now smaller, so see if any pending entries can
+ // be committed.
+ if r.maybeCommit() {
+ r.bcastAppend()
+ }
+ // If the removed node is the leadTransferee, then abort the leadership transferring.
+ if r.state == StateLeader && r.leadTransferee == id {
+ r.abortLeaderTransfer()
+ }
+}
+
+func (r *raft) setProgress(id, match, next uint64, isLearner bool) {
+ if !isLearner {
+ delete(r.learnerPrs, id)
+ r.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight)}
+ return
+ }
+
+ if _, ok := r.prs[id]; ok {
+ panic(fmt.Sprintf("%x unexpected changing from voter to learner for %x", r.id, id))
+ }
+ r.learnerPrs[id] = &Progress{Next: next, Match: match, ins: newInflights(r.maxInflight), IsLearner: true}
+}
+
+func (r *raft) delProgress(id uint64) {
+ delete(r.prs, id)
+ delete(r.learnerPrs, id)
+}
+
+func (r *raft) loadState(state pb.HardState) {
+ if state.Commit < r.raftLog.committed || state.Commit > r.raftLog.lastIndex() {
+ r.logger.Panicf("%x state.commit %d is out of range [%d, %d]", r.id, state.Commit, r.raftLog.committed, r.raftLog.lastIndex())
+ }
+ r.raftLog.committed = state.Commit
+ r.Term = state.Term
+ r.Vote = state.Vote
+}
+
+// pastElectionTimeout returns true iff r.electionElapsed is greater
+// than or equal to the randomized election timeout in
+// [electiontimeout, 2 * electiontimeout - 1].
+func (r *raft) pastElectionTimeout() bool {
+ return r.electionElapsed >= r.randomizedElectionTimeout
+}
+
+func (r *raft) resetRandomizedElectionTimeout() {
+ r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
+}
+
+// checkQuorumActive returns true if the quorum is active from
+// the view of the local raft state machine. Otherwise, it returns
+// false.
+// checkQuorumActive also resets all RecentActive to false.
+func (r *raft) checkQuorumActive() bool {
+ var act int
+
+ r.forEachProgress(func(id uint64, pr *Progress) {
+ if id == r.id { // self is always active
+ act++
+ return
+ }
+
+ if pr.RecentActive && !pr.IsLearner {
+ act++
+ }
+
+ pr.RecentActive = false
+ })
+
+ return act >= r.quorum()
+}
+
+func (r *raft) sendTimeoutNow(to uint64) {
+ r.send(pb.Message{To: to, Type: pb.MsgTimeoutNow})
+}
+
+func (r *raft) abortLeaderTransfer() {
+ r.leadTransferee = None
+}
+
+// increaseUncommittedSize computes the size of the proposed entries and
+// determines whether they would push leader over its maxUncommittedSize limit.
+// If the new entries would exceed the limit, the method returns false. If not,
+// the increase in uncommitted entry size is recorded and the method returns
+// true.
+func (r *raft) increaseUncommittedSize(ents []pb.Entry) bool {
+ var s uint64
+ for _, e := range ents {
+ s += uint64(PayloadSize(e))
+ }
+
+ if r.uncommittedSize > 0 && r.uncommittedSize+s > r.maxUncommittedSize {
+ // If the uncommitted tail of the Raft log is empty, allow any size
+ // proposal. Otherwise, limit the size of the uncommitted tail of the
+ // log and drop any proposal that would push the size over the limit.
+ return false
+ }
+ r.uncommittedSize += s
+ return true
+}
+
+// reduceUncommittedSize accounts for the newly committed entries by decreasing
+// the uncommitted entry size limit.
+func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
+ if r.uncommittedSize == 0 {
+ // Fast-path for followers, who do not track or enforce the limit.
+ return
+ }
+
+ var s uint64
+ for _, e := range ents {
+ s += uint64(PayloadSize(e))
+ }
+ if s > r.uncommittedSize {
+ // uncommittedSize may underestimate the size of the uncommitted Raft
+ // log tail but will never overestimate it. Saturate at 0 instead of
+ // allowing overflow.
+ r.uncommittedSize = 0
+ } else {
+ r.uncommittedSize -= s
+ }
+}
+
+func numOfPendingConf(ents []pb.Entry) int {
+ n := 0
+ for i := range ents {
+ if ents[i].Type == pb.EntryConfChange {
+ n++
+ }
+ }
+ return n
+}
diff --git a/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go b/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go
new file mode 100644
index 0000000..fd9ee37
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/raftpb/raft.pb.go
@@ -0,0 +1,2004 @@
+// Code generated by protoc-gen-gogo. DO NOT EDIT.
+// source: raft.proto
+
+/*
+ Package raftpb is a generated protocol buffer package.
+
+ It is generated from these files:
+ raft.proto
+
+ It has these top-level messages:
+ Entry
+ SnapshotMetadata
+ Snapshot
+ Message
+ HardState
+ ConfState
+ ConfChange
+*/
+package raftpb
+
+import (
+ "fmt"
+
+ proto "github.com/golang/protobuf/proto"
+
+ math "math"
+
+ _ "github.com/gogo/protobuf/gogoproto"
+
+ io "io"
+)
+
+// Reference imports to suppress errors if they are not otherwise used.
+var _ = proto.Marshal
+var _ = fmt.Errorf
+var _ = math.Inf
+
+// This is a compile-time assertion to ensure that this generated file
+// is compatible with the proto package it is being compiled against.
+// A compilation error at this line likely means your copy of the
+// proto package needs to be updated.
+const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
+
+type EntryType int32
+
+const (
+ EntryNormal EntryType = 0
+ EntryConfChange EntryType = 1
+)
+
+var EntryType_name = map[int32]string{
+ 0: "EntryNormal",
+ 1: "EntryConfChange",
+}
+var EntryType_value = map[string]int32{
+ "EntryNormal": 0,
+ "EntryConfChange": 1,
+}
+
+func (x EntryType) Enum() *EntryType {
+ p := new(EntryType)
+ *p = x
+ return p
+}
+func (x EntryType) String() string {
+ return proto.EnumName(EntryType_name, int32(x))
+}
+func (x *EntryType) UnmarshalJSON(data []byte) error {
+ value, err := proto.UnmarshalJSONEnum(EntryType_value, data, "EntryType")
+ if err != nil {
+ return err
+ }
+ *x = EntryType(value)
+ return nil
+}
+func (EntryType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{0} }
+
+type MessageType int32
+
+const (
+ MsgHup MessageType = 0
+ MsgBeat MessageType = 1
+ MsgProp MessageType = 2
+ MsgApp MessageType = 3
+ MsgAppResp MessageType = 4
+ MsgVote MessageType = 5
+ MsgVoteResp MessageType = 6
+ MsgSnap MessageType = 7
+ MsgHeartbeat MessageType = 8
+ MsgHeartbeatResp MessageType = 9
+ MsgUnreachable MessageType = 10
+ MsgSnapStatus MessageType = 11
+ MsgCheckQuorum MessageType = 12
+ MsgTransferLeader MessageType = 13
+ MsgTimeoutNow MessageType = 14
+ MsgReadIndex MessageType = 15
+ MsgReadIndexResp MessageType = 16
+ MsgPreVote MessageType = 17
+ MsgPreVoteResp MessageType = 18
+)
+
+var MessageType_name = map[int32]string{
+ 0: "MsgHup",
+ 1: "MsgBeat",
+ 2: "MsgProp",
+ 3: "MsgApp",
+ 4: "MsgAppResp",
+ 5: "MsgVote",
+ 6: "MsgVoteResp",
+ 7: "MsgSnap",
+ 8: "MsgHeartbeat",
+ 9: "MsgHeartbeatResp",
+ 10: "MsgUnreachable",
+ 11: "MsgSnapStatus",
+ 12: "MsgCheckQuorum",
+ 13: "MsgTransferLeader",
+ 14: "MsgTimeoutNow",
+ 15: "MsgReadIndex",
+ 16: "MsgReadIndexResp",
+ 17: "MsgPreVote",
+ 18: "MsgPreVoteResp",
+}
+var MessageType_value = map[string]int32{
+ "MsgHup": 0,
+ "MsgBeat": 1,
+ "MsgProp": 2,
+ "MsgApp": 3,
+ "MsgAppResp": 4,
+ "MsgVote": 5,
+ "MsgVoteResp": 6,
+ "MsgSnap": 7,
+ "MsgHeartbeat": 8,
+ "MsgHeartbeatResp": 9,
+ "MsgUnreachable": 10,
+ "MsgSnapStatus": 11,
+ "MsgCheckQuorum": 12,
+ "MsgTransferLeader": 13,
+ "MsgTimeoutNow": 14,
+ "MsgReadIndex": 15,
+ "MsgReadIndexResp": 16,
+ "MsgPreVote": 17,
+ "MsgPreVoteResp": 18,
+}
+
+func (x MessageType) Enum() *MessageType {
+ p := new(MessageType)
+ *p = x
+ return p
+}
+func (x MessageType) String() string {
+ return proto.EnumName(MessageType_name, int32(x))
+}
+func (x *MessageType) UnmarshalJSON(data []byte) error {
+ value, err := proto.UnmarshalJSONEnum(MessageType_value, data, "MessageType")
+ if err != nil {
+ return err
+ }
+ *x = MessageType(value)
+ return nil
+}
+func (MessageType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{1} }
+
+type ConfChangeType int32
+
+const (
+ ConfChangeAddNode ConfChangeType = 0
+ ConfChangeRemoveNode ConfChangeType = 1
+ ConfChangeUpdateNode ConfChangeType = 2
+ ConfChangeAddLearnerNode ConfChangeType = 3
+)
+
+var ConfChangeType_name = map[int32]string{
+ 0: "ConfChangeAddNode",
+ 1: "ConfChangeRemoveNode",
+ 2: "ConfChangeUpdateNode",
+ 3: "ConfChangeAddLearnerNode",
+}
+var ConfChangeType_value = map[string]int32{
+ "ConfChangeAddNode": 0,
+ "ConfChangeRemoveNode": 1,
+ "ConfChangeUpdateNode": 2,
+ "ConfChangeAddLearnerNode": 3,
+}
+
+func (x ConfChangeType) Enum() *ConfChangeType {
+ p := new(ConfChangeType)
+ *p = x
+ return p
+}
+func (x ConfChangeType) String() string {
+ return proto.EnumName(ConfChangeType_name, int32(x))
+}
+func (x *ConfChangeType) UnmarshalJSON(data []byte) error {
+ value, err := proto.UnmarshalJSONEnum(ConfChangeType_value, data, "ConfChangeType")
+ if err != nil {
+ return err
+ }
+ *x = ConfChangeType(value)
+ return nil
+}
+func (ConfChangeType) EnumDescriptor() ([]byte, []int) { return fileDescriptorRaft, []int{2} }
+
+type Entry struct {
+ Term uint64 `protobuf:"varint,2,opt,name=Term" json:"Term"`
+ Index uint64 `protobuf:"varint,3,opt,name=Index" json:"Index"`
+ Type EntryType `protobuf:"varint,1,opt,name=Type,enum=raftpb.EntryType" json:"Type"`
+ Data []byte `protobuf:"bytes,4,opt,name=Data" json:"Data,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *Entry) Reset() { *m = Entry{} }
+func (m *Entry) String() string { return proto.CompactTextString(m) }
+func (*Entry) ProtoMessage() {}
+func (*Entry) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{0} }
+
+type SnapshotMetadata struct {
+ ConfState ConfState `protobuf:"bytes,1,opt,name=conf_state,json=confState" json:"conf_state"`
+ Index uint64 `protobuf:"varint,2,opt,name=index" json:"index"`
+ Term uint64 `protobuf:"varint,3,opt,name=term" json:"term"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *SnapshotMetadata) Reset() { *m = SnapshotMetadata{} }
+func (m *SnapshotMetadata) String() string { return proto.CompactTextString(m) }
+func (*SnapshotMetadata) ProtoMessage() {}
+func (*SnapshotMetadata) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{1} }
+
+type Snapshot struct {
+ Data []byte `protobuf:"bytes,1,opt,name=data" json:"data,omitempty"`
+ Metadata SnapshotMetadata `protobuf:"bytes,2,opt,name=metadata" json:"metadata"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *Snapshot) Reset() { *m = Snapshot{} }
+func (m *Snapshot) String() string { return proto.CompactTextString(m) }
+func (*Snapshot) ProtoMessage() {}
+func (*Snapshot) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{2} }
+
+type Message struct {
+ Type MessageType `protobuf:"varint,1,opt,name=type,enum=raftpb.MessageType" json:"type"`
+ To uint64 `protobuf:"varint,2,opt,name=to" json:"to"`
+ From uint64 `protobuf:"varint,3,opt,name=from" json:"from"`
+ Term uint64 `protobuf:"varint,4,opt,name=term" json:"term"`
+ LogTerm uint64 `protobuf:"varint,5,opt,name=logTerm" json:"logTerm"`
+ Index uint64 `protobuf:"varint,6,opt,name=index" json:"index"`
+ Entries []Entry `protobuf:"bytes,7,rep,name=entries" json:"entries"`
+ Commit uint64 `protobuf:"varint,8,opt,name=commit" json:"commit"`
+ Snapshot Snapshot `protobuf:"bytes,9,opt,name=snapshot" json:"snapshot"`
+ Reject bool `protobuf:"varint,10,opt,name=reject" json:"reject"`
+ RejectHint uint64 `protobuf:"varint,11,opt,name=rejectHint" json:"rejectHint"`
+ Context []byte `protobuf:"bytes,12,opt,name=context" json:"context,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *Message) Reset() { *m = Message{} }
+func (m *Message) String() string { return proto.CompactTextString(m) }
+func (*Message) ProtoMessage() {}
+func (*Message) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{3} }
+
+type HardState struct {
+ Term uint64 `protobuf:"varint,1,opt,name=term" json:"term"`
+ Vote uint64 `protobuf:"varint,2,opt,name=vote" json:"vote"`
+ Commit uint64 `protobuf:"varint,3,opt,name=commit" json:"commit"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *HardState) Reset() { *m = HardState{} }
+func (m *HardState) String() string { return proto.CompactTextString(m) }
+func (*HardState) ProtoMessage() {}
+func (*HardState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{4} }
+
+type ConfState struct {
+ Nodes []uint64 `protobuf:"varint,1,rep,name=nodes" json:"nodes,omitempty"`
+ Learners []uint64 `protobuf:"varint,2,rep,name=learners" json:"learners,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *ConfState) Reset() { *m = ConfState{} }
+func (m *ConfState) String() string { return proto.CompactTextString(m) }
+func (*ConfState) ProtoMessage() {}
+func (*ConfState) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{5} }
+
+type ConfChange struct {
+ ID uint64 `protobuf:"varint,1,opt,name=ID" json:"ID"`
+ Type ConfChangeType `protobuf:"varint,2,opt,name=Type,enum=raftpb.ConfChangeType" json:"Type"`
+ NodeID uint64 `protobuf:"varint,3,opt,name=NodeID" json:"NodeID"`
+ Context []byte `protobuf:"bytes,4,opt,name=Context" json:"Context,omitempty"`
+ XXX_unrecognized []byte `json:"-"`
+}
+
+func (m *ConfChange) Reset() { *m = ConfChange{} }
+func (m *ConfChange) String() string { return proto.CompactTextString(m) }
+func (*ConfChange) ProtoMessage() {}
+func (*ConfChange) Descriptor() ([]byte, []int) { return fileDescriptorRaft, []int{6} }
+
+func init() {
+ proto.RegisterType((*Entry)(nil), "raftpb.Entry")
+ proto.RegisterType((*SnapshotMetadata)(nil), "raftpb.SnapshotMetadata")
+ proto.RegisterType((*Snapshot)(nil), "raftpb.Snapshot")
+ proto.RegisterType((*Message)(nil), "raftpb.Message")
+ proto.RegisterType((*HardState)(nil), "raftpb.HardState")
+ proto.RegisterType((*ConfState)(nil), "raftpb.ConfState")
+ proto.RegisterType((*ConfChange)(nil), "raftpb.ConfChange")
+ proto.RegisterEnum("raftpb.EntryType", EntryType_name, EntryType_value)
+ proto.RegisterEnum("raftpb.MessageType", MessageType_name, MessageType_value)
+ proto.RegisterEnum("raftpb.ConfChangeType", ConfChangeType_name, ConfChangeType_value)
+}
+func (m *Entry) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *Entry) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ dAtA[i] = 0x8
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Type))
+ dAtA[i] = 0x10
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Term))
+ dAtA[i] = 0x18
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Index))
+ if m.Data != nil {
+ dAtA[i] = 0x22
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(len(m.Data)))
+ i += copy(dAtA[i:], m.Data)
+ }
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func (m *SnapshotMetadata) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *SnapshotMetadata) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ dAtA[i] = 0xa
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.ConfState.Size()))
+ n1, err := m.ConfState.MarshalTo(dAtA[i:])
+ if err != nil {
+ return 0, err
+ }
+ i += n1
+ dAtA[i] = 0x10
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Index))
+ dAtA[i] = 0x18
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Term))
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func (m *Snapshot) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *Snapshot) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ if m.Data != nil {
+ dAtA[i] = 0xa
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(len(m.Data)))
+ i += copy(dAtA[i:], m.Data)
+ }
+ dAtA[i] = 0x12
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Metadata.Size()))
+ n2, err := m.Metadata.MarshalTo(dAtA[i:])
+ if err != nil {
+ return 0, err
+ }
+ i += n2
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func (m *Message) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *Message) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ dAtA[i] = 0x8
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Type))
+ dAtA[i] = 0x10
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.To))
+ dAtA[i] = 0x18
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.From))
+ dAtA[i] = 0x20
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Term))
+ dAtA[i] = 0x28
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.LogTerm))
+ dAtA[i] = 0x30
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Index))
+ if len(m.Entries) > 0 {
+ for _, msg := range m.Entries {
+ dAtA[i] = 0x3a
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(msg.Size()))
+ n, err := msg.MarshalTo(dAtA[i:])
+ if err != nil {
+ return 0, err
+ }
+ i += n
+ }
+ }
+ dAtA[i] = 0x40
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Commit))
+ dAtA[i] = 0x4a
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Snapshot.Size()))
+ n3, err := m.Snapshot.MarshalTo(dAtA[i:])
+ if err != nil {
+ return 0, err
+ }
+ i += n3
+ dAtA[i] = 0x50
+ i++
+ if m.Reject {
+ dAtA[i] = 1
+ } else {
+ dAtA[i] = 0
+ }
+ i++
+ dAtA[i] = 0x58
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.RejectHint))
+ if m.Context != nil {
+ dAtA[i] = 0x62
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(len(m.Context)))
+ i += copy(dAtA[i:], m.Context)
+ }
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func (m *HardState) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *HardState) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ dAtA[i] = 0x8
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Term))
+ dAtA[i] = 0x10
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Vote))
+ dAtA[i] = 0x18
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Commit))
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func (m *ConfState) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *ConfState) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ if len(m.Nodes) > 0 {
+ for _, num := range m.Nodes {
+ dAtA[i] = 0x8
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(num))
+ }
+ }
+ if len(m.Learners) > 0 {
+ for _, num := range m.Learners {
+ dAtA[i] = 0x10
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(num))
+ }
+ }
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func (m *ConfChange) Marshal() (dAtA []byte, err error) {
+ size := m.Size()
+ dAtA = make([]byte, size)
+ n, err := m.MarshalTo(dAtA)
+ if err != nil {
+ return nil, err
+ }
+ return dAtA[:n], nil
+}
+
+func (m *ConfChange) MarshalTo(dAtA []byte) (int, error) {
+ var i int
+ _ = i
+ var l int
+ _ = l
+ dAtA[i] = 0x8
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.ID))
+ dAtA[i] = 0x10
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.Type))
+ dAtA[i] = 0x18
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(m.NodeID))
+ if m.Context != nil {
+ dAtA[i] = 0x22
+ i++
+ i = encodeVarintRaft(dAtA, i, uint64(len(m.Context)))
+ i += copy(dAtA[i:], m.Context)
+ }
+ if m.XXX_unrecognized != nil {
+ i += copy(dAtA[i:], m.XXX_unrecognized)
+ }
+ return i, nil
+}
+
+func encodeVarintRaft(dAtA []byte, offset int, v uint64) int {
+ for v >= 1<<7 {
+ dAtA[offset] = uint8(v&0x7f | 0x80)
+ v >>= 7
+ offset++
+ }
+ dAtA[offset] = uint8(v)
+ return offset + 1
+}
+func (m *Entry) Size() (n int) {
+ var l int
+ _ = l
+ n += 1 + sovRaft(uint64(m.Type))
+ n += 1 + sovRaft(uint64(m.Term))
+ n += 1 + sovRaft(uint64(m.Index))
+ if m.Data != nil {
+ l = len(m.Data)
+ n += 1 + l + sovRaft(uint64(l))
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *SnapshotMetadata) Size() (n int) {
+ var l int
+ _ = l
+ l = m.ConfState.Size()
+ n += 1 + l + sovRaft(uint64(l))
+ n += 1 + sovRaft(uint64(m.Index))
+ n += 1 + sovRaft(uint64(m.Term))
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *Snapshot) Size() (n int) {
+ var l int
+ _ = l
+ if m.Data != nil {
+ l = len(m.Data)
+ n += 1 + l + sovRaft(uint64(l))
+ }
+ l = m.Metadata.Size()
+ n += 1 + l + sovRaft(uint64(l))
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *Message) Size() (n int) {
+ var l int
+ _ = l
+ n += 1 + sovRaft(uint64(m.Type))
+ n += 1 + sovRaft(uint64(m.To))
+ n += 1 + sovRaft(uint64(m.From))
+ n += 1 + sovRaft(uint64(m.Term))
+ n += 1 + sovRaft(uint64(m.LogTerm))
+ n += 1 + sovRaft(uint64(m.Index))
+ if len(m.Entries) > 0 {
+ for _, e := range m.Entries {
+ l = e.Size()
+ n += 1 + l + sovRaft(uint64(l))
+ }
+ }
+ n += 1 + sovRaft(uint64(m.Commit))
+ l = m.Snapshot.Size()
+ n += 1 + l + sovRaft(uint64(l))
+ n += 2
+ n += 1 + sovRaft(uint64(m.RejectHint))
+ if m.Context != nil {
+ l = len(m.Context)
+ n += 1 + l + sovRaft(uint64(l))
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *HardState) Size() (n int) {
+ var l int
+ _ = l
+ n += 1 + sovRaft(uint64(m.Term))
+ n += 1 + sovRaft(uint64(m.Vote))
+ n += 1 + sovRaft(uint64(m.Commit))
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *ConfState) Size() (n int) {
+ var l int
+ _ = l
+ if len(m.Nodes) > 0 {
+ for _, e := range m.Nodes {
+ n += 1 + sovRaft(uint64(e))
+ }
+ }
+ if len(m.Learners) > 0 {
+ for _, e := range m.Learners {
+ n += 1 + sovRaft(uint64(e))
+ }
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func (m *ConfChange) Size() (n int) {
+ var l int
+ _ = l
+ n += 1 + sovRaft(uint64(m.ID))
+ n += 1 + sovRaft(uint64(m.Type))
+ n += 1 + sovRaft(uint64(m.NodeID))
+ if m.Context != nil {
+ l = len(m.Context)
+ n += 1 + l + sovRaft(uint64(l))
+ }
+ if m.XXX_unrecognized != nil {
+ n += len(m.XXX_unrecognized)
+ }
+ return n
+}
+
+func sovRaft(x uint64) (n int) {
+ for {
+ n++
+ x >>= 7
+ if x == 0 {
+ break
+ }
+ }
+ return n
+}
+func sozRaft(x uint64) (n int) {
+ return sovRaft(uint64((x << 1) ^ uint64((int64(x) >> 63))))
+}
+func (m *Entry) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: Entry: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: Entry: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
+ }
+ m.Type = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Type |= (EntryType(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 2:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType)
+ }
+ m.Term = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Term |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 3:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
+ }
+ m.Index = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Index |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 4:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
+ }
+ var byteLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ byteLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if byteLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + byteLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
+ if m.Data == nil {
+ m.Data = []byte{}
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRaft(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRaft
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *SnapshotMetadata) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: SnapshotMetadata: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: SnapshotMetadata: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field ConfState", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + msglen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ if err := m.ConfState.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ case 2:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
+ }
+ m.Index = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Index |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 3:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType)
+ }
+ m.Term = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Term |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRaft(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRaft
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *Snapshot) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: Snapshot: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: Snapshot: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
+ }
+ var byteLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ byteLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if byteLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + byteLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
+ if m.Data == nil {
+ m.Data = []byte{}
+ }
+ iNdEx = postIndex
+ case 2:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Metadata", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + msglen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ if err := m.Metadata.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRaft(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRaft
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *Message) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: Message: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: Message: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
+ }
+ m.Type = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Type |= (MessageType(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 2:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field To", wireType)
+ }
+ m.To = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.To |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 3:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field From", wireType)
+ }
+ m.From = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.From |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 4:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType)
+ }
+ m.Term = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Term |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 5:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field LogTerm", wireType)
+ }
+ m.LogTerm = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.LogTerm |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 6:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Index", wireType)
+ }
+ m.Index = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Index |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 7:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + msglen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Entries = append(m.Entries, Entry{})
+ if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ case 8:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Commit", wireType)
+ }
+ m.Commit = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Commit |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 9:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Snapshot", wireType)
+ }
+ var msglen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ msglen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if msglen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + msglen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ if err := m.Snapshot.Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
+ return err
+ }
+ iNdEx = postIndex
+ case 10:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Reject", wireType)
+ }
+ var v int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.Reject = bool(v != 0)
+ case 11:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field RejectHint", wireType)
+ }
+ m.RejectHint = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.RejectHint |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 12:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Context", wireType)
+ }
+ var byteLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ byteLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if byteLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + byteLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Context = append(m.Context[:0], dAtA[iNdEx:postIndex]...)
+ if m.Context == nil {
+ m.Context = []byte{}
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRaft(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRaft
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *HardState) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: HardState: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: HardState: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Term", wireType)
+ }
+ m.Term = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Term |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 2:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Vote", wireType)
+ }
+ m.Vote = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Vote |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 3:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Commit", wireType)
+ }
+ m.Commit = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Commit |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRaft(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRaft
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *ConfState) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: ConfState: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: ConfState: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType == 0 {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.Nodes = append(m.Nodes, v)
+ } else if wireType == 2 {
+ var packedLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ packedLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if packedLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + packedLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ for iNdEx < postIndex {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.Nodes = append(m.Nodes, v)
+ }
+ } else {
+ return fmt.Errorf("proto: wrong wireType = %d for field Nodes", wireType)
+ }
+ case 2:
+ if wireType == 0 {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.Learners = append(m.Learners, v)
+ } else if wireType == 2 {
+ var packedLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ packedLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if packedLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + packedLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ for iNdEx < postIndex {
+ var v uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ v |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ m.Learners = append(m.Learners, v)
+ }
+ } else {
+ return fmt.Errorf("proto: wrong wireType = %d for field Learners", wireType)
+ }
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRaft(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRaft
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func (m *ConfChange) Unmarshal(dAtA []byte) error {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ preIndex := iNdEx
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ fieldNum := int32(wire >> 3)
+ wireType := int(wire & 0x7)
+ if wireType == 4 {
+ return fmt.Errorf("proto: ConfChange: wiretype end group for non-group")
+ }
+ if fieldNum <= 0 {
+ return fmt.Errorf("proto: ConfChange: illegal tag %d (wire type %d)", fieldNum, wire)
+ }
+ switch fieldNum {
+ case 1:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field ID", wireType)
+ }
+ m.ID = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.ID |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 2:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType)
+ }
+ m.Type = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.Type |= (ConfChangeType(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 3:
+ if wireType != 0 {
+ return fmt.Errorf("proto: wrong wireType = %d for field NodeID", wireType)
+ }
+ m.NodeID = 0
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ m.NodeID |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ case 4:
+ if wireType != 2 {
+ return fmt.Errorf("proto: wrong wireType = %d for field Context", wireType)
+ }
+ var byteLen int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ byteLen |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ if byteLen < 0 {
+ return ErrInvalidLengthRaft
+ }
+ postIndex := iNdEx + byteLen
+ if postIndex > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.Context = append(m.Context[:0], dAtA[iNdEx:postIndex]...)
+ if m.Context == nil {
+ m.Context = []byte{}
+ }
+ iNdEx = postIndex
+ default:
+ iNdEx = preIndex
+ skippy, err := skipRaft(dAtA[iNdEx:])
+ if err != nil {
+ return err
+ }
+ if skippy < 0 {
+ return ErrInvalidLengthRaft
+ }
+ if (iNdEx + skippy) > l {
+ return io.ErrUnexpectedEOF
+ }
+ m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...)
+ iNdEx += skippy
+ }
+ }
+
+ if iNdEx > l {
+ return io.ErrUnexpectedEOF
+ }
+ return nil
+}
+func skipRaft(dAtA []byte) (n int, err error) {
+ l := len(dAtA)
+ iNdEx := 0
+ for iNdEx < l {
+ var wire uint64
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ wire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ wireType := int(wire & 0x7)
+ switch wireType {
+ case 0:
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ iNdEx++
+ if dAtA[iNdEx-1] < 0x80 {
+ break
+ }
+ }
+ return iNdEx, nil
+ case 1:
+ iNdEx += 8
+ return iNdEx, nil
+ case 2:
+ var length int
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ length |= (int(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ iNdEx += length
+ if length < 0 {
+ return 0, ErrInvalidLengthRaft
+ }
+ return iNdEx, nil
+ case 3:
+ for {
+ var innerWire uint64
+ var start int = iNdEx
+ for shift := uint(0); ; shift += 7 {
+ if shift >= 64 {
+ return 0, ErrIntOverflowRaft
+ }
+ if iNdEx >= l {
+ return 0, io.ErrUnexpectedEOF
+ }
+ b := dAtA[iNdEx]
+ iNdEx++
+ innerWire |= (uint64(b) & 0x7F) << shift
+ if b < 0x80 {
+ break
+ }
+ }
+ innerWireType := int(innerWire & 0x7)
+ if innerWireType == 4 {
+ break
+ }
+ next, err := skipRaft(dAtA[start:])
+ if err != nil {
+ return 0, err
+ }
+ iNdEx = start + next
+ }
+ return iNdEx, nil
+ case 4:
+ return iNdEx, nil
+ case 5:
+ iNdEx += 4
+ return iNdEx, nil
+ default:
+ return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
+ }
+ }
+ panic("unreachable")
+}
+
+var (
+ ErrInvalidLengthRaft = fmt.Errorf("proto: negative length found during unmarshaling")
+ ErrIntOverflowRaft = fmt.Errorf("proto: integer overflow")
+)
+
+func init() { proto.RegisterFile("raft.proto", fileDescriptorRaft) }
+
+var fileDescriptorRaft = []byte{
+ // 815 bytes of a gzipped FileDescriptorProto
+ 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x64, 0x54, 0xcd, 0x6e, 0x23, 0x45,
+ 0x10, 0xf6, 0x8c, 0xc7, 0x7f, 0x35, 0x8e, 0xd3, 0xa9, 0x35, 0xa8, 0x15, 0x45, 0xc6, 0xb2, 0x38,
+ 0x58, 0x41, 0x1b, 0x20, 0x07, 0x0e, 0x48, 0x1c, 0x36, 0x09, 0x52, 0x22, 0xad, 0xa3, 0xc5, 0x9b,
+ 0xe5, 0x80, 0x84, 0x50, 0xc7, 0x53, 0x9e, 0x18, 0x32, 0xd3, 0xa3, 0x9e, 0xf6, 0xb2, 0xb9, 0x20,
+ 0x1e, 0x80, 0x07, 0xe0, 0xc2, 0xfb, 0xe4, 0xb8, 0x12, 0x77, 0xc4, 0x86, 0x17, 0x41, 0xdd, 0xd3,
+ 0x63, 0xcf, 0x24, 0xb7, 0xae, 0xef, 0xab, 0xae, 0xfa, 0xea, 0xeb, 0x9a, 0x01, 0x50, 0x62, 0xa9,
+ 0x8f, 0x32, 0x25, 0xb5, 0xc4, 0xb6, 0x39, 0x67, 0xd7, 0xfb, 0xc3, 0x58, 0xc6, 0xd2, 0x42, 0x9f,
+ 0x9b, 0x53, 0xc1, 0x4e, 0x7e, 0x83, 0xd6, 0xb7, 0xa9, 0x56, 0x77, 0xf8, 0x19, 0x04, 0x57, 0x77,
+ 0x19, 0x71, 0x6f, 0xec, 0x4d, 0x07, 0xc7, 0x7b, 0x47, 0xc5, 0xad, 0x23, 0x4b, 0x1a, 0xe2, 0x24,
+ 0xb8, 0xff, 0xe7, 0x93, 0xc6, 0xdc, 0x26, 0x21, 0x87, 0xe0, 0x8a, 0x54, 0xc2, 0xfd, 0xb1, 0x37,
+ 0x0d, 0x36, 0x0c, 0xa9, 0x04, 0xf7, 0xa1, 0x75, 0x91, 0x46, 0xf4, 0x8e, 0x37, 0x2b, 0x54, 0x01,
+ 0x21, 0x42, 0x70, 0x26, 0xb4, 0xe0, 0xc1, 0xd8, 0x9b, 0xf6, 0xe7, 0xf6, 0x3c, 0xf9, 0xdd, 0x03,
+ 0xf6, 0x3a, 0x15, 0x59, 0x7e, 0x23, 0xf5, 0x8c, 0xb4, 0x88, 0x84, 0x16, 0xf8, 0x15, 0xc0, 0x42,
+ 0xa6, 0xcb, 0x9f, 0x72, 0x2d, 0x74, 0xa1, 0x28, 0xdc, 0x2a, 0x3a, 0x95, 0xe9, 0xf2, 0xb5, 0x21,
+ 0x5c, 0xf1, 0xde, 0xa2, 0x04, 0x4c, 0xf3, 0x95, 0x6d, 0x5e, 0xd5, 0x55, 0x40, 0x46, 0xb2, 0x36,
+ 0x92, 0xab, 0xba, 0x2c, 0x32, 0xf9, 0x01, 0xba, 0xa5, 0x02, 0x23, 0xd1, 0x28, 0xb0, 0x3d, 0xfb,
+ 0x73, 0x7b, 0xc6, 0xaf, 0xa1, 0x9b, 0x38, 0x65, 0xb6, 0x70, 0x78, 0xcc, 0x4b, 0x2d, 0x8f, 0x95,
+ 0xbb, 0xba, 0x9b, 0xfc, 0xc9, 0x5f, 0x4d, 0xe8, 0xcc, 0x28, 0xcf, 0x45, 0x4c, 0xf8, 0x1c, 0x02,
+ 0xbd, 0x75, 0xf8, 0x59, 0x59, 0xc3, 0xd1, 0x55, 0x8f, 0x4d, 0x1a, 0x0e, 0xc1, 0xd7, 0xb2, 0x36,
+ 0x89, 0xaf, 0xa5, 0x19, 0x63, 0xa9, 0xe4, 0xa3, 0x31, 0x0c, 0xb2, 0x19, 0x30, 0x78, 0x3c, 0x20,
+ 0x8e, 0xa0, 0x73, 0x2b, 0x63, 0xfb, 0x60, 0xad, 0x0a, 0x59, 0x82, 0x5b, 0xdb, 0xda, 0x4f, 0x6d,
+ 0x7b, 0x0e, 0x1d, 0x4a, 0xb5, 0x5a, 0x51, 0xce, 0x3b, 0xe3, 0xe6, 0x34, 0x3c, 0xde, 0xa9, 0x6d,
+ 0x46, 0x59, 0xca, 0xe5, 0xe0, 0x01, 0xb4, 0x17, 0x32, 0x49, 0x56, 0x9a, 0x77, 0x2b, 0xb5, 0x1c,
+ 0x86, 0xc7, 0xd0, 0xcd, 0x9d, 0x63, 0xbc, 0x67, 0x9d, 0x64, 0x8f, 0x9d, 0x2c, 0x1d, 0x2c, 0xf3,
+ 0x4c, 0x45, 0x45, 0x3f, 0xd3, 0x42, 0x73, 0x18, 0x7b, 0xd3, 0x6e, 0x59, 0xb1, 0xc0, 0xf0, 0x53,
+ 0x80, 0xe2, 0x74, 0xbe, 0x4a, 0x35, 0x0f, 0x2b, 0x3d, 0x2b, 0x38, 0x72, 0xe8, 0x2c, 0x64, 0xaa,
+ 0xe9, 0x9d, 0xe6, 0x7d, 0xfb, 0xb0, 0x65, 0x38, 0xf9, 0x11, 0x7a, 0xe7, 0x42, 0x45, 0xc5, 0xfa,
+ 0x94, 0x0e, 0x7a, 0x4f, 0x1c, 0xe4, 0x10, 0xbc, 0x95, 0x9a, 0xea, 0xfb, 0x6e, 0x90, 0xca, 0xc0,
+ 0xcd, 0xa7, 0x03, 0x4f, 0xbe, 0x81, 0xde, 0x66, 0x5d, 0x71, 0x08, 0xad, 0x54, 0x46, 0x94, 0x73,
+ 0x6f, 0xdc, 0x9c, 0x06, 0xf3, 0x22, 0xc0, 0x7d, 0xe8, 0xde, 0x92, 0x50, 0x29, 0xa9, 0x9c, 0xfb,
+ 0x96, 0xd8, 0xc4, 0x93, 0x3f, 0x3c, 0x00, 0x73, 0xff, 0xf4, 0x46, 0xa4, 0xb1, 0xdd, 0x88, 0x8b,
+ 0xb3, 0x9a, 0x3a, 0xff, 0xe2, 0x0c, 0xbf, 0x70, 0x1f, 0xae, 0x6f, 0xd7, 0xea, 0xe3, 0xea, 0x67,
+ 0x52, 0xdc, 0x7b, 0xf2, 0xf5, 0x1e, 0x40, 0xfb, 0x52, 0x46, 0x74, 0x71, 0x56, 0xd7, 0x5c, 0x60,
+ 0xc6, 0xac, 0x53, 0x67, 0x56, 0xf1, 0xa1, 0x96, 0xe1, 0xe1, 0x97, 0xd0, 0xdb, 0xfc, 0x0e, 0x70,
+ 0x17, 0x42, 0x1b, 0x5c, 0x4a, 0x95, 0x88, 0x5b, 0xd6, 0xc0, 0x67, 0xb0, 0x6b, 0x81, 0x6d, 0x63,
+ 0xe6, 0x1d, 0xfe, 0xed, 0x43, 0x58, 0x59, 0x70, 0x04, 0x68, 0xcf, 0xf2, 0xf8, 0x7c, 0x9d, 0xb1,
+ 0x06, 0x86, 0xd0, 0x99, 0xe5, 0xf1, 0x09, 0x09, 0xcd, 0x3c, 0x17, 0xbc, 0x52, 0x32, 0x63, 0xbe,
+ 0xcb, 0x7a, 0x91, 0x65, 0xac, 0x89, 0x03, 0x80, 0xe2, 0x3c, 0xa7, 0x3c, 0x63, 0x81, 0x4b, 0xfc,
+ 0x5e, 0x6a, 0x62, 0x2d, 0x23, 0xc2, 0x05, 0x96, 0x6d, 0x3b, 0xd6, 0x2c, 0x13, 0xeb, 0x20, 0x83,
+ 0xbe, 0x69, 0x46, 0x42, 0xe9, 0x6b, 0xd3, 0xa5, 0x8b, 0x43, 0x60, 0x55, 0xc4, 0x5e, 0xea, 0x21,
+ 0xc2, 0x60, 0x96, 0xc7, 0x6f, 0x52, 0x45, 0x62, 0x71, 0x23, 0xae, 0x6f, 0x89, 0x01, 0xee, 0xc1,
+ 0x8e, 0x2b, 0x64, 0x1e, 0x6f, 0x9d, 0xb3, 0xd0, 0xa5, 0x9d, 0xde, 0xd0, 0xe2, 0x97, 0xef, 0xd6,
+ 0x52, 0xad, 0x13, 0xd6, 0xc7, 0x8f, 0x60, 0x6f, 0x96, 0xc7, 0x57, 0x4a, 0xa4, 0xf9, 0x92, 0xd4,
+ 0x4b, 0x12, 0x11, 0x29, 0xb6, 0xe3, 0x6e, 0x5f, 0xad, 0x12, 0x92, 0x6b, 0x7d, 0x29, 0x7f, 0x65,
+ 0x03, 0x27, 0x66, 0x4e, 0x22, 0xb2, 0x3f, 0x43, 0xb6, 0xeb, 0xc4, 0x6c, 0x10, 0x2b, 0x86, 0xb9,
+ 0x79, 0x5f, 0x29, 0xb2, 0x23, 0xee, 0xb9, 0xae, 0x2e, 0xb6, 0x39, 0x78, 0x78, 0x07, 0x83, 0xfa,
+ 0xf3, 0x1a, 0x1d, 0x5b, 0xe4, 0x45, 0x14, 0x99, 0xb7, 0x64, 0x0d, 0xe4, 0x30, 0xdc, 0xc2, 0x73,
+ 0x4a, 0xe4, 0x5b, 0xb2, 0x8c, 0x57, 0x67, 0xde, 0x64, 0x91, 0xd0, 0x05, 0xe3, 0xe3, 0x01, 0xf0,
+ 0x5a, 0xa9, 0x97, 0xc5, 0x36, 0x5a, 0xb6, 0x79, 0xc2, 0xef, 0x3f, 0x8c, 0x1a, 0xef, 0x3f, 0x8c,
+ 0x1a, 0xf7, 0x0f, 0x23, 0xef, 0xfd, 0xc3, 0xc8, 0xfb, 0xf7, 0x61, 0xe4, 0xfd, 0xf9, 0xdf, 0xa8,
+ 0xf1, 0x7f, 0x00, 0x00, 0x00, 0xff, 0xff, 0x86, 0x52, 0x5b, 0xe0, 0x74, 0x06, 0x00, 0x00,
+}
diff --git a/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto b/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto
new file mode 100644
index 0000000..644ce7b
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/raftpb/raft.proto
@@ -0,0 +1,95 @@
+syntax = "proto2";
+package raftpb;
+
+import "gogoproto/gogo.proto";
+
+option (gogoproto.marshaler_all) = true;
+option (gogoproto.sizer_all) = true;
+option (gogoproto.unmarshaler_all) = true;
+option (gogoproto.goproto_getters_all) = false;
+option (gogoproto.goproto_enum_prefix_all) = false;
+
+enum EntryType {
+ EntryNormal = 0;
+ EntryConfChange = 1;
+}
+
+message Entry {
+ optional uint64 Term = 2 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
+ optional uint64 Index = 3 [(gogoproto.nullable) = false]; // must be 64-bit aligned for atomic operations
+ optional EntryType Type = 1 [(gogoproto.nullable) = false];
+ optional bytes Data = 4;
+}
+
+message SnapshotMetadata {
+ optional ConfState conf_state = 1 [(gogoproto.nullable) = false];
+ optional uint64 index = 2 [(gogoproto.nullable) = false];
+ optional uint64 term = 3 [(gogoproto.nullable) = false];
+}
+
+message Snapshot {
+ optional bytes data = 1;
+ optional SnapshotMetadata metadata = 2 [(gogoproto.nullable) = false];
+}
+
+enum MessageType {
+ MsgHup = 0;
+ MsgBeat = 1;
+ MsgProp = 2;
+ MsgApp = 3;
+ MsgAppResp = 4;
+ MsgVote = 5;
+ MsgVoteResp = 6;
+ MsgSnap = 7;
+ MsgHeartbeat = 8;
+ MsgHeartbeatResp = 9;
+ MsgUnreachable = 10;
+ MsgSnapStatus = 11;
+ MsgCheckQuorum = 12;
+ MsgTransferLeader = 13;
+ MsgTimeoutNow = 14;
+ MsgReadIndex = 15;
+ MsgReadIndexResp = 16;
+ MsgPreVote = 17;
+ MsgPreVoteResp = 18;
+}
+
+message Message {
+ optional MessageType type = 1 [(gogoproto.nullable) = false];
+ optional uint64 to = 2 [(gogoproto.nullable) = false];
+ optional uint64 from = 3 [(gogoproto.nullable) = false];
+ optional uint64 term = 4 [(gogoproto.nullable) = false];
+ optional uint64 logTerm = 5 [(gogoproto.nullable) = false];
+ optional uint64 index = 6 [(gogoproto.nullable) = false];
+ repeated Entry entries = 7 [(gogoproto.nullable) = false];
+ optional uint64 commit = 8 [(gogoproto.nullable) = false];
+ optional Snapshot snapshot = 9 [(gogoproto.nullable) = false];
+ optional bool reject = 10 [(gogoproto.nullable) = false];
+ optional uint64 rejectHint = 11 [(gogoproto.nullable) = false];
+ optional bytes context = 12;
+}
+
+message HardState {
+ optional uint64 term = 1 [(gogoproto.nullable) = false];
+ optional uint64 vote = 2 [(gogoproto.nullable) = false];
+ optional uint64 commit = 3 [(gogoproto.nullable) = false];
+}
+
+message ConfState {
+ repeated uint64 nodes = 1;
+ repeated uint64 learners = 2;
+}
+
+enum ConfChangeType {
+ ConfChangeAddNode = 0;
+ ConfChangeRemoveNode = 1;
+ ConfChangeUpdateNode = 2;
+ ConfChangeAddLearnerNode = 3;
+}
+
+message ConfChange {
+ optional uint64 ID = 1 [(gogoproto.nullable) = false];
+ optional ConfChangeType Type = 2 [(gogoproto.nullable) = false];
+ optional uint64 NodeID = 3 [(gogoproto.nullable) = false];
+ optional bytes Context = 4;
+}
diff --git a/vendor/go.etcd.io/etcd/raft/rawnode.go b/vendor/go.etcd.io/etcd/raft/rawnode.go
new file mode 100644
index 0000000..d7a272d
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/rawnode.go
@@ -0,0 +1,295 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+ "errors"
+
+ pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+// ErrStepLocalMsg is returned when try to step a local raft message
+var ErrStepLocalMsg = errors.New("raft: cannot step raft local message")
+
+// ErrStepPeerNotFound is returned when try to step a response message
+// but there is no peer found in raft.prs for that node.
+var ErrStepPeerNotFound = errors.New("raft: cannot step as peer not found")
+
+// RawNode is a thread-unsafe Node.
+// The methods of this struct correspond to the methods of Node and are described
+// more fully there.
+type RawNode struct {
+ raft *raft
+ prevSoftSt *SoftState
+ prevHardSt pb.HardState
+}
+
+func (rn *RawNode) newReady() Ready {
+ return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
+}
+
+func (rn *RawNode) commitReady(rd Ready) {
+ if rd.SoftState != nil {
+ rn.prevSoftSt = rd.SoftState
+ }
+ if !IsEmptyHardState(rd.HardState) {
+ rn.prevHardSt = rd.HardState
+ }
+
+ // If entries were applied (or a snapshot), update our cursor for
+ // the next Ready. Note that if the current HardState contains a
+ // new Commit index, this does not mean that we're also applying
+ // all of the new entries due to commit pagination by size.
+ if index := rd.appliedCursor(); index > 0 {
+ rn.raft.raftLog.appliedTo(index)
+ }
+
+ if len(rd.Entries) > 0 {
+ e := rd.Entries[len(rd.Entries)-1]
+ rn.raft.raftLog.stableTo(e.Index, e.Term)
+ }
+ if !IsEmptySnap(rd.Snapshot) {
+ rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
+ }
+ if len(rd.ReadStates) != 0 {
+ rn.raft.readStates = nil
+ }
+}
+
+// NewRawNode returns a new RawNode given configuration and a list of raft peers.
+func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
+ if config.ID == 0 {
+ panic("config.ID must not be zero")
+ }
+ r := newRaft(config)
+ rn := &RawNode{
+ raft: r,
+ }
+ lastIndex, err := config.Storage.LastIndex()
+ if err != nil {
+ panic(err) // TODO(bdarnell)
+ }
+ // If the log is empty, this is a new RawNode (like StartNode); otherwise it's
+ // restoring an existing RawNode (like RestartNode).
+ // TODO(bdarnell): rethink RawNode initialization and whether the application needs
+ // to be able to tell us when it expects the RawNode to exist.
+ if lastIndex == 0 {
+ r.becomeFollower(1, None)
+ ents := make([]pb.Entry, len(peers))
+ for i, peer := range peers {
+ cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
+ data, err := cc.Marshal()
+ if err != nil {
+ panic("unexpected marshal error")
+ }
+
+ ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
+ }
+ r.raftLog.append(ents...)
+ r.raftLog.committed = uint64(len(ents))
+ for _, peer := range peers {
+ r.addNode(peer.ID)
+ }
+ }
+
+ // Set the initial hard and soft states after performing all initialization.
+ rn.prevSoftSt = r.softState()
+ if lastIndex == 0 {
+ rn.prevHardSt = emptyState
+ } else {
+ rn.prevHardSt = r.hardState()
+ }
+
+ return rn, nil
+}
+
+// Tick advances the internal logical clock by a single tick.
+func (rn *RawNode) Tick() {
+ rn.raft.tick()
+}
+
+// TickQuiesced advances the internal logical clock by a single tick without
+// performing any other state machine processing. It allows the caller to avoid
+// periodic heartbeats and elections when all of the peers in a Raft group are
+// known to be at the same state. Expected usage is to periodically invoke Tick
+// or TickQuiesced depending on whether the group is "active" or "quiesced".
+//
+// WARNING: Be very careful about using this method as it subverts the Raft
+// state machine. You should probably be using Tick instead.
+func (rn *RawNode) TickQuiesced() {
+ rn.raft.electionElapsed++
+}
+
+// Campaign causes this RawNode to transition to candidate state.
+func (rn *RawNode) Campaign() error {
+ return rn.raft.Step(pb.Message{
+ Type: pb.MsgHup,
+ })
+}
+
+// Propose proposes data be appended to the raft log.
+func (rn *RawNode) Propose(data []byte) error {
+ return rn.raft.Step(pb.Message{
+ Type: pb.MsgProp,
+ From: rn.raft.id,
+ Entries: []pb.Entry{
+ {Data: data},
+ }})
+}
+
+// ProposeConfChange proposes a config change.
+func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
+ data, err := cc.Marshal()
+ if err != nil {
+ return err
+ }
+ return rn.raft.Step(pb.Message{
+ Type: pb.MsgProp,
+ Entries: []pb.Entry{
+ {Type: pb.EntryConfChange, Data: data},
+ },
+ })
+}
+
+// ApplyConfChange applies a config change to the local node.
+func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
+ if cc.NodeID == None {
+ return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()}
+ }
+ switch cc.Type {
+ case pb.ConfChangeAddNode:
+ rn.raft.addNode(cc.NodeID)
+ case pb.ConfChangeAddLearnerNode:
+ rn.raft.addLearner(cc.NodeID)
+ case pb.ConfChangeRemoveNode:
+ rn.raft.removeNode(cc.NodeID)
+ case pb.ConfChangeUpdateNode:
+ default:
+ panic("unexpected conf type")
+ }
+ return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()}
+}
+
+// Step advances the state machine using the given message.
+func (rn *RawNode) Step(m pb.Message) error {
+ // ignore unexpected local messages receiving over network
+ if IsLocalMsg(m.Type) {
+ return ErrStepLocalMsg
+ }
+ if pr := rn.raft.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
+ return rn.raft.Step(m)
+ }
+ return ErrStepPeerNotFound
+}
+
+// Ready returns the current point-in-time state of this RawNode.
+func (rn *RawNode) Ready() Ready {
+ rd := rn.newReady()
+ rn.raft.msgs = nil
+ rn.raft.reduceUncommittedSize(rd.CommittedEntries)
+ return rd
+}
+
+// HasReady called when RawNode user need to check if any Ready pending.
+// Checking logic in this method should be consistent with Ready.containsUpdates().
+func (rn *RawNode) HasReady() bool {
+ r := rn.raft
+ if !r.softState().equal(rn.prevSoftSt) {
+ return true
+ }
+ if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
+ return true
+ }
+ if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) {
+ return true
+ }
+ if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
+ return true
+ }
+ if len(r.readStates) != 0 {
+ return true
+ }
+ return false
+}
+
+// Advance notifies the RawNode that the application has applied and saved progress in the
+// last Ready results.
+func (rn *RawNode) Advance(rd Ready) {
+ rn.commitReady(rd)
+}
+
+// Status returns the current status of the given group.
+func (rn *RawNode) Status() *Status {
+ status := getStatus(rn.raft)
+ return &status
+}
+
+// StatusWithoutProgress returns a Status without populating the Progress field
+// (and returns the Status as a value to avoid forcing it onto the heap). This
+// is more performant if the Progress is not required. See WithProgress for an
+// allocation-free way to introspect the Progress.
+func (rn *RawNode) StatusWithoutProgress() Status {
+ return getStatusWithoutProgress(rn.raft)
+}
+
+// ProgressType indicates the type of replica a Progress corresponds to.
+type ProgressType byte
+
+const (
+ // ProgressTypePeer accompanies a Progress for a regular peer replica.
+ ProgressTypePeer ProgressType = iota
+ // ProgressTypeLearner accompanies a Progress for a learner replica.
+ ProgressTypeLearner
+)
+
+// WithProgress is a helper to introspect the Progress for this node and its
+// peers.
+func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) {
+ for id, pr := range rn.raft.prs {
+ pr := *pr
+ pr.ins = nil
+ visitor(id, ProgressTypePeer, pr)
+ }
+ for id, pr := range rn.raft.learnerPrs {
+ pr := *pr
+ pr.ins = nil
+ visitor(id, ProgressTypeLearner, pr)
+ }
+}
+
+// ReportUnreachable reports the given node is not reachable for the last send.
+func (rn *RawNode) ReportUnreachable(id uint64) {
+ _ = rn.raft.Step(pb.Message{Type: pb.MsgUnreachable, From: id})
+}
+
+// ReportSnapshot reports the status of the sent snapshot.
+func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) {
+ rej := status == SnapshotFailure
+
+ _ = rn.raft.Step(pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej})
+}
+
+// TransferLeader tries to transfer leadership to the given transferee.
+func (rn *RawNode) TransferLeader(transferee uint64) {
+ _ = rn.raft.Step(pb.Message{Type: pb.MsgTransferLeader, From: transferee})
+}
+
+// ReadIndex requests a read state. The read state will be set in ready.
+// Read State has a read index. Once the application advances further than the read
+// index, any linearizable read requests issued before the read request can be
+// processed safely. The read state will have the same rctx attached.
+func (rn *RawNode) ReadIndex(rctx []byte) {
+ _ = rn.raft.Step(pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
+}
diff --git a/vendor/go.etcd.io/etcd/raft/read_only.go b/vendor/go.etcd.io/etcd/raft/read_only.go
new file mode 100644
index 0000000..aecc6b2
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/read_only.go
@@ -0,0 +1,118 @@
+// Copyright 2016 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import pb "go.etcd.io/etcd/raft/raftpb"
+
+// ReadState provides state for read only query.
+// It's caller's responsibility to call ReadIndex first before getting
+// this state from ready, it's also caller's duty to differentiate if this
+// state is what it requests through RequestCtx, eg. given a unique id as
+// RequestCtx
+type ReadState struct {
+ Index uint64
+ RequestCtx []byte
+}
+
+type readIndexStatus struct {
+ req pb.Message
+ index uint64
+ acks map[uint64]struct{}
+}
+
+type readOnly struct {
+ option ReadOnlyOption
+ pendingReadIndex map[string]*readIndexStatus
+ readIndexQueue []string
+}
+
+func newReadOnly(option ReadOnlyOption) *readOnly {
+ return &readOnly{
+ option: option,
+ pendingReadIndex: make(map[string]*readIndexStatus),
+ }
+}
+
+// addRequest adds a read only reuqest into readonly struct.
+// `index` is the commit index of the raft state machine when it received
+// the read only request.
+// `m` is the original read only request message from the local or remote node.
+func (ro *readOnly) addRequest(index uint64, m pb.Message) {
+ ctx := string(m.Entries[0].Data)
+ if _, ok := ro.pendingReadIndex[ctx]; ok {
+ return
+ }
+ ro.pendingReadIndex[ctx] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
+ ro.readIndexQueue = append(ro.readIndexQueue, ctx)
+}
+
+// recvAck notifies the readonly struct that the raft state machine received
+// an acknowledgment of the heartbeat that attached with the read only request
+// context.
+func (ro *readOnly) recvAck(m pb.Message) int {
+ rs, ok := ro.pendingReadIndex[string(m.Context)]
+ if !ok {
+ return 0
+ }
+
+ rs.acks[m.From] = struct{}{}
+ // add one to include an ack from local node
+ return len(rs.acks) + 1
+}
+
+// advance advances the read only request queue kept by the readonly struct.
+// It dequeues the requests until it finds the read only request that has
+// the same context as the given `m`.
+func (ro *readOnly) advance(m pb.Message) []*readIndexStatus {
+ var (
+ i int
+ found bool
+ )
+
+ ctx := string(m.Context)
+ rss := []*readIndexStatus{}
+
+ for _, okctx := range ro.readIndexQueue {
+ i++
+ rs, ok := ro.pendingReadIndex[okctx]
+ if !ok {
+ panic("cannot find corresponding read state from pending map")
+ }
+ rss = append(rss, rs)
+ if okctx == ctx {
+ found = true
+ break
+ }
+ }
+
+ if found {
+ ro.readIndexQueue = ro.readIndexQueue[i:]
+ for _, rs := range rss {
+ delete(ro.pendingReadIndex, string(rs.req.Entries[0].Data))
+ }
+ return rss
+ }
+
+ return nil
+}
+
+// lastPendingRequestCtx returns the context of the last pending read only
+// request in readonly struct.
+func (ro *readOnly) lastPendingRequestCtx() string {
+ if len(ro.readIndexQueue) == 0 {
+ return ""
+ }
+ return ro.readIndexQueue[len(ro.readIndexQueue)-1]
+}
diff --git a/vendor/go.etcd.io/etcd/raft/status.go b/vendor/go.etcd.io/etcd/raft/status.go
new file mode 100644
index 0000000..9feca7c
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/status.go
@@ -0,0 +1,94 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+ "fmt"
+
+ pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+type Status struct {
+ ID uint64
+
+ pb.HardState
+ SoftState
+
+ Applied uint64
+ Progress map[uint64]Progress
+
+ LeadTransferee uint64
+}
+
+func getProgressCopy(r *raft) map[uint64]Progress {
+ prs := make(map[uint64]Progress)
+ for id, p := range r.prs {
+ prs[id] = *p
+ }
+
+ for id, p := range r.learnerPrs {
+ prs[id] = *p
+ }
+ return prs
+}
+
+func getStatusWithoutProgress(r *raft) Status {
+ s := Status{
+ ID: r.id,
+ LeadTransferee: r.leadTransferee,
+ }
+ s.HardState = r.hardState()
+ s.SoftState = *r.softState()
+ s.Applied = r.raftLog.applied
+ return s
+}
+
+// getStatus gets a copy of the current raft status.
+func getStatus(r *raft) Status {
+ s := getStatusWithoutProgress(r)
+ if s.RaftState == StateLeader {
+ s.Progress = getProgressCopy(r)
+ }
+ return s
+}
+
+// MarshalJSON translates the raft status into JSON.
+// TODO: try to simplify this by introducing ID type into raft
+func (s Status) MarshalJSON() ([]byte, error) {
+ j := fmt.Sprintf(`{"id":"%x","term":%d,"vote":"%x","commit":%d,"lead":"%x","raftState":%q,"applied":%d,"progress":{`,
+ s.ID, s.Term, s.Vote, s.Commit, s.Lead, s.RaftState, s.Applied)
+
+ if len(s.Progress) == 0 {
+ j += "},"
+ } else {
+ for k, v := range s.Progress {
+ subj := fmt.Sprintf(`"%x":{"match":%d,"next":%d,"state":%q},`, k, v.Match, v.Next, v.State)
+ j += subj
+ }
+ // remove the trailing ","
+ j = j[:len(j)-1] + "},"
+ }
+
+ j += fmt.Sprintf(`"leadtransferee":"%x"}`, s.LeadTransferee)
+ return []byte(j), nil
+}
+
+func (s Status) String() string {
+ b, err := s.MarshalJSON()
+ if err != nil {
+ raftLogger.Panicf("unexpected error: %v", err)
+ }
+ return string(b)
+}
diff --git a/vendor/go.etcd.io/etcd/raft/storage.go b/vendor/go.etcd.io/etcd/raft/storage.go
new file mode 100644
index 0000000..14ad686
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/storage.go
@@ -0,0 +1,271 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+ "errors"
+ "sync"
+
+ pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+// ErrCompacted is returned by Storage.Entries/Compact when a requested
+// index is unavailable because it predates the last snapshot.
+var ErrCompacted = errors.New("requested index is unavailable due to compaction")
+
+// ErrSnapOutOfDate is returned by Storage.CreateSnapshot when a requested
+// index is older than the existing snapshot.
+var ErrSnapOutOfDate = errors.New("requested index is older than the existing snapshot")
+
+// ErrUnavailable is returned by Storage interface when the requested log entries
+// are unavailable.
+var ErrUnavailable = errors.New("requested entry at index is unavailable")
+
+// ErrSnapshotTemporarilyUnavailable is returned by the Storage interface when the required
+// snapshot is temporarily unavailable.
+var ErrSnapshotTemporarilyUnavailable = errors.New("snapshot is temporarily unavailable")
+
+// Storage is an interface that may be implemented by the application
+// to retrieve log entries from storage.
+//
+// If any Storage method returns an error, the raft instance will
+// become inoperable and refuse to participate in elections; the
+// application is responsible for cleanup and recovery in this case.
+type Storage interface {
+ // InitialState returns the saved HardState and ConfState information.
+ InitialState() (pb.HardState, pb.ConfState, error)
+ // Entries returns a slice of log entries in the range [lo,hi).
+ // MaxSize limits the total size of the log entries returned, but
+ // Entries returns at least one entry if any.
+ Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
+ // Term returns the term of entry i, which must be in the range
+ // [FirstIndex()-1, LastIndex()]. The term of the entry before
+ // FirstIndex is retained for matching purposes even though the
+ // rest of that entry may not be available.
+ Term(i uint64) (uint64, error)
+ // LastIndex returns the index of the last entry in the log.
+ LastIndex() (uint64, error)
+ // FirstIndex returns the index of the first log entry that is
+ // possibly available via Entries (older entries have been incorporated
+ // into the latest Snapshot; if storage only contains the dummy entry the
+ // first log entry is not available).
+ FirstIndex() (uint64, error)
+ // Snapshot returns the most recent snapshot.
+ // If snapshot is temporarily unavailable, it should return ErrSnapshotTemporarilyUnavailable,
+ // so raft state machine could know that Storage needs some time to prepare
+ // snapshot and call Snapshot later.
+ Snapshot() (pb.Snapshot, error)
+}
+
+// MemoryStorage implements the Storage interface backed by an
+// in-memory array.
+type MemoryStorage struct {
+ // Protects access to all fields. Most methods of MemoryStorage are
+ // run on the raft goroutine, but Append() is run on an application
+ // goroutine.
+ sync.Mutex
+
+ hardState pb.HardState
+ snapshot pb.Snapshot
+ // ents[i] has raft log position i+snapshot.Metadata.Index
+ ents []pb.Entry
+}
+
+// NewMemoryStorage creates an empty MemoryStorage.
+func NewMemoryStorage() *MemoryStorage {
+ return &MemoryStorage{
+ // When starting from scratch populate the list with a dummy entry at term zero.
+ ents: make([]pb.Entry, 1),
+ }
+}
+
+// InitialState implements the Storage interface.
+func (ms *MemoryStorage) InitialState() (pb.HardState, pb.ConfState, error) {
+ return ms.hardState, ms.snapshot.Metadata.ConfState, nil
+}
+
+// SetHardState saves the current HardState.
+func (ms *MemoryStorage) SetHardState(st pb.HardState) error {
+ ms.Lock()
+ defer ms.Unlock()
+ ms.hardState = st
+ return nil
+}
+
+// Entries implements the Storage interface.
+func (ms *MemoryStorage) Entries(lo, hi, maxSize uint64) ([]pb.Entry, error) {
+ ms.Lock()
+ defer ms.Unlock()
+ offset := ms.ents[0].Index
+ if lo <= offset {
+ return nil, ErrCompacted
+ }
+ if hi > ms.lastIndex()+1 {
+ raftLogger.Panicf("entries' hi(%d) is out of bound lastindex(%d)", hi, ms.lastIndex())
+ }
+ // only contains dummy entries.
+ if len(ms.ents) == 1 {
+ return nil, ErrUnavailable
+ }
+
+ ents := ms.ents[lo-offset : hi-offset]
+ return limitSize(ents, maxSize), nil
+}
+
+// Term implements the Storage interface.
+func (ms *MemoryStorage) Term(i uint64) (uint64, error) {
+ ms.Lock()
+ defer ms.Unlock()
+ offset := ms.ents[0].Index
+ if i < offset {
+ return 0, ErrCompacted
+ }
+ if int(i-offset) >= len(ms.ents) {
+ return 0, ErrUnavailable
+ }
+ return ms.ents[i-offset].Term, nil
+}
+
+// LastIndex implements the Storage interface.
+func (ms *MemoryStorage) LastIndex() (uint64, error) {
+ ms.Lock()
+ defer ms.Unlock()
+ return ms.lastIndex(), nil
+}
+
+func (ms *MemoryStorage) lastIndex() uint64 {
+ return ms.ents[0].Index + uint64(len(ms.ents)) - 1
+}
+
+// FirstIndex implements the Storage interface.
+func (ms *MemoryStorage) FirstIndex() (uint64, error) {
+ ms.Lock()
+ defer ms.Unlock()
+ return ms.firstIndex(), nil
+}
+
+func (ms *MemoryStorage) firstIndex() uint64 {
+ return ms.ents[0].Index + 1
+}
+
+// Snapshot implements the Storage interface.
+func (ms *MemoryStorage) Snapshot() (pb.Snapshot, error) {
+ ms.Lock()
+ defer ms.Unlock()
+ return ms.snapshot, nil
+}
+
+// ApplySnapshot overwrites the contents of this Storage object with
+// those of the given snapshot.
+func (ms *MemoryStorage) ApplySnapshot(snap pb.Snapshot) error {
+ ms.Lock()
+ defer ms.Unlock()
+
+ //handle check for old snapshot being applied
+ msIndex := ms.snapshot.Metadata.Index
+ snapIndex := snap.Metadata.Index
+ if msIndex >= snapIndex {
+ return ErrSnapOutOfDate
+ }
+
+ ms.snapshot = snap
+ ms.ents = []pb.Entry{{Term: snap.Metadata.Term, Index: snap.Metadata.Index}}
+ return nil
+}
+
+// CreateSnapshot makes a snapshot which can be retrieved with Snapshot() and
+// can be used to reconstruct the state at that point.
+// If any configuration changes have been made since the last compaction,
+// the result of the last ApplyConfChange must be passed in.
+func (ms *MemoryStorage) CreateSnapshot(i uint64, cs *pb.ConfState, data []byte) (pb.Snapshot, error) {
+ ms.Lock()
+ defer ms.Unlock()
+ if i <= ms.snapshot.Metadata.Index {
+ return pb.Snapshot{}, ErrSnapOutOfDate
+ }
+
+ offset := ms.ents[0].Index
+ if i > ms.lastIndex() {
+ raftLogger.Panicf("snapshot %d is out of bound lastindex(%d)", i, ms.lastIndex())
+ }
+
+ ms.snapshot.Metadata.Index = i
+ ms.snapshot.Metadata.Term = ms.ents[i-offset].Term
+ if cs != nil {
+ ms.snapshot.Metadata.ConfState = *cs
+ }
+ ms.snapshot.Data = data
+ return ms.snapshot, nil
+}
+
+// Compact discards all log entries prior to compactIndex.
+// It is the application's responsibility to not attempt to compact an index
+// greater than raftLog.applied.
+func (ms *MemoryStorage) Compact(compactIndex uint64) error {
+ ms.Lock()
+ defer ms.Unlock()
+ offset := ms.ents[0].Index
+ if compactIndex <= offset {
+ return ErrCompacted
+ }
+ if compactIndex > ms.lastIndex() {
+ raftLogger.Panicf("compact %d is out of bound lastindex(%d)", compactIndex, ms.lastIndex())
+ }
+
+ i := compactIndex - offset
+ ents := make([]pb.Entry, 1, 1+uint64(len(ms.ents))-i)
+ ents[0].Index = ms.ents[i].Index
+ ents[0].Term = ms.ents[i].Term
+ ents = append(ents, ms.ents[i+1:]...)
+ ms.ents = ents
+ return nil
+}
+
+// Append the new entries to storage.
+// TODO (xiangli): ensure the entries are continuous and
+// entries[0].Index > ms.entries[0].Index
+func (ms *MemoryStorage) Append(entries []pb.Entry) error {
+ if len(entries) == 0 {
+ return nil
+ }
+
+ ms.Lock()
+ defer ms.Unlock()
+
+ first := ms.firstIndex()
+ last := entries[0].Index + uint64(len(entries)) - 1
+
+ // shortcut if there is no new entry.
+ if last < first {
+ return nil
+ }
+ // truncate compacted entries
+ if first > entries[0].Index {
+ entries = entries[first-entries[0].Index:]
+ }
+
+ offset := entries[0].Index - ms.ents[0].Index
+ switch {
+ case uint64(len(ms.ents)) > offset:
+ ms.ents = append([]pb.Entry{}, ms.ents[:offset]...)
+ ms.ents = append(ms.ents, entries...)
+ case uint64(len(ms.ents)) == offset:
+ ms.ents = append(ms.ents, entries...)
+ default:
+ raftLogger.Panicf("missing log entry [last: %d, append at: %d]",
+ ms.lastIndex(), entries[0].Index)
+ }
+ return nil
+}
diff --git a/vendor/go.etcd.io/etcd/raft/util.go b/vendor/go.etcd.io/etcd/raft/util.go
new file mode 100644
index 0000000..c145d26
--- /dev/null
+++ b/vendor/go.etcd.io/etcd/raft/util.go
@@ -0,0 +1,142 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package raft
+
+import (
+ "bytes"
+ "fmt"
+
+ pb "go.etcd.io/etcd/raft/raftpb"
+)
+
+func (st StateType) MarshalJSON() ([]byte, error) {
+ return []byte(fmt.Sprintf("%q", st.String())), nil
+}
+
+// uint64Slice implements sort interface
+type uint64Slice []uint64
+
+func (p uint64Slice) Len() int { return len(p) }
+func (p uint64Slice) Less(i, j int) bool { return p[i] < p[j] }
+func (p uint64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+
+func min(a, b uint64) uint64 {
+ if a > b {
+ return b
+ }
+ return a
+}
+
+func max(a, b uint64) uint64 {
+ if a > b {
+ return a
+ }
+ return b
+}
+
+func IsLocalMsg(msgt pb.MessageType) bool {
+ return msgt == pb.MsgHup || msgt == pb.MsgBeat || msgt == pb.MsgUnreachable ||
+ msgt == pb.MsgSnapStatus || msgt == pb.MsgCheckQuorum
+}
+
+func IsResponseMsg(msgt pb.MessageType) bool {
+ return msgt == pb.MsgAppResp || msgt == pb.MsgVoteResp || msgt == pb.MsgHeartbeatResp || msgt == pb.MsgUnreachable || msgt == pb.MsgPreVoteResp
+}
+
+// voteResponseType maps vote and prevote message types to their corresponding responses.
+func voteRespMsgType(msgt pb.MessageType) pb.MessageType {
+ switch msgt {
+ case pb.MsgVote:
+ return pb.MsgVoteResp
+ case pb.MsgPreVote:
+ return pb.MsgPreVoteResp
+ default:
+ panic(fmt.Sprintf("not a vote message: %s", msgt))
+ }
+}
+
+// EntryFormatter can be implemented by the application to provide human-readable formatting
+// of entry data. Nil is a valid EntryFormatter and will use a default format.
+type EntryFormatter func([]byte) string
+
+// DescribeMessage returns a concise human-readable description of a
+// Message for debugging.
+func DescribeMessage(m pb.Message, f EntryFormatter) string {
+ var buf bytes.Buffer
+ fmt.Fprintf(&buf, "%x->%x %v Term:%d Log:%d/%d", m.From, m.To, m.Type, m.Term, m.LogTerm, m.Index)
+ if m.Reject {
+ fmt.Fprintf(&buf, " Rejected (Hint: %d)", m.RejectHint)
+ }
+ if m.Commit != 0 {
+ fmt.Fprintf(&buf, " Commit:%d", m.Commit)
+ }
+ if len(m.Entries) > 0 {
+ fmt.Fprintf(&buf, " Entries:[")
+ for i, e := range m.Entries {
+ if i != 0 {
+ buf.WriteString(", ")
+ }
+ buf.WriteString(DescribeEntry(e, f))
+ }
+ fmt.Fprintf(&buf, "]")
+ }
+ if !IsEmptySnap(m.Snapshot) {
+ fmt.Fprintf(&buf, " Snapshot:%v", m.Snapshot)
+ }
+ return buf.String()
+}
+
+// PayloadSize is the size of the payload of this Entry. Notably, it does not
+// depend on its Index or Term.
+func PayloadSize(e pb.Entry) int {
+ return len(e.Data)
+}
+
+// DescribeEntry returns a concise human-readable description of an
+// Entry for debugging.
+func DescribeEntry(e pb.Entry, f EntryFormatter) string {
+ var formatted string
+ if e.Type == pb.EntryNormal && f != nil {
+ formatted = f(e.Data)
+ } else {
+ formatted = fmt.Sprintf("%q", e.Data)
+ }
+ return fmt.Sprintf("%d/%d %s %s", e.Term, e.Index, e.Type, formatted)
+}
+
+// DescribeEntries calls DescribeEntry for each Entry, adding a newline to
+// each.
+func DescribeEntries(ents []pb.Entry, f EntryFormatter) string {
+ var buf bytes.Buffer
+ for _, e := range ents {
+ _, _ = buf.WriteString(DescribeEntry(e, f) + "\n")
+ }
+ return buf.String()
+}
+
+func limitSize(ents []pb.Entry, maxSize uint64) []pb.Entry {
+ if len(ents) == 0 {
+ return ents
+ }
+ size := ents[0].Size()
+ var limit int
+ for limit = 1; limit < len(ents); limit++ {
+ size += ents[limit].Size()
+ if uint64(size) > maxSize {
+ break
+ }
+ }
+ return ents[:limit]
+}