[VOL-4290] Voltha go library updates for gRPC migration
Change-Id: I1aa2774beb6b7ed7419bc45aeb53fcae8a8ecda0
diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go
index eee9631..fdc05bc 100755
--- a/pkg/kafka/client.go
+++ b/pkg/kafka/client.go
@@ -12,6 +12,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
+ *
+ *
+ * NOTE: In VOLTHA release 2.9 the kafka client is used only to publish
+ * events on Kafka. It is no longer used for inter-container
+ * communication within VOLTHA.
*/
package kafka
@@ -19,7 +24,7 @@
"context"
"time"
- ca "github.com/opencord/voltha-protos/v4/go/inter_container"
+ "github.com/golang/protobuf/proto"
)
const (
@@ -55,6 +60,7 @@
DefaultNumberReplicas = 1
DefaultAutoCreateTopic = false
DefaultMetadataMaxRetry = 3
+ DefaultMaxRetries = 3
DefaultLivenessChannelInterval = time.Second * 30
)
@@ -64,8 +70,8 @@
Stop(ctx context.Context)
CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error
DeleteTopic(ctx context.Context, topic *Topic) error
- Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
- UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ca.InterContainerMessage) error
+ Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error)
+ UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error
SubscribeForMetadata(context.Context, func(fromTopic string, timestamp time.Time))
Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error
SendLiveness(ctx context.Context) error
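For reference, a minimal sketch of consuming the new proto.Message-based Subscribe/UnSubscribe API; the topic name and the handle function are illustrative assumptions, not part of this change:

    package main

    import (
        "context"

        "github.com/golang/protobuf/proto"
        "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
    )

    // consumeEvents drains a subscription channel until it is closed.
    // client is a started implementation of this package's Client interface.
    func consumeEvents(ctx context.Context, client kafka.Client) error {
        topic := &kafka.Topic{Name: "voltha.events"} // illustrative topic name
        ch, err := client.Subscribe(ctx, topic)
        if err != nil {
            return err
        }
        defer func() { _ = client.UnSubscribe(ctx, topic, ch) }()
        for msg := range ch {
            handle(msg) // msg is a generic proto.Message; assert the expected concrete type here
        }
        return nil
    }

    func handle(msg proto.Message) { /* application-specific decoding */ }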
diff --git a/pkg/kafka/common.go b/pkg/kafka/common.go
index aa37746..c0d169a 100644
--- a/pkg/kafka/common.go
+++ b/pkg/kafka/common.go
@@ -16,7 +16,7 @@
package kafka
import (
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
)
var logger log.CLogger
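For context, the logger declared above is typically initialized in an init() function using the log package's registration API; a sketch consistent with other voltha-lib-go packages:

    func init() {
        // Set up this package so that its log level can be modified at run time
        var err error
        logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
        if err != nil {
            panic(err)
        }
    }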
diff --git a/pkg/kafka/endpoint_manager.go b/pkg/kafka/endpoint_manager.go
deleted file mode 100644
index b10d7bf..0000000
--- a/pkg/kafka/endpoint_manager.go
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka
-
-import (
- "context"
- "fmt"
- "github.com/buraksezer/consistent"
- "github.com/cespare/xxhash"
- "github.com/golang/protobuf/proto"
- "github.com/opencord/voltha-lib-go/v6/pkg/db"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
- "github.com/opencord/voltha-protos/v4/go/voltha"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "sync"
-)
-
-const (
- // All the values below can be tuned to get optimal data distribution. The numbers below seem to work well when
- // supporting 1000-10000 devices and 1 - 20 replicas of a service
-
- // Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
- DefaultPartitionCount = 1117
-
- // Represents how many times a node is replicated on the consistent ring.
- DefaultReplicationFactor = 117
-
- // Load is used to calculate average load.
- DefaultLoad = 1.1
-)
-
-type Endpoint string // Endpoint of a service instance. When using kafka, this is the topic name of a service
-type ReplicaID int32 // The replication ID of a service instance
-
-type EndpointManager interface {
-
- // GetEndpoint is called to get the endpoint to communicate with for a specific device and service type. For
- // now this will return the topic name
- GetEndpoint(ctx context.Context, deviceID string, serviceType string) (Endpoint, error)
-
- // IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
- // devices owned by that service need to be reconciled
- IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error)
-
- // GetReplicaAssignment returns the replica number of the service that owns the deviceID. This is used
- // only by tests
- GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (ReplicaID, error)
-}
-
-type service struct {
- id string // Id of the service. The same id is used for all replicas
- totalReplicas int32
- replicas map[ReplicaID]Endpoint
- consistentRing *consistent.Consistent
-}
-
-type endpointManager struct {
- partitionCount int
- replicationFactor int
- load float64
- backend *db.Backend
- services map[string]*service
- servicesLock sync.RWMutex
- deviceTypeServiceMap map[string]string
- deviceTypeServiceMapLock sync.RWMutex
-}
-
-type EndpointManagerOption func(*endpointManager)
-
-func PartitionCount(count int) EndpointManagerOption {
- return func(args *endpointManager) {
- args.partitionCount = count
- }
-}
-
-func ReplicationFactor(replicas int) EndpointManagerOption {
- return func(args *endpointManager) {
- args.replicationFactor = replicas
- }
-}
-
-func Load(load float64) EndpointManagerOption {
- return func(args *endpointManager) {
- args.load = load
- }
-}
-
-func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
- tm := &endpointManager{
- partitionCount: DefaultPartitionCount,
- replicationFactor: DefaultReplicationFactor,
- load: DefaultLoad,
- backend: backend,
- services: make(map[string]*service),
- deviceTypeServiceMap: make(map[string]string),
- }
-
- for _, option := range opts {
- option(tm)
- }
- return tm
-}
-
-func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
- return newEndpointManager(backend, opts...)
-}
-
-func (ep *endpointManager) GetEndpoint(ctx context.Context, deviceID string, serviceType string) (Endpoint, error) {
- logger.Debugw(ctx, "getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
- owner, err := ep.getOwner(ctx, deviceID, serviceType)
- if err != nil {
- return "", err
- }
- m, ok := owner.(Member)
- if !ok {
- return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
- }
- endpoint := m.getEndPoint()
- if endpoint == "" {
- return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
- }
- logger.Debugw(ctx, "returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
- return endpoint, nil
-}
-
-func (ep *endpointManager) IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error) {
- logger.Debugw(ctx, "device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
- owner, err := ep.getOwner(ctx, deviceID, serviceType)
- if err != nil {
- return false, err
- }
- m, ok := owner.(Member)
- if !ok {
- return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
- }
- return m.getReplica() == ReplicaID(replicaNumber), nil
-}
-
-func (ep *endpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (ReplicaID, error) {
- owner, err := ep.getOwner(ctx, deviceID, serviceType)
- if err != nil {
- return 0, err
- }
- m, ok := owner.(Member)
- if !ok {
- return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
- }
- return m.getReplica(), nil
-}
-
-func (ep *endpointManager) getOwner(ctx context.Context, deviceID string, serviceType string) (consistent.Member, error) {
- serv, dType, err := ep.getServiceAndDeviceType(ctx, serviceType)
- if err != nil {
- return nil, err
- }
- key := ep.makeKey(deviceID, dType, serviceType)
- return serv.consistentRing.LocateKey(key), nil
-}
-
-func (ep *endpointManager) getServiceAndDeviceType(ctx context.Context, serviceType string) (*service, string, error) {
- // Check whether the service exists
- ep.servicesLock.RLock()
- serv, serviceExist := ep.services[serviceType]
- ep.servicesLock.RUnlock()
-
- // Load the service and device types if needed
- if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
- if err := ep.loadServices(ctx); err != nil {
- return nil, "", err
- }
-
- // Check whether the service exists now
- ep.servicesLock.RLock()
- serv, serviceExist = ep.services[serviceType]
- ep.servicesLock.RUnlock()
- if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
- return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
- }
- }
-
- ep.deviceTypeServiceMapLock.RLock()
- defer ep.deviceTypeServiceMapLock.RUnlock()
- for dType, sType := range ep.deviceTypeServiceMap {
- if sType == serviceType {
- return serv, dType, nil
- }
- }
- return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
-}
-
-func (ep *endpointManager) getConsistentConfig() consistent.Config {
- return consistent.Config{
- PartitionCount: ep.partitionCount,
- ReplicationFactor: ep.replicationFactor,
- Load: ep.load,
- Hasher: hasher{},
- }
-}
-
-// loadServices loads the services (adapters) and device types in memory. Because the data set is small and
-// is stored in the dB as binary protobuf, it is better to reload all the data when an inconsistency is
-// detected, instead of watching for updates in the dB and acting on them.
-func (ep *endpointManager) loadServices(ctx context.Context) error {
- ep.servicesLock.Lock()
- defer ep.servicesLock.Unlock()
- ep.deviceTypeServiceMapLock.Lock()
- defer ep.deviceTypeServiceMapLock.Unlock()
-
- if ep.backend == nil {
- return status.Error(codes.Aborted, "backend-not-set")
- }
- ep.services = make(map[string]*service)
- ep.deviceTypeServiceMap = make(map[string]string)
-
- // Load the adapters
- blobs, err := ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "adapters")
- if err != nil {
- return err
- }
-
- // Data is marshalled as proto bytes in the data store
- for _, blob := range blobs {
- data := blob.Value.([]byte)
- adapter := &voltha.Adapter{}
- if err := proto.Unmarshal(data, adapter); err != nil {
- return err
- }
- // A valid adapter should have the vendor field set
- if adapter.Vendor != "" {
- if _, ok := ep.services[adapter.Type]; !ok {
- ep.services[adapter.Type] = &service{
- id: adapter.Type,
- totalReplicas: adapter.TotalReplicas,
- replicas: make(map[ReplicaID]Endpoint),
- consistentRing: consistent.New(nil, ep.getConsistentConfig()),
- }
-
- }
- currentReplica := ReplicaID(adapter.CurrentReplica)
- endpoint := Endpoint(adapter.Endpoint)
- ep.services[adapter.Type].replicas[currentReplica] = endpoint
- ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
- }
- }
- // Load the device types
- blobs, err = ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "device_types")
- if err != nil {
- return err
- }
- for _, blob := range blobs {
- data := blob.Value.([]byte)
- deviceType := &voltha.DeviceType{}
- if err := proto.Unmarshal(data, deviceType); err != nil {
- return err
- }
- if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
- ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
- }
- }
-
- // Log the loaded data in debug mode to facilitate troubleshooting
- if logger.V(log.DebugLevel) {
- for key, val := range ep.services {
- members := val.consistentRing.GetMembers()
- logger.Debugw(ctx, "service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
- for _, m := range members {
- n := m.(Member)
- logger.Debugw(ctx, "service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
- }
- }
- logger.Debugw(ctx, "device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
- }
- return nil
-}
-
-// makeKey creates the string that the hash function uses to create the hash
-func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
- return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
-}
-
-// The consistent package requires a hasher function
-type hasher struct{}
-
-// Sum64 provides the hasher function. Based upon numerous testing scenarios, the xxhash package seems to provide the
-// best distribution compared to other hash packages
-func (h hasher) Sum64(data []byte) uint64 {
- return xxhash.Sum64(data)
-}
-
-// Member represents a member on the consistent ring
-type Member interface {
- String() string
- getReplica() ReplicaID
- getEndPoint() Endpoint
- getID() string
- getServiceType() string
-}
-
-// member implements the Member interface
-type member struct {
- id string
- serviceType string
- vendor string
- version string
- replica ReplicaID
- endpoint Endpoint
-}
-
-func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
- return &member{
- id: ID,
- serviceType: serviceType,
- vendor: vendor,
- version: version,
- replica: replica,
- endpoint: endPoint,
- }
-}
-
-func (m *member) String() string {
- return string(m.endpoint)
-}
-
-func (m *member) getReplica() ReplicaID {
- return m.replica
-}
-
-func (m *member) getEndPoint() Endpoint {
- return m.endpoint
-}
-
-func (m *member) getID() string {
- return m.id
-}
-
-func (m *member) getServiceType() string {
- return m.serviceType
-}
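For context on the API removed above, a minimal sketch of how the endpoint manager was used to resolve which service replica owns a device; the service type string is an illustrative assumption:

    package main

    import (
        "context"

        "github.com/opencord/voltha-lib-go/v6/pkg/db"
        "github.com/opencord/voltha-lib-go/v6/pkg/kafka"
    )

    // resolveOwner returns the endpoint (kafka topic) of the service replica that
    // owns deviceID, according to the consistent-hash ring loaded from the backend.
    func resolveOwner(ctx context.Context, backend *db.Backend, deviceID string) (kafka.Endpoint, error) {
        em := kafka.NewEndpointManager(
            backend,
            kafka.PartitionCount(kafka.DefaultPartitionCount),       // 1117 partitions
            kafka.ReplicationFactor(kafka.DefaultReplicationFactor), // 117 virtual nodes per member
            kafka.Load(kafka.DefaultLoad),                           // 1.1 average-load bound
        )
        return em.GetEndpoint(ctx, deviceID, "adapter_openolt") // illustrative service type
    }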
diff --git a/pkg/kafka/endpoint_manager_test.go b/pkg/kafka/endpoint_manager_test.go
deleted file mode 100644
index a45b3ee..0000000
--- a/pkg/kafka/endpoint_manager_test.go
+++ /dev/null
@@ -1,257 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka
-
-import (
- "context"
- "fmt"
- "github.com/golang/protobuf/proto"
- "github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v6/pkg/db"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
- "github.com/opencord/voltha-lib-go/v6/pkg/mocks/etcd"
- "github.com/opencord/voltha-protos/v4/go/voltha"
- "github.com/phayes/freeport"
- "github.com/stretchr/testify/assert"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "math"
- "strconv"
- "testing"
- "time"
-)
-
-type EPTest struct {
- etcdServer *etcd.EtcdServer
- backend *db.Backend
- maxReplicas int
- minReplicas int
-}
-
-func newEPTest(minReplicas, maxReplicas int) *EPTest {
- ctx := context.Background()
- test := &EPTest{
- minReplicas: minReplicas,
- maxReplicas: maxReplicas,
- }
-
- // Create backend
- if err := test.initBackend(); err != nil {
- logger.Fatalw(ctx, "setting-backend-failed", log.Fields{"error": err})
- }
-
- // Populate backend with data
- if err := test.populateBackend(); err != nil {
- logger.Fatalw(ctx, "populating-db-failed", log.Fields{"error": err})
- }
- return test
-}
-
-func (ep *EPTest) initBackend() error {
- ctx := context.Background()
- configName := "voltha-lib.kafka.ep.test"
- storageDir := "voltha-lib.kafka.ep.etcd"
- logLevel := "error"
- timeout := 5 * time.Second
-
- kvClientPort, err := freeport.GetFreePort()
- if err != nil {
- return err
- }
- peerPort, err := freeport.GetFreePort()
- if err != nil {
- return err
- }
- ep.etcdServer = etcd.StartEtcdServer(ctx, etcd.MKConfig(ctx, configName, kvClientPort, peerPort, storageDir, logLevel))
- if ep.etcdServer == nil {
- return status.Error(codes.Internal, "Embedded server failed to start")
- }
-
- ep.backend = db.NewBackend(ctx, "etcd", "127.0.0.1"+":"+strconv.Itoa(kvClientPort), timeout, "service/voltha")
- return nil
-}
-
-func (ep *EPTest) stopAll() {
- if ep.etcdServer != nil {
- ep.etcdServer.Stop(context.Background())
- }
-}
-
-func (ep *EPTest) populateBackend() error {
- // Add an adapter with multiple replicas
- adapterPrefix := "adapter_brcm_openomci_onu"
- numReplicas := ep.maxReplicas
- for i := 0; i < numReplicas; i++ {
- adapter := &voltha.Adapter{
- Id: fmt.Sprintf("%s_%d", adapterPrefix, i),
- Vendor: "VOLTHA OpenONU",
- Version: "2.4.0-dev0",
- Type: adapterPrefix,
- CurrentReplica: int32(i),
- TotalReplicas: int32(numReplicas),
- Endpoint: fmt.Sprintf("%s_%d", adapterPrefix, i),
- }
- adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
- blob, err := proto.Marshal(adapter)
- if err != nil {
- return err
- }
- if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
- return err
- }
- }
-
- // Add an adapter with the minimum number of replicas
- adapterPrefix = "adapter_openolt"
- numReplicas = ep.minReplicas
- for i := 0; i < numReplicas; i++ {
- adapter := &voltha.Adapter{
- Id: fmt.Sprintf("%s_%d", adapterPrefix, i),
- Vendor: "VOLTHA OpenOLT",
- Version: "2.3.1-dev",
- Type: adapterPrefix,
- CurrentReplica: int32(i),
- TotalReplicas: int32(numReplicas),
- Endpoint: fmt.Sprintf("%s_%d", adapterPrefix, i),
- }
- adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
- blob, err := proto.Marshal(adapter)
- if err != nil {
- return err
- }
- if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
- return err
- }
- }
-
- // Add the brcm_openomci_onu device type
- dType := "brcm_openomci_onu"
- adapterName := "adapter_brcm_openomci_onu"
- deviceType := &voltha.DeviceType{
- Id: dType,
- VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM", "ARPX", "DACM", "ERSN", "HWTC", "CIGG"},
- Adapter: adapterName,
- AcceptsAddRemoveFlowUpdates: true,
- }
- blob, err := proto.Marshal(deviceType)
- if err != nil {
- return err
- }
- if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
- return err
- }
-
- // Add the openolt device type
- dType = "openolt"
- adapterName = "adapter_openolt"
- deviceType = &voltha.DeviceType{
- Id: dType,
- Adapter: adapterName,
- AcceptsAddRemoveFlowUpdates: true,
- }
- blob, err = proto.Marshal(deviceType)
- if err != nil {
- return err
- }
- if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
- return err
- }
- return nil
-}
-
-func getMeanAndStdDeviation(val []int, replicas int) (float64, float64) {
- var sum, mean, sd float64
- for i := 0; i < replicas; i++ {
- sum += float64(val[i])
- }
- mean = sum / float64(replicas)
-
- for j := 0; j < replicas; j++ {
- sd += math.Pow(float64(val[j])-mean, 2)
- }
- sd = math.Sqrt(sd / float64(replicas))
- return mean, sd
-}
-
-func (ep *EPTest) testEndpointManagerAPIs(t *testing.T, tm EndpointManager, serviceType string, deviceType string, replicas int) {
- ctx := context.Background()
- // Map of device ids to topic
- deviceIDs := make(map[string]Endpoint)
- numDevices := 1000
- total := make([]int, replicas)
- for i := 0; i < numDevices; i++ {
- deviceID := uuid.New().String()
- endpoint, err := tm.GetEndpoint(ctx, deviceID, serviceType)
- if err != nil {
- logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
- }
- deviceIDs[deviceID] = endpoint
- replicaID, err := tm.GetReplicaAssignment(ctx, deviceID, serviceType)
- if err != nil {
- logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
- }
- total[replicaID] += 1
- }
-
- mean, stdDev := getMeanAndStdDeviation(total, replicas)
- fmt.Printf("Device distributions => devices:%d service_replicas:%d mean:%d standard_deviation:%d distributions:%v\n", numDevices, replicas, int(mean), int(stdDev), total)
-
- // Verify that we get the same topic for a given device ID, irrespective of the number of iterations
- numIterations := 10
- for i := 0; i < numIterations; i++ {
- for deviceID, expectedEndpoint := range deviceIDs {
- endpointByServiceType, err := tm.GetEndpoint(ctx, deviceID, serviceType)
- if err != nil {
- logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
- }
- assert.Equal(t, expectedEndpoint, endpointByServiceType)
- }
- }
-
- // Verify that a device belongs to the correct node
- for deviceID := range deviceIDs {
- replicaID, err := tm.GetReplicaAssignment(ctx, deviceID, serviceType)
- if err != nil {
- logger.Fatalw(ctx, "error-getting-topic", log.Fields{"error": err})
- }
- for k := 0; k < replicas; k++ {
- owned, err := tm.IsDeviceOwnedByService(ctx, deviceID, serviceType, int32(k))
- if err != nil {
- logger.Fatalw(ctx, "error-verifying-device-ownership", log.Fields{"error": err})
- }
- assert.Equal(t, ReplicaID(k) == replicaID, owned)
- }
- }
-}
-
-func TestEndpointManagerSuite(t *testing.T) {
- tmt := newEPTest(1, 10)
- assert.NotNil(t, tmt)
-
- tm := NewEndpointManager(
- tmt.backend,
- PartitionCount(1117),
- ReplicationFactor(200),
- Load(1.1))
-
- defer tmt.stopAll()
-
- //1. Test APIs with multiple replicas
- tmt.testEndpointManagerAPIs(t, tm, "adapter_brcm_openomci_onu", "brcm_openomci_onu", tmt.maxReplicas)
-
- //2. Test APIs with single replica
- tmt.testEndpointManagerAPIs(t, tm, "adapter_openolt", "openolt", tmt.minReplicas)
-}
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
deleted file mode 100644
index 0f006dd..0000000
--- a/pkg/kafka/kafka_inter_container_library.go
+++ /dev/null
@@ -1,1097 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka
-
-import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "reflect"
- "strings"
- "sync"
- "time"
-
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
-
- "github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
- "github.com/golang/protobuf/ptypes/any"
- "github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
- ic "github.com/opencord/voltha-protos/v4/go/inter_container"
- "github.com/opentracing/opentracing-go"
-)
-
-const (
- DefaultMaxRetries = 3
- DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
-)
-
-const (
- TransactionKey = "transactionID"
- FromTopic = "fromTopic"
-)
-
-var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
-var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
-
-// requestHandlerChannel represents an interface associated with a channel. Whenever an event is
-// obtained from that channel, this interface is invoked. This is used to handle
-// async requests into the Core via the kafka messaging bus
-type requestHandlerChannel struct {
- requesthandlerInterface interface{}
- ch <-chan *ic.InterContainerMessage
-}
-
-// transactionChannel represents a combination of a topic and a channel onto which a response received
-// on the kafka bus will be sent
-type transactionChannel struct {
- topic *Topic
- ch chan *ic.InterContainerMessage
-}
-
-type InterContainerProxy interface {
- Start(ctx context.Context) error
- Stop(ctx context.Context)
- GetDefaultTopic() *Topic
- InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
- InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
- SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
- SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
- UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
- DeleteTopic(ctx context.Context, topic Topic) error
- EnableLivenessChannel(ctx context.Context, enable bool) chan bool
- SendLiveness(ctx context.Context) error
-}
-
-// interContainerProxy represents the messaging proxy
-type interContainerProxy struct {
- kafkaAddress string
- defaultTopic *Topic
- defaultRequestHandlerInterface interface{}
- kafkaClient Client
- doneCh chan struct{}
- doneOnce sync.Once
-
- // This map is used to map a topic to an interface and channel. When a request is received
- // on that channel (registered to the topic) then that interface is invoked.
- topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
- lockTopicRequestHandlerChannelMap sync.RWMutex
-
- // This map is used to map a channel to a response topic. This channel handles all responses on that
- // channel for that topic and forward them to the appropriate consumers channel, using the
- // transactionIdToChannelMap.
- topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
- lockTopicResponseChannelMap sync.RWMutex
-
- // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
- // sent out and we are waiting for a response.
- transactionIdToChannelMap map[string]*transactionChannel
- lockTransactionIdToChannelMap sync.RWMutex
-}
-
-type InterContainerProxyOption func(*interContainerProxy)
-
-func InterContainerAddress(address string) InterContainerProxyOption {
- return func(args *interContainerProxy) {
- args.kafkaAddress = address
- }
-}
-
-func DefaultTopic(topic *Topic) InterContainerProxyOption {
- return func(args *interContainerProxy) {
- args.defaultTopic = topic
- }
-}
-
-func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
- return func(args *interContainerProxy) {
- args.defaultRequestHandlerInterface = handler
- }
-}
-
-func MsgClient(client Client) InterContainerProxyOption {
- return func(args *interContainerProxy) {
- args.kafkaClient = client
- }
-}
-
-func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
- proxy := &interContainerProxy{
- kafkaAddress: DefaultKafkaAddress,
- doneCh: make(chan struct{}),
- }
-
- for _, option := range opts {
- option(proxy)
- }
-
- return proxy
-}
-
-func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
- return newInterContainerProxy(opts...)
-}
-
-func (kp *interContainerProxy) Start(ctx context.Context) error {
- logger.Info(ctx, "Starting-Proxy")
-
- // Kafka MsgClient should already have been created. If not, output fatal error
- if kp.kafkaClient == nil {
- logger.Fatal(ctx, "kafka-client-not-set")
- }
-
- // Start the kafka client
- if err := kp.kafkaClient.Start(ctx); err != nil {
- logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
- return err
- }
-
- // Create the topic to response channel map
- kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
- // Create the transactionId to Channel Map
- kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
-
- // Create the topic to request channel map
- kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
-
- return nil
-}
-
-func (kp *interContainerProxy) Stop(ctx context.Context) {
- logger.Info(ctx, "stopping-intercontainer-proxy")
- kp.doneOnce.Do(func() { close(kp.doneCh) })
- // TODO : Perform cleanup
- kp.kafkaClient.Stop(ctx)
- err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
- if err != nil {
- logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
- }
- err = kp.deleteAllTopicResponseChannelMap(ctx)
- if err != nil {
- logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
- }
- kp.deleteAllTransactionIdToChannelMap(ctx)
-}
-
-func (kp *interContainerProxy) GetDefaultTopic() *Topic {
- return kp.defaultTopic
-}
-
-// InvokeAsyncRPC is used to make an RPC request asynchronously
-func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
- waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
-
- spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
- if spanArg != nil {
- kvArgs = append(kvArgs, &spanArg[0])
- }
-
- defer span.Finish()
-
- logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
-
- // If a replyToTopic is provided then we use it, otherwise just use the default topic. The replyToTopic is
- // typically the device ID.
- responseTopic := replyToTopic
- if responseTopic == nil {
- responseTopic = kp.GetDefaultTopic()
- }
-
- chnl := make(chan *RpcResponse)
-
- go func() {
-
- // once we're done,
- // close the response channel
- defer close(chnl)
-
- var err error
- var protoRequest *ic.InterContainerMessage
-
- // Encode the request
- protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
- if err != nil {
- logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
- log.MarkSpanError(ctx, errors.New("cannot-format-request"))
- chnl <- NewResponse(RpcFormattingError, err, nil)
- return
- }
-
- // Subscribe for response, if needed, before sending request
- var ch <-chan *ic.InterContainerMessage
- if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
- logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
- log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
- chnl <- NewResponse(RpcTransportError, err, nil)
- return
- }
-
- // Send request - if the topic is formatted with a device Id then we will send the request using a
- // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
- // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
- logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
-
- // if the message cannot be sent on kafka, send a transport error response and close the channel
- if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
- chnl <- NewResponse(RpcTransportError, err, nil)
- return
- }
-
- // if the client is not waiting for a response send the ack and close the channel
- chnl <- NewResponse(RpcSent, nil, nil)
- if !waitForResponse {
- return
- }
-
- defer func() {
- // Remove the subscription for a response on return
- if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
- logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
- }
- }()
-
- // Wait for response as well as timeout or cancellation
- select {
- case msg, ok := <-ch:
- if !ok {
- logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": responseTopic.Name})
- log.MarkSpanError(ctx, errors.New("channel-closed"))
- chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
- return
- }
- logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
- if responseBody, err := decodeResponse(ctx, msg); err != nil {
- chnl <- NewResponse(RpcReply, err, nil)
- } else {
- if responseBody.Success {
- chnl <- NewResponse(RpcReply, nil, responseBody.Result)
- } else {
- // response body contains an error
- unpackErr := &ic.Error{}
- if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
- chnl <- NewResponse(RpcReply, err, nil)
- } else {
- chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
- }
- }
- }
- case <-ctx.Done():
- logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
- log.MarkSpanError(ctx, errors.New("context-cancelled"))
- err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
- chnl <- NewResponse(RpcTimeout, err, nil)
- case <-kp.doneCh:
- chnl <- NewResponse(RpcSystemClosing, nil, nil)
- logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
- }
- }()
- return chnl
-}
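A minimal sketch of how a caller consumes the channel returned by InvokeAsyncRPC: the first message acknowledges the publish (RpcSent) and, when waitForResponse is true, a later message carries the reply. The topics, the rpc name, and the RpcResponse field names (MType, Err, Reply) are assumptions of this sketch:

    package main

    import (
        "context"
        "errors"

        "github.com/golang/protobuf/ptypes/any"
        "github.com/opencord/voltha-lib-go/v6/pkg/kafka"
    )

    func getDeviceAsync(ctx context.Context, kp kafka.InterContainerProxy, deviceID string) (*any.Any, error) {
        toTopic := kafka.Topic{Name: "adapter_openolt"} // illustrative
        replyTo := kafka.Topic{Name: "rwcore"}          // illustrative
        ch := kp.InvokeAsyncRPC(ctx, "get_device", &toTopic, &replyTo, true, deviceID)
        for resp := range ch {
            switch resp.MType {
            case kafka.RpcSent:
                continue // request published; keep waiting for the reply
            case kafka.RpcReply:
                return resp.Reply, resp.Err
            default: // formatting/transport errors, timeout, or system closing
                return nil, resp.Err
            }
        }
        return nil, errors.New("response-channel-closed")
    }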
-
-// Method to extract the OpenTracing Span from the Context and serialize it for transport over Kafka, embedded as an additional argument.
-// The additional argument is injected using "span" as the key and the Span marshalled into a byte slice as the value
-//
-// The span name is automatically constructed from the RPC name with the following convention (<rpc-name> represents the name of the invoked method):
-// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
-// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
-// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
-// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
-func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
- var err error
- var newCtx context.Context
- var spanToInject opentracing.Span
-
- if !log.GetGlobalLFM().GetLogCorrelationStatus() && !log.GetGlobalLFM().GetTracePublishingStatus() {
- // if both log correlation and trace publishing are disabled, do not generate the span
- logger.Debugw(ctx, "not-embedding-span-in-KVArg-", log.Fields{"rpc": rpc,
- "log-correlation-status": log.GetGlobalLFM().GetLogCorrelationStatus(), "trace-publishing-status": log.GetGlobalLFM().GetTracePublishingStatus()})
- return nil, opentracing.GlobalTracer().StartSpan(rpc), ctx
- }
-
- var spanName strings.Builder
- spanName.WriteString("kafka-")
-
- // In case of inter adapter message, use Msg Type for constructing RPC name
- if rpc == "process_inter_adapter_message" {
- if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
- spanName.WriteString("inter-adapter-")
- rpc = msgType.String()
- }
- }
-
- if isAsync {
- spanName.WriteString("async-rpc-")
- } else {
- spanName.WriteString("rpc-")
- }
- spanName.WriteString(rpc)
-
- if isAsync {
- spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
- } else {
- spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
- }
-
- spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
-
- textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
- if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
- logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
- return nil, spanToInject, newCtx
- }
-
- var textMapJson []byte
- if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
- logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
- return nil, spanToInject, newCtx
- }
-
- spanArg := make([]KVArg, 1)
- spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
- return spanArg, spanToInject, newCtx
-}
-
-// InvokeRPC is used to send a request to a given topic
-func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
- waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
-
- spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
- if spanArg != nil {
- kvArgs = append(kvArgs, &spanArg[0])
- }
-
- defer span.Finish()
-
- logger.Debugw(ctx, "InvokeRPC", log.Fields{"rpc": rpc, "key": key, "kvArgs": kvArgs})
-
- // If a replyToTopic is provided then we use it, otherwise just use the default topic. The replyToTopic is
- // typically the device ID.
- responseTopic := replyToTopic
- if responseTopic == nil {
- responseTopic = kp.defaultTopic
- }
-
- // Encode the request
- protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
- if err != nil {
- logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
- log.MarkSpanError(ctx, errors.New("cannot-format-request"))
- return false, nil
- }
-
- // Subscribe for response, if needed, before sending request
- var ch <-chan *ic.InterContainerMessage
- if waitForResponse {
- var err error
- if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
- logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
- }
- }
-
- // Send request - if the topic is formatted with a device Id then we will send the request using a
- // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
- // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
- logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
- go func() {
- if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
- log.MarkSpanError(ctx, errors.New("send-failed"))
- logger.Errorw(ctx, "send-failed", log.Fields{
- "topic": toTopic,
- "key": key,
- "error": err})
- }
- }()
-
- if waitForResponse {
- // Create a child context based on the parent context, if any
- var cancel context.CancelFunc
- childCtx := context.Background()
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
- } else {
- childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
- }
- defer cancel()
-
- // Wait for response as well as timeout or cancellation
- // Remove the subscription for a response on return
- defer func() {
- if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
- logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
- "id": protoRequest.Header.Id,
- "error": err})
- }
- }()
- select {
- case msg, ok := <-ch:
- if !ok {
- logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
- log.MarkSpanError(ctx, errors.New("channel-closed"))
- protoError := &ic.Error{Reason: "channel-closed"}
- var marshalledArg *any.Any
- if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
- return false, nil // Should never happen
- }
- return false, marshalledArg
- }
- logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
- var responseBody *ic.InterContainerResponseBody
- var err error
- if responseBody, err = decodeResponse(ctx, msg); err != nil {
- logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
- // Return a failed result rather than dereferencing a nil responseBody below
- return false, nil
- }
- return responseBody.Success, responseBody.Result
- case <-ctx.Done():
- logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
- log.MarkSpanError(ctx, errors.New("context-cancelled"))
- // pack the error as proto any type
- protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
-
- var marshalledArg *any.Any
- if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
- return false, nil // Should never happen
- }
- return false, marshalledArg
- case <-childCtx.Done():
- logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
- log.MarkSpanError(ctx, errors.New("context-cancelled"))
- // pack the error as proto any type
- protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
-
- var marshalledArg *any.Any
- if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
- return false, nil // Should never happen
- }
- return false, marshalledArg
- case <-kp.doneCh:
- logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
- return true, nil
- }
- }
- return true, nil
-}
-
-// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
-// when a message is received on a given topic
-func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
-
- // Subscribe to receive messages for that topic
- var ch <-chan *ic.InterContainerMessage
- var err error
- if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
- logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
- return err
- }
-
- kp.defaultRequestHandlerInterface = handler
- kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
- // Launch a go routine to receive and process kafka messages
- go kp.waitForMessages(ctx, ch, topic, handler)
-
- return nil
-}
-
-// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
-// when a message is received on a given topic. So far there is only 1 target registered per microservice
-func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
- // Subscribe to receive messages for that topic
- var ch <-chan *ic.InterContainerMessage
- var err error
- if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
- logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
- return err
- }
- kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
-
- // Launch a go routine to receive and process kafka messages
- go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
-
- return nil
-}
-
-func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
- return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
-}
-
-func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
- kp.lockTopicResponseChannelMap.Lock()
- defer kp.lockTopicResponseChannelMap.Unlock()
- if _, exist := kp.topicToResponseChannelMap[topic]; exist {
- // Unsubscribe from this topic first - this will close the subscribed channel
- var err error
- if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
- logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
- }
- delete(kp.topicToResponseChannelMap, topic)
- return err
- } else {
- return fmt.Errorf("%s-Topic-not-found", topic)
- }
-}
-
-// nolint: unused
-func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
- logger.Debug(ctx, "delete-all-topic-response-channel")
- kp.lockTopicResponseChannelMap.Lock()
- defer kp.lockTopicResponseChannelMap.Unlock()
- var unsubscribeFailTopics []string
- for topic := range kp.topicToResponseChannelMap {
- // Unsubscribe from this topic first - this will close the subscribed channel
- if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
- unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
- logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
- // Do not return. Continue to try to unsubscribe from other topics.
- } else {
- // Only delete from channel map if successfully unsubscribed.
- delete(kp.topicToResponseChannelMap, topic)
- }
- }
- if len(unsubscribeFailTopics) > 0 {
- return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
- }
- return nil
-}
-
-func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
- kp.lockTopicRequestHandlerChannelMap.Lock()
- defer kp.lockTopicRequestHandlerChannelMap.Unlock()
- if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
- kp.topicToRequestHandlerChannelMap[topic] = arg
- }
-}
-
-func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
- kp.lockTopicRequestHandlerChannelMap.Lock()
- defer kp.lockTopicRequestHandlerChannelMap.Unlock()
- if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
- // Close the kafka client first by unsubscribing from this topic
- if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
- return err
- }
- delete(kp.topicToRequestHandlerChannelMap, topic)
- return nil
- } else {
- return fmt.Errorf("%s-Topic-not-found", topic)
- }
-}
-
-// nolint: unused
-func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
- logger.Debug(ctx, "delete-all-topic-request-channel")
- kp.lockTopicRequestHandlerChannelMap.Lock()
- defer kp.lockTopicRequestHandlerChannelMap.Unlock()
- var unsubscribeFailTopics []string
- for topic := range kp.topicToRequestHandlerChannelMap {
- // Close the kafka client first by unsubscribing from this topic
- if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
- unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
- logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
- // Do not return. Continue to try to unsubscribe from other topics.
- } else {
- // Only delete from channel map if successfully unsubscribed.
- delete(kp.topicToRequestHandlerChannelMap, topic)
- }
- }
- if len(unsubscribeFailTopics) > 0 {
- return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
- }
- return nil
-}
-
-func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
- kp.lockTransactionIdToChannelMap.Lock()
- defer kp.lockTransactionIdToChannelMap.Unlock()
- if _, exist := kp.transactionIdToChannelMap[id]; !exist {
- kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
- }
-}
-
-func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
- kp.lockTransactionIdToChannelMap.Lock()
- defer kp.lockTransactionIdToChannelMap.Unlock()
- if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
- // Close the channel first
- close(transChannel.ch)
- delete(kp.transactionIdToChannelMap, id)
- }
-}
-
-func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
- kp.lockTransactionIdToChannelMap.Lock()
- defer kp.lockTransactionIdToChannelMap.Unlock()
- for key, value := range kp.transactionIdToChannelMap {
- if value.topic.Name == id {
- close(value.ch)
- delete(kp.transactionIdToChannelMap, key)
- }
- }
-}
-
-// nolint: unused
-func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
- logger.Debug(ctx, "delete-all-transaction-id-channel-map")
- kp.lockTransactionIdToChannelMap.Lock()
- defer kp.lockTransactionIdToChannelMap.Unlock()
- for key, value := range kp.transactionIdToChannelMap {
- close(value.ch)
- delete(kp.transactionIdToChannelMap, key)
- }
-}
-
-func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
- // If we have any consumers on that topic we need to close them
- if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
- logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
- }
- if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
- logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
- }
- kp.deleteTopicTransactionIdToChannelMap(topic.Name)
-
- return kp.kafkaClient.DeleteTopic(ctx, &topic)
-}
-
-func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
- // Encode the response argument - needs to be a proto message
- if returnedVal == nil {
- return nil, nil
- }
- protoValue, ok := returnedVal.(proto.Message)
- if !ok {
- logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
- err := errors.New("response-value-not-proto-message")
- return nil, err
- }
-
- // Marshal the returned value, if any
- var marshalledReturnedVal *any.Any
- var err error
- if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
- logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
- return nil, err
- }
- return marshalledReturnedVal, nil
-}
-
-func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
- responseHeader := &ic.Header{
- Id: request.Header.Id,
- Type: ic.MessageType_RESPONSE,
- FromTopic: request.Header.ToTopic,
- ToTopic: request.Header.FromTopic,
- Timestamp: ptypes.TimestampNow(),
- }
- responseBody := &ic.InterContainerResponseBody{
- Success: false,
- Result: nil,
- }
- var marshalledResponseBody *any.Any
- var err error
- // Error should never happen here
- if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
- logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
- }
-
- return &ic.InterContainerMessage{
- Header: responseHeader,
- Body: marshalledResponseBody,
- }
-
-}
-
-//encodeResponse encodes a response to be sent over kafka and returns an InterContainerMessage on success
-//or an error on failure
-func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
- //logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
- responseHeader := &ic.Header{
- Id: request.Header.Id,
- Type: ic.MessageType_RESPONSE,
- FromTopic: request.Header.ToTopic,
- ToTopic: request.Header.FromTopic,
- KeyTopic: request.Header.KeyTopic,
- Timestamp: ptypes.TimestampNow(),
- }
-
- // Go over all returned values
- var marshalledReturnedVal *any.Any
- var err error
-
- // for now we support only 1 returned value - (excluding the error)
- if len(returnedValues) > 0 {
- if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
- logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
- }
- }
-
- responseBody := &ic.InterContainerResponseBody{
- Success: success,
- Result: marshalledReturnedVal,
- }
-
- // Marshal the response body
- var marshalledResponseBody *any.Any
- if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
- logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
- return nil, err
- }
-
- return &ic.InterContainerMessage{
- Header: responseHeader,
- Body: marshalledResponseBody,
- }, nil
-}
-
-func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
- myClassValue := reflect.ValueOf(myClass)
- // Capitalize the first letter of funcName to work around the requirement that only exported
- // (capitalized) methods can be invoked from a different package
- funcName = strings.Title(funcName)
- m := myClassValue.MethodByName(funcName)
- if !m.IsValid() {
- return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
- }
- in := make([]reflect.Value, len(params)+1)
- in[0] = reflect.ValueOf(ctx)
- for i, param := range params {
- in[i+1] = reflect.ValueOf(param)
- }
- out = m.Call(in)
- return
-}
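A sketch of the handler shape this reflection-based dispatch expects: an exported method taking a context plus the request arguments and returning a result and an error. The Get_health name and the returned proto type are illustrative; note strings.Title does not treat '_' as a word separator, so rpc "get_health" resolves to method "Get_health":

    package main

    import (
        "context"

        ic "github.com/opencord/voltha-protos/v4/go/inter_container"
        "github.com/opencord/voltha-protos/v4/go/voltha"
    )

    type requestHandler struct{}

    // Get_health is located by CallFuncByName when the incoming rpc is "get_health".
    func (h *requestHandler) Get_health(ctx context.Context, args []*ic.Argument) (*voltha.HealthStatus, error) {
        // args would normally be unpacked with ptypes.UnmarshalAny; omitted here
        return &voltha.HealthStatus{State: voltha.HealthStatus_HEALTHY}, nil
    }

    // Dispatch side, as done in handleMessage:
    //   out, err := kafka.CallFuncByName(ctx, &requestHandler{}, "get_health", args)
    //   // out[len(out)-1] holds the method's error return value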
-
-func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
- arg := &KVArg{
- Key: TransactionKey,
- Value: &ic.StrType{Val: transactionId},
- }
-
- var marshalledArg *any.Any
- var err error
- if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
- logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
- return currentArgs
- }
- protoArg := &ic.Argument{
- Key: arg.Key,
- Value: marshalledArg,
- }
- return append(currentArgs, protoArg)
-}
-
-func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
- var marshalledArg *any.Any
- var err error
- if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
- logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
- return currentArgs
- }
- protoArg := &ic.Argument{
- Key: FromTopic,
- Value: marshalledArg,
- }
- return append(currentArgs, protoArg)
-}
-
-// Method to extract the Span embedded in a Kafka RPC request on the receiver side. If a span is found embedded in the KV args (with "span" as the key),
-// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
-// If no embedded span is found, a span is still created, with the name "kafka-rpc-<rpc-name>", to enrich the Context for RPC calls coming
-// from components that do not yet send the span (e.g. openonu adapter)
-func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
-
- for _, arg := range args {
- if arg.Key == "span" {
- var err error
- var textMapString ic.StrType
- if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
- logger.Debug(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
- break
- }
-
- spanTextMap := make(map[string]string)
- if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
- logger.Debug(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
- break
- }
-
- var spanContext opentracing.SpanContext
- if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
- logger.Debug(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
- break
- }
-
- var receivedRpcName string
- extractBaggage := func(k, v string) bool {
- if k == "rpc-span-name" {
- receivedRpcName = v
- return false
- }
- return true
- }
-
- spanContext.ForeachBaggageItem(extractBaggage)
-
- return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
- }
- }
-
- // Create a new child span named after the rpc if no span details were received in the kafka arguments
- var spanName strings.Builder
- spanName.WriteString("kafka-")
-
- // In case of inter adapter message, use Msg Type for constructing RPC name
- if rpcName == "process_inter_adapter_message" {
- for _, arg := range args {
- if arg.Key == "msg" {
- iamsg := ic.InterAdapterMessage{}
- if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
- spanName.WriteString("inter-adapter-")
- rpcName = iamsg.Header.Type.String()
- }
- }
- }
- }
-
- spanName.WriteString("rpc-")
- spanName.WriteString(rpcName)
-
- return opentracing.StartSpanFromContext(ctx, spanName.String())
-}
-
-func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
-
- // First extract the header to know whether this is a request - responses are handled by a different handler
- if msg.Header.Type == ic.MessageType_REQUEST {
- var out []reflect.Value
- var err error
-
- // Get the request body
- requestBody := &ic.InterContainerRequestBody{}
- if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
- } else {
- logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "args": requestBody.Args})
- span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
- defer span.Finish()
-
- // let the callee unpack the arguments as it's the only one that knows the real proto type
- // Augment the requestBody with the message Id as it will be used in scenarios where cores
- // are set in pairs and competing
- requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
-
- // Augment the requestBody with the From topic name as it will be used in scenarios where a container
- // needs to send an unsolicited message to the requesting container
- requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
-
- out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
- if err != nil {
- logger.Warn(ctx, err)
- }
- }
- // Response required?
- if requestBody.ResponseRequired {
- // If we already have an error before then just return that
- var returnError *ic.Error
- var returnedValues []interface{}
- var success bool
- if err != nil {
- returnError = &ic.Error{Reason: err.Error()}
- returnedValues = make([]interface{}, 1)
- returnedValues[0] = returnError
- } else {
- returnedValues = make([]interface{}, 0)
- // Check for errors first
- lastIndex := len(out) - 1
- if out[lastIndex].Interface() != nil { // Error
- if retError, ok := out[lastIndex].Interface().(error); ok {
- if retError.Error() == ErrorTransactionNotAcquired.Error() {
- logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
- return // Ignore - process is in competing mode and ignored transaction
- }
- returnError = &ic.Error{Reason: retError.Error()}
- returnedValues = append(returnedValues, returnError)
- } else { // Should never happen
- returnError = &ic.Error{Reason: "incorrect-error-returns"}
- returnedValues = append(returnedValues, returnError)
- }
- } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
- logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
- return // Ignore - should not happen
- } else { // Non-error case
- success = true
- for idx, val := range out {
- //logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
- if idx != lastIndex {
- returnedValues = append(returnedValues, val.Interface())
- }
- }
- }
- }
-
- var icm *ic.InterContainerMessage
- if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
- logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
- icm = encodeDefaultFailedResponse(ctx, msg)
- }
- // To preserve ordering of messages, all messages to a given topic are sent to the same partition
- // by providing a message key. The key is encoded in the topic name. If the deviceId is not
- // present then the key will be empty, hence messages for that topic will be spread across
- // all partitions.
- replyTopic := &Topic{Name: msg.Header.FromTopic}
- key := msg.Header.KeyTopic
- logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
- // TODO: handle error response.
- go func() {
- if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
- logger.Errorw(ctx, "send-reply-failed", log.Fields{
- "topic": replyTopic,
- "key": key,
- "error": err})
- }
- }()
- }
- } else if msg.Header.Type == ic.MessageType_RESPONSE {
- logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
- go kp.dispatchResponse(ctx, msg)
- } else {
- logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
- }
-}
-
-func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
- // Wait for messages
- for msg := range ch {
- //logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
- go kp.handleMessage(context.Background(), msg, targetInterface)
- }
-}
-
-func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
- kp.lockTransactionIdToChannelMap.RLock()
- defer kp.lockTransactionIdToChannelMap.RUnlock()
- if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
- logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
- return
- }
- kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
-}
-
-// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
-// This method prevents all subscribers from receiving all messages, as happens with the Subscribe
-// API. There is one response channel waiting for kafka messages; each message is dispatched to the
-// corresponding waiting channel
-func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
- logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
-
- // Create a specific channel for this consumer. We cannot use the channel from the kafka client as it will
- // broadcast any message for this topic to all channels waiting on it.
- // Set channel size to 1 to prevent deadlock, see VOL-2708
- ch := make(chan *ic.InterContainerMessage, 1)
- kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
-
- return ch, nil
-}
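
A hypothetical caller ties the pieces together like this: subscribe with the transaction ID, wait on the dedicated channel, and always unsubscribe so the map entry is released. The helper waitForReply is illustrative only:

func (kp *interContainerProxy) waitForReply(ctx context.Context, replyTopic Topic, trnsId string) (*ic.InterContainerMessage, error) {
	ch, err := kp.subscribeForResponse(ctx, replyTopic, trnsId)
	if err != nil {
		return nil, err
	}
	// Release the transaction-id-to-channel map entry on every exit path.
	defer func() { _ = kp.unSubscribeForResponse(ctx, trnsId) }()
	select {
	case msg := <-ch:
		return msg, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
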
-
-func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
- logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
- kp.deleteFromTransactionIdToChannelMap(trnsId)
- return nil
-}
-
-func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
- return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
-}
-
-func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
- return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
-}
-
-func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
- return kp.kafkaClient.SendLiveness(ctx)
-}
-
-//encodeRequest formats a request to send over kafka and returns an InterContainerMessage on success
-//or an error on failure
-func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
- requestHeader := &ic.Header{
- Id: uuid.New().String(),
- Type: ic.MessageType_REQUEST,
- FromTopic: replyTopic.Name,
- ToTopic: toTopic.Name,
- KeyTopic: key,
- Timestamp: ptypes.TimestampNow(),
- }
- requestBody := &ic.InterContainerRequestBody{
- Rpc: rpc,
- ResponseRequired: true,
- ReplyToTopic: replyTopic.Name,
- }
-
- for _, arg := range kvArgs {
- if arg == nil {
- // In case the caller sends an array with empty args
- continue
- }
- var marshalledArg *any.Any
- var err error
- // ascertain the value interface type is a proto.Message
- protoValue, ok := arg.Value.(proto.Message)
- if !ok {
- logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
- err := errors.New("argument-value-not-proto-message")
- return nil, err
- }
- if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
- logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
- return nil, err
- }
- protoArg := &ic.Argument{
- Key: arg.Key,
- Value: marshalledArg,
- }
- requestBody.Args = append(requestBody.Args, protoArg)
- }
-
- var marshalledData *any.Any
- var err error
- if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
- logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
- return nil, err
- }
- request := &ic.InterContainerMessage{
- Header: requestHeader,
- Body: marshalledData,
- }
- return request, nil
-}
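
For illustration, a request carrying one proto-typed argument (KVArg values must be proto.Message, as enforced above). The rpc name, topic names, and the voltha.ID argument are examples only, and the sketch assumes the voltha proto import:

// requestDevice (illustrative) builds a request with one proto-typed argument.
func requestDevice(ctx context.Context, deviceID string) (*ic.InterContainerMessage, error) {
	return encodeRequest(ctx, "get_device",
		&Topic{Name: "openolt"}, // toTopic: the callee's listening topic
		&Topic{Name: "rwcore"},  // replyTopic: where the response is expected
		deviceID,                // key: pins all messages for this device to one partition
		&KVArg{Key: "device_id", Value: &voltha.ID{Id: deviceID}})
}
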
-
-func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
- // Extract the message body
- responseBody := ic.InterContainerResponseBody{}
- if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
- logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
- return nil, err
- }
- //logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
-
- return &responseBody, nil
-
-}
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
deleted file mode 100644
index 8c88750..0000000
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka
-
-import (
- "context"
- "github.com/stretchr/testify/assert"
- "testing"
-)
-
-func TestDefaultKafkaProxy(t *testing.T) {
- actualResult := newInterContainerProxy()
- assert.Equal(t, actualResult.kafkaAddress, DefaultKafkaAddress)
- assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
-}
-
-func TestKafkaProxyOptionAddress(t *testing.T) {
- actualResult := newInterContainerProxy(InterContainerAddress("10.20.30.40:1020"))
- assert.Equal(t, actualResult.kafkaAddress, "10.20.30.40:1020")
- assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
-}
-
-func TestKafkaProxyOptionTopic(t *testing.T) {
- actualResult := newInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
- assert.Equal(t, actualResult.kafkaAddress, DefaultKafkaAddress)
- assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
- assert.Equal(t, actualResult.defaultTopic.Name, "Adapter")
-}
-
-type myInterface struct {
-}
-
-func TestKafkaProxyOptionTargetInterface(t *testing.T) {
- var m *myInterface
- actualResult := newInterContainerProxy(RequestHandlerInterface(m))
- assert.Equal(t, actualResult.kafkaAddress, DefaultKafkaAddress)
- assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
-}
-
-func TestKafkaProxyChangeAllOptions(t *testing.T) {
- var m *myInterface
- actualResult := newInterContainerProxy(
- InterContainerAddress("10.20.30.40:1020"),
- DefaultTopic(&Topic{Name: "Adapter"}),
- RequestHandlerInterface(m))
- assert.Equal(t, actualResult.kafkaAddress, "10.20.30.40:1020")
- assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
- assert.Equal(t, actualResult.defaultTopic.Name, "Adapter")
-}
-
-func TestKafkaProxyEnableLivenessChannel(t *testing.T) {
- var m *myInterface
-
- // Note: This doesn't actually start the client
- client := NewSaramaClient()
-
- probe := newInterContainerProxy(
- InterContainerAddress("10.20.30.40:1020"),
- DefaultTopic(&Topic{Name: "Adapter"}),
- RequestHandlerInterface(m),
- MsgClient(client),
- )
-
- ch := probe.EnableLivenessChannel(context.Background(), true)
-
- // The channel should have one "true" message on it
- assert.NotEmpty(t, ch)
-
- select {
- case stuff := <-ch:
- assert.True(t, stuff)
- default:
- t.Error("Failed to read from the channel")
- }
-}
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 79827aa..185f6ec 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -27,10 +27,8 @@
scc "github.com/bsm/sarama-cluster"
"github.com/eapache/go-resiliency/breaker"
"github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
"github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v6/pkg/log"
- ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
)
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
@@ -38,7 +36,7 @@
//consumer or a group consumer
type consumerChannels struct {
consumers []interface{}
- channels []chan *ic.InterContainerMessage
+ channels []chan proto.Message
}
// static check to ensure SaramaClient implements Client
@@ -378,7 +376,7 @@
// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
-func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan proto.Message, error) {
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
@@ -388,13 +386,13 @@
if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
// Create a channel specific to that consumer and add it to the consumer channel map
- ch := make(chan *ic.InterContainerMessage)
+ ch := make(chan proto.Message)
sc.addChannelToConsumerChannelMap(ctx, topic, ch)
return ch, nil
}
// Register for the topic and set it up
- var consumerListeningChannel chan *ic.InterContainerMessage
+ var consumerListeningChannel chan proto.Message
var err error
// Use the consumerType option to figure out the type of consumer to launch
@@ -441,7 +439,7 @@
}
//UnSubscribe unsubscribe a consumer from a given topic
-func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan proto.Message) error {
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
@@ -609,7 +607,7 @@
// without blocking others. The monitor shouldn't really fall
// behind...
sc.liveness = make(chan bool, 10)
- // post intial state to the channel
+ // post initial state to the channel
sc.liveness <- sc.alive
}
} else {
@@ -635,7 +633,7 @@
// without blocking others. The monitor shouldn't really fall
// behind...
sc.healthiness = make(chan bool, 10)
- // post intial state to the channel
+ // post initial state to the channel
sc.healthiness <- sc.healthy
}
} else {
@@ -749,7 +747,7 @@
return nil
}
-func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan *ic.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan proto.Message) {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -788,7 +786,7 @@
return err
}
-func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan *ic.InterContainerMessage) error {
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan proto.Message) error {
sc.lockTopicToConsumerChannelMap.Lock()
defer sc.lockTopicToConsumerChannelMap.Unlock()
if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -908,19 +906,18 @@
// dispatchToConsumers sends the message received on a given topic to all subscribers for that
// topic via the unique channel each subscriber received during subscription
-func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage proto.Message, fromTopic string, ts time.Time) {
// Need to go over all channels and publish messages to them - do we need to copy msg?
sc.lockTopicToConsumerChannelMap.RLock()
for _, ch := range consumerCh.channels {
- go func(c chan *ic.InterContainerMessage) {
+ go func(c chan proto.Message) {
c <- protoMessage
}(ch)
}
sc.lockTopicToConsumerChannelMap.RUnlock()
if callback := sc.metadataCallback; callback != nil {
- ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
- callback(protoMessage.Header.FromTopic, ts)
+ callback(fromTopic, ts)
}
}
@@ -948,12 +945,12 @@
msgBody := msg.Value
sc.updateLiveness(ctx, true)
logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
- icm := &ic.InterContainerMessage{}
- if err := proto.Unmarshal(msgBody, icm); err != nil {
+ var protoMsg proto.Message
+ if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
continue
}
- go sc.dispatchToConsumers(consumerChnls, icm)
+ go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
case <-sc.doneCh:
logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
break startloop
@@ -989,12 +986,12 @@
sc.updateLiveness(ctx, true)
logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
msgBody := msg.Value
- icm := &ic.InterContainerMessage{}
- if err := proto.Unmarshal(msgBody, icm); err != nil {
+ var protoMsg proto.Message
+ if err := proto.Unmarshal(msgBody, protoMsg); err != nil {
logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
continue
}
- go sc.dispatchToConsumers(consumerChnls, icm)
+ go sc.dispatchToConsumers(consumerChnls, protoMsg, msg.Topic, msg.Timestamp)
consumer.MarkOffset(msg, "")
case ntf := <-consumer.Notifications():
logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
@@ -1030,7 +1027,7 @@
// setupPartitionConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan proto.Message, error) {
var pConsumers []sarama.PartitionConsumer
var err error
@@ -1046,10 +1043,10 @@
// Create the consumers/channel structure, set the consumers, and create a channel on that topic.
// The channel is unbuffered for now, to surface race conditions.
- consumerListeningChannel := make(chan *ic.InterContainerMessage)
+ consumerListeningChannel := make(chan proto.Message)
cc := &consumerChannels{
consumers: consumersIf,
- channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
+ channels: []chan proto.Message{consumerListeningChannel},
}
// Add the consumers channel to the map
@@ -1069,7 +1066,7 @@
// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
// for that topic. It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan proto.Message, error) {
// TODO: Replace these development partition consumers with a group consumer
var pConsumer *scc.Consumer
var err error
@@ -1079,10 +1076,10 @@
}
// Create the consumers/channel structure, set the consumers, and create a channel on that topic.
// The channel is unbuffered for now, to surface race conditions.
- consumerListeningChannel := make(chan *ic.InterContainerMessage)
+ consumerListeningChannel := make(chan proto.Message)
cc := &consumerChannels{
consumers: []interface{}{pConsumer},
- channels: []chan *ic.InterContainerMessage{consumerListeningChannel},
+ channels: []chan proto.Message{consumerListeningChannel},
}
// Add the consumers channel to the map
@@ -1120,9 +1117,9 @@
return pConsumers, nil
}
-func removeChannel(ctx context.Context, channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
+func removeChannel(ctx context.Context, channels []chan proto.Message, ch <-chan proto.Message) []chan proto.Message {
var i int
- var channel chan *ic.InterContainerMessage
+ var channel chan proto.Message
for i, channel = range channels {
if channel == ch {
channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
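
With *ic.InterContainerMessage removed, subscribers now receive generic proto.Message values and decode them application-side. A minimal consumption sketch against the v7 Client interface; the consume helper and the message handling are illustrative:

import (
	"context"
	"fmt"

	"github.com/golang/protobuf/proto"
	"github.com/opencord/voltha-lib-go/v7/pkg/kafka"
)

// consume (illustrative) drains one topic until the context is cancelled.
func consume(ctx context.Context, kClient kafka.Client, topic *kafka.Topic) error {
	ch, err := kClient.Subscribe(ctx, topic)
	if err != nil {
		return err
	}
	defer func() { _ = kClient.UnSubscribe(ctx, topic, ch) }()
	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				return nil
			}
			// Application-specific decoding of the generic message goes here.
			fmt.Printf("received %s\n", proto.MessageName(msg))
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}
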
diff --git a/pkg/kafka/utils.go b/pkg/kafka/utils.go
index bdc615f..608361b 100644
--- a/pkg/kafka/utils.go
+++ b/pkg/kafka/utils.go
@@ -16,8 +16,14 @@
package kafka
import (
- "github.com/golang/protobuf/ptypes/any"
+ "context"
+ "errors"
"strings"
+ "time"
+
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
)
const (
@@ -82,3 +88,98 @@
}
return deviceId
}
+
+// StartAndWaitUntilKafkaConnectionIsUp starts the kafka client and waits until it can establish a connection to the
+// kafka broker, or until the context times out.
+func StartAndWaitUntilKafkaConnectionIsUp(ctx context.Context, kClient Client, connectionRetryInterval time.Duration, serviceName string) error {
+ if kClient == nil {
+ return errors.New("kafka-client-is-nil")
+ }
+ for {
+ if err := kClient.Start(ctx); err != nil {
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+ logger.Warnw(ctx, "kafka-connection-down", log.Fields{"error": err})
+ select {
+ case <-time.After(connectionRetryInterval):
+ continue
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ }
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ logger.Info(ctx, "kafka-connection-up")
+ break
+ }
+ return nil
+}
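
A usage sketch: register a probe service, attach the probe to the context, then block on startup with a bounded wait. The service name "message-bus" and the intervals are examples, not part of this change:

p := &probe.Probe{}
p.RegisterService(ctx, "message-bus")
ctx = context.WithValue(ctx, probe.ProbeContextKey, p)

kClient := NewSaramaClient() // broker address options omitted for brevity
startCtx, cancel := context.WithTimeout(ctx, 2*time.Minute)
defer cancel()
if err := StartAndWaitUntilKafkaConnectionIsUp(startCtx, kClient, 5*time.Second, "message-bus"); err != nil {
	logger.Fatalw(ctx, "kafka-connection-never-established", log.Fields{"error": err})
}
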
+
+// MonitorKafkaReadiness monitors the liveness and healthiness of the kafka service
+// and updates the status in the probe.
+func MonitorKafkaReadiness(ctx context.Context,
+ kClient Client,
+ liveProbeInterval, notLiveProbeInterval time.Duration,
+ serviceName string) {
+
+ if kClient == nil {
+ logger.Fatal(ctx, "kafka-client-is-nil")
+ }
+
+ logger.Infow(ctx, "monitor-kafka-readiness", log.Fields{"service": serviceName})
+
+ livelinessChannel := kClient.EnableLivenessChannel(ctx, true)
+ healthinessChannel := kClient.EnableHealthinessChannel(ctx, true)
+ timeout := liveProbeInterval
+ failed := false
+ for {
+ timeoutTimer := time.NewTimer(timeout)
+
+ select {
+ case healthiness := <-healthinessChannel:
+ if !healthiness {
+ // This will eventually cause K8s to restart the container, and will do
+ // so in a way that allows cleanup to continue, rather than an immediate
+ // panic and exit here.
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusFailed)
+ logger.Infow(ctx, "kafka-not-healthy", log.Fields{"service": serviceName})
+ failed = true
+ }
+ // Check if the timer has expired or not
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+ case liveliness := <-livelinessChannel:
+ if failed {
+ // Failures of the message bus are permanent and can't ever be recovered from,
+ // so make sure we never inadvertently reset a failed state back to unready.
+ } else if !liveliness {
+ // kafka not reachable or down, updating the status to not ready state
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+ logger.Infow(ctx, "kafka-not-live", log.Fields{"service": serviceName})
+ timeout = notLiveProbeInterval
+ } else {
+ // kafka is reachable, updating the status to running state
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ timeout = liveProbeInterval
+ }
+ // Check if the timer has expired or not
+ if !timeoutTimer.Stop() {
+ <-timeoutTimer.C
+ }
+ case <-timeoutTimer.C:
+ logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
+ // send the liveness probe in a goroutine; we don't want to deadlock ourselves as
+ // the liveness probe may wait (and block) writing to our channel.
+ go func() {
+ err := kClient.SendLiveness(ctx)
+ if err != nil {
+ // Catch possible error case if sending liveness after Sarama has been stopped.
+ logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
+ }
+ }()
+ case <-ctx.Done():
+ return // just exit
+ }
+ }
+}
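
Once the connection is up, the monitor is typically launched as a goroutine alongside the service; the intervals below are examples. Note the deliberate one-way failure latch above: a healthiness failure marks the probe Failed and is never upgraded back, so Kubernetes restarts the container cleanly instead of the process panicking in place.

go MonitorKafkaReadiness(ctx, kClient, 60*time.Second, 5*time.Second, "message-bus")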