VOL-4154: Changes to techprofile module for etcd storage improvements.
- using protobuf definitions of techprofile template and instance
- store smaller footprint resource instance on the kv store
- store techprofile instance in cache
- reconcile techprofile instance from resource instance on adapter restart
- retry etcd get/put/delete on failure
- remove dealing of onu-gem-info data from PONResourceManager module
as adapter has to deal with this.
Change-Id: I741181e3f0dc5c4a419ffbed577eb4d21b73c4d6
diff --git a/pkg/adapters/common/adapter_proxy.go b/pkg/adapters/common/adapter_proxy.go
index 9ade0d1..fc31041 100644
--- a/pkg/adapters/common/adapter_proxy.go
+++ b/pkg/adapters/common/adapter_proxy.go
@@ -17,14 +17,15 @@
import (
"context"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "google.golang.org/grpc/status"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
)
@@ -103,3 +104,62 @@
logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
return unPackResponse(ctx, rpc, "", success, result)
}
+
+func (ap *AdapterProxy) TechProfileInstanceRequest(ctx context.Context,
+ tpPath string,
+ parentPonPort uint32,
+ onuID uint32,
+ uniID uint32,
+ fromAdapter string,
+ toAdapter string,
+ toDeviceId string,
+ proxyDeviceId string) (*ic.InterAdapterTechProfileDownloadMessage, error) {
+ logger.Debugw(ctx, "sending-tech-profile-instance-request-message", log.Fields{"from": fromAdapter,
+ "to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
+
+ // Set up the required rpc arguments
+ endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
+ if err != nil {
+ return nil, err
+ }
+
+ //Build the inter adapter message
+ tpReqMsg := &ic.InterAdapterTechProfileInstanceRequestMessage{
+ TpInstancePath: tpPath,
+ ParentDeviceId: toDeviceId,
+ ParentPonPort: parentPonPort,
+ OnuId: onuID,
+ UniId: uniID,
+ }
+
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "msg",
+ Value: tpReqMsg,
+ }
+
+ topic := kafka.Topic{Name: string(endpoint)}
+ replyToTopic := kafka.Topic{Name: fromAdapter}
+ rpc := "process_tech_profile_instance_request"
+
+ ctx = context.WithValue(ctx, "inter-adapter-tp-req-msg", tpPath)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
+ logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+ if success {
+ tpDwnldMsg := &ic.InterAdapterTechProfileDownloadMessage{}
+ if err := ptypes.UnmarshalAny(result, tpDwnldMsg); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, err
+ }
+ return tpDwnldMsg, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "TechProfileInstanceRequest-return", log.Fields{"tpPath": tpPath, "success": success, "error": err})
+
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+ }
+}
diff --git a/pkg/adapters/common/adapter_proxy_test.go b/pkg/adapters/common/adapter_proxy_test.go
index c73cb0f..2ca87a5 100644
--- a/pkg/adapters/common/adapter_proxy_test.go
+++ b/pkg/adapters/common/adapter_proxy_test.go
@@ -18,9 +18,9 @@
import (
"context"
- "github.com/opencord/voltha-lib-go/v4/pkg/db"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- mocks "github.com/opencord/voltha-lib-go/v4/pkg/mocks/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ mocks "github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/opencord/voltha-protos/v4/go/voltha"
"github.com/phayes/freeport"
@@ -75,7 +75,7 @@
adapter.endpointMgr = mocks.NewEndpointManager()
- delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpPath: "tpPath", GemPortId: 2}
+ delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpInstancePath: "tpPath", GemPortId: 2}
err := adapter.SendInterAdapterMessage(context.TODO(), delGemPortMsg, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "testMessage")
@@ -119,7 +119,7 @@
adapter.endpointMgr = mocks.NewEndpointManager()
- delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpPath: "tpPath", GemPortId: 2}
+ delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: 1, TpInstancePath: "tpPath", GemPortId: 2}
err := adapter.SendInterAdapterMessage(context.TODO(), delGemPortMsg, ic.InterAdapterMessageType_DELETE_GEM_PORT_REQUEST, "Adapter1", "Adapter2", "testDeviceId", "testProxyDeviceId", "")
call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
diff --git a/pkg/adapters/common/common.go b/pkg/adapters/common/common.go
index 5d7d7f8..98085bb 100644
--- a/pkg/adapters/common/common.go
+++ b/pkg/adapters/common/common.go
@@ -16,7 +16,7 @@
package common
import (
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
)
var logger log.CLogger
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index 1077226..589d951 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -21,8 +21,8 @@
"github.com/golang/protobuf/ptypes"
a "github.com/golang/protobuf/ptypes/any"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/opencord/voltha-protos/v4/go/voltha"
"google.golang.org/grpc/codes"
diff --git a/pkg/adapters/common/core_proxy_test.go b/pkg/adapters/common/core_proxy_test.go
index 97a88f5..890b885 100644
--- a/pkg/adapters/common/core_proxy_test.go
+++ b/pkg/adapters/common/core_proxy_test.go
@@ -19,9 +19,9 @@
"context"
"testing"
- adapterIf "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- mocks "github.com/opencord/voltha-lib-go/v4/pkg/mocks/kafka"
+ adapterIf "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ mocks "github.com/opencord/voltha-lib-go/v5/pkg/mocks/kafka"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/opencord/voltha-protos/v4/go/voltha"
"github.com/stretchr/testify/assert"
diff --git a/pkg/adapters/common/request_handler.go b/pkg/adapters/common/request_handler.go
index b6cf1c0..90f575b 100644
--- a/pkg/adapters/common/request_handler.go
+++ b/pkg/adapters/common/request_handler.go
@@ -21,10 +21,10 @@
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
- "github.com/opencord/voltha-lib-go/v4/pkg/adapters"
- "github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/adapters"
+ "github.com/opencord/voltha-lib-go/v5/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
"github.com/opencord/voltha-protos/v4/go/extension"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"github.com/opencord/voltha-protos/v4/go/openflow_13"
@@ -559,6 +559,37 @@
return new(empty.Empty), nil
}
+func (rhp *RequestHandlerProxy) Process_tech_profile_instance_request(ctx context.Context, args []*ic.Argument) (*ic.InterAdapterTechProfileDownloadMessage, error) {
+ if len(args) < 2 {
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ iaTpReqMsg := &ic.InterAdapterTechProfileInstanceRequestMessage{}
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "msg":
+ if err := ptypes.UnmarshalAny(arg.Value, iaTpReqMsg); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+
+ logger.Debugw(ctx, "Process_tech_profile_instance_request", log.Fields{"tpPath": iaTpReqMsg.TpInstancePath})
+
+ //Invoke the tech profile instance request
+ tpInst := rhp.adapter.Process_tech_profile_instance_request(ctx, iaTpReqMsg)
+
+ return tpInst, nil
+}
+
func (rhp *RequestHandlerProxy) Download_image(ctx context.Context, args []*ic.Argument) (*voltha.ImageDownload, error) {
device, image, err := unMarshalImageDowload(args, ctx)
if err != nil {
diff --git a/pkg/adapters/common/utils.go b/pkg/adapters/common/utils.go
index 65b432c..35f227e 100644
--- a/pkg/adapters/common/utils.go
+++ b/pkg/adapters/common/utils.go
@@ -18,7 +18,7 @@
import (
"context"
"fmt"
- "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
"google.golang.org/grpc/codes"
"math/rand"