blob: c78f0acc0e7184e3a4e6cc4c1e55481a2a701799 [file] [log] [blame]
Scott Bakere7144bc2019-10-01 14:16:47 -07001package sarama
2
3import (
4 "fmt"
Scott Bakerf579f132019-10-24 14:31:41 -07005 "strings"
Scott Bakere7144bc2019-10-01 14:16:47 -07006)
7
8// TestReporter has methods matching go's testing.T to avoid importing
9// `testing` in the main part of the library.
10type TestReporter interface {
11 Error(...interface{})
12 Errorf(string, ...interface{})
13 Fatal(...interface{})
14 Fatalf(string, ...interface{})
15}
16
17// MockResponse is a response builder interface it defines one method that
18// allows generating a response based on a request body. MockResponses are used
19// to program behavior of MockBroker in tests.
20type MockResponse interface {
21 For(reqBody versionedDecoder) (res encoder)
22}
23
24// MockWrapper is a mock response builder that returns a particular concrete
25// response regardless of the actual request passed to the `For` method.
26type MockWrapper struct {
27 res encoder
28}
29
30func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
31 return mw.res
32}
33
34func NewMockWrapper(res encoder) *MockWrapper {
35 return &MockWrapper{res: res}
36}
37
38// MockSequence is a mock response builder that is created from a sequence of
39// concrete responses. Every time when a `MockBroker` calls its `For` method
40// the next response from the sequence is returned. When the end of the
41// sequence is reached the last element from the sequence is returned.
42type MockSequence struct {
43 responses []MockResponse
44}
45
46func NewMockSequence(responses ...interface{}) *MockSequence {
47 ms := &MockSequence{}
48 ms.responses = make([]MockResponse, len(responses))
49 for i, res := range responses {
50 switch res := res.(type) {
51 case MockResponse:
52 ms.responses[i] = res
53 case encoder:
54 ms.responses[i] = NewMockWrapper(res)
55 default:
56 panic(fmt.Sprintf("Unexpected response type: %T", res))
57 }
58 }
59 return ms
60}
61
62func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
63 res = mc.responses[0].For(reqBody)
64 if len(mc.responses) > 1 {
65 mc.responses = mc.responses[1:]
66 }
67 return res
68}
69
70type MockListGroupsResponse struct {
71 groups map[string]string
72 t TestReporter
73}
74
75func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
76 return &MockListGroupsResponse{
77 groups: make(map[string]string),
78 t: t,
79 }
80}
81
82func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
83 request := reqBody.(*ListGroupsRequest)
84 _ = request
85 response := &ListGroupsResponse{
86 Groups: m.groups,
87 }
88 return response
89}
90
91func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
92 m.groups[groupID] = protocolType
93 return m
94}
95
96type MockDescribeGroupsResponse struct {
97 groups map[string]*GroupDescription
98 t TestReporter
99}
100
101func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
102 return &MockDescribeGroupsResponse{
103 t: t,
104 groups: make(map[string]*GroupDescription),
105 }
106}
107
108func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
109 m.groups[groupID] = description
110 return m
111}
112
113func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
114 request := reqBody.(*DescribeGroupsRequest)
115
116 response := &DescribeGroupsResponse{}
117 for _, requestedGroup := range request.Groups {
118 if group, ok := m.groups[requestedGroup]; ok {
119 response.Groups = append(response.Groups, group)
120 } else {
121 // Mimic real kafka - if a group doesn't exist, return
122 // an entry with state "Dead"
123 response.Groups = append(response.Groups, &GroupDescription{
124 GroupId: requestedGroup,
125 State: "Dead",
126 })
127 }
128 }
129
130 return response
131}
132
133// MockMetadataResponse is a `MetadataResponse` builder.
134type MockMetadataResponse struct {
135 controllerID int32
136 leaders map[string]map[int32]int32
137 brokers map[string]int32
138 t TestReporter
139}
140
141func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
142 return &MockMetadataResponse{
143 leaders: make(map[string]map[int32]int32),
144 brokers: make(map[string]int32),
145 t: t,
146 }
147}
148
149func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
150 partitions := mmr.leaders[topic]
151 if partitions == nil {
152 partitions = make(map[int32]int32)
153 mmr.leaders[topic] = partitions
154 }
155 partitions[partition] = brokerID
156 return mmr
157}
158
159func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
160 mmr.brokers[addr] = brokerID
161 return mmr
162}
163
164func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
165 mmr.controllerID = brokerID
166 return mmr
167}
168
169func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
170 metadataRequest := reqBody.(*MetadataRequest)
171 metadataResponse := &MetadataResponse{
172 Version: metadataRequest.version(),
173 ControllerID: mmr.controllerID,
174 }
175 for addr, brokerID := range mmr.brokers {
176 metadataResponse.AddBroker(addr, brokerID)
177 }
178
179 // Generate set of replicas
180 replicas := []int32{}
Scott Bakerf579f132019-10-24 14:31:41 -0700181 offlineReplicas := []int32{}
Scott Bakere7144bc2019-10-01 14:16:47 -0700182 for _, brokerID := range mmr.brokers {
183 replicas = append(replicas, brokerID)
184 }
185
186 if len(metadataRequest.Topics) == 0 {
187 for topic, partitions := range mmr.leaders {
188 for partition, brokerID := range partitions {
Scott Bakerf579f132019-10-24 14:31:41 -0700189 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
Scott Bakere7144bc2019-10-01 14:16:47 -0700190 }
191 }
192 return metadataResponse
193 }
194 for _, topic := range metadataRequest.Topics {
195 for partition, brokerID := range mmr.leaders[topic] {
Scott Bakerf579f132019-10-24 14:31:41 -0700196 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
Scott Bakere7144bc2019-10-01 14:16:47 -0700197 }
198 }
199 return metadataResponse
200}
201
202// MockOffsetResponse is an `OffsetResponse` builder.
203type MockOffsetResponse struct {
204 offsets map[string]map[int32]map[int64]int64
205 t TestReporter
206 version int16
207}
208
209func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
210 return &MockOffsetResponse{
211 offsets: make(map[string]map[int32]map[int64]int64),
212 t: t,
213 }
214}
215
216func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
217 mor.version = version
218 return mor
219}
220
221func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
222 partitions := mor.offsets[topic]
223 if partitions == nil {
224 partitions = make(map[int32]map[int64]int64)
225 mor.offsets[topic] = partitions
226 }
227 times := partitions[partition]
228 if times == nil {
229 times = make(map[int64]int64)
230 partitions[partition] = times
231 }
232 times[time] = offset
233 return mor
234}
235
236func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
237 offsetRequest := reqBody.(*OffsetRequest)
238 offsetResponse := &OffsetResponse{Version: mor.version}
239 for topic, partitions := range offsetRequest.blocks {
240 for partition, block := range partitions {
241 offset := mor.getOffset(topic, partition, block.time)
242 offsetResponse.AddTopicPartition(topic, partition, offset)
243 }
244 }
245 return offsetResponse
246}
247
248func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
249 partitions := mor.offsets[topic]
250 if partitions == nil {
251 mor.t.Errorf("missing topic: %s", topic)
252 }
253 times := partitions[partition]
254 if times == nil {
255 mor.t.Errorf("missing partition: %d", partition)
256 }
257 offset, ok := times[time]
258 if !ok {
259 mor.t.Errorf("missing time: %d", time)
260 }
261 return offset
262}
263
264// MockFetchResponse is a `FetchResponse` builder.
265type MockFetchResponse struct {
266 messages map[string]map[int32]map[int64]Encoder
267 highWaterMarks map[string]map[int32]int64
268 t TestReporter
269 batchSize int
270 version int16
271}
272
273func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
274 return &MockFetchResponse{
275 messages: make(map[string]map[int32]map[int64]Encoder),
276 highWaterMarks: make(map[string]map[int32]int64),
277 t: t,
278 batchSize: batchSize,
279 }
280}
281
282func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
283 mfr.version = version
284 return mfr
285}
286
287func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
288 partitions := mfr.messages[topic]
289 if partitions == nil {
290 partitions = make(map[int32]map[int64]Encoder)
291 mfr.messages[topic] = partitions
292 }
293 messages := partitions[partition]
294 if messages == nil {
295 messages = make(map[int64]Encoder)
296 partitions[partition] = messages
297 }
298 messages[offset] = msg
299 return mfr
300}
301
302func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
303 partitions := mfr.highWaterMarks[topic]
304 if partitions == nil {
305 partitions = make(map[int32]int64)
306 mfr.highWaterMarks[topic] = partitions
307 }
308 partitions[partition] = offset
309 return mfr
310}
311
312func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
313 fetchRequest := reqBody.(*FetchRequest)
314 res := &FetchResponse{
315 Version: mfr.version,
316 }
317 for topic, partitions := range fetchRequest.blocks {
318 for partition, block := range partitions {
319 initialOffset := block.fetchOffset
320 offset := initialOffset
321 maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
322 for i := 0; i < mfr.batchSize && offset < maxOffset; {
323 msg := mfr.getMessage(topic, partition, offset)
324 if msg != nil {
325 res.AddMessage(topic, partition, nil, msg, offset)
326 i++
327 }
328 offset++
329 }
330 fb := res.GetBlock(topic, partition)
331 if fb == nil {
332 res.AddError(topic, partition, ErrNoError)
333 fb = res.GetBlock(topic, partition)
334 }
335 fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
336 }
337 }
338 return res
339}
340
341func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
342 partitions := mfr.messages[topic]
343 if partitions == nil {
344 return nil
345 }
346 messages := partitions[partition]
347 if messages == nil {
348 return nil
349 }
350 return messages[offset]
351}
352
353func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
354 partitions := mfr.messages[topic]
355 if partitions == nil {
356 return 0
357 }
358 messages := partitions[partition]
359 if messages == nil {
360 return 0
361 }
362 return len(messages)
363}
364
365func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
366 partitions := mfr.highWaterMarks[topic]
367 if partitions == nil {
368 return 0
369 }
370 return partitions[partition]
371}
372
373// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
374type MockConsumerMetadataResponse struct {
375 coordinators map[string]interface{}
376 t TestReporter
377}
378
379func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
380 return &MockConsumerMetadataResponse{
381 coordinators: make(map[string]interface{}),
382 t: t,
383 }
384}
385
386func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
387 mr.coordinators[group] = broker
388 return mr
389}
390
391func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
392 mr.coordinators[group] = kerror
393 return mr
394}
395
396func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
397 req := reqBody.(*ConsumerMetadataRequest)
398 group := req.ConsumerGroup
399 res := &ConsumerMetadataResponse{}
400 v := mr.coordinators[group]
401 switch v := v.(type) {
402 case *MockBroker:
403 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
404 case KError:
405 res.Err = v
406 }
407 return res
408}
409
410// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
411type MockFindCoordinatorResponse struct {
412 groupCoordinators map[string]interface{}
413 transCoordinators map[string]interface{}
414 t TestReporter
415}
416
417func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
418 return &MockFindCoordinatorResponse{
419 groupCoordinators: make(map[string]interface{}),
420 transCoordinators: make(map[string]interface{}),
421 t: t,
422 }
423}
424
425func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
426 switch coordinatorType {
427 case CoordinatorGroup:
428 mr.groupCoordinators[group] = broker
429 case CoordinatorTransaction:
430 mr.transCoordinators[group] = broker
431 }
432 return mr
433}
434
435func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
436 switch coordinatorType {
437 case CoordinatorGroup:
438 mr.groupCoordinators[group] = kerror
439 case CoordinatorTransaction:
440 mr.transCoordinators[group] = kerror
441 }
442 return mr
443}
444
445func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
446 req := reqBody.(*FindCoordinatorRequest)
447 res := &FindCoordinatorResponse{}
448 var v interface{}
449 switch req.CoordinatorType {
450 case CoordinatorGroup:
451 v = mr.groupCoordinators[req.CoordinatorKey]
452 case CoordinatorTransaction:
453 v = mr.transCoordinators[req.CoordinatorKey]
454 }
455 switch v := v.(type) {
456 case *MockBroker:
457 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
458 case KError:
459 res.Err = v
460 }
461 return res
462}
463
464// MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
465type MockOffsetCommitResponse struct {
466 errors map[string]map[string]map[int32]KError
467 t TestReporter
468}
469
470func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
471 return &MockOffsetCommitResponse{t: t}
472}
473
474func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
475 if mr.errors == nil {
476 mr.errors = make(map[string]map[string]map[int32]KError)
477 }
478 topics := mr.errors[group]
479 if topics == nil {
480 topics = make(map[string]map[int32]KError)
481 mr.errors[group] = topics
482 }
483 partitions := topics[topic]
484 if partitions == nil {
485 partitions = make(map[int32]KError)
486 topics[topic] = partitions
487 }
488 partitions[partition] = kerror
489 return mr
490}
491
492func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
493 req := reqBody.(*OffsetCommitRequest)
494 group := req.ConsumerGroup
495 res := &OffsetCommitResponse{}
496 for topic, partitions := range req.blocks {
497 for partition := range partitions {
498 res.AddError(topic, partition, mr.getError(group, topic, partition))
499 }
500 }
501 return res
502}
503
504func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
505 topics := mr.errors[group]
506 if topics == nil {
507 return ErrNoError
508 }
509 partitions := topics[topic]
510 if partitions == nil {
511 return ErrNoError
512 }
513 kerror, ok := partitions[partition]
514 if !ok {
515 return ErrNoError
516 }
517 return kerror
518}
519
520// MockProduceResponse is a `ProduceResponse` builder.
521type MockProduceResponse struct {
522 version int16
523 errors map[string]map[int32]KError
524 t TestReporter
525}
526
527func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
528 return &MockProduceResponse{t: t}
529}
530
531func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
532 mr.version = version
533 return mr
534}
535
536func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
537 if mr.errors == nil {
538 mr.errors = make(map[string]map[int32]KError)
539 }
540 partitions := mr.errors[topic]
541 if partitions == nil {
542 partitions = make(map[int32]KError)
543 mr.errors[topic] = partitions
544 }
545 partitions[partition] = kerror
546 return mr
547}
548
549func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
550 req := reqBody.(*ProduceRequest)
551 res := &ProduceResponse{
552 Version: mr.version,
553 }
554 for topic, partitions := range req.records {
555 for partition := range partitions {
556 res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
557 }
558 }
559 return res
560}
561
562func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
563 partitions := mr.errors[topic]
564 if partitions == nil {
565 return ErrNoError
566 }
567 kerror, ok := partitions[partition]
568 if !ok {
569 return ErrNoError
570 }
571 return kerror
572}
573
574// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
575type MockOffsetFetchResponse struct {
576 offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
Scott Bakerf579f132019-10-24 14:31:41 -0700577 error KError
Scott Bakere7144bc2019-10-01 14:16:47 -0700578 t TestReporter
579}
580
581func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
582 return &MockOffsetFetchResponse{t: t}
583}
584
585func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
586 if mr.offsets == nil {
587 mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
588 }
589 topics := mr.offsets[group]
590 if topics == nil {
591 topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
592 mr.offsets[group] = topics
593 }
594 partitions := topics[topic]
595 if partitions == nil {
596 partitions = make(map[int32]*OffsetFetchResponseBlock)
597 topics[topic] = partitions
598 }
599 partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
600 return mr
601}
602
Scott Bakerf579f132019-10-24 14:31:41 -0700603func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
604 mr.error = kerror
605 return mr
606}
607
Scott Bakere7144bc2019-10-01 14:16:47 -0700608func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
609 req := reqBody.(*OffsetFetchRequest)
610 group := req.ConsumerGroup
Scott Bakerf579f132019-10-24 14:31:41 -0700611 res := &OffsetFetchResponse{Version: req.Version}
612
Scott Bakere7144bc2019-10-01 14:16:47 -0700613 for topic, partitions := range mr.offsets[group] {
614 for partition, block := range partitions {
615 res.AddBlock(topic, partition, block)
616 }
617 }
Scott Bakerf579f132019-10-24 14:31:41 -0700618
619 if res.Version >= 2 {
620 res.Err = mr.error
621 }
Scott Bakere7144bc2019-10-01 14:16:47 -0700622 return res
623}
624
625type MockCreateTopicsResponse struct {
626 t TestReporter
627}
628
629func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
630 return &MockCreateTopicsResponse{t: t}
631}
632
633func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
634 req := reqBody.(*CreateTopicsRequest)
Scott Bakerf579f132019-10-24 14:31:41 -0700635 res := &CreateTopicsResponse{
636 Version: req.Version,
637 }
Scott Bakere7144bc2019-10-01 14:16:47 -0700638 res.TopicErrors = make(map[string]*TopicError)
639
Scott Bakerf579f132019-10-24 14:31:41 -0700640 for topic := range req.TopicDetails {
641 if res.Version >= 1 && strings.HasPrefix(topic, "_") {
642 msg := "insufficient permissions to create topic with reserved prefix"
643 res.TopicErrors[topic] = &TopicError{
644 Err: ErrTopicAuthorizationFailed,
645 ErrMsg: &msg,
646 }
647 continue
648 }
Scott Bakere7144bc2019-10-01 14:16:47 -0700649 res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
650 }
651 return res
652}
653
654type MockDeleteTopicsResponse struct {
655 t TestReporter
656}
657
658func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
659 return &MockDeleteTopicsResponse{t: t}
660}
661
662func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
663 req := reqBody.(*DeleteTopicsRequest)
664 res := &DeleteTopicsResponse{}
665 res.TopicErrorCodes = make(map[string]KError)
666
667 for _, topic := range req.Topics {
668 res.TopicErrorCodes[topic] = ErrNoError
669 }
670 return res
671}
672
673type MockCreatePartitionsResponse struct {
674 t TestReporter
675}
676
677func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
678 return &MockCreatePartitionsResponse{t: t}
679}
680
681func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
682 req := reqBody.(*CreatePartitionsRequest)
683 res := &CreatePartitionsResponse{}
684 res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
685
Scott Bakerf579f132019-10-24 14:31:41 -0700686 for topic := range req.TopicPartitions {
687 if strings.HasPrefix(topic, "_") {
688 msg := "insufficient permissions to create partition on topic with reserved prefix"
689 res.TopicPartitionErrors[topic] = &TopicPartitionError{
690 Err: ErrTopicAuthorizationFailed,
691 ErrMsg: &msg,
692 }
693 continue
694 }
Scott Bakere7144bc2019-10-01 14:16:47 -0700695 res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
696 }
697 return res
698}
699
700type MockDeleteRecordsResponse struct {
701 t TestReporter
702}
703
704func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
705 return &MockDeleteRecordsResponse{t: t}
706}
707
708func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
709 req := reqBody.(*DeleteRecordsRequest)
710 res := &DeleteRecordsResponse{}
711 res.Topics = make(map[string]*DeleteRecordsResponseTopic)
712
713 for topic, deleteRecordRequestTopic := range req.Topics {
714 partitions := make(map[int32]*DeleteRecordsResponsePartition)
Scott Bakerf579f132019-10-24 14:31:41 -0700715 for partition := range deleteRecordRequestTopic.PartitionOffsets {
Scott Bakere7144bc2019-10-01 14:16:47 -0700716 partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
717 }
718 res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
719 }
720 return res
721}
722
723type MockDescribeConfigsResponse struct {
724 t TestReporter
725}
726
727func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
728 return &MockDescribeConfigsResponse{t: t}
729}
730
731func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
732 req := reqBody.(*DescribeConfigsRequest)
733 res := &DescribeConfigsResponse{}
734
735 for _, r := range req.Resources {
736 var configEntries []*ConfigEntry
737 switch r.Type {
738 case TopicResource:
739 configEntries = append(configEntries,
740 &ConfigEntry{Name: "max.message.bytes",
741 Value: "1000000",
742 ReadOnly: false,
743 Default: true,
744 Sensitive: false,
745 }, &ConfigEntry{Name: "retention.ms",
746 Value: "5000",
747 ReadOnly: false,
748 Default: false,
749 Sensitive: false,
750 }, &ConfigEntry{Name: "password",
751 Value: "12345",
752 ReadOnly: false,
753 Default: false,
754 Sensitive: true,
755 })
756 res.Resources = append(res.Resources, &ResourceResponse{
757 Name: r.Name,
758 Configs: configEntries,
759 })
760 }
761 }
762 return res
763}
764
765type MockAlterConfigsResponse struct {
766 t TestReporter
767}
768
769func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
770 return &MockAlterConfigsResponse{t: t}
771}
772
773func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
774 req := reqBody.(*AlterConfigsRequest)
775 res := &AlterConfigsResponse{}
776
777 for _, r := range req.Resources {
778 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
779 Type: TopicResource,
780 ErrorMsg: "",
781 })
782 }
783 return res
784}
785
786type MockCreateAclsResponse struct {
787 t TestReporter
788}
789
790func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
791 return &MockCreateAclsResponse{t: t}
792}
793
794func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
795 req := reqBody.(*CreateAclsRequest)
796 res := &CreateAclsResponse{}
797
798 for range req.AclCreations {
799 res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
800 }
801 return res
802}
803
804type MockListAclsResponse struct {
805 t TestReporter
806}
807
808func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
809 return &MockListAclsResponse{t: t}
810}
811
812func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
813 req := reqBody.(*DescribeAclsRequest)
814 res := &DescribeAclsResponse{}
815
816 res.Err = ErrNoError
817 acl := &ResourceAcls{}
818 acl.Resource.ResourceName = *req.ResourceName
819 acl.Resource.ResourceType = req.ResourceType
820 acl.Acls = append(acl.Acls, &Acl{})
821 res.ResourceAcls = append(res.ResourceAcls, acl)
822
823 return res
824}
825
826type MockSaslAuthenticateResponse struct {
827 t TestReporter
828 kerror KError
829 saslAuthBytes []byte
830}
831
832func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
833 return &MockSaslAuthenticateResponse{t: t}
834}
835
836func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
837 res := &SaslAuthenticateResponse{}
838 res.Err = msar.kerror
839 res.SaslAuthBytes = msar.saslAuthBytes
840 return res
841}
842
843func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
844 msar.kerror = kerror
845 return msar
846}
847
848func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
849 msar.saslAuthBytes = saslAuthBytes
850 return msar
851}
852
853type MockDeleteAclsResponse struct {
854 t TestReporter
855}
856
857type MockSaslHandshakeResponse struct {
858 enabledMechanisms []string
859 kerror KError
860 t TestReporter
861}
862
863func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
864 return &MockSaslHandshakeResponse{t: t}
865}
866
867func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
868 res := &SaslHandshakeResponse{}
869 res.Err = mshr.kerror
870 res.EnabledMechanisms = mshr.enabledMechanisms
871 return res
872}
873
874func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
875 mshr.kerror = kerror
876 return mshr
877}
878
879func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
880 mshr.enabledMechanisms = enabledMechanisms
881 return mshr
882}
883
884func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
885 return &MockDeleteAclsResponse{t: t}
886}
887
888func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
889 req := reqBody.(*DeleteAclsRequest)
890 res := &DeleteAclsResponse{}
891
892 for range req.Filters {
893 response := &FilterResponse{Err: ErrNoError}
894 response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
895 res.FilterResponses = append(res.FilterResponses, response)
896 }
897 return res
898}
Scott Bakerf579f132019-10-24 14:31:41 -0700899
900type MockDeleteGroupsResponse struct {
901 deletedGroups []string
902}
903
904func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
905 return &MockDeleteGroupsResponse{}
906}
907
908func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
909 m.deletedGroups = groups
910 return m
911}
912
913func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
914 resp := &DeleteGroupsResponse{
915 GroupErrorCodes: map[string]KError{},
916 }
917 for _, group := range m.deletedGroups {
918 resp.GroupErrorCodes[group] = ErrNoError
919 }
920 return resp
921}