[VOL-1349] EPON OLT adapter (package B)
Change-Id: I634ef62c53813dcf4456f54948f13e06358e263c
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/client.go
new file mode 100644
index 0000000..d977e38
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/client.go
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "context"
+ ca "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "time"
+)
+
+const (
+ PartitionConsumer = iota
+ GroupCustomer = iota
+)
+
+const (
+ OffsetNewest = -1
+ OffsetOldest = -2
+)
+
+const (
+ GroupIdKey = "groupId"
+ Offset = "offset"
+)
+
+const (
+ DefaultKafkaHost = "127.0.0.1"
+ DefaultKafkaPort = 9092
+ DefaultKafkaAddress = DefaultKafkaHost + ":" + string(DefaultKafkaPort)
+ DefaultGroupName = "voltha"
+ DefaultSleepOnError = 1
+ DefaultProducerFlushFrequency = 10
+ DefaultProducerFlushMessages = 10
+ DefaultProducerFlushMaxmessages = 100
+ DefaultProducerReturnSuccess = true
+ DefaultProducerReturnErrors = true
+ DefaultProducerRetryMax = 3
+ DefaultProducerRetryBackoff = time.Millisecond * 100
+ DefaultConsumerMaxwait = 100
+ DefaultMaxProcessingTime = 100
+ DefaultConsumerType = PartitionConsumer
+ DefaultNumberPartitions = 3
+ DefaultNumberReplicas = 1
+ DefaultAutoCreateTopic = false
+ DefaultMetadataMaxRetry = 3
+ DefaultLivenessChannelInterval = time.Second * 30
+)
+
+// MsgClient represents the set of APIs a Kafka MsgClient must implement
+type Client interface {
+ Start(ctx context.Context) error
+ Stop(ctx context.Context)
+ CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error
+ DeleteTopic(ctx context.Context, topic *Topic) error
+ Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
+ UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ca.InterContainerMessage) error
+ SubscribeForMetadata(context.Context, func(fromTopic string, timestamp time.Time))
+ Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error
+ SendLiveness(ctx context.Context) error
+ EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+ EnableHealthinessChannel(ctx context.Context, enable bool) chan bool
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
new file mode 100644
index 0000000..f229d46
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.CLogger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
new file mode 100644
index 0000000..266f6c1
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "context"
+ "fmt"
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+const (
+ // All the values below can be tuned to get optimal data distribution. The numbers below seems to work well when
+ // supporting 1000-10000 devices and 1 - 20 replicas of a service
+
+ // Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+ DefaultPartitionCount = 1117
+
+ // Represents how many times a node is replicated on the consistent ring.
+ DefaultReplicationFactor = 117
+
+ // Load is used to calculate average load.
+ DefaultLoad = 1.1
+)
+
+type Endpoint string // Endpoint of a service instance. When using kafka, this is the topic name of a service
+type ReplicaID int32 // The replication ID of a service instance
+
+type EndpointManager interface {
+
+ // GetEndpoint is called to get the endpoint to communicate with for a specific device and service type. For
+ // now this will return the topic name
+ GetEndpoint(ctx context.Context, deviceID string, serviceType string) (Endpoint, error)
+
+ // IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
+ // devices owned by that service need to be reconciled
+ IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error)
+
+ // GetReplicaAssignment returns the replica number of the service that owns the deviceID. This is used by the
+ // test only
+ GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (ReplicaID, error)
+}
+
+type service struct {
+ id string // Id of the service. The same id is used for all replicas
+ totalReplicas int32
+ replicas map[ReplicaID]Endpoint
+ consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+ partitionCount int
+ replicationFactor int
+ load float64
+ backend *db.Backend
+ services map[string]*service
+ servicesLock sync.RWMutex
+ deviceTypeServiceMap map[string]string
+ deviceTypeServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.partitionCount = count
+ }
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.replicationFactor = replicas
+ }
+}
+
+func Load(load float64) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.load = load
+ }
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+ tm := &endpointManager{
+ partitionCount: DefaultPartitionCount,
+ replicationFactor: DefaultReplicationFactor,
+ load: DefaultLoad,
+ backend: backend,
+ services: make(map[string]*service),
+ deviceTypeServiceMap: make(map[string]string),
+ }
+
+ for _, option := range opts {
+ option(tm)
+ }
+ return tm
+}
+
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+ return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(ctx context.Context, deviceID string, serviceType string) (Endpoint, error) {
+ logger.Debugw(ctx, "getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+ owner, err := ep.getOwner(ctx, deviceID, serviceType)
+ if err != nil {
+ return "", err
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ endpoint := m.getEndPoint()
+ if endpoint == "" {
+ return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
+ }
+ logger.Debugw(ctx, "returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+ return endpoint, nil
+}
+
+func (ep *endpointManager) IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+ logger.Debugw(ctx, "device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+ owner, err := ep.getOwner(ctx, deviceID, serviceType)
+ if err != nil {
+ return false, nil
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (ReplicaID, error) {
+ owner, err := ep.getOwner(ctx, deviceID, serviceType)
+ if err != nil {
+ return 0, nil
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ return m.getReplica(), nil
+}
+
+func (ep *endpointManager) getOwner(ctx context.Context, deviceID string, serviceType string) (consistent.Member, error) {
+ serv, dType, err := ep.getServiceAndDeviceType(ctx, serviceType)
+ if err != nil {
+ return nil, err
+ }
+ key := ep.makeKey(deviceID, dType, serviceType)
+ return serv.consistentRing.LocateKey(key), nil
+}
+
+func (ep *endpointManager) getServiceAndDeviceType(ctx context.Context, serviceType string) (*service, string, error) {
+ // Check whether service exist
+ ep.servicesLock.RLock()
+ serv, serviceExist := ep.services[serviceType]
+ ep.servicesLock.RUnlock()
+
+ // Load the service and device types if needed
+ if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+ if err := ep.loadServices(ctx); err != nil {
+ return nil, "", err
+ }
+
+ // Check whether the service exists now
+ ep.servicesLock.RLock()
+ serv, serviceExist = ep.services[serviceType]
+ ep.servicesLock.RUnlock()
+ if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+ return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+ }
+ }
+
+ ep.deviceTypeServiceMapLock.RLock()
+ defer ep.deviceTypeServiceMapLock.RUnlock()
+ for dType, sType := range ep.deviceTypeServiceMap {
+ if sType == serviceType {
+ return serv, dType, nil
+ }
+ }
+ return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+}
+
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+ return consistent.Config{
+ PartitionCount: ep.partitionCount,
+ ReplicationFactor: ep.replicationFactor,
+ Load: ep.load,
+ Hasher: hasher{},
+ }
+}
+
+// loadServices loads the services (adapters) and device types in memory. Because of the small size of the data and
+// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
+// instead of watching for updates in the dB and acting on it.
+func (ep *endpointManager) loadServices(ctx context.Context) error {
+ ep.servicesLock.Lock()
+ defer ep.servicesLock.Unlock()
+ ep.deviceTypeServiceMapLock.Lock()
+ defer ep.deviceTypeServiceMapLock.Unlock()
+
+ if ep.backend == nil {
+ return status.Error(codes.Aborted, "backend-not-set")
+ }
+ ep.services = make(map[string]*service)
+ ep.deviceTypeServiceMap = make(map[string]string)
+
+ // Load the adapters
+ blobs, err := ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "adapters")
+ if err != nil {
+ return err
+ }
+
+ // Data is marshalled as proto bytes in the data store
+ for _, blob := range blobs {
+ data := blob.Value.([]byte)
+ adapter := &voltha.Adapter{}
+ if err := proto.Unmarshal(data, adapter); err != nil {
+ return err
+ }
+ // A valid adapter should have the vendorID set
+ if adapter.Vendor != "" {
+ if _, ok := ep.services[adapter.Type]; !ok {
+ ep.services[adapter.Type] = &service{
+ id: adapter.Type,
+ totalReplicas: adapter.TotalReplicas,
+ replicas: make(map[ReplicaID]Endpoint),
+ consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+ }
+
+ }
+ currentReplica := ReplicaID(adapter.CurrentReplica)
+ endpoint := Endpoint(adapter.Endpoint)
+ ep.services[adapter.Type].replicas[currentReplica] = endpoint
+ ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+ }
+ }
+ // Load the device types
+ blobs, err = ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "device_types")
+ if err != nil {
+ return err
+ }
+ for _, blob := range blobs {
+ data := blob.Value.([]byte)
+ deviceType := &voltha.DeviceType{}
+ if err := proto.Unmarshal(data, deviceType); err != nil {
+ return err
+ }
+ if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
+ ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
+ }
+ }
+
+ // Log the loaded data in debug mode to facilitate trouble shooting
+ if logger.V(log.DebugLevel) {
+ for key, val := range ep.services {
+ members := val.consistentRing.GetMembers()
+ logger.Debugw(ctx, "service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+ for _, m := range members {
+ n := m.(Member)
+ logger.Debugw(ctx, "service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+ }
+ }
+ logger.Debugw(ctx, "device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+ }
+ return nil
+}
+
+// makeKey creates the string that the hash function uses to create the hash
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+ return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hasher function. Based upon numerous testing scenarios, the xxhash package seems to provide the
+// best distribution compare to other hash packages
+func (h hasher) Sum64(data []byte) uint64 {
+ return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+ String() string
+ getReplica() ReplicaID
+ getEndPoint() Endpoint
+ getID() string
+ getServiceType() string
+}
+
+// member implements the Member interface
+type member struct {
+ id string
+ serviceType string
+ vendor string
+ version string
+ replica ReplicaID
+ endpoint Endpoint
+}
+
+func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+ return &member{
+ id: ID,
+ serviceType: serviceType,
+ vendor: vendor,
+ version: version,
+ replica: replica,
+ endpoint: endPoint,
+ }
+}
+
+func (m *member) String() string {
+ return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+ return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+ return m.endpoint
+}
+
+func (m *member) getID() string {
+ return m.id
+}
+
+func (m *member) getServiceType() string {
+ return m.serviceType
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
new file mode 100644
index 0000000..92d2529
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -0,0 +1,1085 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "reflect"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opentracing/opentracing-go"
+)
+
+const (
+ DefaultMaxRetries = 3
+ DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
+)
+
+const (
+ TransactionKey = "transactionID"
+ FromTopic = "fromTopic"
+)
+
+var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
+var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
+
+// requestHandlerChannel represents an interface associated with a channel. Whenever, an event is
+// obtained from that channel, this interface is invoked. This is used to handle
+// async requests into the Core via the kafka messaging bus
+type requestHandlerChannel struct {
+ requesthandlerInterface interface{}
+ ch <-chan *ic.InterContainerMessage
+}
+
+// transactionChannel represents a combination of a topic and a channel onto which a response received
+// on the kafka bus will be sent to
+type transactionChannel struct {
+ topic *Topic
+ ch chan *ic.InterContainerMessage
+}
+
+type InterContainerProxy interface {
+ Start(ctx context.Context) error
+ Stop(ctx context.Context)
+ GetDefaultTopic() *Topic
+ InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
+ InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
+ SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
+ SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
+ UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
+ DeleteTopic(ctx context.Context, topic Topic) error
+ EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+ SendLiveness(ctx context.Context) error
+}
+
+// interContainerProxy represents the messaging proxy
+type interContainerProxy struct {
+ kafkaAddress string
+ defaultTopic *Topic
+ defaultRequestHandlerInterface interface{}
+ kafkaClient Client
+ doneCh chan struct{}
+ doneOnce sync.Once
+
+ // This map is used to map a topic to an interface and channel. When a request is received
+ // on that channel (registered to the topic) then that interface is invoked.
+ topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
+ lockTopicRequestHandlerChannelMap sync.RWMutex
+
+ // This map is used to map a channel to a response topic. This channel handles all responses on that
+ // channel for that topic and forward them to the appropriate consumers channel, using the
+ // transactionIdToChannelMap.
+ topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
+ lockTopicResponseChannelMap sync.RWMutex
+
+ // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
+ // sent out and we are waiting for a response.
+ transactionIdToChannelMap map[string]*transactionChannel
+ lockTransactionIdToChannelMap sync.RWMutex
+}
+
+type InterContainerProxyOption func(*interContainerProxy)
+
+func InterContainerAddress(address string) InterContainerProxyOption {
+ return func(args *interContainerProxy) {
+ args.kafkaAddress = address
+ }
+}
+
+func DefaultTopic(topic *Topic) InterContainerProxyOption {
+ return func(args *interContainerProxy) {
+ args.defaultTopic = topic
+ }
+}
+
+func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
+ return func(args *interContainerProxy) {
+ args.defaultRequestHandlerInterface = handler
+ }
+}
+
+func MsgClient(client Client) InterContainerProxyOption {
+ return func(args *interContainerProxy) {
+ args.kafkaClient = client
+ }
+}
+
+func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
+ proxy := &interContainerProxy{
+ kafkaAddress: DefaultKafkaAddress,
+ doneCh: make(chan struct{}),
+ }
+
+ for _, option := range opts {
+ option(proxy)
+ }
+
+ return proxy
+}
+
+func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
+ return newInterContainerProxy(opts...)
+}
+
+func (kp *interContainerProxy) Start(ctx context.Context) error {
+ logger.Info(ctx, "Starting-Proxy")
+
+ // Kafka MsgClient should already have been created. If not, output fatal error
+ if kp.kafkaClient == nil {
+ logger.Fatal(ctx, "kafka-client-not-set")
+ }
+
+ // Start the kafka client
+ if err := kp.kafkaClient.Start(ctx); err != nil {
+ logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
+ return err
+ }
+
+ // Create the topic to response channel map
+ kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
+ //
+ // Create the transactionId to Channel Map
+ kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
+
+ // Create the topic to request channel map
+ kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
+
+ return nil
+}
+
+func (kp *interContainerProxy) Stop(ctx context.Context) {
+ logger.Info(ctx, "stopping-intercontainer-proxy")
+ kp.doneOnce.Do(func() { close(kp.doneCh) })
+ // TODO : Perform cleanup
+ kp.kafkaClient.Stop(ctx)
+ err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
+ if err != nil {
+ logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+ }
+ err = kp.deleteAllTopicResponseChannelMap(ctx)
+ if err != nil {
+ logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+ }
+ kp.deleteAllTransactionIdToChannelMap(ctx)
+}
+
+func (kp *interContainerProxy) GetDefaultTopic() *Topic {
+ return kp.defaultTopic
+}
+
+// InvokeAsyncRPC is used to make an RPC request asynchronously
+func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+ waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
+
+ logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
+
+ spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
+ if spanArg != nil {
+ kvArgs = append(kvArgs, &spanArg[0])
+ }
+ defer span.Finish()
+
+ // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
+ // typically the device ID.
+ responseTopic := replyToTopic
+ if responseTopic == nil {
+ responseTopic = kp.GetDefaultTopic()
+ }
+
+ chnl := make(chan *RpcResponse)
+
+ go func() {
+
+ // once we're done,
+ // close the response channel
+ defer close(chnl)
+
+ var err error
+ var protoRequest *ic.InterContainerMessage
+
+ // Encode the request
+ protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
+ if err != nil {
+ logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+ log.MarkSpanError(ctx, errors.New("cannot-format-request"))
+ chnl <- NewResponse(RpcFormattingError, err, nil)
+ return
+ }
+
+ // Subscribe for response, if needed, before sending request
+ var ch <-chan *ic.InterContainerMessage
+ if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
+ logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+ log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
+ chnl <- NewResponse(RpcTransportError, err, nil)
+ return
+ }
+
+ // Send request - if the topic is formatted with a device Id then we will send the request using a
+ // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
+ // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
+ logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+
+ // if the message is not sent on kafka publish an event an close the channel
+ if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
+ chnl <- NewResponse(RpcTransportError, err, nil)
+ return
+ }
+
+ // if the client is not waiting for a response send the ack and close the channel
+ chnl <- NewResponse(RpcSent, nil, nil)
+ if !waitForResponse {
+ return
+ }
+
+ defer func() {
+ // Remove the subscription for a response on return
+ if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
+ logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
+ }
+ }()
+
+ // Wait for response as well as timeout or cancellation
+ select {
+ case msg, ok := <-ch:
+ if !ok {
+ logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ log.MarkSpanError(ctx, errors.New("channel-closed"))
+ chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
+ }
+ logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+ if responseBody, err := decodeResponse(ctx, msg); err != nil {
+ chnl <- NewResponse(RpcReply, err, nil)
+ } else {
+ if responseBody.Success {
+ chnl <- NewResponse(RpcReply, nil, responseBody.Result)
+ } else {
+ // response body contains an error
+ unpackErr := &ic.Error{}
+ if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
+ chnl <- NewResponse(RpcReply, err, nil)
+ } else {
+ chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
+ }
+ }
+ }
+ case <-ctx.Done():
+ logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+ log.MarkSpanError(ctx, errors.New("context-cancelled"))
+ err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
+ chnl <- NewResponse(RpcTimeout, err, nil)
+ case <-kp.doneCh:
+ chnl <- NewResponse(RpcSystemClosing, nil, nil)
+ logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+ }
+ }()
+ return chnl
+}
+
+// Method to extract Open-tracing Span from Context and serialize it for transport over Kafka embedded as a additional argument.
+// Additional argument is injected using key as "span" and value as Span marshalled into a byte slice
+//
+// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
+// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
+// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
+// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
+// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
+func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
+ var err error
+ var newCtx context.Context
+ var spanToInject opentracing.Span
+
+ var spanName strings.Builder
+ spanName.WriteString("kafka-")
+
+ // In case of inter adapter message, use Msg Type for constructing RPC name
+ if rpc == "process_inter_adapter_message" {
+ if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
+ spanName.WriteString("inter-adapter-")
+ rpc = msgType.String()
+ }
+ }
+
+ if isAsync {
+ spanName.WriteString("async-rpc-")
+ } else {
+ spanName.WriteString("rpc-")
+ }
+ spanName.WriteString(rpc)
+
+ if isAsync {
+ spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
+ } else {
+ spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
+ }
+
+ spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
+
+ textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
+ if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
+ logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
+ return nil, spanToInject, newCtx
+ }
+
+ var textMapJson []byte
+ if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
+ logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
+ return nil, spanToInject, newCtx
+ }
+
+ spanArg := make([]KVArg, 1)
+ spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
+ return spanArg, spanToInject, newCtx
+}
+
+// InvokeRPC is used to send a request to a given topic
+func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+ waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
+
+ spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
+ if spanArg != nil {
+ kvArgs = append(kvArgs, &spanArg[0])
+ }
+ defer span.Finish()
+
+ // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
+ // typically the device ID.
+ responseTopic := replyToTopic
+ if responseTopic == nil {
+ responseTopic = kp.defaultTopic
+ }
+
+ // Encode the request
+ protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
+ if err != nil {
+ logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+ log.MarkSpanError(ctx, errors.New("cannot-format-request"))
+ return false, nil
+ }
+
+ // Subscribe for response, if needed, before sending request
+ var ch <-chan *ic.InterContainerMessage
+ if waitForResponse {
+ var err error
+ if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
+ logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+ }
+ }
+
+ // Send request - if the topic is formatted with a device Id then we will send the request using a
+ // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
+ // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
+ //key := GetDeviceIdFromTopic(*toTopic)
+ logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+ go func() {
+ if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
+ log.MarkSpanError(ctx, errors.New("send-failed"))
+ logger.Errorw(ctx, "send-failed", log.Fields{
+ "topic": toTopic,
+ "key": key,
+ "error": err})
+ }
+ }()
+
+ if waitForResponse {
+ // Create a child context based on the parent context, if any
+ var cancel context.CancelFunc
+ childCtx := context.Background()
+ if ctx == nil {
+ ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
+ } else {
+ childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
+ }
+ defer cancel()
+
+ // Wait for response as well as timeout or cancellation
+ // Remove the subscription for a response on return
+ defer func() {
+ if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
+ logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
+ "id": protoRequest.Header.Id,
+ "error": err})
+ }
+ }()
+ select {
+ case msg, ok := <-ch:
+ if !ok {
+ logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ log.MarkSpanError(ctx, errors.New("channel-closed"))
+ protoError := &ic.Error{Reason: "channel-closed"}
+ var marshalledArg *any.Any
+ if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+ return false, nil // Should never happen
+ }
+ return false, marshalledArg
+ }
+ logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+ var responseBody *ic.InterContainerResponseBody
+ var err error
+ if responseBody, err = decodeResponse(ctx, msg); err != nil {
+ logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
+ // FIXME we should return something
+ }
+ return responseBody.Success, responseBody.Result
+ case <-ctx.Done():
+ logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+ log.MarkSpanError(ctx, errors.New("context-cancelled"))
+ // pack the error as proto any type
+ protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+
+ var marshalledArg *any.Any
+ if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+ return false, nil // Should never happen
+ }
+ return false, marshalledArg
+ case <-childCtx.Done():
+ logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
+ log.MarkSpanError(ctx, errors.New("context-cancelled"))
+ // pack the error as proto any type
+ protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+
+ var marshalledArg *any.Any
+ if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+ return false, nil // Should never happen
+ }
+ return false, marshalledArg
+ case <-kp.doneCh:
+ logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+ return true, nil
+ }
+ }
+ return true, nil
+}
+
+// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
+// when a message is received on a given topic
+func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
+
+ // Subscribe to receive messages for that topic
+ var ch <-chan *ic.InterContainerMessage
+ var err error
+ if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
+ //if ch, err = kp.Subscribe(topic); err != nil {
+ logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+ return err
+ }
+
+ kp.defaultRequestHandlerInterface = handler
+ kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
+ // Launch a go routine to receive and process kafka messages
+ go kp.waitForMessages(ctx, ch, topic, handler)
+
+ return nil
+}
+
+// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
+// when a message is received on a given topic. So far there is only 1 target registered per microservice
+func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
+ // Subscribe to receive messages for that topic
+ var ch <-chan *ic.InterContainerMessage
+ var err error
+ if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
+ logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+ return err
+ }
+ kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
+
+ // Launch a go routine to receive and process kafka messages
+ go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
+
+ return nil
+}
+
+func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
+ return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
+}
+
+func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
+ kp.lockTopicResponseChannelMap.Lock()
+ defer kp.lockTopicResponseChannelMap.Unlock()
+ if _, exist := kp.topicToResponseChannelMap[topic]; exist {
+ // Unsubscribe to this topic first - this will close the subscribed channel
+ var err error
+ if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
+ }
+ delete(kp.topicToResponseChannelMap, topic)
+ return err
+ } else {
+ return fmt.Errorf("%s-Topic-not-found", topic)
+ }
+}
+
+// nolint: unused
+func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
+ logger.Debug(ctx, "delete-all-topic-response-channel")
+ kp.lockTopicResponseChannelMap.Lock()
+ defer kp.lockTopicResponseChannelMap.Unlock()
+ var unsubscribeFailTopics []string
+ for topic := range kp.topicToResponseChannelMap {
+ // Unsubscribe to this topic first - this will close the subscribed channel
+ if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
+ logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToResponseChannelMap, topic)
+ }
+ }
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
+}
+
+func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
+ kp.lockTopicRequestHandlerChannelMap.Lock()
+ defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+ if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
+ kp.topicToRequestHandlerChannelMap[topic] = arg
+ }
+}
+
+func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
+ kp.lockTopicRequestHandlerChannelMap.Lock()
+ defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+ if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
+ // Close the kafka client client first by unsubscribing to this topic
+ if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ return err
+ }
+ delete(kp.topicToRequestHandlerChannelMap, topic)
+ return nil
+ } else {
+ return fmt.Errorf("%s-Topic-not-found", topic)
+ }
+}
+
+// nolint: unused
+func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
+ logger.Debug(ctx, "delete-all-topic-request-channel")
+ kp.lockTopicRequestHandlerChannelMap.Lock()
+ defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+ var unsubscribeFailTopics []string
+ for topic := range kp.topicToRequestHandlerChannelMap {
+ // Close the kafka client client first by unsubscribing to this topic
+ if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
+ logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToRequestHandlerChannelMap, topic)
+ }
+ }
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
+}
+
+func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
+ kp.lockTransactionIdToChannelMap.Lock()
+ defer kp.lockTransactionIdToChannelMap.Unlock()
+ if _, exist := kp.transactionIdToChannelMap[id]; !exist {
+ kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
+ }
+}
+
+func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
+ kp.lockTransactionIdToChannelMap.Lock()
+ defer kp.lockTransactionIdToChannelMap.Unlock()
+ if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
+ // Close the channel first
+ close(transChannel.ch)
+ delete(kp.transactionIdToChannelMap, id)
+ }
+}
+
+func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
+ kp.lockTransactionIdToChannelMap.Lock()
+ defer kp.lockTransactionIdToChannelMap.Unlock()
+ for key, value := range kp.transactionIdToChannelMap {
+ if value.topic.Name == id {
+ close(value.ch)
+ delete(kp.transactionIdToChannelMap, key)
+ }
+ }
+}
+
+// nolint: unused
+func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
+ logger.Debug(ctx, "delete-all-transaction-id-channel-map")
+ kp.lockTransactionIdToChannelMap.Lock()
+ defer kp.lockTransactionIdToChannelMap.Unlock()
+ for key, value := range kp.transactionIdToChannelMap {
+ close(value.ch)
+ delete(kp.transactionIdToChannelMap, key)
+ }
+}
+
+func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
+ // If we have any consumers on that topic we need to close them
+ if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
+ logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
+ }
+ if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
+ logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
+ }
+ kp.deleteTopicTransactionIdToChannelMap(topic.Name)
+
+ return kp.kafkaClient.DeleteTopic(ctx, &topic)
+}
+
+func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
+ // Encode the response argument - needs to be a proto message
+ if returnedVal == nil {
+ return nil, nil
+ }
+ protoValue, ok := returnedVal.(proto.Message)
+ if !ok {
+ logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
+ err := errors.New("response-value-not-proto-message")
+ return nil, err
+ }
+
+ // Marshal the returned value, if any
+ var marshalledReturnedVal *any.Any
+ var err error
+ if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
+ logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
+ return nil, err
+ }
+ return marshalledReturnedVal, nil
+}
+
+func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
+ responseHeader := &ic.Header{
+ Id: request.Header.Id,
+ Type: ic.MessageType_RESPONSE,
+ FromTopic: request.Header.ToTopic,
+ ToTopic: request.Header.FromTopic,
+ Timestamp: ptypes.TimestampNow(),
+ }
+ responseBody := &ic.InterContainerResponseBody{
+ Success: false,
+ Result: nil,
+ }
+ var marshalledResponseBody *any.Any
+ var err error
+ // Error should never happen here
+ if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
+ logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
+ }
+
+ return &ic.InterContainerMessage{
+ Header: responseHeader,
+ Body: marshalledResponseBody,
+ }
+
+}
+
+//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
+//or an error on failure
+func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
+ //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
+ responseHeader := &ic.Header{
+ Id: request.Header.Id,
+ Type: ic.MessageType_RESPONSE,
+ FromTopic: request.Header.ToTopic,
+ ToTopic: request.Header.FromTopic,
+ KeyTopic: request.Header.KeyTopic,
+ Timestamp: ptypes.TimestampNow(),
+ }
+
+ // Go over all returned values
+ var marshalledReturnedVal *any.Any
+ var err error
+
+ // for now we support only 1 returned value - (excluding the error)
+ if len(returnedValues) > 0 {
+ if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
+ logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
+ }
+ }
+
+ responseBody := &ic.InterContainerResponseBody{
+ Success: success,
+ Result: marshalledReturnedVal,
+ }
+
+ // Marshal the response body
+ var marshalledResponseBody *any.Any
+ if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
+ logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
+ return nil, err
+ }
+
+ return &ic.InterContainerMessage{
+ Header: responseHeader,
+ Body: marshalledResponseBody,
+ }, nil
+}
+
+func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
+ myClassValue := reflect.ValueOf(myClass)
+ // Capitalize the first letter in the funcName to workaround the first capital letters required to
+ // invoke a function from a different package
+ funcName = strings.Title(funcName)
+ m := myClassValue.MethodByName(funcName)
+ if !m.IsValid() {
+ return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
+ }
+ in := make([]reflect.Value, len(params)+1)
+ in[0] = reflect.ValueOf(ctx)
+ for i, param := range params {
+ in[i+1] = reflect.ValueOf(param)
+ }
+ out = m.Call(in)
+ return
+}
+
+func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+ arg := &KVArg{
+ Key: TransactionKey,
+ Value: &ic.StrType{Val: transactionId},
+ }
+
+ var marshalledArg *any.Any
+ var err error
+ if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
+ logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
+ return currentArgs
+ }
+ protoArg := &ic.Argument{
+ Key: arg.Key,
+ Value: marshalledArg,
+ }
+ return append(currentArgs, protoArg)
+}
+
+func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+ var marshalledArg *any.Any
+ var err error
+ if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
+ logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
+ return currentArgs
+ }
+ protoArg := &ic.Argument{
+ Key: FromTopic,
+ Value: marshalledArg,
+ }
+ return append(currentArgs, protoArg)
+}
+
+// Method to extract the Span embedded in Kafka RPC request on the receiver side. If span is found embedded in the KV args (with key as "span"),
+// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
+// If no span is found embedded, even then a span is created with name as "kafka-rpc-<rpc-name>" to enrich the Context for RPC calls coming
+// from components currently not sending the span (e.g. openonu adapter)
+func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
+
+ for _, arg := range args {
+ if arg.Key == "span" {
+ var err error
+ var textMapString ic.StrType
+ if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
+ logger.Warnw(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
+ break
+ }
+
+ spanTextMap := make(map[string]string)
+ if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
+ logger.Warnw(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
+ break
+ }
+
+ var spanContext opentracing.SpanContext
+ if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
+ logger.Warnw(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
+ break
+ }
+
+ var receivedRpcName string
+ extractBaggage := func(k, v string) bool {
+ if k == "rpc-span-name" {
+ receivedRpcName = v
+ return false
+ }
+ return true
+ }
+
+ spanContext.ForeachBaggageItem(extractBaggage)
+
+ return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
+ }
+ }
+
+ // Create new Child Span with rpc as name if no span details were received in kafka arguments
+ var spanName strings.Builder
+ spanName.WriteString("kafka-")
+
+ // In case of inter adapter message, use Msg Type for constructing RPC name
+ if rpcName == "process_inter_adapter_message" {
+ for _, arg := range args {
+ if arg.Key == "msg" {
+ iamsg := ic.InterAdapterMessage{}
+ if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
+ spanName.WriteString("inter-adapter-")
+ rpcName = iamsg.Header.Type.String()
+ }
+ }
+ }
+ }
+
+ spanName.WriteString("rpc-")
+ spanName.WriteString(rpcName)
+
+ return opentracing.StartSpanFromContext(ctx, spanName.String())
+}
+
+func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
+
+ // First extract the header to know whether this is a request - responses are handled by a different handler
+ if msg.Header.Type == ic.MessageType_REQUEST {
+ var out []reflect.Value
+ var err error
+
+ // Get the request body
+ requestBody := &ic.InterContainerRequestBody{}
+ if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
+ } else {
+ span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
+ defer span.Finish()
+
+ logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
+ // let the callee unpack the arguments as its the only one that knows the real proto type
+ // Augment the requestBody with the message Id as it will be used in scenarios where cores
+ // are set in pairs and competing
+ requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
+
+ // Augment the requestBody with the From topic name as it will be used in scenarios where a container
+ // needs to send an unsollicited message to the currently requested container
+ requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
+
+ out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
+ if err != nil {
+ logger.Warn(ctx, err)
+ }
+ }
+ // Response required?
+ if requestBody.ResponseRequired {
+ // If we already have an error before then just return that
+ var returnError *ic.Error
+ var returnedValues []interface{}
+ var success bool
+ if err != nil {
+ returnError = &ic.Error{Reason: err.Error()}
+ returnedValues = make([]interface{}, 1)
+ returnedValues[0] = returnError
+ } else {
+ returnedValues = make([]interface{}, 0)
+ // Check for errors first
+ lastIndex := len(out) - 1
+ if out[lastIndex].Interface() != nil { // Error
+ if retError, ok := out[lastIndex].Interface().(error); ok {
+ if retError.Error() == ErrorTransactionNotAcquired.Error() {
+ logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
+ return // Ignore - process is in competing mode and ignored transaction
+ }
+ returnError = &ic.Error{Reason: retError.Error()}
+ returnedValues = append(returnedValues, returnError)
+ } else { // Should never happen
+ returnError = &ic.Error{Reason: "incorrect-error-returns"}
+ returnedValues = append(returnedValues, returnError)
+ }
+ } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
+ logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
+ return // Ignore - should not happen
+ } else { // Non-error case
+ success = true
+ for idx, val := range out {
+ //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
+ if idx != lastIndex {
+ returnedValues = append(returnedValues, val.Interface())
+ }
+ }
+ }
+ }
+
+ var icm *ic.InterContainerMessage
+ if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
+ logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
+ icm = encodeDefaultFailedResponse(ctx, msg)
+ }
+ // To preserve ordering of messages, all messages to a given topic are sent to the same partition
+ // by providing a message key. The key is encoded in the topic name. If the deviceId is not
+ // present then the key will be empty, hence all messages for a given topic will be sent to all
+ // partitions.
+ replyTopic := &Topic{Name: msg.Header.FromTopic}
+ key := msg.Header.KeyTopic
+ logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
+ // TODO: handle error response.
+ go func() {
+ if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
+ logger.Errorw(ctx, "send-reply-failed", log.Fields{
+ "topic": replyTopic,
+ "key": key,
+ "error": err})
+ }
+ }()
+ }
+ } else if msg.Header.Type == ic.MessageType_RESPONSE {
+ logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
+ go kp.dispatchResponse(ctx, msg)
+ } else {
+ logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
+ }
+}
+
+func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+ // Wait for messages
+ for msg := range ch {
+ //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
+ go kp.handleMessage(context.Background(), msg, targetInterface)
+ }
+}
+
+func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
+ kp.lockTransactionIdToChannelMap.RLock()
+ defer kp.lockTransactionIdToChannelMap.RUnlock()
+ if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
+ logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
+ return
+ }
+ kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
+}
+
+// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
+// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
+// API. There is one response channel waiting for kafka messages before dispatching the message to the
+// corresponding waiting channel
+func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
+ logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
+
+ // Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
+ // broadcast any message for this topic to all channels waiting on it.
+ // Set channel size to 1 to prevent deadlock, see VOL-2708
+ ch := make(chan *ic.InterContainerMessage, 1)
+ kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
+
+ return ch, nil
+}
+
+func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
+ logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
+ kp.deleteFromTransactionIdToChannelMap(trnsId)
+ return nil
+}
+
+func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+ return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
+}
+
+func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
+ return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
+}
+
+func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
+ return kp.kafkaClient.SendLiveness(ctx)
+}
+
+//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
+//or an error on failure
+func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+ requestHeader := &ic.Header{
+ Id: uuid.New().String(),
+ Type: ic.MessageType_REQUEST,
+ FromTopic: replyTopic.Name,
+ ToTopic: toTopic.Name,
+ KeyTopic: key,
+ Timestamp: ptypes.TimestampNow(),
+ }
+ requestBody := &ic.InterContainerRequestBody{
+ Rpc: rpc,
+ ResponseRequired: true,
+ ReplyToTopic: replyTopic.Name,
+ }
+
+ for _, arg := range kvArgs {
+ if arg == nil {
+ // In case the caller sends an array with empty args
+ continue
+ }
+ var marshalledArg *any.Any
+ var err error
+ // ascertain the value interface type is a proto.Message
+ protoValue, ok := arg.Value.(proto.Message)
+ if !ok {
+ logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
+ err := errors.New("argument-value-not-proto-message")
+ return nil, err
+ }
+ if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
+ logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
+ return nil, err
+ }
+ protoArg := &ic.Argument{
+ Key: arg.Key,
+ Value: marshalledArg,
+ }
+ requestBody.Args = append(requestBody.Args, protoArg)
+ }
+
+ var marshalledData *any.Any
+ var err error
+ if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
+ logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
+ return nil, err
+ }
+ request := &ic.InterContainerMessage{
+ Header: requestHeader,
+ Body: marshalledData,
+ }
+ return request, nil
+}
+
+func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
+ // Extract the message body
+ responseBody := ic.InterContainerResponseBody{}
+ if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, err
+ }
+ //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
+
+ return &responseBody, nil
+
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
new file mode 100644
index 0000000..69450fa
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
@@ -0,0 +1,1154 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/Shopify/sarama"
+ scc "github.com/bsm/sarama-cluster"
+ "github.com/eapache/go-resiliency/breaker"
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+)
+
+// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
+// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
+//consumer or a group consumer
+type consumerChannels struct {
+ consumers []interface{}
+ channels []chan *ic.InterContainerMessage
+}
+
+// static check to ensure SaramaClient implements Client
+var _ Client = &SaramaClient{}
+
+// SaramaClient represents the messaging proxy
+type SaramaClient struct {
+ cAdmin sarama.ClusterAdmin
+ KafkaAddress string
+ producer sarama.AsyncProducer
+ consumer sarama.Consumer
+ groupConsumers map[string]*scc.Consumer
+ lockOfGroupConsumers sync.RWMutex
+ consumerGroupPrefix string
+ consumerType int
+ consumerGroupName string
+ producerFlushFrequency int
+ producerFlushMessages int
+ producerFlushMaxmessages int
+ producerRetryMax int
+ producerRetryBackOff time.Duration
+ producerReturnSuccess bool
+ producerReturnErrors bool
+ consumerMaxwait int
+ maxProcessingTime int
+ numPartitions int
+ numReplicas int
+ autoCreateTopic bool
+ doneCh chan int
+ metadataCallback func(fromTopic string, timestamp time.Time)
+ topicToConsumerChannelMap map[string]*consumerChannels
+ lockTopicToConsumerChannelMap sync.RWMutex
+ topicLockMap map[string]*sync.RWMutex
+ lockOfTopicLockMap sync.RWMutex
+ metadataMaxRetry int
+ alive bool
+ livenessMutex sync.Mutex
+ liveness chan bool
+ livenessChannelInterval time.Duration
+ lastLivenessTime time.Time
+ started bool
+ healthinessMutex sync.Mutex
+ healthy bool
+ healthiness chan bool
+}
+
+type SaramaClientOption func(*SaramaClient)
+
+func Address(address string) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.KafkaAddress = address
+ }
+}
+
+func ConsumerGroupPrefix(prefix string) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerGroupPrefix = prefix
+ }
+}
+
+func ConsumerGroupName(name string) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerGroupName = name
+ }
+}
+
+func ConsumerType(consumer int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerType = consumer
+ }
+}
+
+func ProducerFlushFrequency(frequency int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerFlushFrequency = frequency
+ }
+}
+
+func ProducerFlushMessages(num int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerFlushMessages = num
+ }
+}
+
+func ProducerFlushMaxMessages(num int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerFlushMaxmessages = num
+ }
+}
+
+func ProducerMaxRetries(num int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerRetryMax = num
+ }
+}
+
+func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerRetryBackOff = duration
+ }
+}
+
+func ProducerReturnOnErrors(opt bool) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerReturnErrors = opt
+ }
+}
+
+func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.producerReturnSuccess = opt
+ }
+}
+
+func ConsumerMaxWait(wait int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerMaxwait = wait
+ }
+}
+
+func MaxProcessingTime(pTime int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.maxProcessingTime = pTime
+ }
+}
+
+func NumPartitions(number int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.numPartitions = number
+ }
+}
+
+func NumReplicas(number int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.numReplicas = number
+ }
+}
+
+func AutoCreateTopic(opt bool) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.autoCreateTopic = opt
+ }
+}
+
+func MetadatMaxRetries(retry int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.metadataMaxRetry = retry
+ }
+}
+
+func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.livenessChannelInterval = opt
+ }
+}
+
+func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
+ client := &SaramaClient{
+ KafkaAddress: DefaultKafkaAddress,
+ }
+ client.consumerType = DefaultConsumerType
+ client.producerFlushFrequency = DefaultProducerFlushFrequency
+ client.producerFlushMessages = DefaultProducerFlushMessages
+ client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
+ client.producerReturnErrors = DefaultProducerReturnErrors
+ client.producerReturnSuccess = DefaultProducerReturnSuccess
+ client.producerRetryMax = DefaultProducerRetryMax
+ client.producerRetryBackOff = DefaultProducerRetryBackoff
+ client.consumerMaxwait = DefaultConsumerMaxwait
+ client.maxProcessingTime = DefaultMaxProcessingTime
+ client.numPartitions = DefaultNumberPartitions
+ client.numReplicas = DefaultNumberReplicas
+ client.autoCreateTopic = DefaultAutoCreateTopic
+ client.metadataMaxRetry = DefaultMetadataMaxRetry
+ client.livenessChannelInterval = DefaultLivenessChannelInterval
+
+ for _, option := range opts {
+ option(client)
+ }
+
+ client.groupConsumers = make(map[string]*scc.Consumer)
+
+ client.lockTopicToConsumerChannelMap = sync.RWMutex{}
+ client.topicLockMap = make(map[string]*sync.RWMutex)
+ client.lockOfTopicLockMap = sync.RWMutex{}
+ client.lockOfGroupConsumers = sync.RWMutex{}
+
+ // healthy and alive until proven otherwise
+ client.alive = true
+ client.healthy = true
+
+ return client
+}
+
+func (sc *SaramaClient) Start(ctx context.Context) error {
+ logger.Info(ctx, "Starting-kafka-sarama-client")
+
+ // Create the Done channel
+ sc.doneCh = make(chan int, 1)
+
+ var err error
+
+ // Add a cleanup in case of failure to startup
+ defer func() {
+ if err != nil {
+ sc.Stop(ctx)
+ }
+ }()
+
+ // Create the Cluster Admin
+ if err = sc.createClusterAdmin(ctx); err != nil {
+ logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
+ return err
+ }
+
+ // Create the Publisher
+ if err := sc.createPublisher(ctx); err != nil {
+ logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
+ return err
+ }
+
+ if sc.consumerType == DefaultConsumerType {
+ // Create the master consumers
+ if err := sc.createConsumer(ctx); err != nil {
+ logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
+ return err
+ }
+ }
+
+ // Create the topic to consumers/channel map
+ sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
+
+ logger.Info(ctx, "kafka-sarama-client-started")
+
+ sc.started = true
+
+ return nil
+}
+
+func (sc *SaramaClient) Stop(ctx context.Context) {
+ logger.Info(ctx, "stopping-sarama-client")
+
+ sc.started = false
+
+ //Send a message over the done channel to close all long running routines
+ sc.doneCh <- 1
+
+ if sc.producer != nil {
+ if err := sc.producer.Close(); err != nil {
+ logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
+ }
+ }
+
+ if sc.consumer != nil {
+ if err := sc.consumer.Close(); err != nil {
+ logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
+ }
+ }
+
+ for key, val := range sc.groupConsumers {
+ logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
+ if err := val.Close(); err != nil {
+ logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
+ }
+ }
+
+ if sc.cAdmin != nil {
+ if err := sc.cAdmin.Close(); err != nil {
+ logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
+ }
+ }
+
+ //TODO: Clear the consumers map
+ //sc.clearConsumerChannelMap()
+
+ logger.Info(ctx, "sarama-client-stopped")
+}
+
+//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
+// the invoking function must hold the lock
+func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
+ // Set the topic details
+ topicDetail := &sarama.TopicDetail{}
+ topicDetail.NumPartitions = int32(numPartition)
+ topicDetail.ReplicationFactor = int16(repFactor)
+ topicDetail.ConfigEntries = make(map[string]*string)
+ topicDetails := make(map[string]*sarama.TopicDetail)
+ topicDetails[topic.Name] = topicDetail
+
+ if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
+ if err == sarama.ErrTopicAlreadyExists {
+ // Not an error
+ logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
+ return nil
+ }
+ logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
+ return err
+ }
+ // TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
+ // do so.
+ logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
+ return nil
+}
+
+//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
+// ensure no two go routines are performing operations on the same topic
+func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
+ return sc.createTopic(ctx, topic, numPartition, repFactor)
+}
+
+//DeleteTopic removes a topic from the kafka Broker
+func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
+ // Remove the topic from the broker
+ if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
+ if err == sarama.ErrUnknownTopicOrPartition {
+ // Not an error as does not exist
+ logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
+ return nil
+ }
+ logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
+ return err
+ }
+
+ // Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
+ if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
+ logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
+ return err
+ }
+ return nil
+}
+
+// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
+// messages from that topic
+func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
+ logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
+
+ // If a consumers already exist for that topic then resuse it
+ if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
+ logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
+ // Create a channel specific for that consumers and add it to the consumers channel map
+ ch := make(chan *ic.InterContainerMessage)
+ sc.addChannelToConsumerChannelMap(ctx, topic, ch)
+ return ch, nil
+ }
+
+ // Register for the topic and set it up
+ var consumerListeningChannel chan *ic.InterContainerMessage
+ var err error
+
+ // Use the consumerType option to figure out the type of consumer to launch
+ if sc.consumerType == PartitionConsumer {
+ if sc.autoCreateTopic {
+ if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
+ logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+ return nil, err
+ }
+ }
+ if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
+ logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+ return nil, err
+ }
+ } else if sc.consumerType == GroupCustomer {
+ // TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
+ // does not consume from a precreated topic in some scenarios
+ //if sc.autoCreateTopic {
+ // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+ // logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
+ // return nil, err
+ // }
+ //}
+ //groupId := sc.consumerGroupName
+ groupId := getGroupId(kvArgs...)
+ // Include the group prefix
+ if groupId != "" {
+ groupId = sc.consumerGroupPrefix + groupId
+ } else {
+ // Need to use a unique group Id per topic
+ groupId = sc.consumerGroupPrefix + topic.Name
+ }
+ if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
+ logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ return nil, err
+ }
+
+ } else {
+ logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
+ return nil, errors.New("unknown-consumer-type")
+ }
+
+ return consumerListeningChannel, nil
+}
+
+//UnSubscribe unsubscribe a consumer from a given topic
+func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
+ logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
+ var err error
+ if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
+ logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
+ }
+ if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
+ logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
+ }
+ return err
+}
+
+func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
+ sc.metadataCallback = callback
+}
+
+func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
+ // Post a consistent stream of liveness data to the channel,
+ // so that in a live state, the core does not timeout and
+ // send a forced liveness message. Production of liveness
+ // events to the channel is rate-limited by livenessChannelInterval.
+ sc.livenessMutex.Lock()
+ defer sc.livenessMutex.Unlock()
+ if sc.liveness != nil {
+ if sc.alive != alive {
+ logger.Info(ctx, "update-liveness-channel-because-change")
+ sc.liveness <- alive
+ sc.lastLivenessTime = time.Now()
+ } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
+ logger.Info(ctx, "update-liveness-channel-because-interval")
+ sc.liveness <- alive
+ sc.lastLivenessTime = time.Now()
+ }
+ }
+
+ // Only emit a log message when the state changes
+ if sc.alive != alive {
+ logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
+ sc.alive = alive
+ }
+}
+
+// Once unhealthy, we never go back
+func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
+ sc.healthy = false
+ sc.healthinessMutex.Lock()
+ defer sc.healthinessMutex.Unlock()
+ if sc.healthiness != nil {
+ logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
+ sc.healthiness <- sc.healthy
+ }
+}
+
+func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
+ // Sarama producers and consumers encapsulate the error inside
+ // a ProducerError or ConsumerError struct.
+ if prodError, ok := err.(*sarama.ProducerError); ok {
+ err = prodError.Err
+ } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
+ err = consumerError.Err
+ }
+
+ // Sarama-Cluster will compose the error into a ClusterError struct,
+ // which we can't do a compare by reference. To handle that, we the
+ // best we can do is compare the error strings.
+
+ switch err.Error() {
+ case context.DeadlineExceeded.Error():
+ logger.Info(ctx, "is-liveness-error-timeout")
+ return true
+ case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
+ logger.Info(ctx, "is-liveness-error-no-brokers")
+ return true
+ case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
+ logger.Info(ctx, "is-liveness-error-shutting-down")
+ return true
+ case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
+ logger.Info(ctx, "is-liveness-error-not-available")
+ return true
+ case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
+ logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
+ return true
+ }
+
+ if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
+ logger.Info(ctx, "is-liveness-error-connection-refused")
+ return true
+ }
+
+ if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
+ logger.Info(ctx, "is-liveness-error-io-timeout")
+ return true
+ }
+
+ // Other errors shouldn't trigger a loss of liveness
+
+ logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
+
+ return false
+}
+
+// send formats and sends the request onto the kafka messaging bus.
+func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
+
+ // Assert message is a proto message
+ var protoMsg proto.Message
+ var ok bool
+ // ascertain the value interface type is a proto.Message
+ if protoMsg, ok = msg.(proto.Message); !ok {
+ logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
+ return fmt.Errorf("not-a-proto-msg-%s", msg)
+ }
+
+ var marshalled []byte
+ var err error
+ // Create the Sarama producer message
+ if marshalled, err = proto.Marshal(protoMsg); err != nil {
+ logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
+ return err
+ }
+ key := ""
+ if len(keys) > 0 {
+ key = keys[0] // Only the first key is relevant
+ }
+ kafkaMsg := &sarama.ProducerMessage{
+ Topic: topic.Name,
+ Key: sarama.StringEncoder(key),
+ Value: sarama.ByteEncoder(marshalled),
+ }
+
+ // Send message to kafka
+ sc.producer.Input() <- kafkaMsg
+ // Wait for result
+ // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+ select {
+ case ok := <-sc.producer.Successes():
+ logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
+ sc.updateLiveness(ctx, true)
+ case notOk := <-sc.producer.Errors():
+ logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
+ if sc.isLivenessError(ctx, notOk) {
+ sc.updateLiveness(ctx, false)
+ }
+ return notOk
+ }
+ return nil
+}
+
+// Enable the liveness monitor channel. This channel will report
+// a "true" or "false" on every publish, which indicates whether
+// or not the channel is still live. This channel is then picked up
+// by the service (i.e. rw_core / ro_core) to update readiness status
+// and/or take other actions.
+func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+ logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
+ if enable {
+ sc.livenessMutex.Lock()
+ defer sc.livenessMutex.Unlock()
+ if sc.liveness == nil {
+ logger.Info(ctx, "kafka-create-liveness-channel")
+ // At least 1, so we can immediately post to it without blocking
+ // Setting a bigger number (10) allows the monitor to fall behind
+ // without blocking others. The monitor shouldn't really fall
+ // behind...
+ sc.liveness = make(chan bool, 10)
+ // post intial state to the channel
+ sc.liveness <- sc.alive
+ }
+ } else {
+ // TODO: Think about whether we need the ability to turn off
+ // liveness monitoring
+ panic("Turning off liveness reporting is not supported")
+ }
+ return sc.liveness
+}
+
+// Enable the Healthiness monitor channel. This channel will report "false"
+// if the kafka consumers die, or some other problem occurs which is
+// catastrophic that would require re-creating the client.
+func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
+ logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
+ if enable {
+ sc.healthinessMutex.Lock()
+ defer sc.healthinessMutex.Unlock()
+ if sc.healthiness == nil {
+ logger.Info(ctx, "kafka-create-healthiness-channel")
+ // At least 1, so we can immediately post to it without blocking
+ // Setting a bigger number (10) allows the monitor to fall behind
+ // without blocking others. The monitor shouldn't really fall
+ // behind...
+ sc.healthiness = make(chan bool, 10)
+ // post intial state to the channel
+ sc.healthiness <- sc.healthy
+ }
+ } else {
+ // TODO: Think about whether we need the ability to turn off
+ // liveness monitoring
+ panic("Turning off healthiness reporting is not supported")
+ }
+ return sc.healthiness
+}
+
+// send an empty message on the liveness channel to check whether connectivity has
+// been restored.
+func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
+ if !sc.started {
+ return fmt.Errorf("SendLiveness() called while not started")
+ }
+
+ kafkaMsg := &sarama.ProducerMessage{
+ Topic: "_liveness_test",
+ Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
+ }
+
+ // Send message to kafka
+ sc.producer.Input() <- kafkaMsg
+ // Wait for result
+ // TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+ select {
+ case ok := <-sc.producer.Successes():
+ logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
+ sc.updateLiveness(ctx, true)
+ case notOk := <-sc.producer.Errors():
+ logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
+ if sc.isLivenessError(ctx, notOk) {
+ sc.updateLiveness(ctx, false)
+ }
+ return notOk
+ }
+ return nil
+}
+
+// getGroupId returns the group id from the key-value args.
+func getGroupId(kvArgs ...*KVArg) string {
+ for _, arg := range kvArgs {
+ if arg.Key == GroupIdKey {
+ return arg.Value.(string)
+ }
+ }
+ return ""
+}
+
+// getOffset returns the offset from the key-value args.
+func getOffset(kvArgs ...*KVArg) int64 {
+ for _, arg := range kvArgs {
+ if arg.Key == Offset {
+ return arg.Value.(int64)
+ }
+ }
+ return sarama.OffsetNewest
+}
+
+func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
+ config := sarama.NewConfig()
+ config.Version = sarama.V1_0_0_0
+
+ // Create a cluster Admin
+ var cAdmin sarama.ClusterAdmin
+ var err error
+ if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
+ logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+ return err
+ }
+ sc.cAdmin = cAdmin
+ return nil
+}
+
+func (sc *SaramaClient) lockTopic(topic *Topic) {
+ sc.lockOfTopicLockMap.Lock()
+ if _, exist := sc.topicLockMap[topic.Name]; exist {
+ sc.lockOfTopicLockMap.Unlock()
+ sc.topicLockMap[topic.Name].Lock()
+ } else {
+ sc.topicLockMap[topic.Name] = &sync.RWMutex{}
+ sc.lockOfTopicLockMap.Unlock()
+ sc.topicLockMap[topic.Name].Lock()
+ }
+}
+
+func (sc *SaramaClient) unLockTopic(topic *Topic) {
+ sc.lockOfTopicLockMap.Lock()
+ defer sc.lockOfTopicLockMap.Unlock()
+ if _, exist := sc.topicLockMap[topic.Name]; exist {
+ sc.topicLockMap[topic.Name].Unlock()
+ }
+}
+
+func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
+ sc.lockTopicToConsumerChannelMap.Lock()
+ defer sc.lockTopicToConsumerChannelMap.Unlock()
+ if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
+ sc.topicToConsumerChannelMap[id] = arg
+ }
+}
+
+func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
+ sc.lockTopicToConsumerChannelMap.RLock()
+ defer sc.lockTopicToConsumerChannelMap.RUnlock()
+
+ if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+ return consumerCh
+ }
+ return nil
+}
+
+func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan *ic.InterContainerMessage) {
+ sc.lockTopicToConsumerChannelMap.Lock()
+ defer sc.lockTopicToConsumerChannelMap.Unlock()
+ if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+ consumerCh.channels = append(consumerCh.channels, ch)
+ return
+ }
+ logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
+}
+
+//closeConsumers closes a list of sarama consumers. The consumers can either be a partition consumers or a group consumers
+func closeConsumers(ctx context.Context, consumers []interface{}) error {
+ var err error
+ for _, consumer := range consumers {
+ // Is it a partition consumers?
+ if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
+ if errTemp := partionConsumer.Close(); errTemp != nil {
+ logger.Debugw(ctx, "partition!!!", log.Fields{"err": errTemp})
+ if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
+ // This can occur on race condition
+ err = nil
+ } else {
+ err = errTemp
+ }
+ }
+ } else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
+ if errTemp := groupConsumer.Close(); errTemp != nil {
+ if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
+ // This can occur on race condition
+ err = nil
+ } else {
+ err = errTemp
+ }
+ }
+ }
+ }
+ return err
+}
+
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan *ic.InterContainerMessage) error {
+ sc.lockTopicToConsumerChannelMap.Lock()
+ defer sc.lockTopicToConsumerChannelMap.Unlock()
+ if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+ // Channel will be closed in the removeChannel method
+ consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
+ // If there are no more channels then we can close the consumers itself
+ if len(consumerCh.channels) == 0 {
+ logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
+ err := closeConsumers(ctx, consumerCh.consumers)
+ //err := consumerCh.consumers.Close()
+ delete(sc.topicToConsumerChannelMap, topic.Name)
+ return err
+ }
+ return nil
+ }
+ logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
+ return errors.New("topic-does-not-exist")
+}
+
+func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
+ sc.lockTopicToConsumerChannelMap.Lock()
+ defer sc.lockTopicToConsumerChannelMap.Unlock()
+ if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+ for _, ch := range consumerCh.channels {
+ // Channel will be closed in the removeChannel method
+ removeChannel(ctx, consumerCh.channels, ch)
+ }
+ err := closeConsumers(ctx, consumerCh.consumers)
+ //if err == sarama.ErrUnknownTopicOrPartition {
+ // // Not an error
+ // err = nil
+ //}
+ //err := consumerCh.consumers.Close()
+ delete(sc.topicToConsumerChannelMap, topic.Name)
+ return err
+ }
+ logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
+ return nil
+}
+
+//createPublisher creates the publisher which is used to send a message onto kafka
+func (sc *SaramaClient) createPublisher(ctx context.Context) error {
+ // This Creates the publisher
+ config := sarama.NewConfig()
+ config.Producer.Partitioner = sarama.NewRandomPartitioner
+ config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
+ config.Producer.Flush.Messages = sc.producerFlushMessages
+ config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
+ config.Producer.Return.Errors = sc.producerReturnErrors
+ config.Producer.Return.Successes = sc.producerReturnSuccess
+ //config.Producer.RequiredAcks = sarama.WaitForAll
+ config.Producer.RequiredAcks = sarama.WaitForLocal
+
+ brokers := []string{sc.KafkaAddress}
+
+ if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
+ logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
+ return err
+ } else {
+ sc.producer = producer
+ }
+ logger.Info(ctx, "Kafka-publisher-created")
+ return nil
+}
+
+func (sc *SaramaClient) createConsumer(ctx context.Context) error {
+ config := sarama.NewConfig()
+ config.Consumer.Return.Errors = true
+ config.Consumer.Fetch.Min = 1
+ config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
+ config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
+ config.Consumer.Offsets.Initial = sarama.OffsetNewest
+ config.Metadata.Retry.Max = sc.metadataMaxRetry
+ brokers := []string{sc.KafkaAddress}
+
+ if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
+ logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
+ return err
+ } else {
+ sc.consumer = consumer
+ }
+ logger.Info(ctx, "Kafka-consumers-created")
+ return nil
+}
+
+// createGroupConsumer creates a consumers group
+func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
+ config := scc.NewConfig()
+ config.ClientID = uuid.New().String()
+ config.Group.Mode = scc.ConsumerModeMultiplex
+ config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
+ config.Consumer.Return.Errors = true
+ //config.Group.Return.Notifications = false
+ //config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
+ //config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
+ config.Consumer.Offsets.Initial = initialOffset
+ //config.Consumer.Offsets.Initial = sarama.OffsetOldest
+ brokers := []string{sc.KafkaAddress}
+
+ topics := []string{topic.Name}
+ var consumer *scc.Consumer
+ var err error
+
+ if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
+ logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ return nil, err
+ }
+ logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+
+ //sc.groupConsumers[topic.Name] = consumer
+ sc.addToGroupConsumers(topic.Name, consumer)
+ return consumer, nil
+}
+
+// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
+// topic via the unique channel each subscriber received during subscription
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
+ // Need to go over all channels and publish messages to them - do we need to copy msg?
+ sc.lockTopicToConsumerChannelMap.RLock()
+ for _, ch := range consumerCh.channels {
+ go func(c chan *ic.InterContainerMessage) {
+ c <- protoMessage
+ }(ch)
+ }
+ sc.lockTopicToConsumerChannelMap.RUnlock()
+
+ if callback := sc.metadataCallback; callback != nil {
+ ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
+ callback(protoMessage.Header.FromTopic, ts)
+ }
+}
+
+func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
+ logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
+startloop:
+ for {
+ select {
+ case err, ok := <-consumer.Errors():
+ if ok {
+ if sc.isLivenessError(ctx, err) {
+ sc.updateLiveness(ctx, false)
+ logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
+ }
+ } else {
+ // Channel is closed
+ break startloop
+ }
+ case msg, ok := <-consumer.Messages():
+ //logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
+ if !ok {
+ // channel is closed
+ break startloop
+ }
+ msgBody := msg.Value
+ sc.updateLiveness(ctx, true)
+ logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+ icm := &ic.InterContainerMessage{}
+ if err := proto.Unmarshal(msgBody, icm); err != nil {
+ logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
+ continue
+ }
+ go sc.dispatchToConsumers(consumerChnls, icm)
+ case <-sc.doneCh:
+ logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
+ break startloop
+ }
+ }
+ logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
+ sc.setUnhealthy(ctx)
+}
+
+func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
+ logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+
+startloop:
+ for {
+ select {
+ case err, ok := <-consumer.Errors():
+ if ok {
+ if sc.isLivenessError(ctx, err) {
+ sc.updateLiveness(ctx, false)
+ }
+ logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
+ } else {
+ logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
+ // channel is closed
+ break startloop
+ }
+ case msg, ok := <-consumer.Messages():
+ if !ok {
+ logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
+ // Channel closed
+ break startloop
+ }
+ sc.updateLiveness(ctx, true)
+ logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+ msgBody := msg.Value
+ icm := &ic.InterContainerMessage{}
+ if err := proto.Unmarshal(msgBody, icm); err != nil {
+ logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
+ continue
+ }
+ go sc.dispatchToConsumers(consumerChnls, icm)
+ consumer.MarkOffset(msg, "")
+ case ntf := <-consumer.Notifications():
+ logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
+ case <-sc.doneCh:
+ logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
+ break startloop
+ }
+ }
+ logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
+ sc.setUnhealthy(ctx)
+}
+
+func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
+ logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
+ var consumerCh *consumerChannels
+ if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
+ logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
+ return errors.New("consumers-not-exist")
+ }
+ // For each consumer listening for that topic, start a consumption loop
+ for _, consumer := range consumerCh.consumers {
+ if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
+ go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
+ } else if gConsumer, ok := consumer.(*scc.Consumer); ok {
+ go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
+ } else {
+ logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
+ return errors.New("invalid-consumer")
+ }
+ }
+ return nil
+}
+
+//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
+//// for that topic. It also starts the routine that listens for messages on that topic.
+func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+ var pConsumers []sarama.PartitionConsumer
+ var err error
+
+ if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
+ logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+ return nil, err
+ }
+
+ consumersIf := make([]interface{}, 0)
+ for _, pConsumer := range pConsumers {
+ consumersIf = append(consumersIf, pConsumer)
+ }
+
+ // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
+ // unbuffered to verify race conditions.
+ consumerListeningChannel := make(chan *ic.InterContainerMessage)
+ cc := &consumerChannels{
+ consumers: consumersIf,
+ channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
+ }
+
+ // Add the consumers channel to the map
+ sc.addTopicToConsumerChannelMap(topic.Name, cc)
+
+ //Start a consumers to listen on that specific topic
+ go func() {
+ if err := sc.startConsumers(ctx, topic); err != nil {
+ logger.Errorw(ctx, "start-consumers-failed", log.Fields{
+ "topic": topic,
+ "error": err})
+ }
+ }()
+
+ return consumerListeningChannel, nil
+}
+
+// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
+// for that topic. It also starts the routine that listens for messages on that topic.
+func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+ // TODO: Replace this development partition consumers with a group consumers
+ var pConsumer *scc.Consumer
+ var err error
+ if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
+ logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+ return nil, err
+ }
+ // Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
+ // unbuffered to verify race conditions.
+ consumerListeningChannel := make(chan *ic.InterContainerMessage)
+ cc := &consumerChannels{
+ consumers: []interface{}{pConsumer},
+ channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
+ }
+
+ // Add the consumers channel to the map
+ sc.addTopicToConsumerChannelMap(topic.Name, cc)
+
+ //Start a consumers to listen on that specific topic
+ go func() {
+ if err := sc.startConsumers(ctx, topic); err != nil {
+ logger.Errorw(ctx, "start-consumers-failed", log.Fields{
+ "topic": topic,
+ "error": err})
+ }
+ }()
+
+ return consumerListeningChannel, nil
+}
+
+func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
+ logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
+ partitionList, err := sc.consumer.Partitions(topic.Name)
+ if err != nil {
+ logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+ return nil, err
+ }
+
+ pConsumers := make([]sarama.PartitionConsumer, 0)
+ for _, partition := range partitionList {
+ var pConsumer sarama.PartitionConsumer
+ if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
+ logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+ return nil, err
+ }
+ pConsumers = append(pConsumers, pConsumer)
+ }
+ return pConsumers, nil
+}
+
+func removeChannel(ctx context.Context, channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
+ var i int
+ var channel chan *ic.InterContainerMessage
+ for i, channel = range channels {
+ if channel == ch {
+ channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
+ close(channel)
+ logger.Debug(ctx, "channel-closed")
+ return channels[:len(channels)-1]
+ }
+ }
+ return channels
+}
+
+func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
+ sc.lockOfGroupConsumers.Lock()
+ defer sc.lockOfGroupConsumers.Unlock()
+ if _, exist := sc.groupConsumers[topic]; !exist {
+ sc.groupConsumers[topic] = consumer
+ }
+}
+
+func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
+ sc.lockOfGroupConsumers.Lock()
+ defer sc.lockOfGroupConsumers.Unlock()
+ if _, exist := sc.groupConsumers[topic]; exist {
+ consumer := sc.groupConsumers[topic]
+ delete(sc.groupConsumers, topic)
+ if err := consumer.Close(); err != nil {
+ logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
+ return err
+ }
+ }
+ return nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/utils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/utils.go
new file mode 100644
index 0000000..bdc615f
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/utils.go
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "github.com/golang/protobuf/ptypes/any"
+ "strings"
+)
+
+const (
+ TopicSeparator = "_"
+ DeviceIdLength = 24
+)
+
+// A Topic definition - may be augmented with additional attributes eventually
+type Topic struct {
+ // The name of the topic. It must start with a letter,
+ // and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
+ // underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
+ // signs (`%`).
+ Name string
+}
+
+type KVArg struct {
+ Key string
+ Value interface{}
+}
+
+type RpcMType int
+
+const (
+ RpcFormattingError RpcMType = iota
+ RpcSent
+ RpcReply
+ RpcTimeout
+ RpcTransportError
+ RpcSystemClosing
+)
+
+type RpcResponse struct {
+ MType RpcMType
+ Err error
+ Reply *any.Any
+}
+
+func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
+ return &RpcResponse{
+ MType: messageType,
+ Err: err,
+ Reply: body,
+ }
+}
+
+// TODO: Remove and provide better may to get the device id
+// GetDeviceIdFromTopic extract the deviceId from the topic name. The topic name is formatted either as:
+// <any string> or <any string>_<deviceId>. The device Id is 24 characters long.
+func GetDeviceIdFromTopic(topic Topic) string {
+ pos := strings.LastIndex(topic.Name, TopicSeparator)
+ if pos == -1 {
+ return ""
+ }
+ adjustedPos := pos + len(TopicSeparator)
+ if adjustedPos >= len(topic.Name) {
+ return ""
+ }
+ deviceId := topic.Name[adjustedPos:len(topic.Name)]
+ if len(deviceId) != DeviceIdLength {
+ return ""
+ }
+ return deviceId
+}