VOL-4077: Improve storage usage on etcd
- Do away with unnecessary data storage on etcd if it can be
reconciled on adapter restart
- For data that needs storage, use a smaller footprint if possible
- Use a write-through cache for all data stored on etcd via
the resource manager module
- Use a ResourceManager module per interface to localize lock
contention per PON port
Change-Id: I21d38216fab195d738a446b3f96a00251569e38b
diff --git a/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/adapter_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/adapter_proxy.go
new file mode 100644
index 0000000..fc31041
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v5/pkg/adapters/common/adapter_proxy.go
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "context"
+ "github.com/opencord/voltha-lib-go/v5/pkg/db"
+ "google.golang.org/grpc/status"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+)
+// AdapterProxy issues RPCs to other adapters through a kafka.InterContainerProxy.
+type AdapterProxy struct {
+ kafkaICProxy kafka.InterContainerProxy // proxy on which the methods call InvokeRPC
+ coreTopic string // stored at construction; logged by NewAdapterProxy
+ endpointMgr kafka.EndpointManager // resolves the endpoint for a device/adapter pair
+}
+
+// NewAdapterProxy returns an AdapterProxy that invokes RPCs through kafkaProxy and
+// resolves endpoints with an endpoint manager created from backend.
+func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, coreTopic string, backend *db.Backend) *AdapterProxy {
+	ap := &AdapterProxy{kafkaICProxy: kafkaProxy, coreTopic: coreTopic}
+	ap.endpointMgr = kafka.NewEndpointManager(backend)
+
+	logger.Debugw(ctx, "topics", log.Fields{"core": ap.coreTopic})
+	return ap
+}
+
+// SendInterAdapterMessage wraps msg in an InterAdapterMessage and issues the
+// "process_inter_adapter_message" RPC towards the endpoint resolved for
+// toDeviceId/toAdapter, with fromAdapter as FromTopic and reply topic. An empty
+// messageId is replaced by a new UUID. Marshal and endpoint-lookup errors are
+// returned as-is; otherwise unPackResponse turns the RPC outcome into the result.
+func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context, msg proto.Message,
+	msgType ic.InterAdapterMessageType_Types, fromAdapter string, toAdapter string,
+	toDeviceId string, proxyDeviceId string, messageId string) error {
+	logger.Debugw(ctx, "sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
+		"to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
+
+	// Serialize the payload; a message that cannot be marshalled is rejected here.
+	var (
+		payload *any.Any
+		err     error
+	)
+	if payload, err = ptypes.MarshalAny(msg); err != nil {
+		logger.Warnw(ctx, "cannot-marshal-msg", log.Fields{"error": err})
+		return err
+	}
+
+	// Look up the destination; it serves as both the header's ToTopic and the RPC topic.
+	endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
+	if err != nil {
+		return err
+	}
+	destination := string(endpoint)
+
+	// Assemble the header, falling back to a new UUID when no id was supplied.
+	header := &ic.InterAdapterHeader{
+		Id:            messageId,
+		Type:          msgType,
+		FromTopic:     fromAdapter,
+		ToTopic:       destination,
+		ToDeviceId:    toDeviceId,
+		ProxyDeviceId: proxyDeviceId,
+	}
+	if header.Id == "" {
+		header.Id = uuid.New().String()
+	}
+	header.Timestamp = ptypes.TimestampNow()
+
+	// The message travels as the single "msg" argument of the RPC.
+	args := []*kafka.KVArg{{
+		Key:   "msg",
+		Value: &ic.InterAdapterMessage{Header: header, Body: payload},
+	}}
+
+	rpc := "process_inter_adapter_message"
+	topic := kafka.Topic{Name: destination}
+	replyTo := kafka.Topic{Name: fromAdapter}
+
+	// Mark the context so the Kafka IC proxy can tell this inter-adapter
+	// message apart during span processing.
+	ctx = context.WithValue(ctx, "inter-adapter-msg-type", msgType)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyTo, true, proxyDeviceId, args...)
+	logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyTo, "success": success})
+	return unPackResponse(ctx, rpc, "", success, result)
+}
+
+func (ap *AdapterProxy) TechProfileInstanceRequest(ctx context.Context,
+ tpPath string,
+ parentPonPort uint32,
+ onuID uint32,
+ uniID uint32,
+ fromAdapter string,
+ toAdapter string,
+ toDeviceId string,
+ proxyDeviceId string) (*ic.InterAdapterTechProfileDownloadMessage, error) {
+ logger.Debugw(ctx, "sending-tech-profile-instance-request-message", log.Fields{"from": fromAdapter,
+ "to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
+
+ // Set up the required rpc arguments
+ endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
+ if err != nil {
+ return nil, err
+ }
+
+ //Build the inter adapter message
+ tpReqMsg := &ic.InterAdapterTechProfileInstanceRequestMessage{
+ TpInstancePath: tpPath,
+ ParentDeviceId: toDeviceId,
+ ParentPonPort: parentPonPort,
+ OnuId: onuID,
+ UniId: uniID,
+ }
+
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "msg",
+ Value: tpReqMsg,
+ }
+
+ topic := kafka.Topic{Name: string(endpoint)}
+ replyToTopic := kafka.Topic{Name: fromAdapter}
+ rpc := "process_tech_profile_instance_request"
+
+ ctx = context.WithValue(ctx, "inter-adapter-tp-req-msg", tpPath)
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
+ logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+ if success {
+ tpDwnldMsg := &ic.InterAdapterTechProfileDownloadMessage{}
+ if err := ptypes.UnmarshalAny(result, tpDwnldMsg); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, err
+ }
+ return tpDwnldMsg, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "TechProfileInstanceRequest-return", log.Fields{"tpPath": tpPath, "success": success, "error": err})
+
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+ }
+}