Make now runs lint and any unit tests.
Also update vendored voltha-go to add new api updates
Change-Id: I08e11ae043b1db46fed4cc64fddc890a6729dedf
diff --git a/.gitignore b/.gitignore
index 229b4bd..69d9456 100644
--- a/.gitignore
+++ b/.gitignore
@@ -65,3 +65,5 @@
# Development versions of locally built libraries
python/local_imports
+# test output
+tests/results
diff --git a/Gopkg.lock b/Gopkg.lock
index 5342cc2..a5d667f 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -216,7 +216,7 @@
[[projects]]
branch = "master"
- digest = "1:d2c1e6c77d74f59631dc3d14172386e602617d54d9bd87a97876049cf80090bb"
+ digest = "1:89203eb51605633cffa2a11f70f3547c2ac040fcf1b8e45e6a16124312caf7b4"
name = "github.com/opencord/voltha-go"
packages = [
"adapters",
@@ -233,7 +233,7 @@
"rw_core/utils",
]
pruneopts = "UT"
- revision = "142516e3824069a87b34b9255df09f4569213953"
+ revision = "3ab34888e669e50c0ff7e412eba61adaefff48ed"
[[projects]]
branch = "master"
diff --git a/Makefile b/Makefile
index f272c3c..4e350ee 100644
--- a/Makefile
+++ b/Makefile
@@ -93,4 +93,50 @@
endif
docker build $(DOCKER_BUILD_ARGS) -t ${REGISTRY}${REPOSITORY}voltha-openolt-adapter-go:${TAG} -f docker/Dockerfile.openolt .
+lint-style:
+ifeq (,$(shell which gofmt))
+ go get -u github.com/golang/go/src/cmd/gofmt
+endif
+ @echo "Running style check..."
+ @gofmt_out="$$(gofmt -l $$(find . -name '*.go' -not -path './vendor/*'))" ;\
+ if [ ! -z "$$gofmt_out" ]; then \
+ echo "$$gofmt_out" ;\
+ echo "Style check failed on one or more files ^, run 'go fmt' to fix." ;\
+ exit 1 ;\
+ fi
+ @echo "Style check OK"
+
+lint-sanity:
+ @echo "Running sanity check..."
+ @go vet ./...
+ @echo "Sanity check OK"
+
+lint-dep:
+ @echo "Running dependency check..."
+ @dep check
+ @echo "Dependency check OK"
+
+lint: lint-style lint-sanity lint-dep
+
+GO_JUNIT_REPORT:=$(shell which go-junit-report)
+GOCOVER_COBERTURA:=$(shell which gocover-cobertura)
+test:
+ifeq (,$(GO_JUNIT_REPORT))
+ go get -u github.com/jstemmer/go-junit-report
+ @GO_JUNIT_REPORT=$(GOPATH)/bin/go-junit-report
+endif
+
+ifeq (,$(GOCOVER_COBERTURA))
+ go get -u github.com/t-yuki/gocover-cobertura
+ @GOCOVER_COBERTURA=$(GOPATH)/bin/gocover-cobertura
+endif
+
+ @mkdir -p ./tests/results
+
+ @go test -v -coverprofile ./tests/results/go-test-coverage.out -covermode count ./... 2>&1 | tee ./tests/results/go-test-results.out ;\
+ RETURN=$$? ;\
+ $(GO_JUNIT_REPORT) < ./tests/results/go-test-results.out > ./tests/results/go-test-results.xml ;\
+ $(GOCOVER_COBERTURA) < ./tests/results/go-test-coverage.out > ./tests/results/go-test-coverage.xml ;\
+ exit $$RETURN
+
# end file
diff --git a/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
index 738a77a..5bbd176 100644
--- a/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-go/adapters/common/core_proxy.go
@@ -34,7 +34,6 @@
coreTopic string
deviceIdCoreMap map[string]string
lockDeviceIdCoreMap sync.RWMutex
-
}
func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
@@ -153,6 +152,32 @@
return unPackResponse(rpc, deviceId, success, result)
}
+func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_OperStatus) error {
+ log.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId})
+ rpc := "PortsStateUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ args := make([]*kafka.KVArg, 2)
+ id := &voltha.ID{Id: deviceId}
+ oStatus := &ic.IntType{Val: int64(operStatus)}
+
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "oper_status",
+ Value: oStatus,
+ }
+
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ log.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(rpc, deviceId, success, result)
+}
+
func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
@@ -185,7 +210,7 @@
}
func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
- childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64 ) error {
+ childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) error {
log.Debugw("ChildDeviceDetected", log.Fields{"pPeviceId": parentDeviceId, "channelId": channelId})
rpc := "ChildDeviceDetected"
// Use a device specific topic to send the request. The adapter handling the device creates a device
diff --git a/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go b/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
index d16ad95..8b582b8 100644
--- a/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
+++ b/vendor/github.com/opencord/voltha-go/adapters/common/request_handler.go
@@ -23,8 +23,8 @@
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/kafka"
ic "github.com/opencord/voltha-protos/go/inter_container"
+ "github.com/opencord/voltha-protos/go/openflow_13"
"github.com/opencord/voltha-protos/go/voltha"
- "github.com/opencord/voltha-protos/go/openflow_13"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -33,7 +33,7 @@
TestMode bool
coreInstanceId string
adapter adapters.IAdapter
- coreProxy *CoreProxy
+ coreProxy *CoreProxy
}
func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter, cProxy *CoreProxy) *RequestHandlerProxy {
@@ -107,10 +107,78 @@
}
func (rhp *RequestHandlerProxy) Disable_device(args []*ic.Argument) (*empty.Empty, error) {
+ if len(args) < 3 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{}
+ fromTopic := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.FromTopic:
+ if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+ log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ //Update the core reference for that device
+ rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+ //Invoke the Disable_device API on the adapter
+ if err := rhp.adapter.Disable_device(device); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
func (rhp *RequestHandlerProxy) Reenable_device(args []*ic.Argument) (*empty.Empty, error) {
+ if len(args) < 3 {
+ log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{}
+ fromTopic := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.FromTopic:
+ if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+ log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ //Update the core reference for that device
+ rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
+ //Invoke the Reenable_device API on the adapter
+ if err := rhp.adapter.Reenable_device(device); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
@@ -135,7 +203,7 @@
}
func (rhp *RequestHandlerProxy) Update_flows_incrementally(args []*ic.Argument) (*empty.Empty, error) {
- log.Debug("Update_flows_incrementally")
+ log.Debug("Update_flows_incrementally")
if len(args) < 3 {
log.Warn("Update_flows_incrementally-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
@@ -143,8 +211,8 @@
}
device := &voltha.Device{}
transactionID := &ic.StrType{}
- flows := &openflow_13.FlowChanges{}
- groups := &openflow_13.FlowGroupChanges{}
+ flows := &openflow_13.FlowChanges{}
+ groups := &openflow_13.FlowGroupChanges{}
for _, arg := range args {
switch arg.Key {
case "device":
@@ -169,11 +237,11 @@
}
}
}
- log.Debugw("Update_flows_incrementally",log.Fields{"flows":flows,"groups":groups})
- //Invoke the adopt device on the adapter
- if err := rhp.adapter.Update_flows_incrementally(device,flows,groups); err != nil {
- return nil, status.Errorf(codes.NotFound, "%s", err.Error())
- }
+ log.Debugw("Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
+ //Invoke the adopt device on the adapter
+ if err := rhp.adapter.Update_flows_incrementally(device, flows, groups); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
return new(empty.Empty), nil
}
diff --git a/vendor/github.com/opencord/voltha-go/adapters/common/utils.go b/vendor/github.com/opencord/voltha-go/adapters/common/utils.go
index 810a3d0..d3c562a 100644
--- a/vendor/github.com/opencord/voltha-go/adapters/common/utils.go
+++ b/vendor/github.com/opencord/voltha-go/adapters/common/utils.go
@@ -70,4 +70,4 @@
remain--
}
return string(b)
-}
\ No newline at end of file
+}
diff --git a/vendor/github.com/opencord/voltha-go/common/log/log.go b/vendor/github.com/opencord/voltha-go/common/log/log.go
index 16fed74..33100dc 100644
--- a/vendor/github.com/opencord/voltha-go/common/log/log.go
+++ b/vendor/github.com/opencord/voltha-go/common/log/log.go
@@ -179,7 +179,6 @@
return ErrorLevel
}
-
func getDefaultConfig(outputType string, level int, defaultFields Fields) zp.Config {
return zp.Config{
Level: intToAtomicLevel(level),
diff --git a/vendor/github.com/opencord/voltha-go/common/ponresourcemanager/ponresourcemanager.go b/vendor/github.com/opencord/voltha-go/common/ponresourcemanager/ponresourcemanager.go
index 2a3cae6..c37307b 100755
--- a/vendor/github.com/opencord/voltha-go/common/ponresourcemanager/ponresourcemanager.go
+++ b/vendor/github.com/opencord/voltha-go/common/ponresourcemanager/ponresourcemanager.go
@@ -17,184 +17,184 @@
package ponresourcemanager
import (
- "encoding/base64"
- "encoding/json"
- "errors"
- "fmt"
- "strconv"
+ "encoding/base64"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "strconv"
- bitmap "github.com/boljen/go-bitmap"
- "github.com/opencord/voltha-go/common/log"
- "github.com/opencord/voltha-go/db/kvstore"
- "github.com/opencord/voltha-go/db/model"
- tp "github.com/opencord/voltha-go/common/techprofile"
+ bitmap "github.com/boljen/go-bitmap"
+ "github.com/opencord/voltha-go/common/log"
+ tp "github.com/opencord/voltha-go/common/techprofile"
+ "github.com/opencord/voltha-go/db/kvstore"
+ "github.com/opencord/voltha-go/db/model"
)
const (
- //Constants to identify resource pool
- UNI_ID = "UNI_ID"
- ONU_ID = "ONU_ID"
- ALLOC_ID = "ALLOC_ID"
- GEMPORT_ID = "GEMPORT_ID"
- FLOW_ID = "FLOW_ID"
+ //Constants to identify resource pool
+ UNI_ID = "UNI_ID"
+ ONU_ID = "ONU_ID"
+ ALLOC_ID = "ALLOC_ID"
+ GEMPORT_ID = "GEMPORT_ID"
+ FLOW_ID = "FLOW_ID"
- //Constants for passing command line arugments
- OLT_MODEL_ARG = "--olt_model"
- PATH_PREFIX = "service/voltha/resource_manager/{%s}"
- /*The resource ranges for a given device model should be placed
- at 'resource_manager/<technology>/resource_ranges/<olt_model_type>'
- path on the KV store.
- If Resource Range parameters are to be read from the external KV store,
- they are expected to be stored in the following format.
- Note: All parameters are MANDATORY for now.
- constants used as keys to reference the resource range parameters from
- and external KV store.
- */
- UNI_ID_START_IDX = "uni_id_start"
- UNI_ID_END_IDX = "uni_id_end"
- ONU_ID_START_IDX = "onu_id_start"
- ONU_ID_END_IDX = "onu_id_end"
- ONU_ID_SHARED_IDX = "onu_id_shared"
- ALLOC_ID_START_IDX = "alloc_id_start"
- ALLOC_ID_END_IDX = "alloc_id_end"
- ALLOC_ID_SHARED_IDX = "alloc_id_shared"
- GEMPORT_ID_START_IDX = "gemport_id_start"
- GEMPORT_ID_END_IDX = "gemport_id_end"
- GEMPORT_ID_SHARED_IDX = "gemport_id_shared"
- FLOW_ID_START_IDX = "flow_id_start"
- FLOW_ID_END_IDX = "flow_id_end"
- FLOW_ID_SHARED_IDX = "flow_id_shared"
- NUM_OF_PON_PORT = "pon_ports"
+ //Constants for passing command line arugments
+ OLT_MODEL_ARG = "--olt_model"
+ PATH_PREFIX = "service/voltha/resource_manager/{%s}"
+ /*The resource ranges for a given device model should be placed
+ at 'resource_manager/<technology>/resource_ranges/<olt_model_type>'
+ path on the KV store.
+ If Resource Range parameters are to be read from the external KV store,
+ they are expected to be stored in the following format.
+ Note: All parameters are MANDATORY for now.
+ constants used as keys to reference the resource range parameters from
+ and external KV store.
+ */
+ UNI_ID_START_IDX = "uni_id_start"
+ UNI_ID_END_IDX = "uni_id_end"
+ ONU_ID_START_IDX = "onu_id_start"
+ ONU_ID_END_IDX = "onu_id_end"
+ ONU_ID_SHARED_IDX = "onu_id_shared"
+ ALLOC_ID_START_IDX = "alloc_id_start"
+ ALLOC_ID_END_IDX = "alloc_id_end"
+ ALLOC_ID_SHARED_IDX = "alloc_id_shared"
+ GEMPORT_ID_START_IDX = "gemport_id_start"
+ GEMPORT_ID_END_IDX = "gemport_id_end"
+ GEMPORT_ID_SHARED_IDX = "gemport_id_shared"
+ FLOW_ID_START_IDX = "flow_id_start"
+ FLOW_ID_END_IDX = "flow_id_end"
+ FLOW_ID_SHARED_IDX = "flow_id_shared"
+ NUM_OF_PON_PORT = "pon_ports"
- /*
- The KV store backend is initialized with a path prefix and we need to
- provide only the suffix.
- */
- PON_RESOURCE_RANGE_CONFIG_PATH = "resource_ranges/%s"
+ /*
+ The KV store backend is initialized with a path prefix and we need to
+ provide only the suffix.
+ */
+ PON_RESOURCE_RANGE_CONFIG_PATH = "resource_ranges/%s"
- //resource path suffix
- //Path on the KV store for storing alloc id ranges and resource pool for a given interface
- //Format: <device_id>/alloc_id_pool/<pon_intf_id>
- ALLOC_ID_POOL_PATH = "{%s}/alloc_id_pool/{%d}"
- //Path on the KV store for storing gemport id ranges and resource pool for a given interface
- //Format: <device_id>/gemport_id_pool/<pon_intf_id>
- GEMPORT_ID_POOL_PATH = "{%s}/gemport_id_pool/{%d}"
- //Path on the KV store for storing onu id ranges and resource pool for a given interface
- //Format: <device_id>/onu_id_pool/<pon_intf_id>
- ONU_ID_POOL_PATH = "{%s}/onu_id_pool/{%d}"
- //Path on the KV store for storing flow id ranges and resource pool for a given interface
- //Format: <device_id>/flow_id_pool/<pon_intf_id>
- FLOW_ID_POOL_PATH = "{%s}/flow_id_pool/{%d}"
+ //resource path suffix
+ //Path on the KV store for storing alloc id ranges and resource pool for a given interface
+ //Format: <device_id>/alloc_id_pool/<pon_intf_id>
+ ALLOC_ID_POOL_PATH = "{%s}/alloc_id_pool/{%d}"
+ //Path on the KV store for storing gemport id ranges and resource pool for a given interface
+ //Format: <device_id>/gemport_id_pool/<pon_intf_id>
+ GEMPORT_ID_POOL_PATH = "{%s}/gemport_id_pool/{%d}"
+ //Path on the KV store for storing onu id ranges and resource pool for a given interface
+ //Format: <device_id>/onu_id_pool/<pon_intf_id>
+ ONU_ID_POOL_PATH = "{%s}/onu_id_pool/{%d}"
+ //Path on the KV store for storing flow id ranges and resource pool for a given interface
+ //Format: <device_id>/flow_id_pool/<pon_intf_id>
+ FLOW_ID_POOL_PATH = "{%s}/flow_id_pool/{%d}"
- //Path on the KV store for storing list of alloc IDs for a given ONU
- //Format: <device_id>/<(pon_intf_id, onu_id)>/alloc_ids
- ALLOC_ID_RESOURCE_MAP_PATH = "{%s}/{%s}/alloc_ids"
+ //Path on the KV store for storing list of alloc IDs for a given ONU
+ //Format: <device_id>/<(pon_intf_id, onu_id)>/alloc_ids
+ ALLOC_ID_RESOURCE_MAP_PATH = "{%s}/{%s}/alloc_ids"
- //Path on the KV store for storing list of gemport IDs for a given ONU
- //Format: <device_id>/<(pon_intf_id, onu_id)>/gemport_ids
- GEMPORT_ID_RESOURCE_MAP_PATH = "{%s}/{%s}/gemport_ids"
+ //Path on the KV store for storing list of gemport IDs for a given ONU
+ //Format: <device_id>/<(pon_intf_id, onu_id)>/gemport_ids
+ GEMPORT_ID_RESOURCE_MAP_PATH = "{%s}/{%s}/gemport_ids"
- //Path on the KV store for storing list of Flow IDs for a given ONU
- //Format: <device_id>/<(pon_intf_id, onu_id)>/flow_ids
- FLOW_ID_RESOURCE_MAP_PATH = "{%s}/{%s}/flow_ids"
+ //Path on the KV store for storing list of Flow IDs for a given ONU
+ //Format: <device_id>/<(pon_intf_id, onu_id)>/flow_ids
+ FLOW_ID_RESOURCE_MAP_PATH = "{%s}/{%s}/flow_ids"
- //Flow Id info: Use to store more metadata associated with the flow_id
- //Format: <device_id>/<(pon_intf_id, onu_id)>/flow_id_info/<flow_id>
- FLOW_ID_INFO_PATH = "{%s}/{%s}/flow_id_info/{%d}"
+ //Flow Id info: Use to store more metadata associated with the flow_id
+ //Format: <device_id>/<(pon_intf_id, onu_id)>/flow_id_info/<flow_id>
+ FLOW_ID_INFO_PATH = "{%s}/{%s}/flow_id_info/{%d}"
- //Constants for internal usage.
- PON_INTF_ID = "pon_intf_id"
- START_IDX = "start_idx"
- END_IDX = "end_idx"
- POOL = "pool"
- NUM_OF_PON_INTF = 16
+ //Constants for internal usage.
+ PON_INTF_ID = "pon_intf_id"
+ START_IDX = "start_idx"
+ END_IDX = "end_idx"
+ POOL = "pool"
+ NUM_OF_PON_INTF = 16
- KVSTORE_RETRY_TIMEOUT = 5
+ KVSTORE_RETRY_TIMEOUT = 5
)
//type ResourceTypeIndex string
//type ResourceType string
type PONResourceManager struct {
- //Implements APIs to initialize/allocate/release alloc/gemport/onu IDs.
- Technology string
- DeviceType string
- DeviceID string
- Backend string // ETCD, or consul
- Host string // host ip of the KV store
- Port int // port number for the KV store
- OLTModel string
- KVStore *model.Backend
- TechProfileMgr *tp.TechProfileMgr
+ //Implements APIs to initialize/allocate/release alloc/gemport/onu IDs.
+ Technology string
+ DeviceType string
+ DeviceID string
+ Backend string // ETCD, or consul
+ Host string // host ip of the KV store
+ Port int // port number for the KV store
+ OLTModel string
+ KVStore *model.Backend
+ TechProfileMgr *tp.TechProfileMgr
- // Below attribute, pon_resource_ranges, should be initialized
- // by reading from KV store.
- PonResourceRanges map[string]interface{}
- SharedResourceMgrs map[string]*PONResourceManager
- SharedIdxByType map[string]string
- IntfIDs []uint32 // list of pon interface IDs
+ // Below attribute, pon_resource_ranges, should be initialized
+ // by reading from KV store.
+ PonResourceRanges map[string]interface{}
+ SharedResourceMgrs map[string]*PONResourceManager
+ SharedIdxByType map[string]string
+ IntfIDs []uint32 // list of pon interface IDs
}
func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
- log.Infow("kv-store-type", log.Fields{"store": storeType})
- switch storeType {
- case "consul":
- return kvstore.NewConsulClient(address, timeout)
- case "etcd":
- return kvstore.NewEtcdClient(address, timeout)
- }
- return nil, errors.New("unsupported-kv-store")
+ log.Infow("kv-store-type", log.Fields{"store": storeType})
+ switch storeType {
+ case "consul":
+ return kvstore.NewConsulClient(address, timeout)
+ case "etcd":
+ return kvstore.NewEtcdClient(address, timeout)
+ }
+ return nil, errors.New("unsupported-kv-store")
}
func SetKVClient(Technology string, Backend string, Host string, Port int) *model.Backend {
- addr := Host + ":" + strconv.Itoa(Port)
- // TODO : Make sure direct call to NewBackend is working fine with backend , currently there is some
- // issue between kv store and backend , core is not calling NewBackend directly
- kvClient, err := newKVClient(Backend, addr, KVSTORE_RETRY_TIMEOUT)
- if err != nil {
- log.Fatalw("Failed to init KV client\n", log.Fields{"err": err})
- return nil
- }
- kvbackend := &model.Backend{
- Client: kvClient,
- StoreType: Backend,
- Host: Host,
- Port: Port,
- Timeout: KVSTORE_RETRY_TIMEOUT,
- PathPrefix: fmt.Sprintf(PATH_PREFIX, Technology)}
+ addr := Host + ":" + strconv.Itoa(Port)
+ // TODO : Make sure direct call to NewBackend is working fine with backend , currently there is some
+ // issue between kv store and backend , core is not calling NewBackend directly
+ kvClient, err := newKVClient(Backend, addr, KVSTORE_RETRY_TIMEOUT)
+ if err != nil {
+ log.Fatalw("Failed to init KV client\n", log.Fields{"err": err})
+ return nil
+ }
+ kvbackend := &model.Backend{
+ Client: kvClient,
+ StoreType: Backend,
+ Host: Host,
+ Port: Port,
+ Timeout: KVSTORE_RETRY_TIMEOUT,
+ PathPrefix: fmt.Sprintf(PATH_PREFIX, Technology)}
- return kvbackend
+ return kvbackend
}
// NewPONResourceManager creates a new PON resource manager.
func NewPONResourceManager(Technology string, DeviceType string, DeviceID string, Backend string, Host string, Port int) (*PONResourceManager, error) {
- var PONMgr PONResourceManager
- PONMgr.Technology = Technology
- PONMgr.DeviceType = DeviceType
- PONMgr.DeviceID = DeviceID
- PONMgr.Backend = Backend
- PONMgr.Host = Host
- PONMgr.Port = Port
- PONMgr.KVStore = SetKVClient(Technology, Backend, Host, Port)
- if PONMgr.KVStore == nil {
- log.Error("KV Client initilization failed")
- return nil, errors.New("Failed to init KV client")
- }
- // Initialize techprofile for this technology
- if PONMgr.TechProfileMgr,_ = tp.NewTechProfile(&PONMgr);PONMgr.TechProfileMgr == nil{
- log.Error("Techprofile initialization failed")
- return nil,errors.New("Failed to init tech profile")
- }
- PONMgr.PonResourceRanges = make(map[string]interface{})
- PONMgr.SharedResourceMgrs = make(map[string]*PONResourceManager)
- PONMgr.SharedIdxByType = make(map[string]string)
- PONMgr.SharedIdxByType[ONU_ID] = ONU_ID_SHARED_IDX
- PONMgr.SharedIdxByType[ALLOC_ID] = ALLOC_ID_SHARED_IDX
- PONMgr.SharedIdxByType[GEMPORT_ID] = GEMPORT_ID_SHARED_IDX
- PONMgr.SharedIdxByType[FLOW_ID] = FLOW_ID_SHARED_IDX
- PONMgr.IntfIDs = make([]uint32, NUM_OF_PON_INTF)
- PONMgr.OLTModel = DeviceType
- return &PONMgr, nil
+ var PONMgr PONResourceManager
+ PONMgr.Technology = Technology
+ PONMgr.DeviceType = DeviceType
+ PONMgr.DeviceID = DeviceID
+ PONMgr.Backend = Backend
+ PONMgr.Host = Host
+ PONMgr.Port = Port
+ PONMgr.KVStore = SetKVClient(Technology, Backend, Host, Port)
+ if PONMgr.KVStore == nil {
+ log.Error("KV Client initilization failed")
+ return nil, errors.New("Failed to init KV client")
+ }
+ // Initialize techprofile for this technology
+ if PONMgr.TechProfileMgr, _ = tp.NewTechProfile(&PONMgr); PONMgr.TechProfileMgr == nil {
+ log.Error("Techprofile initialization failed")
+ return nil, errors.New("Failed to init tech profile")
+ }
+ PONMgr.PonResourceRanges = make(map[string]interface{})
+ PONMgr.SharedResourceMgrs = make(map[string]*PONResourceManager)
+ PONMgr.SharedIdxByType = make(map[string]string)
+ PONMgr.SharedIdxByType[ONU_ID] = ONU_ID_SHARED_IDX
+ PONMgr.SharedIdxByType[ALLOC_ID] = ALLOC_ID_SHARED_IDX
+ PONMgr.SharedIdxByType[GEMPORT_ID] = GEMPORT_ID_SHARED_IDX
+ PONMgr.SharedIdxByType[FLOW_ID] = FLOW_ID_SHARED_IDX
+ PONMgr.IntfIDs = make([]uint32, NUM_OF_PON_INTF)
+ PONMgr.OLTModel = DeviceType
+ return &PONMgr, nil
}
/*
@@ -205,909 +205,907 @@
*/
func (PONRMgr *PONResourceManager) InitResourceRangesFromKVStore() bool {
- //Initialize PON resource ranges with config fetched from kv store.
- //:return boolean: True if PON resource ranges initialized else false
- // Try to initialize the PON Resource Ranges from KV store based on the
- // OLT model key, if available
- if PONRMgr.OLTModel == "" {
- log.Error("Failed to get OLT model")
- return false
- }
- Path := fmt.Sprintf(PON_RESOURCE_RANGE_CONFIG_PATH, PONRMgr.OLTModel)
- //get resource from kv store
- Result, err := PONRMgr.KVStore.Get(Path)
- if err != nil {
- log.Debugf("Error in fetching resource %s from KV strore", Path)
- return false
- }
- if Result == nil {
- log.Debug("There may be no resources in the KV store in case of fresh bootup, return true")
- return false
- }
- //update internal ranges from kv ranges. If there are missing
- // values in the KV profile, continue to use the defaults
- Value, err := ToByte(Result.Value)
- if err != nil {
- log.Error("Failed to convert kvpair to byte string")
- return false
- }
- if err := json.Unmarshal(Value, &PONRMgr.PonResourceRanges); err != nil {
- log.Error("Failed to Unmarshal json byte")
- return false
- }
- log.Debug("Init resource ranges from kvstore success")
- return true
+ //Initialize PON resource ranges with config fetched from kv store.
+ //:return boolean: True if PON resource ranges initialized else false
+ // Try to initialize the PON Resource Ranges from KV store based on the
+ // OLT model key, if available
+ if PONRMgr.OLTModel == "" {
+ log.Error("Failed to get OLT model")
+ return false
+ }
+ Path := fmt.Sprintf(PON_RESOURCE_RANGE_CONFIG_PATH, PONRMgr.OLTModel)
+ //get resource from kv store
+ Result, err := PONRMgr.KVStore.Get(Path)
+ if err != nil {
+ log.Debugf("Error in fetching resource %s from KV strore", Path)
+ return false
+ }
+ if Result == nil {
+ log.Debug("There may be no resources in the KV store in case of fresh bootup, return true")
+ return false
+ }
+ //update internal ranges from kv ranges. If there are missing
+ // values in the KV profile, continue to use the defaults
+ Value, err := ToByte(Result.Value)
+ if err != nil {
+ log.Error("Failed to convert kvpair to byte string")
+ return false
+ }
+ if err := json.Unmarshal(Value, &PONRMgr.PonResourceRanges); err != nil {
+ log.Error("Failed to Unmarshal json byte")
+ return false
+ }
+ log.Debug("Init resource ranges from kvstore success")
+ return true
}
func (PONRMgr *PONResourceManager) UpdateRanges(StartIDx string, StartID uint32, EndIDx string, EndID uint32,
- SharedIDx string, SharedPoolID uint32, RMgr *PONResourceManager) {
- /*
- Update the ranges for all reosurce type in the intermnal maps
- param: resource type start index
- param: start ID
- param: resource type end index
- param: end ID
- param: resource type shared index
- param: shared pool id
- param: global resource manager
- */
- log.Debugf("update ranges for %s, %d", StartIDx, StartID)
+ SharedIDx string, SharedPoolID uint32, RMgr *PONResourceManager) {
+ /*
+ Update the ranges for all reosurce type in the intermnal maps
+ param: resource type start index
+ param: start ID
+ param: resource type end index
+ param: end ID
+ param: resource type shared index
+ param: shared pool id
+ param: global resource manager
+ */
+ log.Debugf("update ranges for %s, %d", StartIDx, StartID)
- if StartID != 0 {
- PONRMgr.PonResourceRanges[StartIDx] = StartID
- }
- if EndID != 0 {
- PONRMgr.PonResourceRanges[EndIDx] = EndID
- }
- //if SharedPoolID != 0 {
- PONRMgr.PonResourceRanges[SharedIDx] = SharedPoolID
- //}
- if RMgr != nil {
- PONRMgr.SharedResourceMgrs[SharedIDx] = RMgr
- }
+ if StartID != 0 {
+ PONRMgr.PonResourceRanges[StartIDx] = StartID
+ }
+ if EndID != 0 {
+ PONRMgr.PonResourceRanges[EndIDx] = EndID
+ }
+ //if SharedPoolID != 0 {
+ PONRMgr.PonResourceRanges[SharedIDx] = SharedPoolID
+ //}
+ if RMgr != nil {
+ PONRMgr.SharedResourceMgrs[SharedIDx] = RMgr
+ }
}
func (PONRMgr *PONResourceManager) InitDefaultPONResourceRanges(ONUIDStart uint32,
- ONUIDEnd uint32,
- ONUIDSharedPoolID uint32,
- AllocIDStart uint32,
- AllocIDEnd uint32,
- AllocIDSharedPoolID uint32,
- GEMPortIDStart uint32,
- GEMPortIDEnd uint32,
- GEMPortIDSharedPoolID uint32,
- FlowIDStart uint32,
- FlowIDEnd uint32,
- FlowIDSharedPoolID uint32,
- UNIIDStart uint32,
- UNIIDEnd uint32,
- NoOfPONPorts uint32,
- IntfIDs []uint32) bool {
+ ONUIDEnd uint32,
+ ONUIDSharedPoolID uint32,
+ AllocIDStart uint32,
+ AllocIDEnd uint32,
+ AllocIDSharedPoolID uint32,
+ GEMPortIDStart uint32,
+ GEMPortIDEnd uint32,
+ GEMPortIDSharedPoolID uint32,
+ FlowIDStart uint32,
+ FlowIDEnd uint32,
+ FlowIDSharedPoolID uint32,
+ UNIIDStart uint32,
+ UNIIDEnd uint32,
+ NoOfPONPorts uint32,
+ IntfIDs []uint32) bool {
- /*Initialize default PON resource ranges
+ /*Initialize default PON resource ranges
- :param onu_id_start_idx: onu id start index
- :param onu_id_end_idx: onu id end index
- :param onu_id_shared_pool_id: pool idx for id shared by all intfs or None for no sharing
- :param alloc_id_start_idx: alloc id start index
- :param alloc_id_end_idx: alloc id end index
- :param alloc_id_shared_pool_id: pool idx for alloc id shared by all intfs or None for no sharing
- :param gemport_id_start_idx: gemport id start index
- :param gemport_id_end_idx: gemport id end index
- :param gemport_id_shared_pool_id: pool idx for gemport id shared by all intfs or None for no sharing
- :param flow_id_start_idx: flow id start index
- :param flow_id_end_idx: flow id end index
- :param flow_id_shared_pool_id: pool idx for flow id shared by all intfs or None for no sharing
- :param num_of_pon_ports: number of PON ports
- :param intf_ids: interfaces serviced by this manager
- */
- PONRMgr.UpdateRanges(ONU_ID_START_IDX, ONUIDStart, ONU_ID_END_IDX, ONUIDEnd, ONU_ID_SHARED_IDX, ONUIDSharedPoolID, nil)
- PONRMgr.UpdateRanges(ALLOC_ID_START_IDX, AllocIDStart, ALLOC_ID_END_IDX, AllocIDEnd, ALLOC_ID_SHARED_IDX, AllocIDSharedPoolID, nil)
- PONRMgr.UpdateRanges(GEMPORT_ID_START_IDX, GEMPortIDStart, GEMPORT_ID_END_IDX, GEMPortIDEnd, GEMPORT_ID_SHARED_IDX, GEMPortIDSharedPoolID, nil)
- PONRMgr.UpdateRanges(FLOW_ID_START_IDX, FlowIDStart, FLOW_ID_END_IDX, FlowIDEnd, FLOW_ID_SHARED_IDX, FlowIDSharedPoolID, nil)
- PONRMgr.UpdateRanges(UNI_ID_START_IDX, UNIIDStart, UNI_ID_END_IDX, UNIIDEnd, "", 0, nil)
- log.Debug("Initialize default range values")
- var i uint32
- if IntfIDs == nil {
- for i = 0; i < NoOfPONPorts; i++ {
- PONRMgr.IntfIDs = append(PONRMgr.IntfIDs, i)
- }
- } else {
- PONRMgr.IntfIDs = IntfIDs
- }
- return true
+ :param onu_id_start_idx: onu id start index
+ :param onu_id_end_idx: onu id end index
+ :param onu_id_shared_pool_id: pool idx for id shared by all intfs or None for no sharing
+ :param alloc_id_start_idx: alloc id start index
+ :param alloc_id_end_idx: alloc id end index
+ :param alloc_id_shared_pool_id: pool idx for alloc id shared by all intfs or None for no sharing
+ :param gemport_id_start_idx: gemport id start index
+ :param gemport_id_end_idx: gemport id end index
+ :param gemport_id_shared_pool_id: pool idx for gemport id shared by all intfs or None for no sharing
+ :param flow_id_start_idx: flow id start index
+ :param flow_id_end_idx: flow id end index
+ :param flow_id_shared_pool_id: pool idx for flow id shared by all intfs or None for no sharing
+ :param num_of_pon_ports: number of PON ports
+ :param intf_ids: interfaces serviced by this manager
+ */
+ PONRMgr.UpdateRanges(ONU_ID_START_IDX, ONUIDStart, ONU_ID_END_IDX, ONUIDEnd, ONU_ID_SHARED_IDX, ONUIDSharedPoolID, nil)
+ PONRMgr.UpdateRanges(ALLOC_ID_START_IDX, AllocIDStart, ALLOC_ID_END_IDX, AllocIDEnd, ALLOC_ID_SHARED_IDX, AllocIDSharedPoolID, nil)
+ PONRMgr.UpdateRanges(GEMPORT_ID_START_IDX, GEMPortIDStart, GEMPORT_ID_END_IDX, GEMPortIDEnd, GEMPORT_ID_SHARED_IDX, GEMPortIDSharedPoolID, nil)
+ PONRMgr.UpdateRanges(FLOW_ID_START_IDX, FlowIDStart, FLOW_ID_END_IDX, FlowIDEnd, FLOW_ID_SHARED_IDX, FlowIDSharedPoolID, nil)
+ PONRMgr.UpdateRanges(UNI_ID_START_IDX, UNIIDStart, UNI_ID_END_IDX, UNIIDEnd, "", 0, nil)
+ log.Debug("Initialize default range values")
+ var i uint32
+ if IntfIDs == nil {
+ for i = 0; i < NoOfPONPorts; i++ {
+ PONRMgr.IntfIDs = append(PONRMgr.IntfIDs, i)
+ }
+ } else {
+ PONRMgr.IntfIDs = IntfIDs
+ }
+ return true
}
func (PONRMgr *PONResourceManager) InitDeviceResourcePool() error {
- //Initialize resource pool for all PON ports.
+ //Initialize resource pool for all PON ports.
- log.Debug("Init resource ranges")
+ log.Debug("Init resource ranges")
- var err error
- for _, Intf := range PONRMgr.IntfIDs {
- SharedPoolID := PONRMgr.PonResourceRanges[ONU_ID_SHARED_IDX].(uint32)
- if SharedPoolID != 0 {
- Intf = SharedPoolID
- }
- if err = PONRMgr.InitResourceIDPool(Intf, ONU_ID,
- PONRMgr.PonResourceRanges[ONU_ID_START_IDX].(uint32),
- PONRMgr.PonResourceRanges[ONU_ID_END_IDX].(uint32)); err != nil {
- log.Error("Failed to init ONU ID resource pool")
- return err
- }
- if SharedPoolID != 0 {
- break
- }
- }
+ var err error
+ for _, Intf := range PONRMgr.IntfIDs {
+ SharedPoolID := PONRMgr.PonResourceRanges[ONU_ID_SHARED_IDX].(uint32)
+ if SharedPoolID != 0 {
+ Intf = SharedPoolID
+ }
+ if err = PONRMgr.InitResourceIDPool(Intf, ONU_ID,
+ PONRMgr.PonResourceRanges[ONU_ID_START_IDX].(uint32),
+ PONRMgr.PonResourceRanges[ONU_ID_END_IDX].(uint32)); err != nil {
+ log.Error("Failed to init ONU ID resource pool")
+ return err
+ }
+ if SharedPoolID != 0 {
+ break
+ }
+ }
- for _, Intf := range PONRMgr.IntfIDs {
- SharedPoolID := PONRMgr.PonResourceRanges[ALLOC_ID_SHARED_IDX].(uint32)
- if SharedPoolID != 0 {
- Intf = SharedPoolID
- }
- if err = PONRMgr.InitResourceIDPool(Intf, ALLOC_ID,
- PONRMgr.PonResourceRanges[ALLOC_ID_START_IDX].(uint32),
- PONRMgr.PonResourceRanges[ALLOC_ID_END_IDX].(uint32)); err != nil {
- log.Error("Failed to init ALLOC ID resource pool ")
- return err
- }
- if SharedPoolID != 0 {
- break
- }
- }
- for _, Intf := range PONRMgr.IntfIDs {
- SharedPoolID := PONRMgr.PonResourceRanges[GEMPORT_ID_SHARED_IDX].(uint32)
- if SharedPoolID != 0 {
- Intf = SharedPoolID
- }
- if err = PONRMgr.InitResourceIDPool(Intf, GEMPORT_ID,
- PONRMgr.PonResourceRanges[GEMPORT_ID_START_IDX].(uint32),
- PONRMgr.PonResourceRanges[GEMPORT_ID_END_IDX].(uint32)); err != nil {
- log.Error("Failed to init GEMPORT ID resource pool")
- return err
- }
- if SharedPoolID != 0 {
- break
- }
- }
+ for _, Intf := range PONRMgr.IntfIDs {
+ SharedPoolID := PONRMgr.PonResourceRanges[ALLOC_ID_SHARED_IDX].(uint32)
+ if SharedPoolID != 0 {
+ Intf = SharedPoolID
+ }
+ if err = PONRMgr.InitResourceIDPool(Intf, ALLOC_ID,
+ PONRMgr.PonResourceRanges[ALLOC_ID_START_IDX].(uint32),
+ PONRMgr.PonResourceRanges[ALLOC_ID_END_IDX].(uint32)); err != nil {
+ log.Error("Failed to init ALLOC ID resource pool ")
+ return err
+ }
+ if SharedPoolID != 0 {
+ break
+ }
+ }
+ for _, Intf := range PONRMgr.IntfIDs {
+ SharedPoolID := PONRMgr.PonResourceRanges[GEMPORT_ID_SHARED_IDX].(uint32)
+ if SharedPoolID != 0 {
+ Intf = SharedPoolID
+ }
+ if err = PONRMgr.InitResourceIDPool(Intf, GEMPORT_ID,
+ PONRMgr.PonResourceRanges[GEMPORT_ID_START_IDX].(uint32),
+ PONRMgr.PonResourceRanges[GEMPORT_ID_END_IDX].(uint32)); err != nil {
+ log.Error("Failed to init GEMPORT ID resource pool")
+ return err
+ }
+ if SharedPoolID != 0 {
+ break
+ }
+ }
- for _, Intf := range PONRMgr.IntfIDs {
- SharedPoolID := PONRMgr.PonResourceRanges[FLOW_ID_SHARED_IDX].(uint32)
- if SharedPoolID != 0 {
- Intf = SharedPoolID
- }
- if err = PONRMgr.InitResourceIDPool(Intf, FLOW_ID,
- PONRMgr.PonResourceRanges[FLOW_ID_START_IDX].(uint32),
- PONRMgr.PonResourceRanges[FLOW_ID_END_IDX].(uint32)); err != nil {
- log.Error("Failed to init FLOW ID resource pool")
- return err
- }
- if SharedPoolID != 0 {
- break
- }
- }
- return err
+ for _, Intf := range PONRMgr.IntfIDs {
+ SharedPoolID := PONRMgr.PonResourceRanges[FLOW_ID_SHARED_IDX].(uint32)
+ if SharedPoolID != 0 {
+ Intf = SharedPoolID
+ }
+ if err = PONRMgr.InitResourceIDPool(Intf, FLOW_ID,
+ PONRMgr.PonResourceRanges[FLOW_ID_START_IDX].(uint32),
+ PONRMgr.PonResourceRanges[FLOW_ID_END_IDX].(uint32)); err != nil {
+ log.Error("Failed to init FLOW ID resource pool")
+ return err
+ }
+ if SharedPoolID != 0 {
+ break
+ }
+ }
+ return err
}
func (PONRMgr *PONResourceManager) InitResourceIDPool(Intf uint32, ResourceType string, StartID uint32, EndID uint32) error {
- /*Initialize Resource ID pool for a given Resource Type on a given PON Port
+ /*Initialize Resource ID pool for a given Resource Type on a given PON Port
- :param pon_intf_id: OLT PON interface id
- :param resource_type: String to identify type of resource
- :param start_idx: start index for onu id pool
- :param end_idx: end index for onu id pool
- :return boolean: True if resource id pool initialized else false
- */
+ :param pon_intf_id: OLT PON interface id
+ :param resource_type: String to identify type of resource
+ :param start_idx: start index for onu id pool
+ :param end_idx: end index for onu id pool
+ :return boolean: True if resource id pool initialized else false
+ */
- // delegate to the master instance if sharing enabled across instances
- SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
- if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
- return SharedResourceMgr.InitResourceIDPool(Intf, ResourceType, StartID, EndID)
- }
+ // delegate to the master instance if sharing enabled across instances
+ SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
+ if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
+ return SharedResourceMgr.InitResourceIDPool(Intf, ResourceType, StartID, EndID)
+ }
- Path := PONRMgr.GetPath(Intf, ResourceType)
- if Path == "" {
- log.Errorf("Failed to get path for resource type %s", ResourceType)
- return errors.New(fmt.Sprintf("Failed to get path for resource type %s", ResourceType))
- }
+ Path := PONRMgr.GetPath(Intf, ResourceType)
+ if Path == "" {
+ log.Errorf("Failed to get path for resource type %s", ResourceType)
+ return errors.New(fmt.Sprintf("Failed to get path for resource type %s", ResourceType))
+ }
- //In case of adapter reboot and reconciliation resource in kv store
- //checked for its presence if not kv store update happens
- Res, err := PONRMgr.GetResource(Path)
- if (err == nil) && (Res != nil) {
- log.Debugf("Resource %s already present in store ", Path)
- return nil
- } else {
- FormatResult, err := PONRMgr.FormatResource(Intf, StartID, EndID)
- if err != nil {
- log.Errorf("Failed to format resource")
- return err
- }
- // Add resource as json in kv store.
- err = PONRMgr.KVStore.Put(Path, FormatResult)
- if err == nil {
- log.Debug("Successfuly posted to kv store")
- return err
- }
- }
+ //In case of adapter reboot and reconciliation resource in kv store
+ //checked for its presence if not kv store update happens
+ Res, err := PONRMgr.GetResource(Path)
+ if (err == nil) && (Res != nil) {
+ log.Debugf("Resource %s already present in store ", Path)
+ return nil
+ } else {
+ FormatResult, err := PONRMgr.FormatResource(Intf, StartID, EndID)
+ if err != nil {
+ log.Errorf("Failed to format resource")
+ return err
+ }
+ // Add resource as json in kv store.
+ err = PONRMgr.KVStore.Put(Path, FormatResult)
+ if err == nil {
+ log.Debug("Successfuly posted to kv store")
+ return err
+ }
+ }
- log.Debug("Error initializing pool")
+ log.Debug("Error initializing pool")
- return err
+ return err
}
func (PONRMgr *PONResourceManager) FormatResource(IntfID uint32, StartIDx uint32, EndIDx uint32) ([]byte, error) {
- /*
- Format resource as json.
- :param pon_intf_id: OLT PON interface id
- :param start_idx: start index for id pool
- :param end_idx: end index for id pool
- :return dictionary: resource formatted as map
- */
- // Format resource as json to be stored in backend store
- Resource := make(map[string]interface{})
- Resource[PON_INTF_ID] = IntfID
- Resource[START_IDX] = StartIDx
- Resource[END_IDX] = EndIDx
- /*
- Resource pool stored in backend store as binary string.
- Tracking the resource allocation will be done by setting the bits \
- in the byte array. The index set will be the resource number allocated.
- */
- var TSData *bitmap.Threadsafe
- if TSData = bitmap.NewTS(int(EndIDx)); TSData == nil {
- log.Error("Failed to create a bitmap")
- return nil, errors.New("Failed to create bitmap")
- }
- Resource[POOL] = TSData.Data(false) //we pass false so as the TSData lib api does not do a copy of the data and return
+ /*
+ Format resource as json.
+ :param pon_intf_id: OLT PON interface id
+ :param start_idx: start index for id pool
+ :param end_idx: end index for id pool
+ :return dictionary: resource formatted as map
+ */
+ // Format resource as json to be stored in backend store
+ Resource := make(map[string]interface{})
+ Resource[PON_INTF_ID] = IntfID
+ Resource[START_IDX] = StartIDx
+ Resource[END_IDX] = EndIDx
+ /*
+ Resource pool stored in backend store as binary string.
+ Tracking the resource allocation will be done by setting the bits \
+ in the byte array. The index set will be the resource number allocated.
+ */
+ var TSData *bitmap.Threadsafe
+ if TSData = bitmap.NewTS(int(EndIDx)); TSData == nil {
+ log.Error("Failed to create a bitmap")
+ return nil, errors.New("Failed to create bitmap")
+ }
+ Resource[POOL] = TSData.Data(false) //we pass false so as the TSData lib api does not do a copy of the data and return
- Value, err := json.Marshal(Resource)
- if err != nil {
- log.Errorf("Failed to marshall resource")
- return nil, err
- }
- return Value, err
+ Value, err := json.Marshal(Resource)
+ if err != nil {
+ log.Errorf("Failed to marshall resource")
+ return nil, err
+ }
+ return Value, err
}
func (PONRMgr *PONResourceManager) GetResource(Path string) (map[string]interface{}, error) {
- /*
- Get resource from kv store.
+ /*
+ Get resource from kv store.
- :param path: path to get resource
- :return: resource if resource present in kv store else None
- */
- //get resource from kv store
+ :param path: path to get resource
+ :return: resource if resource present in kv store else None
+ */
+ //get resource from kv store
- var Value []byte
- Result := make(map[string]interface{})
- var Str string
+ var Value []byte
+ Result := make(map[string]interface{})
+ var Str string
- Resource, err := PONRMgr.KVStore.Get(Path)
- if (err != nil) || (Resource == nil) {
- log.Debugf("Resource unavailable at %s", Path)
- return nil, err
- }
+ Resource, err := PONRMgr.KVStore.Get(Path)
+ if (err != nil) || (Resource == nil) {
+ log.Debugf("Resource unavailable at %s", Path)
+ return nil, err
+ }
- Value, err = ToByte(Resource.Value)
+ Value, err = ToByte(Resource.Value)
- // decode resource fetched from backend store to dictionary
- err = json.Unmarshal(Value, &Result)
- if err != nil {
- log.Error("Failed to decode resource")
- return Result, err
- }
- /*
- resource pool in backend store stored as binary string whereas to
- access the pool to generate/release IDs it need to be converted
- as BitArray
- */
- Str, err = ToString(Result[POOL])
- if err != nil {
- log.Error("Failed to conver to kv pair to string")
- return Result, err
- }
- Decode64, _ := base64.StdEncoding.DecodeString(Str)
- Result[POOL], err = ToByte(Decode64)
- if err != nil {
- log.Error("Failed to convert resource pool to byte")
- return Result, err
- }
+ // decode resource fetched from backend store to dictionary
+ err = json.Unmarshal(Value, &Result)
+ if err != nil {
+ log.Error("Failed to decode resource")
+ return Result, err
+ }
+ /*
+ resource pool in backend store stored as binary string whereas to
+ access the pool to generate/release IDs it need to be converted
+ as BitArray
+ */
+ Str, err = ToString(Result[POOL])
+ if err != nil {
+ log.Error("Failed to conver to kv pair to string")
+ return Result, err
+ }
+ Decode64, _ := base64.StdEncoding.DecodeString(Str)
+ Result[POOL], err = ToByte(Decode64)
+ if err != nil {
+ log.Error("Failed to convert resource pool to byte")
+ return Result, err
+ }
- return Result, err
+ return Result, err
}
func (PONRMgr *PONResourceManager) GetPath(IntfID uint32, ResourceType string) string {
- /*
- Get path for given resource type.
- :param pon_intf_id: OLT PON interface id
- :param resource_type: String to identify type of resource
- :return: path for given resource type
- */
+ /*
+ Get path for given resource type.
+ :param pon_intf_id: OLT PON interface id
+ :param resource_type: String to identify type of resource
+ :return: path for given resource type
+ */
- /*
- Get the shared pool for the given resource type.
- all the resource ranges and the shared resource maps are initialized during the init.
- */
- SharedPoolID := PONRMgr.PonResourceRanges[PONRMgr.SharedIdxByType[ResourceType]].(uint32)
- if SharedPoolID != 0 {
- IntfID = SharedPoolID
- }
- var Path string
- if ResourceType == ONU_ID {
- Path = fmt.Sprintf(ONU_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
- } else if ResourceType == ALLOC_ID {
- Path = fmt.Sprintf(ALLOC_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
- } else if ResourceType == GEMPORT_ID {
- Path = fmt.Sprintf(GEMPORT_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
- } else if ResourceType == FLOW_ID {
- Path = fmt.Sprintf(FLOW_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
- } else {
- log.Error("Invalid resource pool identifier")
- }
- return Path
+ /*
+ Get the shared pool for the given resource type.
+ all the resource ranges and the shared resource maps are initialized during the init.
+ */
+ SharedPoolID := PONRMgr.PonResourceRanges[PONRMgr.SharedIdxByType[ResourceType]].(uint32)
+ if SharedPoolID != 0 {
+ IntfID = SharedPoolID
+ }
+ var Path string
+ if ResourceType == ONU_ID {
+ Path = fmt.Sprintf(ONU_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
+ } else if ResourceType == ALLOC_ID {
+ Path = fmt.Sprintf(ALLOC_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
+ } else if ResourceType == GEMPORT_ID {
+ Path = fmt.Sprintf(GEMPORT_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
+ } else if ResourceType == FLOW_ID {
+ Path = fmt.Sprintf(FLOW_ID_POOL_PATH, PONRMgr.DeviceID, IntfID)
+ } else {
+ log.Error("Invalid resource pool identifier")
+ }
+ return Path
}
func (PONRMgr *PONResourceManager) GetResourceID(IntfID uint32, ResourceType string, NumIDs uint32) ([]uint32, error) {
- /*
- Create alloc/gemport/onu/flow id for given OLT PON interface.
- :param pon_intf_id: OLT PON interface id
- :param resource_type: String to identify type of resource
- :param num_of_id: required number of ids
- :return list/uint32/None: list, uint32 or None if resource type is
- alloc_id/gemport_id, onu_id or invalid type respectively
- */
- if NumIDs < 1 {
- log.Error("Invalid number of resources requested")
- return nil, errors.New(fmt.Sprintf("Invalid number of resources requested %d", NumIDs))
- }
- // delegate to the master instance if sharing enabled across instances
+ /*
+ Create alloc/gemport/onu/flow id for given OLT PON interface.
+ :param pon_intf_id: OLT PON interface id
+ :param resource_type: String to identify type of resource
+ :param num_of_id: required number of ids
+ :return list/uint32/None: list, uint32 or None if resource type is
+ alloc_id/gemport_id, onu_id or invalid type respectively
+ */
+ if NumIDs < 1 {
+ log.Error("Invalid number of resources requested")
+ return nil, errors.New(fmt.Sprintf("Invalid number of resources requested %d", NumIDs))
+ }
+ // delegate to the master instance if sharing enabled across instances
- SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
- if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
- return SharedResourceMgr.GetResourceID(IntfID, ResourceType, NumIDs)
- }
+ SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
+ if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
+ return SharedResourceMgr.GetResourceID(IntfID, ResourceType, NumIDs)
+ }
- Path := PONRMgr.GetPath(IntfID, ResourceType)
- if Path == "" {
- log.Errorf("Failed to get path for resource type %s", ResourceType)
- return nil, errors.New(fmt.Sprintf("Failed to get path for resource type %s", ResourceType))
- }
- log.Debugf("Get resource for type %s on path %s", ResourceType, Path)
- var Result []uint32
- var NextID uint32
- Resource, err := PONRMgr.GetResource(Path)
- if (err == nil) && (ResourceType == ONU_ID) || (ResourceType == FLOW_ID) {
- if NextID, err = PONRMgr.GenerateNextID(Resource); err != nil {
- log.Error("Failed to Generate ID")
- return Result, err
- }
- Result = append(Result, NextID)
- } else if (err == nil) && ((ResourceType == GEMPORT_ID) || (ResourceType == ALLOC_ID)) {
- if NumIDs == 1 {
- if NextID, err = PONRMgr.GenerateNextID(Resource); err != nil {
- log.Error("Failed to Generate ID")
- return Result, err
- }
- Result = append(Result, NextID)
- } else {
- for NumIDs > 0 {
- if NextID, err = PONRMgr.GenerateNextID(Resource); err != nil {
- log.Error("Failed to Generate ID")
- return Result, err
- }
- Result = append(Result, NextID)
- NumIDs--
- }
- }
- } else {
- log.Error("get resource failed")
- return Result, err
- }
+ Path := PONRMgr.GetPath(IntfID, ResourceType)
+ if Path == "" {
+ log.Errorf("Failed to get path for resource type %s", ResourceType)
+ return nil, errors.New(fmt.Sprintf("Failed to get path for resource type %s", ResourceType))
+ }
+ log.Debugf("Get resource for type %s on path %s", ResourceType, Path)
+ var Result []uint32
+ var NextID uint32
+ Resource, err := PONRMgr.GetResource(Path)
+ if (err == nil) && (ResourceType == ONU_ID) || (ResourceType == FLOW_ID) {
+ if NextID, err = PONRMgr.GenerateNextID(Resource); err != nil {
+ log.Error("Failed to Generate ID")
+ return Result, err
+ }
+ Result = append(Result, NextID)
+ } else if (err == nil) && ((ResourceType == GEMPORT_ID) || (ResourceType == ALLOC_ID)) {
+ if NumIDs == 1 {
+ if NextID, err = PONRMgr.GenerateNextID(Resource); err != nil {
+ log.Error("Failed to Generate ID")
+ return Result, err
+ }
+ Result = append(Result, NextID)
+ } else {
+ for NumIDs > 0 {
+ if NextID, err = PONRMgr.GenerateNextID(Resource); err != nil {
+ log.Error("Failed to Generate ID")
+ return Result, err
+ }
+ Result = append(Result, NextID)
+ NumIDs--
+ }
+ }
+ } else {
+ log.Error("get resource failed")
+ return Result, err
+ }
- //Update resource in kv store
- if PONRMgr.UpdateResource(Path, Resource) != nil {
- log.Errorf("Failed to update resource %s", Path)
- return nil, errors.New(fmt.Sprintf("Failed to update resource %s", Path))
- }
- return Result, nil
+ //Update resource in kv store
+ if PONRMgr.UpdateResource(Path, Resource) != nil {
+ log.Errorf("Failed to update resource %s", Path)
+ return nil, errors.New(fmt.Sprintf("Failed to update resource %s", Path))
+ }
+ return Result, nil
}
func checkValidResourceType(ResourceType string) bool {
- KnownResourceTypes := []string{ONU_ID, ALLOC_ID, GEMPORT_ID, FLOW_ID}
+ KnownResourceTypes := []string{ONU_ID, ALLOC_ID, GEMPORT_ID, FLOW_ID}
- for _, v := range KnownResourceTypes {
- if v == ResourceType {
- return true
- }
- }
- return false
+ for _, v := range KnownResourceTypes {
+ if v == ResourceType {
+ return true
+ }
+ }
+ return false
}
func (PONRMgr *PONResourceManager) FreeResourceID(IntfID uint32, ResourceType string, ReleaseContent []uint32) bool {
- /*
- Release alloc/gemport/onu/flow id for given OLT PON interface.
- :param pon_intf_id: OLT PON interface id
- :param resource_type: String to identify type of resource
- :param release_content: required number of ids
- :return boolean: True if all IDs in given release_content release else False
- */
- if checkValidResourceType(ResourceType) == false {
- log.Error("Invalid resource type")
- return false
- }
- if ReleaseContent == nil {
- log.Debug("Nothing to release")
- return true
- }
- // delegate to the master instance if sharing enabled across instances
- SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
- if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
- return SharedResourceMgr.FreeResourceID(IntfID, ResourceType, ReleaseContent)
- }
- Path := PONRMgr.GetPath(IntfID, ResourceType)
- if Path == "" {
- log.Error("Failed to get path")
- return false
- }
- Resource, err := PONRMgr.GetResource(Path)
- if err != nil {
- log.Error("Failed to get resource")
- return false
- }
- for _, Val := range ReleaseContent {
- PONRMgr.ReleaseID(Resource, Val)
- }
- if PONRMgr.UpdateResource(Path, Resource) != nil {
- log.Errorf("Free resource for %s failed", Path)
- return false
- }
- return true
+ /*
+ Release alloc/gemport/onu/flow id for given OLT PON interface.
+ :param pon_intf_id: OLT PON interface id
+ :param resource_type: String to identify type of resource
+ :param release_content: required number of ids
+ :return boolean: True if all IDs in given release_content release else False
+ */
+ if checkValidResourceType(ResourceType) == false {
+ log.Error("Invalid resource type")
+ return false
+ }
+ if ReleaseContent == nil {
+ log.Debug("Nothing to release")
+ return true
+ }
+ // delegate to the master instance if sharing enabled across instances
+ SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
+ if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
+ return SharedResourceMgr.FreeResourceID(IntfID, ResourceType, ReleaseContent)
+ }
+ Path := PONRMgr.GetPath(IntfID, ResourceType)
+ if Path == "" {
+ log.Error("Failed to get path")
+ return false
+ }
+ Resource, err := PONRMgr.GetResource(Path)
+ if err != nil {
+ log.Error("Failed to get resource")
+ return false
+ }
+ for _, Val := range ReleaseContent {
+ PONRMgr.ReleaseID(Resource, Val)
+ }
+ if PONRMgr.UpdateResource(Path, Resource) != nil {
+ log.Errorf("Free resource for %s failed", Path)
+ return false
+ }
+ return true
}
func (PONRMgr *PONResourceManager) UpdateResource(Path string, Resource map[string]interface{}) error {
- /*
- Update resource in resource kv store.
- :param path: path to update resource
- :param resource: resource need to be updated
- :return boolean: True if resource updated in kv store else False
- */
- // TODO resource[POOL] = resource[POOL].bin
- Value, err := json.Marshal(Resource)
- if err != nil {
- log.Error("failed to Marshal")
- return err
- }
- err = PONRMgr.KVStore.Put(Path, Value)
- if err != nil {
- log.Error("failed to put data to kv store %s", Path)
- return err
- }
- return nil
+ /*
+ Update resource in resource kv store.
+ :param path: path to update resource
+ :param resource: resource need to be updated
+ :return boolean: True if resource updated in kv store else False
+ */
+ // TODO resource[POOL] = resource[POOL].bin
+ Value, err := json.Marshal(Resource)
+ if err != nil {
+ log.Error("failed to Marshal")
+ return err
+ }
+ err = PONRMgr.KVStore.Put(Path, Value)
+ if err != nil {
+ log.Error("failed to put data to kv store %s", Path)
+ return err
+ }
+ return nil
}
func (PONRMgr *PONResourceManager) ClearResourceIDPool(IntfID uint32, ResourceType string) bool {
- /*
- Clear Resource Pool for a given Resource Type on a given PON Port.
- :return boolean: True if removed else False
- */
+ /*
+ Clear Resource Pool for a given Resource Type on a given PON Port.
+ :return boolean: True if removed else False
+ */
- // delegate to the master instance if sharing enabled across instances
- SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
- if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
- return SharedResourceMgr.ClearResourceIDPool(IntfID, ResourceType)
- }
- Path := PONRMgr.GetPath(IntfID, ResourceType)
- if Path == "" {
- log.Error("Failed to get path")
- return false
- }
+ // delegate to the master instance if sharing enabled across instances
+ SharedResourceMgr := PONRMgr.SharedResourceMgrs[PONRMgr.SharedIdxByType[ResourceType]]
+ if SharedResourceMgr != nil && PONRMgr != SharedResourceMgr {
+ return SharedResourceMgr.ClearResourceIDPool(IntfID, ResourceType)
+ }
+ Path := PONRMgr.GetPath(IntfID, ResourceType)
+ if Path == "" {
+ log.Error("Failed to get path")
+ return false
+ }
- if err := PONRMgr.KVStore.Delete(Path); err != nil {
- log.Errorf("Failed to delete resource %s", Path)
- return false
- }
- log.Debugf("Cleared resource %s", Path)
- return true
+ if err := PONRMgr.KVStore.Delete(Path); err != nil {
+ log.Errorf("Failed to delete resource %s", Path)
+ return false
+ }
+ log.Debugf("Cleared resource %s", Path)
+ return true
}
func (PONRMgr PONResourceManager) InitResourceMap(PONIntfONUID string) {
- /*
- Initialize resource map
- :param pon_intf_onu_id: reference of PON interface id and onu id
- */
- // initialize pon_intf_onu_id tuple to alloc_ids map
- AllocIDPath := fmt.Sprintf(ALLOC_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
- var AllocIDs []byte
- Result := PONRMgr.KVStore.Put(AllocIDPath, AllocIDs)
- if Result != nil {
- log.Error("Failed to update the KV store")
- return
- }
- // initialize pon_intf_onu_id tuple to gemport_ids map
- GEMPortIDPath := fmt.Sprintf(GEMPORT_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
- var GEMPortIDs []byte
- Result = PONRMgr.KVStore.Put(GEMPortIDPath, GEMPortIDs)
- if Result != nil {
- log.Error("Failed to update the KV store")
- return
- }
+ /*
+ Initialize resource map
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ */
+ // initialize pon_intf_onu_id tuple to alloc_ids map
+ AllocIDPath := fmt.Sprintf(ALLOC_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
+ var AllocIDs []byte
+ Result := PONRMgr.KVStore.Put(AllocIDPath, AllocIDs)
+ if Result != nil {
+ log.Error("Failed to update the KV store")
+ return
+ }
+ // initialize pon_intf_onu_id tuple to gemport_ids map
+ GEMPortIDPath := fmt.Sprintf(GEMPORT_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
+ var GEMPortIDs []byte
+ Result = PONRMgr.KVStore.Put(GEMPortIDPath, GEMPortIDs)
+ if Result != nil {
+ log.Error("Failed to update the KV store")
+ return
+ }
}
func (PONRMgr PONResourceManager) RemoveResourceMap(PONIntfONUID string) bool {
- /*
- Remove resource map
- :param pon_intf_onu_id: reference of PON interface id and onu id
- */
- // remove pon_intf_onu_id tuple to alloc_ids map
- var err error
- AllocIDPath := fmt.Sprintf(ALLOC_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
- if err = PONRMgr.KVStore.Delete(AllocIDPath); err != nil {
- log.Errorf("Failed to remove resource %s", AllocIDPath)
- return false
- }
- // remove pon_intf_onu_id tuple to gemport_ids map
- GEMPortIDPath := fmt.Sprintf(GEMPORT_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
- err = PONRMgr.KVStore.Delete(GEMPortIDPath)
- if err != nil {
- log.Errorf("Failed to remove resource %s", GEMPortIDPath)
- return false
- }
+ /*
+ Remove resource map
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ */
+ // remove pon_intf_onu_id tuple to alloc_ids map
+ var err error
+ AllocIDPath := fmt.Sprintf(ALLOC_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
+ if err = PONRMgr.KVStore.Delete(AllocIDPath); err != nil {
+ log.Errorf("Failed to remove resource %s", AllocIDPath)
+ return false
+ }
+ // remove pon_intf_onu_id tuple to gemport_ids map
+ GEMPortIDPath := fmt.Sprintf(GEMPORT_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
+ err = PONRMgr.KVStore.Delete(GEMPortIDPath)
+ if err != nil {
+ log.Errorf("Failed to remove resource %s", GEMPortIDPath)
+ return false
+ }
- FlowIDPath := fmt.Sprintf(FLOW_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
- if FlowIDs, err := PONRMgr.KVStore.List(FlowIDPath); err != nil {
- for _, Flow := range FlowIDs {
- FlowIDInfoPath := fmt.Sprintf(FLOW_ID_INFO_PATH, PONRMgr.DeviceID, PONIntfONUID, Flow)
- if err = PONRMgr.KVStore.Delete(FlowIDInfoPath); err != nil {
- log.Errorf("Failed to remove resource %s", FlowIDInfoPath)
- return false
- }
- }
- }
+ FlowIDPath := fmt.Sprintf(FLOW_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, PONIntfONUID)
+ if FlowIDs, err := PONRMgr.KVStore.List(FlowIDPath); err != nil {
+ for _, Flow := range FlowIDs {
+ FlowIDInfoPath := fmt.Sprintf(FLOW_ID_INFO_PATH, PONRMgr.DeviceID, PONIntfONUID, Flow.Value)
+ if err = PONRMgr.KVStore.Delete(FlowIDInfoPath); err != nil {
+ log.Errorf("Failed to remove resource %s", FlowIDInfoPath)
+ return false
+ }
+ }
+ }
- if err = PONRMgr.KVStore.Delete(FlowIDPath); err != nil {
- log.Errorf("Failed to remove resource %s", FlowIDPath)
- return false
- }
+ if err = PONRMgr.KVStore.Delete(FlowIDPath); err != nil {
+ log.Errorf("Failed to remove resource %s", FlowIDPath)
+ return false
+ }
- return true
+ return true
}
func (PONRMgr *PONResourceManager) GetCurrentAllocIDForOnu(IntfONUID string) []uint32 {
- /*
- Get currently configured alloc ids for given pon_intf_onu_id
- :param pon_intf_onu_id: reference of PON interface id and onu id
- :return list: List of alloc_ids if available, else None
- */
- Path := fmt.Sprintf(ALLOC_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
+ /*
+ Get currently configured alloc ids for given pon_intf_onu_id
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :return list: List of alloc_ids if available, else None
+ */
+ Path := fmt.Sprintf(ALLOC_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
- var Data []uint32
- Value, err := PONRMgr.KVStore.Get(Path)
- if err == nil {
- if Value != nil {
- Val,err := ToByte(Value.Value)
- if err != nil{
- log.Errorw("Failed to convert into byte array",log.Fields{"error":err})
- return Data
- }
- if err = json.Unmarshal(Val, &Data); err != nil {
- log.Error("Failed to unmarshal",log.Fields{"error":err})
- return Data
- }
- }
- }
- return Data
+ var Data []uint32
+ Value, err := PONRMgr.KVStore.Get(Path)
+ if err == nil {
+ if Value != nil {
+ Val, err := ToByte(Value.Value)
+ if err != nil {
+ log.Errorw("Failed to convert into byte array", log.Fields{"error": err})
+ return Data
+ }
+ if err = json.Unmarshal(Val, &Data); err != nil {
+ log.Error("Failed to unmarshal", log.Fields{"error": err})
+ return Data
+ }
+ }
+ }
+ return Data
}
func (PONRMgr *PONResourceManager) GetCurrentGEMPortIDsForOnu(IntfONUID string) []uint32 {
- /*
- Get currently configured gemport ids for given pon_intf_onu_id
- :param pon_intf_onu_id: reference of PON interface id and onu id
- :return list: List of gemport IDs if available, else None
- */
+ /*
+ Get currently configured gemport ids for given pon_intf_onu_id
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :return list: List of gemport IDs if available, else None
+ */
- Path := fmt.Sprintf(GEMPORT_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
- log.Debugf("Getting current gemports for %s", Path)
- var Data []uint32
- Value, err := PONRMgr.KVStore.Get(Path)
- if err == nil {
- if Value != nil {
- Val, _ := ToByte(Value.Value)
- if err = json.Unmarshal(Val, &Data); err != nil {
- log.Errorw("Failed to unmarshal",log.Fields{"error":err})
- return Data
- }
- }
- } else {
- log.Errorf("Failed to get data from kvstore for %s", Path)
- }
- return Data
+ Path := fmt.Sprintf(GEMPORT_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
+ log.Debugf("Getting current gemports for %s", Path)
+ var Data []uint32
+ Value, err := PONRMgr.KVStore.Get(Path)
+ if err == nil {
+ if Value != nil {
+ Val, _ := ToByte(Value.Value)
+ if err = json.Unmarshal(Val, &Data); err != nil {
+ log.Errorw("Failed to unmarshal", log.Fields{"error": err})
+ return Data
+ }
+ }
+ } else {
+ log.Errorf("Failed to get data from kvstore for %s", Path)
+ }
+ return Data
}
func (PONRMgr *PONResourceManager) GetCurrentFlowIDsForOnu(IntfONUID string) []uint32 {
- /*
- Get currently configured flow ids for given pon_intf_onu_id
- :param pon_intf_onu_id: reference of PON interface id and onu id
- :return list: List of Flow IDs if available, else None
- */
+ /*
+ Get currently configured flow ids for given pon_intf_onu_id
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :return list: List of Flow IDs if available, else None
+ */
- Path := fmt.Sprintf(FLOW_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
+ Path := fmt.Sprintf(FLOW_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
- var Data []uint32
- Value, err := PONRMgr.KVStore.Get(Path)
- if err == nil {
- if Value != nil {
- Val, _ := ToByte(Value.Value)
- if err = json.Unmarshal(Val, &Data); err != nil {
- log.Error("Failed to unmarshal")
- return Data
- }
- }
- }
- return Data
+ var Data []uint32
+ Value, err := PONRMgr.KVStore.Get(Path)
+ if err == nil {
+ if Value != nil {
+ Val, _ := ToByte(Value.Value)
+ if err = json.Unmarshal(Val, &Data); err != nil {
+ log.Error("Failed to unmarshal")
+ return Data
+ }
+ }
+ }
+ return Data
}
-func (PONRMgr *PONResourceManager) GetFlowIDInfo(IntfONUID string, FlowID uint32, Data interface{})error{
- /*
- Get flow details configured for the ONU.
- :param pon_intf_onu_id: reference of PON interface id and onu id
- :param flow_id: Flow Id reference
- :param Data: Result
- :return error: nil if no error in getting from KV store
- */
+func (PONRMgr *PONResourceManager) GetFlowIDInfo(IntfONUID string, FlowID uint32, Data interface{}) error {
+ /*
+ Get flow details configured for the ONU.
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: Flow Id reference
+ :param Data: Result
+ :return error: nil if no error in getting from KV store
+ */
- Path := fmt.Sprintf(FLOW_ID_INFO_PATH, PONRMgr.DeviceID, IntfONUID, FlowID)
+ Path := fmt.Sprintf(FLOW_ID_INFO_PATH, PONRMgr.DeviceID, IntfONUID, FlowID)
- Value, err := PONRMgr.KVStore.Get(Path)
- if err == nil {
- if Value != nil {
- Val,err := ToByte(Value.Value)
- if err != nil{
- log.Errorw("Failed to convert flowinfo into byte array",log.Fields{"error":err})
- return err
- }
- if err = json.Unmarshal(Val, Data); err != nil {
- log.Errorw("Failed to unmarshal",log.Fields{"error":err})
- return err
- }
- }
- }
- return err
+ Value, err := PONRMgr.KVStore.Get(Path)
+ if err == nil {
+ if Value != nil {
+ Val, err := ToByte(Value.Value)
+ if err != nil {
+ log.Errorw("Failed to convert flowinfo into byte array", log.Fields{"error": err})
+ return err
+ }
+ if err = json.Unmarshal(Val, Data); err != nil {
+ log.Errorw("Failed to unmarshal", log.Fields{"error": err})
+ return err
+ }
+ }
+ }
+ return err
}
func (PONRMgr *PONResourceManager) RemoveFlowIDInfo(IntfONUID string, FlowID uint32) bool {
- /*
- Get flow_id details configured for the ONU.
- :param pon_intf_onu_id: reference of PON interface id and onu id
- :param flow_id: Flow Id reference
- */
- Path := fmt.Sprintf(FLOW_ID_INFO_PATH, PONRMgr.DeviceID, IntfONUID, FlowID)
+ /*
+ Get flow_id details configured for the ONU.
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: Flow Id reference
+ */
+ Path := fmt.Sprintf(FLOW_ID_INFO_PATH, PONRMgr.DeviceID, IntfONUID, FlowID)
- if err := PONRMgr.KVStore.Delete(Path); err != nil {
- log.Errorf("Falied to remove resource %s", Path)
- return false
- }
- return true
+ if err := PONRMgr.KVStore.Delete(Path); err != nil {
+ log.Errorf("Falied to remove resource %s", Path)
+ return false
+ }
+ return true
}
func (PONRMgr *PONResourceManager) UpdateAllocIdsForOnu(IntfONUID string, AllocIDs []uint32) error {
- /*
- Update currently configured alloc ids for given pon_intf_onu_id
- :param pon_intf_onu_id: reference of PON interface id and onu id
- :param alloc_ids: list of alloc ids
- */
- var Value []byte
- var err error
- Path := fmt.Sprintf(ALLOC_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
- Value, err = json.Marshal(AllocIDs)
- if err != nil {
- log.Error("failed to Marshal")
- return err
- }
+ /*
+ Update currently configured alloc ids for given pon_intf_onu_id
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param alloc_ids: list of alloc ids
+ */
+ var Value []byte
+ var err error
+ Path := fmt.Sprintf(ALLOC_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
+ Value, err = json.Marshal(AllocIDs)
+ if err != nil {
+ log.Error("failed to Marshal")
+ return err
+ }
- if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
- log.Errorf("Failed to update resource %s", Path)
- return err
- }
- return err
+ if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
+ log.Errorf("Failed to update resource %s", Path)
+ return err
+ }
+ return err
}
func (PONRMgr *PONResourceManager) UpdateGEMPortIDsForOnu(IntfONUID string, GEMPortIDs []uint32) error {
- /*
- Update currently configured gemport ids for given pon_intf_onu_id
- :param pon_intf_onu_id: reference of PON interface id and onu id
- :param gemport_ids: list of gem port ids
- */
+ /*
+ Update currently configured gemport ids for given pon_intf_onu_id
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param gemport_ids: list of gem port ids
+ */
- var Value []byte
- var err error
- Path := fmt.Sprintf(GEMPORT_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
- log.Debugf("Updating gemport ids for %s", Path)
- Value, err = json.Marshal(GEMPortIDs)
- if err != nil {
- log.Error("failed to Marshal")
- return err
- }
+ var Value []byte
+ var err error
+ Path := fmt.Sprintf(GEMPORT_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
+ log.Debugf("Updating gemport ids for %s", Path)
+ Value, err = json.Marshal(GEMPortIDs)
+ if err != nil {
+ log.Error("failed to Marshal")
+ return err
+ }
- if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
- log.Errorf("Failed to update resource %s", Path)
- return err
- }
- return err
+ if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
+ log.Errorf("Failed to update resource %s", Path)
+ return err
+ }
+ return err
}
func checkForFlowIDInList(FlowIDList []uint32, FlowID uint32) (bool, uint32) {
- /*
- Check for a flow id in a given list of flow IDs.
- :param FLowIDList: List of Flow IDs
- :param FlowID: Flowd to check in the list
- : return true and the index if present false otherwise.
- */
+ /*
+ Check for a flow id in a given list of flow IDs.
+ :param FLowIDList: List of Flow IDs
+ :param FlowID: Flowd to check in the list
+ : return true and the index if present false otherwise.
+ */
- for idx, _ := range FlowIDList {
- if FlowID == FlowIDList[idx] {
- return true, uint32(idx)
- }
- }
- return false, 0
+ for idx, _ := range FlowIDList {
+ if FlowID == FlowIDList[idx] {
+ return true, uint32(idx)
+ }
+ }
+ return false, 0
}
func (PONRMgr *PONResourceManager) UpdateFlowIDForOnu(IntfONUID string, FlowID uint32, Add bool) error {
- /*
- Update the flow_id list of the ONU (add or remove flow_id from the list)
- :param pon_intf_onu_id: reference of PON interface id and onu id
- :param flow_id: flow ID
- :param add: Boolean flag to indicate whether the flow_id should be
- added or removed from the list. Defaults to adding the flow.
- */
- var Value []byte
- var err error
- var RetVal bool
- var IDx uint32
- Path := fmt.Sprintf(FLOW_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
- FlowIDs := PONRMgr.GetCurrentFlowIDsForOnu(IntfONUID)
+ /*
+ Update the flow_id list of the ONU (add or remove flow_id from the list)
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: flow ID
+ :param add: Boolean flag to indicate whether the flow_id should be
+ added or removed from the list. Defaults to adding the flow.
+ */
+ var Value []byte
+ var err error
+ var RetVal bool
+ var IDx uint32
+ Path := fmt.Sprintf(FLOW_ID_RESOURCE_MAP_PATH, PONRMgr.DeviceID, IntfONUID)
+ FlowIDs := PONRMgr.GetCurrentFlowIDsForOnu(IntfONUID)
- if Add {
- if RetVal, IDx = checkForFlowIDInList(FlowIDs, FlowID); RetVal == true {
- return err
- }
- FlowIDs = append(FlowIDs, FlowID)
- } else {
- if RetVal, IDx = checkForFlowIDInList(FlowIDs, FlowID); RetVal == false {
- return err
- }
- // delete the index and shift
- FlowIDs = append(FlowIDs[:IDx], FlowIDs[IDx+1:]...)
- }
- Value, err = json.Marshal(FlowIDs)
- if err != nil {
- log.Error("Failed to Marshal")
- return err
- }
+ if Add {
+ if RetVal, IDx = checkForFlowIDInList(FlowIDs, FlowID); RetVal == true {
+ return err
+ }
+ FlowIDs = append(FlowIDs, FlowID)
+ } else {
+ if RetVal, IDx = checkForFlowIDInList(FlowIDs, FlowID); RetVal == false {
+ return err
+ }
+ // delete the index and shift
+ FlowIDs = append(FlowIDs[:IDx], FlowIDs[IDx+1:]...)
+ }
+ Value, err = json.Marshal(FlowIDs)
+ if err != nil {
+ log.Error("Failed to Marshal")
+ return err
+ }
- if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
- log.Errorf("Failed to update resource %s", Path)
- return err
- }
- return err
+ if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
+ log.Errorf("Failed to update resource %s", Path)
+ return err
+ }
+ return err
}
-func (PONRMgr *PONResourceManager) UpdateFlowIDInfoForOnu(IntfONUID string, FlowID uint32, FlowData interface {}) error {
- /*
- Update any metadata associated with the flow_id. The flow_data could be json
- or any of other data structure. The resource manager doesnt care
- :param pon_intf_onu_id: reference of PON interface id and onu id
- :param flow_id: Flow ID
- :param flow_data: Flow data blob
- */
- var Value []byte
- var err error
- Path := fmt.Sprintf(FLOW_ID_INFO_PATH, PONRMgr.DeviceID, IntfONUID, FlowID)
- Value, err = json.Marshal(FlowData)
- if err != nil {
- log.Error("failed to Marshal")
- return err
- }
+func (PONRMgr *PONResourceManager) UpdateFlowIDInfoForOnu(IntfONUID string, FlowID uint32, FlowData interface{}) error {
+ /*
+ Update any metadata associated with the flow_id. The flow_data could be json
+ or any of other data structure. The resource manager doesnt care
+ :param pon_intf_onu_id: reference of PON interface id and onu id
+ :param flow_id: Flow ID
+ :param flow_data: Flow data blob
+ */
+ var Value []byte
+ var err error
+ Path := fmt.Sprintf(FLOW_ID_INFO_PATH, PONRMgr.DeviceID, IntfONUID, FlowID)
+ Value, err = json.Marshal(FlowData)
+ if err != nil {
+ log.Error("failed to Marshal")
+ return err
+ }
- if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
- log.Errorf("Failed to update resource %s", Path)
- return err
- }
- return err
+ if err = PONRMgr.KVStore.Put(Path, Value); err != nil {
+ log.Errorf("Failed to update resource %s", Path)
+ return err
+ }
+ return err
}
func (PONRMgr *PONResourceManager) GenerateNextID(Resource map[string]interface{}) (uint32, error) {
- /*
- Generate unique id having OFFSET as start
- :param resource: resource used to generate ID
- :return uint32: generated id
- */
- ByteArray, err := ToByte(Resource[POOL])
- if err != nil {
- log.Error("Failed to convert resource to byte array")
- return 0, err
- }
- Data := bitmap.TSFromData(ByteArray, false)
- if Data == nil {
- log.Error("Failed to get data from byte array")
- return 0, errors.New("Failed to get data from byte array")
- }
+ /*
+ Generate unique id having OFFSET as start
+ :param resource: resource used to generate ID
+ :return uint32: generated id
+ */
+ ByteArray, err := ToByte(Resource[POOL])
+ if err != nil {
+ log.Error("Failed to convert resource to byte array")
+ return 0, err
+ }
+ Data := bitmap.TSFromData(ByteArray, false)
+ if Data == nil {
+ log.Error("Failed to get data from byte array")
+ return 0, errors.New("Failed to get data from byte array")
+ }
- Len := Data.Len()
- var Idx int
- for Idx = 0; Idx < Len; Idx++ {
- Val := Data.Get(Idx)
- if Val == false {
- break
- }
- }
- Data.Set(Idx, true)
- res := uint32(Resource[START_IDX].(float64))
- Resource[POOL] = Data.Data(false)
- log.Debugf("Generated ID for %d", (uint32(Idx) + res))
- return (uint32(Idx) + res), err
+ Len := Data.Len()
+ var Idx int
+ for Idx = 0; Idx < Len; Idx++ {
+ Val := Data.Get(Idx)
+ if Val == false {
+ break
+ }
+ }
+ Data.Set(Idx, true)
+ res := uint32(Resource[START_IDX].(float64))
+ Resource[POOL] = Data.Data(false)
+ log.Debugf("Generated ID for %d", (uint32(Idx) + res))
+ return (uint32(Idx) + res), err
}
func (PONRMgr *PONResourceManager) ReleaseID(Resource map[string]interface{}, Id uint32) bool {
- /*
- Release unique id having OFFSET as start index.
- :param resource: resource used to release ID
- :param unique_id: id need to be released
- */
- ByteArray, err := ToByte(Resource[POOL])
- if err != nil {
- log.Error("Failed to convert resource to byte array")
- return false
- }
- Data := bitmap.TSFromData(ByteArray, false)
- if Data == nil {
- log.Error("Failed to get resource pool")
- return false
- }
- var Idx uint32
- Idx = Id - uint32(Resource[START_IDX].(float64))
- Data.Set(int(Idx), false)
- Resource[POOL] = Data.Data(false)
+ /*
+ Release unique id having OFFSET as start index.
+ :param resource: resource used to release ID
+ :param unique_id: id need to be released
+ */
+ ByteArray, err := ToByte(Resource[POOL])
+ if err != nil {
+ log.Error("Failed to convert resource to byte array")
+ return false
+ }
+ Data := bitmap.TSFromData(ByteArray, false)
+ if Data == nil {
+ log.Error("Failed to get resource pool")
+ return false
+ }
+ var Idx uint32
+ Idx = Id - uint32(Resource[START_IDX].(float64))
+ Data.Set(int(Idx), false)
+ Resource[POOL] = Data.Data(false)
- return true
+ return true
}
-func (PONRMgr *PONResourceManager) GetTechnology()string{
- return PONRMgr.Technology
+func (PONRMgr *PONResourceManager) GetTechnology() string {
+ return PONRMgr.Technology
}
-func (PONRMgr *PONResourceManager) GetResourceTypeAllocID()string{
- return ALLOC_ID
+func (PONRMgr *PONResourceManager) GetResourceTypeAllocID() string {
+ return ALLOC_ID
}
-func (PONRMgr *PONResourceManager) GetResourceTypeGemPortID()string{
- return GEMPORT_ID
+func (PONRMgr *PONResourceManager) GetResourceTypeGemPortID() string {
+ return GEMPORT_ID
}
-
-
// ToByte converts an interface value to a []byte. The interface should either be of
// a string type or []byte. Otherwise, an error is returned.
func ToByte(value interface{}) ([]byte, error) {
- switch t := value.(type) {
- case []byte:
- return value.([]byte), nil
- case string:
- return []byte(value.(string)), nil
- default:
- return nil, fmt.Errorf("unexpected-type-%T", t)
- }
+ switch t := value.(type) {
+ case []byte:
+ return value.([]byte), nil
+ case string:
+ return []byte(value.(string)), nil
+ default:
+ return nil, fmt.Errorf("unexpected-type-%T", t)
+ }
}
// ToString converts an interface value to a string. The interface should either be of
// a string type or []byte. Otherwise, an error is returned.
func ToString(value interface{}) (string, error) {
- switch t := value.(type) {
- case []byte:
- return string(value.([]byte)), nil
- case string:
- return value.(string), nil
- default:
- return "", fmt.Errorf("unexpected-type-%T", t)
- }
+ switch t := value.(type) {
+ case []byte:
+ return string(value.([]byte)), nil
+ case string:
+ return value.(string), nil
+ default:
+ return "", fmt.Errorf("unexpected-type-%T", t)
+ }
}
diff --git a/vendor/github.com/opencord/voltha-go/common/techprofile/tech_profile.go b/vendor/github.com/opencord/voltha-go/common/techprofile/tech_profile.go
index 2a256e9..9f7bebf 100644
--- a/vendor/github.com/opencord/voltha-go/common/techprofile/tech_profile.go
+++ b/vendor/github.com/opencord/voltha-go/common/techprofile/tech_profile.go
@@ -122,7 +122,7 @@
const MAX_GEM_PAYLOAD = "max_gem_payload_size"
type InstanceControl struct {
- Onu string `json:ONU"`
+ Onu string `json:"ONU"`
Uni string `json:"uni"`
MaxGemPayloadSize string `json:"max_gem_payload_size"`
}
diff --git a/vendor/github.com/opencord/voltha-go/db/kvstore/consulclient.go b/vendor/github.com/opencord/voltha-go/db/kvstore/consulclient.go
index 738ca92..2d02342 100644
--- a/vendor/github.com/opencord/voltha-go/db/kvstore/consulclient.go
+++ b/vendor/github.com/opencord/voltha-go/db/kvstore/consulclient.go
@@ -498,11 +498,10 @@
}
}
-
-func (c *ConsulClient) AcquireLock(lockName string, timeout int) error {
+func (c *ConsulClient) AcquireLock(lockName string, timeout int) error {
return nil
}
-func (c *ConsulClient) ReleaseLock(lockName string) error {
+func (c *ConsulClient) ReleaseLock(lockName string) error {
return nil
-}
\ No newline at end of file
+}
diff --git a/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go
index e5f6dfe..6935296 100644
--- a/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-go/db/kvstore/etcdclient.go
@@ -66,15 +66,6 @@
ctx, cancel := context.WithTimeout(context.Background(), duration)
- // DO NOT lock by default; otherwise lock per instructed value
- if len(lock) > 0 && lock[0] {
- session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock"+key)
- mu.Lock(context.Background())
- defer mu.Unlock(context.Background())
- defer session.Close()
- }
-
resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
cancel()
if err != nil {
@@ -95,15 +86,6 @@
ctx, cancel := context.WithTimeout(context.Background(), duration)
- // Lock by default; otherwise lock per instructed value
- if len(lock) > 0 && lock[0] {
- session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock"+key)
- mu.Lock(context.Background())
- defer mu.Unlock(context.Background())
- defer session.Close()
- }
-
resp, err := c.ectdAPI.Get(ctx, key)
cancel()
if err != nil {
@@ -133,15 +115,6 @@
ctx, cancel := context.WithTimeout(context.Background(), duration)
- // Lock by default; otherwise lock per instructed value
- if len(lock) == 0 || lock[0] {
- session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock"+key)
- mu.Lock(context.Background())
- defer mu.Unlock(context.Background())
- defer session.Close()
- }
-
c.writeLock.Lock()
defer c.writeLock.Unlock()
_, err := c.ectdAPI.Put(ctx, key, val)
@@ -170,15 +143,6 @@
ctx, cancel := context.WithTimeout(context.Background(), duration)
- // Lock by default; otherwise lock per instructed value
- if len(lock) == 0 || lock[0] {
- session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
- mu := v3Concurrency.NewMutex(session, "/lock"+key)
- mu.Lock(context.Background())
- defer mu.Unlock(context.Background())
- defer session.Close()
- }
-
defer cancel()
c.writeLock.Lock()
@@ -287,7 +251,7 @@
// ReleaseReservation releases reservation for a specific key.
func (c *EtcdClient) ReleaseReservation(key string) error {
// Get the leaseid using the key
- log.Debugw("Release-reservation", log.Fields{"key":key})
+ log.Debugw("Release-reservation", log.Fields{"key": key})
var ok bool
var leaseID *v3Client.LeaseID
c.writeLock.Lock()
@@ -491,13 +455,14 @@
func (c *EtcdClient) AcquireLock(lockName string, timeout int) error {
duration := GetDuration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), duration)
+ defer cancel()
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
mu := v3Concurrency.NewMutex(session, "/devicelock_"+lockName)
if err := mu.Lock(context.Background()); err != nil {
+ cancel()
return err
}
c.addLockName(lockName, mu, session)
- cancel()
return nil
}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/branch.go b/vendor/github.com/opencord/voltha-go/db/model/branch.go
index 40c66ad..ca89df0 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/branch.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/branch.go
@@ -27,11 +27,12 @@
// Branch structure is used to classify a collection of transaction based revisions
type Branch struct {
sync.RWMutex
- Node *node
- Txid string
- Origin Revision
- Revisions map[string]Revision
- Latest Revision
+ Node *node
+ Txid string
+ Origin Revision
+ Revisions map[string]Revision
+ LatestLock sync.RWMutex
+ Latest Revision
}
// NewBranch creates a new instance of the Branch structure
@@ -46,17 +47,69 @@
return b
}
+// Utility function to extract all children names for a given revision (mostly for debugging purposes)
+func (b *Branch) retrieveChildrenNames(revision Revision) []string {
+ var childrenNames []string
+
+ for _, child := range revision.GetChildren("devices") {
+ childrenNames = append(childrenNames, child.GetName())
+ }
+
+ return childrenNames
+}
+
+// Utility function to compare children names and report the missing ones (mostly for debugging purposes)
+func (b *Branch) findMissingChildrenNames(previousNames, latestNames []string) []string {
+ var missingNames []string
+
+ for _, previousName := range previousNames {
+ found := false
+
+ if len(latestNames) == 0 {
+ break
+ }
+
+ for _, latestName := range latestNames {
+ if previousName == latestName {
+ found = true
+ break
+ }
+ }
+ if !found {
+ missingNames = append(missingNames, previousName)
+ }
+ }
+
+ return missingNames
+}
+
// SetLatest assigns the latest revision for this branch
func (b *Branch) SetLatest(latest Revision) {
b.Lock()
defer b.Unlock()
if b.Latest != nil {
- log.Debugf("Switching latest from <%s> to <%s>", b.Latest.GetHash(), latest.GetHash())
- } else {
- log.Debugf("Switching latest from <NIL> to <%s>", latest.GetHash())
- }
+ log.Debugw("updating-latest-revision", log.Fields{"current": b.Latest.GetHash(), "new": latest.GetHash()})
+ // Go through list of children names in current revision and new revision
+ // and then compare the resulting outputs to ensure that we have not lost any entries.
+ var previousNames, latestNames, missingNames []string
+
+ if previousNames = b.retrieveChildrenNames(b.Latest); len(previousNames) > 0 {
+ log.Debugw("children-of-previous-revision", log.Fields{"hash": b.Latest.GetHash(), "names": previousNames})
+ }
+
+ if latestNames = b.retrieveChildrenNames(b.Latest); len(latestNames) > 0 {
+ log.Debugw("children-of-latest-revision", log.Fields{"hash": latest.GetHash(), "names": latestNames})
+ }
+
+ if missingNames = b.findMissingChildrenNames(previousNames, latestNames); len(missingNames) > 0 {
+ log.Debugw("children-missing-in-latest-revision", log.Fields{"hash": latest.GetHash(), "names": missingNames})
+ }
+
+ } else {
+ log.Debugw("setting-latest-revision", log.Fields{"new": latest.GetHash()})
+ }
b.Latest = latest
}
@@ -103,3 +156,13 @@
b.Revisions[hash] = revision
}
+
+// DeleteRevision removes a revision with the specified hash
+func (b *Branch) DeleteRevision(hash string) {
+ b.Lock()
+ defer b.Unlock()
+
+ if _, ok := b.Revisions[hash]; ok {
+ delete(b.Revisions, hash)
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/node.go b/vendor/github.com/opencord/voltha-go/db/model/node.go
index 2a9309c..7bfdca0 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/node.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/node.go
@@ -117,22 +117,34 @@
n.Lock()
defer n.Unlock()
+ // Keep a reference to the current revision
+ var previous string
+ if branch.GetLatest() != nil {
+ previous = branch.GetLatest().GetHash()
+ }
+
branch.AddRevision(revision)
+ // If anything is new, then set the revision as the latest
if branch.GetLatest() == nil || revision.GetHash() != branch.GetLatest().GetHash() {
branch.SetLatest(revision)
}
+ // Delete the previous revision if anything has changed
+ if previous != "" && previous != branch.GetLatest().GetHash() {
+ branch.DeleteRevision(previous)
+ }
+
if changeAnnouncement != nil && branch.Txid == "" {
if n.Proxy != nil {
for _, change := range changeAnnouncement {
- //log.Debugw("invoking callback",
- // log.Fields{
- // "callbacks": n.Proxy.getCallbacks(change.Type),
- // "type": change.Type,
- // "previousData": change.PreviousData,
- // "latestData": change.LatestData,
- // })
+ log.Debugw("adding-callback",
+ log.Fields{
+ "callbacks": n.Proxy.getCallbacks(change.Type),
+ "type": change.Type,
+ "previousData": change.PreviousData,
+ "latestData": change.LatestData,
+ })
n.Root.AddCallback(
n.Proxy.InvokeCallbacks,
change.Type,
@@ -141,19 +153,6 @@
change.LatestData)
}
}
-
- //for _, change := range changeAnnouncement {
- //log.Debugf("sending notification - changeType: %+v, previous:%+v, latest: %+v",
- // change.Type,
- // change.PreviousData,
- // change.LatestData)
- //n.Root.AddNotificationCallback(
- // n.makeEventBus().Advertise,
- // change.Type,
- // revision.GetHash(),
- // change.PreviousData,
- // change.LatestData)
- //}
}
}
@@ -288,8 +287,7 @@
// Get retrieves the data from a node tree that resides at the specified path
func (n *node) Get(path string, hash string, depth int, reconcile bool, txid string) interface{} {
- log.Debugw("node-get-request", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile,
- "txid": txid})
+ log.Debugw("node-get-request", log.Fields{"path": path, "hash": hash, "depth": depth, "reconcile": reconcile, "txid": txid})
for strings.HasPrefix(path, "/") {
path = path[1:]
}
@@ -311,8 +309,7 @@
// If there is not request to reconcile, try to get it from memory
if !reconcile {
- if result = n.getPath(rev.GetBranch().GetLatest(), path, depth);
- result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
+ if result = n.getPath(rev.GetBranch().GetLatest(), path, depth); result != nil && reflect.ValueOf(result).IsValid() && !reflect.ValueOf(result).IsNil() {
return result
}
}
@@ -473,6 +470,11 @@
copy(children, rev.GetChildren(name))
idx, childRev := n.findRevByKey(children, field.Key, keyValue)
+
+ if childRev == nil {
+ return branch.GetLatest()
+ }
+
childNode := childRev.GetNode()
// Save proxy in child node to ensure callbacks are called later on
@@ -502,12 +504,20 @@
// Prefix the hash value with the data type (e.g. devices, logical_devices, adapters)
newChildRev.SetName(name + "/" + _keyValueType)
- children[idx] = newChildRev
+
+ if idx >= 0 {
+ children[idx] = newChildRev
+ } else {
+ children = append(children, newChildRev)
+ }
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
updatedRev := rev.UpdateChildren(name, children, branch)
- branch.GetLatest().Drop(txid, false)
n.makeLatest(branch, updatedRev, nil)
+ updatedRev.ChildDrop(name, childRev.GetHash())
return newChildRev
@@ -518,10 +528,15 @@
childRev := rev.GetChildren(name)[0]
childNode := childRev.GetNode()
newChildRev := childNode.Update(path, data, strict, txid, makeBranch)
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
updatedRev := rev.UpdateChildren(name, []Revision{newChildRev}, branch)
- rev.Drop(txid, false)
n.makeLatest(branch, updatedRev, nil)
+ updatedRev.ChildDrop(name, childRev.GetHash())
+
return newChildRev
}
@@ -557,16 +572,10 @@
rev := branch.GetLatest().UpdateData(data, branch)
changes := []ChangeTuple{{POST_UPDATE, branch.GetLatest().GetData(), rev.GetData()}}
-
- // FIXME VOL-1293: the following statement corrupts the kv when using a subproxy (commenting for now)
- // FIXME VOL-1293 cont'd: need to figure out the required conditions otherwise we are not cleaning up entries
- //branch.GetLatest().Drop(branch.Txid, false)
-
n.makeLatest(branch, rev, changes)
return rev
}
-
return branch.GetLatest()
}
@@ -628,15 +637,16 @@
// Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
childRev.SetName(name + "/" + key.String())
- // Create watch for <component>/<key>
- childRev.SetupWatch(childRev.GetName())
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
children = append(children, childRev)
- rev = rev.UpdateChildren(name, children, branch)
- changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
- rev.Drop(txid, false)
- n.makeLatest(branch, rev, changes)
+ updatedRev := rev.UpdateChildren(name, children, branch)
+ changes := []ChangeTuple{{POST_ADD, nil, childRev.GetData()}}
+ childRev.SetupWatch(childRev.GetName())
+
+ n.makeLatest(branch, updatedRev, changes)
return childRev
}
@@ -657,17 +667,29 @@
idx, childRev := n.findRevByKey(children, field.Key, keyValue)
+ if childRev == nil {
+ return branch.GetLatest()
+ }
+
childNode := childRev.GetNode()
newChildRev := childNode.Add(path, data, txid, makeBranch)
// Prefix the hash with the data type (e.g. devices, logical_devices, adapters)
childRev.SetName(name + "/" + keyValue.(string))
- children[idx] = newChildRev
+ if idx >= 0 {
+ children[idx] = newChildRev
+ } else {
+ children = append(children, newChildRev)
+ }
- rev = rev.UpdateChildren(name, children, branch)
- rev.Drop(txid, false)
- n.makeLatest(branch, rev.GetBranch().GetLatest(), nil)
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
+ updatedRev := rev.UpdateChildren(name, children, branch)
+ n.makeLatest(branch, updatedRev, nil)
+
+ updatedRev.ChildDrop(name, childRev.GetHash())
return newChildRev
} else {
@@ -729,20 +751,30 @@
copy(children, rev.GetChildren(name))
if path != "" {
- idx, childRev := n.findRevByKey(children, field.Key, keyValue)
- childNode := childRev.GetNode()
- if childNode.Proxy == nil {
- childNode.Proxy = n.Proxy
+ if idx, childRev := n.findRevByKey(children, field.Key, keyValue); childRev != nil {
+ childNode := childRev.GetNode()
+ if childNode.Proxy == nil {
+ childNode.Proxy = n.Proxy
+ }
+ newChildRev := childNode.Remove(path, txid, makeBranch)
+
+ if idx >= 0 {
+ children[idx] = newChildRev
+ } else {
+ children = append(children, newChildRev)
+ }
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
+ rev.SetChildren(name, children)
+ branch.GetLatest().Drop(txid, false)
+ n.makeLatest(branch, rev, nil)
}
- newChildRev := childNode.Remove(path, txid, makeBranch)
- children[idx] = newChildRev
- rev.SetChildren(name, children)
- branch.GetLatest().Drop(txid, false)
- n.makeLatest(branch, rev, nil)
- return nil
+ return branch.GetLatest()
}
- if idx, childRev := n.findRevByKey(children, field.Key, keyValue); childRev != nil {
+ if idx, childRev := n.findRevByKey(children, field.Key, keyValue); childRev != nil && idx >= 0 {
if n.GetProxy() != nil {
data := childRev.GetData()
n.GetProxy().InvokeCallbacks(PRE_REMOVE, false, data)
@@ -752,6 +784,10 @@
}
childRev.StorageDrop(txid, true)
+
+ branch.LatestLock.Lock()
+ defer branch.LatestLock.Unlock()
+
children = append(children[:idx], children[idx+1:]...)
rev.SetChildren(name, children)
diff --git a/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go b/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
index 3c39e01..0ccc58e 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/non_persisted_revision.go
@@ -22,34 +22,41 @@
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
"reflect"
+ "runtime/debug"
"sort"
"sync"
)
-type revCacheSingleton struct {
- sync.RWMutex
- Cache map[string]interface{}
-}
-
-var revCacheInstance *revCacheSingleton
-var revCacheOnce sync.Once
-
-func GetRevCache() *revCacheSingleton {
- revCacheOnce.Do(func() {
- revCacheInstance = &revCacheSingleton{Cache: make(map[string]interface{})}
- })
- return revCacheInstance
-}
+// TODO: Cache logic will have to be revisited to cleanup unused entries in memory (disabled for now)
+//
+//type revCacheSingleton struct {
+// sync.RWMutex
+// //Cache map[string]interface{}
+// Cache sync.Map
+//}
+//
+//var revCacheInstance *revCacheSingleton
+//var revCacheOnce sync.Once
+//
+//func GetRevCache() *revCacheSingleton {
+// revCacheOnce.Do(func() {
+// //revCacheInstance = &revCacheSingleton{Cache: make(map[string]interface{})}
+// revCacheInstance = &revCacheSingleton{Cache: sync.Map{}}
+// })
+// return revCacheInstance
+//}
type NonPersistedRevision struct {
- mutex sync.RWMutex
- Root *root
- Config *DataRevision
- Children map[string][]Revision
- Hash string
- Branch *Branch
- WeakRef string
- Name string
+ mutex sync.RWMutex
+ Root *root
+ Config *DataRevision
+ childrenLock sync.RWMutex
+ Children map[string][]Revision
+ Hash string
+ Branch *Branch
+ WeakRef string
+ Name string
+ discarded bool
}
func NewNonPersistedRevision(root *root, branch *Branch, data interface{}, children map[string][]Revision) Revision {
@@ -59,9 +66,14 @@
r.Config = NewDataRevision(root, data)
r.Children = children
r.Hash = r.hashContent()
+ r.discarded = false
return r
}
+func (npr *NonPersistedRevision) IsDiscarded() bool {
+ return npr.discarded
+}
+
func (npr *NonPersistedRevision) SetConfig(config *DataRevision) {
npr.mutex.Lock()
defer npr.mutex.Unlock()
@@ -75,28 +87,35 @@
}
func (npr *NonPersistedRevision) SetAllChildren(children map[string][]Revision) {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- npr.Children = children
-}
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+ npr.Children = make(map[string][]Revision)
-func (npr *NonPersistedRevision) SetChildren(name string, children []Revision) {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
- if _, exists := npr.Children[name]; exists {
- npr.Children[name] = children
+ for key, value := range children {
+ npr.Children[key] = make([]Revision, len(value))
+ copy(npr.Children[key], value)
}
}
+func (npr *NonPersistedRevision) SetChildren(name string, children []Revision) {
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
+ npr.Children[name] = make([]Revision, len(children))
+ copy(npr.Children[name], children)
+}
+
func (npr *NonPersistedRevision) GetAllChildren() map[string][]Revision {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
return npr.Children
}
func (npr *NonPersistedRevision) GetChildren(name string) []Revision {
- npr.mutex.Lock()
- defer npr.mutex.Unlock()
+ npr.childrenLock.Lock()
+ defer npr.childrenLock.Unlock()
+
if _, exists := npr.Children[name]; exists {
return npr.Children[name]
}
@@ -160,22 +179,11 @@
}
func (npr *NonPersistedRevision) Finalize(skipOnExist bool) {
- GetRevCache().Lock()
- defer GetRevCache().Unlock()
-
- if !skipOnExist {
- npr.Hash = npr.hashContent()
- }
- if _, exists := GetRevCache().Cache[npr.Hash]; !exists {
- GetRevCache().Cache[npr.Hash] = npr
- }
- if _, exists := GetRevCache().Cache[npr.Config.Hash]; !exists {
- GetRevCache().Cache[npr.Config.Hash] = npr.Config
- } else {
- npr.Config = GetRevCache().Cache[npr.Config.Hash].(*DataRevision)
- }
+ npr.Hash = npr.hashContent()
}
+// hashContent generates a hash string based on the contents of the revision.
+// The string should be unique to avoid conflicts with other revisions
func (npr *NonPersistedRevision) hashContent() string {
var buffer bytes.Buffer
var childrenKeys []string
@@ -184,6 +192,10 @@
buffer.WriteString(npr.Config.Hash)
}
+ if npr.Name != "" {
+ buffer.WriteString(npr.Name)
+ }
+
for key := range npr.Children {
childrenKeys = append(childrenKeys, key)
}
@@ -204,18 +216,20 @@
return fmt.Sprintf("%x", md5.Sum(buffer.Bytes()))[:12]
}
+// Get will retrieve the data for the current revision
func (npr *NonPersistedRevision) Get(depth int) interface{} {
// 1. Clone the data to avoid any concurrent access issues
// 2. The current rev might still be pointing to an old config
// thus, force the revision to get its latest value
latestRev := npr.GetBranch().GetLatest()
originalData := proto.Clone(latestRev.GetData().(proto.Message))
-
data := originalData
- // Get back to the interface type
- //data := reflect.ValueOf(originalData).Interface()
if depth != 0 {
+ // FIXME: Traversing the struct through reflection sometimes corrupts the data.
+ // Unlike the original python implementation, golang structs are not lazy loaded.
+ // Keeping this non-critical logic for now, but Get operations should be forced to
+ // depth=0 to avoid going through the following loop.
for fieldName, field := range ChildrenFields(latestRev.GetData()) {
childDataName, childDataHolder := GetAttributeValue(data, fieldName, 0)
if field.IsContainer {
@@ -235,8 +249,8 @@
}
}
} else {
- if revs := latestRev.GetChildren(fieldName); revs != nil && len(revs) > 0 {
- rev := latestRev.GetChildren(fieldName)[0]
+ if revs := npr.GetBranch().GetLatest().GetChildren(fieldName); revs != nil && len(revs) > 0 {
+ rev := revs[0]
if rev != nil {
childData := rev.Get(depth - 1)
if reflect.TypeOf(childData) == reflect.TypeOf(childDataHolder.Interface()) {
@@ -260,22 +274,27 @@
return result
}
+// UpdateData will refresh the data content of the revision
func (npr *NonPersistedRevision) UpdateData(data interface{}, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
+ // Do not update the revision if data is the same
if npr.Config.Data != nil && npr.Config.hashData(npr.Root, data) == npr.Config.Hash {
log.Debugw("stored-data-matches-latest", log.Fields{"stored": npr.Config.Data, "provided": data})
return npr
}
+ // Construct a new revision based on the current one
newRev := NonPersistedRevision{}
newRev.Config = NewDataRevision(npr.Root, data)
newRev.Hash = npr.Hash
+ newRev.Root = npr.Root
+ newRev.Name = npr.Name
newRev.Branch = branch
newRev.Children = make(map[string][]Revision)
- for entryName, childrenEntry := range npr.Children {
+ for entryName, childrenEntry := range branch.GetLatest().GetAllChildren() {
newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
}
@@ -284,44 +303,91 @@
return &newRev
}
+// UpdateChildren will refresh the list of children with the provided ones
+// It will carefully go through the list and ensure that no child is lost
func (npr *NonPersistedRevision) UpdateChildren(name string, children []Revision, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
- updatedRev := npr
-
- // Verify if the map contains already contains an entry matching the name value
- // If so, we need to retain the contents of that entry and merge them with the provided children revision list
- if _, exists := updatedRev.Children[name]; exists {
- // Go through all child hashes and save their index within the map
- existChildMap := make(map[string]int)
- for i, child := range updatedRev.Children[name] {
- existChildMap[child.GetHash()] = i
- }
-
- for _, newChild := range children {
- if _, childExists := existChildMap[newChild.GetHash()]; !childExists {
- // revision is not present in the existing list... add it
- updatedRev.Children[name] = append(updatedRev.Children[name], newChild)
- } else {
- // replace
- updatedRev.Children[name][existChildMap[newChild.GetHash()]] = newChild
- }
- }
- } else {
- // Map entry does not exist, thus just create a new entry and assign the provided revisions
- updatedRev.Children[name] = make([]Revision, len(children))
- copy(updatedRev.Children[name], children)
- }
-
+ // Construct a new revision based on the current one
+ updatedRev := &NonPersistedRevision{}
updatedRev.Config = NewDataRevision(npr.Root, npr.Config.Data)
updatedRev.Hash = npr.Hash
updatedRev.Branch = branch
+ updatedRev.Name = npr.Name
+
+ updatedRev.Children = make(map[string][]Revision)
+ for entryName, childrenEntry := range branch.GetLatest().GetAllChildren() {
+ updatedRev.Children[entryName] = append(updatedRev.Children[entryName], childrenEntry...)
+ }
+
+ var updatedChildren []Revision
+
+ // Verify if the map contains already contains an entry matching the name value
+ // If so, we need to retain the contents of that entry and merge them with the provided children revision list
+ if existingChildren := branch.GetLatest().GetChildren(name); existingChildren != nil {
+ // Construct a map of unique child names with the respective index value
+ // for the children in the existing revision as well as the new ones
+ existingNames := make(map[string]int)
+ newNames := make(map[string]int)
+
+ for i, newChild := range children {
+ newNames[newChild.GetName()] = i
+ }
+
+ for i, existingChild := range existingChildren {
+ existingNames[existingChild.GetName()] = i
+
+ // If an existing entry is not in the new list, add it to the updated list, so it is not forgotten
+ if _, exists := newNames[existingChild.GetName()]; !exists {
+ updatedChildren = append(updatedChildren, existingChild)
+ }
+ }
+
+ log.Debugw("existing-children-names", log.Fields{"hash": npr.GetHash(), "names": existingNames})
+
+ // Merge existing and new children
+ for _, newChild := range children {
+ nameIndex, nameExists := existingNames[newChild.GetName()]
+
+ // Does the existing list contain a child with that name?
+ if nameExists {
+ // Check if the data has changed or not
+ if existingChildren[nameIndex].GetData().(proto.Message).String() != newChild.GetData().(proto.Message).String() {
+ // replace entry
+ newChild.GetNode().Root = existingChildren[nameIndex].GetNode().Root
+ updatedChildren = append(updatedChildren, newChild)
+ } else {
+ // keep existing entry
+ updatedChildren = append(updatedChildren, existingChildren[nameIndex])
+ }
+ } else {
+ // new entry ... just add it
+ updatedChildren = append(updatedChildren, newChild)
+ }
+ }
+
+ // Save children in new revision
+ updatedRev.SetChildren(name, updatedChildren)
+
+ updatedNames := make(map[string]int)
+ for i, updatedChild := range updatedChildren {
+ updatedNames[updatedChild.GetName()] = i
+ }
+
+ log.Debugw("updated-children-names", log.Fields{"hash": npr.GetHash(), "names": updatedNames})
+
+ } else {
+ // There are no children available, just save the provided ones
+ updatedRev.SetChildren(name, children)
+ }
+
updatedRev.Finalize(false)
return updatedRev
}
+// UpdateAllChildren will replace the current list of children with the provided ones
func (npr *NonPersistedRevision) UpdateAllChildren(children map[string][]Revision, branch *Branch) Revision {
npr.mutex.Lock()
defer npr.mutex.Unlock()
@@ -330,6 +396,8 @@
newRev.Config = npr.Config
newRev.Hash = npr.Hash
newRev.Branch = branch
+ newRev.Name = npr.Name
+
newRev.Children = make(map[string][]Revision)
for entryName, childrenEntry := range children {
newRev.Children[entryName] = append(newRev.Children[entryName], childrenEntry...)
@@ -339,14 +407,25 @@
return newRev
}
+// Drop is used to indicate when a revision is no longer required
func (npr *NonPersistedRevision) Drop(txid string, includeConfig bool) {
- GetRevCache().Lock()
- defer GetRevCache().Unlock()
+ log.Debugw("dropping-revision", log.Fields{"hash": npr.GetHash(), "stack": string(debug.Stack())})
+ npr.discarded = true
+}
- if includeConfig {
- delete(GetRevCache().Cache, npr.Config.Hash)
+// ChildDrop will remove a child entry matching the provided parameters from the current revision
+func (npr *NonPersistedRevision) ChildDrop(childType string, childHash string) {
+ if childType != "" {
+ children := make([]Revision, len(npr.GetChildren(childType)))
+ copy(children, npr.GetChildren(childType))
+ for i, child := range children {
+ if child.GetHash() == childHash {
+ children = append(children[:i], children[i+1:]...)
+ npr.SetChildren(childType, children)
+ break
+ }
+ }
}
- delete(GetRevCache().Cache, npr.Hash)
}
func (npr *NonPersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
diff --git a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
index c2a6c64..a56b776 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/persisted_revision.go
@@ -19,9 +19,7 @@
import (
"bytes"
"compress/gzip"
- "encoding/hex"
"github.com/golang/protobuf/proto"
- "github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
"github.com/opencord/voltha-go/db/kvstore"
"reflect"
@@ -35,13 +33,27 @@
Revision
Compress bool
- events chan *kvstore.Event `json:"-"`
- kvStore *Backend `json:"-"`
- mutex sync.RWMutex `json:"-"`
+ events chan *kvstore.Event
+ kvStore *Backend
+ mutex sync.RWMutex
isStored bool
isWatched bool
}
+type watchCache struct {
+ Cache sync.Map
+}
+
+var watchCacheInstance *watchCache
+var watchCacheOne sync.Once
+
+func Watches() *watchCache {
+ watchCacheOne.Do(func() {
+ watchCacheInstance = &watchCache{Cache: sync.Map{}}
+ })
+ return watchCacheInstance
+}
+
// NewPersistedRevision creates a new instance of a PersistentRevision structure
func NewPersistedRevision(branch *Branch, data interface{}, children map[string][]Revision) Revision {
pr := &PersistedRevision{}
@@ -55,11 +67,6 @@
pr.store(skipOnExist)
}
-type revData struct {
- Children map[string][]string
- Config string
-}
-
func (pr *PersistedRevision) store(skipOnExist bool) {
if pr.GetBranch().Txid != "" {
return
@@ -92,97 +99,43 @@
}
func (pr *PersistedRevision) SetupWatch(key string) {
+ if key == "" {
+ log.Debugw("ignoring-watch", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+ return
+ }
+
+ if _, exists := Watches().Cache.LoadOrStore(key+"-"+pr.GetHash(), struct{}{}); exists {
+ return
+ }
+
if pr.events == nil {
pr.events = make(chan *kvstore.Event)
- log.Debugw("setting-watch", log.Fields{"key": key, "revision-hash": pr.GetHash()})
+ log.Debugw("setting-watch-channel", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
pr.SetName(key)
pr.events = pr.kvStore.CreateWatch(key)
+ }
+ if !pr.isWatched {
pr.isWatched = true
+ log.Debugw("setting-watch-routine", log.Fields{"key": key, "revision-hash": pr.GetHash(), "stack": string(debug.Stack())})
+
// Start watching
go pr.startWatching()
}
}
-func (pr *PersistedRevision) updateInMemory(data interface{}) {
- pr.mutex.Lock()
- defer pr.mutex.Unlock()
-
- var pac *proxyAccessControl
- var pathLock string
-
- //
- // If a proxy exists for this revision, use it to lock access to the path
- // and prevent simultaneous updates to the object in memory
- //
- if pr.GetNode().GetProxy() != nil {
- pathLock, _ = pr.GetNode().GetProxy().parseForControlledPath(pr.GetNode().GetProxy().getFullPath())
-
- // If the proxy already has a request in progress, then there is no need to process the watch
- log.Debugw("update-in-memory--checking-pathlock", log.Fields{"key": pr.GetHash(), "pathLock": pathLock})
- if PAC().IsReserved(pathLock) {
- switch pr.GetNode().GetRoot().GetProxy().Operation {
- case PROXY_ADD:
- fallthrough
- case PROXY_REMOVE:
- fallthrough
- case PROXY_UPDATE:
- log.Debugw("update-in-memory--skipping", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
- return
- default:
- log.Debugw("update-in-memory--operation", log.Fields{"operation": pr.GetNode().GetRoot().GetProxy().Operation})
- }
- } else {
- log.Debugw("update-in-memory--path-not-locked", log.Fields{"key": pr.GetHash(), "path": pr.GetNode().GetProxy().getFullPath()})
- }
-
- log.Debugw("update-in-memory--reserve-and-lock", log.Fields{"key": pr.GetHash(), "path": pathLock})
-
- pac = PAC().ReservePath(pr.GetNode().GetProxy().getFullPath(), pr.GetNode().GetProxy(), pathLock)
- pac.SetProxy(pr.GetNode().GetProxy())
- pac.lock()
-
- defer log.Debugw("update-in-memory--release-and-unlock", log.Fields{"key": pr.GetHash(), "path": pathLock})
- defer pac.unlock()
- defer PAC().ReleasePath(pathLock)
- }
-
- //
- // Update the object in memory through a transaction
- // This will allow for the object to be subsequently merged with any changes
- // that might have occurred in memory
- //
-
- log.Debugw("update-in-memory--custom-transaction", log.Fields{"key": pr.GetHash()})
-
- // Prepare the transaction
- branch := pr.GetBranch()
- latest := branch.GetLatest()
- txidBin, _ := uuid.New().MarshalBinary()
- txid := hex.EncodeToString(txidBin)[:12]
-
- makeBranch := func(node *node) *Branch {
- return node.MakeBranch(txid)
- }
-
- // Apply the update in a transaction branch
- updatedRev := latest.GetNode().Update("", data, false, txid, makeBranch)
- updatedRev.SetName(latest.GetName())
-
- // Merge the transaction branch in memory
- if mergedRev, _ := latest.GetNode().MergeBranch(txid, false); mergedRev != nil {
- branch.SetLatest(mergedRev)
- }
-}
-
func (pr *PersistedRevision) startWatching() {
- log.Debugw("starting-watch", log.Fields{"key": pr.GetHash()})
+ log.Debugw("starting-watch", log.Fields{"key": pr.GetHash(), "stack": string(debug.Stack())})
StopWatchLoop:
for {
+ if pr.IsDiscarded() {
+ break StopWatchLoop
+ }
+
select {
case event, ok := <-pr.events:
if !ok {
@@ -209,7 +162,9 @@
if err := proto.Unmarshal(dataPair.Value.([]byte), data.Interface().(proto.Message)); err != nil {
log.Errorw("update-in-memory--unmarshal-failed", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "error": err})
} else {
- pr.updateInMemory(data.Interface())
+ if pr.GetNode().GetProxy() != nil {
+ pr.LoadFromPersistence(pr.GetNode().GetProxy().getFullPath(), "")
+ }
}
}
@@ -219,110 +174,9 @@
}
}
- log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName()})
-}
+ Watches().Cache.Delete(pr.GetName() + "-" + pr.GetHash())
-func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
- log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
-
- var response []Revision
- var rev Revision
-
- rev = pr
-
- if pr.kvStore != nil && path != "" {
- blobMap, _ := pr.kvStore.List(path)
-
- partition := strings.SplitN(path, "/", 2)
- name := partition[0]
-
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
-
- field := ChildrenFields(rev.GetBranch().Node.Type)[name]
-
- if field != nil && field.IsContainer {
- var children []Revision
- children = make([]Revision, len(rev.GetChildren(name)))
- copy(children, rev.GetChildren(name))
- existChildMap := make(map[string]int)
- for i, child := range rev.GetChildren(name) {
- existChildMap[child.GetHash()] = i
- }
-
- for _, blob := range blobMap {
- output := blob.Value.([]byte)
-
- data := reflect.New(field.ClassType.Elem())
-
- if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
- log.Errorw(
- "loading-from-persistence--failed-to-unmarshal",
- log.Fields{"path": path, "txid": txid, "error": err},
- )
- } else if field.Key != "" {
- var key reflect.Value
- var keyValue interface{}
- var keyStr string
-
- if path == "" {
- // e.g. /logical_devices --> path="" name=logical_devices key=""
- _, key = GetAttributeValue(data.Interface(), field.Key, 0)
- keyStr = key.String()
-
- } else {
- // e.g.
- // /logical_devices/abcde --> path="abcde" name=logical_devices
- // /logical_devices/abcde/image_downloads --> path="abcde/image_downloads" name=logical_devices
-
- partition := strings.SplitN(path, "/", 2)
- key := partition[0]
- if len(partition) < 2 {
- path = ""
- } else {
- path = partition[1]
- }
- keyValue = field.KeyFromStr(key)
- keyStr = keyValue.(string)
-
- if idx, childRev := rev.GetBranch().Node.findRevByKey(children, field.Key, keyValue); childRev != nil {
- // Key is memory, continue recursing path
- if newChildRev := childRev.LoadFromPersistence(path, txid); newChildRev != nil {
- children[idx] = newChildRev[0]
-
- rev := rev.UpdateChildren(name, rev.GetChildren(name), rev.GetBranch())
- rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
-
- response = append(response, newChildRev[0])
- continue
- }
- }
- }
-
- childRev := rev.GetBranch().Node.MakeNode(data.Interface(), txid).Latest(txid)
- childRev.SetName(name + "/" + keyStr)
-
- // Do not process a child that is already in memory
- if _, childExists := existChildMap[childRev.GetHash()]; !childExists {
- // Create watch for <component>/<key>
- childRev.SetupWatch(childRev.GetName())
-
- children = append(children, childRev)
- rev = rev.UpdateChildren(name, children, rev.GetBranch())
-
- rev.GetBranch().Node.makeLatest(rev.GetBranch(), rev, nil)
- }
- response = append(response, childRev)
- continue
- }
- }
- }
- }
-
- return response
+ log.Debugw("exiting-watch", log.Fields{"key": pr.GetHash(), "watch": pr.GetName(), "stack": string(debug.Stack())})
}
// UpdateData modifies the information in the data model and saves it in the persistent storage
@@ -335,6 +189,17 @@
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
+ events: pr.events,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isWatched = false
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ newPR.SetupWatch(newPR.GetName())
+ } else {
+ newPR.isWatched = true
+ newPR.isStored = true
}
return newPR
@@ -350,6 +215,17 @@
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
+ events: pr.events,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isWatched = false
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ newPR.SetupWatch(newPR.GetName())
+ } else {
+ newPR.isWatched = true
+ newPR.isStored = true
}
return newPR
@@ -365,6 +241,17 @@
Revision: newNPR,
Compress: pr.Compress,
kvStore: pr.kvStore,
+ events: pr.events,
+ }
+
+ if newPR.GetHash() != pr.GetHash() {
+ newPR.isWatched = false
+ newPR.isStored = false
+ pr.Drop(branch.Txid, false)
+ newPR.SetupWatch(newPR.GetName())
+ } else {
+ newPR.isWatched = true
+ newPR.isStored = true
}
return newPR
@@ -407,3 +294,182 @@
pr.Revision.Drop(txid, includeConfig)
}
+
+// verifyPersistedEntry validates if the provided data is available or not in memory and applies updates as required
+func (pr *PersistedRevision) verifyPersistedEntry(data interface{}, typeName string, keyName string, keyValue string, txid string) (response Revision) {
+ rev := pr
+
+ children := make([]Revision, len(rev.GetBranch().GetLatest().GetChildren(typeName)))
+ copy(children, rev.GetBranch().GetLatest().GetChildren(typeName))
+
+ // Verify if the revision contains a child that matches that key
+ if childIdx, childRev := rev.GetNode().findRevByKey(rev.GetBranch().GetLatest().GetChildren(typeName), keyName, keyValue); childRev != nil {
+ // A child matching the provided key exists in memory
+ // Verify if the data differs to what was retrieved from persistence
+ if childRev.GetData().(proto.Message).String() != data.(proto.Message).String() {
+ log.Debugw("verify-persisted-entry--data-is-different", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+
+ // Data has changed; replace the child entry and update the parent revision
+ updatedChildRev := childRev.UpdateData(data, childRev.GetBranch())
+ updatedChildRev.SetupWatch(updatedChildRev.GetName())
+ childRev.Drop(txid, false)
+
+ if childIdx >= 0 {
+ children[childIdx] = updatedChildRev
+ } else {
+ children = append(children, updatedChildRev)
+ }
+
+ rev.GetBranch().LatestLock.Lock()
+ updatedRev := rev.UpdateChildren(typeName, children, rev.GetBranch())
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
+ rev.GetBranch().LatestLock.Unlock()
+
+ // Drop the previous child revision
+ rev.GetBranch().Node.Latest().ChildDrop(typeName, childRev.GetHash())
+
+ if updatedChildRev != nil {
+ log.Debugw("verify-persisted-entry--adding-child", log.Fields{
+ "key": updatedChildRev.GetHash(),
+ "name": updatedChildRev.GetName(),
+ })
+ response = updatedChildRev
+ }
+ } else {
+ // Data is the same. Continue to the next entry
+ log.Debugw("verify-persisted-entry--same-data", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+ if childRev != nil {
+ log.Debugw("verify-persisted-entry--keeping-child", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+ response = childRev
+ }
+ }
+ } else {
+ // There is no available child with that key value.
+ // Create a new child and update the parent revision.
+ log.Debugw("verify-persisted-entry--no-such-entry", log.Fields{
+ "key": keyValue,
+ "name": typeName,
+ })
+
+ // Construct a new child node with the retrieved persistence data
+ childRev = rev.GetBranch().Node.MakeNode(data, txid).Latest(txid)
+
+ // We need to start watching this entry for future changes
+ childRev.SetName(typeName + "/" + keyValue)
+
+ // Add the child to the parent revision
+ rev.GetBranch().LatestLock.Lock()
+ children = append(children, childRev)
+ updatedRev := rev.GetBranch().Node.Latest().UpdateChildren(typeName, children, rev.GetBranch())
+ childRev.SetupWatch(childRev.GetName())
+
+ //rev.GetBranch().Node.Latest().Drop(txid, false)
+ rev.GetBranch().Node.makeLatest(rev.GetBranch(), updatedRev, nil)
+ rev.GetBranch().LatestLock.Unlock()
+
+ // Child entry is valid and can be included in the response object
+ if childRev != nil {
+ log.Debugw("verify-persisted-entry--adding-child", log.Fields{
+ "key": childRev.GetHash(),
+ "name": childRev.GetName(),
+ })
+ response = childRev
+ }
+ }
+
+ return response
+}
+
+// LoadFromPersistence retrieves data from kv store at the specified location and refreshes the memory
+// by adding missing entries, updating changed entries and ignoring unchanged ones
+func (pr *PersistedRevision) LoadFromPersistence(path string, txid string) []Revision {
+ pr.mutex.Lock()
+ defer pr.mutex.Unlock()
+
+ log.Debugw("loading-from-persistence", log.Fields{"path": path, "txid": txid})
+
+ var response []Revision
+ var rev Revision
+
+ rev = pr
+
+ if pr.kvStore != nil && path != "" {
+ blobMap, _ := pr.kvStore.List(path)
+
+ partition := strings.SplitN(path, "/", 2)
+ name := partition[0]
+
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+
+ field := ChildrenFields(rev.GetBranch().Node.Type)[name]
+
+ if field != nil && field.IsContainer {
+ log.Debugw("load-from-persistence--start-blobs", log.Fields{
+ "path": path,
+ "name": name,
+ "size": len(blobMap),
+ })
+
+ for _, blob := range blobMap {
+ output := blob.Value.([]byte)
+
+ data := reflect.New(field.ClassType.Elem())
+
+ if err := proto.Unmarshal(output, data.Interface().(proto.Message)); err != nil {
+ log.Errorw("load-from-persistence--failed-to-unmarshal", log.Fields{
+ "path": path,
+ "txid": txid,
+ "error": err,
+ })
+ } else if path == "" {
+ if field.Key != "" {
+ // Retrieve the key identifier value from the data structure
+ // based on the field's key attribute
+ _, key := GetAttributeValue(data.Interface(), field.Key, 0)
+
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, key.String(), txid); entry != nil {
+ response = append(response, entry)
+ }
+ }
+
+ } else if field.Key != "" {
+ // The request is for a specific entry/id
+ partition := strings.SplitN(path, "/", 2)
+ key := partition[0]
+ if len(partition) < 2 {
+ path = ""
+ } else {
+ path = partition[1]
+ }
+ keyValue := field.KeyFromStr(key)
+
+ if entry := pr.verifyPersistedEntry(data.Interface(), name, field.Key, keyValue.(string), txid); entry != nil {
+ response = append(response, entry)
+ }
+ }
+ }
+
+ log.Debugw("load-from-persistence--end-blobs", log.Fields{"path": path, "name": name})
+ } else {
+ log.Debugw("load-from-persistence--cannot-process-field", log.Fields{
+ "type": rev.GetBranch().Node.Type,
+ "name": name,
+ })
+ }
+ }
+
+ return response
+}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/proxy.go b/vendor/github.com/opencord/voltha-go/db/model/proxy.go
index 86d426a..b45fb1d 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/proxy.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/proxy.go
@@ -147,6 +147,7 @@
PROXY_ADD
PROXY_UPDATE
PROXY_REMOVE
+ PROXY_CREATE
)
// parseForControlledPath verifies if a proxy path matches a pattern
@@ -243,8 +244,11 @@
p.Operation = PROXY_UPDATE
pac.SetProxy(p)
+ defer func(op ProxyOperation) {
+ pac.getProxy().Operation = op
+ }(PROXY_GET)
- log.Debugw("proxy-operation--update", log.Fields{"operation":p.Operation})
+ log.Debugw("proxy-operation--update", log.Fields{"operation": p.Operation})
return pac.Update(fullPath, data, strict, txid, controlled)
}
@@ -275,9 +279,13 @@
defer PAC().ReleasePath(pathLock)
p.Operation = PROXY_ADD
+ defer func() {
+ p.Operation = PROXY_GET
+ }()
+
pac.SetProxy(p)
- log.Debugw("proxy-operation--add", log.Fields{"operation":p.Operation})
+ log.Debugw("proxy-operation--add", log.Fields{"operation": p.Operation})
return pac.Add(fullPath, data, txid, controlled)
}
@@ -307,8 +315,11 @@
p.Operation = PROXY_ADD
pac.SetProxy(p)
+ defer func() {
+ p.Operation = PROXY_GET
+ }()
- log.Debugw("proxy-operation--add", log.Fields{"operation":p.Operation})
+ log.Debugw("proxy-operation--add", log.Fields{"operation": p.Operation})
return pac.Add(fullPath, data, txid, controlled)
}
@@ -338,12 +349,50 @@
p.Operation = PROXY_REMOVE
pac.SetProxy(p)
+ defer func() {
+ p.Operation = PROXY_GET
+ }()
- log.Debugw("proxy-operation--remove", log.Fields{"operation":p.Operation})
+ log.Debugw("proxy-operation--remove", log.Fields{"operation": p.Operation})
return pac.Remove(fullPath, txid, controlled)
}
+// CreateProxy to interact with specific path directly
+func (p *Proxy) CreateProxy(path string, exclusive bool) *Proxy {
+ if !strings.HasPrefix(path, "/") {
+ log.Errorf("invalid path: %s", path)
+ return nil
+ }
+
+ var fullPath string
+ var effectivePath string
+ if path == "/" {
+ fullPath = p.getPath()
+ effectivePath = p.getFullPath()
+ } else {
+ fullPath = p.getPath() + path
+ effectivePath = p.getFullPath() + path
+ }
+
+ pathLock, controlled := p.parseForControlledPath(effectivePath)
+
+ log.Debugf("Path: %s, Effective: %s, Full: %s, PathLock: %s", path, effectivePath, fullPath, pathLock)
+
+ pac := PAC().ReservePath(path, p, pathLock)
+ defer PAC().ReleasePath(pathLock)
+
+ p.Operation = PROXY_CREATE
+ pac.SetProxy(p)
+ defer func() {
+ p.Operation = PROXY_GET
+ }()
+
+ log.Debugw("proxy-operation--create-proxy", log.Fields{"operation": p.Operation})
+
+ return pac.CreateProxy(fullPath, exclusive, controlled)
+}
+
// OpenTransaction creates a new transaction branch to isolate operations made to the data model
func (p *Proxy) OpenTransaction() *Transaction {
txid := p.GetRoot().MakeTxBranch()
diff --git a/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go b/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go
index 66d3222..2a5d034 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/proxy_access_control.go
@@ -109,6 +109,7 @@
// lock will prevent access to a model path
func (pac *proxyAccessControl) lock() {
+ log.Debugw("locking", log.Fields{"path": pac.Path})
pac.PathLock <- struct{}{}
pac.setStart(time.Now())
}
@@ -116,6 +117,7 @@
// unlock will release control of a model path
func (pac *proxyAccessControl) unlock() {
<-pac.PathLock
+ log.Debugw("unlocking", log.Fields{"path": pac.Path})
pac.setStop(time.Now())
GetProfiling().AddToInMemoryLockTime(pac.getStop().Sub(pac.getStart()).Seconds())
}
@@ -243,3 +245,20 @@
return pac.getProxy().GetRoot().Remove(path, txid, nil)
}
+
+// CreateProxy allows interaction for a specific path
+func (pac *proxyAccessControl) CreateProxy(path string, exclusive bool, control bool) *Proxy {
+ if control {
+ pac.lock()
+ log.Debugw("locked-access--create-proxy", log.Fields{"path": path, "fullPath": pac.Path})
+ defer pac.unlock()
+ defer log.Debugw("unlocked-access--create-proxy", log.Fields{"path": path, "fullPath": pac.Proxy.getFullPath()})
+ }
+
+ result := pac.getProxy().GetRoot().CreateProxy(path, exclusive)
+
+ if result != nil {
+ return result
+ }
+ return nil
+}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/revision.go b/vendor/github.com/opencord/voltha-go/db/model/revision.go
index 2c10137..79620e1 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/revision.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/revision.go
@@ -17,10 +17,12 @@
type Revision interface {
Finalize(bool)
+ IsDiscarded() bool
SetConfig(revision *DataRevision)
GetConfig() *DataRevision
Drop(txid string, includeConfig bool)
StorageDrop(txid string, includeConfig bool)
+ ChildDrop(childType string, childHash string)
SetChildren(name string, children []Revision)
GetChildren(name string) []Revision
SetAllChildren(children map[string][]Revision)
diff --git a/vendor/github.com/opencord/voltha-go/db/model/root.go b/vendor/github.com/opencord/voltha-go/db/model/root.go
index 8f9e001..338ef67 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/root.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/root.go
@@ -119,11 +119,11 @@
r.Callbacks = r.Callbacks[1:]
go callback.Execute(nil)
}
- for len(r.NotificationCallbacks) > 0 {
- callback := r.NotificationCallbacks[0]
- r.NotificationCallbacks = r.NotificationCallbacks[1:]
- go callback.Execute(nil)
- }
+ //for len(r.NotificationCallbacks) > 0 {
+ // callback := r.NotificationCallbacks[0]
+ // r.NotificationCallbacks = r.NotificationCallbacks[1:]
+ // go callback.Execute(nil)
+ //}
}
func (r *root) hasCallbacks() bool {
@@ -181,7 +181,6 @@
r.Proxy.ParentNode.Latest(txid).Finalize(false)
}
-
// Update modifies the content of an object at a given path with the provided data
func (r *root) Update(path string, data interface{}, strict bool, txid string, makeBranch MakeBranchFunction) Revision {
var result Revision
@@ -310,6 +309,6 @@
}
type rootData struct {
- Latest string `json:latest`
- Tags map[string]string `json:tags`
-}
\ No newline at end of file
+ Latest string `json:"latest"`
+ Tags map[string]string `json:"tags"`
+}
diff --git a/vendor/github.com/opencord/voltha-go/db/model/utils.go b/vendor/github.com/opencord/voltha-go/db/model/utils.go
index f0fd618..b28e92f 100644
--- a/vendor/github.com/opencord/voltha-go/db/model/utils.go
+++ b/vendor/github.com/opencord/voltha-go/db/model/utils.go
@@ -36,8 +36,8 @@
// FindOwnerType will traverse a data structure and find the parent type of the specified object
func FindOwnerType(obj reflect.Value, name string, depth int, found bool) reflect.Type {
- prefix := ""
- for d:=0; d< depth; d++ {
+ prefix := ""
+ for d := 0; d < depth; d++ {
prefix += ">>"
}
k := obj.Kind()
diff --git a/vendor/github.com/opencord/voltha-go/kafka/client.go b/vendor/github.com/opencord/voltha-go/kafka/client.go
index a4c49ca..3d37f6e 100644
--- a/vendor/github.com/opencord/voltha-go/kafka/client.go
+++ b/vendor/github.com/opencord/voltha-go/kafka/client.go
@@ -32,7 +32,7 @@
const (
GroupIdKey = "groupId"
- Offset = "offset"
+ Offset = "offset"
)
const (
diff --git a/vendor/github.com/opencord/voltha-go/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-go/kafka/kafka_inter_container_library.go
index b9c03e6..afad2ac 100644
--- a/vendor/github.com/opencord/voltha-go/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-go/kafka/kafka_inter_container_library.go
@@ -549,7 +549,7 @@
Type: ic.MessageType_RESPONSE,
FromTopic: request.Header.ToTopic,
ToTopic: request.Header.FromTopic,
- KeyTopic: request.Header.KeyTopic,
+ KeyTopic: request.Header.KeyTopic,
Timestamp: time.Now().UnixNano(),
}
@@ -706,7 +706,7 @@
key := msg.Header.KeyTopic
log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
// TODO: handle error response.
- go kp.kafkaClient.Send(icm, replyTopic, key)
+ go kp.kafkaClient.Send(icm, replyTopic, key)
}
} else if msg.Header.Type == ic.MessageType_RESPONSE {
log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
@@ -763,7 +763,7 @@
Type: ic.MessageType_REQUEST,
FromTopic: replyTopic.Name,
ToTopic: toTopic.Name,
- KeyTopic: key,
+ KeyTopic: key,
Timestamp: time.Now().UnixNano(),
}
requestBody := &ic.InterContainerRequestBody{
diff --git a/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
index add1900..e920a83 100644
--- a/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
@@ -52,7 +52,7 @@
producer sarama.AsyncProducer
consumer sarama.Consumer
groupConsumers map[string]*scc.Consumer
- lockOfGroupConsumers sync.RWMutex
+ lockOfGroupConsumers sync.RWMutex
consumerGroupPrefix string
consumerType int
consumerGroupName string
@@ -454,7 +454,6 @@
// Send message to kafka
sc.producer.Input() <- kafkaMsg
-
// Wait for result
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
@@ -920,7 +919,6 @@
return channels
}
-
func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
sc.lockOfGroupConsumers.Lock()
defer sc.lockOfGroupConsumers.Unlock()
@@ -935,7 +933,7 @@
if _, exist := sc.groupConsumers[topic]; exist {
consumer := sc.groupConsumers[topic]
delete(sc.groupConsumers, topic)
- if err := consumer.Close(); err!= nil {
+ if err := consumer.Close(); err != nil {
log.Errorw("failure-closing-consumer", log.Fields{"error": err})
return err
}
diff --git a/vendor/github.com/opencord/voltha-go/rw_core/coreIf/logical_device_agent_if.go b/vendor/github.com/opencord/voltha-go/rw_core/coreIf/logical_device_agent_if.go
index 8394fac..c2614b2 100644
--- a/vendor/github.com/opencord/voltha-go/rw_core/coreIf/logical_device_agent_if.go
+++ b/vendor/github.com/opencord/voltha-go/rw_core/coreIf/logical_device_agent_if.go
@@ -20,9 +20,9 @@
package coreIf
import (
- "github.com/opencord/voltha-protos/go/voltha"
"github.com/opencord/voltha-go/rw_core/graph"
"github.com/opencord/voltha-go/rw_core/utils"
+ "github.com/opencord/voltha-protos/go/voltha"
)
// LogicalAgent represents a generic agent
diff --git a/vendor/github.com/opencord/voltha-go/rw_core/flow_decomposition/flow_decomposer.go b/vendor/github.com/opencord/voltha-go/rw_core/flow_decomposition/flow_decomposer.go
index ec2904f..41fdc4a 100644
--- a/vendor/github.com/opencord/voltha-go/rw_core/flow_decomposition/flow_decomposer.go
+++ b/vendor/github.com/opencord/voltha-go/rw_core/flow_decomposition/flow_decomposer.go
@@ -22,11 +22,11 @@
"fmt"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/common/log"
- ofp "github.com/opencord/voltha-protos/go/openflow_13"
- "github.com/opencord/voltha-protos/go/voltha"
"github.com/opencord/voltha-go/rw_core/coreIf"
"github.com/opencord/voltha-go/rw_core/graph"
fu "github.com/opencord/voltha-go/rw_core/utils"
+ ofp "github.com/opencord/voltha-protos/go/openflow_13"
+ "github.com/opencord/voltha-protos/go/voltha"
"math/big"
)
@@ -463,6 +463,18 @@
return 0
}
+func GetTunnelId(flow *ofp.OfpFlowStats) uint64 {
+ if flow == nil {
+ return 0
+ }
+ for _, field := range GetOfbFields(flow) {
+ if field.Type == TUNNEL_ID {
+ return field.GetTunnelId()
+ }
+ }
+ return 0
+}
+
//GetMetaData - legacy get method (only want lower 32 bits)
func GetMetaData(flow *ofp.OfpFlowStats) uint32 {
if flow == nil {
@@ -518,6 +530,18 @@
return (md >> 32) & 0xffffffff
}
+// Extract the child device port from a flow that contains the parent device peer port. Typically the UNI port of an
+// ONU child device. Per TST agreement this will be the lower 32 bits of tunnel id reserving upper 32 bits for later
+// use
+func GetChildPortFromTunnelId(flow *ofp.OfpFlowStats) uint32 {
+ tid := GetTunnelId(flow)
+ if tid == 0 {
+ return 0
+ }
+ // Per TST agreement we are keeping any child port id (uni port id) in the lower 32 bits
+ return uint32(tid & 0xffffffff)
+}
+
func HasNextTable(flow *ofp.OfpFlowStats) bool {
if flow == nil {
return false
@@ -751,9 +775,10 @@
}
//DecomposeRules decomposes per-device flows and flow-groups from the flows and groups defined on a logical device
-func (fd *FlowDecomposer) DecomposeRules(agent coreIf.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups) *fu.DeviceRules {
+func (fd *FlowDecomposer) DecomposeRules(agent coreIf.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups, includeDefaultFlows bool) *fu.DeviceRules {
rules := agent.GetAllDefaultRules()
deviceRules := rules.Copy()
+ devicesToUpdate := make(map[string]string)
groupMap := make(map[uint32]*ofp.OfpGroupEntry)
for _, groupEntry := range groups.Items {
@@ -766,9 +791,15 @@
for deviceId, flowAndGroups := range decomposedRules.Rules {
deviceRules.CreateEntryIfNotExist(deviceId)
deviceRules.Rules[deviceId].AddFrom(flowAndGroups)
+ devicesToUpdate[deviceId] = deviceId
}
}
- return deviceRules
+ if includeDefaultFlows {
+ return deviceRules
+ }
+ updatedDeviceRules := deviceRules.FilterRules(devicesToUpdate)
+
+ return updatedDeviceRules
}
// Handles special case of any controller-bound flow for a parent device
@@ -804,6 +835,7 @@
}
}
}
+
return newDeviceRules
}
@@ -840,6 +872,7 @@
MatchFields: []*ofp.OfpOxmOfbField{
InPort(egressHop.Ingress),
VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | inputPort),
+ TunnelId(uint64(inputPort)),
},
Actions: []*ofp.OfpAction{
PushVlan(0x8100),
@@ -859,6 +892,7 @@
VlanVid(uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000),
VlanPcp(0),
Metadata_ofp(uint64(inputPort)),
+ TunnelId(uint64(inputPort)),
},
Actions: []*ofp.OfpAction{
PopVlan(),
@@ -869,6 +903,7 @@
}
}
deviceRules.AddFlowsAndGroup(egressHop.DeviceID, fg)
+
return deviceRules
}
@@ -895,6 +930,7 @@
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
MatchFields: []*ofp.OfpOxmOfbField{
InPort(ingressHop.Ingress),
+ TunnelId(uint64(inPortNo)),
},
Actions: GetActions(flow),
}
@@ -939,6 +975,7 @@
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
MatchFields: []*ofp.OfpOxmOfbField{
InPort(egressHop.Ingress), //egress_hop.ingress_port.port_no
+ TunnelId(uint64(inPortNo)),
},
Actions: []*ofp.OfpAction{
Output(egressHop.Egress),
@@ -958,6 +995,7 @@
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
MatchFields: []*ofp.OfpOxmOfbField{
InPort(egressHop.Ingress),
+ TunnelId(uint64(inPortNo)),
},
}
// Augment the matchfields with the ofpfields from the flow
@@ -1020,6 +1058,7 @@
MatchFields: []*ofp.OfpOxmOfbField{
InPort(ingressHop.Ingress),
Metadata_ofp(innerTag),
+ TunnelId(uint64(portNumber)),
},
Actions: GetActions(flow),
}
@@ -1039,6 +1078,7 @@
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
MatchFields: []*ofp.OfpOxmOfbField{
InPort(ingressHop.Ingress),
+ TunnelId(uint64(inPortNo)),
},
Actions: GetActions(flow),
}
@@ -1052,6 +1092,7 @@
fg.AddFlow(MkFlowStat(fa))
deviceRules.AddFlowsAndGroup(ingressHop.DeviceID, fg)
}
+
return deviceRules
}
@@ -1080,6 +1121,7 @@
KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
MatchFields: []*ofp.OfpOxmOfbField{
InPort(ingressHop.Ingress),
+ TunnelId(uint64(inPortNo)),
},
Actions: []*ofp.OfpAction{
Output(ingressHop.Egress),
@@ -1230,10 +1272,9 @@
inPortNo := GetInPort(flow)
outPortNo := GetOutPort(flow)
-
deviceRules := fu.NewDeviceRules()
-
route := agent.GetRoute(inPortNo, outPortNo)
+
switch len(route) {
case 0:
log.Errorw("no-route", log.Fields{"inPortNo": inPortNo, "outPortNo": outPortNo, "comment": "deleting-flow"})
diff --git a/vendor/github.com/opencord/voltha-go/rw_core/graph/device_graph.go b/vendor/github.com/opencord/voltha-go/rw_core/graph/device_graph.go
index 376df16..5583023 100644
--- a/vendor/github.com/opencord/voltha-go/rw_core/graph/device_graph.go
+++ b/vendor/github.com/opencord/voltha-go/rw_core/graph/device_graph.go
@@ -139,7 +139,7 @@
// Build the graph
var device *voltha.Device
for _, logicalPort := range dg.logicalPorts {
- device, _ = dg.getDevice(logicalPort.DeviceId)
+ device, _ = dg.getDevice(logicalPort.DeviceId, false)
dg.GGraph = dg.addDevice(device, dg.GGraph, &dg.devicesAdded, &dg.portsAdded, dg.boundaryPorts)
}
@@ -148,6 +148,7 @@
// AddPort adds a port to the graph. If the graph is empty it will just invoke ComputeRoutes function
func (dg *DeviceGraph) AddPort(lp *voltha.LogicalPort) {
+ log.Debugw("Addport", log.Fields{"logicalPort": lp})
// If the graph does not exist invoke ComputeRoutes.
if len(dg.boundaryPorts) == 0 {
dg.ComputeRoutes([]*voltha.LogicalPort{lp})
@@ -161,12 +162,14 @@
// If the port is already part of the boundary ports, do nothing
if dg.portExist(portId) {
- fmt.Println("port exists")
return
}
+ // Add the port to the set of boundary ports
+ dg.boundaryPorts[portId] = lp.OfpPort.PortNo
+
// Add the device where this port is located to the device graph. If the device is already added then
// only the missing port will be added
- device, _ := dg.getDevice(lp.DeviceId)
+ device, _ := dg.getDevice(lp.DeviceId, false)
dg.GGraph = dg.addDevice(device, dg.GGraph, &dg.devicesAdded, &dg.portsAdded, dg.boundaryPorts)
if lp.RootPort {
@@ -184,6 +187,7 @@
}
func (dg *DeviceGraph) Print() error {
+ log.Debugw("Print", log.Fields{"graph": dg.logicalDeviceId, "boundaryPorts": dg.boundaryPorts})
if level, err := log.GetPackageLogLevel(); err == nil && level == log.DebugLevel {
output := ""
routeNumber := 1
@@ -197,7 +201,11 @@
output += fmt.Sprintf("%d:{%s=>%s} ", routeNumber, key, fmt.Sprintf("[%s]", val))
routeNumber += 1
}
- log.Debugw("graph_routes", log.Fields{"lDeviceId": dg.logicalDeviceId, "Routes": output})
+ if len(dg.Routes) == 0 {
+ log.Debugw("no-routes-found", log.Fields{"lDeviceId": dg.logicalDeviceId, "Graph": dg.GGraph.String()})
+ } else {
+ log.Debugw("graph_routes", log.Fields{"lDeviceId": dg.logicalDeviceId, "Routes": output})
+ }
}
return nil
}
@@ -205,14 +213,16 @@
//getDevice returns the device either from the local cache (default) or from the model.
//TODO: Set a cache timeout such that we do not use invalid data. The full device lifecycle should also
//be taken in consideration
-func (dg *DeviceGraph) getDevice(id string) (*voltha.Device, error) {
- dg.cachedDevicesLock.RLock()
- if d, exist := dg.cachedDevices[id]; exist {
+func (dg *DeviceGraph) getDevice(id string, useCache bool) (*voltha.Device, error) {
+ if useCache {
+ dg.cachedDevicesLock.RLock()
+ if d, exist := dg.cachedDevices[id]; exist {
+ dg.cachedDevicesLock.RUnlock()
+ //log.Debugw("getDevice - returned from cache", log.Fields{"deviceId": id})
+ return d, nil
+ }
dg.cachedDevicesLock.RUnlock()
- //log.Debugw("getDevice - returned from cache", log.Fields{"deviceId": id})
- return d, nil
}
- dg.cachedDevicesLock.RUnlock()
// Not cached
if d, err := dg.getDeviceFromModel(id); err != nil {
log.Errorw("device-not-found", log.Fields{"deviceId": id, "error": err})
@@ -251,13 +261,13 @@
}
for _, peer := range port.Peers {
if _, exist := (*devicesAdded)[peer.DeviceId]; !exist {
- d, _ := dg.getDevice(peer.DeviceId)
+ d, _ := dg.getDevice(peer.DeviceId, true)
g = dg.addDevice(d, g, devicesAdded, portsAdded, boundaryPorts)
- } else {
- peerPortId = concatDeviceIdPortId(peer.DeviceId, peer.PortNo)
- g.AddEdge(goraph.StringID(portId), goraph.StringID(peerPortId), 1)
- g.AddEdge(goraph.StringID(peerPortId), goraph.StringID(portId), 1)
}
+ peerPortId = concatDeviceIdPortId(peer.DeviceId, peer.PortNo)
+ g.AddEdge(goraph.StringID(portId), goraph.StringID(peerPortId), 1)
+ g.AddEdge(goraph.StringID(peerPortId), goraph.StringID(portId), 1)
+
}
}
return g
diff --git a/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go b/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
index 1e1ed9f..cf77d59 100644
--- a/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
+++ b/vendor/github.com/opencord/voltha-go/rw_core/utils/core_utils.go
@@ -15,6 +15,13 @@
*/
package utils
+import (
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "reflect"
+ "time"
+)
+
type DeviceID struct {
Id string
}
@@ -22,3 +29,61 @@
type LogicalDeviceID struct {
Id string
}
+
+//WaitForNilOrErrorResponses waits on a variadic number of channels for either a nil response or an error
+//response. If an error is received from a given channel then the returned error array will contain that error.
+//The error will be at the index corresponding to the order in which the channel appear in the parameter list.
+//If no errors is found then nil is returned. This method also takes in a timeout in milliseconds. If a
+//timeout is obtained then this function will stop waiting for the remaining responses and abort.
+func WaitForNilOrErrorResponses(timeout int64, chnls ...chan interface{}) []error {
+ // Create a timeout channel
+ tChnl := make(chan *interface{})
+ go func() {
+ time.Sleep(time.Duration(timeout) * time.Millisecond)
+ tChnl <- nil
+ }()
+
+ errorsReceived := false
+ errors := make([]error, len(chnls))
+ cases := make([]reflect.SelectCase, len(chnls)+1)
+ for i, ch := range chnls {
+ cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
+ }
+ // Add the timeout channel
+ cases[len(chnls)] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(tChnl)}
+
+ resultsReceived := make([]bool, len(errors)+1)
+ remaining := len(cases) - 1
+ for remaining > 0 {
+ index, value, ok := reflect.Select(cases)
+ if !ok { // closed channel
+ //Set the channel at that index to nil to disable this case, hence preventing it from interfering with other cases.
+ cases[index].Chan = reflect.ValueOf(nil)
+ errors[index] = status.Errorf(codes.Internal, "channel closed")
+ errorsReceived = true
+ } else if index == len(chnls) { // Timeout has occurred
+ for k := range errors {
+ if !resultsReceived[k] {
+ errors[k] = status.Errorf(codes.Aborted, "timeout")
+ }
+ }
+ errorsReceived = true
+ break
+ } else if value.IsNil() { // Nil means a good response
+ //do nothing
+ } else if err, ok := value.Interface().(error); ok { // error returned
+ errors[index] = err
+ errorsReceived = true
+ } else { // unknown value
+ errors[index] = status.Errorf(codes.Internal, "%s", value)
+ errorsReceived = true
+ }
+ resultsReceived[index] = true
+ remaining -= 1
+ }
+
+ if errorsReceived {
+ return errors
+ }
+ return nil
+}
diff --git a/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go b/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
index 10be81a..0c485bb 100644
--- a/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
+++ b/vendor/github.com/opencord/voltha-go/rw_core/utils/flow_utils.go
@@ -172,7 +172,9 @@
func (dr *DeviceRules) Copy() *DeviceRules {
copyDR := NewDeviceRules()
for key, val := range dr.Rules {
- copyDR.Rules[key] = val.Copy()
+ if val != nil {
+ copyDR.Rules[key] = val.Copy()
+ }
}
return copyDR
}
@@ -183,6 +185,16 @@
}
}
+func (dr *DeviceRules) FilterRules(deviceIds map[string]string) *DeviceRules {
+ filteredDR := NewDeviceRules()
+ for key, val := range dr.Rules {
+ if _, exist := deviceIds[key]; exist {
+ filteredDR.Rules[key] = val.Copy()
+ }
+ }
+ return filteredDR
+}
+
func (dr *DeviceRules) AddFlow(deviceId string, flow *ofp.OfpFlowStats) {
if _, exist := dr.Rules[deviceId]; !exist {
dr.Rules[deviceId] = NewFlowsAndGroups()