Add NETCONF notification for ONU activation and Kafka client to receive events, update dependencies

Change-Id: I5f768fa8077ef7c64e00a534744ca47492344935
diff --git a/internal/clients/kafka.go b/internal/clients/kafka.go
new file mode 100644
index 0000000..9350895
--- /dev/null
+++ b/internal/clients/kafka.go
@@ -0,0 +1,155 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package clients
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/Shopify/sarama"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+const (
+	volthaEventsTopic    = "voltha.events"
+	kafkaBackoffInterval = time.Second * 10
+)
+
+//KafkaConsumer is used to listen for events coming from VOLTHA
+type KafkaConsumer struct {
+	address           string                   //Kafka address passed to NewKafkaConsumer
+	config            *sarama.Config           //sarama configuration built by NewKafkaConsumer
+	client            sarama.Client            //created by Start, closed by Stop
+	consumer          sarama.Consumer          //created by Start from client, closed by Stop
+	partitionConsumer sarama.PartitionConsumer //created by Start, closed by Stop
+	highwater         int64                    //offset read with sarama.OffsetNewest in Start, used to skip messages already on the topic
+}
+
+//NewKafkaConsumer prepares a consumer for the specified cluster address together with its sarama
+//configuration; no connection to Kafka is attempted here
+func NewKafkaConsumer(clusterAddress string) *KafkaConsumer {
+	cfg := sarama.NewConfig()
+	cfg.Version = sarama.V1_0_0_0
+	cfg.ClientID = "bbf-adapter-consumer"
+	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
+	cfg.Consumer.Return.Errors = true
+
+	return &KafkaConsumer{address: clusterAddress, config: cfg}
+}
+
+//Start connects to Kafka, retrying every kafkaBackoffInterval until ctx is done, then consumes the
+//voltha events topic in a goroutine, executing the provided callback on each new decoded event
+func (c *KafkaConsumer) Start(ctx context.Context, eventCallback func(context.Context, *voltha.Event)) error {
+	var err error
+
+	for {
+		if c.client, err = sarama.NewClient([]string{c.address}, c.config); err == nil {
+			logger.Debug(ctx, "kafka-client-created")
+			break
+		}
+		logger.Warnw(ctx, "kafka-not-reachable", log.Fields{"err": err})
+
+		//Wait a bit before trying again
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("kafka-client-creation-stopped-due-to-context-done")
+		case <-time.After(kafkaBackoffInterval):
+		}
+	}
+
+	c.consumer, err = sarama.NewConsumerFromClient(c.client)
+	if err != nil {
+		return err
+	}
+
+	//The first partition is indexed below, so a failed or empty lookup must stop here
+	partitions, err := c.consumer.Partitions(volthaEventsTopic)
+	if err != nil || len(partitions) == 0 {
+		return fmt.Errorf("cannot-get-partitions: Topic %v Partitions: %v Error: %v", volthaEventsTopic, partitions, err)
+	}
+
+	// TODO: Add support for multiple partitions
+	if len(partitions) > 1 {
+		logger.Warnw(ctx, "only-listening-one-partition", log.Fields{"topic": volthaEventsTopic, "partitionsNum": len(partitions)})
+	}
+
+	hw, err := c.client.GetOffset(volthaEventsTopic, partitions[0], sarama.OffsetNewest)
+	if err != nil {
+		return fmt.Errorf("cannot-get-highwater: %v", err)
+	}
+	c.highwater = hw
+
+	c.partitionConsumer, err = c.consumer.ConsumePartition(volthaEventsTopic, partitions[0], sarama.OffsetOldest)
+	if nil != err {
+		return fmt.Errorf("Error in consume(): Topic %v Partitions: %v Error: %v", volthaEventsTopic, partitions, err)
+	}
+
+	//Start consuming the event topic in a goroutine
+	logger.Debugw(ctx, "start-consuming-kafka-topic", log.Fields{"topic": volthaEventsTopic})
+	go func(pConsumer sarama.PartitionConsumer) {
+		for {
+			select {
+			case <-ctx.Done():
+				logger.Info(ctx, "stopped-listening-for-events-due-to-context-done")
+				return
+			case err, ok := <-pConsumer.Errors():
+				if !ok {
+					//Channel closed after Stop: the nil error received must not be dereferenced
+					return
+				}
+				logger.Errorw(ctx, "kafka-consumer-error", log.Fields{"err": err.Error(), "topic": err.Topic, "partition": err.Partition})
+			case msg, ok := <-pConsumer.Messages():
+				if !ok {
+					return
+				}
+				//The highwater is the offset assigned to the next produced message, so it is a new event
+				if msg.Offset < c.highwater {
+					continue
+				}
+				//Unmarshal the content of the message to a voltha Event protobuf message
+				event := &voltha.Event{}
+				if err := proto.Unmarshal(msg.Value, event); err != nil {
+					logger.Errorw(ctx, "error-unmarshalling-kafka-event", log.Fields{"err": err})
+					continue
+				}
+				eventCallback(ctx, event)
+			}
+		}
+	}(c.partitionConsumer)
+
+	return nil
+}
+
+//Stop closes the partition consumer, the consumer and the client, skipping the ones Start never created;
+//every component is closed even if a previous one fails, and the first error encountered is returned
+func (c *KafkaConsumer) Stop() error {
+	var firstErr error
+
+	for _, closer := range []interface{ Close() error }{c.partitionConsumer, c.consumer, c.client} {
+		if closer == nil {
+			continue
+		}
+		if err := closer.Close(); firstErr == nil {
+			firstErr = err
+		}
+	}
+
+	return firstErr
+}
diff --git a/internal/config/config.go b/internal/config/config.go
index d3be1fc..ff7f36f 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -36,6 +36,7 @@
 	OnosUser              string
 	OnosPassword          string
 	SchemaMountFilePath   string
