[VOL-1346] This commit addresses device discovery notifications
which will be principally used by the affinity router. In doing so
this commit also rename the core_adapter.proto to inter_container.proto.
Change-Id: Ib2a7b84efa50367d0ffbc482fba6096a225f3150
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index e0d0fe6..f7ac794 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,51 +24,53 @@
// RW Core service default constants
const (
- ConsulStoreName = "consul"
- EtcdStoreName = "etcd"
- default_InstanceID = "rwcore001"
- default_GrpcPort = 50057
- default_GrpcHost = ""
- default_KafkaAdapterHost = "127.0.0.1"
- default_KafkaAdapterPort = 9092
- default_KafkaClusterHost = "127.0.0.1"
- default_KafkaClusterPort = 9094
- default_KVStoreType = EtcdStoreName
- default_KVStoreTimeout = 5 //in seconds
- default_KVStoreHost = "127.0.0.1"
- default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
- default_KVTxnKeyDelTime = 60
- default_LogLevel = 0
- default_Banner = false
- default_CoreTopic = "rwcore"
- default_RWCoreEndpoint = "rwcore"
- default_RWCoreKey = "pki/voltha.key"
- default_RWCoreCert = "pki/voltha.crt"
- default_RWCoreCA = "pki/voltha-CA.pem"
+ ConsulStoreName = "consul"
+ EtcdStoreName = "etcd"
+ default_InstanceID = "rwcore001"
+ default_GrpcPort = 50057
+ default_GrpcHost = ""
+ default_KafkaAdapterHost = "127.0.0.1"
+ default_KafkaAdapterPort = 9092
+ default_KafkaClusterHost = "127.0.0.1"
+ default_KafkaClusterPort = 9094
+ default_KVStoreType = EtcdStoreName
+ default_KVStoreTimeout = 5 //in seconds
+ default_KVStoreHost = "127.0.0.1"
+ default_KVStorePort = 2379 // Consul = 8500; Etcd = 2379
+ default_KVTxnKeyDelTime = 60
+ default_LogLevel = 0
+ default_Banner = false
+ default_CoreTopic = "rwcore"
+ default_RWCoreEndpoint = "rwcore"
+ default_RWCoreKey = "pki/voltha.key"
+ default_RWCoreCert = "pki/voltha.crt"
+ default_RWCoreCA = "pki/voltha-CA.pem"
+ default_Affinity_Router_Topic = "affinityRouter"
)
// RWCoreFlags represents the set of configurations used by the read-write core service
type RWCoreFlags struct {
// Command line parameters
- InstanceID string
- RWCoreEndpoint string
- GrpcHost string
- GrpcPort int
- KafkaAdapterHost string
- KafkaAdapterPort int
- KafkaClusterHost string
- KafkaClusterPort int
- KVStoreType string
- KVStoreTimeout int // in seconds
- KVStoreHost string
- KVStorePort int
- KVTxnKeyDelTime int
- CoreTopic string
- LogLevel int
- Banner bool
- RWCoreKey string
- RWCoreCert string
- RWCoreCA string
+ InstanceID string
+ RWCoreEndpoint string
+ GrpcHost string
+ GrpcPort int
+ KafkaAdapterHost string
+ KafkaAdapterPort int
+ KafkaClusterHost string
+ KafkaClusterPort int
+ KVStoreType string
+ KVStoreTimeout int // in seconds
+ KVStoreHost string
+ KVStorePort int
+ KVTxnKeyDelTime int
+ CoreTopic string
+ LogLevel int
+ Banner bool
+ RWCoreKey string
+ RWCoreCert string
+ RWCoreCA string
+ AffinityRouterTopic string
}
func init() {
@@ -78,25 +80,26 @@
// NewRWCoreFlags returns a new RWCore config
func NewRWCoreFlags() *RWCoreFlags {
var rwCoreFlag = RWCoreFlags{ // Default values
- InstanceID: default_InstanceID,
- RWCoreEndpoint: default_RWCoreEndpoint,
- GrpcHost: default_GrpcHost,
- GrpcPort: default_GrpcPort,
- KafkaAdapterHost: default_KafkaAdapterHost,
- KafkaAdapterPort: default_KafkaAdapterPort,
- KafkaClusterHost: default_KafkaClusterHost,
- KafkaClusterPort: default_KafkaClusterPort,
- KVStoreType: default_KVStoreType,
- KVStoreTimeout: default_KVStoreTimeout,
- KVStoreHost: default_KVStoreHost,
- KVStorePort: default_KVStorePort,
- KVTxnKeyDelTime: default_KVTxnKeyDelTime,
- CoreTopic: default_CoreTopic,
- LogLevel: default_LogLevel,
- Banner: default_Banner,
- RWCoreKey: default_RWCoreKey,
- RWCoreCert: default_RWCoreCert,
- RWCoreCA: default_RWCoreCA,
+ InstanceID: default_InstanceID,
+ RWCoreEndpoint: default_RWCoreEndpoint,
+ GrpcHost: default_GrpcHost,
+ GrpcPort: default_GrpcPort,
+ KafkaAdapterHost: default_KafkaAdapterHost,
+ KafkaAdapterPort: default_KafkaAdapterPort,
+ KafkaClusterHost: default_KafkaClusterHost,
+ KafkaClusterPort: default_KafkaClusterPort,
+ KVStoreType: default_KVStoreType,
+ KVStoreTimeout: default_KVStoreTimeout,
+ KVStoreHost: default_KVStoreHost,
+ KVStorePort: default_KVStorePort,
+ KVTxnKeyDelTime: default_KVTxnKeyDelTime,
+ CoreTopic: default_CoreTopic,
+ LogLevel: default_LogLevel,
+ Banner: default_Banner,
+ RWCoreKey: default_RWCoreKey,
+ RWCoreCert: default_RWCoreCert,
+ RWCoreCA: default_RWCoreCA,
+ AffinityRouterTopic: default_Affinity_Router_Topic,
}
return &rwCoreFlag
}
@@ -130,6 +133,9 @@
help = fmt.Sprintf("RW Core topic")
flag.StringVar(&(cf.CoreTopic), "rw_core_topic", default_CoreTopic, help)
+ help = fmt.Sprintf("Affinity Router topic")
+ flag.StringVar(&(cf.AffinityRouterTopic), "affinity_router_topic", default_Affinity_Router_Topic, help)
+
help = fmt.Sprintf("KV store type")
flag.StringVar(&(cf.KVStoreType), "kv_store_type", default_KVStoreType, help)
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 3ce59a7..c287c10 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -21,7 +21,7 @@
a "github.com/golang/protobuf/ptypes/any"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/kafka"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
@@ -43,7 +43,7 @@
if success {
return nil
} else {
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -170,7 +170,7 @@
return unPackResponse(rpc, device.Id, success, result)
}
-func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
+func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
toTopic := kafka.CreateSubTopic(device.Type, device.Id)
args := make([]*kafka.KVArg, 1)
@@ -183,14 +183,14 @@
success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
if success {
- unpackResult := &ca.SwitchCapability{}
+ unpackResult := &ic.SwitchCapability{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
return unpackResult, nil
} else {
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -201,7 +201,7 @@
}
}
-func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
+func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
toTopic := kafka.CreateSubTopic(device.Type, device.Id)
args := make([]*kafka.KVArg, 2)
@@ -209,7 +209,7 @@
Key: "device",
Value: device,
}
- pNo := &ca.IntType{Val: int64(portNo)}
+ pNo := &ic.IntType{Val: int64(portNo)}
args[1] = &kafka.KVArg{
Key: "port_no",
Value: pNo,
@@ -219,14 +219,14 @@
success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
if success {
- unpackResult := &ca.PortCapability{}
+ unpackResult := &ic.PortCapability{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
}
return unpackResult, nil
} else {
- unpackResult := &ca.Error{}
+ unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -303,13 +303,13 @@
log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
toTopic := kafka.CreateSubTopic(deviceType, deviceId)
rpc := "receive_packet_out"
- dId := &ca.StrType{Val: deviceId}
+ dId := &ic.StrType{Val: deviceId}
args := make([]*kafka.KVArg, 3)
args[0] = &kafka.KVArg{
Key: "deviceId",
Value: dId,
}
- op := &ca.IntType{Val: int64(outPort)}
+ op := &ic.IntType{Val: int64(outPort)}
args[1] = &kafka.KVArg{
Key: "outPort",
Value: op,
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 40563d4..d7e1b0a 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -22,7 +22,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -47,7 +47,7 @@
return &proxy
}
-func (rhp *AdapterRequestHandlerProxy) Register(args []*ca.Argument) (*voltha.CoreInstance, error) {
+func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -78,7 +78,7 @@
return &voltha.CoreInstance{InstanceId: rhp.coreInstanceId}, nil
}
-func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
if len(args) != 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -99,6 +99,7 @@
if device, err := rhp.deviceMgr.GetDevice(pID.Id); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
} else {
+ log.Debugw("GetDevice-response", log.Fields{"deviceId": pID.Id})
return device, nil
}
}
@@ -122,7 +123,7 @@
return cloned, nil
}
-func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) != 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -151,7 +152,7 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ic.Argument) (*voltha.Device, error) {
if len(args) < 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -170,14 +171,14 @@
return nil, nil
}
-func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ca.Argument) (*voltha.Ports, error) {
+func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- pt := &ca.IntType{}
+ pt := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -202,7 +203,7 @@
return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
}
-func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Device, error) {
if len(args) != 1 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -225,7 +226,7 @@
// ChildDeviceDetected is invoked when a child device is detected. The following
// parameters are expected:
// {parent_device_id, parent_port_no, child_device_type, proxy_address, admin_state, **kw)
-func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 4 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -233,9 +234,9 @@
}
pID := &voltha.ID{}
- portNo := &ca.IntType{}
- dt := &ca.StrType{}
- chnlId := &ca.IntType{}
+ portNo := &ic.IntType{}
+ dt := &ic.StrType{}
+ chnlId := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "parent_device_id":
@@ -272,15 +273,15 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- operStatus := &ca.IntType{}
- connStatus := &ca.IntType{}
+ operStatus := &ic.IntType{}
+ connStatus := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -309,15 +310,15 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- operStatus := &ca.IntType{}
- connStatus := &ca.IntType{}
+ operStatus := &ic.IntType{}
+ connStatus := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -347,16 +348,16 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- portType := &ca.IntType{}
- portNo := &ca.IntType{}
- operStatus := &ca.IntType{}
+ portType := &ic.IntType{}
+ portNo := &ic.IntType{}
+ operStatus := &ic.IntType{}
for _, arg := range args {
switch arg.Key {
case "device_id":
@@ -390,7 +391,7 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -423,14 +424,14 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
pmConfigs := &voltha.PmConfigs{}
- init := &ca.BoolType{}
+ init := &ic.BoolType{}
for _, arg := range args {
switch arg.Key {
case "device_pm_config":
@@ -458,15 +459,15 @@
return new(empty.Empty), nil
}
-func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
deviceId := &voltha.ID{}
- portNo := &ca.IntType{}
- packet := &ca.Packet{}
+ portNo := &ic.IntType{}
+ packet := &ic.Packet{}
for _, arg := range args {
switch arg.Key {
case "device_id":
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 7423563..0908146 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -131,7 +131,8 @@
kafka.InterContainerHost(core.config.KafkaAdapterHost),
kafka.InterContainerPort(core.config.KafkaAdapterPort),
kafka.MsgClient(core.kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic})); err != nil {
+ kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic}),
+ kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: core.config.AffinityRouterTopic})); err != nil {
log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
return err
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 7e7f42a..784b506 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -21,7 +21,7 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
- "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
ofp "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
fu "github.com/opencord/voltha-go/rw_core/utils"
@@ -326,12 +326,12 @@
// getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
// parent device
-func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*core_adapter.SwitchCapability, error) {
+func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceId})
if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
return nil, err
} else {
- var switchCap *core_adapter.SwitchCapability
+ var switchCap *ic.SwitchCapability
var err error
if switchCap, err = agent.adapterProxy.GetOfpDeviceInfo(ctx, device); err != nil {
log.Debugw("getSwitchCapability-error", log.Fields{"id": device.Id, "error": err})
@@ -343,12 +343,12 @@
// getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
// device
-func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*core_adapter.PortCapability, error) {
+func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceId})
if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
return nil, err
} else {
- var portCap *core_adapter.PortCapability
+ var portCap *ic.PortCapability
var err error
if portCap, err = agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo); err != nil {
log.Debugw("getPortCapability-error", log.Fields{"id": device.Id, "error": err})
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 6f4a874..45584a1 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -22,7 +22,7 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/kafka"
- "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
ofp "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
@@ -249,7 +249,7 @@
return status.Errorf(codes.NotFound, "%s", deviceId)
}
-func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*core_adapter.SwitchCapability, error) {
+func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*ic.SwitchCapability, error) {
log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getSwitchCapability(ctx)
@@ -266,7 +266,7 @@
}
-func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*core_adapter.PortCapability, error) {
+func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*ic.PortCapability, error) {
log.Debugw("getPortCapability", log.Fields{"deviceid": deviceId})
if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
return agent.getPortCapability(ctx, portNo)
@@ -340,9 +340,12 @@
// Activate the child device
if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
- return agent.enableDevice(nil)
+ go agent.enableDevice(nil)
}
+ // Publish on the messaging bus that we have discovered new devices
+ go dMgr.kafkaICProxy.DeviceDiscovered(agent.deviceId, deviceType, parentDeviceId)
+
return nil
}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 60692e5..8a69967 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -22,7 +22,7 @@
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/model"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
ofp "github.com/opencord/voltha-go/protos/openflow_13"
"github.com/opencord/voltha-go/protos/voltha"
fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
@@ -68,7 +68,7 @@
func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
//Build the logical device based on information retrieved from the device adapter
- var switchCap *ca.SwitchCapability
+ var switchCap *ic.SwitchCapability
var err error
if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
log.Errorw("error-creating-logical-device", log.Fields{"error": err})
@@ -88,7 +88,7 @@
if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
log.Errorw("error-creating-logical-port", log.Fields{"error": err})
}
- var portCap *ca.PortCapability
+ var portCap *ic.PortCapability
for _, port := range nniPorts.Items {
log.Infow("!!!!!!!NNI PORTS", log.Fields{"NNI": port})
if portCap, err = agent.deviceMgr.getPortCapability(ctx, agent.rootDeviceId, port.PortNo); err != nil {
@@ -223,7 +223,6 @@
return nil
}
-
// getLogicalDeviceWithoutLock retrieves a logical device from the model without locking it. This is used only by
// functions that have already acquired the logical device lock to the model
func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
@@ -240,7 +239,7 @@
func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
log.Infow("addUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
// Build the logical device based on information retrieved from the device adapter
- var portCap *ca.PortCapability
+ var portCap *ic.PortCapability
var err error
//Get UNI port number
@@ -327,7 +326,6 @@
"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
}
-
//updateFlowGroupsWithoutLock updates the flows in the logical device without locking the logical device. This function
//must only be called by a function that is holding the lock on the logical device
func (agent *LogicalDeviceAgent) updateFlowGroupsWithoutLock(groups []*ofp.OfpGroupEntry) error {
@@ -737,7 +735,7 @@
func (agent *LogicalDeviceAgent) GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop {
log.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
// Get the updated logical device
- var ld *ca.LogicalDevice
+ var ld *ic.LogicalDevice
routes := make([]graph.RouteHop, 0)
var err error
if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
diff --git a/rw_core/main.go b/rw_core/main.go
index 472072a..dbb82b0 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -23,7 +23,7 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/kafka"
- ca "github.com/opencord/voltha-go/protos/core_adapter"
+ ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/rw_core/config"
c "github.com/opencord/voltha-go/rw_core/core"
"os"
@@ -43,7 +43,7 @@
kafkaClient kafka.Client
core *c.Core
//For test
- receiverChannels []<-chan *ca.InterContainerMessage
+ receiverChannels []<-chan *ic.InterContainerMessage
}
func init() {
@@ -73,7 +73,7 @@
kafka.ProducerReturnOnErrors(true),
kafka.ProducerReturnOnSuccess(true),
kafka.ProducerMaxRetries(6),
- kafka.ProducerRetryBackoff(time.Millisecond * 30)), nil
+ kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
}
return nil, errors.New("unsupported-client-type")
}
@@ -83,7 +83,7 @@
rwCore.config = cf
rwCore.halted = false
rwCore.exitChannel = make(chan int, 1)
- rwCore.receiverChannels = make([]<-chan *ca.InterContainerMessage, 0)
+ rwCore.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
return &rwCore
}