[VOL-2164] Update rw-core to use the Async Kafka API

This commit consists of the following:

1) Process per-device requests in the Core in the order they are
received. If there are lots of requests on a given device then
there will be some latencies introduced due to ordering.  With
recent changes in the model, along with keeping the request lock
to a minimum, these latencies are reduced.  Testing did not
show any noticeable latencies.

2) Keep the request lock from the moment a request starts
processing to the moment that request is sent to kafka (when
applicable).  Adapter responses are received and processed
asynchronously. Therefore, an adapter can take all the time it
needs to process a transaction.  The Core still has a context
with timeout (configurable) to cater for cases where the adapter
does not return a response.

3) Adapter requests are processed to completion before sending a
response back to the adapter.  Previously, in some cases, a
separate go routine was created to process the request and a
successful response was sent to the adapter.  Now if the request
fails then the adapter will receive an error. The adapter
requests for a given device are therefore processed in the
order they are received.

4) Some changes are made when retrieving a handler to execute
a device state transition.  This was necessary as there was some
transition overlap found.

Update after multiple reviews.

Change-Id: I55a189efec1549a662f2d71e18e6eca9015a3a17
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index fd07c7e..85af539 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -19,7 +19,6 @@
 import (
 	"context"
 	"errors"
-
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/db/model"
@@ -28,29 +27,27 @@
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
+	"time"
 )
 
 // AdapterRequestHandlerProxy represent adapter request handler proxy attributes
 type AdapterRequestHandlerProxy struct {
-	TestMode                  bool
 	coreInstanceID            string
 	deviceMgr                 *DeviceManager
 	lDeviceMgr                *LogicalDeviceManager
 	adapterMgr                *AdapterManager
 	localDataProxy            *model.Proxy
 	clusterDataProxy          *model.Proxy
-	defaultRequestTimeout     int64
-	longRunningRequestTimeout int64
+	defaultRequestTimeout     time.Duration
+	longRunningRequestTimeout time.Duration
 	coreInCompetingMode       bool
 	core                      *Core
 }
 
 // NewAdapterRequestHandlerProxy assigns values for adapter request handler proxy attributes and returns the new instance
 func NewAdapterRequestHandlerProxy(core *Core, coreInstanceID string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
-	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout int64,
-	defaultRequestTimeout int64) *AdapterRequestHandlerProxy {
+	aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy, incompetingMode bool, longRunningRequestTimeout time.Duration,
+	defaultRequestTimeout time.Duration) *AdapterRequestHandlerProxy {
 	var proxy AdapterRequestHandlerProxy
 	proxy.core = core
 	proxy.coreInstanceID = coreInstanceID
@@ -67,7 +64,7 @@
 
 // This is a helper function that attempts to acquire the request by using the device ownership model
 func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(ctx context.Context, transactionID string, devID string, maxTimeout ...int64) (*KVTransaction, error) {
-	timeout := rhp.defaultRequestTimeout
+	timeout := rhp.defaultRequestTimeout.Milliseconds()
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
 	}
@@ -131,7 +128,7 @@
 			}
 		}
 	}
-	log.Debugw("Register", log.Fields{"Adapter": *adapter, "DeviceTypes": deviceTypes, "transactionID": transactionID.Val, "coreID": rhp.coreInstanceID})
+	log.Debugw("Register", log.Fields{"adapter": *adapter, "device-types": deviceTypes, "transaction-id": transactionID.Val, "core-id": rhp.coreInstanceID})
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
@@ -146,10 +143,6 @@
 		}
 		defer txn.Close(context.TODO())
 	}
-
-	if rhp.TestMode { // Execute only for test cases
-		return &voltha.CoreInstance{InstanceId: "CoreInstance"}, nil
-	}
 	return rhp.adapterMgr.registerAdapter(adapter, deviceTypes)
 }
 
