blob: da816963a5047ab3dbdc144c325493abc0edf42c [file] [log] [blame]
kesavand2cde6582020-06-22 04:56:23 -04001package sarama
2
3import (
4 "fmt"
5 "strings"
kesavandc71914f2022-03-25 11:19:03 +05306 "sync"
kesavand2cde6582020-06-22 04:56:23 -04007)
8
// TestReporter mirrors the reporting methods of go's testing.T. Declaring it
// here avoids importing `testing` in the main part of the library.
type TestReporter interface {
	Error(args ...interface{})
	Errorf(format string, args ...interface{})
	Fatal(args ...interface{})
	Fatalf(format string, args ...interface{})
}
17
18// MockResponse is a response builder interface it defines one method that
19// allows generating a response based on a request body. MockResponses are used
20// to program behavior of MockBroker in tests.
21type MockResponse interface {
kesavandc71914f2022-03-25 11:19:03 +053022 For(reqBody versionedDecoder) (res encoderWithHeader)
kesavand2cde6582020-06-22 04:56:23 -040023}
24
25// MockWrapper is a mock response builder that returns a particular concrete
26// response regardless of the actual request passed to the `For` method.
27type MockWrapper struct {
kesavandc71914f2022-03-25 11:19:03 +053028 res encoderWithHeader
kesavand2cde6582020-06-22 04:56:23 -040029}
30
kesavandc71914f2022-03-25 11:19:03 +053031func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
kesavand2cde6582020-06-22 04:56:23 -040032 return mw.res
33}
34
kesavandc71914f2022-03-25 11:19:03 +053035func NewMockWrapper(res encoderWithHeader) *MockWrapper {
kesavand2cde6582020-06-22 04:56:23 -040036 return &MockWrapper{res: res}
37}
38
39// MockSequence is a mock response builder that is created from a sequence of
40// concrete responses. Every time when a `MockBroker` calls its `For` method
41// the next response from the sequence is returned. When the end of the
42// sequence is reached the last element from the sequence is returned.
43type MockSequence struct {
44 responses []MockResponse
45}
46
47func NewMockSequence(responses ...interface{}) *MockSequence {
48 ms := &MockSequence{}
49 ms.responses = make([]MockResponse, len(responses))
50 for i, res := range responses {
51 switch res := res.(type) {
52 case MockResponse:
53 ms.responses[i] = res
kesavandc71914f2022-03-25 11:19:03 +053054 case encoderWithHeader:
kesavand2cde6582020-06-22 04:56:23 -040055 ms.responses[i] = NewMockWrapper(res)
56 default:
57 panic(fmt.Sprintf("Unexpected response type: %T", res))
58 }
59 }
60 return ms
61}
62
kesavandc71914f2022-03-25 11:19:03 +053063func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
kesavand2cde6582020-06-22 04:56:23 -040064 res = mc.responses[0].For(reqBody)
65 if len(mc.responses) > 1 {
66 mc.responses = mc.responses[1:]
67 }
68 return res
69}
70
71type MockListGroupsResponse struct {
72 groups map[string]string
73 t TestReporter
74}
75
76func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
77 return &MockListGroupsResponse{
78 groups: make(map[string]string),
79 t: t,
80 }
81}
82
kesavandc71914f2022-03-25 11:19:03 +053083func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -040084 request := reqBody.(*ListGroupsRequest)
85 _ = request
86 response := &ListGroupsResponse{
87 Groups: m.groups,
88 }
89 return response
90}
91
92func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
93 m.groups[groupID] = protocolType
94 return m
95}
96
97type MockDescribeGroupsResponse struct {
98 groups map[string]*GroupDescription
99 t TestReporter
100}
101
102func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
103 return &MockDescribeGroupsResponse{
104 t: t,
105 groups: make(map[string]*GroupDescription),
106 }
107}
108
109func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
110 m.groups[groupID] = description
111 return m
112}
113
kesavandc71914f2022-03-25 11:19:03 +0530114func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400115 request := reqBody.(*DescribeGroupsRequest)
116
117 response := &DescribeGroupsResponse{}
118 for _, requestedGroup := range request.Groups {
119 if group, ok := m.groups[requestedGroup]; ok {
120 response.Groups = append(response.Groups, group)
121 } else {
122 // Mimic real kafka - if a group doesn't exist, return
123 // an entry with state "Dead"
124 response.Groups = append(response.Groups, &GroupDescription{
125 GroupId: requestedGroup,
126 State: "Dead",
127 })
128 }
129 }
130
131 return response
132}
133
134// MockMetadataResponse is a `MetadataResponse` builder.
135type MockMetadataResponse struct {
136 controllerID int32
137 leaders map[string]map[int32]int32
138 brokers map[string]int32
139 t TestReporter
140}
141
142func NewMockMetadataResponse(t TestReporter) *MockMetadataResponse {
143 return &MockMetadataResponse{
144 leaders: make(map[string]map[int32]int32),
145 brokers: make(map[string]int32),
146 t: t,
147 }
148}
149
150func (mmr *MockMetadataResponse) SetLeader(topic string, partition, brokerID int32) *MockMetadataResponse {
151 partitions := mmr.leaders[topic]
152 if partitions == nil {
153 partitions = make(map[int32]int32)
154 mmr.leaders[topic] = partitions
155 }
156 partitions[partition] = brokerID
157 return mmr
158}
159
160func (mmr *MockMetadataResponse) SetBroker(addr string, brokerID int32) *MockMetadataResponse {
161 mmr.brokers[addr] = brokerID
162 return mmr
163}
164
165func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResponse {
166 mmr.controllerID = brokerID
167 return mmr
168}
169
kesavandc71914f2022-03-25 11:19:03 +0530170func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400171 metadataRequest := reqBody.(*MetadataRequest)
172 metadataResponse := &MetadataResponse{
173 Version: metadataRequest.version(),
174 ControllerID: mmr.controllerID,
175 }
176 for addr, brokerID := range mmr.brokers {
177 metadataResponse.AddBroker(addr, brokerID)
178 }
179
180 // Generate set of replicas
kesavandc71914f2022-03-25 11:19:03 +0530181 var replicas []int32
182 var offlineReplicas []int32
kesavand2cde6582020-06-22 04:56:23 -0400183 for _, brokerID := range mmr.brokers {
184 replicas = append(replicas, brokerID)
185 }
186
187 if len(metadataRequest.Topics) == 0 {
188 for topic, partitions := range mmr.leaders {
189 for partition, brokerID := range partitions {
190 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
191 }
192 }
193 return metadataResponse
194 }
195 for _, topic := range metadataRequest.Topics {
196 for partition, brokerID := range mmr.leaders[topic] {
197 metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
198 }
199 }
200 return metadataResponse
201}
202
203// MockOffsetResponse is an `OffsetResponse` builder.
204type MockOffsetResponse struct {
205 offsets map[string]map[int32]map[int64]int64
206 t TestReporter
207 version int16
208}
209
210func NewMockOffsetResponse(t TestReporter) *MockOffsetResponse {
211 return &MockOffsetResponse{
212 offsets: make(map[string]map[int32]map[int64]int64),
213 t: t,
214 }
215}
216
217func (mor *MockOffsetResponse) SetVersion(version int16) *MockOffsetResponse {
218 mor.version = version
219 return mor
220}
221
222func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, offset int64) *MockOffsetResponse {
223 partitions := mor.offsets[topic]
224 if partitions == nil {
225 partitions = make(map[int32]map[int64]int64)
226 mor.offsets[topic] = partitions
227 }
228 times := partitions[partition]
229 if times == nil {
230 times = make(map[int64]int64)
231 partitions[partition] = times
232 }
233 times[time] = offset
234 return mor
235}
236
kesavandc71914f2022-03-25 11:19:03 +0530237func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400238 offsetRequest := reqBody.(*OffsetRequest)
239 offsetResponse := &OffsetResponse{Version: mor.version}
240 for topic, partitions := range offsetRequest.blocks {
241 for partition, block := range partitions {
242 offset := mor.getOffset(topic, partition, block.time)
243 offsetResponse.AddTopicPartition(topic, partition, offset)
244 }
245 }
246 return offsetResponse
247}
248
249func (mor *MockOffsetResponse) getOffset(topic string, partition int32, time int64) int64 {
250 partitions := mor.offsets[topic]
251 if partitions == nil {
252 mor.t.Errorf("missing topic: %s", topic)
253 }
254 times := partitions[partition]
255 if times == nil {
256 mor.t.Errorf("missing partition: %d", partition)
257 }
258 offset, ok := times[time]
259 if !ok {
260 mor.t.Errorf("missing time: %d", time)
261 }
262 return offset
263}
264
265// MockFetchResponse is a `FetchResponse` builder.
266type MockFetchResponse struct {
267 messages map[string]map[int32]map[int64]Encoder
kesavandc71914f2022-03-25 11:19:03 +0530268 messagesLock *sync.RWMutex
kesavand2cde6582020-06-22 04:56:23 -0400269 highWaterMarks map[string]map[int32]int64
270 t TestReporter
271 batchSize int
272 version int16
273}
274
275func NewMockFetchResponse(t TestReporter, batchSize int) *MockFetchResponse {
276 return &MockFetchResponse{
277 messages: make(map[string]map[int32]map[int64]Encoder),
kesavandc71914f2022-03-25 11:19:03 +0530278 messagesLock: &sync.RWMutex{},
kesavand2cde6582020-06-22 04:56:23 -0400279 highWaterMarks: make(map[string]map[int32]int64),
280 t: t,
281 batchSize: batchSize,
282 }
283}
284
285func (mfr *MockFetchResponse) SetVersion(version int16) *MockFetchResponse {
286 mfr.version = version
287 return mfr
288}
289
290func (mfr *MockFetchResponse) SetMessage(topic string, partition int32, offset int64, msg Encoder) *MockFetchResponse {
kesavandc71914f2022-03-25 11:19:03 +0530291 mfr.messagesLock.Lock()
292 defer mfr.messagesLock.Unlock()
kesavand2cde6582020-06-22 04:56:23 -0400293 partitions := mfr.messages[topic]
294 if partitions == nil {
295 partitions = make(map[int32]map[int64]Encoder)
296 mfr.messages[topic] = partitions
297 }
298 messages := partitions[partition]
299 if messages == nil {
300 messages = make(map[int64]Encoder)
301 partitions[partition] = messages
302 }
303 messages[offset] = msg
304 return mfr
305}
306
307func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, offset int64) *MockFetchResponse {
308 partitions := mfr.highWaterMarks[topic]
309 if partitions == nil {
310 partitions = make(map[int32]int64)
311 mfr.highWaterMarks[topic] = partitions
312 }
313 partitions[partition] = offset
314 return mfr
315}
316
kesavandc71914f2022-03-25 11:19:03 +0530317func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400318 fetchRequest := reqBody.(*FetchRequest)
319 res := &FetchResponse{
320 Version: mfr.version,
321 }
322 for topic, partitions := range fetchRequest.blocks {
323 for partition, block := range partitions {
324 initialOffset := block.fetchOffset
325 offset := initialOffset
326 maxOffset := initialOffset + int64(mfr.getMessageCount(topic, partition))
327 for i := 0; i < mfr.batchSize && offset < maxOffset; {
328 msg := mfr.getMessage(topic, partition, offset)
329 if msg != nil {
330 res.AddMessage(topic, partition, nil, msg, offset)
331 i++
332 }
333 offset++
334 }
335 fb := res.GetBlock(topic, partition)
336 if fb == nil {
337 res.AddError(topic, partition, ErrNoError)
338 fb = res.GetBlock(topic, partition)
339 }
340 fb.HighWaterMarkOffset = mfr.getHighWaterMark(topic, partition)
341 }
342 }
343 return res
344}
345
346func (mfr *MockFetchResponse) getMessage(topic string, partition int32, offset int64) Encoder {
kesavandc71914f2022-03-25 11:19:03 +0530347 mfr.messagesLock.RLock()
348 defer mfr.messagesLock.RUnlock()
kesavand2cde6582020-06-22 04:56:23 -0400349 partitions := mfr.messages[topic]
350 if partitions == nil {
351 return nil
352 }
353 messages := partitions[partition]
354 if messages == nil {
355 return nil
356 }
357 return messages[offset]
358}
359
360func (mfr *MockFetchResponse) getMessageCount(topic string, partition int32) int {
kesavandc71914f2022-03-25 11:19:03 +0530361 mfr.messagesLock.RLock()
362 defer mfr.messagesLock.RUnlock()
kesavand2cde6582020-06-22 04:56:23 -0400363 partitions := mfr.messages[topic]
364 if partitions == nil {
365 return 0
366 }
367 messages := partitions[partition]
368 if messages == nil {
369 return 0
370 }
371 return len(messages)
372}
373
374func (mfr *MockFetchResponse) getHighWaterMark(topic string, partition int32) int64 {
375 partitions := mfr.highWaterMarks[topic]
376 if partitions == nil {
377 return 0
378 }
379 return partitions[partition]
380}
381
382// MockConsumerMetadataResponse is a `ConsumerMetadataResponse` builder.
383type MockConsumerMetadataResponse struct {
384 coordinators map[string]interface{}
385 t TestReporter
386}
387
388func NewMockConsumerMetadataResponse(t TestReporter) *MockConsumerMetadataResponse {
389 return &MockConsumerMetadataResponse{
390 coordinators: make(map[string]interface{}),
391 t: t,
392 }
393}
394
395func (mr *MockConsumerMetadataResponse) SetCoordinator(group string, broker *MockBroker) *MockConsumerMetadataResponse {
396 mr.coordinators[group] = broker
397 return mr
398}
399
400func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *MockConsumerMetadataResponse {
401 mr.coordinators[group] = kerror
402 return mr
403}
404
kesavandc71914f2022-03-25 11:19:03 +0530405func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400406 req := reqBody.(*ConsumerMetadataRequest)
407 group := req.ConsumerGroup
408 res := &ConsumerMetadataResponse{}
409 v := mr.coordinators[group]
410 switch v := v.(type) {
411 case *MockBroker:
412 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
413 case KError:
414 res.Err = v
415 }
416 return res
417}
418
419// MockFindCoordinatorResponse is a `FindCoordinatorResponse` builder.
420type MockFindCoordinatorResponse struct {
421 groupCoordinators map[string]interface{}
422 transCoordinators map[string]interface{}
423 t TestReporter
424}
425
426func NewMockFindCoordinatorResponse(t TestReporter) *MockFindCoordinatorResponse {
427 return &MockFindCoordinatorResponse{
428 groupCoordinators: make(map[string]interface{}),
429 transCoordinators: make(map[string]interface{}),
430 t: t,
431 }
432}
433
434func (mr *MockFindCoordinatorResponse) SetCoordinator(coordinatorType CoordinatorType, group string, broker *MockBroker) *MockFindCoordinatorResponse {
435 switch coordinatorType {
436 case CoordinatorGroup:
437 mr.groupCoordinators[group] = broker
438 case CoordinatorTransaction:
439 mr.transCoordinators[group] = broker
440 }
441 return mr
442}
443
444func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType, group string, kerror KError) *MockFindCoordinatorResponse {
445 switch coordinatorType {
446 case CoordinatorGroup:
447 mr.groupCoordinators[group] = kerror
448 case CoordinatorTransaction:
449 mr.transCoordinators[group] = kerror
450 }
451 return mr
452}
453
kesavandc71914f2022-03-25 11:19:03 +0530454func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400455 req := reqBody.(*FindCoordinatorRequest)
456 res := &FindCoordinatorResponse{}
457 var v interface{}
458 switch req.CoordinatorType {
459 case CoordinatorGroup:
460 v = mr.groupCoordinators[req.CoordinatorKey]
461 case CoordinatorTransaction:
462 v = mr.transCoordinators[req.CoordinatorKey]
463 }
464 switch v := v.(type) {
465 case *MockBroker:
466 res.Coordinator = &Broker{id: v.BrokerID(), addr: v.Addr()}
467 case KError:
468 res.Err = v
469 }
470 return res
471}
472
473// MockOffsetCommitResponse is a `OffsetCommitResponse` builder.
474type MockOffsetCommitResponse struct {
475 errors map[string]map[string]map[int32]KError
476 t TestReporter
477}
478
479func NewMockOffsetCommitResponse(t TestReporter) *MockOffsetCommitResponse {
480 return &MockOffsetCommitResponse{t: t}
481}
482
483func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int32, kerror KError) *MockOffsetCommitResponse {
484 if mr.errors == nil {
485 mr.errors = make(map[string]map[string]map[int32]KError)
486 }
487 topics := mr.errors[group]
488 if topics == nil {
489 topics = make(map[string]map[int32]KError)
490 mr.errors[group] = topics
491 }
492 partitions := topics[topic]
493 if partitions == nil {
494 partitions = make(map[int32]KError)
495 topics[topic] = partitions
496 }
497 partitions[partition] = kerror
498 return mr
499}
500
kesavandc71914f2022-03-25 11:19:03 +0530501func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400502 req := reqBody.(*OffsetCommitRequest)
503 group := req.ConsumerGroup
504 res := &OffsetCommitResponse{}
505 for topic, partitions := range req.blocks {
506 for partition := range partitions {
507 res.AddError(topic, partition, mr.getError(group, topic, partition))
508 }
509 }
510 return res
511}
512
513func (mr *MockOffsetCommitResponse) getError(group, topic string, partition int32) KError {
514 topics := mr.errors[group]
515 if topics == nil {
516 return ErrNoError
517 }
518 partitions := topics[topic]
519 if partitions == nil {
520 return ErrNoError
521 }
522 kerror, ok := partitions[partition]
523 if !ok {
524 return ErrNoError
525 }
526 return kerror
527}
528
529// MockProduceResponse is a `ProduceResponse` builder.
530type MockProduceResponse struct {
531 version int16
532 errors map[string]map[int32]KError
533 t TestReporter
534}
535
536func NewMockProduceResponse(t TestReporter) *MockProduceResponse {
537 return &MockProduceResponse{t: t}
538}
539
540func (mr *MockProduceResponse) SetVersion(version int16) *MockProduceResponse {
541 mr.version = version
542 return mr
543}
544
545func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KError) *MockProduceResponse {
546 if mr.errors == nil {
547 mr.errors = make(map[string]map[int32]KError)
548 }
549 partitions := mr.errors[topic]
550 if partitions == nil {
551 partitions = make(map[int32]KError)
552 mr.errors[topic] = partitions
553 }
554 partitions[partition] = kerror
555 return mr
556}
557
kesavandc71914f2022-03-25 11:19:03 +0530558func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400559 req := reqBody.(*ProduceRequest)
560 res := &ProduceResponse{
561 Version: mr.version,
562 }
563 for topic, partitions := range req.records {
564 for partition := range partitions {
565 res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
566 }
567 }
568 return res
569}
570
571func (mr *MockProduceResponse) getError(topic string, partition int32) KError {
572 partitions := mr.errors[topic]
573 if partitions == nil {
574 return ErrNoError
575 }
576 kerror, ok := partitions[partition]
577 if !ok {
578 return ErrNoError
579 }
580 return kerror
581}
582
583// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
584type MockOffsetFetchResponse struct {
585 offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
586 error KError
587 t TestReporter
588}
589
590func NewMockOffsetFetchResponse(t TestReporter) *MockOffsetFetchResponse {
591 return &MockOffsetFetchResponse{t: t}
592}
593
594func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int32, offset int64, metadata string, kerror KError) *MockOffsetFetchResponse {
595 if mr.offsets == nil {
596 mr.offsets = make(map[string]map[string]map[int32]*OffsetFetchResponseBlock)
597 }
598 topics := mr.offsets[group]
599 if topics == nil {
600 topics = make(map[string]map[int32]*OffsetFetchResponseBlock)
601 mr.offsets[group] = topics
602 }
603 partitions := topics[topic]
604 if partitions == nil {
605 partitions = make(map[int32]*OffsetFetchResponseBlock)
606 topics[topic] = partitions
607 }
608 partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
609 return mr
610}
611
612func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
613 mr.error = kerror
614 return mr
615}
616
kesavandc71914f2022-03-25 11:19:03 +0530617func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400618 req := reqBody.(*OffsetFetchRequest)
619 group := req.ConsumerGroup
620 res := &OffsetFetchResponse{Version: req.Version}
621
622 for topic, partitions := range mr.offsets[group] {
623 for partition, block := range partitions {
624 res.AddBlock(topic, partition, block)
625 }
626 }
627
628 if res.Version >= 2 {
629 res.Err = mr.error
630 }
631 return res
632}
633
634type MockCreateTopicsResponse struct {
635 t TestReporter
636}
637
638func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
639 return &MockCreateTopicsResponse{t: t}
640}
641
kesavandc71914f2022-03-25 11:19:03 +0530642func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400643 req := reqBody.(*CreateTopicsRequest)
644 res := &CreateTopicsResponse{
645 Version: req.Version,
646 }
647 res.TopicErrors = make(map[string]*TopicError)
648
649 for topic := range req.TopicDetails {
650 if res.Version >= 1 && strings.HasPrefix(topic, "_") {
651 msg := "insufficient permissions to create topic with reserved prefix"
652 res.TopicErrors[topic] = &TopicError{
653 Err: ErrTopicAuthorizationFailed,
654 ErrMsg: &msg,
655 }
656 continue
657 }
658 res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
659 }
660 return res
661}
662
663type MockDeleteTopicsResponse struct {
664 t TestReporter
665}
666
667func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
668 return &MockDeleteTopicsResponse{t: t}
669}
670
kesavandc71914f2022-03-25 11:19:03 +0530671func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400672 req := reqBody.(*DeleteTopicsRequest)
673 res := &DeleteTopicsResponse{}
674 res.TopicErrorCodes = make(map[string]KError)
675
676 for _, topic := range req.Topics {
677 res.TopicErrorCodes[topic] = ErrNoError
678 }
kesavandc71914f2022-03-25 11:19:03 +0530679 res.Version = req.Version
kesavand2cde6582020-06-22 04:56:23 -0400680 return res
681}
682
683type MockCreatePartitionsResponse struct {
684 t TestReporter
685}
686
687func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsResponse {
688 return &MockCreatePartitionsResponse{t: t}
689}
690
kesavandc71914f2022-03-25 11:19:03 +0530691func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400692 req := reqBody.(*CreatePartitionsRequest)
693 res := &CreatePartitionsResponse{}
694 res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
695
696 for topic := range req.TopicPartitions {
697 if strings.HasPrefix(topic, "_") {
698 msg := "insufficient permissions to create partition on topic with reserved prefix"
699 res.TopicPartitionErrors[topic] = &TopicPartitionError{
700 Err: ErrTopicAuthorizationFailed,
701 ErrMsg: &msg,
702 }
703 continue
704 }
705 res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
706 }
707 return res
708}
709
kesavandc71914f2022-03-25 11:19:03 +0530710type MockAlterPartitionReassignmentsResponse struct {
711 t TestReporter
712}
713
714func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
715 return &MockAlterPartitionReassignmentsResponse{t: t}
716}
717
718func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
719 req := reqBody.(*AlterPartitionReassignmentsRequest)
720 _ = req
721 res := &AlterPartitionReassignmentsResponse{}
722 return res
723}
724
725type MockListPartitionReassignmentsResponse struct {
726 t TestReporter
727}
728
729func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
730 return &MockListPartitionReassignmentsResponse{t: t}
731}
732
733func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
734 req := reqBody.(*ListPartitionReassignmentsRequest)
735 _ = req
736 res := &ListPartitionReassignmentsResponse{}
737
738 for topic, partitions := range req.blocks {
739 for _, partition := range partitions {
740 res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
741 }
742 }
743
744 return res
745}
746
kesavand2cde6582020-06-22 04:56:23 -0400747type MockDeleteRecordsResponse struct {
748 t TestReporter
749}
750
751func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
752 return &MockDeleteRecordsResponse{t: t}
753}
754
kesavandc71914f2022-03-25 11:19:03 +0530755func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400756 req := reqBody.(*DeleteRecordsRequest)
757 res := &DeleteRecordsResponse{}
758 res.Topics = make(map[string]*DeleteRecordsResponseTopic)
759
760 for topic, deleteRecordRequestTopic := range req.Topics {
761 partitions := make(map[int32]*DeleteRecordsResponsePartition)
762 for partition := range deleteRecordRequestTopic.PartitionOffsets {
763 partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
764 }
765 res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
766 }
767 return res
768}
769
770type MockDescribeConfigsResponse struct {
771 t TestReporter
772}
773
774func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse {
775 return &MockDescribeConfigsResponse{t: t}
776}
777
kesavandc71914f2022-03-25 11:19:03 +0530778func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400779 req := reqBody.(*DescribeConfigsRequest)
kesavandc71914f2022-03-25 11:19:03 +0530780 res := &DescribeConfigsResponse{
781 Version: req.Version,
782 }
783
784 includeSynonyms := req.Version > 0
785 includeSource := req.Version > 0
kesavand2cde6582020-06-22 04:56:23 -0400786
787 for _, r := range req.Resources {
788 var configEntries []*ConfigEntry
789 switch r.Type {
kesavandc71914f2022-03-25 11:19:03 +0530790 case BrokerResource:
kesavand2cde6582020-06-22 04:56:23 -0400791 configEntries = append(configEntries,
kesavandc71914f2022-03-25 11:19:03 +0530792 &ConfigEntry{
793 Name: "min.insync.replicas",
794 Value: "2",
795 ReadOnly: false,
796 Default: false,
797 },
798 )
799 res.Resources = append(res.Resources, &ResourceResponse{
800 Name: r.Name,
801 Configs: configEntries,
802 })
803 case BrokerLoggerResource:
804 configEntries = append(configEntries,
805 &ConfigEntry{
806 Name: "kafka.controller.KafkaController",
807 Value: "DEBUG",
808 ReadOnly: false,
809 Default: false,
810 },
811 )
812 res.Resources = append(res.Resources, &ResourceResponse{
813 Name: r.Name,
814 Configs: configEntries,
815 })
816 case TopicResource:
817 maxMessageBytes := &ConfigEntry{
818 Name: "max.message.bytes",
819 Value: "1000000",
820 ReadOnly: false,
821 Default: !includeSource,
822 Sensitive: false,
823 }
824 if includeSource {
825 maxMessageBytes.Source = SourceDefault
826 }
827 if includeSynonyms {
828 maxMessageBytes.Synonyms = []*ConfigSynonym{
829 {
830 ConfigName: "max.message.bytes",
831 ConfigValue: "500000",
832 },
833 }
834 }
835 retentionMs := &ConfigEntry{
836 Name: "retention.ms",
837 Value: "5000",
838 ReadOnly: false,
839 Default: false,
840 Sensitive: false,
841 }
842 if includeSynonyms {
843 retentionMs.Synonyms = []*ConfigSynonym{
844 {
845 ConfigName: "log.retention.ms",
846 ConfigValue: "2500",
847 },
848 }
849 }
850 password := &ConfigEntry{
851 Name: "password",
852 Value: "12345",
853 ReadOnly: false,
854 Default: false,
855 Sensitive: true,
856 }
857 configEntries = append(
858 configEntries, maxMessageBytes, retentionMs, password)
kesavand2cde6582020-06-22 04:56:23 -0400859 res.Resources = append(res.Resources, &ResourceResponse{
860 Name: r.Name,
861 Configs: configEntries,
862 })
863 }
864 }
865 return res
866}
867
kesavandc71914f2022-03-25 11:19:03 +0530868type MockDescribeConfigsResponseWithErrorCode struct {
869 t TestReporter
870}
871
872func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {
873 return &MockDescribeConfigsResponseWithErrorCode{t: t}
874}
875
876func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
877 req := reqBody.(*DescribeConfigsRequest)
878 res := &DescribeConfigsResponse{
879 Version: req.Version,
880 }
881
882 for _, r := range req.Resources {
883 res.Resources = append(res.Resources, &ResourceResponse{
884 Name: r.Name,
885 Type: r.Type,
886 ErrorCode: 83,
887 ErrorMsg: "",
888 })
889 }
890 return res
891}
892
kesavand2cde6582020-06-22 04:56:23 -0400893type MockAlterConfigsResponse struct {
894 t TestReporter
895}
896
897func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
898 return &MockAlterConfigsResponse{t: t}
899}
900
kesavandc71914f2022-03-25 11:19:03 +0530901func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400902 req := reqBody.(*AlterConfigsRequest)
903 res := &AlterConfigsResponse{}
904
905 for _, r := range req.Resources {
kesavandc71914f2022-03-25 11:19:03 +0530906 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
907 Name: r.Name,
908 Type: r.Type,
kesavand2cde6582020-06-22 04:56:23 -0400909 ErrorMsg: "",
910 })
911 }
912 return res
913}
914
kesavandc71914f2022-03-25 11:19:03 +0530915type MockAlterConfigsResponseWithErrorCode struct {
916 t TestReporter
917}
918
919func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {
920 return &MockAlterConfigsResponseWithErrorCode{t: t}
921}
922
923func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
924 req := reqBody.(*AlterConfigsRequest)
925 res := &AlterConfigsResponse{}
926
927 for _, r := range req.Resources {
928 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
929 Name: r.Name,
930 Type: r.Type,
931 ErrorCode: 83,
932 ErrorMsg: "",
933 })
934 }
935 return res
936}
937
938type MockIncrementalAlterConfigsResponse struct {
939 t TestReporter
940}
941
942func NewMockIncrementalAlterConfigsResponse(t TestReporter) *MockIncrementalAlterConfigsResponse {
943 return &MockIncrementalAlterConfigsResponse{t: t}
944}
945
946func (mr *MockIncrementalAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
947 req := reqBody.(*IncrementalAlterConfigsRequest)
948 res := &IncrementalAlterConfigsResponse{}
949
950 for _, r := range req.Resources {
951 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
952 Name: r.Name,
953 Type: r.Type,
954 ErrorMsg: "",
955 })
956 }
957 return res
958}
959
960type MockIncrementalAlterConfigsResponseWithErrorCode struct {
961 t TestReporter
962}
963
964func NewMockIncrementalAlterConfigsResponseWithErrorCode(t TestReporter) *MockIncrementalAlterConfigsResponseWithErrorCode {
965 return &MockIncrementalAlterConfigsResponseWithErrorCode{t: t}
966}
967
968func (mr *MockIncrementalAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
969 req := reqBody.(*IncrementalAlterConfigsRequest)
970 res := &IncrementalAlterConfigsResponse{}
971
972 for _, r := range req.Resources {
973 res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
974 Name: r.Name,
975 Type: r.Type,
976 ErrorCode: 83,
977 ErrorMsg: "",
978 })
979 }
980 return res
981}
982
kesavand2cde6582020-06-22 04:56:23 -0400983type MockCreateAclsResponse struct {
984 t TestReporter
985}
986
987func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
988 return &MockCreateAclsResponse{t: t}
989}
990
kesavandc71914f2022-03-25 11:19:03 +0530991func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -0400992 req := reqBody.(*CreateAclsRequest)
993 res := &CreateAclsResponse{}
994
995 for range req.AclCreations {
996 res.AclCreationResponses = append(res.AclCreationResponses, &AclCreationResponse{Err: ErrNoError})
997 }
998 return res
999}
1000
1001type MockListAclsResponse struct {
1002 t TestReporter
1003}
1004
1005func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
1006 return &MockListAclsResponse{t: t}
1007}
1008
kesavandc71914f2022-03-25 11:19:03 +05301009func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -04001010 req := reqBody.(*DescribeAclsRequest)
1011 res := &DescribeAclsResponse{}
kesavand2cde6582020-06-22 04:56:23 -04001012 res.Err = ErrNoError
1013 acl := &ResourceAcls{}
kesavandc71914f2022-03-25 11:19:03 +05301014 if req.ResourceName != nil {
1015 acl.Resource.ResourceName = *req.ResourceName
1016 }
1017 acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
kesavand2cde6582020-06-22 04:56:23 -04001018 acl.Resource.ResourceType = req.ResourceType
kesavand2cde6582020-06-22 04:56:23 -04001019
kesavandc71914f2022-03-25 11:19:03 +05301020 host := "*"
1021 if req.Host != nil {
1022 host = *req.Host
1023 }
1024
1025 principal := "User:test"
1026 if req.Principal != nil {
1027 principal = *req.Principal
1028 }
1029
1030 permissionType := req.PermissionType
1031 if permissionType == AclPermissionAny {
1032 permissionType = AclPermissionAllow
1033 }
1034
1035 acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
1036 res.ResourceAcls = append(res.ResourceAcls, acl)
1037 res.Version = int16(req.Version)
kesavand2cde6582020-06-22 04:56:23 -04001038 return res
1039}
1040
1041type MockSaslAuthenticateResponse struct {
1042 t TestReporter
1043 kerror KError
1044 saslAuthBytes []byte
1045}
1046
1047func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateResponse {
1048 return &MockSaslAuthenticateResponse{t: t}
1049}
1050
kesavandc71914f2022-03-25 11:19:03 +05301051func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -04001052 res := &SaslAuthenticateResponse{}
1053 res.Err = msar.kerror
1054 res.SaslAuthBytes = msar.saslAuthBytes
1055 return res
1056}
1057
1058func (msar *MockSaslAuthenticateResponse) SetError(kerror KError) *MockSaslAuthenticateResponse {
1059 msar.kerror = kerror
1060 return msar
1061}
1062
1063func (msar *MockSaslAuthenticateResponse) SetAuthBytes(saslAuthBytes []byte) *MockSaslAuthenticateResponse {
1064 msar.saslAuthBytes = saslAuthBytes
1065 return msar
1066}
1067
1068type MockDeleteAclsResponse struct {
1069 t TestReporter
1070}
1071
1072type MockSaslHandshakeResponse struct {
1073 enabledMechanisms []string
1074 kerror KError
1075 t TestReporter
1076}
1077
1078func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
1079 return &MockSaslHandshakeResponse{t: t}
1080}
1081
kesavandc71914f2022-03-25 11:19:03 +05301082func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -04001083 res := &SaslHandshakeResponse{}
1084 res.Err = mshr.kerror
1085 res.EnabledMechanisms = mshr.enabledMechanisms
1086 return res
1087}
1088
1089func (mshr *MockSaslHandshakeResponse) SetError(kerror KError) *MockSaslHandshakeResponse {
1090 mshr.kerror = kerror
1091 return mshr
1092}
1093
1094func (mshr *MockSaslHandshakeResponse) SetEnabledMechanisms(enabledMechanisms []string) *MockSaslHandshakeResponse {
1095 mshr.enabledMechanisms = enabledMechanisms
1096 return mshr
1097}
1098
1099func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
1100 return &MockDeleteAclsResponse{t: t}
1101}
1102
kesavandc71914f2022-03-25 11:19:03 +05301103func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -04001104 req := reqBody.(*DeleteAclsRequest)
1105 res := &DeleteAclsResponse{}
1106
1107 for range req.Filters {
1108 response := &FilterResponse{Err: ErrNoError}
1109 response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
1110 res.FilterResponses = append(res.FilterResponses, response)
1111 }
kesavandc71914f2022-03-25 11:19:03 +05301112 res.Version = int16(req.Version)
kesavand2cde6582020-06-22 04:56:23 -04001113 return res
1114}
1115
// MockDeleteGroupsResponse is a mock response builder that produces a
// DeleteGroupsResponse listing a configurable set of groups as deleted.
type MockDeleteGroupsResponse struct {
	// deletedGroups holds the group names reported with ErrNoError; set via
	// SetDeletedGroups.
	deletedGroups []string
}
1119
1120func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
1121 return &MockDeleteGroupsResponse{}
1122}
1123
1124func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
1125 m.deletedGroups = groups
1126 return m
1127}
1128
kesavandc71914f2022-03-25 11:19:03 +05301129func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
kesavand2cde6582020-06-22 04:56:23 -04001130 resp := &DeleteGroupsResponse{
1131 GroupErrorCodes: map[string]KError{},
1132 }
1133 for _, group := range m.deletedGroups {
1134 resp.GroupErrorCodes[group] = ErrNoError
1135 }
1136 return resp
1137}
kesavandc71914f2022-03-25 11:19:03 +05301138
1139type MockDeleteOffsetResponse struct {
1140 errorCode KError
1141 topic string
1142 partition int32
1143 errorPartition KError
1144}
1145
1146func NewMockDeleteOffsetRequest(t TestReporter) *MockDeleteOffsetResponse {
1147 return &MockDeleteOffsetResponse{}
1148}
1149
1150func (m *MockDeleteOffsetResponse) SetDeletedOffset(errorCode KError, topic string, partition int32, errorPartition KError) *MockDeleteOffsetResponse {
1151 m.errorCode = errorCode
1152 m.topic = topic
1153 m.partition = partition
1154 m.errorPartition = errorPartition
1155 return m
1156}
1157
1158func (m *MockDeleteOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
1159 resp := &DeleteOffsetsResponse{
1160 ErrorCode: m.errorCode,
1161 Errors: map[string]map[int32]KError{
1162 m.topic: {m.partition: m.errorPartition},
1163 },
1164 }
1165 return resp
1166}
1167
1168type MockJoinGroupResponse struct {
1169 t TestReporter
1170
1171 ThrottleTime int32
1172 Err KError
1173 GenerationId int32
1174 GroupProtocol string
1175 LeaderId string
1176 MemberId string
1177 Members map[string][]byte
1178}
1179
1180func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {
1181 return &MockJoinGroupResponse{
1182 t: t,
1183 Members: make(map[string][]byte),
1184 }
1185}
1186
1187func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1188 req := reqBody.(*JoinGroupRequest)
1189 resp := &JoinGroupResponse{
1190 Version: req.Version,
1191 ThrottleTime: m.ThrottleTime,
1192 Err: m.Err,
1193 GenerationId: m.GenerationId,
1194 GroupProtocol: m.GroupProtocol,
1195 LeaderId: m.LeaderId,
1196 MemberId: m.MemberId,
1197 Members: m.Members,
1198 }
1199 return resp
1200}
1201
1202func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {
1203 m.ThrottleTime = t
1204 return m
1205}
1206
1207func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {
1208 m.Err = kerr
1209 return m
1210}
1211
1212func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {
1213 m.GenerationId = id
1214 return m
1215}
1216
1217func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {
1218 m.GroupProtocol = proto
1219 return m
1220}
1221
1222func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {
1223 m.LeaderId = id
1224 return m
1225}
1226
1227func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {
1228 m.MemberId = id
1229 return m
1230}
1231
1232func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {
1233 bin, err := encode(meta, nil)
1234 if err != nil {
1235 panic(fmt.Sprintf("error encoding member metadata: %v", err))
1236 }
1237 m.Members[id] = bin
1238 return m
1239}
1240
1241type MockLeaveGroupResponse struct {
1242 t TestReporter
1243
1244 Err KError
1245}
1246
1247func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {
1248 return &MockLeaveGroupResponse{t: t}
1249}
1250
1251func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1252 resp := &LeaveGroupResponse{
1253 Err: m.Err,
1254 }
1255 return resp
1256}
1257
1258func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {
1259 m.Err = kerr
1260 return m
1261}
1262
1263type MockSyncGroupResponse struct {
1264 t TestReporter
1265
1266 Err KError
1267 MemberAssignment []byte
1268}
1269
1270func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {
1271 return &MockSyncGroupResponse{t: t}
1272}
1273
1274func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
1275 resp := &SyncGroupResponse{
1276 Err: m.Err,
1277 MemberAssignment: m.MemberAssignment,
1278 }
1279 return resp
1280}
1281
1282func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {
1283 m.Err = kerr
1284 return m
1285}
1286
1287func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {
1288 bin, err := encode(assignment, nil)
1289 if err != nil {
1290 panic(fmt.Sprintf("error encoding member assignment: %v", err))
1291 }
1292 m.MemberAssignment = bin
1293 return m
1294}
1295
1296type MockHeartbeatResponse struct {
1297 t TestReporter
1298
1299 Err KError
1300}
1301
1302func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {
1303 return &MockHeartbeatResponse{t: t}
1304}
1305
1306func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {
1307 resp := &HeartbeatResponse{}
1308 return resp
1309}
1310
1311func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {
1312 m.Err = kerr
1313 return m
1314}
1315
1316type MockDescribeLogDirsResponse struct {
1317 t TestReporter
1318 logDirs []DescribeLogDirsResponseDirMetadata
1319}
1320
1321func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
1322 return &MockDescribeLogDirsResponse{t: t}
1323}
1324
1325func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
1326 var topics []DescribeLogDirsResponseTopic
1327 for topic := range topicPartitions {
1328 var partitions []DescribeLogDirsResponsePartition
1329 for i := 0; i < topicPartitions[topic]; i++ {
1330 partitions = append(partitions, DescribeLogDirsResponsePartition{
1331 PartitionID: int32(i),
1332 IsTemporary: false,
1333 OffsetLag: int64(0),
1334 Size: int64(1234),
1335 })
1336 }
1337 topics = append(topics, DescribeLogDirsResponseTopic{
1338 Topic: topic,
1339 Partitions: partitions,
1340 })
1341 }
1342 logDir := DescribeLogDirsResponseDirMetadata{
1343 ErrorCode: ErrNoError,
1344 Path: logDirPath,
1345 Topics: topics,
1346 }
1347 m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
1348 return m
1349}
1350
1351func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1352 resp := &DescribeLogDirsResponse{
1353 LogDirs: m.logDirs,
1354 }
1355 return resp
1356}
1357
1358type MockApiVersionsResponse struct {
1359 t TestReporter
1360 apiKeys []ApiVersionsResponseKey
1361}
1362
1363func NewMockApiVersionsResponse(t TestReporter) *MockApiVersionsResponse {
1364 return &MockApiVersionsResponse{
1365 t: t,
1366 apiKeys: []ApiVersionsResponseKey{
1367 {
1368 ApiKey: 0,
1369 MinVersion: 5,
1370 MaxVersion: 8,
1371 },
1372 {
1373 ApiKey: 1,
1374 MinVersion: 7,
1375 MaxVersion: 11,
1376 },
1377 },
1378 }
1379}
1380
1381func (m *MockApiVersionsResponse) SetApiKeys(apiKeys []ApiVersionsResponseKey) *MockApiVersionsResponse {
1382 m.apiKeys = apiKeys
1383 return m
1384}
1385
1386func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
1387 req := reqBody.(*ApiVersionsRequest)
1388 res := &ApiVersionsResponse{
1389 Version: req.Version,
1390 ApiKeys: m.apiKeys,
1391 }
1392 return res
1393}