[VOL-2164] Update rw-core to use the Async Kafka API

This commit consists of the following:

1) Process per-device requests in the Core in the order they are
received. If there are lots of requests on a given device then
there will be some latencies introduced due to ordering.  With
recent changes in the model, along with keeping the request lock
to a minimum, these latencies are reduced.  Testing did not
show any noticeable latencies.

2) Keep the request lock from the moment a request starts
processing to the moment that request is sent to kafka (when
applicable).  Adapter responses are received and processed
asynchronously. Therefore, an adapter can take all the time it
needs to process a transaction.  The Core still has a context
with timeout (configurable) to cater for cases where the adapter
does not return a response.

3) Adapter requests are processed to completion before sending a
response back to the adapter.  Previously, in some cases, a
separate go routine was created to process the request and a
successful response was sent to the adapter.  Now if the request
fails then the adapter will receive an error. The adapter
requests for a given device are therefore processed in the
order they are received.

4) Some changes are made when retrieving a handler to execute
a device state transition.  This was necessary as there was some
transition overlap found.

Update after multiple reviews.

Change-Id: I55a189efec1549a662f2d71e18e6eca9015a3a17
diff --git a/rw_core/core/adapter_proxy_test.go b/rw_core/core/adapter_proxy_test.go
index 3989142..36e5e51 100755
--- a/rw_core/core/adapter_proxy_test.go
+++ b/rw_core/core/adapter_proxy_test.go
@@ -18,19 +18,22 @@
 import (
 	"context"
 	"crypto/rand"
-	"testing"
-	"time"
-
+	"github.com/golang/protobuf/ptypes"
+	any2 "github.com/golang/protobuf/ptypes/any"
 	cm "github.com/opencord/voltha-go/rw_core/mocks"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"strings"
+	"testing"
+	"time"
 )
 
 const (
@@ -52,7 +55,7 @@
 		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
 	}
 	// Set the log level to Warning
-	log.SetAllLogLevel(2)
+	log.SetAllLogLevel(log.WarnLevel)
 
 	var err error
 
@@ -63,6 +66,7 @@
 	coreKafkaICProxy = kafka.NewInterContainerProxy(
 		kafka.MsgClient(kc),
 		kafka.DefaultTopic(&kafka.Topic{Name: coreName}))
+
 	if err = coreKafkaICProxy.Start(); err != nil {
 		log.Fatalw("Failure-starting-core-kafka-intercontainerProxy", log.Fields{"error": err})
 	}
@@ -78,6 +82,7 @@
 		kafka.MsgClient(kc),
 		kafka.DefaultTopic(&kafka.Topic{Name: adapterName}),
 		kafka.RequestHandlerInterface(adapterReqHandler))
+
 	if err = adapterKafkaICProxy.Start(); err != nil {
 		log.Fatalw("Failure-starting-adapter-kafka-intercontainerProxy", log.Fields{"error": err})
 	}
@@ -97,42 +102,62 @@
 	assert.NotNil(t, ap)
 }
 
