blob: 6654ed07c396a6360e05e367a4909a3e693eac22 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001package sarama
2
3import (
4 "fmt"
5 "strings"
6)
7
8// TestReporter has methods matching go's testing.T to avoid importing
9// `testing` in the main part of the library.
10type TestReporter interface {
11 Error(...interface{})
12 Errorf(string, ...interface{})
13 Fatal(...interface{})
14 Fatalf(string, ...interface{})
15}
16
17// MockResponse is a response builder interface it defines one method that
18// allows generating a response based on a request body. MockResponses are used
19// to program behavior of MockBroker in tests.
20type MockResponse interface {
khenaidoo7d3c5582021-08-11 18:09:44 -040021 For(reqBody versionedDecoder) (res encoderWithHeader)
Holger Hildebrandtfa074992020-03-27 15:42:06 +000022}
23
24// MockWrapper is a mock response builder that returns a particular concrete
25// response regardless of the actual request passed to the `For` method.
26type MockWrapper struct {
khenaidoo7d3c5582021-08-11 18:09:44 -040027 res encoderWithHeader
Holger Hildebrandtfa074992020-03-27 15:42:06 +000028}
29
khenaidoo7d3c5582021-08-11 18:09:44 -040030func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +000031 return mw.res
32}
33
khenaidoo7d3c5582021-08-11 18:09:44 -040034func NewMockWrapper(res encoderWithHeader) *MockWrapper {
Holger Hildebrandtfa074992020-03-27 15:42:06 +000035 return &MockWrapper{res: res}
36}
37
38// MockSequence is a mock response builder that is created from a sequence of
39// concrete responses. Every time when a `MockBroker` calls its `For` method
40// the next response from the sequence is returned. When the end of the
41// sequence is reached the last element from the sequence is returned.
42type MockSequence struct {
43 responses []MockResponse
44}
45
46func NewMockSequence(responses ...interface{}) *MockSequence {
47 ms := &MockSequence{}
48 ms.responses = make([]MockResponse, len(responses))
49 for i, res := range responses {
50 switch res := res.(type) {
51 case MockResponse:
52 ms.responses[i] = res
khenaidoo7d3c5582021-08-11 18:09:44 -040053 case encoderWithHeader:
Holger Hildebrandtfa074992020-03-27 15:42:06 +000054 ms.responses[i] = NewMockWrapper(res)
55 default:
56 panic(fmt.Sprintf("Unexpected response type: %T", res))
57 }
58 }
59 return ms
60}
61
khenaidoo7d3c5582021-08-11 18:09:44 -040062func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
Holger Hildebrandtfa074992020-03-27 15:42:06 +000063 res = mc.responses[0].For(reqBody)
64 if len(mc.responses) > 1 {
65 mc.responses = mc.responses[1:]
66 }
67 return res
68}
69
70type MockListGroupsResponse struct {
71 groups map[string]string
72 t TestReporter
73}
74
75func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
76 return &MockListGroupsResponse{
77 groups: make(map[string]string),
78 t: t,
79 }
80}
81
khenaidoo7d3c5582021-08-11 18:09:44 -040082func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +000083 request := reqBody.(*ListGroupsRequest)
84 _ = request
85 response := &ListGroupsResponse{
86 Groups: m.groups,
87 }
88 return response
89}
90
91func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
92 m.groups[groupID] = protocolType
93 return m
94}
95
96type MockDescribeGroupsResponse struct {
97 groups map[string]*GroupDescription
98 t TestReporter
99}
100
101func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
102 return &MockDescribeGroupsResponse{
103 t: t,
104 groups: make(map[string]*GroupDescription),
105 }
106}
107
108func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
109 m.groups[groupID] = description
110 return m
111}
112
khenaidoo7d3c5582021-08-11 18:09:44 -0400113func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000114 request := reqBody.(*DescribeGroupsRequest)
115
116 response := &DescribeGroupsResponse{}
117 for _, requestedGroup := range request.Groups {
118 if group, ok := m.groups[requestedGroup]; ok {
119 response.Groups = append(response.Groups, group)
120 } else {
121 // Mimic real kafka - if a group doesn't exist, return
122 // an entry with state "Dead"
123 response.Groups = append(response.Groups, &GroupDescription{
124 GroupId: requestedGroup,
125 State: "Dead",
126 })
127 }
128 }
129
130 return response
131}
132
133// MockMetadataResponse is a `MetadataResponse` builder.
134type MockMetadataResponse struct {
135 controllerID int32
136 leaders map[string]map[int32]int32
137 brokers map[string]int32
138 t TestReporter
139}
140
141func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
142 return &MockMetadataResponse{
143 leaders: make(map[string]map[int32]int32),
144 brokers: make(map[string]int32),
145 t: t,
146 }
147}
148
149func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
150 partitions := mmr.leaders[topic]
151 if partitions == nil {
152 partitions = make(map[int32]int32)
153 mmr.leaders[topic] = partitions
154 }
155 partitions[partition] = brokerID
156 return mmr
157}
158
159func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
160 mmr.brokers[addr] = brokerID
161 return mmr
162}
163
164func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
165 mmr.controllerID = brokerID
166 return mmr
167}
168
khenaidoo7d3c5582021-08-11 18:09:44 -0400169func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000170 metadataRequest := reqBody.(*MetadataRequest)
171 metadataResponse := &MetadataResponse{
172 Version: metadataRequest.version(),
173 ControllerID: mmr.controllerID,
174 }
175 for addr, brokerID := range mmr.brokers {
176 metadataResponse.AddBroker(addr, brokerID)
177 }
178
179 // Generate set of replicas
khenaidoo7d3c5582021-08-11 18:09:44 -0400180 var replicas []int32
181 var offlineReplicas []int32
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000182 for _, brokerID := range mmr.brokers {
183 replicas = append(replicas, brokerID)
184 }
185
186 if len(metadataRequest.Topics) == 0 {
187 for topic, partitions := range mmr.leaders {
188 for partition, brokerID := range partitions {
189 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
190 }
191 }
192 return metadataResponse
193 }
194 for _, topic := range metadataRequest.Topics {
195 for partition, brokerID := range mmr.leaders[topic] {
196 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
197 }
198 }
199 return metadataResponse
200}
201
202// MockOffsetResponse is an `OffsetResponse` builder.
203type MockOffsetResponse struct {
204 offsets map[string]map[int32]map[int64]int64
205 t TestReporter
206 version int16
207}
208
209func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
210 return &MockOffsetResponse{
211 offsets: make(map[string]map[int32]map[int64]int64),
212 t: t,
213 }
214}
215
216func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
217 mor.version = version
218 return mor
219}
220
221func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
222 partitions := mor.offsets[topic]
223 if partitions == nil {
224 partitions = make(map[int32]map[int64]int64)
225 mor.offsets[topic] = partitions
226 }
227 times := partitions[partition]
228 if times == nil {
229 times = make(map[int64]int64)
230 partitions[partition] = times
231 }
232 times[time] = offset
233 return mor
234}
235
khenaidoo7d3c5582021-08-11 18:09:44 -0400236func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000237 offsetRequest := reqBody.(*OffsetRequest)
238 offsetResponse := &OffsetResponse{Version: mor.version}
239 for topic, partitions := range offsetRequest.blocks {
240 for partition, block := range partitions {
241 offset := mor.getOffset(topic, partition, block.time)
242 offsetResponse.AddTopicPartition(topic, partition, offset)
243 }
244 }
245 return offsetResponse
246}
247
248func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
249 partitions := mor.offsets[topic]
250 if partitions == nil {
251 mor.t.Errorf("missing topic: %s", topic)
252 }
253 times := partitions[partition]
254 if times == nil {
255 mor.t.Errorf("missing partition: %d", partition)
256 }
257 offset, ok := times[time]
258 if !ok {
259 mor.t.Errorf("missing time: %d", time)
260 }
261 return offset
262}
263
264// MockFetchResponse is a `FetchResponse` builder.
265type MockFetchResponse struct {
266 messages map[string]map[int32]map[int64]Encoder
267 highWaterMarks map[string]map[int32]int64
268 t TestReporter
269 batchSize int
270 version int16
271}
272
273func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
274 return &MockFetchResponse{
275 messages: make(map[string]map[int32]map[int64]Encoder),
276 highWaterMarks: make(map[string]map[int32]int64),
277 t: t,
278 batchSize: batchSize,
279 }
280}
281
282func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
283 mfr.version = version
284 return mfr
285}
286
287func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
288 partitions := mfr.messages[topic]
289 if partitions == nil {
290 partitions = make(map[int32]map[int64]Encoder)
291 mfr.messages[topic] = partitions
292 }
293 messages := partitions[partition]
294 if messages == nil {
295 messages = make(map[int64]Encoder)
296 partitions[partition] = messages
297 }
298 messages[offset] = msg
299 return mfr
300}
301
302func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
303 partitions := mfr.highWaterMarks[topic]
304 if partitions == nil {
305 partitions = make(map[int32]int64)
306 mfr.highWaterMarks[topic] = partitions
307 }
308 partitions[partition] = offset
309 return mfr
310}
311
khenaidoo7d3c5582021-08-11 18:09:44 -0400312func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000313 fetchRequest := reqBody.(*FetchRequest)
314 res := &FetchResponse{
315 Version: mfr.version,
316 }
317 for topic, partitions := range fetchRequest.blocks {
318 for partition, block := range partitions {
319 initialOffset := block.fetchOffset
320 offset := initialOffset
321 maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
322 for i := 0; i < mfr.batchSize && offset < maxOffset; {
323 msg := mfr.getMessage(topic, partition, offset)
324 if msg != nil {
325 res.AddMessage(topic, partition, nil, msg, offset)
326 i++
327 }
328 offset++
329 }
330 fb := res.GetBlock(topic, partition)
331 if fb == nil {
332 res.AddError(topic, partition, ErrNoError)
333 fb = res.GetBlock(topic, partition)
334 }
335 fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
336 }
337 }
338 return res
339}
340
341func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
342 partitions := mfr.messages[topic]
343 if partitions == nil {
344 return nil
345 }
346 messages := partitions[partition]
347 if messages == nil {
348 return nil
349 }
350 return messages[offset]
351}
352
353func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
354 partitions := mfr.messages[topic]
355 if partitions == nil {
356 return 0
357 }
358 messages := partitions[partition]
359 if messages == nil {
360 return 0
361 }
362 return len(messages)
363}
364
365func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
366 partitions := mfr.highWaterMarks[topic]
367 if partitions == nil {
368 return 0
369 }
370 return partitions[partition]
371}
372
373// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
374type MockConsumerMetadataResponse struct {
375 coordinators map[string]interface{}
376 t TestReporter
377}
378
379func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
380 return &MockConsumerMetadataResponse{
381 coordinators: make(map[string]interface{}),
382 t: t,
383 }
384}
385
386func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
387 mr.coordinators[group] = broker
388 return mr
389}
390
391func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
392 mr.coordinators[group] = kerror
393 return mr
394}
395
khenaidoo7d3c5582021-08-11 18:09:44 -0400396func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000397 req := reqBody.(*ConsumerMetadataRequest)
398 group := req.ConsumerGroup
399 res := &ConsumerMetadataResponse{}
400 v := mr.coordinators[group]
401 switch v := v.(type) {
402 case *MockBroker:
403 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
404 case KError:
405 res.Err = v
406 }
407 return res
408}
409
410// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
411type MockFindCoordinatorResponse struct {
412 groupCoordinators map[string]interface{}
413 transCoordinators map[string]interface{}
414 t TestReporter
415}
416
417func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
418 return &MockFindCoordinatorResponse{
419 groupCoordinators: make(map[string]interface{}),
420 transCoordinators: make(map[string]interface{}),
421 t: t,
422 }
423}
424
425func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
426 switch coordinatorType {
427 case CoordinatorGroup:
428 mr.groupCoordinators[group] = broker
429 case CoordinatorTransaction:
430 mr.transCoordinators[group] = broker
431 }
432 return mr
433}
434
435func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
436 switch coordinatorType {
437 case CoordinatorGroup:
438 mr.groupCoordinators[group] = kerror
439 case CoordinatorTransaction:
440 mr.transCoordinators[group] = kerror
441 }
442 return mr
443}
444
khenaidoo7d3c5582021-08-11 18:09:44 -0400445func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000446 req := reqBody.(*FindCoordinatorRequest)
447 res := &FindCoordinatorResponse{}
448 var v interface{}
449 switch req.CoordinatorType {
450 case CoordinatorGroup:
451 v = mr.groupCoordinators[req.CoordinatorKey]
452 case CoordinatorTransaction:
453 v = mr.transCoordinators[req.CoordinatorKey]
454 }
455 switch v := v.(type) {
456 case *MockBroker:
457 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
458 case KError:
459 res.Err = v
460 }
461 return res
462}
463
464// MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
465type MockOffsetCommitResponse struct {
466 errors map[string]map[string]map[int32]KError
467 t TestReporter
468}
469
470func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
471 return &MockOffsetCommitResponse{t: t}
472}
473
474func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
475 if mr.errors == nil {
476 mr.errors = make(map[string]map[string]map[int32]KError)
477 }
478 topics := mr.errors[group]
479 if topics == nil {
480 topics = make(map[string]map[int32]KError)
481 mr.errors[group] = topics
482 }
483 partitions := topics[topic]
484 if partitions == nil {
485 partitions = make(map[int32]KError)
486 topics[topic] = partitions
487 }
488 partitions[partition] = kerror
489 return mr
490}
491
khenaidoo7d3c5582021-08-11 18:09:44 -0400492func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000493 req := reqBody.(*OffsetCommitRequest)
494 group := req.ConsumerGroup
495 res := &OffsetCommitResponse{}
496 for topic, partitions := range req.blocks {
497 for partition := range partitions {
498 res.AddError(topic, partition, mr.getError(group, topic, partition))
499 }
500 }
501 return res
502}
503
504func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
505 topics := mr.errors[group]
506 if topics == nil {
507 return ErrNoError
508 }
509 partitions := topics[topic]
510 if partitions == nil {
511 return ErrNoError
512 }
513 kerror, ok := partitions[partition]
514 if !ok {
515 return ErrNoError
516 }
517 return kerror
518}
519
520// MockProduceResponse is a `ProduceResponse` builder.
521type MockProduceResponse struct {
522 version int16
523 errors map[string]map[int32]KError
524 t TestReporter
525}
526
527func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
528 return &MockProduceResponse{t: t}
529}
530
531func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
532 mr.version = version
533 return mr
534}
535
536func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
537 if mr.errors == nil {
538 mr.errors = make(map[string]map[int32]KError)
539 }
540 partitions := mr.errors[topic]
541 if partitions == nil {
542 partitions = make(map[int32]KError)
543 mr.errors[topic] = partitions
544 }
545 partitions[partition] = kerror
546 return mr
547}
548
khenaidoo7d3c5582021-08-11 18:09:44 -0400549func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000550 req := reqBody.(*ProduceRequest)
551 res := &ProduceResponse{
552 Version: mr.version,
553 }
554 for topic, partitions := range req.records {
555 for partition := range partitions {
556 res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
557 }
558 }
559 return res
560}
561
562func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
563 partitions := mr.errors[topic]
564 if partitions == nil {
565 return ErrNoError
566 }
567 kerror, ok := partitions[partition]
568 if !ok {
569 return ErrNoError
570 }
571 return kerror
572}
573
574// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
575type MockOffsetFetchResponse struct {
576 offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
577 error KError
578 t TestReporter
579}
580
581func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
582 return &MockOffsetFetchResponse{t: t}
583}
584
585func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
586 if mr.offsets == nil {
587 mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
588 }
589 topics := mr.offsets[group]
590 if topics == nil {
591 topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
592 mr.offsets[group] = topics
593 }
594 partitions := topics[topic]
595 if partitions == nil {
596 partitions = make(map[int32]*OffsetFetchResponseBlock)
597 topics[topic] = partitions
598 }
599 partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
600 return mr
601}
602
603func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
604 mr.error = kerror
605 return mr
606}
607
khenaidoo7d3c5582021-08-11 18:09:44 -0400608func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000609 req := reqBody.(*OffsetFetchRequest)
610 group := req.ConsumerGroup
611 res := &OffsetFetchResponse{Version: req.Version}
612
613 for topic, partitions := range mr.offsets[group] {
614 for partition, block := range partitions {
615 res.AddBlock(topic, partition, block)
616 }
617 }
618
619 if res.Version >= 2 {
620 res.Err = mr.error
621 }
622 return res
623}
624
625type MockCreateTopicsResponse struct {
626 t TestReporter
627}
628
629func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
630 return &MockCreateTopicsResponse{t: t}
631}
632
khenaidoo7d3c5582021-08-11 18:09:44 -0400633func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000634 req := reqBody.(*CreateTopicsRequest)
635 res := &CreateTopicsResponse{
636 Version: req.Version,
637 }
638 res.TopicErrors = make(map[string]*TopicError)
639
640 for topic := range req.TopicDetails {
641 if res.Version >= 1 && strings.HasPrefix(topic, "_") {
642 msg := "insufficient permissions to create topic with reserved prefix"
643 res.TopicErrors[topic] = &TopicError{
644 Err: ErrTopicAuthorizationFailed,
645 ErrMsg: &msg,
646 }
647 continue
648 }
649 res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
650 }
651 return res
652}
653
654type MockDeleteTopicsResponse struct {
655 t TestReporter
656}
657
658func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
659 return &MockDeleteTopicsResponse{t: t}
660}
661
khenaidoo7d3c5582021-08-11 18:09:44 -0400662func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000663 req := reqBody.(*DeleteTopicsRequest)
664 res := &DeleteTopicsResponse{}
665 res.TopicErrorCodes = make(map[string]KError)
666
667 for _, topic := range req.Topics {
668 res.TopicErrorCodes[topic] = ErrNoError
669 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400670 res.Version = req.Version
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000671 return res
672}
673
674type MockCreatePartitionsResponse struct {
675 t TestReporter
676}
677
678func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
679 return &MockCreatePartitionsResponse{t: t}
680}
681
khenaidoo7d3c5582021-08-11 18:09:44 -0400682func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000683 req := reqBody.(*CreatePartitionsRequest)
684 res := &CreatePartitionsResponse{}
685 res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
686
687 for topic := range req.TopicPartitions {
688 if strings.HasPrefix(topic, "_") {
689 msg := "insufficient permissions to create partition on topic with reserved prefix"
690 res.TopicPartitionErrors[topic] = &TopicPartitionError{
691 Err: ErrTopicAuthorizationFailed,
692 ErrMsg: &msg,
693 }
694 continue
695 }
696 res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
697 }
698 return res
699}
700
khenaidoo7d3c5582021-08-11 18:09:44 -0400701type MockAlterPartitionReassignmentsResponse struct {
702 t TestReporter
703}
704
705func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
706 return &MockAlterPartitionReassignmentsResponse{t: t}
707}
708
709func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
710 req := reqBody.(*AlterPartitionReassignmentsRequest)
711 _ = req
712 res := &AlterPartitionReassignmentsResponse{}
713 return res
714}
715
716type MockListPartitionReassignmentsResponse struct {
717 t TestReporter
718}
719
720func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
721 return &MockListPartitionReassignmentsResponse{t: t}
722}
723
724func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
725 req := reqBody.(*ListPartitionReassignmentsRequest)
726 _ = req
727 res := &ListPartitionReassignmentsResponse{}
728
729 for topic, partitions := range req.blocks {
730 for _, partition := range partitions {
731 res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
732 }
733 }
734
735 return res
736}
737
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000738type MockDeleteRecordsResponse struct {
739 t TestReporter
740}
741
742func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
743 return &MockDeleteRecordsResponse{t: t}
744}
745
khenaidoo7d3c5582021-08-11 18:09:44 -0400746func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000747 req := reqBody.(*DeleteRecordsRequest)
748 res := &DeleteRecordsResponse{}
749 res.Topics = make(map[string]*DeleteRecordsResponseTopic)
750
751 for topic, deleteRecordRequestTopic := range req.Topics {
752 partitions := make(map[int32]*DeleteRecordsResponsePartition)
753 for partition := range deleteRecordRequestTopic.PartitionOffsets {
754 partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
755 }
756 res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
757 }
758 return res
759}
760
761type MockDescribeConfigsResponse struct {
762 t TestReporter
763}
764
765func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
766 return &MockDescribeConfigsResponse{t: t}
767}
768
khenaidoo7d3c5582021-08-11 18:09:44 -0400769func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000770 req := reqBody.(*DescribeConfigsRequest)
khenaidoo7d3c5582021-08-11 18:09:44 -0400771 res := &DescribeConfigsResponse{
772 Version: req.Version,
773 }
774
775 includeSynonyms := req.Version > 0
776 includeSource := req.Version > 0
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000777
778 for _, r := range req.Resources {
779 var configEntries []*ConfigEntry
780 switch r.Type {
khenaidoo7d3c5582021-08-11 18:09:44 -0400781 case BrokerResource:
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000782 configEntries = append(configEntries,
khenaidoo7d3c5582021-08-11 18:09:44 -0400783 &ConfigEntry{
784 Name: "min.insync.replicas",
785 Value: "2",
786 ReadOnly: false,
787 Default: false,
788 },
789 )
790 res.Resources = append(res.Resources, &ResourceResponse{
791 Name: r.Name,
792 Configs: configEntries,
793 })
794 case BrokerLoggerResource:
795 configEntries = append(configEntries,
796 &ConfigEntry{
797 Name: "kafka.controller.KafkaController",
798 Value: "DEBUG",
799 ReadOnly: false,
800 Default: false,
801 },
802 )
803 res.Resources = append(res.Resources, &ResourceResponse{
804 Name: r.Name,
805 Configs: configEntries,
806 })
807 case TopicResource:
808 maxMessageBytes := &ConfigEntry{
809 Name: "max.message.bytes",
810 Value: "1000000",
811 ReadOnly: false,
812 Default: !includeSource,
813 Sensitive: false,
814 }
815 if includeSource {
816 maxMessageBytes.Source = SourceDefault
817 }
818 if includeSynonyms {
819 maxMessageBytes.Synonyms = []*ConfigSynonym{
820 {
821 ConfigName: "max.message.bytes",
822 ConfigValue: "500000",
823 },
824 }
825 }
826 retentionMs := &ConfigEntry{
827 Name: "retention.ms",
828 Value: "5000",
829 ReadOnly: false,
830 Default: false,
831 Sensitive: false,
832 }
833 if includeSynonyms {
834 retentionMs.Synonyms = []*ConfigSynonym{
835 {
836 ConfigName: "log.retention.ms",
837 ConfigValue: "2500",
838 },
839 }
840 }
841 password := &ConfigEntry{
842 Name: "password",
843 Value: "12345",
844 ReadOnly: false,
845 Default: false,
846 Sensitive: true,
847 }
848 configEntries = append(
849 configEntries, maxMessageBytes, retentionMs, password)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000850 res.Resources = append(res.Resources, &ResourceResponse{
851 Name: r.Name,
852 Configs: configEntries,
853 })
854 }
855 }
856 return res
857}
858
khenaidoo7d3c5582021-08-11 18:09:44 -0400859type MockDescribeConfigsResponseWithErrorCode struct {
860 t TestReporter
861}
862
863func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {
864 return &MockDescribeConfigsResponseWithErrorCode{t: t}
865}
866
867func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
868 req := reqBody.(*DescribeConfigsRequest)
869 res := &DescribeConfigsResponse{
870 Version: req.Version,
871 }
872
873 for _, r := range req.Resources {
874 res.Resources = append(res.Resources, &ResourceResponse{
875 Name: r.Name,
876 Type: r.Type,
877 ErrorCode: 83,
878 ErrorMsg: "",
879 })
880 }
881 return res
882}
883
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000884type MockAlterConfigsResponse struct {
885 t TestReporter
886}
887
888func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
889 return &MockAlterConfigsResponse{t: t}
890}
891
khenaidoo7d3c5582021-08-11 18:09:44 -0400892func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000893 req := reqBody.(*AlterConfigsRequest)
894 res := &AlterConfigsResponse{}
895
896 for _, r := range req.Resources {
khenaidoo7d3c5582021-08-11 18:09:44 -0400897 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
898 Name: r.Name,
899 Type: r.Type,
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000900 ErrorMsg: "",
901 })
902 }
903 return res
904}
905
khenaidoo7d3c5582021-08-11 18:09:44 -0400906type MockAlterConfigsResponseWithErrorCode struct {
907 t TestReporter
908}
909
910func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {
911 return &MockAlterConfigsResponseWithErrorCode{t: t}
912}
913
914func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
915 req := reqBody.(*AlterConfigsRequest)
916 res := &AlterConfigsResponse{}
917
918 for _, r := range req.Resources {
919 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
920 Name: r.Name,
921 Type: r.Type,
922 ErrorCode: 83,
923 ErrorMsg: "",
924 })
925 }
926 return res
927}
928
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000929type MockCreateAclsResponse struct {
930 t TestReporter
931}
932
933func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
934 return &MockCreateAclsResponse{t: t}
935}
936
khenaidoo7d3c5582021-08-11 18:09:44 -0400937func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000938 req := reqBody.(*CreateAclsRequest)
939 res := &CreateAclsResponse{}
940
941 for range req.AclCreations {
942 res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
943 }
944 return res
945}
946
947type MockListAclsResponse struct {
948 t TestReporter
949}
950
951func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
952 return &MockListAclsResponse{t: t}
953}
954
khenaidoo7d3c5582021-08-11 18:09:44 -0400955func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000956 req := reqBody.(*DescribeAclsRequest)
957 res := &DescribeAclsResponse{}
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000958 res.Err = ErrNoError
959 acl := &ResourceAcls{}
khenaidoo7d3c5582021-08-11 18:09:44 -0400960 if req.ResourceName != nil {
961 acl.Resource.ResourceName = *req.ResourceName
962 }
963 acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000964 acl.Resource.ResourceType = req.ResourceType
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000965
khenaidoo7d3c5582021-08-11 18:09:44 -0400966 host := "*"
967 if req.Host != nil {
968 host = *req.Host
969 }
970
971 principal := "User:test"
972 if req.Principal != nil {
973 principal = *req.Principal
974 }
975
976 permissionType := req.PermissionType
977 if permissionType == AclPermissionAny {
978 permissionType = AclPermissionAllow
979 }
980
981 acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
982 res.ResourceAcls = append(res.ResourceAcls, acl)
983 res.Version = int16(req.Version)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000984 return res
985}
986
987type MockSaslAuthenticateResponse struct {
988 t TestReporter
989 kerror KError
990 saslAuthBytes []byte
991}
992
993func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
994 return &MockSaslAuthenticateResponse{t: t}
995}
996
khenaidoo7d3c5582021-08-11 18:09:44 -0400997func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000998 res := &SaslAuthenticateResponse{}
999 res.Err = msar.kerror
1000 res.SaslAuthBytes = msar.saslAuthBytes
1001 return res
1002}
1003
1004func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
1005 msar.kerror = kerror
1006 return msar
1007}
1008
1009func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
1010 msar.saslAuthBytes = saslAuthBytes
1011 return msar
1012}
1013
1014type MockDeleteAclsResponse struct {
1015 t TestReporter
1016}
1017
1018type MockSaslHandshakeResponse struct {
1019 enabledMechanisms []string
1020 kerror KError
1021 t TestReporter
1022}
1023
1024func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
1025 return &MockSaslHandshakeResponse{t: t}
1026}
1027
khenaidoo7d3c5582021-08-11 18:09:44 -04001028func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001029 res := &SaslHandshakeResponse{}
1030 res.Err = mshr.kerror
1031 res.EnabledMechanisms = mshr.enabledMechanisms
1032 return res
1033}
1034
1035func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
1036 mshr.kerror = kerror
1037 return mshr
1038}
1039
1040func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
1041 mshr.enabledMechanisms = enabledMechanisms
1042 return mshr
1043}
1044
1045func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
1046 return &MockDeleteAclsResponse{t: t}
1047}
1048
khenaidoo7d3c5582021-08-11 18:09:44 -04001049func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001050 req := reqBody.(*DeleteAclsRequest)
1051 res := &DeleteAclsResponse{}
1052
1053 for range req.Filters {
1054 response := &FilterResponse{Err: ErrNoError}
1055 response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
1056 res.FilterResponses = append(res.FilterResponses, response)
1057 }
khenaidoo7d3c5582021-08-11 18:09:44 -04001058 res.Version = int16(req.Version)
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001059 return res
1060}
1061
1062type MockDeleteGroupsResponse struct {
1063 deletedGroups []string
1064}
1065
1066func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
1067 return &MockDeleteGroupsResponse{}
1068}
1069
1070func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
1071 m.deletedGroups = groups
1072 return m
1073}
1074
khenaidoo7d3c5582021-08-11 18:09:44 -04001075func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001076 resp := &DeleteGroupsResponse{
1077 GroupErrorCodes: map[string]KError{},
1078 }
1079 for _, group := range m.deletedGroups {
1080 resp.GroupErrorCodes[group] = ErrNoError
1081 }
1082 return resp
1083}
khenaidoo7d3c5582021-08-11 18:09:44 -04001084
1085type MockJoinGroupResponse struct {
1086 t TestReporter
1087
1088 ThrottleTime int32
1089 Err KError
1090 GenerationId int32
1091 GroupProtocol string
1092 LeaderId string
1093 MemberId string
1094 Members map[string][]byte
1095}
1096
1097func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {
1098 return &MockJoinGroupResponse{
1099 t: t,
1100 Members: make(map[string][]byte),
1101 }
1102}
1103
1104func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1105 req := reqBody.(*JoinGroupRequest)
1106 resp := &JoinGroupResponse{
1107 Version: req.Version,
1108 ThrottleTime: m.ThrottleTime,
1109 Err: m.Err,
1110 GenerationId: m.GenerationId,
1111 GroupProtocol: m.GroupProtocol,
1112 LeaderId: m.LeaderId,
1113 MemberId: m.MemberId,
1114 Members: m.Members,
1115 }
1116 return resp
1117}
1118
1119func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {
1120 m.ThrottleTime = t
1121 return m
1122}
1123
1124func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {
1125 m.Err = kerr
1126 return m
1127}
1128
1129func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {
1130 m.GenerationId = id
1131 return m
1132}
1133
1134func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {
1135 m.GroupProtocol = proto
1136 return m
1137}
1138
1139func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {
1140 m.LeaderId = id
1141 return m
1142}
1143
1144func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {
1145 m.MemberId = id
1146 return m
1147}
1148
1149func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {
1150 bin, err := encode(meta, nil)
1151 if err != nil {
1152 panic(fmt.Sprintf("error encoding member metadata: %v", err))
1153 }
1154 m.Members[id] = bin
1155 return m
1156}
1157
1158type MockLeaveGroupResponse struct {
1159 t TestReporter
1160
1161 Err KError
1162}
1163
1164func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {
1165 return &MockLeaveGroupResponse{t: t}
1166}
1167
1168func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1169 resp := &LeaveGroupResponse{
1170 Err: m.Err,
1171 }
1172 return resp
1173}
1174
1175func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {
1176 m.Err = kerr
1177 return m
1178}
1179
1180type MockSyncGroupResponse struct {
1181 t TestReporter
1182
1183 Err KError
1184 MemberAssignment []byte
1185}
1186
1187func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {
1188 return &MockSyncGroupResponse{t: t}
1189}
1190
1191func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1192 resp := &SyncGroupResponse{
1193 Err: m.Err,
1194 MemberAssignment: m.MemberAssignment,
1195 }
1196 return resp
1197}
1198
1199func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {
1200 m.Err = kerr
1201 return m
1202}
1203
1204func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {
1205 bin, err := encode(assignment, nil)
1206 if err != nil {
1207 panic(fmt.Sprintf("error encoding member assignment: %v", err))
1208 }
1209 m.MemberAssignment = bin
1210 return m
1211}
1212
1213type MockHeartbeatResponse struct {
1214 t TestReporter
1215
1216 Err KError
1217}
1218
1219func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {
1220 return &MockHeartbeatResponse{t: t}
1221}
1222
1223func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {
1224 resp := &HeartbeatResponse{}
1225 return resp
1226}
1227
1228func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {
1229 m.Err = kerr
1230 return m
1231}
1232
1233type MockDescribeLogDirsResponse struct {
1234 t TestReporter
1235 logDirs []DescribeLogDirsResponseDirMetadata
1236}
1237
1238func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
1239 return &MockDescribeLogDirsResponse{t: t}
1240}
1241
1242func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
1243 var topics []DescribeLogDirsResponseTopic
1244 for topic := range topicPartitions {
1245 var partitions []DescribeLogDirsResponsePartition
1246 for i := 0; i < topicPartitions[topic]; i++ {
1247 partitions = append(partitions, DescribeLogDirsResponsePartition{
1248 PartitionID: int32(i),
1249 IsTemporary: false,
1250 OffsetLag: int64(0),
1251 Size: int64(1234),
1252 })
1253 }
1254 topics = append(topics, DescribeLogDirsResponseTopic{
1255 Topic: topic,
1256 Partitions: partitions,
1257 })
1258 }
1259 logDir := DescribeLogDirsResponseDirMetadata{
1260 ErrorCode: ErrNoError,
1261 Path: logDirPath,
1262 Topics: topics,
1263 }
1264 m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
1265 return m
1266}
1267
1268func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1269 resp := &DescribeLogDirsResponse{
1270 LogDirs: m.logDirs,
1271 }
1272 return resp
1273}