[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index b9ad95a..cbf2625 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -26,16 +26,16 @@
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/device/flow"
"github.com/opencord/voltha-go/rw_core/core/device/group"
- "github.com/opencord/voltha-go/rw_core/core/device/logical_port"
+ lp "github.com/opencord/voltha-go/rw_core/core/device/logical_port"
"github.com/opencord/voltha-go/rw_core/core/device/meter"
fd "github.com/opencord/voltha-go/rw_core/flowdecomposition"
"github.com/opencord/voltha-go/rw_core/route"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
- fu "github.com/opencord/voltha-lib-go/v5/pkg/flows"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- ic "github.com/opencord/voltha-protos/v4/go/inter_container"
- ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ fu "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -51,7 +51,7 @@
stopped bool
deviceRoutes *route.DeviceRoutes
flowDecomposer *fd.FlowDecomposer
- defaultTimeout time.Duration
+ internalTimeout time.Duration
logicalDevice *voltha.LogicalDevice
requestQueue *coreutils.RequestQueue
orderedEvents orderedEvents
@@ -61,11 +61,11 @@
flowCache *flow.Cache
meterLoader *meter.Loader
groupCache *group.Cache
- portLoader *port.Loader
+ portLoader *lp.Loader
}
func newLogicalAgent(ctx context.Context, id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
- deviceMgr *Manager, dbProxy *model.Path, ldProxy *model.Proxy, defaultTimeout time.Duration) *LogicalAgent {
+ deviceMgr *Manager, dbProxy *model.Path, ldProxy *model.Proxy, internalTimeout time.Duration) *LogicalAgent {
return &LogicalAgent{
logicalDeviceID: id,
serialNumber: sn,
@@ -75,13 +75,13 @@
ldeviceMgr: ldeviceMgr,
deviceRoutes: route.NewDeviceRoutes(id, deviceID, deviceMgr.listDevicePorts),
flowDecomposer: fd.NewFlowDecomposer(deviceMgr.getDeviceReadOnly),
- defaultTimeout: defaultTimeout,
+ internalTimeout: internalTimeout,
requestQueue: coreutils.NewRequestQueue(),
flowCache: flow.NewCache(),
groupCache: group.NewCache(),
meterLoader: meter.NewLoader(dbProxy.SubPath("logical_meters").Proxy(id)),
- portLoader: port.NewLoader(dbProxy.SubPath("logical_ports").Proxy(id)),
+ portLoader: lp.NewLoader(dbProxy.SubPath("logical_ports").Proxy(id)),
}
}
@@ -196,7 +196,7 @@
return
}
defer agent.requestQueue.RequestComplete()
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.internalTimeout)
// Before deletion of the logical agent, make sure all events for ldagent are sent to avoid race conditions
if err := agent.orderedEvents.waitForAllEventsToBeSent(subCtx, cancel); err != nil {
//Log the error here
@@ -236,7 +236,7 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.internalTimeout)
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
defer cancel()
@@ -272,7 +272,7 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.internalTimeout)
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
defer cancel()
@@ -306,7 +306,7 @@
response := coreutils.NewResponse()
responses = append(responses, response)
go func(deviceId string, value *fu.FlowsAndGroups) {
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.internalTimeout)
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
defer cancel()
@@ -342,7 +342,7 @@
}
logger.Debugw(ctx, "uni-port", log.Fields{"flows": flows, "uni-port": uniPort})
go func(uniPort uint32, metadata *voltha.FlowMetadata) {
- subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.internalTimeout)
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
defer cancel()
@@ -375,16 +375,15 @@
}
}
-func (agent *LogicalAgent) packetIn(ctx context.Context, port uint32, transactionID string, packet []byte) {
+func (agent *LogicalAgent) packetIn(ctx context.Context, port uint32, packet []byte) {
if logger.V(log.InfoLevel) {
- logger.Debugw(ctx, "packet-in", log.Fields{
- "port": port,
- "packet": hex.EncodeToString(packet),
- "transactionId": transactionID,
+ logger.Infow(ctx, "packet-in", log.Fields{
+ "port": port,
+ "packet": hex.EncodeToString(packet),
})
}
packetIn := fu.MkPacketIn(port, packet)
- agent.ldeviceMgr.SendPacketIn(ctx, agent.logicalDeviceID, transactionID, packetIn)
+ agent.ldeviceMgr.SendPacketIn(ctx, agent.logicalDeviceID, packetIn)
logger.Debugw(ctx, "sending-packet-in", log.Fields{"packet": hex.EncodeToString(packetIn.Data)})
}