[VOL-2164] Update rw-core to use the Async Kafka API

This commit consists of the following:

1) Process per-device requests in the Core in the order they are
received. If there are lots of requests on a given device then
there will be some latencies introduced due to ordering.  With
recent changes in the model, along with keeping the request lock
to a minimum, these latencies are reduced.  Testing did not
show any noticeable latencies.

2) Keep the request lock from the moment a request starts
processing to the moment that request is sent to kafka (when
applicable).  Adapter responses are received and processed
asynchronously. Therefore, an adapter can take all the time it
needs to process a transaction.  The Core still has a context
with timeout (configurable) to cater for cases where the adapter
does not return a response.

3) Adapter requests are processed to completion before sending a
response back to the adapter.  Previously, in some cases, a
separate goroutine was created to process the request and a
successful response was sent to the adapter regardless of the
outcome.  Now if the request fails then the adapter will receive
an error. The adapter requests for a given device are therefore
processed in the order they are received.

4) Some changes were made to how a handler is retrieved to execute
a device state transition.  This was necessary because some
overlapping transitions were found.

Update after multiple reviews.

Change-Id: I55a189efec1549a662f2d71e18e6eca9015a3a17
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 98c45eb..0d77429 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -19,10 +19,6 @@
 import (
 	"context"
 	"errors"
-	"reflect"
-	"runtime"
-	"sync"
-
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/utils"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
@@ -33,6 +29,10 @@
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"reflect"
+	"runtime"
+	"sync"
+	"time"
 )
 
 // DeviceManager represent device manager attributes
@@ -49,7 +49,7 @@
 	clusterDataProxy        *model.Proxy
 	coreInstanceID          string
 	exitChannel             chan int
-	defaultTimeout          int64
+	defaultTimeout          time.Duration
 	devicesLoadingLock      sync.RWMutex
 	deviceLoadingInProgress map[string][]chan int
 }
@@ -65,7 +65,7 @@
 	deviceMgr.clusterDataProxy = core.clusterDataProxy
 	deviceMgr.adapterMgr = core.adapterMgr
 	deviceMgr.lockRootDeviceMap = sync.RWMutex{}
-	deviceMgr.defaultTimeout = core.config.DefaultCoreTimeout
+	deviceMgr.defaultTimeout = time.Duration(core.config.DefaultCoreTimeout) * time.Millisecond
 	deviceMgr.devicesLoadingLock = sync.RWMutex{}
 	deviceMgr.deviceLoadingInProgress = make(map[string][]chan int)
 	return &deviceMgr
@@ -121,7 +121,7 @@
 	if ok {
 		return agent.(*DeviceAgent)
 	}
-	//	Try to load into memory - loading will also create the device agent and set the device ownership
+	// Try to load into memory - loading will also create the device agent and set the device ownership
 	err := dMgr.load(ctx, deviceID)
 	if err == nil {
 		agent, ok = dMgr.deviceAgents.Load(deviceID)
@@ -172,13 +172,13 @@
 	device.Root = true
 	// Create and start a device agent for that device
 	agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
-	dMgr.addDeviceAgentToMap(agent)
 	device, err = agent.start(ctx, device)
 	if err != nil {
-		log.Errorf("Failed to start device")
+		log.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
 		sendResponse(ctx, ch, err)
 		return
 	}
+	dMgr.addDeviceAgentToMap(agent)
 
 	sendResponse(ctx, ch, device)
 }
@@ -251,19 +251,21 @@
 			// We do not need to stop the child devices as this is taken care by the state machine.
 		}
 		if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
-			agent.stop(ctx)
+			if err := agent.stop(ctx); err != nil {
+				log.Warnw("unable-to-stop-device-agent", log.Fields{"device-id": agent.deviceID, "error": err})
+			}
 			dMgr.deleteDeviceAgentFromMap(agent)
 			// Abandon the device ownership
 			err := dMgr.core.deviceOwnership.AbandonDevice(id)
 			if err != nil {
-				log.Errorw("unable-to-abandon-device", log.Fields{"error": err})
+				log.Warnw("unable-to-abandon-device", log.Fields{"error": err})
 			}
 		}
 	}
 }
 
 // RunPostDeviceDelete removes any reference of this device
