[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/mockresponses.go b/vendor/github.com/Shopify/sarama/mockresponses.go
index c78f0ac..6654ed0 100644
--- a/vendor/github.com/Shopify/sarama/mockresponses.go
+++ b/vendor/github.com/Shopify/sarama/mockresponses.go
@@ -18,20 +18,20 @@
// allows generating a response based on a request body. MockResponses are used
// to program behavior of MockBroker in tests.
type MockResponse interface {
- For(reqBody versionedDecoder) (res encoder)
+ For(reqBody versionedDecoder) (res encoderWithHeader)
}
// MockWrapper is a mock response builder that returns a particular concrete
// response regardless of the actual request passed to the `For` method.
type MockWrapper struct {
- res encoder
+ res encoderWithHeader
}
-func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
+func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
return mw.res
}
-func NewMockWrapper(res encoder) *MockWrapper {
+func NewMockWrapper(res encoderWithHeader) *MockWrapper {
return &MockWrapper{res: res}
}
@@ -50,7 +50,7 @@
switch res := res.(type) {
case MockResponse:
ms.responses[i] = res
- case encoder:
+ case encoderWithHeader:
ms.responses[i] = NewMockWrapper(res)
default:
panic(fmt.Sprintf("Unexpected response type: %T", res))
@@ -59,7 +59,7 @@
return ms
}
-func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
+func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
res = mc.responses[0].For(reqBody)
if len(mc.responses) > 1 {
mc.responses = mc.responses[1:]
@@ -79,7 +79,7 @@
}
}
-func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
+func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
request := reqBody.(*ListGroupsRequest)
_ = request
response := &ListGroupsResponse{
@@ -110,7 +110,7 @@
return m
}
-func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
+func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
request := reqBody.(*DescribeGroupsRequest)
response := &DescribeGroupsResponse{}
@@ -166,7 +166,7 @@
return mmr
}
-func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
+func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
metadataRequest := reqBody.(*MetadataRequest)
metadataResponse := &MetadataResponse{
Version: metadataRequest.version(),
@@ -177,8 +177,8 @@
}
// Generate set of replicas
- replicas := []int32{}
- offlineReplicas := []int32{}
+ var replicas []int32
+ var offlineReplicas []int32
for _, brokerID := range mmr.brokers {
replicas = append(replicas, brokerID)
}
@@ -233,7 +233,7 @@
return mor
}
-func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
+func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
offsetRequest := reqBody.(*OffsetRequest)
offsetResponse := &OffsetResponse{Version: mor.version}
for topic, partitions := range offsetRequest.blocks {
@@ -309,7 +309,7 @@
return mfr
}
-func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
+func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
fetchRequest := reqBody.(*FetchRequest)
res := &FetchResponse{
Version: mfr.version,
@@ -393,7 +393,7 @@
return mr
}
-func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*ConsumerMetadataRequest)
group := req.ConsumerGroup
res := &ConsumerMetadataResponse{}
@@ -442,7 +442,7 @@
return mr
}
-func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*FindCoordinatorRequest)
res := &FindCoordinatorResponse{}
var v interface{}
@@ -489,7 +489,7 @@
return mr
}
-func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*OffsetCommitRequest)
group := req.ConsumerGroup
res := &OffsetCommitResponse{}
@@ -546,7 +546,7 @@
return mr
}
-func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*ProduceRequest)
res := &ProduceResponse{
Version: mr.version,
@@ -605,7 +605,7 @@
return mr
}
-func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*OffsetFetchRequest)
group := req.ConsumerGroup
res := &OffsetFetchResponse{Version: req.Version}
@@ -630,7 +630,7 @@
return &MockCreateTopicsResponse{t: t}
}
-func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*CreateTopicsRequest)
res := &CreateTopicsResponse{
Version: req.Version,
@@ -659,7 +659,7 @@
return &MockDeleteTopicsResponse{t: t}
}
-func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*DeleteTopicsRequest)
res := &DeleteTopicsResponse{}
res.TopicErrorCodes = make(map[string]KError)
@@ -667,6 +667,7 @@
for _, topic := range req.Topics {
res.TopicErrorCodes[topic] = ErrNoError
}
+ res.Version = req.Version
return res
}
@@ -678,7 +679,7 @@
return &MockCreatePartitionsResponse{t: t}
}
-func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*CreatePartitionsRequest)
res := &CreatePartitionsResponse{}
res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
@@ -697,6 +698,43 @@
return res
}
+// MockAlterPartitionReassignmentsResponse is a mock response builder that
+// answers an AlterPartitionReassignmentsRequest with an empty response.
+type MockAlterPartitionReassignmentsResponse struct {
+	t TestReporter
+}
+
+// NewMockAlterPartitionReassignmentsResponse returns a builder that keeps a
+// reference to the given TestReporter.
+func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
+	builder := new(MockAlterPartitionReassignmentsResponse)
+	builder.t = t
+	return builder
+}
+
+// For returns a zero-valued AlterPartitionReassignmentsResponse. The request
+// contents are not consulted, but reqBody must be an
+// *AlterPartitionReassignmentsRequest; any other type makes the assertion panic.
+func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	// The assertion is kept purely for its type check.
+	_ = reqBody.(*AlterPartitionReassignmentsRequest)
+	return &AlterPartitionReassignmentsResponse{}
+}
+
+// MockListPartitionReassignmentsResponse is a mock response builder for
+// ListPartitionReassignmentsRequest.
+type MockListPartitionReassignmentsResponse struct {
+	t TestReporter
+}
+
+// NewMockListPartitionReassignmentsResponse returns a builder that keeps a
+// reference to the given TestReporter.
+func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
+	builder := new(MockListPartitionReassignmentsResponse)
+	builder.t = t
+	return builder
+}
+
+// For builds a ListPartitionReassignmentsResponse containing one block for
+// every topic/partition pair listed in the request's blocks. Each block is
+// added with the fixed arguments []int32{0}, []int32{1} and []int32{2}.
+// It panics if reqBody is not a *ListPartitionReassignmentsRequest.
+func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	listReq := reqBody.(*ListPartitionReassignmentsRequest)
+	resp := &ListPartitionReassignmentsResponse{}
+	for topicName, partitionIDs := range listReq.blocks {
+		for _, partitionID := range partitionIDs {
+			resp.AddBlock(topicName, partitionID, []int32{0}, []int32{1}, []int32{2})
+		}
+	}
+	return resp
+}
+
type MockDeleteRecordsResponse struct {
t TestReporter
}
@@ -705,7 +743,7 @@
return &MockDeleteRecordsResponse{t: t}
}
-func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*DeleteRecordsRequest)
res := &DeleteRecordsResponse{}
res.Topics = make(map[string]*DeleteRecordsResponseTopic)
@@ -728,31 +766,87 @@
return &MockDescribeConfigsResponse{t: t}
}
-func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*DescribeConfigsRequest)
- res := &DescribeConfigsResponse{}
+ res := &DescribeConfigsResponse{
+ Version: req.Version,
+ }
+
+ includeSynonyms := req.Version > 0
+ includeSource := req.Version > 0
for _, r := range req.Resources {
var configEntries []*ConfigEntry
switch r.Type {
- case TopicResource:
+ case BrokerResource:
configEntries = append(configEntries,
- &ConfigEntry{Name: "max.message.bytes",
- Value: "1000000",
- ReadOnly: false,
- Default: true,
- Sensitive: false,
- }, &ConfigEntry{Name: "retention.ms",
- Value: "5000",
- ReadOnly: false,
- Default: false,
- Sensitive: false,
- }, &ConfigEntry{Name: "password",
- Value: "12345",
- ReadOnly: false,
- Default: false,
- Sensitive: true,
- })
+ &ConfigEntry{
+ Name: "min.insync.replicas",
+ Value: "2",
+ ReadOnly: false,
+ Default: false,
+ },
+ )
+ res.Resources = append(res.Resources, &ResourceResponse{
+ Name: r.Name,
+ Configs: configEntries,
+ })
+ case BrokerLoggerResource:
+ configEntries = append(configEntries,
+ &ConfigEntry{
+ Name: "kafka.controller.KafkaController",
+ Value: "DEBUG",
+ ReadOnly: false,
+ Default: false,
+ },
+ )
+ res.Resources = append(res.Resources, &ResourceResponse{
+ Name: r.Name,
+ Configs: configEntries,
+ })
+ case TopicResource:
+ maxMessageBytes := &ConfigEntry{
+ Name: "max.message.bytes",
+ Value: "1000000",
+ ReadOnly: false,
+ Default: !includeSource,
+ Sensitive: false,
+ }
+ if includeSource {
+ maxMessageBytes.Source = SourceDefault
+ }
+ if includeSynonyms {
+ maxMessageBytes.Synonyms = []*ConfigSynonym{
+ {
+ ConfigName: "max.message.bytes",
+ ConfigValue: "500000",
+ },
+ }
+ }
+ retentionMs := &ConfigEntry{
+ Name: "retention.ms",
+ Value: "5000",
+ ReadOnly: false,
+ Default: false,
+ Sensitive: false,
+ }
+ if includeSynonyms {
+ retentionMs.Synonyms = []*ConfigSynonym{
+ {
+ ConfigName: "log.retention.ms",
+ ConfigValue: "2500",
+ },
+ }
+ }
+ password := &ConfigEntry{
+ Name: "password",
+ Value: "12345",
+ ReadOnly: false,
+ Default: false,
+ Sensitive: true,
+ }
+ configEntries = append(
+ configEntries, maxMessageBytes, retentionMs, password)
res.Resources = append(res.Resources, &ResourceResponse{
Name: r.Name,
Configs: configEntries,
@@ -762,6 +856,31 @@
return res
}
+// MockDescribeConfigsResponseWithErrorCode is a mock response builder that
+// answers a DescribeConfigsRequest with an error code on every resource.
+type MockDescribeConfigsResponseWithErrorCode struct {
+	t TestReporter
+}
+
+// NewMockDescribeConfigsResponseWithErrorCode returns a builder that keeps a
+// reference to the given TestReporter.
+func NewMockDescribeConfigsResponseWithErrorCode(t TestReporter) *MockDescribeConfigsResponseWithErrorCode {
+	builder := new(MockDescribeConfigsResponseWithErrorCode)
+	builder.t = t
+	return builder
+}
+
+// For echoes the request version and returns one ResourceResponse per
+// requested resource, copying the resource's name and type and setting
+// ErrorCode to 83 with an empty ErrorMsg. It panics if reqBody is not a
+// *DescribeConfigsRequest.
+func (mr *MockDescribeConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
+	describeReq := reqBody.(*DescribeConfigsRequest)
+	resp := &DescribeConfigsResponse{Version: describeReq.Version}
+	for _, resource := range describeReq.Resources {
+		failed := &ResourceResponse{
+			Name:      resource.Name,
+			Type:      resource.Type,
+			ErrorCode: 83,
+			ErrorMsg:  "",
+		}
+		resp.Resources = append(resp.Resources, failed)
+	}
+	return resp
+}
+
type MockAlterConfigsResponse struct {
t TestReporter
}
@@ -770,19 +889,43 @@
return &MockAlterConfigsResponse{t: t}
}
-func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*AlterConfigsRequest)
res := &AlterConfigsResponse{}
for _, r := range req.Resources {
- res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
- Type: TopicResource,
+ res.Resources = append(res.Resources, &AlterConfigsResourceResponse{
+ Name: r.Name,
+ Type: r.Type,
ErrorMsg: "",
})
}
return res
}
+// MockAlterConfigsResponseWithErrorCode is a mock response builder that
+// answers an AlterConfigsRequest with an error code on every resource.
+type MockAlterConfigsResponseWithErrorCode struct {
+	t TestReporter
+}
+
+// NewMockAlterConfigsResponseWithErrorCode returns a builder that keeps a
+// reference to the given TestReporter.
+func NewMockAlterConfigsResponseWithErrorCode(t TestReporter) *MockAlterConfigsResponseWithErrorCode {
+	builder := new(MockAlterConfigsResponseWithErrorCode)
+	builder.t = t
+	return builder
+}
+
+// For returns one AlterConfigsResourceResponse per requested resource,
+// copying the resource's name and type and setting ErrorCode to 83 with an
+// empty ErrorMsg. It panics if reqBody is not an *AlterConfigsRequest.
+func (mr *MockAlterConfigsResponseWithErrorCode) For(reqBody versionedDecoder) encoderWithHeader {
+	alterReq := reqBody.(*AlterConfigsRequest)
+	resp := &AlterConfigsResponse{}
+	for _, resource := range alterReq.Resources {
+		failed := &AlterConfigsResourceResponse{
+			Name:      resource.Name,
+			Type:      resource.Type,
+			ErrorCode: 83,
+			ErrorMsg:  "",
+		}
+		resp.Resources = append(resp.Resources, failed)
+	}
+	return resp
+}
+
type MockCreateAclsResponse struct {
t TestReporter
}
@@ -791,7 +934,7 @@
return &MockCreateAclsResponse{t: t}
}
-func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*CreateAclsRequest)
res := &CreateAclsResponse{}
@@ -809,17 +952,35 @@
return &MockListAclsResponse{t: t}
}
-func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*DescribeAclsRequest)
res := &DescribeAclsResponse{}
-
res.Err = ErrNoError
acl := &ResourceAcls{}
- acl.Resource.ResourceName = *req.ResourceName
+ if req.ResourceName != nil {
+ acl.Resource.ResourceName = *req.ResourceName
+ }
+ acl.Resource.ResourcePatternType = req.ResourcePatternTypeFilter
acl.Resource.ResourceType = req.ResourceType
- acl.Acls = append(acl.Acls, &Acl{})
- res.ResourceAcls = append(res.ResourceAcls, acl)
+ host := "*"
+ if req.Host != nil {
+ host = *req.Host
+ }
+
+ principal := "User:test"
+ if req.Principal != nil {
+ principal = *req.Principal
+ }
+
+ permissionType := req.PermissionType
+ if permissionType == AclPermissionAny {
+ permissionType = AclPermissionAllow
+ }
+
+ acl.Acls = append(acl.Acls, &Acl{Operation: req.Operation, PermissionType: permissionType, Host: host, Principal: principal})
+ res.ResourceAcls = append(res.ResourceAcls, acl)
+ res.Version = int16(req.Version)
return res
}
@@ -833,7 +994,7 @@
return &MockSaslAuthenticateResponse{t: t}
}
-func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
+func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
res := &SaslAuthenticateResponse{}
res.Err = msar.kerror
res.SaslAuthBytes = msar.saslAuthBytes
@@ -864,7 +1025,7 @@
return &MockSaslHandshakeResponse{t: t}
}
-func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
+func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
res := &SaslHandshakeResponse{}
res.Err = mshr.kerror
res.EnabledMechanisms = mshr.enabledMechanisms
@@ -885,7 +1046,7 @@
return &MockDeleteAclsResponse{t: t}
}
-func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
req := reqBody.(*DeleteAclsRequest)
res := &DeleteAclsResponse{}
@@ -894,6 +1055,7 @@
response.MatchingAcls = append(response.MatchingAcls, &MatchingAcl{Err: ErrNoError})
res.FilterResponses = append(res.FilterResponses, response)
}
+ res.Version = int16(req.Version)
return res
}
@@ -910,7 +1072,7 @@
return m
}
-func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
+func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
resp := &DeleteGroupsResponse{
GroupErrorCodes: map[string]KError{},
}
@@ -919,3 +1081,193 @@
}
return resp
}
+
+// MockJoinGroupResponse is a mock response builder for JoinGroupRequest.
+// The exported fields are copied verbatim into every response produced by
+// For; they can be set directly or through the chainable Set* methods.
+type MockJoinGroupResponse struct {
+	t TestReporter
+
+	ThrottleTime  int32
+	Err           KError
+	GenerationId  int32
+	GroupProtocol string
+	LeaderId      string
+	MemberId      string
+	Members       map[string][]byte
+}
+
+// NewMockJoinGroupResponse returns a builder with an empty, non-nil Members
+// map that keeps a reference to the given TestReporter.
+func NewMockJoinGroupResponse(t TestReporter) *MockJoinGroupResponse {
+	return &MockJoinGroupResponse{
+		t:       t,
+		Members: make(map[string][]byte),
+	}
+}
+
+// For builds a JoinGroupResponse that echoes the request version and carries
+// the values currently configured on the mock. The Members map is passed
+// through as-is, not copied. It panics if reqBody is not a *JoinGroupRequest.
+func (m *MockJoinGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	req := reqBody.(*JoinGroupRequest)
+	resp := &JoinGroupResponse{
+		Version:       req.Version,
+		ThrottleTime:  m.ThrottleTime,
+		Err:           m.Err,
+		GenerationId:  m.GenerationId,
+		GroupProtocol: m.GroupProtocol,
+		LeaderId:      m.LeaderId,
+		MemberId:      m.MemberId,
+		Members:       m.Members,
+	}
+	return resp
+}
+
+// SetThrottleTime sets the throttle time placed in responses and returns m
+// for chaining.
+func (m *MockJoinGroupResponse) SetThrottleTime(t int32) *MockJoinGroupResponse {
+	m.ThrottleTime = t
+	return m
+}
+
+// SetError sets the error placed in responses and returns m for chaining.
+func (m *MockJoinGroupResponse) SetError(kerr KError) *MockJoinGroupResponse {
+	m.Err = kerr
+	return m
+}
+
+// SetGenerationId sets the generation id placed in responses and returns m
+// for chaining.
+func (m *MockJoinGroupResponse) SetGenerationId(id int32) *MockJoinGroupResponse {
+	m.GenerationId = id
+	return m
+}
+
+// SetGroupProtocol sets the group protocol placed in responses and returns m
+// for chaining.
+func (m *MockJoinGroupResponse) SetGroupProtocol(proto string) *MockJoinGroupResponse {
+	m.GroupProtocol = proto
+	return m
+}
+
+// SetLeaderId sets the leader id placed in responses and returns m for
+// chaining.
+func (m *MockJoinGroupResponse) SetLeaderId(id string) *MockJoinGroupResponse {
+	m.LeaderId = id
+	return m
+}
+
+// SetMemberId sets the member id placed in responses and returns m for
+// chaining.
+func (m *MockJoinGroupResponse) SetMemberId(id string) *MockJoinGroupResponse {
+	m.MemberId = id
+	return m
+}
+
+// SetMember encodes meta with the package's encode helper and stores the
+// resulting bytes in Members under id, replacing any existing entry. It
+// panics if encoding fails and returns m for chaining.
+func (m *MockJoinGroupResponse) SetMember(id string, meta *ConsumerGroupMemberMetadata) *MockJoinGroupResponse {
+	bin, err := encode(meta, nil)
+	if err != nil {
+		panic(fmt.Sprintf("error encoding member metadata: %v", err))
+	}
+	// A mock built as a struct literal (or whose Members field was reset)
+	// has a nil map, and writing to a nil map panics; allocate on demand.
+	if m.Members == nil {
+		m.Members = make(map[string][]byte)
+	}
+	m.Members[id] = bin
+	return m
+}
+
+// MockLeaveGroupResponse is a mock response builder that produces a
+// LeaveGroupResponse carrying a configurable error.
+type MockLeaveGroupResponse struct {
+	t TestReporter
+
+	Err KError
+}
+
+// NewMockLeaveGroupResponse returns a builder that keeps a reference to the
+// given TestReporter; Err starts at its zero value.
+func NewMockLeaveGroupResponse(t TestReporter) *MockLeaveGroupResponse {
+	builder := new(MockLeaveGroupResponse)
+	builder.t = t
+	return builder
+}
+
+// For returns a LeaveGroupResponse whose Err is the error currently
+// configured on the mock. reqBody is not inspected.
+func (m *MockLeaveGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	return &LeaveGroupResponse{Err: m.Err}
+}
+
+// SetError sets the error placed in responses and returns m for chaining.
+func (m *MockLeaveGroupResponse) SetError(kerr KError) *MockLeaveGroupResponse {
+	m.Err = kerr
+	return m
+}
+
+// MockSyncGroupResponse is a mock response builder that produces a
+// SyncGroupResponse carrying a configurable error and member assignment.
+type MockSyncGroupResponse struct {
+	t TestReporter
+
+	Err              KError
+	MemberAssignment []byte
+}
+
+// NewMockSyncGroupResponse returns a builder that keeps a reference to the
+// given TestReporter; the other fields start at their zero values.
+func NewMockSyncGroupResponse(t TestReporter) *MockSyncGroupResponse {
+	builder := new(MockSyncGroupResponse)
+	builder.t = t
+	return builder
+}
+
+// For returns a SyncGroupResponse populated with the error and the encoded
+// member assignment currently configured on the mock. reqBody is not
+// inspected.
+func (m *MockSyncGroupResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	return &SyncGroupResponse{
+		Err:              m.Err,
+		MemberAssignment: m.MemberAssignment,
+	}
+}
+
+// SetError sets the error placed in responses and returns m for chaining.
+func (m *MockSyncGroupResponse) SetError(kerr KError) *MockSyncGroupResponse {
+	m.Err = kerr
+	return m
+}
+
+// SetMemberAssignment encodes assignment with the package's encode helper and
+// stores the resulting bytes for use in responses. It panics if encoding
+// fails and returns m for chaining.
+func (m *MockSyncGroupResponse) SetMemberAssignment(assignment *ConsumerGroupMemberAssignment) *MockSyncGroupResponse {
+	encoded, encodeErr := encode(assignment, nil)
+	if encodeErr != nil {
+		panic(fmt.Sprintf("error encoding member assignment: %v", encodeErr))
+	}
+	m.MemberAssignment = encoded
+	return m
+}
+
+// MockHeartbeatResponse is a mock response builder that produces a
+// HeartbeatResponse carrying a configurable error.
+type MockHeartbeatResponse struct {
+	t TestReporter
+
+	Err KError
+}
+
+// NewMockHeartbeatResponse returns a builder that keeps a reference to the
+// given TestReporter; Err starts at its zero value.
+func NewMockHeartbeatResponse(t TestReporter) *MockHeartbeatResponse {
+	return &MockHeartbeatResponse{t: t}
+}
+
+// For returns a HeartbeatResponse whose Err is the error currently configured
+// on the mock. reqBody is not inspected.
+func (m *MockHeartbeatResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	// Propagate the configured error so that SetError actually influences
+	// the response, matching the leave-group and sync-group mocks.
+	resp := &HeartbeatResponse{
+		Err: m.Err,
+	}
+	return resp
+}
+
+// SetError sets the error placed in responses and returns m for chaining.
+func (m *MockHeartbeatResponse) SetError(kerr KError) *MockHeartbeatResponse {
+	m.Err = kerr
+	return m
+}
+
+// MockDescribeLogDirsResponse is a mock response builder that produces a
+// DescribeLogDirsResponse from log directory metadata set via SetLogDirs.
+type MockDescribeLogDirsResponse struct {
+	t       TestReporter
+	logDirs []DescribeLogDirsResponseDirMetadata
+}
+
+// NewMockDescribeLogDirsResponse returns a builder with no log directories
+// configured that keeps a reference to the given TestReporter.
+func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
+	builder := new(MockDescribeLogDirsResponse)
+	builder.t = t
+	return builder
+}
+
+// SetLogDirs replaces the configured log directories with a single directory
+// at logDirPath, reported with ErrNoError. topicPartitions maps a topic name
+// to a partition count n; that topic gets partitions 0..n-1, each with
+// IsTemporary false, OffsetLag 0 and Size 1234. Topics appear in Go's map
+// iteration order, which is unspecified. It returns m for chaining.
+func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
+	var topics []DescribeLogDirsResponseTopic
+	for name, count := range topicPartitions {
+		var parts []DescribeLogDirsResponsePartition
+		for id := 0; id < count; id++ {
+			parts = append(parts, DescribeLogDirsResponsePartition{
+				PartitionID: int32(id),
+				IsTemporary: false,
+				OffsetLag:   int64(0),
+				Size:        int64(1234),
+			})
+		}
+		topics = append(topics, DescribeLogDirsResponseTopic{Topic: name, Partitions: parts})
+	}
+	m.logDirs = []DescribeLogDirsResponseDirMetadata{{
+		ErrorCode: ErrNoError,
+		Path:      logDirPath,
+		Topics:    topics,
+	}}
+	return m
+}
+
+// For returns a DescribeLogDirsResponse whose LogDirs are the directories
+// currently configured on the mock. reqBody is not inspected.
+func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	return &DescribeLogDirsResponse{LogDirs: m.logDirs}
+}