blob: 984e5aaca88d4217d37f740979aa44abbdaaf256 [file] [log] [blame]
Pragya Arya324337e2020-02-20 14:35:08 +05301package sarama
2
3import (
4 "fmt"
5 "strings"
6)
7
// TestReporter is the subset of go's testing.T used by the mock response
// builders. Declaring it here avoids importing `testing` in the main part of
// the library.
type TestReporter interface {
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
}
16
17// MockResponse is a response builder interface it defines one method that
18// allows generating a response based on a request body. MockResponses are used
19// to program behavior of MockBroker in tests.
20type MockResponse interface {
21 For(reqBody versionedDecoder) (res encoder)
22}
23
24// MockWrapper is a mock response builder that returns a particular concrete
25// response regardless of the actual request passed to the `For` method.
26type MockWrapper struct {
27 res encoder
28}
29
30func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
31 return mw.res
32}
33
34func NewMockWrapper(res encoder) *MockWrapper {
35 return &MockWrapper{res: res}
36}
37
38// MockSequence is a mock response builder that is created from a sequence of
39// concrete responses. Every time when a `MockBroker` calls its `For` method
40// the next response from the sequence is returned. When the end of the
41// sequence is reached the last element from the sequence is returned.
42type MockSequence struct {
43 responses []MockResponse
44}
45
46func NewMockSequence(responses ...interface{}) *MockSequence {
47 ms := &MockSequence{}
48 ms.responses = make([]MockResponse, len(responses))
49 for i, res := range responses {
50 switch res := res.(type) {
51 case MockResponse:
52 ms.responses[i] = res
53 case encoder:
54 ms.responses[i] = NewMockWrapper(res)
55 default:
56 panic(fmt.Sprintf("Unexpected response type: %T", res))
57 }
58 }
59 return ms
60}
61
62func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
63 res = mc.responses[0].For(reqBody)
64 if len(mc.responses) > 1 {
65 mc.responses = mc.responses[1:]
66 }
67 return res
68}
69
70type MockListGroupsResponse struct {
71 groups map[string]string
72 t TestReporter
73}
74
75func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
76 return &MockListGroupsResponse{
77 groups: make(map[string]string),
78 t: t,
79 }
80}
81
82func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
83 request := reqBody.(*ListGroupsRequest)
84 _ = request
85 response := &ListGroupsResponse{
86 Groups: m.groups,
87 }
88 return response
89}
90
91func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
92 m.groups[groupID] = protocolType
93 return m
94}
95
96type MockDescribeGroupsResponse struct {
97 groups map[string]*GroupDescription
98 t TestReporter
99}
100
101func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
102 return &MockDescribeGroupsResponse{
103 t: t,
104 groups: make(map[string]*GroupDescription),
105 }
106}
107
108func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
109 m.groups[groupID] = description
110 return m
111}
112
113func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
114 request := reqBody.(*DescribeGroupsRequest)
115
116 response := &DescribeGroupsResponse{}
117 for _, requestedGroup := range request.Groups {
118 if group, ok := m.groups[requestedGroup]; ok {
119 response.Groups = append(response.Groups, group)
120 } else {
121 // Mimic real kafka - if a group doesn't exist, return
122 // an entry with state "Dead"
123 response.Groups = append(response.Groups, &GroupDescription{
124 GroupId: requestedGroup,
125 State: "Dead",
126 })
127 }
128 }
129
130 return response
131}
132
133// MockMetadataResponse is a `MetadataResponse` builder.
134type MockMetadataResponse struct {
135 controllerID int32
136 leaders map[string]map[int32]int32
137 brokers map[string]int32
138 t TestReporter
139}
140
141func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
142 return &MockMetadataResponse{
143 leaders: make(map[string]map[int32]int32),
144 brokers: make(map[string]int32),
145 t: t,
146 }
147}
148
149func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
150 partitions := mmr.leaders[topic]
151 if partitions == nil {
152 partitions = make(map[int32]int32)
153 mmr.leaders[topic] = partitions
154 }
155 partitions[partition] = brokerID
156 return mmr
157}
158
159func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
160 mmr.brokers[addr] = brokerID
161 return mmr
162}
163
164func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
165 mmr.controllerID = brokerID
166 return mmr
167}
168
169func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
170 metadataRequest := reqBody.(*MetadataRequest)
171 metadataResponse := &MetadataResponse{
172 Version: metadataRequest.version(),
173 ControllerID: mmr.controllerID,
174 }
175 for addr, brokerID := range mmr.brokers {
176 metadataResponse.AddBroker(addr, brokerID)
177 }
178
179 // Generate set of replicas
180 replicas := []int32{}
181 offlineReplicas := []int32{}
182 for _, brokerID := range mmr.brokers {
183 replicas = append(replicas, brokerID)
184 }
185
186 if len(metadataRequest.Topics) == 0 {
187 for topic, partitions := range mmr.leaders {
188 for partition, brokerID := range partitions {
189 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
190 }
191 }
192 return metadataResponse
193 }
194 for _, topic := range metadataRequest.Topics {
195 for partition, brokerID := range mmr.leaders[topic] {
196 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
197 }
198 }
199 return metadataResponse
200}
201
202// MockOffsetResponse is an `OffsetResponse` builder.
203type MockOffsetResponse struct {
204 offsets map[string]map[int32]map[int64]int64
205 t TestReporter
206 version int16
207}
208
209func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
210 return &MockOffsetResponse{
211 offsets: make(map[string]map[int32]map[int64]int64),
212 t: t,
213 }
214}
215
216func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
217 mor.version = version
218 return mor
219}
220
221func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
222 partitions := mor.offsets[topic]
223 if partitions == nil {
224 partitions = make(map[int32]map[int64]int64)
225 mor.offsets[topic] = partitions
226 }
227 times := partitions[partition]
228 if times == nil {
229 times = make(map[int64]int64)
230 partitions[partition] = times
231 }
232 times[time] = offset
233 return mor
234}
235
236func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
237 offsetRequest := reqBody.(*OffsetRequest)
238 offsetResponse := &OffsetResponse{Version: mor.version}
239 for topic, partitions := range offsetRequest.blocks {
240 for partition, block := range partitions {
241 offset := mor.getOffset(topic, partition, block.time)
242 offsetResponse.AddTopicPartition(topic, partition, offset)
243 }
244 }
245 return offsetResponse
246}
247
248func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
249 partitions := mor.offsets[topic]
250 if partitions == nil {
251 mor.t.Errorf("missing topic: %s", topic)
252 }
253 times := partitions[partition]
254 if times == nil {
255 mor.t.Errorf("missing partition: %d", partition)
256 }
257 offset, ok := times[time]
258 if !ok {
259 mor.t.Errorf("missing time: %d", time)
260 }
261 return offset
262}
263
264// MockFetchResponse is a `FetchResponse` builder.
265type MockFetchResponse struct {
266 messages map[string]map[int32]map[int64]Encoder
267 highWaterMarks map[string]map[int32]int64
268 t TestReporter
269 batchSize int
270 version int16
271}
272
273func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
274 return &MockFetchResponse{
275 messages: make(map[string]map[int32]map[int64]Encoder),
276 highWaterMarks: make(map[string]map[int32]int64),
277 t: t,
278 batchSize: batchSize,
279 }
280}
281
282func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
283 mfr.version = version
284 return mfr
285}
286
287func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
288 partitions := mfr.messages[topic]
289 if partitions == nil {
290 partitions = make(map[int32]map[int64]Encoder)
291 mfr.messages[topic] = partitions
292 }
293 messages := partitions[partition]
294 if messages == nil {
295 messages = make(map[int64]Encoder)
296 partitions[partition] = messages
297 }
298 messages[offset] = msg
299 return mfr
300}
301
302func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
303 partitions := mfr.highWaterMarks[topic]
304 if partitions == nil {
305 partitions = make(map[int32]int64)
306 mfr.highWaterMarks[topic] = partitions
307 }
308 partitions[partition] = offset
309 return mfr
310}
311
312func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
313 fetchRequest := reqBody.(*FetchRequest)
314 res := &FetchResponse{
315 Version: mfr.version,
316 }
317 for topic, partitions := range fetchRequest.blocks {
318 for partition, block := range partitions {
319 initialOffset := block.fetchOffset
320 offset := initialOffset
321 maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
322 for i := 0; i < mfr.batchSize && offset < maxOffset; {
323 msg := mfr.getMessage(topic, partition, offset)
324 if msg != nil {
325 res.AddMessage(topic, partition, nil, msg, offset)
326 i++
327 }
328 offset++
329 }
330 fb := res.GetBlock(topic, partition)
331 if fb == nil {
332 res.AddError(topic, partition, ErrNoError)
333 fb = res.GetBlock(topic, partition)
334 }
335 fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
336 }
337 }
338 return res
339}
340
341func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
342 partitions := mfr.messages[topic]
343 if partitions == nil {
344 return nil
345 }
346 messages := partitions[partition]
347 if messages == nil {
348 return nil
349 }
350 return messages[offset]
351}
352
353func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
354 partitions := mfr.messages[topic]
355 if partitions == nil {
356 return 0
357 }
358 messages := partitions[partition]
359 if messages == nil {
360 return 0
361 }
362 return len(messages)
363}
364
365func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
366 partitions := mfr.highWaterMarks[topic]
367 if partitions == nil {
368 return 0
369 }
370 return partitions[partition]
371}
372
373// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
374type MockConsumerMetadataResponse struct {
375 coordinators map[string]interface{}
376 t TestReporter
377}
378
379func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
380 return &MockConsumerMetadataResponse{
381 coordinators: make(map[string]interface{}),
382 t: t,
383 }
384}
385
386func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
387 mr.coordinators[group] = broker
388 return mr
389}
390
391func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
392 mr.coordinators[group] = kerror
393 return mr
394}
395
396func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
397 req := reqBody.(*ConsumerMetadataRequest)
398 group := req.ConsumerGroup
399 res := &ConsumerMetadataResponse{}
400 v := mr.coordinators[group]
401 switch v := v.(type) {
402 case *MockBroker:
403 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
404 case KError:
405 res.Err = v
406 }
407 return res
408}
409
410// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
411type MockFindCoordinatorResponse struct {
412 groupCoordinators map[string]interface{}
413 transCoordinators map[string]interface{}
414 t TestReporter
415}
416
417func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
418 return &MockFindCoordinatorResponse{
419 groupCoordinators: make(map[string]interface{}),
420 transCoordinators: make(map[string]interface{}),
421 t: t,
422 }
423}
424
425func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
426 switch coordinatorType {
427 case CoordinatorGroup:
428 mr.groupCoordinators[group] = broker
429 case CoordinatorTransaction:
430 mr.transCoordinators[group] = broker
431 }
432 return mr
433}
434
435func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
436 switch coordinatorType {
437 case CoordinatorGroup:
438 mr.groupCoordinators[group] = kerror
439 case CoordinatorTransaction:
440 mr.transCoordinators[group] = kerror
441 }
442 return mr
443}
444
445func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
446 req := reqBody.(*FindCoordinatorRequest)
447 res := &FindCoordinatorResponse{}
448 var v interface{}
449 switch req.CoordinatorType {
450 case CoordinatorGroup:
451 v = mr.groupCoordinators[req.CoordinatorKey]
452 case CoordinatorTransaction:
453 v = mr.transCoordinators[req.CoordinatorKey]
454 }
455 switch v := v.(type) {
456 case *MockBroker:
457 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
458 case KError:
459 res.Err = v
460 }
461 return res
462}
463
464// MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
465type MockOffsetCommitResponse struct {
466 errors map[string]map[string]map[int32]KError
467 t TestReporter
468}
469
470func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
471 return &MockOffsetCommitResponse{t: t}
472}
473
474func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
475 if mr.errors == nil {
476 mr.errors = make(map[string]map[string]map[int32]KError)
477 }
478 topics := mr.errors[group]
479 if topics == nil {
480 topics = make(map[string]map[int32]KError)
481 mr.errors[group] = topics
482 }
483 partitions := topics[topic]
484 if partitions == nil {
485 partitions = make(map[int32]KError)
486 topics[topic] = partitions
487 }
488 partitions[partition] = kerror
489 return mr
490}
491
492func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
493 req := reqBody.(*OffsetCommitRequest)
494 group := req.ConsumerGroup
495 res := &OffsetCommitResponse{}
496 for topic, partitions := range req.blocks {
497 for partition := range partitions {
498 res.AddError(topic, partition, mr.getError(group, topic, partition))
499 }
500 }
501 return res
502}
503
504func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
505 topics := mr.errors[group]
506 if topics == nil {
507 return ErrNoError
508 }
509 partitions := topics[topic]
510 if partitions == nil {
511 return ErrNoError
512 }
513 kerror, ok := partitions[partition]
514 if !ok {
515 return ErrNoError
516 }
517 return kerror
518}
519
520// MockProduceResponse is a `ProduceResponse` builder.
521type MockProduceResponse struct {
522 version int16
523 errors map[string]map[int32]KError
524 t TestReporter
525}
526
527func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
528 return &MockProduceResponse{t: t}
529}
530
531func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
532 mr.version = version
533 return mr
534}
535
536func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
537 if mr.errors == nil {
538 mr.errors = make(map[string]map[int32]KError)
539 }
540 partitions := mr.errors[topic]
541 if partitions == nil {
542 partitions = make(map[int32]KError)
543 mr.errors[topic] = partitions
544 }
545 partitions[partition] = kerror
546 return mr
547}
548
549func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
550 req := reqBody.(*ProduceRequest)
551 res := &ProduceResponse{
552 Version: mr.version,
553 }
554 for topic, partitions := range req.records {
555 for partition := range partitions {
556 res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
557 }
558 }
559 return res
560}
561
562func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
563 partitions := mr.errors[topic]
564 if partitions == nil {
565 return ErrNoError
566 }
567 kerror, ok := partitions[partition]
568 if !ok {
569 return ErrNoError
570 }
571 return kerror
572}
573
574// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
575type MockOffsetFetchResponse struct {
576 offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
577 error KError
578 t TestReporter
579}
580
581func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
582 return &MockOffsetFetchResponse{t: t}
583}
584
585func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
586 if mr.offsets == nil {
587 mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
588 }
589 topics := mr.offsets[group]
590 if topics == nil {
591 topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
592 mr.offsets[group] = topics
593 }
594 partitions := topics[topic]
595 if partitions == nil {
596 partitions = make(map[int32]*OffsetFetchResponseBlock)
597 topics[topic] = partitions
598 }
599 partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
600 return mr
601}
602
603func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
604 mr.error = kerror
605 return mr
606}
607
608func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
609 req := reqBody.(*OffsetFetchRequest)
610 group := req.ConsumerGroup
611 res := &OffsetFetchResponse{Version: req.Version}
612
613 for topic, partitions := range mr.offsets[group] {
614 for partition, block := range partitions {
615 res.AddBlock(topic, partition, block)
616 }
617 }
618
619 if res.Version >= 2 {
620 res.Err = mr.error
621 }
622 return res
623}
624
625type MockCreateTopicsResponse struct {
626 t TestReporter
627}
628
629func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
630 return &MockCreateTopicsResponse{t: t}
631}
632
633func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
634 req := reqBody.(*CreateTopicsRequest)
635 res := &CreateTopicsResponse{
636 Version: req.Version,
637 }
638 res.TopicErrors = make(map[string]*TopicError)
639
640 for topic := range req.TopicDetails {
641 if res.Version >= 1 && strings.HasPrefix(topic, "_") {
642 msg := "insufficient permissions to create topic with reserved prefix"
643 res.TopicErrors[topic] = &TopicError{
644 Err: ErrTopicAuthorizationFailed,
645 ErrMsg: &msg,
646 }
647 continue
648 }
649 res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
650 }
651 return res
652}
653
654type MockDeleteTopicsResponse struct {
655 t TestReporter
656}
657
658func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
659 return &MockDeleteTopicsResponse{t: t}
660}
661
662func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
663 req := reqBody.(*DeleteTopicsRequest)
664 res := &DeleteTopicsResponse{}
665 res.TopicErrorCodes = make(map[string]KError)
666
667 for _, topic := range req.Topics {
668 res.TopicErrorCodes[topic] = ErrNoError
669 }
670 res.Version = int16(req.Version)
671 return res
672}
673
674type MockCreatePartitionsResponse struct {
675 t TestReporter
676}
677
678func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
679 return &MockCreatePartitionsResponse{t: t}
680}
681
682func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
683 req := reqBody.(*CreatePartitionsRequest)
684 res := &CreatePartitionsResponse{}
685 res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
686
687 for topic := range req.TopicPartitions {
688 if strings.HasPrefix(topic, "_") {
689 msg := "insufficient permissions to create partition on topic with reserved prefix"
690 res.TopicPartitionErrors[topic] = &TopicPartitionError{
691 Err: ErrTopicAuthorizationFailed,
692 ErrMsg: &msg,
693 }
694 continue
695 }
696 res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
697 }
698 return res
699}
700
701type MockDeleteRecordsResponse struct {
702 t TestReporter
703}
704
705func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
706 return &MockDeleteRecordsResponse{t: t}
707}
708
709func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
710 req := reqBody.(*DeleteRecordsRequest)
711 res := &DeleteRecordsResponse{}
712 res.Topics = make(map[string]*DeleteRecordsResponseTopic)
713
714 for topic, deleteRecordRequestTopic := range req.Topics {
715 partitions := make(map[int32]*DeleteRecordsResponsePartition)
716 for partition := range deleteRecordRequestTopic.PartitionOffsets {
717 partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
718 }
719 res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
720 }
721 return res
722}
723
724type MockDescribeConfigsResponse struct {
725 t TestReporter
726}
727
728func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
729 return &MockDescribeConfigsResponse{t: t}
730}
731
732func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
733 req := reqBody.(*DescribeConfigsRequest)
734 res := &DescribeConfigsResponse{
735 Version: req.Version,
736 }
737
738 includeSynonyms := (req.Version > 0)
739
740 for _, r := range req.Resources {
741 var configEntries []*ConfigEntry
742 switch r.Type {
743 case BrokerResource:
744 configEntries = append(configEntries,
745 &ConfigEntry{
746 Name: "min.insync.replicas",
747 Value: "2",
748 ReadOnly: false,
749 Default: false,
750 },
751 )
752 res.Resources = append(res.Resources, &ResourceResponse{
753 Name: r.Name,
754 Configs: configEntries,
755 })
756 case BrokerLoggerResource:
757 configEntries = append(configEntries,
758 &ConfigEntry{
759 Name: "kafka.controller.KafkaController",
760 Value: "DEBUG",
761 ReadOnly: false,
762 Default: false,
763 },
764 )
765 res.Resources = append(res.Resources, &ResourceResponse{
766 Name: r.Name,
767 Configs: configEntries,
768 })
769 case TopicResource:
770 maxMessageBytes := &ConfigEntry{Name: "max.message.bytes",
771 Value: "1000000",
772 ReadOnly: false,
773 Default: true,
774 Sensitive: false,
775 }
776 if includeSynonyms {
777 maxMessageBytes.Synonyms = []*ConfigSynonym{
778 {
779 ConfigName: "max.message.bytes",
780 ConfigValue: "500000",
781 },
782 }
783 }
784 retentionMs := &ConfigEntry{Name: "retention.ms",
785 Value: "5000",
786 ReadOnly: false,
787 Default: false,
788 Sensitive: false,
789 }
790 if includeSynonyms {
791 retentionMs.Synonyms = []*ConfigSynonym{
792 {
793 ConfigName: "log.retention.ms",
794 ConfigValue: "2500",
795 },
796 }
797 }
798 password := &ConfigEntry{Name: "password",
799 Value: "12345",
800 ReadOnly: false,
801 Default: false,
802 Sensitive: true,
803 }
804 configEntries = append(
805 configEntries, maxMessageBytes, retentionMs, password)
806 res.Resources = append(res.Resources, &ResourceResponse{
807 Name: r.Name,
808 Configs: configEntries,
809 })
810 }
811 }
812 return res
813}
814
815type MockAlterConfigsResponse struct {
816 t TestReporter
817}
818
819func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
820 return &MockAlterConfigsResponse{t: t}
821}
822
823func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
824 req := reqBody.(*AlterConfigsRequest)
825 res := &AlterConfigsResponse{}
826
827 for _, r := range req.Resources {
828 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
829 Type: r.Type,
830 ErrorMsg: "",
831 })
832 }
833 return res
834}
835
836type MockCreateAclsResponse struct {
837 t TestReporter
838}
839
840func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
841 return &MockCreateAclsResponse{t: t}
842}
843
844func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
845 req := reqBody.(*CreateAclsRequest)
846 res := &CreateAclsResponse{}
847
848 for range req.AclCreations {
849 res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
850 }
851 return res
852}
853
854type MockListAclsResponse struct {
855 t TestReporter
856}
857
858func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
859 return &MockListAclsResponse{t: t}
860}
861
862func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
863 req := reqBody.(*DescribeAclsRequest)
864 res := &DescribeAclsResponse{}
865 res.Err = ErrNoError
866 acl := &ResourceAcls{}
867 if req.ResourceName != nil {
868 acl.Resource.ResourceName = *req.ResourceName
869 }
870 acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
871 acl.Resource.ResourceType = req.ResourceType
872
873 host := "*"
874 if req.Host != nil {
875 host = *req.Host
876 }
877
878 principal := "User:test"
879 if req.Principal != nil {
880 principal = *req.Principal
881 }
882
883 permissionType := req.PermissionType
884 if permissionType == AclPermissionAny {
885 permissionType = AclPermissionAllow
886 }
887
888 acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
889 res.ResourceAcls = append(res.ResourceAcls, acl)
890 res.Version = int16(req.Version)
891 return res
892}
893
894type MockSaslAuthenticateResponse struct {
895 t TestReporter
896 kerror KError
897 saslAuthBytes []byte
898}
899
900func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
901 return &MockSaslAuthenticateResponse{t: t}
902}
903
904func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
905 res := &SaslAuthenticateResponse{}
906 res.Err = msar.kerror
907 res.SaslAuthBytes = msar.saslAuthBytes
908 return res
909}
910
911func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
912 msar.kerror = kerror
913 return msar
914}
915
916func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
917 msar.saslAuthBytes = saslAuthBytes
918 return msar
919}
920
921type MockDeleteAclsResponse struct {
922 t TestReporter
923}
924
925type MockSaslHandshakeResponse struct {
926 enabledMechanisms []string
927 kerror KError
928 t TestReporter
929}
930
931func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
932 return &MockSaslHandshakeResponse{t: t}
933}
934
935func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
936 res := &SaslHandshakeResponse{}
937 res.Err = mshr.kerror
938 res.EnabledMechanisms = mshr.enabledMechanisms
939 return res
940}
941
942func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
943 mshr.kerror = kerror
944 return mshr
945}
946
947func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
948 mshr.enabledMechanisms = enabledMechanisms
949 return mshr
950}
951
952func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
953 return &MockDeleteAclsResponse{t: t}
954}
955
956func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
957 req := reqBody.(*DeleteAclsRequest)
958 res := &DeleteAclsResponse{}
959
960 for range req.Filters {
961 response := &FilterResponse{Err: ErrNoError}
962 response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
963 res.FilterResponses = append(res.FilterResponses, response)
964 }
965 res.Version = int16(req.Version)
966 return res
967}
968
// MockDeleteGroupsResponse is a `DeleteGroupsResponse` builder.
type MockDeleteGroupsResponse struct {
	// deletedGroups lists the groups reported as deleted (see
	// SetDeletedGroups).
	deletedGroups []string
}
972
973func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
974 return &MockDeleteGroupsResponse{}
975}
976
977func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
978 m.deletedGroups = groups
979 return m
980}
981
982func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
983 resp := &DeleteGroupsResponse{
984 GroupErrorCodes: map[string]KError{},
985 }
986 for _, group := range m.deletedGroups {
987 resp.GroupErrorCodes[group] = ErrNoError
988 }
989 return resp
990}