-func (dMgr *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	log.Infow("RunPostDeviceDelete", log.Fields{"deviceId": cDevice.Id})
 	dMgr.stopManagingDevice(ctx, cDevice.Id)
 	return nil
@@ -273,7 +275,7 @@
 func (dMgr *DeviceManager) GetDevice(ctx context.Context, id string) (*voltha.Device, error) {
 	log.Debugw("GetDevice", log.Fields{"deviceid": id})
 	if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
-		return agent.getDevice(), nil
+		return agent.getDevice(ctx)
 	}
 	return nil, status.Errorf(codes.NotFound, "%s", id)
 }
@@ -404,19 +406,19 @@
 		return nil, err
 	}
 	if devices != nil {
-		for _, device := range devices.([]interface{}) {
+		for _, d := range devices.([]interface{}) {
+			device := d.(*voltha.Device)
 			// If device is not in memory then set it up
-			if !dMgr.IsDeviceInCache(device.(*voltha.Device).Id) {
-				log.Debugw("loading-device-from-Model", log.Fields{"id": device.(*voltha.Device).Id})
-				agent := newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
+			if !dMgr.IsDeviceInCache(device.Id) {
+				log.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
+				agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 				if _, err := agent.start(ctx, nil); err != nil {
-					log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.(*voltha.Device).Id})
-					agent.stop(ctx)
+					log.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
 				} else {
 					dMgr.addDeviceAgentToMap(agent)
 				}
 			}
-			result.Items = append(result.Items, device.(*voltha.Device))
+			result.Items = append(result.Items, device)
 		}
 	}
 	log.Debugw("ListDevices-end", log.Fields{"len": len(result.Items)})
@@ -480,7 +482,6 @@
 				agent := newDeviceAgent(dMgr.adapterProxy, device, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
 				if _, err = agent.start(ctx, nil); err != nil {
 					log.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
-					agent.stop(ctx)
 				} else {
 					dMgr.addDeviceAgentToMap(agent)
 				}
@@ -554,7 +555,10 @@
 		return err
 	}
 	// Get the loaded device details
-	device := dAgent.getDevice()
+	device, err := dAgent.getDevice(ctx)
+	if err != nil {
+		return err
+	}
 
 	// If the device is in Pre-provisioning or deleted state stop here
 	if device.AdminState == voltha.AdminState_PREPROVISIONED || device.AdminState == voltha.AdminState_DELETED {
@@ -679,14 +683,20 @@
 	// to the adapter.   We will therefore bypass the adapter adapter and send the request directly to the adapter via
 	// the adapter_proxy.
 	response := utils.NewResponse()
-	go func(device *voltha.Device) {
-		if err := dMgr.adapterProxy.ReconcileDevice(ctx, device); err != nil {
-			log.Errorw("reconcile-request-failed", log.Fields{"deviceId": device.Id, "error": err})
-			response.Error(status.Errorf(codes.Internal, "device: %s", device.Id))
+	ch, err := dMgr.adapterProxy.reconcileDevice(ctx, device)
+	if err != nil {
+		response.Error(err)
+	}
+	// Wait for adapter response in its own routine
+	go func() {
+		resp, ok := <-ch
+		if !ok {
+			response.Error(status.Errorf(codes.Aborted, "channel-closed-device: %s", device.Id))
+		} else if resp.Err != nil {
+			response.Error(resp.Err)
 		}
 		response.Done()
-	}(device)
-
+	}()
 	return response
 }
 
@@ -751,14 +761,6 @@
 	return status.Errorf(codes.NotFound, "%s", deviceID)
 }
 
