[VOL-1435] Initial submission for device management integration
tests.  This update also comprises of some fixes with kafka
consumer and random mac address generation.

Change-Id: I4f8081752af646c3ed218ab17a541abb1b70cf5c
diff --git a/adapters/common/utils.go b/adapters/common/utils.go
index 8cade8f..98468b0 100644
--- a/adapters/common/utils.go
+++ b/adapters/common/utils.go
@@ -23,7 +23,7 @@
 
 //GetRandomSerialNumber returns a serial number formatted as "HOST:PORT"
 func GetRandomSerialNumber() string {
-	rand.Seed(time.Now().Unix())
+	rand.Seed(time.Now().UnixNano())
 	return fmt.Sprintf("%d.%d.%d.%d:%d",
 		rand.Intn(255),
 		rand.Intn(255),
@@ -35,7 +35,7 @@
 
 //GetRandomMacAddress returns a random mac address
 func GetRandomMacAddress() string {
-	rand.Seed(time.Now().Unix())
+	rand.Seed(time.Now().UnixNano())
 	return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x",
 		rand.Intn(128),
 		rand.Intn(128),
diff --git a/adapters/simulated_olt/adaptercore/simulated_olt.go b/adapters/simulated_olt/adaptercore/simulated_olt.go
index aa6e248..2dd7c67 100644
--- a/adapters/simulated_olt/adaptercore/simulated_olt.go
+++ b/adapters/simulated_olt/adaptercore/simulated_olt.go
@@ -99,7 +99,7 @@
 func (so *SimulatedOLT) createDeviceTopic(device *voltha.Device) error {
 	log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
 	deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
-	if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic); err != nil {
+	if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, kafka.OffsetOldest); err != nil {
 		log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
 		return err
 	}
diff --git a/adapters/simulated_onu/adaptercore/simulated_onu.go b/adapters/simulated_onu/adaptercore/simulated_onu.go
index 1aefcad..2d43944 100644
--- a/adapters/simulated_onu/adaptercore/simulated_onu.go
+++ b/adapters/simulated_onu/adaptercore/simulated_onu.go
@@ -97,7 +97,7 @@
 func (so *SimulatedONU) createDeviceTopic(device *voltha.Device) error {
 	log.Infow("create-device-topic", log.Fields{"deviceId": device.Id})
 	deviceTopic := kafka.Topic{Name: so.kafkaICProxy.DefaultTopic.Name + "_" + device.Id}
-	if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic); err != nil {
+	if err := so.kafkaICProxy.SubscribeWithDefaultRequestHandler(deviceTopic, kafka.OffsetOldest); err != nil {
 		log.Infow("create-device-topic-failed", log.Fields{"deviceId": device.Id, "error": err})
 		return err
 	}
diff --git a/afrouter/afrouter/config.go b/afrouter/afrouter/config.go
index c495376..528a3f7 100644
--- a/afrouter/afrouter/config.go
+++ b/afrouter/afrouter/config.go
@@ -209,7 +209,7 @@
 						log.Debugf("Reference to router '%s' found for package '%s'", rPkg.Router, rPkg.Package)
 						conf.Servers[k].routers[rPkg.Package] = &conf.Routers[rk]
 					} else {
-						err := errors.New(fmt.Sprintf("Duplicate router '%s' defined for package '%s'",rPkg.Package))
+						err := errors.New(fmt.Sprintf("Duplicate router '%s' defined for package '%s'",rPkg.Router, rPkg.Package))
 						log.Error(err)
 						return err
 					}
diff --git a/kafka/client.go b/kafka/client.go
index 8cc1999..316a4a5 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -26,7 +26,13 @@
 )
 
 const (
+	OffsetNewest = -1
+	OffsetOldest = -2
+)
+
+const (
 	GroupIdKey = "groupId"
+	Offset = "offset"
 )
 
 const (
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index c0670d3..05d0af5 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -334,11 +334,11 @@
 
 // SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
 // when a message is received on a given topic.  So far there is only 1 target registered per microservice
-func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
+func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ic.InterContainerMessage
 	var err error
-	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key:Offset, Value:initialOffset}); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 		return err
 	}
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 35ede44..55c68a3 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -372,7 +372,7 @@
 				return nil, err
 			}
 		}
-		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, sarama.OffsetNewest); err != nil {
+		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
 			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
 			return nil, err
 		}
@@ -394,7 +394,7 @@
 			// Need to use a unique group Id per topic
 			groupId = sc.consumerGroupPrefix + topic.Name
 		}
-		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
+		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
 			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 			return nil, err
 		}
@@ -477,6 +477,16 @@
 	return ""
 }
 
+// getOffset returns the offset from the key-value args.
+func getOffset(kvArgs ...*KVArg) int64 {
+	for _, arg := range kvArgs {
+		if arg.Key == Offset {
+			return arg.Value.(int64)
+		}
+	}
+	return sarama.OffsetNewest
+}
+
 func (sc *SaramaClient) createClusterAdmin() error {
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	config := sarama.NewConfig()
@@ -684,7 +694,7 @@
 }
 
 // createGroupConsumer creates a consumers group
-func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
 	config := scc.NewConfig()
 	config.ClientID = uuid.New().String()
 	config.Group.Mode = scc.ConsumerModeMultiplex
@@ -692,7 +702,7 @@
 	//config.Group.Return.Notifications = false
 	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
 	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
-	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	config.Consumer.Offsets.Initial = initialOffset
 	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	brokers := []string{kafkaFullAddr}
