Add NETCONF notification for ONU activation and Kafka client to receive events, update dependencies

Change-Id: I5f768fa8077ef7c64e00a534744ca47492344935
diff --git a/internal/core/adapter.go b/internal/core/adapter.go
index a96767f..6a59650 100644
--- a/internal/core/adapter.go
+++ b/internal/core/adapter.go
@@ -23,6 +23,7 @@
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"github.com/opencord/voltha-northbound-bbf-adapter/internal/clients"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
 var AdapterInstance *VolthaYangAdapter
@@ -42,13 +43,35 @@
 func (t *VolthaYangAdapter) GetDevices(ctx context.Context) ([]YangItem, error) {
 	devices, err := t.volthaNbiClient.Service.ListDevices(ctx, &empty.Empty{})
 	if err != nil {
-		err = fmt.Errorf("get-devices-failed: %v", err)
-		return nil, err
+		return nil, fmt.Errorf("get-devices-failed: %v", err)
 	}
+	logger.Debugw(ctx, "get-devices-success", log.Fields{"devices": devices})
 
-	items := translateDevices(*devices)
+	items := []YangItem{}
 
-	logger.Debugw(ctx, "get-devices-success", log.Fields{"items": items})
+	for _, device := range devices.Items {
+		items = append(items, translateDevice(device)...)
+
+		if !device.Root {
+			//If the device is an ONU, also expose UNIs
+			ports, err := t.volthaNbiClient.Service.ListDevicePorts(ctx, &voltha.ID{Id: device.Id})
+			if err != nil {
+				return nil, fmt.Errorf("get-onu-ports-failed: %v", err)
+			}
+			logger.Debugw(ctx, "get-ports-success", log.Fields{"deviceId": device.Id, "ports": ports})
+
+			portsItems, err := translateOnuPorts(device.Id, ports)
+			if err != nil {
+				logger.Errorw(ctx, "cannot-translate-onu-ports", log.Fields{
+					"deviceId": device.Id,
+					"err":      err,
+				})
+				continue
+			}
+
+			items = append(items, portsItems...)
+		}
+	}
 
 	return items, nil
 }
diff --git a/internal/core/translation.go b/internal/core/translation.go
index 634272c..ef6582d 100644
--- a/internal/core/translation.go
+++ b/internal/core/translation.go
@@ -18,15 +18,15 @@
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/opencord/voltha-protos/v5/go/common"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
 const (
-	DeviceAggregationModel = "bbf-device-aggregation"
-	DevicesPath            = "/" + DeviceAggregationModel + ":devices"
-	HardwarePath           = "data/ietf-hardware:hardware"
+	DeviceAggregationModule = "bbf-device-aggregation"
+	DevicesPath             = "/" + DeviceAggregationModule + ":devices"
 
 	//Device types
 	DeviceTypeOlt = "bbf-device-types:olt"
@@ -42,6 +42,13 @@
 	ietfOperStateDisabled = "disabled"
 	ietfOperStateEnabled  = "enabled"
 	ietfOperStateTesting  = "testing"
+	ietfOperStateUp       = "up"
+	ietfOperStateDown     = "down"
+
+	//Keys of useful values in device events
+	eventContextKeyPonId = "pon-id"
+	eventContextKeyOnuSn = "serial-number"
+	eventContextKeyOltSn = "olt-serial-number"
 )
 
 type YangItem struct {
@@ -56,7 +63,7 @@
 
 //getDevicePath returns the yang path to the root of the device's hardware module in its data mountpoint
 func getDeviceHardwarePath(id string) string {
-	return fmt.Sprintf("%s/device[name='%s']/%s/component[name='%s']", DevicesPath, id, HardwarePath, id)
+	return fmt.Sprintf("%s/device[name='%s']/data/ietf-hardware:hardware/component[name='%s']", DevicesPath, id, id)
 }
 
 //ietfHardwareAdminState returns the string that represents the ietf-hardware admin state
@@ -101,8 +108,30 @@
 	return ietfOperStateUnknown
 }
 