-func (dMgr *DeviceManager) deletePeerPorts(ctx context.Context, fromDeviceID string, deviceID string) error {
-	log.Debugw("deletePeerPorts", log.Fields{"fromDeviceId": fromDeviceID, "deviceid": deviceID})
-	if agent := dMgr.getDeviceAgent(ctx, fromDeviceID); agent != nil {
-		return agent.deletePeerPorts(ctx, deviceID)
-	}
-	return status.Errorf(codes.NotFound, "%s", deviceID)
-}
-
 func (dMgr *DeviceManager) addFlowsAndGroups(ctx context.Context, deviceID string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("addFlowsAndGroups", log.Fields{"deviceid": deviceID, "groups:": groups, "flowMetadata": flowMetadata})
 	if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
@@ -894,6 +896,7 @@
 				}
 			}()
 		}
+		return nil
 	}
 	return status.Errorf(codes.NotFound, "%s", deviceID)
 }
@@ -997,28 +1000,30 @@
 	childDevice.SerialNumber = serialNumber
 	childDevice.Root = false
 
-	//Get parent device type
-	parent, err := dMgr.GetDevice(ctx, parentDeviceID)
-	if err != nil {
-		log.Error("no-parent-found", log.Fields{"parentId": parentDeviceID})
+	// Get parent device type
+	pAgent := dMgr.getDeviceAgent(ctx, parentDeviceID)
+	if pAgent == nil {
 		return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
 	}
+	if pAgent.deviceType == "" {
+		return nil, status.Errorf(codes.FailedPrecondition, "device Type not set %s", parentDeviceID)
+	}
 
 	if device, err := dMgr.GetChildDevice(ctx, parentDeviceID, serialNumber, onuID, parentPortNo); err == nil {
 		log.Warnw("child-device-exists", log.Fields{"parentId": parentDeviceID, "serialNumber": serialNumber})
 		return device, status.Errorf(codes.AlreadyExists, "%s", serialNumber)
 	}
 
-	childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: parent.Type, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
+	childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
 
 	// Create and start a device agent for that device
 	agent := newDeviceAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.clusterDataProxy, dMgr.defaultTimeout)
-	dMgr.addDeviceAgentToMap(agent)
-	childDevice, err = agent.start(ctx, childDevice)
+	childDevice, err := agent.start(ctx, childDevice)
 	if err != nil {
-		log.Error("error-starting-child")
+		log.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
 		return nil, err
 	}
+	dMgr.addDeviceAgentToMap(agent)
 
 	// Since this Core has handled this request then it therefore owns this child device.  Set the
 	// ownership of this device to this Core
@@ -1055,10 +1060,10 @@
 		log.Debugw("no-op-transition", log.Fields{"deviceId": current.Id})
 		return nil
 	}