@@ -713,7 +723,7 @@
 }
 
 // dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
-// topic via the unique channel each subsciber received during subscription
+// topic via the unique channel each subscriber received during subscription
 func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
 	// Need to go over all channels and publish messages to them - do we need to copy msg?
 	sc.lockTopicToConsumerChannelMap.Lock()
@@ -851,11 +861,11 @@
 
 // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 // for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
 	// TODO:  Replace this development partition consumers with a group consumers
 	var pConsumer *scc.Consumer
 	var err error
-	if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
+	if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
 		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index d21cfdb..9d029fc 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -77,7 +77,7 @@
 	}
 	// Use a device topic for the response as we are the only core handling requests for this device
 	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
-	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+	if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic, kafka.OffsetOldest); err != nil {
 		log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
 		return err
 	}
diff --git a/rw_core/core/grpc_nbi_api_handler_client_test.go b/rw_core/core/grpc_nbi_api_handler_client_test.go
deleted file mode 100644
index 58dcf13..0000000
--- a/rw_core/core/grpc_nbi_api_handler_client_test.go
+++ /dev/null
@@ -1,272 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package core
-
-import (
-	"context"
-	"github.com/golang/protobuf/ptypes/empty"
-	"github.com/opencord/voltha-go/common/log"
-	"github.com/opencord/voltha-go/protos/common"
-	"github.com/opencord/voltha-go/protos/openflow_13"
-	"github.com/opencord/voltha-go/protos/voltha"
-	"github.com/stretchr/testify/assert"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/metadata"
-	"google.golang.org/grpc/status"
-	"os"
-	"testing"
-)
-
-var conn *grpc.ClientConn
-var stub voltha.VolthaServiceClient
-var testMode string
-
-/*
-Prerequite:  These tests require the rw_core to run prior to executing these test cases.
-*/
-
-func setup() {
-	var err error
-
-	if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
-		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
-	}
-	conn, err = grpc.Dial("localhost:50057", grpc.WithInsecure())
-	if err != nil {
-		log.Fatalf("did not connect: %s", err)
-	}
-
-	stub = voltha.NewVolthaServiceClient(conn)
-	testMode = common.TestModeKeys_api_test.String()
-}
-
-func TestGetDevice(t *testing.T) {
-	var id common.ID
-	id.Id = "anyid"
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, err := stub.GetDevice(ctx, &id)
-	assert.Nil(t, response)
-	st, _ := status.FromError(err)
-	assert.Equal(t, id.Id, st.Message())
-	assert.Equal(t, codes.NotFound, st.Code())
-}
-
-func TestUpdateLogLevelError(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	level := voltha.Logging{PackageName: "github.com/opencord/voltha-go/rw_core/core", Level: common.LogLevel_ERROR}
-	response, err := stub.UpdateLogLevel(ctx, &level)
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Equal(t, &empty.Empty{}, response)
-	assert.Nil(t, err)
-}
-
-func TestGetVoltha(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, err := stub.GetVoltha(ctx, &empty.Empty{})
-	assert.Nil(t, response)
-	st, _ := status.FromError(err)
-	assert.Equal(t, "UnImplemented", st.Message())
-}
-
-func TestUpdateLogLevelDebug(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	level := voltha.Logging{PackageName: "github.com/opencord/voltha-go/rw_core/core", Level: common.LogLevel_DEBUG}
-	response, err := stub.UpdateLogLevel(ctx, &level)
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Equal(t, &empty.Empty{}, response)
-	assert.Nil(t, err)
-}
-
-func TestGetCoreInstance(t *testing.T) {
-	id := &voltha.ID{Id: "getCoreInstance"}
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, err := stub.GetCoreInstance(ctx, id)
-	assert.Nil(t, response)
-	st, _ := status.FromError(err)
-	assert.Equal(t, "UnImplemented", st.Message())
-}
-
-func TestGetLogicalDevice(t *testing.T) {
-	id := &voltha.ID{Id: "getLogicalDevice"}
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, err := stub.GetLogicalDevice(ctx, id)
-	assert.Nil(t, response)
-	st, _ := status.FromError(err)
-	assert.Equal(t, id.Id, st.Message())
-	assert.Equal(t, codes.NotFound, st.Code())
-}
-
-func TestGetLogicalDevicePort(t *testing.T) {
-	id := &voltha.LogicalPortId{Id: "GetLogicalDevicePort"}
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, err := stub.GetLogicalDevicePort(ctx, id)
-	assert.Nil(t, response)
-	st, _ := status.FromError(err)
-	assert.Equal(t, "UnImplemented", st.Message())
-}
-
-func TestListLogicalDevicePorts(t *testing.T) {
-	id := &voltha.ID{Id: "listLogicalDevicePorts"}
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, err := stub.ListLogicalDevicePorts(ctx, id)
-	assert.Nil(t, response)
-	st, _ := status.FromError(err)
-	assert.Equal(t, "UnImplemented", st.Message())
-}
-
-func TestListLogicalDeviceFlows(t *testing.T) {
-	id := &voltha.ID{Id: "ListLogicalDeviceFlows"}
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, err := stub.ListLogicalDeviceFlows(ctx, id)
-	assert.Nil(t, response)
-	st, _ := status.FromError(err)
-	assert.Equal(t, "UnImplemented", st.Message())
-}
-
-func TestListLogicalDeviceFlowGroups(t *testing.T) {
-	id := &voltha.ID{Id: "ListLogicalDeviceFlowGroups"}
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, err := stub.ListLogicalDeviceFlowGroups(ctx, id)
-	assert.Nil(t, response)
-	st, _ := status.FromError(err)
-	assert.Equal(t, "UnImplemented", st.Message())
-}
-
-func TestListDevices(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, _ := stub.ListDevices(ctx, &empty.Empty{})
-	assert.Equal(t, len(response.Items), 0)
-}
-
-func TestListAdapters(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, err := stub.ListAdapters(ctx, &empty.Empty{})
-	assert.Nil(t, response)
-	st, _ := status.FromError(err)
-	assert.Equal(t, "UnImplemented", st.Message())
-}
-
-func TestListLogicalDevices(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, _ := stub.ListLogicalDevices(ctx, &empty.Empty{})
-	assert.Equal(t, len(response.Items), 0)
-}
-
-func TestListCoreInstances(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	response, err := stub.ListCoreInstances(ctx, &empty.Empty{})
-	assert.Nil(t, response)
-	st, _ := status.FromError(err)
-	assert.Equal(t, "UnImplemented", st.Message())
-}
-
-func TestCreateDevice(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	device := &voltha.Device{Id: "newdevice"}
-	response, err := stub.CreateDevice(ctx, device)
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Equal(t, &voltha.Device{Id: "newdevice"}, response)
-	assert.Nil(t, err)
-}
-
-func TestEnableDevice(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	id := &voltha.ID{Id: "enabledevice"}
-	response, err := stub.EnableDevice(ctx, id)
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Equal(t, &empty.Empty{}, response)
-	assert.Nil(t, err)
-}
-
-func TestDisableDevice(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	id := &voltha.ID{Id: "DisableDevice"}
-	response, err := stub.DisableDevice(ctx, id)
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Equal(t, &empty.Empty{}, response)
-	assert.Nil(t, err)
-}
-
-func TestRebootDevice(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	id := &voltha.ID{Id: "RebootDevice"}
-	response, err := stub.RebootDevice(ctx, id)
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Equal(t, &empty.Empty{}, response)
-	assert.Nil(t, err)
-}
-
-func TestDeleteDevice(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	id := &voltha.ID{Id: "DeleteDevice"}
-	response, err := stub.DeleteDevice(ctx, id)
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Equal(t, &empty.Empty{}, response)
-	assert.Nil(t, err)
-}
-
-func TestEnableLogicalDevicePort(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	id := &voltha.LogicalPortId{Id: "EnableLogicalDevicePort"}
-	response, err := stub.EnableLogicalDevicePort(ctx, id)
-	if e, ok := status.FromError(err); ok {
-		log.Infow("response", log.Fields{"error": err, "errorcode": e.Code(), "msg": e.Message()})
-	}
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Equal(t, &empty.Empty{}, response)
-	assert.Nil(t, err)
-}
-
-func TestDisableLogicalDevicePort(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	id := &voltha.LogicalPortId{Id: "DisableLogicalDevicePort"}
-	response, err := stub.DisableLogicalDevicePort(ctx, id)
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Equal(t, &empty.Empty{}, response)
-	assert.Nil(t, err)
-}
-
-func TestUpdateLogicalDeviceFlowGroupTable(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	flow := &openflow_13.FlowGroupTableUpdate{Id: "UpdateLogicalDeviceFlowGroupTable"}
-	response, err := stub.UpdateLogicalDeviceFlowGroupTable(ctx, flow)
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Equal(t, &empty.Empty{}, response)
-	assert.Nil(t, err)
-}
-
-func TestGetImageDownloadStatus(t *testing.T) {
-	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
-	img := &voltha.ImageDownload{Id: "GetImageDownloadStatus"}
-	response, err := stub.GetImageDownloadStatus(ctx, img)
-	assert.Nil(t, response)
-	st, _ := status.FromError(err)
-	assert.Equal(t, "UnImplemented", st.Message())
-}
-
-// TODO: complete the remaining tests
-
-func shutdown() {
-	conn.Close()
-}
-
-func TestMain(m *testing.M) {
-	setup()
-	code := m.Run()
-	shutdown()
-	os.Exit(code)
-}
diff --git a/tests/core/grpc_nbi_api_handler_client_test.go b/tests/core/grpc_nbi_api_handler_client_test.go
new file mode 100644
index 0000000..420bf39
--- /dev/null
+++ b/tests/core/grpc_nbi_api_handler_client_test.go
@@ -0,0 +1,693 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+	"context"
+	"fmt"
+	com "github.com/opencord/voltha-go/adapters/common"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/protos/common"
+	"github.com/opencord/voltha-go/protos/openflow_13"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/metadata"
+	"google.golang.org/grpc/status"
+	"os"
+	"os/exec"
+	"strings"
+	"testing"
+	"time"
+)
+
+var conn *grpc.ClientConn
+var stub voltha.VolthaServiceClient
+var testMode string
+
+/*
+Prerequite:  These tests require the rw_core to run prior to executing these test cases.
+*/
+
+var devices map[string]*voltha.Device
+
+//func init() {
+//	log.AddPackage(log.JSON, log.ErrorLevel, nil)
+//	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+//	log.SetAllLogLevel(log.ErrorLevel)
+//
+//	//Start kafka and Etcd
+//	startKafkaEtcd()
+//	time.Sleep(10 * time.Second) //TODO: Find a better way to ascertain they are up
+//
+//	stub = setupGrpcConnection()
+//	stub = voltha.NewVolthaServiceClient(conn)
+//	devices = make(map[string]*voltha.Device)
+//}
+
+func setup() {
+	var err error
+
+	if _, err = log.AddPackage(log.JSON, log.WarnLevel, log.Fields{"instanceId": "testing"}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+	log.SetAllLogLevel(log.ErrorLevel)
+
+	//Start kafka and Etcd
+	startKafka()
+	startEtcd()
+	time.Sleep(10 * time.Second) //TODO: Find a better way to ascertain they are up
+
+	stub = setupGrpcConnection()
+	testMode = common.TestModeKeys_api_test.String()
+	devices = make(map[string]*voltha.Device)
+}
+
+func setupGrpcConnection() voltha.VolthaServiceClient {
+	grpcHostIP := os.Getenv("DOCKER_HOST_IP")
+	grpcPort := 50057
+	grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, grpcPort)
+	var err error
+	conn, err = grpc.Dial(grpcHost, grpc.WithInsecure())
+	if err != nil {
+		log.Fatalf("did not connect: %s", err)
+	}
+	return voltha.NewVolthaServiceClient(conn)
+}
+
+func clearAllDevices(clearMap bool) {
+	for key, _ := range devices {
+		ctx := context.Background()
+		response, err := stub.DeleteDevice(ctx, &voltha.ID{Id: key})
+		log.Infow("response", log.Fields{"res": response, "error": err})
+		if clearMap {
+			delete(devices, key)
+		}
+	}
+}
+
+// Verify if all ids are present in the global list of devices
+func hasAllIds(ids *voltha.IDs) bool {
+	if ids == nil && len(devices) == 0 {
+		return true
+	}
+	if ids == nil {
+		return false
+	}
+	for _, id := range ids.Items {
+		if _, exist := devices[id.Id]; !exist {
+			return false
+		}
+	}
+	return true
+}
+
+func startKafka() {
+	fmt.Println("Starting Kafka and Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func startEtcd() {
+	fmt.Println("Starting Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/docker-compose-etcd.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func stopKafka() {
+	fmt.Println("Stopping Kafka and Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/docker-compose-zk-kafka-test.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// ignore error - as this is mostly due network being left behind as its being used by other
+		// containers
+		log.Warn(err)
+	}
+}
+
+func stopEtcd() {
+	fmt.Println("Stopping Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/docker-compose-etcd.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// ignore error - as this is mostly due network being left behind as its being used by other
+		// containers
+		log.Warn(err)
+	}
+}
+
+func startCore() {
+	fmt.Println("Starting voltha core ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/rw_core.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func stopCore() {
+	fmt.Println("Stopping voltha core ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/rw_core.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// ignore error - as this is mostly due network being left behind as its being used by other
+		// containers
+		log.Warn(err)
+	}
+}
+
+func startSimulatedOLTAndONUAdapters() {
+	fmt.Println("Starting simulated OLT and ONU adapters ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/adapters-simulated.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func stopSimulatedOLTAndONUAdapters() {
+	fmt.Println("Stopping simulated OLT and ONU adapters ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/adapters-simulated.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// ignore error - as this is mostly due network being left behind as its being used by other
+		// containers
+		log.Warn(err)
+	}
+}
+
+
+func TestListDeviceIds(t *testing.T) {
+	fmt.Println("Testing list Devices Ids ...")
+	//0. Start kafka and Ectd
+	startKafka()
+	startEtcd()
+
+	//1. Start the core
+	startCore()
+
+	// Wait until it's up - TODO: find a better way to check
+	time.Sleep(10 * time.Second)
+
+	//2. Create a set of devices into the Core
+	for i := 0; i < 10; i++ {
+		ctx := context.Background()
+		device := &voltha.Device{Type: "simulated_olt"}
+		response, err := stub.CreateDevice(ctx, device)
+		log.Infow("response", log.Fields{"res": response, "error": err})
+		assert.NotNil(t, response)
+		assert.Nil(t, err)
+		devices[response.Id] = response
+	}
+
+	//3. Verify devices have been added correctly
+	ctx := context.Background()
+	response, err := stub.ListDeviceIds(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	assert.True(t, hasAllIds(response))
+
+	//4. Stop the core
+	stopCore()
+
+	//5. Stop Kafka and Etcd
+	stopKafka()
+	stopEtcd()
+}
+
+func TestReconcileDevices(t *testing.T) {
+	fmt.Println("Testing Reconcile Devices ...")
+
+	//0. Start kafka and Ectd
+	startKafka()
+	startEtcd()
+
+	//1. Start the core
+	startCore()
+
+	// Wait until it's up - TODO: find a better way to check
+	time.Sleep(10 * time.Second)
+
+	//2. Create a set of devices into the Core
+	for i := 0; i < 10; i++ {
+		ctx := context.Background()
+		device := &voltha.Device{Type: "simulated_olt"}
+		response, err := stub.CreateDevice(ctx, device)
+		log.Infow("response", log.Fields{"res": response, "error": err})
+		assert.Nil(t, err)
+		devices[response.Id] = response
+	}
+	//3. Verify devices have been added correctly
+	ctx := context.Background()
+	response, err := stub.ListDeviceIds(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	assert.True(t, hasAllIds(response))
+
+	//4. Stop the core and restart it. This will start the core with no data in memory but
+	// etcd will still have the data.
+	stopCore()
+	time.Sleep(5 * time.Second)
+	startCore()
+	time.Sleep(10 * time.Second)
+
+	//5. Setup the connection again
+	stub = setupGrpcConnection()
+
+	//6. Verify there are no devices left
+	ctx = context.Background()
+	response, err = stub.ListDeviceIds(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	assert.Equal(t, len(response.Items), 0)
+
+	//7. Invoke reconcile with all stored list
+	toRestore := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+	for key, _ := range devices {
+		toRestore.Items = append(toRestore.Items, &voltha.ID{Id: key})
+	}
+	ctx = context.Background()
+	_, err = stub.ReconcileDevices(ctx, toRestore)
+	assert.Nil(t, err)
+
+	//8. Verify all devices have been restored
+	ctx = context.Background()
+	response, err = stub.ListDeviceIds(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	assert.True(t, hasAllIds(response))
+
+	for _, id := range response.Items {
+		fmt.Println("id", id.Id)
+	}
+
+	//9. Store the core
+	stopCore()
+
+	//10. Stop Kafka and Etcd
+	stopKafka()
+	stopEtcd()
+}
+
+func TestDeviceManagement(t *testing.T) {
+	fmt.Println("Testing Device Management ...")
+
+	numberOfOLTDevices := 1
+
+	//0. Start kafka and Ectd
+	startKafka()
+	startEtcd()
+
+	//1. Start the core
+	startCore()
+
+	//2. Start the simulated adapters
+	startSimulatedOLTAndONUAdapters()
+
+	// Wait until the core and adapters sync up
+	time.Sleep(10 * time.Second)
+
+	//3. Create a set of devices into the Core
+	devices = make(map[string]*voltha.Device)
+	logicalDevices := make(map[string]*voltha.LogicalDevice)
+	for i := 0; i < numberOfOLTDevices; i++ {
+		ctx := context.Background()
+		randomMacAddress := strings.ToUpper(com.GetRandomMacAddress())
+		device := &voltha.Device{Type: "simulated_olt", MacAddress:randomMacAddress}
+		response, err := stub.CreateDevice(ctx, device)
+		log.Infow("response", log.Fields{"res": response, "error": err})
+		assert.Nil(t, err)
+		devices[response.Id] = response
+	}
+
+	//4. Enable all the devices
+	for id, _ := range devices {
+		ctx := context.Background()
+		response, err := stub.EnableDevice(ctx, &common.ID{Id:id})
+		log.Infow("response", log.Fields{"res": response, "error": err})
+		assert.Nil(t, err)
+	}
+
+	// Wait until all devices have been enabled
+	if numberOfOLTDevices < 5 {
+		time.Sleep(3 * time.Second)
+	} else if numberOfOLTDevices < 20 {
+		time.Sleep(20 * time.Second)
+	} else {
+		time.Sleep(30 * time.Second)
+	}
+	//time.Sleep(1 * time.Second * time.Duration(numberOfDevices))
+
+	//5. Verify that all devices are in enabled state
+	ctx := context.Background()
+	response, err := stub.ListDevices(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	assert.Equal(t, len(devices)*2, len(response.Items))
+	for _, d := range (response.Items) {
+		devices[d.Id] = d
+		assert.Equal(t, d.AdminState, voltha.AdminState_ENABLED)
+	}
+
+	//6. Get the logical devices
+	ctx = context.Background()
+	lresponse, lerr := stub.ListLogicalDevices(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": lerr})
+	assert.Nil(t, lerr)
+	assert.Equal(t, numberOfOLTDevices, len(lresponse.Items))
+	for _, ld := range (lresponse.Items) {
+		logicalDevices[ld.Id] = ld
+		// Ensure each logical device have two ports
+		assert.Equal(t, 2, len(ld.Ports))
+	}
+
+	//7. Disable all ONUs & check status & check logical device
+	for id, d := range devices {
+		ctx := context.Background()
+		if d.Type == "simulated_onu" {
+			response, err := stub.DisableDevice(ctx, &common.ID{Id:id})
+			log.Infow("response", log.Fields{"res": response, "error": err})
+			assert.Nil(t, err)
+		}
+	}
+
+	// Wait for all the changes to be populated
+	time.Sleep(3 * time.Second)
+
+	ctx = context.Background()
+	response, err = stub.ListDevices(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	assert.Equal(t, len(devices), len(response.Items))
+	for _, d := range (response.Items) {
+		if d.Type == "simulated_onu" {
+			assert.Equal(t, d.AdminState, voltha.AdminState_DISABLED)
+			devices[d.Id] = d
+		} else {
+			assert.Equal(t, d.AdminState, voltha.AdminState_ENABLED)
+		}
+	}
+
+	ctx = context.Background()
+	lresponse, lerr = stub.ListLogicalDevices(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": lerr})
+	assert.Nil(t, lerr)
+	assert.Equal(t, numberOfOLTDevices, len(lresponse.Items))
+	for _, ld := range (lresponse.Items) {
+		logicalDevices[ld.Id] = ld
+		// Ensure each logical device have one port - only olt port
+		assert.Equal(t, 1, len(ld.Ports))
+	}
+
+	//8. Enable all ONUs & check status & check logical device
+	for id, d := range devices {
+		ctx := context.Background()
+		if d.Type == "simulated_onu" {
+			response, err := stub.EnableDevice(ctx, &common.ID{Id:id})
+			log.Infow("response", log.Fields{"res": response, "error": err})
+			assert.Nil(t, err)
+		}
+	}
+
+	// Wait for all the changes to be populated
+	time.Sleep(3 * time.Second)
+
+	ctx = context.Background()
+	response, err = stub.ListDevices(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	assert.Equal(t, len(devices), len(response.Items))
+	for _, d := range (response.Items) {
+		assert.Equal(t, d.AdminState, voltha.AdminState_ENABLED)
+		devices[d.Id] = d
+	}
+
+	//ctx = context.Background()
+	//lresponse, lerr = stub.ListLogicalDevices(ctx, &empty.Empty{})
+	//log.Infow("response", log.Fields{"res": response, "error": lerr})
+	//assert.Nil(t, lerr)
+	//assert.Equal(t, numberOfOLTDevices, len(lresponse.Items))
+	//for _, ld := range (lresponse.Items) {
+	//	logicalDevices[ld.Id] = ld
+	//	// Ensure each logical device have two ports
+	//	assert.Equal(t, 2, len(ld.Ports))
+	//}
+
+	//9. Disable all OLTs & check status & check logical device
+
+	//10. Enable all OLTs & Enable all ONUs & check status & check logical device
+
+	//11. Disable all OLTs & check status & check logical device
+
+	//12. Delete all Devices & check status & check logical device
+
+	////13. Store simulated adapters
+	//stopSimulatedOLTAndONUAdapters()
+	//
+	////14. Store the core
+	//stopCore()
+	//
+	////15. Stop Kafka and Etcd
+	//stopKafka()
+	//stopEtcd()
+}
+
+
+func TestGetDevice(t *testing.T) {
+	var id common.ID
+	id.Id = "anyid"
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, err := stub.GetDevice(ctx, &id)
+	assert.Nil(t, response)
+	st, _ := status.FromError(err)
+	assert.Equal(t, id.Id, st.Message())
+	assert.Equal(t, codes.NotFound, st.Code())
+}
+
+func TestUpdateLogLevelError(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	level := voltha.Logging{PackageName: "github.com/opencord/voltha-go/rw_core/core", Level: common.LogLevel_ERROR}
+	response, err := stub.UpdateLogLevel(ctx, &level)
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Equal(t, &empty.Empty{}, response)
+	assert.Nil(t, err)
+}
+
+func TestGetVoltha(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, err := stub.GetVoltha(ctx, &empty.Empty{})
+	assert.Nil(t, response)
+	st, _ := status.FromError(err)
+	assert.Equal(t, "UnImplemented", st.Message())
+}
+
+func TestUpdateLogLevelDebug(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	level := voltha.Logging{PackageName: "github.com/opencord/voltha-go/rw_core/core", Level: common.LogLevel_DEBUG}
+	response, err := stub.UpdateLogLevel(ctx, &level)
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Equal(t, &empty.Empty{}, response)
+	assert.Nil(t, err)
+}
+
+func TestGetCoreInstance(t *testing.T) {
+	id := &voltha.ID{Id: "getCoreInstance"}
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, err := stub.GetCoreInstance(ctx, id)
+	assert.Nil(t, response)
+	st, _ := status.FromError(err)
+	assert.Equal(t, "UnImplemented", st.Message())
+}
+
+func TestGetLogicalDevice(t *testing.T) {
+	id := &voltha.ID{Id: "getLogicalDevice"}
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, err := stub.GetLogicalDevice(ctx, id)
+	assert.Nil(t, response)
+	st, _ := status.FromError(err)
+	assert.Equal(t, id.Id, st.Message())
+	assert.Equal(t, codes.NotFound, st.Code())
+}
+
+func TestGetLogicalDevicePort(t *testing.T) {
+	id := &voltha.LogicalPortId{Id: "GetLogicalDevicePort"}
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, err := stub.GetLogicalDevicePort(ctx, id)
+	assert.Nil(t, response)
+	st, _ := status.FromError(err)
+	assert.Equal(t, "UnImplemented", st.Message())
+}
+
+func TestListLogicalDevicePorts(t *testing.T) {
+	id := &voltha.ID{Id: "listLogicalDevicePorts"}
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, err := stub.ListLogicalDevicePorts(ctx, id)
+	assert.Nil(t, response)
+	st, _ := status.FromError(err)
+	assert.Equal(t, "UnImplemented", st.Message())
+}
+
+func TestListLogicalDeviceFlows(t *testing.T) {
+	id := &voltha.ID{Id: "ListLogicalDeviceFlows"}
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, err := stub.ListLogicalDeviceFlows(ctx, id)
+	assert.Nil(t, response)
+	st, _ := status.FromError(err)
+	assert.Equal(t, "UnImplemented", st.Message())
+}
+
+func TestListLogicalDeviceFlowGroups(t *testing.T) {
+	id := &voltha.ID{Id: "ListLogicalDeviceFlowGroups"}
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, err := stub.ListLogicalDeviceFlowGroups(ctx, id)
+	assert.Nil(t, response)
+	st, _ := status.FromError(err)
+	assert.Equal(t, "UnImplemented", st.Message())
+}
+
+func TestListDevices(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, _ := stub.ListDevices(ctx, &empty.Empty{})
+	assert.Equal(t, len(response.Items), 0)
+}
+
+func TestListAdapters(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, err := stub.ListAdapters(ctx, &empty.Empty{})
+	assert.Nil(t, response)
+	st, _ := status.FromError(err)
+	assert.Equal(t, "UnImplemented", st.Message())
+}
+
+func TestListLogicalDevices(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, _ := stub.ListLogicalDevices(ctx, &empty.Empty{})
+	assert.Equal(t, len(response.Items), 0)
+}
+
+func TestListCoreInstances(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	response, err := stub.ListCoreInstances(ctx, &empty.Empty{})
+	assert.Nil(t, response)
+	st, _ := status.FromError(err)
+	assert.Equal(t, "UnImplemented", st.Message())
+}
+
+func TestCreateDevice(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	device := &voltha.Device{Id: "newdevice"}
+	response, err := stub.CreateDevice(ctx, device)
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Equal(t, &voltha.Device{Id: "newdevice"}, response)
+	assert.Nil(t, err)
+}
+
+func TestEnableDevice(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	id := &voltha.ID{Id: "enabledevice"}
+	response, err := stub.EnableDevice(ctx, id)
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Equal(t, &empty.Empty{}, response)
+	assert.Nil(t, err)
+}
+
+func TestDisableDevice(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	id := &voltha.ID{Id: "DisableDevice"}
+	response, err := stub.DisableDevice(ctx, id)
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Equal(t, &empty.Empty{}, response)
+	assert.Nil(t, err)
+}
+
+func TestRebootDevice(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	id := &voltha.ID{Id: "RebootDevice"}
+	response, err := stub.RebootDevice(ctx, id)
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Equal(t, &empty.Empty{}, response)
+	assert.Nil(t, err)
+}
+
+func TestDeleteDevice(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	id := &voltha.ID{Id: "DeleteDevice"}
+	response, err := stub.DeleteDevice(ctx, id)
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Equal(t, &empty.Empty{}, response)
+	assert.Nil(t, err)
+}
+
+func TestEnableLogicalDevicePort(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	id := &voltha.LogicalPortId{Id: "EnableLogicalDevicePort"}
+	response, err := stub.EnableLogicalDevicePort(ctx, id)
+	if e, ok := status.FromError(err); ok {
+		log.Infow("response", log.Fields{"error": err, "errorcode": e.Code(), "msg": e.Message()})
+	}
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Equal(t, &empty.Empty{}, response)
+	assert.Nil(t, err)
+}
+
+func TestDisableLogicalDevicePort(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	id := &voltha.LogicalPortId{Id: "DisableLogicalDevicePort"}
+	response, err := stub.DisableLogicalDevicePort(ctx, id)
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Equal(t, &empty.Empty{}, response)
+	assert.Nil(t, err)
+}
+
+func TestUpdateLogicalDeviceFlowGroupTable(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	flow := &openflow_13.FlowGroupTableUpdate{Id: "UpdateLogicalDeviceFlowGroupTable"}
+	response, err := stub.UpdateLogicalDeviceFlowGroupTable(ctx, flow)
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Equal(t, &empty.Empty{}, response)
+	assert.Nil(t, err)
+}
+
+func TestGetImageDownloadStatus(t *testing.T) {
+	ctx := metadata.NewOutgoingContext(context.Background(), metadata.Pairs(testMode, "true"))
+	img := &voltha.ImageDownload{Id: "GetImageDownloadStatus"}
+	response, err := stub.GetImageDownloadStatus(ctx, img)
+	assert.Nil(t, response)
+	st, _ := status.FromError(err)
+	assert.Equal(t, "UnImplemented", st.Message())
+}
+
+// TODO: complete the remaining tests
+
+func shutdown() {
+	conn.Close()
+}
+
+func TestMain(m *testing.M) {
+	setup()
+	code := m.Run()
+	shutdown()
+	os.Exit(code)
+}
diff --git a/tests/core/nbi_test.go b/tests/core/nbi_test.go
deleted file mode 100644
index 6e1b531..0000000
--- a/tests/core/nbi_test.go
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package core
-
-import (
-	"context"
-	"fmt"
-	"github.com/golang/protobuf/ptypes/empty"
-	"github.com/opencord/voltha-go/common/log"
-	"github.com/opencord/voltha-go/protos/voltha"
-	"github.com/stretchr/testify/assert"
-	"google.golang.org/grpc"
-	"os"
-	"os/exec"
-	"testing"
-	"time"
-)
-
-var conn *grpc.ClientConn
-var stub voltha.VolthaServiceClient
-var devices map[string]*voltha.Device
-
-func init() {
-	log.AddPackage(log.JSON, log.ErrorLevel, nil)
-	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
-	log.SetAllLogLevel(log.ErrorLevel)
-
-	//Start kafka and Etcd
-	startKafkaEtcd()
-	time.Sleep(10 * time.Second) //TODO: Find a better way to ascertain they are up
-
-	stub = setupGrpcConnection()
-	stub = voltha.NewVolthaServiceClient(conn)
-	devices = make(map[string]*voltha.Device)
-}
-
-func setupGrpcConnection() voltha.VolthaServiceClient {
-	grpcHostIP := os.Getenv("DOCKER_HOST_IP")
-	grpcPort := 50057
-	grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, grpcPort)
-	var err error
-	conn, err = grpc.Dial(grpcHost, grpc.WithInsecure())
-	if err != nil {
-		log.Fatalf("did not connect: %s", err)
-	}
-	return voltha.NewVolthaServiceClient(conn)
-}
-
-func clearAllDevices(clearMap bool) {
-	for key, _ := range devices {
-		ctx := context.Background()
-		response, err := stub.DeleteDevice(ctx, &voltha.ID{Id: key})
-		log.Infow("response", log.Fields{"res": response, "error": err})
-		if clearMap {
-			delete(devices, key)
-		}
-	}
-}
-
-// Verify if all ids are present in the global list of devices
-func hasAllIds(ids *voltha.IDs) bool {
-	if ids == nil && len(devices) == 0 {
-		return true
-	}
-	if ids == nil {
-		return false
-	}
-	for _, id := range ids.Items {
-		if _, exist := devices[id.Id]; !exist {
-			return false
-		}
-	}
-	return true
-}
-
-func startKafkaEtcd() {
-	fmt.Println("Starting Kafka and Etcd ...")
-	command := "docker-compose"
-	cmd := exec.Command(command, "-f", "../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
-	if err := cmd.Run(); err != nil {
-		log.Fatal(err)
-	}
-	cmd = exec.Command(command, "-f", "../../compose/docker-compose-etcd.yml", "up", "-d")
-	if err := cmd.Run(); err != nil {
-		log.Fatal(err)
-	}
-}
-
-func stopKafkaEtcd() {
-	fmt.Println("Stopping Kafka and Etcd ...")
-	command := "docker-compose"
-	cmd := exec.Command(command, "-f", "../../compose/docker-compose-zk-kafka-test.yml", "down")
-	if err := cmd.Run(); err != nil {
-		// ignore error - as this is mostly due network being left behind as its being used by other
-		// containers
-		log.Warn(err)
-	}
-	cmd = exec.Command(command, "-f", "../../compose/docker-compose-etcd.yml", "down")
-	if err := cmd.Run(); err != nil {
-		// ignore error - as this is mostly due network being left behind as its being used by other
-		// containers
-		log.Warn(err)
-	}
-}
-
-func startCore() {
-	fmt.Println("Starting voltha core ...")
-	command := "docker-compose"
-	cmd := exec.Command(command, "-f", "../../compose/rw_core.yml", "up", "-d")
-	if err := cmd.Run(); err != nil {
-		log.Fatal(err)
-	}
-}
-
-func stopCore() {
-	fmt.Println("Stopping voltha core ...")
-	command := "docker-compose"
-	cmd := exec.Command(command, "-f", "../../compose/rw_core.yml", "down")
-	if err := cmd.Run(); err != nil {
-		// ignore error - as this is mostly due network being left behind as its being used by other
-		// containers
-		log.Warn(err)
-	}
-}
-
-func TestListDeviceIds(t *testing.T) {
-	//1. Start the core
-	startCore()
-
-	// Wait until it's up - TODO: find a better way to check
-	time.Sleep(10 * time.Second)
-
-	//2. Create a set of devices into the Core
-	for i := 0; i < 10; i++ {
-		ctx := context.Background()
-		device := &voltha.Device{Type: "simulated_olt"}
-		response, err := stub.CreateDevice(ctx, device)
-		log.Infow("response", log.Fields{"res": response, "error": err})
-		assert.Nil(t, err)
-		devices[response.Id] = response
-	}
-
-	//3. Verify devices have been added correctly
-	ctx := context.Background()
-	response, err := stub.ListDeviceIds(ctx, &empty.Empty{})
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Nil(t, err)
-	assert.True(t, hasAllIds(response))
-
-	//	4. Stop the core
-	stopCore()
-}
-
-func TestReconcileDevices(t *testing.T) {
-	//1. Start the core
-	startCore()
-
-	// Wait until it's up - TODO: find a better way to check
-	time.Sleep(10 * time.Second)
-
-	//2. Create a set of devices into the Core
-	for i := 0; i < 10; i++ {
-		ctx := context.Background()
-		device := &voltha.Device{Type: "simulated_olt"}
-		response, err := stub.CreateDevice(ctx, device)
-		log.Infow("response", log.Fields{"res": response, "error": err})
-		assert.Nil(t, err)
-		devices[response.Id] = response
-	}
-	//3. Verify devices have been added correctly
-	ctx := context.Background()
-	response, err := stub.ListDeviceIds(ctx, &empty.Empty{})
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Nil(t, err)
-	assert.True(t, hasAllIds(response))
-
-	//4. Stop the core and restart it. This will start the core with no data in memory but
-	// etcd will still have the data.
-	stopCore()
-	time.Sleep(5 * time.Second)
-	startCore()
-	time.Sleep(10 * time.Second)
-
-	//5. Setup the connection again
-	stub = setupGrpcConnection()
-
-	//6. Verify there are no devices left
-	ctx = context.Background()
-	response, err = stub.ListDeviceIds(ctx, &empty.Empty{})
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Nil(t, err)
-	assert.Equal(t, len(response.Items), 0)
-
-	//7. Invoke reconcile with all stored list
-	toRestore := &voltha.IDs{Items: make([]*voltha.ID, 0)}
-	for key, _ := range devices {
-		toRestore.Items = append(toRestore.Items, &voltha.ID{Id: key})
-	}
-	ctx = context.Background()
-	_, err = stub.ReconcileDevices(ctx, toRestore)
-	assert.Nil(t, err)
-
-	//8. Verify all devices have been restored
-	ctx = context.Background()
-	response, err = stub.ListDeviceIds(ctx, &empty.Empty{})
-	log.Infow("response", log.Fields{"res": response, "error": err})
-	assert.Nil(t, err)
-	assert.True(t, hasAllIds(response))
-
-	for _, id := range response.Items {
-		fmt.Println("id", id.Id)
-	}
-
-	//9. Store the core
-	stopCore()
-}
-
-func shutdown() {
-	conn.Close()
-	stopKafkaEtcd()
-}
-
-func TestMain(m *testing.M) {
-	code := m.Run()
-	shutdown()
-	os.Exit(code)
-}