Add NETCONF notification for ONU activation and Kafka client to receive events, update dependencies

Change-Id: I5f768fa8077ef7c64e00a534744ca47492344935
diff --git a/internal/sysrepo/callbacks.go b/internal/sysrepo/callbacks.go
new file mode 100644
index 0000000..1256568
--- /dev/null
+++ b/internal/sysrepo/callbacks.go
@@ -0,0 +1,67 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+	"context"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+)
+
+//get_devices_cb is the callback used by sysrepo to retrieve the devices data.
+//It fills the tree under parent with the items provided by the adapter and
+//reports every failure to sysrepo as SR_ERR_OPERATION_FAILED.
+//The "export" comment below instructs CGO to create a C function for it.
+//
+//export get_devices_cb
+func get_devices_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
+	ctx := context.Background()
+	logger.Debug(ctx, "processing-get-devices-request")
+
+	//Reject the request when one of its prerequisites is missing
+	switch {
+	case session == nil:
+		logger.Error(ctx, "sysrepo-get-devices-null-session")
+		return C.SR_ERR_OPERATION_FAILED
+	case parent == nil:
+		logger.Error(ctx, "sysrepo-get-devices-null-parent-node")
+		return C.SR_ERR_OPERATION_FAILED
+	case core.AdapterInstance == nil:
+		logger.Error(ctx, "sysrepo-get-devices-nil-translator")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	//Collect the devices as yang items from the adapter
+	devices, err := core.AdapterInstance.GetDevices(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "sysrepo-get-devices-translator-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	//Attach the items to the tree handed over by sysrepo
+	if err = updateYangTree(ctx, session, parent, devices); err != nil {
+		logger.Errorw(ctx, "sysrepo-get-devices-update-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	logger.Info(ctx, "devices-information-request-served")
+	return C.SR_ERR_OK
+}
diff --git a/internal/sysrepo/events.go b/internal/sysrepo/events.go
new file mode 100644
index 0000000..4ababe3
--- /dev/null
+++ b/internal/sysrepo/events.go
@@ -0,0 +1,110 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+	"context"
+	"fmt"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+const (
+	eventNameOnuActivated = "ONU_ACTIVATED_RAISE_EVENT"
+)
+
+//ManageVolthaEvent performs the necessary operations on a new voltha event
+//received from Kafka. Only the ONU activation device event is handled for now:
+//nil events, events without header and every other event are discarded.
+func (p *SysrepoPlugin) ManageVolthaEvent(ctx context.Context, event *voltha.Event) {
+	if event == nil || event.Header == nil || event.Header.Type != voltha.EventType_DEVICE_EVENT {
+		return
+	}
+
+	devEvent, ok := event.EventType.(*voltha.Event_DeviceEvent)
+	if !ok || devEvent == nil || devEvent.DeviceEvent == nil {
+		logger.Errorw(ctx, "unexpected-event-type", log.Fields{
+			"headerType": event.Header.Type,
+			"actualType": fmt.Sprintf("%T", event.EventType),
+		})
+		return
+	}
+
+	//TODO: map other events to ONU state changes
+	if devEvent.DeviceEvent.DeviceEventName != eventNameOnuActivated {
+		return
+	}
+	logger.Debugw(ctx, "onu-activated-event-received", log.Fields{"header": event.Header, "deviceEvent": devEvent.DeviceEvent})
+	if err := p.sendOnuActivatedNotification(ctx, event.Header, devEvent.DeviceEvent); err != nil {
+		logger.Errorw(ctx, "failed-to-send-onu-activated-notification", log.Fields{"err": err})
+	}
+}
+
+//sendOnuActivatedNotification sends a notification based on the content of the
+//received device event. It returns an error if the event cannot be translated,
+//the datastore cannot be updated or the notification cannot be sent.
+func (p *SysrepoPlugin) sendOnuActivatedNotification(ctx context.Context, eventHeader *voltha.EventHeader, deviceEvent *voltha.DeviceEvent) error {
+	//Prepare the content of the notification
+	notificationItems, channelTermItems, err := core.TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+	if err != nil {
+		return fmt.Errorf("failed-to-translate-onu-activated-event: %w", err)
+	}
+
+	//Create the channel termination in the datastore to make the notification leafref valid
+	channelTermTree, err := createYangTree(ctx, p.operationalSession, channelTermItems)
+	if err != nil {
+		return fmt.Errorf("failed-to-create-channel-termination-tree: %w", err)
+	}
+	defer C.lyd_free_all(channelTermTree)
+
+	if err = editDatastore(ctx, p.operationalSession, channelTermTree); err != nil {
+		return fmt.Errorf("failed-to-apply-channel-termination-to-datastore: %w", err)
+	}
+
+	//Create the notification tree
+	notificationTree, err := createYangTree(ctx, p.operationalSession, notificationItems)
+	if err != nil {
+		return fmt.Errorf("failed-to-create-onu-activated-notification-tree: %w", err)
+	}
+
+	//Let sysrepo manage the notification tree to properly free it after its delivery
+	var notificationData *C.sr_data_t
+	errCode := C.sr_acquire_data(p.connection, notificationTree, &notificationData)
+	if errCode != C.SR_ERR_OK {
+		//The tree is handed over to sysrepo only on success, free it here to avoid a leak
+		C.lyd_free_all(notificationTree)
+		err := fmt.Errorf("cannot-acquire-notification-data")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return err
+	}
+	defer C.sr_release_data(notificationData)
+
+	//Send the notification
+	logger.Infow(ctx, "sending-onu-activated-notification", log.Fields{"onuSn": deviceEvent.Context["serial-number"]})
+	errCode = C.sr_notif_send_tree(p.operationalSession, notificationData.tree, 0, 0)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("cannot-send-notification")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return err
+	}
+	return nil
+}
diff --git a/internal/sysrepo/plugin.c b/internal/sysrepo/plugin.c
index 08e7978..f9f53c6 100644
--- a/internal/sysrepo/plugin.c
+++ b/internal/sysrepo/plugin.c
@@ -25,6 +25,9 @@
 typedef struct lyd_node lyd_node;
 typedef struct ly_ctx ly_ctx;
 
+//Operation passed to sr_edit_batch to merge the edits into the datastore
+const char* mergeOperation = "merge";
+
 //Provides data for the schema-mount extension
 LY_ERR mountpoint_ext_data_clb(
     const struct lysc_ext_instance *ext,
@@ -37,7 +40,7 @@
     return LY_SUCCESS;
 }
 
-// Exported by sysrepo.go
+// Exported by callbacks.go
 sr_error_t get_devices_cb(sr_session_ctx_t *session, lyd_node **parent);
 
 //The wrapper functions are needed because CGO cannot express some keywords
diff --git a/internal/sysrepo/sysrepo.go b/internal/sysrepo/sysrepo.go
index 3a4dc79..973dbb9 100644
--- a/internal/sysrepo/sysrepo.go
+++ b/internal/sysrepo/sysrepo.go
@@ -31,73 +31,16 @@
 )
 
 type SysrepoPlugin struct {
-	connection      *C.sr_conn_ctx_t
-	session         *C.sr_session_ctx_t
-	subscription    *C.sr_subscription_ctx_t
-	schemaMountData *C.lyd_node
-}
-
-func srErrorMsg(code C.int) string {
-	return C.GoString(C.sr_strerror(code))
-}
-
-func lyErrorMsg(ly_ctx *C.ly_ctx) string {
-	lyErrString := C.ly_errmsg(ly_ctx)
-	defer freeCString(lyErrString)
-
-	return C.GoString(lyErrString)
-}
-
-func freeCString(str *C.char) {
-	if str != nil {
-		C.free(unsafe.Pointer(str))
-		str = nil
-	}
-}
-
-func updateYangItems(ctx context.Context, session *C.sr_session_ctx_t, parent **C.lyd_node, items []core.YangItem) error {
-	conn := C.sr_session_get_connection(session)
-	if conn == nil {
-		return fmt.Errorf("null-connection")
-	}
-
-	//libyang context
-	ly_ctx := C.sr_acquire_context(conn)
-	defer C.sr_release_context(conn)
-	if ly_ctx == nil {
-		return fmt.Errorf("null-libyang-context")
-	}
-
-	for _, item := range items {
-		if item.Value == "" {
-			continue
-		}
-
-		logger.Debugw(ctx, "updating-yang-item", log.Fields{"item": item})
-
-		path := C.CString(item.Path)
-		value := C.CString(item.Value)
-
-		lyErr := C.lyd_new_path(*parent, ly_ctx, path, value, 0, nil)
-		if lyErr != C.LY_SUCCESS {
-			freeCString(path)
-			freeCString(value)
-
-			err := fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
-
-			return err
-		}
-
-		freeCString(path)
-		freeCString(value)
-	}
-
-	return nil
+	connection         *C.sr_conn_ctx_t
+	operationalSession *C.sr_session_ctx_t
+	runningSession     *C.sr_session_ctx_t
+	subscription       *C.sr_subscription_ctx_t
+	schemaMountData    *C.lyd_node
 }
 
 //createPluginState populates a SysrepoPlugin struct by establishing
 //a connection and a session
-func (p *SysrepoPlugin) createSession(ctx context.Context) error {
+func (p *SysrepoPlugin) createSessions(ctx context.Context) error {
 	var errCode C.int
 
 	//Populates connection
@@ -108,10 +51,23 @@
 		return err
 	}
 
-	//Populates session
-	errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.session)
+	//Populates sessions
+	//The session on the operational datastore will be used for most operations
+	//The session on the running datastore will be used for the subscription to edits
+	//since the operational datastore can't be edited by the client
+	errCode = C.sr_session_start(p.connection, C.SR_DS_OPERATIONAL, &p.operationalSession)
 	if errCode != C.SR_ERR_OK {
-		err := fmt.Errorf("sysrepo-session-error")
+		err := fmt.Errorf("sysrepo-operational-session-error")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+
+		_ = p.Stop(ctx)
+
+		return err
+	}
+
+	errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.runningSession)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("sysrepo-running-session-error")
 		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
 
 		_ = p.Stop(ctx)
