Add NETCONF notification for ONU activation and Kafka client to receive events, update dependencies
Change-Id: I5f768fa8077ef7c64e00a534744ca47492344935
diff --git a/internal/sysrepo/callbacks.go b/internal/sysrepo/callbacks.go
new file mode 100644
index 0000000..1256568
--- /dev/null
+++ b/internal/sysrepo/callbacks.go
@@ -0,0 +1,67 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+ "context"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+)
+
+//export get_devices_cb
+func get_devices_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
+ //This function is a callback for the retrieval of devices from sysrepo
+ //The "export" comment instructs CGO to create a C function for it
+
+ ctx := context.Background()
+ logger.Debug(ctx, "processing-get-devices-request")
+
+ if session == nil {
+ logger.Error(ctx, "sysrepo-get-devices-null-session")
+ return C.SR_ERR_OPERATION_FAILED
+ }
+
+ if parent == nil {
+ logger.Error(ctx, "sysrepo-get-devices-null-parent-node")
+ return C.SR_ERR_OPERATION_FAILED
+ }
+
+ if core.AdapterInstance == nil {
+ logger.Error(ctx, "sysrepo-get-devices-nil-translator")
+ return C.SR_ERR_OPERATION_FAILED
+ }
+
+ devices, err := core.AdapterInstance.GetDevices(ctx)
+ if err != nil {
+ logger.Errorw(ctx, "sysrepo-get-devices-translator-error", log.Fields{"err": err})
+ return C.SR_ERR_OPERATION_FAILED
+ }
+
+ err = updateYangTree(ctx, session, parent, devices)
+ if err != nil {
+ logger.Errorw(ctx, "sysrepo-get-devices-update-error", log.Fields{"err": err})
+ return C.SR_ERR_OPERATION_FAILED
+ }
+
+ logger.Info(ctx, "devices-information-request-served")
+
+ return C.SR_ERR_OK
+}
diff --git a/internal/sysrepo/events.go b/internal/sysrepo/events.go
new file mode 100644
index 0000000..4ababe3
--- /dev/null
+++ b/internal/sysrepo/events.go
@@ -0,0 +1,110 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+ "context"
+ "fmt"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+const (
+ eventNameOnuActivated = "ONU_ACTIVATED_RAISE_EVENT"
+)
+
+//Performs the necessary operations on a new voltha event received from Kafka
+func (p *SysrepoPlugin) ManageVolthaEvent(ctx context.Context, event *voltha.Event) {
+ if event.Header.Type == voltha.EventType_DEVICE_EVENT {
+ devEvent, ok := event.EventType.(*voltha.Event_DeviceEvent)
+ if !ok {
+ logger.Errorw(ctx, "unexpected-event-type", log.Fields{
+ "headerType": event.Header.Type,
+ "actualType": fmt.Sprintf("%T", event.EventType),
+ })
+ return
+ }
+
+ //TODO: map other events to ONU state changes
+ switch devEvent.DeviceEvent.DeviceEventName {
+ case eventNameOnuActivated:
+ logger.Debugw(ctx, "onu-activated-event-received", log.Fields{
+ "header": event.Header,
+ "deviceEvent": devEvent.DeviceEvent,
+ })
+
+ if err := p.sendOnuActivatedNotification(ctx, event.Header, devEvent.DeviceEvent); err != nil {
+ logger.Errorw(ctx, "failed-to-send-onu-activated-notification", log.Fields{"err": err})
+ }
+ }
+ }
+}
+
+//Sends a notification based on the content of the received device event
+func (p *SysrepoPlugin) sendOnuActivatedNotification(ctx context.Context, eventHeader *voltha.EventHeader, deviceEvent *voltha.DeviceEvent) error {
+ //Prepare the content of the notification
+ notificationItems, channelTermItems, err := core.TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+ if err != nil {
+ return fmt.Errorf("failed-to-translate-onu-activated-event: %v", err)
+ }
+
+ //Create the channel termination in the datastore to make the notification leafref valid
+ channelTermTree, err := createYangTree(ctx, p.operationalSession, channelTermItems)
+ if err != nil {
+ return fmt.Errorf("failed-to-create-channel-termination-tree: %v", err)
+ }
+ defer C.lyd_free_all(channelTermTree)
+
+ err = editDatastore(ctx, p.operationalSession, channelTermTree)
+ if err != nil {
+ return fmt.Errorf("failed-to-apply-channel-termination-to-datastore: %v", err)
+ }
+
+ //Create the notification tree
+ notificationTree, err := createYangTree(ctx, p.operationalSession, notificationItems)
+ if err != nil {
+ return fmt.Errorf("failed-to-create-onu-activated-notification-tree: %v", err)
+ }
+
+ //Let sysrepo manage the notification tree to properly free it after its delivery
+ var notificationData *C.sr_data_t
+ errCode := C.sr_acquire_data(p.connection, notificationTree, ¬ificationData)
+ if errCode != C.SR_ERR_OK {
+ err := fmt.Errorf("cannot-acquire-notification-data")
+ logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+ return err
+ }
+ defer C.sr_release_data(notificationData)
+
+ //Send the notification
+ logger.Infow(ctx, "sending-onu-activated-notification", log.Fields{
+ "onuSn": deviceEvent.Context["serial-number"],
+ })
+ errCode = C.sr_notif_send_tree(p.operationalSession, notificationData.tree, 0, 0)
+ if errCode != C.SR_ERR_OK {
+ err := fmt.Errorf("cannot-send-notification")
+ logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+ return err
+ }
+
+ return nil
+}
diff --git a/internal/sysrepo/plugin.c b/internal/sysrepo/plugin.c
index 08e7978..f9f53c6 100644
--- a/internal/sysrepo/plugin.c
+++ b/internal/sysrepo/plugin.c
@@ -25,6 +25,9 @@
typedef struct lyd_node lyd_node;
typedef struct ly_ctx ly_ctx;
+//Used to define the datastore edit mode
+const char* mergeOperation = "merge";
+
//Provides data for the schema-mount extension
LY_ERR mountpoint_ext_data_clb(
const struct lysc_ext_instance *ext,
@@ -37,7 +40,7 @@
return LY_SUCCESS;
}
-// Exported by sysrepo.go
+// Exported by callbacks.go
sr_error_t get_devices_cb(sr_session_ctx_t *session, lyd_node **parent);
//The wrapper functions are needed because CGO cannot express some keywords
diff --git a/internal/sysrepo/sysrepo.go b/internal/sysrepo/sysrepo.go
index 3a4dc79..973dbb9 100644
--- a/internal/sysrepo/sysrepo.go
+++ b/internal/sysrepo/sysrepo.go
@@ -31,73 +31,16 @@
)
type SysrepoPlugin struct {
- connection *C.sr_conn_ctx_t
- session *C.sr_session_ctx_t
- subscription *C.sr_subscription_ctx_t
- schemaMountData *C.lyd_node
-}
-
-func srErrorMsg(code C.int) string {
- return C.GoString(C.sr_strerror(code))
-}
-
-func lyErrorMsg(ly_ctx *C.ly_ctx) string {
- lyErrString := C.ly_errmsg(ly_ctx)
- defer freeCString(lyErrString)
-
- return C.GoString(lyErrString)
-}
-
-func freeCString(str *C.char) {
- if str != nil {
- C.free(unsafe.Pointer(str))
- str = nil
- }
-}
-
-func updateYangItems(ctx context.Context, session *C.sr_session_ctx_t, parent **C.lyd_node, items []core.YangItem) error {
- conn := C.sr_session_get_connection(session)
- if conn == nil {
- return fmt.Errorf("null-connection")
- }
-
- //libyang context
- ly_ctx := C.sr_acquire_context(conn)
- defer C.sr_release_context(conn)
- if ly_ctx == nil {
- return fmt.Errorf("null-libyang-context")
- }
-
- for _, item := range items {
- if item.Value == "" {
- continue
- }
-
- logger.Debugw(ctx, "updating-yang-item", log.Fields{"item": item})
-
- path := C.CString(item.Path)
- value := C.CString(item.Value)
-
- lyErr := C.lyd_new_path(*parent, ly_ctx, path, value, 0, nil)
- if lyErr != C.LY_SUCCESS {
- freeCString(path)
- freeCString(value)
-
- err := fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
-
- return err
- }
-
- freeCString(path)
- freeCString(value)
- }
-
- return nil
+ connection *C.sr_conn_ctx_t
+ operationalSession *C.sr_session_ctx_t
+ runningSession *C.sr_session_ctx_t
+ subscription *C.sr_subscription_ctx_t
+ schemaMountData *C.lyd_node
}
//createPluginState populates a SysrepoPlugin struct by establishing
//a connection and a session
-func (p *SysrepoPlugin) createSession(ctx context.Context) error {
+func (p *SysrepoPlugin) createSessions(ctx context.Context) error {
var errCode C.int
//Populates connection
@@ -108,10 +51,23 @@
return err
}
- //Populates session
- errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.session)
+ //Populates sessions
+ //The session on the operation datastore will be used for most operations
+ //The session on the running datastore will be used for the subscription to edits
+ //since the operational datastore can't be edited by the client
+ errCode = C.sr_session_start(p.connection, C.SR_DS_OPERATIONAL, &p.operationalSession)
if errCode != C.SR_ERR_OK {
- err := fmt.Errorf("sysrepo-session-error")
+ err := fmt.Errorf("sysrepo-operational-session-error")
+ logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+
+ _ = p.Stop(ctx)
+
+ return err
+ }
+
+ errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.runningSession)
+ if errCode != C.SR_ERR_OK {
+ err := fmt.Errorf("sysrepo-running-session-error")
logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
_ = p.Stop(ctx)
@@ -122,49 +78,20 @@
return nil
}
-//export get_devices_cb
-func get_devices_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
- //This function is a callback for the retrieval of devices from sysrepo
- //The "export" comment instructs CGO to create a C function for it
-
- ctx := context.Background()
- logger.Debug(ctx, "processing-get-data-request")
-
- if session == nil {
- logger.Error(ctx, "sysrepo-get-data-null-session")
- return C.SR_ERR_OPERATION_FAILED
- }
-
- if parent == nil {
- logger.Error(ctx, "sysrepo-get-data-null-parent-node")
- return C.SR_ERR_OPERATION_FAILED
- }
-
- if core.AdapterInstance == nil {
- logger.Error(ctx, "sysrepo-get-data-nil-translator")
- return C.SR_ERR_OPERATION_FAILED
- }
-
- devices, err := core.AdapterInstance.GetDevices(ctx)
- if err != nil {
- logger.Errorw(ctx, "sysrepo-get-data-translator-error", log.Fields{"err": err})
- return C.SR_ERR_OPERATION_FAILED
- }
-
- err = updateYangItems(ctx, session, parent, devices)
- if err != nil {
- logger.Errorw(ctx, "sysrepo-get-data-update-error", log.Fields{"err": err})
- return C.SR_ERR_OPERATION_FAILED
- }
-
- return C.SR_ERR_OK
-}
-
func StartNewPlugin(ctx context.Context, schemaMountFilePath string) (*SysrepoPlugin, error) {
plugin := &SysrepoPlugin{}
+ //Set sysrepo and libyang log level
+ if logger.GetLogLevel() == log.DebugLevel {
+ C.sr_log_stderr(C.SR_LL_INF)
+ C.ly_log_level(C.LY_LLVRB)
+ } else {
+ C.sr_log_stderr(C.SR_LL_ERR)
+ C.ly_log_level(C.LY_LLERR)
+ }
+
//Open a session to sysrepo
- err := plugin.createSession(ctx)
+ err := plugin.createSessions(ctx)
if err != nil {
return nil, err
}
@@ -201,16 +128,15 @@
//Set callbacks for events
//Subscribe with a callback to the request of data on a certain path
- module := C.CString(core.DeviceAggregationModel)
- defer freeCString(module)
-
- path := C.CString(core.DevicesPath + "/*")
- defer freeCString(path)
+ devicesModule := C.CString(core.DeviceAggregationModule)
+ devicesPath := C.CString(core.DevicesPath + "/*")
+ defer freeCString(devicesModule)
+ defer freeCString(devicesPath)
errCode := C.sr_oper_get_subscribe(
- plugin.session,
- module,
- path,
+ plugin.operationalSession,
+ devicesModule,
+ devicesPath,
C.function(C.get_devices_cb_wrapper),
C.NULL,
C.SR_SUBSCR_DEFAULT,
@@ -244,15 +170,25 @@
p.subscription = nil
}
- //Frees session
- if p.session != nil {
- errCode = C.sr_session_stop(p.session)
+ //Frees sessions
+ if p.operationalSession != nil {
+ errCode = C.sr_session_stop(p.operationalSession)
if errCode != C.SR_ERR_OK {
- err := fmt.Errorf("failed-to-close-sysrepo-session")
+ err := fmt.Errorf("failed-to-close-operational-session")
logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
return err
}
- p.session = nil
+ p.operationalSession = nil
+ }
+
+ if p.runningSession != nil {
+ errCode = C.sr_session_stop(p.runningSession)
+ if errCode != C.SR_ERR_OK {
+ err := fmt.Errorf("failed-to-close-running-session")
+ logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+ return err
+ }
+ p.runningSession = nil
}
//Frees connection
diff --git a/internal/sysrepo/utils.go b/internal/sysrepo/utils.go
new file mode 100644
index 0000000..d564dc7
--- /dev/null
+++ b/internal/sysrepo/utils.go
@@ -0,0 +1,173 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+ "context"
+ "fmt"
+ "unsafe"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+)
+
+//srErrorMsg provides a description of a sysrepo error code
+func srErrorMsg(code C.int) string {
+ return C.GoString(C.sr_strerror(code))
+}
+
+//lyErrorMsg provides the last libyang error message
+func lyErrorMsg(ly_ctx *C.ly_ctx) string {
+ lyErrString := C.ly_errmsg(ly_ctx)
+ defer freeCString(lyErrString)
+
+ return C.GoString(lyErrString)
+}
+
+func freeCString(str *C.char) {
+ if str != nil {
+ C.free(unsafe.Pointer(str))
+ str = nil
+ }
+}
+
+//Creates a new libyang nodes tree from a set of new paths.
+//The tree must bee manually freed after its use with C.lyd_free_all or
+//an equivalent function
+func createYangTree(ctx context.Context, session *C.sr_session_ctx_t, items []core.YangItem) (*C.lyd_node, error) {
+ if len(items) == 0 {
+ return nil, fmt.Errorf("no-items")
+ }
+
+ conn := C.sr_session_get_connection(session)
+ if conn == nil {
+ return nil, fmt.Errorf("null-connection")
+ }
+
+ //libyang context
+ ly_ctx := C.sr_acquire_context(conn)
+ if ly_ctx == nil {
+ return nil, fmt.Errorf("null-libyang-context")
+ }
+ defer C.sr_release_context(conn)
+
+ //Create parent node
+ parentPath := C.CString(items[0].Path)
+ parentValue := C.CString(items[0].Value)
+
+ var parent *C.lyd_node
+ lyErr := C.lyd_new_path(nil, ly_ctx, parentPath, parentValue, 0, &parent)
+ if lyErr != C.LY_SUCCESS {
+ err := fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
+ return nil, err
+ }
+ logger.Debugw(ctx, "creating-yang-item", log.Fields{"item": items[0]})
+
+ freeCString(parentPath)
+ freeCString(parentValue)
+
+ //Add remaining nodes
+ for _, item := range items[1:] {
+ logger.Debugw(ctx, "creating-yang-item", log.Fields{"item": item})
+
+ path := C.CString(item.Path)
+ value := C.CString(item.Value)
+
+ lyErr := C.lyd_new_path(parent, ly_ctx, path, value, 0, nil)
+ if lyErr != C.LY_SUCCESS {
+ freeCString(path)
+ freeCString(value)
+
+ //Free the partially created tree
+ C.lyd_free_all(parent)
+
+ err := fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
+
+ return nil, err
+ }
+
+ freeCString(path)
+ freeCString(value)
+ }
+
+ return parent, nil
+}
+
+//Creates a set of new paths under an existing libyang tree parent node
+func updateYangTree(ctx context.Context, session *C.sr_session_ctx_t, parent **C.lyd_node, items []core.YangItem) error {
+ if len(items) == 0 {
+ //Nothing to do
+ return nil
+ }
+
+ conn := C.sr_session_get_connection(session)
+ if conn == nil {
+ return fmt.Errorf("null-connection")
+ }
+
+ //libyang context
+ ly_ctx := C.sr_acquire_context(conn)
+ if ly_ctx == nil {
+ return fmt.Errorf("null-libyang-context")
+ }
+ defer C.sr_release_context(conn)
+
+ for _, item := range items {
+ logger.Debugw(ctx, "updating-yang-item", log.Fields{"item": item})
+
+ path := C.CString(item.Path)
+ value := C.CString(item.Value)
+
+ lyErr := C.lyd_new_path(*parent, ly_ctx, path, value, 0, nil)
+ if lyErr != C.LY_SUCCESS {
+ freeCString(path)
+ freeCString(value)
+
+ err := fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
+
+ return err
+ }
+
+ freeCString(path)
+ freeCString(value)
+ }
+
+ return nil
+}
+
+//Merges the content of a yang tree with the content of the datastore.
+//The target datastore is the one on which the session has been created
+func editDatastore(ctx context.Context, session *C.sr_session_ctx_t, editsTree *C.lyd_node) error {
+ errCode := C.sr_edit_batch(session, editsTree, C.mergeOperation)
+ if errCode != C.SR_ERR_OK {
+ err := fmt.Errorf("failed-to-edit-datastore")
+ logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+ return err
+ }
+
+ errCode = C.sr_apply_changes(session, 0)
+ if errCode != C.SR_ERR_OK {
+ err := fmt.Errorf("failed-to-apply-datastore-changes")
+ logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+ return err
+ }
+
+ return nil
+}