Add NETCONF notification for ONU activation and Kafka client to receive events, update dependencies
Change-Id: I5f768fa8077ef7c64e00a534744ca47492344935
diff --git a/internal/clients/kafka.go b/internal/clients/kafka.go
new file mode 100644
index 0000000..9350895
--- /dev/null
+++ b/internal/clients/kafka.go
@@ -0,0 +1,155 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package clients
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/Shopify/sarama"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+const (
+ volthaEventsTopic = "voltha.events"
+ kafkaBackoffInterval = time.Second * 10
+)
+
+//KafkaConsumer is used to listen for events coming from VOLTHA
+type KafkaConsumer struct {
+	//Connection settings, populated by NewKafkaConsumer
+	address string
+	config  *sarama.Config
+
+	//Offset recorded by Start, used to skip the messages that were already on the topic
+	highwater int64
+
+	//Sarama handles, populated by Start and closed by Stop
+	client            sarama.Client
+	consumer          sarama.Consumer
+	partitionConsumer sarama.PartitionConsumer
+}
+
+//NewKafkaConsumer prepares a consumer for the Kafka cluster reachable at clusterAddress.
+//Only the sarama configuration is built here: no connection is opened until Start is called.
+func NewKafkaConsumer(clusterAddress string) *KafkaConsumer {
+	cfg := sarama.NewConfig()
+	cfg.Version = sarama.V1_0_0_0
+	cfg.ClientID = "bbf-adapter-consumer"
+	cfg.Consumer.Offsets.Initial = sarama.OffsetNewest
+	cfg.Consumer.Return.Errors = true
+
+	return &KafkaConsumer{
+		address: clusterAddress,
+		config:  cfg,
+	}
+}
+
+//Start connects to the Kafka cluster and starts consuming new messages on the voltha events topic,
+//executing the provided callback on each event.
+//
+//The creation of the client is retried every kafkaBackoffInterval until it succeeds or ctx is done.
+//An error is returned if the client creation is interrupted or if the consumer cannot be set up.
+//On success, a goroutine keeps consuming until ctx is done or the partition consumer is closed.
+func (c *KafkaConsumer) Start(ctx context.Context, eventCallback func(context.Context, *voltha.Event)) error {
+	var err error
+
+	for {
+		c.client, err = sarama.NewClient([]string{c.address}, c.config)
+		if err == nil {
+			logger.Debug(ctx, "kafka-client-created")
+			break
+		}
+
+		logger.Warnw(ctx, "kafka-not-reachable", log.Fields{
+			"err": err,
+		})
+
+		//Wait a bit before trying again
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("kafka-client-creation-stopped-due-to-context-done")
+		case <-time.After(kafkaBackoffInterval):
+		}
+	}
+
+	c.consumer, err = sarama.NewConsumerFromClient(c.client)
+	if err != nil {
+		return err
+	}
+
+	partitions, err := c.consumer.Partitions(volthaEventsTopic)
+	if err != nil {
+		return fmt.Errorf("cannot-get-partitions: %v", err)
+	}
+
+	//The first partition is indexed below, which would panic on an empty list
+	if len(partitions) == 0 {
+		return fmt.Errorf("no-partitions-available: %s", volthaEventsTopic)
+	}
+
+	// TODO: Add support for multiple partitions
+	if len(partitions) > 1 {
+		logger.Warnw(ctx, "only-listening-one-partition", log.Fields{
+			"topic":         volthaEventsTopic,
+			"partitionsNum": len(partitions),
+		})
+	}
+
+	hw, err := c.client.GetOffset(volthaEventsTopic, partitions[0], sarama.OffsetNewest)
+	if err != nil {
+		return fmt.Errorf("cannot-get-highwater: %v", err)
+	}
+	c.highwater = hw
+
+	c.partitionConsumer, err = c.consumer.ConsumePartition(volthaEventsTopic, partitions[0], sarama.OffsetOldest)
+	if err != nil {
+		return fmt.Errorf("Error in consume(): Topic %v Partitions: %v: %v", volthaEventsTopic, partitions, err)
+	}
+
+	//Start consuming the event topic in a goroutine
+	logger.Debugw(ctx, "start-consuming-kafka-topic", log.Fields{"topic": volthaEventsTopic})
+	go func(pConsumer sarama.PartitionConsumer, highwater int64) {
+		for {
+			select {
+			case <-ctx.Done():
+				logger.Info(ctx, "stopped-listening-for-events-due-to-context-done")
+				return
+			case err, ok := <-pConsumer.Errors():
+				if !ok {
+					//The channel is closed when the partition consumer is closed: a nil error would be received forever
+					logger.Info(ctx, "stopped-listening-for-events-due-to-consumer-closed")
+					return
+				}
+
+				logger.Errorw(ctx, "kafka-consumer-error", log.Fields{
+					"err":       err.Error(),
+					"topic":     err.Topic,
+					"partition": err.Partition,
+				})
+			case msg, ok := <-pConsumer.Messages():
+				if !ok {
+					logger.Info(ctx, "stopped-listening-for-events-due-to-consumer-closed")
+					return
+				}
+
+				//The highwater is the offset that is assigned to the next produced message,
+				//so the first new event has exactly that offset and must not be skipped
+				if msg.Offset < highwater {
+					continue
+				}
+
+				//Unmarshal the content of the message to a voltha Event protobuf message
+				event := &voltha.Event{}
+				if err := proto.Unmarshal(msg.Value, event); err != nil {
+					logger.Errorw(ctx, "error-unmarshalling-kafka-event", log.Fields{"err": err})
+					continue
+				}
+
+				eventCallback(ctx, event)
+			}
+		}
+	}(c.partitionConsumer, c.highwater)
+
+	return nil
+}
+
+//Closes the sarama client and all consumers
+func (c *KafkaConsumer) Stop() error {
+ if err := c.partitionConsumer.Close(); err != nil {
+ return err
+ }
+
+ if err := c.consumer.Close(); err != nil {
+ return err
+ }
+
+ if err := c.client.Close(); err != nil {
+ return err
+ }
+
+ return nil
+}
diff --git a/internal/config/config.go b/internal/config/config.go
index d3be1fc..ff7f36f 100644
--- a/internal/config/config.go
+++ b/internal/config/config.go
@@ -36,6 +36,7 @@
OnosUser string
OnosPassword string
SchemaMountFilePath string
+ KafkaClusterAddress string
}
// LoadConfig loads the BBF adapter configuration through
@@ -57,6 +58,7 @@
flag.StringVar(&conf.OnosUser, "onos_user", conf.OnosUser, "Username for ONOS REST APIs")
flag.StringVar(&conf.OnosPassword, "onos_pass", conf.OnosPassword, "Password for ONOS REST APIs")
flag.StringVar(&conf.SchemaMountFilePath, "schema_mount_path", conf.SchemaMountFilePath, "Path to the XML file that defines schema-mounts for libyang")
+ flag.StringVar(&conf.KafkaClusterAddress, "kafka_cluster_address", conf.KafkaClusterAddress, "Kafka cluster messaging address")
flag.Parse()
@@ -80,5 +82,6 @@
OnosUser: "onos",
OnosPassword: "rocks",
SchemaMountFilePath: "/schema-mount.xml",
+ KafkaClusterAddress: "127.0.0.1:9092",
}
}
diff --git a/internal/core/adapter.go b/internal/core/adapter.go
index a96767f..6a59650 100644
--- a/internal/core/adapter.go
+++ b/internal/core/adapter.go
@@ -23,6 +23,7 @@
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-northbound-bbf-adapter/internal/clients"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
)
var AdapterInstance *VolthaYangAdapter
@@ -42,13 +43,35 @@
func (t *VolthaYangAdapter) GetDevices(ctx context.Context) ([]YangItem, error) {
devices, err := t.volthaNbiClient.Service.ListDevices(ctx, &empty.Empty{})
if err != nil {
- err = fmt.Errorf("get-devices-failed: %v", err)
- return nil, err
+ return nil, fmt.Errorf("get-devices-failed: %v", err)
}
+ logger.Debugw(ctx, "get-devices-success", log.Fields{"devices": devices})
- items := translateDevices(*devices)
+ items := []YangItem{}
- logger.Debugw(ctx, "get-devices-success", log.Fields{"items": items})
+ //Every device contributes its own items, the ones that are not root also contribute their UNIs
+ for _, device := range devices.Items {
+ items = append(items, translateDevice(device)...)
+
+ if !device.Root {
+ //If the device is an ONU, also expose UNIs
+ ports, err := t.volthaNbiClient.Service.ListDevicePorts(ctx, &voltha.ID{Id: device.Id})
+ if err != nil {
+ //A failed ports request aborts the whole retrieval
+ return nil, fmt.Errorf("get-onu-ports-failed: %v", err)
+ }
+ logger.Debugw(ctx, "get-ports-success", log.Fields{"deviceId": device.Id, "ports": ports})
+
+ portsItems, err := translateOnuPorts(device.Id, ports)
+ if err != nil {
+ //A failed translation only drops the ports of this device, the items already collected are kept
+ logger.Errorw(ctx, "cannot-translate-onu-ports", log.Fields{
+ "deviceId": device.Id,
+ "err": err,
+ })
+ continue
+ }
+
+ items = append(items, portsItems...)
+ }
+ }
return items, nil
}
diff --git a/internal/core/translation.go b/internal/core/translation.go
index 634272c..ef6582d 100644
--- a/internal/core/translation.go
+++ b/internal/core/translation.go
@@ -18,15 +18,15 @@
import (
"fmt"
+ "time"
"github.com/opencord/voltha-protos/v5/go/common"
"github.com/opencord/voltha-protos/v5/go/voltha"
)
const (
- DeviceAggregationModel = "bbf-device-aggregation"
- DevicesPath = "/" + DeviceAggregationModel + ":devices"
- HardwarePath = "data/ietf-hardware:hardware"
+ DeviceAggregationModule = "bbf-device-aggregation"
+ DevicesPath = "/" + DeviceAggregationModule + ":devices"
//Device types
DeviceTypeOlt = "bbf-device-types:olt"
@@ -42,6 +42,13 @@
ietfOperStateDisabled = "disabled"
ietfOperStateEnabled = "enabled"
ietfOperStateTesting = "testing"
+ ietfOperStateUp = "up"
+ ietfOperStateDown = "down"
+
+ //Keys of useful values in device events
+ eventContextKeyPonId = "pon-id"
+ eventContextKeyOnuSn = "serial-number"
+ eventContextKeyOltSn = "olt-serial-number"
)
type YangItem struct {
@@ -56,7 +63,7 @@
//getDevicePath returns the yang path to the root of the device's hardware module in its data mountpoint
func getDeviceHardwarePath(id string) string {
- return fmt.Sprintf("%s/device[name='%s']/%s/component[name='%s']", DevicesPath, id, HardwarePath, id)
+ return fmt.Sprintf("%s/device[name='%s']/data/ietf-hardware:hardware/component[name='%s']", DevicesPath, id, id)
}
//ietfHardwareAdminState returns the string that represents the ietf-hardware admin state
@@ -101,8 +108,30 @@
return ietfOperStateUnknown
}
+//ietfInterfacesOperState returns the string that represents the ietf-interfaces oper state
+//enum value corresponding to the one of VOLTHA.
+//ACTIVE maps to up, TESTING to testing, every state in which the port is known but not
+//operative maps to down, and anything else (UNKNOWN included) maps to unknown.
+func ietfInterfacesOperState(volthaOperState voltha.OperStatus_Types) string {
+	//TODO: verify this mapping is correct
+	switch volthaOperState {
+	case common.OperStatus_UNKNOWN:
+		return ietfOperStateUnknown
+	case common.OperStatus_TESTING:
+		return ietfOperStateTesting
+	case common.OperStatus_ACTIVE:
+		return ietfOperStateUp
+	//Go cases do not fall through to the next one, so the states
+	//sharing the same result have to be listed in a single case
+	case common.OperStatus_DISCOVERED,
+		common.OperStatus_ACTIVATING,
+		common.OperStatus_FAILED,
+		common.OperStatus_RECONCILING,
+		common.OperStatus_RECONCILING_FAILED:
+		return ietfOperStateDown
+	}
+
+	return ietfOperStateUnknown
+}
+
//translateDevice returns a slice of yang items that represent a voltha device
-func translateDevice(device voltha.Device) []YangItem {
+func translateDevice(device *voltha.Device) []YangItem {
devicePath := getDevicePath(device.Id)
hardwarePath := getDeviceHardwarePath(device.Id)
@@ -110,70 +139,148 @@
//Device type
if device.Root {
+ //OLT
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/type", devicePath),
+ Path: devicePath + "/type",
Value: DeviceTypeOlt,
})
} else {
+ //ONU
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/type", devicePath),
+ Path: devicePath + "/type",
Value: DeviceTypeOnu,
})
}
//Vendor name
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/mfg-name", hardwarePath),
+ Path: hardwarePath + "/mfg-name",
Value: device.Vendor,
})
//Model
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/model-name", hardwarePath),
+ Path: hardwarePath + "/model-name",
Value: device.Model,
})
//Hardware version
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/hardware-rev", hardwarePath),
+ Path: hardwarePath + "/hardware-rev",
Value: device.HardwareVersion,
})
//Firmware version
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/firmware-rev", hardwarePath),
+ Path: hardwarePath + "/firmware-rev",
Value: device.FirmwareVersion,
})
//Serial number
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/serial-num", hardwarePath),
+ Path: hardwarePath + "/serial-num",
Value: device.SerialNumber,
})
//Administrative state
//Translates VOLTHA admin state enum to ietf-hardware enum
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/state/admin-state", hardwarePath),
+ Path: hardwarePath + "/state/admin-state",
Value: ietfHardwareAdminState(device.AdminState),
})
//Operative state
result = append(result, YangItem{
- Path: fmt.Sprintf("%s/state/oper-state", hardwarePath),
+ Path: hardwarePath + "/state/oper-state",
Value: ietfHardwareOperState(device.OperStatus),
})
return result
}
-//translateDevices returns a slice of yang items that represent a list of voltha devices
-func translateDevices(devices voltha.Devices) []YangItem {
+//translateOnuPorts returns a slice of yang items that represent the UNIs of an ONU.
+//Two items (type and oper-status) are produced for each port of type ETHERNET_UNI, under the
+//interface named after the OpenFlow port of the UNI; ports of any other type are ignored.
+//If a UNI has no OpenFlow port an error is returned and no items are provided.
+func translateOnuPorts(deviceId string, ports *voltha.Ports) ([]YangItem, error) {
+ interfacesPath := getDevicePath(deviceId) + "/data/ietf-interfaces:interfaces"
result := []YangItem{}
- for _, device := range devices.Items {
- result = append(result, translateDevice(*device)...)
+ for _, port := range ports.Items {
+ if port.Type == voltha.Port_ETHERNET_UNI {
+ //The name of the interface is taken from the OpenFlow port, so it has to be present
+ if port.OfpPort == nil {
+ return nil, fmt.Errorf("no-ofp-port-in-uni: %s %d", deviceId, port.PortNo)
+ }
+
+ interfacePath := fmt.Sprintf("%s/interface[name='%s']", interfacesPath, port.OfpPort.Name)
+
+ result = append(result, []YangItem{
+ {
+ Path: interfacePath + "/type",
+ Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+ },
+ {
+ Path: interfacePath + "/oper-status",
+ Value: ietfInterfacesOperState(port.OperStatus),
+ },
+ }...)
+ }
}
- return result
+ return result, nil
+}
+
+//TranslateOnuActivatedEvent translates an ONU_ACTIVATED_RAISE_EVENT coming from the Kafka bus.
+//It returns the yang items that populate the ONU state change notification and the yang items
+//that create the channel termination referenced by it, named "<olt serial number>-pon-<pon id>".
+//An error is returned if the PON id, the OLT serial number or the ONU serial number
+//are missing from the context of the device event.
+func TranslateOnuActivatedEvent(eventHeader *voltha.EventHeader, deviceEvent *voltha.DeviceEvent) (notification []YangItem, channelTermination []YangItem, err error) {
+
+	//TODO: the use of this notification, which requires the creation of a dummy channel termination node,
+	//is temporary, and will be substituted with a more fitting one as soon as it will be defined
+
+	//Check if the needed information is present
+	ponId, ok := deviceEvent.Context[eventContextKeyPonId]
+	if !ok {
+		return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyPonId)
+	}
+	oltId, ok := deviceEvent.Context[eventContextKeyOltSn]
+	if !ok {
+		//Report the key that is actually missing, not the one of the PON id
+		return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyOltSn)
+	}
+	ponName := oltId + "-pon-" + ponId
+
+	onuSn, ok := deviceEvent.Context[eventContextKeyOnuSn]
+	if !ok {
+		return nil, nil, fmt.Errorf("missing-key-from-event-context: %s", eventContextKeyOnuSn)
+	}
+
+	notificationPath := "/bbf-xpon-onu-states:onu-state-change"
+
+	notification = []YangItem{
+		{
+			Path:  notificationPath + "/detected-serial-number",
+			Value: onuSn,
+		},
+		{
+			Path:  notificationPath + "/channel-termination-ref",
+			Value: ponName,
+		},
+		{
+			//AsTime provides the timestamp in UTC
+			Path:  notificationPath + "/onu-state-last-change",
+			Value: eventHeader.RaisedTs.AsTime().Format(time.RFC3339),
+		},
+		{
+			Path:  notificationPath + "/onu-state",
+			Value: "bbf-xpon-onu-types:onu-present",
+		},
+		{
+			Path:  notificationPath + "/detected-registration-id",
+			Value: deviceEvent.ResourceId,
+		},
+	}
+
+	channelTermination = []YangItem{
+		{
+			Path:  fmt.Sprintf("/ietf-interfaces:interfaces/interface[name='%s']/type", ponName),
+			Value: "bbf-if-type:vlan-sub-interface",
+		},
+	}
+
+	return notification, channelTermination, nil
}
diff --git a/internal/core/translation_test.go b/internal/core/translation_test.go
index fb2e751..41dc1dc 100644
--- a/internal/core/translation_test.go
+++ b/internal/core/translation_test.go
@@ -19,9 +19,12 @@
import (
"fmt"
"testing"
+ "time"
+ "github.com/opencord/voltha-protos/v5/go/openflow_13"
"github.com/opencord/voltha-protos/v5/go/voltha"
"github.com/stretchr/testify/assert"
+ "google.golang.org/protobuf/types/known/timestamppb"
)
const (
@@ -44,58 +47,253 @@
}
+//TestTranslateDevice checks that every expected yang item is produced, with the right value,
+//when translating a root device (OLT) and a non-root device (ONU)
func TestTranslateDevice(t *testing.T) {
- olt := voltha.Device{
- Id: testDeviceId,
- Root: true,
+ olt := &voltha.Device{
+ Id: testDeviceId,
+ Root: true,
+ Vendor: "BBSim",
+ Model: "asfvolt16",
+ SerialNumber: "BBSIM_OLT_10",
+ HardwareVersion: "v0.0.2",
+ FirmwareVersion: "v0.0.3",
+ AdminState: voltha.AdminState_ENABLED,
+ OperStatus: voltha.OperStatus_ACTIVE,
}
items := translateDevice(olt)
- val, ok := getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(testDeviceId)))
- assert.True(t, ok, "No type item for olt")
- assert.Equal(t, DeviceTypeOlt, val)
+ oltPath := getDevicePath(testDeviceId)
+ oltHwPath := getDeviceHardwarePath(testDeviceId)
- onu := voltha.Device{
- Id: testDeviceId,
- Root: false,
+ expected := []YangItem{
+ {
+ Path: oltPath + "/type",
+ Value: DeviceTypeOlt,
+ },
+ {
+ Path: oltHwPath + "/mfg-name",
+ Value: "BBSim",
+ },
+ {
+ Path: oltHwPath + "/model-name",
+ Value: "asfvolt16",
+ },
+ {
+ Path: oltHwPath + "/hardware-rev",
+ Value: "v0.0.2",
+ },
+ {
+ Path: oltHwPath + "/firmware-rev",
+ Value: "v0.0.3",
+ },
+ {
+ Path: oltHwPath + "/serial-num",
+ Value: "BBSIM_OLT_10",
+ },
+ {
+ Path: oltHwPath + "/state/admin-state",
+ Value: ietfAdminStateUnlocked,
+ },
+ {
+ Path: oltHwPath + "/state/oper-state",
+ Value: ietfOperStateEnabled,
+ },
+ }
+
+ assert.NotEmpty(t, items, "No OLT items")
+ for _, e := range expected {
+ val, ok := getItemWithPath(items, e.Path)
+ assert.True(t, ok, e.Path+" missing for OLT")
+ assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+ }
+
+ //Same checks for a device that is not root, which is expected to be typed as an ONU
+ onu := &voltha.Device{
+ Id: testDeviceId,
+ Root: false,
+ Vendor: "BBSM",
+ Model: "v0.0.1",
+ SerialNumber: "BBSM000a0001",
+ HardwareVersion: "v0.0.2",
+ FirmwareVersion: "v0.0.3",
+ AdminState: voltha.AdminState_ENABLED,
+ OperStatus: voltha.OperStatus_ACTIVE,
}
items = translateDevice(onu)
- val, ok = getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(testDeviceId)))
- assert.True(t, ok, "No type item for onu")
- assert.Equal(t, DeviceTypeOnu, val)
+ onuPath := getDevicePath(testDeviceId)
+ onuHwPath := getDeviceHardwarePath(testDeviceId)
+
+ expected = []YangItem{
+ {
+ Path: onuPath + "/type",
+ Value: DeviceTypeOnu,
+ },
+ {
+ Path: onuHwPath + "/mfg-name",
+ Value: "BBSM",
+ },
+ {
+ Path: onuHwPath + "/model-name",
+ Value: "v0.0.1",
+ },
+ {
+ Path: onuHwPath + "/hardware-rev",
+ Value: "v0.0.2",
+ },
+ {
+ Path: onuHwPath + "/firmware-rev",
+ Value: "v0.0.3",
+ },
+ {
+ Path: onuHwPath + "/serial-num",
+ Value: "BBSM000a0001",
+ },
+ {
+ Path: onuHwPath + "/state/admin-state",
+ Value: ietfAdminStateUnlocked,
+ },
+ {
+ Path: onuHwPath + "/state/oper-state",
+ Value: ietfOperStateEnabled,
+ },
+ }
+
+ assert.NotEmpty(t, items, "No ONU items")
+ for _, e := range expected {
+ val, ok := getItemWithPath(items, e.Path)
+ assert.True(t, ok, e.Path+" missing for ONU")
+ assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+ }
}
-func TestTranslateDevices(t *testing.T) {
- devicesNum := 10
-
- //Create test devices
- devices := voltha.Devices{
- Items: []*voltha.Device{},
+//TestTranslateOnuPorts checks that a UNI without OpenFlow port produces an error, and that
+//only the UNIs are translated, with the expected type and oper-status, for a valid list of ports
+func TestTranslateOnuPorts(t *testing.T) {
+ ports := &voltha.Ports{
+ Items: []*voltha.Port{
+ {
+ PortNo: 0,
+ Type: voltha.Port_ETHERNET_UNI,
+ OperStatus: voltha.OperStatus_ACTIVE,
+ },
+ },
}
- for i := 0; i < devicesNum; i++ {
- devices.Items = append(devices.Items, &voltha.Device{
- Id: fmt.Sprintf("%d", i),
- Root: i%2 == 0,
- })
+ _, err := translateOnuPorts(testDeviceId, ports)
+ assert.Error(t, err, "No error for missing Ofp port")
+
+ ports = &voltha.Ports{
+ Items: []*voltha.Port{
+ {
+ PortNo: 0,
+ Type: voltha.Port_ETHERNET_UNI,
+ OfpPort: &openflow_13.OfpPort{
+ Name: "BBSM000a0001-1",
+ },
+ OperStatus: voltha.OperStatus_ACTIVE,
+ },
+ {
+ PortNo: 1,
+ Type: voltha.Port_ETHERNET_UNI,
+ OfpPort: &openflow_13.OfpPort{
+ Name: "BBSM000a0001-2",
+ },
+ OperStatus: voltha.OperStatus_UNKNOWN,
+ },
+ {
+ PortNo: 0,
+ Type: voltha.Port_PON_ONU,
+ OperStatus: voltha.OperStatus_UNKNOWN,
+ },
+ },
}
- //Translate them to items
- items := translateDevices(devices)
+ portsItems, err := translateOnuPorts(testDeviceId, ports)
+ assert.Nil(t, err, "Translation error")
- //Check if the number of generated items is correct
- singleDeviceItemsNum := len(translateDevice(*devices.Items[0]))
- assert.Equal(t, singleDeviceItemsNum*devicesNum, len(items))
+ /*2 items (type and oper-status) for each of the 2 UNIs, PON is ignored*/
+ assert.Equal(t, 4, len(portsItems), "No ports items")
- //Check if the content is right
- for i := 0; i < devicesNum; i++ {
- val, ok := getItemWithPath(items, fmt.Sprintf("%s/type", getDevicePath(devices.Items[i].Id)))
- assert.True(t, ok, fmt.Sprintf("No type item for device %d", i))
+ interfacesPath := getDevicePath(testDeviceId) + "/data/ietf-interfaces:interfaces"
- if devices.Items[i].Root {
- assert.Equal(t, DeviceTypeOlt, val)
- } else {
- assert.Equal(t, DeviceTypeOnu, val)
- }
+ expected := []YangItem{
+ {
+ Path: fmt.Sprintf("%s/interface[name='%s']/oper-status", interfacesPath, "BBSM000a0001-1"),
+ Value: ietfOperStateUp,
+ },
+ {
+ Path: fmt.Sprintf("%s/interface[name='%s']/type", interfacesPath, "BBSM000a0001-1"),
+ Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+ },
+ {
+ Path: fmt.Sprintf("%s/interface[name='%s']/oper-status", interfacesPath, "BBSM000a0001-2"),
+ Value: ietfOperStateUnknown,
+ },
+ {
+ Path: fmt.Sprintf("%s/interface[name='%s']/type", interfacesPath, "BBSM000a0001-2"),
+ Value: "bbf-xpon-if-type:onu-v-vrefpoint",
+ },
+ }
+
+ for _, e := range expected {
+ val, ok := getItemWithPath(portsItems, e.Path)
+ assert.True(t, ok, e.Path+" missing for ports")
+ assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
+ }
+}
+
+//TestTranslateOnuActive checks that an ONU activated event with an empty context is rejected,
+//and that a complete one is translated to the expected notification and channel termination items
+func TestTranslateOnuActive(t *testing.T) {
+	timestamp := time.Now()
+	eventHeader := &voltha.EventHeader{
+		Id:       "Voltha.openolt.ONU_ACTIVATED.1657705515351182767",
+		RaisedTs: timestamppb.New(timestamp),
+		Category: voltha.EventCategory_EQUIPMENT,
+		Type:     voltha.EventType_DEVICE_EVENT,
+	}
+
+	deviceEvent := &voltha.DeviceEvent{
+		ResourceId:      testDeviceId,
+		DeviceEventName: "ONU_ACTIVATED_RAISE_EVENT",
+		Description:     "ONU Event - ONU_ACTIVATED - Raised",
+		Context:         map[string]string{},
+	}
+
+	_, _, err := TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+	assert.Error(t, err, "Empty context produces no error")
+
+	deviceEvent.Context[eventContextKeyPonId] = "0"
+	deviceEvent.Context[eventContextKeyOnuSn] = "BBSM000a0001"
+	deviceEvent.Context[eventContextKeyOltSn] = "BBSIM_OLT_10"
+
+	notificationPath := "/bbf-xpon-onu-states:onu-state-change"
+	expected := []YangItem{
+		{
+			Path:  notificationPath + "/detected-serial-number",
+			Value: "BBSM000a0001",
+		},
+		{
+			//The translation formats the protobuf timestamp through AsTime, which is in UTC:
+			//the expectation has to be in UTC too, or the test fails wherever the local zone differs
+			Path:  notificationPath + "/onu-state-last-change",
+			Value: timestamp.UTC().Format(time.RFC3339),
+		},
+		{
+			Path:  notificationPath + "/onu-state",
+			Value: "bbf-xpon-onu-types:onu-present",
+		},
+		{
+			Path:  notificationPath + "/detected-registration-id",
+			Value: testDeviceId,
+		},
+	}
+
+	notificationItems, channelTerminationItems, err := TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+	assert.Nil(t, err, "Translation error")
+
+	assert.NotEmpty(t, channelTerminationItems, "No channel termination items")
+
+	assert.NotEmpty(t, notificationItems, "No notification items")
+
+	_, ok := getItemWithPath(notificationItems, notificationPath+"/channel-termination-ref")
+	assert.True(t, ok, "No channel termination reference in notification")
+
+	for _, e := range expected {
+		val, ok := getItemWithPath(notificationItems, e.Path)
+		assert.True(t, ok, e.Path+" missing for notification")
+		assert.Equal(t, e.Value, val, "Wrong value for "+e.Path)
 }
}
diff --git a/internal/sysrepo/callbacks.go b/internal/sysrepo/callbacks.go
new file mode 100644
index 0000000..1256568
--- /dev/null
+++ b/internal/sysrepo/callbacks.go
@@ -0,0 +1,67 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+ "context"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+)
+
+//export get_devices_cb
+func get_devices_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
+	//Callback invoked for the retrieval of devices from sysrepo.
+	//The "export" comment above instructs CGO to create a C function for it.
+	//Any failure is logged and reported to the caller as SR_ERR_OPERATION_FAILED.
+
+	ctx := context.Background()
+	logger.Debug(ctx, "processing-get-devices-request")
+
+	//Nothing can be done without a session, a parent node to fill and an adapter to query
+	switch {
+	case session == nil:
+		logger.Error(ctx, "sysrepo-get-devices-null-session")
+		return C.SR_ERR_OPERATION_FAILED
+	case parent == nil:
+		logger.Error(ctx, "sysrepo-get-devices-null-parent-node")
+		return C.SR_ERR_OPERATION_FAILED
+	case core.AdapterInstance == nil:
+		logger.Error(ctx, "sysrepo-get-devices-nil-translator")
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	//Get the yang items of the devices and add them to the tree that is being populated
+	items, err := core.AdapterInstance.GetDevices(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "sysrepo-get-devices-translator-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	if err := updateYangTree(ctx, session, parent, items); err != nil {
+		logger.Errorw(ctx, "sysrepo-get-devices-update-error", log.Fields{"err": err})
+		return C.SR_ERR_OPERATION_FAILED
+	}
+
+	logger.Info(ctx, "devices-information-request-served")
+
+	return C.SR_ERR_OK
+}
diff --git a/internal/sysrepo/events.go b/internal/sysrepo/events.go
new file mode 100644
index 0000000..4ababe3
--- /dev/null
+++ b/internal/sysrepo/events.go
@@ -0,0 +1,110 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+ "context"
+ "fmt"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+const (
+ eventNameOnuActivated = "ONU_ACTIVATED_RAISE_EVENT"
+)
+
+//ManageVolthaEvent performs the necessary operations on a new voltha event received from Kafka.
+//Only the device events named eventNameOnuActivated are acted upon, by sending an ONU activated
+//notification; a nil event or an event without header is logged and discarded, while
+//any other event is ignored. Failures are logged, nothing is returned to the caller.
+func (p *SysrepoPlugin) ManageVolthaEvent(ctx context.Context, event *voltha.Event) {
+	//The event is decoded from the bus, so the presence of the header is not guaranteed
+	if event == nil || event.Header == nil {
+		logger.Error(ctx, "voltha-event-without-header")
+		return
+	}
+
+	if event.Header.Type != voltha.EventType_DEVICE_EVENT {
+		return
+	}
+
+	devEvent, ok := event.EventType.(*voltha.Event_DeviceEvent)
+	if !ok {
+		logger.Errorw(ctx, "unexpected-event-type", log.Fields{
+			"headerType": event.Header.Type,
+			"actualType": fmt.Sprintf("%T", event.EventType),
+		})
+		return
+	}
+
+	//TODO: map other events to ONU state changes
+	switch devEvent.DeviceEvent.DeviceEventName {
+	case eventNameOnuActivated:
+		logger.Debugw(ctx, "onu-activated-event-received", log.Fields{
+			"header":      event.Header,
+			"deviceEvent": devEvent.DeviceEvent,
+		})
+
+		if err := p.sendOnuActivatedNotification(ctx, event.Header, devEvent.DeviceEvent); err != nil {
+			logger.Errorw(ctx, "failed-to-send-onu-activated-notification", log.Fields{"err": err})
+		}
+	}
+}
+
+//sendOnuActivatedNotification sends a notification based on the content of the received device event.
+//The channel termination referenced by the notification is first written through the operational
+//session, then the notification tree is created, handed over to sysrepo and sent.
+//An error is returned if the translation of the event or any of these steps fails.
+func (p *SysrepoPlugin) sendOnuActivatedNotification(ctx context.Context, eventHeader *voltha.EventHeader, deviceEvent *voltha.DeviceEvent) error {
+	//Prepare the content of the notification
+	notificationItems, channelTermItems, err := core.TranslateOnuActivatedEvent(eventHeader, deviceEvent)
+	if err != nil {
+		return fmt.Errorf("failed-to-translate-onu-activated-event: %v", err)
+	}
+
+	//Create the channel termination in the datastore to make the notification leafref valid
+	channelTermTree, err := createYangTree(ctx, p.operationalSession, channelTermItems)
+	if err != nil {
+		return fmt.Errorf("failed-to-create-channel-termination-tree: %v", err)
+	}
+	defer C.lyd_free_all(channelTermTree)
+
+	err = editDatastore(ctx, p.operationalSession, channelTermTree)
+	if err != nil {
+		return fmt.Errorf("failed-to-apply-channel-termination-to-datastore: %v", err)
+	}
+
+	//Create the notification tree
+	notificationTree, err := createYangTree(ctx, p.operationalSession, notificationItems)
+	if err != nil {
+		return fmt.Errorf("failed-to-create-onu-activated-notification-tree: %v", err)
+	}
+
+	//Let sysrepo manage the notification tree to properly free it after its delivery
+	var notificationData *C.sr_data_t
+	errCode := C.sr_acquire_data(p.connection, notificationTree, &notificationData)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("cannot-acquire-notification-data")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return err
+	}
+	defer C.sr_release_data(notificationData)
+
+	//Send the notification
+	logger.Infow(ctx, "sending-onu-activated-notification", log.Fields{
+		"onuSn": deviceEvent.Context["serial-number"],
+	})
+	errCode = C.sr_notif_send_tree(p.operationalSession, notificationData.tree, 0, 0)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("cannot-send-notification")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return err
+	}
+
+	return nil
+}
diff --git a/internal/sysrepo/plugin.c b/internal/sysrepo/plugin.c
index 08e7978..f9f53c6 100644
--- a/internal/sysrepo/plugin.c
+++ b/internal/sysrepo/plugin.c
@@ -25,6 +25,9 @@
typedef struct lyd_node lyd_node;
typedef struct ly_ctx ly_ctx;
+//Used to define the datastore edit mode
+const char* mergeOperation = "merge";
+
//Provides data for the schema-mount extension
LY_ERR mountpoint_ext_data_clb(
const struct lysc_ext_instance *ext,
@@ -37,7 +40,7 @@
return LY_SUCCESS;
}
-// Exported by sysrepo.go
+// Exported by callbacks.go
sr_error_t get_devices_cb(sr_session_ctx_t *session, lyd_node **parent);
//The wrapper functions are needed because CGO cannot express some keywords
diff --git a/internal/sysrepo/sysrepo.go b/internal/sysrepo/sysrepo.go
index 3a4dc79..973dbb9 100644
--- a/internal/sysrepo/sysrepo.go
+++ b/internal/sysrepo/sysrepo.go
@@ -31,73 +31,16 @@
)
type SysrepoPlugin struct {
- connection *C.sr_conn_ctx_t
- session *C.sr_session_ctx_t
- subscription *C.sr_subscription_ctx_t
- schemaMountData *C.lyd_node
-}
-
-func srErrorMsg(code C.int) string {
- return C.GoString(C.sr_strerror(code))
-}
-
-func lyErrorMsg(ly_ctx *C.ly_ctx) string {
- lyErrString := C.ly_errmsg(ly_ctx)
- defer freeCString(lyErrString)
-
- return C.GoString(lyErrString)
-}
-
-func freeCString(str *C.char) {
- if str != nil {
- C.free(unsafe.Pointer(str))
- str = nil
- }
-}
-
-func updateYangItems(ctx context.Context, session *C.sr_session_ctx_t, parent **C.lyd_node, items []core.YangItem) error {
- conn := C.sr_session_get_connection(session)
- if conn == nil {
- return fmt.Errorf("null-connection")
- }
-
- //libyang context
- ly_ctx := C.sr_acquire_context(conn)
- defer C.sr_release_context(conn)
- if ly_ctx == nil {
- return fmt.Errorf("null-libyang-context")
- }
-
- for _, item := range items {
- if item.Value == "" {
- continue
- }
-
- logger.Debugw(ctx, "updating-yang-item", log.Fields{"item": item})
-
- path := C.CString(item.Path)
- value := C.CString(item.Value)
-
- lyErr := C.lyd_new_path(*parent, ly_ctx, path, value, 0, nil)
- if lyErr != C.LY_SUCCESS {
- freeCString(path)
- freeCString(value)
-
- err := fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
-
- return err
- }
-
- freeCString(path)
- freeCString(value)
- }
-
- return nil
+ connection *C.sr_conn_ctx_t
+ operationalSession *C.sr_session_ctx_t
+ runningSession *C.sr_session_ctx_t
+ subscription *C.sr_subscription_ctx_t
+ schemaMountData *C.lyd_node
}
//createPluginState populates a SysrepoPlugin struct by establishing
//a connection and a session
-func (p *SysrepoPlugin) createSession(ctx context.Context) error {
+func (p *SysrepoPlugin) createSessions(ctx context.Context) error {
var errCode C.int
//Populates connection
@@ -108,10 +51,23 @@
return err
}
- //Populates session
- errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.session)
+ //Populates sessions
+ //The session on the operational datastore will be used for most operations
+ //The session on the running datastore will be used for the subscription to edits
+ //since the operational datastore can't be edited by the client
+ errCode = C.sr_session_start(p.connection, C.SR_DS_OPERATIONAL, &p.operationalSession)
if errCode != C.SR_ERR_OK {
- err := fmt.Errorf("sysrepo-session-error")
+ err := fmt.Errorf("sysrepo-operational-session-error")
+ logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+
+ _ = p.Stop(ctx)
+
+ return err
+ }
+
+ errCode = C.sr_session_start(p.connection, C.SR_DS_RUNNING, &p.runningSession)
+ if errCode != C.SR_ERR_OK {
+ err := fmt.Errorf("sysrepo-running-session-error")
logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
_ = p.Stop(ctx)
@@ -122,49 +78,20 @@
return nil
}
-//export get_devices_cb
-func get_devices_cb(session *C.sr_session_ctx_t, parent **C.lyd_node) C.sr_error_t {
- //This function is a callback for the retrieval of devices from sysrepo
- //The "export" comment instructs CGO to create a C function for it
-
- ctx := context.Background()
- logger.Debug(ctx, "processing-get-data-request")
-
- if session == nil {
- logger.Error(ctx, "sysrepo-get-data-null-session")
- return C.SR_ERR_OPERATION_FAILED
- }
-
- if parent == nil {
- logger.Error(ctx, "sysrepo-get-data-null-parent-node")
- return C.SR_ERR_OPERATION_FAILED
- }
-
- if core.AdapterInstance == nil {
- logger.Error(ctx, "sysrepo-get-data-nil-translator")
- return C.SR_ERR_OPERATION_FAILED
- }
-
- devices, err := core.AdapterInstance.GetDevices(ctx)
- if err != nil {
- logger.Errorw(ctx, "sysrepo-get-data-translator-error", log.Fields{"err": err})
- return C.SR_ERR_OPERATION_FAILED
- }
-
- err = updateYangItems(ctx, session, parent, devices)
- if err != nil {
- logger.Errorw(ctx, "sysrepo-get-data-update-error", log.Fields{"err": err})
- return C.SR_ERR_OPERATION_FAILED
- }
-
- return C.SR_ERR_OK
-}
-
func StartNewPlugin(ctx context.Context, schemaMountFilePath string) (*SysrepoPlugin, error) {
plugin := &SysrepoPlugin{}
+ //Set sysrepo and libyang log level
+ if logger.GetLogLevel() == log.DebugLevel {
+ C.sr_log_stderr(C.SR_LL_INF)
+ C.ly_log_level(C.LY_LLVRB)
+ } else {
+ C.sr_log_stderr(C.SR_LL_ERR)
+ C.ly_log_level(C.LY_LLERR)
+ }
+
//Open a session to sysrepo
- err := plugin.createSession(ctx)
+ err := plugin.createSessions(ctx)
if err != nil {
return nil, err
}
@@ -201,16 +128,15 @@
//Set callbacks for events
//Subscribe with a callback to the request of data on a certain path
- module := C.CString(core.DeviceAggregationModel)
- defer freeCString(module)
-
- path := C.CString(core.DevicesPath + "/*")
- defer freeCString(path)
+ devicesModule := C.CString(core.DeviceAggregationModule)
+ devicesPath := C.CString(core.DevicesPath + "/*")
+ defer freeCString(devicesModule)
+ defer freeCString(devicesPath)
errCode := C.sr_oper_get_subscribe(
- plugin.session,
- module,
- path,
+ plugin.operationalSession,
+ devicesModule,
+ devicesPath,
C.function(C.get_devices_cb_wrapper),
C.NULL,
C.SR_SUBSCR_DEFAULT,
@@ -244,15 +170,25 @@
p.subscription = nil
}
- //Frees session
- if p.session != nil {
- errCode = C.sr_session_stop(p.session)
+ //Frees sessions
+ if p.operationalSession != nil {
+ errCode = C.sr_session_stop(p.operationalSession)
if errCode != C.SR_ERR_OK {
- err := fmt.Errorf("failed-to-close-sysrepo-session")
+ err := fmt.Errorf("failed-to-close-operational-session")
logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
return err
}
- p.session = nil
+ p.operationalSession = nil
+ }
+
+ if p.runningSession != nil {
+ errCode = C.sr_session_stop(p.runningSession)
+ if errCode != C.SR_ERR_OK {
+ err := fmt.Errorf("failed-to-close-running-session")
+ logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+ return err
+ }
+ p.runningSession = nil
}
//Frees connection
diff --git a/internal/sysrepo/utils.go b/internal/sysrepo/utils.go
new file mode 100644
index 0000000..d564dc7
--- /dev/null
+++ b/internal/sysrepo/utils.go
@@ -0,0 +1,173 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+
+* http://www.apache.org/licenses/LICENSE-2.0
+
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package sysrepo
+
+//#cgo LDFLAGS: -lsysrepo -lyang -Wl,--allow-multiple-definition
+//#include "plugin.c"
+import "C"
+import (
+ "context"
+ "fmt"
+ "unsafe"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-northbound-bbf-adapter/internal/core"
+)
+
+//srErrorMsg converts a sysrepo error code into the description returned by sr_strerror
+func srErrorMsg(code C.int) string {
+ return C.GoString(C.sr_strerror(code))
+}
+
+//lyErrorMsg provides the last libyang error message.
+//The C string returned by ly_errmsg is owned by libyang (it is a const char *),
+//so it is only copied into a Go string and must not be freed here
+func lyErrorMsg(ly_ctx *C.ly_ctx) string {
+	lyErrString := C.ly_errmsg(ly_ctx)
+	return C.GoString(lyErrString)
+}
+
+//freeCString frees a C-allocated string (nil is ignored); the caller's pointer is not reset
+func freeCString(str *C.char) {
+	if str != nil {
+		C.free(unsafe.Pointer(str))
+	}
+}
+
+//createYangTree creates a new libyang nodes tree from a set of new paths.
+//The first item creates the root node; the remaining items are added to the same tree.
+//An error is returned when items is empty, when the connection or its libyang
+//context cannot be obtained, or when libyang fails to create one of the paths.
+//The returned tree must be manually freed after its use with C.lyd_free_all
+//or an equivalent function
+func createYangTree(ctx context.Context, session *C.sr_session_ctx_t, items []core.YangItem) (*C.lyd_node, error) {
+	if len(items) == 0 {
+		return nil, fmt.Errorf("no-items")
+	}
+
+	conn := C.sr_session_get_connection(session)
+	if conn == nil {
+		return nil, fmt.Errorf("null-connection")
+	}
+
+	//libyang context
+	ly_ctx := C.sr_acquire_context(conn)
+	if ly_ctx == nil {
+		return nil, fmt.Errorf("null-libyang-context")
+	}
+	defer C.sr_release_context(conn)
+
+	//Create parent node.
+	//The C copies of the strings are released by the deferred calls on every
+	//return path, including the failure of the first lyd_new_path
+	parentPath := C.CString(items[0].Path)
+	defer freeCString(parentPath)
+	parentValue := C.CString(items[0].Value)
+	defer freeCString(parentValue)
+
+	var parent *C.lyd_node
+	lyErr := C.lyd_new_path(nil, ly_ctx, parentPath, parentValue, 0, &parent)
+	if lyErr != C.LY_SUCCESS {
+		return nil, fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
+	}
+	logger.Debugw(ctx, "creating-yang-item", log.Fields{"item": items[0]})
+
+	//Add remaining nodes
+	for _, item := range items[1:] {
+		logger.Debugw(ctx, "creating-yang-item", log.Fields{"item": item})
+
+		path := C.CString(item.Path)
+		value := C.CString(item.Value)
+
+		lyErr := C.lyd_new_path(parent, ly_ctx, path, value, 0, nil)
+
+		//The C strings are released right after the call, whatever its outcome
+		freeCString(path)
+		freeCString(value)
+
+		if lyErr != C.LY_SUCCESS {
+			//Free the partially created tree
+			C.lyd_free_all(parent)
+
+			return nil, fmt.Errorf("libyang-new-path-failed: %d %s", lyErr, lyErrorMsg(ly_ctx))
+		}
+	}
+
+	return parent, nil
+}
+
+//updateYangTree adds a libyang path for each of the provided items to the
+//existing tree referenced by parent, using the libyang context of the
+//session's connection. An empty set of items is a no-op. The processing
+//stops at the first path that libyang fails to create, returning an error
+func updateYangTree(ctx context.Context, session *C.sr_session_ctx_t, parent **C.lyd_node, items []core.YangItem) error {
+	//Nothing to do
+	if len(items) == 0 {
+		return nil
+	}
+
+	srConn := C.sr_session_get_connection(session)
+	if srConn == nil {
+		return fmt.Errorf("null-connection")
+	}
+
+	//The libyang context is held until the function returns
+	lyCtx := C.sr_acquire_context(srConn)
+	if lyCtx == nil {
+		return fmt.Errorf("null-libyang-context")
+	}
+	defer C.sr_release_context(srConn)
+
+	for i := range items {
+		logger.Debugw(ctx, "updating-yang-item", log.Fields{"item": items[i]})
+
+		cPath := C.CString(items[i].Path)
+		cValue := C.CString(items[i].Value)
+
+		result := C.lyd_new_path(*parent, lyCtx, cPath, cValue, 0, nil)
+
+		//Both C strings are released before the result is inspected
+		freeCString(cPath)
+		freeCString(cValue)
+
+		if result != C.LY_SUCCESS {
+			return fmt.Errorf("libyang-new-path-failed: %d %s", result, lyErrorMsg(lyCtx))
+		}
+	}
+
+	return nil
+}
+
+//editDatastore merges the content of a yang tree with the content of the datastore on which the
+//session has been created. If the changes cannot be applied they are discarded from the session
+func editDatastore(ctx context.Context, session *C.sr_session_ctx_t, editsTree *C.lyd_node) error {
+	errCode := C.sr_edit_batch(session, editsTree, C.mergeOperation)
+	if errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("failed-to-edit-datastore")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		return err
+	}
+
+	if errCode = C.sr_apply_changes(session, 0); errCode != C.SR_ERR_OK {
+		err := fmt.Errorf("failed-to-apply-datastore-changes")
+		logger.Errorw(ctx, err.Error(), log.Fields{"errCode": errCode, "errMsg": srErrorMsg(errCode)})
+		_ = C.sr_discard_changes(session) //Best effort: a pending edit would make later sr_edit_batch calls fail
+		return err
+	}
+
+	return nil
+}