+//ietfHardwareOperState returns the string that represents the ietf-interfaces oper state
+//enum value corresponding to the one of VOLTHA
+func ietfInterfacesOperState(volthaOperState voltha.OperStatus_Types) string {
+	//TODO: verify this mapping is correct
+	switch volthaOperState {
+	case common.OperStatus_UNKNOWN:
+		return ietfOperStateUnknown
+	case common.OperStatus_TESTING:
+		return ietfOperStateTesting
+	case common.OperStatus_ACTIVE:
+		return ietfOperStateUp
+	case common.OperStatus_DISCOVERED:
+	case common.OperStatus_ACTIVATING:
+	case common.OperStatus_FAILED:
+	case common.OperStatus_RECONCILING:
+	case common.OperStatus_RECONCILING_FAILED:
+		return ietfOperStateDown
+	}
+
+	return ietfOperStateUnknown
+}
+
 //translateDevice returns a slice of yang items that represent a voltha device
-func translateDevice(device voltha.Device) []YangItem {
+func translateDevice(device *voltha.Device) []YangItem {
 	devicePath := getDevicePath(device.Id)
 	hardwarePath := getDeviceHardwarePath(device.Id)
 
@@ -110,70 +139,148 @@
 
 	//Device type
 	if device.Root {
+		//OLT
 		result = append(result, YangItem{
-			Path:  fmt.Sprintf("%s/type", devicePath),
+			Path:  devicePath + "/type",
 			Value: DeviceTypeOlt,
 		})
 	} else {
+		//ONU
 		result = append(result, YangItem{
-			Path:  fmt.Sprintf("%s/type", devicePath),
+			Path:  devicePath + "/type",
 			Value: DeviceTypeOnu,
 		})
 	}
 
 	//Vendor name
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/mfg-name", hardwarePath),
+		Path:  hardwarePath + "/mfg-name",
 		Value: device.Vendor,
 	})
 
 	//Model
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/model-name", hardwarePath),
+		Path:  hardwarePath + "/model-name",
 		Value: device.Model,
 	})
 
 	//Hardware version
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/hardware-rev", hardwarePath),
+		Path:  hardwarePath + "/hardware-rev",
 		Value: device.HardwareVersion,
 	})
 
 	//Firmware version
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/firmware-rev", hardwarePath),
+		Path:  hardwarePath + "/firmware-rev",
 		Value: device.FirmwareVersion,
 	})
 
 	//Serial number
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/serial-num", hardwarePath),
+		Path:  hardwarePath + "/serial-num",
 		Value: device.SerialNumber,
 	})
 
 	//Administrative state
 	//Translates VOLTHA admin state enum to ietf-hardware enum
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/state/admin-state", hardwarePath),
+		Path:  hardwarePath + "/state/admin-state",
 		Value: ietfHardwareAdminState(device.AdminState),
 	})
 
 	//Operative state
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/state/oper-state", hardwarePath),
+		Path:  hardwarePath + "/state/oper-state",
 		Value: ietfHardwareOperState(device.OperStatus),
 	})
 
 	return result
 }
 
