blob: fe55200c63f73c68813a2255dc662022c9996235 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "fmt"
5)
6
// TestReporter lists the methods of go's testing.T that the mock response
// builders rely on. Declaring the interface here avoids importing `testing`
// in the main part of the library.
type TestReporter interface {
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
}
15
16// MockResponse is a response builder interface it defines one method that
17// allows generating a response based on a request body. MockResponses are used
18// to program behavior of MockBroker in tests.
19type MockResponse interface {
20 For(reqBody versionedDecoder) (res encoder)
21}
22
23// MockWrapper is a mock response builder that returns a particular concrete
24// response regardless of the actual request passed to the `For` method.
25type MockWrapper struct {
26 res encoder
27}
28
29func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
30 return mw.res
31}
32
33func NewMockWrapper(res encoder) *MockWrapper {
34 return &MockWrapper{res: res}
35}
36
37// MockSequence is a mock response builder that is created from a sequence of
38// concrete responses. Every time when a `MockBroker` calls its `For` method
39// the next response from the sequence is returned. When the end of the
40// sequence is reached the last element from the sequence is returned.
41type MockSequence struct {
42 responses []MockResponse
43}
44
45func NewMockSequence(responses ...interface{}) *MockSequence {
46 ms := &MockSequence{}
47 ms.responses = make([]MockResponse, len(responses))
48 for i, res := range responses {
49 switch res := res.(type) {
50 case MockResponse:
51 ms.responses[i] = res
52 case encoder:
53 ms.responses[i] = NewMockWrapper(res)
54 default:
55 panic(fmt.Sprintf("Unexpected response type: %T", res))
56 }
57 }
58 return ms
59}
60
61func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
62 res = mc.responses[0].For(reqBody)
63 if len(mc.responses) > 1 {
64 mc.responses = mc.responses[1:]
65 }
66 return res
67}
68
69// MockMetadataResponse is a `MetadataResponse` builder.
70type MockMetadataResponse struct {
71 controllerID int32
72 leaders map[string]map[int32]int32
73 brokers map[string]int32
74 t TestReporter
75}
76
77func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
78 return &MockMetadataResponse{
79 leaders: make(map[string]map[int32]int32),
80 brokers: make(map[string]int32),
81 t: t,
82 }
83}
84
85func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
86 partitions := mmr.leaders[topic]
87 if partitions == nil {
88 partitions = make(map[int32]int32)
89 mmr.leaders[topic] = partitions
90 }
91 partitions[partition] = brokerID
92 return mmr
93}
94
95func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
96 mmr.brokers[addr] = brokerID
97 return mmr
98}
99
100func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
101 mmr.controllerID = brokerID
102 return mmr
103}
104
105func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
106 metadataRequest := reqBody.(*MetadataRequest)
107 metadataResponse := &MetadataResponse{
108 Version: metadataRequest.version(),
109 ControllerID: mmr.controllerID,
110 }
111 for addr, brokerID := range mmr.brokers {
112 metadataResponse.AddBroker(addr, brokerID)
113 }
114 if len(metadataRequest.Topics) == 0 {
115 for topic, partitions := range mmr.leaders {
116 for partition, brokerID := range partitions {
117 metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
118 }
119 }
120 return metadataResponse
121 }
122 for _, topic := range metadataRequest.Topics {
123 for partition, brokerID := range mmr.leaders[topic] {
124 metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
125 }
126 }
127 return metadataResponse
128}
129
130// MockOffsetResponse is an `OffsetResponse` builder.
131type MockOffsetResponse struct {
132 offsets map[string]map[int32]map[int64]int64
133 t TestReporter
134 version int16
135}
136
137func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
138 return &MockOffsetResponse{
139 offsets: make(map[string]map[int32]map[int64]int64),
140 t: t,
141 }
142}
143
144func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
145 mor.version = version
146 return mor
147}
148
149func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
150 partitions := mor.offsets[topic]
151 if partitions == nil {
152 partitions = make(map[int32]map[int64]int64)
153 mor.offsets[topic] = partitions
154 }
155 times := partitions[partition]
156 if times == nil {
157 times = make(map[int64]int64)
158 partitions[partition] = times
159 }
160 times[time] = offset
161 return mor
162}
163
164func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
165 offsetRequest := reqBody.(*OffsetRequest)
166 offsetResponse := &OffsetResponse{Version: mor.version}
167 for topic, partitions := range offsetRequest.blocks {
168 for partition, block := range partitions {
169 offset := mor.getOffset(topic, partition, block.time)
170 offsetResponse.AddTopicPartition(topic, partition, offset)
171 }
172 }
173 return offsetResponse
174}
175
176func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
177 partitions := mor.offsets[topic]
178 if partitions == nil {
179 mor.t.Errorf("missing topic: %s", topic)
180 }
181 times := partitions[partition]
182 if times == nil {
183 mor.t.Errorf("missing partition: %d", partition)
184 }
185 offset, ok := times[time]
186 if !ok {
187 mor.t.Errorf("missing time: %d", time)
188 }
189 return offset
190}
191
192// MockFetchResponse is a `FetchResponse` builder.
193type MockFetchResponse struct {
194 messages map[string]map[int32]map[int64]Encoder
195 highWaterMarks map[string]map[int32]int64
196 t TestReporter
197 batchSize int
198 version int16
199}
200
201func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
202 return &MockFetchResponse{
203 messages: make(map[string]map[int32]map[int64]Encoder),
204 highWaterMarks: make(map[string]map[int32]int64),
205 t: t,
206 batchSize: batchSize,
207 }
208}
209
210func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
211 mfr.version = version
212 return mfr
213}
214
215func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
216 partitions := mfr.messages[topic]
217 if partitions == nil {
218 partitions = make(map[int32]map[int64]Encoder)
219 mfr.messages[topic] = partitions
220 }
221 messages := partitions[partition]
222 if messages == nil {
223 messages = make(map[int64]Encoder)
224 partitions[partition] = messages
225 }
226 messages[offset] = msg
227 return mfr
228}
229
230func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
231 partitions := mfr.highWaterMarks[topic]
232 if partitions == nil {
233 partitions = make(map[int32]int64)
234 mfr.highWaterMarks[topic] = partitions
235 }
236 partitions[partition] = offset
237 return mfr
238}
239
240func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
241 fetchRequest := reqBody.(*FetchRequest)
242 res := &FetchResponse{
243 Version: mfr.version,
244 }
245 for topic, partitions := range fetchRequest.blocks {
246 for partition, block := range partitions {
247 initialOffset := block.fetchOffset
248 offset := initialOffset
249 maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
250 for i := 0; i < mfr.batchSize && offset < maxOffset; {
251 msg := mfr.getMessage(topic, partition, offset)
252 if msg != nil {
253 res.AddMessage(topic, partition, nil, msg, offset)
254 i++
255 }
256 offset++
257 }
258 fb := res.GetBlock(topic, partition)
259 if fb == nil {
260 res.AddError(topic, partition, ErrNoError)
261 fb = res.GetBlock(topic, partition)
262 }
263 fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
264 }
265 }
266 return res
267}
268
269func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
270 partitions := mfr.messages[topic]
271 if partitions == nil {
272 return nil
273 }
274 messages := partitions[partition]
275 if messages == nil {
276 return nil
277 }
278 return messages[offset]
279}
280
281func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
282 partitions := mfr.messages[topic]
283 if partitions == nil {
284 return 0
285 }
286 messages := partitions[partition]
287 if messages == nil {
288 return 0
289 }
290 return len(messages)
291}
292
293func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
294 partitions := mfr.highWaterMarks[topic]
295 if partitions == nil {
296 return 0
297 }
298 return partitions[partition]
299}
300
301// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
302type MockConsumerMetadataResponse struct {
303 coordinators map[string]interface{}
304 t TestReporter
305}
306
307func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
308 return &MockConsumerMetadataResponse{
309 coordinators: make(map[string]interface{}),
310 t: t,
311 }
312}
313
314func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
315 mr.coordinators[group] = broker
316 return mr
317}
318
319func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
320 mr.coordinators[group] = kerror
321 return mr
322}
323
324func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
325 req := reqBody.(*ConsumerMetadataRequest)
326 group := req.ConsumerGroup
327 res := &ConsumerMetadataResponse{}
328 v := mr.coordinators[group]
329 switch v := v.(type) {
330 case *MockBroker:
331 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
332 case KError:
333 res.Err = v
334 }
335 return res
336}
337
338// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
339type MockFindCoordinatorResponse struct {
340 groupCoordinators map[string]interface{}
341 transCoordinators map[string]interface{}
342 t TestReporter
343}
344
345func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
346 return &MockFindCoordinatorResponse{
347 groupCoordinators: make(map[string]interface{}),
348 transCoordinators: make(map[string]interface{}),
349 t: t,
350 }
351}
352
353func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
354 switch coordinatorType {
355 case CoordinatorGroup:
356 mr.groupCoordinators[group] = broker
357 case CoordinatorTransaction:
358 mr.transCoordinators[group] = broker
359 }
360 return mr
361}
362
363func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
364 switch coordinatorType {
365 case CoordinatorGroup:
366 mr.groupCoordinators[group] = kerror
367 case CoordinatorTransaction:
368 mr.transCoordinators[group] = kerror
369 }
370 return mr
371}
372
373func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
374 req := reqBody.(*FindCoordinatorRequest)
375 res := &FindCoordinatorResponse{}
376 var v interface{}
377 switch req.CoordinatorType {
378 case CoordinatorGroup:
379 v = mr.groupCoordinators[req.CoordinatorKey]
380 case CoordinatorTransaction:
381 v = mr.transCoordinators[req.CoordinatorKey]
382 }
383 switch v := v.(type) {
384 case *MockBroker:
385 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
386 case KError:
387 res.Err = v
388 }
389 return res
390}
391
392// MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
393type MockOffsetCommitResponse struct {
394 errors map[string]map[string]map[int32]KError
395 t TestReporter
396}
397
398func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
399 return &MockOffsetCommitResponse{t: t}
400}
401
402func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
403 if mr.errors == nil {
404 mr.errors = make(map[string]map[string]map[int32]KError)
405 }
406 topics := mr.errors[group]
407 if topics == nil {
408 topics = make(map[string]map[int32]KError)
409 mr.errors[group] = topics
410 }
411 partitions := topics[topic]
412 if partitions == nil {
413 partitions = make(map[int32]KError)
414 topics[topic] = partitions
415 }
416 partitions[partition] = kerror
417 return mr
418}
419
420func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
421 req := reqBody.(*OffsetCommitRequest)
422 group := req.ConsumerGroup
423 res := &OffsetCommitResponse{}
424 for topic, partitions := range req.blocks {
425 for partition := range partitions {
426 res.AddError(topic, partition, mr.getError(group, topic, partition))
427 }
428 }
429 return res
430}
431
432func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
433 topics := mr.errors[group]
434 if topics == nil {
435 return ErrNoError
436 }
437 partitions := topics[topic]
438 if partitions == nil {
439 return ErrNoError
440 }
441 kerror, ok := partitions[partition]
442 if !ok {
443 return ErrNoError
444 }
445 return kerror
446}
447
448// MockProduceResponse is a `ProduceResponse` builder.
449type MockProduceResponse struct {
450 version int16
451 errors map[string]map[int32]KError
452 t TestReporter
453}
454
455func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
456 return &MockProduceResponse{t: t}
457}
458
459func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
460 mr.version = version
461 return mr
462}
463
464func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
465 if mr.errors == nil {
466 mr.errors = make(map[string]map[int32]KError)
467 }
468 partitions := mr.errors[topic]
469 if partitions == nil {
470 partitions = make(map[int32]KError)
471 mr.errors[topic] = partitions
472 }
473 partitions[partition] = kerror
474 return mr
475}
476
477func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
478 req := reqBody.(*ProduceRequest)
479 res := &ProduceResponse{
480 Version: mr.version,
481 }
482 for topic, partitions := range req.records {
483 for partition := range partitions {
484 res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
485 }
486 }
487 return res
488}
489
490func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
491 partitions := mr.errors[topic]
492 if partitions == nil {
493 return ErrNoError
494 }
495 kerror, ok := partitions[partition]
496 if !ok {
497 return ErrNoError
498 }
499 return kerror
500}
501
502// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
503type MockOffsetFetchResponse struct {
504 offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
505 t TestReporter
506}
507
508func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
509 return &MockOffsetFetchResponse{t: t}
510}
511
512func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
513 if mr.offsets == nil {
514 mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
515 }
516 topics := mr.offsets[group]
517 if topics == nil {
518 topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
519 mr.offsets[group] = topics
520 }
521 partitions := topics[topic]
522 if partitions == nil {
523 partitions = make(map[int32]*OffsetFetchResponseBlock)
524 topics[topic] = partitions
525 }
526 partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
527 return mr
528}
529
530func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
531 req := reqBody.(*OffsetFetchRequest)
532 group := req.ConsumerGroup
533 res := &OffsetFetchResponse{}
534 for topic, partitions := range mr.offsets[group] {
535 for partition, block := range partitions {
536 res.AddBlock(topic, partition, block)
537 }
538 }
539 return res
540}
541
542type MockCreateTopicsResponse struct {
543 t TestReporter
544}
545
546func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
547 return &MockCreateTopicsResponse{t: t}
548}
549
550func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
551 req := reqBody.(*CreateTopicsRequest)
552 res := &CreateTopicsResponse{}
553 res.TopicErrors = make(map[string]*TopicError)
554
555 for topic, _ := range req.TopicDetails {
556 res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
557 }
558 return res
559}
560
561type MockDeleteTopicsResponse struct {
562 t TestReporter
563}
564
565func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
566 return &MockDeleteTopicsResponse{t: t}
567}
568
569func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
570 req := reqBody.(*DeleteTopicsRequest)
571 res := &DeleteTopicsResponse{}
572 res.TopicErrorCodes = make(map[string]KError)
573
574 for _, topic := range req.Topics {
575 res.TopicErrorCodes[topic] = ErrNoError
576 }
577 return res
578}
579
580type MockCreatePartitionsResponse struct {
581 t TestReporter
582}
583
584func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
585 return &MockCreatePartitionsResponse{t: t}
586}
587
588func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
589 req := reqBody.(*CreatePartitionsRequest)
590 res := &CreatePartitionsResponse{}
591 res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
592
593 for topic, _ := range req.TopicPartitions {
594 res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
595 }
596 return res
597}
598
599type MockDeleteRecordsResponse struct {
600 t TestReporter
601}
602
603func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
604 return &MockDeleteRecordsResponse{t: t}
605}
606
607func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
608 req := reqBody.(*DeleteRecordsRequest)
609 res := &DeleteRecordsResponse{}
610 res.Topics = make(map[string]*DeleteRecordsResponseTopic)
611
612 for topic, deleteRecordRequestTopic := range req.Topics {
613 partitions := make(map[int32]*DeleteRecordsResponsePartition)
614 for partition, _ := range deleteRecordRequestTopic.PartitionOffsets {
615 partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
616 }
617 res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
618 }
619 return res
620}
621
622type MockDescribeConfigsResponse struct {
623 t TestReporter
624}
625
626func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
627 return &MockDescribeConfigsResponse{t: t}
628}
629
630func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
631 req := reqBody.(*DescribeConfigsRequest)
632 res := &DescribeConfigsResponse{}
633
634 var configEntries []*ConfigEntry
635 configEntries = append(configEntries, &ConfigEntry{Name: "my_topic",
636 Value: "my_topic",
637 ReadOnly: true,
638 Default: true,
639 Sensitive: false,
640 })
641
642 for _, r := range req.Resources {
643 res.Resources = append(res.Resources, &ResourceResponse{Name: r.Name, Configs: configEntries})
644 }
645 return res
646}
647
648type MockAlterConfigsResponse struct {
649 t TestReporter
650}
651
652func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
653 return &MockAlterConfigsResponse{t: t}
654}
655
656func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
657 req := reqBody.(*AlterConfigsRequest)
658 res := &AlterConfigsResponse{}
659
660 for _, r := range req.Resources {
661 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
662 Type: TopicResource,
663 ErrorMsg: "",
664 })
665 }
666 return res
667}
668
669type MockCreateAclsResponse struct {
670 t TestReporter
671}
672
673func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
674 return &MockCreateAclsResponse{t: t}
675}
676
677func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
678 req := reqBody.(*CreateAclsRequest)
679 res := &CreateAclsResponse{}
680
681 for range req.AclCreations {
682 res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
683 }
684 return res
685}
686
687type MockListAclsResponse struct {
688 t TestReporter
689}
690
691func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
692 return &MockListAclsResponse{t: t}
693}
694
695func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
696 req := reqBody.(*DescribeAclsRequest)
697 res := &DescribeAclsResponse{}
698
699 res.Err = ErrNoError
700 acl := &ResourceAcls{}
701 acl.Resource.ResourceName = *req.ResourceName
702 acl.Resource.ResourceType = req.ResourceType
703 acl.Acls = append(acl.Acls, &Acl{})
704 res.ResourceAcls = append(res.ResourceAcls, acl)
705
706 return res
707}
708
709type MockDeleteAclsResponse struct {
710 t TestReporter
711}
712
713func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
714 return &MockDeleteAclsResponse{t: t}
715}
716
717func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
718 req := reqBody.(*DeleteAclsRequest)
719 res := &DeleteAclsResponse{}
720
721 for range req.Filters {
722 response := &FilterResponse{Err: ErrNoError}
723 response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
724 res.FilterResponses = append(res.FilterResponses, response)
725 }
726 return res
727}