+	KafkaClusterAddress   string
 }
 
 // LoadConfig loads the BBF adapter configuration through
@@ -57,6 +58,7 @@
 	flag.StringVar(&conf.OnosUser, "onos_user", conf.OnosUser, "Username for ONOS REST APIs")
 	flag.StringVar(&conf.OnosPassword, "onos_pass", conf.OnosPassword, "Password for ONOS REST APIs")
 	flag.StringVar(&conf.SchemaMountFilePath, "schema_mount_path", conf.SchemaMountFilePath, "Path to the XML file that defines schema-mounts for libyang")
+	flag.StringVar(&conf.KafkaClusterAddress, "kafka_cluster_address", conf.KafkaClusterAddress, "Kafka cluster messaging address")
 
 	flag.Parse()
 
@@ -80,5 +82,6 @@
 		OnosUser:              "onos",
 		OnosPassword:          "rocks",
 		SchemaMountFilePath:   "/schema-mount.xml",
+		KafkaClusterAddress:   "127.0.0.1:9092",
 	}
 }
diff --git a/internal/core/adapter.go b/internal/core/adapter.go
index a96767f..6a59650 100644
--- a/internal/core/adapter.go
+++ b/internal/core/adapter.go
@@ -23,6 +23,7 @@
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"github.com/opencord/voltha-northbound-bbf-adapter/internal/clients"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
 var AdapterInstance *VolthaYangAdapter
@@ -42,13 +43,35 @@
 func (t *VolthaYangAdapter) GetDevices(ctx context.Context) ([]YangItem, error) {
 	devices, err := t.volthaNbiClient.Service.ListDevices(ctx, &empty.Empty{})
 	if err != nil {
-		err = fmt.Errorf("get-devices-failed: %v", err)
-		return nil, err
+		return nil, fmt.Errorf("get-devices-failed: %v", err)
 	}
+	logger.Debugw(ctx, "get-devices-success", log.Fields{"devices": devices})
 
-	items := translateDevices(*devices)
+	items := []YangItem{}
 
-	logger.Debugw(ctx, "get-devices-success", log.Fields{"items": items})
+	for _, device := range devices.Items {
+		items = append(items, translateDevice(device)...)
+
+		if !device.Root {
+			//If the device is an ONU, also expose UNIs
+			ports, err := t.volthaNbiClient.Service.ListDevicePorts(ctx, &voltha.ID{Id: device.Id})
+			if err != nil {
+				return nil, fmt.Errorf("get-onu-ports-failed: %v", err)
+			}
+			logger.Debugw(ctx, "get-ports-success", log.Fields{"deviceId": device.Id, "ports": ports})
+
+			portsItems, err := translateOnuPorts(device.Id, ports)
+			if err != nil {
+				logger.Errorw(ctx, "cannot-translate-onu-ports", log.Fields{
+					"deviceId": device.Id,
+					"err":      err,
+				})
+				continue
+			}
+
+			items = append(items, portsItems...)
+		}
+	}
 
 	return items, nil
 }
diff --git a/internal/core/translation.go b/internal/core/translation.go
index 634272c..ef6582d 100644
--- a/internal/core/translation.go
+++ b/internal/core/translation.go
@@ -18,15 +18,15 @@
 
 import (
 	"fmt"
+	"time"
 
 	"github.com/opencord/voltha-protos/v5/go/common"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
 const (
-	DeviceAggregationModel = "bbf-device-aggregation"
-	DevicesPath            = "/" + DeviceAggregationModel + ":devices"
-	HardwarePath           = "data/ietf-hardware:hardware"
+	DeviceAggregationModule = "bbf-device-aggregation"
+	DevicesPath             = "/" + DeviceAggregationModule + ":devices"
 
 	//Device types
 	DeviceTypeOlt = "bbf-device-types:olt"
@@ -42,6 +42,13 @@
 	ietfOperStateDisabled = "disabled"
 	ietfOperStateEnabled  = "enabled"
 	ietfOperStateTesting  = "testing"
+	ietfOperStateUp       = "up"
+	ietfOperStateDown     = "down"
+
+	//Keys of useful values in device events
+	eventContextKeyPonId = "pon-id"
+	eventContextKeyOnuSn = "serial-number"
+	eventContextKeyOltSn = "olt-serial-number"
 )
 
 type YangItem struct {
@@ -56,7 +63,7 @@
 
 //getDevicePath returns the yang path to the root of the device's hardware module in its data mountpoint
 func getDeviceHardwarePath(id string) string {
-	return fmt.Sprintf("%s/device[name='%s']/%s/component[name='%s']", DevicesPath, id, HardwarePath, id)
+	return fmt.Sprintf("%s/device[name='%s']/data/ietf-hardware:hardware/component[name='%s']", DevicesPath, id, id)
 }
 
 //ietfHardwareAdminState returns the string that represents the ietf-hardware admin state
@@ -101,8 +108,30 @@
 	return ietfOperStateUnknown
 }
 
+//ietfInterfacesOperState returns the string that represents the ietf-interfaces oper state
+//enum value corresponding to the one of VOLTHA
+func ietfInterfacesOperState(volthaOperState voltha.OperStatus_Types) string {
+	//TODO: verify this mapping is correct
+	switch volthaOperState {
+	case common.OperStatus_UNKNOWN:
+		return ietfOperStateUnknown
+	case common.OperStatus_TESTING:
+		return ietfOperStateTesting
+	case common.OperStatus_ACTIVE:
+		return ietfOperStateUp
+	//These states share a single case because Go cases do not fall through
+	case common.OperStatus_DISCOVERED, common.OperStatus_ACTIVATING,
+		common.OperStatus_FAILED,
+		common.OperStatus_RECONCILING,
+		common.OperStatus_RECONCILING_FAILED:
+		return ietfOperStateDown
+	}
+
+	return ietfOperStateUnknown
+}
+
 //translateDevice returns a slice of yang items that represent a voltha device
-func translateDevice(device voltha.Device) []YangItem {
+func translateDevice(device *voltha.Device) []YangItem {
 	devicePath := getDevicePath(device.Id)
 	hardwarePath := getDeviceHardwarePath(device.Id)
 
@@ -110,70 +139,148 @@
 
 	//Device type
 	if device.Root {
+		//OLT
 		result = append(result, YangItem{
-			Path:  fmt.Sprintf("%s/type", devicePath),
+			Path:  devicePath + "/type",
 			Value: DeviceTypeOlt,
 		})
 	} else {
+		//ONU
 		result = append(result, YangItem{
-			Path:  fmt.Sprintf("%s/type", devicePath),
+			Path:  devicePath + "/type",
 			Value: DeviceTypeOnu,
 		})
 	}
 
 	//Vendor name
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/mfg-name", hardwarePath),
+		Path:  hardwarePath + "/mfg-name",
 		Value: device.Vendor,
 	})
 
 	//Model
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/model-name", hardwarePath),
+		Path:  hardwarePath + "/model-name",
 		Value: device.Model,
 	})
 
 	//Hardware version
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/hardware-rev", hardwarePath),
+		Path:  hardwarePath + "/hardware-rev",
 		Value: device.HardwareVersion,
 	})
 
 	//Firmware version
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/firmware-rev", hardwarePath),
+		Path:  hardwarePath + "/firmware-rev",
 		Value: device.FirmwareVersion,
 	})
 
 	//Serial number
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/serial-num", hardwarePath),
+		Path:  hardwarePath + "/serial-num",
 		Value: device.SerialNumber,
 	})
 
 	//Administrative state
 	//Translates VOLTHA admin state enum to ietf-hardware enum
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/state/admin-state", hardwarePath),
+		Path:  hardwarePath + "/state/admin-state",
 		Value: ietfHardwareAdminState(device.AdminState),
 	})
 
 	//Operative state
 	result = append(result, YangItem{
-		Path:  fmt.Sprintf("%s/state/oper-state", hardwarePath),
+		Path:  hardwarePath + "/state/oper-state",
 		Value: ietfHardwareOperState(device.OperStatus),
 	})
 
 	return result
 }
 
