blob: 9d9164e9c5c6d44ca99e5238747da025a184ac50 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001// Copyright 2017 The etcd Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package compactor
16
17import (
18 "context"
19 "sync"
20 "time"
21
22 pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
23 "github.com/coreos/etcd/mvcc"
24
25 "github.com/jonboulle/clockwork"
26)
27
28// Periodic compacts the log by purging revisions older than
29// the configured retention time.
30type Periodic struct {
31 clock clockwork.Clock
32 period time.Duration
33
34 rg RevGetter
35 c Compactable
36
37 revs []int64
38 ctx context.Context
39 cancel context.CancelFunc
40
41 // mu protects paused
42 mu sync.RWMutex
43 paused bool
44}
45
46// NewPeriodic creates a new instance of Periodic compactor that purges
47// the log older than h Duration.
48func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic {
49 return newPeriodic(clockwork.NewRealClock(), h, rg, c)
50}
51
52func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic {
53 t := &Periodic{
54 clock: clock,
55 period: h,
56 rg: rg,
57 c: c,
58 revs: make([]int64, 0),
59 }
60 t.ctx, t.cancel = context.WithCancel(context.Background())
61 return t
62}
63
64/*
65Compaction period 1-hour:
66 1. compute compaction period, which is 1-hour
67 2. record revisions for every 1/10 of 1-hour (6-minute)
68 3. keep recording revisions with no compaction for first 1-hour
69 4. do compact with revs[0]
70 - success? contiue on for-loop and move sliding window; revs = revs[1:]
71 - failure? update revs, and retry after 1/10 of 1-hour (6-minute)
72
73Compaction period 24-hour:
74 1. compute compaction period, which is 1-hour
75 2. record revisions for every 1/10 of 1-hour (6-minute)
76 3. keep recording revisions with no compaction for first 24-hour
77 4. do compact with revs[0]
78 - success? contiue on for-loop and move sliding window; revs = revs[1:]
79 - failure? update revs, and retry after 1/10 of 1-hour (6-minute)
80
81Compaction period 59-min:
82 1. compute compaction period, which is 59-min
83 2. record revisions for every 1/10 of 59-min (5.9-min)
84 3. keep recording revisions with no compaction for first 59-min
85 4. do compact with revs[0]
86 - success? contiue on for-loop and move sliding window; revs = revs[1:]
87 - failure? update revs, and retry after 1/10 of 59-min (5.9-min)
88
89Compaction period 5-sec:
90 1. compute compaction period, which is 5-sec
91 2. record revisions for every 1/10 of 5-sec (0.5-sec)
92 3. keep recording revisions with no compaction for first 5-sec
93 4. do compact with revs[0]
94 - success? contiue on for-loop and move sliding window; revs = revs[1:]
95 - failure? update revs, and retry after 1/10 of 5-sec (0.5-sec)
96*/
97
98// Run runs periodic compactor.
99func (t *Periodic) Run() {
100 compactInterval := t.getCompactInterval()
101 retryInterval := t.getRetryInterval()
102 retentions := t.getRetentions()
103
104 go func() {
105 lastSuccess := t.clock.Now()
106 baseInterval := t.period
107 for {
108 t.revs = append(t.revs, t.rg.Rev())
109 if len(t.revs) > retentions {
110 t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago
111 }
112
113 select {
114 case <-t.ctx.Done():
115 return
116 case <-t.clock.After(retryInterval):
117 t.mu.Lock()
118 p := t.paused
119 t.mu.Unlock()
120 if p {
121 continue
122 }
123 }
124
125 if t.clock.Now().Sub(lastSuccess) < baseInterval {
126 continue
127 }
128
129 // wait up to initial given period
130 if baseInterval == t.period {
131 baseInterval = compactInterval
132 }
133 rev := t.revs[0]
134
135 plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period)
136 _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev})
137 if err == nil || err == mvcc.ErrCompacted {
138 lastSuccess = t.clock.Now()
139 plog.Noticef("Finished auto-compaction at revision %d", rev)
140 } else {
141 plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err)
142 plog.Noticef("Retry after %v", retryInterval)
143 }
144 }
145 }()
146}
147
148// if given compaction period x is <1-hour, compact every x duration.
149// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute)
150// if given compaction period x is >1-hour, compact every hour.
151// (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour)
152func (t *Periodic) getCompactInterval() time.Duration {
153 itv := t.period
154 if itv > time.Hour {
155 itv = time.Hour
156 }
157 return itv
158}
159
160func (t *Periodic) getRetentions() int {
161 return int(t.period/t.getRetryInterval()) + 1
162}
163
164const retryDivisor = 10
165
166func (t *Periodic) getRetryInterval() time.Duration {
167 itv := t.period
168 if itv > time.Hour {
169 itv = time.Hour
170 }
171 return itv / retryDivisor
172}
173
174// Stop stops periodic compactor.
175func (t *Periodic) Stop() {
176 t.cancel()
177}
178
179// Pause pauses periodic compactor.
180func (t *Periodic) Pause() {
181 t.mu.Lock()
182 defer t.mu.Unlock()
183 t.paused = true
184}
185
186// Resume resumes periodic compactor.
187func (t *Periodic) Resume() {
188 t.mu.Lock()
189 defer t.mu.Unlock()
190 t.paused = false
191}