[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/mockresponses.go b/vendor/github.com/Shopify/sarama/mockresponses.go
index c78f0ac..6654ed0 100644
--- a/vendor/github.com/Shopify/sarama/mockresponses.go
+++ b/vendor/github.com/Shopify/sarama/mockresponses.go
@@ -18,20 +18,20 @@
 // allows generating a response based on a request body. MockResponses are used
 // to program behavior of MockBroker in tests.
 type MockResponse interface {
-	For(reqBody versionedDecoder) (res encoder)
+	For(reqBody versionedDecoder) (res encoderWithHeader)
 }
 
 // MockWrapper is a mock response builder that returns a particular concrete
 // response regardless of the actual request passed to the `For` method.
 type MockWrapper struct {
-	res encoder
+	res encoderWithHeader
 }
 
-func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
+func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
 	return mw.res
 }
 
-func NewMockWrapper(res encoder) *MockWrapper {
+func NewMockWrapper(res encoderWithHeader) *MockWrapper {
 	return &MockWrapper{res: res}
 }
 
@@ -50,7 +50,7 @@
 		switch res := res.(type) {
 		case MockResponse:
 			ms.responses[i] = res
-		case encoder:
+		case encoderWithHeader:
 			ms.responses[i] = NewMockWrapper(res)
 		default:
 			panic(fmt.Sprintf("Unexpected response type: %T", res))
@@ -59,7 +59,7 @@
 	return ms
 }
 
-func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
+func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
 	res = mc.responses[0].For(reqBody)
 	if len(mc.responses) > 1 {
 		mc.responses = mc.responses[1:]
@@ -79,7 +79,7 @@
 	}
 }
 
-func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
+func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	request := reqBody.(*ListGroupsRequest)
 	_ = request
 	response := &ListGroupsResponse{
@@ -110,7 +110,7 @@
 	return m
 }
 
-func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
+func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	request := reqBody.(*DescribeGroupsRequest)
 
 	response := &DescribeGroupsResponse{}
@@ -166,7 +166,7 @@
 	return mmr
 }
 
-func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
+func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	metadataRequest := reqBody.(*MetadataRequest)
 	metadataResponse := &MetadataResponse{
 		Version:      metadataRequest.version(),
@@ -177,8 +177,8 @@
 	}
 
 	// Generate set of replicas
-	replicas := []int32{}
-	offlineReplicas := []int32{}
+	var replicas []int32
+	var offlineReplicas []int32
 	for _, brokerID := range mmr.brokers {
 		replicas = append(replicas, brokerID)
 	}
@@ -233,7 +233,7 @@
 	return mor
 }
 
-func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
+func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	offsetRequest := reqBody.(*OffsetRequest)
 	offsetResponse := &OffsetResponse{Version: mor.version}
 	for topic, partitions := range offsetRequest.blocks {
@@ -309,7 +309,7 @@
 	return mfr
 }
 
-func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
+func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	fetchRequest := reqBody.(*FetchRequest)
 	res := &FetchResponse{
 		Version: mfr.version,
@@ -393,7 +393,7 @@
 	return mr
 }
 
-func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*ConsumerMetadataRequest)
 	group := req.ConsumerGroup
 	res := &ConsumerMetadataResponse{}
@@ -442,7 +442,7 @@
 	return mr
 }
 
-func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*FindCoordinatorRequest)
 	res := &FindCoordinatorResponse{}
 	var v interface{}
@@ -489,7 +489,7 @@
 	return mr
 }
 
-func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*OffsetCommitRequest)
 	group := req.ConsumerGroup
 	res := &OffsetCommitResponse{}
@@ -546,7 +546,7 @@
 	return mr
 }
 
-func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*ProduceRequest)
 	res := &ProduceResponse{
 		Version: mr.version,
@@ -605,7 +605,7 @@
 	return mr
 }
 
-func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*OffsetFetchRequest)
 	group := req.ConsumerGroup
 	res := &OffsetFetchResponse{Version: req.Version}
@@ -630,7 +630,7 @@
 	return &MockCreateTopicsResponse{t: t}
 }
 
-func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*CreateTopicsRequest)
 	res := &CreateTopicsResponse{
 		Version: req.Version,
@@ -659,7 +659,7 @@
 	return &MockDeleteTopicsResponse{t: t}
 }
 
-func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*DeleteTopicsRequest)
 	res := &DeleteTopicsResponse{}
 	res.TopicErrorCodes = make(map[string]KError)
@@ -667,6 +667,7 @@
 	for _, topic := range req.Topics {
 		res.TopicErrorCodes[topic] = ErrNoError
 	}
+	res.Version = req.Version
 	return res
 }
 
@@ -678,7 +679,7 @@
 	return &MockCreatePartitionsResponse{t: t}
 }
 
