This commit consists of adding two new northbound APIs to the Core
to be used mostly by the Affinity Router: ListDeviceIds retrieves the
ids of devices present in a Core memory; ReconcileDevices ia an API
the Affinity router can use to push a list of device ids to a Core
for the latter to load and reconcile the devices in memory (used
mostly of a core restart).

Change-Id: I0d292054e09a099ad8be7669fbc3fe3ba15a5579
diff --git a/common/core/northbound/grpc/default_api_handler.go b/common/core/northbound/grpc/default_api_handler.go
index f935462..0cc76c5 100644
--- a/common/core/northbound/grpc/default_api_handler.go
+++ b/common/core/northbound/grpc/default_api_handler.go
@@ -117,6 +117,19 @@
 	return nil, errors.New("UnImplemented")
 }
 
+func (handler *DefaultAPIHandler) ListDeviceIds(ctx context.Context, empty *empty.Empty) (*voltha.IDs, error) {
+	log.Debug("ListDeviceIDs-request")
+	return nil, errors.New("UnImplemented")
+}
+
+func (handler *DefaultAPIHandler) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
+	if ids != nil {
+		log.Debugw("ReconcileDevices-request", log.Fields{"length": len(ids.Items)})
+		return nil, errors.New("UnImplemented")
+	}
+	return nil, errors.New("ids-null")
+}
+
 func (handler *DefaultAPIHandler) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
 	log.Debugw("GetDevice-request", log.Fields{"id": id})
 	return nil, errors.New("UnImplemented")
diff --git a/protos/common.proto b/protos/common.proto
index d1234d5..b1c13ae 100644
--- a/protos/common.proto
+++ b/protos/common.proto
@@ -11,6 +11,11 @@
     string id = 1;
 }
 
+// Represents a list of IDs
+message IDs {
+    repeated ID items = 1;
+}
+
 enum TestModeKeys {
     api_test=0;
 }
diff --git a/protos/voltha.proto b/protos/voltha.proto
index e03442a..f2f885f 100644
--- a/protos/voltha.proto
+++ b/protos/voltha.proto
@@ -269,6 +269,23 @@
         option (voltha.yang_xml_tag).xml_tag = 'devices';
     }
 
+    // List all physical devices IDs controlled by the Voltha cluster
+    rpc ListDeviceIds(google.protobuf.Empty) returns(IDs) {
+        option (google.api.http) = {
+            get: "/api/v1/deviceids"
+        };
+        option (voltha.yang_xml_tag).xml_tag = 'id';
+        option (voltha.yang_xml_tag).list_items_name = 'items';
+    }
+
+    // Request to a voltha Core to reconcile a set of devices based on their IDs
+    rpc ReconcileDevices(IDs) returns(google.protobuf.Empty) {
+        option (google.api.http) = {
+            post: "/api/v1/deviceids"
+            body: "*"
+        };
+    }
+
     // Get more information on a given physical device
     rpc GetDevice(ID) returns(Device) {
         option (google.api.http) = {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c42a64e..484ff35 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -60,15 +60,15 @@
 	// Do not call NewBackend constructor; it creates its own KV client
 	// Commented the backend for now until the issue between the model and the KV store
 	// is resolved.
-	//backend := model.Backend{
-	//	MsgClient:     kvClient,
-	//	StoreType:  cf.KVStoreType,
-	//	Host:       cf.KVStoreHost,
-	//	Port:       cf.KVStorePort,
-	//	Timeout:    cf.KVStoreTimeout,
-	//	PathPrefix: "service/voltha"}
-	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
-	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
+	backend := model.Backend{
+		Client:     kvClient,
+		StoreType:  cf.KVStoreType,
+		Host:       cf.KVStoreHost,
+		Port:       cf.KVStorePort,
+		Timeout:    cf.KVStoreTimeout,
+		PathPrefix: "service/voltha"}
+	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
+	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
 	core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
 	core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
 	return &core
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 9e8710b..13e0adf 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -285,18 +285,15 @@
 			agent.lockDevice.Unlock()
 			return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
 		}
-		// Send the request to an Adapter and wait for a response
-		if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
-			log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
-			agent.lockDevice.Unlock()
-			return err
+		if device.AdminState != voltha.AdminState_PREPROVISIONED {
+			// Send the request to an Adapter and wait for a response
+			if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
+				log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+				agent.lockDevice.Unlock()
+				return err
+			}
 		}
-		//	Set the device Admin state to DELETED in order to trigger the callback to delete
-		// child devices, if any
-		// Received an Ack (no error found above).  Now update the device in the model to the expected state
-		cloned := proto.Clone(device).(*voltha.Device)
-		cloned.AdminState = voltha.AdminState_DELETED
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		if removed := agent.clusterDataProxy.Remove("/devices/"+agent.deviceId, ""); removed == nil {
 			agent.lockDevice.Unlock()
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index e50c035..3f27e10 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -104,6 +104,16 @@
 	return nil
 }
 
