VOL-2970 - Improved readability & traceability of startup code.
Changed Start() function to implement majority of the startup functionality, with less helpers. Start() also defines local variables for each component created, to avoid accidentally using a component that isn't ready.
Also merged the rwCore into the Core.
Also changed Core to cancel a local context to on shutdown, and then wait for shutdown to complete.
Change-Id: I285e8486773476531e20ec352ff85a1b145432bf
diff --git a/rw_core/core/api/adapter_request_handler.go b/rw_core/core/api/adapter_request_handler.go
index 7c03618..4deca75 100644
--- a/rw_core/core/api/adapter_request_handler.go
+++ b/rw_core/core/api/adapter_request_handler.go
@@ -19,13 +19,10 @@
import (
"context"
"errors"
- "github.com/opencord/voltha-go/rw_core/core/adapter"
- "github.com/opencord/voltha-go/rw_core/core/device"
- "time"
-
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
- "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/device"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
@@ -34,28 +31,16 @@
// AdapterRequestHandlerProxy represent adapter request handler proxy attributes
type AdapterRequestHandlerProxy struct {
- coreInstanceID string
- deviceMgr *device.Manager
- adapterMgr *adapter.Manager
- localDataProxy *model.Proxy
- clusterDataProxy *model.Proxy
- defaultRequestTimeout time.Duration
- longRunningRequestTimeout time.Duration
+ deviceMgr *device.Manager
+ adapterMgr *adapter.Manager
}
// NewAdapterRequestHandlerProxy assigns values for adapter request handler proxy attributes and returns the new instance
-func NewAdapterRequestHandlerProxy(coreInstanceID string, dMgr *device.Manager,
- aMgr *adapter.Manager, cdProxy *model.Proxy, ldProxy *model.Proxy, longRunningRequestTimeout time.Duration,
- defaultRequestTimeout time.Duration) *AdapterRequestHandlerProxy {
- var proxy AdapterRequestHandlerProxy
- proxy.coreInstanceID = coreInstanceID
- proxy.deviceMgr = dMgr
- proxy.clusterDataProxy = cdProxy
- proxy.localDataProxy = ldProxy
- proxy.adapterMgr = aMgr
- proxy.defaultRequestTimeout = defaultRequestTimeout
- proxy.longRunningRequestTimeout = longRunningRequestTimeout
- return &proxy
+func NewAdapterRequestHandlerProxy(dMgr *device.Manager, aMgr *adapter.Manager) *AdapterRequestHandlerProxy {
+ return &AdapterRequestHandlerProxy{
+ deviceMgr: dMgr,
+ adapterMgr: aMgr,
+ }
}
func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
@@ -86,7 +71,7 @@
}
}
}
- logger.Debugw("Register", log.Fields{"adapter": *adapter, "device-types": deviceTypes, "transaction-id": transactionID.Val, "core-id": rhp.coreInstanceID})
+ logger.Debugw("Register", log.Fields{"adapter": *adapter, "device-types": deviceTypes, "transaction-id": transactionID.Val})
return rhp.adapterMgr.RegisterAdapter(adapter, deviceTypes)
}
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index e8b651d..592ccea 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -127,16 +127,12 @@
proxy := model.NewProxy(backend, "/")
nb.adapterMgr = adapter.NewAdapterManager(proxy, nb.coreInstanceID, nb.kClient)
nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CorePairTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
- if err = nb.adapterMgr.Start(ctx); err != nil {
- logger.Fatalf("Cannot start adapterMgr: %s", err)
- }
- nb.deviceMgr.Start(ctx)
- nb.logicalDeviceMgr.Start(ctx)
+ nb.adapterMgr.Start(ctx)
- if err = nb.kmp.Start(); err != nil {
+ if err := nb.kmp.Start(); err != nil {
logger.Fatalf("Cannot start InterContainerProxy: %s", err)
}
- requestProxy := NewAdapterRequestHandlerProxy(nb.coreInstanceID, nb.deviceMgr, nb.adapterMgr, proxy, proxy, cfg.LongRunningRequestTimeout, cfg.DefaultRequestTimeout)
+ requestProxy := NewAdapterRequestHandlerProxy(nb.deviceMgr, nb.adapterMgr)
if err := nb.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: cfg.CoreTopic}, requestProxy); err != nil {
logger.Fatalf("Cannot add request handler: %s", err)
}
@@ -201,12 +197,6 @@
if nb.kClient != nil {
nb.kClient.Stop()
}
- if nb.logicalDeviceMgr != nil {
- nb.logicalDeviceMgr.Stop(context.Background())
- }
- if nb.deviceMgr != nil {
- nb.deviceMgr.Stop(context.Background())
- }
if nb.kmp != nil {
nb.kmp.Stop()
}