@@ -189,17 +182,12 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return &voltha.Device{Id: pID.Id}, nil
-	}
-
 	// Get the device via the device manager
 	device, err := rhp.deviceMgr.GetDevice(context.TODO(), pID.Id)
 	if err != nil {
-		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+		log.Debugw("get-device-failed", log.Fields{"deviceID": pID.Id, "error": err})
 	}
-	log.Debugw("GetDevice-response", log.Fields{"deviceID": pID.Id})
-	return device, nil
+	return device, err
 }
 
 // DeviceUpdate updates device using adapter data
@@ -239,16 +227,12 @@
 	}
 
 	log.Debugw("DeviceUpdate got txn", log.Fields{"deviceID": device.Id, "transactionID": transactionID.Val})
-	if rhp.TestMode { // Execute only for test cases
-		return new(empty.Empty), nil
+
+	if err := rhp.deviceMgr.updateDeviceUsingAdapterData(context.TODO(), device); err != nil {
+		log.Debugw("unable-to-update-device-using-adapter-data", log.Fields{"error": err})
+		return nil, err
 	}
-	go func() {
-		err := rhp.deviceMgr.updateDeviceUsingAdapterData(context.TODO(), device)
-		if err != nil {
-			log.Errorw("unable-to-update-device-using-adapter-data", log.Fields{"error": err})
-		}
-	}()
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // GetChildDevice returns details of child device
@@ -305,9 +289,6 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return &voltha.Device{Id: pID.Id}, nil
-	}
 	return rhp.deviceMgr.GetChildDevice(context.TODO(), pID.Id, serialNumber.Val, onuID.Val, parentPortNo.Val)
 }
 
@@ -347,9 +328,6 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return &voltha.Device{Id: proxyAddress.DeviceId}, nil
-	}
 	return rhp.deviceMgr.GetChildDeviceWithProxyAddress(context.TODO(), proxyAddress)
 }
 
@@ -383,12 +361,7 @@
 		}
 	}
 	log.Debugw("GetPorts", log.Fields{"deviceID": deviceID.Id, "portype": pt.Val, "transactionID": transactionID.Val})
-	if rhp.TestMode { // Execute only for test cases
-		aPort := &voltha.Port{Label: "test_port"}
-		allPorts := &voltha.Ports{}
-		allPorts.Items = append(allPorts.Items, aPort)
-		return allPorts, nil
-	}
+
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
 		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
@@ -438,15 +411,10 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return &voltha.Devices{Items: nil}, nil
-	}
-
 	return rhp.deviceMgr.getAllChildDevices(context.TODO(), pID.Id)
 }
 
-// ChildDeviceDetected is invoked when a child device is detected.  The following
-// parameters are expected:
+// ChildDeviceDetected is invoked when a child device is detected.  The following parameters are expected:
 // {parent_device_id, parent_port_no, child_device_type, channel_id, vendor_id, serial_number)
 func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*voltha.Device, error) {
 	if len(args) < 5 {
@@ -521,16 +489,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
-	}
 	device, err := rhp.deviceMgr.childDeviceDetected(context.TODO(), pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
 	if err != nil {
-		log.Errorw("child-detection-failed", log.Fields{"parentID": pID.Id, "onuID": onuID.Val, "error": err})
-		return nil, err
+		log.Debugw("child-detection-failed", log.Fields{"parentID": pID.Id, "onuID": onuID.Val, "error": err})
 	}
-
-	return device, nil
+	return device, err
 }
 
 // DeviceStateUpdate updates device status
@@ -581,19 +544,12 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.updateDeviceStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+		voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
+		log.Debugw("unable-to-update-device-status", log.Fields{"error": err})
+		return nil, err
 	}
