| // Copyright 2015 The etcd Authors |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package rafthttp |
| |
| import ( |
| "context" |
| "net/http" |
| "sync" |
| "time" |
| |
| "github.com/coreos/etcd/etcdserver/stats" |
| "github.com/coreos/etcd/pkg/logutil" |
| "github.com/coreos/etcd/pkg/transport" |
| "github.com/coreos/etcd/pkg/types" |
| "github.com/coreos/etcd/raft" |
| "github.com/coreos/etcd/raft/raftpb" |
| "github.com/coreos/etcd/snap" |
| |
| "github.com/coreos/pkg/capnslog" |
| "github.com/xiang90/probing" |
| "golang.org/x/time/rate" |
| ) |
| |
| var plog = logutil.NewMergeLogger(capnslog.NewPackageLogger("github.com/coreos/etcd", "rafthttp")) |
| |
| type Raft interface { |
| Process(ctx context.Context, m raftpb.Message) error |
| IsIDRemoved(id uint64) bool |
| ReportUnreachable(id uint64) |
| ReportSnapshot(id uint64, status raft.SnapshotStatus) |
| } |
| |
| type Transporter interface { |
| // Start starts the given Transporter. |
| // Start MUST be called before calling other functions in the interface. |
| Start() error |
| // Handler returns the HTTP handler of the transporter. |
| // A transporter HTTP handler handles the HTTP requests |
| // from remote peers. |
| // The handler MUST be used to handle RaftPrefix(/raft) |
| // endpoint. |
| Handler() http.Handler |
| // Send sends out the given messages to the remote peers. |
| // Each message has a To field, which is an id that maps |
| // to an existing peer in the transport. |
| // If the id cannot be found in the transport, the message |
| // will be ignored. |
| Send(m []raftpb.Message) |
| // SendSnapshot sends out the given snapshot message to a remote peer. |
| // The behavior of SendSnapshot is similar to Send. |
| SendSnapshot(m snap.Message) |
| // AddRemote adds a remote with given peer urls into the transport. |
| // A remote helps newly joined member to catch up the progress of cluster, |
| // and will not be used after that. |
| // It is the caller's responsibility to ensure the urls are all valid, |
| // or it panics. |
| AddRemote(id types.ID, urls []string) |
| // AddPeer adds a peer with given peer urls into the transport. |
| // It is the caller's responsibility to ensure the urls are all valid, |
| // or it panics. |
| // Peer urls are used to connect to the remote peer. |
| AddPeer(id types.ID, urls []string) |
| // RemovePeer removes the peer with given id. |
| RemovePeer(id types.ID) |
| // RemoveAllPeers removes all the existing peers in the transport. |
| RemoveAllPeers() |
| // UpdatePeer updates the peer urls of the peer with the given id. |
| // It is the caller's responsibility to ensure the urls are all valid, |
| // or it panics. |
| UpdatePeer(id types.ID, urls []string) |
| // ActiveSince returns the time that the connection with the peer |
| // of the given id becomes active. |
| // If the connection is active since peer was added, it returns the adding time. |
| // If the connection is currently inactive, it returns zero time. |
| ActiveSince(id types.ID) time.Time |
| // ActivePeers returns the number of active peers. |
| ActivePeers() int |
| // Stop closes the connections and stops the transporter. |
| Stop() |
| } |
| |
| // Transport implements Transporter interface. It provides the functionality |
| // to send raft messages to peers, and receive raft messages from peers. |
| // User should call Handler method to get a handler to serve requests |
| // received from peerURLs. |
| // User needs to call Start before calling other functions, and call |
| // Stop when the Transport is no longer used. |
| type Transport struct { |
| DialTimeout time.Duration // maximum duration before timing out dial of the request |
| // DialRetryFrequency defines the frequency of streamReader dial retrial attempts; |
| // a distinct rate limiter is created per every peer (default value: 10 events/sec) |
| DialRetryFrequency rate.Limit |
| |
| TLSInfo transport.TLSInfo // TLS information used when creating connection |
| |
| ID types.ID // local member ID |
| URLs types.URLs // local peer URLs |
| ClusterID types.ID // raft cluster ID for request validation |
| Raft Raft // raft state machine, to which the Transport forwards received messages and reports status |
| Snapshotter *snap.Snapshotter |
| ServerStats *stats.ServerStats // used to record general transportation statistics |
| // used to record transportation statistics with followers when |
| // performing as leader in raft protocol |
| LeaderStats *stats.LeaderStats |
| // ErrorC is used to report detected critical errors, e.g., |
| // the member has been permanently removed from the cluster |
| // When an error is received from ErrorC, user should stop raft state |
| // machine and thus stop the Transport. |
| ErrorC chan error |
| |
| streamRt http.RoundTripper // roundTripper used by streams |
| pipelineRt http.RoundTripper // roundTripper used by pipelines |
| |
| mu sync.RWMutex // protect the remote and peer map |
| remotes map[types.ID]*remote // remotes map that helps newly joined member to catch up |
| peers map[types.ID]Peer // peers map |
| |
| pipelineProber probing.Prober |
| streamProber probing.Prober |
| } |
| |
| func (t *Transport) Start() error { |
| var err error |
| t.streamRt, err = newStreamRoundTripper(t.TLSInfo, t.DialTimeout) |
| if err != nil { |
| return err |
| } |
| t.pipelineRt, err = NewRoundTripper(t.TLSInfo, t.DialTimeout) |
| if err != nil { |
| return err |
| } |
| t.remotes = make(map[types.ID]*remote) |
| t.peers = make(map[types.ID]Peer) |
| t.pipelineProber = probing.NewProber(t.pipelineRt) |
| t.streamProber = probing.NewProber(t.streamRt) |
| |
| // If client didn't provide dial retry frequency, use the default |
| // (100ms backoff between attempts to create a new stream), |
| // so it doesn't bring too much overhead when retry. |
| if t.DialRetryFrequency == 0 { |
| t.DialRetryFrequency = rate.Every(100 * time.Millisecond) |
| } |
| return nil |
| } |
| |
| func (t *Transport) Handler() http.Handler { |
| pipelineHandler := newPipelineHandler(t, t.Raft, t.ClusterID) |
| streamHandler := newStreamHandler(t, t, t.Raft, t.ID, t.ClusterID) |
| snapHandler := newSnapshotHandler(t, t.Raft, t.Snapshotter, t.ClusterID) |
| mux := http.NewServeMux() |
| mux.Handle(RaftPrefix, pipelineHandler) |
| mux.Handle(RaftStreamPrefix+"/", streamHandler) |
| mux.Handle(RaftSnapshotPrefix, snapHandler) |
| mux.Handle(ProbingPrefix, probing.NewHandler()) |
| return mux |
| } |
| |
| func (t *Transport) Get(id types.ID) Peer { |
| t.mu.RLock() |
| defer t.mu.RUnlock() |
| return t.peers[id] |
| } |
| |
| func (t *Transport) Send(msgs []raftpb.Message) { |
| for _, m := range msgs { |
| if m.To == 0 { |
| // ignore intentionally dropped message |
| continue |
| } |
| to := types.ID(m.To) |
| |
| t.mu.RLock() |
| p, pok := t.peers[to] |
| g, rok := t.remotes[to] |
| t.mu.RUnlock() |
| |
| if pok { |
| if m.Type == raftpb.MsgApp { |
| t.ServerStats.SendAppendReq(m.Size()) |
| } |
| p.send(m) |
| continue |
| } |
| |
| if rok { |
| g.send(m) |
| continue |
| } |
| |
| plog.Debugf("ignored message %s (sent to unknown peer %s)", m.Type, to) |
| } |
| } |
| |
| func (t *Transport) Stop() { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| for _, r := range t.remotes { |
| r.stop() |
| } |
| for _, p := range t.peers { |
| p.stop() |
| } |
| t.pipelineProber.RemoveAll() |
| t.streamProber.RemoveAll() |
| if tr, ok := t.streamRt.(*http.Transport); ok { |
| tr.CloseIdleConnections() |
| } |
| if tr, ok := t.pipelineRt.(*http.Transport); ok { |
| tr.CloseIdleConnections() |
| } |
| t.peers = nil |
| t.remotes = nil |
| } |
| |
| // CutPeer drops messages to the specified peer. |
| func (t *Transport) CutPeer(id types.ID) { |
| t.mu.RLock() |
| p, pok := t.peers[id] |
| g, gok := t.remotes[id] |
| t.mu.RUnlock() |
| |
| if pok { |
| p.(Pausable).Pause() |
| } |
| if gok { |
| g.Pause() |
| } |
| } |
| |
| // MendPeer recovers the message dropping behavior of the given peer. |
| func (t *Transport) MendPeer(id types.ID) { |
| t.mu.RLock() |
| p, pok := t.peers[id] |
| g, gok := t.remotes[id] |
| t.mu.RUnlock() |
| |
| if pok { |
| p.(Pausable).Resume() |
| } |
| if gok { |
| g.Resume() |
| } |
| } |
| |
| func (t *Transport) AddRemote(id types.ID, us []string) { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| if t.remotes == nil { |
| // there's no clean way to shutdown the golang http server |
| // (see: https://github.com/golang/go/issues/4674) before |
| // stopping the transport; ignore any new connections. |
| return |
| } |
| if _, ok := t.peers[id]; ok { |
| return |
| } |
| if _, ok := t.remotes[id]; ok { |
| return |
| } |
| urls, err := types.NewURLs(us) |
| if err != nil { |
| plog.Panicf("newURLs %+v should never fail: %+v", us, err) |
| } |
| t.remotes[id] = startRemote(t, urls, id) |
| } |
| |
| func (t *Transport) AddPeer(id types.ID, us []string) { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| |
| if t.peers == nil { |
| panic("transport stopped") |
| } |
| if _, ok := t.peers[id]; ok { |
| return |
| } |
| urls, err := types.NewURLs(us) |
| if err != nil { |
| plog.Panicf("newURLs %+v should never fail: %+v", us, err) |
| } |
| fs := t.LeaderStats.Follower(id.String()) |
| t.peers[id] = startPeer(t, urls, id, fs) |
| addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts) |
| addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts) |
| plog.Infof("added peer %s", id) |
| } |
| |
| func (t *Transport) RemovePeer(id types.ID) { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| t.removePeer(id) |
| } |
| |
| func (t *Transport) RemoveAllPeers() { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| for id := range t.peers { |
| t.removePeer(id) |
| } |
| } |
| |
| // the caller of this function must have the peers mutex. |
| func (t *Transport) removePeer(id types.ID) { |
| if peer, ok := t.peers[id]; ok { |
| peer.stop() |
| } else { |
| plog.Panicf("unexpected removal of unknown peer '%d'", id) |
| } |
| delete(t.peers, id) |
| delete(t.LeaderStats.Followers, id.String()) |
| t.pipelineProber.Remove(id.String()) |
| t.streamProber.Remove(id.String()) |
| plog.Infof("removed peer %s", id) |
| } |
| |
| func (t *Transport) UpdatePeer(id types.ID, us []string) { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| // TODO: return error or just panic? |
| if _, ok := t.peers[id]; !ok { |
| return |
| } |
| urls, err := types.NewURLs(us) |
| if err != nil { |
| plog.Panicf("newURLs %+v should never fail: %+v", us, err) |
| } |
| t.peers[id].update(urls) |
| |
| t.pipelineProber.Remove(id.String()) |
| addPeerToProber(t.pipelineProber, id.String(), us, RoundTripperNameSnapshot, rtts) |
| t.streamProber.Remove(id.String()) |
| addPeerToProber(t.streamProber, id.String(), us, RoundTripperNameRaftMessage, rtts) |
| plog.Infof("updated peer %s", id) |
| } |
| |
| func (t *Transport) ActiveSince(id types.ID) time.Time { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| if p, ok := t.peers[id]; ok { |
| return p.activeSince() |
| } |
| return time.Time{} |
| } |
| |
| func (t *Transport) SendSnapshot(m snap.Message) { |
| t.mu.Lock() |
| defer t.mu.Unlock() |
| p := t.peers[types.ID(m.To)] |
| if p == nil { |
| m.CloseWithError(errMemberNotFound) |
| return |
| } |
| p.sendSnap(m) |
| } |
| |
| // Pausable is a testing interface for pausing transport traffic. |
| type Pausable interface { |
| Pause() |
| Resume() |
| } |
| |
| func (t *Transport) Pause() { |
| t.mu.RLock() |
| defer t.mu.RUnlock() |
| for _, p := range t.peers { |
| p.(Pausable).Pause() |
| } |
| } |
| |
| func (t *Transport) Resume() { |
| t.mu.RLock() |
| defer t.mu.RUnlock() |
| for _, p := range t.peers { |
| p.(Pausable).Resume() |
| } |
| } |
| |
| // ActivePeers returns a channel that closes when an initial |
| // peer connection has been established. Use this to wait until the |
| // first peer connection becomes active. |
| func (t *Transport) ActivePeers() (cnt int) { |
| t.mu.RLock() |
| defer t.mu.RUnlock() |
| for _, p := range t.peers { |
| if !p.activeSince().IsZero() { |
| cnt++ |
| } |
| } |
| return cnt |
| } |
| |
| type nopTransporter struct{} |
| |
| func NewNopTransporter() Transporter { |
| return &nopTransporter{} |
| } |
| |
| func (s *nopTransporter) Start() error { return nil } |
| func (s *nopTransporter) Handler() http.Handler { return nil } |
| func (s *nopTransporter) Send(m []raftpb.Message) {} |
| func (s *nopTransporter) SendSnapshot(m snap.Message) {} |
| func (s *nopTransporter) AddRemote(id types.ID, us []string) {} |
| func (s *nopTransporter) AddPeer(id types.ID, us []string) {} |
| func (s *nopTransporter) RemovePeer(id types.ID) {} |
| func (s *nopTransporter) RemoveAllPeers() {} |
| func (s *nopTransporter) UpdatePeer(id types.ID, us []string) {} |
| func (s *nopTransporter) ActiveSince(id types.ID) time.Time { return time.Time{} } |
| func (s *nopTransporter) ActivePeers() int { return 0 } |
| func (s *nopTransporter) Stop() {} |
| func (s *nopTransporter) Pause() {} |
| func (s *nopTransporter) Resume() {} |
| |
| type snapTransporter struct { |
| nopTransporter |
| snapDoneC chan snap.Message |
| snapDir string |
| } |
| |
| func NewSnapTransporter(snapDir string) (Transporter, <-chan snap.Message) { |
| ch := make(chan snap.Message, 1) |
| tr := &snapTransporter{snapDoneC: ch, snapDir: snapDir} |
| return tr, ch |
| } |
| |
| func (s *snapTransporter) SendSnapshot(m snap.Message) { |
| ss := snap.New(s.snapDir) |
| ss.SaveDBFrom(m.ReadCloser, m.Snapshot.Metadata.Index+1) |
| m.CloseWithError(nil) |
| s.snapDoneC <- m |
| } |