VOL-1691 Fix openolt adapter getting stuck during registration with core
Change-Id: Ide8131f325bc15f1b909e14d7af6ee9bcd6b3b5b
diff --git a/vendor/gopkg.in/Shopify/sarama.v1/mockresponses.go b/vendor/gopkg.in/Shopify/sarama.v1/mockresponses.go
index 348c223..c78f0ac 100644
--- a/vendor/gopkg.in/Shopify/sarama.v1/mockresponses.go
+++ b/vendor/gopkg.in/Shopify/sarama.v1/mockresponses.go
@@ -2,6 +2,7 @@
import (
"fmt"
+ "strings"
)
// TestReporter has methods matching go's testing.T to avoid importing
@@ -177,7 +178,7 @@
// Generate set of replicas
replicas := []int32{}
-
+ offlineReplicas := []int32{}
for _, brokerID := range mmr.brokers {
replicas = append(replicas, brokerID)
}
@@ -185,14 +186,14 @@
if len(metadataRequest.Topics) == 0 {
for topic, partitions := range mmr.leaders {
for partition, brokerID := range partitions {
- metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
+ metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
}
}
return metadataResponse
}
for _, topic := range metadataRequest.Topics {
for partition, brokerID := range mmr.leaders[topic] {
- metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
+ metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, offlineReplicas, ErrNoError)
}
}
return metadataResponse
@@ -573,6 +574,7 @@
// MockOffsetFetchResponse is a `OffsetFetchResponse` builder.
type MockOffsetFetchResponse struct {
offsets map[string]map[string]map[int32]*OffsetFetchResponseBlock
+ error KError
t TestReporter
}
@@ -598,15 +600,25 @@
return mr
}
+func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchResponse {
+ mr.error = kerror
+ return mr
+}
+
func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*OffsetFetchRequest)
group := req.ConsumerGroup
- res := &OffsetFetchResponse{}
+ res := &OffsetFetchResponse{Version: req.Version}
+
for topic, partitions := range mr.offsets[group] {
for partition, block := range partitions {
res.AddBlock(topic, partition, block)
}
}
+
+ if res.Version >= 2 {
+ res.Err = mr.error
+ }
return res
}
@@ -620,10 +632,20 @@
func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
req := reqBody.(*CreateTopicsRequest)
- res := &CreateTopicsResponse{}
+ res := &CreateTopicsResponse{
+ Version: req.Version,
+ }
res.TopicErrors = make(map[string]*TopicError)
- for topic, _ := range req.TopicDetails {
+ for topic := range req.TopicDetails {
+ if res.Version >= 1 && strings.HasPrefix(topic, "_") {
+ msg := "insufficient permissions to create topic with reserved prefix"
+ res.TopicErrors[topic] = &TopicError{
+ Err: ErrTopicAuthorizationFailed,
+ ErrMsg: &msg,
+ }
+ continue
+ }
res.TopicErrors[topic] = &TopicError{Err: ErrNoError}
}
return res
@@ -661,7 +683,15 @@
res := &CreatePartitionsResponse{}
res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
- for topic, _ := range req.TopicPartitions {
+ for topic := range req.TopicPartitions {
+ if strings.HasPrefix(topic, "_") {
+ msg := "insufficient permissions to create partition on topic with reserved prefix"
+ res.TopicPartitionErrors[topic] = &TopicPartitionError{
+ Err: ErrTopicAuthorizationFailed,
+ ErrMsg: &msg,
+ }
+ continue
+ }
res.TopicPartitionErrors[topic] = &TopicPartitionError{Err: ErrNoError}
}
return res
@@ -682,7 +712,7 @@
for topic, deleteRecordRequestTopic := range req.Topics {
partitions := make(map[int32]*DeleteRecordsResponsePartition)
- for partition, _ := range deleteRecordRequestTopic.PartitionOffsets {
+ for partition := range deleteRecordRequestTopic.PartitionOffsets {
partitions[partition] = &DeleteRecordsResponsePartition{Err: ErrNoError}
}
res.Topics[topic] = &DeleteRecordsResponseTopic{Partitions: partitions}
@@ -866,3 +896,26 @@
}
return res
}
+
+type MockDeleteGroupsResponse struct {
+ deletedGroups []string
+}
+
+func NewMockDeleteGroupsRequest(t TestReporter) *MockDeleteGroupsResponse {
+ return &MockDeleteGroupsResponse{}
+}
+
+func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDeleteGroupsResponse {
+ m.deletedGroups = groups
+ return m
+}
+
+func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
+ resp := &DeleteGroupsResponse{
+ GroupErrorCodes: map[string]KError{},
+ }
+ for _, group := range m.deletedGroups {
+ resp.GroupErrorCodes[group] = ErrNoError
+ }
+ return resp
+}