blob: 348c22311920e0abe81686940d803bff073eb6e2 [file] [log] [blame]
Scott Bakereee8dd82019-09-24 12:52:34 -07001package sarama
2
3import (
4 "fmt"
5)
6
// TestReporter lists the reporting methods the mock builders need. The
// method set matches go's testing.T, so a *testing.T can be passed directly
// while the main part of the library avoids importing `testing`.
type TestReporter interface {
	// Error matches the signature of testing.T.Error.
	Error(args ...interface{})
	// Errorf matches the signature of testing.T.Errorf.
	Errorf(format string, args ...interface{})
	// Fatal matches the signature of testing.T.Fatal.
	Fatal(args ...interface{})
	// Fatalf matches the signature of testing.T.Fatalf.
	Fatalf(format string, args ...interface{})
}
15
16// MockResponse is a response builder interface it defines one method that
17// allows generating a response based on a request body. MockResponses are used
18// to program behavior of MockBroker in tests.
19type MockResponse interface {
20 For(reqBody versionedDecoder) (res encoder)
21}
22
23// MockWrapper is a mock response builder that returns a particular concrete
24// response regardless of the actual request passed to the `For` method.
25type MockWrapper struct {
26 res encoder
27}
28
29func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
30 return mw.res
31}
32
33func NewMockWrapper(res encoder) *MockWrapper {
34 return &MockWrapper{res: res}
35}
36
37// MockSequence is a mock response builder that is created from a sequence of
38// concrete responses. Every time when a `MockBroker` calls its `For` method
39// the next response from the sequence is returned. When the end of the
40// sequence is reached the last element from the sequence is returned.
41type MockSequence struct {
42 responses []MockResponse
43}
44
45func NewMockSequence(responses ...interface{}) *MockSequence {
46 ms := &MockSequence{}
47 ms.responses = make([]MockResponse, len(responses))
48 for i, res := range responses {
49 switch res := res.(type) {
50 case MockResponse:
51 ms.responses[i] = res
52 case encoder:
53 ms.responses[i] = NewMockWrapper(res)
54 default:
55 panic(fmt.Sprintf("Unexpected response type: %T", res))
56 }
57 }
58 return ms
59}
60
61func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
62 res = mc.responses[0].For(reqBody)
63 if len(mc.responses) > 1 {
64 mc.responses = mc.responses[1:]
65 }
66 return res
67}
68
69type MockListGroupsResponse struct {
70 groups map[string]string
71 t TestReporter
72}
73
74func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
75 return &MockListGroupsResponse{
76 groups: make(map[string]string),
77 t: t,
78 }
79}
80
81func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
82 request := reqBody.(*ListGroupsRequest)
83 _ = request
84 response := &ListGroupsResponse{
85 Groups: m.groups,
86 }
87 return response
88}
89
90func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
91 m.groups[groupID] = protocolType
92 return m
93}
94
95type MockDescribeGroupsResponse struct {
96 groups map[string]*GroupDescription
97 t TestReporter
98}
99
100func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
101 return &MockDescribeGroupsResponse{
102 t: t,
103 groups: make(map[string]*GroupDescription),
104 }
105}
106
107func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
108 m.groups[groupID] = description
109 return m
110}
111
112func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
113 request := reqBody.(*DescribeGroupsRequest)
114
115 response := &DescribeGroupsResponse{}
116 for _, requestedGroup := range request.Groups {
117 if group, ok := m.groups[requestedGroup]; ok {
118 response.Groups = append(response.Groups, group)
119 } else {
120 // Mimic real kafka - if a group doesn't exist, return
121 // an entry with state "Dead"
122 response.Groups = append(response.Groups, &GroupDescription{
123 GroupId: requestedGroup,
124 State: "Dead",
125 })
126 }
127 }
128
129 return response
130}
131
132// MockMetadataResponse is a `MetadataResponse` builder.
133type MockMetadataResponse struct {
134 controllerID int32
135 leaders map[string]map[int32]int32
136 brokers map[string]int32
137 t TestReporter
138}
139
140func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
141 return &MockMetadataResponse{
142 leaders: make(map[string]map[int32]int32),
143 brokers: make(map[string]int32),
144 t: t,
145 }
146}
147
148func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
149 partitions := mmr.leaders[topic]
150 if partitions == nil {
151 partitions = make(map[int32]int32)
152 mmr.leaders[topic] = partitions
153 }
154 partitions[partition] = brokerID
155 return mmr
156}
157
158func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
159 mmr.brokers[addr] = brokerID
160 return mmr
161}
162
163func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
164 mmr.controllerID = brokerID
165 return mmr
166}
167
168func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
169 metadataRequest := reqBody.(*MetadataRequest)
170 metadataResponse := &MetadataResponse{
171 Version: metadataRequest.version(),
172 ControllerID: mmr.controllerID,
173 }
174 for addr, brokerID := range mmr.brokers {
175 metadataResponse.AddBroker(addr, brokerID)
176 }
177
178 // Generate set of replicas
179 replicas := []int32{}
180
181 for _, brokerID := range mmr.brokers {
182 replicas = append(replicas, brokerID)
183 }
184
185 if len(metadataRequest.Topics) == 0 {
186 for topic, partitions := range mmr.leaders {
187 for partition, brokerID := range partitions {
188 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
189 }
190 }
191 return metadataResponse
192 }
193 for _, topic := range metadataRequest.Topics {
194 for partition, brokerID := range mmr.leaders[topic] {
195 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
196 }
197 }
198 return metadataResponse
199}
200
201// MockOffsetResponse is an `OffsetResponse` builder.
202type MockOffsetResponse struct {
203 offsets map[string]map[int32]map[int64]int64
204 t TestReporter
205 version int16
206}
207
208func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
209 return &MockOffsetResponse{
210 offsets: make(map[string]map[int32]map[int64]int64),
211 t: t,
212 }
213}
214
215func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
216 mor.version = version
217 return mor
218}
219
220func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
221 partitions := mor.offsets[topic]
222 if partitions == nil {
223 partitions = make(map[int32]map[int64]int64)
224 mor.offsets[topic] = partitions
225 }
226 times := partitions[partition]
227 if times == nil {
228 times = make(map[int64]int64)
229 partitions[partition] = times
230 }
231 times[time] = offset
232 return mor
233}
234
235func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
236 offsetRequest := reqBody.(*OffsetRequest)
237 offsetResponse := &OffsetResponse{Version: mor.version}
238 for topic, partitions := range offsetRequest.blocks {
239 for partition, block := range partitions {
240 offset := mor.getOffset(topic, partition, block.time)
241 offsetResponse.AddTopicPartition(topic, partition, offset)
242 }
243 }
244 return offsetResponse
245}
246
247func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
248 partitions := mor.offsets[topic]
249 if partitions == nil {
250 mor.t.Errorf("missing topic: %s", topic)
251 }
252 times := partitions[partition]
253 if times == nil {
254 mor.t.Errorf("missing partition: %d", partition)
255 }
256 offset, ok := times[time]
257 if !ok {
258 mor.t.Errorf("missing time: %d", time)
259 }
260 return offset
261}
262
263// MockFetchResponse is a `FetchResponse` builder.
264type MockFetchResponse struct {
265 messages map[string]map[int32]map[int64]Encoder
266 highWaterMarks map[string]map[int32]int64
267 t TestReporter
268 batchSize int
269 version int16
270}
271
272func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
273 return &MockFetchResponse{
274 messages: make(map[string]map[int32]map[int64]Encoder),
275 highWaterMarks: make(map[string]map[int32]int64),
276 t: t,
277 batchSize: batchSize,
278 }
279}
280
281func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
282 mfr.version = version
283 return mfr
284}
285
286func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
287 partitions := mfr.messages[topic]
288 if partitions == nil {
289 partitions = make(map[int32]map[int64]Encoder)
290 mfr.messages[topic] = partitions
291 }
292 messages := partitions[partition]
293 if messages == nil {
294 messages = make(map[int64]Encoder)
295 partitions[partition] = messages
296 }
297 messages[offset] = msg
298 return mfr
299}
300
301func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
302 partitions := mfr.highWaterMarks[topic]
303 if partitions == nil {
304 partitions = make(map[int32]int64)
305 mfr.highWaterMarks[topic] = partitions
306 }
307 partitions[partition] = offset
308 return mfr
309}
310
311func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
312 fetchRequest := reqBody.(*FetchRequest)
313 res := &FetchResponse{
314 Version: mfr.version,
315 }
316 for topic, partitions := range fetchRequest.blocks {
317 for partition, block := range partitions {
318 initialOffset := block.fetchOffset
319 offset := initialOffset
320 maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
321 for i := 0; i < mfr.batchSize && offset < maxOffset; {
322 msg := mfr.getMessage(topic, partition, offset)
323 if msg != nil {
324 res.AddMessage(topic, partition, nil, msg, offset)
325 i++
326 }
327 offset++
328 }
329 fb := res.GetBlock(topic, partition)
330 if fb == nil {
331 res.AddError(topic, partition, ErrNoError)
332 fb = res.GetBlock(topic, partition)
333 }
334 fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
335 }
336 }
337 return res
338}
339
340func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
341 partitions := mfr.messages[topic]
342 if partitions == nil {
343 return nil
344 }
345 messages := partitions[partition]
346 if messages == nil {
347 return nil
348 }
349 return messages[offset]
350}
351
352func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
353 partitions := mfr.messages[topic]
354 if partitions == nil {
355 return 0
356 }
357 messages := partitions[partition]
358 if messages == nil {
359 return 0
360 }
361 return len(messages)
362}
363
364func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
365 partitions := mfr.highWaterMarks[topic]
366 if partitions == nil {
367 return 0
368 }
369 return partitions[partition]
370}
371
372// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
373type MockConsumerMetadataResponse struct {
374 coordinators map[string]interface{}
375 t TestReporter
376}
377
378func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
379 return &MockConsumerMetadataResponse{
380 coordinators: make(map[string]interface{}),
381 t: t,
382 }
383}
384
385func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
386 mr.coordinators[group] = broker
387 return mr
388}
389
390func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
391 mr.coordinators[group] = kerror
392 return mr
393}
394
395func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
396 req := reqBody.(*ConsumerMetadataRequest)
397 group := req.ConsumerGroup
398 res := &ConsumerMetadataResponse{}
399 v := mr.coordinators[group]
400 switch v := v.(type) {
401 case *MockBroker:
402 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
403 case KError:
404 res.Err = v
405 }
406 return res
407}
408
409// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
410type MockFindCoordinatorResponse struct {
411 groupCoordinators map[string]interface{}
412 transCoordinators map[string]interface{}
413 t TestReporter
414}
415
416func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
417 return &MockFindCoordinatorResponse{
418 groupCoordinators: make(map[string]interface{}),
419 transCoordinators: make(map[string]interface{}),
420 t: t,
421 }
422}
423
424func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
425 switch coordinatorType {
426 case CoordinatorGroup:
427 mr.groupCoordinators[group] = broker
428 case CoordinatorTransaction:
429 mr.transCoordinators[group] = broker
430 }
431 return mr
432}
433
434func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
435 switch coordinatorType {
436 case CoordinatorGroup:
437 mr.groupCoordinators[group] = kerror
438 case CoordinatorTransaction:
439 mr.transCoordinators[group] = kerror
440 }
441 return mr
442}
443
444func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
445 req := reqBody.(*FindCoordinatorRequest)
446 res := &FindCoordinatorResponse{}
447 var v interface{}
448 switch req.CoordinatorType {
449 case CoordinatorGroup:
450 v = mr.groupCoordinators[req.CoordinatorKey]
451 case CoordinatorTransaction:
452 v = mr.transCoordinators[req.CoordinatorKey]
453 }
454 switch v := v.(type) {
455 case *MockBroker:
456 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
457 case KError:
458 res.Err = v
459 }
460 return res
461}
462
463// MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
464type MockOffsetCommitResponse struct {
465 errors map[string]map[string]map[int32]KError
466 t TestReporter
467}
468
469func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
470 return &MockOffsetCommitResponse{t: t}
471}
472
473func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
474 if mr.errors == nil {
475 mr.errors = make(map[string]map[string]map[int32]KError)
476 }
477 topics := mr.errors[group]
478 if topics == nil {
479 topics = make(map[string]map[int32]KError)
480 mr.errors[group] = topics
481 }
482 partitions := topics[topic]
483 if partitions == nil {
484 partitions = make(map[int32]KError)
485 topics[topic] = partitions
486 }
487 partitions[partition] = kerror
488 return mr
489}
490
491func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
492 req := reqBody.(*OffsetCommitRequest)
493 group := req.ConsumerGroup
494 res := &OffsetCommitResponse{}
495 for topic, partitions := range req.blocks {
496 for partition := range partitions {
497 res.AddError(topic, partition, mr.getError(group, topic, partition))
498 }
499 }
500 return res
501}
502
503func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
504 topics := mr.errors[group]
505 if topics == nil {
506 return ErrNoError
507 }
508 partitions := topics[topic]
509 if partitions == nil {
510 return ErrNoError
511 }
512 kerror, ok := partitions[partition]
513 if !ok {
514 return ErrNoError
515 }
516 return kerror
517}
518
519// MockProduceResponse is a `ProduceResponse` builder.
520type MockProduceResponse struct {
521 version int16
522 errors map[string]map[int32]KError
523 t TestReporter
524}
525
526func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
527 return &MockProduceResponse{t: t}
528}
529
530func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
531 mr.version = version
532 return mr
533}
534
535func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
536 if mr.errors == nil {
537 mr.errors = make(map[string]map[int32]KError)
538 }
539 partitions := mr.errors[topic]
540 if partitions == nil {
541 partitions = make(map[int32]KError)
542 mr.errors[topic] = partitions
543 }
544 partitions[partition] = kerror
545 return mr
546}
547
548func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
549 req := reqBody.(*ProduceRequest)
550 res := &ProduceResponse{
551 Version: mr.version,
552 }
553 for topic, partitions := range req.records {
554 for partition := range partitions {
555 res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
556 }
557 }
558 return res
559}
560
561func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
562 partitions := mr.errors[topic]
563 if partitions == nil {
564 return ErrNoError
565 }
566 kerror, ok := partitions[partition]
567 if !ok {
568 return ErrNoError
569 }
570 return kerror
571}
572
573// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
574type MockOffsetFetchResponse struct {
575 offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
576 t TestReporter
577}
578
579func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
580 return &MockOffsetFetchResponse{t: t}
581}
582
583func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
584 if mr.offsets == nil {
585 mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
586 }
587 topics := mr.offsets[group]
588 if topics == nil {
589 topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
590 mr.offsets[group] = topics
591 }
592 partitions := topics[topic]
593 if partitions == nil {
594 partitions = make(map[int32]*OffsetFetchResponseBlock)
595 topics[topic] = partitions
596 }
597 partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
598 return mr
599}
600
601func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
602 req := reqBody.(*OffsetFetchRequest)
603 group := req.ConsumerGroup
604 res := &OffsetFetchResponse{}
605 for topic, partitions := range mr.offsets[group] {
606 for partition, block := range partitions {
607 res.AddBlock(topic, partition, block)
608 }
609 }
610 return res
611}
612
613type MockCreateTopicsResponse struct {
614 t TestReporter
615}
616
617func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
618 return &MockCreateTopicsResponse{t: t}
619}
620
621func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
622 req := reqBody.(*CreateTopicsRequest)
623 res := &CreateTopicsResponse{}
624 res.TopicErrors = make(map[string]*TopicError)
625
626 for topic, _ := range req.TopicDetails {
627 res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
628 }
629 return res
630}
631
632type MockDeleteTopicsResponse struct {
633 t TestReporter
634}
635
636func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
637 return &MockDeleteTopicsResponse{t: t}
638}
639
640func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
641 req := reqBody.(*DeleteTopicsRequest)
642 res := &DeleteTopicsResponse{}
643 res.TopicErrorCodes = make(map[string]KError)
644
645 for _, topic := range req.Topics {
646 res.TopicErrorCodes[topic] = ErrNoError
647 }
648 return res
649}
650
651type MockCreatePartitionsResponse struct {
652 t TestReporter
653}
654
655func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
656 return &MockCreatePartitionsResponse{t: t}
657}
658
659func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
660 req := reqBody.(*CreatePartitionsRequest)
661 res := &CreatePartitionsResponse{}
662 res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
663
664 for topic, _ := range req.TopicPartitions {
665 res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
666 }
667 return res
668}
669
670type MockDeleteRecordsResponse struct {
671 t TestReporter
672}
673
674func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
675 return &MockDeleteRecordsResponse{t: t}
676}
677
678func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
679 req := reqBody.(*DeleteRecordsRequest)
680 res := &DeleteRecordsResponse{}
681 res.Topics = make(map[string]*DeleteRecordsResponseTopic)
682
683 for topic, deleteRecordRequestTopic := range req.Topics {
684 partitions := make(map[int32]*DeleteRecordsResponsePartition)
685 for partition, _ := range deleteRecordRequestTopic.PartitionOffsets {
686 partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
687 }
688 res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
689 }
690 return res
691}
692
693type MockDescribeConfigsResponse struct {
694 t TestReporter
695}
696
697func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
698 return &MockDescribeConfigsResponse{t: t}
699}
700
701func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
702 req := reqBody.(*DescribeConfigsRequest)
703 res := &DescribeConfigsResponse{}
704
705 for _, r := range req.Resources {
706 var configEntries []*ConfigEntry
707 switch r.Type {
708 case TopicResource:
709 configEntries = append(configEntries,
710 &ConfigEntry{Name: "max.message.bytes",
711 Value: "1000000",
712 ReadOnly: false,
713 Default: true,
714 Sensitive: false,
715 }, &ConfigEntry{Name: "retention.ms",
716 Value: "5000",
717 ReadOnly: false,
718 Default: false,
719 Sensitive: false,
720 }, &ConfigEntry{Name: "password",
721 Value: "12345",
722 ReadOnly: false,
723 Default: false,
724 Sensitive: true,
725 })
726 res.Resources = append(res.Resources, &ResourceResponse{
727 Name: r.Name,
728 Configs: configEntries,
729 })
730 }
731 }
732 return res
733}
734
735type MockAlterConfigsResponse struct {
736 t TestReporter
737}
738
739func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
740 return &MockAlterConfigsResponse{t: t}
741}
742
743func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
744 req := reqBody.(*AlterConfigsRequest)
745 res := &AlterConfigsResponse{}
746
747 for _, r := range req.Resources {
748 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
749 Type: TopicResource,
750 ErrorMsg: "",
751 })
752 }
753 return res
754}
755
756type MockCreateAclsResponse struct {
757 t TestReporter
758}
759
760func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
761 return &MockCreateAclsResponse{t: t}
762}
763
764func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
765 req := reqBody.(*CreateAclsRequest)
766 res := &CreateAclsResponse{}
767
768 for range req.AclCreations {
769 res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
770 }
771 return res
772}
773
774type MockListAclsResponse struct {
775 t TestReporter
776}
777
778func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
779 return &MockListAclsResponse{t: t}
780}
781
782func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
783 req := reqBody.(*DescribeAclsRequest)
784 res := &DescribeAclsResponse{}
785
786 res.Err = ErrNoError
787 acl := &ResourceAcls{}
788 acl.Resource.ResourceName = *req.ResourceName
789 acl.Resource.ResourceType = req.ResourceType
790 acl.Acls = append(acl.Acls, &Acl{})
791 res.ResourceAcls = append(res.ResourceAcls, acl)
792
793 return res
794}
795
796type MockSaslAuthenticateResponse struct {
797 t TestReporter
798 kerror KError
799 saslAuthBytes []byte
800}
801
802func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
803 return &MockSaslAuthenticateResponse{t: t}
804}
805
806func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
807 res := &SaslAuthenticateResponse{}
808 res.Err = msar.kerror
809 res.SaslAuthBytes = msar.saslAuthBytes
810 return res
811}
812
813func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
814 msar.kerror = kerror
815 return msar
816}
817
818func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
819 msar.saslAuthBytes = saslAuthBytes
820 return msar
821}
822
823type MockDeleteAclsResponse struct {
824 t TestReporter
825}
826
827type MockSaslHandshakeResponse struct {
828 enabledMechanisms []string
829 kerror KError
830 t TestReporter
831}
832
833func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
834 return &MockSaslHandshakeResponse{t: t}
835}
836
837func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
838 res := &SaslHandshakeResponse{}
839 res.Err = mshr.kerror
840 res.EnabledMechanisms = mshr.enabledMechanisms
841 return res
842}
843
844func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
845 mshr.kerror = kerror
846 return mshr
847}
848
849func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
850 mshr.enabledMechanisms = enabledMechanisms
851 return mshr
852}
853
854func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
855 return &MockDeleteAclsResponse{t: t}
856}
857
858func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
859 req := reqBody.(*DeleteAclsRequest)
860 res := &DeleteAclsResponse{}
861
862 for range req.Filters {
863 response := &FilterResponse{Err: ErrNoError}
864 response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
865 res.FilterResponses = append(res.FilterResponses, response)
866 }
867 return res
868}