VOL-1536 Inter OpenOLT/Onu Adapter Messaging
1. Add GetDevice/GetChildDevice to facilitate ONU adapter look up ONU data in core
2. fix bug in sending proxy message

Change-Id: I1206430627408c9e6fcedc12d9e78d95eaa60ee5
diff --git a/adapters/common/adapter_proxy.go b/adapters/common/adapter_proxy.go
index 13b98b0..6c32422 100644
--- a/adapters/common/adapter_proxy.go
+++ b/adapters/common/adapter_proxy.go
@@ -86,9 +86,9 @@
 	}
 
 	// Set up the required rpc arguments
-	topic := kafka.Topic{Name: fromAdapter}
-	replyToTopic := kafka.Topic{Name: toAdapter}
-	rpc := "Process_inter_adapter_message"
+	topic := kafka.Topic{Name: toAdapter}
+	replyToTopic := kafka.Topic{Name: fromAdapter}
+	rpc := "process_inter_adapter_message"
 
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
 	log.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
diff --git a/adapters/common/core_proxy.go b/adapters/common/core_proxy.go
index 137877f..738a77a 100644
--- a/adapters/common/core_proxy.go
+++ b/adapters/common/core_proxy.go
@@ -29,10 +29,10 @@
 )
 
 type CoreProxy struct {
-	kafkaICProxy *kafka.InterContainerProxy
-	adapterTopic string
-	coreTopic    string
-	deviceIdCoreMap map[string]string
+	kafkaICProxy        *kafka.InterContainerProxy
+	adapterTopic        string
+	coreTopic           string
+	deviceIdCoreMap     map[string]string
 	lockDeviceIdCoreMap sync.RWMutex
 
 }
@@ -233,4 +233,101 @@
 	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
 	log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
 	return unPackResponse(rpc, parentDeviceId, success, result)
+
+}
+
+func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
+	log.Debugw("GetDevice", log.Fields{"deviceId": deviceId})
+	rpc := "GetDevice"
+
+	toTopic := ap.getCoreTopic(parentDeviceId)
+	replyToTopic := ap.getAdapterTopic()
+
+	args := make([]*kafka.KVArg, 1)
+	id := &voltha.ID{Id: deviceId}
+	args[0] = &kafka.KVArg{
+		Key:   "device_id",
+		Value: id,
+	}
+
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+	log.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+
+	if success {
+		volthaDevice := &voltha.Device{}
+		if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+		}
+		return volthaDevice, nil
+	} else {
+		unpackResult := &ic.Error{}
+		var err error
+		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+		}
+		log.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+		// TODO:  Need to get the real error code
+		return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+	}
+}
+
+func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
+	log.Debugw("GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
+	rpc := "GetChildDevice"
+
+	toTopic := ap.getCoreTopic(parentDeviceId)
+	replyToTopic := ap.getAdapterTopic()
+
+	args := make([]*kafka.KVArg, 4)
+	id := &voltha.ID{Id: parentDeviceId}
+	args[0] = &kafka.KVArg{
+		Key:   "device_id",
+		Value: id,
+	}
+
+	var cnt uint8 = 0
+	for k, v := range kwargs {
+		cnt += 1
+		if k == "serial_number" {
+			val := &ic.StrType{Val: v.(string)}
+			args[cnt] = &kafka.KVArg{
+				Key:   k,
+				Value: val,
+			}
+		} else if k == "onu_id" {
+			val := &ic.IntType{Val: int64(v.(uint32))}
+			args[cnt] = &kafka.KVArg{
+				Key:   k,
+				Value: val,
+			}
+		} else if k == "parent_port_no" {
+			val := &ic.IntType{Val: int64(v.(uint32))}
+			args[cnt] = &kafka.KVArg{
+				Key:   k,
+				Value: val,
+			}
+		}
+	}
+
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+	log.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+
+	if success {
+		volthaDevice := &voltha.Device{}
+		if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+		}
+		return volthaDevice, nil
+	} else {
+		unpackResult := &ic.Error{}
+		var err error
+		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+		}
+		log.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+		// TODO:  Need to get the real error code
+		return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+	}
 }