-//translateDevices returns a slice of yang items that represent a list of voltha devices
-func translateDevices(devices voltha.Devices) []YangItem {
+//translateOnuPorts returns a slice of yang items that represent the UNIs of an ONU
+func translateOnuPorts(deviceId string, ports *voltha.Ports) ([]YangItem, error) {
+	interfacesPath := getDevicePath(deviceId) + "/data/ietf-interfaces:interfaces"
 	result := []YangItem{}
 
-	for _, device := range devices.Items {
-		result = append(result, translateDevice(*device)...)
+	for _, port := range ports.Items {
+		if port.Type == voltha.Port_ETHERNET_UNI {
+			if port.OfpPort == nil {
+				return nil, fmt.Errorf("no-ofp-port-in-uni: %s %d", deviceId, port.PortNo)
+			}
+
+			interfacePath := fmt.Sprintf("%s/interface[name='%s']", interfacesPath, port.OfpPort.Name)
+
+			result = append(result, []YangItem{
+				{
+					Path:  interfacePath + "/type",
+					Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+				},
+				{
+					Path:  interfacePath + "/oper-status",
+					Value: ietfInterfacesOperState(port.OperStatus),
+				},
+			}...)
+		}
 	}
 
-	return result
+	return result, nil
+}
+
+//TranslateOnuActivatedEvent returns the yang items of an ONU discovery notification and the yang items of the
+//channel termination it references, built from the ONU_ACTIVATED_RAISE_EVENT data coming from the Kafka bus
+func TranslateOnuActivatedEvent(eventHeader *voltha.EventHeader, deviceEvent *voltha.DeviceEvent) (notification []YangItem, channelTermination []YangItem, err error) {
+
+	//TODO: the use of this notification, which requires the creation of a dummy channel termination node,
+	//is temporary, and will be substituted with a more fitting one as soon as it will be defined
+
+	//Check if the needed information is present
+	ponId, ok := deviceEvent.Context[eventContextKeyPonId]
+	if !ok {
+		return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyPonId)
+	}
+	oltId, ok := deviceEvent.Context[eventContextKeyOltSn]
+	if !ok {
+		return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyOltSn)
+	}
+	ponName := oltId + "-pon-" + ponId
+
+	onuSn, ok := deviceEvent.Context[eventContextKeyOnuSn]
+	if !ok {
+		return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyOnuSn)
+	}
+
+	notificationPath := "/bbf-xpon-onu-states:onu-state-change"
+
+	notification = []YangItem{
+		{
+			Path:  notificationPath + "/detected-serial-number",
+			Value: onuSn,
+		},
+		{
+			Path:  notificationPath + "/channel-termination-ref",
+			Value: ponName,
+		},
+		{
+			Path:  notificationPath + "/onu-state-last-change",
+			Value: eventHeader.RaisedTs.AsTime().Format(time.RFC3339),
+		},
+		{
+			Path:  notificationPath + "/onu-state",
+			Value: "bbf-xpon-onu-types:onu-present",
+		},
+		{
+			Path:  notificationPath + "/detected-registration-id",
+			Value: deviceEvent.ResourceId,
+		},
+	}
+
+	channelTermination = []YangItem{
+		{
+			Path:  fmt.Sprintf("/ietf-interfaces:interfaces/interface[name='%s']/type", ponName),
+			Value: "bbf-if-type:vlan-sub-interface",
+		},
+	}
+
+	return notification, channelTermination, nil
 }
