Add NETCONF notification for ONU activation and Kafka client to receive events, update dependencies
Change-Id: I5f768fa8077ef7c64e00a534744ca47492344935
diff --git a/internal/sysrepo/sysrepo.go b/internal/sysrepo/sysrepo.go
index 3a4dc79..973dbb9 100644
--- a/internal/sysrepo/sysrepo.go
+++ b/internal/sysrepo/sysrepo.go
@@ -31,73 +31,16 @@
)
type SysrepoPlugin struct {
- connection *C.sr_conn_ctx_t
- session *C.sr_session_ctx_t
- subscription *C.sr_subscription_ctx_t
- schemaMountData *C.lyd_node
-}
-
-func srErrorMsg(code C.int) string {
- return C.GoString(C.sr_strerror(code))
-}
-
-func lyErrorMsg(ly_ctx *C.ly_ctx) string {
- lyErrString := C.ly_errmsg(ly_ctx)
- defer freeCString(lyErrString)
-
- return C.GoString(lyErrString)
-}
-
-func freeCString(str *C.char) {
- if str != nil {
- C.free(unsafe.Pointer(str))
- str = nil
- }
-}
-
-func updateYangItems(ctx context.Context, session *C.sr_session_ctx_t, parent **C.lyd_node, items []core.YangItem) error {
- conn := C.sr_session_get_connection(session)
- if conn == nil {
- return fmt.Errorf("null-connection")
- }
-
- //libyang context
- ly_ctx := C.sr_acquire_context(conn)
- defer C.sr_release_context(conn)
- if ly_ctx == nil {
- return fmt.Errorf("null-libyang-context")
- }
-
- for _, item := range items {
- if item.Value == "" {
- continue
- }
-
- logger.Debugw(ctx, "updating-yang-item", log.Fields{"item": item})
-
- path := C.CString(item.Path)
- value := C.CString(item.Value)
-
- lyErr := C.lyd_new_path(*parent, ly_ctx, path, value, 0, nil)
- if lyErr != C.LY_SUCCESS {
- freeCString(path)
- freeCString(value)
-
- err := fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
-
- return err
- }
-
- freeCString(path)
- freeCString(value)
- }
-
- return nil
+ connection *C.sr_conn_ctx_t
+ operationalSession *C.sr_session_ctx_t
+ runningSession *C.sr_session_ctx_t
+ subscription *C.sr_subscription_ctx_t
+ schemaMountData *C.lyd_node
}
//createPluginState populates a SysrepoPlugin struct by establishing
//a connection and a session
-func (p *SysrepoPlugin) createSession(ctx context.Context) error {
+func (p *SysrepoPlugin) createSessions(ctx context.Context) error {
var errCode C.int
//Populates connection
@@ -108,10 +51,23 @@
return err
}
- //Populates session
- errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.session)
+ //Populates sessions
+ //The session on the operational datastore will be used for most operations
+ //The session on the running datastore will be used for the subscription to edits
+ //since the operational datastore can't be edited by the client
+ errCode = C.sr_session_start(p.connection, C.SR_DS_OPERATIONAL, &p.operationalSession)
if errCode != C.SR_ERR_OK {
- err := fmt.Errorf("sysrepo-session-error")
+ err := fmt.Errorf("sysrepo-operational-session-error")
+ logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+
+ _ = p.Stop(ctx)
+
+ return err
+ }
+
+ errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.runningSession)
+ if errCode != C.SR_ERR_OK {
+ err := fmt.Errorf("sysrepo-running-session-error")
logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
_ = p.Stop(ctx)
@@ -122,49 +78,20 @@
return nil
}
-//export get_devices_cb
-func get_devices_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
- //This function is a callback for the retrieval of devices from sysrepo
- //The "export" comment instructs CGO to create a C function for it
-
- ctx := context.Background()
- logger.Debug(ctx, "processing-get-data-request")
-
- if session == nil {
- logger.Error(ctx, "sysrepo-get-data-null-session")
- return C.SR_ERR_OPERATION_FAILED
- }
-
- if parent == nil {
- logger.Error(ctx, "sysrepo-get-data-null-parent-node")
- return C.SR_ERR_OPERATION_FAILED
- }
-
- if core.AdapterInstance == nil {
- logger.Error(ctx, "sysrepo-get-data-nil-translator")
- return C.SR_ERR_OPERATION_FAILED
- }
-
- devices, err := core.AdapterInstance.GetDevices(ctx)
- if err != nil {
- logger.Errorw(ctx, "sysrepo-get-data-translator-error", log.Fields{"err": err})
- return C.SR_ERR_OPERATION_FAILED
- }
-
- err = updateYangItems(ctx, session, parent, devices)
- if err != nil {
- logger.Errorw(ctx, "sysrepo-get-data-update-error", log.Fields{"err": err})
- return C.SR_ERR_OPERATION_FAILED
- }
-
- return C.SR_ERR_OK
-}
-
func StartNewPlugin(ctx context.Context, schemaMountFilePath string) (*SysrepoPlugin, error) {
plugin := &SysrepoPlugin{}
+ //Set sysrepo and libyang log level
+ if logger.GetLogLevel() == log.DebugLevel {
+ C.sr_log_stderr(C.SR_LL_INF)
+ C.ly_log_level(C.LY_LLVRB)
+ } else {
+ C.sr_log_stderr(C.SR_LL_ERR)
+ C.ly_log_level(C.LY_LLERR)
+ }
+
//Open a session to sysrepo
- err := plugin.createSession(ctx)
+ err := plugin.createSessions(ctx)
if err != nil {
return nil, err
}
@@ -201,16 +128,15 @@
//Set callbacks for events
//Subscribe with a callback to the request of data on a certain path
- module := C.CString(core.DeviceAggregationModel)
- defer freeCString(module)
-
- path := C.CString(core.DevicesPath + "/*")
- defer freeCString(path)
+ devicesModule := C.CString(core.DeviceAggregationModule)
+ devicesPath := C.CString(core.DevicesPath + "/*")
+ defer freeCString(devicesModule)
+ defer freeCString(devicesPath)
errCode := C.sr_oper_get_subscribe(
- plugin.session,
- module,
- path,
+ plugin.operationalSession,
+ devicesModule,
+ devicesPath,
C.function(C.get_devices_cb_wrapper),
C.NULL,
C.SR_SUBSCR_DEFAULT,
@@ -244,15 +170,25 @@
p.subscription = nil
}
- //Frees session
- if p.session != nil {
- errCode = C.sr_session_stop(p.session)
+ //Frees sessions
+ if p.operationalSession != nil {
+ errCode = C.sr_session_stop(p.operationalSession)
if errCode != C.SR_ERR_OK {
- err := fmt.Errorf("failed-to-close-sysrepo-session")
+ err := fmt.Errorf("failed-to-close-operational-session")
logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
return err
}
- p.session = nil
+ p.operationalSession = nil
+ }
+
+ if p.runningSession != nil {
+ errCode = C.sr_session_stop(p.runningSession)
+ if errCode != C.SR_ERR_OK {
+ err := fmt.Errorf("failed-to-close-running-session")
+ logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+ return err
+ }
+ p.runningSession = nil
}
//Frees connection