Added core_pair_topic flag.  Removed UpdateMembership and GetMembership calls.

This flag must also be added to the helm charts to work correctly.

VOL-1828

Change-Id: Id992c31b04e54468a94cb5bebcb779600f592ecf
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 9671f1a..41d96f4 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -51,6 +51,7 @@
 	default_DefaultRequestTimeout     = int64(500)
 	default_CoreTimeout               = int64(500)
 	default_CoreBindingKey            = "voltha_backend_name"
+	default_CorePairTopic             = "rwcore_1"
 )
 
 // RWCoreFlags represents the set of configurations used by the read-write core service
@@ -82,6 +83,7 @@
 	DefaultRequestTimeout     int64
 	DefaultCoreTimeout        int64
 	CoreBindingKey            string
+	CorePairTopic             string
 }
 
 func init() {
@@ -117,6 +119,7 @@
 		LongRunningRequestTimeout: default_LongRunningRequestTimeout,
 		DefaultCoreTimeout:        default_CoreTimeout,
 		CoreBindingKey:            default_CoreBindingKey,
+		CorePairTopic:             default_CorePairTopic,
 	}
 	return &rwCoreFlag
 }
@@ -195,5 +198,8 @@
 	help = fmt.Sprintf("The name of the meta-key whose value is the rw-core group to which the ofagent is bound")
 	flag.StringVar(&(cf.CoreBindingKey), "core_binding_key", default_CoreBindingKey, help)
 
+	help = fmt.Sprintf("Core pairing group topic")
+	flag.StringVar(&cf.CorePairTopic, "core_pair_topic", default_CorePairTopic, help)
+
 	flag.Parse()
 }
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index a606a75..816c179 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -31,15 +31,16 @@
 type AdapterProxy struct {
 	TestMode              bool
 	deviceTopicRegistered bool
-	coreTopic             *kafka.Topic
+	corePairTopic         string
 	kafkaICProxy          *kafka.InterContainerProxy
 }
 
-func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
-	var proxy AdapterProxy
-	proxy.kafkaICProxy = kafkaProxy
-	proxy.deviceTopicRegistered = false
-	return &proxy
+func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy, corePairTopic string) *AdapterProxy {
+	return &AdapterProxy{
+		kafkaICProxy:          kafkaProxy,
+		corePairTopic:         corePairTopic,
+		deviceTopicRegistered: false,
+	}
 }
 
 func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
@@ -58,15 +59,8 @@
 	}
 }
 
-func (ap *AdapterProxy) updateCoreTopic(coreTopic *kafka.Topic) {
-	ap.coreTopic = coreTopic
-}
-
 func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
-	if ap.coreTopic != nil {
-		return *ap.coreTopic
-	}
-	return kafka.Topic{Name: ap.kafkaICProxy.DefaultTopic.Name}
+	return kafka.Topic{Name: ap.corePairTopic}
 }
 
 func (ap *AdapterProxy) getAdapterTopic(adapterName string) kafka.Topic {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 224b3cb..d82119c 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -17,8 +17,6 @@
 
 import (
 	"context"
-	"errors"
-	"fmt"
 	grpcserver "github.com/opencord/voltha-go/common/grpc"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
@@ -27,9 +25,6 @@
 	"github.com/opencord/voltha-go/rw_core/config"
 	"github.com/opencord/voltha-protos/go/voltha"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
-	"sync"
 )
 
 type Core struct {
@@ -48,8 +43,6 @@
 	exitChannel       chan int
 	kvClient          kvstore.Client
 	kafkaClient       kafka.Client
-	coreMembership    *voltha.Membership
-	membershipLock    sync.RWMutex
 	deviceOwnership   *DeviceOwnership
 }
 
@@ -179,53 +172,16 @@
 		return err
 	}
 
+	// Register the core-pair topic to handle core-bound requests destined to the core pair
+	if err := core.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: core.config.CorePairTopic}, kafka.OffsetNewest); err != nil {
+		log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CorePairTopic})
+		return err
+	}
+
 	log.Info("request-handler-registered")
 	return nil
 }
 
-func (core *Core) isMembershipRegistrationComplete() bool {
-	core.membershipLock.RLock()
-	defer core.membershipLock.RUnlock()
-	return core.coreMembership != nil
-}
-
-func (core *Core) updateCoreMembership(ctx context.Context, membership *voltha.Membership) error {
-	core.membershipLock.Lock()
-	defer core.membershipLock.Unlock()
-	// Sanity check - can only register once
-	if core.coreMembership != nil {
-		log.Warnw("core-reregistration-not-allowed", log.Fields{"membership": membership})
-		return status.Errorf(codes.AlreadyExists, "%s", membership.GroupName)
-	}
-
-	if membership == nil || membership.GroupName == "" {
-		return errors.New("invalid-membership-info")
-	}
-
-	core.coreMembership = membership
-
-	// Use the group name to register a specific kafka topic for this container
-	go func(groupName string) {
-		// Register the core-pair topic to handle core-bound requests destined to the core pair
-		pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, groupName)}
-		// TODO: Set a retry here to ensure this subscription happens
-		if err := core.kmp.SubscribeWithDefaultRequestHandler(pTopic, kafka.OffsetNewest); err != nil {
-			log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CoreTopic})
-		}
-		// Update the CoreTopic to be used by the adapter proxy
-		core.deviceMgr.adapterProxy.updateCoreTopic(&pTopic)
-	}(membership.GroupName)
-
-	log.Info("core-membership-registered")
-	return nil
-}
-
-func (core *Core) getCoreMembership(ctx context.Context) *voltha.Membership {
-	core.membershipLock.RLock()
-	defer core.membershipLock.RUnlock()
-	return core.coreMembership
-}
-
 func (core *Core) startDeviceManager(ctx context.Context) {
 	log.Info("DeviceManager-Starting...")
 	core.deviceMgr.start(ctx, core.logicalDeviceMgr)
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index f92645b..68ee082 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -54,7 +54,7 @@
 	deviceMgr.exitChannel = make(chan int, 1)
 	deviceMgr.rootDevices = make(map[string]bool)
 	deviceMgr.kafkaICProxy = core.kmp
-	deviceMgr.adapterProxy = NewAdapterProxy(core.kmp)
+	deviceMgr.adapterProxy = NewAdapterProxy(core.kmp, core.config.CorePairTopic)
 	deviceMgr.coreInstanceId = core.instanceId
 	deviceMgr.clusterDataProxy = core.clusterDataProxy
 	deviceMgr.adapterMgr = core.adapterMgr
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 8180ecd..55d3855 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -207,23 +207,6 @@
 	return out, nil
 }
 
-func (handler *APIHandler) UpdateMembership(ctx context.Context, membership *voltha.Membership) (*empty.Empty, error) {
-	log.Debugw("UpdateMembership-request", log.Fields{"membership": membership})
-	out := new(empty.Empty)
-	if err := handler.core.updateCoreMembership(ctx, membership); err != nil {
-		return out, err
-	}
-	return out, nil
-}
-
-func (handler *APIHandler) GetMembership(ctx context.Context, empty *empty.Empty) (*voltha.Membership, error) {
-	log.Debug("GetMembership-request")
-	if membership := handler.core.getCoreMembership(ctx); membership != nil {
-		return membership, nil
-	}
-	return &voltha.Membership{}, nil
-}
-
 func (handler *APIHandler) GetLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
 	log.Debugw("GetLogicalDevicePort-request", log.Fields{"id": *id})