khenaidoo | d948f77 | 2021-08-11 17:49:24 -0400 | [diff] [blame] | 1 | // Copyright 2017 The etcd Authors |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package compactor |
| 16 | |
| 17 | import ( |
| 18 | "context" |
| 19 | "sync" |
| 20 | "time" |
| 21 | |
| 22 | pb "github.com/coreos/etcd/etcdserver/etcdserverpb" |
| 23 | "github.com/coreos/etcd/mvcc" |
| 24 | |
| 25 | "github.com/jonboulle/clockwork" |
| 26 | ) |
| 27 | |
| 28 | // Periodic compacts the log by purging revisions older than |
| 29 | // the configured retention time. |
| 30 | type Periodic struct { |
| 31 | clock clockwork.Clock |
| 32 | period time.Duration |
| 33 | |
| 34 | rg RevGetter |
| 35 | c Compactable |
| 36 | |
| 37 | revs []int64 |
| 38 | ctx context.Context |
| 39 | cancel context.CancelFunc |
| 40 | |
| 41 | // mu protects paused |
| 42 | mu sync.RWMutex |
| 43 | paused bool |
| 44 | } |
| 45 | |
| 46 | // NewPeriodic creates a new instance of Periodic compactor that purges |
| 47 | // the log older than h Duration. |
| 48 | func NewPeriodic(h time.Duration, rg RevGetter, c Compactable) *Periodic { |
| 49 | return newPeriodic(clockwork.NewRealClock(), h, rg, c) |
| 50 | } |
| 51 | |
| 52 | func newPeriodic(clock clockwork.Clock, h time.Duration, rg RevGetter, c Compactable) *Periodic { |
| 53 | t := &Periodic{ |
| 54 | clock: clock, |
| 55 | period: h, |
| 56 | rg: rg, |
| 57 | c: c, |
| 58 | revs: make([]int64, 0), |
| 59 | } |
| 60 | t.ctx, t.cancel = context.WithCancel(context.Background()) |
| 61 | return t |
| 62 | } |
| 63 | |
| 64 | /* |
| 65 | Compaction period 1-hour: |
| 66 | 1. compute compaction period, which is 1-hour |
| 67 | 2. record revisions for every 1/10 of 1-hour (6-minute) |
| 68 | 3. keep recording revisions with no compaction for first 1-hour |
| 69 | 4. do compact with revs[0] |
| 70 | - success? contiue on for-loop and move sliding window; revs = revs[1:] |
| 71 | - failure? update revs, and retry after 1/10 of 1-hour (6-minute) |
| 72 | |
| 73 | Compaction period 24-hour: |
| 74 | 1. compute compaction period, which is 1-hour |
| 75 | 2. record revisions for every 1/10 of 1-hour (6-minute) |
| 76 | 3. keep recording revisions with no compaction for first 24-hour |
| 77 | 4. do compact with revs[0] |
| 78 | - success? contiue on for-loop and move sliding window; revs = revs[1:] |
| 79 | - failure? update revs, and retry after 1/10 of 1-hour (6-minute) |
| 80 | |
| 81 | Compaction period 59-min: |
| 82 | 1. compute compaction period, which is 59-min |
| 83 | 2. record revisions for every 1/10 of 59-min (5.9-min) |
| 84 | 3. keep recording revisions with no compaction for first 59-min |
| 85 | 4. do compact with revs[0] |
| 86 | - success? contiue on for-loop and move sliding window; revs = revs[1:] |
| 87 | - failure? update revs, and retry after 1/10 of 59-min (5.9-min) |
| 88 | |
| 89 | Compaction period 5-sec: |
| 90 | 1. compute compaction period, which is 5-sec |
| 91 | 2. record revisions for every 1/10 of 5-sec (0.5-sec) |
| 92 | 3. keep recording revisions with no compaction for first 5-sec |
| 93 | 4. do compact with revs[0] |
| 94 | - success? contiue on for-loop and move sliding window; revs = revs[1:] |
| 95 | - failure? update revs, and retry after 1/10 of 5-sec (0.5-sec) |
| 96 | */ |
| 97 | |
| 98 | // Run runs periodic compactor. |
| 99 | func (t *Periodic) Run() { |
| 100 | compactInterval := t.getCompactInterval() |
| 101 | retryInterval := t.getRetryInterval() |
| 102 | retentions := t.getRetentions() |
| 103 | |
| 104 | go func() { |
| 105 | lastSuccess := t.clock.Now() |
| 106 | baseInterval := t.period |
| 107 | for { |
| 108 | t.revs = append(t.revs, t.rg.Rev()) |
| 109 | if len(t.revs) > retentions { |
| 110 | t.revs = t.revs[1:] // t.revs[0] is always the rev at t.period ago |
| 111 | } |
| 112 | |
| 113 | select { |
| 114 | case <-t.ctx.Done(): |
| 115 | return |
| 116 | case <-t.clock.After(retryInterval): |
| 117 | t.mu.Lock() |
| 118 | p := t.paused |
| 119 | t.mu.Unlock() |
| 120 | if p { |
| 121 | continue |
| 122 | } |
| 123 | } |
| 124 | |
| 125 | if t.clock.Now().Sub(lastSuccess) < baseInterval { |
| 126 | continue |
| 127 | } |
| 128 | |
| 129 | // wait up to initial given period |
| 130 | if baseInterval == t.period { |
| 131 | baseInterval = compactInterval |
| 132 | } |
| 133 | rev := t.revs[0] |
| 134 | |
| 135 | plog.Noticef("Starting auto-compaction at revision %d (retention: %v)", rev, t.period) |
| 136 | _, err := t.c.Compact(t.ctx, &pb.CompactionRequest{Revision: rev}) |
| 137 | if err == nil || err == mvcc.ErrCompacted { |
| 138 | lastSuccess = t.clock.Now() |
| 139 | plog.Noticef("Finished auto-compaction at revision %d", rev) |
| 140 | } else { |
| 141 | plog.Noticef("Failed auto-compaction at revision %d (%v)", rev, err) |
| 142 | plog.Noticef("Retry after %v", retryInterval) |
| 143 | } |
| 144 | } |
| 145 | }() |
| 146 | } |
| 147 | |
| 148 | // if given compaction period x is <1-hour, compact every x duration. |
| 149 | // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='10m', then compact every 10-minute) |
| 150 | // if given compaction period x is >1-hour, compact every hour. |
| 151 | // (e.g. --auto-compaction-mode 'periodic' --auto-compaction-retention='2h', then compact every 1-hour) |
| 152 | func (t *Periodic) getCompactInterval() time.Duration { |
| 153 | itv := t.period |
| 154 | if itv > time.Hour { |
| 155 | itv = time.Hour |
| 156 | } |
| 157 | return itv |
| 158 | } |
| 159 | |
| 160 | func (t *Periodic) getRetentions() int { |
| 161 | return int(t.period/t.getRetryInterval()) + 1 |
| 162 | } |
| 163 | |
| 164 | const retryDivisor = 10 |
| 165 | |
| 166 | func (t *Periodic) getRetryInterval() time.Duration { |
| 167 | itv := t.period |
| 168 | if itv > time.Hour { |
| 169 | itv = time.Hour |
| 170 | } |
| 171 | return itv / retryDivisor |
| 172 | } |
| 173 | |
| 174 | // Stop stops periodic compactor. |
| 175 | func (t *Periodic) Stop() { |
| 176 | t.cancel() |
| 177 | } |
| 178 | |
| 179 | // Pause pauses periodic compactor. |
| 180 | func (t *Periodic) Pause() { |
| 181 | t.mu.Lock() |
| 182 | defer t.mu.Unlock() |
| 183 | t.paused = true |
| 184 | } |
| 185 | |
| 186 | // Resume resumes periodic compactor. |
| 187 | func (t *Periodic) Resume() { |
| 188 | t.mu.Lock() |
| 189 | defer t.mu.Unlock() |
| 190 | t.paused = false |
| 191 | } |