VOL-4154: Changes to techprofile module for etcd storage improvements.
- use protobuf definitions of the tech profile template and instance
- store a smaller-footprint resource instance in the KV store
- cache the tech profile instance in memory
- reconcile the tech profile instance from the resource instance on adapter restart
- retry etcd get/put/delete on failure
- remove onu-gem-info handling from the PONResourceManager module,
  as the adapter has to manage this data
Change-Id: I741181e3f0dc5c4a419ffbed577eb4d21b73c4d6
diff --git a/pkg/adapters/adapterif/adapter_proxy_if.go b/pkg/adapters/adapterif/adapter_proxy_if.go
index 30fcead..c514d6d 100644
--- a/pkg/adapters/adapterif/adapter_proxy_if.go
+++ b/pkg/adapters/adapterif/adapter_proxy_if.go
@@ -33,4 +33,13 @@
toDeviceID string,
proxyDeviceID string,
messageID string) error
+ TechProfileInstanceRequest(ctx context.Context,
+ tpPath string,
+ ponIntfID uint32,
+ onuID uint32,
+ uniID uint32,
+ fromAdapter string,
+ toAdapter string,
+ toDeviceID string,
+ proxyDeviceID string) (*ic.InterAdapterTechProfileDownloadMessage, error)
}
diff --git a/pkg/adapters/common/adapter_proxy.go b/pkg/adapters/common/adapter_proxy.go
index 9ade0d1..fc31041 100644
--- a/pkg/adapters/common/adapter_proxy.go
+++ b/pkg/adapters/common/adapter_proxy.go
@@ -17,14 +17,15 @@
import (
"context"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "google.golang.org/grpc/status"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
)
@@ -103,3 +104,62 @@
logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
return unPackResponse(ctx, rpc, "", success, result)
}
+
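+// TechProfileInstanceRequest sends an inter-adapter tech profile instance request to the peer adapter
+// and waits for the corresponding tech profile download message in the RPC response.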
+func (ap *AdapterProxy) TechProfileInstanceRequest(ctx context.Context,
+ tpPath string,
+ parentPonPort uint32,
+ onuID uint32,
+ uniID uint32,
+ fromAdapter string,
+ toAdapter string,
+ toDeviceId string,
+ proxyDeviceId string) (*ic.InterAdapterTechProfileDownloadMessage, error) {
+ logger.Debugw(ctx, "sending-tech-profile-instance-request-message", log.Fields{"from": fromAdapter,
+ "to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
+
+ // Set up the required rpc arguments
+ endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
+ if err != nil {
+ return nil, err
+ }
+
+ // Build the inter-adapter message
+ tpReqMsg := &ic.InterAdapterTechProfileInstanceRequestMessage{
+ TpInstancePath: tpPath,
+ ParentDeviceId: toDeviceId,
+ ParentPonPort: parentPonPort,
+ OnuId: onuID,
+ UniId: uniID,
+ }
+
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "msg",
+ Value: tpReqMsg,
+ }
+
+ topic := kafka.Topic{Name: string(endpoint)}
+ replyToTopic := kafka.Topic{Name: fromAdapter}
+ rpc := "process_tech_profile_instance_request"
+
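+ // Make the tech profile path available to downstream handlers through the request context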
+ ctx = context.WithValue(ctx, "inter-adapter-tp-req-msg", tpPath)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
+ logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+ if success {
+ tpDwnldMsg := &ic.InterAdapterTechProfileDownloadMessage{}
+ if err := ptypes.UnmarshalAny(result, tpDwnldMsg); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, err
+ }
+ return tpDwnldMsg, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "TechProfileInstanceRequest-return", log.Fields{"tpPath": tpPath, "success": success, "error": err})
+
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+ }
+}
diff --git a/pkg/adapters/common/adapter_proxy_test.go b/pkg/adapters/common/adapter_proxy_test.go
index c73cb0f..2ca87a5 100644
--- a/pkg/adapters/common/adapter_proxy_test.go
+++ b/pkg/adapters/common/adapter_proxy_test.go
@@ -18,9 +18,9 @@
import (
"context"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- mocks "github.com/opencord/voltha-lib-go/v4/pkg/mocks/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ mocks "github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/opencord/voltha-protos/v4/go/voltha"
"github.com/phayes/freeport"
@@ -75,7 +75,7 @@
adapter.endpointMgr = mocks.NewEndpointManager()
- delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpPath: "tpPath", GemPortId: 2}
+ delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpInstancePath: "tpPath", GemPortId: 2}
err := adapter.SendInterAdapterMessage(context.TODO(), delGemPortMsg, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "testMessage")
@@ -119,7 +119,7 @@
adapter.endpointMgr = mocks.NewEndpointManager()
- delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpPath: "tpPath", GemPortId: 2}
+ delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpInstancePath: "tpPath", GemPortId: 2}
err := adapter.SendInterAdapterMessage(context.TODO(), delGemPortMsg, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "")
call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
diff --git a/pkg/adapters/common/common.go b/pkg/adapters/common/common.go
index 5d7d7f8..98085bb 100644
--- a/pkg/adapters/common/common.go
+++ b/pkg/adapters/common/common.go
@@ -16,7 +16,7 @@
package common
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index 1077226..589d951 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -21,8 +21,8 @@
"github.com/golang/protobuf/ptypes"
a "github.com/golang/protobuf/ptypes/any"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
diff --git a/pkg/adapters/common/core_proxy_test.go b/pkg/adapters/common/core_proxy_test.go
index 97a88f5..890b885 100644
--- a/pkg/adapters/common/core_proxy_test.go
+++ b/pkg/adapters/common/core_proxy_test.go
@@ -19,9 +19,9 @@
"context"
"testing"
- adapterIf "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- mocks "github.com/opencord/voltha-lib-go/v4/pkg/mocks/kafka"
+ adapterIf "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ mocks "github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/opencord/voltha-protos/v4/go/voltha"
"github.com/stretchr/testify/assert"
diff --git a/pkg/adapters/common/request_handler.go b/pkg/adapters/common/request_handler.go
index b6cf1c0..90f575b 100644
--- a/pkg/adapters/common/request_handler.go
+++ b/pkg/adapters/common/request_handler.go
@@ -21,10 +21,10 @@
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
- "github.com/opencord/voltha-lib-go/v4/pkg/adapters"
- "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/adapters"
+ "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-protos/v4/go/extension"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/opencord/voltha-protos/v4/go/openflow_13"
@@ -559,6 +559,37 @@
return new(empty.Empty), nil
}
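+// Process_tech_profile_instance_request unpacks the inter-adapter arguments and forwards the
+// tech profile instance request to the adapter implementation.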
+func (rhp *RequestHandlerProxy) Process_tech_profile_instance_request(ctx context.Context, args []*ic.Argument) (*ic.InterAdapterTechProfileDownloadMessage, error) {
+ if len(args) < 2 {
+ logger.Warnw(ctx, "invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ iaTpReqMsg := &ic.InterAdapterTechProfileInstanceRequestMessage{}
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "msg":
+ if err := ptypes.UnmarshalAny(arg.Value, iaTpReqMsg); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+
+ logger.Debugw(ctx, "Process_tech_profile_instance_request", log.Fields{"tpPath": iaTpReqMsg.TpInstancePath})
+
+ // Invoke the adapter handler for the tech profile instance request
+ tpInst := rhp.adapter.Process_tech_profile_instance_request(ctx, iaTpReqMsg)
+
+ return tpInst, nil
+}
+
func (rhp *RequestHandlerProxy) Download_image(ctx context.Context, args []*ic.Argument) (*voltha.ImageDownload, error) {
device, image, err := unMarshalImageDowload(args, ctx)
if err != nil {
diff --git a/pkg/adapters/common/utils.go b/pkg/adapters/common/utils.go
index 65b432c..35f227e 100644
--- a/pkg/adapters/common/utils.go
+++ b/pkg/adapters/common/utils.go
@@ -18,7 +18,7 @@
import (
"context"
"fmt"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"google.golang.org/grpc/codes"
"math/rand"
diff --git a/pkg/adapters/iAdapter.go b/pkg/adapters/iAdapter.go
index fbf2b5d..aca4271 100644
--- a/pkg/adapters/iAdapter.go
+++ b/pkg/adapters/iAdapter.go
@@ -46,6 +46,7 @@
Unsuppress_event(ctx context.Context, filter *voltha.EventFilter) error
Get_ofp_device_info(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error)
Process_inter_adapter_message(ctx context.Context, msg *ic.InterAdapterMessage) error
+ Process_tech_profile_instance_request(ctx context.Context, msg *ic.InterAdapterTechProfileInstanceRequestMessage) *ic.InterAdapterTechProfileDownloadMessage
Download_image(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
Get_image_download_status(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
Cancel_image_download(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
diff --git a/pkg/config/common.go b/pkg/config/common.go
index 294a4bd..606d18c 100644
--- a/pkg/config/common.go
+++ b/pkg/config/common.go
@@ -16,7 +16,7 @@
package config
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/config/configmanager.go b/pkg/config/configmanager.go
index 8350225..f5efa36 100644
--- a/pkg/config/configmanager.go
+++ b/pkg/config/configmanager.go
@@ -22,9 +22,9 @@
"strings"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
const (
diff --git a/pkg/config/logcontroller.go b/pkg/config/logcontroller.go
index 8187edc..68bfb32 100644
--- a/pkg/config/logcontroller.go
+++ b/pkg/config/logcontroller.go
@@ -26,7 +26,7 @@
"crypto/md5"
"encoding/json"
"errors"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"os"
"sort"
"strings"
diff --git a/pkg/config/logfeaturescontroller.go b/pkg/config/logfeaturescontroller.go
index 353ae5c..95c5bde 100644
--- a/pkg/config/logfeaturescontroller.go
+++ b/pkg/config/logfeaturescontroller.go
@@ -19,7 +19,7 @@
import (
"context"
"errors"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"os"
"strings"
)
diff --git a/pkg/db/backend.go b/pkg/db/backend.go
index bf30a48..ff0b5b7 100644
--- a/pkg/db/backend.go
+++ b/pkg/db/backend.go
@@ -23,8 +23,8 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
diff --git a/pkg/db/backend_test.go b/pkg/db/backend_test.go
index a5659e4..7f1d878 100644
--- a/pkg/db/backend_test.go
+++ b/pkg/db/backend_test.go
@@ -23,7 +23,7 @@
"testing"
"time"
- mocks "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
+ mocks "github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
diff --git a/pkg/db/common.go b/pkg/db/common.go
index 25cddf5..4bc92b1 100644
--- a/pkg/db/common.go
+++ b/pkg/db/common.go
@@ -16,7 +16,7 @@
package db
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/db/kvstore/common.go b/pkg/db/kvstore/common.go
index 99c603d..b8509db 100644
--- a/pkg/db/kvstore/common.go
+++ b/pkg/db/kvstore/common.go
@@ -16,7 +16,7 @@
package kvstore
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index 98f0559..c2a38c6 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -22,7 +22,7 @@
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
v3Client "go.etcd.io/etcd/clientv3"
v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
@@ -99,17 +99,43 @@
// wait for a response
func (c *EtcdClient) Get(ctx context.Context, key string) (*KVPair, error) {
- resp, err := c.ectdAPI.Get(ctx, key)
+ attempt := 0
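+ // Retry transient etcd server errors (leader changes, timeouts) with a bounded backoff;
+ // any other error aborts the Get immediately.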
+startLoop:
+ for {
+ resp, err := c.ectdAPI.Get(ctx, key)
+ if err != nil {
+ switch err {
+ case context.Canceled:
+ logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
+ case context.DeadlineExceeded:
+ logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+ case v3rpcTypes.ErrEmptyKey:
+ logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+ case v3rpcTypes.ErrLeaderChanged,
+ v3rpcTypes.ErrGRPCNoLeader,
+ v3rpcTypes.ErrTimeout,
+ v3rpcTypes.ErrTimeoutDueToLeaderFail,
+ v3rpcTypes.ErrTimeoutDueToConnectionLost:
+ // Retry for these server errors
+ attempt += 1
+ if er := backoff(ctx, attempt); er != nil {
+ logger.Warnw(ctx, "get-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+ return nil, err
+ }
+ logger.Warnw(ctx, "retrying-get", log.Fields{"key": key, "error": err, "attempt": attempt})
+ goto startLoop
+ default:
+ logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+ }
+ return nil, err
+ }
- if err != nil {
- logger.Error(ctx, err)
- return nil, err
+ for _, ev := range resp.Kvs {
+ // Only one value is returned
+ return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease, ev.Version), nil
+ }
+ return nil, nil
}
- for _, ev := range resp.Kvs {
- // Only one value is returned
- return NewKVPair(string(ev.Key), ev.Value, "", ev.Lease, ev.Version), nil
- }
- return nil, nil
}
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
@@ -124,45 +150,87 @@
return fmt.Errorf("unexpected-type-%T", value)
}
- var err error
// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
// that KV key permanent instead of automatically removing it after a lease expiration
c.keyReservationsLock.RLock()
leaseID, ok := c.keyReservations[key]
c.keyReservationsLock.RUnlock()
- if ok {
- _, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
- } else {
- _, err = c.ectdAPI.Put(ctx, key, val)
- }
-
- if err != nil {
- switch err {
- case context.Canceled:
- logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
- case context.DeadlineExceeded:
- logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err})
- case v3rpcTypes.ErrEmptyKey:
- logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
- default:
- logger.Warnw(ctx, "bad-endpoints", log.Fields{"error": err})
+ attempt := 0
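+ // Transient server errors are retried with backoff, following the same policy as Get.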
+startLoop:
+ for {
+ var err error
+ if ok {
+ _, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
+ } else {
+ _, err = c.ectdAPI.Put(ctx, key, val)
}
- return err
+ if err != nil {
+ switch err {
+ case context.Canceled:
+ logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
+ case context.DeadlineExceeded:
+ logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+ case v3rpcTypes.ErrEmptyKey:
+ logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+ case v3rpcTypes.ErrLeaderChanged,
+ v3rpcTypes.ErrGRPCNoLeader,
+ v3rpcTypes.ErrTimeout,
+ v3rpcTypes.ErrTimeoutDueToLeaderFail,
+ v3rpcTypes.ErrTimeoutDueToConnectionLost:
+ // Retry for these server errors
+ attempt += 1
+ if er := backoff(ctx, attempt); er != nil {
+ logger.Warnw(ctx, "put-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+ return err
+ }
+ logger.Warnw(ctx, "retrying-put", log.Fields{"key": key, "error": err, "attempt": attempt})
+ goto startLoop
+ default:
+ logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+ }
+ return err
+ }
+ return nil
}
- return nil
}
// Delete removes a key from the KV store. Timeout defines how long the function will
// wait for a response
func (c *EtcdClient) Delete(ctx context.Context, key string) error {
- // delete the key
- if _, err := c.ectdAPI.Delete(ctx, key); err != nil {
- logger.Errorw(ctx, "failed-to-delete-key", log.Fields{"key": key, "error": err})
- return err
+ attempt := 0
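+ // Delete follows the same transient-error retry policy as Get and Put.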
+startLoop:
+ for {
+ _, err := c.ectdAPI.Delete(ctx, key)
+ if err != nil {
+ switch err {
+ case context.Canceled:
+ logger.Warnw(ctx, "context-cancelled", log.Fields{"error": err})
+ case context.DeadlineExceeded:
+ logger.Warnw(ctx, "context-deadline-exceeded", log.Fields{"error": err, "context": ctx})
+ case v3rpcTypes.ErrEmptyKey:
+ logger.Warnw(ctx, "etcd-client-error", log.Fields{"error": err})
+ case v3rpcTypes.ErrLeaderChanged,
+ v3rpcTypes.ErrGRPCNoLeader,
+ v3rpcTypes.ErrTimeout,
+ v3rpcTypes.ErrTimeoutDueToLeaderFail,
+ v3rpcTypes.ErrTimeoutDueToConnectionLost:
+ // Retry for these server errors
+ attempt += 1
+ if er := backoff(ctx, attempt); er != nil {
+ logger.Warnw(ctx, "delete-retries-failed", log.Fields{"key": key, "error": er, "attempt": attempt})
+ return err
+ }
+ logger.Warnw(ctx, "retrying-delete", log.Fields{"key": key, "error": err, "attempt": attempt})
+ goto startLoop
+ default:
+ logger.Warnw(ctx, "etcd-server-error", log.Fields{"error": err})
+ }
+ return err
+ }
+ logger.Debugw(ctx, "key(s)-deleted", log.Fields{"key": key})
+ return nil
}
- logger.Debugw(ctx, "key(s)-deleted", log.Fields{"key": key})
- return nil
}
func (c *EtcdClient) DeleteWithPrefix(ctx context.Context, prefixKey string) error {
diff --git a/pkg/db/kvstore/kvutils.go b/pkg/db/kvstore/kvutils.go
index 70bd977..946dbf2 100644
--- a/pkg/db/kvstore/kvutils.go
+++ b/pkg/db/kvstore/kvutils.go
@@ -17,7 +17,18 @@
import (
"bytes"
+ "context"
"fmt"
+ "math"
+ "math/rand"
+ "time"
+)
+
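+// Tuning parameters for the etcd retry backoff. Intervals are in milliseconds.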
+const (
+ minRetryInterval = 100
+ maxRetryInterval = 5000
+ incrementalFactor = 1.2
+ jitter = 0.2
)
// ToString converts an interface value to a string. The interface should either be of
@@ -56,3 +67,24 @@
}
return val1 == val2
}
+
+// backoff waits an amount of time that grows with the attempt value. The wait time is bounded by
+// minRetryInterval and maxRetryInterval.
+func backoff(ctx context.Context, attempt int) error {
+ if attempt == 0 {
+ return nil
+ }
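+ // Grow the wait with the attempt number, apply random jitter, and cap the result at
+ // maxRetryInterval (milliseconds).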
+ backoff := int(minRetryInterval + incrementalFactor*math.Exp(float64(attempt)))
+ backoff = int(float64(backoff) * (1 + jitter*(rand.Float64()*2-1)))
+ if backoff > maxRetryInterval {
+ backoff = maxRetryInterval
+ }
+ ticker := time.NewTicker(time.Duration(backoff) * time.Millisecond)
+ defer ticker.Stop()
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-ticker.C:
+ }
+ return nil
+}
diff --git a/pkg/db/kvstore/kvutils_test.go b/pkg/db/kvstore/kvutils_test.go
index 98c96c9..5c2ef8c 100644
--- a/pkg/db/kvstore/kvutils_test.go
+++ b/pkg/db/kvstore/kvutils_test.go
@@ -16,8 +16,10 @@
package kvstore
import (
+ "context"
"github.com/stretchr/testify/assert"
"testing"
+ "time"
)
func TestToStringWithString(t *testing.T) {
@@ -58,3 +60,32 @@
assert.Equal(t, expectedResult, actualResult)
assert.NotEqual(t, error, nil)
}
+
+func TestBackoffNoWait(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Millisecond)
+ defer cancel()
+ err := backoff(ctx, 0)
+ assert.Nil(t, err)
+}
+
+func TestBackoffSuccess(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
+ defer cancel()
+ previous := time.Duration(0)
+ for i := 1; i < 5; i++ {
+ start := time.Now()
+ err := backoff(ctx, i)
+ assert.Nil(t, err)
+ current := time.Since(start)
+ assert.Greater(t, current.Milliseconds(), previous.Milliseconds())
+ previous = current
+ }
+}
+
+func TestBackoffContextTimeout(t *testing.T) {
+ ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
+ defer cancel()
+ err := backoff(ctx, 10)
+ assert.NotNil(t, err)
+ assert.Equal(t, context.DeadlineExceeded, err)
+}
diff --git a/pkg/events/common.go b/pkg/events/common.go
index 489a493..df3e839 100644
--- a/pkg/events/common.go
+++ b/pkg/events/common.go
@@ -16,7 +16,7 @@
package events
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/events/events_proxy.go b/pkg/events/events_proxy.go
index 910fec3..19a4f26 100644
--- a/pkg/events/events_proxy.go
+++ b/pkg/events/events_proxy.go
@@ -27,9 +27,9 @@
"time"
"github.com/golang/protobuf/ptypes"
- "github.com/opencord/voltha-lib-go/v4/pkg/events/eventif"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/events/eventif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
)
diff --git a/pkg/events/events_proxy_test.go b/pkg/events/events_proxy_test.go
index 119df28..8bd870d 100644
--- a/pkg/events/events_proxy_test.go
+++ b/pkg/events/events_proxy_test.go
@@ -19,9 +19,9 @@
import (
"context"
"github.com/golang/protobuf/ptypes"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- mock_kafka "github.com/opencord/voltha-lib-go/v4/pkg/mocks/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ mock_kafka "github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka"
"github.com/opencord/voltha-protos/v4/go/common"
"github.com/opencord/voltha-protos/v4/go/voltha"
"github.com/stretchr/testify/assert"
diff --git a/pkg/flows/common.go b/pkg/flows/common.go
index fdc93bd..beb0574 100644
--- a/pkg/flows/common.go
+++ b/pkg/flows/common.go
@@ -16,7 +16,7 @@
package flows
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/flows/flow_utils.go b/pkg/flows/flow_utils.go
index 98fad49..90b6785 100644
--- a/pkg/flows/flow_utils.go
+++ b/pkg/flows/flow_utils.go
@@ -26,7 +26,7 @@
"github.com/cevaris/ordered_map"
"github.com/gogo/protobuf/proto"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
)
diff --git a/pkg/grpc/common.go b/pkg/grpc/common.go
index 5f90da9..cb5480b 100644
--- a/pkg/grpc/common.go
+++ b/pkg/grpc/common.go
@@ -16,7 +16,7 @@
package grpc
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/grpc/common_test.go b/pkg/grpc/common_test.go
index e6a3533..b7ba6ae 100644
--- a/pkg/grpc/common_test.go
+++ b/pkg/grpc/common_test.go
@@ -16,7 +16,7 @@
package grpc
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
const (
diff --git a/pkg/grpc/server.go b/pkg/grpc/server.go
index ea4573f..38dc308 100644
--- a/pkg/grpc/server.go
+++ b/pkg/grpc/server.go
@@ -21,7 +21,7 @@
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
diff --git a/pkg/kafka/common.go b/pkg/kafka/common.go
index 5db364d..f4d7661 100644
--- a/pkg/kafka/common.go
+++ b/pkg/kafka/common.go
@@ -16,7 +16,7 @@
package kafka
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/kafka/endpoint_manager.go b/pkg/kafka/endpoint_manager.go
index 796eb72..962b932 100644
--- a/pkg/kafka/endpoint_manager.go
+++ b/pkg/kafka/endpoint_manager.go
@@ -21,8 +21,8 @@
"github.com/buraksezer/consistent"
"github.com/cespare/xxhash"
"github.com/golang/protobuf/proto"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
diff --git a/pkg/kafka/endpoint_manager_test.go b/pkg/kafka/endpoint_manager_test.go
index 0399b4d..ea9c770 100644
--- a/pkg/kafka/endpoint_manager_test.go
+++ b/pkg/kafka/endpoint_manager_test.go
@@ -20,9 +20,9 @@
"fmt"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd"
"github.com/opencord/voltha-protos/v4/go/voltha"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 3af35d7..b149e7d 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -31,7 +31,7 @@
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/opentracing/opentracing-go"
)
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index cd6d27b..3273470 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -29,7 +29,7 @@
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
)
diff --git a/pkg/meters/common.go b/pkg/meters/common.go
index 0a171f6..e058e48 100644
--- a/pkg/meters/common.go
+++ b/pkg/meters/common.go
@@ -16,7 +16,7 @@
package meters
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/meters/meter_utils.go b/pkg/meters/meter_utils.go
index 38f35b9..d220c0b 100644
--- a/pkg/meters/meter_utils.go
+++ b/pkg/meters/meter_utils.go
@@ -18,7 +18,7 @@
import (
"context"
"fmt"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
)
diff --git a/pkg/mocks/etcd/common.go b/pkg/mocks/etcd/common.go
index 9f427c0..a874bd1 100644
--- a/pkg/mocks/etcd/common.go
+++ b/pkg/mocks/etcd/common.go
@@ -16,7 +16,7 @@
package etcd
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/mocks/etcd/etcd_server_test.go b/pkg/mocks/etcd/etcd_server_test.go
index 0c2f882..ba7ed5b 100644
--- a/pkg/mocks/etcd/etcd_server_test.go
+++ b/pkg/mocks/etcd/etcd_server_test.go
@@ -19,8 +19,8 @@
import (
"context"
"fmt"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
"os"
diff --git a/pkg/mocks/kafka/common.go b/pkg/mocks/kafka/common.go
index 6bc3356..313ff9e 100644
--- a/pkg/mocks/kafka/common.go
+++ b/pkg/mocks/kafka/common.go
@@ -16,7 +16,7 @@
package kafka
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/mocks/kafka/endpoint_manager.go b/pkg/mocks/kafka/endpoint_manager.go
index 68bdcab..5acec69 100644
--- a/pkg/mocks/kafka/endpoint_manager.go
+++ b/pkg/mocks/kafka/endpoint_manager.go
@@ -18,7 +18,7 @@
import (
"context"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
)
type EndpointManager struct{}
diff --git a/pkg/mocks/kafka/kafka_client.go b/pkg/mocks/kafka/kafka_client.go
index 97bb135..92966a7 100644
--- a/pkg/mocks/kafka/kafka_client.go
+++ b/pkg/mocks/kafka/kafka_client.go
@@ -23,8 +23,8 @@
"time"
"github.com/golang/protobuf/proto"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
diff --git a/pkg/mocks/kafka/kafka_client_test.go b/pkg/mocks/kafka/kafka_client_test.go
index 8754313..b9038c3 100644
--- a/pkg/mocks/kafka/kafka_client_test.go
+++ b/pkg/mocks/kafka/kafka_client_test.go
@@ -18,7 +18,7 @@
import (
"context"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/stretchr/testify/assert"
"testing"
diff --git a/pkg/mocks/kafka/kafka_inter_container_proxy.go b/pkg/mocks/kafka/kafka_inter_container_proxy.go
index 9bbdf3c..a028aa9 100644
--- a/pkg/mocks/kafka/kafka_inter_container_proxy.go
+++ b/pkg/mocks/kafka/kafka_inter_container_proxy.go
@@ -22,7 +22,7 @@
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
)
diff --git a/pkg/ponresourcemanager/common.go b/pkg/ponresourcemanager/common.go
index 1c9a5b1..76207a0 100644
--- a/pkg/ponresourcemanager/common.go
+++ b/pkg/ponresourcemanager/common.go
@@ -16,7 +16,7 @@
package ponresourcemanager
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/ponresourcemanager/ponresourcemanager.go b/pkg/ponresourcemanager/ponresourcemanager.go
index 70ed8e6..804a6f3 100755
--- a/pkg/ponresourcemanager/ponresourcemanager.go
+++ b/pkg/ponresourcemanager/ponresourcemanager.go
@@ -25,10 +25,10 @@
"time"
bitmap "github.com/boljen/go-bitmap"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
- tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ tp "github.com/opencord/voltha-lib-go/v5/pkg/techprofile"
)
const (
@@ -118,10 +118,6 @@
//Format: <device_id>/flow_id_info/<(pon_intf_id, onu_id)><flow_id>
FLOW_ID_INFO_PATH = FLOW_ID_INFO_PATH_PREFIX + "/{%s}/{%d}"
- //path on the kvstore to store onugem info map
- //format: <device-id>/onu_gem_info/<intfid>
- ONU_GEM_INFO_PATH = "{%s}/onu_gem_info/{%d}" // onu_gem/<(intfid)>
-
//Constants for internal usage.
PON_INTF_ID = "pon_intf_id"
START_IDX = "start_idx"
@@ -428,6 +424,43 @@
return err
}
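+// InitDeviceResourcePoolForIntf initializes the ONU ID, alloc ID, gemport ID and flow ID pools
+// for a single PON interface.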
+func (PONRMgr *PONResourceManager) InitDeviceResourcePoolForIntf(ctx context.Context, intfID uint32) error {
+
+ logger.Debug(ctx, "Init resource ranges for intf %d", intfID)
+
+ var err error
+
+ if err = PONRMgr.InitResourceIDPool(ctx, intfID, ONU_ID,
+ PONRMgr.PonResourceRanges[ONU_ID_START_IDX].(uint32),
+ PONRMgr.PonResourceRanges[ONU_ID_END_IDX].(uint32)); err != nil {
+ logger.Error(ctx, "Failed to init ONU ID resource pool")
+ return err
+ }
+
+ if err = PONRMgr.InitResourceIDPool(ctx, intfID, ALLOC_ID,
+ PONRMgr.PonResourceRanges[ALLOC_ID_START_IDX].(uint32),
+ PONRMgr.PonResourceRanges[ALLOC_ID_END_IDX].(uint32)); err != nil {
+ logger.Error(ctx, "Failed to init ALLOC ID resource pool ")
+ return err
+ }
+
+ if err = PONRMgr.InitResourceIDPool(ctx, intfID, GEMPORT_ID,
+ PONRMgr.PonResourceRanges[GEMPORT_ID_START_IDX].(uint32),
+ PONRMgr.PonResourceRanges[GEMPORT_ID_END_IDX].(uint32)); err != nil {
+ logger.Error(ctx, "Failed to init GEMPORT ID resource pool")
+ return err
+ }
+
+ if err = PONRMgr.InitResourceIDPool(ctx, intfID, FLOW_ID,
+ PONRMgr.PonResourceRanges[FLOW_ID_START_IDX].(uint32),
+ PONRMgr.PonResourceRanges[FLOW_ID_END_IDX].(uint32)); err != nil {
+ logger.Error(ctx, "Failed to init FLOW ID resource pool")
+ return err
+ }
+
+ return nil
+}
+
func (PONRMgr *PONResourceManager) ClearDeviceResourcePool(ctx context.Context) error {
//Clear resource pool for all PON ports.
@@ -491,6 +524,33 @@
return nil
}
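+// ClearDeviceResourcePoolForIntf clears the ONU ID, alloc ID, gemport ID and flow ID pools
+// of a single PON interface.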
+func (PONRMgr *PONResourceManager) ClearDeviceResourcePoolForIntf(ctx context.Context, intfID uint32) error {
+
+ logger.Debugf(ctx, "Clear resource ranges for intf %d", intfID)
+
+ if status := PONRMgr.ClearResourceIDPool(ctx, intfID, ONU_ID); !status {
+ logger.Error(ctx, "Failed to clear ONU ID resource pool")
+ return errors.New("Failed to clear ONU ID resource pool")
+ }
+
+ if status := PONRMgr.ClearResourceIDPool(ctx, intfID, ALLOC_ID); !status {
+ logger.Error(ctx, "Failed to clear ALLOC ID resource pool ")
+ return errors.New("Failed to clear ALLOC ID resource pool")
+ }
+
+ if status := PONRMgr.ClearResourceIDPool(ctx, intfID, GEMPORT_ID); !status {
+ logger.Error(ctx, "Failed to clear GEMPORT ID resource pool")
+ return errors.New("Failed to clear GEMPORT ID resource pool")
+ }
+
+ if status := PONRMgr.ClearResourceIDPool(ctx, intfID, FLOW_ID); !status {
+ logger.Error(ctx, "Failed to clear FLOW ID resource pool")
+ return errors.New("Failed to clear FLOW ID resource pool")
+ }
+
+ return nil
+}
+
func (PONRMgr *PONResourceManager) InitResourceIDPool(ctx context.Context, Intf uint32, ResourceType string, StartID uint32, EndID uint32) error {
/*Initialize Resource ID pool for a given Resource Type on a given PON Port
@@ -1305,69 +1365,3 @@
return "", fmt.Errorf("unexpected-type-%T", t)
}
}
-
-func (PONRMgr *PONResourceManager) AddOnuGemInfo(ctx context.Context, intfID uint32, onuGemData interface{}) error {
- /*
- Update onugem info map,
- :param pon_intf_id: reference of PON interface id
- :param onuegmdata: onugem info map
- */
- var Value []byte
- var err error
- Path := fmt.Sprintf(ONU_GEM_INFO_PATH, PONRMgr.DeviceID, intfID)
- Value, err = json.Marshal(onuGemData)
- if err != nil {
- logger.Error(ctx, "failed to Marshal")
- return err
- }
-
- if err = PONRMgr.KVStore.Put(ctx, Path, Value); err != nil {
- logger.Errorf(ctx, "Failed to update resource %s", Path)
- return err
- }
- return err
-}
-
-func (PONRMgr *PONResourceManager) GetOnuGemInfo(ctx context.Context, IntfId uint32, onuGemInfo interface{}) error {
- /*
- Get onugeminfo map from kvstore
- :param intfid: refremce pon intfid
- :param onuGemInfo: onugem info to return from kv strore.
- */
- var Val []byte
-
- path := fmt.Sprintf(ONU_GEM_INFO_PATH, PONRMgr.DeviceID, IntfId)
- value, err := PONRMgr.KVStore.Get(ctx, path)
- if err != nil {
- logger.Errorw(ctx, "Failed to get from kv store", log.Fields{"path": path})
- return err
- } else if value == nil {
- logger.Debug(ctx, "No onuinfo for path", log.Fields{"path": path})
- return nil // returning nil as this could happen if there are no onus for the interface yet
- }
- if Val, err = kvstore.ToByte(value.Value); err != nil {
- logger.Error(ctx, "Failed to convert to byte array")
- return err
- }
-
- if err = json.Unmarshal(Val, &onuGemInfo); err != nil {
- logger.Error(ctx, "Failed to unmarshall")
- return err
- }
- logger.Debugw(ctx, "found onuinfo from path", log.Fields{"path": path, "onuinfo": onuGemInfo})
- return err
-}
-
-func (PONRMgr *PONResourceManager) DelOnuGemInfoForIntf(ctx context.Context, intfId uint32) error {
- /*
- delete onugem info for an interface from kvstore
- :param intfid: refremce pon intfid
- */
-
- path := fmt.Sprintf(ONU_GEM_INFO_PATH, PONRMgr.DeviceID, intfId)
- if err := PONRMgr.KVStore.Delete(ctx, path); err != nil {
- logger.Errorf(ctx, "Falied to remove resource %s", path)
- return err
- }
- return nil
-}
diff --git a/pkg/ponresourcemanager/ponresourcemanager_test.go b/pkg/ponresourcemanager/ponresourcemanager_test.go
index 1bc908e..9363735 100644
--- a/pkg/ponresourcemanager/ponresourcemanager_test.go
+++ b/pkg/ponresourcemanager/ponresourcemanager_test.go
@@ -20,9 +20,9 @@
"context"
"encoding/json"
"errors"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/stretchr/testify/assert"
"strings"
"testing"
diff --git a/pkg/probe/common.go b/pkg/probe/common.go
index d9739af..119d78e 100644
--- a/pkg/probe/common.go
+++ b/pkg/probe/common.go
@@ -16,7 +16,7 @@
package probe
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/probe/probe.go b/pkg/probe/probe.go
index f13f257..b66f398 100644
--- a/pkg/probe/probe.go
+++ b/pkg/probe/probe.go
@@ -18,7 +18,7 @@
import (
"context"
"fmt"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"net/http"
"sync"
)
diff --git a/pkg/techprofile/SingleQueueEponProfile.json b/pkg/techprofile/SingleQueueEponProfile.json
index 00476a2..4015251 100644
--- a/pkg/techprofile/SingleQueueEponProfile.json
+++ b/pkg/techprofile/SingleQueueEponProfile.json
@@ -58,4 +58,4 @@
}
}
]
-}
\ No newline at end of file
+}
diff --git a/pkg/techprofile/common.go b/pkg/techprofile/common.go
index 544c780..1e89822 100644
--- a/pkg/techprofile/common.go
+++ b/pkg/techprofile/common.go
@@ -16,7 +16,7 @@
package techprofile
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/techprofile/config.go b/pkg/techprofile/config.go
index 438ea4a..d13a876 100644
--- a/pkg/techprofile/config.go
+++ b/pkg/techprofile/config.go
@@ -17,7 +17,7 @@
import (
"fmt"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
"time"
)
@@ -39,12 +39,12 @@
// Tech profile path prefix in kv store (for TP instances)
defaultKVPathPrefix = "%s/technology_profiles"
+ // Resource instance path prefix in KV store (for Resource Instances)
+ defaultResourceInstancePathPrefix = "%s/resource_instances"
+
// Tech profile path in kv store
defaultTechProfileKVPath = "%s/%d" // <technology>/<tech_profile_tableID>
- // Tech profile instance path in kv store
- // Format: <technology>/<tech_profile_tableID>/<uni_port_name>
- defaultTPInstanceKVPath = "%s/%d/%s"
)
//Tech-Profile JSON String Keys
@@ -98,41 +98,42 @@
// TechprofileFlags represents the set of configurations used
type TechProfileFlags struct {
- KVStoreAddress string
- KVStoreType string
- KVStoreTimeout time.Duration
- KVBackend *db.Backend // this is the backend used to store TP instances
- DefaultTpKVBackend *db.Backend // this is the backend used to read the TP profile
- TPKVPathPrefix string
- defaultTpKvPathPrefix string
- TPFileKVPath string
- TPInstanceKVPath string
- DefaultTPName string
- TPVersion int
- NumGemPorts uint32
- DefaultPbits []string
- LogLevel int
- DefaultTechProfileID uint32
- DefaultNumGemPorts uint32
+ KVStoreAddress string
+ KVStoreType string
+ KVStoreTimeout time.Duration
+ KVBackend *db.Backend // this is the backend used to store TP instances
+ DefaultTpKVBackend *db.Backend // this is the backend used to read the TP profile
+ ResourceInstanceKVBacked *db.Backend // this is the backend used to read/write Resource Instances
+ TPKVPathPrefix string
+ defaultTpKvPathPrefix string
+ TPFileKVPath string
+ ResourceInstanceKVPathPrefix string
+ DefaultTPName string
+ TPVersion uint32
+ NumGemPorts uint32
+ DefaultPbits []string
+ LogLevel int
+ DefaultTechProfileID uint32
+ DefaultNumGemPorts uint32
}
func NewTechProfileFlags(KVStoreType string, KVStoreAddress string, basePathKvStore string) *TechProfileFlags {
// initialize with default values
var techProfileFlags = TechProfileFlags{
- KVBackend: nil,
- KVStoreAddress: KVStoreAddress,
- KVStoreType: KVStoreType,
- KVStoreTimeout: defaultKVStoreTimeout,
- DefaultTPName: defaultTechProfileName,
- TPKVPathPrefix: fmt.Sprintf(defaultKVPathPrefix, basePathKvStore),
- defaultTpKvPathPrefix: defaultTpKvPathPrefix,
- TPVersion: defaultVersion,
- TPFileKVPath: defaultTechProfileKVPath,
- TPInstanceKVPath: defaultTPInstanceKVPath,
- DefaultTechProfileID: DEFAULT_TECH_PROFILE_TABLE_ID,
- DefaultNumGemPorts: defaultGemportsCount,
- DefaultPbits: []string{defaultPbits},
- LogLevel: defaultLogLevel,
+ KVBackend: nil,
+ KVStoreAddress: KVStoreAddress,
+ KVStoreType: KVStoreType,
+ KVStoreTimeout: defaultKVStoreTimeout,
+ DefaultTPName: defaultTechProfileName,
+ TPKVPathPrefix: fmt.Sprintf(defaultKVPathPrefix, basePathKvStore),
+ defaultTpKvPathPrefix: defaultTpKvPathPrefix,
+ TPVersion: defaultVersion,
+ TPFileKVPath: defaultTechProfileKVPath,
+ ResourceInstanceKVPathPrefix: fmt.Sprintf(defaultResourceInstancePathPrefix, basePathKvStore),
+ DefaultTechProfileID: DEFAULT_TECH_PROFILE_TABLE_ID,
+ DefaultNumGemPorts: defaultGemportsCount,
+ DefaultPbits: []string{defaultPbits},
+ LogLevel: defaultLogLevel,
}
return &techProfileFlags
diff --git a/pkg/techprofile/tech_profile.go b/pkg/techprofile/tech_profile.go
index 2d2332d..757118a 100644
--- a/pkg/techprofile/tech_profile.go
+++ b/pkg/techprofile/tech_profile.go
@@ -17,46 +17,35 @@
package techprofile
import (
+ "bytes"
"context"
- "encoding/json"
"errors"
"fmt"
+ "github.com/gogo/protobuf/proto"
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/opencord/voltha-protos/v4/go/openolt"
"regexp"
"strconv"
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
)
// Interface to pon resource manager APIs
type iPonResourceMgr interface {
- GetResourceID(ctx context.Context, IntfID uint32, ResourceType string, NumIDs uint32) ([]uint32, error)
- FreeResourceID(ctx context.Context, IntfID uint32, ResourceType string, ReleaseContent []uint32) error
+ GetResourceID(ctx context.Context, intfID uint32, resourceType string, numIDs uint32) ([]uint32, error)
+ FreeResourceID(ctx context.Context, intfID uint32, resourceType string, ReleaseContent []uint32) error
GetResourceTypeAllocID() string
GetResourceTypeGemPortID() string
GetResourceTypeOnuID() string
GetTechnology() string
}
-type Direction int32
-
-const (
- Direction_UPSTREAM Direction = 0
- Direction_DOWNSTREAM Direction = 1
- Direction_BIDIRECTIONAL Direction = 2
-)
-
-var Direction_name = map[Direction]string{
- 0: "UPSTREAM",
- 1: "DOWNSTREAM",
- 2: "BIDIRECTIONAL",
-}
-
type SchedulingPolicy int32
const (
@@ -65,12 +54,6 @@
SchedulingPolicy_Hybrid SchedulingPolicy = 2
)
-var SchedulingPolicy_name = map[SchedulingPolicy]string{
- 0: "WRR",
- 1: "StrictPriority",
- 2: "Hybrid",
-}
-
type AdditionalBW int32
const (
@@ -80,13 +63,6 @@
AdditionalBW_AdditionalBW_Auto AdditionalBW = 3
)
-var AdditionalBW_name = map[AdditionalBW]string{
- 0: "AdditionalBW_None",
- 1: "AdditionalBW_NA",
- 2: "AdditionalBW_BestEffort",
- 3: "AdditionalBW_Auto",
-}
-
type DiscardPolicy int32
const (
@@ -96,31 +72,9 @@
DiscardPolicy_WRed DiscardPolicy = 3
)
-var DiscardPolicy_name = map[DiscardPolicy]string{
- 0: "TailDrop",
- 1: "WTailDrop",
- 2: "Red",
- 3: "WRed",
-}
-
// Required uniPortName format
-var uniPortNameFormat = regexp.MustCompile(`^olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
+var uniPortNameFormatRegexp = regexp.MustCompile(`^olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
-/*
- type InferredAdditionBWIndication int32
-
- const (
- InferredAdditionBWIndication_InferredAdditionBWIndication_None InferredAdditionBWIndication = 0
- InferredAdditionBWIndication_InferredAdditionBWIndication_Assured InferredAdditionBWIndication = 1
- InferredAdditionBWIndication_InferredAdditionBWIndication_BestEffort InferredAdditionBWIndication = 2
- )
-
- var InferredAdditionBWIndication_name = map[int32]string{
- 0: "InferredAdditionBWIndication_None",
- 1: "InferredAdditionBWIndication_Assured",
- 2: "InferredAdditionBWIndication_BestEffort",
- }
-*/
// instance control defaults
const (
defaultOnuInstance = "multi-instance"
@@ -128,14 +82,6 @@
defaultGemPayloadSize = "auto"
)
-const MAX_GEM_PAYLOAD = "max_gem_payload_size"
-
-type InstanceControl struct {
- Onu string `json:"ONU"`
- Uni string `json:"uni"`
- MaxGemPayloadSize string `json:"max_gem_payload_size"`
-}
-
// default discard config constants
const (
defaultMinThreshold = 0
@@ -143,144 +89,23 @@
defaultMaxProbability = 0
)
-type DiscardConfig struct {
- MinThreshold int `json:"min_threshold"`
- MaxThreshold int `json:"max_threshold"`
- MaxProbability int `json:"max_probability"`
-}
-
// default scheduler contants
const (
- defaultAdditionalBw = AdditionalBW_AdditionalBW_BestEffort
- defaultPriority = 0
- defaultWeight = 0
- defaultQueueSchedPolicy = SchedulingPolicy_Hybrid
+ defaultPriority = 0
+ defaultWeight = 0
)
-type Scheduler struct {
- Direction string `json:"direction"`
- AdditionalBw string `json:"additional_bw"`
- Priority uint32 `json:"priority"`
- Weight uint32 `json:"weight"`
- QSchedPolicy string `json:"q_sched_policy"`
-}
-
// default GEM attribute constants
const (
defaultAESEncryption = "True"
defaultPriorityQueue = 0
defaultQueueWeight = 0
defaultMaxQueueSize = "auto"
- defaultdropPolicy = DiscardPolicy_TailDrop
- defaultSchedulePolicy = SchedulingPolicy_WRR
defaultIsMulticast = "False"
defaultAccessControlList = "224.0.0.0-239.255.255.255"
defaultMcastGemID = 4069
)
-type GemPortAttribute struct {
- MaxQueueSize string `json:"max_q_size"`
- PbitMap string `json:"pbit_map"`
- AesEncryption string `json:"aes_encryption"`
- SchedulingPolicy string `json:"scheduling_policy"`
- PriorityQueue uint32 `json:"priority_q"`
- Weight uint32 `json:"weight"`
- DiscardPolicy string `json:"discard_policy"`
- DiscardConfig DiscardConfig `json:"discard_config"`
- IsMulticast string `json:"is_multicast"`
- DControlList string `json:"dynamic_access_control_list"`
- SControlList string `json:"static_access_control_list"`
- McastGemID uint32 `json:"multicast_gem_id"`
-}
-
-// Instance of Scheduler
-type IScheduler struct {
- AllocID uint32 `json:"alloc_id"`
- Direction string `json:"direction"`
- AdditionalBw string `json:"additional_bw"`
- Priority uint32 `json:"priority"`
- Weight uint32 `json:"weight"`
- QSchedPolicy string `json:"q_sched_policy"`
-}
-
-// Instance of GemPortAttribute
-type IGemPortAttribute struct {
- GemportID uint32 `json:"gemport_id"`
- MaxQueueSize string `json:"max_q_size"`
- PbitMap string `json:"pbit_map"`
- AesEncryption string `json:"aes_encryption"`
- SchedulingPolicy string `json:"scheduling_policy"`
- PriorityQueue uint32 `json:"priority_q"`
- Weight uint32 `json:"weight"`
- DiscardPolicy string `json:"discard_policy"`
- DiscardConfig DiscardConfig `json:"discard_config"`
- IsMulticast string `json:"is_multicast"`
- DControlList string `json:"dynamic_access_control_list"`
- SControlList string `json:"static_access_control_list"`
- McastGemID uint32 `json:"multicast_gem_id"`
-}
-
-type TechProfileMgr struct {
- config *TechProfileFlags
- resourceMgr iPonResourceMgr
- OnuIDMgmtLock sync.RWMutex
- GemPortIDMgmtLock sync.RWMutex
- AllocIDMgmtLock sync.RWMutex
-}
-type DefaultTechProfile struct {
- Name string `json:"name"`
- ProfileType string `json:"profile_type"`
- Version int `json:"version"`
- NumGemPorts uint32 `json:"num_gem_ports"`
- InstanceCtrl InstanceControl `json:"instance_control"`
- UsScheduler Scheduler `json:"us_scheduler"`
- DsScheduler Scheduler `json:"ds_scheduler"`
- UpstreamGemPortAttributeList []GemPortAttribute `json:"upstream_gem_port_attribute_list"`
- DownstreamGemPortAttributeList []GemPortAttribute `json:"downstream_gem_port_attribute_list"`
-}
-type TechProfile struct {
- Name string `json:"name"`
- SubscriberIdentifier string `json:"subscriber_identifier"`
- ProfileType string `json:"profile_type"`
- Version int `json:"version"`
- NumGemPorts uint32 `json:"num_gem_ports"`
- InstanceCtrl InstanceControl `json:"instance_control"`
- UsScheduler IScheduler `json:"us_scheduler"`
- DsScheduler IScheduler `json:"ds_scheduler"`
- UpstreamGemPortAttributeList []IGemPortAttribute `json:"upstream_gem_port_attribute_list"`
- DownstreamGemPortAttributeList []IGemPortAttribute `json:"downstream_gem_port_attribute_list"`
-}
-
-// QThresholds struct for EPON
-type QThresholds struct {
- QThreshold1 uint32 `json:"q_threshold1"`
- QThreshold2 uint32 `json:"q_threshold2"`
- QThreshold3 uint32 `json:"q_threshold3"`
- QThreshold4 uint32 `json:"q_threshold4"`
- QThreshold5 uint32 `json:"q_threshold5"`
- QThreshold6 uint32 `json:"q_threshold6"`
- QThreshold7 uint32 `json:"q_threshold7"`
-}
-
-// UpstreamQueueAttribute struct for EPON
-type UpstreamQueueAttribute struct {
- MaxQueueSize string `json:"max_q_size"`
- PbitMap string `json:"pbit_map"`
- AesEncryption string `json:"aes_encryption"`
- TrafficType string `json:"traffic_type"`
- UnsolicitedGrantSize uint32 `json:"unsolicited_grant_size"`
- NominalInterval uint32 `json:"nominal_interval"`
- ToleratedPollJitter uint32 `json:"tolerated_poll_jitter"`
- RequestTransmissionPolicy uint32 `json:"request_transmission_policy"`
- NumQueueSet uint32 `json:"num_q_sets"`
- QThresholds QThresholds `json:"q_thresholds"`
- SchedulingPolicy string `json:"scheduling_policy"`
- PriorityQueue uint32 `json:"priority_q"`
- Weight uint32 `json:"weight"`
- DiscardPolicy string `json:"discard_policy"`
- DiscardConfig DiscardConfig `json:"discard_config"`
-}
-
// Default EPON constants
const (
defaultPakageType = "B"
@@ -303,88 +128,33 @@
defaultQThreshold7 = 0
)
-// DownstreamQueueAttribute struct for EPON
-type DownstreamQueueAttribute struct {
- MaxQueueSize string `json:"max_q_size"`
- PbitMap string `json:"pbit_map"`
- AesEncryption string `json:"aes_encryption"`
- SchedulingPolicy string `json:"scheduling_policy"`
- PriorityQueue uint32 `json:"priority_q"`
- Weight uint32 `json:"weight"`
- DiscardPolicy string `json:"discard_policy"`
- DiscardConfig DiscardConfig `json:"discard_config"`
-}
-
-// iUpstreamQueueAttribute struct for EPON
-type iUpstreamQueueAttribute struct {
- GemportID uint32 `json:"q_id"`
- MaxQueueSize string `json:"max_q_size"`
- PbitMap string `json:"pbit_map"`
- AesEncryption string `json:"aes_encryption"`
- TrafficType string `json:"traffic_type"`
- UnsolicitedGrantSize uint32 `json:"unsolicited_grant_size"`
- NominalInterval uint32 `json:"nominal_interval"`
- ToleratedPollJitter uint32 `json:"tolerated_poll_jitter"`
- RequestTransmissionPolicy uint32 `json:"request_transmission_policy"`
- NumQueueSet uint32 `json:"num_q_sets"`
- QThresholds QThresholds `json:"q_thresholds"`
- SchedulingPolicy string `json:"scheduling_policy"`
- PriorityQueue uint32 `json:"priority_q"`
- Weight uint32 `json:"weight"`
- DiscardPolicy string `json:"discard_policy"`
- DiscardConfig DiscardConfig `json:"discard_config"`
-}
-
-// iDownstreamQueueAttribute struct for EPON
-type iDownstreamQueueAttribute struct {
- GemportID uint32 `json:"q_id"`
- MaxQueueSize string `json:"max_q_size"`
- PbitMap string `json:"pbit_map"`
- AesEncryption string `json:"aes_encryption"`
- SchedulingPolicy string `json:"scheduling_policy"`
- PriorityQueue uint32 `json:"priority_q"`
- Weight uint32 `json:"weight"`
- DiscardPolicy string `json:"discard_policy"`
- DiscardConfig DiscardConfig `json:"discard_config"`
-}
-
-// EponAttribute struct for EPON
-type EponAttribute struct {
- PackageType string `json:"pakage_type"`
-}
-
-// DefaultTechProfile struct for EPON
-type DefaultEponProfile struct {
- Name string `json:"name"`
- ProfileType string `json:"profile_type"`
- Version int `json:"version"`
- NumGemPorts uint32 `json:"num_gem_ports"`
- InstanceCtrl InstanceControl `json:"instance_control"`
- EponAttribute EponAttribute `json:"epon_attribute"`
- UpstreamQueueAttributeList []UpstreamQueueAttribute `json:"upstream_queue_attribute_list"`
- DownstreamQueueAttributeList []DownstreamQueueAttribute `json:"downstream_queue_attribute_list"`
-}
-
-// TechProfile struct for EPON
-type EponProfile struct {
- Name string `json:"name"`
- SubscriberIdentifier string `json:"subscriber_identifier"`
- ProfileType string `json:"profile_type"`
- Version int `json:"version"`
- NumGemPorts uint32 `json:"num_gem_ports"`
- InstanceCtrl InstanceControl `json:"instance_control"`
- EponAttribute EponAttribute `json:"epon_attribute"`
- AllocID uint32 `json:"llid"`
- UpstreamQueueAttributeList []iUpstreamQueueAttribute `json:"upstream_queue_attribute_list"`
- DownstreamQueueAttributeList []iDownstreamQueueAttribute `json:"downstream_queue_attribute_list"`
-}
-
const (
xgspon = "XGS-PON"
+ xgpon = "XGPON"
gpon = "GPON"
epon = "EPON"
)
+const (
+ MaxUniPortPerOnu = 16 // TODO: Adapter uses its own constant for MaxUniPort. How to synchronize this and have a single source of truth?
+)
+
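+// TechProfileMgr keeps an in-memory cache of tech profile templates and instances
+// (GPON/XGS-PON and EPON variants), each map guarded by its own RW lock.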
+type TechProfileMgr struct {
+ config *TechProfileFlags
+ resourceMgr iPonResourceMgr
+ OnuIDMgmtLock sync.RWMutex
+ GemPortIDMgmtLock sync.RWMutex
+ AllocIDMgmtLock sync.RWMutex
+ tpInstanceMap map[string]*tp_pb.TechProfileInstance // Map of tp path to tp instance
+ tpInstanceMapLock sync.RWMutex
+ eponTpInstanceMap map[string]*tp_pb.EponTechProfileInstance // Map of tp path to epon tp instance
+ epontpInstanceMapLock sync.RWMutex
+ tpMap map[uint32]*tp_pb.TechProfile // Map of tp id to tp
+ tpMapLock sync.RWMutex
+ eponTpMap map[uint32]*tp_pb.EponTechProfile // map of tp id to epon tp
+ eponTpMapLock sync.RWMutex
+}
+
func (t *TechProfileMgr) SetKVClient(ctx context.Context, pathPrefix string) *db.Backend {
kvClient, err := newKVClient(ctx, t.config.KVStoreType, t.config.KVStoreAddress, t.config.KVStoreTimeout)
if err != nil {
@@ -405,232 +175,516 @@
/* TODO : Make sure direct call to NewBackend is working fine with backend , currently there is some
issue between kv store and backend , core is not calling NewBackend directly
- kv := model.NewBackend(t.config.KVStoreType, t.config.KVStoreHost, t.config.KVStorePort,
+ kv := model.NewBackend(t.config.kvStoreType, t.config.KVStoreHost, t.config.KVStorePort,
t.config.KVStoreTimeout, kvStoreTechProfilePathPrefix)
*/
}
-func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
-
- logger.Infow(ctx, "kv-store", log.Fields{"storeType": storeType, "address": address})
- switch storeType {
- case "etcd":
- return kvstore.NewEtcdClient(ctx, address, timeout, log.WarnLevel)
- }
- return nil, errors.New("unsupported-kv-store")
-}
-
-func NewTechProfile(ctx context.Context, resourceMgr iPonResourceMgr, KVStoreType string, KVStoreAddress string, basePathKvStore string) (*TechProfileMgr, error) {
+func NewTechProfile(ctx context.Context, resourceMgr iPonResourceMgr, kvStoreType string, kvStoreAddress string, basePathKvStore string) (*TechProfileMgr, error) {
var techprofileObj TechProfileMgr
- logger.Debug(ctx, "Initializing techprofile Manager")
- techprofileObj.config = NewTechProfileFlags(KVStoreType, KVStoreAddress, basePathKvStore)
+ logger.Debug(ctx, "initializing-techprofile-mananger")
+ techprofileObj.config = NewTechProfileFlags(kvStoreType, kvStoreAddress, basePathKvStore)
techprofileObj.config.KVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.TPKVPathPrefix)
techprofileObj.config.DefaultTpKVBackend = techprofileObj.SetKVClient(ctx, techprofileObj.config.defaultTpKvPathPrefix)
if techprofileObj.config.KVBackend == nil {
- logger.Error(ctx, "Failed to initialize KV backend\n")
- return nil, errors.New("KV backend init failed")
+ logger.Error(ctx, "failed-to-initialize-backend")
+ return nil, errors.New("kv-backend-init-failed")
+ }
+ techprofileObj.config.ResourceInstanceKVBacked = techprofileObj.SetKVClient(ctx, techprofileObj.config.ResourceInstanceKVPathPrefix)
+ if techprofileObj.config.ResourceInstanceKVBacked == nil {
+ logger.Error(ctx, "failed-to-initialize-resource-instance-kv-backend")
+ return nil, errors.New("resource-instance-kv-backend-init-failed")
}
techprofileObj.resourceMgr = resourceMgr
- logger.Debug(ctx, "Initializing techprofile object instance success")
+ techprofileObj.tpInstanceMap = make(map[string]*tp_pb.TechProfileInstance)
+ techprofileObj.eponTpInstanceMap = make(map[string]*tp_pb.EponTechProfileInstance)
+ techprofileObj.tpMap = make(map[uint32]*tp_pb.TechProfile)
+ techprofileObj.eponTpMap = make(map[uint32]*tp_pb.EponTechProfile)
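+ // On adapter restart, rebuild the in-memory TP instance cache from the compact resource instances persisted on the KV store.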
+ logger.Debug(ctx, "reconcile-tp-instance-cache-start")
+ if err := techprofileObj.reconcileTpInstancesToCache(ctx); err != nil {
+ logger.Errorw(ctx, "failed-to-reconcile-tp-instances", log.Fields{"err": err})
+ return nil, err
+ }
+ logger.Debug(ctx, "reconcile-tp-instance-cache-end")
+ logger.Debug(ctx, "initializing-tech-profile-manager-object-success")
return &techprofileObj, nil
}
-func (t *TechProfileMgr) GetTechProfileInstanceKVPath(ctx context.Context, techProfiletblID uint32, uniPortName string) string {
- logger.Debugw(ctx, "get-tp-instance-kv-path", log.Fields{
+// GetTechProfileInstanceKey returns the key used to reference a TP instance in the TP instance map.
+func (t *TechProfileMgr) GetTechProfileInstanceKey(ctx context.Context, tpID uint32, uniPortName string) string {
+ logger.Debugw(ctx, "get-tp-instance-kv-key", log.Fields{
"uniPortName": uniPortName,
- "tpId": techProfiletblID,
+ "tpId": tpID,
})
- return fmt.Sprintf(t.config.TPInstanceKVPath, t.resourceMgr.GetTechnology(), techProfiletblID, uniPortName)
+ // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
+ if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
+ logger.Warnw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
+ }
+ // The key path prefix (like service/voltha/technology_profiles or service/voltha_voltha/technology_profiles)
+ // is expected to be attached by the components that use this path as part of the KVBackend configuration.
+ resourceInstanceKvPathSuffix := "%s/%d/%s" // <technology>/<tpID>/<uni-port-name>
+ // <uni-port-name> must be of the format pon-{\d+}/onu-{\d+}/uni-{\d+}
+ return fmt.Sprintf(resourceInstanceKvPathSuffix, t.resourceMgr.GetTechnology(), tpID, uniPortName)
}
-func (t *TechProfileMgr) GetTPInstanceFromKVStore(ctx context.Context, techProfiletblID uint32, path string) (interface{}, error) {
- var err error
- var kvResult *kvstore.KVPair
- var KvTpIns TechProfile
- var KvEponIns EponProfile
- var resPtr interface{}
- // For example:
- // tpInstPath like "XGS-PON/64/uni_port_name"
- // is broken into ["XGS-PON" "64" ...]
- pathSlice := regexp.MustCompile(`/`).Split(path, -1)
- switch pathSlice[0] {
- case xgspon, gpon:
- resPtr = &KvTpIns
+// GetTPInstance gets TP instance from cache if found
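+// e.g. (illustrative path): tpInst, err := tpMgr.GetTPInstance(ctx, "XGS-PON/64/olt-{abcdef12}/pon-{0}/onu-{1}/uni-{0}")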
+func (t *TechProfileMgr) GetTPInstance(ctx context.Context, path string) (interface{}, error) {
+ tech := t.resourceMgr.GetTechnology()
+ switch tech {
+ case xgspon, xgpon, gpon:
+ t.tpInstanceMapLock.RLock()
+ defer t.tpInstanceMapLock.RUnlock()
+ tpInst, ok := t.tpInstanceMap[path]
+ if !ok {
+ return nil, fmt.Errorf("tp-instance-not-found-tp-path-%v", path)
+ }
+ return tpInst, nil
case epon:
- resPtr = &KvEponIns
+ t.epontpInstanceMapLock.RLock()
+ defer t.epontpInstanceMapLock.RUnlock()
+ tpInst, ok := t.eponTpInstanceMap[path]
+ if !ok {
+ return nil, fmt.Errorf("tp-instance-not-found-tp-path-%v", path)
+ }
+ return tpInst, nil
default:
- logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": pathSlice[0]})
- return nil, fmt.Errorf("unknown-tech-%s", pathSlice[0])
+ logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
+ return nil, fmt.Errorf("unknown-tech-%s-tp-path-%v", tech, path)
}
-
- kvResult, _ = t.config.KVBackend.Get(ctx, path)
- if kvResult == nil {
- logger.Infow(ctx, "tp-instance-not-found-on-kv", log.Fields{"key": path})
- return nil, nil
- } else {
- if value, err := kvstore.ToByte(kvResult.Value); err == nil {
- if err = json.Unmarshal(value, resPtr); err != nil {
- logger.Errorw(ctx, "error-unmarshal-kv-result", log.Fields{"key": path, "value": value})
- return nil, errors.New("error-unmarshal-kv-result")
- } else {
- return resPtr, nil
- }
- }
- }
- return nil, err
}
-func (t *TechProfileMgr) addTechProfInstanceToKVStore(ctx context.Context, techProfiletblID uint32, uniPortName string, tpInstance *TechProfile) error {
- path := t.GetTechProfileInstanceKVPath(ctx, techProfiletblID, uniPortName)
- logger.Debugw(ctx, "Adding techprof instance to kvstore", log.Fields{"key": path, "tpinstance": tpInstance})
- tpInstanceJson, err := json.Marshal(*tpInstance)
- if err == nil {
- // Backend will convert JSON byte array into string format
- logger.Debugw(ctx, "Storing tech profile instance to KV Store", log.Fields{"key": path, "val": tpInstanceJson})
- err = t.config.KVBackend.Put(ctx, path, tpInstanceJson)
- } else {
- logger.Errorw(ctx, "Error in marshaling into Json format", log.Fields{"key": path, "tpinstance": tpInstance})
- }
- return err
-}
+// CreateTechProfileInstance creates a new TP instance.
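+// A minimal usage sketch (illustrative identifiers, not part of this change):
+//   tpInst, err := tpMgr.CreateTechProfileInstance(ctx, 64, "olt-{abcdef12}/pon-{0}/onu-{1}/uni-{0}", 0)
+//   if err == nil {
+//       inst := tpInst.(*tp_pb.TechProfileInstance) // GPON/XGPON/XGS-PON; EPON returns *tp_pb.EponTechProfileInstance
+//   }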
+func (t *TechProfileMgr) CreateTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string, intfID uint32) (interface{}, error) {
+ var tpInstance *tp_pb.TechProfileInstance
+ var eponTpInstance *tp_pb.EponTechProfileInstance
-func (t *TechProfileMgr) addEponProfInstanceToKVStore(ctx context.Context, techProfiletblID uint32, uniPortName string, tpInstance *EponProfile) error {
- path := t.GetTechProfileInstanceKVPath(ctx, techProfiletblID, uniPortName)
- logger.Debugw(ctx, "Adding techprof instance to kvstore", log.Fields{"key": path, "tpinstance": tpInstance})
- tpInstanceJson, err := json.Marshal(*tpInstance)
- if err == nil {
- // Backend will convert JSON byte array into string format
- logger.Debugw(ctx, "Storing tech profile instance to KV Store", log.Fields{"key": path, "val": tpInstanceJson})
- err = t.config.KVBackend.Put(ctx, path, tpInstanceJson)
- } else {
- logger.Errorw(ctx, "Error in marshaling into Json format", log.Fields{"key": path, "tpinstance": tpInstance})
- }
- return err
-}
+ logger.Infow(ctx, "creating-tp-instance", log.Fields{"tpID": tpID, "uni": uniPortName, "intId": intfID})
-func (t *TechProfileMgr) getTPFromKVStore(ctx context.Context, techProfiletblID uint32) *DefaultTechProfile {
- var kvtechprofile DefaultTechProfile
- key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), techProfiletblID)
- logger.Debugw(ctx, "Getting techprofile from KV store", log.Fields{"techProfiletblID": techProfiletblID, "Key": key})
- kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
- if err != nil {
- logger.Errorw(ctx, "Error while fetching value from KV store", log.Fields{"key": key})
- return nil
- }
- if kvresult != nil {
- /* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
- if value, err := kvstore.ToByte(kvresult.Value); err == nil {
- if err = json.Unmarshal(value, &kvtechprofile); err != nil {
- logger.Errorw(ctx, "Error unmarshaling techprofile fetched from KV store", log.Fields{"techProfiletblID": techProfiletblID, "error": err, "techprofile_json": value})
- return nil
- }
-
- logger.Debugw(ctx, "Success fetched techprofile from KV store", log.Fields{"techProfiletblID": techProfiletblID, "value": kvtechprofile})
- return &kvtechprofile
- }
- }
- return nil
-}
-
-func (t *TechProfileMgr) getEponTPFromKVStore(ctx context.Context, techProfiletblID uint32) *DefaultEponProfile {
- var kvtechprofile DefaultEponProfile
- key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), techProfiletblID)
- logger.Debugw(ctx, "Getting techprofile from KV store", log.Fields{"techProfiletblID": techProfiletblID, "Key": key})
- kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
- if err != nil {
- logger.Errorw(ctx, "Error while fetching value from KV store", log.Fields{"key": key})
- return nil
- }
- if kvresult != nil {
- /* Backend will return Value in string format,needs to be converted to []byte before unmarshal*/
- if value, err := kvstore.ToByte(kvresult.Value); err == nil {
- if err = json.Unmarshal(value, &kvtechprofile); err != nil {
- logger.Errorw(ctx, "Error unmarshaling techprofile fetched from KV store", log.Fields{"techProfiletblID": techProfiletblID, "error": err, "techprofile_json": value})
- return nil
- }
-
- logger.Debugw(ctx, "Success fetched techprofile from KV store", log.Fields{"techProfiletblID": techProfiletblID, "value": kvtechprofile})
- return &kvtechprofile
- }
- }
- return nil
-}
-
-func (t *TechProfileMgr) CreateTechProfInstance(ctx context.Context, techProfiletblID uint32, uniPortName string, intfId uint32) (interface{}, error) {
- var tpInstance *TechProfile
- var tpEponInstance *EponProfile
-
- logger.Infow(ctx, "creating-tp-instance", log.Fields{"tableid": techProfiletblID, "uni": uniPortName, "intId": intfId})
-
- // Make sure the uniPortName is as per format pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
- if !uniPortNameFormat.Match([]byte(uniPortName)) {
+ // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
+ if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
logger.Errorw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
- return nil, errors.New("uni-port-name-not-confirming-to-format")
+ return nil, fmt.Errorf("uni-port-name-not-confirming-to-format-%s", uniPortName)
}
- tpInstancePath := t.GetTechProfileInstanceKVPath(ctx, techProfiletblID, uniPortName)
- // For example:
- // tpInstPath like "XGS-PON/64/uni_port_name"
- // is broken into ["XGS-PON" "64" ...]
- pathSlice := regexp.MustCompile(`/`).Split(tpInstancePath, -1)
- if pathSlice[0] == epon {
- tp := t.getEponTPFromKVStore(ctx, techProfiletblID)
+ tpInstancePathSuffix := t.GetTechProfileInstanceKey(ctx, tpID, uniPortName)
+
+ if t.resourceMgr.GetTechnology() == epon {
+ tp := t.getEponTPFromKVStore(ctx, tpID)
if tp != nil {
- if err := t.validateInstanceControlAttr(ctx, tp.InstanceCtrl); err != nil {
- logger.Error(ctx, "invalid-instance-ctrl-attr--using-default-tp")
+ if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
+ logger.Error(ctx, "invalid-instance-ctrl-attr-using-default-tp")
tp = t.getDefaultEponProfile(ctx)
} else {
- logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpid": techProfiletblID})
+ logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpID": tpID})
}
} else {
logger.Info(ctx, "tp-not-found-on-kv--creating-default-tp")
tp = t.getDefaultEponProfile(ctx)
}
+ // Store TP in cache
+ t.eponTpMapLock.Lock()
+ t.eponTpMap[tpID] = tp
+ t.eponTpMapLock.Unlock()
- if tpEponInstance = t.allocateEponTPInstance(ctx, uniPortName, tp, intfId, tpInstancePath); tpEponInstance == nil {
- logger.Error(ctx, "tp-intance-allocation-failed")
- return nil, errors.New("tp-intance-allocation-failed")
+ if eponTpInstance = t.allocateEponTPInstance(ctx, uniPortName, tp, intfID, tpInstancePathSuffix); eponTpInstance == nil {
+ logger.Error(ctx, "tp-instance-allocation-failed")
+ return nil, errors.New("tp-instance-allocation-failed")
}
- if err := t.addEponProfInstanceToKVStore(ctx, techProfiletblID, uniPortName, tpEponInstance); err != nil {
- logger.Errorw(ctx, "error-adding-tp-to-kv-store", log.Fields{"tableid": techProfiletblID, "uni": uniPortName})
- return nil, errors.New("error-adding-tp-to-kv-store")
+ t.epontpInstanceMapLock.Lock()
+ t.eponTpInstanceMap[tpInstancePathSuffix] = eponTpInstance
+ t.epontpInstanceMapLock.Unlock()
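+ // Persist only a compact ResourceInstance (TP ID, profile type, subscriber, alloc-id, gemport-ids) to the KV store; the full TP instance lives only in the cache populated above.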
+ resInst := tp_pb.ResourceInstance{
+ TpId: tpID,
+ ProfileType: eponTpInstance.ProfileType,
+ SubscriberIdentifier: eponTpInstance.SubscriberIdentifier,
+ AllocId: eponTpInstance.AllocId,
}
- logger.Infow(ctx, "tp-added-to-kv-store-successfully",
- log.Fields{"tpid": techProfiletblID, "uni": uniPortName, "intfId": intfId})
- return tpEponInstance, nil
+ for _, usQAttr := range eponTpInstance.UpstreamQueueAttributeList {
+ resInst.GemportIds = append(resInst.GemportIds, usQAttr.GemportId)
+ }
+
+ logger.Infow(ctx, "epon-tp-instance-created-successfully",
+ log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
+ if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
+ logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
+ allocIDs := []uint32{resInst.AllocId}
+ errList := make([]error, 0)
+ // Collect only real failures; FreeResourceID returns nil on success.
+ if freeErr := t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), allocIDs); freeErr != nil {
+ errList = append(errList, freeErr)
+ }
+ if freeErr := t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), resInst.GemportIds); freeErr != nil {
+ errList = append(errList, freeErr)
+ }
+ if len(errList) > 0 {
+ logger.Errorw(ctx, "failed-to-free-up-resources-on-kv-store--system-behavior-has-become-erratic", log.Fields{"tpID": tpID, "uniPortName": uniPortName, "errList": errList})
+ }
+ return nil, err
+ }
+ return eponTpInstance, nil
} else {
- tp := t.getTPFromKVStore(ctx, techProfiletblID)
+ tp := t.getTPFromKVStore(ctx, tpID)
if tp != nil {
- if err := t.validateInstanceControlAttr(ctx, tp.InstanceCtrl); err != nil {
+ if err := t.validateInstanceControlAttr(ctx, *tp.InstanceControl); err != nil {
logger.Error(ctx, "invalid-instance-ctrl-attr--using-default-tp")
tp = t.getDefaultTechProfile(ctx)
} else {
- logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpid": techProfiletblID})
+ logger.Infow(ctx, "using-specified-tp-from-kv-store", log.Fields{"tpID": tpID})
}
} else {
logger.Info(ctx, "tp-not-found-on-kv--creating-default-tp")
tp = t.getDefaultTechProfile(ctx)
}
+ // Store TP in cache
+ t.tpMapLock.Lock()
+ t.tpMap[tpID] = tp
+ t.tpMapLock.Unlock()
- if tpInstance = t.allocateTPInstance(ctx, uniPortName, tp, intfId, tpInstancePath); tpInstance == nil {
- logger.Error(ctx, "tp-intance-allocation-failed")
- return nil, errors.New("tp-intance-allocation-failed")
+ if tpInstance = t.allocateTPInstance(ctx, uniPortName, tp, intfID, tpInstancePathSuffix); tpInstance == nil {
+ logger.Error(ctx, "tp-instance-allocation-failed")
+ return nil, errors.New("tp-instance-allocation-failed")
}
- if err := t.addTechProfInstanceToKVStore(ctx, techProfiletblID, uniPortName, tpInstance); err != nil {
- logger.Errorw(ctx, "error-adding-tp-to-kv-store", log.Fields{"tableid": techProfiletblID, "uni": uniPortName})
- return nil, errors.New("error-adding-tp-to-kv-store")
+ t.tpInstanceMapLock.Lock()
+ t.tpInstanceMap[tpInstancePathSuffix] = tpInstance
+ t.tpInstanceMapLock.Unlock()
+
+ resInst := tp_pb.ResourceInstance{
+ TpId: tpID,
+ ProfileType: tpInstance.ProfileType,
+ SubscriberIdentifier: tpInstance.SubscriberIdentifier,
+ AllocId: tpInstance.UsScheduler.AllocId,
}
- logger.Infow(ctx, "tp-added-to-kv-store-successfully",
- log.Fields{"tpid": techProfiletblID, "uni": uniPortName, "intfId": intfId})
+ for _, usQAttr := range tpInstance.UpstreamGemPortAttributeList {
+ resInst.GemportIds = append(resInst.GemportIds, usQAttr.GemportId)
+ }
+
+ logger.Infow(ctx, "tp-instance-created-successfully",
+ log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
+ if err := t.addResourceInstanceToKVStore(ctx, tpID, uniPortName, resInst); err != nil {
+ logger.Errorw(ctx, "failed-to-update-resource-instance-to-kv-store--freeing-up-resources", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
+ allocIDs := []uint32{resInst.AllocId}
+ errList := make([]error, 0)
+ // Collect only real failures; FreeResourceID returns nil on success.
+ if freeErr := t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), allocIDs); freeErr != nil {
+ errList = append(errList, freeErr)
+ }
+ if freeErr := t.FreeResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), resInst.GemportIds); freeErr != nil {
+ errList = append(errList, freeErr)
+ }
+ if len(errList) > 0 {
+ logger.Fatalw(ctx, "failed-to-free-up-resources-on-kv-store--system-behavior-has-become-erratic", log.Fields{"errList": errList, "tpID": tpID, "uniPortName": uniPortName})
+ }
+ return nil, err
+ }
+
+ logger.Infow(ctx, "resource-instance-added-to-kv-store-successfully",
+ log.Fields{"tpID": tpID, "uni": uniPortName, "intfID": intfID})
return tpInstance, nil
}
}
-func (t *TechProfileMgr) DeleteTechProfileInstance(ctx context.Context, techProfiletblID uint32, uniPortName string) error {
- path := t.GetTechProfileInstanceKVPath(ctx, techProfiletblID, uniPortName)
- return t.config.KVBackend.Delete(ctx, path)
+// DeleteTechProfileInstance deletes the TP instance from the local cache as well as deletes the corresponding
+// resource instance from the KV store.
+func (t *TechProfileMgr) DeleteTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string) error {
+ // Make sure the uniPortName is as per format olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}
+ if !uniPortNameFormatRegexp.Match([]byte(uniPortName)) {
+ logger.Errorw(ctx, "uni-port-name-not-confirming-to-format", log.Fields{"uniPortName": uniPortName})
+ return fmt.Errorf("uni-port-name-not-confirming-to-format--%s", uniPortName)
+ }
+ path := t.GetTechProfileInstanceKey(ctx, tpID, uniPortName)
+ logger.Infow(ctx, "delete-tp-instance-from-cache", log.Fields{"key": path})
+ t.tpInstanceMapLock.Lock()
+ delete(t.tpInstanceMap, path)
+ t.tpInstanceMapLock.Unlock()
+ if err := t.removeResourceInstanceFromKVStore(ctx, tpID, uniPortName); err != nil {
+ return err
+ }
+ return nil
}
-func (t *TechProfileMgr) validateInstanceControlAttr(ctx context.Context, instCtl InstanceControl) error {
+func (t *TechProfileMgr) GetMulticastTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance) []*tp_pb.TrafficQueue {
+ var encryp bool
+ NumGemPorts := len(tp.DownstreamGemPortAttributeList)
+ mcastTrafficQueues := make([]*tp_pb.TrafficQueue, 0)
+ for Count := 0; Count < NumGemPorts; Count++ {
+ if !isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
+ continue
+ }
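+ // AesEncryption is carried as the string "True"/"False" in the TP instance, hence the string comparison below.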
+ if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
+ encryp = true
+ } else {
+ encryp = false
+ }
+ mcastTrafficQueues = append(mcastTrafficQueues, &tp_pb.TrafficQueue{
+ Direction: tp_pb.Direction_DOWNSTREAM,
+ GemportId: tp.DownstreamGemPortAttributeList[Count].MulticastGemId,
+ PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
+ AesEncryption: encryp,
+ SchedPolicy: tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy,
+ Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQ,
+ Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
+ DiscardPolicy: tp.DownstreamGemPortAttributeList[Count].DiscardPolicy,
+ })
+ }
+ logger.Debugw(ctx, "Downstream Multicast Traffic queue list ", log.Fields{"queuelist": mcastTrafficQueues})
+ return mcastTrafficQueues
+}
+
+// GetGemportForPbit returns the GEM port (or EPON queue) attributes mapped to the given pbit and direction, or nil if none matches.
+func (t *TechProfileMgr) GetGemportForPbit(ctx context.Context, tp interface{}, dir tp_pb.Direction, pbit uint32) interface{} {
+ switch tp := tp.(type) {
+ case *tp_pb.TechProfileInstance:
+ if dir == tp_pb.Direction_UPSTREAM {
+ // upstream GEM ports
+ numGemPorts := len(tp.UpstreamGemPortAttributeList)
+ for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
+ lenOfPbitMap := len(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap)
+ for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
+ // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
+ // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
+ if p, err := strconv.Atoi(string(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
+ if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
+ logger.Debugw(ctx, "Found-US-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.UpstreamGemPortAttributeList[gemCnt].GemportId})
+ return tp.UpstreamGemPortAttributeList[gemCnt]
+ }
+ }
+ }
+ }
+ } else if dir == tp_pb.Direction_DOWNSTREAM {
+ //downstream GEM ports
+ numGemPorts := len(tp.DownstreamGemPortAttributeList)
+ for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
+ lenOfPbitMap := len(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap)
+ for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
+ // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
+ // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
+ if p, err := strconv.Atoi(string(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
+ if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
+ logger.Debugw(ctx, "Found-DS-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.DownstreamGemPortAttributeList[gemCnt].GemportId})
+ return tp.DownstreamGemPortAttributeList[gemCnt]
+ }
+ }
+ }
+ }
+ }
+ logger.Errorw(ctx, "No-GemportId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
+ case *openolt.EponTechProfileInstance:
+ if dir == tp_pb.Direction_UPSTREAM {
+ // upstream GEM ports
+ numGemPorts := len(tp.UpstreamQueueAttributeList)
+ for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
+ lenOfPbitMap := len(tp.UpstreamQueueAttributeList[gemCnt].PbitMap)
+ for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
+ // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
+ // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
+ if p, err := strconv.Atoi(string(tp.UpstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
+ if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
+ logger.Debugw(ctx, "Found-US-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.UpstreamQueueAttributeList[gemCnt].GemportId})
+ return tp.UpstreamQueueAttributeList[gemCnt]
+ }
+ }
+ }
+ }
+ } else if dir == tp_pb.Direction_DOWNSTREAM {
+ //downstream GEM ports
+ numGemPorts := len(tp.DownstreamQueueAttributeList)
+ for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
+ lenOfPbitMap := len(tp.DownstreamQueueAttributeList[gemCnt].PbitMap)
+ for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
+ // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
+ // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
+ if p, err := strconv.Atoi(string(tp.DownstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
+ if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
+ logger.Debugw(ctx, "Found-DS-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.DownstreamQueueAttributeList[gemCnt].GemportId})
+ return tp.DownstreamQueueAttributeList[gemCnt]
+ }
+ }
+ }
+ }
+ }
+ logger.Errorw(ctx, "No-QueueId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
+ default:
+ logger.Errorw(ctx, "unknown-tech", log.Fields{"tp": tp})
+ }
+ return nil
+}
+
+// FindAllTpInstances returns all TechProfile instances for a given TechProfile table-id, pon interface ID and onu ID.
+func (t *TechProfileMgr) FindAllTpInstances(ctx context.Context, oltDeviceID string, tpID uint32, intfID uint32, onuID uint32) interface{} {
+ onuTpInstancePathSuffix := fmt.Sprintf("%s/%d/olt-{%s}/pon-{%d}/onu-{%d}", t.resourceMgr.GetTechnology(), tpID, oltDeviceID, intfID, onuID)
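+ // The key format mirrors GetTechProfileInstanceKey, i.e. <technology>/<tpID>/<uni-port-name>; each possible uni of the ONU is probed below.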
+ tech := t.resourceMgr.GetTechnology()
+ if tech == xgspon || tech == xgpon || tech == gpon {
+ t.tpInstanceMapLock.RLock()
+ defer t.tpInstanceMapLock.RUnlock()
+ tpInstancesTech := make([]tp_pb.TechProfileInstance, 0)
+ for i := 0; i < MaxUniPortPerOnu; i++ {
+ key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
+ if tpInst, ok := t.tpInstanceMap[key]; ok {
+ tpInstancesTech = append(tpInstancesTech, *tpInst)
+ }
+ }
+ return tpInstancesTech
+ } else if tech == epon {
+ t.epontpInstanceMapLock.RLock()
+ defer t.epontpInstanceMapLock.RUnlock()
+ tpInstancesTech := make([]tp_pb.EponTechProfileInstance, 0)
+ for i := 0; i < MaxUniPortPerOnu; i++ {
+ key := onuTpInstancePathSuffix + fmt.Sprintf("/uni-{%d}", i)
+ if tpInst, ok := t.eponTpInstanceMap[key]; ok {
+ tpInstancesTech = append(tpInstancesTech, *tpInst)
+ }
+ }
+ return tpInstancesTech
+ } else {
+ logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech, "tpID": tpID, "onuID": onuID, "intfID": intfID})
+ }
+ return nil
+}
+
+func (t *TechProfileMgr) GetResourceID(ctx context.Context, intfID uint32, resourceType string, numIDs uint32) ([]uint32, error) {
+ logger.Debugw(ctx, "getting-resource-id", log.Fields{
+ "intf-id": intfID,
+ "resource-type": resourceType,
+ "num": numIDs,
+ })
+ var err error
+ var ids []uint32
+ switch resourceType {
+ case t.resourceMgr.GetResourceTypeAllocID():
+ t.AllocIDMgmtLock.Lock()
+ ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
+ t.AllocIDMgmtLock.Unlock()
+ case t.resourceMgr.GetResourceTypeGemPortID():
+ t.GemPortIDMgmtLock.Lock()
+ ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
+ t.GemPortIDMgmtLock.Unlock()
+ case t.resourceMgr.GetResourceTypeOnuID():
+ t.OnuIDMgmtLock.Lock()
+ ids, err = t.resourceMgr.GetResourceID(ctx, intfID, resourceType, numIDs)
+ t.OnuIDMgmtLock.Unlock()
+ default:
+ return nil, fmt.Errorf("resourceType %s not supported", resourceType)
+ }
+ if err != nil {
+ return nil, err
+ }
+ return ids, nil
+}
+
+func (t *TechProfileMgr) FreeResourceID(ctx context.Context, intfID uint32, resourceType string, ReleaseContent []uint32) error {
+ logger.Debugw(ctx, "freeing-resource-id", log.Fields{
+ "intf-id": intfID,
+ "resource-type": resourceType,
+ "release-content": ReleaseContent,
+ })
+ var err error
+ switch resourceType {
+ case t.resourceMgr.GetResourceTypeAllocID():
+ t.AllocIDMgmtLock.Lock()
+ err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
+ t.AllocIDMgmtLock.Unlock()
+ case t.resourceMgr.GetResourceTypeGemPortID():
+ t.GemPortIDMgmtLock.Lock()
+ err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
+ t.GemPortIDMgmtLock.Unlock()
+ case t.resourceMgr.GetResourceTypeOnuID():
+ t.OnuIDMgmtLock.Lock()
+ err = t.resourceMgr.FreeResourceID(ctx, intfID, resourceType, ReleaseContent)
+ t.OnuIDMgmtLock.Unlock()
+ default:
+ return fmt.Errorf("resourceType %s not supported", resourceType)
+ }
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+func (t *TechProfileMgr) GetUsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
+ return &tp_pb.SchedulerConfig{
+ Direction: tpInstance.UsScheduler.Direction,
+ AdditionalBw: tpInstance.UsScheduler.AdditionalBw,
+ Priority: tpInstance.UsScheduler.Priority,
+ Weight: tpInstance.UsScheduler.Weight,
+ SchedPolicy: tpInstance.UsScheduler.QSchedPolicy}
+}
+
+func (t *TechProfileMgr) GetDsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig {
+ return &tp_pb.SchedulerConfig{
+ Direction: tpInstance.DsScheduler.Direction,
+ AdditionalBw: tpInstance.DsScheduler.AdditionalBw,
+ Priority: tpInstance.DsScheduler.Priority,
+ Weight: tpInstance.DsScheduler.Weight,
+ SchedPolicy: tpInstance.DsScheduler.QSchedPolicy}
+}
+
+func (t *TechProfileMgr) GetTrafficScheduler(tpInstance *tp_pb.TechProfileInstance, SchedCfg *tp_pb.SchedulerConfig,
+ ShapingCfg *tp_pb.TrafficShapingInfo) *tp_pb.TrafficScheduler {
+
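+ // The alloc-id is allocated once per TP instance and shared by the US and DS schedulers, so UsScheduler.AllocId holds for either direction.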
+ tSched := &tp_pb.TrafficScheduler{
+ Direction: SchedCfg.Direction,
+ AllocId: tpInstance.UsScheduler.AllocId,
+ TrafficShapingInfo: ShapingCfg,
+ Scheduler: SchedCfg}
+
+ return tSched
+}
+
+func (t *TechProfileMgr) GetTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance, direction tp_pb.Direction) ([]*tp_pb.TrafficQueue, error) {
+
+ var encryp bool
+ if direction == tp_pb.Direction_UPSTREAM {
+ // upstream GEM ports
+ NumGemPorts := len(tp.UpstreamGemPortAttributeList)
+ GemPorts := make([]*tp_pb.TrafficQueue, 0)
+ for Count := 0; Count < NumGemPorts; Count++ {
+ if tp.UpstreamGemPortAttributeList[Count].AesEncryption == "True" {
+ encryp = true
+ } else {
+ encryp = false
+ }
+
+ GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
+ Direction: direction,
+ GemportId: tp.UpstreamGemPortAttributeList[Count].GemportId,
+ PbitMap: tp.UpstreamGemPortAttributeList[Count].PbitMap,
+ AesEncryption: encryp,
+ SchedPolicy: tp.UpstreamGemPortAttributeList[Count].SchedulingPolicy,
+ Priority: tp.UpstreamGemPortAttributeList[Count].PriorityQ,
+ Weight: tp.UpstreamGemPortAttributeList[Count].Weight,
+ DiscardPolicy: tp.UpstreamGemPortAttributeList[Count].DiscardPolicy,
+ })
+ }
+ logger.Debugw(ctx, "Upstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
+ return GemPorts, nil
+ } else if direction == tp_pb.Direction_DOWNSTREAM {
+ //downstream GEM ports
+ NumGemPorts := len(tp.DownstreamGemPortAttributeList)
+ GemPorts := make([]*tp_pb.TrafficQueue, 0)
+ for Count := 0; Count < NumGemPorts; Count++ {
+ if isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
+ //do not take multicast GEM ports. They are handled separately.
+ continue
+ }
+ if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
+ encryp = true
+ } else {
+ encryp = false
+ }
+
+ GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
+ Direction: direction,
+ GemportId: tp.DownstreamGemPortAttributeList[Count].GemportId,
+ PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
+ AesEncryption: encryp,
+ SchedPolicy: tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy,
+ Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQ,
+ Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
+ DiscardPolicy: tp.DownstreamGemPortAttributeList[Count].DiscardPolicy,
+ })
+ }
+ logger.Debugw(ctx, "Downstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
+ return GemPorts, nil
+ }
+
+ logger.Errorf(ctx, "Unsupported direction %s used for generating Traffic Queue list", direction)
+ return nil, fmt.Errorf("downstream gem port traffic queue creation failed due to unsupported direction %s", direction)
+}
+
+func (t *TechProfileMgr) validateInstanceControlAttr(ctx context.Context, instCtl tp_pb.InstanceControl) error {
if instCtl.Onu != "single-instance" && instCtl.Onu != "multi-instance" {
logger.Errorw(ctx, "invalid-onu-instance-control-attribute", log.Fields{"onu-inst": instCtl.Onu})
return errors.New("invalid-onu-instance-ctl-attr")
@@ -649,56 +703,54 @@
return nil
}
-func (t *TechProfileMgr) allocateTPInstance(ctx context.Context, uniPortName string, tp *DefaultTechProfile, intfId uint32, tpInstPath string) *TechProfile {
+// allocateTPInstance for GPON, XGPON and XGS-PON technology
+func (t *TechProfileMgr) allocateTPInstance(ctx context.Context, uniPortName string, tp *tp_pb.TechProfile, intfID uint32, tpInstPathSuffix string) *tp_pb.TechProfileInstance {
- var usGemPortAttributeList []IGemPortAttribute
- var dsGemPortAttributeList []IGemPortAttribute
- var dsMulticastGemAttributeList []IGemPortAttribute
- var dsUnicastGemAttributeList []IGemPortAttribute
+ var usGemPortAttributeList []*tp_pb.GemPortAttributes
+ var dsGemPortAttributeList []*tp_pb.GemPortAttributes
+ var dsMulticastGemAttributeList []*tp_pb.GemPortAttributes
+ var dsUnicastGemAttributeList []*tp_pb.GemPortAttributes
var tcontIDs []uint32
var gemPorts []uint32
var err error
- logger.Infow(ctx, "Allocating TechProfileMgr instance from techprofile template", log.Fields{"uniPortName": uniPortName, "intfId": intfId, "numGem": tp.NumGemPorts})
+ logger.Infow(ctx, "Allocating TechProfileMgr instance from techprofile template", log.Fields{"uniPortName": uniPortName, "intfID": intfID, "numGem": tp.NumGemPorts})
- if tp.InstanceCtrl.Onu == "multi-instance" {
- tcontIDs, err = t.GetResourceID(ctx, intfId, t.resourceMgr.GetResourceTypeAllocID(), 1)
+ if tp.InstanceControl.Onu == "multi-instance" {
+ tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1)
if err != nil {
- logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"intfId": intfId})
+ logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
return nil
}
} else { // "single-instance"
- if tpInst, err := t.getSingleInstanceTp(ctx, tpInstPath); err != nil {
- logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"intfId": intfId})
- return nil
- } else if tpInst == nil {
+ if tpInst := t.getSingleInstanceTp(ctx, tpInstPathSuffix); tpInst == nil {
// No "single-instance" tp found on one any uni port for the given TP ID
// Allocate a new TcontID or AllocID
- tcontIDs, err = t.GetResourceID(ctx, intfId, t.resourceMgr.GetResourceTypeAllocID(), 1)
+ tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1)
if err != nil {
- logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"intfId": intfId})
+ logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
return nil
}
} else {
// Use the alloc-id from the existing TpInstance
- tcontIDs = append(tcontIDs, tpInst.UsScheduler.AllocID)
+ tcontIDs = append(tcontIDs, tpInst.UsScheduler.AllocId)
}
}
logger.Debugw(ctx, "Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
- gemPorts, err = t.GetResourceID(ctx, intfId, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts)
+ gemPorts, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts)
if err != nil {
- logger.Errorw(ctx, "Error getting gemport ids from rsrcrMgr", log.Fields{"intfId": intfId, "numGemports": tp.NumGemPorts})
+ logger.Errorw(ctx, "Error getting gemport ids from rsrcrMgr", log.Fields{"err": err, "intfID": intfID, "numGemports": tp.NumGemPorts})
return nil
}
logger.Infow(ctx, "Allocated tconts and GEM ports successfully", log.Fields{"tconts": tcontIDs, "gemports": gemPorts})
for index := 0; index < int(tp.NumGemPorts); index++ {
usGemPortAttributeList = append(usGemPortAttributeList,
- IGemPortAttribute{GemportID: gemPorts[index],
- MaxQueueSize: tp.UpstreamGemPortAttributeList[index].MaxQueueSize,
+ &tp_pb.GemPortAttributes{GemportId: gemPorts[index],
+ MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
- PriorityQueue: tp.UpstreamGemPortAttributeList[index].PriorityQueue,
+ PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
Weight: tp.UpstreamGemPortAttributeList[index].Weight,
DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig})
@@ -706,69 +758,71 @@
logger.Info(ctx, "length of DownstreamGemPortAttributeList", len(tp.DownstreamGemPortAttributeList))
//put multicast and unicast downstream GEM port attributes in different lists first
- for index := 0; index < int(len(tp.DownstreamGemPortAttributeList)); index++ {
+ for index := 0; index < len(tp.DownstreamGemPortAttributeList); index++ {
if isMulticastGem(tp.DownstreamGemPortAttributeList[index].IsMulticast) {
dsMulticastGemAttributeList = append(dsMulticastGemAttributeList,
- IGemPortAttribute{
- McastGemID: tp.DownstreamGemPortAttributeList[index].McastGemID,
- MaxQueueSize: tp.DownstreamGemPortAttributeList[index].MaxQueueSize,
- PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
- AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
- SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
- PriorityQueue: tp.DownstreamGemPortAttributeList[index].PriorityQueue,
- Weight: tp.DownstreamGemPortAttributeList[index].Weight,
- DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
- DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
- IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
- DControlList: tp.DownstreamGemPortAttributeList[index].DControlList,
- SControlList: tp.DownstreamGemPortAttributeList[index].SControlList})
+ &tp_pb.GemPortAttributes{
+ MulticastGemId: tp.DownstreamGemPortAttributeList[index].MulticastGemId,
+ MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
+ PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
+ AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
+ SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
+ PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
+ Weight: tp.DownstreamGemPortAttributeList[index].Weight,
+ DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
+ DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
+ IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
+ DynamicAccessControlList: tp.DownstreamGemPortAttributeList[index].DynamicAccessControlList,
+ StaticAccessControlList: tp.DownstreamGemPortAttributeList[index].StaticAccessControlList})
} else {
dsUnicastGemAttributeList = append(dsUnicastGemAttributeList,
- IGemPortAttribute{
- MaxQueueSize: tp.DownstreamGemPortAttributeList[index].MaxQueueSize,
+ &tp_pb.GemPortAttributes{
+ MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
- PriorityQueue: tp.DownstreamGemPortAttributeList[index].PriorityQueue,
+ PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
Weight: tp.DownstreamGemPortAttributeList[index].Weight,
DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig})
}
}
//add unicast downstream GEM ports to dsGemPortAttributeList
- for index := 0; index < int(tp.NumGemPorts); index++ {
- dsGemPortAttributeList = append(dsGemPortAttributeList,
- IGemPortAttribute{GemportID: gemPorts[index],
- MaxQueueSize: dsUnicastGemAttributeList[index].MaxQueueSize,
- PbitMap: dsUnicastGemAttributeList[index].PbitMap,
- AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
- SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
- PriorityQueue: dsUnicastGemAttributeList[index].PriorityQueue,
- Weight: dsUnicastGemAttributeList[index].Weight,
- DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
- DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
+ if dsUnicastGemAttributeList != nil {
+ for index := 0; index < int(tp.NumGemPorts); index++ {
+ dsGemPortAttributeList = append(dsGemPortAttributeList,
+ &tp_pb.GemPortAttributes{GemportId: gemPorts[index],
+ MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
+ PbitMap: dsUnicastGemAttributeList[index].PbitMap,
+ AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
+ SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
+ PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
+ Weight: dsUnicastGemAttributeList[index].Weight,
+ DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
+ DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
+ }
}
//add multicast GEM ports to dsGemPortAttributeList afterwards
for k := range dsMulticastGemAttributeList {
dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList[k])
}
- return &TechProfile{
+ return &tp_pb.TechProfileInstance{
SubscriberIdentifier: uniPortName,
Name: tp.Name,
ProfileType: tp.ProfileType,
Version: tp.Version,
NumGemPorts: tp.NumGemPorts,
- InstanceCtrl: tp.InstanceCtrl,
- UsScheduler: IScheduler{
- AllocID: tcontIDs[0],
+ InstanceControl: tp.InstanceControl,
+ UsScheduler: &tp_pb.SchedulerAttributes{
+ AllocId: tcontIDs[0],
Direction: tp.UsScheduler.Direction,
AdditionalBw: tp.UsScheduler.AdditionalBw,
Priority: tp.UsScheduler.Priority,
Weight: tp.UsScheduler.Weight,
QSchedPolicy: tp.UsScheduler.QSchedPolicy},
- DsScheduler: IScheduler{
- AllocID: tcontIDs[0],
+ DsScheduler: &tp_pb.SchedulerAttributes{
+ AllocId: tcontIDs[0],
Direction: tp.DsScheduler.Direction,
AdditionalBw: tp.DsScheduler.AdditionalBw,
Priority: tp.DsScheduler.Priority,
@@ -779,47 +833,44 @@
}
// allocateTPInstance function for EPON
-func (t *TechProfileMgr) allocateEponTPInstance(ctx context.Context, uniPortName string, tp *DefaultEponProfile, intfId uint32, tpInstPath string) *EponProfile {
+func (t *TechProfileMgr) allocateEponTPInstance(ctx context.Context, uniPortName string, tp *tp_pb.EponTechProfile, intfID uint32, tpInstPath string) *tp_pb.EponTechProfileInstance {
- var usQueueAttributeList []iUpstreamQueueAttribute
- var dsQueueAttributeList []iDownstreamQueueAttribute
+ var usQueueAttributeList []*tp_pb.EPONQueueAttributes
+ var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
var tcontIDs []uint32
var gemPorts []uint32
var err error
- logger.Infow(ctx, "Allocating TechProfileMgr instance from techprofile template", log.Fields{"uniPortName": uniPortName, "intfId": intfId, "numGem": tp.NumGemPorts})
+ logger.Infow(ctx, "allocating-tp-instance-from-tp-template", log.Fields{"uniPortName": uniPortName, "intfID": intfID, "numGem": tp.NumGemPorts})
- if tp.InstanceCtrl.Onu == "multi-instance" {
- if tcontIDs, err = t.GetResourceID(ctx, intfId, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
- logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"intfId": intfId})
+ if tp.InstanceControl.Onu == "multi-instance" {
+ if tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
+ logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"err": err, "intfID": intfID})
return nil
}
} else { // "single-instance"
- if tpInst, err := t.getSingleInstanceEponTp(ctx, tpInstPath); err != nil {
- logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"intfId": intfId})
- return nil
- } else if tpInst == nil {
+ if tpInst := t.getSingleInstanceEponTp(ctx, tpInstPath); tpInst == nil {
// No "single-instance" tp found on one any uni port for the given TP ID
// Allocate a new TcontID or AllocID
- if tcontIDs, err = t.GetResourceID(ctx, intfId, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
- logger.Errorw(ctx, "Error getting alloc id from rsrcrMgr", log.Fields{"intfId": intfId})
+ if tcontIDs, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeAllocID(), 1); err != nil {
+ logger.Errorw(ctx, "error-getting-alloc-id-from-resource-mgr", log.Fields{"err": err, "intfID": intfID})
return nil
}
} else {
// Use the alloc-id from the existing TpInstance
- tcontIDs = append(tcontIDs, tpInst.AllocID)
+ tcontIDs = append(tcontIDs, tpInst.AllocId)
}
}
logger.Debugw(ctx, "Num GEM ports in TP:", log.Fields{"NumGemPorts": tp.NumGemPorts})
- if gemPorts, err = t.GetResourceID(ctx, intfId, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts); err != nil {
- logger.Errorw(ctx, "Error getting gemport ids from rsrcrMgr", log.Fields{"intfId": intfId, "numGemports": tp.NumGemPorts})
+ if gemPorts, err = t.GetResourceID(ctx, intfID, t.resourceMgr.GetResourceTypeGemPortID(), tp.NumGemPorts); err != nil {
+ logger.Errorw(ctx, "error-getting-gemport-id-from-resource-mgr", log.Fields{"err": err, "intfID": intfID, "numGemports": tp.NumGemPorts})
return nil
}
- logger.Infow(ctx, "Allocated tconts and GEM ports successfully", log.Fields{"tconts": tcontIDs, "gemports": gemPorts})
+ logger.Infow(ctx, "allocated-alloc-id-and-gemport-successfully", log.Fields{"tconts": tcontIDs, "gemports": gemPorts})
for index := 0; index < int(tp.NumGemPorts); index++ {
usQueueAttributeList = append(usQueueAttributeList,
- iUpstreamQueueAttribute{GemportID: gemPorts[index],
- MaxQueueSize: tp.UpstreamQueueAttributeList[index].MaxQueueSize,
+ &tp_pb.EPONQueueAttributes{GemportId: gemPorts[index],
+ MaxQSize: tp.UpstreamQueueAttributeList[index].MaxQSize,
PbitMap: tp.UpstreamQueueAttributeList[index].PbitMap,
AesEncryption: tp.UpstreamQueueAttributeList[index].AesEncryption,
TrafficType: tp.UpstreamQueueAttributeList[index].TrafficType,
@@ -827,165 +878,180 @@
NominalInterval: tp.UpstreamQueueAttributeList[index].NominalInterval,
ToleratedPollJitter: tp.UpstreamQueueAttributeList[index].ToleratedPollJitter,
RequestTransmissionPolicy: tp.UpstreamQueueAttributeList[index].RequestTransmissionPolicy,
- NumQueueSet: tp.UpstreamQueueAttributeList[index].NumQueueSet,
+ NumQSets: tp.UpstreamQueueAttributeList[index].NumQSets,
QThresholds: tp.UpstreamQueueAttributeList[index].QThresholds,
SchedulingPolicy: tp.UpstreamQueueAttributeList[index].SchedulingPolicy,
- PriorityQueue: tp.UpstreamQueueAttributeList[index].PriorityQueue,
+ PriorityQ: tp.UpstreamQueueAttributeList[index].PriorityQ,
Weight: tp.UpstreamQueueAttributeList[index].Weight,
DiscardPolicy: tp.UpstreamQueueAttributeList[index].DiscardPolicy,
DiscardConfig: tp.UpstreamQueueAttributeList[index].DiscardConfig})
}
- logger.Info(ctx, "length of DownstreamGemPortAttributeList", len(tp.DownstreamQueueAttributeList))
+ logger.Info(ctx, "length-of-downstream-gemport-attribute-list", len(tp.DownstreamQueueAttributeList))
for index := 0; index < int(tp.NumGemPorts); index++ {
dsQueueAttributeList = append(dsQueueAttributeList,
- iDownstreamQueueAttribute{GemportID: gemPorts[index],
- MaxQueueSize: tp.DownstreamQueueAttributeList[index].MaxQueueSize,
+ &tp_pb.EPONQueueAttributes{GemportId: gemPorts[index],
+ MaxQSize: tp.DownstreamQueueAttributeList[index].MaxQSize,
PbitMap: tp.DownstreamQueueAttributeList[index].PbitMap,
AesEncryption: tp.DownstreamQueueAttributeList[index].AesEncryption,
SchedulingPolicy: tp.DownstreamQueueAttributeList[index].SchedulingPolicy,
- PriorityQueue: tp.DownstreamQueueAttributeList[index].PriorityQueue,
+ PriorityQ: tp.DownstreamQueueAttributeList[index].PriorityQ,
Weight: tp.DownstreamQueueAttributeList[index].Weight,
DiscardPolicy: tp.DownstreamQueueAttributeList[index].DiscardPolicy,
DiscardConfig: tp.DownstreamQueueAttributeList[index].DiscardConfig})
}
- return &EponProfile{
+ return &tp_pb.EponTechProfileInstance{
SubscriberIdentifier: uniPortName,
Name: tp.Name,
ProfileType: tp.ProfileType,
Version: tp.Version,
NumGemPorts: tp.NumGemPorts,
- InstanceCtrl: tp.InstanceCtrl,
- EponAttribute: tp.EponAttribute,
- AllocID: tcontIDs[0],
+ InstanceControl: tp.InstanceControl,
+ PackageType: tp.PackageType,
+ AllocId: tcontIDs[0],
UpstreamQueueAttributeList: usQueueAttributeList,
DownstreamQueueAttributeList: dsQueueAttributeList}
}
-// getSingleInstanceTp returns another TpInstance for an ONU on a different
+// getSingleInstanceTp returns another TpInstance (GPON, XGPON, XGS-PON) for an ONU on a different
// uni port for the same TP ID, if it finds one, else nil.
-func (t *TechProfileMgr) getSingleInstanceTp(ctx context.Context, tpPath string) (*TechProfile, error) {
- var tpInst TechProfile
+func (t *TechProfileMgr) getSingleInstanceTp(ctx context.Context, tpPathSuffix string) *tp_pb.TechProfileInstance {
// For example:
- // tpPath like "service/voltha/technology_profiles/xgspon/64/pon-{0}/onu-{1}/uni-{1}"
- // is broken into ["service/voltha/technology_profiles/xgspon/64/pon-{0}/onu-{1}" ""]
- uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPath, 2)
- kvPairs, _ := t.config.KVBackend.List(ctx, uniPathSlice[0])
+ // tpPathSuffix like "XGS-PON/64/olt-{1234}/pon-{0}/onu-{1}/uni-{1}"
+ // is broken into ["XGS-PON/64/olt-{1234}/pon-{0}/onu-{1}" ""]
+ uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPathSuffix, 2)
- // Find a valid TP Instance among all the UNIs of that ONU for the given TP ID
- for keyPath, kvPair := range kvPairs {
- if value, err := kvstore.ToByte(kvPair.Value); err == nil {
- if err = json.Unmarshal(value, &tpInst); err != nil {
- logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"keyPath": keyPath, "value": value})
- return nil, errors.New("error-unmarshal-kv-pair")
- } else {
- logger.Debugw(ctx, "found-valid-tp-instance-on-another-uni", log.Fields{"keyPath": keyPath})
- return &tpInst, nil
- }
+ t.tpInstanceMapLock.RLock()
+ defer t.tpInstanceMapLock.RUnlock()
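+ // Probe every possible UNI of this ONU for an existing TP instance with the given TP ID.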
+ for i := 0; i < MaxUniPortPerOnu; i++ {
+ key := fmt.Sprintf(uniPathSlice[0]+"/uni-{%d}", i)
+ if tpInst, ok := t.tpInstanceMap[key]; ok {
+ logger.Debugw(ctx, "found-single-instance-tp", log.Fields{"key": key})
+ return tpInst
}
}
- return nil, nil
+ return nil
}
-func (t *TechProfileMgr) getSingleInstanceEponTp(ctx context.Context, tpPath string) (*EponProfile, error) {
- var tpInst EponProfile
-
+// getSingleInstanceEponTp returns another TpInstance (EPON) for an ONU on a different
+// uni port for the same TP ID, if it finds one, else nil.
+func (t *TechProfileMgr) getSingleInstanceEponTp(ctx context.Context, tpPathSuffix string) *tp_pb.EponTechProfileInstance {
// For example:
- // tpPath like "service/voltha/technology_profiles/xgspon/64/pon-{0}/onu-{1}/uni-{1}"
- // is broken into ["service/voltha/technology_profiles/xgspon/64/pon-{0}/onu-{1}" ""]
- uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPath, 2)
- kvPairs, _ := t.config.KVBackend.List(ctx, uniPathSlice[0])
+ // tpPathSuffix like "EPON/64/olt-{1234}/pon-{0}/onu-{1}/uni-{1}"
+ // is broken into ["EPON/64/-{1234}/pon-{0}/onu-{1}" ""]
+ uniPathSlice := regexp.MustCompile(`/uni-{[0-9]+}$`).Split(tpPathSuffix, 2)
- // Find a valid TP Instance among all the UNIs of that ONU for the given TP ID
- for keyPath, kvPair := range kvPairs {
- if value, err := kvstore.ToByte(kvPair.Value); err == nil {
- if err = json.Unmarshal(value, &tpInst); err != nil {
- logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"keyPath": keyPath, "value": value})
- return nil, errors.New("error-unmarshal-kv-pair")
- } else {
- logger.Debugw(ctx, "found-valid-tp-instance-on-another-uni", log.Fields{"keyPath": keyPath})
- return &tpInst, nil
- }
+ t.epontpInstanceMapLock.RLock()
+ defer t.epontpInstanceMapLock.RUnlock()
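+ // Probe every possible UNI of this ONU for an existing EPON TP instance with the given TP ID.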
+ for i := 0; i < MaxUniPortPerOnu; i++ {
+ key := fmt.Sprintf(uniPathSlice[0]+"/uni-{%d}", i)
+ if tpInst, ok := t.eponTpInstanceMap[key]; ok {
+ logger.Debugw(ctx, "found-single-instance-tp", log.Fields{"key": key})
+ return tpInst
}
}
- return nil, nil
+ return nil
}
-func (t *TechProfileMgr) getDefaultTechProfile(ctx context.Context) *DefaultTechProfile {
- var usGemPortAttributeList []GemPortAttribute
- var dsGemPortAttributeList []GemPortAttribute
+// getDefaultTechProfile returns a default TechProfile for GPON, XGPON, XGS-PON
+func (t *TechProfileMgr) getDefaultTechProfile(ctx context.Context) *tp_pb.TechProfile {
+ var usGemPortAttributeList []*tp_pb.GemPortAttributes
+ var dsGemPortAttributeList []*tp_pb.GemPortAttributes
for _, pbit := range t.config.DefaultPbits {
- logger.Debugw(ctx, "Creating GEM port", log.Fields{"pbit": pbit})
+ logger.Debugw(ctx, "creating-gem-port-profile-profile", log.Fields{"pbit": pbit})
usGemPortAttributeList = append(usGemPortAttributeList,
- GemPortAttribute{
- MaxQueueSize: defaultMaxQueueSize,
+ &tp_pb.GemPortAttributes{
+ MaxQSize: defaultMaxQueueSize,
PbitMap: pbit,
AesEncryption: defaultAESEncryption,
- SchedulingPolicy: SchedulingPolicy_name[defaultSchedulePolicy],
- PriorityQueue: defaultPriorityQueue,
+ SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
+ PriorityQ: defaultPriorityQueue,
Weight: defaultQueueWeight,
- DiscardPolicy: DiscardPolicy_name[defaultdropPolicy],
- DiscardConfig: DiscardConfig{
+ DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
+ DiscardConfigV2: &tp_pb.DiscardConfig{
+ DiscardPolicy: tp_pb.DiscardPolicy_Red,
+ DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
+ RedDiscardConfig: &tp_pb.RedDiscardConfig{
+ MinThreshold: defaultMinThreshold,
+ MaxThreshold: defaultMaxThreshold,
+ MaxProbability: defaultMaxProbability,
+ },
+ },
+ },
+ DiscardConfig: &tp_pb.RedDiscardConfig{
MinThreshold: defaultMinThreshold,
MaxThreshold: defaultMaxThreshold,
- MaxProbability: defaultMaxProbability}})
+ MaxProbability: defaultMaxProbability,
+ },
+ })
dsGemPortAttributeList = append(dsGemPortAttributeList,
- GemPortAttribute{
- MaxQueueSize: defaultMaxQueueSize,
+ &tp_pb.GemPortAttributes{
+ MaxQSize: defaultMaxQueueSize,
PbitMap: pbit,
AesEncryption: defaultAESEncryption,
- SchedulingPolicy: SchedulingPolicy_name[defaultSchedulePolicy],
- PriorityQueue: defaultPriorityQueue,
+ SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
+ PriorityQ: defaultPriorityQueue,
Weight: defaultQueueWeight,
- DiscardPolicy: DiscardPolicy_name[defaultdropPolicy],
- DiscardConfig: DiscardConfig{
+ DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
+ DiscardConfigV2: &tp_pb.DiscardConfig{
+ DiscardPolicy: tp_pb.DiscardPolicy_Red,
+ DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
+ RedDiscardConfig: &tp_pb.RedDiscardConfig{
+ MinThreshold: defaultMinThreshold,
+ MaxThreshold: defaultMaxThreshold,
+ MaxProbability: defaultMaxProbability,
+ },
+ },
+ },
+ DiscardConfig: &tp_pb.RedDiscardConfig{
MinThreshold: defaultMinThreshold,
MaxThreshold: defaultMaxThreshold,
- MaxProbability: defaultMaxProbability},
- IsMulticast: defaultIsMulticast,
- DControlList: defaultAccessControlList,
- SControlList: defaultAccessControlList,
- McastGemID: defaultMcastGemID})
+ MaxProbability: defaultMaxProbability,
+ },
+ IsMulticast: defaultIsMulticast,
+ DynamicAccessControlList: defaultAccessControlList,
+ StaticAccessControlList: defaultAccessControlList,
+ MulticastGemId: defaultMcastGemID})
}
- return &DefaultTechProfile{
+ return &tp_pb.TechProfile{
Name: t.config.DefaultTPName,
ProfileType: t.resourceMgr.GetTechnology(),
Version: t.config.TPVersion,
NumGemPorts: uint32(len(usGemPortAttributeList)),
- InstanceCtrl: InstanceControl{
+ InstanceControl: &tp_pb.InstanceControl{
Onu: defaultOnuInstance,
Uni: defaultUniInstance,
MaxGemPayloadSize: defaultGemPayloadSize},
- UsScheduler: Scheduler{
- Direction: Direction_name[Direction_UPSTREAM],
- AdditionalBw: AdditionalBW_name[defaultAdditionalBw],
+ UsScheduler: &tp_pb.SchedulerAttributes{
+ Direction: tp_pb.Direction_UPSTREAM,
+ AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
Priority: defaultPriority,
Weight: defaultWeight,
- QSchedPolicy: SchedulingPolicy_name[defaultQueueSchedPolicy]},
- DsScheduler: Scheduler{
- Direction: Direction_name[Direction_DOWNSTREAM],
- AdditionalBw: AdditionalBW_name[defaultAdditionalBw],
+ QSchedPolicy: tp_pb.SchedulingPolicy_Hybrid},
+ DsScheduler: &tp_pb.SchedulerAttributes{
+ Direction: tp_pb.Direction_DOWNSTREAM,
+ AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
Priority: defaultPriority,
Weight: defaultWeight,
- QSchedPolicy: SchedulingPolicy_name[defaultQueueSchedPolicy]},
+ QSchedPolicy: tp_pb.SchedulingPolicy_Hybrid},
UpstreamGemPortAttributeList: usGemPortAttributeList,
DownstreamGemPortAttributeList: dsGemPortAttributeList}
}
-// getDefaultTechProfile function for EPON
-func (t *TechProfileMgr) getDefaultEponProfile(ctx context.Context) *DefaultEponProfile {
+// getDefaultEponProfile returns a default TechProfile for EPON
+func (t *TechProfileMgr) getDefaultEponProfile(ctx context.Context) *tp_pb.EponTechProfile {
- var usQueueAttributeList []UpstreamQueueAttribute
- var dsQueueAttributeList []DownstreamQueueAttribute
+ var usQueueAttributeList []*tp_pb.EPONQueueAttributes
+ var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
for _, pbit := range t.config.DefaultPbits {
logger.Debugw(ctx, "Creating Queue", log.Fields{"pbit": pbit})
usQueueAttributeList = append(usQueueAttributeList,
- UpstreamQueueAttribute{
- MaxQueueSize: defaultMaxQueueSize,
+ &tp_pb.EPONQueueAttributes{
+ MaxQSize: defaultMaxQueueSize,
PbitMap: pbit,
AesEncryption: defaultAESEncryption,
TrafficType: defaultTrafficType,
@@ -993,8 +1059,8 @@
NominalInterval: defaultNominalInterval,
ToleratedPollJitter: defaultToleratedPollJitter,
RequestTransmissionPolicy: defaultRequestTransmissionPolicy,
- NumQueueSet: defaultNumQueueSet,
- QThresholds: QThresholds{
+ NumQSets: defaultNumQueueSet,
+ QThresholds: &tp_pb.QThresholds{
QThreshold1: defaultQThreshold1,
QThreshold2: defaultQThreshold2,
QThreshold3: defaultQThreshold3,
@@ -1002,454 +1068,432 @@
QThreshold5: defaultQThreshold5,
QThreshold6: defaultQThreshold6,
QThreshold7: defaultQThreshold7},
- SchedulingPolicy: SchedulingPolicy_name[defaultSchedulePolicy],
- PriorityQueue: defaultPriorityQueue,
+ SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
+ PriorityQ: defaultPriorityQueue,
Weight: defaultQueueWeight,
- DiscardPolicy: DiscardPolicy_name[defaultdropPolicy],
- DiscardConfig: DiscardConfig{
+ DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
+ DiscardConfigV2: &tp_pb.DiscardConfig{
+ DiscardPolicy: tp_pb.DiscardPolicy_Red,
+ DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
+ RedDiscardConfig: &tp_pb.RedDiscardConfig{
+ MinThreshold: defaultMinThreshold,
+ MaxThreshold: defaultMaxThreshold,
+ MaxProbability: defaultMaxProbability,
+ },
+ },
+ },
+ DiscardConfig: &tp_pb.RedDiscardConfig{
MinThreshold: defaultMinThreshold,
MaxThreshold: defaultMaxThreshold,
- MaxProbability: defaultMaxProbability}})
+ MaxProbability: defaultMaxProbability,
+ }})
dsQueueAttributeList = append(dsQueueAttributeList,
- DownstreamQueueAttribute{
- MaxQueueSize: defaultMaxQueueSize,
+ &tp_pb.EPONQueueAttributes{
+ MaxQSize: defaultMaxQueueSize,
PbitMap: pbit,
AesEncryption: defaultAESEncryption,
- SchedulingPolicy: SchedulingPolicy_name[defaultSchedulePolicy],
- PriorityQueue: defaultPriorityQueue,
+ SchedulingPolicy: tp_pb.SchedulingPolicy_WRR,
+ PriorityQ: defaultPriorityQueue,
Weight: defaultQueueWeight,
- DiscardPolicy: DiscardPolicy_name[defaultdropPolicy],
- DiscardConfig: DiscardConfig{
+ DiscardPolicy: tp_pb.DiscardPolicy_TailDrop,
+ DiscardConfigV2: &tp_pb.DiscardConfig{
+ DiscardPolicy: tp_pb.DiscardPolicy_Red,
+ DiscardConfig: &tp_pb.DiscardConfig_RedDiscardConfig{
+ RedDiscardConfig: &tp_pb.RedDiscardConfig{
+ MinThreshold: defaultMinThreshold,
+ MaxThreshold: defaultMaxThreshold,
+ MaxProbability: defaultMaxProbability,
+ },
+ },
+ },
+ DiscardConfig: &tp_pb.RedDiscardConfig{
MinThreshold: defaultMinThreshold,
MaxThreshold: defaultMaxThreshold,
- MaxProbability: defaultMaxProbability}})
+ MaxProbability: defaultMaxProbability,
+ }})
}
- return &DefaultEponProfile{
+ return &tp_pb.EponTechProfile{
Name: t.config.DefaultTPName,
ProfileType: t.resourceMgr.GetTechnology(),
Version: t.config.TPVersion,
NumGemPorts: uint32(len(usQueueAttributeList)),
- InstanceCtrl: InstanceControl{
+ InstanceControl: &tp_pb.InstanceControl{
Onu: defaultOnuInstance,
Uni: defaultUniInstance,
MaxGemPayloadSize: defaultGemPayloadSize},
- EponAttribute: EponAttribute{
- PackageType: defaultPakageType},
+ PackageType: defaultPakageType,
UpstreamQueueAttributeList: usQueueAttributeList,
DownstreamQueueAttributeList: dsQueueAttributeList}
}
-func (t *TechProfileMgr) GetprotoBufParamValue(ctx context.Context, paramType string, paramKey string) int32 {
- var result int32 = -1
-
- if paramType == "direction" {
- for key, val := range tp_pb.Direction_value {
- if key == paramKey {
- result = val
- }
- }
- } else if paramType == "discard_policy" {
- for key, val := range tp_pb.DiscardPolicy_value {
- if key == paramKey {
- result = val
- }
- }
- } else if paramType == "sched_policy" {
- for key, val := range tp_pb.SchedulingPolicy_value {
- if key == paramKey {
- logger.Debugw(ctx, "Got value in proto", log.Fields{"key": key, "value": val})
- result = val
- }
- }
- } else if paramType == "additional_bw" {
- for key, val := range tp_pb.AdditionalBW_value {
- if key == paramKey {
- result = val
- }
- }
- } else {
- logger.Error(ctx, "Could not find proto parameter", log.Fields{"paramType": paramType, "key": paramKey})
- return -1
- }
- logger.Debugw(ctx, "Got value in proto", log.Fields{"key": paramKey, "value": result})
- return result
-}
-
-func (t *TechProfileMgr) GetUsScheduler(ctx context.Context, tpInstance *TechProfile) (*tp_pb.SchedulerConfig, error) {
- dir := tp_pb.Direction(t.GetprotoBufParamValue(ctx, "direction", tpInstance.UsScheduler.Direction))
- if dir == -1 {
- logger.Errorf(ctx, "Error in getting proto id for direction %s for upstream scheduler", tpInstance.UsScheduler.Direction)
- return nil, fmt.Errorf("unable to get proto id for direction %s for upstream scheduler", tpInstance.UsScheduler.Direction)
- }
-
- bw := tp_pb.AdditionalBW(t.GetprotoBufParamValue(ctx, "additional_bw", tpInstance.UsScheduler.AdditionalBw))
- if bw == -1 {
- logger.Errorf(ctx, "Error in getting proto id for bandwidth %s for upstream scheduler", tpInstance.UsScheduler.AdditionalBw)
- return nil, fmt.Errorf("unable to get proto id for bandwidth %s for upstream scheduler", tpInstance.UsScheduler.AdditionalBw)
- }
-
- policy := tp_pb.SchedulingPolicy(t.GetprotoBufParamValue(ctx, "sched_policy", tpInstance.UsScheduler.QSchedPolicy))
- if policy == -1 {
- logger.Errorf(ctx, "Error in getting proto id for scheduling policy %s for upstream scheduler", tpInstance.UsScheduler.QSchedPolicy)
- return nil, fmt.Errorf("unable to get proto id for scheduling policy %s for upstream scheduler", tpInstance.UsScheduler.QSchedPolicy)
- }
-
- return &tp_pb.SchedulerConfig{
- Direction: dir,
- AdditionalBw: bw,
- Priority: tpInstance.UsScheduler.Priority,
- Weight: tpInstance.UsScheduler.Weight,
- SchedPolicy: policy}, nil
-}
-
-func (t *TechProfileMgr) GetDsScheduler(ctx context.Context, tpInstance *TechProfile) (*tp_pb.SchedulerConfig, error) {
-
- dir := tp_pb.Direction(t.GetprotoBufParamValue(ctx, "direction", tpInstance.DsScheduler.Direction))
- if dir == -1 {
- logger.Errorf(ctx, "Error in getting proto id for direction %s for downstream scheduler", tpInstance.DsScheduler.Direction)
- return nil, fmt.Errorf("unable to get proto id for direction %s for downstream scheduler", tpInstance.DsScheduler.Direction)
- }
-
- bw := tp_pb.AdditionalBW(t.GetprotoBufParamValue(ctx, "additional_bw", tpInstance.DsScheduler.AdditionalBw))
- if bw == -1 {
- logger.Errorf(ctx, "Error in getting proto id for bandwidth %s for downstream scheduler", tpInstance.DsScheduler.AdditionalBw)
- return nil, fmt.Errorf("unable to get proto id for bandwidth %s for downstream scheduler", tpInstance.DsScheduler.AdditionalBw)
- }
-
- policy := tp_pb.SchedulingPolicy(t.GetprotoBufParamValue(ctx, "sched_policy", tpInstance.DsScheduler.QSchedPolicy))
- if policy == -1 {
- logger.Errorf(ctx, "Error in getting proto id for scheduling policy %s for downstream scheduler", tpInstance.DsScheduler.QSchedPolicy)
- return nil, fmt.Errorf("unable to get proto id for scheduling policy %s for downstream scheduler", tpInstance.DsScheduler.QSchedPolicy)
- }
-
- return &tp_pb.SchedulerConfig{
- Direction: dir,
- AdditionalBw: bw,
- Priority: tpInstance.DsScheduler.Priority,
- Weight: tpInstance.DsScheduler.Weight,
- SchedPolicy: policy}, nil
-}
-
-func (t *TechProfileMgr) GetTrafficScheduler(tpInstance *TechProfile, SchedCfg *tp_pb.SchedulerConfig,
- ShapingCfg *tp_pb.TrafficShapingInfo) *tp_pb.TrafficScheduler {
-
- tSched := &tp_pb.TrafficScheduler{
- Direction: SchedCfg.Direction,
- AllocId: tpInstance.UsScheduler.AllocID,
- TrafficShapingInfo: ShapingCfg,
- Scheduler: SchedCfg}
-
- return tSched
-}
-
-func (tpm *TechProfileMgr) GetTrafficQueues(ctx context.Context, tp *TechProfile, Dir tp_pb.Direction) ([]*tp_pb.TrafficQueue, error) {
-
- var encryp bool
- if Dir == tp_pb.Direction_UPSTREAM {
- // upstream GEM ports
- NumGemPorts := len(tp.UpstreamGemPortAttributeList)
- GemPorts := make([]*tp_pb.TrafficQueue, 0)
- for Count := 0; Count < NumGemPorts; Count++ {
- if tp.UpstreamGemPortAttributeList[Count].AesEncryption == "True" {
- encryp = true
- } else {
- encryp = false
- }
-
- schedPolicy := tpm.GetprotoBufParamValue(ctx, "sched_policy", tp.UpstreamGemPortAttributeList[Count].SchedulingPolicy)
- if schedPolicy == -1 {
- logger.Errorf(ctx, "Error in getting Proto Id for scheduling policy %s for Upstream Gem Port %d", tp.UpstreamGemPortAttributeList[Count].SchedulingPolicy, Count)
- return nil, fmt.Errorf("upstream gem port traffic queue creation failed due to unrecognized scheduling policy %s", tp.UpstreamGemPortAttributeList[Count].SchedulingPolicy)
- }
-
- discardPolicy := tpm.GetprotoBufParamValue(ctx, "discard_policy", tp.UpstreamGemPortAttributeList[Count].DiscardPolicy)
- if discardPolicy == -1 {
- logger.Errorf(ctx, "Error in getting Proto Id for discard policy %s for Upstream Gem Port %d", tp.UpstreamGemPortAttributeList[Count].DiscardPolicy, Count)
- return nil, fmt.Errorf("upstream gem port traffic queue creation failed due to unrecognized discard policy %s", tp.UpstreamGemPortAttributeList[Count].DiscardPolicy)
- }
-
- GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
- Direction: tp_pb.Direction(tpm.GetprotoBufParamValue(ctx, "direction", tp.UsScheduler.Direction)),
- GemportId: tp.UpstreamGemPortAttributeList[Count].GemportID,
- PbitMap: tp.UpstreamGemPortAttributeList[Count].PbitMap,
- AesEncryption: encryp,
- SchedPolicy: tp_pb.SchedulingPolicy(schedPolicy),
- Priority: tp.UpstreamGemPortAttributeList[Count].PriorityQueue,
- Weight: tp.UpstreamGemPortAttributeList[Count].Weight,
- DiscardPolicy: tp_pb.DiscardPolicy(discardPolicy),
- })
- }
- logger.Debugw(ctx, "Upstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
- return GemPorts, nil
- } else if Dir == tp_pb.Direction_DOWNSTREAM {
- //downstream GEM ports
- NumGemPorts := len(tp.DownstreamGemPortAttributeList)
- GemPorts := make([]*tp_pb.TrafficQueue, 0)
- for Count := 0; Count < NumGemPorts; Count++ {
- if isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
- //do not take multicast GEM ports. They are handled separately.
- continue
- }
- if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
- encryp = true
- } else {
- encryp = false
- }
-
- schedPolicy := tpm.GetprotoBufParamValue(ctx, "sched_policy", tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy)
- if schedPolicy == -1 {
- logger.Errorf(ctx, "Error in getting Proto Id for scheduling policy %s for Downstream Gem Port %d", tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy, Count)
- return nil, fmt.Errorf("downstream gem port traffic queue creation failed due to unrecognized scheduling policy %s", tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy)
- }
-
- discardPolicy := tpm.GetprotoBufParamValue(ctx, "discard_policy", tp.DownstreamGemPortAttributeList[Count].DiscardPolicy)
- if discardPolicy == -1 {
- logger.Errorf(ctx, "Error in getting Proto Id for discard policy %s for Downstream Gem Port %d", tp.DownstreamGemPortAttributeList[Count].DiscardPolicy, Count)
- return nil, fmt.Errorf("downstream gem port traffic queue creation failed due to unrecognized discard policy %s", tp.DownstreamGemPortAttributeList[Count].DiscardPolicy)
- }
-
- GemPorts = append(GemPorts, &tp_pb.TrafficQueue{
- Direction: tp_pb.Direction(tpm.GetprotoBufParamValue(ctx, "direction", tp.DsScheduler.Direction)),
- GemportId: tp.DownstreamGemPortAttributeList[Count].GemportID,
- PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
- AesEncryption: encryp,
- SchedPolicy: tp_pb.SchedulingPolicy(schedPolicy),
- Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQueue,
- Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
- DiscardPolicy: tp_pb.DiscardPolicy(discardPolicy),
- })
- }
- logger.Debugw(ctx, "Downstream Traffic queue list ", log.Fields{"queuelist": GemPorts})
- return GemPorts, nil
- }
-
- logger.Errorf(ctx, "Unsupported direction %s used for generating Traffic Queue list", Dir)
- return nil, fmt.Errorf("downstream gem port traffic queue creation failed due to unsupported direction %s", Dir)
-}
-
//isMulticastGem returns true if isMulticast attribute value of a GEM port is true; false otherwise
func isMulticastGem(isMulticastAttrValue string) bool {
return isMulticastAttrValue != "" &&
(isMulticastAttrValue == "True" || isMulticastAttrValue == "true" || isMulticastAttrValue == "TRUE")
}
-func (tpm *TechProfileMgr) GetMulticastTrafficQueues(ctx context.Context, tp *TechProfile) []*tp_pb.TrafficQueue {
- var encryp bool
- NumGemPorts := len(tp.DownstreamGemPortAttributeList)
- mcastTrafficQueues := make([]*tp_pb.TrafficQueue, 0)
- for Count := 0; Count < NumGemPorts; Count++ {
- if !isMulticastGem(tp.DownstreamGemPortAttributeList[Count].IsMulticast) {
- continue
- }
- if tp.DownstreamGemPortAttributeList[Count].AesEncryption == "True" {
- encryp = true
- } else {
- encryp = false
- }
- mcastTrafficQueues = append(mcastTrafficQueues, &tp_pb.TrafficQueue{
- Direction: tp_pb.Direction(tpm.GetprotoBufParamValue(ctx, "direction", tp.DsScheduler.Direction)),
- GemportId: tp.DownstreamGemPortAttributeList[Count].McastGemID,
- PbitMap: tp.DownstreamGemPortAttributeList[Count].PbitMap,
- AesEncryption: encryp,
- SchedPolicy: tp_pb.SchedulingPolicy(tpm.GetprotoBufParamValue(ctx, "sched_policy", tp.DownstreamGemPortAttributeList[Count].SchedulingPolicy)),
- Priority: tp.DownstreamGemPortAttributeList[Count].PriorityQueue,
- Weight: tp.DownstreamGemPortAttributeList[Count].Weight,
- DiscardPolicy: tp_pb.DiscardPolicy(tpm.GetprotoBufParamValue(ctx, "discard_policy", tp.DownstreamGemPortAttributeList[Count].DiscardPolicy)),
- })
- }
- logger.Debugw(ctx, "Downstream Multicast Traffic queue list ", log.Fields{"queuelist": mcastTrafficQueues})
- return mcastTrafficQueues
-}
-
-func (tpm *TechProfileMgr) GetUsTrafficScheduler(ctx context.Context, tp *TechProfile) *tp_pb.TrafficScheduler {
- UsScheduler, _ := tpm.GetUsScheduler(ctx, tp)
-
- return &tp_pb.TrafficScheduler{Direction: UsScheduler.Direction,
- AllocId: tp.UsScheduler.AllocID,
- Scheduler: UsScheduler}
-}
-
-func (t *TechProfileMgr) GetGemportForPbit(ctx context.Context, tp interface{}, dir tp_pb.Direction, pbit uint32) interface{} {
- /*
- Function to get the Gemport mapped to a pbit.
- */
- switch tp := tp.(type) {
- case *TechProfile:
- if dir == tp_pb.Direction_UPSTREAM {
- // upstream GEM ports
- numGemPorts := len(tp.UpstreamGemPortAttributeList)
- for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
- lenOfPbitMap := len(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap)
- for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
- // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
- // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
- if p, err := strconv.Atoi(string(tp.UpstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
- if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
- logger.Debugw(ctx, "Found-US-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.UpstreamGemPortAttributeList[gemCnt].GemportID})
- return tp.UpstreamGemPortAttributeList[gemCnt]
- }
- }
- }
- }
- } else if dir == tp_pb.Direction_DOWNSTREAM {
- //downstream GEM ports
- numGemPorts := len(tp.DownstreamGemPortAttributeList)
- for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
- lenOfPbitMap := len(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap)
- for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
- // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
- // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
- if p, err := strconv.Atoi(string(tp.DownstreamGemPortAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
- if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
- logger.Debugw(ctx, "Found-DS-GEMport-for-Pcp", log.Fields{"pbit": pbit, "GEMport": tp.DownstreamGemPortAttributeList[gemCnt].GemportID})
- return tp.DownstreamGemPortAttributeList[gemCnt]
- }
- }
- }
- }
- }
- logger.Errorw(ctx, "No-GemportId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
- case *EponProfile:
- if dir == tp_pb.Direction_UPSTREAM {
- // upstream GEM ports
- numGemPorts := len(tp.UpstreamQueueAttributeList)
- for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
- lenOfPbitMap := len(tp.UpstreamQueueAttributeList[gemCnt].PbitMap)
- for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
- // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
- // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
- if p, err := strconv.Atoi(string(tp.UpstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
- if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
- logger.Debugw(ctx, "Found-US-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.UpstreamQueueAttributeList[gemCnt].GemportID})
- return tp.UpstreamQueueAttributeList[gemCnt]
- }
- }
- }
- }
- } else if dir == tp_pb.Direction_DOWNSTREAM {
- //downstream GEM ports
- numGemPorts := len(tp.DownstreamQueueAttributeList)
- for gemCnt := 0; gemCnt < numGemPorts; gemCnt++ {
- lenOfPbitMap := len(tp.DownstreamQueueAttributeList[gemCnt].PbitMap)
- for pbitMapIdx := 2; pbitMapIdx < lenOfPbitMap; pbitMapIdx++ {
- // Given a sample pbit map string "0b00000001", lenOfPbitMap is 10
- // "lenOfPbitMap - pbitMapIdx + 1" will give pbit-i th value from LSB position in the pbit map string
- if p, err := strconv.Atoi(string(tp.DownstreamQueueAttributeList[gemCnt].PbitMap[lenOfPbitMap-pbitMapIdx+1])); err == nil {
- if uint32(pbitMapIdx-2) == pbit && p == 1 { // Check this p-bit is set
- logger.Debugw(ctx, "Found-DS-Queue-for-Pcp", log.Fields{"pbit": pbit, "Queue": tp.DownstreamQueueAttributeList[gemCnt].GemportID})
- return tp.DownstreamQueueAttributeList[gemCnt]
- }
- }
- }
- }
- }
- logger.Errorw(ctx, "No-QueueId-Found-For-Pcp", log.Fields{"pcpVlan": pbit})
- default:
- logger.Errorw(ctx, "unknown-tech", log.Fields{"tp": tp})
- }
- return nil
-}
-
-// FindAllTpInstances returns all TechProfile instances for a given TechProfile table-id, pon interface ID and onu ID.
-func (t *TechProfileMgr) FindAllTpInstances(ctx context.Context, oltDeviceID string, tpID uint32, ponIntf uint32, onuID uint32) interface{} {
- var tpTech TechProfile
- var tpEpon EponProfile
-
- onuTpInstancePath := fmt.Sprintf("%s/%d/olt-{%s}/pon-{%d}/onu-{%d}", t.resourceMgr.GetTechnology(), tpID, oltDeviceID, ponIntf, onuID)
-
- if kvPairs, _ := t.config.KVBackend.List(ctx, onuTpInstancePath); kvPairs != nil {
- tech := t.resourceMgr.GetTechnology()
- tpInstancesTech := make([]TechProfile, 0, len(kvPairs))
- tpInstancesEpon := make([]EponProfile, 0, len(kvPairs))
-
- for kvPath, kvPair := range kvPairs {
- if value, err := kvstore.ToByte(kvPair.Value); err == nil {
- if tech == xgspon || tech == gpon {
- if err = json.Unmarshal(value, &tpTech); err != nil {
- logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"kvPath": kvPath, "value": value})
- continue
- } else {
- tpInstancesTech = append(tpInstancesTech, tpTech)
- }
- } else if tech == epon {
- if err = json.Unmarshal(value, &tpEpon); err != nil {
- logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"kvPath": kvPath, "value": value})
- continue
- } else {
- tpInstancesEpon = append(tpInstancesEpon, tpEpon)
- }
- }
- }
- }
-
- switch tech {
- case xgspon, gpon:
- return tpInstancesTech
- case epon:
- return tpInstancesEpon
- default:
- logger.Errorw(ctx, "unknown-technology", log.Fields{"tech": tech})
- return nil
- }
- }
- return nil
-}
-
-func (t *TechProfileMgr) GetResourceID(ctx context.Context, IntfID uint32, ResourceType string, NumIDs uint32) ([]uint32, error) {
- logger.Debugw(ctx, "getting-resource-id", log.Fields{
- "intf-id": IntfID,
- "resource-type": ResourceType,
- "num": NumIDs,
- })
- var err error
- var ids []uint32
- switch ResourceType {
- case t.resourceMgr.GetResourceTypeAllocID():
- t.AllocIDMgmtLock.Lock()
- ids, err = t.resourceMgr.GetResourceID(ctx, IntfID, ResourceType, NumIDs)
- t.AllocIDMgmtLock.Unlock()
- case t.resourceMgr.GetResourceTypeGemPortID():
- t.GemPortIDMgmtLock.Lock()
- ids, err = t.resourceMgr.GetResourceID(ctx, IntfID, ResourceType, NumIDs)
- t.GemPortIDMgmtLock.Unlock()
- case t.resourceMgr.GetResourceTypeOnuID():
- t.OnuIDMgmtLock.Lock()
- ids, err = t.resourceMgr.GetResourceID(ctx, IntfID, ResourceType, NumIDs)
- t.OnuIDMgmtLock.Unlock()
- default:
- return nil, fmt.Errorf("ResourceType %s not supported", ResourceType)
- }
+func (t *TechProfileMgr) addResourceInstanceToKVStore(ctx context.Context, tpID uint32, uniPortName string, resInst tp_pb.ResourceInstance) error {
+ logger.Debugw(ctx, "adding-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
+ val, err := proto.Marshal(&resInst)
if err != nil {
- return nil, err
- }
- return ids, nil
-}
-
-func (t *TechProfileMgr) FreeResourceID(ctx context.Context, IntfID uint32, ResourceType string, ReleaseContent []uint32) error {
- logger.Debugw(ctx, "freeing-resource-id", log.Fields{
- "intf-id": IntfID,
- "resource-type": ResourceType,
- "release-content": ReleaseContent,
- })
- var err error
- switch ResourceType {
- case t.resourceMgr.GetResourceTypeAllocID():
- t.AllocIDMgmtLock.Lock()
- err = t.resourceMgr.FreeResourceID(ctx, IntfID, ResourceType, ReleaseContent)
- t.AllocIDMgmtLock.Unlock()
- case t.resourceMgr.GetResourceTypeGemPortID():
- t.GemPortIDMgmtLock.Lock()
- err = t.resourceMgr.FreeResourceID(ctx, IntfID, ResourceType, ReleaseContent)
- t.GemPortIDMgmtLock.Unlock()
- case t.resourceMgr.GetResourceTypeOnuID():
- t.OnuIDMgmtLock.Lock()
- err = t.resourceMgr.FreeResourceID(ctx, IntfID, ResourceType, ReleaseContent)
- t.OnuIDMgmtLock.Unlock()
- default:
- return fmt.Errorf("ResourceType %s not supported", ResourceType)
- }
- if err != nil {
+ logger.Errorw(ctx, "failed-to-marshall-resource-instance", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName, "resInst": resInst})
return err
}
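+ // Key layout: <technology>/<tpID>/<uniPortName>, e.g. (illustrative values only)
+ // XGS-PON/64/olt-{abc123}/pon-{0}/onu-{1}/uni-{0}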
+ err = t.config.ResourceInstanceKVBacked.Put(ctx, fmt.Sprintf("%s/%d/%s", t.resourceMgr.GetTechnology(), tpID, uniPortName), val)
+ return err
+}
+
+func (t *TechProfileMgr) removeResourceInstanceFromKVStore(ctx context.Context, tpID uint32, uniPortName string) error {
+ logger.Debugw(ctx, "removing-resource-instance-to-kv-store", log.Fields{"tpID": tpID, "uniPortName": uniPortName})
+ if err := t.config.ResourceInstanceKVBacked.Delete(ctx, fmt.Sprintf("%s/%d/%s", t.resourceMgr.GetTechnology(), tpID, uniPortName)); err != nil {
+ logger.Errorw(ctx, "error-removing-resource-instance-to-kv-store", log.Fields{"err": err, "tpID": tpID, "uniPortName": uniPortName})
+ return err
+ }
+ return nil
+}
+
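+// getTPFromKVStore returns the TP template for the given tpID, first checking the
+// in-memory cache and then falling back to the JSON-encoded template on the KV store.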
+func (t *TechProfileMgr) getTPFromKVStore(ctx context.Context, tpID uint32) *tp_pb.TechProfile {
+ var tp *tp_pb.TechProfile
+ t.tpMapLock.RLock()
+ tp, ok := t.tpMap[tpID]
+ t.tpMapLock.RUnlock()
+ if ok {
+ logger.Debugw(ctx, "found-tp-in-cache", log.Fields{"tpID": tpID})
+ return tp
+ }
+ key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), tpID)
+ logger.Debugw(ctx, "getting-tp-from-kv-store", log.Fields{"tpID": tpID, "Key": key})
+ kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
+ if err != nil {
+ logger.Errorw(ctx, "error-fetching-from-kv-store", log.Fields{"err": err, "key": key})
+ return nil
+ }
+ if kvresult != nil {
+ /* The backend returns the value as a string; convert it to []byte before unmarshalling */
+ if value, err := kvstore.ToByte(kvresult.Value); err == nil {
+ lTp := &tp_pb.TechProfile{}
+ reader := bytes.NewReader(value)
+ if err = jsonpb.Unmarshal(reader, lTp); err != nil {
+ logger.Errorw(ctx, "error-unmarshalling-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
+ return nil
+ }
+
+ logger.Debugw(ctx, "success-fetched-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lTp})
+ return lTp
+ } else {
+ logger.Errorw(ctx, "error-decoding-tp", log.Fields{"err": err, "tpID": tpID})
+ // The caller falls back to a default TP template in this case.
+ }
+ }
+
+ return nil
+}
+
+func (t *TechProfileMgr) getEponTPFromKVStore(ctx context.Context, tpID uint32) *tp_pb.EponTechProfile {
+ var eponTp *tp_pb.EponTechProfile
+ t.eponTpMapLock.RLock()
+ eponTp, ok := t.eponTpMap[tpID]
+ t.eponTpMapLock.RUnlock()
+ if ok {
+ logger.Debugw(ctx, "found-tp-in-cache", log.Fields{"tpID": tpID})
+ return eponTp
+ }
+ key := fmt.Sprintf(t.config.TPFileKVPath, t.resourceMgr.GetTechnology(), tpID)
+ logger.Debugw(ctx, "getting-epon-tp-from-kv-store", log.Fields{"tpID": tpID, "Key": key})
+ kvresult, err := t.config.DefaultTpKVBackend.Get(ctx, key)
+ if err != nil {
+ logger.Errorw(ctx, "error-fetching-from-kv-store", log.Fields{"err": err, "key": key})
+ return nil
+ }
+ if kvresult != nil {
+ /* The backend returns the value as a string; convert it to []byte before unmarshalling */
+ if value, err := kvstore.ToByte(kvresult.Value); err == nil {
+ lEponTp := &tp_pb.EponTechProfile{}
+ reader := bytes.NewReader(value)
+ if err = jsonpb.Unmarshal(reader, lEponTp); err != nil {
+ logger.Errorw(ctx, "error-unmarshalling-epon-tp-from-kv-store", log.Fields{"err": err, "tpID": tpID, "error": err})
+ return nil
+ }
+
+ logger.Debugw(ctx, "success-fetching-epon-tp-from-kv-store", log.Fields{"tpID": tpID, "value": *lEponTp})
+ return lEponTp
+ }
+ }
+ return nil
+}
+
+func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
+
+ logger.Infow(ctx, "kv-store", log.Fields{"storeType": storeType, "address": address})
+ switch storeType {
+ case "etcd":
+ return kvstore.NewEtcdClient(ctx, address, timeout, log.WarnLevel)
+ }
+ return nil, errors.New("unsupported-kv-store")
+}
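+
+// Minimal usage sketch (the address and timeout below are illustrative, not
+// taken from this change):
+//   kvClient, err := newKVClient(ctx, "etcd", "127.0.0.1:2379", 5*time.Second)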
+
+// buildTpInstanceFromResourceInstance for GPON, XGPON and XGS-PON technology - build TpInstance from TechProfile template and ResourceInstance
+func (t *TechProfileMgr) buildTpInstanceFromResourceInstance(ctx context.Context, tp *tp_pb.TechProfile, resInst *tp_pb.ResourceInstance) *tp_pb.TechProfileInstance {
+
+ var usGemPortAttributeList []*tp_pb.GemPortAttributes
+ var dsGemPortAttributeList []*tp_pb.GemPortAttributes
+ var dsMulticastGemAttributeList []*tp_pb.GemPortAttributes
+ var dsUnicastGemAttributeList []*tp_pb.GemPortAttributes
+
+ if len(resInst.GemportIds) != int(tp.NumGemPorts) {
+ logger.Errorw(ctx, "mismatch-in-number-of-gemports-between-template-and-resource-instance",
+ log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
+ return nil
+ }
+ for index := 0; index < int(tp.NumGemPorts); index++ {
+ usGemPortAttributeList = append(usGemPortAttributeList,
+ &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
+ MaxQSize: tp.UpstreamGemPortAttributeList[index].MaxQSize,
+ PbitMap: tp.UpstreamGemPortAttributeList[index].PbitMap,
+ AesEncryption: tp.UpstreamGemPortAttributeList[index].AesEncryption,
+ SchedulingPolicy: tp.UpstreamGemPortAttributeList[index].SchedulingPolicy,
+ PriorityQ: tp.UpstreamGemPortAttributeList[index].PriorityQ,
+ Weight: tp.UpstreamGemPortAttributeList[index].Weight,
+ DiscardPolicy: tp.UpstreamGemPortAttributeList[index].DiscardPolicy,
+ DiscardConfig: tp.UpstreamGemPortAttributeList[index].DiscardConfig})
+ }
+
+ //put multicast and unicast downstream GEM port attributes in different lists first
+ for index := 0; index < len(tp.DownstreamGemPortAttributeList); index++ {
+ if isMulticastGem(tp.DownstreamGemPortAttributeList[index].IsMulticast) {
+ dsMulticastGemAttributeList = append(dsMulticastGemAttributeList,
+ &tp_pb.GemPortAttributes{
+ MulticastGemId: tp.DownstreamGemPortAttributeList[index].MulticastGemId,
+ MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
+ PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
+ AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
+ SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
+ PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
+ Weight: tp.DownstreamGemPortAttributeList[index].Weight,
+ DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
+ DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig,
+ IsMulticast: tp.DownstreamGemPortAttributeList[index].IsMulticast,
+ DynamicAccessControlList: tp.DownstreamGemPortAttributeList[index].DynamicAccessControlList,
+ StaticAccessControlList: tp.DownstreamGemPortAttributeList[index].StaticAccessControlList})
+ } else {
+ dsUnicastGemAttributeList = append(dsUnicastGemAttributeList,
+ &tp_pb.GemPortAttributes{
+ MaxQSize: tp.DownstreamGemPortAttributeList[index].MaxQSize,
+ PbitMap: tp.DownstreamGemPortAttributeList[index].PbitMap,
+ AesEncryption: tp.DownstreamGemPortAttributeList[index].AesEncryption,
+ SchedulingPolicy: tp.DownstreamGemPortAttributeList[index].SchedulingPolicy,
+ PriorityQ: tp.DownstreamGemPortAttributeList[index].PriorityQ,
+ Weight: tp.DownstreamGemPortAttributeList[index].Weight,
+ DiscardPolicy: tp.DownstreamGemPortAttributeList[index].DiscardPolicy,
+ DiscardConfig: tp.DownstreamGemPortAttributeList[index].DiscardConfig})
+ }
+ }
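+ // Multicast GEMs keep the template's MulticastGemId; only the unicast GEMs
+ // below are assigned per-subscriber GEM port IDs from the resource instance.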
+ //add unicast downstream GEM ports to dsGemPortAttributeList
+ if dsUnicastGemAttributeList != nil {
+ for index := 0; index < int(tp.NumGemPorts); index++ {
+ dsGemPortAttributeList = append(dsGemPortAttributeList,
+ &tp_pb.GemPortAttributes{GemportId: resInst.GemportIds[index],
+ MaxQSize: dsUnicastGemAttributeList[index].MaxQSize,
+ PbitMap: dsUnicastGemAttributeList[index].PbitMap,
+ AesEncryption: dsUnicastGemAttributeList[index].AesEncryption,
+ SchedulingPolicy: dsUnicastGemAttributeList[index].SchedulingPolicy,
+ PriorityQ: dsUnicastGemAttributeList[index].PriorityQ,
+ Weight: dsUnicastGemAttributeList[index].Weight,
+ DiscardPolicy: dsUnicastGemAttributeList[index].DiscardPolicy,
+ DiscardConfig: dsUnicastGemAttributeList[index].DiscardConfig})
+ }
+ }
+ //add multicast GEM ports to dsGemPortAttributeList afterwards
+ dsGemPortAttributeList = append(dsGemPortAttributeList, dsMulticastGemAttributeList...)
+
+ return &tp_pb.TechProfileInstance{
+ SubscriberIdentifier: resInst.SubscriberIdentifier,
+ Name: tp.Name,
+ ProfileType: tp.ProfileType,
+ Version: tp.Version,
+ NumGemPorts: tp.NumGemPorts,
+ InstanceControl: tp.InstanceControl,
+ UsScheduler: &tp_pb.SchedulerAttributes{
+ AllocId: resInst.AllocId,
+ Direction: tp.UsScheduler.Direction,
+ AdditionalBw: tp.UsScheduler.AdditionalBw,
+ Priority: tp.UsScheduler.Priority,
+ Weight: tp.UsScheduler.Weight,
+ QSchedPolicy: tp.UsScheduler.QSchedPolicy},
+ DsScheduler: &tp_pb.SchedulerAttributes{
+ AllocId: resInst.AllocId,
+ Direction: tp.DsScheduler.Direction,
+ AdditionalBw: tp.DsScheduler.AdditionalBw,
+ Priority: tp.DsScheduler.Priority,
+ Weight: tp.DsScheduler.Weight,
+ QSchedPolicy: tp.DsScheduler.QSchedPolicy},
+ UpstreamGemPortAttributeList: usGemPortAttributeList,
+ DownstreamGemPortAttributeList: dsGemPortAttributeList}
+}
+
+// buildEponTpInstanceFromResourceInstance for EPON technology - build EponTpInstance from EponTechProfile template and ResourceInstance
+func (t *TechProfileMgr) buildEponTpInstanceFromResourceInstance(ctx context.Context, tp *tp_pb.EponTechProfile, resInst *tp_pb.ResourceInstance) *tp_pb.EponTechProfileInstance {
+
+ var usQueueAttributeList []*tp_pb.EPONQueueAttributes
+ var dsQueueAttributeList []*tp_pb.EPONQueueAttributes
+
+ if len(resInst.GemportIds) != int(tp.NumGemPorts) {
+ logger.Errorw(ctx, "mismatch-in-number-of-gemports-between-epon-tp-template-and-resource-instance",
+ log.Fields{"tpID": resInst.TpId, "totalResInstGemPortIDs": len(resInst.GemportIds), "totalTpTemplateGemPorts": tp.NumGemPorts})
+ return nil
+ }
+
+ for index := 0; index < int(tp.NumGemPorts); index++ {
+ usQueueAttributeList = append(usQueueAttributeList,
+ &tp_pb.EPONQueueAttributes{GemportId: resInst.GemportIds[index],
+ MaxQSize: tp.UpstreamQueueAttributeList[index].MaxQSize,
+ PbitMap: tp.UpstreamQueueAttributeList[index].PbitMap,
+ AesEncryption: tp.UpstreamQueueAttributeList[index].AesEncryption,
+ TrafficType: tp.UpstreamQueueAttributeList[index].TrafficType,
+ UnsolicitedGrantSize: tp.UpstreamQueueAttributeList[index].UnsolicitedGrantSize,
+ NominalInterval: tp.UpstreamQueueAttributeList[index].NominalInterval,
+ ToleratedPollJitter: tp.UpstreamQueueAttributeList[index].ToleratedPollJitter,
+ RequestTransmissionPolicy: tp.UpstreamQueueAttributeList[index].RequestTransmissionPolicy,
+ NumQSets: tp.UpstreamQueueAttributeList[index].NumQSets,
+ QThresholds: tp.UpstreamQueueAttributeList[index].QThresholds,
+ SchedulingPolicy: tp.UpstreamQueueAttributeList[index].SchedulingPolicy,
+ PriorityQ: tp.UpstreamQueueAttributeList[index].PriorityQ,
+ Weight: tp.UpstreamQueueAttributeList[index].Weight,
+ DiscardPolicy: tp.UpstreamQueueAttributeList[index].DiscardPolicy,
+ DiscardConfig: tp.UpstreamQueueAttributeList[index].DiscardConfig})
+ }
+
+ for index := 0; index < int(tp.NumGemPorts); index++ {
+ dsQueueAttributeList = append(dsQueueAttributeList,
+ &tp_pb.EPONQueueAttributes{GemportId: resInst.GemportIds[index],
+ MaxQSize: tp.DownstreamQueueAttributeList[index].MaxQSize,
+ PbitMap: tp.DownstreamQueueAttributeList[index].PbitMap,
+ AesEncryption: tp.DownstreamQueueAttributeList[index].AesEncryption,
+ SchedulingPolicy: tp.DownstreamQueueAttributeList[index].SchedulingPolicy,
+ PriorityQ: tp.DownstreamQueueAttributeList[index].PriorityQ,
+ Weight: tp.DownstreamQueueAttributeList[index].Weight,
+ DiscardPolicy: tp.DownstreamQueueAttributeList[index].DiscardPolicy,
+ DiscardConfig: tp.DownstreamQueueAttributeList[index].DiscardConfig})
+ }
+
+ return &tp_pb.EponTechProfileInstance{
+ SubscriberIdentifier: resInst.SubscriberIdentifier,
+ Name: tp.Name,
+ ProfileType: tp.ProfileType,
+ Version: tp.Version,
+ NumGemPorts: tp.NumGemPorts,
+ InstanceControl: tp.InstanceControl,
+ PackageType: tp.PackageType,
+ AllocId: resInst.AllocId,
+ UpstreamQueueAttributeList: usQueueAttributeList,
+ DownstreamQueueAttributeList: dsQueueAttributeList}
+}
+
+func (t *TechProfileMgr) getTpInstanceFromResourceInstance(ctx context.Context, resInst *tp_pb.ResourceInstance) *tp_pb.TechProfileInstance {
+ if resInst == nil {
+ logger.Error(ctx, "resource-instance-nil")
+ return nil
+ }
+ tp := t.getTPFromKVStore(ctx, resInst.TpId)
+ if tp == nil {
+ logger.Warnw(ctx, "tp-not-found-on-kv--creating-default-tp", log.Fields{"tpID": resInst.TpId})
+ tp = t.getDefaultTechProfile(ctx)
+ }
+ return t.buildTpInstanceFromResourceInstance(ctx, tp, resInst)
+}
+
+func (t *TechProfileMgr) getEponTpInstanceFromResourceInstance(ctx context.Context, resInst *tp_pb.ResourceInstance) *tp_pb.EponTechProfileInstance {
+ if resInst == nil {
+ logger.Error(ctx, "resource-instance-nil")
+ return nil
+ }
+ eponTp := t.getEponTPFromKVStore(ctx, resInst.TpId)
+ if eponTp == nil {
+ logger.Warnw(ctx, "tp-not-found-on-kv--creating-default-tp", log.Fields{"tpID": resInst.TpId})
+ eponTp = t.getDefaultEponProfile(ctx)
+ }
+ return t.buildEponTpInstanceFromResourceInstance(ctx, eponTp, resInst)
+}
+
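+// reconcileTpInstancesToCache rebuilds the in-memory TP instance cache after an
+// adapter restart: it lists the (smaller) resource instances from the KV store
+// and re-derives each full TP instance from its TP template.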
+func (t *TechProfileMgr) reconcileTpInstancesToCache(ctx context.Context) error {
+
+ tech := t.resourceMgr.GetTechnology()
+ newCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
+ defer cancel()
+ kvPairs, err := t.config.ResourceInstanceKVBacked.List(newCtx, tech)
+ if err != nil {
+ logger.Errorw(ctx, "error-listing-resource-instances-from-kv-store", log.Fields{"err": err, "tech": tech})
+ return err
+ }
+
+ if tech == xgspon || tech == xgpon || tech == gpon {
+ for keyPath, kvPair := range kvPairs {
+ logger.Debugw(ctx, "attempting-to-reconcile-tp-instance-from-resource-instance", log.Fields{"resourceInstPath": keyPath})
+ if value, err := kvstore.ToByte(kvPair.Value); err == nil {
+ var resInst tp_pb.ResourceInstance
+ if err = proto.Unmarshal(value, &resInst); err != nil {
+ logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"err": err, "keyPath": keyPath, "value": value})
+ continue
+ } else {
+ if tpInst := t.getTpInstanceFromResourceInstance(ctx, &resInst); tpInst != nil {
+ // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
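+ // Illustrative example: if the prefix were "service/voltha/resource_instances" (a made-up
+ // value), "service/voltha/resource_instances/XGS-PON/64/olt-{abc}/pon-{0}/onu-{1}/uni-{0}"
+ // would be trimmed to the cache key "XGS-PON/64/olt-{abc}/pon-{0}/onu-{1}/uni-{0}".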
+ keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
+ if len(keySuffixSlice) == 2 {
+ keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
+ // Make sure the key suffix is of the form <technology>/<tpID>/olt-{<oltID>}/pon-{<ponID>}/onu-{<onuID>}/uni-{<uniID>}
+ if !keySuffixFormatRegexp.MatchString(keySuffixSlice[1]) {
+ logger.Errorw(ctx, "kv-path-not-conforming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
+ continue
+ }
+ } else {
+ logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
+ continue
+ }
+ t.tpInstanceMapLock.Lock()
+ t.tpInstanceMap[keySuffixSlice[1]] = tpInst
+ t.tpInstanceMapLock.Unlock()
+ logger.Debugw(ctx, "reconciled-tp-success", log.Fields{"keyPath": keyPath})
+ }
+ }
+ } else {
+ logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
+ }
+ }
+ } else if tech == epon {
+ for keyPath, kvPair := range kvPairs {
+ logger.Debugw(ctx, "attempting-to-reconcile-epon-tp-instance", log.Fields{"keyPath": keyPath})
+ if value, err := kvstore.ToByte(kvPair.Value); err == nil {
+ var resInst tp_pb.ResourceInstance
+ if err = proto.Unmarshal(value, &resInst); err != nil {
+ logger.Errorw(ctx, "error-unmarshal-kv-pair", log.Fields{"keyPath": keyPath, "value": value})
+ continue
+ } else {
+ if eponTpInst := t.getEponTpInstanceFromResourceInstance(ctx, &resInst); eponTpInst != nil {
+ // Trim the kv path by removing the default prefix part and get only the suffix part to reference the internal cache
+ keySuffixSlice := regexp.MustCompile(t.config.ResourceInstanceKVPathPrefix+"/").Split(keyPath, 2)
+ if len(keySuffixSlice) == 2 {
+ keySuffixFormatRegexp := regexp.MustCompile(`^[a-zA-Z\-]+/[0-9]+/olt-{[a-z0-9\-]+}/pon-{[0-9]+}/onu-{[0-9]+}/uni-{[0-9]+}$`)
+ // Make sure the key suffix is of the form <technology>/<tpID>/olt-{<oltID>}/pon-{<ponID>}/onu-{<onuID>}/uni-{<uniID>}
+ if !keySuffixFormatRegexp.MatchString(keySuffixSlice[1]) {
+ logger.Errorw(ctx, "kv-path-not-conforming-to-format", log.Fields{"kvPath": keySuffixSlice[1]})
+ continue
+ }
+ } else {
+ logger.Errorw(ctx, "kv-instance-key-path-not-in-the-expected-format", log.Fields{"kvPath": keyPath})
+ continue
+ }
+ t.epontpInstanceMapLock.Lock()
+ t.eponTpInstanceMap[keySuffixSlice[1]] = eponTpInst
+ t.epontpInstanceMapLock.Unlock()
+ logger.Debugw(ctx, "reconciled-epon-tp-success", log.Fields{"keyPath": keyPath})
+ }
+ }
+ } else {
+ logger.Errorw(ctx, "error-converting-kv-pair-value-to-byte", log.Fields{"err": err})
+ }
+ }
+ } else {
+ logger.Errorw(ctx, "unknown-tech", log.Fields{"tech": tech})
+ return fmt.Errorf("unknown-tech-%v", tech)
+ }
+
return nil
}
diff --git a/pkg/techprofile/tech_profile_if.go b/pkg/techprofile/tech_profile_if.go
index 9aa3cbe..5622345 100644
--- a/pkg/techprofile/tech_profile_if.go
+++ b/pkg/techprofile/tech_profile_if.go
@@ -18,24 +18,21 @@
import (
"context"
-
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
tp_pb "github.com/opencord/voltha-protos/v4/go/tech_profile"
)
type TechProfileIf interface {
SetKVClient(ctx context.Context, pathPrefix string) *db.Backend
- GetTechProfileInstanceKVPath(ctx context.Context, techProfiletblID uint32, uniPortName string) string
- GetTPInstanceFromKVStore(ctx context.Context, techProfiletblID uint32, path string) (interface{}, error)
- CreateTechProfInstance(ctx context.Context, techProfiletblID uint32, uniPortName string, intfId uint32) (interface{}, error)
- DeleteTechProfileInstance(ctx context.Context, techProfiletblID uint32, uniPortName string) error
- GetprotoBufParamValue(ctx context.Context, paramType string, paramKey string) int32
- GetUsScheduler(ctx context.Context, tpInstance *TechProfile) (*tp_pb.SchedulerConfig, error)
- GetDsScheduler(ctx context.Context, tpInstance *TechProfile) (*tp_pb.SchedulerConfig, error)
- GetTrafficScheduler(tpInstance *TechProfile, SchedCfg *tp_pb.SchedulerConfig,
- ShapingCfg *tp_pb.TrafficShapingInfo) *tp_pb.TrafficScheduler
- GetTrafficQueues(ctx context.Context, tp *TechProfile, Dir tp_pb.Direction) ([]*tp_pb.TrafficQueue, error)
- GetMulticastTrafficQueues(ctx context.Context, tp *TechProfile) []*tp_pb.TrafficQueue
+ GetTechProfileInstanceKey(ctx context.Context, tpID uint32, uniPortName string) string
+ GetTPInstance(ctx context.Context, path string) (interface{}, error)
+ CreateTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string, intfID uint32) (interface{}, error)
+ DeleteTechProfileInstance(ctx context.Context, tpID uint32, uniPortName string) error
+ GetUsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig
+ GetDsScheduler(tpInstance *tp_pb.TechProfileInstance) *tp_pb.SchedulerConfig
+ GetTrafficScheduler(tpInstance *tp_pb.TechProfileInstance, SchedCfg *tp_pb.SchedulerConfig, ShapingCfg *tp_pb.TrafficShapingInfo) *tp_pb.TrafficScheduler
+ GetTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance, Dir tp_pb.Direction) ([]*tp_pb.TrafficQueue, error)
+ GetMulticastTrafficQueues(ctx context.Context, tp *tp_pb.TechProfileInstance) []*tp_pb.TrafficQueue
GetGemportForPbit(ctx context.Context, tp interface{}, Dir tp_pb.Direction, pbit uint32) interface{}
FindAllTpInstances(ctx context.Context, oltDeviceID string, tpID uint32, ponIntf uint32, onuID uint32) interface{}
GetResourceID(ctx context.Context, IntfID uint32, ResourceType string, NumIDs uint32) ([]uint32, error)