[VOL-1346] This commit addresses device discovery notifications
which will be principally used by the affinity router. In doing so
this commit also rename the core_adapter.proto to inter_container.proto.
Change-Id: Ib2a7b84efa50367d0ffbc482fba6096a225f3150
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 3ce59a7..c287c10 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -21,7 +21,7 @@
a "github.com/golang/protobuf/ptypes/any"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/kafka"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
@@ -43,7 +43,7 @@
if success {
return nil
} else {
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -170,7 +170,7 @@
return unPackResponse(rpc, device.Id, success, result)
}
-func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
+func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
toTopic := kafka.CreateSubTopic(device.Type, device.Id)
args := make([]*kafka.KVArg, 1)
@@ -183,14 +183,14 @@
success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
if success {
- unpackResult := &ca.SwitchCapability{}
+ unpackResult := &ic.SwitchCapability{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
return unpackResult, nil
} else {
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -201,7 +201,7 @@
}
}
-func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
+func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
toTopic := kafka.CreateSubTopic(device.Type, device.Id)
args := make([]*kafka.KVArg, 2)
@@ -209,7 +209,7 @@
Key: "device",
Value: device,
}
- pNo := &ca.IntType{Val: int64(portNo)}
+ pNo := &ic.IntType{Val: int64(portNo)}
args[1] = &kafka.KVArg{
Key: "port_no",
Value: pNo,
@@ -219,14 +219,14 @@
success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
if success {
- unpackResult := &ca.PortCapability{}
+ unpackResult := &ic.PortCapability{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
return unpackResult, nil
} else {
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -303,13 +303,13 @@
log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
toTopic := kafka.CreateSubTopic(deviceType, deviceId)
rpc := "receive_packet_out"
- dId := &ca.StrType{Val: deviceId}
+ dId := &ic.StrType{Val: deviceId}
args := make([]*kafka.KVArg, 3)
args[0] = &kafka.KVArg{
Key: "deviceId",
Value: dId,
}
- op := &ca.IntType{Val: int64(outPort)}
+ op := &ic.IntType{Val: int64(outPort)}
args[1] = &kafka.KVArg{
Key: "outPort",
Value: op,
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 40563d4..d7e1b0a 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -22,7 +22,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -47,7 +47,7 @@
return &proxy
}
-func (rhp *AdapterRequestHandlerProxy) Register(args []*ca.Argument) (*voltha.CoreInstance, error) {
+func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -78,7 +78,7 @@
return &voltha.CoreInstance{InstanceId: rhp.coreInstanceId}, nil
}
-func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
if len(args) != 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -99,6 +99,7 @@
if device, err := rhp.deviceMgr.GetDevice(pID.Id); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
} else {
+ log.Debugw("GetDevice-response", log.Fields{"deviceId": pID.Id})
return device, nil
}
}
@@ -122,7 +123,7 @@
return cloned, nil
}
-func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) != 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -151,7 +152,7 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ic.Argument) (*voltha.Device, error) {
if len(args) < 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -170,14 +171,14 @@
return nil, nil
}
-func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ca.Argument) (*voltha.Ports, error) {
+func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- pt := &ca.IntType{}
+ pt := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -202,7 +203,7 @@
return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
}
-func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Device, error) {
if len(args) != 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -225,7 +226,7 @@
// ChildDeviceDetected is invoked when a child device is detected. The following
// parameters are expected:
// {parent_device_id, parent_port_no, child_device_type, proxy_address, admin_state, **kw)
-func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 4 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -233,9 +234,9 @@
}
pID := &voltha.ID{}
- portNo := &ca.IntType{}
- dt := &ca.StrType{}
- chnlId := &ca.IntType{}
+ portNo := &ic.IntType{}
+ dt := &ic.StrType{}
+ chnlId := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "parent_device_id":
@@ -272,15 +273,15 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- operStatus := &ca.IntType{}
- connStatus := &ca.IntType{}
+ operStatus := &ic.IntType{}
+ connStatus := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -309,15 +310,15 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- operStatus := &ca.IntType{}
- connStatus := &ca.IntType{}
+ operStatus := &ic.IntType{}
+ connStatus := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -347,16 +348,16 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- portType := &ca.IntType{}
- portNo := &ca.IntType{}
- operStatus := &ca.IntType{}
+ portType := &ic.IntType{}
+ portNo := &ic.IntType{}
+ operStatus := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -390,7 +391,7 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -423,14 +424,14 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
pmConfigs := &voltha.PmConfigs{}
- init := &ca.BoolType{}
+ init := &ic.BoolType{}
for _, arg := range args {
switch arg.Key {
case "device_pm_config":
@@ -458,15 +459,15 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- portNo := &ca.IntType{}
- packet := &ca.Packet{}
+ portNo := &ic.IntType{}
+ packet := &ic.Packet{}
for _, arg := range args {
switch arg.Key {
case "device_id":
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 7423563..0908146 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -131,7 +131,8 @@
kafka.InterContainerHost(core.config.KafkaAdapterHost),
kafka.InterContainerPort(core.config.KafkaAdapterPort),
kafka.MsgClient(core.kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic})); err != nil {
+ kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic}),
+ kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: core.config.AffinityRouterTopic})); err != nil {
log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
return err
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 7e7f42a..784b506 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -21,7 +21,7 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
- "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
ofp "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
fu "github.com/opencord/voltha-go/rw_core/utils"
@@ -326,12 +326,12 @@
// getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
// parent device
-func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*core_adapter.SwitchCapability, error) {
+func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceId})
if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
return nil, err
} else {
- var switchCap *core_adapter.SwitchCapability
+ var switchCap *ic.SwitchCapability
var err error
if switchCap, err = agent.adapterProxy.GetOfpDeviceInfo(ctx, device); err != nil {
log.Debugw("getSwitchCapability-error", log.Fields{"id": device.Id, "error": err})
@@ -343,12 +343,12 @@
// getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
// device
-func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*core_adapter.PortCapability, error) {
+func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceId})
if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
return nil, err
} else {
- var portCap *core_adapter.PortCapability
+ var portCap *ic.PortCapability
var err error
if portCap, err = agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo); err != nil {
log.Debugw("getPortCapability-error", log.Fields{"id": device.Id, "error": err})
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 6f4a874..45584a1 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -22,7 +22,7 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/kafka"
- "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
ofp "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
@@ -249,7 +249,7 @@
return status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*core_adapter.SwitchCapability, error) {
+func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*ic.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getSwitchCapability(ctx)
@@ -266,7 +266,7 @@
}
-func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*core_adapter.PortCapability, error) {
+func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getPortCapability(ctx, portNo)
@@ -340,9 +340,12 @@
// Activate the child device
if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
- return agent.enableDevice(nil)
+ go agent.enableDevice(nil)
}
+ // Publish on the messaging bus that we have discovered new devices
+ go dMgr.kafkaICProxy.DeviceDiscovered(agent.deviceId, deviceType, parentDeviceId)
+
return nil
}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 60692e5..8a69967 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -22,7 +22,7 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
ofp "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
@@ -68,7 +68,7 @@
func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
//Build the logical device based on information retrieved from the device adapter
- var switchCap *ca.SwitchCapability
+ var switchCap *ic.SwitchCapability
var err error
if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
log.Errorw("error-creating-logical-device", log.Fields{"error": err})
@@ -88,7 +88,7 @@
if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
log.Errorw("error-creating-logical-port", log.Fields{"error": err})
}
- var portCap *ca.PortCapability
+ var portCap *ic.PortCapability
for _, port := range nniPorts.Items {
log.Infow("!!!!!!!NNI PORTS", log.Fields{"NNI": port})
if portCap, err = agent.deviceMgr.getPortCapability(ctx, agent.rootDeviceId, port.PortNo); err != nil {
@@ -223,7 +223,6 @@
return nil
}
-
// getLogicalDeviceWithoutLock retrieves a logical device from the model without locking it. This is used only by
// functions that have already acquired the logical device lock to the model
func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
@@ -240,7 +239,7 @@
func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
log.Infow("addUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
- var portCap *ca.PortCapability
+ var portCap *ic.PortCapability
var err error
//Get UNI port number
@@ -327,7 +326,6 @@
"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
}
-
//updateFlowGroupsWithoutLock updates the flows in the logical device without locking the logical device. This function
//must only be called by a function that is holding the lock on the logical device
func (agent *LogicalDeviceAgent) updateFlowGroupsWithoutLock(groups []*ofp.OfpGroupEntry) error {
@@ -737,7 +735,7 @@
func (agent *LogicalDeviceAgent) GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop {
log.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
// Get the updated logical device
- var ld *ca.LogicalDevice
+ var ld *ic.LogicalDevice
routes := make([]graph.RouteHop, 0)
var err error
if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {