blob: bfaa587830d729b03a3a612e50d9b389f1e76080 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package cluster
2
3import (
4 "sort"
5 "sync"
6 "time"
7
8 "github.com/Shopify/sarama"
9)
10
// PartitionConsumer allows code to consume individual partitions from the cluster.
// It extends sarama.PartitionConsumer with the identity of the consumed
// partition and with offset tracking.
//
// See docs for Consumer.Partitions() for more on how to implement this.
type PartitionConsumer interface {
	sarama.PartitionConsumer

	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the offset used for creating the PartitionConsumer instance.
	// The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest.
	InitialOffset() int64

	// MarkOffset marks the offset of a message as processed.
	// The metadata string is recorded together with the offset.
	MarkOffset(offset int64, metadata string)

	// ResetOffset resets the offset to a previously processed message.
	// The metadata string is recorded together with the offset.
	ResetOffset(offset int64, metadata string)
}
33
// partitionConsumer wraps a sarama.PartitionConsumer and adds the
// topic/partition identity, mutex-guarded offset state and a two-channel
// shutdown handshake (dying/dead).
type partitionConsumer struct {
	// Embedded consumer obtained from ConsumePartition in newPartitionConsumer.
	sarama.PartitionConsumer

	// state holds the tracked offset; mu is held around every access to
	// state made by the methods of this type.
	state partitionState
	mu    sync.Mutex

	// Set once in newPartitionConsumer and only read afterwards.
	topic         string
	partition     int32
	initialOffset int64

	// closeOnce makes the shutdown in AsyncClose run at most once;
	// closeErr keeps the error returned by the embedded consumer's Close.
	closeOnce sync.Once
	closeErr  error

	// dying is closed by AsyncClose to ask waitFor/multiplex to stop.
	// dead is closed when waitFor/multiplex returns; Close waits on it.
	dying, dead chan none
}
49
50func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
51 offset := info.NextOffset(defaultOffset)
52 pcm, err := manager.ConsumePartition(topic, partition, offset)
53
54 // Resume from default offset, if requested offset is out-of-range
55 if err == sarama.ErrOffsetOutOfRange {
56 info.Offset = -1
57 offset = defaultOffset
58 pcm, err = manager.ConsumePartition(topic, partition, offset)
59 }
60 if err != nil {
61 return nil, err
62 }
63
64 return &partitionConsumer{
65 PartitionConsumer: pcm,
66 state: partitionState{Info: info},
67
68 topic: topic,
69 partition: partition,
70 initialOffset: offset,
71
72 dying: make(chan none),
73 dead: make(chan none),
74 }, nil
75}
76
77// Topic implements PartitionConsumer
78func (c *partitionConsumer) Topic() string { return c.topic }
79
80// Partition implements PartitionConsumer
81func (c *partitionConsumer) Partition() int32 { return c.partition }
82
83// InitialOffset implements PartitionConsumer
84func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset }
85
86// AsyncClose implements PartitionConsumer
87func (c *partitionConsumer) AsyncClose() {
88 c.closeOnce.Do(func() {
89 c.closeErr = c.PartitionConsumer.Close()
90 close(c.dying)
91 })
92}
93
94// Close implements PartitionConsumer
95func (c *partitionConsumer) Close() error {
96 c.AsyncClose()
97 <-c.dead
98 return c.closeErr
99}
100
101func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) {
102 defer close(c.dead)
103
104 for {
105 select {
106 case err, ok := <-c.Errors():
107 if !ok {
108 return
109 }
110 select {
111 case errors <- err:
112 case <-stopper:
113 return
114 case <-c.dying:
115 return
116 }
117 case <-stopper:
118 return
119 case <-c.dying:
120 return
121 }
122 }
123}
124
125func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
126 defer close(c.dead)
127
128 for {
129 select {
130 case msg, ok := <-c.Messages():
131 if !ok {
132 return
133 }
134 select {
135 case messages <- msg:
136 case <-stopper:
137 return
138 case <-c.dying:
139 return
140 }
141 case err, ok := <-c.Errors():
142 if !ok {
143 return
144 }
145 select {
146 case errors <- err:
147 case <-stopper:
148 return
149 case <-c.dying:
150 return
151 }
152 case <-stopper:
153 return
154 case <-c.dying:
155 return
156 }
157 }
158}
159
160func (c *partitionConsumer) getState() partitionState {
161 c.mu.Lock()
162 state := c.state
163 c.mu.Unlock()
164
165 return state
166}
167
168func (c *partitionConsumer) markCommitted(offset int64) {
169 c.mu.Lock()
170 if offset == c.state.Info.Offset {
171 c.state.Dirty = false
172 }
173 c.mu.Unlock()
174}
175
176// MarkOffset implements PartitionConsumer
177func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
178 c.mu.Lock()
179 if next := offset + 1; next > c.state.Info.Offset {
180 c.state.Info.Offset = next
181 c.state.Info.Metadata = metadata
182 c.state.Dirty = true
183 }
184 c.mu.Unlock()
185}
186
187// ResetOffset implements PartitionConsumer
188func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
189 c.mu.Lock()
190 if next := offset + 1; next <= c.state.Info.Offset {
191 c.state.Info.Offset = next
192 c.state.Info.Metadata = metadata
193 c.state.Dirty = true
194 }
195 c.mu.Unlock()
196}
197
198// --------------------------------------------------------------------
199
// partitionState is the offset-tracking state kept for one partition
// consumer. getState hands out copies of it.
type partitionState struct {
	// Info carries the tracked offset and its metadata.
	Info offsetInfo
	// Dirty is set by MarkOffset/ResetOffset and cleared by markCommitted.
	Dirty bool
	// LastCommit is not written anywhere in this part of the file.
	// NOTE(review): presumably maintained by the committing code — confirm.
	LastCommit time.Time
}
205
206// --------------------------------------------------------------------
207
// partitionMap is a registry of partition consumers keyed by
// topic/partition. Its methods take mu before touching data.
type partitionMap struct {
	data map[topicPartition]*partitionConsumer
	mu   sync.RWMutex
}
212
213func newPartitionMap() *partitionMap {
214 return &partitionMap{
215 data: make(map[topicPartition]*partitionConsumer),
216 }
217}
218
219func (m *partitionMap) IsSubscribedTo(topic string) bool {
220 m.mu.RLock()
221 defer m.mu.RUnlock()
222
223 for tp := range m.data {
224 if tp.Topic == topic {
225 return true
226 }
227 }
228 return false
229}
230
231func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer {
232 m.mu.RLock()
233 pc, _ := m.data[topicPartition{topic, partition}]
234 m.mu.RUnlock()
235 return pc
236}
237
238func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) {
239 m.mu.Lock()
240 m.data[topicPartition{topic, partition}] = pc
241 m.mu.Unlock()
242}
243
244func (m *partitionMap) Snapshot() map[topicPartition]partitionState {
245 m.mu.RLock()
246 defer m.mu.RUnlock()
247
248 snap := make(map[topicPartition]partitionState, len(m.data))
249 for tp, pc := range m.data {
250 snap[tp] = pc.getState()
251 }
252 return snap
253}
254
255func (m *partitionMap) Stop() {
256 m.mu.RLock()
257 defer m.mu.RUnlock()
258
259 var wg sync.WaitGroup
260 for tp := range m.data {
261 wg.Add(1)
262 go func(p *partitionConsumer) {
263 _ = p.Close()
264 wg.Done()
265 }(m.data[tp])
266 }
267 wg.Wait()
268}
269
270func (m *partitionMap) Clear() {
271 m.mu.Lock()
272 for tp := range m.data {
273 delete(m.data, tp)
274 }
275 m.mu.Unlock()
276}
277
278func (m *partitionMap) Info() map[string][]int32 {
279 info := make(map[string][]int32)
280 m.mu.RLock()
281 for tp := range m.data {
282 info[tp.Topic] = append(info[tp.Topic], tp.Partition)
283 }
284 m.mu.RUnlock()
285
286 for topic := range info {
287 sort.Sort(int32Slice(info[topic]))
288 }
289 return info
290}