[VOL-2164] Update rw-core to use the Async Kafka API

This commit consists of the following:

1) Process per-device requests in the Core in the order they are
received. If there are lots of requests on a given device then
there will be some latencies introduced due to ordering.  With
recent changes in the model, along with keeping the request lock
to a minimum, these latencies are reduced.  Testing did not
show any noticeable latencies.

2) Keep the request lock from the moment a request starts
processing to the moment that request is sent to kafka (when
applicable).  Adapter responses are received and processed
asynchronously. Therefore, an adapter can take all the time it
needs to process a transaction.  The Core still has a context
with timeout (configurable) to cater for cases where the adapter
does not return a response.

3) Adapter requests are processed to completion before sending a
response back to the adapter.  Previously, in some cases, a
separate go routine was created to process the request and a
successful response was sent to the adapter.  Now if the request
fails then the adapter will receive an error. The adapter
requests for a given device are therefore processed in the
order they are received.

4) Some changes are made when retrieving a handler to execute
a device state transition.  This was necessary as there was some
transition overlap found.

Update after multiple reviews.

Change-Id: I55a189efec1549a662f2d71e18e6eca9015a3a17
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index f6e9867..e3c362e 100755
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -18,21 +18,15 @@
 
 import (
 	"context"
-
-	"github.com/golang/protobuf/ptypes"
-	a "github.com/golang/protobuf/ptypes/any"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	"github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 )
 
 // AdapterProxy represents adapter proxy attributes
 type AdapterProxy struct {
-	TestMode              bool
 	deviceTopicRegistered bool
 	corePairTopic         string
 	kafkaICProxy          kafka.InterContainerProxy
@@ -47,21 +41,6 @@
 	}
 }
 
-func unPackResponse(rpc string, deviceID string, success bool, response *a.Any) error {
-	if success {
-		return nil
-	}
-	unpackResult := &ic.Error{}
-	var err error
-	if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
-		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-		return err
-	}
-	log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceID, "success": success, "error": err})
-	// TODO:  Need to get the real error code
-	return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
-}
-
 func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
 	return kafka.Topic{Name: ap.corePairTopic}
 }
@@ -70,575 +49,296 @@
 	return kafka.Topic{Name: adapterName}
 }
 
-// AdoptDevice invokes adopt device rpc
-func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("AdoptDevice", log.Fields{"device": device})
+func (ap *AdapterProxy) sendRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
+	waitForResponse bool, deviceID string, kvArgs ...*kafka.KVArg) (chan *kafka.RpcResponse, error) {
+
+	// Send the request to kafka
+	respChnl := ap.kafkaICProxy.InvokeAsyncRPC(ctx, rpc, toTopic, replyToTopic, waitForResponse, deviceID, kvArgs...)
+
+	// Wait for first response which would indicate whether the request was successfully sent to kafka.
+	firstResponse, ok := <-respChnl
+	if !ok || firstResponse.MType != kafka.RpcSent {
+		log.Errorw("failure to request to kafka", log.Fields{"rpc": rpc, "device-id": deviceID, "error": firstResponse.Err})
+		return nil, firstResponse.Err
+	}
+	// return the kafka channel for the caller to wait for the response of the RPC call
+	return respChnl, nil
+}
+
+// adoptDevice invokes adopt device rpc
+func (ap *AdapterProxy) adoptDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("adoptDevice", log.Fields{"device-id": device.Id})
 	rpc := "adopt_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	//topic := kafka.Topic{Name: device.Adapter}
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device topic for the response as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
 	ap.deviceTopicRegistered = true
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// DisableDevice invokes disable device rpc
-func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
+// disableDevice invokes disable device rpc
+func (ap *AdapterProxy) disableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("disableDevice", log.Fields{"device-id": device.Id})
 	rpc := "disable_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	//toTopic := kafka.CreateSubTopic(device.Adapter, device.Id)
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
-	//replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// ReEnableDevice invokes reenable device rpc
-func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
+// reEnableDevice invokes reenable device rpc
+func (ap *AdapterProxy) reEnableDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("reEnableDevice", log.Fields{"device-id": device.Id})
 	rpc := "reenable_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// RebootDevice invokes reboot device rpc
