Add NETCONF notification for ONU activation and Kafka client to receive events, update dependencies
Change-Id: I5f768fa8077ef7c64e00a534744ca47492344935
diff --git a/internal/core/adapter.go b/internal/core/adapter.go
index a96767f..6a59650 100644
--- a/internal/core/adapter.go
+++ b/internal/core/adapter.go
@@ -23,6 +23,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-northbound-bbf-adapter/internal/clients"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
var AdapterInstance *VolthaYangAdapter
@@ -42,13 +43,35 @@
func (t *VolthaYangAdapter) GetDevices(ctx context.Context) ([]YangItem, error) {
devices, err := t.volthaNbiClient.Service.ListDevices(ctx, &empty.Empty{})
if err != nil {
- err = fmt.Errorf("get-devices-failed: %v", err)
- return nil, err
+ //Wrap with %w so that callers can still inspect the underlying error with errors.Is/errors.As
+ return nil, fmt.Errorf("get-devices-failed: %w", err)
}
+ logger.Debugw(ctx, "get-devices-success", log.Fields{"devices": devices})
- items := translateDevices(*devices)
+ items := []YangItem{}
- logger.Debugw(ctx, "get-devices-success", log.Fields{"items": items})
+ for _, device := range devices.Items {
+ items = append(items, translateDevice(device)...)
+
+ if !device.Root {
+ //If the device is not a root device (i.e. it is an ONU), also expose its UNIs
+ ports, err := t.volthaNbiClient.Service.ListDevicePorts(ctx, &voltha.ID{Id: device.Id})
+ if err != nil {
+ //Wrap with %w so that callers can still inspect the underlying error with errors.Is/errors.As
+ return nil, fmt.Errorf("get-onu-ports-failed: %w", err)
+ }
+ logger.Debugw(ctx, "get-ports-success", log.Fields{"deviceId": device.Id, "ports": ports})
+
+ portsItems, err := translateOnuPorts(device.Id, ports)
+ if err != nil {
+ //Best effort: the items of the device itself are already in the result,
+ //only its ports are skipped
+ logger.Errorw(ctx, "cannot-translate-onu-ports", log.Fields{
+ "deviceId": device.Id,
+ "err": err,
+ })
+ continue
+ }
+
+ items = append(items, portsItems...)
+ }
+ }
return items, nil
}
diff --git a/internal/core/translation.go b/internal/core/translation.go
index 634272c..ef6582d 100644
--- a/internal/core/translation.go
+++ b/internal/core/translation.go
@@ -18,15 +18,15 @@
import (
"fmt"
+ "time"
"github.com/opencord/voltha-protos/v5/go/common"
"github.com/opencord/voltha-protos/v5/go/voltha"
)
const (
- DeviceAggregationModel = "bbf-device-aggregation"
- DevicesPath = "/" + DeviceAggregationModel + ":devices"
- HardwarePath = "data/ietf-hardware:hardware"
+ DeviceAggregationModule = "bbf-device-aggregation"
+ DevicesPath = "/" + DeviceAggregationModule + ":devices"
//Device types
DeviceTypeOlt = "bbf-device-types:olt"
@@ -42,6 +42,13 @@
ietfOperStateDisabled = "disabled"
ietfOperStateEnabled = "enabled"
ietfOperStateTesting = "testing"
+ ietfOperStateUp = "up"
+ ietfOperStateDown = "down"
+
+ //Keys of useful values in device events
+ eventContextKeyPonId = "pon-id"
+ eventContextKeyOnuSn = "serial-number"
+ eventContextKeyOltSn = "olt-serial-number"
)
type YangItem struct {
@@ -56,7 +63,7 @@
-//getDevicePath returns the yang path to the root of the device's hardware module in its data mountpoint
+//getDeviceHardwarePath returns the yang path to the root of the device's hardware module in its data mountpoint
func getDeviceHardwarePath(id string) string {
- return fmt.Sprintf("%s/device[name='%s']/%s/component[name='%s']", DevicesPath, id, HardwarePath, id)
+ return fmt.Sprintf("%s/device[name='%s']/data/ietf-hardware:hardware/component[name='%s']", DevicesPath, id, id)
}
//ietfHardwareAdminState returns the string that represents the ietf-hardware admin state
@@ -101,8 +108,30 @@
return ietfOperStateUnknown
}
+//ietfInterfacesOperState returns the string that represents the ietf-interfaces oper state
+//enum value corresponding to the one of VOLTHA
+func ietfInterfacesOperState(volthaOperState voltha.OperStatus_Types) string {
+ //TODO: verify this mapping is correct
+ switch volthaOperState {
+ case common.OperStatus_UNKNOWN:
+ return ietfOperStateUnknown
+ case common.OperStatus_TESTING:
+ return ietfOperStateTesting
+ case common.OperStatus_ACTIVE:
+ return ietfOperStateUp
+ //Go cases do not fall through implicitly, so all the states reported as "down"
+ //have to be listed in a single case clause
+ case common.OperStatus_DISCOVERED,
+ common.OperStatus_ACTIVATING,
+ common.OperStatus_FAILED,
+ common.OperStatus_RECONCILING,
+ common.OperStatus_RECONCILING_FAILED:
+ return ietfOperStateDown
+ }
+
+ //Any other value is reported as unknown
+ return ietfOperStateUnknown
+}
+
//translateDevice returns a slice of yang items that represent a voltha device
-func translateDevice(device voltha.Device) []YangItem {
+func translateDevice(device *voltha.Device) []YangItem {
devicePath := getDevicePath(device.Id)
hardwarePath := getDeviceHardwarePath(device.Id)
@@ -110,70 +139,148 @@
//Device type
if device.Root {
+ //OLT
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/type", devicePath),
+ Path: devicePath + "/type",
Value: DeviceTypeOlt,
})
} else {
+ //ONU
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/type", devicePath),
+ Path: devicePath + "/type",
Value: DeviceTypeOnu,
})
}
//Vendor name
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/mfg-name", hardwarePath),
+ Path: hardwarePath + "/mfg-name",
Value: device.Vendor,
})
//Model
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/model-name", hardwarePath),
+ Path: hardwarePath + "/model-name",
Value: device.Model,
})
//Hardware version
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/hardware-rev", hardwarePath),
+ Path: hardwarePath + "/hardware-rev",
Value: device.HardwareVersion,
})
//Firmware version
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/firmware-rev", hardwarePath),
+ Path: hardwarePath + "/firmware-rev",
Value: device.FirmwareVersion,
})
//Serial number
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/serial-num", hardwarePath),
+ Path: hardwarePath + "/serial-num",
Value: device.SerialNumber,
})
//Administrative state
//Translates VOLTHA admin state enum to ietf-hardware enum
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/state/admin-state", hardwarePath),
+ Path: hardwarePath + "/state/admin-state",
Value: ietfHardwareAdminState(device.AdminState),
})
//Operative state
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/state/oper-state", hardwarePath),
+ Path: hardwarePath + "/state/oper-state",
Value: ietfHardwareOperState(device.OperStatus),
})
return result
}
-//translateDevices returns a slice of yang items that represent a list of voltha devices
-func translateDevices(devices voltha.Devices) []YangItem {
+//translateOnuPorts builds the yang items that describe the UNIs of the ONU identified by deviceId.
+//Only the ports of type ETHERNET_UNI are translated; an error, and no items, is returned
+//as soon as one of them carries no OpenFlow port information.
+func translateOnuPorts(deviceId string, ports *voltha.Ports) ([]YangItem, error) {
+ ifacesRoot := getDevicePath(deviceId) + "/data/ietf-interfaces:interfaces"
result := []YangItem{}
- for _, device := range devices.Items {
- result = append(result, translateDevice(*device)...)
+ for _, uni := range ports.Items {
+ //Everything that is not a UNI is skipped
+ if uni.Type != voltha.Port_ETHERNET_UNI {
+ continue
+ }
+
+ //The interface is named after the OpenFlow port, which is therefore required
+ if uni.OfpPort == nil {
+ return nil, fmt.Errorf("no-ofp-port-in-uni: %s %d", deviceId, uni.PortNo)
+ }
+
+ ifacePath := fmt.Sprintf("%s/interface[name='%s']", ifacesRoot, uni.OfpPort.Name)
+
+ result = append(result,
+ YangItem{
+ Path: ifacePath + "/type",
+ Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+ },
+ YangItem{
+ Path: ifacePath + "/oper-status",
+ Value: ietfInterfacesOperState(uni.OperStatus),
+ },
+ )
}
- return result
+ return result, nil
+}
+
+//TranslateOnuActivatedEvent translates an ONU_ACTIVATED_RAISE_EVENT coming from the Kafka bus into
+//the yang items of an ONU discovery notification, plus the yang items of the channel termination
+//referenced by that notification. An error, and no items, is returned when the event context
+//lacks the PON id, the OLT serial number or the ONU serial number.
+func TranslateOnuActivatedEvent(eventHeader *voltha.EventHeader, deviceEvent *voltha.DeviceEvent) (notification []YangItem, channelTermination []YangItem, err error) {
+
+ //TODO: the use of this notification, which requires the creation of a dummy channel termination node,
+ //is temporary, and will be substituted with a more fitting one as soon as it will be defined
+
+ //Check if the needed information is present
+ ponId, ok := deviceEvent.Context[eventContextKeyPonId]
+ if !ok {
+ return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyPonId)
+ }
+ oltId, ok := deviceEvent.Context[eventContextKeyOltSn]
+ if !ok {
+ //Report the key that is actually missing, i.e. the OLT serial number
+ return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyOltSn)
+ }
+ //The channel termination is named after the OLT serial number and the PON id
+ ponName := oltId + "-pon-" + ponId
+
+ onuSn, ok := deviceEvent.Context[eventContextKeyOnuSn]
+ if !ok {
+ return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyOnuSn)
+ }
+
+ notificationPath := "/bbf-xpon-onu-states:onu-state-change"
+
+ notification = []YangItem{
+ {
+ Path: notificationPath + "/detected-serial-number",
+ Value: onuSn,
+ },
+ {
+ Path: notificationPath + "/channel-termination-ref",
+ Value: ponName,
+ },
+ {
+ Path: notificationPath + "/onu-state-last-change",
+ Value: eventHeader.RaisedTs.AsTime().Format(time.RFC3339),
+ },
+ {
+ Path: notificationPath + "/onu-state",
+ Value: "bbf-xpon-onu-types:onu-present",
+ },
+ {
+ Path: notificationPath + "/detected-registration-id",
+ Value: deviceEvent.ResourceId,
+ },
+ }
+
+ channelTermination = []YangItem{
+ {
+ Path: fmt.Sprintf("/ietf-interfaces:interfaces/interface[name='%s']/type", ponName),
+ Value: "bbf-if-type:vlan-sub-interface",
+ },
+ }
+
+ return notification, channelTermination, nil
}
diff --git a/internal/core/translation_test.go b/internal/core/translation_test.go
index fb2e751..41dc1dc 100644
--- a/internal/core/translation_test.go
+++ b/internal/core/translation_test.go
@@ -19,9 +19,12 @@
import (
"fmt"
"testing"
+ "time"
+ "github.com/opencord/voltha-protos/v5/go/openflow_13"
"github.com/opencord/voltha-protos/v5/go/voltha"
"github.com/stretchr/testify/assert"
+ "google.golang.org/protobuf/types/known/timestamppb"
)
const (
@@ -44,58 +47,253 @@
}
func TestTranslateDevice(t *testing.T) {
- olt := voltha.Device{
- Id: testDeviceId,
- Root: true,
+ //A root device is translated as an OLT: check its type and every hardware leaf
+ olt := &voltha.Device{
+ Id: testDeviceId,
+ Root: true,
+ Vendor: "BBSim",
+ Model: "asfvolt16",
+ SerialNumber: "BBSIM_OLT_10",
+ HardwareVersion: "v0.0.2",
+ FirmwareVersion: "v0.0.3",
+ AdminState: voltha.AdminState_ENABLED,
+ OperStatus: voltha.OperStatus_ACTIVE,
}
items := translateDevice(olt)
- val, ok := getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(testDeviceId)))
- assert.True(t, ok, "No type item for olt")
- assert.Equal(t, DeviceTypeOlt, val)
+ oltPath := getDevicePath(testDeviceId)
+ oltHwPath := getDeviceHardwarePath(testDeviceId)
- onu := voltha.Device{
- Id: testDeviceId,
- Root: false,
+ expected := []YangItem{
+ {
+ Path: oltPath + "/type",
+ Value: DeviceTypeOlt,
+ },
+ {
+ Path: oltHwPath + "/mfg-name",
+ Value: "BBSim",
+ },
+ {
+ Path: oltHwPath + "/model-name",
+ Value: "asfvolt16",
+ },
+ {
+ Path: oltHwPath + "/hardware-rev",
+ Value: "v0.0.2",
+ },
+ {
+ Path: oltHwPath + "/firmware-rev",
+ Value: "v0.0.3",
+ },
+ {
+ Path: oltHwPath + "/serial-num",
+ Value: "BBSIM_OLT_10",
+ },
+ {
+ Path: oltHwPath + "/state/admin-state",
+ Value: ietfAdminStateUnlocked,
+ },
+ {
+ Path: oltHwPath + "/state/oper-state",
+ Value: ietfOperStateEnabled,
+ },
+ }
+
+ //Every expected leaf has to be present with the expected value
+ assert.NotEmpty(t, items, "No OLT items")
+ for _, e := range expected {
+ val, ok := getItemWithPath(items, e.Path)
+ assert.True(t, ok, e.Path+" missing for OLT")
+ assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+ }
+
+ //A non-root device is translated as an ONU: same checks, with the ONU type
+ onu := &voltha.Device{
+ Id: testDeviceId,
+ Root: false,
+ Vendor: "BBSM",
+ Model: "v0.0.1",
+ SerialNumber: "BBSM000a0001",
+ HardwareVersion: "v0.0.2",
+ FirmwareVersion: "v0.0.3",
+ AdminState: voltha.AdminState_ENABLED,
+ OperStatus: voltha.OperStatus_ACTIVE,
}
items = translateDevice(onu)
- val, ok = getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(testDeviceId)))
- assert.True(t, ok, "No type item for onu")
- assert.Equal(t, DeviceTypeOnu, val)
+ onuPath := getDevicePath(testDeviceId)
+ onuHwPath := getDeviceHardwarePath(testDeviceId)
+
+ expected = []YangItem{
+ {
+ Path: onuPath + "/type",
+ Value: DeviceTypeOnu,
+ },
+ {
+ Path: onuHwPath + "/mfg-name",
+ Value: "BBSM",
+ },
+ {
+ Path: onuHwPath + "/model-name",
+ Value: "v0.0.1",
+ },
+ {
+ Path: onuHwPath + "/hardware-rev",
+ Value: "v0.0.2",
+ },
+ {
+ Path: onuHwPath + "/firmware-rev",
+ Value: "v0.0.3",
+ },
+ {
+ Path: onuHwPath + "/serial-num",
+ Value: "BBSM000a0001",
+ },
+ {
+ Path: onuHwPath + "/state/admin-state",
+ Value: ietfAdminStateUnlocked,
+ },
+ {
+ Path: onuHwPath + "/state/oper-state",
+ Value: ietfOperStateEnabled,
+ },
+ }
+
+ assert.NotEmpty(t, items, "No ONU items")
+ for _, e := range expected {
+ val, ok := getItemWithPath(items, e.Path)
+ assert.True(t, ok, e.Path+" missing for ONU")
+ assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+ }
}
-func TestTranslateDevices(t *testing.T) {
- devicesNum := 10
-
- //Create test devices
- devices := voltha.Devices{
- Items: []*voltha.Device{},
+func TestTranslateOnuPorts(t *testing.T) {
+ //A UNI without OpenFlow port information has to be rejected with an error
+ ports := &voltha.Ports{
+ Items: []*voltha.Port{
+ {
+ PortNo: 0,
+ Type: voltha.Port_ETHERNET_UNI,
+ OperStatus: voltha.OperStatus_ACTIVE,
+ },
+ },
}
- for i := 0; i < devicesNum; i++ {
- devices.Items = append(devices.Items, &voltha.Device{
- Id: fmt.Sprintf("%d", i),
- Root: i%2 == 0,
- })
+ _, err := translateOnuPorts(testDeviceId, ports)
+ assert.Error(t, err, "No error for missing Ofp port")
+
+ ports = &voltha.Ports{
+ Items: []*voltha.Port{
+ {
+ PortNo: 0,
+ Type: voltha.Port_ETHERNET_UNI,
+ OfpPort: &openflow_13.OfpPort{
+ Name: "BBSM000a0001-1",
+ },
+ OperStatus: voltha.OperStatus_ACTIVE,
+ },
+ {
+ PortNo: 1,
+ Type: voltha.Port_ETHERNET_UNI,
+ OfpPort: &openflow_13.OfpPort{
+ Name: "BBSM000a0001-2",
+ },
+ OperStatus: voltha.OperStatus_UNKNOWN,
+ },
+ {
+ PortNo: 0,
+ Type: voltha.Port_PON_ONU,
+ OperStatus: voltha.OperStatus_UNKNOWN,
+ },
+ },
}
- //Translate them to items
- items := translateDevices(devices)
+ portsItems, err := translateOnuPorts(testDeviceId, ports)
+ assert.Nil(t, err, "Translation error")
- //Check if the number of generated items is correct
- singleDeviceItemsNum := len(translateDevice(*devices.Items[0]))
- assert.Equal(t, singleDeviceItemsNum*devicesNum, len(items))
+ /*2 items (type and oper-status) for each of the 2 UNIs, PON is ignored*/
+ assert.Equal(t, 4, len(portsItems), "Wrong number of ports items")
- //Check if the content is right
- for i := 0; i < devicesNum; i++ {
- val, ok := getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(devices.Items[i].Id)))
- assert.True(t, ok, fmt.Sprintf("No type item for device %d", i))
+ interfacesPath := getDevicePath(testDeviceId) + "/data/ietf-interfaces:interfaces"
- if devices.Items[i].Root {
- assert.Equal(t, DeviceTypeOlt, val)
- } else {
- assert.Equal(t, DeviceTypeOnu, val)
- }
+ expected := []YangItem{
+ {
+ Path: fmt.Sprintf("%s/interface[name='%s']/oper-status", interfacesPath, "BBSM000a0001-1"),
+ Value: ietfOperStateUp,
+ },
+ {
+ Path: fmt.Sprintf("%s/interface[name='%s']/type", interfacesPath, "BBSM000a0001-1"),
+ Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+ },
+ {
+ Path: fmt.Sprintf("%s/interface[name='%s']/oper-status", interfacesPath, "BBSM000a0001-2"),
+ Value: ietfOperStateUnknown,
+ },
+ {
+ Path: fmt.Sprintf("%s/interface[name='%s']/type", interfacesPath, "BBSM000a0001-2"),
+ Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+ },
+ }
+
+ for _, e := range expected {
+ val, ok := getItemWithPath(portsItems, e.Path)
+ assert.True(t, ok, e.Path+" missing for ports")
+ assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+ }
+}
+
+func TestTranslateOnuActive(t *testing.T) {
+ timestamp := time.Now()
+ eventHeader := &voltha.EventHeader{
+ Id: "Voltha.openolt.ONU_ACTIVATED.1657705515351182767",
+ RaisedTs: timestamppb.New(timestamp),
+ Category: voltha.EventCategory_EQUIPMENT,
+ Type: voltha.EventType_DEVICE_EVENT,
+ }
+
+ deviceEvent := &voltha.DeviceEvent{
+ ResourceId: testDeviceId,
+ DeviceEventName: "ONU_ACTIVATED_RAISE_EVENT",
+ Description: "ONU Event - ONU_ACTIVATED - Raised",
+ Context: map[string]string{},
+ }
+
+ //An event whose context carries none of the required keys has to be rejected
+ _, _, err := TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+ assert.Error(t, err, "Empty context produces no error")
+
+ deviceEvent.Context[eventContextKeyPonId] = "0"
+ deviceEvent.Context[eventContextKeyOnuSn] = "BBSM000a0001"
+ deviceEvent.Context[eventContextKeyOltSn] = "BBSIM_OLT_10"
+
+ notificationPath := "/bbf-xpon-onu-states:onu-state-change"
+ expected := []YangItem{
+ {
+ Path: notificationPath + "/detected-serial-number",
+ Value: "BBSM000a0001",
+ },
+ {
+ //The channel termination is named <olt serial number>-pon-<pon id>
+ Path: notificationPath + "/channel-termination-ref",
+ Value: "BBSIM_OLT_10-pon-0",
+ },
+ {
+ //The translation formats RaisedTs.AsTime(), which is always in UTC, so the
+ //expected value must be in UTC too or the test fails outside of the UTC timezone
+ Path: notificationPath + "/onu-state-last-change",
+ Value: timestamp.UTC().Format(time.RFC3339),
+ },
+ {
+ Path: notificationPath + "/onu-state",
+ Value: "bbf-xpon-onu-types:onu-present",
+ },
+ {
+ Path: notificationPath + "/detected-registration-id",
+ Value: testDeviceId,
+ },
+ }
+
+ notificationItems, channelTerminationItems, err := TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+ assert.Nil(t, err, "Translation error")
+
+ assert.NotEmpty(t, channelTerminationItems, "No channel termination items")
+
+ assert.NotEmpty(t, notificationItems, "No notification items")
+
+ _, ok := getItemWithPath(notificationItems, notificationPath+"/channel-termination-ref")
+ assert.True(t, ok, "No channel termination reference in notification")
+
+ for _, e := range expected {
+ val, ok := getItemWithPath(notificationItems, e.Path)
+ assert.True(t, ok, e.Path+" missing for notification")
+ assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
}
}