+func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
+	dMgr.lockDeviceAgentsMap.Lock()
+	defer dMgr.lockDeviceAgentsMap.Unlock()
+	result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+	for key, _ := range dMgr.deviceAgents {
+		result.Items = append(result.Items, &voltha.ID{Id: key})
+	}
+	return result
+}
+
 func (dMgr *DeviceManager) createDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
 	log.Debugw("createDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
 
@@ -202,19 +212,46 @@
 	return result, nil
 }
 
-//func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
-//	log.Debug("ListDevices")
-//	result := &voltha.Devices{}
-//	dMgr.lockDeviceAgentsMap.Lock()
-//	defer dMgr.lockDeviceAgentsMap.Unlock()
-//	for _, agent := range dMgr.deviceAgents {
-//		if device, err := agent.getDevice(); err == nil {
-//			//cloned := proto.Clone(device).(*voltha.Device)
-//			result.Items = append(result.Items, device)
-//		}
-//	}
-//	return result, nil
-//}
+// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
+func (dMgr *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
+	log.Debug("ListDeviceIDs")
+	// Report only device IDs that are in the device agent map
+	return dMgr.listDeviceIdsFromMap(), nil
+}
+
+//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
+func (dMgr *DeviceManager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
+	log.Debug("ReconcileDevices")
+	var res interface{}
+	if ids != nil {
+		toReconcile := len(ids.Items)
+		reconciled := 0
+		for _, id := range ids.Items {
+			//	 Act on the device only if its not present in the agent map
+			if agent := dMgr.getDeviceAgent(id.Id); agent == nil {
+				//	Device Id not in memory
+				log.Debugw("reconciling-device", log.Fields{"id": id.Id})
+				// Load device from model
+				if device := dMgr.clusterDataProxy.Get("/devices/"+id.Id, 0, false, ""); device != nil {
+					agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+					dMgr.addDeviceAgentToMap(agent)
+					agent.start(nil)
+					reconciled += 1
+				} else {
+					log.Warnw("device-inexistent", log.Fields{"id": id.Id})
+				}
+			} else {
+				reconciled += 1
+			}
+		}
+		if toReconcile != reconciled {
+			res = status.Errorf(codes.DataLoss, "less-device-reconciled:%d/%d", reconciled, toReconcile)
+		}
+	} else {
+		res = status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
+	}
+	sendResponse(ctx, ch, res)
+}
 
 func (dMgr *DeviceManager) updateDevice(device *voltha.Device) error {
 	log.Debugw("updateDevice", log.Fields{"deviceid": device.Id, "device": device})
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 9b7fac7..fa6d0ca 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -168,6 +168,29 @@
 	return handler.deviceMgr.ListDevices()
 }
 
+// ListDeviceIds returns the list of device ids managed by a voltha core
+func (handler *APIHandler) ListDeviceIds(ctx context.Context, empty *empty.Empty) (*voltha.IDs, error) {
+	log.Debug("ListDeviceIDs")
+	if isTestMode(ctx) {
+		out := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+		return out, nil
+	}
+	return handler.deviceMgr.ListDeviceIds()
+}
+
+//ReconcileDevices is a request to a voltha core to managed a list of devices  based on their IDs
+func (handler *APIHandler) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
+	log.Debug("ReconcileDevices")
+	if isTestMode(ctx) {
+		out := new(empty.Empty)
+		return out, nil
+	}
+	ch := make(chan interface{})
+	defer close(ch)
+	go handler.deviceMgr.ReconcileDevices(ctx, ids, ch)
+	return waitForNilResponseOnSuccess(ctx, ch)
+}
+
 // GetLogicalDevice must be implemented in the read-only containers - should it also be implemented here?
 func (handler *APIHandler) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
 	log.Debugw("GetLogicalDevice-request", log.Fields{"id": id})