+func waitForResponse(ctx context.Context, ch chan *kafka.RpcResponse) (*any2.Any, error) {
+	select {
+	case rpcResponse, ok := <-ch:
+		if !ok {
+			return nil, status.Error(codes.Aborted, "channel-closed")
+		} else if rpcResponse.Err != nil {
+			return nil, rpcResponse.Err
+		} else {
+			return rpcResponse.Reply, nil
+		}
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
 func testSimpleRequests(t *testing.T) {
-	type simpleRequest func(context.Context, *voltha.Device) error
+	type simpleRequest func(context.Context, *voltha.Device) (chan *kafka.RpcResponse, error)
 	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
 	simpleRequests := []simpleRequest{
-		ap.AdoptDevice,
-		ap.DisableDevice,
-		ap.RebootDevice,
-		ap.DeleteDevice,
-		ap.ReconcileDevice,
-		ap.ReEnableDevice,
+		ap.adoptDevice,
+		ap.disableDevice,
+		ap.rebootDevice,
+		ap.deleteDevice,
+		ap.reconcileDevice,
+		ap.reEnableDevice,
 	}
 	for _, f := range simpleRequests {
-		//Success
+		// Success
 		d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 		ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
-		err := f(ctx, d)
-		cancel()
+		rpcResponse, err := f(ctx, d)
 		assert.Nil(t, err)
+		_, err = waitForResponse(ctx, rpcResponse)
+		assert.Nil(t, err)
+		cancel()
 
 		//	Failure - invalid adapter
-		expectedError := status.Error(codes.Canceled, "context deadline exceeded")
+		expectedError := "context deadline exceeded"
 		d = &voltha.Device{Id: "deviceId", Adapter: "adapter_mock_1"}
 		ctx, cancel = context.WithTimeout(context.Background(), 20*time.Millisecond)
-		err = f(ctx, d)
+		rpcResponse, err = f(ctx, d)
+		assert.Nil(t, err)
+		_, err = waitForResponse(ctx, rpcResponse)
 		cancel()
 		assert.NotNil(t, err)
-		assert.Equal(t, expectedError.Error(), err.Error())
+		assert.True(t, strings.Contains(err.Error(), expectedError))
 
-		// Failure - short timeout
-		expectedError = status.Error(codes.Canceled, "context deadline exceeded")
+		// Failure -  timeout
 		d = &voltha.Device{Id: "deviceId", Adapter: adapterName}
 		ctx, cancel = context.WithTimeout(context.Background(), 100*time.Nanosecond)
-		err = f(ctx, d)
+		rpcResponse, err = f(ctx, d)
+		assert.Nil(t, err)
+		_, err = waitForResponse(ctx, rpcResponse)
 		cancel()
 		assert.NotNil(t, err)
-		assert.Equal(t, expectedError.Error(), err.Error())
+		assert.True(t, strings.Contains(err.Error(), expectedError))
 	}
 }
 
@@ -140,8 +165,13 @@
 	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
-	switchCap, err := ap.GetOfpDeviceInfo(ctx, d)
-	cancel()
+	defer cancel()
+	rpcResponse, err := ap.getOfpDeviceInfo(ctx, d)
+	assert.Nil(t, err)
+	response, err := waitForResponse(ctx, rpcResponse)
+	assert.Nil(t, err)
+	switchCap := &ic.SwitchCapability{}
+	err = ptypes.UnmarshalAny(response, switchCap)
 	assert.Nil(t, err)
 	assert.NotNil(t, switchCap)
 	expectedCap, _ := adapter.Get_ofp_device_info(d)
@@ -152,13 +182,18 @@
 	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
 	portNo := uint32(1)
-	portInfo, err := ap.GetOfpPortInfo(ctx, d, portNo)
-	cancel()
+	rpcResponse, err := ap.getOfpPortInfo(ctx, d, portNo)
 	assert.Nil(t, err)
-	assert.NotNil(t, portInfo)
+	response, err := waitForResponse(ctx, rpcResponse)
+	assert.Nil(t, err)
+	portCap := &ic.PortCapability{}
+	err = ptypes.UnmarshalAny(response, portCap)
+	assert.Nil(t, err)
+	assert.NotNil(t, portCap)
 	expectedPortInfo, _ := adapter.Get_ofp_port_info(d, int64(portNo))
-	assert.Equal(t, portInfo.String(), expectedPortInfo.String())
+	assert.Equal(t, portCap.String(), expectedPortInfo.String())
 }
 
 func testPacketOut(t *testing.T) {
@@ -167,18 +202,26 @@
 	outPort := uint32(1)
 	packet, err := getRandomBytes(50)
 	assert.Nil(t, err)
-	err = ap.packetOut(context.Background(), adapterName, d.Id, outPort, &of.OfpPacketOut{Data: packet})
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	defer cancel()
+	rpcResponse, err := ap.packetOut(ctx, adapterName, d.Id, outPort, &of.OfpPacketOut{Data: packet})
+	assert.Nil(t, err)
+	_, err = waitForResponse(ctx, rpcResponse)
 	assert.Nil(t, err)
 }
 
 func testFlowUpdates(t *testing.T) {
 	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
-	err := ap.UpdateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
+	_, err := ap.updateFlowsBulk(context.Background(), d, &voltha.Flows{}, &voltha.FlowGroups{}, &voltha.FlowMetadata{})
 	assert.Nil(t, err)
 	flowChanges := &voltha.FlowChanges{ToAdd: &voltha.Flows{Items: nil}, ToRemove: &voltha.Flows{Items: nil}}
 	groupChanges := &voltha.FlowGroupChanges{ToAdd: &voltha.FlowGroups{Items: nil}, ToRemove: &voltha.FlowGroups{Items: nil}, ToUpdate: &voltha.FlowGroups{Items: nil}}
-	err = ap.UpdateFlowsIncremental(context.Background(), d, flowChanges, groupChanges, &voltha.FlowMetadata{})
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	defer cancel()
+	rpcResponse, err := ap.updateFlowsIncremental(ctx, d, flowChanges, groupChanges, &voltha.FlowMetadata{})
+	assert.Nil(t, err)
+	_, err = waitForResponse(ctx, rpcResponse)
 	assert.Nil(t, err)
 }
 
@@ -186,8 +229,10 @@
 	ap := NewAdapterProxy(coreKafkaICProxy, coreName)
 	d := &voltha.Device{Id: "deviceId", Adapter: adapterName}
 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
-	err := ap.UpdatePmConfigs(ctx, d, &voltha.PmConfigs{})
-	cancel()
+	defer cancel()
+	rpcResponse, err := ap.updatePmConfigs(ctx, d, &voltha.PmConfigs{})
+	assert.Nil(t, err)
+	_, err = waitForResponse(ctx, rpcResponse)
 	assert.Nil(t, err)
 }