This commit fixes a few issues:
1) The number of arguments to decode in a request to the simulated
OLT was incorrect
2) Adapter type was not set properly when a device is loaded from
DB
Change-Id: I7aa9a5314bd167565372138b0819df9aa744c41b
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index 3f8465c..b3606b0 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -21,6 +21,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/adapters"
"github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/kafka"
ic "github.com/opencord/voltha-go/protos/inter_container"
"github.com/opencord/voltha-go/protos/voltha"
"google.golang.org/grpc/codes"
@@ -53,16 +54,28 @@
}
func (rhp *RequestHandlerProxy) Adopt_device(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 1 {
+ if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
device := &voltha.Device{}
- if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
- log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
- return nil, err
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
+
log.Debugw("Adopt_device", log.Fields{"deviceId": device.Id})
//Invoke the adopt device on the adapter
@@ -130,16 +143,28 @@
}
func (rhp *RequestHandlerProxy) Get_ofp_device_info(args []*ic.Argument) (*ic.SwitchCapability, error) {
- if len(args) != 1 {
+ if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
device := &voltha.Device{}
- if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
- log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
- return nil, err
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
+
log.Debugw("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
var cap *ic.SwitchCapability
@@ -147,17 +172,19 @@
if cap, err = rhp.adapter.Get_ofp_device_info(device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
+ log.Debugw("Get_ofp_device_info", log.Fields{"cap": cap})
return cap, nil
}
func (rhp *RequestHandlerProxy) Get_ofp_port_info(args []*ic.Argument) (*ic.PortCapability, error) {
- if len(args) != 2 {
+ if len(args) != 3 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
device := &voltha.Device{}
pNo := &ic.IntType{}
+ transactionID := &ic.StrType{}
for _, arg := range args {
switch arg.Key {
case "device":
@@ -170,6 +197,11 @@
log.Warnw("cannot-unmarshal-port-no", log.Fields{"error": err})
return nil, err
}
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
}
}
log.Debugw("Get_ofp_port_info", log.Fields{"deviceId": device.Id, "portNo": pNo.Val})
@@ -182,15 +214,26 @@
}
func (rhp *RequestHandlerProxy) Process_inter_adapter_message(args []*ic.Argument) (*empty.Empty, error) {
- if len(args) != 1 {
+ if len(args) != 2 {
log.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
iaMsg := &ic.InterAdapterMessage{}
- if err := ptypes.UnmarshalAny(args[0].Value, iaMsg); err != nil {
- log.Warnw("cannot-unmarshal-message", log.Fields{"error": err})
- return nil, err
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "msg":
+ if err := ptypes.UnmarshalAny(arg.Value, iaMsg); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
}
log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": iaMsg.Header.Id})
diff --git a/adapters/simulated_olt/adaptercore/device_handler.go b/adapters/simulated_olt/adaptercore/device_handler.go
index 0741761..3e2d01d 100644
--- a/adapters/simulated_olt/adaptercore/device_handler.go
+++ b/adapters/simulated_olt/adaptercore/device_handler.go
@@ -133,6 +133,9 @@
cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
cloned.OperStatus = voltha.OperStatus_ACTIVE
+ dh.device = cloned
+ //dh.device.SerialNumber = cloned.SerialNumber
+
// Update the device state
if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
@@ -149,7 +152,6 @@
"simulated_onu",
initialUniPortNo+i)
}
- dh.device = cloned
}
func (dh *DeviceHandler) GetOfpDeviceInfo(device *voltha.Device) (*ic.SwitchCapability, error) {
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index 2dd7c67..edf3135 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -126,7 +126,9 @@
func (so *SimulatedOLT) Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error) {
log.Infow("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
if handler := so.getDeviceHandler(device.Id); handler != nil {
- return handler.GetOfpDeviceInfo(device)
+ info, err := handler.GetOfpDeviceInfo(device)
+ log.Infow("Get_ofp_device_info-resp", log.Fields{"switch": info})
+ return info, err
}
log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
return nil, errors.New("device-handler-not-set")
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 4359f7d..099d0d8 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -38,7 +38,7 @@
const (
DefaultMaxRetries = 3
- DefaultRequestTimeout = 3000 // 3000 milliseconds - to handle a wider latency range
+ DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
)
const (
@@ -256,7 +256,7 @@
// subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
key := GetDeviceIdFromTopic(*toTopic)
log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
- go kp.kafkaClient.Send(protoRequest, toTopic, key)
+ kp.kafkaClient.Send(protoRequest, toTopic, key)
if waitForResponse {
// Create a child context based on the parent context, if any
@@ -756,6 +756,12 @@
}
kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
go kp.waitForResponseLoop(subscribedCh, &topic)
+
+ // Wait until topic is ready - it takes on average 300 ms for a topic to be created. This is a one time
+ // delay everything a device is created.
+ // TODO: Implement a mechanism to determine when a topic is ready instead of relying on a timeout
+ //kp.kafkaClient.WaitForTopicToBeReady
+ time.Sleep(400 * time.Millisecond)
}
// Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 9ddda85..ed09c9d 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -47,6 +47,7 @@
var err error
if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return err
}
log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
// TODO: Need to get the real error code
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 116e2bb..af717ef 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -84,6 +84,7 @@
if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
agent.lastData = proto.Clone(d).(*voltha.Device)
+ agent.deviceType = agent.lastData.Adapter
}
} else {
log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceId})
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index bda249f..9d05757 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -86,8 +86,10 @@
log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
return err
}
+
ld.DatapathId = datapathID
ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
+ log.Debugw("Switch-capability", log.Fields{"Desc": ld.Desc, "fromAd": switchCap.Desc})
ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
ld.Flows = &ofp.Flows{Items: nil}
ld.FlowGroups = &ofp.FlowGroups{Items: nil}
@@ -847,7 +849,7 @@
}
//it is possible that the downstream ports are not created, but the flow_decomposition has already
//kicked in. In such scenarios, cut short the processing and return.
- if len(downstreamPorts) == 0 {
+ if len(downstreamPorts) == 0 || len(upstreamPorts) == 0{
return fg
}
// set up the default flows