-func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*CreatePartitionsRequest)
 	res := &CreatePartitionsResponse{}
 	res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
@@ -697,6 +698,43 @@
 	return res
 }
 
+type MockAlterPartitionReassignmentsResponse struct {
+	t TestReporter
+}
+
+func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
+	return &MockAlterPartitionReassignmentsResponse{t: t}
+}
+
+// For returns an empty AlterPartitionReassignmentsResponse for any request of the expected type.
+func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	// Nothing is read from the request; the assertion only panics on an unexpected request type.
+	_ = reqBody.(*AlterPartitionReassignmentsRequest)
+	return &AlterPartitionReassignmentsResponse{}
+}
+
+type MockListPartitionReassignmentsResponse struct {
+	t TestReporter
+}
+
+func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
+	return &MockListPartitionReassignmentsResponse{t: t}
+}
+
+// For answers a ListPartitionReassignmentsRequest by adding one block per
+// requested topic/partition, each with the fixed lists {0}, {1} and {2}.
+func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	req := reqBody.(*ListPartitionReassignmentsRequest)
+	res := &ListPartitionReassignmentsResponse{}
+	for topic, partitions := range req.blocks {
+		for _, partition := range partitions {
+			res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
+		}
+	}
+
+	return res
+}
+
 type MockDeleteRecordsResponse struct {
 	t TestReporter
 }
@@ -705,7 +743,7 @@
 	return &MockDeleteRecordsResponse{t: t}
 }
 
-func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*DeleteRecordsRequest)
 	res := &DeleteRecordsResponse{}
 	res.Topics = make(map[string]*DeleteRecordsResponseTopic)
@@ -728,31 +766,87 @@
 	return &MockDescribeConfigsResponse{t: t}
 }
 
-func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*DescribeConfigsRequest)
-	res := &DescribeConfigsResponse{}
+	res := &DescribeConfigsResponse{
+		Version: req.Version,
+	}
+
+	includeSynonyms := req.Version > 0
+	includeSource := req.Version > 0
 
 	for _, r := range req.Resources {
 		var configEntries []*ConfigEntry
 		switch r.Type {
-		case TopicResource:
+		case BrokerResource:
 			configEntries = append(configEntries,
-				&ConfigEntry{Name: "max.message.bytes",
-					Value:     "1000000",
-					ReadOnly:  false,
-					Default:   true,
-					Sensitive: false,
-				}, &ConfigEntry{Name: "retention.ms",
-					Value:     "5000",
-					ReadOnly:  false,
-					Default:   false,
-					Sensitive: false,
-				}, &ConfigEntry{Name: "password",
-					Value:     "12345",
-					ReadOnly:  false,
-					Default:   false,
-					Sensitive: true,
-				})
+				&ConfigEntry{
+					Name:     "min.insync.replicas",
+					Value:    "2",
+					ReadOnly: false,
+					Default:  false,
+				},
+			)
+			res.Resources = append(res.Resources, &ResourceResponse{
+				Name:    r.Name,
+				Configs: configEntries,
+			})
+		case BrokerLoggerResource:
+			configEntries = append(configEntries,
+				&ConfigEntry{
+					Name:     "kafka.controller.KafkaController",
+					Value:    "DEBUG",
+					ReadOnly: false,
+					Default:  false,
+				},
+			)
+			res.Resources = append(res.Resources, &ResourceResponse{
+				Name:    r.Name,
+				Configs: configEntries,
+			})
+		case TopicResource:
+			maxMessageBytes := &ConfigEntry{
+				Name:      "max.message.bytes",
+				Value:     "1000000",
+				ReadOnly:  false,
+				Default:   !includeSource,
+				Sensitive: false,
+			}
+			if includeSource {
+				maxMessageBytes.Source = SourceDefault
+			}
+			if includeSynonyms {
+				maxMessageBytes.Synonyms = []*ConfigSynonym{
+					{
+						ConfigName:  "max.message.bytes",
+						ConfigValue: "500000",
+					},
+				}
+			}
+			retentionMs := &ConfigEntry{
+				Name:      "retention.ms",
+				Value:     "5000",
+				ReadOnly:  false,
+				Default:   false,
+				Sensitive: false,
+			}
+			if includeSynonyms {
+				retentionMs.Synonyms = []*ConfigSynonym{
+					{
+						ConfigName:  "log.retention.ms",
+						ConfigValue: "2500",
+					},
+				}
+			}
+			password := &ConfigEntry{
+				Name:      "password",
+				Value:     "12345",
+				ReadOnly:  false,
+				Default:   false,
+				Sensitive: true,
+			}
+			configEntries = append(
+				configEntries, maxMessageBytes, retentionMs, password)
 			res.Resources = append(res.Resources, &ResourceResponse{
 				Name:    r.Name,
 				Configs: configEntries,
@@ -762,6 +856,31 @@
 	return res
 }
 
+type MockDescribeConfigsResponseWithErrorCode struct {
+	t TestReporter
+}
+
+func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {
+	return &MockDescribeConfigsResponseWithErrorCode{t: t}
+}
+
+// For answers a DescribeConfigsRequest with one ResourceResponse per requested
+// resource, echoing its name and type and setting ErrorCode 83 on each.
+func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
+	describeReq := reqBody.(*DescribeConfigsRequest)
+	res := &DescribeConfigsResponse{Version: describeReq.Version}
+	for _, resource := range describeReq.Resources {
+		failed := &ResourceResponse{
+			Name:      resource.Name,
+			Type:      resource.Type,
+			ErrorCode: 83,
+			ErrorMsg:  "",
+		}
+		res.Resources = append(res.Resources, failed)
+	}
+	return res
+}
+
 type MockAlterConfigsResponse struct {
 	t TestReporter
 }
@@ -770,19 +889,43 @@
 	return &MockAlterConfigsResponse{t: t}
 }
 