-	// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
-	go func() {
-		err := rhp.deviceMgr.updateDeviceStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
-			voltha.ConnectStatus_Types(connStatus.Val))
-		if err != nil {
-			log.Errorw("unable-to-update-device-status", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // ChildrenStateUpdate updates child device status
@@ -644,24 +600,13 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
-	}
-
 	// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
-	go func() {
-		err := rhp.deviceMgr.updateChildrenStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
-			voltha.ConnectStatus_Types(connStatus.Val))
-		if err != nil {
-			log.Errorw("unable-to-update-children-status", log.Fields{"error": err})
-		}
-	}()
-
-	//if err := rhp.deviceMgr.updateChildrenStatus(deviceID.ID, voltha.OperStatus_OperStatus(operStatus.Val),
-	//	voltha.ConnectStatus_ConnectStatus(connStatus.Val)); err != nil {
-	//	return nil, err
-	//}
-	return new(empty.Empty), nil
+	if err := rhp.deviceMgr.updateChildrenStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+		voltha.ConnectStatus_Types(connStatus.Val)); err != nil {
+		log.Debugw("unable-to-update-children-status", log.Fields{"error": err})
+		return nil, err
+	}
+	return &empty.Empty{}, nil
 }
 
 // PortsStateUpdate updates the ports state related to the device
@@ -705,18 +650,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.updatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val)); err != nil {
+		log.Debugw("unable-to-update-ports-state", log.Fields{"error": err})
+		return nil, err
 	}
