VOL-1624 Support for tech-profile creation on the first flow that references the tp-id (in write-metadata)
Getting meter from flow itself and bug fixes
Bug fix for dhcp packet-out
Change-Id: Ia466988bfdbfe49fd9a44729a4ba4a30fd991c54
diff --git a/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
index 8371e09..c9f332c 100644
--- a/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
@@ -499,3 +499,38 @@
log.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
return unPackResponse(rpc, deviceId, success, result)
}
+
+func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+ log.Debugw("DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
+ rpc := "DevicePMConfigUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(pmConfigs.Id)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device_pm_config",
+ Value: pmConfigs,
+ }
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
+ log.Debugw("DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
+ return unPackResponse(rpc, pmConfigs.Id, success, result)
+}
+
+func (ap *CoreProxy) ReconcileChildDevices(ctx context.Context, parentDeviceId string) error {
+ log.Debugw("ReconcileChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
+ rpc := "ReconcileChildDevices"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := []*kafka.KVArg{
+ {Key: "parent_device_id", Value: &voltha.ID{Id: parentDeviceId}},
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ log.Debugw("ReconcileChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ return unPackResponse(rpc, parentDeviceId, success, result)
+}
diff --git a/vendor/github.com/opencord/voltha-go/adapters/common/performance_metrics.go b/vendor/github.com/opencord/voltha-go/adapters/common/performance_metrics.go
new file mode 100644
index 0000000..8f74439
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-go/adapters/common/performance_metrics.go
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "github.com/opencord/voltha-protos/go/voltha"
+)
+
+type PmMetrics struct {
+ deviceId string
+ frequency uint32
+ grouped bool
+ frequencyOverride bool
+ metrics map[string]*voltha.PmConfig
+}
+
+type PmMetricsOption func(*PmMetrics)
+
+func Frequency(frequency uint32) PmMetricsOption {
+ return func(args *PmMetrics) {
+ args.frequency = frequency
+ }
+}
+
+func Grouped(grouped bool) PmMetricsOption {
+ return func(args *PmMetrics) {
+ args.grouped = grouped
+ }
+}
+
+func FrequencyOverride(frequencyOverride bool) PmMetricsOption {
+ return func(args *PmMetrics) {
+ args.frequencyOverride = frequencyOverride
+ }
+}
+
+func Metrics(pmNames []string) PmMetricsOption {
+ return func(args *PmMetrics) {
+ args.metrics = make(map[string]*voltha.PmConfig)
+ for _, name := range pmNames {
+ args.metrics[name] = &voltha.PmConfig{
+ Name: name,
+ Type: voltha.PmConfig_COUNTER,
+ Enabled: true,
+ }
+ }
+ }
+}
+
+func NewPmMetrics(deviceId string, opts ...PmMetricsOption) *PmMetrics {
+ pm := &PmMetrics{deviceId: deviceId}
+ for _, option := range opts {
+ option(pm)
+ }
+ return pm
+}
+
+func (pm *PmMetrics) ToPmConfigs() *voltha.PmConfigs {
+ pmConfigs := &voltha.PmConfigs{
+ Id: pm.deviceId,
+ DefaultFreq: pm.frequency,
+ Grouped: pm.grouped,
+ FreqOverride: pm.frequencyOverride,
+ }
+ for _, v := range pm.metrics {
+ pmConfigs.Metrics = append(pmConfigs.Metrics, &voltha.PmConfig{Name: v.Name, Type: v.Type, Enabled: v.Enabled})
+ }
+ return pmConfigs
+}
diff --git a/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go b/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
index 7ce4414..b18f1d1 100644
--- a/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
+++ b/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
@@ -99,6 +99,41 @@
}
func (rhp *RequestHandlerProxy) Reconcile_device(args []*ic.Argument) (*empty.Empty, error) {
+ if len(args) < 3 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{}
+ fromTopic := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.FromTopic:
+ if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+ log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ //Update the core reference for that device
+ rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+
+ //Invoke the reconcile device API on the adapter
+ if err := rhp.adapter.Reconcile_device(device); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
@@ -256,7 +291,7 @@
}
//Update the core reference for that device
rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
- //Invoke the Disable_device API on the adapter
+ //Invoke the delete_device API on the adapter
if err := rhp.adapter.Delete_device(device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
@@ -269,7 +304,7 @@
func (rhp *RequestHandlerProxy) Update_flows_bulk(args []*ic.Argument) (*empty.Empty, error) {
log.Debug("Update_flows_bulk")
- if len(args) < 4 {
+ if len(args) < 5 {
log.Warn("Update_flows_bulk-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -277,6 +312,7 @@
device := &voltha.Device{}
transactionID := &ic.StrType{}
flows := &voltha.Flows{}
+ flowMetadata := &voltha.FlowMetadata{}
groups := &voltha.FlowGroups{}
for _, arg := range args {
switch arg.Key {
@@ -295,6 +331,11 @@
log.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
return nil, err
}
+ case "flow_metadata":
+ if err := ptypes.UnmarshalAny(arg.Value, flowMetadata); err != nil {
+ log.Warnw("cannot-unmarshal-metadata", log.Fields{"error": err})
+ return nil, err
+ }
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
@@ -303,8 +344,8 @@
}
}
log.Debugw("Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
- //Invoke the adopt device on the adapter
- if err := rhp.adapter.Update_flows_bulk(device, flows, groups); err != nil {
+ //Invoke the bulk flow update API of the adapter
+ if err := rhp.adapter.Update_flows_bulk(device, flows, groups, flowMetadata); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
@@ -312,7 +353,7 @@
func (rhp *RequestHandlerProxy) Update_flows_incrementally(args []*ic.Argument) (*empty.Empty, error) {
log.Debug("Update_flows_incrementally")
- if len(args) < 3 {
+ if len(args) < 5 {
log.Warn("Update_flows_incrementally-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
@@ -320,6 +361,7 @@
device := &voltha.Device{}
transactionID := &ic.StrType{}
flows := &openflow_13.FlowChanges{}
+ flowMetadata := &voltha.FlowMetadata{}
groups := &openflow_13.FlowGroupChanges{}
for _, arg := range args {
switch arg.Key {
@@ -338,6 +380,11 @@
log.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
return nil, err
}
+ case "flow_metadata":
+ if err := ptypes.UnmarshalAny(arg.Value, flowMetadata); err != nil {
+ log.Warnw("cannot-unmarshal-metadata", log.Fields{"error": err})
+ return nil, err
+ }
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
@@ -346,14 +393,47 @@
}
}
log.Debugw("Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
- //Invoke the adopt device on the adapter
- if err := rhp.adapter.Update_flows_incrementally(device, flows, groups); err != nil {
+ //Invoke the incremental flow update API of the adapter
+ if err := rhp.adapter.Update_flows_incrementally(device, flows, groups, flowMetadata); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
return new(empty.Empty), nil
}
func (rhp *RequestHandlerProxy) Update_pm_config(args []*ic.Argument) (*empty.Empty, error) {
+ log.Debug("Update_pm_config")
+ if len(args) < 3 {
+ log.Warn("Update_pm_config-invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{}
+ pmConfigs := &voltha.PmConfigs{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case "pm_configs":
+ if err := ptypes.UnmarshalAny(arg.Value, pmConfigs); err != nil {
+ log.Warnw("cannot-unmarshal-pm-configs", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ log.Debugw("Update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+ //Invoke the pm config update API of the adapter
+ if err := rhp.adapter.Update_pm_config(device, pmConfigs); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
diff --git a/vendor/github.com/opencord/voltha-go/adapters/iAdapter.go b/vendor/github.com/opencord/voltha-go/adapters/iAdapter.go
index 05df234..82fa644 100644
--- a/vendor/github.com/opencord/voltha-go/adapters/iAdapter.go
+++ b/vendor/github.com/opencord/voltha-go/adapters/iAdapter.go
@@ -35,8 +35,8 @@
Self_test_device(device *voltha.Device) error
Delete_device(device *voltha.Device) error
Get_device_details(device *voltha.Device) error
- Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error
- Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error
+ Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error
+ Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error
Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error
Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error
Suppress_alarm(filter *voltha.AlarmFilter) error
diff --git a/vendor/github.com/opencord/voltha-go/common/techprofile/tech_profile.go b/vendor/github.com/opencord/voltha-go/common/techprofile/tech_profile.go
index e41e064..2799802 100644
--- a/vendor/github.com/opencord/voltha-go/common/techprofile/tech_profile.go
+++ b/vendor/github.com/opencord/voltha-go/common/techprofile/tech_profile.go
@@ -25,7 +25,7 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/db/model"
- openolt_pb "github.com/opencord/voltha-protos/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/go/tech_profile"
)
// Interface to pon resource manager APIs
@@ -171,8 +171,8 @@
PbitMap string `json:"pbit_map"`
AesEncryption string `json:"aes_encryption"`
SchedulingPolicy string `json:"scheduling_policy"`
- PriorityQueue int `json:"priority_q"`
- Weight int `json:"weight"`
+ PriorityQueue uint32 `json:"priority_q"`
+ Weight uint32 `json:"weight"`
DiscardPolicy string `json:"discard_policy"`
DiscardConfig DiscardConfig `json:"discard_config"`
}
@@ -191,8 +191,8 @@
PbitMap string `json:"pbit_map"`
AesEncryption string `json:"aes_encryption"`
SchedulingPolicy string `json:"scheduling_policy"`
- PriorityQueue int `json:"priority_q"`
- Weight int `json:"weight"`
+ PriorityQueue uint32 `json:"priority_q"`
+ Weight uint32 `json:"weight"`
DiscardPolicy string `json:"discard_policy"`
DiscardConfig DiscardConfig `json:"discard_config"`
}
@@ -442,6 +442,7 @@
var dsGemPortAttributeList []GemPortAttribute
for _, pbit := range t.config.DefaultPbits {
+ log.Debugw("Creating GEM port", log.Fields{"pbit": pbit})
usGemPortAttributeList = append(usGemPortAttributeList,
GemPortAttribute{
MaxQueueSize: defaultMaxQueueSize,
@@ -498,26 +499,26 @@
var result int32 = -1
if paramType == "direction" {
- for key, val := range openolt_pb.Direction_value {
+ for key, val := range tp_pb.Direction_value {
if key == paramKey {
result = val
}
}
} else if paramType == "discard_policy" {
- for key, val := range openolt_pb.DiscardPolicy_value {
+ for key, val := range tp_pb.DiscardPolicy_value {
if key == paramKey {
result = val
}
}
} else if paramType == "sched_policy" {
- for key, val := range openolt_pb.SchedulingPolicy_value {
+ for key, val := range tp_pb.SchedulingPolicy_value {
if key == paramKey {
log.Debugw("Got value in proto", log.Fields{"key": key, "value": val})
result = val
}
}
} else if paramType == "additional_bw" {
- for key, val := range openolt_pb.AdditionalBW_value {
+ for key, val := range tp_pb.AdditionalBW_value {
if key == paramKey {
result = val
}
@@ -530,23 +531,23 @@
return result
}
-func (t *TechProfileMgr) GetUsScheduler(tpInstance *TechProfile) *openolt_pb.Scheduler {
- dir := openolt_pb.Direction(t.GetprotoBufParamValue("direction", tpInstance.UsScheduler.Direction))
+func (t *TechProfileMgr) GetUsScheduler(tpInstance *TechProfile) *tp_pb.SchedulerConfig {
+ dir := tp_pb.Direction(t.GetprotoBufParamValue("direction", tpInstance.UsScheduler.Direction))
if dir == -1 {
log.Fatal("Error in getting Proto for direction for upstream scheduler")
return nil
}
- bw := openolt_pb.AdditionalBW(t.GetprotoBufParamValue("additional_bw", tpInstance.UsScheduler.AdditionalBw))
+ bw := tp_pb.AdditionalBW(t.GetprotoBufParamValue("additional_bw", tpInstance.UsScheduler.AdditionalBw))
if bw == -1 {
log.Fatal("Error in getting Proto for bandwidth for upstream scheduler")
return nil
}
- policy := openolt_pb.SchedulingPolicy(t.GetprotoBufParamValue("sched_policy", tpInstance.UsScheduler.QSchedPolicy))
+ policy := tp_pb.SchedulingPolicy(t.GetprotoBufParamValue("sched_policy", tpInstance.UsScheduler.QSchedPolicy))
if policy == -1 {
log.Fatal("Error in getting Proto for scheduling policy for upstream scheduler")
return nil
}
- return &openolt_pb.Scheduler{
+ return &tp_pb.SchedulerConfig{
Direction: dir,
AdditionalBw: bw,
Priority: tpInstance.UsScheduler.Priority,
@@ -554,25 +555,25 @@
SchedPolicy: policy}
}
-func (t *TechProfileMgr) GetDsScheduler(tpInstance *TechProfile) *openolt_pb.Scheduler {
+func (t *TechProfileMgr) GetDsScheduler(tpInstance *TechProfile) *tp_pb.SchedulerConfig {
- dir := openolt_pb.Direction(t.GetprotoBufParamValue("direction", tpInstance.DsScheduler.Direction))
+ dir := tp_pb.Direction(t.GetprotoBufParamValue("direction", tpInstance.DsScheduler.Direction))
if dir == -1 {
log.Fatal("Error in getting Proto for direction for downstream scheduler")
return nil
}
- bw := openolt_pb.AdditionalBW(t.GetprotoBufParamValue("additional_bw", tpInstance.DsScheduler.AdditionalBw))
+ bw := tp_pb.AdditionalBW(t.GetprotoBufParamValue("additional_bw", tpInstance.DsScheduler.AdditionalBw))
if bw == -1 {
log.Fatal("Error in getting Proto for bandwidth for downstream scheduler")
return nil
}
- policy := openolt_pb.SchedulingPolicy(t.GetprotoBufParamValue("sched_policy", tpInstance.DsScheduler.QSchedPolicy))
+ policy := tp_pb.SchedulingPolicy(t.GetprotoBufParamValue("sched_policy", tpInstance.DsScheduler.QSchedPolicy))
if policy == -1 {
log.Fatal("Error in getting Proto for scheduling policy for downstream scheduler")
return nil
}
- return &openolt_pb.Scheduler{
+ return &tp_pb.SchedulerConfig{
Direction: dir,
AdditionalBw: bw,
Priority: tpInstance.DsScheduler.Priority,
@@ -580,33 +581,112 @@
SchedPolicy: policy}
}
-func (t *TechProfileMgr) GetTconts(tpInstance *TechProfile, usSched *openolt_pb.Scheduler, dsSched *openolt_pb.Scheduler) []*openolt_pb.Tcont {
- if usSched == nil {
- if usSched = t.GetUsScheduler(tpInstance); usSched == nil {
- log.Fatal("Error in getting upstream scheduler from techprofile")
- return nil
+func (t *TechProfileMgr) GetTrafficScheduler(tpInstance *TechProfile, SchedCfg *tp_pb.SchedulerConfig,
+ ShapingCfg *tp_pb.TrafficShapingInfo) *tp_pb.TrafficScheduler {
+
+ tSched := &tp_pb.TrafficScheduler{
+ Direction: SchedCfg.Direction,
+ AllocId: tpInstance.UsScheduler.AllocID,
+ TrafficShapingInfo: ShapingCfg,
+ Scheduler: SchedCfg}
+
+ return tSched
+}
+
+func (tpm *TechProfileMgr) GetTrafficQueues(tp *TechProfile, Dir tp_pb.Direction) []*tp_pb.TrafficQueue {
+
+ var encryp bool
+ if Dir == tp_pb.Direction_UPSTREAM {
+ // upstream GEM ports
+ NumGemPorts := len(tp.UpstreamGemPortAttributeList)
+ GemPorts := make([]*tp_pb.TrafficQueue, 0)
+ for Count := 0; Count < NumGemPorts; Count++ {
+ if tp.UpstreamGemPortAttributeList[Count].AesEncryption == "True" {
+ encryp = true
+ } else {
+ encryp = false
+ }
+ GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
+ Direction: tp_pb.Direction(tpm.GetprotoBufParamValue("direction", tp.UsScheduler.Direction)),
+ GemportId: tp.UpstreamGemPortAttributeList[Count].GemportID,
+ PbitMap: tp.UpstreamGemPortAttributeList[Count].PbitMap,
+ AesEncryption: encryp,
+ SchedPolicy: tp_pb.SchedulingPolicy(tpm.GetprotoBufParamValue("sched_policy", tp.UpstreamGemPortAttributeList[Count].SchedulingPolicy)),
+ Priority: tp.UpstreamGemPortAttributeList[Count].PriorityQueue,
+ Weight: tp.UpstreamGemPortAttributeList[Count].Weight,
+ DiscardPolicy: tp_pb.DiscardPolicy(tpm.GetprotoBufParamValue("discard_policy", tp.UpstreamGemPortAttributeList[Count].DiscardPolicy)),
+ })
+ }
+ log.Debugw("Upstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
+ return GemPorts
+ } else if Dir == tp_pb.Direction_DOWNSTREAM {
+ //downstream GEM ports
+ NumGemPorts := len(tp.DownstreamGemPortAttributeList)
+ GemPorts := make([]*tp_pb.TrafficQueue, 0)
+ for Count := 0; Count < NumGemPorts; Count++ {
+ if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
+ encryp = true
+ } else {
+ encryp = false
+ }
+ GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
+ Direction: tp_pb.Direction(tpm.GetprotoBufParamValue("direction", tp.DsScheduler.Direction)),
+ GemportId: tp.DownstreamGemPortAttributeList[Count].GemportID,
+ PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
+ AesEncryption: encryp,
+ SchedPolicy: tp_pb.SchedulingPolicy(tpm.GetprotoBufParamValue("sched_policy", tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy)),
+ Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQueue,
+ Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
+ DiscardPolicy: tp_pb.DiscardPolicy(tpm.GetprotoBufParamValue("discard_policy", tp.DownstreamGemPortAttributeList[Count].DiscardPolicy)),
+ })
+ }
+ log.Debugw("Downstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
+ return GemPorts
+ }
+ return nil
+}
+
+func (tpm *TechProfileMgr) GetUsTrafficScheduler(tp *TechProfile) *tp_pb.TrafficScheduler {
+ UsScheduler := tpm.GetUsScheduler(tp)
+
+ return &tp_pb.TrafficScheduler{Direction: UsScheduler.Direction,
+ AllocId: tp.UsScheduler.AllocID,
+ Scheduler: UsScheduler}
+}
+
+func (t *TechProfileMgr) GetGemportIDForPbit(tp *TechProfile, Dir tp_pb.Direction, pbit uint32) uint32 {
+ /*
+ Function to get the Gemport ID mapped to a pbit.
+ */
+ if Dir == tp_pb.Direction_UPSTREAM {
+ // upstream GEM ports
+ NumGemPorts := len(tp.UpstreamGemPortAttributeList)
+ for Count := 0; Count < NumGemPorts; Count++ {
+ NumPbitMaps := len(tp.UpstreamGemPortAttributeList[Count].PbitMap)
+ for ICount := 2; ICount < NumPbitMaps; ICount++ {
+ if p, err := strconv.Atoi(string(tp.UpstreamGemPortAttributeList[Count].PbitMap[ICount])); err == nil {
+ if uint32(ICount-2) == pbit && p == 1 { // Check this p-bit is set
+ log.Debugw("Found-US-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.UpstreamGemPortAttributeList[Count].GemportID})
+ return tp.UpstreamGemPortAttributeList[Count].GemportID
+ }
+ }
+ }
+ }
+ } else if Dir == tp_pb.Direction_DOWNSTREAM {
+ //downstream GEM ports
+ NumGemPorts := len(tp.DownstreamGemPortAttributeList)
+ for Count := 0; Count < NumGemPorts; Count++ {
+ NumPbitMaps := len(tp.DownstreamGemPortAttributeList[Count].PbitMap)
+ for ICount := 2; ICount < NumPbitMaps; ICount++ {
+ if p, err := strconv.Atoi(string(tp.DownstreamGemPortAttributeList[Count].PbitMap[ICount])); err == nil {
+ if uint32(ICount-2) == pbit && p == 1 { // Check this p-bit is set
+ log.Debugw("Found-DS-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.DownstreamGemPortAttributeList[Count].GemportID})
+ return tp.DownstreamGemPortAttributeList[Count].GemportID
+ }
+ }
+ }
}
}
- if dsSched == nil {
- if dsSched = t.GetDsScheduler(tpInstance); dsSched == nil {
- log.Fatal("Error in getting downstream scheduler from techprofile")
- return nil
- }
- }
- tconts := []*openolt_pb.Tcont{}
- // upstream scheduler
- tcont_us := &openolt_pb.Tcont{
- Direction: usSched.Direction,
- AllocId: tpInstance.UsScheduler.AllocID,
- Scheduler: usSched} /*TrafficShapingInfo: ? */
- tconts = append(tconts, tcont_us)
-
- // downstream scheduler
- tcont_ds := &openolt_pb.Tcont{
- Direction: dsSched.Direction,
- AllocId: tpInstance.DsScheduler.AllocID,
- Scheduler: dsSched}
-
- tconts = append(tconts, tcont_ds)
- return tconts
+ log.Errorw("No-GemportId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
+ return 0
}
diff --git a/vendor/github.com/opencord/voltha-go/db/kvstore/client.go b/vendor/github.com/opencord/voltha-go/db/kvstore/client.go
index 34ab711..937eefe 100644
--- a/vendor/github.com/opencord/voltha-go/db/kvstore/client.go
+++ b/vendor/github.com/opencord/voltha-go/db/kvstore/client.go
@@ -38,6 +38,7 @@
type KVPair struct {
Key string
Value interface{}
+ Version int64
Session string
Lease int64
}
@@ -47,12 +48,13 @@
}
// NewKVPair creates a new KVPair object
-func NewKVPair(key string, value interface{}, session string, lease int64) *KVPair {
+func NewKVPair(key string, value interface{}, session string, lease int64, version int64) *KVPair {
kv := new(KVPair)
kv.Key = key
kv.Value = value
kv.Session = session
kv.Lease = lease
+ kv.Version = version
return kv
}
@@ -61,14 +63,16 @@
EventType int
Key interface{}
Value interface{}
+ Version int64
}
// NewEvent creates a new Event object
-func NewEvent(eventType int, key interface{}, value interface{}) *Event {
+func NewEvent(eventType int, key interface{}, value interface{}, version int64) *Event {
evnt := new(Event)
evnt.EventType = eventType
evnt.Key = key
evnt.Value = value
+ evnt.Version = version
return evnt
}
diff --git a/vendor/github.com/opencord/voltha-go/db/kvstore/consulclient.go b/vendor/github.com/opencord/voltha-go/db/kvstore/consulclient.go
index 2d02342..4b25b5f 100644
--- a/vendor/github.com/opencord/voltha-go/db/kvstore/consulclient.go
+++ b/vendor/github.com/opencord/voltha-go/db/kvstore/consulclient.go
@@ -79,7 +79,7 @@
}
m := make(map[string]*KVPair)
for _, kvp := range kvps {
- m[string(kvp.Key)] = NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0)
+ m[string(kvp.Key)] = NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1)
}
return m, nil
}
@@ -100,7 +100,7 @@
return nil, err
}
if kvp != nil {
- return NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0), nil
+ return NewKVPair(string(kvp.Key), kvp.Value, string(kvp.Session), 0, -1), nil
}
return nil, nil
@@ -455,7 +455,7 @@
default:
if err != nil {
log.Warnw("error-from-watch", log.Fields{"error": err})
- ch <- NewEvent(CONNECTIONDOWN, key, []byte(""))
+ ch <- NewEvent(CONNECTIONDOWN, key, []byte(""), -1)
} else {
log.Debugw("index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
}
@@ -469,12 +469,12 @@
} else {
log.Debugw("update-received", log.Fields{"pair": pair})
if pair == nil {
- ch <- NewEvent(DELETE, key, []byte(""))
+ ch <- NewEvent(DELETE, key, []byte(""), -1)
} else if !c.isKVEqual(pair, previousKVPair) {
// Push the change onto the channel if the data has changed
// For now just assume it's a PUT change
log.Debugw("pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
- ch <- NewEvent(PUT, pair.Key, pair.Value)
+ ch <- NewEvent(PUT, pair.Key, pair.Value, -1)
}
previousKVPair = pair
lastIndex = meta.LastIndex
diff --git a/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go
index 6935296..7f6940a 100644
--- a/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go
@@ -74,7 +74,7 @@
}
m := make(map[string]*KVPair)
for _, ev := range resp.Kvs {
- m[string(ev.Key)] = NewKVPair(string(ev.Key), ev.Value, "", ev.Lease)
+ m[string(ev.Key)] = NewKVPair(string(ev.Key), ev.Value, "", ev.Lease, ev.Version)
}
return m, nil
}
@@ -94,7 +94,7 @@
}
for _, ev := range resp.Kvs {
// Only one value is returned
- return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease), nil
+ return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease, ev.Version), nil
}
return nil, nil
}
@@ -311,7 +311,9 @@
channelMaps := c.addChannelMap(key, channelMap)
- log.Debugw("watched-channels", log.Fields{"channels": channelMaps})
+ // Changing the log field (from channelMaps) as the underlying logger cannot format the map of channels into a
+ // json format.
+ log.Debugw("watched-channels", log.Fields{"len": len(channelMaps)})
// Launch a go routine to listen for updates
go c.listenForKeyChange(channel, ch)
@@ -399,7 +401,7 @@
for resp := range channel {
for _, ev := range resp.Events {
//log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
- ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value)
+ ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value, ev.Kv.Version)
}
}
log.Debug("stop-listening-on-channel ...")
diff --git a/vendor/github.com/opencord/voltha-go/db/model/branch.go b/vendor/github.com/opencord/voltha-go/db/model/branch.go
index 5502e63..3389291 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/branch.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/branch.go
@@ -26,7 +26,7 @@
// Branch structure is used to classify a collection of transaction based revisions
type Branch struct {
- sync.RWMutex
+ mutex sync.RWMutex
Node *node
Txid string
Origin Revision
@@ -85,8 +85,8 @@
// SetLatest assigns the latest revision for this branch
func (b *Branch) SetLatest(latest Revision) {
- b.Lock()
- defer b.Unlock()
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
if b.Latest != nil {
log.Debugw("updating-latest-revision", log.Fields{"current": b.Latest.GetHash(), "new": latest.GetHash()})
@@ -119,16 +119,16 @@
// GetLatest retrieves the latest revision of the branch
func (b *Branch) GetLatest() Revision {
- b.Lock()
- defer b.Unlock()
+ b.mutex.RLock()
+ defer b.mutex.RUnlock()
return b.Latest
}
// GetOrigin retrieves the original revision of the branch
func (b *Branch) GetOrigin() Revision {
- b.Lock()
- defer b.Unlock()
+ b.mutex.RLock()
+ defer b.mutex.RUnlock()
return b.Origin
}
@@ -142,8 +142,8 @@
// GetRevision pulls a revision entry at the specified hash
func (b *Branch) GetRevision(hash string) Revision {
- b.Lock()
- defer b.Unlock()
+ b.mutex.RLock()
+ defer b.mutex.RUnlock()
if revision, ok := b.Revisions[hash]; ok {
return revision
@@ -154,16 +154,16 @@
// SetRevision updates a revision entry at the specified hash
func (b *Branch) SetRevision(hash string, revision Revision) {
- b.Lock()
- defer b.Unlock()
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
b.Revisions[hash] = revision
}
// DeleteRevision removes a revision with the specified hash
func (b *Branch) DeleteRevision(hash string) {
- b.Lock()
- defer b.Unlock()
+ b.mutex.Lock()
+ defer b.mutex.Unlock()
if _, ok := b.Revisions[hash]; ok {
delete(b.Revisions, hash)
diff --git a/vendor/github.com/opencord/voltha-go/db/model/child_type.go b/vendor/github.com/opencord/voltha-go/db/model/child_type.go
index da6f688..250de9c 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/child_type.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/child_type.go
@@ -27,18 +27,50 @@
"sync"
)
-type singletonChildTypeCache struct {
+type childTypesSingleton struct {
+ mutex sync.RWMutex
Cache map[interface{}]map[string]*ChildType
}
-var instanceChildTypeCache *singletonChildTypeCache
-var onceChildTypeCache sync.Once
+var instanceChildTypes *childTypesSingleton
+var onceChildTypes sync.Once
-func getChildTypeCache() *singletonChildTypeCache {
- onceChildTypeCache.Do(func() {
- instanceChildTypeCache = &singletonChildTypeCache{}
+func getChildTypes() *childTypesSingleton {
+ onceChildTypes.Do(func() {
+ instanceChildTypes = &childTypesSingleton{}
})
- return instanceChildTypeCache
+ return instanceChildTypes
+}
+
+func (s *childTypesSingleton) GetCache() map[interface{}]map[string]*ChildType {
+ s.mutex.RLock()
+ defer s.mutex.RUnlock()
+ return s.Cache
+}
+
+func (s *childTypesSingleton) SetCache(cache map[interface{}]map[string]*ChildType) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ s.Cache = cache
+}
+
+func (s *childTypesSingleton) GetCacheEntry(key interface{}) (map[string]*ChildType, bool) {
+ s.mutex.RLock()
+ defer s.mutex.RUnlock()
+ childTypeMap, exists := s.Cache[key]
+ return childTypeMap, exists
+}
+
+func (s *childTypesSingleton) SetCacheEntry(key interface{}, value map[string]*ChildType) {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ s.Cache[key] = value
+}
+
+func (s *childTypesSingleton) ResetCache() {
+ s.mutex.Lock()
+ defer s.mutex.Unlock()
+ s.Cache = make(map[interface{}]map[string]*ChildType)
}
// ChildType structure contains construct details of an object
@@ -58,12 +90,12 @@
var names map[string]*ChildType
var namesExist bool
- if getChildTypeCache().Cache == nil {
- getChildTypeCache().Cache = make(map[interface{}]map[string]*ChildType)
+ if getChildTypes().Cache == nil {
+ getChildTypes().Cache = make(map[interface{}]map[string]*ChildType)
}
msgType := reflect.TypeOf(cls)
- inst := getChildTypeCache()
+ inst := getChildTypes()
if names, namesExist = inst.Cache[msgType.String()]; !namesExist {
names = make(map[string]*ChildType)
@@ -127,9 +159,10 @@
}
}
- getChildTypeCache().Cache[msgType.String()] = names
+ getChildTypes().Cache[msgType.String()] = names
} else {
- log.Debugf("Cache entry for %s: %+v", msgType.String(), inst.Cache[msgType.String()])
+ entry, _ := inst.GetCacheEntry(msgType.String())
+ log.Debugf("Cache entry for %s: %+v", msgType.String(), entry)
}
return names
diff --git a/vendor/github.com/opencord/voltha-go/db/model/model.go b/vendor/github.com/opencord/voltha-go/db/model/model.go
index 18ff905..3446303 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/model.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/model.go
@@ -23,3 +23,15 @@
log.AddPackage(log.JSON, log.InfoLevel, log.Fields{"instanceId": "DB_MODEL"})
defer log.CleanUp()
}
+
+const (
+ // period to determine when data requires a refresh (in milliseconds)
+ // TODO: make this configurable?
+ DataRefreshPeriod int64 = 5000
+
+ // Attribute used to store a timestamp in the context object
+ RequestTimestamp = "request-timestamp"
+
+ // Time limit for a KV path reservation (in seconds)
+ ReservationTTL int64 = 180
+)
diff --git a/vendor/github.com/opencord/voltha-go/db/model/node.go b/vendor/github.com/opencord/voltha-go/db/model/node.go
index 207df09..fcd3b5f 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/node.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/node.go
@@ -20,6 +20,7 @@
// TODO: proper logging
import (
+ "context"
"fmt"
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
@@ -32,10 +33,6 @@
// When a branch has no transaction id, everything gets stored in NONE
const (
NONE string = "none"
-
- // period to determine when data requires a refresh (in seconds)
- // TODO: make this configurable?
- DATA_REFRESH_PERIOD int64 = 5000
)
// Node interface is an abstraction of the node data structure
@@ -43,10 +40,14 @@
MakeLatest(branch *Branch, revision Revision, changeAnnouncement []ChangeTuple)
// CRUD functions
- Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision
- Get(path string, hash string, depth int, deep bool, txid string) interface{}
- Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision
- Remove(path string, txid string, makeBranch MakeBranchFunction) Revision
+ Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision
+ Get(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{}
+ List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{}
+ Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision
+ Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision
+ CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy
+
+ GetProxy() *Proxy
MakeBranch(txid string) *Branch
DeleteBranch(txid string)
@@ -55,16 +56,12 @@
MakeTxBranch() string
DeleteTxBranch(txid string)
FoldTxBranch(txid string)
-
- CreateProxy(path string, exclusive bool) *Proxy
- GetProxy() *Proxy
}
type node struct {
- mutex sync.RWMutex
- Root *root
- Type interface{}
-
+ mutex sync.RWMutex
+ Root *root
+ Type interface{}
Branches map[string]*Branch
Tags map[string]Revision
Proxy *Proxy
@@ -133,7 +130,7 @@
log.Debugw("saving-latest-data", log.Fields{"hash": revision.GetHash(), "data": revision.GetData()})
// Tag a timestamp to that revision
revision.SetLastUpdate()
- GetRevCache().Cache.Store(revision.GetName(), revision)
+ GetRevCache().Set(revision.GetName(), revision)
}
branch.SetLatest(revision)
}
@@ -148,13 +145,13 @@
for _, change := range changeAnnouncement {
log.Debugw("adding-callback",
log.Fields{
- "callbacks": n.Proxy.getCallbacks(change.Type),
+ "callbacks": n.GetProxy().getCallbacks(change.Type),
"type": change.Type,
"previousData": change.PreviousData,
"latestData": change.LatestData,
})
n.Root.AddCallback(
- n.Proxy.InvokeCallbacks,
+ n.GetProxy().InvokeCallbacks,
change.Type,
true,
change.PreviousData,
@@ -253,7 +250,7 @@
}
// Get retrieves the data from a node tree that resides at the specified path
-func (n *node) List(path string, hash string, depth int, deep bool, txid string) interface{} {
+func (n *node) List(ctx context.Context, path string, hash string, depth int, deep bool, txid string) interface{} {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -281,7 +278,7 @@
var result interface{}
var prList []interface{}
- if pr := rev.LoadFromPersistence(path, txid, nil); pr != nil {
+ if pr := rev.LoadFromPersistence(ctx, path, txid, nil); pr != nil {
for _, revEntry := range pr {
prList = append(prList, revEntry.GetData())
}
@@ -292,7 +289,7 @@
}
// Get retrieves the data from a node tree that resides at the specified path
-func (n *node) Get(path string, hash string, depth int, reconcile bool, txid string) interface{} {
+func (n *node) Get(ctx context.Context, path string, hash string, depth int, reconcile bool, txid string) interface{} {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -323,9 +320,9 @@
// 1. Start with the cache which stores revisions by watch names
// 2. Then look in the revision tree, especially if it's a sub-path such as /devices/1234/flows
// 3. Move on to the KV store if that path cannot be found or if the entry has expired
- if entry, exists := GetRevCache().Cache.Load(path); exists && entry.(Revision) != nil {
+ if entry, exists := GetRevCache().Get(path); exists && entry.(Revision) != nil {
entryAge := time.Now().Sub(entry.(Revision).GetLastUpdate()).Nanoseconds() / int64(time.Millisecond)
- if entryAge < DATA_REFRESH_PERIOD {
+ if entryAge < DataRefreshPeriod {
log.Debugw("using-cache-entry", log.Fields{
"path": path,
"hash": hash,
@@ -335,7 +332,7 @@
} else {
log.Debugw("cache-entry-expired", log.Fields{"path": path, "hash": hash, "age": entryAge})
}
- } else if result = n.getPath(rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
+ } else if result = n.getPath(ctx, rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
log.Debugw("using-rev-tree-entry", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile, "txid": txid})
return result
} else {
@@ -357,7 +354,7 @@
// If we got to this point, we are either trying to reconcile with the db
// or we simply failed at getting information from memory
if n.Root.KvStore != nil {
- if pr := rev.LoadFromPersistence(path, txid, nil); pr != nil && len(pr) > 0 {
+ if pr := rev.LoadFromPersistence(ctx, path, txid, nil); pr != nil && len(pr) > 0 {
// Did we receive a single or multiple revisions?
if len(pr) > 1 {
var revs []interface{}
@@ -375,7 +372,7 @@
}
//getPath traverses the specified path and retrieves the data associated to it
-func (n *node) getPath(rev Revision, path string, depth int) interface{} {
+func (n *node) getPath(ctx context.Context, rev Revision, path string, depth int) interface{} {
if path == "" {
return n.getData(rev, depth)
}
@@ -406,7 +403,7 @@
return nil
} else {
childNode := childRev.GetNode()
- return childNode.getPath(childRev, path, depth)
+ return childNode.getPath(ctx, childRev, path, depth)
}
} else {
var response []interface{}
@@ -430,11 +427,13 @@
}
return response
}
+ } else if children := rev.GetChildren(name); children != nil && len(children) > 0 {
+ childRev := children[0]
+ childNode := childRev.GetNode()
+ return childNode.getPath(ctx, childRev, path, depth)
}
- childRev := rev.GetChildren(name)[0]
- childNode := childRev.GetNode()
- return childNode.getPath(childRev, path, depth)
+ return nil
}
// getData retrieves the data from a node revision
@@ -454,7 +453,7 @@
}
// Update changes the content of a node at the specified path with the provided data
-func (n *node) Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
+func (n *node) Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -475,7 +474,7 @@
log.Debugf("Branch data : %+v, Passed data: %+v", branch.GetLatest().GetData(), data)
}
if path == "" {
- return n.doUpdate(branch, data, strict)
+ return n.doUpdate(ctx, branch, data, strict)
}
rev := branch.GetLatest()
@@ -493,7 +492,7 @@
var children []Revision
if field == nil {
- return n.doUpdate(branch, data, strict)
+ return n.doUpdate(ctx, branch, data, strict)
}
if field.IsContainer {
@@ -523,11 +522,11 @@
// Save proxy in child node to ensure callbacks are called later on
// only assign in cases of non sub-folder proxies, i.e. "/"
- if childNode.Proxy == nil && n.Proxy != nil && n.Proxy.getFullPath() == "" {
+ if childNode.Proxy == nil && n.Proxy != nil && n.GetProxy().getFullPath() == "" {
childNode.Proxy = n.Proxy
}
- newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
+ newChildRev := childNode.Update(ctx, path, data, strict, txid, makeBranch)
if newChildRev.GetHash() == childRev.GetHash() {
if newChildRev != childRev {
@@ -559,7 +558,7 @@
children = append(children, newChildRev)
}
- updatedRev := rev.UpdateChildren(name, children, branch)
+ updatedRev := rev.UpdateChildren(ctx, name, children, branch)
n.makeLatest(branch, updatedRev, nil)
updatedRev.ChildDrop(name, childRev.GetHash())
@@ -572,12 +571,12 @@
} else {
childRev := rev.GetChildren(name)[0]
childNode := childRev.GetNode()
- newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
+ newChildRev := childNode.Update(ctx, path, data, strict, txid, makeBranch)
branch.LatestLock.Lock()
defer branch.LatestLock.Unlock()
- updatedRev := rev.UpdateChildren(name, []Revision{newChildRev}, branch)
+ updatedRev := rev.UpdateChildren(ctx, name, []Revision{newChildRev}, branch)
n.makeLatest(branch, updatedRev, nil)
updatedRev.ChildDrop(name, childRev.GetHash())
@@ -588,7 +587,7 @@
return nil
}
-func (n *node) doUpdate(branch *Branch, data interface{}, strict bool) Revision {
+func (n *node) doUpdate(ctx context.Context, branch *Branch, data interface{}, strict bool) Revision {
log.Debugw("comparing-types", log.Fields{"expected": reflect.ValueOf(n.Type).Type(), "actual": reflect.TypeOf(data)})
if reflect.TypeOf(data) != reflect.ValueOf(n.Type).Type() {
@@ -613,7 +612,7 @@
log.Debugf("checking access violations")
}
- rev := branch.GetLatest().UpdateData(data, branch)
+ rev := branch.GetLatest().UpdateData(ctx, data, branch)
changes := []ChangeTuple{{POST_UPDATE, branch.GetLatest().GetData(), rev.GetData()}}
n.makeLatest(branch, rev, changes)
@@ -623,7 +622,7 @@
}
// Add inserts a new node at the specified path with the provided data
-func (n *node) Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
+func (n *node) Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -688,7 +687,7 @@
children = append(children, childRev)
- updatedRev := rev.UpdateChildren(name, children, branch)
+ updatedRev := rev.UpdateChildren(ctx, name, children, branch)
changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
childRev.SetupWatch(childRev.GetName())
@@ -718,7 +717,7 @@
}
childNode := childRev.GetNode()
- newChildRev := childNode.Add(path, data, txid, makeBranch)
+ newChildRev := childNode.Add(ctx, path, data, txid, makeBranch)
// Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
newChildRev.SetName(name + "/" + keyValue.(string))
@@ -732,7 +731,7 @@
children = append(children, newChildRev)
}
- updatedRev := rev.UpdateChildren(name, children, branch)
+ updatedRev := rev.UpdateChildren(ctx, name, children, branch)
n.makeLatest(branch, updatedRev, nil)
updatedRev.ChildDrop(name, childRev.GetHash())
@@ -749,7 +748,7 @@
}
// Remove eliminates a node at the specified path
-func (n *node) Remove(path string, txid string, makeBranch MakeBranchFunction) Revision {
+func (n *node) Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision {
n.mutex.Lock()
defer n.mutex.Unlock()
@@ -805,7 +804,7 @@
if childNode.Proxy == nil {
childNode.Proxy = n.Proxy
}
- newChildRev := childNode.Remove(path, txid, makeBranch)
+ newChildRev := childNode.Remove(ctx, path, txid, makeBranch)
branch.LatestLock.Lock()
defer branch.LatestLock.Unlock()
@@ -833,7 +832,7 @@
}
childRev.StorageDrop(txid, true)
- GetRevCache().Cache.Delete(childRev.GetName())
+ GetRevCache().Delete(childRev.GetName())
branch.LatestLock.Lock()
defer branch.LatestLock.Unlock()
@@ -950,11 +949,11 @@
// ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ node Proxy ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
// CreateProxy returns a reference to a sub-tree of the data model
-func (n *node) CreateProxy(path string, exclusive bool) *Proxy {
- return n.createProxy(path, path, n, exclusive)
+func (n *node) CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy {
+ return n.createProxy(ctx, path, path, n, exclusive)
}
-func (n *node) createProxy(path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
+func (n *node) createProxy(ctx context.Context, path string, fullPath string, parentNode *node, exclusive bool) *Proxy {
log.Debugw("node-create-proxy", log.Fields{
"node-type": reflect.ValueOf(n.Type).Type(),
"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
@@ -973,7 +972,6 @@
partition := strings.SplitN(path, "/", 2)
name := partition[0]
var nodeType interface{}
- // Node type is chosen depending on if we have reached the end of path or not
if len(partition) < 2 {
path = ""
nodeType = n.Type
@@ -1020,8 +1018,6 @@
children = make([]Revision, len(rev.GetChildren(name)))
copy(children, rev.GetChildren(name))
- // Try to find a matching revision in memory
- // If not found try the db
var childRev Revision
if _, childRev = n.findRevByKey(children, field.Key, keyValue); childRev != nil {
log.Debugw("found-revision-matching-key-in-memory", log.Fields{
@@ -1030,7 +1026,7 @@
"fullPath": fullPath,
"name": name,
})
- } else if revs := n.GetBranch(NONE).GetLatest().LoadFromPersistence(fullPath, "", nil); revs != nil && len(revs) > 0 {
+ } else if revs := n.GetBranch(NONE).GetLatest().LoadFromPersistence(ctx, fullPath, "", nil); revs != nil && len(revs) > 0 {
log.Debugw("found-revision-matching-key-in-db", log.Fields{
"node-type": reflect.ValueOf(n.Type).Type(),
"parent-node-type": reflect.ValueOf(parentNode.Type).Type(),
@@ -1048,7 +1044,7 @@
}
if childRev != nil {
childNode := childRev.GetNode()
- return childNode.createProxy(path, fullPath, n, exclusive)
+ return childNode.createProxy(ctx, path, fullPath, n, exclusive)
}
} else {
log.Errorw("cannot-access-index-of-empty-container", log.Fields{
@@ -1067,7 +1063,7 @@
})
childRev := rev.GetChildren(name)[0]
childNode := childRev.GetNode()
- return childNode.createProxy(path, fullPath, n, exclusive)
+ return childNode.createProxy(ctx, path, fullPath, n, exclusive)
}
} else {
log.Debugw("field-object-is-nil", log.Fields{
@@ -1116,12 +1112,12 @@
n.Proxy = NewProxy(r, n, parentNode, path, fullPath, exclusive)
} else {
log.Debugw("node-has-existing-proxy", log.Fields{
- "node-type": reflect.ValueOf(n.Proxy.Node.Type).Type(),
- "parent-node-type": reflect.ValueOf(n.Proxy.ParentNode.Type).Type(),
- "path": n.Proxy.Path,
- "fullPath": n.Proxy.FullPath,
+ "node-type": reflect.ValueOf(n.GetProxy().Node.Type).Type(),
+ "parent-node-type": reflect.ValueOf(n.GetProxy().ParentNode.Type).Type(),
+ "path": n.GetProxy().Path,
+ "fullPath": n.GetProxy().FullPath,
})
- if n.Proxy.Exclusive {
+ if n.GetProxy().Exclusive {
log.Error("node is already owned exclusively")
}
}
@@ -1160,3 +1156,6 @@
func (n *node) GetRoot() *root {
return n.Root
}
+func (n *node) SetRoot(root *root) {
+ n.Root = root
+}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go b/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
index 297a740..6900c5d 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
@@ -17,6 +17,7 @@
import (
"bytes"
+ "context"
"crypto/md5"
"fmt"
"github.com/golang/protobuf/proto"
@@ -35,6 +36,16 @@
Cache sync.Map
}
+func (s *revCacheSingleton) Get(path string) (interface{}, bool) {
+ return s.Cache.Load(path)
+}
+func (s *revCacheSingleton) Set(path string, value interface{}) {
+ s.Cache.Store(path, value)
+}
+func (s *revCacheSingleton) Delete(path string) {
+ s.Cache.Delete(path)
+}
+
var revCacheInstance *revCacheSingleton
var revCacheOnce sync.Once
@@ -269,10 +280,17 @@
}
// UpdateData will refresh the data content of the revision
-func (npr *NonPersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
+func (npr *NonPersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
+ if ctx != nil {
+ if ctxTS, ok := ctx.Value(RequestTimestamp).(int64); ok && npr.lastUpdate.UnixNano() > ctxTS {
+ log.Warnw("data-is-older-than-current", log.Fields{"ctx-ts": ctxTS, "rev-ts": npr.lastUpdate.UnixNano()})
+ return npr
+ }
+ }
+
// Do not update the revision if data is the same
if npr.Config.Data != nil && npr.Config.hashData(npr.Root, data) == npr.Config.Hash {
log.Debugw("stored-data-matches-latest", log.Fields{"stored": npr.Config.Data, "provided": data})
@@ -300,7 +318,7 @@
// UpdateChildren will refresh the list of children with the provided ones
// It will carefully go through the list and ensure that no child is lost
-func (npr *NonPersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
+func (npr *NonPersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
@@ -358,7 +376,7 @@
})
// replace entry
- newChild.GetNode().Root = existingChildren[nameIndex].GetNode().Root
+ newChild.GetNode().SetRoot(existingChildren[nameIndex].GetNode().GetRoot())
updatedChildren = append(updatedChildren, newChild)
} else {
log.Debugw("keeping-existing-child", log.Fields{
@@ -461,7 +479,7 @@
return npr.lastUpdate
}
-func (npr *NonPersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+func (npr *NonPersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
// stub... required by interface
return nil
}
@@ -473,3 +491,7 @@
func (npr *NonPersistedRevision) StorageDrop(txid string, includeConfig bool) {
// stub ... required by interface
}
+
+func (npr *NonPersistedRevision) getVersion() int64 {
+ return -1
+}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
index 2ab91b7..d2d228f 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
@@ -19,7 +19,9 @@
import (
"bytes"
"compress/gzip"
+ "context"
"github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"reflect"
@@ -32,11 +34,13 @@
Revision
Compress bool
- events chan *kvstore.Event
- kvStore *Backend
- mutex sync.RWMutex
- isStored bool
- isWatched bool
+ events chan *kvstore.Event
+ kvStore *Backend
+ mutex sync.RWMutex
+ versionMutex sync.RWMutex
+ Version int64
+ isStored bool
+ isWatched bool
}
type watchCache struct {
@@ -57,10 +61,23 @@
func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
pr := &PersistedRevision{}
pr.kvStore = branch.Node.GetRoot().KvStore
+ pr.Version = 1
pr.Revision = NewNonPersistedRevision(nil, branch, data, children)
return pr
}
+func (pr *PersistedRevision) getVersion() int64 {
+ pr.versionMutex.RLock()
+ defer pr.versionMutex.RUnlock()
+ return pr.Version
+}
+
+func (pr *PersistedRevision) setVersion(version int64) {
+ pr.versionMutex.Lock()
+ defer pr.versionMutex.Unlock()
+ pr.Version = version
+}
+
// Finalize is responsible of saving the revision in the persistent storage
func (pr *PersistedRevision) Finalize(skipOnExist bool) {
pr.store(skipOnExist)
@@ -73,8 +90,12 @@
log.Debugw("ready-to-store-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
- if blob, err := proto.Marshal(pr.GetConfig().Data.(proto.Message)); err != nil {
- // TODO report error
+ // clone the revision data to avoid any race conditions with processes
+ // accessing the same data
+ cloned := proto.Clone(pr.GetConfig().Data.(proto.Message))
+
+ if blob, err := proto.Marshal(cloned); err != nil {
+ log.Errorw("problem-to-marshal", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetData()})
} else {
if pr.Compress {
var b bytes.Buffer
@@ -84,10 +105,11 @@
blob = b.Bytes()
}
+ GetRevCache().Set(pr.GetName(), pr)
if err := pr.kvStore.Put(pr.GetName(), blob); err != nil {
log.Warnw("problem-storing-revision", log.Fields{"error": err, "hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
} else {
- log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data})
+ log.Debugw("storing-revision", log.Fields{"hash": pr.GetHash(), "name": pr.GetName(), "data": pr.GetConfig().Data, "version": pr.getVersion()})
pr.isStored = true
}
}
@@ -145,6 +167,20 @@
case kvstore.PUT:
log.Debugw("update-in-memory", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
+ if latestRev.getVersion() >= event.Version {
+ log.Debugw("skipping-matching-or-older-revision", log.Fields{
+ "watch": latestRev.GetName(),
+ "watch-version": event.Version,
+ "latest-version": latestRev.getVersion(),
+ })
+ continue
+ } else {
+ log.Debugw("watch-revision-is-newer", log.Fields{
+ "watch": latestRev.GetName(),
+ "watch-version": event.Version,
+ "latest-version": latestRev.getVersion(),
+ })
+ }
data := reflect.New(reflect.TypeOf(latestRev.GetData()).Elem())
@@ -154,7 +190,6 @@
log.Debugw("un-marshaled-watch-data", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName(), "data": data.Interface()})
var pathLock string
- var pac *proxyAccessControl
var blobs map[string]*kvstore.KVPair
// The watch reported new persistence data.
@@ -166,6 +201,7 @@
Value: event.Value,
Session: "",
Lease: 0,
+ Version: event.Version,
}
if latestRev.GetNode().GetProxy() != nil {
@@ -173,82 +209,35 @@
// If a proxy exists for this revision, use it to lock access to the path
// and prevent simultaneous updates to the object in memory
//
- pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
//If the proxy already has a request in progress, then there is no need to process the watch
- log.Debugw("checking-if-path-is-locked", log.Fields{"key": latestRev.GetHash(), "pathLock": pathLock})
- if PAC().IsReserved(pathLock) {
+ if latestRev.GetNode().GetProxy().GetOperation() != PROXY_NONE {
log.Debugw("operation-in-progress", log.Fields{
"key": latestRev.GetHash(),
"path": latestRev.GetNode().GetProxy().getFullPath(),
- "operation": latestRev.GetNode().GetProxy().Operation.String(),
+ "operation": latestRev.GetNode().GetProxy().operation.String(),
})
-
- //continue
-
- // Identify the operation type and determine if the watch event should be applied or not.
- switch latestRev.GetNode().GetProxy().Operation {
- case PROXY_REMOVE:
- fallthrough
-
- case PROXY_ADD:
- fallthrough
-
- case PROXY_UPDATE:
- // We will need to reload once the operation completes.
- // Therefore, the data of the current event is most likely out-dated
- // and should be ignored
- log.Debugw("ignore-watch-event", log.Fields{
- "key": latestRev.GetHash(),
- "path": latestRev.GetNode().GetProxy().getFullPath(),
- "operation": latestRev.GetNode().GetProxy().Operation.String(),
- })
-
- continue
-
- case PROXY_CREATE:
- fallthrough
-
- case PROXY_LIST:
- fallthrough
-
- case PROXY_GET:
- fallthrough
-
- case PROXY_WATCH:
- fallthrough
-
- default:
- log.Debugw("process-watch-event", log.Fields{
- "key": latestRev.GetHash(),
- "path": latestRev.GetNode().GetProxy().getFullPath(),
- "operation": latestRev.GetNode().GetProxy().Operation.String(),
- })
- }
+ continue
}
+ pathLock, _ = latestRev.GetNode().GetProxy().parseForControlledPath(latestRev.GetNode().GetProxy().getFullPath())
+
// Reserve the path to prevent others to modify while we reload from persistence
- log.Debugw("reserve-and-lock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
- pac = PAC().ReservePath(latestRev.GetNode().GetProxy().getFullPath(),
- latestRev.GetNode().GetProxy(), pathLock)
- pac.lock()
- latestRev.GetNode().GetProxy().Operation = PROXY_WATCH
- pac.SetProxy(latestRev.GetNode().GetProxy())
+ latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ latestRev.GetNode().GetProxy().SetOperation(PROXY_WATCH)
// Load changes and apply to memory
- latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
+ latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
- log.Debugw("release-and-unlock-path", log.Fields{"key": latestRev.GetHash(), "path": pathLock})
- pac.getProxy().Operation = PROXY_GET
- pac.unlock()
- PAC().ReleasePath(pathLock)
+ // Release path
+ latestRev.GetNode().GetProxy().GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
} else {
// This block should be reached only if coming from a non-proxied request
log.Debugw("revision-with-no-proxy", log.Fields{"key": latestRev.GetHash(), "watch": latestRev.GetName()})
// Load changes and apply to memory
- latestRev.LoadFromPersistence(latestRev.GetName(), "", blobs)
+ latestRev.LoadFromPersistence(context.Background(), latestRev.GetName(), "", blobs)
}
}
@@ -264,16 +253,17 @@
}
// UpdateData modifies the information in the data model and saves it in the persistent storage
-func (pr *PersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
+func (pr *PersistedRevision) UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision {
log.Debugw("updating-persisted-data", log.Fields{"hash": pr.GetHash()})
- newNPR := pr.Revision.UpdateData(data, branch)
+ newNPR := pr.Revision.UpdateData(ctx, data, branch)
newPR := &PersistedRevision{
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
events: pr.events,
+ Version: pr.getVersion(),
isWatched: pr.isWatched,
}
@@ -289,17 +279,17 @@
}
// UpdateChildren modifies the children of a revision and of a specific component and saves it in the persistent storage
-func (pr *PersistedRevision) UpdateChildren(name string, children []Revision,
- branch *Branch) Revision {
+func (pr *PersistedRevision) UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision {
log.Debugw("updating-persisted-children", log.Fields{"hash": pr.GetHash()})
- newNPR := pr.Revision.UpdateChildren(name, children, branch)
+ newNPR := pr.Revision.UpdateChildren(ctx, name, children, branch)
newPR := &PersistedRevision{
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
events: pr.events,
+ Version: pr.getVersion(),
isWatched: pr.isWatched,
}
@@ -324,6 +314,7 @@
Compress: pr.Compress,
kvStore: pr.kvStore,
events: pr.events,
+ Version: pr.getVersion(),
isWatched: pr.isWatched,
}
@@ -346,8 +337,7 @@
// Drop takes care of eliminating a revision hash that is no longer needed
// and its associated config when required
func (pr *PersistedRevision) StorageDrop(txid string, includeConfig bool) {
- log.Debugw("dropping-revision",
- log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})
+ log.Debugw("dropping-revision", log.Fields{"txid": txid, "hash": pr.GetHash(), "config-hash": pr.GetConfig().Hash})
pr.mutex.Lock()
defer pr.mutex.Unlock()
@@ -376,9 +366,10 @@
}
// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
-func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
+func (pr *PersistedRevision) verifyPersistedEntry(ctx context.Context, data interface{}, typeName string, keyName string,
+ keyValue string, txid string, version int64) (response Revision) {
// Parent which holds the current node entry
- parent := pr.GetBranch().Node.Root
+ parent := pr.GetBranch().Node.GetRoot()
// Get a copy of the parent's children
children := make([]Revision, len(parent.GetBranch(NONE).Latest.GetChildren(typeName)))
@@ -389,11 +380,12 @@
// A child matching the provided key exists in memory
// Verify if the data differs from what was retrieved from persistence
// Also check if we are treating a newer revision of the data or not
- if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
+ if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() && childRev.getVersion() < version {
log.Debugw("revision-data-is-different", log.Fields{
- "key": childRev.GetHash(),
- "name": childRev.GetName(),
- "data": childRev.GetData(),
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ "data": childRev.GetData(),
+ "version": childRev.getVersion(),
})
//
@@ -404,14 +396,15 @@
childRev.GetBranch().LatestLock.Lock()
// Update child
- updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
+ updatedChildRev := childRev.UpdateData(ctx, data, childRev.GetBranch())
updatedChildRev.GetNode().SetProxy(childRev.GetNode().GetProxy())
updatedChildRev.SetupWatch(updatedChildRev.GetName())
updatedChildRev.SetLastUpdate()
+ updatedChildRev.(*PersistedRevision).setVersion(version)
// Update cache
- GetRevCache().Cache.Store(updatedChildRev.GetName(), updatedChildRev)
+ GetRevCache().Set(updatedChildRev.GetName(), updatedChildRev)
childRev.Drop(txid, false)
childRev.GetBranch().LatestLock.Unlock()
@@ -423,7 +416,7 @@
// BEGIN lock parent -- Update parent
parent.GetBranch(NONE).LatestLock.Lock()
- updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
+ updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
parent.GetBranch(NONE).LatestLock.Unlock()
@@ -441,14 +434,8 @@
response = updatedChildRev
}
} else {
- // Data is the same. Continue to the next entry
- log.Debugw("same-revision-data", log.Fields{
- "key": childRev.GetHash(),
- "name": childRev.GetName(),
- "data": childRev.GetData(),
- })
if childRev != nil {
- log.Debugw("keeping-same-revision-data", log.Fields{
+ log.Debugw("keeping-revision-data", log.Fields{
"key": childRev.GetHash(),
"name": childRev.GetName(),
"data": childRev.GetData(),
@@ -456,7 +443,10 @@
// Update timestamp to reflect when it was last read and to reset tracked timeout
childRev.SetLastUpdate()
- GetRevCache().Cache.Store(childRev.GetName(), childRev)
+ if childRev.getVersion() < version {
+ childRev.(*PersistedRevision).setVersion(version)
+ }
+ GetRevCache().Set(childRev.GetName(), childRev)
response = childRev
}
}
@@ -479,6 +469,10 @@
// We need to start watching this entry for future changes
childRev.SetName(typeName + "/" + keyValue)
childRev.SetupWatch(childRev.GetName())
+ childRev.(*PersistedRevision).setVersion(version)
+
+ // Add entry to cache
+ GetRevCache().Set(childRev.GetName(), childRev)
pr.GetBranch().LatestLock.Unlock()
// END child lock
@@ -490,7 +484,7 @@
// BEGIN parent lock
parent.GetBranch(NONE).LatestLock.Lock()
children = append(children, childRev)
- updatedRev := parent.GetBranch(NONE).Latest.UpdateChildren(typeName, children, parent.GetBranch(NONE))
+ updatedRev := parent.GetBranch(NONE).GetLatest().UpdateChildren(ctx, typeName, children, parent.GetBranch(NONE))
updatedRev.GetNode().SetProxy(parent.GetBranch(NONE).Node.GetProxy())
parent.GetBranch(NONE).Node.makeLatest(parent.GetBranch(NONE), updatedRev, nil)
parent.GetBranch(NONE).LatestLock.Unlock()
@@ -512,7 +506,7 @@
// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
// by adding missing entries, updating changed entries and ignoring unchanged ones
-func (pr *PersistedRevision) LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
+func (pr *PersistedRevision) LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision {
pr.mutex.Lock()
defer pr.mutex.Unlock()
@@ -539,7 +533,7 @@
nodeType = pr.GetBranch().Node.Type
} else {
path = partition[1]
- nodeType = pr.GetBranch().Node.Root.Type
+ nodeType = pr.GetBranch().Node.GetRoot().Type
}
field := ChildrenFields(nodeType)[name]
@@ -574,7 +568,7 @@
// based on the field's key attribute
_, key := GetAttributeValue(data.Interface(), field.Key, 0)
- if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
+ if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, key.String(), txid, blob.Version); entry != nil {
response = append(response, entry)
}
} else {
@@ -601,7 +595,7 @@
}
keyValue := field.KeyFromStr(key)
- if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
+ if entry := pr.verifyPersistedEntry(ctx, data.Interface(), name, field.Key, keyValue.(string), txid, blob.Version); entry != nil {
response = append(response, entry)
}
}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/proxy.go b/vendor/github.com/opencord/voltha-go/db/model/proxy.go
index 182dcdd..5c4d772 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/proxy.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/proxy.go
@@ -17,9 +17,11 @@
package model
import (
+ "context"
"crypto/md5"
"errors"
"fmt"
+ "github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"reflect"
"runtime"
@@ -54,7 +56,7 @@
// Proxy holds the information for a specific location with the data model
type Proxy struct {
- sync.RWMutex
+ mutex sync.RWMutex
Root *root
Node *node
ParentNode *node
@@ -62,7 +64,7 @@
FullPath string
Exclusive bool
Callbacks map[CallbackType]map[string]*CallbackTuple
- Operation ProxyOperation
+ operation ProxyOperation
}
// NewProxy instantiates a new proxy to a specific location
@@ -100,6 +102,9 @@
// getCallbacks returns the full list of callbacks associated to the proxy
func (p *Proxy) getCallbacks(callbackType CallbackType) map[string]*CallbackTuple {
+ p.mutex.RLock()
+ defer p.mutex.RUnlock()
+
if p != nil {
if cb, exists := p.Callbacks[callbackType]; exists {
return cb
@@ -112,8 +117,8 @@
// getCallback returns a specific callback matching the type and function hash
func (p *Proxy) getCallback(callbackType CallbackType, funcHash string) *CallbackTuple {
- p.Lock()
- defer p.Unlock()
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
if tuple, exists := p.Callbacks[callbackType][funcHash]; exists {
return tuple
}
@@ -122,22 +127,22 @@
// setCallbacks applies a callbacks list to a type
func (p *Proxy) setCallbacks(callbackType CallbackType, callbacks map[string]*CallbackTuple) {
- p.Lock()
- defer p.Unlock()
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
p.Callbacks[callbackType] = callbacks
}
// setCallback applies a callback to a type and hash value
func (p *Proxy) setCallback(callbackType CallbackType, funcHash string, tuple *CallbackTuple) {
- p.Lock()
- defer p.Unlock()
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
p.Callbacks[callbackType][funcHash] = tuple
}
// DeleteCallback removes a callback matching the type and hash
func (p *Proxy) DeleteCallback(callbackType CallbackType, funcHash string) {
- p.Lock()
- defer p.Unlock()
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
delete(p.Callbacks[callbackType], funcHash)
}
@@ -146,7 +151,8 @@
// Enumerated list of callback types
const (
- PROXY_GET ProxyOperation = iota
+ PROXY_NONE ProxyOperation = iota
+ PROXY_GET
PROXY_LIST
PROXY_ADD
PROXY_UPDATE
@@ -156,6 +162,7 @@
)
var proxyOperationTypes = []string{
+ "PROXY_NONE",
"PROXY_GET",
"PROXY_LIST",
"PROXY_ADD",
@@ -169,6 +176,18 @@
return proxyOperationTypes[t]
}
+func (p *Proxy) GetOperation() ProxyOperation {
+ p.mutex.RLock()
+ defer p.mutex.RUnlock()
+ return p.operation
+}
+
+func (p *Proxy) SetOperation(operation ProxyOperation) {
+ p.mutex.Lock()
+ defer p.mutex.Unlock()
+ p.operation = operation
+}
+
// parseForControlledPath verifies if a proxy path matches a pattern
// for locations that need to be access controlled.
func (p *Proxy) parseForControlledPath(path string) (pathLock string, controlled bool) {
@@ -195,7 +214,7 @@
// List will retrieve information from the data model at the specified path location
// A list operation will force access to persistence storage
-func (p *Proxy) List(path string, depth int, deep bool, txid string) interface{} {
+func (p *Proxy) List(ctx context.Context, path string, depth int, deep bool, txid string) interface{} {
var effectivePath string
if path == "/" {
effectivePath = p.getFullPath()
@@ -205,28 +224,24 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
+ p.SetOperation(PROXY_LIST)
+ defer p.SetOperation(PROXY_NONE)
+
log.Debugw("proxy-list", log.Fields{
"path": path,
"effective": effectivePath,
"pathLock": pathLock,
"controlled": controlled,
+ "operation": p.GetOperation(),
})
- pac := PAC().ReservePath(effectivePath, p, pathLock)
- defer PAC().ReleasePath(pathLock)
- p.Operation = PROXY_LIST
- pac.SetProxy(p)
- defer func(op ProxyOperation) {
- pac.getProxy().Operation = op
- }(PROXY_GET)
-
- rv := pac.List(path, depth, deep, txid, controlled)
+ rv := p.GetRoot().List(ctx, path, "", depth, deep, txid)
return rv
}
// Get will retrieve information from the data model at the specified path location
-func (p *Proxy) Get(path string, depth int, deep bool, txid string) interface{} {
+func (p *Proxy) Get(ctx context.Context, path string, depth int, deep bool, txid string) interface{} {
var effectivePath string
if path == "/" {
effectivePath = p.getFullPath()
@@ -236,25 +251,24 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
+ p.SetOperation(PROXY_GET)
+ defer p.SetOperation(PROXY_NONE)
+
log.Debugw("proxy-get", log.Fields{
"path": path,
"effective": effectivePath,
"pathLock": pathLock,
"controlled": controlled,
+ "operation": p.GetOperation(),
})
- pac := PAC().ReservePath(effectivePath, p, pathLock)
- defer PAC().ReleasePath(pathLock)
- p.Operation = PROXY_GET
- pac.SetProxy(p)
-
- rv := pac.Get(path, depth, deep, txid, controlled)
+ rv := p.GetRoot().Get(ctx, path, "", depth, deep, txid)
return rv
}
// Update will modify information in the data model at the specified location with the provided data
-func (p *Proxy) Update(path string, data interface{}, strict bool, txid string) interface{} {
+func (p *Proxy) Update(ctx context.Context, path string, data interface{}, strict bool, txid string) interface{} {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
return nil
@@ -271,31 +285,36 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
+ p.SetOperation(PROXY_UPDATE)
+ defer p.SetOperation(PROXY_NONE)
+
log.Debugw("proxy-update", log.Fields{
"path": path,
"effective": effectivePath,
"full": fullPath,
"pathLock": pathLock,
"controlled": controlled,
+ "operation": p.GetOperation(),
})
- pac := PAC().ReservePath(effectivePath, p, pathLock)
- defer PAC().ReleasePath(pathLock)
+ if p.GetRoot().KvStore != nil {
+ p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+ }
- p.Operation = PROXY_UPDATE
- pac.SetProxy(p)
- defer func(op ProxyOperation) {
- pac.getProxy().Operation = op
- }(PROXY_GET)
- log.Debugw("proxy-operation--update", log.Fields{"operation": p.Operation})
+ result := p.GetRoot().Update(ctx, fullPath, data, strict, txid, nil)
- return pac.Update(fullPath, data, strict, txid, controlled)
+ if result != nil {
+ return result.GetData()
+ }
+
+ return nil
}
// AddWithID will insert new data at specified location.
// This method also allows the user to specify the ID of the data entry to ensure
// that access control is active while inserting the information.
-func (p *Proxy) AddWithID(path string, id string, data interface{}, txid string) interface{} {
+func (p *Proxy) AddWithID(ctx context.Context, path string, id string, data interface{}, txid string) interface{} {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
return nil
@@ -312,31 +331,34 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
+ p.SetOperation(PROXY_ADD)
+ defer p.SetOperation(PROXY_NONE)
+
log.Debugw("proxy-add-with-id", log.Fields{
"path": path,
"effective": effectivePath,
"full": fullPath,
"pathLock": pathLock,
"controlled": controlled,
+ "operation": p.GetOperation(),
})
- pac := PAC().ReservePath(path, p, pathLock)
- defer PAC().ReleasePath(pathLock)
+ if p.GetRoot().KvStore != nil {
+ p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+ }
- p.Operation = PROXY_ADD
- defer func(op ProxyOperation) {
- pac.getProxy().Operation = op
- }(PROXY_GET)
+ result := p.GetRoot().Add(ctx, fullPath, data, txid, nil)
- pac.SetProxy(p)
+ if result != nil {
+ return result.GetData()
+ }
- log.Debugw("proxy-operation--add", log.Fields{"operation": p.Operation})
-
- return pac.Add(fullPath, data, txid, controlled)
+ return nil
}
// Add will insert new data at specified location.
-func (p *Proxy) Add(path string, data interface{}, txid string) interface{} {
+func (p *Proxy) Add(ctx context.Context, path string, data interface{}, txid string) interface{} {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
return nil
@@ -353,30 +375,34 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
+ p.SetOperation(PROXY_ADD)
+ defer p.SetOperation(PROXY_NONE)
+
log.Debugw("proxy-add", log.Fields{
"path": path,
"effective": effectivePath,
"full": fullPath,
"pathLock": pathLock,
"controlled": controlled,
+ "operation": p.GetOperation(),
})
- pac := PAC().ReservePath(path, p, pathLock)
- defer PAC().ReleasePath(pathLock)
+ if p.GetRoot().KvStore != nil {
+ p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+ }
- p.Operation = PROXY_ADD
- pac.SetProxy(p)
- defer func(op ProxyOperation) {
- pac.getProxy().Operation = op
- }(PROXY_GET)
+ result := p.GetRoot().Add(ctx, fullPath, data, txid, nil)
- log.Debugw("proxy-operation--add", log.Fields{"operation": p.Operation})
+ if result != nil {
+ return result.GetData()
+ }
- return pac.Add(fullPath, data, txid, controlled)
+ return nil
}
// Remove will delete an entry at the specified location
-func (p *Proxy) Remove(path string, txid string) interface{} {
+func (p *Proxy) Remove(ctx context.Context, path string, txid string) interface{} {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
return nil
@@ -393,30 +419,34 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
+ p.SetOperation(PROXY_REMOVE)
+ defer p.SetOperation(PROXY_NONE)
+
log.Debugw("proxy-remove", log.Fields{
"path": path,
"effective": effectivePath,
"full": fullPath,
"pathLock": pathLock,
"controlled": controlled,
+ "operation": p.GetOperation(),
})
- pac := PAC().ReservePath(effectivePath, p, pathLock)
- defer PAC().ReleasePath(pathLock)
+ if p.GetRoot().KvStore != nil {
+ p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+ }
- p.Operation = PROXY_REMOVE
- pac.SetProxy(p)
- defer func(op ProxyOperation) {
- pac.getProxy().Operation = op
- }(PROXY_GET)
+ result := p.GetRoot().Remove(ctx, fullPath, txid, nil)
- log.Debugw("proxy-operation--remove", log.Fields{"operation": p.Operation})
+ if result != nil {
+ return result.GetData()
+ }
- return pac.Remove(fullPath, txid, controlled)
+ return nil
}
// CreateProxy to interact with specific path directly
-func (p *Proxy) CreateProxy(path string, exclusive bool) *Proxy {
+func (p *Proxy) CreateProxy(ctx context.Context, path string, exclusive bool) *Proxy {
if !strings.HasPrefix(path, "/") {
log.Errorf("invalid path: %s", path)
return nil
@@ -434,26 +464,24 @@
pathLock, controlled := p.parseForControlledPath(effectivePath)
+ p.SetOperation(PROXY_CREATE)
+ defer p.SetOperation(PROXY_NONE)
+
log.Debugw("proxy-create", log.Fields{
"path": path,
"effective": effectivePath,
"full": fullPath,
"pathLock": pathLock,
"controlled": controlled,
+ "operation": p.GetOperation(),
})
- pac := PAC().ReservePath(path, p, pathLock)
- defer PAC().ReleasePath(pathLock)
+ if p.GetRoot().KvStore != nil {
+ p.GetRoot().KvStore.Client.Reserve(pathLock+"_", uuid.New().String(), ReservationTTL)
+ defer p.GetRoot().KvStore.Client.ReleaseReservation(pathLock + "_")
+ }
- p.Operation = PROXY_CREATE
- pac.SetProxy(p)
- defer func(op ProxyOperation) {
- pac.getProxy().Operation = op
- }(PROXY_GET)
-
- log.Debugw("proxy-operation--create-proxy", log.Fields{"operation": p.Operation})
-
- return pac.CreateProxy(fullPath, exclusive, controlled)
+ return p.GetRoot().CreateProxy(ctx, fullPath, exclusive)
}
// OpenTransaction creates a new transaction branch to isolate operations made to the data model
@@ -553,7 +581,7 @@
var err error
if callbacks := p.getCallbacks(callbackType); callbacks != nil {
- p.Lock()
+ p.mutex.Lock()
for _, callback := range callbacks {
if result, err = p.invoke(callback, context); err != nil {
if !proceedOnError {
@@ -563,7 +591,7 @@
log.Info("An error occurred. Invoking next callback")
}
}
- p.Unlock()
+ p.mutex.Unlock()
}
return result
diff --git a/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go b/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go
deleted file mode 100644
index a1ea6be..0000000
--- a/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package model
-
-import (
- "github.com/opencord/voltha-go/common/log"
- "sync"
- "time"
-)
-
-type singletonProxyAccessControl struct {
- sync.RWMutex
- cache sync.Map
- reservedCount int
-}
-
-var instanceProxyAccessControl *singletonProxyAccessControl
-var onceProxyAccessControl sync.Once
-
-// PAC provides access to the proxy access control singleton instance
-func PAC() *singletonProxyAccessControl {
- onceProxyAccessControl.Do(func() {
- instanceProxyAccessControl = &singletonProxyAccessControl{}
- })
- return instanceProxyAccessControl
-}
-
-// IsReserved will verify if access control is active for a specific path within the model
-func (singleton *singletonProxyAccessControl) IsReserved(pathLock string) bool {
- singleton.Lock()
- defer singleton.Unlock()
-
- _, exists := singleton.cache.Load(pathLock)
- log.Debugw("is-reserved", log.Fields{"pathLock": pathLock, "exists": exists})
-
- return exists
-}
-
-// ReservePath will apply access control for a specific path within the model
-func (singleton *singletonProxyAccessControl) ReservePath(path string, proxy *Proxy, pathLock string) *proxyAccessControl {
- singleton.Lock()
- defer singleton.Unlock()
- singleton.reservedCount++
- if pac, exists := singleton.cache.Load(pathLock); !exists {
- log.Debugf("Creating new PAC entry for path:%s pathLock:%s", path, pathLock)
- newPac := NewProxyAccessControl(proxy, pathLock)
- singleton.cache.Store(pathLock, newPac)
- return newPac
- } else {
- log.Debugf("Re-using existing PAC entry for path:%s pathLock:%s", path, pathLock)
- return pac.(*proxyAccessControl)
- }
-}
-
-// ReleasePath will remove access control for a specific path within the model
-func (singleton *singletonProxyAccessControl) ReleasePath(pathLock string) {
- singleton.Lock()
- defer singleton.Unlock()
-
- singleton.reservedCount--
-
- if singleton.reservedCount == 0 {
- singleton.cache.Delete(pathLock)
- }
-}
-
-// ProxyAccessControl is the abstraction interface to the base proxyAccessControl structure
-type ProxyAccessControl interface {
- Get(path string, depth int, deep bool, txid string, control bool) interface{}
- Update(path string, data interface{}, strict bool, txid string, control bool) interface{}
- Add(path string, data interface{}, txid string, control bool) interface{}
- Remove(path string, txid string, control bool) interface{}
- SetProxy(proxy *Proxy)
-}
-
-// proxyAccessControl holds details of the path and proxy that requires access control
-type proxyAccessControl struct {
- sync.RWMutex
- Proxy *Proxy
- PathLock chan struct{}
- Path string
-
- start time.Time
- stop time.Time
-}
-
-// NewProxyAccessControl creates a new instance of an access control structure
-func NewProxyAccessControl(proxy *Proxy, path string) *proxyAccessControl {
- return &proxyAccessControl{
- Proxy: proxy,
- Path: path,
- PathLock: make(chan struct{}, 1),
- }
-}
-
-// lock will prevent access to a model path
-func (pac *proxyAccessControl) lock() {
- log.Debugw("locking", log.Fields{"path": pac.Path})
- pac.PathLock <- struct{}{}
- pac.setStart(time.Now())
-}
-
-// unlock will release control of a model path
-func (pac *proxyAccessControl) unlock() {
- <-pac.PathLock
- log.Debugw("unlocking", log.Fields{"path": pac.Path})
- pac.setStop(time.Now())
- GetProfiling().AddToInMemoryLockTime(pac.getStop().Sub(pac.getStart()).Seconds())
-}
-
-// getStart is used for profiling purposes and returns the time at which access control was applied
-func (pac *proxyAccessControl) getStart() time.Time {
- pac.Lock()
- defer pac.Unlock()
- return pac.start
-}
-
-// getStart is used for profiling purposes and returns the time at which access control was removed
-func (pac *proxyAccessControl) getStop() time.Time {
- pac.Lock()
- defer pac.Unlock()
- return pac.stop
-}
-
-// getPath returns the access controlled path
-func (pac *proxyAccessControl) getPath() string {
- pac.Lock()
- defer pac.Unlock()
- return pac.Path
-}
-
-// getProxy returns the proxy used to reach a specific location in the data model
-func (pac *proxyAccessControl) getProxy() *Proxy {
- pac.Lock()
- defer pac.Unlock()
- return pac.Proxy
-}
-
-// setStart is for profiling purposes and applies a start time value at which access control was started
-func (pac *proxyAccessControl) setStart(time time.Time) {
- pac.Lock()
- defer pac.Unlock()
- pac.start = time
-}
-
-// setStop is for profiling purposes and applies a stop time value at which access control was stopped
-func (pac *proxyAccessControl) setStop(time time.Time) {
- pac.Lock()
- defer pac.Unlock()
- pac.stop = time
-}
-
-// SetProxy is used to changed the proxy object of an access controlled path
-func (pac *proxyAccessControl) SetProxy(proxy *Proxy) {
- pac.Lock()
- defer pac.Unlock()
- pac.Proxy = proxy
-}
-
-// List retrieves data linked to a data model path
-func (pac *proxyAccessControl) List(path string, depth int, deep bool, txid string, control bool) interface{} {
- if control {
- pac.lock()
- log.Debugw("locked-access--list", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
- defer pac.unlock()
- defer log.Debugw("unlocked-access--list", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
- }
-
- // FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
- // The data traversal through reflection currently corrupts the content
-
- return pac.getProxy().GetRoot().List(path, "", depth, deep, txid)
-}
-
-// Get retrieves data linked to a data model path
-func (pac *proxyAccessControl) Get(path string, depth int, deep bool, txid string, control bool) interface{} {
- if control {
- pac.lock()
- log.Debugw("locked-access--get", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
- defer pac.unlock()
- defer log.Debugw("unlocked-access--get", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
- }
-
- // FIXME: Forcing depth to 0 for now due to problems deep copying the data structure
- // The data traversal through reflection currently corrupts the content
- return pac.getProxy().GetRoot().Get(path, "", 0, deep, txid)
-}
-
-// Update changes the content of the data model at the specified location with the provided data
-func (pac *proxyAccessControl) Update(path string, data interface{}, strict bool, txid string, control bool) interface{} {
- if control {
- pac.lock()
- log.Debugw("locked-access--update", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
- defer pac.unlock()
- defer log.Debugw("unlocked-access--update", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
- }
-
- result := pac.getProxy().GetRoot().Update(path, data, strict, txid, nil)
-
- if result != nil {
- return result.GetData()
- }
- return nil
-}
-
-// Add creates a new data model entry at the specified location with the provided data
-func (pac *proxyAccessControl) Add(path string, data interface{}, txid string, control bool) interface{} {
- if control {
- pac.lock()
- log.Debugw("locked-access--add", log.Fields{"path": path, "fullPath": pac.Path})
- defer pac.unlock()
- defer log.Debugw("unlocked-access--add", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
- }
-
- result := pac.getProxy().GetRoot().Add(path, data, txid, nil)
-
- if result != nil {
- return result.GetData()
- }
- return nil
-}
-
-// Remove discards information linked to the data model path
-func (pac *proxyAccessControl) Remove(path string, txid string, control bool) interface{} {
- if control {
- pac.lock()
- log.Debugw("locked-access--remove", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
- defer pac.unlock()
- defer log.Debugw("unlocked-access--remove", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
- }
-
- return pac.getProxy().GetRoot().Remove(path, txid, nil)
-}
-
-// CreateProxy allows interaction for a specific path
-func (pac *proxyAccessControl) CreateProxy(path string, exclusive bool, control bool) *Proxy {
- if control {
- pac.lock()
- log.Debugw("locked-access--create-proxy", log.Fields{"path": path, "fullPath": pac.Path})
- defer pac.unlock()
- defer log.Debugw("unlocked-access--create-proxy", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
- }
-
- result := pac.getProxy().ParentNode.CreateProxy(path, exclusive)
-
- if result != nil {
- return result
- }
- return nil
-}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/revision.go b/vendor/github.com/opencord/voltha-go/db/model/revision.go
index cd4c5df..6f52248 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/revision.go
@@ -16,6 +16,7 @@
package model
import (
+ "context"
"github.com/opencord/voltha-go/db/kvstore"
"time"
)
@@ -34,6 +35,7 @@
SetHash(hash string)
GetHash() string
ClearHash()
+ getVersion() int64
SetupWatch(key string)
SetName(name string)
GetName() string
@@ -42,10 +44,10 @@
Get(int) interface{}
GetData() interface{}
GetNode() *node
- LoadFromPersistence(path string, txid string, blobs map[string]*kvstore.KVPair) []Revision
SetLastUpdate(ts ...time.Time)
GetLastUpdate() time.Time
- UpdateData(data interface{}, branch *Branch) Revision
- UpdateChildren(name string, children []Revision, branch *Branch) Revision
+ LoadFromPersistence(ctx context.Context, path string, txid string, blobs map[string]*kvstore.KVPair) []Revision
+ UpdateData(ctx context.Context, data interface{}, branch *Branch) Revision
+ UpdateChildren(ctx context.Context, name string, children []Revision, branch *Branch) Revision
UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision
}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/root.go b/vendor/github.com/opencord/voltha-go/db/model/root.go
index 5036ce1..8331e11 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/root.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/root.go
@@ -17,6 +17,7 @@
package model
import (
+ "context"
"encoding/hex"
"encoding/json"
"github.com/golang/protobuf/proto"
@@ -103,7 +104,7 @@
r.DeleteTxBranch(txid)
} else {
r.node.MergeBranch(txid, false)
- r.ExecuteCallbacks()
+ r.node.GetRoot().ExecuteCallbacks()
r.DeleteTxBranch(txid)
}
}
@@ -162,7 +163,7 @@
}
func (r *root) syncParent(childRev Revision, txid string) {
- data := proto.Clone(r.Proxy.ParentNode.Latest().GetData().(proto.Message))
+ data := proto.Clone(r.GetProxy().ParentNode.Latest().GetData().(proto.Message))
for fieldName, _ := range ChildrenFields(data) {
childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
@@ -172,12 +173,12 @@
}
}
- r.Proxy.ParentNode.Latest().SetConfig(NewDataRevision(r.Proxy.ParentNode.Root, data))
- r.Proxy.ParentNode.Latest(txid).Finalize(false)
+ r.GetProxy().ParentNode.Latest().SetConfig(NewDataRevision(r.GetProxy().ParentNode.GetRoot(), data))
+ r.GetProxy().ParentNode.Latest(txid).Finalize(false)
}
// Update modifies the content of an object at a given path with the provided data
-func (r *root) Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
+func (r *root) Update(ctx context.Context, path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
var result Revision
if makeBranch != nil {
@@ -193,13 +194,13 @@
r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
return node.MakeBranch(txid)
}
- result = r.node.Update(path, data, strict, txid, trackDirty)
+ result = r.node.Update(ctx, path, data, strict, txid, trackDirty)
} else {
- result = r.node.Update(path, data, strict, "", nil)
+ result = r.node.Update(ctx, path, data, strict, "", nil)
}
if result != nil {
- if r.Proxy.FullPath != r.Proxy.Path {
+ if r.GetProxy().FullPath != r.GetProxy().Path {
r.syncParent(result, txid)
} else {
result.Finalize(false)
@@ -212,7 +213,7 @@
}
// Add creates a new object at the given path with the provided data
-func (r *root) Add(path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
+func (r *root) Add(ctx context.Context, path string, data interface{}, txid string, makeBranch MakeBranchFunction) Revision {
var result Revision
if makeBranch != nil {
@@ -228,9 +229,9 @@
r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
return node.MakeBranch(txid)
}
- result = r.node.Add(path, data, txid, trackDirty)
+ result = r.node.Add(ctx, path, data, txid, trackDirty)
} else {
- result = r.node.Add(path, data, "", nil)
+ result = r.node.Add(ctx, path, data, "", nil)
}
if result != nil {
@@ -241,7 +242,7 @@
}
// Remove discards an object at a given path
-func (r *root) Remove(path string, txid string, makeBranch MakeBranchFunction) Revision {
+func (r *root) Remove(ctx context.Context, path string, txid string, makeBranch MakeBranchFunction) Revision {
var result Revision
if makeBranch != nil {
@@ -257,9 +258,9 @@
r.DirtyNodes[txid] = append(r.DirtyNodes[txid], node)
return node.MakeBranch(txid)
}
- result = r.node.Remove(path, txid, trackDirty)
+ result = r.node.Remove(ctx, path, txid, trackDirty)
} else {
- result = r.node.Remove(path, "", nil)
+ result = r.node.Remove(ctx, path, "", nil)
}
r.node.GetRoot().ExecuteCallbacks()
diff --git a/vendor/github.com/opencord/voltha-go/db/model/transaction.go b/vendor/github.com/opencord/voltha-go/db/model/transaction.go
index fa8de1d..7529ff2 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/transaction.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/transaction.go
@@ -16,6 +16,7 @@
package model
import (
+ "context"
"github.com/opencord/voltha-go/common/log"
)
@@ -31,34 +32,34 @@
}
return tx
}
-func (t *Transaction) Get(path string, depth int, deep bool) interface{} {
+func (t *Transaction) Get(ctx context.Context, path string, depth int, deep bool) interface{} {
if t.txid == "" {
log.Errorf("closed transaction")
return nil
}
// TODO: need to review the return values at the different layers!!!!!
- return t.proxy.Get(path, depth, deep, t.txid)
+ return t.proxy.Get(ctx, path, depth, deep, t.txid)
}
-func (t *Transaction) Update(path string, data interface{}, strict bool) interface{} {
+func (t *Transaction) Update(ctx context.Context, path string, data interface{}, strict bool) interface{} {
if t.txid == "" {
log.Errorf("closed transaction")
return nil
}
- return t.proxy.Update(path, data, strict, t.txid)
+ return t.proxy.Update(ctx, path, data, strict, t.txid)
}
-func (t *Transaction) Add(path string, data interface{}) interface{} {
+func (t *Transaction) Add(ctx context.Context, path string, data interface{}) interface{} {
if t.txid == "" {
log.Errorf("closed transaction")
return nil
}
- return t.proxy.Add(path, data, t.txid)
+ return t.proxy.Add(ctx, path, data, t.txid)
}
-func (t *Transaction) Remove(path string) interface{} {
+func (t *Transaction) Remove(ctx context.Context, path string) interface{} {
if t.txid == "" {
log.Errorf("closed transaction")
return nil
}
- return t.proxy.Remove(path, t.txid)
+ return t.proxy.Remove(ctx, path, t.txid)
}
func (t *Transaction) Cancel() {
t.proxy.cancelTransaction(t.txid)
diff --git a/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go b/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
index c9cd56d..aad1348 100644
--- a/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
+++ b/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
@@ -41,6 +41,9 @@
//If no errors is found then nil is returned. This method also takes in a timeout in milliseconds. If a
//timeout is obtained then this function will stop waiting for the remaining responses and abort.
func WaitForNilOrErrorResponses(timeout int64, chnls ...chan interface{}) []error {
+ if len(chnls) == 0 {
+ return nil
+ }
// Create a timeout channel
tChnl := make(chan *interface{})
go func() {
diff --git a/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go b/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
index 3828b39..4293126 100644
--- a/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
+++ b/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
@@ -29,7 +29,9 @@
var (
// Instructions shortcut
- APPLY_ACTIONS = ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS
+ APPLY_ACTIONS = ofp.OfpInstructionType_OFPIT_APPLY_ACTIONS
+ WRITE_METADATA = ofp.OfpInstructionType_OFPIT_WRITE_METADATA
+ METER_ACTION = ofp.OfpInstructionType_OFPIT_METER
//OFPAT_* shortcuts
OUTPUT = ofp.OfpActionType_OFPAT_OUTPUT
@@ -456,6 +458,22 @@
return 0
}
+func GetMeterId(flow *ofp.OfpFlowStats) uint32 {
+ if flow == nil {
+ return 0
+ }
+ for _, instruction := range flow.Instructions {
+ if instruction.Type == uint32(ofp.OfpInstructionType_OFPIT_METER) {
+ MeterInstruction := instruction.GetMeter()
+ if MeterInstruction == nil {
+ return 0
+ }
+ return MeterInstruction.GetMeterId()
+ }
+ }
+ return 0
+}
+
func GetTunnelId(flow *ofp.OfpFlowStats) uint64 {
if flow == nil {
return 0
@@ -475,9 +493,10 @@
}
for _, field := range GetOfbFields(flow) {
if field.Type == METADATA {
- return uint32(field.GetTableMetadata() & 0xffffffff)
+ return uint32(field.GetTableMetadata() & 0xFFFFFFFF)
}
}
+ log.Debug("No-metadata-present")
return 0
}
@@ -490,28 +509,83 @@
return field.GetTableMetadata()
}
}
+ log.Debug("No-metadata-present")
return 0
}
-// GetPortNumberFromMetadata retrieves the port number from the Metadata_ofp. The port number (UNI on ONU) is in the
-// lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
-// a Metadata_ofp field
-func GetPortNumberFromMetadata(flow *ofp.OfpFlowStats) uint64 {
- md := GetMetaData64Bit(flow)
- if md == 0 {
- return 0
+// GetMetadataFromWriteMetadataAction returns the metadata value from the write_metadata instruction, or 0 if absent
+func GetMetadataFromWriteMetadataAction(flow *ofp.OfpFlowStats) uint64 {
+ if flow != nil {
+ for _, instruction := range flow.Instructions {
+ if instruction.Type == uint32(WRITE_METADATA) {
+ if writeMetadata := instruction.GetWriteMetadata(); writeMetadata != nil {
+ return writeMetadata.GetMetadata()
+ }
+ }
+ }
}
- if md <= 0xffffffff {
- log.Debugw("onos-upgrade-suggested", log.Fields{"Metadata_ofp": md, "message": "Legacy MetaData detected form OltPipeline"})
- return md
+ log.Debugw("No-write-metadata-present", log.Fields{"flow": flow})
+ return 0
+}
+
+func GetTechProfileIDFromWriteMetaData(metadata uint64) uint16 {
+ /*
+ Write metadata instruction value (metadata) is 8 bytes:
+ MS 2 bytes: C Tag
+ Next 2 bytes: Technology Profile Id
+ Next 4 bytes: Port number (uni or nni)
+
+ This is set in the ONOS OltPipeline as a write metadata instruction
+ */
+ var tpId uint16 = 0
+ log.Debugw("Write metadata value for Techprofile ID", log.Fields{"metadata": metadata})
+ if metadata != 0 {
+ tpId = uint16((metadata >> 32) & 0xFFFF)
+ log.Debugw("Found techprofile ID from write metadata action", log.Fields{"tpid": tpId})
}
- return md & 0xffffffff
+ return tpId
+}
+
+func GetEgressPortNumberFromWriteMetadata(flow *ofp.OfpFlowStats) uint32 {
+ /*
+ Write metadata instruction value (metadata) is 8 bytes:
+ MS 2 bytes: C Tag
+ Next 2 bytes: Technology Profile Id
+ Next 4 bytes: Port number (uni or nni)
+ This is set in the ONOS OltPipeline as a write metadata instruction
+ */
+ var uniPort uint32 = 0
+ md := GetMetadataFromWriteMetadataAction(flow)
+ log.Debugw("Metadata found for egress/uni port ", log.Fields{"metadata": md})
+ if md != 0 {
+ uniPort = uint32(md & 0xFFFFFFFF)
+ log.Debugw("Found EgressPort from write metadata action", log.Fields{"egress_port": uniPort})
+ }
+ return uniPort
+
+}
+
+func GetInnerTagFromMetaData(flow *ofp.OfpFlowStats) uint16 {
+ /*
+ Write metadata instruction value (metadata) is 8 bytes:
+ MS 2 bytes: C Tag
+ Next 2 bytes: Technology Profile Id
+ Next 4 bytes: Port number (uni or nni)
+ This is set in the ONOS OltPipeline as a write metadata instruction
+ */
+ var innerTag uint16 = 0
+ md := GetMetadataFromWriteMetadataAction(flow)
+ if md != 0 {
+ innerTag = uint16((md >> 48) & 0xFFFF)
+ log.Debugw("Found CVLAN from write metadate action", log.Fields{"c_vlan": innerTag})
+ }
+ return innerTag
}
//GetInnerTagFromMetaData retrieves the inner tag from the Metadata_ofp. The port number (UNI on ONU) is in the
// lower 32-bits of Metadata_ofp and the inner_tag is in the upper 32-bits. This is set in the ONOS OltPipeline as
//// a Metadata_ofp field
-func GetInnerTagFromMetaData(flow *ofp.OfpFlowStats) uint64 {
+/*func GetInnerTagFromMetaData(flow *ofp.OfpFlowStats) uint64 {
md := GetMetaData64Bit(flow)
if md == 0 {
return 0
@@ -521,7 +595,7 @@
return md
}
return (md >> 32) & 0xffffffff
-}
+}*/
// Extract the child device port from a flow that contains the parent device peer port. Typically the UNI port of an
// ONU child device. Per TST agreement this will be the lower 32 bits of tunnel id reserving upper 32 bits for later
@@ -571,6 +645,23 @@
return nil
}
+// GetMeterIdFlowModArgs returns the meterId if "meter_id" is present in the map, otherwise returns 0
+func GetMeterIdFlowModArgs(kw OfpFlowModArgs) uint32 {
+ if val, exist := kw["meter_id"]; exist {
+ return uint32(val)
+ }
+ return 0
+}
+
+// GetMetadataFlowModArgs returns the metadata if "write_metadata" is present in the map, otherwise returns 0
+func GetMetadataFlowModArgs(kw OfpFlowModArgs) uint64 {
+ if val, exist := kw["write_metadata"]; exist {
+ ret := uint64(val)
+ return ret
+ }
+ return 0
+}
+
// Return unique 64-bit integer hash for flow covering the following attributes:
// 'table_id', 'priority', 'flags', 'cookie', 'match', '_instruction_string'
func HashFlowStats(flow *ofp.OfpFlowStats) uint64 {
@@ -619,6 +710,53 @@
return group
}
+// MeterEntryFromMeterMod maps an ofp_meter_mod message to an ofp_meter_entry message
+func MeterEntryFromMeterMod(meterMod *ofp.OfpMeterMod) *ofp.OfpMeterEntry {
+ bandStats := make([]*ofp.OfpMeterBandStats, 0)
+ meter := &ofp.OfpMeterEntry{Config: &ofp.OfpMeterConfig{},
+ Stats: &ofp.OfpMeterStats{BandStats: bandStats}}
+ if meterMod == nil {
+ log.Error("Invalid meter mod command")
+ return meter
+ }
+ // config init
+ meter.Config.MeterId = meterMod.MeterId
+ meter.Config.Flags = meterMod.Flags
+ meter.Config.Bands = meterMod.Bands
+ // meter stats init
+ meter.Stats.MeterId = meterMod.MeterId
+ meter.Stats.FlowCount = 0
+ meter.Stats.PacketInCount = 0
+ meter.Stats.ByteInCount = 0
+ meter.Stats.DurationSec = 0
+ meter.Stats.DurationNsec = 0
+ // band stats init
+ for _, _ = range meterMod.Bands {
+ band := &ofp.OfpMeterBandStats{}
+ band.PacketBandCount = 0
+ band.ByteBandCount = 0
+ bandStats = append(bandStats, band)
+ }
+ meter.Stats.BandStats = bandStats
+ log.Debugw("Allocated meter entry", log.Fields{"meter": *meter})
+ return meter
+
+}
+
+func GetMeterIdFromFlow(flow *ofp.OfpFlowStats) uint32 {
+ if flow != nil {
+ for _, instruction := range flow.Instructions {
+ if instruction.Type == uint32(METER_ACTION) {
+ if meterInst := instruction.GetMeter(); meterInst != nil {
+ return meterInst.GetMeterId()
+ }
+ }
+ }
+ }
+
+ return uint32(0)
+}
+
func MkOxmFields(matchFields []ofp.OfpOxmField) []*ofp.OfpOxmField {
oxmFields := make([]*ofp.OfpOxmField, 0)
for _, matchField := range matchFields {
@@ -653,6 +791,20 @@
inst := ofp.OfpInstruction{Type: uint32(ofp.OfpInstructionType_OFPIT_GOTO_TABLE), Data: &instGotoTable}
instructions = append(instructions, &inst)
}
+ // Process meter action
+ if meterId := GetMeterIdFlowModArgs(kw); meterId != 0 {
+ var instMeter ofp.OfpInstruction_Meter
+ instMeter.Meter = &ofp.OfpInstructionMeter{MeterId: meterId}
+ inst := ofp.OfpInstruction{Type: uint32(METER_ACTION), Data: &instMeter}
+ instructions = append(instructions, &inst)
+ }
+ //process write_metadata action
+ if metadata := GetMetadataFlowModArgs(kw); metadata != 0 {
+ var instWriteMetadata ofp.OfpInstruction_WriteMetadata
+ instWriteMetadata.WriteMetadata = &ofp.OfpInstructionWriteMetadata{Metadata: metadata}
+ inst := ofp.OfpInstruction{Type: uint32(WRITE_METADATA), Data: &instWriteMetadata}
+ instructions = append(instructions, &inst)
+ }
// Process match fields
oxmFields := make([]*ofp.OfpOxmField, 0)
@@ -745,7 +897,7 @@
// MkFlowStat is a helper method to build flows
func MkFlowStat(fa *FlowArgs) *ofp.OfpFlowStats {
- //Build the matchfields
+ //Build the match-fields
matchFields := make([]*ofp.OfpOxmField, 0)
for _, val := range fa.MatchFields {
matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})