Add initial support for provisioning and removing services, getting service data

Change-Id: Ie49206d788a202e70a8d64f083c3f85b92ced8fb
diff --git a/internal/sysrepo/callbacks.go b/internal/sysrepo/callbacks.go
index 1256568..27c8362 100644
--- a/internal/sysrepo/callbacks.go
+++ b/internal/sysrepo/callbacks.go
@@ -21,6 +21,8 @@
 import "C"
 import (
 	"context"
+	"fmt"
+	"strconv"
 
 	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	"github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
@@ -65,3 +67,331 @@
 
 	return C.SR_ERR_OK
 }
+
+//export get_services_cb
+func get_services_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
+	//Callback for the retrieval of services from sysrepo: SR_ERR_OK on success, SR_ERR_OPERATION_FAILED otherwise
+	//The "export" comment instructs CGO to create a C function for it
+
+	ctx := context.Background()
+	logger.Debug(ctx, "processing-get-services-request")
+
+	if session == nil {
+		logger.Error(ctx, "sysrepo-get-services-null-session")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	if parent == nil {
+		logger.Error(ctx, "sysrepo-get-services-null-parent-node")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	if core.AdapterInstance == nil {
+		logger.Error(ctx, "sysrepo-get-services-nil-translator")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	services, err := core.AdapterInstance.GetServices(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "sysrepo-get-services-translation-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	err = updateYangTree(ctx, session, parent, services) //The services are handed over together with parent
+	if err != nil {
+		logger.Errorw(ctx, "sysrepo-get-services-update-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	logger.Info(ctx, "services-information-request-served")
+
+	return C.SR_ERR_OK
+}
+
+//export get_vlans_cb
+func get_vlans_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
+	//Callback for the retrieval of vlans from sysrepo: SR_ERR_OK on success, SR_ERR_OPERATION_FAILED otherwise
+	//The "export" comment instructs CGO to create a C function for it
+
+	ctx := context.Background()
+	logger.Debug(ctx, "processing-get-vlans-request")
+
+	if session == nil {
+		logger.Error(ctx, "sysrepo-get-vlans-null-session")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	if parent == nil {
+		logger.Error(ctx, "sysrepo-get-vlans-null-parent-node")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	if core.AdapterInstance == nil {
+		logger.Error(ctx, "sysrepo-get-vlans-nil-translator")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	vlans, err := core.AdapterInstance.GetVlans(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "sysrepo-get-vlans-translation-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	err = updateYangTree(ctx, session, parent, vlans) //The vlans are handed over together with parent
+	if err != nil {
+		logger.Errorw(ctx, "sysrepo-get-vlans-update-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	logger.Info(ctx, "vlans-information-request-served")
+
+	return C.SR_ERR_OK
+}
+
+//export get_bandwidth_profiles_cb
+func get_bandwidth_profiles_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
+	//Callback for the retrieval of bandwidth profiles from sysrepo: SR_ERR_OK on success, SR_ERR_OPERATION_FAILED otherwise
+	//The "export" comment instructs CGO to create a C function for it
+
+	ctx := context.Background()
+	logger.Debug(ctx, "processing-get-bandwidth-profiles-request")
+
+	if session == nil {
+		logger.Error(ctx, "sysrepo-get-bandwidth-profiles-null-session")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	if parent == nil {
+		logger.Error(ctx, "sysrepo-get-bandwidth-profiles-null-parent-node")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	if core.AdapterInstance == nil {
+		logger.Error(ctx, "sysrepo-get-bandwidth-profiles-nil-translator")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	bwProfiles, err := core.AdapterInstance.GetBandwidthProfiles(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "sysrepo-get-bandwidth-profiles-translation-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	err = updateYangTree(ctx, session, parent, bwProfiles) //The profiles are handed over together with parent
+	if err != nil {
+		logger.Errorw(ctx, "sysrepo-get-bandwidth-profiles-update-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	logger.Info(ctx, "bandwidth-profiles-information-request-served")
+
+	return C.SR_ERR_OK
+}
+
+//export edit_service_profiles_cb
+func edit_service_profiles_cb(editSession *C.sr_session_ctx_t, runningSession *C.sr_session_ctx_t, event C.sr_event_t) C.sr_error_t {
+	//Callback for changes on service profiles: handles created and deleted profile names, rejects other operations
+	//The "export" comment instructs CGO to create a C function for it
+
+	if event != C.SR_EV_CHANGE { //Only the change event is processed, any other event is accepted as is
+		return C.SR_ERR_OK
+	}
+
+	ctx := context.Background()
+	logger.Debug(ctx, "processing-service-profile-changes")
+
+	serviceNamesChanges, err := getChangesList(ctx, editSession, core.ServiceProfilesPath+"/service-profile/name")
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-service-profile-names-changes", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	for _, n := range serviceNamesChanges { //n.Value is the name of the changed service profile
+		switch n.Operation {
+		case C.SR_OP_CREATED:
+			if errCode := edit_service_create(ctx, editSession, runningSession, n.Value); errCode != C.SR_ERR_OK {
+				return errCode
+			}
+		case C.SR_OP_DELETED:
+			if errCode := edit_service_delete(ctx, editSession, runningSession, n.Value); errCode != C.SR_ERR_OK {
+				return errCode
+			}
+		default: //Any other operation on a profile name stops the processing
+			return C.SR_ERR_UNSUPPORTED
+		}
+	}
+
+	return C.SR_ERR_OK
+}
+
+//edit_service_create handles the creation of the service profile serviceName.
+//The port name, technology profile ID, VLAN name and S/C tags are read from the
+//changes in editSession and passed to core.AdapterInstance.ProvisionService.
+//runningSession is not used here. Returns SR_ERR_OK on success, or
+//SR_ERR_OPERATION_FAILED if a value cannot be read or the provisioning fails.
+func edit_service_create(ctx context.Context, editSession *C.sr_session_ctx_t, runningSession *C.sr_session_ctx_t, serviceName string) C.sr_error_t {
+	portName, err := getSingleChangeValue(ctx, editSession, fmt.Sprintf("%s/service-profile[name='%s']/ports/port/name", core.ServiceProfilesPath, serviceName))
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-service-profile-port-changes", log.Fields{"err": err, "service": serviceName})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	servicePortPath := core.GetServicePortPath(serviceName, portName)
+
+	tpId, err := getSingleChangeValue(ctx, editSession, servicePortPath+"/bbf-nt-service-profile-voltha:technology-profile-id")
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-service-profile-tp-id-change", log.Fields{"err": err, "service": serviceName})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	vlanName, err := getSingleChangeValue(ctx, editSession, servicePortPath+"/port-vlans/port-vlan/name")
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-service-profile-vlan-change", log.Fields{"err": err, "service": serviceName})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	vlansPath := core.GetVlansPath(vlanName)
+
+	sTag, err := getSingleChangeValue(ctx, editSession, vlansPath+"/ingress-rewrite/push-outer-tag/vlan-id")
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-service-profile-stag-changes", log.Fields{"err": err, "service": serviceName})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+	if sTag == core.YangVlanIdAny { //Replace the YANG "any" value with the VOLTHA one
+		sTag = strconv.Itoa(core.VolthaVlanIdAny)
+	}
+
+	cTag, err := getSingleChangeValue(ctx, editSession, vlansPath+"/ingress-rewrite/push-second-tag/vlan-id")
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-service-profile-ctag-changes", log.Fields{"err": err, "service": serviceName})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+	if cTag == core.YangVlanIdAny { //Replace the YANG "any" value with the VOLTHA one
+		cTag = strconv.Itoa(core.VolthaVlanIdAny)
+	}
+
+	logger.Infow(ctx, "new-service-profile-information", log.Fields{
+		"service":  serviceName,
+		"port":     portName,
+		"vlanName": vlanName,
+		"tpId":     tpId,
+		"sTag":     sTag,
+		"cTag":     cTag,
+	})
+
+	if core.AdapterInstance == nil {
+		logger.Error(ctx, "sysrepo-service-changes-nil-translator")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	if err := core.AdapterInstance.ProvisionService(portName, sTag, cTag, tpId); err != nil {
+		logger.Errorw(ctx, "service-provisioning-error", log.Fields{"service": serviceName, "err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	logger.Infow(ctx, "service-profile-creation-request-served", log.Fields{"service": serviceName})
+
+	return C.SR_ERR_OK
+}
+
+//edit_service_delete handles the deletion of the service profile serviceName.
+//The port name, technology profile ID, VLAN name and S/C tags are read from the
+//datastore through runningSession and passed to core.AdapterInstance.RemoveService.
+//Returns SR_ERR_OK on success, SR_ERR_OPERATION_FAILED on any failure.
+func edit_service_delete(ctx context.Context, editSession *C.sr_session_ctx_t, runningSession *C.sr_session_ctx_t, serviceName string) C.sr_error_t {
+	portName, err := getDatastoreLeafValue(ctx, runningSession, fmt.Sprintf("%s/service-profile[name='%s']/ports/port/name", core.ServiceProfilesPath, serviceName))
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-service-profile-port-leaf", log.Fields{"err": err, "service": serviceName})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	servicePortPath := core.GetServicePortPath(serviceName, portName)
+
+	tpId, err := getDatastoreLeafValue(ctx, runningSession, servicePortPath+"/bbf-nt-service-profile-voltha:technology-profile-id")
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-service-profile-tp-id-leaf", log.Fields{"err": err, "service": serviceName})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	vlanName, err := getDatastoreLeafValue(ctx, runningSession, servicePortPath+"/port-vlans/port-vlan/name")
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-service-profile-vlan-leaf", log.Fields{"err": err, "service": serviceName})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	vlansPath := core.GetVlansPath(vlanName)
+
+	sTag, err := getDatastoreLeafValue(ctx, runningSession, vlansPath+"/ingress-rewrite/push-outer-tag/vlan-id")
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-service-profile-stag-leaf", log.Fields{"err": err, "service": serviceName})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+	if sTag == core.YangVlanIdAny { //Replace the YANG "any" value with the VOLTHA one
+		sTag = strconv.Itoa(core.VolthaVlanIdAny)
+	}
+
+	cTag, err := getDatastoreLeafValue(ctx, runningSession, vlansPath+"/ingress-rewrite/push-second-tag/vlan-id")
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-service-profile-ctag-leaf", log.Fields{"err": err, "service": serviceName})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+	if cTag == core.YangVlanIdAny { //Replace the YANG "any" value with the VOLTHA one
+		cTag = strconv.Itoa(core.VolthaVlanIdAny)
+	}
+
+	logger.Infow(ctx, "service-profile-deletion-information", log.Fields{
+		"service": serviceName, "port": portName, "vlanName": vlanName,
+		"tpId": tpId, "sTag": sTag, "cTag": cTag,
+	})
+
+	if core.AdapterInstance == nil { //Guard the call below, as edit_service_create does
+		logger.Error(ctx, "sysrepo-service-changes-nil-translator")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	if err := core.AdapterInstance.RemoveService(portName, sTag, cTag, tpId); err != nil {
+		logger.Errorw(ctx, "service-removal-error", log.Fields{"service": serviceName, "err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	logger.Infow(ctx, "service-profile-removal-request-served", log.Fields{"service": serviceName})
+
+	return C.SR_ERR_OK
+}
+
+//export edit_vlans_cb
+func edit_vlans_cb(editSession *C.sr_session_ctx_t, event C.sr_event_t) C.sr_error_t {
+	//Callback for changes on VLANs: returns SR_ERR_OK when every change is a creation or a deletion,
+	//SR_ERR_UNSUPPORTED for any other operation, SR_ERR_OPERATION_FAILED if the changes cannot be read
+	//The "export" comment instructs CGO to create a C function for it
+	if event != C.SR_EV_CHANGE {
+		return C.SR_ERR_OK
+	}
+
+	ctx := context.Background()
+	logger.Debug(ctx, "processing-vlans-changes")
+
+	vlanChanges, err := getChangesList(ctx, editSession, core.VlansPath+"//.")
+	if err != nil {
+		logger.Errorw(ctx, "cannot-get-vlans-changes", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	for _, n := range vlanChanges {
+		//VLANs must be defined through creation (for service provisioning)
+		//or deletion (for service removal). Changes to the VLAN values
+		//are not supported, because VOLTHA does not support dynamic changes
+		//to the service.
+		switch n.Operation {
+		case C.SR_OP_CREATED, C.SR_OP_DELETED:
+			//Everything will be handled in the services callback.
+			//Approve this change and keep checking the remaining ones:
+			//returning here would let later unsupported changes go unchecked
+			continue
+		default:
+			return C.SR_ERR_UNSUPPORTED
+		}
+	}
+
+	return C.SR_ERR_OK
+}
diff --git a/internal/sysrepo/plugin.c b/internal/sysrepo/plugin.c
index f9f53c6..82e9e3e 100644
--- a/internal/sysrepo/plugin.c
+++ b/internal/sysrepo/plugin.c
@@ -42,6 +42,11 @@
 
 // Exported by callbacks.go
 sr_error_t get_devices_cb(sr_session_ctx_t *session, lyd_node **parent);
+sr_error_t get_services_cb(sr_session_ctx_t *session, lyd_node **parent); // called by get_services_cb_wrapper
+sr_error_t get_vlans_cb(sr_session_ctx_t *session, lyd_node **parent); // called by get_vlans_cb_wrapper
+sr_error_t get_bandwidth_profiles_cb(sr_session_ctx_t *session, lyd_node **parent); // called by get_bandwidth_profiles_cb_wrapper
+sr_error_t edit_service_profiles_cb(sr_session_ctx_t *session, sr_session_ctx_t *runningSession, sr_event_t event); // runningSession comes from the wrapper's private_data
+sr_error_t edit_vlans_cb(sr_session_ctx_t *session, sr_event_t event); // called by edit_vlans_cb_wrapper
 
 //The wrapper functions are needed because CGO cannot express some keywords
 //such as "const", and thus it can't match sysrepo's callback signature
@@ -57,4 +62,68 @@
     void *private_data)
 {
     return get_devices_cb(session, parent);
+}
+
+int get_services_cb_wrapper( // C wrapper that forwards to the Go callback get_services_cb
+    sr_session_ctx_t *session,
+    uint32_t subscription_id,
+    const char *module_name,
+    const char *path,
+    const char *request_xpath,
+    uint32_t request_id,
+    struct lyd_node **parent,
+    void *private_data)
+{
+    return get_services_cb(session, parent); // only session and parent are forwarded
+}
+
+int get_vlans_cb_wrapper( // C wrapper that forwards to the Go callback get_vlans_cb
+    sr_session_ctx_t *session,
+    uint32_t subscription_id,
+    const char *module_name,
+    const char *path,
+    const char *request_xpath,
+    uint32_t request_id,
+    struct lyd_node **parent,
+    void *private_data)
+{
+    return get_vlans_cb(session, parent); // only session and parent are forwarded
+}
+
+int get_bandwidth_profiles_cb_wrapper( // C wrapper that forwards to the Go callback get_bandwidth_profiles_cb
+    sr_session_ctx_t *session,
+    uint32_t subscription_id,
+    const char *module_name,
+    const char *path,
+    const char *request_xpath,
+    uint32_t request_id,
+    struct lyd_node **parent,
+    void *private_data)
+{
+    return get_bandwidth_profiles_cb(session, parent); // only session and parent are forwarded
+}
+
+int edit_service_profiles_cb_wrapper( // C wrapper that forwards to the Go callback edit_service_profiles_cb
+    sr_session_ctx_t *session,
+    uint32_t subscription_id,
+    const char *module_name,
+    const char *path,
+    sr_event_t event,
+    uint32_t request_id,
+    void *private_data)
+{
+    sr_session_ctx_t* runningSession = (sr_session_ctx_t*)private_data; // private_data is used as the running datastore session
+    return edit_service_profiles_cb(session, runningSession, event); // session, runningSession and event are forwarded
+}
+
+int edit_vlans_cb_wrapper( // C wrapper that forwards to the Go callback edit_vlans_cb
+    sr_session_ctx_t *session,
+    uint32_t subscription_id,
+    const char *module_name,
+    const char *path,
+    sr_event_t event,
+    uint32_t request_id,
+    void *private_data)
+{
+    return edit_vlans_cb(session, event); // only session and event are forwarded
 }
\ No newline at end of file
diff --git a/internal/sysrepo/sysrepo.go b/internal/sysrepo/sysrepo.go
index 973dbb9..a2b19af 100644
--- a/internal/sysrepo/sysrepo.go
+++ b/internal/sysrepo/sysrepo.go
@@ -133,6 +133,22 @@
 	defer freeCString(devicesModule)
 	defer freeCString(devicesPath)
 
+	servicesModule := C.CString(core.ServiceProfileModule)
+	servicesPath := C.CString(core.ServiceProfilesPath + "/*")
+	defer freeCString(servicesModule)
+	defer freeCString(servicesPath)
+
+	vlansModule := C.CString(core.VlansModule)
+	vlansPath := C.CString(core.VlansPath + "/*")
+	defer freeCString(vlansModule)
+	defer freeCString(vlansPath)
+
+	bwProfilesModule := C.CString(core.BandwidthProfileModule)
+	bwProfilesPath := C.CString(core.BandwidthProfilesPath + "/*")
+	defer freeCString(bwProfilesModule)
+	defer freeCString(bwProfilesPath)
+
+	//Get devices
 	errCode := C.sr_oper_get_subscribe(
 		plugin.operationalSession,
 		devicesModule,
@@ -143,7 +159,90 @@
 		&plugin.subscription,
 	)
 	if errCode != C.SR_ERR_OK {
-		err := fmt.Errorf("sysrepo-failed-subscription-to-get-events")
+		err := fmt.Errorf("sysrepo-failed-subscription-to-get-devices")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return nil, err
+	}
+
+	//Get services
+	errCode = C.sr_oper_get_subscribe(
+		plugin.operationalSession,
+		servicesModule,
+		servicesPath,
+		C.function(C.get_services_cb_wrapper),
+		C.NULL,
+		C.SR_SUBSCR_DEFAULT,
+		&plugin.subscription,
+	)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("sysrepo-failed-subscription-to-get-services")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return nil, err
+	}
+
+	//Get vlans: requests on vlansPath are served by get_vlans_cb_wrapper
+	errCode = C.sr_oper_get_subscribe(
+		plugin.operationalSession,
+		vlansModule,
+		vlansPath,
+		C.function(C.get_vlans_cb_wrapper),
+		C.NULL,
+		C.SR_SUBSCR_DEFAULT,
+		&plugin.subscription,
+	)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("sysrepo-failed-subscription-to-get-vlans")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return nil, err
+	}
+
+	//Get bandwidth profiles: requests on bwProfilesPath are served by get_bandwidth_profiles_cb_wrapper
+	errCode = C.sr_oper_get_subscribe(
+		plugin.operationalSession,
+		bwProfilesModule,
+		bwProfilesPath,
+		C.function(C.get_bandwidth_profiles_cb_wrapper),
+		C.NULL,
+		C.SR_SUBSCR_DEFAULT,
+		&plugin.subscription,
+	)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("sysrepo-failed-subscription-to-get-bandwidth-profiles")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return nil, err
+	}
+
+	//Subscribe with a callback to changes of configuration in the services modules
+	//Changes to services
+	errCode = C.sr_module_change_subscribe(
+		plugin.runningSession,
+		servicesModule,
+		servicesPath,
+		C.function(C.edit_service_profiles_cb_wrapper),
+		unsafe.Pointer(plugin.runningSession), //Pass session for running datastore to get current data
+		0,
+		C.SR_SUBSCR_DEFAULT,
+		&plugin.subscription,
+	)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("sysrepo-failed-subscription-to-change-services")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return nil, err
+	}
+
+	//Changes to VLANs
+	errCode = C.sr_module_change_subscribe(
+		plugin.runningSession,
+		vlansModule,
+		vlansPath,
+		C.function(C.edit_vlans_cb_wrapper),
+		C.NULL,
+		0,
+		C.SR_SUBSCR_DEFAULT,
+		&plugin.subscription,
+	)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("sysrepo-failed-subscription-to-change-vlans")
 		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
 		return nil, err
 	}
diff --git a/internal/sysrepo/utils.go b/internal/sysrepo/utils.go
index d564dc7..4db9a53 100644
--- a/internal/sysrepo/utils.go
+++ b/internal/sysrepo/utils.go
@@ -135,7 +135,7 @@
 		path := C.CString(item.Path)
 		value := C.CString(item.Value)
 
-		lyErr := C.lyd_new_path(*parent, ly_ctx, path, value, 0, nil)
+		lyErr := C.lyd_new_path(*parent, ly_ctx, path, value, C.LYD_NEW_PATH_UPDATE, nil)
 		if lyErr != C.LY_SUCCESS {
 			freeCString(path)
 			freeCString(value)
@@ -171,3 +171,110 @@
 
 	return nil
 }
+
+//YangChange describes a single change reported by sysrepo for a data node,
+//as collected by getChangesList.
+//Path is the data path of the node and Value is its value.
+//Operation is the sysrepo change operation, whose values are:
+//SR_OP_CREATED, SR_OP_MODIFIED, SR_OP_DELETED, SR_OP_MOVED
+//Only created and deleted operations are accepted by the callbacks in callbacks.go.
+type YangChange struct {
+	Path      string
+	Value     string
+	Operation C.sr_change_oper_t
+}
+
+//Provides a list of the changes that occurred under a specific path, or an error if they cannot be read
+//Should only be used on the session from an sr_module_change_subscribe callback
+func getChangesList(ctx context.Context, editSession *C.sr_session_ctx_t, path string) ([]YangChange, error) {
+	result := []YangChange{}
+
+	changesPath := C.CString(path)
+	defer freeCString(changesPath)
+
+	var changesIterator *C.sr_change_iter_t
+	errCode := C.sr_get_changes_iter(editSession, changesPath, &changesIterator)
+	if errCode != C.SR_ERR_OK {
+		return nil, fmt.Errorf("cannot-get-iterator: %d %s", errCode, srErrorMsg(errCode))
+	}
+	defer C.sr_free_change_iter(changesIterator)
+
+	//Iterate over the changes
+	var operation C.sr_change_oper_t
+	var prevValue, prevList *C.char
+	var prevDefault C.int
+	//node is deliberately not freed: the former "defer C.lyd_free_all(node)" captured
+	//a nil node (it never freed anything), and sysrepo presumably owns it - TODO confirm
+	var node *C.lyd_node
+
+	errCode = C.sr_get_change_tree_next(editSession, changesIterator, &operation, &node, &prevValue, &prevList, &prevDefault)
+	for errCode != C.SR_ERR_NOT_FOUND {
+		if errCode != C.SR_ERR_OK {
+			return nil, fmt.Errorf("next-change-error: %d %s", errCode, srErrorMsg(errCode))
+		}
+
+		currentChange := YangChange{}
+		currentChange.Operation = operation
+
+		nodePath := C.lyd_path(node, C.LYD_PATH_STD, nil, 0)
+		if nodePath == nil {
+			return nil, fmt.Errorf("cannot-get-change-path")
+		}
+		currentChange.Path = C.GoString(nodePath)
+		freeCString(nodePath)
+
+		nodeValue := C.lyd_get_value(node)
+		if nodeValue != nil { //Changes on nodes without a value are left out of the result
+			currentChange.Value = C.GoString(nodeValue)
+			result = append(result, currentChange)
+		}
+
+		errCode = C.sr_get_change_tree_next(editSession, changesIterator, &operation, &node, &prevValue, &prevList, &prevDefault)
+	}
+
+	return result, nil
+}
+
+//Verify that only one change occurred under the specified path, and return its value
+//Should only be used on the session from an sr_module_change_subscribe callback
+func getSingleChangeValue(ctx context.Context, session *C.sr_session_ctx_t, path string) (string, error) {
+	changesList, err := getChangesList(ctx, session, path)
+	if err != nil {
+		return "", err
+	}
+
+	if len(changesList) != 1 { //Zero changes and multiple changes are both treated as an error
+		logger.Errorw(ctx, "unexpected-number-of-yang-changes", log.Fields{
+			"changes": changesList,
+		})
+		return "", fmt.Errorf("unexpected-number-of-yang-changes")
+	}
+
+	return changesList[0].Value, nil
+}
+
+//Get the value of a leaf from the datastore, or an error if the data or its value cannot be read
+//The target datastore is the one on which the session has been created
+func getDatastoreLeafValue(ctx context.Context, session *C.sr_session_ctx_t, path string) (string, error) {
+	cPath := C.CString(path)
+	defer freeCString(cPath)
+
+	var data *C.sr_data_t
+	errCode := C.sr_get_subtree(session, cPath, 0, &data)
+	if errCode != C.SR_ERR_OK {
+		return "", fmt.Errorf("cannot-get-data-from-datastore: %d %s", errCode, srErrorMsg(errCode))
+	}
+
+	if data == nil {
+		return "", fmt.Errorf("no-data-found-for-path: %s", path)
+	}
+	//Deferred here and not before sr_get_subtree: defer would capture a nil data and never release it
+	defer C.sr_release_data(data)
+
+	nodeValue := C.lyd_get_value(data.tree)
+	if nodeValue == nil {
+		return "", fmt.Errorf("cannot-get-value-from-data: %s", path)
+	}
+
+	return C.GoString(nodeValue), nil
+}