[VOL-1503] Add a GetMembership API to the core
This API is used by the Affinity Router to query the Go Core
for its membership info, if any.
Change-Id: I46aac8579a0452b665175cbd379876a702cce102
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index d576a6e..b3bbc71 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -27,6 +27,9 @@
"github.com/opencord/voltha-go/protos/voltha"
"github.com/opencord/voltha-go/rw_core/config"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
)
type Core struct {
@@ -45,7 +48,8 @@
exitChannel chan int
kvClient kvstore.Client
kafkaClient kafka.Client
- coreMembershipRegistered bool
+ coreMembership *voltha.Membership
+ membershipLock *sync.RWMutex
}
func init() {
@@ -75,7 +79,7 @@
core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
- core.coreMembershipRegistered = false
+ core.membershipLock = &sync.RWMutex{}
return &core
}
@@ -188,28 +192,50 @@
}
-func (core *Core) registerCoreTopic(ctx context.Context, coreTopicSuffix string) error {
+func (core *Core) isMembershipRegistrationComplete() bool {
+ core.membershipLock.RLock()
+ defer core.membershipLock.RUnlock()
+ return core.coreMembership != nil
+}
+
+func (core *Core) updateCoreMembership(ctx context.Context, membership *voltha.Membership) error {
+ core.membershipLock.Lock()
+ defer core.membershipLock.Unlock()
// Sanity check - can only register once
- if core.coreMembershipRegistered {
- return errors.New("Can-only-set-once")
+ if core.coreMembership != nil {
+ log.Warnw("core-reregistration-not-allowed", log.Fields{"membership": membership})
+ return status.Errorf(codes.AlreadyExists, "%s", membership.GroupName)
}
- go func(coreTopicSuffix string) {
- // Register the core-pair topic to handle core-bound requests aimed to the core pair
- pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, coreTopicSuffix)}
+ if membership == nil || membership.GroupName == "" {
+ return errors.New("invalid-membership-info")
+ }
+
+ core.coreMembership = membership
+
+
+ // Use the group name to register a specific kafka topic for this container
+ go func(groupName string) {
+ // Register the core-pair topic to handle core-bound requests destined to the core pair
+ pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, groupName)}
// TODO: Set a retry here to ensure this subscription happens
if err := core.kmp.SubscribeWithDefaultRequestHandler(pTopic, kafka.OffsetNewest); err != nil {
log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CoreTopic})
}
- // Update the CoreTopic to use by the adapter proxy
+ // Update the CoreTopic to be used by the adapter proxy
core.deviceMgr.adapterProxy.updateCoreTopic(&pTopic)
- }(coreTopicSuffix)
+ }(membership.GroupName)
- core.coreMembershipRegistered = true
- log.Info("request-handlers-registered")
+ log.Info("core-membership-registered")
return nil
}
+func (core *Core) getCoreMembership(ctx context.Context) *voltha.Membership {
+ core.membershipLock.RLock()
+ defer core.membershipLock.RUnlock()
+ return core.coreMembership
+}
+
func (core *Core) startDeviceManager(ctx context.Context) {
// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via