-func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*AlterConfigsRequest)
 	res := &AlterConfigsResponse{}
 
 	for _, r := range req.Resources {
-		res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
-			Type:     TopicResource,
+		res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
+			Name:     r.Name,
+			Type:     r.Type,
 			ErrorMsg: "",
 		})
 	}
 	return res
 }
 
+type MockAlterConfigsResponseWithErrorCode struct {
+	t TestReporter
+}
+
+func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {
+	return &MockAlterConfigsResponseWithErrorCode{t: t}
+}
+
+// For answers an AlterConfigsRequest with one entry per requested resource,
+// echoing its name and type and setting ErrorCode 83 on each.
+func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
+	var res AlterConfigsResponse
+	for _, resource := range reqBody.(*AlterConfigsRequest).Resources {
+		res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
+			Name:      resource.Name,
+			Type:      resource.Type,
+			ErrorCode: 83,
+			ErrorMsg:  "",
+		})
+	}
+	return &res
+}
+
 type MockCreateAclsResponse struct {
 	t TestReporter
 }
@@ -791,7 +934,7 @@
 	return &MockCreateAclsResponse{t: t}
 }
 
-func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*CreateAclsRequest)
 	res := &CreateAclsResponse{}
 
@@ -809,17 +952,35 @@
 	return &MockListAclsResponse{t: t}
 }
 
-func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*DescribeAclsRequest)
 	res := &DescribeAclsResponse{}
-
 	res.Err = ErrNoError
 	acl := &ResourceAcls{}
-	acl.Resource.ResourceName = *req.ResourceName
+	if req.ResourceName != nil {
+		acl.Resource.ResourceName = *req.ResourceName
+	}
+	acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
 	acl.Resource.ResourceType = req.ResourceType
-	acl.Acls = append(acl.Acls, &Acl{})
-	res.ResourceAcls = append(res.ResourceAcls, acl)
 
+	host := "*"
+	if req.Host != nil {
+		host = *req.Host
+	}
+
+	principal := "User:test"
+	if req.Principal != nil {
+		principal = *req.Principal
+	}
+
+	permissionType := req.PermissionType
+	if permissionType == AclPermissionAny {
+		permissionType = AclPermissionAllow
+	}
+
+	acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
+	res.ResourceAcls = append(res.ResourceAcls, acl)
+	res.Version = int16(req.Version)
 	return res
 }
 
@@ -833,7 +994,7 @@
 	return &MockSaslAuthenticateResponse{t: t}
 }
 
-func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
+func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	res := &SaslAuthenticateResponse{}
 	res.Err = msar.kerror
 	res.SaslAuthBytes = msar.saslAuthBytes
@@ -864,7 +1025,7 @@
 	return &MockSaslHandshakeResponse{t: t}
 }
 
-func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
+func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	res := &SaslHandshakeResponse{}
 	res.Err = mshr.kerror
 	res.EnabledMechanisms = mshr.enabledMechanisms
@@ -885,7 +1046,7 @@
 	return &MockDeleteAclsResponse{t: t}
 }
 
-func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*DeleteAclsRequest)
 	res := &DeleteAclsResponse{}
 
@@ -894,6 +1055,7 @@
 		response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
 		res.FilterResponses = append(res.FilterResponses, response)
 	}
+	res.Version = int16(req.Version)
 	return res
 }
 
@@ -910,7 +1072,7 @@
 	return m
 }
 
-func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
+func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	resp := &DeleteGroupsResponse{
 		GroupErrorCodes: map[string]KError{},
 	}
@@ -919,3 +1081,193 @@
 	}
 	return resp
 }
