Add NETCONF notification for ONU activation and Kafka client to receive events, update dependencies

Change-Id: I5f768fa8077ef7c64e00a534744ca47492344935
diff --git a/internal/sysrepo/sysrepo.go b/internal/sysrepo/sysrepo.go
index 3a4dc79..973dbb9 100644
--- a/internal/sysrepo/sysrepo.go
+++ b/internal/sysrepo/sysrepo.go
@@ -31,73 +31,16 @@
 )
 
 type SysrepoPlugin struct {
-	connection      *C.sr_conn_ctx_t
-	session         *C.sr_session_ctx_t
-	subscription    *C.sr_subscription_ctx_t
-	schemaMountData *C.lyd_node
-}
-
-func srErrorMsg(code C.int) string {
-	return C.GoString(C.sr_strerror(code))
-}
-
-func lyErrorMsg(ly_ctx *C.ly_ctx) string {
-	lyErrString := C.ly_errmsg(ly_ctx)
-	defer freeCString(lyErrString)
-
-	return C.GoString(lyErrString)
-}
-
-func freeCString(str *C.char) {
-	if str != nil {
-		C.free(unsafe.Pointer(str))
-		str = nil
-	}
-}
-
-func updateYangItems(ctx context.Context, session *C.sr_session_ctx_t, parent **C.lyd_node, items []core.YangItem) error {
-	conn := C.sr_session_get_connection(session)
-	if conn == nil {
-		return fmt.Errorf("null-connection")
-	}
-
-	//libyang context
-	ly_ctx := C.sr_acquire_context(conn)
-	defer C.sr_release_context(conn)
-	if ly_ctx == nil {
-		return fmt.Errorf("null-libyang-context")
-	}
-
-	for _, item := range items {
-		if item.Value == "" {
-			continue
-		}
-
-		logger.Debugw(ctx, "updating-yang-item", log.Fields{"item": item})
-
-		path := C.CString(item.Path)
-		value := C.CString(item.Value)
-
-		lyErr := C.lyd_new_path(*parent, ly_ctx, path, value, 0, nil)
-		if lyErr != C.LY_SUCCESS {
-			freeCString(path)
-			freeCString(value)
-
-			err := fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
-
-			return err
-		}
-
-		freeCString(path)
-		freeCString(value)
-	}
-
-	return nil
+	connection         *C.sr_conn_ctx_t
+	operationalSession *C.sr_session_ctx_t
+	runningSession     *C.sr_session_ctx_t
+	subscription       *C.sr_subscription_ctx_t
+	schemaMountData    *C.lyd_node
 }
 
 //createPluginState populates a SysrepoPlugin struct by establishing
 //a connection and a session
-func (p *SysrepoPlugin) createSession(ctx context.Context) error {
+func (p *SysrepoPlugin) createSessions(ctx context.Context) error {
 	var errCode C.int
 
 	//Populates connection
@@ -108,10 +51,23 @@
 		return err
 	}
 
-	//Populates session
-	errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.session)
+	//Populates sessions
+	//The session on the operation datastore will be used for most operations
+	//The session on the running datastore will be used for the subscription to edits
+	//since the operational datastore can't be edited by the client
+	errCode = C.sr_session_start(p.connection, C.SR_DS_OPERATIONAL, &p.operationalSession)
 	if errCode != C.SR_ERR_OK {
-		err := fmt.Errorf("sysrepo-session-error")
+		err := fmt.Errorf("sysrepo-operational-session-error")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+
+		_ = p.Stop(ctx)
+
+		return err
+	}
+
+	errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.runningSession)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("sysrepo-running-session-error")
 		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
 
 		_ = p.Stop(ctx)
@@ -122,49 +78,20 @@
 	return nil
 }
 
-//export get_devices_cb
-func get_devices_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
-	//This function is a callback for the retrieval of devices from sysrepo
-	//The "export" comment instructs CGO to create a C function for it
-
-	ctx := context.Background()
-	logger.Debug(ctx, "processing-get-data-request")
-
-	if session == nil {
-		logger.Error(ctx, "sysrepo-get-data-null-session")
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	if parent == nil {
-		logger.Error(ctx, "sysrepo-get-data-null-parent-node")
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	if core.AdapterInstance == nil {
-		logger.Error(ctx, "sysrepo-get-data-nil-translator")
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	devices, err := core.AdapterInstance.GetDevices(ctx)
-	if err != nil {
-		logger.Errorw(ctx, "sysrepo-get-data-translator-error", log.Fields{"err": err})
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	err = updateYangItems(ctx, session, parent, devices)
-	if err != nil {
-		logger.Errorw(ctx, "sysrepo-get-data-update-error", log.Fields{"err": err})
-		return C.SR_ERR_OPERATION_FAILED
-	}
-
-	return C.SR_ERR_OK
-}
-
 func StartNewPlugin(ctx context.Context, schemaMountFilePath string) (*SysrepoPlugin, error) {
 	plugin := &SysrepoPlugin{}
 
+	//Set sysrepo and libyang log level
+	if logger.GetLogLevel() == log.DebugLevel {
+		C.sr_log_stderr(C.SR_LL_INF)
+		C.ly_log_level(C.LY_LLVRB)
+	} else {
+		C.sr_log_stderr(C.SR_LL_ERR)
+		C.ly_log_level(C.LY_LLERR)
+	}
+
 	//Open a session to sysrepo
-	err := plugin.createSession(ctx)
+	err := plugin.createSessions(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -201,16 +128,15 @@
 	//Set callbacks for events
 
 	//Subscribe with a callback to the request of data on a certain path
-	module := C.CString(core.DeviceAggregationModel)
-	defer freeCString(module)
-
-	path := C.CString(core.DevicesPath + "/*")
-	defer freeCString(path)
+	devicesModule := C.CString(core.DeviceAggregationModule)
+	devicesPath := C.CString(core.DevicesPath + "/*")
+	defer freeCString(devicesModule)
+	defer freeCString(devicesPath)
 
 	errCode := C.sr_oper_get_subscribe(
-		plugin.session,
-		module,
-		path,
+		plugin.operationalSession,
+		devicesModule,
+		devicesPath,
 		C.function(C.get_devices_cb_wrapper),
 		C.NULL,
 		C.SR_SUBSCR_DEFAULT,
@@ -244,15 +170,25 @@
 		p.subscription = nil
 	}
 
-	//Frees session
-	if p.session != nil {
-		errCode = C.sr_session_stop(p.session)
+	//Frees sessions
+	if p.operationalSession != nil {
+		errCode = C.sr_session_stop(p.operationalSession)
 		if errCode != C.SR_ERR_OK {
-			err := fmt.Errorf("failed-to-close-sysrepo-session")
+			err := fmt.Errorf("failed-to-close-operational-session")
 			logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
 			return err
 		}
-		p.session = nil
+		p.operationalSession = nil
+	}
+
+	if p.runningSession != nil {
+		errCode = C.sr_session_stop(p.runningSession)
+		if errCode != C.SR_ERR_OK {
+			err := fmt.Errorf("failed-to-close-running-session")
+			logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+			return err
+		}
+		p.runningSession = nil
 	}
 
 	//Frees connection