@@ -122,49 +78,20 @@
 	return nil
 }
 
-//export get_devices_cb
-func get_devices_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
-	//This function is a callback for the retrieval of devices from sysrepo
-	//The "export" comment instructs CGO to create a C function for it
-
-	ctx := context.Background()
-	logger.Debug(ctx, "processing-get-data-request")
-
-	if session == nil {
-		logger.Error(ctx, "sysrepo-get-data-null-session")
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	if parent == nil {
-		logger.Error(ctx, "sysrepo-get-data-null-parent-node")
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	if core.AdapterInstance == nil {
-		logger.Error(ctx, "sysrepo-get-data-nil-translator")
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	devices, err := core.AdapterInstance.GetDevices(ctx)
-	if err != nil {
-		logger.Errorw(ctx, "sysrepo-get-data-translator-error", log.Fields{"err": err})
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	err = updateYangItems(ctx, session, parent, devices)
-	if err != nil {
-		logger.Errorw(ctx, "sysrepo-get-data-update-error", log.Fields{"err": err})
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	return C.SR_ERR_OK
-}
-
 func StartNewPlugin(ctx context.Context, schemaMountFilePath string) (*SysrepoPlugin, error) {
 	plugin := &SysrepoPlugin{}
 
+	//Set sysrepo and libyang log level
+	if logger.GetLogLevel() == log.DebugLevel {
+		C.sr_log_stderr(C.SR_LL_INF)
+		C.ly_log_level(C.LY_LLVRB)
+	} else {
+		C.sr_log_stderr(C.SR_LL_ERR)
+		C.ly_log_level(C.LY_LLERR)
+	}
+
 	//Open a session to sysrepo
-	err := plugin.createSession(ctx)
+	err := plugin.createSessions(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -201,16 +128,15 @@
 	//Set callbacks for events
 
 	//Subscribe with a callback to the request of data on a certain path
-	module := C.CString(core.DeviceAggregationModel)
-	defer freeCString(module)
-
-	path := C.CString(core.DevicesPath + "/*")
-	defer freeCString(path)
+	devicesModule := C.CString(core.DeviceAggregationModule)
+	devicesPath := C.CString(core.DevicesPath + "/*")
+	defer freeCString(devicesModule)
+	defer freeCString(devicesPath)
 
 	errCode := C.sr_oper_get_subscribe(
-		plugin.session,
-		module,
-		path,
+		plugin.operationalSession,
+		devicesModule,
+		devicesPath,
 		C.function(C.get_devices_cb_wrapper),
 		C.NULL,
 		C.SR_SUBSCR_DEFAULT,
@@ -244,15 +170,25 @@
 		p.subscription = nil
 	}
 
-	//Frees session
-	if p.session != nil {
-		errCode = C.sr_session_stop(p.session)
+	//Frees sessions
+	if p.operationalSession != nil {
+		errCode = C.sr_session_stop(p.operationalSession)
 		if errCode != C.SR_ERR_OK {
-			err := fmt.Errorf("failed-to-close-sysrepo-session")
+			err := fmt.Errorf("failed-to-close-operational-session")
 			logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
 			return err
 		}
-		p.session = nil
+		p.operationalSession = nil
+	}
+
+	if p.runningSession != nil {
+		errCode = C.sr_session_stop(p.runningSession)
+		if errCode != C.SR_ERR_OK {
+			err := fmt.Errorf("failed-to-close-running-session")
+			logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+			return err
+		}
+		p.runningSession = nil
 	}
 
 	//Frees connection
diff --git a/internal/sysrepo/utils.go b/internal/sysrepo/utils.go
new file mode 100644
index 0000000..d564dc7
--- /dev/null
+++ b/internal/sysrepo/utils.go
@@ -0,0 +1,173 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+	"context"
+	"fmt"
+	"unsafe"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+)
+
+//srErrorMsg provides the description returned by sr_strerror for a sysrepo error code
+func srErrorMsg(code C.int) string {
+	return C.GoString(C.sr_strerror(code))
+}
+
+//lyErrorMsg provides the last libyang error message stored in the context.
+//The C string returned by ly_errmsg is owned by libyang and must not be
+//freed here: C.GoString copies it into Go memory before returning.
+func lyErrorMsg(ly_ctx *C.ly_ctx) string {
+	lyErrString := C.ly_errmsg(ly_ctx)
+	return C.GoString(lyErrString)
+}
+
+//freeCString frees a C string created with C.CString (nil is ignored); the caller's pointer is not reset
+func freeCString(str *C.char) {
+	if str != nil {
+		C.free(unsafe.Pointer(str))
+	}
+}
+
+//createYangTree creates a new libyang nodes tree from a set of new paths.
+//The first item is used to create the parent node, the remaining ones are
+//added to its tree. An error is returned without items, connection or libyang
+//context, as well as when libyang fails to create one of the paths.
+//The tree must be manually freed after its use with C.lyd_free_all or
+//an equivalent function; when an error is returned there is no tree to free.
+func createYangTree(ctx context.Context, session *C.sr_session_ctx_t, items []core.YangItem) (*C.lyd_node, error) {
+	//At least one item is needed to create the parent node
+	if len(items) == 0 {
+		return nil, fmt.Errorf("no-items")
+	}
+
+	conn := C.sr_session_get_connection(session)
+	if conn == nil {
+		return nil, fmt.Errorf("null-connection")
+	}
+
+	//libyang context
+	ly_ctx := C.sr_acquire_context(conn)
+	if ly_ctx == nil {
+		return nil, fmt.Errorf("null-libyang-context")
+	}
+	defer C.sr_release_context(conn)
+
+	//Create parent node
+	parentPath := C.CString(items[0].Path)
+	parentValue := C.CString(items[0].Value)
+
+	//Deferred so that the C strings are released on the failure path too
+	defer freeCString(parentPath)
+	defer freeCString(parentValue)
+
+	var parent *C.lyd_node
+	lyErr := C.lyd_new_path(nil, ly_ctx, parentPath, parentValue, 0, &parent)
+	if lyErr != C.LY_SUCCESS {
+		return nil, fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
+	}
+	logger.Debugw(ctx, "creating-yang-item", log.Fields{"item": items[0]})
+
+	//Add remaining nodes
+	for _, item := range items[1:] {
+		logger.Debugw(ctx, "creating-yang-item", log.Fields{"item": item})
+
+		path := C.CString(item.Path)
+		value := C.CString(item.Value)
+
+		//The C strings are not needed once the libyang call has returned
+		lyErr := C.lyd_new_path(parent, ly_ctx, path, value, 0, nil)
+		freeCString(path)
+		freeCString(value)
+
+		if lyErr != C.LY_SUCCESS {
+			//Free the partially created tree
+			C.lyd_free_all(parent)
+
+			return nil, fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
+		}
+	}
+
+	return parent, nil
+}
+
+//updateYangTree creates a set of new paths under the parent node of an
+//existing libyang tree. An empty set of items leaves the tree untouched.
+//The operation stops at the first path that libyang fails to create and
+//returns an error; the paths created up to that point are not removed.
+func updateYangTree(ctx context.Context, session *C.sr_session_ctx_t, parent **C.lyd_node, items []core.YangItem) error {
+	if len(items) == 0 {
+		//Nothing to do
+		return nil
+	}
+
+	srConn := C.sr_session_get_connection(session)
+	if srConn == nil {
+		return fmt.Errorf("null-connection")
+	}
+
+	//The libyang context stays acquired until the function returns
+	lyCtx := C.sr_acquire_context(srConn)
+	if lyCtx == nil {
+		return fmt.Errorf("null-libyang-context")
+	}
+	defer C.sr_release_context(srConn)
+
+	for i := range items {
+		logger.Debugw(ctx, "updating-yang-item", log.Fields{"item": items[i]})
+
+		cPath := C.CString(items[i].Path)
+		cValue := C.CString(items[i].Value)
+
+		//The C strings are released right after the call, whatever its outcome
+		result := C.lyd_new_path(*parent, lyCtx, cPath, cValue, 0, nil)
+		freeCString(cPath)
+		freeCString(cValue)
+
+		//Give up on the first failure
+		if result != C.LY_SUCCESS {
+			return fmt.Errorf("libyang-new-path-failed: %d %s", result, lyErrorMsg(lyCtx))
+		}
+	}
+
+	return nil
+}
+
+//editDatastore merges the content of a yang tree with the content of the datastore.
+//The target datastore is the one on which the session has been created.
+func editDatastore(ctx context.Context, session *C.sr_session_ctx_t, editsTree *C.lyd_node) error {
+	errCode := C.sr_edit_batch(session, editsTree, C.mergeOperation)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("failed-to-edit-datastore")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return err
+	}
+
+	if errCode = C.sr_apply_changes(session, 0); errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("failed-to-apply-datastore-changes")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		//Failed changes stay pending in the session: drop them (best effort) to not affect later edits
+		_ = C.sr_discard_changes(session)
+		return err
+	}
+	return nil
+}