khenaidoo | ab1f7bd | 2019-11-14 14:00:27 -0500 | [diff] [blame] | 1 | // Copyright 2017 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package v3compactor |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "sync" |
| 20 | "time" |
| 21 | |
| 22 | pb "go.etcd.io/etcd/etcdserver/etcdserverpb" |
| 23 | "go.etcd.io/etcd/mvcc" |
| 24 | |
| 25 | "github.com/jonboulle/clockwork" |
| 26 | "go.uber.org/zap" |
| 27 | ) |
| 28 | |
| 29 | // Periodic compacts the log by purging revisions older than |
| 30 | // the configured retention time. |
| 31 | type Periodic struct { |
| 32 | lg *zap.Logger |
| 33 | clock clockwork.Clock |
| 34 | period time.Duration |
| 35 | |
| 36 | rg RevGetter |
| 37 | c Compactable |
| 38 | |
| 39 | revs []int64 |
| 40 | ctx context.Context |
| 41 | cancel context.CancelFunc |
| 42 | |
| 43 | // mu protects paused |
| 44 | mu sync.RWMutex |
| 45 | paused bool |
| 46 | } |
| 47 | |
| 48 | // newPeriodic creates a new instance of Periodic compactor that purges |
| 49 | // the log older than h Duration. |
| 50 | func newPeriodic(lg *zap.Logger, clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic { |
| 51 | pc := &Periodic{ |
| 52 | lg: lg, |
| 53 | clock: clock, |
| 54 | period: h, |
| 55 | rg: rg, |
| 56 | c: c, |
| 57 | revs: make([]int64, 0), |
| 58 | } |
| 59 | pc.ctx, pc.cancel = context.WithCancel(context.Background()) |
| 60 | return pc |
| 61 | } |
| 62 | |
| 63 | /* |
| 64 | Compaction period 1-hour: |
| 65 | 1. compute compaction period, which is 1-hour |
| 66 | 2. record revisions for every 1/10 of 1-hour (6-minute) |
| 67 | 3. keep recording revisions with no compaction for first 1-hour |
| 68 | 4. do compact with revs[0] |
| 69 | - success? contiue on for-loop and move sliding window; revs = revs[1:] |
| 70 | - failure? update revs, and retry after 1/10 of 1-hour (6-minute) |
| 71 | |
| 72 | Compaction period 24-hour: |
| 73 | 1. compute compaction period, which is 1-hour |
| 74 | 2. record revisions for every 1/10 of 1-hour (6-minute) |
| 75 | 3. keep recording revisions with no compaction for first 24-hour |
| 76 | 4. do compact with revs[0] |
| 77 | - success? contiue on for-loop and move sliding window; revs = revs[1:] |
| 78 | - failure? update revs, and retry after 1/10 of 1-hour (6-minute) |
| 79 | |
| 80 | Compaction period 59-min: |
| 81 | 1. compute compaction period, which is 59-min |
| 82 | 2. record revisions for every 1/10 of 59-min (5.9-min) |
| 83 | 3. keep recording revisions with no compaction for first 59-min |
| 84 | 4. do compact with revs[0] |
| 85 | - success? contiue on for-loop and move sliding window; revs = revs[1:] |
| 86 | - failure? update revs, and retry after 1/10 of 59-min (5.9-min) |
| 87 | |
| 88 | Compaction period 5-sec: |
| 89 | 1. compute compaction period, which is 5-sec |
| 90 | 2. record revisions for every 1/10 of 5-sec (0.5-sec) |
| 91 | 3. keep recording revisions with no compaction for first 5-sec |
| 92 | 4. do compact with revs[0] |
| 93 | - success? contiue on for-loop and move sliding window; revs = revs[1:] |
| 94 | - failure? update revs, and retry after 1/10 of 5-sec (0.5-sec) |
| 95 | */ |
| 96 | |
| 97 | // Run runs periodic compactor. |
| 98 | func (pc *Periodic) Run() { |
| 99 | compactInterval := pc.getCompactInterval() |
| 100 | retryInterval := pc.getRetryInterval() |
| 101 | retentions := pc.getRetentions() |
| 102 | |
| 103 | go func() { |
| 104 | lastSuccess := pc.clock.Now() |
| 105 | baseInterval := pc.period |
| 106 | for { |
| 107 | pc.revs = append(pc.revs, pc.rg.Rev()) |
| 108 | if len(pc.revs) > retentions { |
| 109 | pc.revs = pc.revs[1:] // pc.revs[0] is always the rev at pc.period ago |
| 110 | } |
| 111 | |
| 112 | select { |
| 113 | case <-pc.ctx.Done(): |
| 114 | return |
| 115 | case <-pc.clock.After(retryInterval): |
| 116 | pc.mu.Lock() |
| 117 | p := pc.paused |
| 118 | pc.mu.Unlock() |
| 119 | if p { |
| 120 | continue |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | if pc.clock.Now().Sub(lastSuccess) < baseInterval { |
| 125 | continue |
| 126 | } |
| 127 | |
| 128 | // wait up to initial given period |
| 129 | if baseInterval == pc.period { |
| 130 | baseInterval = compactInterval |
| 131 | } |
| 132 | rev := pc.revs[0] |
| 133 | |
| 134 | if pc.lg != nil { |
| 135 | pc.lg.Info( |
| 136 | "starting auto periodic compaction", |
| 137 | zap.Int64("revision", rev), |
| 138 | zap.Duration("compact-period", pc.period), |
| 139 | ) |
| 140 | } else { |
| 141 | plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, pc.period) |
| 142 | } |
| 143 | _, err := pc.c.Compact(pc.ctx, &pb.CompactionRequest{Revision: rev}) |
| 144 | if err == nil || err == mvcc.ErrCompacted { |
| 145 | if pc.lg != nil { |
| 146 | pc.lg.Info( |
| 147 | "completed auto periodic compaction", |
| 148 | zap.Int64("revision", rev), |
| 149 | zap.Duration("compact-period", pc.period), |
| 150 | zap.Duration("took", time.Since(lastSuccess)), |
| 151 | ) |
| 152 | } else { |
| 153 | plog.Noticef("Finished auto-compaction at revision %d", rev) |
| 154 | } |
| 155 | lastSuccess = pc.clock.Now() |
| 156 | } else { |
| 157 | if pc.lg != nil { |
| 158 | pc.lg.Warn( |
| 159 | "failed auto periodic compaction", |
| 160 | zap.Int64("revision", rev), |
| 161 | zap.Duration("compact-period", pc.period), |
| 162 | zap.Duration("retry-interval", retryInterval), |
| 163 | zap.Error(err), |
| 164 | ) |
| 165 | } else { |
| 166 | plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) |
| 167 | plog.Noticef("Retry after %v", retryInterval) |
| 168 | } |
| 169 | } |
| 170 | } |
| 171 | }() |
| 172 | } |
| 173 | |
| 174 | // if given compaction period x is <1-hour, compact every x duration. |
| 175 | // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute) |
| 176 | // if given compaction period x is >1-hour, compact every hour. |
| 177 | // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour) |
| 178 | func (pc *Periodic) getCompactInterval() time.Duration { |
| 179 | itv := pc.period |
| 180 | if itv > time.Hour { |
| 181 | itv = time.Hour |
| 182 | } |
| 183 | return itv |
| 184 | } |
| 185 | |
| 186 | func (pc *Periodic) getRetentions() int { |
| 187 | return int(pc.period/pc.getRetryInterval()) + 1 |
| 188 | } |
| 189 | |
| 190 | const retryDivisor = 10 |
| 191 | |
| 192 | func (pc *Periodic) getRetryInterval() time.Duration { |
| 193 | itv := pc.period |
| 194 | if itv > time.Hour { |
| 195 | itv = time.Hour |
| 196 | } |
| 197 | return itv / retryDivisor |
| 198 | } |
| 199 | |
| 200 | // Stop stops periodic compactor. |
| 201 | func (pc *Periodic) Stop() { |
| 202 | pc.cancel() |
| 203 | } |
| 204 | |
| 205 | // Pause pauses periodic compactor. |
| 206 | func (pc *Periodic) Pause() { |
| 207 | pc.mu.Lock() |
| 208 | pc.paused = true |
| 209 | pc.mu.Unlock() |
| 210 | } |
| 211 | |
| 212 | // Resume resumes periodic compactor. |
| 213 | func (pc *Periodic) Resume() { |
| 214 | pc.mu.Lock() |
| 215 | pc.paused = false |
| 216 | pc.mu.Unlock() |
| 217 | } |