blob: d7a272d1435c7bbabf67954e9c95a967fc809ff0 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package raft
16
17import (
18 "errors"
19
20 pb "go.etcd.io/etcd/raft/raftpb"
21)
22
// Sentinel errors returned by RawNode.Step.
var (
	// ErrStepLocalMsg is returned when the caller tries to step a local
	// raft message.
	ErrStepLocalMsg = errors.New("raft: cannot step raft local message")

	// ErrStepPeerNotFound is returned when the caller tries to step a
	// response message whose sender has no progress tracked by this node.
	ErrStepPeerNotFound = errors.New("raft: cannot step as peer not found")
)
29
30// RawNode is a thread-unsafe Node.
31// The methods of this struct correspond to the methods of Node and are described
32// more fully there.
33type RawNode struct {
34 raft *raft
35 prevSoftSt *SoftState
36 prevHardSt pb.HardState
37}
38
39func (rn *RawNode) newReady() Ready {
40 return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
41}
42
43func (rn *RawNode) commitReady(rd Ready) {
44 if rd.SoftState != nil {
45 rn.prevSoftSt = rd.SoftState
46 }
47 if !IsEmptyHardState(rd.HardState) {
48 rn.prevHardSt = rd.HardState
49 }
50
51 // If entries were applied (or a snapshot), update our cursor for
52 // the next Ready. Note that if the current HardState contains a
53 // new Commit index, this does not mean that we're also applying
54 // all of the new entries due to commit pagination by size.
55 if index := rd.appliedCursor(); index > 0 {
56 rn.raft.raftLog.appliedTo(index)
57 }
58
59 if len(rd.Entries) > 0 {
60 e := rd.Entries[len(rd.Entries)-1]
61 rn.raft.raftLog.stableTo(e.Index, e.Term)
62 }
63 if !IsEmptySnap(rd.Snapshot) {
64 rn.raft.raftLog.stableSnapTo(rd.Snapshot.Metadata.Index)
65 }
66 if len(rd.ReadStates) != 0 {
67 rn.raft.readStates = nil
68 }
69}
70
71// NewRawNode returns a new RawNode given configuration and a list of raft peers.
72func NewRawNode(config *Config, peers []Peer) (*RawNode, error) {
73 if config.ID == 0 {
74 panic("config.ID must not be zero")
75 }
76 r := newRaft(config)
77 rn := &RawNode{
78 raft: r,
79 }
80 lastIndex, err := config.Storage.LastIndex()
81 if err != nil {
82 panic(err) // TODO(bdarnell)
83 }
84 // If the log is empty, this is a new RawNode (like StartNode); otherwise it's
85 // restoring an existing RawNode (like RestartNode).
86 // TODO(bdarnell): rethink RawNode initialization and whether the application needs
87 // to be able to tell us when it expects the RawNode to exist.
88 if lastIndex == 0 {
89 r.becomeFollower(1, None)
90 ents := make([]pb.Entry, len(peers))
91 for i, peer := range peers {
92 cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer.ID, Context: peer.Context}
93 data, err := cc.Marshal()
94 if err != nil {
95 panic("unexpected marshal error")
96 }
97
98 ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: uint64(i + 1), Data: data}
99 }
100 r.raftLog.append(ents...)
101 r.raftLog.committed = uint64(len(ents))
102 for _, peer := range peers {
103 r.addNode(peer.ID)
104 }
105 }
106
107 // Set the initial hard and soft states after performing all initialization.
108 rn.prevSoftSt = r.softState()
109 if lastIndex == 0 {
110 rn.prevHardSt = emptyState
111 } else {
112 rn.prevHardSt = r.hardState()
113 }
114
115 return rn, nil
116}
117
118// Tick advances the internal logical clock by a single tick.
119func (rn *RawNode) Tick() {
120 rn.raft.tick()
121}
122
123// TickQuiesced advances the internal logical clock by a single tick without
124// performing any other state machine processing. It allows the caller to avoid
125// periodic heartbeats and elections when all of the peers in a Raft group are
126// known to be at the same state. Expected usage is to periodically invoke Tick
127// or TickQuiesced depending on whether the group is "active" or "quiesced".
128//
129// WARNING: Be very careful about using this method as it subverts the Raft
130// state machine. You should probably be using Tick instead.
131func (rn *RawNode) TickQuiesced() {
132 rn.raft.electionElapsed++
133}
134
135// Campaign causes this RawNode to transition to candidate state.
136func (rn *RawNode) Campaign() error {
137 return rn.raft.Step(pb.Message{
138 Type: pb.MsgHup,
139 })
140}
141
142// Propose proposes data be appended to the raft log.
143func (rn *RawNode) Propose(data []byte) error {
144 return rn.raft.Step(pb.Message{
145 Type: pb.MsgProp,
146 From: rn.raft.id,
147 Entries: []pb.Entry{
148 {Data: data},
149 }})
150}
151
152// ProposeConfChange proposes a config change.
153func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
154 data, err := cc.Marshal()
155 if err != nil {
156 return err
157 }
158 return rn.raft.Step(pb.Message{
159 Type: pb.MsgProp,
160 Entries: []pb.Entry{
161 {Type: pb.EntryConfChange, Data: data},
162 },
163 })
164}
165
166// ApplyConfChange applies a config change to the local node.
167func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
168 if cc.NodeID == None {
169 return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()}
170 }
171 switch cc.Type {
172 case pb.ConfChangeAddNode:
173 rn.raft.addNode(cc.NodeID)
174 case pb.ConfChangeAddLearnerNode:
175 rn.raft.addLearner(cc.NodeID)
176 case pb.ConfChangeRemoveNode:
177 rn.raft.removeNode(cc.NodeID)
178 case pb.ConfChangeUpdateNode:
179 default:
180 panic("unexpected conf type")
181 }
182 return &pb.ConfState{Nodes: rn.raft.nodes(), Learners: rn.raft.learnerNodes()}
183}
184
185// Step advances the state machine using the given message.
186func (rn *RawNode) Step(m pb.Message) error {
187 // ignore unexpected local messages receiving over network
188 if IsLocalMsg(m.Type) {
189 return ErrStepLocalMsg
190 }
191 if pr := rn.raft.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
192 return rn.raft.Step(m)
193 }
194 return ErrStepPeerNotFound
195}
196
197// Ready returns the current point-in-time state of this RawNode.
198func (rn *RawNode) Ready() Ready {
199 rd := rn.newReady()
200 rn.raft.msgs = nil
201 rn.raft.reduceUncommittedSize(rd.CommittedEntries)
202 return rd
203}
204
205// HasReady called when RawNode user need to check if any Ready pending.
206// Checking logic in this method should be consistent with Ready.containsUpdates().
207func (rn *RawNode) HasReady() bool {
208 r := rn.raft
209 if !r.softState().equal(rn.prevSoftSt) {
210 return true
211 }
212 if hardSt := r.hardState(); !IsEmptyHardState(hardSt) && !isHardStateEqual(hardSt, rn.prevHardSt) {
213 return true
214 }
215 if r.raftLog.unstable.snapshot != nil && !IsEmptySnap(*r.raftLog.unstable.snapshot) {
216 return true
217 }
218 if len(r.msgs) > 0 || len(r.raftLog.unstableEntries()) > 0 || r.raftLog.hasNextEnts() {
219 return true
220 }
221 if len(r.readStates) != 0 {
222 return true
223 }
224 return false
225}
226
227// Advance notifies the RawNode that the application has applied and saved progress in the
228// last Ready results.
229func (rn *RawNode) Advance(rd Ready) {
230 rn.commitReady(rd)
231}
232
233// Status returns the current status of the given group.
234func (rn *RawNode) Status() *Status {
235 status := getStatus(rn.raft)
236 return &status
237}
238
239// StatusWithoutProgress returns a Status without populating the Progress field
240// (and returns the Status as a value to avoid forcing it onto the heap). This
241// is more performant if the Progress is not required. See WithProgress for an
242// allocation-free way to introspect the Progress.
243func (rn *RawNode) StatusWithoutProgress() Status {
244 return getStatusWithoutProgress(rn.raft)
245}
246
// ProgressType indicates the type of replica a Progress corresponds to.
type ProgressType byte

