William Kurkian | ea86948 | 2019-04-09 15:16:11 -0400 | [diff] [blame] | 1 | package cluster |
| 2 | |
| 3 | import ( |
| 4 | "fmt" |
| 5 | "sort" |
| 6 | "sync" |
| 7 | ) |
| 8 | |
| 9 | type none struct{} |
| 10 | |
| 11 | type topicPartition struct { |
| 12 | Topic string |
| 13 | Partition int32 |
| 14 | } |
| 15 | |
| 16 | func (tp *topicPartition) String() string { |
| 17 | return fmt.Sprintf("%s-%d", tp.Topic, tp.Partition) |
| 18 | } |
| 19 | |
| 20 | type offsetInfo struct { |
| 21 | Offset int64 |
| 22 | Metadata string |
| 23 | } |
| 24 | |
| 25 | func (i offsetInfo) NextOffset(fallback int64) int64 { |
| 26 | if i.Offset > -1 { |
| 27 | return i.Offset |
| 28 | } |
| 29 | return fallback |
| 30 | } |
| 31 | |
| 32 | type int32Slice []int32 |
| 33 | |
| 34 | func (p int32Slice) Len() int { return len(p) } |
| 35 | func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] } |
| 36 | func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } |
| 37 | |
| 38 | func (p int32Slice) Diff(o int32Slice) (res []int32) { |
| 39 | on := len(o) |
| 40 | for _, x := range p { |
| 41 | n := sort.Search(on, func(i int) bool { return o[i] >= x }) |
| 42 | if n < on && o[n] == x { |
| 43 | continue |
| 44 | } |
| 45 | res = append(res, x) |
| 46 | } |
| 47 | return |
| 48 | } |
| 49 | |
| 50 | // -------------------------------------------------------------------- |
| 51 | |
| 52 | type loopTomb struct { |
| 53 | c chan none |
| 54 | o sync.Once |
| 55 | w sync.WaitGroup |
| 56 | } |
| 57 | |
| 58 | func newLoopTomb() *loopTomb { |
| 59 | return &loopTomb{c: make(chan none)} |
| 60 | } |
| 61 | |
| 62 | func (t *loopTomb) stop() { t.o.Do(func() { close(t.c) }) } |
| 63 | func (t *loopTomb) Close() { t.stop(); t.w.Wait() } |
| 64 | |
| 65 | func (t *loopTomb) Dying() <-chan none { return t.c } |
| 66 | func (t *loopTomb) Go(f func(<-chan none)) { |
| 67 | t.w.Add(1) |
| 68 | |
| 69 | go func() { |
| 70 | defer t.stop() |
| 71 | defer t.w.Done() |
| 72 | |
| 73 | f(t.c) |
| 74 | }() |
| 75 | } |