[VOL-1036] Initial implementation of device lifecycle management

Change-Id: I5aa58fdcbcd852f6f5eef35d48f25f76e20c0418
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index a0e25f3..bccb227 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -18,6 +18,7 @@
 import (
 	"context"
 	"github.com/golang/protobuf/ptypes"
+	a "github.com/golang/protobuf/ptypes/any"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/kafka"
 	ca "github.com/opencord/voltha-go/protos/core_adapter"
@@ -37,28 +38,47 @@
 	return &proxy
 }
 
+func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
+	if success {
+		return nil
+	} else {
+		unpackResult := &ca.Error{}
+		var err error
+		if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
+			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+		}
+		log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
+		// TODO:  Need to get the real error code
+		return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+	}
+}
+
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("AdoptDevice", log.Fields{"device": device})
+	rpc := "adopt_device"
 	topic := kafka.Topic{Name: device.Type}
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, "adopt_device", &topic, true, args...)
-	log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success, "result": result})
-	if success {
-		return nil
-	} else {
-		unpackResult := &ca.Error{}
-		var err error
-		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-		}
-		log.Debugw("AdoptDevice-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
-		// TODO:  Need to get the real error code
-		return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
+	log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
+	rpc := "disable_device"
+	topic := kafka.Topic{Name: device.Type}
+	args := make([]*kafka.KVArg, 1)
+	args[0] = &kafka.KVArg{
+		Key:   "device",
+		Value: device,
 	}
+	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
@@ -76,7 +96,7 @@
 	return nil, nil
 }
 
-func (ap *AdapterProxy) ReconcileDevice(device voltha.Device) error {
+func (ap *AdapterProxy) ReconcileDevice(device *voltha.Device) error {
 	log.Debug("ReconcileDevice")
 	return nil
 }
@@ -86,22 +106,26 @@
 	return nil
 }
 
-func (ap *AdapterProxy) DisableDevice(device voltha.Device) error {
-	log.Debug("DisableDevice")
-	return nil
+func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
+	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
+	rpc := "reenable_device"
+	topic := kafka.Topic{Name: device.Type}
+	args := make([]*kafka.KVArg, 1)
+	args[0] = &kafka.KVArg{
+		Key:   "device",
+		Value: device,
+	}
+	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+	return unPackResponse(rpc, device.Id, success, result)
 }
 
-func (ap *AdapterProxy) ReEnableDevice(device voltha.Device) error {
-	log.Debug("ReEnableDevice")
-	return nil
-}
-
-func (ap *AdapterProxy) RebootDevice(device voltha.Device) error {
+func (ap *AdapterProxy) RebootDevice(device *voltha.Device) error {
 	log.Debug("RebootDevice")
 	return nil
 }
 
-func (ap *AdapterProxy) DeleteDevice(device voltha.Device) error {
+func (ap *AdapterProxy) DeleteDevice(device *voltha.Device) error {
 	log.Debug("DeleteDevice")
 	return nil
 }
@@ -172,7 +196,7 @@
 }
 
 func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
-	log.Debugw("GetOfpDeviceInfo", log.Fields{"device": device})
+	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
 	topic := kafka.Topic{Name: device.Type}
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
@@ -180,7 +204,7 @@
 		Value: device,
 	}
 	success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
-	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"device": device, "success": success, "result": result})
+	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
 	if success {
 		unpackResult := &ca.SwitchCapability{}
 		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
@@ -201,7 +225,7 @@
 }
 
 func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
-	log.Debug("GetOfpPortInfo", log.Fields{"device": device})
+	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
 	topic := kafka.Topic{Name: device.Type}
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
@@ -215,7 +239,7 @@
 	}
 
 	success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
-	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "device": device, "success": success, "result": result})
+	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
 	if success {
 		unpackResult := &ca.PortCapability{}
 		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {