blob: ab64cb70619a5895ba7329d00a4033a99d463c87 [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package v3compactor
16
17import (
18 "context"
19 "sync"
20 "time"
21
22 pb "go.etcd.io/etcd/etcdserver/etcdserverpb"
23 "go.etcd.io/etcd/mvcc"
24
25 "github.com/jonboulle/clockwork"
26 "go.uber.org/zap"
27)
28
29// Periodic compacts the log by purging revisions older than
30// the configured retention time.
31type Periodic struct {
32 lg *zap.Logger
33 clock clockwork.Clock
34 period time.Duration
35
36 rg RevGetter
37 c Compactable
38
39 revs []int64
40 ctx context.Context
41 cancel context.CancelFunc
42
43 // mu protects paused
44 mu sync.RWMutex
45 paused bool
46}
47
48// newPeriodic creates a new instance of Periodic compactor that purges
49// the log older than h Duration.
50func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
51 pc := &Periodic{
52 lg: lg,
53 clock: clock,
54 period: h,
55 rg: rg,
56 c: c,
57 revs: make([]int64, 0),
58 }
59 pc.ctx, pc.cancel = context.WithCancel(context.Background())
60 return pc
61}
62
63/*
64Compaction period 1-hour:
65 1. compute compaction period, which is 1-hour
66 2. record revisions for every 1/10 of 1-hour (6-minute)
67 3. keep recording revisions with no compaction for first 1-hour
68 4. do compact with revs[0]
69 - success? contiue on for-loop and move sliding window; revs = revs[1:]
70 - failure? update revs, and retry after 1/10 of 1-hour (6-minute)
71
72Compaction period 24-hour:
73 1. compute compaction period, which is 1-hour
74 2. record revisions for every 1/10 of 1-hour (6-minute)
75 3. keep recording revisions with no compaction for first 24-hour
76 4. do compact with revs[0]
77 - success? contiue on for-loop and move sliding window; revs = revs[1:]
78 - failure? update revs, and retry after 1/10 of 1-hour (6-minute)
79
80Compaction period 59-min:
81 1. compute compaction period, which is 59-min
82 2. record revisions for every 1/10 of 59-min (5.9-min)
83 3. keep recording revisions with no compaction for first 59-min
84 4. do compact with revs[0]
85 - success? contiue on for-loop and move sliding window; revs = revs[1:]
86 - failure? update revs, and retry after 1/10 of 59-min (5.9-min)
87
88Compaction period 5-sec:
89 1. compute compaction period, which is 5-sec
90 2. record revisions for every 1/10 of 5-sec (0.5-sec)
91 3. keep recording revisions with no compaction for first 5-sec
92 4. do compact with revs[0]
93 - success? contiue on for-loop and move sliding window; revs = revs[1:]
94 - failure? update revs, and retry after 1/10 of 5-sec (0.5-sec)
95*/
96
97// Run runs periodic compactor.
98func (pc *Periodic) Run() {
99 compactInterval := pc.getCompactInterval()
100 retryInterval := pc.getRetryInterval()
101 retentions := pc.getRetentions()
102
103 go func() {
104 lastSuccess := pc.clock.Now()
105 baseInterval := pc.period
106 for {
107 pc.revs = append(pc.revs, pc.rg.Rev())
108 if len(pc.revs) > retentions {
109 pc.revs = pc.revs[1:] // pc.revs[0] is always the rev at pc.period ago
110 }
111
112 select {
113 case <-pc.ctx.Done():
114 return
115 case <-pc.clock.After(retryInterval):
116 pc.mu.Lock()
117 p := pc.paused
118 pc.mu.Unlock()
119 if p {
120 continue
121 }
122 }
123
124 if pc.clock.Now().Sub(lastSuccess) < baseInterval {
125 continue
126 }
127
128 // wait up to initial given period
129 if baseInterval == pc.period {
130 baseInterval = compactInterval
131 }
132 rev := pc.revs[0]
133
134 if pc.lg != nil {
135 pc.lg.Info(
136 "starting auto periodic compaction",
137 zap.Int64("revision", rev),
138 zap.Duration("compact-period", pc.period),
139 )
140 } else {
141 plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, pc.period)
142 }
143 _, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev})
144 if err == nil || err == mvcc.ErrCompacted {
145 if pc.lg != nil {
146 pc.lg.Info(
147 "completed auto periodic compaction",
148 zap.Int64("revision", rev),
149 zap.Duration("compact-period", pc.period),
150 zap.Duration("took", time.Since(lastSuccess)),
151 )
152 } else {
153 plog.Noticef("Finished auto-compaction at revision %d", rev)
154 }
155 lastSuccess = pc.clock.Now()
156 } else {
157 if pc.lg != nil {
158 pc.lg.Warn(
159 "failed auto periodic compaction",
160 zap.Int64("revision", rev),
161 zap.Duration("compact-period", pc.period),
162 zap.Duration("retry-interval", retryInterval),
163 zap.Error(err),
164 )
165 } else {
166 plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
167 plog.Noticef("Retry after %v", retryInterval)
168 }
169 }
170 }
171 }()
172}
173
174// if given compaction period x is <1-hour, compact every x duration.
175// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
176// if given compaction period x is >1-hour, compact every hour.
177// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
178func (pc *Periodic) getCompactInterval() time.Duration {
179 itv := pc.period
180 if itv > time.Hour {
181 itv = time.Hour
182 }
183 return itv
184}
185
186func (pc *Periodic) getRetentions() int {
187 return int(pc.period/pc.getRetryInterval()) + 1
188}
189
190const retryDivisor = 10
191
192func (pc *Periodic) getRetryInterval() time.Duration {
193 itv := pc.period
194 if itv > time.Hour {
195 itv = time.Hour
196 }
197 return itv / retryDivisor
198}
199
200// Stop stops periodic compactor.
201func (pc *Periodic) Stop() {
202 pc.cancel()
203}
204
205// Pause pauses periodic compactor.
206func (pc *Periodic) Pause() {
207 pc.mu.Lock()
208 pc.paused = true
209 pc.mu.Unlock()
210}
211
212// Resume resumes periodic compactor.
213func (pc *Periodic) Resume() {
214 pc.mu.Lock()
215 pc.paused = false
216 pc.mu.Unlock()
217}