diff --git a/tests/core/nbi_test.go b/tests/core/nbi_test.go
new file mode 100644
index 0000000..6e1b531
--- /dev/null
+++ b/tests/core/nbi_test.go
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+	"context"
+	"fmt"
+	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-go/common/log"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/stretchr/testify/assert"
+	"google.golang.org/grpc"
+	"os"
+	"os/exec"
+	"testing"
+	"time"
+)
+
+var conn *grpc.ClientConn
+var stub voltha.VolthaServiceClient
+var devices map[string]*voltha.Device
+
+func init() {
+	log.AddPackage(log.JSON, log.ErrorLevel, nil)
+	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+	log.SetAllLogLevel(log.ErrorLevel)
+
+	//Start kafka and Etcd
+	startKafkaEtcd()
+	time.Sleep(10 * time.Second) //TODO: Find a better way to ascertain they are up
+
+	stub = setupGrpcConnection()
+	stub = voltha.NewVolthaServiceClient(conn)
+	devices = make(map[string]*voltha.Device)
+}
+
+func setupGrpcConnection() voltha.VolthaServiceClient {
+	grpcHostIP := os.Getenv("DOCKER_HOST_IP")
+	grpcPort := 50057
+	grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, grpcPort)
+	var err error
+	conn, err = grpc.Dial(grpcHost, grpc.WithInsecure())
+	if err != nil {
+		log.Fatalf("did not connect: %s", err)
+	}
+	return voltha.NewVolthaServiceClient(conn)
+}
+
+func clearAllDevices(clearMap bool) {
+	for key, _ := range devices {
+		ctx := context.Background()
+		response, err := stub.DeleteDevice(ctx, &voltha.ID{Id: key})
+		log.Infow("response", log.Fields{"res": response, "error": err})
+		if clearMap {
+			delete(devices, key)
+		}
+	}
+}
+
+// Verify if all ids are present in the global list of devices
+func hasAllIds(ids *voltha.IDs) bool {
+	if ids == nil && len(devices) == 0 {
+		return true
+	}
+	if ids == nil {
+		return false
+	}
+	for _, id := range ids.Items {
+		if _, exist := devices[id.Id]; !exist {
+			return false
+		}
+	}
+	return true
+}
+
+func startKafkaEtcd() {
+	fmt.Println("Starting Kafka and Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+	cmd = exec.Command(command, "-f", "../../compose/docker-compose-etcd.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func stopKafkaEtcd() {
+	fmt.Println("Stopping Kafka and Etcd ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/docker-compose-zk-kafka-test.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// ignore error - as this is mostly due network being left behind as its being used by other
+		// containers
+		log.Warn(err)
+	}
+	cmd = exec.Command(command, "-f", "../../compose/docker-compose-etcd.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// ignore error - as this is mostly due network being left behind as its being used by other
+		// containers
+		log.Warn(err)
+	}
+}
+
+func startCore() {
+	fmt.Println("Starting voltha core ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/rw_core.yml", "up", "-d")
+	if err := cmd.Run(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func stopCore() {
+	fmt.Println("Stopping voltha core ...")
+	command := "docker-compose"
+	cmd := exec.Command(command, "-f", "../../compose/rw_core.yml", "down")
+	if err := cmd.Run(); err != nil {
+		// ignore error - as this is mostly due network being left behind as its being used by other
+		// containers
+		log.Warn(err)
+	}
+}
+
+func TestListDeviceIds(t *testing.T) {
+	//1. Start the core
+	startCore()
+
+	// Wait until it's up - TODO: find a better way to check
+	time.Sleep(10 * time.Second)
+
+	//2. Create a set of devices into the Core
+	for i := 0; i < 10; i++ {
+		ctx := context.Background()
+		device := &voltha.Device{Type: "simulated_olt"}
+		response, err := stub.CreateDevice(ctx, device)
+		log.Infow("response", log.Fields{"res": response, "error": err})
+		assert.Nil(t, err)
+		devices[response.Id] = response
+	}
+
+	//3. Verify devices have been added correctly
+	ctx := context.Background()
+	response, err := stub.ListDeviceIds(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	assert.True(t, hasAllIds(response))
+
+	//	4. Stop the core
+	stopCore()
+}
+
+func TestReconcileDevices(t *testing.T) {
+	//1. Start the core
+	startCore()
+
+	// Wait until it's up - TODO: find a better way to check
+	time.Sleep(10 * time.Second)
+
+	//2. Create a set of devices into the Core
+	for i := 0; i < 10; i++ {
+		ctx := context.Background()
+		device := &voltha.Device{Type: "simulated_olt"}
+		response, err := stub.CreateDevice(ctx, device)
+		log.Infow("response", log.Fields{"res": response, "error": err})
+		assert.Nil(t, err)
+		devices[response.Id] = response
+	}
+	//3. Verify devices have been added correctly
+	ctx := context.Background()
+	response, err := stub.ListDeviceIds(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	assert.True(t, hasAllIds(response))
+
+	//4. Stop the core and restart it. This will start the core with no data in memory but
+	// etcd will still have the data.
+	stopCore()
+	time.Sleep(5 * time.Second)
+	startCore()
+	time.Sleep(10 * time.Second)
+
+	//5. Setup the connection again
+	stub = setupGrpcConnection()
+
+	//6. Verify there are no devices left
+	ctx = context.Background()
+	response, err = stub.ListDeviceIds(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	assert.Equal(t, len(response.Items), 0)
+
+	//7. Invoke reconcile with all stored list
+	toRestore := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+	for key, _ := range devices {
+		toRestore.Items = append(toRestore.Items, &voltha.ID{Id: key})
+	}
+	ctx = context.Background()
+	_, err = stub.ReconcileDevices(ctx, toRestore)
+	assert.Nil(t, err)
+
+	//8. Verify all devices have been restored
+	ctx = context.Background()
+	response, err = stub.ListDeviceIds(ctx, &empty.Empty{})
+	log.Infow("response", log.Fields{"res": response, "error": err})
+	assert.Nil(t, err)
+	assert.True(t, hasAllIds(response))
+
+	for _, id := range response.Items {
+		fmt.Println("id", id.Id)
+	}
+
+	//9. Store the core
+	stopCore()
+}
+
+func shutdown() {
+	conn.Close()
+	stopKafkaEtcd()
+}
+
+func TestMain(m *testing.M) {
+	code := m.Run()
+	shutdown()
+	os.Exit(code)
+}