Added core_pair_topic flag. Removed UpdateMembership and GetMembership calls.
This flag must also be added to the helm charts to work correctly.
VOL-1828
Change-Id: Id992c31b04e54468a94cb5bebcb779600f592ecf
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 224b3cb..d82119c 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -17,8 +17,6 @@
import (
"context"
- "errors"
- "fmt"
grpcserver "github.com/opencord/voltha-go/common/grpc"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
@@ -27,9 +25,6 @@
"github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "sync"
)
type Core struct {
@@ -48,8 +43,6 @@
exitChannel chan int
kvClient kvstore.Client
kafkaClient kafka.Client
- coreMembership *voltha.Membership
- membershipLock sync.RWMutex
deviceOwnership *DeviceOwnership
}
@@ -179,53 +172,16 @@
return err
}
+ // Register the core-pair topic to handle core-bound requests destined to the core pair
+ if err := core.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: core.config.CorePairTopic}, kafka.OffsetNewest); err != nil {
+ log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CorePairTopic})
+ return err
+ }
+
log.Info("request-handler-registered")
return nil
}
-func (core *Core) isMembershipRegistrationComplete() bool {
- core.membershipLock.RLock()
- defer core.membershipLock.RUnlock()
- return core.coreMembership != nil
-}
-
-func (core *Core) updateCoreMembership(ctx context.Context, membership *voltha.Membership) error {
- core.membershipLock.Lock()
- defer core.membershipLock.Unlock()
- // Sanity check - can only register once
- if core.coreMembership != nil {
- log.Warnw("core-reregistration-not-allowed", log.Fields{"membership": membership})
- return status.Errorf(codes.AlreadyExists, "%s", membership.GroupName)
- }
-
- if membership == nil || membership.GroupName == "" {
- return errors.New("invalid-membership-info")
- }
-
- core.coreMembership = membership
-
- // Use the group name to register a specific kafka topic for this container
- go func(groupName string) {
- // Register the core-pair topic to handle core-bound requests destined to the core pair
- pTopic := kafka.Topic{Name: fmt.Sprintf("%s_%s", core.config.CoreTopic, groupName)}
- // TODO: Set a retry here to ensure this subscription happens
- if err := core.kmp.SubscribeWithDefaultRequestHandler(pTopic, kafka.OffsetNewest); err != nil {
- log.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": core.config.CoreTopic})
- }
- // Update the CoreTopic to be used by the adapter proxy
- core.deviceMgr.adapterProxy.updateCoreTopic(&pTopic)
- }(membership.GroupName)
-
- log.Info("core-membership-registered")
- return nil
-}
-
-func (core *Core) getCoreMembership(ctx context.Context) *voltha.Membership {
- core.membershipLock.RLock()
- defer core.membershipLock.RUnlock()
- return core.coreMembership
-}
-
func (core *Core) startDeviceManager(ctx context.Context) {
log.Info("DeviceManager-Starting...")
core.deviceMgr.start(ctx, core.logicalDeviceMgr)