diff --git a/internal/core/translation_test.go b/internal/core/translation_test.go
index fb2e751..41dc1dc 100644
--- a/internal/core/translation_test.go
+++ b/internal/core/translation_test.go
@@ -19,9 +19,12 @@
 import (
 	"fmt"
 	"testing"
+	"time"
 
+	"github.com/opencord/voltha-protos/v5/go/openflow_13"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 	"github.com/stretchr/testify/assert"
+	"google.golang.org/protobuf/types/known/timestamppb"
 )
 
 const (
@@ -44,58 +47,253 @@
 }
 
 func TestTranslateDevice(t *testing.T) {
-	olt := voltha.Device{
-		Id:   testDeviceId,
-		Root: true,
+	olt := &voltha.Device{
+		Id:              testDeviceId,
+		Root:            true,
+		Vendor:          "BBSim",
+		Model:           "asfvolt16",
+		SerialNumber:    "BBSIM_OLT_10",
+		HardwareVersion: "v0.0.2",
+		FirmwareVersion: "v0.0.3",
+		AdminState:      voltha.AdminState_ENABLED,
+		OperStatus:      voltha.OperStatus_ACTIVE,
 	}
 	items := translateDevice(olt)
 
-	val, ok := getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(testDeviceId)))
-	assert.True(t, ok, "No type item for olt")
-	assert.Equal(t, DeviceTypeOlt, val)
+	oltPath := getDevicePath(testDeviceId)
+	oltHwPath := getDeviceHardwarePath(testDeviceId)
 
-	onu := voltha.Device{
-		Id:   testDeviceId,
-		Root: false,
+	expected := []YangItem{
+		{
+			Path:  oltPath + "/type",
+			Value: DeviceTypeOlt,
+		},
+		{
+			Path:  oltHwPath + "/mfg-name",
+			Value: "BBSim",
+		},
+		{
+			Path:  oltHwPath + "/model-name",
+			Value: "asfvolt16",
+		},
+		{
+			Path:  oltHwPath + "/hardware-rev",
+			Value: "v0.0.2",
+		},
+		{
+			Path:  oltHwPath + "/firmware-rev",
+			Value: "v0.0.3",
+		},
+		{
+			Path:  oltHwPath + "/serial-num",
+			Value: "BBSIM_OLT_10",
+		},
+		{
+			Path:  oltHwPath + "/state/admin-state",
+			Value: ietfAdminStateUnlocked,
+		},
+		{
+			Path:  oltHwPath + "/state/oper-state",
+			Value: ietfOperStateEnabled,
+		},
+	}
+
+	assert.NotEmpty(t, items, "No OLT items")
+	for _, e := range expected {
+		val, ok := getItemWithPath(items, e.Path)
+		assert.True(t, ok, e.Path+" missing for OLT")
+		assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+	}
+
+	onu := &voltha.Device{
+		Id:              testDeviceId,
+		Root:            false,
+		Vendor:          "BBSM",
+		Model:           "v0.0.1",
+		SerialNumber:    "BBSM000a0001",
+		HardwareVersion: "v0.0.2",
+		FirmwareVersion: "v0.0.3",
+		AdminState:      voltha.AdminState_ENABLED,
+		OperStatus:      voltha.OperStatus_ACTIVE,
 	}
 	items = translateDevice(onu)
 
-	val, ok = getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(testDeviceId)))
-	assert.True(t, ok, "No type item for onu")
-	assert.Equal(t, DeviceTypeOnu, val)
+	onuPath := getDevicePath(testDeviceId)
+	onuHwPath := getDeviceHardwarePath(testDeviceId)
+
+	expected = []YangItem{
+		{
+			Path:  onuPath + "/type",
+			Value: DeviceTypeOnu,
+		},
+		{
+			Path:  onuHwPath + "/mfg-name",
+			Value: "BBSM",
+		},
+		{
+			Path:  onuHwPath + "/model-name",
+			Value: "v0.0.1",
+		},
+		{
+			Path:  onuHwPath + "/hardware-rev",
+			Value: "v0.0.2",
+		},
+		{
+			Path:  onuHwPath + "/firmware-rev",
+			Value: "v0.0.3",
+		},
+		{
+			Path:  onuHwPath + "/serial-num",
+			Value: "BBSM000a0001",
+		},
+		{
+			Path:  onuHwPath + "/state/admin-state",
+			Value: ietfAdminStateUnlocked,
+		},
+		{
+			Path:  onuHwPath + "/state/oper-state",
+			Value: ietfOperStateEnabled,
+		},
+	}
+
+	assert.NotEmpty(t, items, "No ONU items")
+	for _, e := range expected {
+		val, ok := getItemWithPath(items, e.Path)
+		assert.True(t, ok, e.Path+" missing for ONU")
+		assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+	}
 }
 