-//translateDevices returns a slice of yang items that represent a list of voltha devices
-func translateDevices(devices voltha.Devices) []YangItem {
+//translateOnuPorts returns a slice of yang items that represent the UNIs of an ONU
+func translateOnuPorts(deviceId string, ports *voltha.Ports) ([]YangItem, error) {
+	interfacesPath := getDevicePath(deviceId) + "/data/ietf-interfaces:interfaces"
 	result := []YangItem{}
 
-	for _, device := range devices.Items {
-		result = append(result, translateDevice(*device)...)
+	for _, port := range ports.Items {
+		if port.Type == voltha.Port_ETHERNET_UNI {
+			if port.OfpPort == nil {
+				return nil, fmt.Errorf("no-ofp-port-in-uni: %s %d", deviceId, port.PortNo)
+			}
+
+			interfacePath := fmt.Sprintf("%s/interface[name='%s']", interfacesPath, port.OfpPort.Name)
+
+			result = append(result, []YangItem{
+				{
+					Path:  interfacePath + "/type",
+					Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+				},
+				{
+					Path:  interfacePath + "/oper-status",
+					Value: ietfInterfacesOperState(port.OperStatus),
+				},
+			}...)
+		}
 	}
 
-	return result
+	return result, nil
+}
+
+//TranslateOnuActivatedEvent returns a slice of yang items and the name of the channel termination to populate
+//an ONU discovery notification with data from ONU_ACTIVATED_RAISE_EVENT coming from the Kafka bus
+func TranslateOnuActivatedEvent(eventHeader *voltha.EventHeader, deviceEvent *voltha.DeviceEvent) (notification []YangItem, channelTermination []YangItem, err error) {
+
+	//TODO: the use of this notification, which requires the creation of a dummy channel termination node,
+	//is temporary, and will be substituted with a more fitting one as soon as it will be defined
+
+	//Check if the needed information is present
+	ponId, ok := deviceEvent.Context[eventContextKeyPonId]
+	if !ok {
+		return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyPonId)
+	}
+	oltId, ok := deviceEvent.Context[eventContextKeyOltSn]
+	if !ok {
+		return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyPonId)
+	}
+	ponName := oltId + "-pon-" + ponId
+
+	onuSn, ok := deviceEvent.Context[eventContextKeyOnuSn]
+	if !ok {
+		return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyOnuSn)
+	}
+
+	notificationPath := "/bbf-xpon-onu-states:onu-state-change"
+
+	notification = []YangItem{
+		{
+			Path:  notificationPath + "/detected-serial-number",
+			Value: onuSn,
+		},
+		{
+			Path:  notificationPath + "/channel-termination-ref",
+			Value: ponName,
+		},
+		{
+			Path:  notificationPath + "/onu-state-last-change",
+			Value: eventHeader.RaisedTs.AsTime().Format(time.RFC3339),
+		},
+		{
+			Path:  notificationPath + "/onu-state",
+			Value: "bbf-xpon-onu-types:onu-present",
+		},
+		{
+			Path:  notificationPath + "/detected-registration-id",
+			Value: deviceEvent.ResourceId,
+		},
+	}
+
+	channelTermination = []YangItem{
+		{
+			Path:  fmt.Sprintf("/ietf-interfaces:interfaces/interface[name='%s']/type", ponName),
+			Value: "bbf-if-type:vlan-sub-interface",
+		},
+	}
+
+	return notification, channelTermination, nil
 }
diff --git a/internal/core/translation_test.go b/internal/core/translation_test.go
index fb2e751..41dc1dc 100644
--- a/internal/core/translation_test.go
+++ b/internal/core/translation_test.go
@@ -19,9 +19,12 @@
 import (
 	"fmt"
 	"testing"
+	"time"
 
+	"github.com/opencord/voltha-protos/v5/go/openflow_13"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"github.com/stretchr/testify/assert"
+	"google.golang.org/protobuf/types/known/timestamppb"
 )
 
 const (
@@ -44,58 +47,253 @@
 }
 
 func TestTranslateDevice(t *testing.T) {
-	olt := voltha.Device{
-		Id:   testDeviceId,
-		Root: true,
+	olt := &voltha.Device{
+		Id:              testDeviceId,
+		Root:            true,
+		Vendor:          "BBSim",
+		Model:           "asfvolt16",
+		SerialNumber:    "BBSIM_OLT_10",
+		HardwareVersion: "v0.0.2",
+		FirmwareVersion: "v0.0.3",
+		AdminState:      voltha.AdminState_ENABLED,
+		OperStatus:      voltha.OperStatus_ACTIVE,
 	}
 	items := translateDevice(olt)
 
-	val, ok := getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(testDeviceId)))
-	assert.True(t, ok, "No type item for olt")
-	assert.Equal(t, DeviceTypeOlt, val)
+	oltPath := getDevicePath(testDeviceId)
+	oltHwPath := getDeviceHardwarePath(testDeviceId)
 
