blob: 01ba19256864019083579ef7ec35db666896ac36 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package etcdserver
16
17import (
18 "fmt"
19 "os"
20 "time"
21
22 "go.etcd.io/etcd/etcdserver/api/snap"
23 "go.etcd.io/etcd/lease"
24 "go.etcd.io/etcd/mvcc"
25 "go.etcd.io/etcd/mvcc/backend"
26 "go.etcd.io/etcd/raft/raftpb"
27
28 "go.uber.org/zap"
29)
30
31func newBackend(cfg ServerConfig) backend.Backend {
32 bcfg := backend.DefaultBackendConfig()
33 bcfg.Path = cfg.backendPath()
34 if cfg.BackendBatchLimit != 0 {
35 bcfg.BatchLimit = cfg.BackendBatchLimit
36 if cfg.Logger != nil {
37 cfg.Logger.Info("setting backend batch limit", zap.Int("batch limit", cfg.BackendBatchLimit))
38 }
39 }
40 if cfg.BackendBatchInterval != 0 {
41 bcfg.BatchInterval = cfg.BackendBatchInterval
42 if cfg.Logger != nil {
43 cfg.Logger.Info("setting backend batch interval", zap.Duration("batch interval", cfg.BackendBatchInterval))
44 }
45 }
46 bcfg.BackendFreelistType = cfg.BackendFreelistType
47 bcfg.Logger = cfg.Logger
48 if cfg.QuotaBackendBytes > 0 && cfg.QuotaBackendBytes != DefaultQuotaBytes {
49 // permit 10% excess over quota for disarm
50 bcfg.MmapSize = uint64(cfg.QuotaBackendBytes + cfg.QuotaBackendBytes/10)
51 }
52 return backend.New(bcfg)
53}
54
55// openSnapshotBackend renames a snapshot db to the current etcd db and opens it.
56func openSnapshotBackend(cfg ServerConfig, ss *snap.Snapshotter, snapshot raftpb.Snapshot) (backend.Backend, error) {
57 snapPath, err := ss.DBFilePath(snapshot.Metadata.Index)
58 if err != nil {
59 return nil, fmt.Errorf("failed to find database snapshot file (%v)", err)
60 }
61 if err := os.Rename(snapPath, cfg.backendPath()); err != nil {
62 return nil, fmt.Errorf("failed to rename database snapshot file (%v)", err)
63 }
64 return openBackend(cfg), nil
65}
66
67// openBackend returns a backend using the current etcd db.
68func openBackend(cfg ServerConfig) backend.Backend {
69 fn := cfg.backendPath()
70
71 now, beOpened := time.Now(), make(chan backend.Backend)
72 go func() {
73 beOpened <- newBackend(cfg)
74 }()
75
76 select {
77 case be := <-beOpened:
78 if cfg.Logger != nil {
79 cfg.Logger.Info("opened backend db", zap.String("path", fn), zap.Duration("took", time.Since(now)))
80 }
81 return be
82
83 case <-time.After(10 * time.Second):
84 if cfg.Logger != nil {
85 cfg.Logger.Info(
86 "db file is flocked by another process, or taking too long",
87 zap.String("path", fn),
88 zap.Duration("took", time.Since(now)),
89 )
90 } else {
91 plog.Warningf("another etcd process is using %q and holds the file lock, or loading backend file is taking >10 seconds", fn)
92 plog.Warningf("waiting for it to exit before starting...")
93 }
94 }
95
96 return <-beOpened
97}
98
99// recoverBackendSnapshot recovers the DB from a snapshot in case etcd crashes
100// before updating the backend db after persisting raft snapshot to disk,
101// violating the invariant snapshot.Metadata.Index < db.consistentIndex. In this
102// case, replace the db with the snapshot db sent by the leader.
103func recoverSnapshotBackend(cfg ServerConfig, oldbe backend.Backend, snapshot raftpb.Snapshot) (backend.Backend, error) {
104 var cIndex consistentIndex
105 kv := mvcc.New(cfg.Logger, oldbe, &lease.FakeLessor{}, &cIndex, mvcc.StoreConfig{CompactionBatchLimit: cfg.CompactionBatchLimit})
106 defer kv.Close()
107 if snapshot.Metadata.Index <= kv.ConsistentIndex() {
108 return oldbe, nil
109 }
110 oldbe.Close()
111 return openSnapshotBackend(cfg, snap.New(cfg.Logger, cfg.SnapDir()), snapshot)
112}