// The replica kinds reported to WithProgress visitors.
const (
	// ProgressTypePeer accompanies a Progress for a regular peer replica.
	ProgressTypePeer ProgressType = 0
	// ProgressTypeLearner accompanies a Progress for a learner replica.
	ProgressTypeLearner ProgressType = 1
)
256
257// WithProgress is a helper to introspect the Progress for this node and its
258// peers.
259func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) {
260 for id, pr := range rn.raft.prs {
261 pr := *pr
262 pr.ins = nil
263 visitor(id, ProgressTypePeer, pr)
264 }
265 for id, pr := range rn.raft.learnerPrs {
266 pr := *pr
267 pr.ins = nil
268 visitor(id, ProgressTypeLearner, pr)
269 }
270}
271
272// ReportUnreachable reports the given node is not reachable for the last send.
273func (rn *RawNode) ReportUnreachable(id uint64) {
274 _ = rn.raft.Step(pb.Message{Type: pb.MsgUnreachable, From: id})
275}
276
277// ReportSnapshot reports the status of the sent snapshot.
278func (rn *RawNode) ReportSnapshot(id uint64, status SnapshotStatus) {
279 rej := status == SnapshotFailure
280
281 _ = rn.raft.Step(pb.Message{Type: pb.MsgSnapStatus, From: id, Reject: rej})
282}
283
284// TransferLeader tries to transfer leadership to the given transferee.
285func (rn *RawNode) TransferLeader(transferee uint64) {
286 _ = rn.raft.Step(pb.Message{Type: pb.MsgTransferLeader, From: transferee})
287}
288
289// ReadIndex requests a read state. The read state will be set in ready.
290// Read State has a read index. Once the application advances further than the read
291// index, any linearizable read requests issued before the read request can be
292// processed safely. The read state will have the same rctx attached.
293func (rn *RawNode) ReadIndex(rctx []byte) {
294 _ = rn.raft.Step(pb.Message{Type: pb.MsgReadIndex, Entries: []pb.Entry{{Data: rctx}}})
295}