blob: 928aa95b6b1623f1701dece4bdf7b0abc8f6991e [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2015 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "io"
19
20 "github.com/coreos/etcd/mvcc/backend"
21 "github.com/coreos/etcd/raft/raftpb"
22 "github.com/coreos/etcd/snap"
23)
24
25// createMergedSnapshotMessage creates a snapshot message that contains: raft status (term, conf),
26// a snapshot of v2 store inside raft.Snapshot as []byte, a snapshot of v3 KV in the top level message
27// as ReadCloser.
28func (s *EtcdServer) createMergedSnapshotMessage(m raftpb.Message, snapt, snapi uint64, confState raftpb.ConfState) snap.Message {
29 // get a snapshot of v2 store as []byte
30 clone := s.store.Clone()
31 d, err := clone.SaveNoCopy()
32 if err != nil {
33 plog.Panicf("store save should never fail: %v", err)
34 }
35
36 // commit kv to write metadata(for example: consistent index).
37 s.KV().Commit()
38 dbsnap := s.be.Snapshot()
39 // get a snapshot of v3 KV as readCloser
40 rc := newSnapshotReaderCloser(dbsnap)
41
42 // put the []byte snapshot of store into raft snapshot and return the merged snapshot with
43 // KV readCloser snapshot.
44 snapshot := raftpb.Snapshot{
45 Metadata: raftpb.SnapshotMetadata{
46 Index: snapi,
47 Term: snapt,
48 ConfState: confState,
49 },
50 Data: d,
51 }
52 m.Snapshot = snapshot
53
54 return *snap.NewMessage(m, rc, dbsnap.Size())
55}
56
57func newSnapshotReaderCloser(snapshot backend.Snapshot) io.ReadCloser {
58 pr, pw := io.Pipe()
59 go func() {
60 n, err := snapshot.WriteTo(pw)
61 if err == nil {
62 plog.Infof("wrote database snapshot out [total bytes: %d]", n)
63 } else {
64 plog.Warningf("failed to write database snapshot out [written bytes: %d]: %v", n, err)
65 }
66 pw.CloseWithError(err)
67 err = snapshot.Close()
68 if err != nil {
69 plog.Panicf("failed to close database snapshot: %v", err)
70 }
71 }()
72 return pr
73}