This commit adds two new northbound APIs to the Core, to be used
mostly by the Affinity Router: ListDeviceIds retrieves the IDs of the
devices present in a Core's memory; ReconcileDevices is an API the
Affinity Router can use to push a list of device IDs to a Core, which
then loads and reconciles those devices in memory (used mostly after
a Core restart).
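
A minimal client-side sketch of the two calls (an illustrative
example only; it assumes the generated voltha gRPC bindings and a
Core NBI reachable on port 50057, the port used by the tests below;
the host is hypothetical):

    conn, err := grpc.Dial("localhost:50057", grpc.WithInsecure())
    if err != nil {
        log.Fatal(err)
    }
    client := voltha.NewVolthaServiceClient(conn)
    // Fetch the device IDs currently held in this Core's memory
    ids, err := client.ListDeviceIds(context.Background(), &empty.Empty{})
    if err == nil {
        // Push the IDs back so the Core reloads those devices from the KV store
        _, err = client.ReconcileDevices(context.Background(), ids)
    }
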
Change-Id: I0d292054e09a099ad8be7669fbc3fe3ba15a5579
diff --git a/common/core/northbound/grpc/default_api_handler.go b/common/core/northbound/grpc/default_api_handler.go
index f935462..0cc76c5 100644
--- a/common/core/northbound/grpc/default_api_handler.go
+++ b/common/core/northbound/grpc/default_api_handler.go
@@ -117,6 +117,19 @@
return nil, errors.New("UnImplemented")
}
+func (handler *DefaultAPIHandler) ListDeviceIds(ctx context.Context, empty *empty.Empty) (*voltha.IDs, error) {
+ log.Debug("ListDeviceIDs-request")
+ return nil, errors.New("UnImplemented")
+}
+
+func (handler *DefaultAPIHandler) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
+ if ids != nil {
+ log.Debugw("ReconcileDevices-request", log.Fields{"length": len(ids.Items)})
+ return nil, errors.New("UnImplemented")
+ }
+ return nil, errors.New("ids-null")
+}
+
func (handler *DefaultAPIHandler) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
log.Debugw("GetDevice-request", log.Fields{"id": id})
return nil, errors.New("UnImplemented")
diff --git a/protos/common.proto b/protos/common.proto
index d1234d5..b1c13ae 100644
--- a/protos/common.proto
+++ b/protos/common.proto
@@ -11,6 +11,11 @@
string id = 1;
}
+// Represents a list of IDs
+message IDs {
+ repeated ID items = 1;
+}
+
enum TestModeKeys {
api_test=0;
}
diff --git a/protos/voltha.proto b/protos/voltha.proto
index e03442a..f2f885f 100644
--- a/protos/voltha.proto
+++ b/protos/voltha.proto
@@ -269,6 +269,23 @@
option (voltha.yang_xml_tag).xml_tag = 'devices';
}
+ // List the IDs of all physical devices controlled by the Voltha cluster
+ rpc ListDeviceIds(google.protobuf.Empty) returns(IDs) {
+ option (google.api.http) = {
+ get: "/api/v1/deviceids"
+ };
+ option (voltha.yang_xml_tag).xml_tag = 'id';
+ option (voltha.yang_xml_tag).list_items_name = 'items';
+ }
+
+ // Request to a voltha Core to reconcile a set of devices based on their IDs
+ rpc ReconcileDevices(IDs) returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ post: "/api/v1/deviceids"
+ body: "*"
+ };
+ }
+
// Get more information on a given physical device
rpc GetDevice(ID) returns(Device) {
option (google.api.http) = {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c42a64e..484ff35 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -60,15 +60,15 @@
// Do not call NewBackend constructor; it creates its own KV client
// Commented the backend for now until the issue between the model and the KV store
// is resolved.
- //backend := model.Backend{
- // MsgClient: kvClient,
- // StoreType: cf.KVStoreType,
- // Host: cf.KVStoreHost,
- // Port: cf.KVStorePort,
- // Timeout: cf.KVStoreTimeout,
- // PathPrefix: "service/voltha"}
- core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
- core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
+ backend := model.Backend{
+ Client: kvClient,
+ StoreType: cf.KVStoreType,
+ Host: cf.KVStoreHost,
+ Port: cf.KVStorePort,
+ Timeout: cf.KVStoreTimeout,
+ PathPrefix: "service/voltha"}
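+ // Wire the KV backend into the model roots so device data persists across a Core restart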
+ core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, &backend)
+ core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, &backend)
core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
return &core
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 9e8710b..13e0adf 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -285,18 +285,15 @@
agent.lockDevice.Unlock()
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
}
- // Send the request to an Adapter and wait for a response
- if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
- log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
- agent.lockDevice.Unlock()
- return err
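+ // A device still in PREPROVISIONED state was never handed to an adapter, so there is no adapter to notify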
+ if device.AdminState != voltha.AdminState_PREPROVISIONED {
+ // Send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
+ log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ agent.lockDevice.Unlock()
+ return err
+ }
}
- // Set the device Admin state to DELETED in order to trigger the callback to delete
- // child devices, if any
- // Received an Ack (no error found above). Now update the device in the model to the expected state
- cloned := proto.Clone(device).(*voltha.Device)
- cloned.AdminState = voltha.AdminState_DELETED
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
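+ // Remove the device from the data model directly rather than transitioning it to DELETED first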
+ if removed := agent.clusterDataProxy.Remove("/devices/"+agent.deviceId, ""); removed == nil {
agent.lockDevice.Unlock()
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index e50c035..3f27e10 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -104,6 +104,16 @@
return nil
}
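+// listDeviceIdsFromMap builds a voltha.IDs message from the keys of the device agent map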
+func (dMgr *DeviceManager) listDeviceIdsFromMap() *voltha.IDs {
+ dMgr.lockDeviceAgentsMap.Lock()
+ defer dMgr.lockDeviceAgentsMap.Unlock()
+ result := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+ for key := range dMgr.deviceAgents {
+ result.Items = append(result.Items, &voltha.ID{Id: key})
+ }
+ return result
+}
+
func (dMgr *DeviceManager) createDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
log.Debugw("createDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
@@ -202,19 +212,46 @@
return result, nil
}
-//func (dMgr *DeviceManager) ListDevices() (*voltha.Devices, error) {
-// log.Debug("ListDevices")
-// result := &voltha.Devices{}
-// dMgr.lockDeviceAgentsMap.Lock()
-// defer dMgr.lockDeviceAgentsMap.Unlock()
-// for _, agent := range dMgr.deviceAgents {
-// if device, err := agent.getDevice(); err == nil {
-// //cloned := proto.Clone(device).(*voltha.Device)
-// result.Items = append(result.Items, device)
-// }
-// }
-// return result, nil
-//}
+// ListDeviceIds retrieves the latest device IDs from the data model (in-memory data only)
+func (dMgr *DeviceManager) ListDeviceIds() (*voltha.IDs, error) {
+ log.Debug("ListDeviceIDs")
+ // Report only device IDs that are in the device agent map
+ return dMgr.listDeviceIdsFromMap(), nil
+}
+
+// ReconcileDevices is a request to a voltha Core to manage a list of devices based on their IDs
+func (dMgr *DeviceManager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
+ log.Debug("ReconcileDevices")
+ var res interface{}
+ if ids != nil {
+ toReconcile := len(ids.Items)
+ reconciled := 0
+ for _, id := range ids.Items {
+ // Act on the device only if it is not present in the agent map
+ if agent := dMgr.getDeviceAgent(id.Id); agent == nil {
+ // Device Id not in memory
+ log.Debugw("reconciling-device", log.Fields{"id": id.Id})
+ // Load device from model
+ if device := dMgr.clusterDataProxy.Get("/devices/"+id.Id, 0, false, ""); device != nil {
+ agent = newDeviceAgent(dMgr.adapterProxy, device.(*voltha.Device), dMgr, dMgr.clusterDataProxy)
+ dMgr.addDeviceAgentToMap(agent)
+ agent.start(nil)
+ reconciled++
+ } else {
+ log.Warnw("device-inexistent", log.Fields{"id": id.Id})
+ }
+ } else {
+ reconciled++
+ }
+ }
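+ // Surface a partial reconciliation as an error so the caller knows some devices were not restored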
+ if toReconcile != reconciled {
+ res = status.Errorf(codes.DataLoss, "fewer-devices-reconciled:%d/%d", reconciled, toReconcile)
+ }
+ } else {
+ res = status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
+ }
+ sendResponse(ctx, ch, res)
+}
func (dMgr *DeviceManager) updateDevice(device *voltha.Device) error {
log.Debugw("updateDevice", log.Fields{"deviceid": device.Id, "device": device})
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 9b7fac7..fa6d0ca 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -168,6 +168,29 @@
return handler.deviceMgr.ListDevices()
}
+// ListDeviceIds returns the list of device IDs managed by a voltha Core
+func (handler *APIHandler) ListDeviceIds(ctx context.Context, empty *empty.Empty) (*voltha.IDs, error) {
+ log.Debug("ListDeviceIDs")
+ if isTestMode(ctx) {
+ out := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+ return out, nil
+ }
+ return handler.deviceMgr.ListDeviceIds()
+}
+
+// ReconcileDevices is a request to a voltha Core to manage a list of devices based on their IDs
+func (handler *APIHandler) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
+ log.Debug("ReconcileDevices")
+ if isTestMode(ctx) {
+ out := new(empty.Empty)
+ return out, nil
+ }
+ ch := make(chan interface{})
+ defer close(ch)
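+ // Run the reconciliation asynchronously in the device manager; the handler blocks below until a nil (success) or error response arrives on ch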
+ go handler.deviceMgr.ReconcileDevices(ctx, ids, ch)
+ return waitForNilResponseOnSuccess(ctx, ch)
+}
+
// GetLogicalDevice must be implemented in the read-only containers - should it also be implemented here?
func (handler *APIHandler) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
log.Debugw("GetLogicalDevice-request", log.Fields{"id": id})
diff --git a/tests/core/nbi_test.go b/tests/core/nbi_test.go
new file mode 100644
index 0000000..6e1b531
--- /dev/null
+++ b/tests/core/nbi_test.go
@@ -0,0 +1,240 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package core
+
+import (
+ "context"
+ "fmt"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/protos/voltha"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc"
+ "os"
+ "os/exec"
+ "testing"
+ "time"
+)
+
+var conn *grpc.ClientConn
+var stub voltha.VolthaServiceClient
+var devices map[string]*voltha.Device
+
+func init() {
+ log.AddPackage(log.JSON, log.ErrorLevel, nil)
+ log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+ log.SetAllLogLevel(log.ErrorLevel)
+
+ //Start kafka and Etcd
+ startKafkaEtcd()
+ time.Sleep(10 * time.Second) //TODO: Find a better way to ascertain they are up
+
+ stub = setupGrpcConnection()
+ devices = make(map[string]*voltha.Device)
+}
+
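+// setupGrpcConnection dials the Core NBI endpoint (host taken from DOCKER_HOST_IP, port 50057) and returns a service client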
+func setupGrpcConnection() voltha.VolthaServiceClient {
+ grpcHostIP := os.Getenv("DOCKER_HOST_IP")
+ grpcPort := 50057
+ grpcHost := fmt.Sprintf("%s:%d", grpcHostIP, grpcPort)
+ var err error
+ conn, err = grpc.Dial(grpcHost, grpc.WithInsecure())
+ if err != nil {
+ log.Fatalf("did not connect: %s", err)
+ }
+ return voltha.NewVolthaServiceClient(conn)
+}
+
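+// clearAllDevices deletes every device known to the test, optionally clearing the local map as well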
+func clearAllDevices(clearMap bool) {
+ for key := range devices {
+ ctx := context.Background()
+ response, err := stub.DeleteDevice(ctx, &voltha.ID{Id: key})
+ log.Infow("response", log.Fields{"res": response, "error": err})
+ if clearMap {
+ delete(devices, key)
+ }
+ }
+}
+
+// Verify if all ids are present in the global list of devices
+func hasAllIds(ids *voltha.IDs) bool {
+ if ids == nil && len(devices) == 0 {
+ return true
+ }
+ if ids == nil {
+ return false
+ }
+ for _, id := range ids.Items {
+ if _, exist := devices[id.Id]; !exist {
+ return false
+ }
+ }
+ return true
+}
+
+func startKafkaEtcd() {
+ fmt.Println("Starting Kafka and Etcd ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../compose/docker-compose-zk-kafka-test.yml", "up", "-d")
+ if err := cmd.Run(); err != nil {
+ log.Fatal(err)
+ }
+ cmd = exec.Command(command, "-f", "../../compose/docker-compose-etcd.yml", "up", "-d")
+ if err := cmd.Run(); err != nil {
+ log.Fatal(err)
+ }
+}
+
+func stopKafkaEtcd() {
+ fmt.Println("Stopping Kafka and Etcd ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../compose/docker-compose-zk-kafka-test.yml", "down")
+ if err := cmd.Run(); err != nil {
+ // ignore the error - it is mostly due to the network being left behind, as it is
+ // still being used by other containers
+ log.Warn(err)
+ }
+ cmd = exec.Command(command, "-f", "../../compose/docker-compose-etcd.yml", "down")
+ if err := cmd.Run(); err != nil {
+ // ignore the error - it is mostly due to the network being left behind, as it is
+ // still being used by other containers
+ log.Warn(err)
+ }
+}
+
+func startCore() {
+ fmt.Println("Starting voltha core ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../compose/rw_core.yml", "up", "-d")
+ if err := cmd.Run(); err != nil {
+ log.Fatal(err)
+ }
+}
+
+func stopCore() {
+ fmt.Println("Stopping voltha core ...")
+ command := "docker-compose"
+ cmd := exec.Command(command, "-f", "../../compose/rw_core.yml", "down")
+ if err := cmd.Run(); err != nil {
+ // ignore the error - it is mostly due to the network being left behind, as it is
+ // still being used by other containers
+ log.Warn(err)
+ }
+}
+
+func TestListDeviceIds(t *testing.T) {
+ //1. Start the core
+ startCore()
+
+ // Wait until it's up - TODO: find a better way to check
+ time.Sleep(10 * time.Second)
+
+ //2. Create a set of devices into the Core
+ for i := 0; i < 10; i++ {
+ ctx := context.Background()
+ device := &voltha.Device{Type: "simulated_olt"}
+ response, err := stub.CreateDevice(ctx, device)
+ log.Infow("response", log.Fields{"res": response, "error": err})
+ assert.Nil(t, err)
+ devices[response.Id] = response
+ }
+
+ //3. Verify devices have been added correctly
+ ctx := context.Background()
+ response, err := stub.ListDeviceIds(ctx, &empty.Empty{})
+ log.Infow("response", log.Fields{"res": response, "error": err})
+ assert.Nil(t, err)
+ assert.True(t, hasAllIds(response))
+
+ // 4. Stop the core
+ stopCore()
+}
+
+func TestReconcileDevices(t *testing.T) {
+ //1. Start the core
+ startCore()
+
+ // Wait until it's up - TODO: find a better way to check
+ time.Sleep(10 * time.Second)
+
+ //2. Create a set of devices into the Core
+ for i := 0; i < 10; i++ {
+ ctx := context.Background()
+ device := &voltha.Device{Type: "simulated_olt"}
+ response, err := stub.CreateDevice(ctx, device)
+ log.Infow("response", log.Fields{"res": response, "error": err})
+ assert.Nil(t, err)
+ devices[response.Id] = response
+ }
+ //3. Verify devices have been added correctly
+ ctx := context.Background()
+ response, err := stub.ListDeviceIds(ctx, &empty.Empty{})
+ log.Infow("response", log.Fields{"res": response, "error": err})
+ assert.Nil(t, err)
+ assert.True(t, hasAllIds(response))
+
+ //4. Stop the core and restart it. This will start the core with no data in memory but
+ // etcd will still have the data.
+ stopCore()
+ time.Sleep(5 * time.Second)
+ startCore()
+ time.Sleep(10 * time.Second)
+
+ //5. Setup the connection again
+ stub = setupGrpcConnection()
+
+ //6. Verify there are no devices left
+ ctx = context.Background()
+ response, err = stub.ListDeviceIds(ctx, &empty.Empty{})
+ log.Infow("response", log.Fields{"res": response, "error": err})
+ assert.Nil(t, err)
+ assert.Equal(t, len(response.Items), 0)
+
+ //7. Invoke reconcile with all stored list
+ toRestore := &voltha.IDs{Items: make([]*voltha.ID, 0)}
+ for key := range devices {
+ toRestore.Items = append(toRestore.Items, &voltha.ID{Id: key})
+ }
+ ctx = context.Background()
+ _, err = stub.ReconcileDevices(ctx, toRestore)
+ assert.Nil(t, err)
+
+ //8. Verify all devices have been restored
+ ctx = context.Background()
+ response, err = stub.ListDeviceIds(ctx, &empty.Empty{})
+ log.Infow("response", log.Fields{"res": response, "error": err})
+ assert.Nil(t, err)
+ assert.True(t, hasAllIds(response))
+
+ for _, id := range response.Items {
+ fmt.Println("id", id.Id)
+ }
+
+ //9. Stop the core
+ stopCore()
+}
+
+func shutdown() {
+ conn.Close()
+ stopKafkaEtcd()
+}
+
+func TestMain(m *testing.M) {
+ code := m.Run()
+ shutdown()
+ os.Exit(code)
+}