-
-	go func() {
-		err := rhp.deviceMgr.updatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val))
-		if err != nil {
-			log.Errorw("unable-to-update-ports-state", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // PortStateUpdate updates the port state of the device
@@ -773,26 +711,15 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.updatePortState(context.TODO(), deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
+		voltha.OperStatus_Types(operStatus.Val)); err != nil {
+		// If the error doesn't change behavior and is essentially ignored, it is not an error, it is a
+		// warning.
+		// TODO: VOL-2707
+		log.Debugw("unable-to-update-port-state", log.Fields{"error": err})
+		return nil, err
 	}
-
-	go func() {
-		if err := rhp.deviceMgr.updatePortState(context.TODO(), deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
-			voltha.OperStatus_Types(operStatus.Val)); err != nil {
-			// If the error doesn't change behavior and is
-			// essentially ignored, it is not an error, it is a
-			// warning.
-			// TODO: VOL-2707
-			log.Warnw("unable-to-update-port-state", log.Fields{"error": err})
-		}
-	}()
-
-	//if err := rhp.deviceMgr.updatePortState(deviceID.ID, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
-	//	voltha.OperStatus_OperStatus(operStatus.Val)); err != nil {
-	//	return nil, err
-	//}
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // DeleteAllPorts deletes all ports of device
@@ -830,18 +757,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.deleteAllPorts(context.TODO(), deviceID.Id); err != nil {
+		log.Debugw("unable-to-delete-ports", log.Fields{"error": err})
+		return nil, err
 	}
-
-	go func() {
-		err := rhp.deviceMgr.deleteAllPorts(context.TODO(), deviceID.Id)
-		if err != nil {
-			log.Errorw("unable-to-delete-ports", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // ChildDevicesLost indicates that a parent device is in a state (Disabled) where it cannot manage the child devices.
@@ -880,18 +800,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.childDevicesLost(context.TODO(), parentDeviceID.Id); err != nil {
+		log.Debugw("unable-to-disable-child-devices", log.Fields{"error": err})
+		return nil, err
 	}
-
-	go func() {
-		err := rhp.deviceMgr.childDevicesLost(context.TODO(), parentDeviceID.Id)
-		if err != nil {
-			log.Errorw("unable-to-disable-child-devices", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // ChildDevicesDetected invoked by an adapter when child devices are found, typically after after a disable/enable sequence.
@@ -930,16 +843,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
-	}
-
 	if err := rhp.deviceMgr.childDevicesDetected(context.TODO(), parentDeviceID.Id); err != nil {
-		log.Errorw("child-devices-dection-failed", log.Fields{"parentID": parentDeviceID.Id, "error": err})
+		log.Debugw("child-devices-detection-failed", log.Fields{"parentID": parentDeviceID.Id, "error": err})
 		return nil, err
 	}
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // PortCreated adds port to device
@@ -983,17 +891,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.addPort(context.TODO(), deviceID.Id, port); err != nil {
+		log.Debugw("unable-to-add-port", log.Fields{"error": err})
+		return nil, err
 	}
-	go func() {
-		err := rhp.deviceMgr.addPort(context.TODO(), deviceID.Id, port)
-		if err != nil {
-			log.Errorw("unable-to-add-port", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // DevicePMConfigUpdate initializes the pm configs as defined by the adapter.
@@ -1032,18 +934,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.initPmConfigs(context.TODO(), pmConfigs.Id, pmConfigs); err != nil {
+		log.Debugw("unable-to-initialize-pm-configs", log.Fields{"error": err})
+		return nil, err
 	}
-
-	go func() {
-		err := rhp.deviceMgr.initPmConfigs(context.TODO(), pmConfigs.Id, pmConfigs)
-		if err != nil {
-			log.Errorw("unable-to-initialize-pm-configs", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // PacketIn sends the incoming packet of device
@@ -1095,17 +990,13 @@
 		}
 		defer txn.Close(context.TODO())
 	}
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
-	}
-	go func() {
-		err := rhp.deviceMgr.PacketIn(context.TODO(), deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
-		if err != nil {
-			log.Errorw("unable-to-receive-packet-from-adapter", log.Fields{"error": err})
-		}
-	}()
 
-	return new(empty.Empty), nil
+	if err := rhp.deviceMgr.PacketIn(context.TODO(), deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload); err != nil {
+		log.Debugw("unable-to-receive-packet-from-adapter", log.Fields{"error": err})
+		return nil, err
+
+	}
+	return &empty.Empty{}, nil
 }
 
 // UpdateImageDownload updates image download
@@ -1150,19 +1041,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.updateImageDownload(context.TODO(), deviceID.Id, img); err != nil {
+		log.Debugw("unable-to-update-image-download", log.Fields{"error": err})
+		return nil, err
 	}
-	go func() {
-		err := rhp.deviceMgr.updateImageDownload(context.TODO(), deviceID.Id, img)
-		if err != nil {
-			log.Errorw("unable-to-update-image-download", log.Fields{"error": err})
-		}
-	}()
-	//if err := rhp.deviceMgr.updateImageDownload(deviceID.ID, img); err != nil {
-	//	return nil, err
-	//}
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // ReconcileChildDevices reconciles child devices
@@ -1200,19 +1083,11 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.reconcileChildDevices(context.TODO(), parentDeviceID.Id); err != nil {
+		log.Debugw("unable-to-reconcile-child-devices", log.Fields{"error": err})
+		return nil, err
 	}
-
-	// Run it in its own routine
-	go func() {
-		err := rhp.deviceMgr.reconcileChildDevices(context.TODO(), parentDeviceID.Id)
-		if err != nil {
-			log.Errorw("unable-to-reconcile-child-devices", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }
 
 // DeviceReasonUpdate updates device reason
@@ -1257,17 +1132,10 @@
 		defer txn.Close(context.TODO())
 	}
 
-	if rhp.TestMode { // Execute only for test cases
-		return nil, nil
+	if err := rhp.deviceMgr.updateDeviceReason(context.TODO(), deviceID.Id, reason.Val); err != nil {
+		log.Debugw("unable-to-update-device-reason", log.Fields{"error": err})
+		return nil, err
+
 	}
-
-	// Run it in its own routine (w/ background context)
-	go func() {
-		err := rhp.deviceMgr.updateDeviceReason(context.TODO(), deviceID.Id, reason.Val)
-		if err != nil {
-			log.Errorw("unable-to-update-device-reason", log.Fields{"error": err})
-		}
-	}()
-
-	return new(empty.Empty), nil
+	return &empty.Empty{}, nil
 }