[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 26021ca..def6dc9 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -19,21 +19,21 @@
import (
"context"
"errors"
- "github.com/opencord/voltha-lib-go/v5/pkg/probe"
"io"
"strconv"
"strings"
"sync"
"time"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
+
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/device/event"
"github.com/opencord/voltha-go/rw_core/utils"
- "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -43,26 +43,25 @@
*event.Manager
logicalDeviceAgents sync.Map
deviceMgr *Manager
- kafkaICProxy kafka.InterContainerProxy
dbPath *model.Path
ldProxy *model.Proxy
- defaultTimeout time.Duration
+ internalTimeout time.Duration
logicalDevicesLoadingLock sync.RWMutex
logicalDeviceLoadingInProgress map[string][]chan int
}
-func (ldMgr *LogicalManager) Start(ctx context.Context) {
+func (ldMgr *LogicalManager) Start(ctx context.Context, serviceName string) {
logger.Info(ctx, "starting-logical-device-manager")
- probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusPreparing)
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusPreparing)
// Load all the logical devices from the dB
var logicalDevices []*voltha.LogicalDevice
if err := ldMgr.ldProxy.List(ctx, &logicalDevices); err != nil {
- logger.Fatalw(ctx, "failed-to-list-logical-devices-from-cluster-proxy", log.Fields{"error": err})
+ logger.Fatalw(ctx, "failed-to-list-logical-devices-from-cluster-proxy", log.Fields{"error": err, "service-name": serviceName})
}
for _, lDevice := range logicalDevices {
// Create an agent for each device
- agent := newLogicalAgent(ctx, lDevice.Id, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.defaultTimeout)
+ agent := newLogicalAgent(ctx, lDevice.Id, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.internalTimeout)
if err := agent.start(ctx, true, lDevice); err != nil {
logger.Warnw(ctx, "failure-starting-logical-agent", log.Fields{"logical-device-id": lDevice.Id})
} else {
@@ -70,7 +69,7 @@
}
}
- probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusRunning)
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
logger.Info(ctx, "logical-device-manager-started")
}
@@ -128,7 +127,9 @@
var logicalDevices []*voltha.LogicalDevice
ldMgr.logicalDeviceAgents.Range(func(key, value interface{}) bool {
if ld, err := value.(*LogicalAgent).GetLogicalDeviceReadOnly(ctx); err == nil {
- logicalDevices = append(logicalDevices, ld)
+ if ld != nil {
+ logicalDevices = append(logicalDevices, ld)
+ }
} else {
logger.Errorw(ctx, "unable-to-get-logical-device", log.Fields{"err": err})
}
@@ -159,7 +160,7 @@
logger.Debugw(ctx, "logical-device-id", log.Fields{"logical-device-id": id})
- agent := newLogicalAgent(ctx, id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.defaultTimeout)
+ agent := newLogicalAgent(ctx, id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.internalTimeout)
ldMgr.addLogicalDeviceAgentToMap(agent)
// Update the root device with the logical device Id reference
@@ -232,7 +233,7 @@
ldMgr.logicalDevicesLoadingLock.Unlock()
if _, err := ldMgr.getLogicalDeviceFromModel(ctx, lDeviceID); err == nil {
logger.Debugw(ctx, "loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
- agent := newLogicalAgent(ctx, lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.defaultTimeout)
+ agent := newLogicalAgent(ctx, lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.internalTimeout)
if err := agent.start(ctx, true, nil); err != nil {
return err
}
@@ -562,10 +563,10 @@
return &empty.Empty{}, agent.disableLogicalPort(ctx, uint32(portNo))
}
-func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
+func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, packet []byte) error {
logger.Debugw(ctx, "packet-in", log.Fields{"logical-device-id": logicalDeviceID, "port": port})
if agent := ldMgr.getLogicalDeviceAgent(ctx, logicalDeviceID); agent != nil {
- agent.packetIn(ctx, port, transactionID, packet)
+ agent.packetIn(ctx, port, packet)
} else {
logger.Error(ctx, "logical-device-not-exist", log.Fields{"logical-device-id": logicalDeviceID})
}