+
+type MockJoinGroupResponse struct {
+	t TestReporter
+	// Copied as-is into each JoinGroupResponse built by For; see the Set* methods.
+	ThrottleTime  int32
+	Err           KError
+	GenerationId  int32
+	GroupProtocol string
+	LeaderId      string
+	MemberId      string
+	Members       map[string][]byte
+}
+
+func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {
+	return &MockJoinGroupResponse{
+		t:       t,
+		Members: make(map[string][]byte),
+	}
+}
+
+// For answers a JoinGroupRequest with the values stored on the mock, using the request's version.
+func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	joinReq := reqBody.(*JoinGroupRequest)
+	return &JoinGroupResponse{
+		Version:       joinReq.Version,
+		ThrottleTime:  m.ThrottleTime,
+		Err:           m.Err,
+		GenerationId:  m.GenerationId,
+		GroupProtocol: m.GroupProtocol,
+		LeaderId:      m.LeaderId,
+		MemberId:      m.MemberId,
+		Members:       m.Members, // the mock's map itself, not a copy
+	}
+}
+
+func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {
+	m.ThrottleTime = t
+	return m
+}
+
+func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {
+	m.Err = kerr
+	return m
+}
+
+func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {
+	m.GenerationId = id
+	return m
+}
+
+func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {
+	m.GroupProtocol = proto
+	return m
+}
+
+func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {
+	m.LeaderId = id
+	return m
+}
+
+func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {
+	m.MemberId = id
+	return m
+}
+
+func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {
+	bin, err := encode(meta, nil)
+	if err != nil {
+		panic(fmt.Sprintf("error encoding member metadata: %v", err))
+	}
+	m.Members[id] = bin
+	return m
+}
+
+type MockLeaveGroupResponse struct {
+	t TestReporter
+
+	Err KError
+}
+
+func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {
+	return &MockLeaveGroupResponse{t: t}
+}
+
+// For ignores reqBody and returns a LeaveGroupResponse carrying the mock's Err (see SetError).
+func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	var resp LeaveGroupResponse
+	resp.Err = m.Err
+	return &resp
+}
+
+func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {
+	m.Err = kerr
+	return m
+}
+
+type MockSyncGroupResponse struct {
+	t TestReporter
+
+	Err              KError
+	MemberAssignment []byte
+}
+
+func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {
+	return &MockSyncGroupResponse{t: t}
+}
+
+// For ignores reqBody and returns a SyncGroupResponse with the mock's Err and MemberAssignment.
+func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	var resp SyncGroupResponse
+	resp.Err = m.Err
+	resp.MemberAssignment = m.MemberAssignment
+	return &resp
+}
+
+func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {
+	m.Err = kerr
+	return m
+}
+
+func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {
+	bin, err := encode(assignment, nil)
+	if err != nil {
+		panic(fmt.Sprintf("error encoding member assignment: %v", err))
+	}
+	m.MemberAssignment = bin
+	return m
+}
+
+type MockHeartbeatResponse struct {
+	t TestReporter
+
+	Err KError
+}
+
+func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {
+	return &MockHeartbeatResponse{t: t}
+}
+
+// For ignores reqBody and returns a HeartbeatResponse carrying the error stored by SetError.
+func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	return &HeartbeatResponse{Err: m.Err}
+}
+
+func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {
+	m.Err = kerr
+	return m
+}
+
+type MockDescribeLogDirsResponse struct {
+	t       TestReporter
+	logDirs []DescribeLogDirsResponseDirMetadata
+}
+
+func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
+	return &MockDescribeLogDirsResponse{t: t}
+}
+
+// SetLogDirs replaces the mock's log dirs with a single entry for logDirPath.
+// topicPartitions maps a topic name to its partition count; every partition
+// 0..count-1 is reported with Size 1234 and zero values for the other fields.
+func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
+	var topics []DescribeLogDirsResponseTopic
+	// Map iteration order is unspecified, so topic order varies between calls.
+	for name, count := range topicPartitions {
+		var parts []DescribeLogDirsResponsePartition
+		for id := 0; id < count; id++ {
+			parts = append(parts, DescribeLogDirsResponsePartition{
+				PartitionID: int32(id),
+				Size:        1234,
+			})
+		}
+		entry := DescribeLogDirsResponseTopic{Topic: name, Partitions: parts}
+		topics = append(topics, entry)
+	}
+	m.logDirs = []DescribeLogDirsResponseDirMetadata{{
+		ErrorCode: ErrNoError,
+		Path:      logDirPath,
+		Topics:    topics,
+	}}
+	// Returning the receiver allows the call to be chained.
+	return m
+}
+
+// For ignores reqBody and returns a DescribeLogDirsResponse whose LogDirs is
+// the slice last stored by SetLogDirs (nil if SetLogDirs was never called).
+func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	resp := DescribeLogDirsResponse{LogDirs: m.logDirs}
+	return &resp
+}