-	onu := voltha.Device{
-		Id:   testDeviceId,
-		Root: false,
+	expected := []YangItem{
+		{
+			Path:  oltPath + "/type",
+			Value: DeviceTypeOlt,
+		},
+		{
+			Path:  oltHwPath + "/mfg-name",
+			Value: "BBSim",
+		},
+		{
+			Path:  oltHwPath + "/model-name",
+			Value: "asfvolt16",
+		},
+		{
+			Path:  oltHwPath + "/hardware-rev",
+			Value: "v0.0.2",
+		},
+		{
+			Path:  oltHwPath + "/firmware-rev",
+			Value: "v0.0.3",
+		},
+		{
+			Path:  oltHwPath + "/serial-num",
+			Value: "BBSIM_OLT_10",
+		},
+		{
+			Path:  oltHwPath + "/state/admin-state",
+			Value: ietfAdminStateUnlocked,
+		},
+		{
+			Path:  oltHwPath + "/state/oper-state",
+			Value: ietfOperStateEnabled,
+		},
+	}
+
+	assert.NotEmpty(t, items, "No OLT items")
+	for _, e := range expected {
+		val, ok := getItemWithPath(items, e.Path)
+		assert.True(t, ok, e.Path+" missing for OLT")
+		assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+	}
+
+	onu := &voltha.Device{
+		Id:              testDeviceId,
+		Root:            false,
+		Vendor:          "BBSM",
+		Model:           "v0.0.1",
+		SerialNumber:    "BBSM000a0001",
+		HardwareVersion: "v0.0.2",
+		FirmwareVersion: "v0.0.3",
+		AdminState:      voltha.AdminState_ENABLED,
+		OperStatus:      voltha.OperStatus_ACTIVE,
 	}
 	items = translateDevice(onu)
 
-	val, ok = getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(testDeviceId)))
-	assert.True(t, ok, "No type item for onu")
-	assert.Equal(t, DeviceTypeOnu, val)
+	onuPath := getDevicePath(testDeviceId)
+	onuHwPath := getDeviceHardwarePath(testDeviceId)
+
+	expected = []YangItem{
+		{
+			Path:  onuPath + "/type",
+			Value: DeviceTypeOnu,
+		},
+		{
+			Path:  onuHwPath + "/mfg-name",
+			Value: "BBSM",
+		},
+		{
+			Path:  onuHwPath + "/model-name",
+			Value: "v0.0.1",
+		},
+		{
+			Path:  onuHwPath + "/hardware-rev",
+			Value: "v0.0.2",
+		},
+		{
+			Path:  onuHwPath + "/firmware-rev",
+			Value: "v0.0.3",
+		},
+		{
+			Path:  onuHwPath + "/serial-num",
+			Value: "BBSM000a0001",
+		},
+		{
+			Path:  onuHwPath + "/state/admin-state",
+			Value: ietfAdminStateUnlocked,
+		},
+		{
+			Path:  onuHwPath + "/state/oper-state",
+			Value: ietfOperStateEnabled,
+		},
+	}
+
+	assert.NotEmpty(t, items, "No ONU items")
+	for _, e := range expected {
+		val, ok := getItemWithPath(items, e.Path)
+		assert.True(t, ok, e.Path+" missing for ONU")
+		assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+	}
 }
 