-	log.Debugw("handler-found", log.Fields{"num-handlers": len(handlers), "isParent": current.Root, "current-data": current})
+	log.Debugw("handler-found", log.Fields{"num-expectedHandlers": len(handlers), "isParent": current.Root, "current-data": current})
 	for _, handler := range handlers {
 		log.Debugw("running-handler", log.Fields{"handler": funcName(handler)})
-		if err := handler(ctx, current); err != nil {
+		if err := handler(ctx, current, previous); err != nil {
 			log.Warnw("handler-failed", log.Fields{"handler": funcName(handler), "error": err})
 			return err
 		}
@@ -1104,7 +1109,7 @@
 }
 
 // CreateLogicalDevice creates logical device in core
-func (dMgr *DeviceManager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) CreateLogicalDevice(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	log.Info("CreateLogicalDevice")
 	// Verify whether the logical device has already been created
 	if cDevice.ParentId != "" {
@@ -1120,7 +1125,7 @@
 }
 
 // DeleteLogicalDevice deletes logical device from core
-func (dMgr *DeviceManager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DeleteLogicalDevice(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	log.Info("DeleteLogicalDevice")
 	var err error
 	if err = dMgr.logicalDeviceMgr.deleteLogicalDevice(ctx, cDevice); err != nil {
@@ -1151,10 +1156,10 @@
 }
 
 // DeleteLogicalPorts removes the logical ports associated with that deviceId
-func (dMgr *DeviceManager) DeleteLogicalPorts(ctx context.Context, device *voltha.Device) error {
-	log.Info("deleteLogicalPorts")
-	if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, device.Id); err != nil {
-		log.Warnw("deleteLogical-ports-error", log.Fields{"deviceId": device.Id})
+func (dMgr *DeviceManager) DeleteLogicalPorts(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
+	log.Debugw("delete-all-logical-ports", log.Fields{"device-id": cDevice.Id})
+	if err := dMgr.logicalDeviceMgr.deleteLogicalPorts(ctx, cDevice.Id); err != nil {
+		log.Warnw("deleteLogical-ports-error", log.Fields{"deviceId": cDevice.Id})
 		return err
 	}
 	return nil
@@ -1180,7 +1185,7 @@
 		log.Warnw("failed-getting-device", log.Fields{"deviceId": parentDeviceID, "error": err})
 		return err
 	}
-	return dMgr.DisableAllChildDevices(ctx, parentDevice)
+	return dMgr.DisableAllChildDevices(ctx, parentDevice, nil)
 }
 
 //childDevicesDetected is invoked by an adapter when child devices are found, typically after after a
@@ -1230,15 +1235,15 @@
 */
 
 //DisableAllChildDevices is invoked as a callback when the parent device is disabled
-func (dMgr *DeviceManager) DisableAllChildDevices(ctx context.Context, parentDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DisableAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device, parentPrevDevice *voltha.Device) error {
 	log.Debug("DisableAllChildDevices")
 	var childDeviceIds []string
 	var err error
-	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
-		return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
+	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentCurrDevice); err != nil {
+		return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
 	}
 	if len(childDeviceIds) == 0 {
-		log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
+		log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentCurrDevice.Id})
 	}
 	allChildDisable := true
 	for _, childDeviceID := range childDeviceIds {
@@ -1256,15 +1261,15 @@
 }
 
 //DeleteAllChildDevices is invoked as a callback when the parent device is deleted
-func (dMgr *DeviceManager) DeleteAllChildDevices(ctx context.Context, parentDevice *voltha.Device) error {
+func (dMgr *DeviceManager) DeleteAllChildDevices(ctx context.Context, parentCurrDevice *voltha.Device, parentPrevDevice *voltha.Device) error {
 	log.Debug("DeleteAllChildDevices")
 	var childDeviceIds []string
 	var err error
-	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentDevice); err != nil {
-		return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
+	if childDeviceIds, err = dMgr.getAllChildDeviceIds(parentCurrDevice); err != nil {
+		return status.Errorf(codes.NotFound, "%s", parentCurrDevice.Id)
 	}
 	if len(childDeviceIds) == 0 {
-		log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentDevice.Id})
+		log.Debugw("no-child-device", log.Fields{"parentDeviceId": parentCurrDevice.Id})
 	}
 	allChildDeleted := true
 	for _, childDeviceID := range childDeviceIds {
@@ -1284,9 +1289,9 @@
 }
 
 //DeleteAllUNILogicalPorts is invoked as a callback when the parent device is deleted
-func (dMgr *DeviceManager) DeleteAllUNILogicalPorts(ctx context.Context, parentDevice *voltha.Device) error {
-	log.Debugw("delete-all-uni-logical-ports", log.Fields{"parent-device-id": parentDevice.Id})
-	if err := dMgr.logicalDeviceMgr.deleteAllUNILogicalPorts(ctx, parentDevice); err != nil {
+func (dMgr *DeviceManager) DeleteAllUNILogicalPorts(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error {
+	log.Debugw("delete-all-uni-logical-ports", log.Fields{"parent-device-id": curr.Id})
+	if err := dMgr.logicalDeviceMgr.deleteAllUNILogicalPorts(ctx, curr); err != nil {
 		return err
 	}
 	return nil
@@ -1325,7 +1330,7 @@
 }
 
 // SetupUNILogicalPorts creates UNI ports on the logical device that represents a child UNI interface
-func (dMgr *DeviceManager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
+func (dMgr *DeviceManager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	log.Info("addUNILogicalPort")
 	if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice); err != nil {
 		log.Warnw("addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
@@ -1438,22 +1443,15 @@
 	return nil, status.Errorf(codes.NotFound, "%s", deviceID)
 }
 
-// SetAdminStateToEnable sets admin state of device to enabled
-func (dMgr *DeviceManager) SetAdminStateToEnable(ctx context.Context, cDevice *voltha.Device) error {
-	log.Info("SetAdminStateToEnable")
-	if agent := dMgr.getDeviceAgent(ctx, cDevice.Id); agent != nil {
-		return agent.updateAdminState(ctx, voltha.AdminState_ENABLED)
-	}
-	return status.Errorf(codes.NotFound, "%s", cDevice.Id)
-}
-
-// NotifyInvalidTransition notifies about invalid transition
-func (dMgr *DeviceManager) NotifyInvalidTransition(ctx context.Context, pcDevice *voltha.Device) error {
+func (dMgr *DeviceManager) NotifyInvalidTransition(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	log.Errorw("NotifyInvalidTransition", log.Fields{
-		"device":     pcDevice.Id,
-		"adminState": pcDevice.AdminState,
-		"operState":  pcDevice.OperStatus,
-		"connState":  pcDevice.ConnectStatus,
+		"device":           cDevice.Id,
+		"prev-admin-state": pDevice.AdminState,
+		"prev-oper-state":  pDevice.OperStatus,
+		"prev-conn-state":  pDevice.ConnectStatus,
+		"curr-admin-state": cDevice.AdminState,
+		"curr-oper-state":  cDevice.OperStatus,
+		"curr-conn-state":  cDevice.ConnectStatus,
 	})
 	//TODO: notify over kafka?
 	return nil
@@ -1488,7 +1486,7 @@
 	var res interface{}
 	if agent := dMgr.getDeviceAgent(ctx, simulatereq.Id); agent != nil {
 		res = agent.simulateAlarm(ctx, simulatereq)
-		log.Debugw("SimulateAlarm-result", log.Fields{"result": res})
+		log.Debugw("simulateAlarm-result", log.Fields{"result": res})
 	}
 	//TODO CLI always get successful response
 	sendResponse(ctx, ch, res)
@@ -1528,11 +1526,11 @@
 	sendResponse(ctx, ch, res)
 }
 
-// ChildDeviceLost  calls parent adapter to delete child device and all its references
-func (dMgr *DeviceManager) ChildDeviceLost(ctx context.Context, cDevice *voltha.Device) error {
-	log.Debugw("ChildDeviceLost", log.Fields{"deviceid": cDevice.Id})
-	if parentAgent := dMgr.getDeviceAgent(ctx, cDevice.ParentId); parentAgent != nil {
-		return parentAgent.ChildDeviceLost(ctx, cDevice)
+// childDeviceLost  calls parent adapter to delete child device and all its references
+func (dMgr *DeviceManager) ChildDeviceLost(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error {
+	log.Debugw("childDeviceLost", log.Fields{"device-id": curr.Id})
+	if parentAgent := dMgr.getDeviceAgent(ctx, curr.ParentId); parentAgent != nil {
+		return parentAgent.ChildDeviceLost(ctx, curr)
 	}
-	return status.Errorf(codes.NotFound, "%s", cDevice.Id)
+	return status.Errorf(codes.NotFound, "%s", curr.Id)
 }