[VOL-1503] Add a GetMembership API to the core

This API is used by the Affinity Router to query the Go Core
for its membership info, it any.

Change-Id: I46aac8579a0452b665175cbd379876a702cce102
diff --git a/common/core/northbound/grpc/default_api_handler.go b/common/core/northbound/grpc/default_api_handler.go
index 30320a2..b6cd185 100644
--- a/common/core/northbound/grpc/default_api_handler.go
+++ b/common/core/northbound/grpc/default_api_handler.go
@@ -42,6 +42,11 @@
 	return nil, errors.New("UnImplemented")
 }
 
+func (handler *DefaultAPIHandler) GetMembership(ctx context.Context, empty *empty.Empty) (*voltha.Membership, error) {
+	log.Debug("GetMembership-request")
+	return nil, errors.New("UnImplemented")
+}
+
 func (handler *DefaultAPIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
 	log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
 	return nil, errors.New("UnImplemented")
diff --git a/protos/voltha.proto b/protos/voltha.proto
index ef6eedf..7048e62 100644
--- a/protos/voltha.proto
+++ b/protos/voltha.proto
@@ -159,6 +159,13 @@
         };
     }
 
+    // Get the membership group of a Voltha Core
+    rpc GetMembership(google.protobuf.Empty) returns(Membership) {
+        option (google.api.http) = {
+            get: "/api/v1/membership"
+        };
+    }
+
     // Set the membership group of a Voltha Core
     rpc UpdateMembership(Membership) returns(google.protobuf.Empty) {
         option (google.api.http) = {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index d576a6e..b3bbc71 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -27,6 +27,9 @@
 	"github.com/opencord/voltha-go/protos/voltha"
 	"github.com/opencord/voltha-go/rw_core/config"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"sync"
 )
 
 type Core struct {
@@ -45,7 +48,8 @@
 	exitChannel       chan int
 	kvClient          kvstore.Client
 	kafkaClient       kafka.Client
-	coreMembershipRegistered bool
+	coreMembership *voltha.Membership
+	membershipLock *sync.RWMutex
 }
 
 func init() {
@@ -75,7 +79,7 @@
 	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
 	core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
 	core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
-	core.coreMembershipRegistered = false
+	core.membershipLock = &sync.RWMutex{}
 	return &core
 }
 
@@ -188,28 +192,50 @@
 }
 
 
-func (core *Core) registerCoreTopic(ctx context.Context, coreTopicSuffix string) error {
+func (core *Core) isMembershipRegistrationComplete() bool {
+	core.membershipLock.RLock()
+	defer core.membershipLock.RUnlock()
+	return core.coreMembership != nil
+}
+
+func (core *Core) updateCoreMembership(ctx context.Context, membership *voltha.Membership) error {
+	core.membershipLock.Lock()
+	defer core.membershipLock.Unlock()
 	// Sanity check - can only register once
-	if core.coreMembershipRegistered {
-		return errors.New("Can-only-set-once")
+	if core.coreMembership != nil {
+		log.Warnw("core-reregistration-not-allowed", log.Fields{"membership": membership})
+		return status.Errorf(codes.AlreadyExists, "%s", membership.GroupName)
 	}
 
-	go func(coreTopicSuffix string) {
-		// Register the core-pair topic to handle core-bound requests aimed to the core pair
-		pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, coreTopicSuffix)}
+	if membership == nil || membership.GroupName == "" {
+		return errors.New("invalid-membership-info")
+	}
+
+	core.coreMembership = membership
+
+
+	// Use the group name to register a specific kafka topic for this container
+	go func(groupName string) {
+		// Register the core-pair topic to handle core-bound requests destined to the core pair
+		pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, groupName)}
 		// TODO: Set a retry here to ensure this subscription happens
 		if err := core.kmp.SubscribeWithDefaultRequestHandler(pTopic, kafka.OffsetNewest); err != nil {
 			log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CoreTopic})
 		}
-		// Update the CoreTopic to use by the adapter proxy
+		// Update the CoreTopic to be used by the adapter proxy
 		core.deviceMgr.adapterProxy.updateCoreTopic(&pTopic)
-	}(coreTopicSuffix)
+	}(membership.GroupName)
 
-	core.coreMembershipRegistered = true
-	log.Info("request-handlers-registered")
+	log.Info("core-membership-registered")
 	return nil
 }
 
+func (core *Core) getCoreMembership(ctx context.Context) *voltha.Membership {
+	core.membershipLock.RLock()
+	defer core.membershipLock.RUnlock()
+	return core.coreMembership
+}
+
 
 func (core *Core) startDeviceManager(ctx context.Context) {
 	// TODO: Interaction between the logicaldevicemanager and devicemanager should mostly occur via
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 46382c2..0bbe5de 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -206,12 +206,21 @@
 func (handler *APIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
 	log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
 	out := new(empty.Empty)
-	if err := handler.core.registerCoreTopic(ctx, membership.GroupName); err != nil {
+	if err := handler.core.updateCoreMembership(ctx, membership); err != nil {
 		return out, err
 	}
 	return out, nil
 }
 
+func (handler *APIHandler) GetMembership(ctx context.Context, empty *empty.Empty) (*voltha.Membership, error) {
+	log.Debug("GetMembership-request")
+	if membership := handler.core.getCoreMembership(ctx); membership != nil {
+		return membership, nil
+	}
+	return &voltha.Membership{}, nil
+}
+
+
 func (handler *APIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
 	log.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
 	if isTestMode(ctx) {