[VOL-2782] Update go.mod of voltha-go,open-olt-adapter,ofagent-go and voltctl to be compatible with latest version of voltha-lig-go
Change-Id: Idd1a36b6059927af9a4f3c12581663d587a19176
diff --git a/cmd/openolt-adapter/main.go b/cmd/openolt-adapter/main.go
index 79ce363..318ca7f 100644
--- a/cmd/openolt-adapter/main.go
+++ b/cmd/openolt-adapter/main.go
@@ -102,7 +102,7 @@
// Setup Log Config
cm := conf.NewConfigManager(a.kvClient, a.config.KVStoreType, a.config.KVStoreHost, a.config.KVStorePort, a.config.KVStoreTimeout)
- go conf.ProcessLogConfigChange(cm, ctx)
+ go conf.StartLogLevelConfigProcessing(cm, ctx)
// Setup Kafka Client
if a.kafkaClient, err = newKafkaClient("sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
diff --git a/go.mod b/go.mod
index c97d093..046ca7f 100644
--- a/go.mod
+++ b/go.mod
@@ -7,7 +7,7 @@
github.com/cenkalti/backoff/v3 v3.1.1
github.com/gogo/protobuf v1.3.1
github.com/golang/protobuf v1.3.2
- github.com/opencord/voltha-lib-go/v3 v3.0.14
+ github.com/opencord/voltha-lib-go/v3 v3.0.23
github.com/opencord/voltha-protos/v3 v3.2.7
go.etcd.io/etcd v0.0.0-20190930204107-236ac2a90522
google.golang.org/grpc v1.25.1
diff --git a/go.sum b/go.sum
index bad9ab4..5f0ae9d 100644
--- a/go.sum
+++ b/go.sum
@@ -196,8 +196,8 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v3 v3.0.14 h1:klI6Qt5iA9bTqI42jflSbU6YyEMJRcpaqh6rRw7qNnI=
-github.com/opencord/voltha-lib-go/v3 v3.0.14/go.mod h1:69Y+rVd25Nq2SUeoY7Q1BXtwrcUPllG0erhq+aK8Qec=
+github.com/opencord/voltha-lib-go/v3 v3.0.23 h1:QJkosPdVm85AaLzVEk4umzcL7p2MmwukU9f83rZTIOU=
+github.com/opencord/voltha-lib-go/v3 v3.0.23/go.mod h1:QuAohPQ+InSw+8XgCFxnp4cpHWcxO2efVTtiBFUmuOY=
github.com/opencord/voltha-protos/v3 v3.2.3 h1:Wv73mw1Ye0bCfyhOk5svgrlE2tLizHq6tQluoDq9Vg8=
github.com/opencord/voltha-protos/v3 v3.2.3/go.mod h1:RIGHt7b80BHpHh3ceodknh0DxUjUHCWSbYbZqRx7Og0=
github.com/opencord/voltha-protos/v3 v3.2.7 h1:zmRtqAz54FfMdmyHO4lfFE5uBaOKkeOLyac1Qvbpels=
diff --git a/internal/pkg/core/olt_platform_test.go b/internal/pkg/core/olt_platform_test.go
index 9e2ee56..211459f 100644
--- a/internal/pkg/core/olt_platform_test.go
+++ b/internal/pkg/core/olt_platform_test.go
@@ -324,7 +324,7 @@
fu.Output(1),
},
}
- ofpstats := fu.MkFlowStat(fa)
+ ofpstats, _ := fu.MkFlowStat(fa)
type args struct {
flow *ofp.OfpFlowStats
flowDirection string
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index 2890b19..6641e6e 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -231,7 +231,7 @@
fu.Output(1),
},
}
- ofpstats := fu.MkFlowStat(fa)
+ ofpstats, _ := fu.MkFlowStat(fa)
ofpstats.Cookie = ofpstats.Id
lldpFa := &fu.FlowArgs{
KV: fu.OfpFlowModArgs{"priority": 1000, "cookie": 48132224281636694},
@@ -244,7 +244,7 @@
fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
},
}
- lldpofpstats := fu.MkFlowStat(lldpFa)
+ lldpofpstats, _ := fu.MkFlowStat(lldpFa)
//lldpofpstats.Cookie = lldpofpstats.Id
dhcpFa := &fu.FlowArgs{
@@ -259,7 +259,7 @@
fu.Output(uint32(ofp.OfpPortNo_OFPP_CONTROLLER)),
},
}
- dhcpofpstats := fu.MkFlowStat(dhcpFa)
+ dhcpofpstats, _ := fu.MkFlowStat(dhcpFa)
//dhcpofpstats.Cookie = dhcpofpstats.Id
//multicast flow
@@ -275,7 +275,7 @@
fu.Group(1),
},
}
- multicastOfpStats := fu.MkFlowStat(multicastFa)
+ multicastOfpStats, _ := fu.MkFlowStat(multicastFa)
multicastOfpStats.Id = 1
type args struct {
@@ -494,18 +494,18 @@
},
KV: kw6,
}
- ofpstats := fu.MkFlowStat(fa)
- ofpstats2 := fu.MkFlowStat(fa2)
- ofpstats3 := fu.MkFlowStat(fa3)
- ofpstats4 := fu.MkFlowStat(fa4)
- ofpstats5 := fu.MkFlowStat(fa5)
- ofpstats6 := fu.MkFlowStat(fa6)
- ofpstats7 := fu.MkFlowStat(lldpFa)
- ofpstats8 := fu.MkFlowStat(dhcpFa)
- ofpstats9 := fu.MkFlowStat(fa9)
- ofpstats10 := fu.MkFlowStat(fa10)
- igmpstats := fu.MkFlowStat(igmpFa)
- ofpstats11 := fu.MkFlowStat(fa11)
+ ofpstats, _ := fu.MkFlowStat(fa)
+ ofpstats2, _ := fu.MkFlowStat(fa2)
+ ofpstats3, _ := fu.MkFlowStat(fa3)
+ ofpstats4, _ := fu.MkFlowStat(fa4)
+ ofpstats5, _ := fu.MkFlowStat(fa5)
+ ofpstats6, _ := fu.MkFlowStat(fa6)
+ ofpstats7, _ := fu.MkFlowStat(lldpFa)
+ ofpstats8, _ := fu.MkFlowStat(dhcpFa)
+ ofpstats9, _ := fu.MkFlowStat(fa9)
+ ofpstats10, _ := fu.MkFlowStat(fa10)
+ igmpstats, _ := fu.MkFlowStat(igmpFa)
+ ofpstats11, _ := fu.MkFlowStat(fa11)
fmt.Println(ofpstats6, ofpstats9, ofpstats10)
@@ -799,10 +799,10 @@
actionInfo3 := make(map[string]interface{})
classifierInfo4 := make(map[string]interface{})
actionInfo4 := make(map[string]interface{})
- flowState := fu.MkFlowStat(fa)
- flowState2 := fu.MkFlowStat(fa2)
- flowState3 := fu.MkFlowStat(fa3)
- flowState4 := fu.MkFlowStat(fa4)
+ flowState, _ := fu.MkFlowStat(fa)
+ flowState2, _ := fu.MkFlowStat(fa2)
+ flowState3, _ := fu.MkFlowStat(fa3)
+ flowState4, _ := fu.MkFlowStat(fa4)
formulateClassifierInfoFromFlow(classifierInfo, flowState)
formulateClassifierInfoFromFlow(classifierInfo2, flowState2)
formulateClassifierInfoFromFlow(classifierInfo3, flowState3)
@@ -986,7 +986,7 @@
fu.Group(1),
},
}
- ofpStats := fu.MkFlowStat(multicastFlowArgs)
+ ofpStats, _ := fu.MkFlowStat(multicastFlowArgs)
flowMgr.AddFlow(ctx, ofpStats, &voltha.FlowMetadata{})
//add bucket to the group
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/core_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/core_proxy_if.go
index dbf3418..9636a7d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/core_proxy_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/core_proxy_if.go
@@ -18,7 +18,6 @@
import (
"context"
-
"github.com/opencord/voltha-protos/v3/go/voltha"
)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
index 8f96b22..441c488 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
@@ -24,9 +24,16 @@
"strings"
)
+func init() {
+ _, err := log.AddPackage(log.JSON, log.FatalLevel, nil)
+ if err != nil {
+ log.Errorw("unable-to-register-package-to-the-log-map", log.Fields{"error": err})
+ }
+}
+
const (
defaultkvStoreConfigPath = "config"
- kvStoreDataPathPrefix = "/service/voltha"
+ kvStoreDataPathPrefix = "service/voltha"
kvStorePathSeparator = "/"
)
@@ -52,6 +59,10 @@
Delete
)
+func (ce ChangeEvent) String() string {
+ return [...]string{"Put", "Delete"}[ce]
+}
+
// ConfigChangeEvent represents config for the events recieved from watch
// For example,ChangeType is Put ,ConfigAttribute default
type ConfigChangeEvent struct {
@@ -102,6 +113,35 @@
}
}
+// RetrieveComponentList list the component Names for which loglevel is stored in kvstore
+func (c *ConfigManager) RetrieveComponentList(ctx context.Context, configType ConfigType) ([]string, error) {
+ data, err := c.backend.List(ctx, c.KvStoreConfigPrefix)
+ if err != nil {
+ return nil, err
+ }
+
+ // Looping through the data recieved from the backend for config
+ // Trimming and Splitting the required key and value from data and storing as componentName,PackageName and Level
+ // For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
+ // Then in default will be stored as PackageName,componentName as <Component Name> and DEBUG will be stored as value in List struct
+ ccPathPrefix := kvStorePathSeparator + configType.String() + kvStorePathSeparator
+ pathPrefix := kvStoreDataPathPrefix + kvStorePathSeparator + c.KvStoreConfigPrefix + kvStorePathSeparator
+ var list []string
+ keys := make(map[string]interface{})
+ for attr := range data {
+ cname := strings.TrimPrefix(attr, pathPrefix)
+ cName := strings.SplitN(cname, ccPathPrefix, 2)
+ if len(cName) != 2 {
+ continue
+ }
+ if _, exist := keys[cName[0]]; !exist {
+ keys[cName[0]] = nil
+ list = append(list, cName[0])
+ }
+ }
+ return list, nil
+}
+
// Initialize the component config
func (cm *ConfigManager) InitComponentConfig(componentLabel string, configType ConfigType) *ComponentConfig {
@@ -147,8 +187,11 @@
func (c *ComponentConfig) processKVStoreWatchEvents() {
ccKeyPrefix := c.makeConfigPath()
+
log.Debugw("processing-kvstore-event-change", log.Fields{"key-prefix": ccKeyPrefix})
+
ccPathPrefix := c.cManager.backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
+
for watchResp := range c.kvStoreEventChan {
if watchResp.EventType == kvstore.CONNECTIONDOWN || watchResp.EventType == kvstore.UNKNOWN {
@@ -168,10 +211,30 @@
}
}
+// Retrieves value of a specific config key. Value of key is returned in String format
+func (c *ComponentConfig) Retrieve(ctx context.Context, configKey string) (string, error) {
+ key := c.makeConfigPath() + "/" + configKey
+
+ log.Debugw("retrieving-config", log.Fields{"key": key})
+
+ if kvpair, err := c.cManager.backend.Get(ctx, key); err != nil {
+ return "", err
+ } else {
+ if kvpair == nil {
+ return "", fmt.Errorf("config-key-does-not-exist : %s", key)
+ }
+
+ value := strings.Trim(fmt.Sprintf("%s", kvpair.Value), "\"")
+ log.Debugw("retrieved-config", log.Fields{"key": key, "value": value})
+ return value, nil
+ }
+}
+
func (c *ComponentConfig) RetrieveAll(ctx context.Context) (map[string]string, error) {
key := c.makeConfigPath()
log.Debugw("retreiving-list", log.Fields{"key": key})
+
data, err := c.cManager.backend.List(ctx, key)
if err != nil {
return nil, err
@@ -190,10 +253,10 @@
return res, nil
}
-func (c *ComponentConfig) Save(configKey string, configValue string, ctx context.Context) error {
+func (c *ComponentConfig) Save(ctx context.Context, configKey string, configValue string) error {
key := c.makeConfigPath() + "/" + configKey
- log.Debugw("saving-key", log.Fields{"key": key, "value": configValue})
+ log.Debugw("saving-config", log.Fields{"key": key, "value": configValue})
//save the data for update config
if err := c.cManager.backend.Put(ctx, key, configValue); err != nil {
@@ -202,11 +265,11 @@
return nil
}
-func (c *ComponentConfig) Delete(configKey string, ctx context.Context) error {
+func (c *ComponentConfig) Delete(ctx context.Context, configKey string) error {
//construct key using makeConfigPath
key := c.makeConfigPath() + "/" + configKey
- log.Debugw("deleting-key", log.Fields{"key": key})
+ log.Debugw("deleting-config", log.Fields{"key": key})
//delete the config
if err := c.cManager.backend.Delete(ctx, key); err != nil {
return err
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
index b45c2c8..65927e6 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
@@ -14,8 +14,10 @@
* limitations under the License.
*/
-// Package Config provides dynamic logging configuration for specific Voltha component type implemented using backend.The package can be used in following manner
-// Any Voltha component type can start dynamic logging by starting goroutine of ProcessLogConfigChange after starting kvClient for the component.
+// Package Config provides dynamic logging configuration for specific Voltha component with loglevel lookup
+// from etcd kvstore implemented using backend.
+// Any Voltha component can start utilizing dynamic logging by starting goroutine of StartLogLevelConfigProcessing after
+// starting kvClient for the component.
package config
@@ -29,6 +31,12 @@
"strings"
)
+const (
+ defaultLogLevelKey = "default" // kvstore key containing default loglevel
+ globalConfigRootNode = "global" // Root Node in kvstore containing global config
+ initialGlobalDefaultLogLevelValue = "WARN" // Hard-coded Global Default loglevel pushed at PoD startup
+)
+
// ComponentLogController represents a Configuration for Logging Config of specific Voltha component type
// It stores ComponentConfig and GlobalConfig of loglevel config of specific Voltha component type
// For example,ComponentLogController instance will be created for rw-core component
@@ -38,6 +46,7 @@
GlobalConfig *ComponentConfig
configManager *ConfigManager
logHash [16]byte
+ initialLogLevel string // Initial default log level set by helm chart
}
func NewComponentLogController(cm *ConfigManager) (*ComponentLogController, error) {
@@ -48,40 +57,85 @@
return nil, errors.New("Unable to retrieve PoD Component Name from Runtime env")
}
+ var defaultLogLevel string
+ var err error
+ // Retrieve and save default log level; used for fallback if all loglevel config is cleared in etcd
+ if defaultLogLevel, err = log.LogLevelToString(log.GetDefaultLogLevel()); err != nil {
+ defaultLogLevel = "DEBUG"
+ }
+
return &ComponentLogController{
ComponentName: componentName,
componentNameConfig: nil,
GlobalConfig: nil,
configManager: cm,
+ initialLogLevel: defaultLogLevel,
}, nil
}
-// ProcessLogConfigChange initialize component config and global config
-func ProcessLogConfigChange(cm *ConfigManager, ctx context.Context) {
+// StartLogLevelConfigProcessing initialize component config and global config
+// Then, it persists initial default Loglevels into Config Store before
+// starting the loading and processing of all Log Configuration
+func StartLogLevelConfigProcessing(cm *ConfigManager, ctx context.Context) {
cc, err := NewComponentLogController(cm)
if err != nil {
log.Errorw("unable-to-construct-component-log-controller-instance-for-log-config-monitoring", log.Fields{"error": err})
return
}
- log.Debugw("processing-log-config-change", log.Fields{"cc": cc})
-
- cc.GlobalConfig = cm.InitComponentConfig("global", ConfigTypeLogLevel)
+ cc.GlobalConfig = cm.InitComponentConfig(globalConfigRootNode, ConfigTypeLogLevel)
log.Debugw("global-log-config", log.Fields{"cc-global-config": cc.GlobalConfig})
cc.componentNameConfig = cm.InitComponentConfig(cc.ComponentName, ConfigTypeLogLevel)
log.Debugw("component-log-config", log.Fields{"cc-component-name-config": cc.componentNameConfig})
+ cc.persistInitialDefaultLogConfigs(ctx)
+
cc.processLogConfig(ctx)
}
-// ProcessLogConfig wait on componentn config and global config channel for any changes
-// Event channel will be recieved from backend for valid change type
+// Method to persist Global default loglevel into etcd, if not set yet
+// It also checks and set Component default loglevel into etcd with initial loglevel set from command line
+func (c *ComponentLogController) persistInitialDefaultLogConfigs(ctx context.Context) {
+
+ _, err := c.GlobalConfig.Retrieve(ctx, defaultLogLevelKey)
+ if err != nil {
+ log.Debugw("failed-to-retrieve-global-default-log-config-at-startup", log.Fields{"error": err})
+
+ err = c.GlobalConfig.Save(ctx, defaultLogLevelKey, initialGlobalDefaultLogLevelValue)
+ if err != nil {
+ log.Errorw("failed-to-persist-global-default-log-config-at-startup", log.Fields{"error": err, "loglevel": initialGlobalDefaultLogLevelValue})
+ }
+ }
+
+ _, err = c.componentNameConfig.Retrieve(ctx, defaultLogLevelKey)
+ if err != nil {
+ log.Debugw("failed-to-retrieve-component-default-log-config-at-startup", log.Fields{"error": err})
+
+ err = c.componentNameConfig.Save(ctx, defaultLogLevelKey, c.initialLogLevel)
+ if err != nil {
+ log.Errorw("failed-to-persist-component-default-log-config-at-startup", log.Fields{"error": err, "loglevel": c.initialLogLevel})
+ }
+ }
+}
+
+// ProcessLogConfig will first load and apply log config and then start waiting on component config and global config
+// channels for any changes. Event channel will be recieved from backend for valid change type
// Then data for componentn log config and global log config will be retrieved from backend and stored in updatedLogConfig in precedence order
// If any changes in updatedLogConfig will be applied on component
func (c *ComponentLogController) processLogConfig(ctx context.Context) {
+ // Load and apply Log Config for first time
+ initialLogConfig, err := c.buildUpdatedLogConfig(ctx)
+ if err != nil {
+ log.Warnw("unable-to-load-log-config-at-startup", log.Fields{"error": err})
+ } else {
+ if err := c.loadAndApplyLogConfig(initialLogConfig); err != nil {
+ log.Warnw("unable-to-apply-log-config-at-startup", log.Fields{"error": err})
+ }
+ }
+
componentConfigEventChan := c.componentNameConfig.MonitorForConfigChange(ctx)
globalConfigEventChan := c.GlobalConfig.MonitorForConfigChange(ctx)
@@ -94,7 +148,7 @@
case configEvent = <-componentConfigEventChan:
}
- log.Debugw("processing-log-config-change", log.Fields{"config-event": configEvent})
+ log.Debugw("processing-log-config-change", log.Fields{"ChangeType": configEvent.ChangeType, "Package": configEvent.ConfigAttribute})
updatedLogConfig, err := c.buildUpdatedLogConfig(ctx)
if err != nil {
@@ -112,12 +166,12 @@
}
// get active loglevel from the zap logger
-func getActiveLogLevel() map[string]string {
- loglevel := make(map[string]string)
+func getActiveLogLevels() map[string]string {
+ loglevels := make(map[string]string)
// now do the default log level
if level, err := log.LogLevelToString(log.GetDefaultLogLevel()); err == nil {
- loglevel["default"] = level
+ loglevels[defaultLogLevelKey] = level
}
// do the per-package log levels
@@ -125,61 +179,69 @@
level, err := log.GetPackageLogLevel(packageName)
if err != nil {
log.Warnw("unable-to-fetch-current-active-loglevel-for-package-name", log.Fields{"package-name": packageName, "error": err})
+ continue
}
- packagename := strings.ReplaceAll(packageName, "/", "#")
if l, err := log.LogLevelToString(level); err == nil {
- loglevel[packagename] = l
+ loglevels[packageName] = l
}
-
}
- log.Debugw("getting-log-levels-from-zap-logger", log.Fields{"log-level": loglevel})
- return loglevel
+ log.Debugw("retreived-log-levels-from-zap-logger", log.Fields{"loglevels": loglevels})
+
+ return loglevels
}
func (c *ComponentLogController) getGlobalLogConfig(ctx context.Context) (string, error) {
- globalDefaultLogLevel := ""
- globalLogConfig, err := c.GlobalConfig.RetrieveAll(ctx)
+ globalDefaultLogLevel, err := c.GlobalConfig.Retrieve(ctx, defaultLogLevelKey)
if err != nil {
return "", err
}
- if globalLevel, ok := globalLogConfig["default"]; ok {
- if _, err := log.StringToLogLevel(globalLevel); err != nil {
- log.Warnw("unsupported-loglevel-config-defined-at-global-context-pacakge-name", log.Fields{"log-level": globalLevel})
- } else {
- globalDefaultLogLevel = globalLevel
- }
+ // Handle edge cases when global default loglevel is deleted directly from etcd or set to a invalid value
+ // We should use hard-coded initial default value in such cases
+ if globalDefaultLogLevel == "" {
+ log.Warn("global-default-loglevel-not-found-in-config-store")
+ globalDefaultLogLevel = initialGlobalDefaultLogLevelValue
}
- log.Debugw("retrieved-global-log-config", log.Fields{"global-log-config": globalLogConfig})
+
+ if _, err := log.StringToLogLevel(globalDefaultLogLevel); err != nil {
+ log.Warnw("unsupported-loglevel-config-defined-at-global-default", log.Fields{"log-level": globalDefaultLogLevel})
+ globalDefaultLogLevel = initialGlobalDefaultLogLevelValue
+ }
+
+ log.Debugw("retrieved-global-default-loglevel", log.Fields{"level": globalDefaultLogLevel})
return globalDefaultLogLevel, nil
}
-func (c *ComponentLogController) getComponentLogConfig(globalDefaultLogLevel string, ctx context.Context) (map[string]string, error) {
- var defaultPresent bool
+func (c *ComponentLogController) getComponentLogConfig(ctx context.Context, globalDefaultLogLevel string) (map[string]string, error) {
componentLogConfig, err := c.componentNameConfig.RetrieveAll(ctx)
if err != nil {
return nil, err
}
- for componentKey, componentLevel := range componentLogConfig {
- if _, err := log.StringToLogLevel(componentLevel); err != nil || componentKey == "" {
- log.Warnw("unsupported-loglevel-config-defined-at-component-context", log.Fields{"package-name": componentKey, "log-level": componentLevel})
- delete(componentLogConfig, componentKey)
+ effectiveDefaultLogLevel := ""
+ for logConfigKey, logConfigValue := range componentLogConfig {
+ if _, err := log.StringToLogLevel(logConfigValue); err != nil || logConfigKey == "" {
+ log.Warnw("unsupported-loglevel-config-defined-at-component-context", log.Fields{"package-name": logConfigKey, "log-level": logConfigValue})
+ delete(componentLogConfig, logConfigKey)
} else {
- if componentKey == "default" {
- defaultPresent = true
+ if logConfigKey == defaultLogLevelKey {
+ effectiveDefaultLogLevel = componentLogConfig[defaultLogLevelKey]
}
}
}
- if !defaultPresent {
- if globalDefaultLogLevel != "" {
- componentLogConfig["default"] = globalDefaultLogLevel
- }
+
+ // if default loglevel is not configured for the component, component should use
+ // default loglevel configured at global level
+ if effectiveDefaultLogLevel == "" {
+ effectiveDefaultLogLevel = globalDefaultLogLevel
}
+
+ componentLogConfig[defaultLogLevelKey] = effectiveDefaultLogLevel
+
log.Debugw("retrieved-component-log-config", log.Fields{"component-log-level": componentLogConfig})
return componentLogConfig, nil
@@ -194,16 +256,20 @@
func (c *ComponentLogController) buildUpdatedLogConfig(ctx context.Context) (map[string]string, error) {
globalLogLevel, err := c.getGlobalLogConfig(ctx)
if err != nil {
- return nil, err
+ log.Errorw("unable-to-retrieve-global-log-config", log.Fields{"err": err})
}
- componentLogConfig, err := c.getComponentLogConfig(globalLogLevel, ctx)
+ componentLogConfig, err := c.getComponentLogConfig(ctx, globalLogLevel)
if err != nil {
return nil, err
}
- log.Debugw("building-and-updating-log-config", log.Fields{"component-log-config": componentLogConfig})
- return componentLogConfig, nil
+ finalLogConfig := make(map[string]string)
+ for packageName, logLevel := range componentLogConfig {
+ finalLogConfig[strings.ReplaceAll(packageName, "#", "/")] = logLevel
+ }
+
+ return finalLogConfig, nil
}
// load and apply the current configuration for component name
@@ -216,64 +282,70 @@
return err
}
- log.Debugw("loading-and-applying-log-config", log.Fields{"log-config": logConfig})
if c.logHash != currentLogHash {
UpdateLogLevels(logConfig)
c.logHash = currentLogHash
+ } else {
+ log.Debug("effective-loglevel-config-same-as-currently-active")
}
+
return nil
}
-// getDefaultLogLevel to return active default log level
-func getDefaultLogLevel(logConfig map[string]string) string {
+// createModifiedLogLevels loops through the activeLogLevels recieved from zap logger and updatedLogLevels recieved from buildUpdatedLogConfig
+// to identify and create map of modified Log Levels of 2 types:
+// - Packages for which log level has been changed
+// - Packages for which log level config has been cleared - set to default log level
+func createModifiedLogLevels(activeLogLevels, updatedLogLevels map[string]string) map[string]string {
+ defaultLevel := updatedLogLevels[defaultLogLevelKey]
- for key, level := range logConfig {
- if key == "default" {
- return level
- }
- }
- return ""
-}
-
-// createCurrentLogLevel loop through the activeLogLevels recieved from zap logger and updatedLogLevels recieved from buildUpdatedLogConfig
-// The packageName is present or not will be checked in updatedLogLevels ,if the package name is not present then updatedLogLevels will be updated with
-// the packageName and loglevel with default log level
-func createCurrentLogLevel(activeLogLevels, updatedLogLevels map[string]string) map[string]string {
- level := getDefaultLogLevel(updatedLogLevels)
+ modifiedLogLevels := make(map[string]string)
for activeKey, activeLevel := range activeLogLevels {
if _, exist := updatedLogLevels[activeKey]; !exist {
- if level != "" {
- activeLevel = level
+ if activeLevel != defaultLevel {
+ modifiedLogLevels[activeKey] = defaultLevel
}
- updatedLogLevels[activeKey] = activeLevel
+ } else if activeLevel != updatedLogLevels[activeKey] {
+ modifiedLogLevels[activeKey] = updatedLogLevels[activeKey]
}
}
- return updatedLogLevels
+
+ // Log warnings for all invalid packages for which log config has been set
+ for key, value := range updatedLogLevels {
+ if _, exist := activeLogLevels[key]; !exist {
+ log.Warnw("ignoring-loglevel-set-for-invalid-package", log.Fields{"package": key, "log-level": value})
+ }
+ }
+
+ return modifiedLogLevels
}
// updateLogLevels update the loglevels for the component
// retrieve active confguration from logger
// compare with entries one by one and apply
-func UpdateLogLevels(logLevel map[string]string) {
+func UpdateLogLevels(updatedLogConfig map[string]string) {
- activeLogLevels := getActiveLogLevel()
- currentLogLevel := createCurrentLogLevel(activeLogLevels, logLevel)
- for key, level := range currentLogLevel {
- if key == "default" {
+ activeLogLevels := getActiveLogLevels()
+ changedLogLevels := createModifiedLogLevels(activeLogLevels, updatedLogConfig)
+
+ // If no changed log levels are found, just return. It may happen on configuration of a invalid package
+ if len(changedLogLevels) == 0 {
+ log.Debug("no-change-in-effective-loglevel-config")
+ return
+ }
+
+ log.Debugw("applying-log-level-for-modified-packages", log.Fields{"changed-log-levels": changedLogLevels})
+ for key, level := range changedLogLevels {
+ if key == defaultLogLevelKey {
if l, err := log.StringToLogLevel(level); err == nil {
log.SetDefaultLogLevel(l)
}
} else {
- pname := strings.ReplaceAll(key, "#", "/")
- if _, err := log.AddPackage(log.JSON, log.DebugLevel, nil, pname); err != nil {
- log.Warnw("unable-to-add-log-package", log.Fields{"package-name": pname, "error": err})
- }
if l, err := log.StringToLogLevel(level); err == nil {
- log.SetPackageLogLevel(pname, l)
+ log.SetPackageLogLevel(key, l)
}
}
}
- log.Debugw("updated-log-level", log.Fields{"current-log-level": currentLogLevel})
}
// generate md5 hash of key value pairs appended into a single string
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
index 04fe35d..faa86ed 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
@@ -222,7 +222,7 @@
defer b.Unlock()
formattedPath := b.makePath(key)
- logger.Debugw("putting-key", log.Fields{"key": key, "value": string(value.([]byte)), "path": formattedPath})
+ logger.Debugw("putting-key", log.Fields{"key": key, "value": value, "path": formattedPath})
err := b.Client.Put(ctx, formattedPath, value)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
index 1014ada..d38f0f6 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
@@ -29,13 +29,13 @@
// EtcdClient represents the Etcd KV store client
type EtcdClient struct {
- ectdAPI *v3Client.Client
- keyReservations map[string]*v3Client.LeaseID
- watchedChannels sync.Map
- writeLock sync.Mutex
- lockToMutexMap map[string]*v3Concurrency.Mutex
- lockToSessionMap map[string]*v3Concurrency.Session
- lockToMutexLock sync.Mutex
+ ectdAPI *v3Client.Client
+ keyReservations map[string]*v3Client.LeaseID
+ watchedChannels sync.Map
+ keyReservationsLock sync.RWMutex
+ lockToMutexMap map[string]*v3Concurrency.Mutex
+ lockToSessionMap map[string]*v3Concurrency.Session
+ lockToMutexLock sync.Mutex
}
// NewEtcdClient returns a new client for the Etcd KV store
@@ -114,13 +114,13 @@
return fmt.Errorf("unexpected-type-%T", value)
}
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
-
var err error
// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
// that KV key permanent instead of automatically removing it after a lease expiration
- if leaseID, ok := c.keyReservations[key]; ok {
+ c.keyReservationsLock.RLock()
+ leaseID, ok := c.keyReservations[key]
+ c.keyReservationsLock.RUnlock()
+ if ok {
_, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
} else {
_, err = c.ectdAPI.Put(ctx, key, val)
@@ -146,9 +146,6 @@
// wait for a response
func (c *EtcdClient) Delete(ctx context.Context, key string) error {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
-
// delete the key
if _, err := c.ectdAPI.Delete(ctx, key); err != nil {
logger.Errorw("failed-to-delete-key", log.Fields{"key": key, "error": err})
@@ -177,9 +174,9 @@
return nil, err
}
// Register the lease id
- c.writeLock.Lock()
+ c.keyReservationsLock.Lock()
c.keyReservations[key] = &resp.ID
- c.writeLock.Unlock()
+ c.keyReservationsLock.Unlock()
// Revoke lease if reservation is not successful
reservationSuccessful := false
@@ -235,8 +232,8 @@
// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
func (c *EtcdClient) ReleaseAllReservations(ctx context.Context) error {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
+ c.keyReservationsLock.Lock()
+ defer c.keyReservationsLock.Unlock()
for key, leaseID := range c.keyReservations {
_, err := c.ectdAPI.Revoke(ctx, *leaseID)
@@ -255,8 +252,8 @@
logger.Debugw("Release-reservation", log.Fields{"key": key})
var ok bool
var leaseID *v3Client.LeaseID
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
+ c.keyReservationsLock.Lock()
+ defer c.keyReservationsLock.Unlock()
if leaseID, ok = c.keyReservations[key]; !ok {
return nil
}
@@ -278,9 +275,11 @@
// Get the leaseid using the key
var ok bool
var leaseID *v3Client.LeaseID
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
- if leaseID, ok = c.keyReservations[key]; !ok {
+ c.keyReservationsLock.RLock()
+ leaseID, ok = c.keyReservations[key]
+ c.keyReservationsLock.RUnlock()
+
+ if !ok {
return errors.New("key-not-reserved")
}
@@ -372,8 +371,6 @@
// Get the array of channels mapping
var watchedChannels []map[chan *Event]v3Client.Watcher
var ok bool
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
if watchedChannels, ok = c.getChannelMaps(key); !ok {
logger.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
@@ -425,8 +422,6 @@
// Close closes the KV store client
func (c *EtcdClient) Close() {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
if err := c.ectdAPI.Close(); err != nil {
logger.Errorw("error-closing-client", log.Fields{"error": err})
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/flow_utils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/flow_utils.go
index 4de929f..b2086cd 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/flow_utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/flows/flow_utils.go
@@ -18,6 +18,7 @@
import (
"bytes"
"crypto/md5"
+ "errors"
"fmt"
"github.com/cevaris/ordered_map"
"github.com/gogo/protobuf/proto"
@@ -678,9 +679,9 @@
// Return unique 64-bit integer hash for flow covering the following attributes:
// 'table_id', 'priority', 'flags', 'cookie', 'match', '_instruction_string'
-func HashFlowStats(flow *ofp.OfpFlowStats) uint64 {
+func HashFlowStats(flow *ofp.OfpFlowStats) (uint64, error) {
if flow == nil { // Should never happen
- return 0
+ return 0, errors.New("hash-flow-stats-nil-flow")
}
// Create string with the instructions field first
var instructionString bytes.Buffer
@@ -690,19 +691,18 @@
var flowString = fmt.Sprintf("%d%d%d%d%s%s", flow.TableId, flow.Priority, flow.Flags, flow.Cookie, flow.Match.String(), instructionString.String())
h := md5.New()
if _, err := h.Write([]byte(flowString)); err != nil {
- logger.Errorw("hash-flow-status", log.Fields{"error": err})
- return 0
+ return 0, fmt.Errorf("hash-flow-stats-failed-hash: %v", err)
}
hash := big.NewInt(0)
hash.SetBytes(h.Sum(nil))
- return hash.Uint64()
+ return hash.Uint64(), nil
}
// flowStatsEntryFromFlowModMessage maps an ofp_flow_mod message to an ofp_flow_stats message
-func FlowStatsEntryFromFlowModMessage(mod *ofp.OfpFlowMod) *ofp.OfpFlowStats {
+func FlowStatsEntryFromFlowModMessage(mod *ofp.OfpFlowMod) (*ofp.OfpFlowStats, error) {
flow := &ofp.OfpFlowStats{}
if mod == nil {
- return flow
+ return flow, nil
}
flow.TableId = mod.TableId
flow.Priority = mod.Priority
@@ -712,8 +712,12 @@
flow.Cookie = mod.Cookie
flow.Match = mod.Match
flow.Instructions = mod.Instructions
- flow.Id = HashFlowStats(flow)
- return flow
+ var err error
+ if flow.Id, err = HashFlowStats(flow); err != nil {
+ return nil, err
+ }
+
+ return flow, nil
}
func GroupEntryFromGroupMod(mod *ofp.OfpGroupMod) *ofp.OfpGroupEntry {
@@ -913,7 +917,7 @@
}
// MkFlowStat is a helper method to build flows
-func MkFlowStat(fa *FlowArgs) *ofp.OfpFlowStats {
+func MkFlowStat(fa *FlowArgs) (*ofp.OfpFlowStats, error) {
//Build the match-fields
matchFields := make([]*ofp.OfpOxmField, 0)
for _, val := range fa.MatchFields {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index 91b2143..5dbde9c 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -19,6 +19,8 @@
"context"
"errors"
"fmt"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
"reflect"
"strings"
"sync"
@@ -34,7 +36,7 @@
const (
DefaultMaxRetries = 3
- DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
+ DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
)
const (
@@ -66,6 +68,7 @@
GetDefaultTopic() *Topic
DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
+ InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
UnSubscribeFromRequestHandler(topic Topic) error
@@ -246,6 +249,104 @@
return nil
}
+// InvokeAsyncRPC is used to make an RPC request asynchronously
+func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+ waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
+
+ logger.Debugw("InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
+ // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
+ // typically the device ID.
+ responseTopic := replyToTopic
+ if responseTopic == nil {
+ responseTopic = kp.GetDefaultTopic()
+ }
+
+ chnl := make(chan *RpcResponse)
+
+ go func() {
+
+ // once we're done,
+ // close the response channel
+ defer close(chnl)
+
+ var err error
+ var protoRequest *ic.InterContainerMessage
+
+ // Encode the request
+ protoRequest, err = encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
+ if err != nil {
+ logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+ chnl <- NewResponse(RpcFormattingError, err, nil)
+ return
+ }
+
+ // Subscribe for response, if needed, before sending request
+ var ch <-chan *ic.InterContainerMessage
+ if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
+ logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+ chnl <- NewResponse(RpcTransportError, err, nil)
+ return
+ }
+
+ // Send request - if the topic is formatted with a device Id then we will send the request using a
+ // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
+ // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
+ logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+
+ // if the message is not sent on kafka publish an event an close the channel
+ if err = kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
+ chnl <- NewResponse(RpcTransportError, err, nil)
+ return
+ }
+
+ // if the client is not waiting for a response send the ack and close the channel
+ chnl <- NewResponse(RpcSent, nil, nil)
+ if !waitForResponse {
+ return
+ }
+
+ defer func() {
+ // Remove the subscription for a response on return
+ if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
+ logger.Warnw("invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
+ }
+ }()
+
+ // Wait for response as well as timeout or cancellation
+ select {
+ case msg, ok := <-ch:
+ if !ok {
+ logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
+ }
+ logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+ if responseBody, err := decodeResponse(msg); err != nil {
+ chnl <- NewResponse(RpcReply, err, nil)
+ } else {
+ if responseBody.Success {
+ chnl <- NewResponse(RpcReply, nil, responseBody.Result)
+ } else {
+ // response body contains an error
+ unpackErr := &ic.Error{}
+ if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
+ chnl <- NewResponse(RpcReply, err, nil)
+ } else {
+ chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
+ }
+ }
+ }
+ case <-ctx.Done():
+ logger.Errorw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+ err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
+ chnl <- NewResponse(RpcTimeout, err, nil)
+ case <-kp.doneCh:
+ chnl <- NewResponse(RpcSystemClosing, nil, nil)
+ logger.Warnw("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+ }
+ }()
+ return chnl
+}
+
// InvokeRPC is used to send a request to a given topic
func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
@@ -799,7 +900,8 @@
// Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
// broadcast any message for this topic to all channels waiting on it.
- ch := make(chan *ic.InterContainerMessage)
+ // Set channel size to 1 to prevent deadlock, see VOL-2708
+ ch := make(chan *ic.InterContainerMessage, 1)
kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
return ch, nil
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/utils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/utils.go
index 0cb9535..bdc615f 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/utils.go
@@ -15,7 +15,10 @@
*/
package kafka
-import "strings"
+import (
+ "github.com/golang/protobuf/ptypes/any"
+ "strings"
+)
const (
TopicSeparator = "_"
@@ -36,6 +39,31 @@
Value interface{}
}
+type RpcMType int
+
+const (
+ RpcFormattingError RpcMType = iota
+ RpcSent
+ RpcReply
+ RpcTimeout
+ RpcTransportError
+ RpcSystemClosing
+)
+
+type RpcResponse struct {
+ MType RpcMType
+ Err error
+ Reply *any.Any
+}
+
+func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
+ return &RpcResponse{
+ MType: messageType,
+ Err: err,
+ Reply: body,
+ }
+}
+
// TODO: Remove and provide better may to get the device id
// GetDeviceIdFromTopic extract the deviceId from the topic name. The topic name is formatted either as:
// <any string> or <any string>_<deviceId>. The device Id is 24 characters long.
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
index 3ebdd3a..47fa3fb 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
@@ -221,6 +221,7 @@
LevelKey: "level",
MessageKey: "msg",
TimeKey: "ts",
+ CallerKey: "caller",
StacktraceKey: "stacktrace",
LineEnding: zc.DefaultLineEnding,
EncodeLevel: zc.LowercaseLevelEncoder,
@@ -237,7 +238,7 @@
// Build a custom config using zap
cfg = getDefaultConfig(outputType, level, defaultFields)
- l, err := cfg.Build()
+ l, err := cfg.Build(zp.AddCallerSkip(1))
if err != nil {
return nil, err
}
@@ -282,7 +283,7 @@
cfgs[pkgName] = getDefaultConfig(outputType, level, defaultFields)
- l, err := cfgs[pkgName].Build()
+ l, err := cfgs[pkgName].Build(zp.AddCallerSkip(1))
if err != nil {
return nil, err
}
@@ -304,16 +305,14 @@
}
cfg.InitialFields[k] = v
}
- l, err := cfg.Build()
+ l, err := cfg.Build(zp.AddCallerSkip(1))
if err != nil {
return err
}
- loggers[pkgName] = &logger{
- log: l.Sugar(),
- parent: l,
- packageName: pkgName,
- }
+ // Update the existing zap logger instance
+ loggers[pkgName].log = l.Sugar()
+ loggers[pkgName].parent = l
}
return nil
}
@@ -329,19 +328,16 @@
return keys
}
-// UpdateLogger deletes the logger associated with a caller's package and creates a new logger with the
-// defaultFields. If a calling package is holding on to a Logger reference obtained from AddPackage invocation, then
-// that package needs to invoke UpdateLogger if it needs to make changes to the default fields and obtain a new logger
-// reference
-func UpdateLogger(defaultFields Fields) (Logger, error) {
+// UpdateLogger updates the logger associated with a caller's package with supplied defaultFields
+func UpdateLogger(defaultFields Fields) error {
pkgName, _, _, _ := getCallerInfo()
if _, exist := loggers[pkgName]; !exist {
- return nil, fmt.Errorf("package-%s-not-registered", pkgName)
+ return fmt.Errorf("package-%s-not-registered", pkgName)
}
// Build a new logger
if _, exist := cfgs[pkgName]; !exist {
- return nil, fmt.Errorf("config-%s-not-registered", pkgName)
+ return fmt.Errorf("config-%s-not-registered", pkgName)
}
cfg := cfgs[pkgName]
@@ -351,18 +347,16 @@
}
cfg.InitialFields[k] = v
}
- l, err := cfg.Build()
+ l, err := cfg.Build(zp.AddCallerSkip(1))
if err != nil {
- return nil, err
+ return err
}
- // Set the logger
- loggers[pkgName] = &logger{
- log: l.Sugar(),
- parent: l,
- packageName: pkgName,
- }
- return loggers[pkgName], nil
+ // Update the existing zap logger instance
+ loggers[pkgName].log = l.Sugar()
+ loggers[pkgName].parent = l
+
+ return nil
}
func setLevel(cfg zp.Config, level LogLevel) {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager/ponresourcemanager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager/ponresourcemanager.go
index 6ed18e9..79fefc5 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager/ponresourcemanager.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager/ponresourcemanager.go
@@ -42,6 +42,16 @@
//Constants for passing command line arugments
OLT_MODEL_ARG = "--olt_model"
PATH_PREFIX = "service/voltha/resource_manager/{%s}"
+
+ /*The path under which configuration data is stored is defined as technology/device agnostic.
+ That means the path does not include any specific technology/device variable. Using technology/device
+ agnostic path also makes northbound applications, that need to write to this path,
+ technology/device agnostic.
+
+ Default kv client of PonResourceManager reads from/writes to PATH_PREFIX defined above.
+ That is why, an additional kv client (named KVStoreForConfig) is defined to read from the config path.
+ */
+ PATH_PREFIX_FOR_CONFIG = "service/voltha/resource_manager/config"
/*The resource ranges for a given device model should be placed
at 'resource_manager/<technology>/resource_ranges/<olt_model_type>'
path on the KV store.
@@ -115,6 +125,9 @@
NUM_OF_PON_INTF = 16
KVSTORE_RETRY_TIMEOUT = 5
+ //Path on the KV store for storing reserved gem ports
+ //Format: reserved_gemport_ids
+ RESERVED_GEMPORT_IDS_PATH = "reserved_gemport_ids"
)
//type ResourceTypeIndex string
@@ -122,15 +135,16 @@
type PONResourceManager struct {
//Implements APIs to initialize/allocate/release alloc/gemport/onu IDs.
- Technology string
- DeviceType string
- DeviceID string
- Backend string // ETCD, or consul
- Host string // host ip of the KV store
- Port int // port number for the KV store
- OLTModel string
- KVStore *db.Backend
- TechProfileMgr tp.TechProfileIf // create object of *tp.TechProfileMgr
+ Technology string
+ DeviceType string
+ DeviceID string
+ Backend string // ETCD, or consul
+ Host string // host ip of the KV store
+ Port int // port number for the KV store
+ OLTModel string
+ KVStore *db.Backend
+ KVStoreForConfig *db.Backend
+ TechProfileMgr tp.TechProfileIf // create object of *tp.TechProfileMgr
// Below attribute, pon_resource_ranges, should be initialized
// by reading from KV store.
@@ -152,7 +166,7 @@
return nil, errors.New("unsupported-kv-store")
}
-func SetKVClient(Technology string, Backend string, Host string, Port int) *db.Backend {
+func SetKVClient(Technology string, Backend string, Host string, Port int, configClient bool) *db.Backend {
addr := Host + ":" + strconv.Itoa(Port)
// TODO : Make sure direct call to NewBackend is working fine with backend , currently there is some
// issue between kv store and backend , core is not calling NewBackend directly
@@ -161,13 +175,21 @@
log.Fatalw("Failed to init KV client\n", log.Fields{"err": err})
return nil
}
+
+ var pathPrefix string
+ if configClient {
+ pathPrefix = PATH_PREFIX_FOR_CONFIG
+ } else {
+ pathPrefix = fmt.Sprintf(PATH_PREFIX, Technology)
+ }
+
kvbackend := &db.Backend{
Client: kvClient,
StoreType: Backend,
Host: Host,
Port: Port,
Timeout: KVSTORE_RETRY_TIMEOUT,
- PathPrefix: fmt.Sprintf(PATH_PREFIX, Technology)}
+ PathPrefix: pathPrefix}
return kvbackend
}
@@ -181,11 +203,17 @@
PONMgr.Backend = Backend
PONMgr.Host = Host
PONMgr.Port = Port
- PONMgr.KVStore = SetKVClient(Technology, Backend, Host, Port)
+ PONMgr.KVStore = SetKVClient(Technology, Backend, Host, Port, false)
if PONMgr.KVStore == nil {
log.Error("KV Client initilization failed")
return nil, errors.New("Failed to init KV client")
}
+ // init kv client to read from the config path
+ PONMgr.KVStoreForConfig = SetKVClient(Technology, Backend, Host, Port, true)
+ if PONMgr.KVStoreForConfig == nil {
+ log.Error("KV Config Client initilization failed")
+ return nil, errors.New("Failed to init KV Config client")
+ }
// Initialize techprofile for this technology
if PONMgr.TechProfileMgr, _ = tp.NewTechProfile(&PONMgr, Backend, Host, Port); PONMgr.TechProfileMgr == nil {
log.Error("Techprofile initialization failed")
@@ -493,7 +521,15 @@
log.Debugf("Resource %s already present in store ", Path)
return nil
} else {
- FormatResult, err := PONRMgr.FormatResource(Intf, StartID, EndID)
+ var excluded []uint32
+ if ResourceType == GEMPORT_ID {
+ //get gem port ids defined in the KV store, if any, and exclude them from the gem port id pool
+ if reservedGemPortIds, defined := PONRMgr.getReservedGemPortIdsFromKVStore(ctx); defined {
+ excluded = reservedGemPortIds
+ log.Debugw("Excluding some ports from GEM port id pool", log.Fields{"excluded gem ports": excluded})
+ }
+ }
+ FormatResult, err := PONRMgr.FormatResource(Intf, StartID, EndID, excluded)
if err != nil {
log.Errorf("Failed to format resource")
return err
@@ -511,12 +547,38 @@
return err
}
-func (PONRMgr *PONResourceManager) FormatResource(IntfID uint32, StartIDx uint32, EndIDx uint32) ([]byte, error) {
+func (PONRMgr *PONResourceManager) getReservedGemPortIdsFromKVStore(ctx context.Context) ([]uint32, bool) {
+ var reservedGemPortIds []uint32
+ // read reserved gem ports from the config path
+ KvPair, err := PONRMgr.KVStoreForConfig.Get(ctx, RESERVED_GEMPORT_IDS_PATH)
+ if err != nil {
+ log.Errorw("Unable to get reserved GEM port ids from the kv store", log.Fields{"err": err})
+ return reservedGemPortIds, false
+ }
+ if KvPair == nil || KvPair.Value == nil {
+ //no reserved gem port defined in the store
+ return reservedGemPortIds, false
+ }
+ Val, err := kvstore.ToByte(KvPair.Value)
+ if err != nil {
+ log.Errorw("Failed to convert reserved gem port ids into byte array", log.Fields{"err": err})
+ return reservedGemPortIds, false
+ }
+ if err = json.Unmarshal(Val, &reservedGemPortIds); err != nil {
+ log.Errorw("Failed to unmarshal reservedGemPortIds", log.Fields{"err": err})
+ return reservedGemPortIds, false
+ }
+ return reservedGemPortIds, true
+}
+
+func (PONRMgr *PONResourceManager) FormatResource(IntfID uint32, StartIDx uint32, EndIDx uint32,
+ Excluded []uint32) ([]byte, error) {
/*
Format resource as json.
:param pon_intf_id: OLT PON interface id
:param start_idx: start index for id pool
:param end_idx: end index for id pool
+ :Id values to be Excluded from the pool
:return dictionary: resource formatted as map
*/
// Format resource as json to be stored in backend store
@@ -534,6 +596,14 @@
log.Error("Failed to create a bitmap")
return nil, errors.New("Failed to create bitmap")
}
+ for _, excludedID := range Excluded {
+ if excludedID < StartIDx || excludedID > EndIDx {
+ log.Warnf("Cannot reserve %d. It must be in the range of [%d, %d]", excludedID,
+ StartIDx, EndIDx)
+ continue
+ }
+ PONRMgr.reserveID(TSData, StartIDx, excludedID)
+ }
Resource[POOL] = TSData.Data(false) //we pass false so as the TSData lib api does not do a copy of the data and return
Value, err := json.Marshal(Resource)
@@ -1147,6 +1217,21 @@
return true
}
+/* Reserves a unique id in the specified resource pool.
+:param Resource: resource used to reserve ID
+:param Id: ID to be reserved
+*/
+func (PONRMgr *PONResourceManager) reserveID(TSData *bitmap.Threadsafe, StartIndex uint32, Id uint32) bool {
+ Data := bitmap.TSFromData(TSData.Data(false), false)
+ if Data == nil {
+ log.Error("Failed to get resource pool")
+ return false
+ }
+ Idx := Id - StartIndex
+ Data.Set(int(Idx), true)
+ return true
+}
+
func (PONRMgr *PONResourceManager) GetTechnology() string {
return PONRMgr.Technology
}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 3b97659..20c4eb0 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -63,7 +63,7 @@
github.com/mitchellh/go-homedir
# github.com/mitchellh/mapstructure v1.1.2
github.com/mitchellh/mapstructure
-# github.com/opencord/voltha-lib-go/v3 v3.0.14
+# github.com/opencord/voltha-lib-go/v3 v3.0.23
github.com/opencord/voltha-lib-go/v3/pkg/adapters
github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif
github.com/opencord/voltha-lib-go/v3/pkg/adapters/common