-func TestTranslateDevices(t *testing.T) {
-	devicesNum := 10
-
-	//Create test devices
-	devices := voltha.Devices{
-		Items: []*voltha.Device{},
+func TestTranslateOnuPorts(t *testing.T) {
+	ports := &voltha.Ports{
+		Items: []*voltha.Port{
+			{
+				PortNo:     0,
+				Type:       voltha.Port_ETHERNET_UNI,
+				OperStatus: voltha.OperStatus_ACTIVE,
+			},
+		},
 	}
 
-	for i := 0; i < devicesNum; i++ {
-		devices.Items = append(devices.Items, &voltha.Device{
-			Id:   fmt.Sprintf("%d", i),
-			Root: i%2 == 0,
-		})
+	_, err := translateOnuPorts(testDeviceId, ports)
+	assert.Error(t, err, "No error for missing Ofp port")
+
+	ports = &voltha.Ports{
+		Items: []*voltha.Port{
+			{
+				PortNo: 0,
+				Type:   voltha.Port_ETHERNET_UNI,
+				OfpPort: &openflow_13.OfpPort{
+					Name: "BBSM000a0001-1",
+				},
+				OperStatus: voltha.OperStatus_ACTIVE,
+			},
+			{
+				PortNo: 1,
+				Type:   voltha.Port_ETHERNET_UNI,
+				OfpPort: &openflow_13.OfpPort{
+					Name: "BBSM000a0001-2",
+				},
+				OperStatus: voltha.OperStatus_UNKNOWN,
+			},
+			{
+				PortNo:     0,
+				Type:       voltha.Port_PON_ONU,
+				OperStatus: voltha.OperStatus_UNKNOWN,
+			},
+		},
 	}
 
-	//Translate them to items
-	items := translateDevices(devices)
+	portsItems, err := translateOnuPorts(testDeviceId, ports)
+	assert.Nil(t, err, "Translation error")
 
-	//Check if the number of generated items is correct
-	singleDeviceItemsNum := len(translateDevice(*devices.Items[0]))
-	assert.Equal(t, singleDeviceItemsNum*devicesNum, len(items))
+	/*2 items for 2 UNIs, PON is ignored*/
+	assert.Equal(t, 4, len(portsItems), "No ports items")
 
-	//Check if the content is right
-	for i := 0; i < devicesNum; i++ {
-		val, ok := getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(devices.Items[i].Id)))
-		assert.True(t, ok, fmt.Sprintf("No type item for device %d", i))
+	interfacesPath := getDevicePath(testDeviceId) + "/data/ietf-interfaces:interfaces"
 
-		if devices.Items[i].Root {
-			assert.Equal(t, DeviceTypeOlt, val)
-		} else {
-			assert.Equal(t, DeviceTypeOnu, val)
-		}
+	expected := []YangItem{
+		{
+			Path:  fmt.Sprintf("%s/interface[name='%s']/oper-status", interfacesPath, "BBSM000a0001-1"),
+			Value: ietfOperStateUp,
+		},
+		{
+			Path:  fmt.Sprintf("%s/interface[name='%s']/type", interfacesPath, "BBSM000a0001-1"),
+			Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+		},
+		{
+			Path:  fmt.Sprintf("%s/interface[name='%s']/oper-status", interfacesPath, "BBSM000a0001-2"),
+			Value: ietfOperStateUnknown,
+		},
+		{
+			Path:  fmt.Sprintf("%s/interface[name='%s']/type", interfacesPath, "BBSM000a0001-2"),
+			Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+		},
+	}
+
+	for _, e := range expected {
+		val, ok := getItemWithPath(portsItems, e.Path)
+		assert.True(t, ok, e.Path+" missing for ports")
+		assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+	}
+}
+
+func TestTranslateOnuActive(t *testing.T) {
+	timestamp := time.Now()
+	eventHeader := &voltha.EventHeader{
+		Id:       "Voltha.openolt.ONU_ACTIVATED.1657705515351182767",
+		RaisedTs: timestamppb.New(timestamp),
+		Category: voltha.EventCategory_EQUIPMENT,
+		Type:     voltha.EventType_DEVICE_EVENT,
+	}
+
+	deviceEvent := &voltha.DeviceEvent{
+		ResourceId:      testDeviceId,
+		DeviceEventName: "ONU_ACTIVATED_RAISE_EVENT",
+		Description:     "ONU Event - ONU_ACTIVATED - Raised",
+		Context:         map[string]string{},
+	}
+
+	_, _, err := TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+	assert.Error(t, err, "Empty context produces no error")
+
+	deviceEvent.Context[eventContextKeyPonId] = "0"
+	deviceEvent.Context[eventContextKeyOnuSn] = "BBSM000a0001"
+	deviceEvent.Context[eventContextKeyOltSn] = "BBSIM_OLT_10"
+
+	notificationPath := "/bbf-xpon-onu-states:onu-state-change"
+	expected := []YangItem{
+		{
+			Path:  notificationPath + "/detected-serial-number",
+			Value: "BBSM000a0001",
+		},
+		{
+			Path:  notificationPath + "/onu-state-last-change",
+			Value: timestamp.UTC().Format(time.RFC3339), //AsTime() yields UTC, local time would not match
+		},
+		{
+			Path:  notificationPath + "/onu-state",
+			Value: "bbf-xpon-onu-types:onu-present",
+		},
+		{
+			Path:  notificationPath + "/detected-registration-id",
+			Value: testDeviceId,
+		},
+	}
+
+	notificationItems, channelTerminationItems, err := TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+	assert.Nil(t, err, "Translation error")
+
+	assert.NotEmpty(t, channelTerminationItems, "No channel termination items")
+
+	assert.NotEmpty(t, notificationItems, "No notification items")
+
+	_, ok := getItemWithPath(notificationItems, notificationPath+"/channel-termination-ref")
+	assert.True(t, ok, "No channel termination reference in notification")
+
+	for _, e := range expected {
+		val, ok := getItemWithPath(notificationItems, e.Path)
+		assert.True(t, ok, e.Path+" missing for notification")
+		assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
 	}
 }
diff --git a/internal/sysrepo/callbacks.go b/internal/sysrepo/callbacks.go
new file mode 100644
index 0000000..1256568
--- /dev/null
+++ b/internal/sysrepo/callbacks.go
@@ -0,0 +1,67 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+	"context"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+)
+
+//export get_devices_cb
+func get_devices_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
+	//Callback for the retrieval of devices from sysrepo: the items returned by the adapter are passed
+	//to updateYangTree together with parent. The "export" comment makes CGO create a C function for it
+
+	ctx := context.Background()
+	logger.Debug(ctx, "processing-get-devices-request")
+
+	if session == nil {
+		logger.Error(ctx, "sysrepo-get-devices-null-session")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	if parent == nil {
+		logger.Error(ctx, "sysrepo-get-devices-null-parent-node")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	if core.AdapterInstance == nil {
+		logger.Error(ctx, "sysrepo-get-devices-nil-translator")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	devices, err := core.AdapterInstance.GetDevices(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "sysrepo-get-devices-translator-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	err = updateYangTree(ctx, session, parent, devices)
+	if err != nil {
+		logger.Errorw(ctx, "sysrepo-get-devices-update-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	logger.Info(ctx, "devices-information-request-served")
+
+	return C.SR_ERR_OK
+}
diff --git a/internal/sysrepo/events.go b/internal/sysrepo/events.go
new file mode 100644
index 0000000..4ababe3
--- /dev/null
+++ b/internal/sysrepo/events.go
@@ -0,0 +1,110 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+	"context"
+	"fmt"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+const (
+	eventNameOnuActivated = "ONU_ACTIVATED_RAISE_EVENT"
+)
+
+//ManageVolthaEvent performs the necessary operations on a new voltha event received from Kafka;
+//events that are not device events are ignored, device events without a payload are logged and dropped
+func (p *SysrepoPlugin) ManageVolthaEvent(ctx context.Context, event *voltha.Event) {
+	//Getters are nil-safe, while direct field access would panic on an event without header
+	if event.GetHeader().GetType() != voltha.EventType_DEVICE_EVENT {
+		return
+	}
+
+	//GetDeviceEvent returns nil when the payload is missing or is not a device event
+	deviceEvent := event.GetDeviceEvent()
+	if deviceEvent == nil {
+		logger.Errorw(ctx, "unexpected-event-type", log.Fields{"headerType": event.Header.Type, "actualType": fmt.Sprintf("%T", event.EventType)})
+		return
+	}
+
+	//TODO: map other events to ONU state changes
+	switch deviceEvent.DeviceEventName {
+	case eventNameOnuActivated:
+		logger.Debugw(ctx, "onu-activated-event-received", log.Fields{
+			"header":      event.Header,
+			"deviceEvent": deviceEvent,
+		})
+		if err := p.sendOnuActivatedNotification(ctx, event.Header, deviceEvent); err != nil {
+			logger.Errorw(ctx, "failed-to-send-onu-activated-notification", log.Fields{"err": err})
+		}
+	}
+}
+
+//sendOnuActivatedNotification sends a sysrepo notification based on the content of the received device event
+func (p *SysrepoPlugin) sendOnuActivatedNotification(ctx context.Context, eventHeader *voltha.EventHeader, deviceEvent *voltha.DeviceEvent) error {
+	//Prepare the content of the notification and of the channel termination it references
+	notificationItems, channelTermItems, err := core.TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+	if err != nil {
+		return fmt.Errorf("failed-to-translate-onu-activated-event: %v", err)
+	}
+
+	//Create the channel termination in the datastore to make the notification leafref valid
+	channelTermTree, err := createYangTree(ctx, p.operationalSession, channelTermItems)
+	if err != nil {
+		return fmt.Errorf("failed-to-create-channel-termination-tree: %v", err)
+	}
+	defer C.lyd_free_all(channelTermTree)
+
+	err = editDatastore(ctx, p.operationalSession, channelTermTree)
+	if err != nil {
+		return fmt.Errorf("failed-to-apply-channel-termination-to-datastore: %v", err)
+	}
+
+	//Create the notification tree from the translated notification items
+	notificationTree, err := createYangTree(ctx, p.operationalSession, notificationItems)
+	if err != nil {
+		return fmt.Errorf("failed-to-create-onu-activated-notification-tree: %v", err)
+	}
+
+	//Let sysrepo manage the notification tree to properly free it after its delivery
+	var notificationData *C.sr_data_t
+	errCode := C.sr_acquire_data(p.connection, notificationTree, &notificationData)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("cannot-acquire-notification-data")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return err //NOTE(review): notificationTree is not freed on this path, confirm who owns it on failure
+	}
+	defer C.sr_release_data(notificationData)
+
+	//Send the notification through the operational session; failures are logged and returned
+	logger.Infow(ctx, "sending-onu-activated-notification", log.Fields{
+		"onuSn": deviceEvent.Context["serial-number"],
+	})
+	errCode = C.sr_notif_send_tree(p.operationalSession, notificationData.tree, 0, 0)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("cannot-send-notification")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return err
+	}
+
+	return nil
+}
diff --git a/internal/sysrepo/plugin.c b/internal/sysrepo/plugin.c
index 08e7978..f9f53c6 100644
--- a/internal/sysrepo/plugin.c
+++ b/internal/sysrepo/plugin.c
@@ -25,6 +25,9 @@
 typedef struct lyd_node lyd_node;
 typedef struct ly_ctx ly_ctx;
 
+//Used to define the datastore edit mode
+const char* mergeOperation = "merge";
+
 //Provides data for the schema-mount extension
 LY_ERR mountpoint_ext_data_clb(
     const struct lysc_ext_instance *ext,
@@ -37,7 +40,7 @@
     return LY_SUCCESS;
 }
 
-// Exported by sysrepo.go
+// Exported by callbacks.go
 sr_error_t get_devices_cb(sr_session_ctx_t *session, lyd_node **parent);
 
 //The wrapper functions are needed because CGO cannot express some keywords
diff --git a/internal/sysrepo/sysrepo.go b/internal/sysrepo/sysrepo.go
index 3a4dc79..973dbb9 100644
--- a/internal/sysrepo/sysrepo.go
+++ b/internal/sysrepo/sysrepo.go
@@ -31,73 +31,16 @@
 )
 
 type SysrepoPlugin struct {
-	connection      *C.sr_conn_ctx_t
-	session         *C.sr_session_ctx_t
-	subscription    *C.sr_subscription_ctx_t
-	schemaMountData *C.lyd_node
-}
-
-func srErrorMsg(code C.int) string {
-	return C.GoString(C.sr_strerror(code))
-}
-
-func lyErrorMsg(ly_ctx *C.ly_ctx) string {
-	lyErrString := C.ly_errmsg(ly_ctx)
-	defer freeCString(lyErrString)
-
-	return C.GoString(lyErrString)
-}
-
-func freeCString(str *C.char) {
-	if str != nil {
-		C.free(unsafe.Pointer(str))
-		str = nil
-	}
-}
-
-func updateYangItems(ctx context.Context, session *C.sr_session_ctx_t, parent **C.lyd_node, items []core.YangItem) error {
-	conn := C.sr_session_get_connection(session)
-	if conn == nil {
-		return fmt.Errorf("null-connection")
-	}
-
-	//libyang context
-	ly_ctx := C.sr_acquire_context(conn)
-	defer C.sr_release_context(conn)
-	if ly_ctx == nil {
-		return fmt.Errorf("null-libyang-context")
-	}
-
-	for _, item := range items {
-		if item.Value == "" {
-			continue
-		}
-
-		logger.Debugw(ctx, "updating-yang-item", log.Fields{"item": item})
-
-		path := C.CString(item.Path)
-		value := C.CString(item.Value)
-
-		lyErr := C.lyd_new_path(*parent, ly_ctx, path, value, 0, nil)
-		if lyErr != C.LY_SUCCESS {
-			freeCString(path)
-			freeCString(value)
-
-			err := fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
-
-			return err
-		}
-
-		freeCString(path)
-		freeCString(value)
-	}
-
-	return nil
+	connection         *C.sr_conn_ctx_t
+	operationalSession *C.sr_session_ctx_t
+	runningSession     *C.sr_session_ctx_t
+	subscription       *C.sr_subscription_ctx_t
+	schemaMountData    *C.lyd_node
 }
 
 //createPluginState populates a SysrepoPlugin struct by establishing
 //a connection and a session
-func (p *SysrepoPlugin) createSession(ctx context.Context) error {
+func (p *SysrepoPlugin) createSessions(ctx context.Context) error {
 	var errCode C.int
 
 	//Populates connection
@@ -108,10 +51,23 @@
 		return err
 	}
 
-	//Populates session
-	errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.session)
+	//Populates sessions
+	//The session on the operational datastore will be used for most operations
+	//The session on the running datastore will be used for the subscription to edits
+	//since the operational datastore can't be edited by the client
+	errCode = C.sr_session_start(p.connection, C.SR_DS_OPERATIONAL, &p.operationalSession)
 	if errCode != C.SR_ERR_OK {
-		err := fmt.Errorf("sysrepo-session-error")
+		err := fmt.Errorf("sysrepo-operational-session-error")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+
+		_ = p.Stop(ctx)
+
+		return err
+	}
+
+	errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.runningSession)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("sysrepo-running-session-error")
 		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
 
 		_ = p.Stop(ctx)
@@ -122,49 +78,20 @@
 	return nil
 }
 
-//export get_devices_cb
-func get_devices_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
-	//This function is a callback for the retrieval of devices from sysrepo
-	//The "export" comment instructs CGO to create a C function for it
-
-	ctx := context.Background()
-	logger.Debug(ctx, "processing-get-data-request")
-
-	if session == nil {
-		logger.Error(ctx, "sysrepo-get-data-null-session")
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	if parent == nil {
-		logger.Error(ctx, "sysrepo-get-data-null-parent-node")
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	if core.AdapterInstance == nil {
-		logger.Error(ctx, "sysrepo-get-data-nil-translator")
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	devices, err := core.AdapterInstance.GetDevices(ctx)
-	if err != nil {
-		logger.Errorw(ctx, "sysrepo-get-data-translator-error", log.Fields{"err": err})
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	err = updateYangItems(ctx, session, parent, devices)
-	if err != nil {
-		logger.Errorw(ctx, "sysrepo-get-data-update-error", log.Fields{"err": err})
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	return C.SR_ERR_OK
-}
-
 func StartNewPlugin(ctx context.Context, schemaMountFilePath string) (*SysrepoPlugin, error) {
 	plugin := &SysrepoPlugin{}
 
+	//Set sysrepo and libyang log level
+	if logger.GetLogLevel() == log.DebugLevel {
+		C.sr_log_stderr(C.SR_LL_INF)
+		C.ly_log_level(C.LY_LLVRB)
+	} else {
+		C.sr_log_stderr(C.SR_LL_ERR)
+		C.ly_log_level(C.LY_LLERR)
+	}
+
 	//Open a session to sysrepo
-	err := plugin.createSession(ctx)
+	err := plugin.createSessions(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -201,16 +128,15 @@
 	//Set callbacks for events
 
 	//Subscribe with a callback to the request of data on a certain path
-	module := C.CString(core.DeviceAggregationModel)
-	defer freeCString(module)
-
-	path := C.CString(core.DevicesPath + "/*")
-	defer freeCString(path)
+	devicesModule := C.CString(core.DeviceAggregationModule)
+	devicesPath := C.CString(core.DevicesPath + "/*")
+	defer freeCString(devicesModule)
+	defer freeCString(devicesPath)
 
 	errCode := C.sr_oper_get_subscribe(
-		plugin.session,
-		module,
-		path,
+		plugin.operationalSession,
+		devicesModule,
+		devicesPath,
 		C.function(C.get_devices_cb_wrapper),
 		C.NULL,
 		C.SR_SUBSCR_DEFAULT,
@@ -244,15 +170,25 @@
 		p.subscription = nil
 	}
 
-	//Frees session
-	if p.session != nil {
-		errCode = C.sr_session_stop(p.session)
+	//Frees sessions
+	if p.operationalSession != nil {
+		errCode = C.sr_session_stop(p.operationalSession)
 		if errCode != C.SR_ERR_OK {
-			err := fmt.Errorf("failed-to-close-sysrepo-session")
+			err := fmt.Errorf("failed-to-close-operational-session")
 			logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
 			return err
 		}
-		p.session = nil
+		p.operationalSession = nil
+	}
+
+	if p.runningSession != nil {
+		errCode = C.sr_session_stop(p.runningSession)
+		if errCode != C.SR_ERR_OK {
+			err := fmt.Errorf("failed-to-close-running-session")
+			logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+			return err
+		}
+		p.runningSession = nil
 	}
 
 	//Frees connection
diff --git a/internal/sysrepo/utils.go b/internal/sysrepo/utils.go
new file mode 100644
index 0000000..d564dc7
--- /dev/null
+++ b/internal/sysrepo/utils.go
@@ -0,0 +1,173 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+	"context"
+	"fmt"
+	"unsafe"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+)
+
+//srErrorMsg returns, as a Go string, the description that sr_strerror gives for a sysrepo error code
+func srErrorMsg(code C.int) string {
+	return C.GoString(C.sr_strerror(code))
+}
+
+//lyErrorMsg provides the last libyang error message as a Go string.
+//The C string returned by ly_errmsg belongs to the libyang context, so it
+//is only copied by C.GoString and must not be released with C.free
+func lyErrorMsg(ly_ctx *C.ly_ctx) string {
+	lyErrString := C.ly_errmsg(ly_ctx)
+	return C.GoString(lyErrString)
+}
+
+//freeCString releases a C string, such as one allocated by C.CString.
+//A nil pointer is accepted: free(NULL) is a no-op. The pointer is received
+//by value, so the caller's copy cannot be reset from here
+func freeCString(str *C.char) {
+	C.free(unsafe.Pointer(str))
+}
+
+//createYangTree creates a new libyang nodes tree from a set of new paths.
+//The first item creates the root node, the remaining ones are added to it;
+//an empty set of items is rejected with an error.
+//On success the tree must be manually freed after its use with
+//C.lyd_free_all or an equivalent function. On failure nil is returned.
+func createYangTree(ctx context.Context, session *C.sr_session_ctx_t, items []core.YangItem) (*C.lyd_node, error) {
+	if len(items) == 0 {
+		return nil, fmt.Errorf("no-items")
+	}
+
+	conn := C.sr_session_get_connection(session)
+	if conn == nil {
+		return nil, fmt.Errorf("null-connection")
+	}
+
+	//libyang context
+	ly_ctx := C.sr_acquire_context(conn)
+	if ly_ctx == nil {
+		return nil, fmt.Errorf("null-libyang-context")
+	}
+	defer C.sr_release_context(conn)
+
+	//Create parent node
+	parentPath := C.CString(items[0].Path)
+	parentValue := C.CString(items[0].Value)
+
+	var parent *C.lyd_node
+	lyErr := C.lyd_new_path(nil, ly_ctx, parentPath, parentValue, 0, &parent)
+
+	//The C copies are released before checking the result, so that they
+	//are not leaked when the creation of the parent node fails
+	freeCString(parentPath)
+	freeCString(parentValue)
+
+	if lyErr != C.LY_SUCCESS {
+		return nil, fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
+	}
+	logger.Debugw(ctx, "creating-yang-item", log.Fields{"item": items[0]})
+
+	//Add remaining nodes
+	for _, item := range items[1:] {
+		logger.Debugw(ctx, "creating-yang-item", log.Fields{"item": item})
+
+		path := C.CString(item.Path)
+		value := C.CString(item.Value)
+
+		lyErr := C.lyd_new_path(parent, ly_ctx, path, value, 0, nil)
+		freeCString(path)
+		freeCString(value)
+
+		if lyErr != C.LY_SUCCESS {
+			//Free the partially created tree
+			C.lyd_free_all(parent)
+
+			err := fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
+			return nil, err
+		}
+	}
+
+	return parent, nil
+}
+
+//updateYangTree adds a node for every provided yang item to an existing
+//libyang tree, using the node referenced by parent as the parent node.
+//Nothing is done, and no libyang context is acquired, when items is empty.
+//The first item that cannot be added stops the update and is reported as
+//an error; nodes created for the previous items are left in the tree.
+func updateYangTree(ctx context.Context, session *C.sr_session_ctx_t, parent **C.lyd_node, items []core.YangItem) error {
+	//Nothing to do
+	if len(items) == 0 {
+		return nil
+	}
+
+	conn := C.sr_session_get_connection(session)
+	if conn == nil {
+		return fmt.Errorf("null-connection")
+	}
+
+	//libyang context
+	ly_ctx := C.sr_acquire_context(conn)
+	if ly_ctx == nil {
+		return fmt.Errorf("null-libyang-context")
+	}
+	defer C.sr_release_context(conn)
+
+	for _, yangItem := range items {
+		logger.Debugw(ctx, "updating-yang-item", log.Fields{"item": yangItem})
+
+		cPath := C.CString(yangItem.Path)
+		cValue := C.CString(yangItem.Value)
+		lyErr := C.lyd_new_path(*parent, ly_ctx, cPath, cValue, 0, nil)
+
+		//The C strings are released on both the success and the error path
+		freeCString(cPath)
+		freeCString(cValue)
+
+		if lyErr != C.LY_SUCCESS {
+			return fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
+		}
+	}
+
+	return nil
+}
+
+//editDatastore merges the content of a yang tree with the content of the
+//datastore on which the session has been created, then applies the changes.
+//A failure of either step is logged with the sysrepo error and returned
+func editDatastore(ctx context.Context, session *C.sr_session_ctx_t, editsTree *C.lyd_node) error {
+	if errCode := C.sr_edit_batch(session, editsTree, C.mergeOperation); errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("failed-to-edit-datastore")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return err
+	}
+
+	if errCode := C.sr_apply_changes(session, 0); errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("failed-to-apply-datastore-changes")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return err
+	}
+
+	return nil
+}