blob: fc310412f2fda47dc756cebecb5cf957bf4b26ab [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package common
import (
"context"
"github.com/opencord/voltha-lib-go/v5/pkg/db"
"google.golang.org/grpc/status"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/google/uuid"
"github.com/opencord/voltha-lib-go/v5/pkg/kafka"
"github.com/opencord/voltha-lib-go/v5/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
)
type AdapterProxy struct {
kafkaICProxy kafka.InterContainerProxy
coreTopic string
endpointMgr kafka.EndpointManager
}
func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, coreTopic string, backend *db.Backend) *AdapterProxy {
proxy := AdapterProxy{
kafkaICProxy: kafkaProxy,
coreTopic: coreTopic,
endpointMgr: kafka.NewEndpointManager(backend),
}
logger.Debugw(ctx, "topics", log.Fields{"core": proxy.coreTopic})
return &proxy
}
func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context,
msg proto.Message,
msgType ic.InterAdapterMessageType_Types,
fromAdapter string,
toAdapter string,
toDeviceId string,
proxyDeviceId string,
messageId string) error {
logger.Debugw(ctx, "sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
"to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
//Marshal the message
var marshalledMsg *any.Any
var err error
if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
logger.Warnw(ctx, "cannot-marshal-msg", log.Fields{"error": err})
return err
}
// Set up the required rpc arguments
endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
if err != nil {
return err
}
//Build the inter adapter message
header := &ic.InterAdapterHeader{
Type: msgType,
FromTopic: fromAdapter,
ToTopic: string(endpoint),
ToDeviceId: toDeviceId,
ProxyDeviceId: proxyDeviceId,
}
if messageId != "" {
header.Id = messageId
} else {
header.Id = uuid.New().String()
}
header.Timestamp = ptypes.TimestampNow()
iaMsg := &ic.InterAdapterMessage{
Header: header,
Body: marshalledMsg,
}
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "msg",
Value: iaMsg,
}
topic := kafka.Topic{Name: string(endpoint)}
replyToTopic := kafka.Topic{Name: fromAdapter}
rpc := "process_inter_adapter_message"
// Add a indication in context to differentiate this Inter Adapter message during Span processing in Kafka IC proxy
ctx = context.WithValue(ctx, "inter-adapter-msg-type", msgType)
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
return unPackResponse(ctx, rpc, "", success, result)
}
func (ap *AdapterProxy) TechProfileInstanceRequest(ctx context.Context,
tpPath string,
parentPonPort uint32,
onuID uint32,
uniID uint32,
fromAdapter string,
toAdapter string,
toDeviceId string,
proxyDeviceId string) (*ic.InterAdapterTechProfileDownloadMessage, error) {
logger.Debugw(ctx, "sending-tech-profile-instance-request-message", log.Fields{"from": fromAdapter,
"to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
// Set up the required rpc arguments
endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
if err != nil {
return nil, err
}
//Build the inter adapter message
tpReqMsg := &ic.InterAdapterTechProfileInstanceRequestMessage{
TpInstancePath: tpPath,
ParentDeviceId: toDeviceId,
ParentPonPort: parentPonPort,
OnuId: onuID,
UniId: uniID,
}
args := make([]*kafka.KVArg, 1)
args[0] = &kafka.KVArg{
Key: "msg",
Value: tpReqMsg,
}
topic := kafka.Topic{Name: string(endpoint)}
replyToTopic := kafka.Topic{Name: fromAdapter}
rpc := "process_tech_profile_instance_request"
ctx = context.WithValue(ctx, "inter-adapter-tp-req-msg", tpPath)
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
if success {
tpDwnldMsg := &ic.InterAdapterTechProfileDownloadMessage{}
if err := ptypes.UnmarshalAny(result, tpDwnldMsg); err != nil {
logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
return nil, err
}
return tpDwnldMsg, nil
} else {
unpackResult := &ic.Error{}
var err error
if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
logger.Debugw(ctx, "TechProfileInstanceRequest-return", log.Fields{"tpPath": tpPath, "success": success, "error": err})
return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
}
}