VOL-3677 - move to v4 of protos and lib
Change-Id: Ie9d215f69b798dbf2b879d8e7d041c0d671f84c0
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/adapters/common/core_proxy.go
new file mode 100644
index 0000000..1077226
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/adapters/common/core_proxy.go
@@ -0,0 +1,689 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "context"
+ "sync"
+
+ "github.com/golang/protobuf/ptypes"
+ a "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+ "github.com/opencord/voltha-protos/v4/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type CoreProxy struct {
+ kafkaICProxy kafka.InterContainerProxy
+ adapterTopic string
+ coreTopic string
+ deviceIdCoreMap map[string]string
+ lockDeviceIdCoreMap sync.RWMutex
+}
+
+func NewCoreProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
+ var proxy CoreProxy
+ proxy.kafkaICProxy = kafkaProxy
+ proxy.adapterTopic = adapterTopic
+ proxy.coreTopic = coreTopic
+ proxy.deviceIdCoreMap = make(map[string]string)
+ proxy.lockDeviceIdCoreMap = sync.RWMutex{}
+ logger.Debugw(ctx, "TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+
+ return &proxy
+}
+
+func unPackResponse(ctx context.Context, rpc string, deviceId string, success bool, response *a.Any) error {
+ if success {
+ return nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "response", log.Fields{"rpc": rpc, "device-id": deviceId, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+ }
+}
+
+// UpdateCoreReference adds or update a core reference (really the topic name) for a given device Id
+func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
+ ap.lockDeviceIdCoreMap.Lock()
+ defer ap.lockDeviceIdCoreMap.Unlock()
+ ap.deviceIdCoreMap[deviceId] = coreReference
+}
+
+// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
+func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
+ ap.lockDeviceIdCoreMap.Lock()
+ defer ap.lockDeviceIdCoreMap.Unlock()
+ delete(ap.deviceIdCoreMap, deviceId)
+}
+
+func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
+ ap.lockDeviceIdCoreMap.Lock()
+ defer ap.lockDeviceIdCoreMap.Unlock()
+
+ if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
+ return kafka.Topic{Name: t}
+ }
+
+ return kafka.Topic{Name: ap.coreTopic}
+}
+
+func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
+ return kafka.Topic{Name: ap.adapterTopic}
+}
+
+func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
+ logger.Debugw(ctx, "registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
+ rpc := "Register"
+ topic := kafka.Topic{Name: ap.coreTopic}
+ replyToTopic := ap.getAdapterTopic()
+ args := make([]*kafka.KVArg, 2)
+
+ if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
+ logger.Fatal(ctx, "totalReplicas can't be 0, since you're here you have at least one")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
+ logger.Fatal(ctx, "currentReplica can't be 0, it has to start from 1")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
+ // if the adapter is not setting these fields they default to 0,
+ // in that case it means the adapter is not ready to be scaled and thus it defaults
+ // to a single instance
+ adapter.CurrentReplica = 1
+ adapter.TotalReplicas = 1
+ }
+
+ if adapter.CurrentReplica > adapter.TotalReplicas {
+ logger.Fatalf(ctx, "CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+ adapter.CurrentReplica, adapter.TotalReplicas)
+ }
+
+ args[0] = &kafka.KVArg{
+ Key: "adapter",
+ Value: adapter,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "deviceTypes",
+ Value: deviceTypes,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
+ logger.Debugw(ctx, "Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+ return unPackResponse(ctx, rpc, "", success, result)
+}
+
+func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
+ logger.Debugw(ctx, "DeviceUpdate", log.Fields{"device-id": device.Id})
+ rpc := "DeviceUpdate"
+ toTopic := ap.getCoreTopic(device.Id)
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ logger.Debugw(ctx, "DeviceUpdate-response", log.Fields{"device-id": device.Id, "success": success})
+ return unPackResponse(ctx, rpc, device.Id, success, result)
+}
+
+func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
+ logger.Debugw(ctx, "PortCreated", log.Fields{"portNo": port.PortNo})
+ rpc := "PortCreated"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ args := make([]*kafka.KVArg, 2)
+ id := &voltha.ID{Id: deviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "port",
+ Value: port,
+ }
+
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw(ctx, "PortCreated-response", log.Fields{"device-id": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
+ logger.Debugw(ctx, "PortsStateUpdate", log.Fields{"device-id": deviceId})
+ rpc := "PortsStateUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ args := []*kafka.KVArg{{
+ Key: "device_id",
+ Value: &voltha.ID{Id: deviceId},
+ }, {
+ Key: "port_type_filter",
+ Value: &ic.IntType{Val: int64(portTypeFilter)},
+ }, {
+ Key: "oper_status",
+ Value: &ic.IntType{Val: int64(operStatus)},
+ }}
+
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw(ctx, "PortsStateUpdate-response", log.Fields{"device-id": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) DeleteAllPorts(ctx context.Context, deviceId string) error {
+ logger.Debugw(ctx, "DeleteAllPorts", log.Fields{"device-id": deviceId})
+ rpc := "DeleteAllPorts"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ args := make([]*kafka.KVArg, 2)
+ id := &voltha.ID{Id: deviceId}
+
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw(ctx, "DeleteAllPorts-response", log.Fields{"device-id": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) GetDevicePort(ctx context.Context, deviceID string, portNo uint32) (*voltha.Port, error) {
+ logger.Debugw(ctx, "GetDevicePort", log.Fields{"device-id": deviceID})
+ rpc := "GetDevicePort"
+
+ toTopic := ap.getCoreTopic(deviceID)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := []*kafka.KVArg{{
+ Key: "device_id",
+ Value: &voltha.ID{Id: deviceID},
+ }, {
+ Key: "port_no",
+ Value: &ic.IntType{Val: int64(portNo)},
+ }}
+
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+ logger.Debugw(ctx, "GetDevicePort-response", log.Fields{"device-id": deviceID, "success": success})
+
+ if success {
+ port := &voltha.Port{}
+ if err := ptypes.UnmarshalAny(result, port); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return port, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "GetDevicePort-return", log.Fields{"device-id": deviceID, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+ }
+}
+
+func (ap *CoreProxy) ListDevicePorts(ctx context.Context, deviceID string) ([]*voltha.Port, error) {
+ logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": deviceID})
+ rpc := "ListDevicePorts"
+
+ toTopic := ap.getCoreTopic(deviceID)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := []*kafka.KVArg{{
+ Key: "device_id",
+ Value: &voltha.ID{Id: deviceID},
+ }}
+
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+ logger.Debugw(ctx, "ListDevicePorts-response", log.Fields{"device-id": deviceID, "success": success})
+
+ if success {
+ ports := &voltha.Ports{}
+ if err := ptypes.UnmarshalAny(result, ports); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return ports.Items, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "ListDevicePorts-return", log.Fields{"device-id": deviceID, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+ }
+}
+
+func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
+ connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error {
+ logger.Debugw(ctx, "DeviceStateUpdate", log.Fields{"device-id": deviceId})
+ rpc := "DeviceStateUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ args := make([]*kafka.KVArg, 3)
+ id := &voltha.ID{Id: deviceId}
+ oStatus := &ic.IntType{Val: int64(operStatus)}
+ cStatus := &ic.IntType{Val: int64(connStatus)}
+
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "oper_status",
+ Value: oStatus,
+ }
+ args[2] = &kafka.KVArg{
+ Key: "connect_status",
+ Value: cStatus,
+ }
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw(ctx, "DeviceStateUpdate-response", log.Fields{"device-id": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
+ childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
+ logger.Debugw(ctx, "ChildDeviceDetected", log.Fields{"parent-device-id": parentDeviceId, "channelId": channelId})
+ rpc := "ChildDeviceDetected"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 7)
+ id := &voltha.ID{Id: parentDeviceId}
+ args[0] = &kafka.KVArg{
+ Key: "parent_device_id",
+ Value: id,
+ }
+ ppn := &ic.IntType{Val: int64(parentPortNo)}
+ args[1] = &kafka.KVArg{
+ Key: "parent_port_no",
+ Value: ppn,
+ }
+ cdt := &ic.StrType{Val: childDeviceType}
+ args[2] = &kafka.KVArg{
+ Key: "child_device_type",
+ Value: cdt,
+ }
+ channel := &ic.IntType{Val: int64(channelId)}
+ args[3] = &kafka.KVArg{
+ Key: "channel_id",
+ Value: channel,
+ }
+ vId := &ic.StrType{Val: vendorId}
+ args[4] = &kafka.KVArg{
+ Key: "vendor_id",
+ Value: vId,
+ }
+ sNo := &ic.StrType{Val: serialNumber}
+ args[5] = &kafka.KVArg{
+ Key: "serial_number",
+ Value: sNo,
+ }
+ oId := &ic.IntType{Val: int64(onuId)}
+ args[6] = &kafka.KVArg{
+ Key: "onu_id",
+ Value: oId,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw(ctx, "ChildDeviceDetected-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
+
+ if success {
+ volthaDevice := &voltha.Device{}
+ if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return volthaDevice, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "ChildDeviceDetected-return", log.Fields{"device-id": parentDeviceId, "success": success, "error": err})
+
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+ }
+
+}
+
+func (ap *CoreProxy) ChildDevicesLost(ctx context.Context, parentDeviceId string) error {
+ logger.Debugw(ctx, "ChildDevicesLost", log.Fields{"parent-device-id": parentDeviceId})
+ rpc := "ChildDevicesLost"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ id := &voltha.ID{Id: parentDeviceId}
+ args[0] = &kafka.KVArg{
+ Key: "parent_device_id",
+ Value: id,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw(ctx, "ChildDevicesLost-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
+ return unPackResponse(ctx, rpc, parentDeviceId, success, result)
+}
+
+func (ap *CoreProxy) ChildDevicesDetected(ctx context.Context, parentDeviceId string) error {
+ logger.Debugw(ctx, "ChildDevicesDetected", log.Fields{"parent-device-id": parentDeviceId})
+ rpc := "ChildDevicesDetected"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ id := &voltha.ID{Id: parentDeviceId}
+ args[0] = &kafka.KVArg{
+ Key: "parent_device_id",
+ Value: id,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw(ctx, "ChildDevicesDetected-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
+ return unPackResponse(ctx, rpc, parentDeviceId, success, result)
+}
+
+func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
+ logger.Debugw(ctx, "GetDevice", log.Fields{"device-id": deviceId})
+ rpc := "GetDevice"
+
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ id := &voltha.ID{Id: deviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw(ctx, "GetDevice-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
+
+ if success {
+ volthaDevice := &voltha.Device{}
+ if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return volthaDevice, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "GetDevice-return", log.Fields{"parent-device-id": parentDeviceId, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+ }
+}
+
+func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
+ logger.Debugw(ctx, "GetChildDevice", log.Fields{"parent-device-id": parentDeviceId, "kwargs": kwargs})
+ rpc := "GetChildDevice"
+
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 4)
+ id := &voltha.ID{Id: parentDeviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+
+ var cnt uint8 = 0
+ for k, v := range kwargs {
+ cnt += 1
+ if k == "serial_number" {
+ val := &ic.StrType{Val: v.(string)}
+ args[cnt] = &kafka.KVArg{
+ Key: k,
+ Value: val,
+ }
+ } else if k == "onu_id" {
+ val := &ic.IntType{Val: int64(v.(uint32))}
+ args[cnt] = &kafka.KVArg{
+ Key: k,
+ Value: val,
+ }
+ } else if k == "parent_port_no" {
+ val := &ic.IntType{Val: int64(v.(uint32))}
+ args[cnt] = &kafka.KVArg{
+ Key: k,
+ Value: val,
+ }
+ }
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw(ctx, "GetChildDevice-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
+
+ if success {
+ volthaDevice := &voltha.Device{}
+ if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return volthaDevice, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "GetChildDevice-return", log.Fields{"parent-device-id": parentDeviceId, "success": success, "error": err})
+
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+ }
+}
+
+func (ap *CoreProxy) GetChildDevices(ctx context.Context, parentDeviceId string) (*voltha.Devices, error) {
+ logger.Debugw(ctx, "GetChildDevices", log.Fields{"parent-device-id": parentDeviceId})
+ rpc := "GetChildDevices"
+
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ id := &voltha.ID{Id: parentDeviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw(ctx, "GetChildDevices-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
+
+ if success {
+ volthaDevices := &voltha.Devices{}
+ if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return volthaDevices, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "GetChildDevices-return", log.Fields{"parent-device-id": parentDeviceId, "success": success, "error": err})
+
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+ }
+}
+
+func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
+ logger.Debugw(ctx, "SendPacketIn", log.Fields{"device-id": deviceId, "port": port, "pktPayload": pktPayload})
+ rpc := "PacketIn"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 3)
+ id := &voltha.ID{Id: deviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ portNo := &ic.IntType{Val: int64(port)}
+ args[1] = &kafka.KVArg{
+ Key: "port",
+ Value: portNo,
+ }
+ pkt := &ic.Packet{Payload: pktPayload}
+ args[2] = &kafka.KVArg{
+ Key: "packet",
+ Value: pkt,
+ }
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw(ctx, "SendPacketIn-response", log.Fields{"device-id": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) DeviceReasonUpdate(ctx context.Context, deviceId string, deviceReason string) error {
+ logger.Debugw(ctx, "DeviceReasonUpdate", log.Fields{"device-id": deviceId, "deviceReason": deviceReason})
+ rpc := "DeviceReasonUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 2)
+ id := &voltha.ID{Id: deviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ reason := &ic.StrType{Val: deviceReason}
+ args[1] = &kafka.KVArg{
+ Key: "device_reason",
+ Value: reason,
+ }
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw(ctx, "DeviceReason-response", log.Fields{"device-id": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+ logger.Debugw(ctx, "DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
+ rpc := "DevicePMConfigUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(pmConfigs.Id)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device_pm_config",
+ Value: pmConfigs,
+ }
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
+ logger.Debugw(ctx, "DevicePMConfigUpdate-response", log.Fields{"pmconfig-device-id": pmConfigs.Id, "success": success})
+ return unPackResponse(ctx, rpc, pmConfigs.Id, success, result)
+}
+
+func (ap *CoreProxy) ReconcileChildDevices(ctx context.Context, parentDeviceId string) error {
+ logger.Debugw(ctx, "ReconcileChildDevices", log.Fields{"parent-device-id": parentDeviceId})
+ rpc := "ReconcileChildDevices"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := []*kafka.KVArg{
+ {Key: "parent_device_id", Value: &voltha.ID{Id: parentDeviceId}},
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw(ctx, "ReconcileChildDevices-response", log.Fields{"parent-device-id": parentDeviceId, "success": success})
+ return unPackResponse(ctx, rpc, parentDeviceId, success, result)
+}
+
+func (ap *CoreProxy) PortStateUpdate(ctx context.Context, deviceId string, pType voltha.Port_PortType, portNum uint32,
+ operStatus voltha.OperStatus_Types) error {
+ logger.Debugw(ctx, "PortStateUpdate", log.Fields{"device-id": deviceId, "portType": pType, "portNo": portNum, "operation_status": operStatus})
+ rpc := "PortStateUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ args := make([]*kafka.KVArg, 4)
+ deviceID := &voltha.ID{Id: deviceId}
+ portNo := &ic.IntType{Val: int64(portNum)}
+ portType := &ic.IntType{Val: int64(pType)}
+ oStatus := &ic.IntType{Val: int64(operStatus)}
+
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: deviceID,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "oper_status",
+ Value: oStatus,
+ }
+ args[2] = &kafka.KVArg{
+ Key: "port_type",
+ Value: portType,
+ }
+ args[3] = &kafka.KVArg{
+ Key: "port_no",
+ Value: portNo,
+ }
+
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw(ctx, "PortStateUpdate-response", log.Fields{"device-id": deviceId, "success": success})
+ return unPackResponse(ctx, rpc, deviceId, success, result)
+}