This commit fixes a few issues:
1) The number of arguments to decode in a request to the simulated
OLT was incorrect
2) Adapter type was not set properly when a device is loaded from
DB

Change-Id: I7aa9a5314bd167565372138b0819df9aa744c41b
diff --git a/adapters/common/request_handler.go b/adapters/common/request_handler.go
index 3f8465c..b3606b0 100644
--- a/adapters/common/request_handler.go
+++ b/adapters/common/request_handler.go
@@ -21,6 +21,7 @@
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/adapters"
 	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/kafka"
 	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
@@ -53,16 +54,28 @@
 }
 
 func (rhp *RequestHandlerProxy) Adopt_device(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 1 {
+	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	device := &voltha.Device{}
-	if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
-		log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
-		return nil, err
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
+
 	log.Debugw("Adopt_device", log.Fields{"deviceId": device.Id})
 
 	//Invoke the adopt device on the adapter
@@ -130,16 +143,28 @@
 }
 
 func (rhp *RequestHandlerProxy) Get_ofp_device_info(args []*ic.Argument) (*ic.SwitchCapability, error) {
-	if len(args) != 1 {
+	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	device := &voltha.Device{}
-	if err := ptypes.UnmarshalAny(args[0].Value, device); err != nil {
-		log.Warnw("cannot-unmarshal-ID", log.Fields{"error": err})
-		return nil, err
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
+
 	log.Debugw("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
 
 	var cap *ic.SwitchCapability
@@ -147,17 +172,19 @@
 	if cap, err = rhp.adapter.Get_ofp_device_info(device); err != nil {
 		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
 	}
+	log.Debugw("Get_ofp_device_info", log.Fields{"cap": cap})
 	return cap, nil
 }
 
 func (rhp *RequestHandlerProxy) Get_ofp_port_info(args []*ic.Argument) (*ic.PortCapability, error) {
-	if len(args) != 2 {
+	if len(args) != 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	device := &voltha.Device{}
 	pNo := &ic.IntType{}
+	transactionID := &ic.StrType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device":
@@ -170,6 +197,11 @@
 				log.Warnw("cannot-unmarshal-port-no", log.Fields{"error": err})
 				return nil, err
 			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
 		}
 	}
 	log.Debugw("Get_ofp_port_info", log.Fields{"deviceId": device.Id, "portNo": pNo.Val})
@@ -182,15 +214,26 @@
 }
 
 func (rhp *RequestHandlerProxy) Process_inter_adapter_message(args []*ic.Argument) (*empty.Empty, error) {
-	if len(args) != 1 {
+	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	iaMsg := &ic.InterAdapterMessage{}
-	if err := ptypes.UnmarshalAny(args[0].Value, iaMsg); err != nil {
-		log.Warnw("cannot-unmarshal-message", log.Fields{"error": err})
-		return nil, err
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "msg":
+			if err := ptypes.UnmarshalAny(arg.Value, iaMsg); err != nil {
+				log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
 	}
 
 	log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": iaMsg.Header.Id})
diff --git a/adapters/simulated_olt/adaptercore/device_handler.go b/adapters/simulated_olt/adaptercore/device_handler.go
index 0741761..3e2d01d 100644
--- a/adapters/simulated_olt/adaptercore/device_handler.go
+++ b/adapters/simulated_olt/adaptercore/device_handler.go
@@ -133,6 +133,9 @@
 	cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
 	cloned.OperStatus = voltha.OperStatus_ACTIVE
 
+	dh.device = cloned
+	//dh.device.SerialNumber = cloned.SerialNumber
+
 	//	Update the device state
 	if err := dh.coreProxy.DeviceStateUpdate(nil, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
 		log.Errorw("error-creating-nni-port", log.Fields{"deviceId": device.Id, "error": err})
@@ -149,7 +152,6 @@
 			"simulated_onu",
 			initialUniPortNo+i)
 	}
-	dh.device = cloned
 }
 
 func (dh *DeviceHandler) GetOfpDeviceInfo(device *voltha.Device) (*ic.SwitchCapability, error) {
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index 2dd7c67..edf3135 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -126,7 +126,9 @@
 func (so *SimulatedOLT) Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error) {
 	log.Infow("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
 	if handler := so.getDeviceHandler(device.Id); handler != nil {
-		return handler.GetOfpDeviceInfo(device)
+		info, err := handler.GetOfpDeviceInfo(device)
+		log.Infow("Get_ofp_device_info-resp", log.Fields{"switch": info})
+		return info, err
 	}
 	log.Errorw("device-handler-not-set", log.Fields{"deviceId": device.Id})
 	return nil, errors.New("device-handler-not-set")
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 4359f7d..099d0d8 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -38,7 +38,7 @@
 
 const (
 	DefaultMaxRetries     = 3
-	DefaultRequestTimeout = 3000 // 3000 milliseconds - to handle a wider latency range
+	DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
 )
 
 const (
@@ -256,7 +256,7 @@
 	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
 	key := GetDeviceIdFromTopic(*toTopic)
 	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
-	go kp.kafkaClient.Send(protoRequest, toTopic, key)
+	kp.kafkaClient.Send(protoRequest, toTopic, key)
 
 	if waitForResponse {
 		// Create a child context based on the parent context, if any
@@ -756,6 +756,12 @@
 		}
 		kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
 		go kp.waitForResponseLoop(subscribedCh, &topic)
+
+		//	Wait until topic is ready - it takes on average 300 ms for a topic to be created.  This is a one time
+		//	delay everything a device is created.
+		// TODO:  Implement a mechanism to determine when a topic is ready instead of relying on a timeout
+		//kp.kafkaClient.WaitForTopicToBeReady
+		time.Sleep(400 * time.Millisecond)
 	}
 
 	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 9ddda85..ed09c9d 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -47,6 +47,7 @@
 		var err error
 		if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
 			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+			return err
 		}
 		log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
 		// TODO:  Need to get the real error code
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 116e2bb..af717ef 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -84,6 +84,7 @@
 		if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
 			if d, ok := device.(*voltha.Device); ok {
 				agent.lastData = proto.Clone(d).(*voltha.Device)
+				agent.deviceType = agent.lastData.Adapter
 			}
 		} else {
 			log.Errorw("failed-to-load-device", log.Fields{"deviceId": agent.deviceId})
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index bda249f..9d05757 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -86,8 +86,10 @@
 			log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
 			return err
 		}
+
 		ld.DatapathId = datapathID
 		ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
+		log.Debugw("Switch-capability", log.Fields{"Desc": ld.Desc, "fromAd": switchCap.Desc})
 		ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
 		ld.Flows = &ofp.Flows{Items: nil}
 		ld.FlowGroups = &ofp.FlowGroups{Items: nil}
@@ -847,7 +849,7 @@
 	}
 	//it is possible that the downstream ports are not created, but the flow_decomposition has already
 	//kicked in. In such scenarios, cut short the processing and return.
-	if len(downstreamPorts) == 0 {
+	if len(downstreamPorts) == 0 || len(upstreamPorts) == 0{
 		return fg
 	}
 	// set up the default flows