Added core_pair_topic flag. Removed UpdateMembership and GetMembership calls.
This flag must also be added to the helm charts to work correctly.
VOL-1828
Change-Id: Id992c31b04e54468a94cb5bebcb779600f592ecf
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 9671f1a..41d96f4 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -51,6 +51,7 @@
default_DefaultRequestTimeout = int64(500)
default_CoreTimeout = int64(500)
default_CoreBindingKey = "voltha_backend_name"
+ default_CorePairTopic = "rwcore_1"
)
// RWCoreFlags represents the set of configurations used by the read-write core service
@@ -82,6 +83,7 @@
DefaultRequestTimeout int64
DefaultCoreTimeout int64
CoreBindingKey string
+ CorePairTopic string
}
func init() {
@@ -117,6 +119,7 @@
LongRunningRequestTimeout: default_LongRunningRequestTimeout,
DefaultCoreTimeout: default_CoreTimeout,
CoreBindingKey: default_CoreBindingKey,
+ CorePairTopic: default_CorePairTopic,
}
return &rwCoreFlag
}
@@ -195,5 +198,8 @@
help = fmt.Sprintf("The name of the meta-key whose value is the rw-core group to which the ofagent is bound")
flag.StringVar(&(cf.CoreBindingKey), "core_binding_key", default_CoreBindingKey, help)
+ help = fmt.Sprintf("Core pairing group topic")
+ flag.StringVar(&cf.CorePairTopic, "core_pair_topic", default_CorePairTopic, help)
+
flag.Parse()
}
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index a606a75..816c179 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -31,15 +31,16 @@
type AdapterProxy struct {
TestMode bool
deviceTopicRegistered bool
- coreTopic *kafka.Topic
+ corePairTopic string
kafkaICProxy *kafka.InterContainerProxy
}
-func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
- var proxy AdapterProxy
- proxy.kafkaICProxy = kafkaProxy
- proxy.deviceTopicRegistered = false
- return &proxy
+func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
+ return &AdapterProxy{
+ kafkaICProxy: kafkaProxy,
+ corePairTopic: corePairTopic,
+ deviceTopicRegistered: false,
+ }
}
func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
@@ -58,15 +59,8 @@
}
}
-func (ap *AdapterProxy) updateCoreTopic(coreTopic *kafka.Topic) {
- ap.coreTopic = coreTopic
-}
-
func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
- if ap.coreTopic != nil {
- return *ap.coreTopic
- }
- return kafka.Topic{Name: ap.kafkaICProxy.DefaultTopic.Name}
+ return kafka.Topic{Name: ap.corePairTopic}
}
func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 224b3cb..d82119c 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -17,8 +17,6 @@
import (
"context"
- "errors"
- "fmt"
grpcserver "github.com/opencord/voltha-go/common/grpc"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
@@ -27,9 +25,6 @@
"github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "sync"
)
type Core struct {
@@ -48,8 +43,6 @@
exitChannel chan int
kvClient kvstore.Client
kafkaClient kafka.Client
- coreMembership *voltha.Membership
- membershipLock sync.RWMutex
deviceOwnership *DeviceOwnership
}
@@ -179,53 +172,16 @@
return err
}
+ // Register the core-pair topic to handle core-bound requests destined to the core pair
+ if err := core.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: core.config.CorePairTopic}, kafka.OffsetNewest); err != nil {
+ log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CorePairTopic})
+ return err
+ }
+
log.Info("request-handler-registered")
return nil
}
-func (core *Core) isMembershipRegistrationComplete() bool {
- core.membershipLock.RLock()
- defer core.membershipLock.RUnlock()
- return core.coreMembership != nil
-}
-
-func (core *Core) updateCoreMembership(ctx context.Context, membership *voltha.Membership) error {
- core.membershipLock.Lock()
- defer core.membershipLock.Unlock()
- // Sanity check - can only register once
- if core.coreMembership != nil {
- log.Warnw("core-reregistration-not-allowed", log.Fields{"membership": membership})
- return status.Errorf(codes.AlreadyExists, "%s", membership.GroupName)
- }
-
- if membership == nil || membership.GroupName == "" {
- return errors.New("invalid-membership-info")
- }
-
- core.coreMembership = membership
-
- // Use the group name to register a specific kafka topic for this container
- go func(groupName string) {
- // Register the core-pair topic to handle core-bound requests destined to the core pair
- pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, groupName)}
- // TODO: Set a retry here to ensure this subscription happens
- if err := core.kmp.SubscribeWithDefaultRequestHandler(pTopic, kafka.OffsetNewest); err != nil {
- log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CoreTopic})
- }
- // Update the CoreTopic to be used by the adapter proxy
- core.deviceMgr.adapterProxy.updateCoreTopic(&pTopic)
- }(membership.GroupName)
-
- log.Info("core-membership-registered")
- return nil
-}
-
-func (core *Core) getCoreMembership(ctx context.Context) *voltha.Membership {
- core.membershipLock.RLock()
- defer core.membershipLock.RUnlock()
- return core.coreMembership
-}
-
func (core *Core) startDeviceManager(ctx context.Context) {
log.Info("DeviceManager-Starting...")
core.deviceMgr.start(ctx, core.logicalDeviceMgr)
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f92645b..68ee082 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -54,7 +54,7 @@
deviceMgr.exitChannel = make(chan int, 1)
deviceMgr.rootDevices = make(map[string]bool)
deviceMgr.kafkaICProxy = core.kmp
- deviceMgr.adapterProxy = NewAdapterProxy(core.kmp)
+ deviceMgr.adapterProxy = NewAdapterProxy(core.kmp, core.config.CorePairTopic)
deviceMgr.coreInstanceId = core.instanceId
deviceMgr.clusterDataProxy = core.clusterDataProxy
deviceMgr.adapterMgr = core.adapterMgr
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 8180ecd..55d3855 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -207,23 +207,6 @@
return out, nil
}
-func (handler *APIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
- log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
- out := new(empty.Empty)
- if err := handler.core.updateCoreMembership(ctx, membership); err != nil {
- return out, err
- }
- return out, nil
-}
-
-func (handler *APIHandler) GetMembership(ctx context.Context, empty *empty.Empty) (*voltha.Membership, error) {
- log.Debug("GetMembership-request")
- if membership := handler.core.getCoreMembership(ctx); membership != nil {
- return membership, nil
- }
- return &voltha.Membership{}, nil
-}
-
func (handler *APIHandler) GetLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
log.Debugw("GetLogicalDevicePort-request", log.Fields{"id": *id})