-func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
+// rebootDevice invokes reboot device rpc
+func (ap *AdapterProxy) rebootDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("rebootDevice", log.Fields{"device-id": device.Id})
 	rpc := "reboot_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// DeleteDevice invokes delete device rpc
-func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
+// deleteDevice invokes delete device rpc
+func (ap *AdapterProxy) deleteDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("deleteDevice", log.Fields{"device-id": device.Id})
 	rpc := "delete_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// GetOfpDeviceInfo invokes get ofp device info rpc
-func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
-	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
+// getOfpDeviceInfo invokes get ofp device info rpc
+func (ap *AdapterProxy) getOfpDeviceInfo(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("getOfpDeviceInfo", log.Fields{"device-id": device.Id})
+	rpc := "get_ofp_device_info"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 1)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
-	if success {
-		unpackResult := &ic.SwitchCapability{}
-		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
-		}
-		return unpackResult, nil
-	}
-	unpackResult := &ic.Error{}
-	var err error
-	if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-	}
-	log.Debugw("GetOfpDeviceInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
-	// TODO:  Need to get the real error code
-	return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// GetOfpPortInfo invokes get ofp port info rpc
-func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
-	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
+// getOfpPortInfo invokes get ofp port info rpc
+func (ap *AdapterProxy) getOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (chan *kafka.RpcResponse, error) {
+	log.Debugw("getOfpPortInfo", log.Fields{"device-id": device.Id, "port-no": portNo})
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
 	}
-	pNo := &ic.IntType{Val: int64(portNo)}
-	args[1] = &kafka.KVArg{
-		Key:   "port_no",
-		Value: pNo,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
-	if success {
-		unpackResult := &ic.PortCapability{}
-		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
-		}
-		return unpackResult, nil
-	}
-	unpackResult := &ic.Error{}
-	var err error
-	if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-	}
-	log.Debugw("GetOfpPortInfo-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
-	// TODO:  Need to get the real error code
-	return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+	return ap.sendRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-//TODO: Implement the functions below
-
-// AdapterDescriptor - TODO
-func (ap *AdapterProxy) AdapterDescriptor() (*voltha.Adapter, error) {
-	log.Debug("AdapterDescriptor")
-	return nil, nil
-}
-
-// DeviceTypes - TODO
-func (ap *AdapterProxy) DeviceTypes() (*voltha.DeviceType, error) {
-	log.Debug("DeviceTypes")
-	return nil, nil
-}
-
-// Health - TODO
-func (ap *AdapterProxy) Health() (*voltha.HealthStatus, error) {
-	log.Debug("Health")
-	return nil, nil
-}
-
-// ReconcileDevice invokes reconcile device rpc
-func (ap *AdapterProxy) ReconcileDevice(ctx context.Context, device *voltha.Device) error {
-	log.Debugw("ReconcileDevice", log.Fields{"deviceId": device.Id})
+// reconcileDevice invokes reconcile device rpc
+func (ap *AdapterProxy) reconcileDevice(ctx context.Context, device *voltha.Device) (chan *kafka.RpcResponse, error) {
+	log.Debugw("reconcileDevice", log.Fields{"device-id": device.Id})
 	rpc := "reconcile_device"
 	toTopic := ap.getAdapterTopic(device.Adapter)
 	args := []*kafka.KVArg{
 		{Key: "device", Value: device},
 	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("ReconcileDevice-response", log.Fields{"deviceid": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// AbandonDevice - TODO
-func (ap *AdapterProxy) AbandonDevice(device voltha.Device) error {
-	log.Debug("AbandonDevice")
-	return nil
-}
-
-// GetDeviceDetails - TODO
-func (ap *AdapterProxy) GetDeviceDetails(device voltha.Device) (*voltha.Device, error) {
-	log.Debug("GetDeviceDetails")
-	return nil, nil
-}
-
-// DownloadImage invokes download image rpc
-func (ap *AdapterProxy) DownloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
-	log.Debugw("DownloadImage", log.Fields{"deviceId": device.Id, "image": download.Name})
+// downloadImage invokes download image rpc
+func (ap *AdapterProxy) downloadImage(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+	log.Debugw("downloadImage", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "download_image"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: download,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("DownloadImage-response", log.Fields{"deviceId": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// GetImageDownloadStatus invokes get image download status rpc
-func (ap *AdapterProxy) GetImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	log.Debugw("GetImageDownloadStatus", log.Fields{"deviceId": device.Id, "image": download.Name})
+// getImageDownloadStatus invokes get image download status rpc
+func (ap *AdapterProxy) getImageDownloadStatus(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+	log.Debugw("getImageDownloadStatus", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "get_image_download_status"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: download,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("GetImageDownloadStatus-response", log.Fields{"deviceId": device.Id, "success": success})
-
-	if success {
-		unpackResult := &voltha.ImageDownload{}
-		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
-			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
-		}
-		return unpackResult, nil
-	}
-	unpackResult := &ic.Error{}
-	var err error
-	if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
-		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
-		return nil, err
-	}
-	log.Debugw("GetImageDownloadStatus-return", log.Fields{"deviceid": device.Id, "success": success, "error": err})
-	return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// CancelImageDownload invokes cancel image download rpc
-func (ap *AdapterProxy) CancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
-	log.Debugw("CancelImageDownload", log.Fields{"deviceId": device.Id, "image": download.Name})
+// cancelImageDownload invokes cancel image download rpc
+func (ap *AdapterProxy) cancelImageDownload(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+	log.Debugw("cancelImageDownload", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "cancel_image_download"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: download,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("CancelImageDownload-response", log.Fields{"deviceId": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// ActivateImageUpdate invokes activate image update rpc
-func (ap *AdapterProxy) ActivateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
-	log.Debugw("ActivateImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
+// activateImageUpdate invokes activate image update rpc
+func (ap *AdapterProxy) activateImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+	log.Debugw("activateImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "activate_image_update"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: download,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("ActivateImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// RevertImageUpdate invokes revert image update rpc
-func (ap *AdapterProxy) RevertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) error {
-	log.Debugw("RevertImageUpdate", log.Fields{"deviceId": device.Id, "image": download.Name})
+// revertImageUpdate invokes revert image update rpc
+func (ap *AdapterProxy) revertImageUpdate(ctx context.Context, device *voltha.Device, download *voltha.ImageDownload) (chan *kafka.RpcResponse, error) {
+	log.Debugw("revertImageUpdate", log.Fields{"device-id": device.Id, "image": download.Name})
 	rpc := "revert_image_update"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "request", Value: download},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: download,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("RevertImageUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
-
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// SelfTestDevice - TODO
-func (ap *AdapterProxy) SelfTestDevice(device voltha.Device) (*voltha.SelfTestResponse, error) {
-	log.Debug("SelfTestDevice")
-	return nil, nil
-}
-
-func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
-	log.Debugw("packetOut", log.Fields{"deviceId": deviceID})
+func (ap *AdapterProxy) packetOut(ctx context.Context, deviceType string, deviceID string, outPort uint32, packet *openflow_13.OfpPacketOut) (chan *kafka.RpcResponse, error) {
+	log.Debugw("packetOut", log.Fields{"device-id": deviceID, "device-type": deviceType, "out-port": outPort})
 	toTopic := ap.getAdapterTopic(deviceType)
 	rpc := "receive_packet_out"
-	dID := &ic.StrType{Val: deviceID}
-	args := make([]*kafka.KVArg, 3)
-	args[0] = &kafka.KVArg{
-		Key:   "deviceId",
-		Value: dID,
+	args := []*kafka.KVArg{
+		{Key: "deviceId", Value: &ic.StrType{Val: deviceID}},
+		{Key: "outPort", Value: &ic.IntType{Val: int64(outPort)}},
+		{Key: "packet", Value: packet},
 	}
-	op := &ic.IntType{Val: int64(outPort)}
-	args[1] = &kafka.KVArg{
-		Key:   "outPort",
-		Value: op,
-	}
-	args[2] = &kafka.KVArg{
-		Key:   "packet",
-		Value: packet,
-	}
-
-	// TODO:  Do we need to wait for an ACK on a packet Out?
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
-	log.Debugw("packetOut", log.Fields{"deviceid": deviceID, "success": success})
-	return unPackResponse(rpc, deviceID, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
 }
 
-// UpdateFlowsBulk invokes update flows bulk rpc
-func (ap *AdapterProxy) UpdateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error {
-	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id, "flowsInUpdate": len(flows.Items), "groupsToUpdate": len(groups.Items)})
+// updateFlowsBulk invokes update flows bulk rpc
+func (ap *AdapterProxy) updateFlowsBulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
+	log.Debugw("updateFlowsBulk", log.Fields{"device-id": device.Id, "flow-count": len(flows.Items), "group-count": len(groups.Items), "flow-metadata": flowMetadata})
 	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_bulk"
-	args := make([]*kafka.KVArg, 4)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "flows", Value: flows},
+		{Key: "groups", Value: groups},
+		{Key: "flow_metadata", Value: flowMetadata},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "flows",
-		Value: flows,
-	}
-	args[2] = &kafka.KVArg{
-		Key:   "groups",
-		Value: groups,
-	}
-	args[3] = &kafka.KVArg{
-		Key:   "flow_metadata",
-		Value: flowMetadata,
-	}
-
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// UpdateFlowsIncremental invokes update flows incremental rpc
-func (ap *AdapterProxy) UpdateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
-	log.Debugw("UpdateFlowsIncremental",
+// updateFlowsIncremental invokes update flows incremental rpc
+func (ap *AdapterProxy) updateFlowsIncremental(ctx context.Context, device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) (chan *kafka.RpcResponse, error) {
+	log.Debugw("updateFlowsIncremental",
 		log.Fields{
-			"deviceId":       device.Id,
-			"flowsToAdd":     len(flowChanges.ToAdd.Items),
-			"flowsToDelete":  len(flowChanges.ToRemove.Items),
-			"groupsToAdd":    len(groupChanges.ToAdd.Items),
-			"groupsToDelete": len(groupChanges.ToRemove.Items),
-			"groupsToUpdate": len(groupChanges.ToUpdate.Items),
+			"device-id":             device.Id,
+			"flow-to-add-count":     len(flowChanges.ToAdd.Items),
+			"flow-to-delete-count":  len(flowChanges.ToRemove.Items),
+			"group-to-add-count":    len(groupChanges.ToAdd.Items),
+			"group-to-delete-count": len(groupChanges.ToRemove.Items),
+			"group-to-update-count": len(groupChanges.ToUpdate.Items),
 		})
 	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "update_flows_incrementally"
-	args := make([]*kafka.KVArg, 4)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "flow_changes", Value: flowChanges},
+		{Key: "group_changes", Value: groupChanges},
+		{Key: "flow_metadata", Value: flowMetadata},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "flow_changes",
-		Value: flowChanges,
-	}
-	args[2] = &kafka.KVArg{
-		Key:   "group_changes",
-		Value: groupChanges,
-	}
-
-	args[3] = &kafka.KVArg{
-		Key:   "flow_metadata",
-		Value: flowMetadata,
-	}
-	// Use a device specific topic as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(context.TODO(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// UpdatePmConfigs invokes update pm configs rpc
-func (ap *AdapterProxy) UpdatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) error {
-	log.Debugw("UpdatePmConfigs", log.Fields{"deviceId": device.Id})
+// updatePmConfigs invokes update pm configs rpc
+func (ap *AdapterProxy) updatePmConfigs(ctx context.Context, device *voltha.Device, pmConfigs *voltha.PmConfigs) (chan *kafka.RpcResponse, error) {
+	log.Debugw("updatePmConfigs", log.Fields{"device-id": device.Id, "pm-configs-id": pmConfigs.Id})
 	toTopic := ap.getAdapterTopic(device.Adapter)
 	rpc := "Update_pm_config"
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{
+		{Key: "device", Value: device},
+		{Key: "pm_configs", Value: pmConfigs},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "pm_configs",
-		Value: pmConfigs,
-	}
-
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("UpdatePmConfigs-response", log.Fields{"deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
 }
 
-// ReceivePacketOut - TODO
-func (ap *AdapterProxy) ReceivePacketOut(deviceID voltha.ID, egressPortNo int, msg interface{}) error {
-	log.Debug("ReceivePacketOut")
-	return nil
-}
-
-func (ap *AdapterProxy) SuppressEvent(filter *voltha.EventFilter) error {
-	log.Debug("SuppressEvent")
-	return nil
-}
-
-func (ap *AdapterProxy) UnSuppressEvent(filter *voltha.EventFilter) error {
-	log.Debug("UnSuppressEvent")
-	return nil
-}
-
-// SimulateAlarm invokes simulate alarm rpc
-func (ap *AdapterProxy) SimulateAlarm(ctx context.Context, device *voltha.Device, simulatereq *voltha.SimulateAlarmRequest) error {
-	log.Debugw("SimulateAlarm", log.Fields{"id": simulatereq.Id})
+// simulateAlarm sends the "simulate_alarm" rpc, carrying the device and the simulate request, to the topic of the device's adapter; it sets ap.deviceTopicRegistered to true beforehand and returns the channel and error produced by sendRPC.
+func (ap *AdapterProxy) simulateAlarm(ctx context.Context, device *voltha.Device, simulateReq *voltha.SimulateAlarmRequest) (chan *kafka.RpcResponse, error) {
+	log.Debugw("simulateAlarm", log.Fields{"device-id": device.Id, "simulate-req-id": simulateReq.Id})
 	rpc := "simulate_alarm"
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "device",
-		Value: device,
+	args := []*kafka.KVArg{ // rpc arguments: the target device and the alarm simulation request
+		{Key: "device", Value: device},
+		{Key: "request", Value: simulateReq},
 	}
-	args[1] = &kafka.KVArg{
-		Key:   "request",
-		Value: simulatereq,
-	}
-
-	// Use a device topic for the response as we are the only core handling requests for this device
 	replyToTopic := ap.getCoreTopic()
 	ap.deviceTopicRegistered = true
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("SimulateAlarm-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...) // sendRPC's result is passed back unchanged; presumably the adapter's reply arrives later on the returned channel - confirm against sendRPC
 }
 
-func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+func (ap *AdapterProxy) disablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	log.Debugw("disablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "disable_port"
-	deviceID := &ic.StrType{Val: device.Id}
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "deviceId",
-		Value: deviceID,
+	args := []*kafka.KVArg{ // "disable_port" arguments: the device ID wrapped in ic.StrType and the port itself
+		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
+		{Key: "port", Value: port},
 	}
-
-	args[1] = &kafka.KVArg{
-		Key:   "port",
-		Value: port,
-	}
-
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("disablePort-response", log.Fields{"device-id": device.Id, "port-no": port.PortNo, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...) // request goes to the adapter topic with the core topic as reply-to; sendRPC's channel and error are returned unchanged (presumably the reply arrives later on the channel - confirm against sendRPC)
 }
 
-func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
+func (ap *AdapterProxy) enablePort(ctx context.Context, device *voltha.Device, port *voltha.Port) (chan *kafka.RpcResponse, error) {
 	log.Debugw("enablePort", log.Fields{"device-id": device.Id, "port-no": port.PortNo})
 	rpc := "enable_port"
-	deviceID := &ic.StrType{Val: device.Id}
 	toTopic := ap.getAdapterTopic(device.Adapter)
-	// Use a device specific topic to send the request.  The adapter handling the device creates a device
-	// specific topic
-	args := make([]*kafka.KVArg, 2)
-	args[0] = &kafka.KVArg{
-		Key:   "deviceId",
-		Value: deviceID,
+	args := []*kafka.KVArg{ // "enable_port" arguments: the device ID wrapped in ic.StrType and the port itself
+		{Key: "deviceId", Value: &ic.StrType{Val: device.Id}},
+		{Key: "port", Value: port},
 	}
-
-	args[1] = &kafka.KVArg{
-		Key:   "port",
-		Value: port,
-	}
-
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
-	log.Debugw("enablePort-response", log.Fields{"device-id": device.Id, "port-no": port.PortNo, "success": success})
-	return unPackResponse(rpc, device.Id, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, device.Id, args...) // request goes to the adapter topic with the core topic as reply-to; sendRPC's channel and error are returned unchanged (presumably the reply arrives later on the channel - confirm against sendRPC)
 }
 
-// ChildDeviceLost invokes Child device_lost rpc
-func (ap *AdapterProxy) ChildDeviceLost(ctx context.Context, deviceType string, pDeviceID string, pPortNo uint32, onuID uint32) error {
-	log.Debugw("ChildDeviceLost", log.Fields{"pDeviceId": pDeviceID, "pPortNo": pPortNo, "onuID": onuID})
+// childDeviceLost sends the "child_device_lost" rpc to the adapter topic looked up from deviceType, passing the parent device ID, parent port number and ONU ID as arguments, and returns the channel and error produced by sendRPC.
+func (ap *AdapterProxy) childDeviceLost(ctx context.Context, deviceType string, pDeviceID string, pPortNo uint32, onuID uint32) (chan *kafka.RpcResponse, error) {
+	log.Debugw("childDeviceLost", log.Fields{"parent-device-id": pDeviceID, "parent-port-no": pPortNo, "onu-id": onuID})
 	rpc := "child_device_lost"
 	toTopic := ap.getAdapterTopic(deviceType)
-	dID := &ic.StrType{Val: pDeviceID}
-	PortNo := &ic.IntType{Val: int64(pPortNo)}
-	oID := &ic.IntType{Val: int64(onuID)}
 	args := []*kafka.KVArg{
-		{
-			Key:   "pDeviceId",
-			Value: dID,
-		},
-		{
-			Key:   "pPortNo",
-			Value: PortNo,
-		},
-		{
-			Key:   "onuID",
-			Value: oID,
-		}}
-
-	// Use a device specific topic as we are the only core handling requests for this device
+		{Key: "pDeviceId", Value: &ic.StrType{Val: pDeviceID}},
+		{Key: "pPortNo", Value: &ic.IntType{Val: int64(pPortNo)}}, // uint32 converted to int64 for ic.IntType
+		{Key: "onuID", Value: &ic.IntType{Val: int64(onuID)}}, // uint32 converted to int64 for ic.IntType
+	}
 	replyToTopic := ap.getCoreTopic()
-	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, pDeviceID, args...)
-	log.Debugw("ChildDeviceLost-response", log.Fields{"pDeviceId": pDeviceID, "success": success})
-
-	return unPackResponse(rpc, pDeviceID, success, result)
+	return ap.sendRPC(ctx, rpc, &toTopic, &replyToTopic, true, pDeviceID, args...) // the parent device ID fills the slot where the per-device rpcs pass device.Id; sendRPC's channel and error are returned unchanged
 }