-func TestTranslateDevices(t *testing.T) {
-	devicesNum := 10
-
-	//Create test devices
-	devices := voltha.Devices{
-		Items: []*voltha.Device{},
+func TestTranslateOnuPorts(t *testing.T) {
+	ports := &voltha.Ports{
+		Items: []*voltha.Port{
+			{
+				PortNo:     0,
+				Type:       voltha.Port_ETHERNET_UNI,
+				OperStatus: voltha.OperStatus_ACTIVE,
+			},
+		},
 	}
 
-	for i := 0; i < devicesNum; i++ {
-		devices.Items = append(devices.Items, &voltha.Device{
-			Id:   fmt.Sprintf("%d", i),
-			Root: i%2 == 0,
-		})
+	_, err := translateOnuPorts(testDeviceId, ports)
+	assert.Error(t, err, "No error for missing Ofp port")
+
+	ports = &voltha.Ports{
+		Items: []*voltha.Port{
+			{
+				PortNo: 0,
+				Type:   voltha.Port_ETHERNET_UNI,
+				OfpPort: &openflow_13.OfpPort{
+					Name: "BBSM000a0001-1",
+				},
+				OperStatus: voltha.OperStatus_ACTIVE,
+			},
+			{
+				PortNo: 1,
+				Type:   voltha.Port_ETHERNET_UNI,
+				OfpPort: &openflow_13.OfpPort{
+					Name: "BBSM000a0001-2",
+				},
+				OperStatus: voltha.OperStatus_UNKNOWN,
+			},
+			{
+				PortNo:     0,
+				Type:       voltha.Port_PON_ONU,
+				OperStatus: voltha.OperStatus_UNKNOWN,
+			},
+		},
 	}
 
-	//Translate them to items
-	items := translateDevices(devices)
+	portsItems, err := translateOnuPorts(testDeviceId, ports)
+	assert.Nil(t, err, "Translation error")
 
-	//Check if the number of generated items is correct
-	singleDeviceItemsNum := len(translateDevice(*devices.Items[0]))
-	assert.Equal(t, singleDeviceItemsNum*devicesNum, len(items))
+	/*2 items for 2 UNIs, PON is ignored*/
+	assert.Equal(t, 4, len(portsItems), "No ports items")
 
-	//Check if the content is right
-	for i := 0; i < devicesNum; i++ {
-		val, ok := getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(devices.Items[i].Id)))
-		assert.True(t, ok, fmt.Sprintf("No type item for device %d", i))
+	interfacesPath := getDevicePath(testDeviceId) + "/data/ietf-interfaces:interfaces"
 
-		if devices.Items[i].Root {
-			assert.Equal(t, DeviceTypeOlt, val)
-		} else {
-			assert.Equal(t, DeviceTypeOnu, val)
-		}
+	expected := []YangItem{
+		{
+			Path:  fmt.Sprintf("%s/interface[name='%s']/oper-status", interfacesPath, "BBSM000a0001-1"),
+			Value: ietfOperStateUp,
+		},
+		{
+			Path:  fmt.Sprintf("%s/interface[name='%s']/type", interfacesPath, "BBSM000a0001-1"),
+			Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+		},
+		{
+			Path:  fmt.Sprintf("%s/interface[name='%s']/oper-status", interfacesPath, "BBSM000a0001-2"),
+			Value: ietfOperStateUnknown,
+		},
+		{
+			Path:  fmt.Sprintf("%s/interface[name='%s']/type", interfacesPath, "BBSM000a0001-2"),
+			Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+		},
+	}
+
+	for _, e := range expected {
+		val, ok := getItemWithPath(portsItems, e.Path)
+		assert.True(t, ok, e.Path+" missing for ports")
+		assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+	}
+}
+
+func TestTranslateOnuActive(t *testing.T) {
+	timestamp := time.Now()
+	eventHeader := &voltha.EventHeader{
+		Id:       "Voltha.openolt.ONU_ACTIVATED.1657705515351182767",
+		RaisedTs: timestamppb.New(timestamp),
+		Category: voltha.EventCategory_EQUIPMENT,
+		Type:     voltha.EventType_DEVICE_EVENT,
+	}
+
+	deviceEvent := &voltha.DeviceEvent{
+		ResourceId:      testDeviceId,
+		DeviceEventName: "ONU_ACTIVATED_RAISE_EVENT",
+		Description:     "ONU Event - ONU_ACTIVATED - Raised",
+		Context:         map[string]string{},
+	}
+
+	_, _, err := TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+	assert.Error(t, err, "Empty context produces no error")
+
+	deviceEvent.Context[eventContextKeyPonId] = "0"
+	deviceEvent.Context[eventContextKeyOnuSn] = "BBSM000a0001"
+	deviceEvent.Context[eventContextKeyOltSn] = "BBSIM_OLT_10"
+
+	notificationPath := "/bbf-xpon-onu-states:onu-state-change"
+	expected := []YangItem{
+		{
+			Path:  notificationPath + "/detected-serial-number",
+			Value: "BBSM000a0001",
+		},
+		{
+			Path:  notificationPath + "/onu-state-last-change",
+			Value: timestamp.Format(time.RFC3339),
+		},
+		{
+			Path:  notificationPath + "/onu-state",
+			Value: "bbf-xpon-onu-types:onu-present",
+		},
+		{
+			Path:  notificationPath + "/detected-registration-id",
+			Value: testDeviceId,
+		},
+	}
+
+	notificationItems, channelTerminationItems, err := TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+	assert.Nil(t, err, "Translation error")
+
+	assert.NotEmpty(t, channelTerminationItems, "No channel termination items")
+
+	assert.NotEmpty(t, notificationItems, "No notification items")
+
+	_, ok := getItemWithPath(notificationItems, notificationPath+"/channel-termination-ref")
+	assert.True(t, ok, "No channel termination reference in notification")
+
+	for _, e := range expected {
+		val, ok := getItemWithPath(notificationItems, e.Path)
+		assert.True(t, ok, e.Path+" missing for notification")
+		assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
 	}
 }