[VOL-1349] EPON OLT adapter (package B)
Change-Id: I634ef62c53813dcf4456f54948f13e06358e263c
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/adapter_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/adapter_proxy_if.go
new file mode 100644
index 0000000..de5cfc0
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/adapter_proxy_if.go
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package adapterif
+
+import (
+ "context"
+
+ "github.com/golang/protobuf/proto"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+)
+
+// AdapterProxy is the contract implemented by adapter proxies. It exposes the
+// single operation used to hand a message from one adapter to another.
+type AdapterProxy interface {
+	// SendInterAdapterMessage sends msg, tagged with msgType, from fromAdapter to
+	// toAdapter for device toDeviceID. proxyDeviceID and messageID are forwarded to
+	// the implementation, which defines their use; a non-nil error reports failure.
+	SendInterAdapterMessage(ctx context.Context, msg proto.Message,
+		msgType ic.InterAdapterMessageType_Types,
+		fromAdapter string, toAdapter string, toDeviceID string,
+		proxyDeviceID string, messageID string) error
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/core_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/core_proxy_if.go
new file mode 100644
index 0000000..a7ab6dc
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/core_proxy_if.go
@@ -0,0 +1,52 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package adapterif
+
+import (
+ "context"
+
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+)
+
+// CoreProxy is the interface for the voltha-go core proxy; methods are grouped by concern.
+type CoreProxy interface {
+	// Core reference bookkeeping and adapter registration.
+	UpdateCoreReference(deviceID string, coreReference string)
+	DeleteCoreReference(deviceID string)
+	RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error
+	// Device updates and lookup.
+	DeviceUpdate(ctx context.Context, device *voltha.Device) error
+	DeviceStateUpdate(ctx context.Context, deviceID string, connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error
+	DeviceReasonUpdate(ctx context.Context, deviceID string, deviceReason string) error
+	DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error
+	GetDevice(ctx context.Context, parentDeviceID string, deviceID string) (*voltha.Device, error)
+	// Port management.
+	PortCreated(ctx context.Context, deviceID string, port *voltha.Port) error
+	PortStateUpdate(ctx context.Context, deviceID string, pType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error
+	PortsStateUpdate(ctx context.Context, deviceID string, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error
+	DeleteAllPorts(ctx context.Context, deviceID string) error
+	GetDevicePort(ctx context.Context, deviceID string, portNo uint32) (*voltha.Port, error)
+	ListDevicePorts(ctx context.Context, deviceID string) ([]*voltha.Port, error)
+	// Child devices.
+	ChildDeviceDetected(ctx context.Context, parentDeviceID string, parentPortNo int, childDeviceType string, channelID int, vendorID string, serialNumber string, onuID int64) (*voltha.Device, error)
+	ChildDevicesLost(ctx context.Context, parentDeviceID string) error
+	ChildDevicesDetected(ctx context.Context, parentDeviceID string) error
+	GetChildDevice(ctx context.Context, parentDeviceID string, kwargs map[string]interface{}) (*voltha.Device, error)
+	GetChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error)
+	// Packets.
+	SendPacketIn(ctx context.Context, deviceID string, port uint32, pktPayload []byte) error
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/events_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/events_proxy_if.go
new file mode 100644
index 0000000..dbd8140
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/events_proxy_if.go
@@ -0,0 +1,40 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package adapterif
+
+import (
+ "context"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+)
+
+// EventProxy is the contract for submitting device events and KPI events.
+type EventProxy interface {
+	// SendDeviceEvent submits deviceEvent with the given category, sub-category and raisedTs value.
+	SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category EventCategory, subCategory EventSubCategory, raisedTs int64) error
+	// SendKpiEvent submits the KPI event deviceEvent under id, with the given category, sub-category and raisedTs value.
+	SendKpiEvent(ctx context.Context, id string, deviceEvent *voltha.KpiEvent2, category EventCategory, subCategory EventSubCategory, raisedTs int64) error
+}
+
+// EventTypeVersion is the event type version string ("0.1"); presumably stamped on
+// emitted events - confirm against the EventProxy implementation.
+const EventTypeVersion = "0.1"
+
+// Aliases for the voltha event enumerations, so that they can be named through
+// this package (EventCategory and EventSubCategory appear in EventProxy).
+type EventType = voltha.EventType_Types
+type EventCategory = voltha.EventCategory_Types
+type EventSubCategory = voltha.EventSubCategory_Types
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
new file mode 100644
index 0000000..8588fe4
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "context"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+)
+
+type AdapterProxy struct { // AdapterProxy sends inter-adapter messages through the kafka inter-container proxy.
+	kafkaICProxy kafka.InterContainerProxy // used by SendInterAdapterMessage to invoke the RPC
+	coreTopic    string                    // core topic handed to NewAdapterProxy
+	endpointMgr  kafka.EndpointManager     // returns the endpoint for a (device id, adapter) pair
+}
+
+// NewAdapterProxy returns an AdapterProxy that invokes its RPCs through kafkaProxy
+// and looks up destination endpoints with an endpoint manager created from
+// backend. coreTopic is stored on the proxy and logged at debug level.
+func NewAdapterProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, coreTopic string, backend *db.Backend) *AdapterProxy {
+	ap := &AdapterProxy{kafkaICProxy: kafkaProxy, coreTopic: coreTopic}
+	ap.endpointMgr = kafka.NewEndpointManager(backend)
+	logger.Debugw(ctx, "topics", log.Fields{"core": ap.coreTopic})
+	return ap
+}
+
+// SendInterAdapterMessage wraps msg in an InterAdapterMessage and sends it with the
+// "process_inter_adapter_message" RPC. The destination topic is the endpoint that
+// the endpoint manager returns for (toDeviceId, toAdapter); fromAdapter serves as
+// both the header's FromTopic and the reply topic. An empty messageId is replaced
+// by a freshly generated UUID. proxyDeviceId is recorded in the header and is also
+// handed to InvokeRPC.
+// A marshalling or endpoint lookup failure is returned as is; otherwise the
+// result is whatever unPackResponse derives from the RPC outcome.
+func (ap *AdapterProxy) SendInterAdapterMessage(ctx context.Context, msg proto.Message,
+	msgType ic.InterAdapterMessageType_Types, fromAdapter string, toAdapter string,
+	toDeviceId string, proxyDeviceId string, messageId string) error {
+	logger.Debugw(ctx, "sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
+		"to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
+
+	// Wrap the payload in an Any so that it can be carried as the message body.
+	var body *any.Any
+	body, err := ptypes.MarshalAny(msg)
+	if err != nil {
+		logger.Warnw(ctx, "cannot-marshal-msg", log.Fields{"error": err})
+		return err
+	}
+
+	// Look up the endpoint (used as the topic) for the destination device and adapter.
+	endpoint, err := ap.endpointMgr.GetEndpoint(ctx, toDeviceId, toAdapter)
+	if err != nil {
+		return err
+	}
+	destination := string(endpoint)
+
+	// Reuse the caller supplied message id, falling back to a generated one.
+	id := messageId
+	if id == "" {
+		id = uuid.New().String()
+	}
+
+	// Assemble the inter-adapter message and pass it as the only RPC argument.
+	iaMsg := &ic.InterAdapterMessage{
+		Header: &ic.InterAdapterHeader{
+			Type:          msgType,
+			FromTopic:     fromAdapter,
+			ToTopic:       destination,
+			ToDeviceId:    toDeviceId,
+			ProxyDeviceId: proxyDeviceId,
+			Id:            id,
+			Timestamp:     ptypes.TimestampNow(),
+		},
+		Body: body,
+	}
+	args := []*kafka.KVArg{{Key: "msg", Value: iaMsg}}
+	topic := kafka.Topic{Name: destination}
+	replyToTopic := kafka.Topic{Name: fromAdapter}
+	rpc := "process_inter_adapter_message"
+
+	// Add an indication in the context to differentiate this inter-adapter message during span processing in the Kafka IC proxy.
+	ctx = context.WithValue(ctx, "inter-adapter-msg-type", msgType)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
+	logger.Debugw(ctx, "inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+	return unPackResponse(ctx, rpc, "", success, result)
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go
new file mode 100644
index 0000000..2f56e42
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+// logger is the logger shared by this package; it is assigned by init.
+var logger log.CLogger
+
+// init registers the package with the log package so that its log level can be modified at run time; it panics on failure.
+func init() {
+	var err error
+	if logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{}); err != nil {
+		panic(err)
+	}
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
new file mode 100644
index 0000000..188bbbd
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
@@ -0,0 +1,689 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "context"
+ "sync"
+
+ "github.com/golang/protobuf/ptypes"
+ a "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type CoreProxy struct { // CoreProxy issues adapter-to-core RPCs through the kafka inter-container proxy.
+	kafkaICProxy        kafka.InterContainerProxy // used by every method to invoke the RPC
+	adapterTopic        string                    // this adapter's topic; returned by getAdapterTopic as reply-to topic
+	coreTopic           string                    // default core topic, used when a device has no specific core reference
+	deviceIdCoreMap     map[string]string         // device id -> core reference (topic name)
+	lockDeviceIdCoreMap sync.RWMutex              // guards deviceIdCoreMap
+}
+
+// NewCoreProxy returns a CoreProxy that invokes its RPCs through kafkaProxy, with
+// the given adapter and core topics and an empty device-to-core reference map.
+func NewCoreProxy(ctx context.Context, kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
+	logger.Debugw(ctx, "TOPICS", log.Fields{"core": coreTopic, "adapter": adapterTopic})
+	return &CoreProxy{
+		kafkaICProxy:    kafkaProxy,
+		adapterTopic:    adapterTopic,
+		coreTopic:       coreTopic,
+		deviceIdCoreMap: make(map[string]string),
+	}
+}
+
+// unPackResponse turns the outcome of an RPC into an error: nil on success, otherwise the Reason of the
+// ic.Error decoded from response, always reported as codes.Canceled (TODO: need to get the real error code).
+func unPackResponse(ctx context.Context, rpc string, deviceId string, success bool, response *a.Any) error {
+	if success {
+		return nil
+	}
+	icErr := &ic.Error{}
+	unmarshalErr := ptypes.UnmarshalAny(response, icErr)
+	if unmarshalErr != nil {
+		logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": unmarshalErr})
+	}
+	logger.Debugw(ctx, "response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": unmarshalErr})
+	return status.Errorf(codes.Canceled, "%s", icErr.Reason)
+}
+
+// UpdateCoreReference adds or updates the core reference (really the topic name) for a given device id.
+func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
+	ap.lockDeviceIdCoreMap.Lock() // exclusive lock: the map is written below
+	defer ap.lockDeviceIdCoreMap.Unlock()
+	ap.deviceIdCoreMap[deviceId] = coreReference // read back by getCoreTopic when a request for this device is sent
+}
+
+// DeleteCoreReference removes the core reference (really the topic name) of a given device id, if one is recorded.
+func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
+	ap.lockDeviceIdCoreMap.Lock() // exclusive lock: the map is modified below
+	defer ap.lockDeviceIdCoreMap.Unlock()
+	delete(ap.deviceIdCoreMap, deviceId) // afterwards getCoreTopic falls back to the default core topic for this device
+}
+
+// getCoreTopic returns the core topic recorded for deviceId, or the default core topic
+// when none is recorded. It only reads the map, so it takes the shared (read) lock.
+func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
+	ap.lockDeviceIdCoreMap.RLock()
+	defer ap.lockDeviceIdCoreMap.RUnlock()
+	if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
+		return kafka.Topic{Name: t}
+	}
+	return kafka.Topic{Name: ap.coreTopic}
+}
+
+// getAdapterTopic returns this adapter's own topic, which the RPC methods pass as
+// the reply-to topic. The variadic args are accepted but not used.
+func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic { return kafka.Topic{Name: ap.adapterTopic} }
+
+// RegisterAdapter sends adapter and deviceTypes to the core by invoking the
+// "Register" RPC on the core topic, with this adapter's topic as reply-to topic.
+//
+// The replica settings of adapter are checked first. When CurrentReplica and
+// TotalReplicas are both 0 they are both set to 1 on the passed-in adapter; every
+// other inconsistent combination is reported through logger.Fatal/Fatalf.
+// The returned error is the one unPackResponse derives from the RPC outcome.
+func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
+	logger.Debugw(ctx, "registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
+
+	// The three cases below are mutually exclusive.
+	switch {
+	case adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0:
+		logger.Fatal(ctx, "totalReplicas can't be 0, since you're here you have at least one")
+	case adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0:
+		logger.Fatal(ctx, "currentReplica can't be 0, it has to start from 1")
+	case adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0:
+		// If the adapter is not setting these fields they default to 0; in
+		// that case it means the adapter is not ready to be scaled and thus
+		// it defaults to a single instance.
+		adapter.CurrentReplica = 1
+		adapter.TotalReplicas = 1
+	}
+	if adapter.CurrentReplica > adapter.TotalReplicas {
+		logger.Fatalf(ctx, "CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+			adapter.CurrentReplica, adapter.TotalReplicas)
+	}
+
+	// Registration always goes to the default core topic; no device is involved.
+	rpc := "Register"
+	topic := kafka.Topic{Name: ap.coreTopic}
+	replyToTopic := ap.getAdapterTopic()
+	args := []*kafka.KVArg{
+		{Key: "adapter", Value: adapter},
+		{Key: "deviceTypes", Value: deviceTypes},
+	}
+
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
+	logger.Debugw(ctx, "Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+	return unPackResponse(ctx, rpc, "", success, result)
+}
+
+// DeviceUpdate sends device to the core through the "DeviceUpdate" RPC. The request
+// goes to the core topic recorded for device.Id (or the default core topic) with this
+// adapter's topic as reply-to topic. The RPC is invoked with a context obtained from
+// log.WithSpanFromContext(context.Background(), ctx) rather than with ctx itself.
+func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
+	logger.Debugw(ctx, "DeviceUpdate", log.Fields{"deviceId": device.Id})
+	rpc := "DeviceUpdate"
+	toTopic, replyToTopic := ap.getCoreTopic(device.Id), ap.getAdapterTopic()
+	args := []*kafka.KVArg{{Key: "device", Value: device}}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+	logger.Debugw(ctx, "DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
+	return unPackResponse(ctx, rpc, device.Id, success, result)
+}
+
+// PortCreated sends the "PortCreated" RPC to the core, with the id of the device
+// and the port as arguments. It returns nil on success, otherwise the error that
+// unPackResponse builds from the reply.
+func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
+	logger.Debugw(ctx, "PortCreated", log.Fields{"portNo": port.PortNo})
+	rpc := "PortCreated"
+	// The request goes to the core topic recorded for this device (or the default
+	// core topic); this adapter's own topic is passed as reply-to topic.
+	toTopic := ap.getCoreTopic(deviceId)
+	replyToTopic := ap.getAdapterTopic()
+	args := []*kafka.KVArg{{
+		Key:   "device_id",
+		Value: &voltha.ID{Id: deviceId},
+	}, {
+		Key:   "port",
+		Value: port,
+	}}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+	logger.Debugw(ctx, "PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
+	return unPackResponse(ctx, rpc, deviceId, success, result)
+}
+
+// PortsStateUpdate sends the "PortsStateUpdate" RPC to the core with the device id,
+// portTypeFilter and operStatus as arguments (the latter two as 64-bit integers).
+// It returns nil on success, otherwise the error unPackResponse builds from the reply.
+func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
+	logger.Debugw(ctx, "PortsStateUpdate", log.Fields{"deviceId": deviceId})
+	rpc := "PortsStateUpdate"
+	toTopic := ap.getCoreTopic(deviceId)
+	replyToTopic := ap.getAdapterTopic()
+
+	id := &voltha.ID{Id: deviceId}
+	filter := &ic.IntType{Val: int64(portTypeFilter)}
+	oper := &ic.IntType{Val: int64(operStatus)}
+	args := []*kafka.KVArg{
+		{Key: "device_id", Value: id},
+		{Key: "port_type_filter", Value: filter},
+		{Key: "oper_status", Value: oper},
+	}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+	logger.Debugw(ctx, "PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+	return unPackResponse(ctx, rpc, deviceId, success, result)
+}
+
+// DeleteAllPorts sends the "DeleteAllPorts" RPC to the core with the device id as
+// its only argument. The request goes to the core topic recorded for deviceId (or
+// the default core topic) and this adapter's topic is passed as reply-to topic.
+// The RPC is invoked with a context obtained from
+// log.WithSpanFromContext(context.Background(), ctx) rather than with ctx itself.
+// It returns nil on success, otherwise the error unPackResponse builds from the reply.
+func (ap *CoreProxy) DeleteAllPorts(ctx context.Context, deviceId string) error {
+	logger.Debugw(ctx, "DeleteAllPorts", log.Fields{"deviceId": deviceId})
+	rpc := "DeleteAllPorts"
+	toTopic := ap.getCoreTopic(deviceId)
+	replyToTopic := ap.getAdapterTopic()
+	// The slice holds exactly the one argument this RPC takes, so that no nil
+	// *kafka.KVArg entry is handed to InvokeRPC.
+	args := []*kafka.KVArg{{Key: "device_id", Value: &voltha.ID{Id: deviceId}}}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+	logger.Debugw(ctx, "DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
+	return unPackResponse(ctx, rpc, deviceId, success, result)
+}
+
+// GetDevicePort sends the "GetDevicePort" RPC to the core with the device id and
+// port number, and decodes the reply as a voltha.Port. A failed RPC yields the
+// ic.Error from the reply as a gRPC status error; a successful reply that cannot
+// be decoded as a port yields an InvalidArgument error.
+func (ap *CoreProxy) GetDevicePort(ctx context.Context, deviceID string, portNo uint32) (*voltha.Port, error) {
+	logger.Debugw(ctx, "GetDevicePort", log.Fields{"device-id": deviceID})
+	rpc := "GetDevicePort"
+	toTopic := ap.getCoreTopic(deviceID)
+	replyToTopic := ap.getAdapterTopic()
+
+	args := []*kafka.KVArg{
+		{Key: "device_id", Value: &voltha.ID{Id: deviceID}},
+		{Key: "port_no", Value: &ic.IntType{Val: int64(portNo)}},
+	}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+	logger.Debugw(ctx, "GetDevicePort-response", log.Fields{"device-id": deviceID, "success": success})
+
+	if !success {
+		icErr := &ic.Error{}
+		unmarshalErr := ptypes.UnmarshalAny(result, icErr)
+		if unmarshalErr != nil {
+			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": unmarshalErr})
+		}
+		logger.Debugw(ctx, "GetDevicePort-return", log.Fields{"device-id": deviceID, "success": success, "error": unmarshalErr})
+		// TODO: Need to get the real error code
+		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, icErr.Code), icErr.Reason)
+	}
+	port := &voltha.Port{}
+	if err := ptypes.UnmarshalAny(result, port); err != nil {
+		logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+	return port, nil
+}
+
+// ListDevicePorts sends the "ListDevicePorts" RPC to the core with the device id
+// and returns the Items of the voltha.Ports decoded from the reply. A failed RPC
+// yields the ic.Error from the reply as a gRPC status error; a successful reply
+// that cannot be decoded yields an InvalidArgument error.
+func (ap *CoreProxy) ListDevicePorts(ctx context.Context, deviceID string) ([]*voltha.Port, error) {
+	logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": deviceID})
+	rpc := "ListDevicePorts"
+	toTopic := ap.getCoreTopic(deviceID)
+	replyToTopic := ap.getAdapterTopic()
+
+	args := []*kafka.KVArg{{Key: "device_id", Value: &voltha.ID{Id: deviceID}}}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+	logger.Debugw(ctx, "ListDevicePorts-response", log.Fields{"device-id": deviceID, "success": success})
+
+	if !success {
+		icErr := &ic.Error{}
+		unmarshalErr := ptypes.UnmarshalAny(result, icErr)
+		if unmarshalErr != nil {
+			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": unmarshalErr})
+		}
+		logger.Debugw(ctx, "ListDevicePorts-return", log.Fields{"device-id": deviceID, "success": success, "error": unmarshalErr})
+		// TODO: Need to get the real error code
+		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, icErr.Code), icErr.Reason)
+	}
+	ports := &voltha.Ports{}
+	if err := ptypes.UnmarshalAny(result, ports); err != nil {
+		logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+	return ports.Items, nil
+}
+
+// DeviceStateUpdate sends the "DeviceStateUpdate" RPC to the core with the device
+// id, the operational status and the connection status (both statuses as 64-bit
+// integers). The RPC is invoked with a context obtained from
+// log.WithSpanFromContext(context.Background(), ctx) rather than with ctx itself.
+// It returns nil on success, otherwise the error unPackResponse builds from the reply.
+func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
+	connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error {
+	logger.Debugw(ctx, "DeviceStateUpdate", log.Fields{"deviceId": deviceId})
+	rpc := "DeviceStateUpdate"
+	// Core topic recorded for this device (or the default one), and our own topic for the reply.
+	toTopic := ap.getCoreTopic(deviceId)
+	replyToTopic := ap.getAdapterTopic()
+
+	// The argument order is: device id, operational status, connection status.
+	args := []*kafka.KVArg{{
+		Key:   "device_id",
+		Value: &voltha.ID{Id: deviceId},
+	}, {
+		Key:   "oper_status",
+		Value: &ic.IntType{Val: int64(operStatus)},
+	}, {
+		Key:   "connect_status",
+		Value: &ic.IntType{Val: int64(connStatus)},
+	}}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+	logger.Debugw(ctx, "DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+	return unPackResponse(ctx, rpc, deviceId, success, result)
+}
+
+// ChildDeviceDetected sends the "ChildDeviceDetected" RPC to the core, describing
+// a child device by parent device id, parent port number, child device type,
+// channel id, vendor id, serial number and ONU id, and decodes the reply as a
+// voltha.Device.
+// A failed RPC yields the ic.Error from the reply as a gRPC status error; a
+// successful reply that cannot be decoded as a device yields an InvalidArgument
+// error.
+func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
+	childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
+	logger.Debugw(ctx, "ChildDeviceDetected", log.Fields{"pDeviceId": parentDeviceId, "channelId": channelId})
+	rpc := "ChildDeviceDetected"
+	// The request is addressed to the core topic recorded for the parent device (or the default one).
+	toTopic := ap.getCoreTopic(parentDeviceId)
+	replyToTopic := ap.getAdapterTopic()
+
+	// Integer arguments are widened to int64 and sent as ic.IntType; strings are sent as ic.StrType.
+	args := []*kafka.KVArg{
+		{
+			Key:   "parent_device_id",
+			Value: &voltha.ID{Id: parentDeviceId},
+		},
+		{
+			Key:   "parent_port_no",
+			Value: &ic.IntType{Val: int64(parentPortNo)},
+		},
+		{
+			Key:   "child_device_type",
+			Value: &ic.StrType{Val: childDeviceType},
+		},
+		{
+			Key:   "channel_id",
+			Value: &ic.IntType{Val: int64(channelId)},
+		},
+		{
+			Key:   "vendor_id",
+			Value: &ic.StrType{Val: vendorId},
+		},
+		{
+			Key:   "serial_number",
+			Value: &ic.StrType{Val: serialNumber},
+		},
+		{
+			Key:   "onu_id",
+			Value: &ic.IntType{Val: onuId},
+		},
+	}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+	logger.Debugw(ctx, "ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+
+	if !success {
+		icErr := &ic.Error{}
+		unmarshalErr := ptypes.UnmarshalAny(result, icErr)
+		if unmarshalErr != nil {
+			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": unmarshalErr})
+		}
+		logger.Debugw(ctx, "ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": unmarshalErr})
+		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, icErr.Code), icErr.Reason)
+	}
+	// Success: the reply is decoded as the device.
+	volthaDevice := &voltha.Device{}
+	if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+		logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+	return volthaDevice, nil
+}
+
+// ChildDevicesLost sends the "ChildDevicesLost" RPC to the core with the parent
+// device id as its only argument. The request goes to the core topic recorded for
+// the parent device (or the default core topic). It returns nil on success,
+// otherwise the error unPackResponse builds from the reply.
+func (ap *CoreProxy) ChildDevicesLost(ctx context.Context, parentDeviceId string) error {
+	logger.Debugw(ctx, "ChildDevicesLost", log.Fields{"pDeviceId": parentDeviceId})
+	rpc := "ChildDevicesLost"
+	toTopic := ap.getCoreTopic(parentDeviceId)
+	replyToTopic := ap.getAdapterTopic()
+	args := []*kafka.KVArg{{
+		Key:   "parent_device_id",
+		Value: &voltha.ID{Id: parentDeviceId},
+	}}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+	logger.Debugw(ctx, "ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+	return unPackResponse(ctx, rpc, parentDeviceId, success, result)
+}
+
+// ChildDevicesDetected sends the "ChildDevicesDetected" RPC to the core with the
+// parent device id as its only argument. The request goes to the core topic
+// recorded for the parent device (or the default core topic). It returns nil on
+// success, otherwise the error unPackResponse builds from the reply.
+func (ap *CoreProxy) ChildDevicesDetected(ctx context.Context, parentDeviceId string) error {
+	logger.Debugw(ctx, "ChildDevicesDetected", log.Fields{"pDeviceId": parentDeviceId})
+	rpc := "ChildDevicesDetected"
+	toTopic := ap.getCoreTopic(parentDeviceId)
+	replyToTopic := ap.getAdapterTopic()
+	args := []*kafka.KVArg{{
+		Key:   "parent_device_id",
+		Value: &voltha.ID{Id: parentDeviceId},
+	}}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+	logger.Debugw(ctx, "ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+	return unPackResponse(ctx, rpc, parentDeviceId, success, result)
+}
+
+// GetDevice sends the "GetDevice" RPC to the core with deviceId as its argument
+// and decodes the reply as a voltha.Device. parentDeviceId is not sent as an
+// argument: it selects the core topic and is handed to InvokeRPC. A failed RPC
+// yields the ic.Error from the reply as a gRPC status error; a successful reply
+// that cannot be decoded as a device yields an InvalidArgument error.
+func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
+	logger.Debugw(ctx, "GetDevice", log.Fields{"deviceId": deviceId})
+	rpc := "GetDevice"
+	// The core topic is the one recorded for the parent device, not for deviceId.
+	toTopic := ap.getCoreTopic(parentDeviceId)
+	replyToTopic := ap.getAdapterTopic()
+
+	args := []*kafka.KVArg{{Key: "device_id", Value: &voltha.ID{Id: deviceId}}}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+	logger.Debugw(ctx, "GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+
+	if !success {
+		icErr := &ic.Error{}
+		unmarshalErr := ptypes.UnmarshalAny(result, icErr)
+		if unmarshalErr != nil {
+			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": unmarshalErr})
+		}
+		logger.Debugw(ctx, "GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": unmarshalErr})
+		// TODO: Need to get the real error code
+		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, icErr.Code), icErr.Reason)
+	}
+	volthaDevice := &voltha.Device{}
+	if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+		logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+	return volthaDevice, nil
+}
+
+// GetChildDevice sends the "GetChildDevice" RPC to the core for the child of
+// parentDeviceId selected by the filters in kwargs, and decodes the reply as a
+// voltha.Device.
+//
+// The recognised kwargs are "serial_number" (string), "onu_id" (uint32) and
+// "parent_port_no" (uint32); any other key is ignored. A recognised key whose
+// value has another type yields an InvalidArgument error before anything is
+// sent. Only the parent device id and the recognised filters are passed to the
+// RPC, in map iteration order, so the argument list never contains nil entries.
+func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
+	logger.Debugw(ctx, "GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
+	rpc := "GetChildDevice"
+
+	toTopic := ap.getCoreTopic(parentDeviceId)
+	replyToTopic := ap.getAdapterTopic()
+
+	// One mandatory argument plus at most one per entry of kwargs; appending keeps
+	// the slice dense and cannot run past its end, whatever kwargs contains.
+	args := make([]*kafka.KVArg, 0, len(kwargs)+1)
+	args = append(args, &kafka.KVArg{Key: "device_id", Value: &voltha.ID{Id: parentDeviceId}})
+	for k, v := range kwargs {
+		switch k {
+		case "serial_number":
+			s, ok := v.(string)
+			if !ok {
+				return nil, status.Errorf(codes.InvalidArgument, "%s must be a string, got %T", k, v)
+			}
+			args = append(args, &kafka.KVArg{Key: k, Value: &ic.StrType{Val: s}})
+		case "onu_id", "parent_port_no":
+			n, ok := v.(uint32)
+			if !ok {
+				return nil, status.Errorf(codes.InvalidArgument, "%s must be a uint32, got %T", k, v)
+			}
+			args = append(args, &kafka.KVArg{Key: k, Value: &ic.IntType{Val: int64(n)}})
+		}
+	}
+
+	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+	logger.Debugw(ctx, "GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+
+	if success {
+		volthaDevice := &voltha.Device{}
+		if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+			return nil, status.Error(codes.InvalidArgument, err.Error())
+		}
+		return volthaDevice, nil
+	}
+
+	// Failure: the reply carries an ic.Error whose code and reason are returned.
+	unpackResult := &ic.Error{}
+	var err error
+	if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+		logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	logger.Debugw(ctx, "GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+
+	return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+}
+
+// GetChildDevices sends the "GetChildDevices" RPC to the core with the parent
+// device id and decodes the reply as a voltha.Devices. A failed RPC yields the
+// ic.Error from the reply as a gRPC status error; a successful reply that cannot
+// be decoded yields an InvalidArgument error.
+func (ap *CoreProxy) GetChildDevices(ctx context.Context, parentDeviceId string) (*voltha.Devices, error) {
+	logger.Debugw(ctx, "GetChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
+	rpc := "GetChildDevices"
+	toTopic := ap.getCoreTopic(parentDeviceId)
+	replyToTopic := ap.getAdapterTopic()
+
+	args := []*kafka.KVArg{{
+		Key:   "device_id",
+		Value: &voltha.ID{Id: parentDeviceId},
+	}}
+
+	rpcCtx := log.WithSpanFromContext(context.Background(), ctx)
+	success, result := ap.kafkaICProxy.InvokeRPC(rpcCtx, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+	logger.Debugw(ctx, "GetChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+
+	if !success {
+		icErr := &ic.Error{}
+		unmarshalErr := ptypes.UnmarshalAny(result, icErr)
+		if unmarshalErr != nil {
+			logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": unmarshalErr})
+		}
+		logger.Debugw(ctx, "GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": unmarshalErr})
+		return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, icErr.Code), icErr.Reason)
+	}
+	volthaDevices := &voltha.Devices{}
+	if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
+		logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+		return nil, status.Error(codes.InvalidArgument, err.Error())
+	}
+	return volthaDevices, nil
+}
+
+// SendPacketIn invokes the "PacketIn" RPC with the device id, the port number
+// and the packet payload, and returns whatever unPackResponse makes of the
+// reply.
+func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
+	logger.Debugw(ctx, "SendPacketIn", log.Fields{"deviceId": deviceId, "port": port, "pktPayload": pktPayload})
+	rpc := "PacketIn"
+
+	// The request goes to the topic that getCoreTopic resolves for this device;
+	// the reply comes back on this adapter's own topic.
+	coreTopic := ap.getCoreTopic(deviceId)
+	adapterTopic := ap.getAdapterTopic()
+
+	rpcArgs := []*kafka.KVArg{
+		{Key: "device_id", Value: &voltha.ID{Id: deviceId}},
+		{Key: "port", Value: &ic.IntType{Val: int64(port)}},
+		{Key: "packet", Value: &ic.Packet{Payload: pktPayload}},
+	}
+
+	ok, reply := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &coreTopic, &adapterTopic, true, deviceId, rpcArgs...)
+	logger.Debugw(ctx, "SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": ok})
+	return unPackResponse(ctx, rpc, deviceId, ok, reply)
+}
+
+// DeviceReasonUpdate invokes the "DeviceReasonUpdate" RPC with the device id
+// and the new reason string, and returns the result of unPackResponse on the
+// reply.
+func (ap *CoreProxy) DeviceReasonUpdate(ctx context.Context, deviceId string, deviceReason string) error {
+	logger.Debugw(ctx, "DeviceReasonUpdate", log.Fields{"deviceId": deviceId, "deviceReason": deviceReason})
+	rpc := "DeviceReasonUpdate"
+
+	// The request goes to the topic that getCoreTopic resolves for this device;
+	// the reply comes back on this adapter's own topic.
+	coreTopic := ap.getCoreTopic(deviceId)
+	adapterTopic := ap.getAdapterTopic()
+
+	rpcArgs := []*kafka.KVArg{
+		{Key: "device_id", Value: &voltha.ID{Id: deviceId}},
+		{Key: "device_reason", Value: &ic.StrType{Val: deviceReason}},
+	}
+
+	ok, reply := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &coreTopic, &adapterTopic, true, deviceId, rpcArgs...)
+	logger.Debugw(ctx, "DeviceReason-response", log.Fields{"pDeviceId": deviceId, "success": ok})
+	return unPackResponse(ctx, rpc, deviceId, ok, reply)
+}
+
+// DevicePMConfigUpdate invokes the "DevicePMConfigUpdate" RPC with pmConfigs
+// as its only argument, using pmConfigs.Id to pick the destination topic and
+// as the RPC key. It returns the result of unPackResponse on the reply.
+//
+// A nil pmConfigs is rejected with codes.InvalidArgument; without the guard
+// the field access pmConfigs.Id would dereference a nil pointer.
+func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+	logger.Debugw(ctx, "DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
+	if pmConfigs == nil {
+		return status.Error(codes.InvalidArgument, "pm-configs-nil")
+	}
+	rpc := "DevicePMConfigUpdate"
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	toTopic := ap.getCoreTopic(pmConfigs.Id)
+	replyToTopic := ap.getAdapterTopic()
+
+	args := []*kafka.KVArg{
+		{Key: "device_pm_config", Value: pmConfigs},
+	}
+	success, result := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
+	logger.Debugw(ctx, "DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
+	return unPackResponse(ctx, rpc, pmConfigs.Id, success, result)
+}
+
+// ReconcileChildDevices invokes the "ReconcileChildDevices" RPC for
+// parentDeviceId and returns the result of unPackResponse on the reply.
+func (ap *CoreProxy) ReconcileChildDevices(ctx context.Context, parentDeviceId string) error {
+	logger.Debugw(ctx, "ReconcileChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
+	rpc := "ReconcileChildDevices"
+
+	// The request goes to the topic that getCoreTopic resolves for the parent
+	// device; the reply comes back on this adapter's own topic.
+	coreTopic := ap.getCoreTopic(parentDeviceId)
+	adapterTopic := ap.getAdapterTopic()
+
+	parentID := &voltha.ID{Id: parentDeviceId}
+	rpcArgs := make([]*kafka.KVArg, 1)
+	rpcArgs[0] = &kafka.KVArg{Key: "parent_device_id", Value: parentID}
+
+	ok, reply := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &coreTopic, &adapterTopic, true, parentDeviceId, rpcArgs...)
+	logger.Debugw(ctx, "ReconcileChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": ok})
+	return unPackResponse(ctx, rpc, parentDeviceId, ok, reply)
+}
+
+// PortStateUpdate invokes the "PortStateUpdate" RPC with the device id, the
+// operational status, the port type and the port number (the last three sent
+// as int64 values), and returns the result of unPackResponse on the reply.
+func (ap *CoreProxy) PortStateUpdate(ctx context.Context, deviceId string, pType voltha.Port_PortType, portNum uint32,
+	operStatus voltha.OperStatus_Types) error {
+	logger.Debugw(ctx, "PortStateUpdate", log.Fields{"deviceId": deviceId, "portType": pType, "portNo": portNum, "operation_status": operStatus})
+	rpc := "PortStateUpdate"
+
+	// The request goes to the topic that getCoreTopic resolves for this device.
+	coreTopic := ap.getCoreTopic(deviceId)
+
+	rpcArgs := []*kafka.KVArg{
+		{Key: "device_id", Value: &voltha.ID{Id: deviceId}},
+		{Key: "oper_status", Value: &ic.IntType{Val: int64(operStatus)}},
+		{Key: "port_type", Value: &ic.IntType{Val: int64(pType)}},
+		{Key: "port_no", Value: &ic.IntType{Val: int64(portNum)}},
+	}
+
+	// The reply is delivered on this adapter's own topic.
+	adapterTopic := ap.getAdapterTopic()
+	ok, reply := ap.kafkaICProxy.InvokeRPC(log.WithSpanFromContext(context.Background(), ctx), rpc, &coreTopic, &adapterTopic, true, deviceId, rpcArgs...)
+	logger.Debugw(ctx, "PortStateUpdate-response", log.Fields{"deviceId": deviceId, "success": ok})
+	return unPackResponse(ctx, rpc, deviceId, ok, reply)
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go
new file mode 100644
index 0000000..b79bafe
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+)
+
+// EventProxy sends voltha events through a kafka client to a single topic.
+type EventProxy struct {
+ // kafkaClient is the client whose Send method sendEvent calls.
+ kafkaClient kafka.Client
+ // eventTopic is the topic passed to kafkaClient.Send for every event.
+ eventTopic kafka.Topic
+}
+
+func NewEventProxy(opts ...EventProxyOption) *EventProxy {
+ var proxy EventProxy
+ for _, option := range opts {
+ option(&proxy)
+ }
+ return &proxy
+}
+
+// EventProxyOption mutates an EventProxy; NewEventProxy applies each option
+// it is given to the proxy under construction.
+type EventProxyOption func(*EventProxy)
+
+// MsgClient returns an option that sets the proxy's kafka client to client.
+func MsgClient(client kafka.Client) EventProxyOption {
+	setClient := func(ep *EventProxy) {
+		ep.kafkaClient = client
+	}
+	return setClient
+}
+
+// MsgTopic returns an option that sets the topic the proxy sends events to.
+func MsgTopic(topic kafka.Topic) EventProxyOption {
+	setTopic := func(ep *EventProxy) {
+		ep.eventTopic = topic
+	}
+	return setTopic
+}
+
+// formatId builds an event id of the form
+// "Voltha.openolt.<eventName>.<current Unix time in nanoseconds>".
+func (ep *EventProxy) formatId(eventName string) string {
+	stamp := strconv.FormatInt(time.Now().UnixNano(), 10)
+	return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, stamp)
+}
+
+func (ep *EventProxy) getEventHeader(eventName string,
+ category adapterif.EventCategory,
+ subCategory adapterif.EventSubCategory,
+ eventType adapterif.EventType,
+ raisedTs int64) (*voltha.EventHeader, error) {
+ var header voltha.EventHeader
+ if strings.Contains(eventName, "_") {
+ eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
+ } else {
+ eventName = "UNKNOWN_EVENT"
+ }
+ /* Populating event header */
+ header.Id = ep.formatId(eventName)
+ header.Category = category
+ header.SubCategory = subCategory
+ header.Type = eventType
+ header.TypeVersion = adapterif.EventTypeVersion
+
+ // raisedTs is in nanoseconds
+ timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
+ if err != nil {
+ return nil, err
+ }
+ header.RaisedTs = timestamp
+
+ timestamp, err = ptypes.TimestampProto(time.Now())
+ if err != nil {
+ return nil, err
+ }
+ header.ReportedTs = timestamp
+
+ return &header, nil
+}
+
+/* Send out device events*/
+func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
+ if deviceEvent == nil {
+ logger.Error(ctx, "Recieved empty device event")
+ return errors.New("Device event nil")
+ }
+ var event voltha.Event
+ var de voltha.Event_DeviceEvent
+ var err error
+ de.DeviceEvent = deviceEvent
+ if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
+ return err
+ }
+ event.EventType = &de
+ if err := ep.sendEvent(ctx, &event); err != nil {
+ logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
+ return err
+ }
+ logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
+ "ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
+ "DeviceEventName": deviceEvent.DeviceEventName})
+
+ return nil
+
+}
+
+// SendKpiEvent is to send kpi events to voltha.event topic
+func (ep *EventProxy) SendKpiEvent(ctx context.Context, id string, kpiEvent *voltha.KpiEvent2, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
+ if kpiEvent == nil {
+ logger.Error(ctx, "Recieved empty kpi event")
+ return errors.New("KPI event nil")
+ }
+ var event voltha.Event
+ var de voltha.Event_KpiEvent2
+ var err error
+ de.KpiEvent2 = kpiEvent
+ if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
+ return err
+ }
+ event.EventType = &de
+ if err := ep.sendEvent(ctx, &event); err != nil {
+ logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
+ return err
+ }
+ logger.Infow(ctx, "Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ "SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
+ "ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
+
+ return nil
+
+}
+
+/* TODO: Send out KPI events*/
+
+// sendEvent hands event to the kafka client for delivery on ep.eventTopic and
+// returns the client's error, if any. A debug line is logged on success.
+func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
+	sendErr := ep.kafkaClient.Send(ctx, event, &ep.eventTopic)
+	if sendErr != nil {
+		return sendErr
+	}
+
+	logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
+	return nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/performance_metrics.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/performance_metrics.go
new file mode 100644
index 0000000..7697c05
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/performance_metrics.go
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+)
+
+// PmMetrics collects the performance-metric settings of one device; it is
+// converted to a voltha.PmConfigs by ToPmConfigs.
+type PmMetrics struct {
+ // deviceId becomes PmConfigs.Id.
+ deviceId string
+ // frequency becomes PmConfigs.DefaultFreq.
+ frequency uint32
+ // grouped becomes PmConfigs.Grouped.
+ grouped bool
+ // frequencyOverride becomes PmConfigs.FreqOverride.
+ frequencyOverride bool
+ // metrics holds one PmConfig per metric name, keyed by that name.
+ metrics map[string]*voltha.PmConfig
+}
+
+// PmMetricsOption mutates a PmMetrics; NewPmMetrics applies each option it is
+// given to the value under construction.
+type PmMetricsOption func(*PmMetrics)
+
+// Frequency returns an option that sets the frequency field of a PmMetrics.
+func Frequency(frequency uint32) PmMetricsOption {
+	apply := func(pm *PmMetrics) {
+		pm.frequency = frequency
+	}
+	return apply
+}
+
+// Grouped returns an option that sets the grouped flag of a PmMetrics.
+func Grouped(grouped bool) PmMetricsOption {
+	apply := func(pm *PmMetrics) {
+		pm.grouped = grouped
+	}
+	return apply
+}
+
+// FrequencyOverride returns an option that sets the frequencyOverride flag of
+// a PmMetrics.
+func FrequencyOverride(frequencyOverride bool) PmMetricsOption {
+	apply := func(pm *PmMetrics) {
+		pm.frequencyOverride = frequencyOverride
+	}
+	return apply
+}
+
+// Metrics returns an option that replaces the metrics map of a PmMetrics with
+// one enabled PmConfig of type PmConfig_COUNTER per name in pmNames.
+func Metrics(pmNames []string) PmMetricsOption {
+	return func(pm *PmMetrics) {
+		configs := make(map[string]*voltha.PmConfig)
+		for i := range pmNames {
+			metricName := pmNames[i]
+			configs[metricName] = &voltha.PmConfig{
+				Name:    metricName,
+				Type:    voltha.PmConfig_COUNTER,
+				Enabled: true,
+			}
+		}
+		pm.metrics = configs
+	}
+}
+
+// NewPmMetrics returns a PmMetrics for deviceId after applying each option in
+// opts, in the order given.
+func NewPmMetrics(deviceId string, opts ...PmMetricsOption) *PmMetrics {
+	metrics := new(PmMetrics)
+	metrics.deviceId = deviceId
+	for i := range opts {
+		opts[i](metrics)
+	}
+	return metrics
+}
+
+func (pm *PmMetrics) ToPmConfigs() *voltha.PmConfigs {
+ pmConfigs := &voltha.PmConfigs{
+ Id: pm.deviceId,
+ DefaultFreq: pm.frequency,
+ Grouped: pm.grouped,
+ FreqOverride: pm.frequencyOverride,
+ }
+ for _, v := range pm.metrics {
+ pmConfigs.Metrics = append(pmConfigs.Metrics, &voltha.PmConfig{Name: v.Name, Type: v.Type, Enabled: v.Enabled})
+ }
+ return pmConfigs
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go
new file mode 100644
index 0000000..a92ed51
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go
@@ -0,0 +1,735 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "context"
+ "errors"
+
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
+ "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// RequestHandlerProxy decodes incoming request arguments and forwards them to
+// an adapter implementation.
+type RequestHandlerProxy struct {
+ // TestMode is not set by NewRequestHandlerProxy.
+ TestMode bool
+ // coreInstanceId is the value given to NewRequestHandlerProxy.
+ coreInstanceId string
+ // adapter is the adapter whose methods the handlers invoke.
+ adapter adapters.IAdapter
+ // coreProxy is used by the handlers to update the core reference of a device.
+ coreProxy adapterif.CoreProxy
+}
+
+func NewRequestHandlerProxy(coreInstanceId string, iadapter adapters.IAdapter, cProxy adapterif.CoreProxy) *RequestHandlerProxy {
+ var proxy RequestHandlerProxy
+ proxy.coreInstanceId = coreInstanceId
+ proxy.adapter = iadapter
+ proxy.coreProxy = cProxy
+ return &proxy
+}
+
+// Adapter_descriptor is a stub: it always returns an empty message and a nil
+// error.
+func (rhp *RequestHandlerProxy) Adapter_descriptor() (*empty.Empty, error) {
+	reply := &empty.Empty{}
+	return reply, nil
+}
+
+// Device_types is a stub: it always returns a nil DeviceTypes and a nil error.
+func (rhp *RequestHandlerProxy) Device_types() (*voltha.DeviceTypes, error) {
+ return nil, nil
+}
+
+// Health is a stub: it always returns a nil HealthStatus and a nil error.
+func (rhp *RequestHandlerProxy) Health() (*voltha.HealthStatus, error) {
+ return nil, nil
+}
+
+// unpackDeviceRequest validates and decodes the arguments shared by the device
+// lifecycle handlers (adopt, reconcile, disable, re-enable, reboot, delete).
+//
+// At least three arguments are required. The keys "device",
+// kafka.TransactionKey and kafka.FromTopic are decoded; any other key is
+// ignored. It returns the decoded device and the string carried under
+// kafka.FromTopic, or the first decoding error encountered.
+func (rhp *RequestHandlerProxy) unpackDeviceRequest(ctx context.Context, args []*ic.Argument) (*voltha.Device, string, error) {
+	if len(args) < 3 {
+		// Warnw, not Warn: the fields are meant to be logged as structured data.
+		logger.Warnw(ctx, "invalid-number-of-args", log.Fields{"args": args})
+		return nil, "", errors.New("invalid-number-of-args")
+	}
+
+	device := &voltha.Device{}
+	// The transaction ID is not used afterwards; it is decoded so that a
+	// malformed value is still rejected.
+	transactionID := &ic.StrType{}
+	fromTopic := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, "", err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, "", err
+			}
+		case kafka.FromTopic:
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
+				return nil, "", err
+			}
+		}
+	}
+	return device, fromTopic.Val, nil
+}
+
+// Adopt_device decodes the request, records the originating topic as the core
+// reference of the device and calls Adopt_device on the adapter. An adapter
+// error is returned as a codes.NotFound status.
+func (rhp *RequestHandlerProxy) Adopt_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+	device, fromTopic, err := rhp.unpackDeviceRequest(ctx, args)
+	if err != nil {
+		return nil, err
+	}
+
+	logger.Debugw(ctx, "Adopt_device", log.Fields{"deviceId": device.Id})
+
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic)
+
+	//Invoke the adopt device on the adapter
+	if err = rhp.adapter.Adopt_device(ctx, device); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+
+	return new(empty.Empty), nil
+}
+
+// Reconcile_device decodes the request, records the originating topic as the
+// core reference of the device and calls Reconcile_device on the adapter. An
+// adapter error is returned as a codes.NotFound status.
+func (rhp *RequestHandlerProxy) Reconcile_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+	device, fromTopic, err := rhp.unpackDeviceRequest(ctx, args)
+	if err != nil {
+		return nil, err
+	}
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic)
+
+	//Invoke the reconcile device API on the adapter
+	if err = rhp.adapter.Reconcile_device(ctx, device); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return new(empty.Empty), nil
+}
+
+// Abandon_device is a stub: args is ignored and an empty message is returned
+// with a nil error.
+func (rhp *RequestHandlerProxy) Abandon_device(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+}
+
+// Disable_device decodes the request, records the originating topic as the
+// core reference of the device and calls Disable_device on the adapter. An
+// adapter error is returned as a codes.NotFound status.
+func (rhp *RequestHandlerProxy) Disable_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+	device, fromTopic, err := rhp.unpackDeviceRequest(ctx, args)
+	if err != nil {
+		return nil, err
+	}
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic)
+	//Invoke the Disable_device API on the adapter
+	if err = rhp.adapter.Disable_device(ctx, device); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return new(empty.Empty), nil
+}
+
+// Reenable_device decodes the request, records the originating topic as the
+// core reference of the device and calls Reenable_device on the adapter. An
+// adapter error is returned as a codes.NotFound status.
+func (rhp *RequestHandlerProxy) Reenable_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+	device, fromTopic, err := rhp.unpackDeviceRequest(ctx, args)
+	if err != nil {
+		return nil, err
+	}
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic)
+	//Invoke the Reenable_device API on the adapter
+	if err = rhp.adapter.Reenable_device(ctx, device); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return new(empty.Empty), nil
+}
+
+// Reboot_device decodes the request, records the originating topic as the
+// core reference of the device and calls Reboot_device on the adapter. An
+// adapter error is returned as a codes.NotFound status.
+func (rhp *RequestHandlerProxy) Reboot_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+	device, fromTopic, err := rhp.unpackDeviceRequest(ctx, args)
+	if err != nil {
+		return nil, err
+	}
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic)
+	//Invoke the Reboot_device API on the adapter
+	if err = rhp.adapter.Reboot_device(ctx, device); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return new(empty.Empty), nil
+}
+
+// Self_test_device is a stub: args is ignored and an empty message is returned
+// with a nil error.
+func (rhp *RequestHandlerProxy) Self_test_device(args []*ic.Argument) (*empty.Empty, error) {
+	return new(empty.Empty), nil
+}
+
+// Delete_device decodes the request, records the originating topic as the
+// core reference of the device and calls Delete_device on the adapter. An
+// adapter error is returned as a codes.NotFound status.
+func (rhp *RequestHandlerProxy) Delete_device(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+	device, fromTopic, err := rhp.unpackDeviceRequest(ctx, args)
+	if err != nil {
+		return nil, err
+	}
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic)
+	//Invoke the delete_device API on the adapter
+	if err = rhp.adapter.Delete_device(ctx, device); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return new(empty.Empty), nil
+}
+
+// Get_device_details is a stub: args is ignored and an empty message is
+// returned with a nil error.
+func (rhp *RequestHandlerProxy) Get_device_details(args []*ic.Argument) (*empty.Empty, error) {
+	reply := &empty.Empty{}
+	return reply, nil
+}
+
+// Update_flows_bulk decodes a bulk flow update request and passes it to the
+// adapter's Update_flows_bulk.
+//
+// At least five arguments are required. The keys "device", "flows", "groups",
+// "flow_metadata" and kafka.TransactionKey are decoded; any other key is
+// ignored. A decoding error is returned as is; an adapter error is returned as
+// a codes.NotFound status.
+func (rhp *RequestHandlerProxy) Update_flows_bulk(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+	logger.Debug(ctx, "Update_flows_bulk")
+	if len(args) < 5 {
+		// Warnw, not Warn: the fields are meant to be logged as structured data.
+		logger.Warnw(ctx, "Update_flows_bulk-invalid-number-of-args", log.Fields{"args": args})
+		return nil, errors.New("invalid-number-of-args")
+	}
+	device := &voltha.Device{}
+	// Decoded only so that a malformed transaction ID is rejected.
+	transactionID := &ic.StrType{}
+	flows := &voltha.Flows{}
+	flowMetadata := &voltha.FlowMetadata{}
+	groups := &voltha.FlowGroups{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case "flows":
+			if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-flows", log.Fields{"error": err})
+				return nil, err
+			}
+		case "groups":
+			if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-groups", log.Fields{"error": err})
+				return nil, err
+			}
+		case "flow_metadata":
+			if err := ptypes.UnmarshalAny(arg.Value, flowMetadata); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-metadata", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	logger.Debugw(ctx, "Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
+	//Invoke the bulk flow update API of the adapter
+	if err := rhp.adapter.Update_flows_bulk(ctx, device, flows, groups, flowMetadata); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return new(empty.Empty), nil
+}
+
+// Update_flows_incrementally decodes an incremental flow update request and
+// passes it to the adapter's Update_flows_incrementally.
+//
+// At least five arguments are required. The keys "device", "flow_changes",
+// "group_changes", "flow_metadata" and kafka.TransactionKey are decoded; any
+// other key is ignored. A decoding error is returned as is; an adapter error
+// is returned as a codes.NotFound status.
+func (rhp *RequestHandlerProxy) Update_flows_incrementally(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+	logger.Debug(ctx, "Update_flows_incrementally")
+	if len(args) < 5 {
+		// Warnw, not Warn: the fields are meant to be logged as structured data.
+		logger.Warnw(ctx, "Update_flows_incrementally-invalid-number-of-args", log.Fields{"args": args})
+		return nil, errors.New("invalid-number-of-args")
+	}
+	device := &voltha.Device{}
+	// Decoded only so that a malformed transaction ID is rejected.
+	transactionID := &ic.StrType{}
+	flows := &openflow_13.FlowChanges{}
+	flowMetadata := &voltha.FlowMetadata{}
+	groups := &openflow_13.FlowGroupChanges{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case "flow_changes":
+			if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-flows", log.Fields{"error": err})
+				return nil, err
+			}
+		case "group_changes":
+			if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-groups", log.Fields{"error": err})
+				return nil, err
+			}
+		case "flow_metadata":
+			if err := ptypes.UnmarshalAny(arg.Value, flowMetadata); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-metadata", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	logger.Debugw(ctx, "Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
+	//Invoke the incremental flow update API of the adapter
+	if err := rhp.adapter.Update_flows_incrementally(ctx, device, flows, groups, flowMetadata); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return new(empty.Empty), nil
+}
+
+// Update_pm_config decodes a PM configuration update request and passes it to
+// the adapter's Update_pm_config.
+//
+// At least two arguments are required. The keys "device", "pm_configs" and
+// kafka.TransactionKey are decoded; any other key is ignored. A decoding error
+// is returned as is; an adapter error is returned as a codes.NotFound status.
+func (rhp *RequestHandlerProxy) Update_pm_config(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+	logger.Debug(ctx, "Update_pm_config")
+	if len(args) < 2 {
+		// Warnw, not Warn: the fields are meant to be logged as structured data.
+		logger.Warnw(ctx, "Update_pm_config-invalid-number-of-args", log.Fields{"args": args})
+		return nil, errors.New("invalid-number-of-args")
+	}
+	device := &voltha.Device{}
+	// Decoded only so that a malformed transaction ID is rejected.
+	transactionID := &ic.StrType{}
+	pmConfigs := &voltha.PmConfigs{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case "pm_configs":
+			if err := ptypes.UnmarshalAny(arg.Value, pmConfigs); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-pm-configs", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	logger.Debugw(ctx, "Update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+	//Invoke the pm config update API of the adapter
+	if err := rhp.adapter.Update_pm_config(ctx, device, pmConfigs); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return new(empty.Empty), nil
+}
+
+// Receive_packet_out decodes a packet-out request and passes it to the
+// adapter's Receive_packet_out.
+//
+// At least three arguments are required. The keys "deviceId", "outPort",
+// "packet" and kafka.TransactionKey are decoded; any other key is ignored. The
+// egress port is converted to int before it is handed to the adapter. A
+// decoding error is returned as is; an adapter error is returned as a
+// codes.NotFound status.
+func (rhp *RequestHandlerProxy) Receive_packet_out(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+	logger.Debugw(ctx, "Receive_packet_out", log.Fields{"args": args})
+	if len(args) < 3 {
+		// Warnw, not Warn: the fields are meant to be logged as structured data.
+		logger.Warnw(ctx, "Receive_packet_out-invalid-number-of-args", log.Fields{"args": args})
+		return nil, errors.New("invalid-number-of-args")
+	}
+	deviceId := &ic.StrType{}
+	egressPort := &ic.IntType{}
+	packet := &openflow_13.OfpPacketOut{}
+	// Decoded only so that a malformed transaction ID is rejected.
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "deviceId":
+			if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-deviceId", log.Fields{"error": err})
+				return nil, err
+			}
+		case "outPort":
+			if err := ptypes.UnmarshalAny(arg.Value, egressPort); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-egressPort", log.Fields{"error": err})
+				return nil, err
+			}
+		case "packet":
+			if err := ptypes.UnmarshalAny(arg.Value, packet); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-packet", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	logger.Debugw(ctx, "Receive_packet_out", log.Fields{"deviceId": deviceId.Val, "outPort": egressPort, "packet": packet})
+	//Invoke the packet out API of the adapter
+	if err := rhp.adapter.Receive_packet_out(ctx, deviceId.Val, int(egressPort.Val), packet); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Suppress_alarm(args []*ic.Argument) (*empty.Empty, error) {
+	return &empty.Empty{}, nil // stub: args are ignored and an empty response is always returned
+}
+
+func (rhp *RequestHandlerProxy) Unsuppress_alarm(args []*ic.Argument) (*empty.Empty, error) {
+	return &empty.Empty{}, nil // stub: args are ignored and an empty response is always returned
+}
+
+func (rhp *RequestHandlerProxy) Get_ofp_device_info(ctx context.Context, args []*ic.Argument) (*ic.SwitchCapability, error) {
+ if len(args) < 2 {
+ logger.Warn(ctx, "invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+ device := &voltha.Device{}
+ transactionID := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case kafka.TransactionKey:
+ if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+
+ logger.Debugw(ctx, "Get_ofp_device_info", log.Fields{"deviceId": device.Id})
+
+ var cap *ic.SwitchCapability
+ var err error
+ if cap, err = rhp.adapter.Get_ofp_device_info(ctx, device); err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
+ logger.Debugw(ctx, "Get_ofp_device_info", log.Fields{"cap": cap})
+ return cap, nil
+}
+
+// Process_inter_adapter_message decodes the "msg" argument and passes it to the adapter; adapter errors become codes.NotFound.
+func (rhp *RequestHandlerProxy) Process_inter_adapter_message(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
+	if len(args) < 2 {
+		logger.Warnw(ctx, "invalid-number-of-args", log.Fields{"args": args})
+		return nil, errors.New("invalid-number-of-args")
+	}
+	iaMsg := &ic.InterAdapterMessage{}
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "msg":
+			if err := ptypes.UnmarshalAny(arg.Value, iaMsg); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-msg", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey:
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	// Use the generated nil-safe getters: Header is nil when "msg" is absent or carries no header.
+	logger.Debugw(ctx, "Process_inter_adapter_message", log.Fields{"msgId": iaMsg.GetHeader().GetId()})
+
+	//Invoke the inter adapter API on the handler
+	if err := rhp.adapter.Process_inter_adapter_message(ctx, iaMsg); err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+
+	return new(empty.Empty), nil
+}
+
+func (rhp *RequestHandlerProxy) Download_image(args []*ic.Argument) (*voltha.ImageDownload, error) {
+	return new(voltha.ImageDownload), nil // stub: args are ignored; an empty ImageDownload is returned
+}
+
+func (rhp *RequestHandlerProxy) Get_image_download_status(args []*ic.Argument) (*voltha.ImageDownload, error) {
+	return new(voltha.ImageDownload), nil // stub: args are ignored; an empty ImageDownload is returned
+}
+
+func (rhp *RequestHandlerProxy) Cancel_image_download(args []*ic.Argument) (*voltha.ImageDownload, error) {
+	return new(voltha.ImageDownload), nil // stub: args are ignored; an empty ImageDownload is returned
+}
+
+func (rhp *RequestHandlerProxy) Activate_image_update(args []*ic.Argument) (*voltha.ImageDownload, error) {
+	return new(voltha.ImageDownload), nil // stub: args are ignored; an empty ImageDownload is returned
+}
+
+func (rhp *RequestHandlerProxy) Revert_image_update(args []*ic.Argument) (*voltha.ImageDownload, error) {
+	return new(voltha.ImageDownload), nil // stub: args are ignored; an empty ImageDownload is returned
+}
+
+func (rhp *RequestHandlerProxy) Enable_port(ctx context.Context, args []*ic.Argument) error {
+	logger.Debugw(ctx, "enable_port", log.Fields{"args": args})
+	deviceId, port, err := rhp.getEnableDisableParams(ctx, args) // decodes "deviceId" and "port" from args
+	if err != nil { // decoding failed: log the cause and return without calling the adapter
+		logger.Warnw(ctx, "enable_port", log.Fields{"args": args, "deviceId": deviceId, "port": port, "error": err})
+		return err
+	}
+	return rhp.adapter.Enable_port(ctx, deviceId, port)
+}
+
+func (rhp *RequestHandlerProxy) Disable_port(ctx context.Context, args []*ic.Argument) error {
+	logger.Debugw(ctx, "disable_port", log.Fields{"args": args})
+	deviceId, port, err := rhp.getEnableDisableParams(ctx, args) // decodes "deviceId" and "port" from args
+	if err != nil { // decoding failed: log the cause and return without calling the adapter
+		logger.Warnw(ctx, "disable_port", log.Fields{"args": args, "deviceId": deviceId, "port": port, "error": err})
+		return err
+	}
+	return rhp.adapter.Disable_port(ctx, deviceId, port)
+}
+
+// getEnableDisableParams decodes the "deviceId" and "port" arguments shared by Enable_port and Disable_port.
+func (rhp *RequestHandlerProxy) getEnableDisableParams(ctx context.Context, args []*ic.Argument) (string, *voltha.Port, error) {
+	logger.Debugw(ctx, "getEnableDisableParams", log.Fields{"args": args})
+	if len(args) < 3 {
+		logger.Warnw(ctx, "invalid-number-of-args", log.Fields{"args": args})
+		return "", nil, errors.New("invalid-number-of-args")
+	}
+	deviceId, port := &ic.StrType{}, &voltha.Port{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "deviceId":
+			if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
+				return "", nil, err
+			}
+		case "port":
+			if err := ptypes.UnmarshalAny(arg.Value, port); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-port", log.Fields{"error": err})
+				return "", nil, err
+			}
+		}
+	}
+	return deviceId.Val, port, nil
+}
+
+// Child_device_lost decodes "pDeviceId", "pPortNo", "onuID" and the sender topic, then notifies the adapter.
+func (rhp *RequestHandlerProxy) Child_device_lost(ctx context.Context, args []*ic.Argument) error {
+	if len(args) < 4 {
+		logger.Warnw(ctx, "invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args")
+	}
+	pDeviceId := &ic.StrType{}
+	pPortNo := &ic.IntType{}
+	onuID := &ic.IntType{}
+	fromTopic := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "pDeviceId":
+			if err := ptypes.UnmarshalAny(arg.Value, pDeviceId); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-parent-deviceId", log.Fields{"error": err})
+				return err
+			}
+		case "pPortNo":
+			if err := ptypes.UnmarshalAny(arg.Value, pPortNo); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-port", log.Fields{"error": err})
+				return err
+			}
+		case "onuID":
+			if err := ptypes.UnmarshalAny(arg.Value, onuID); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-onuID", log.Fields{"error": err})
+				return err
+			}
+		case kafka.FromTopic:
+			if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-from-topic", log.Fields{"error": err})
+				return err
+			}
+		}
+	}
+	//Update the core reference for that device
+	rhp.coreProxy.UpdateCoreReference(pDeviceId.Val, fromTopic.Val)
+	// Invoke the Child_device_lost API on the adapter; any adapter error is reported as codes.NotFound.
+	if err := rhp.adapter.Child_device_lost(ctx, pDeviceId.Val, uint32(pPortNo.Val), uint32(onuID.Val)); err != nil {
+		return status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return nil
+}
+
+// Start_omci_test decodes "device" and "omcitestrequest" and returns the adapter's test response; adapter errors become codes.NotFound.
+func (rhp *RequestHandlerProxy) Start_omci_test(ctx context.Context, args []*ic.Argument) (*ic.TestResponse, error) {
+	if len(args) < 2 {
+		logger.Warnw(ctx, "invalid-number-of-args", log.Fields{"args": args})
+		return nil, errors.New("invalid-number-of-args")
+	}
+
+	// TODO: See related comment in voltha-go:adapter_proxy_go:startOmciTest()
+	//   Second argument should perhaps be uuid instead of omcitestrequest
+
+	device := &voltha.Device{}
+	request := &voltha.OmciTestRequest{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case "omcitestrequest":
+			if err := ptypes.UnmarshalAny(arg.Value, request); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-omcitestrequest", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	logger.Debugw(ctx, "Start_omci_test", log.Fields{"device-id": device.Id, "req": request})
+	result, err := rhp.adapter.Start_omci_test(ctx, device, request)
+	if err != nil {
+		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+	}
+	return result, nil
+}
+// Get_ext_value decodes "device", "pDeviceId" and "valuetype" and returns the adapter's Get_ext_value result unchanged.
+func (rhp *RequestHandlerProxy) Get_ext_value(ctx context.Context, args []*ic.Argument) (*voltha.ReturnValues, error) {
+	if len(args) < 3 {
+		logger.Warnw(ctx, "invalid-number-of-args", log.Fields{"args": args})
+		return nil, errors.New("invalid-number-of-args")
+	}
+	pDeviceId := &ic.StrType{}
+	device := &voltha.Device{}
+	valuetype := &ic.IntType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device":
+			if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-device", log.Fields{"error": err})
+				return nil, err
+			}
+		case "pDeviceId":
+			if err := ptypes.UnmarshalAny(arg.Value, pDeviceId); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-parent-deviceId", log.Fields{"error": err})
+				return nil, err
+			}
+		case "valuetype":
+			if err := ptypes.UnmarshalAny(arg.Value, valuetype); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-valuetype", log.Fields{"error": err})
+				return nil, err
+			}
+		default:
+			logger.Warnw(ctx, "key-not-found", log.Fields{"arg.Key": arg.Key})
+		}
+	}
+
+	// Invoke the Get_ext_value API on the adapter; unlike the sibling handlers the error is not wrapped.
+	return rhp.adapter.Get_ext_value(ctx, pDeviceId.Val, device, voltha.ValueType_Type(valuetype.Val))
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go
new file mode 100644
index 0000000..3d91119
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "context"
+ "fmt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "google.golang.org/grpc/codes"
+ "math/rand"
+ "time"
+)
+
+//GetRandomSerialNumber returns a serial number formatted as "HOST:PORT"
+func GetRandomSerialNumber() string {
+ rand.Seed(time.Now().UnixNano())
+ return fmt.Sprintf("%d.%d.%d.%d:%d",
+ rand.Intn(255),
+ rand.Intn(255),
+ rand.Intn(255),
+ rand.Intn(255),
+ rand.Intn(9000)+1000,
+ )
+}
+
+//GetRandomMacAddress returns a random mac address
+func GetRandomMacAddress() string {
+ rand.Seed(time.Now().UnixNano())
+ return fmt.Sprintf("%02x:%02x:%02x:%02x:%02x:%02x",
+ rand.Intn(128),
+ rand.Intn(128),
+ rand.Intn(128),
+ rand.Intn(128),
+ rand.Intn(128),
+ rand.Intn(128),
+ )
+}
+
+const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ" // alphabet GetRandomString picks characters from
+const (
+ letterIdxBits = 6 // 6 bits to represent a letter index
+ letterIdxMask = 1<<letterIdxBits - 1 // All 1-bits, as many as letterIdxBits
+ letterIdxMax = 63 / letterIdxBits // # of letter indices fitting in 63 bits
+)
+
+var src = rand.NewSource(time.Now().UnixNano()) // shared source read by GetRandomString; NOTE(review): rand.NewSource sources are not safe for concurrent use - confirm callers serialize
+
+// GetRandomString returns n characters drawn from letterBytes, filling the result from its last index backwards.
+func GetRandomString(n int) string {
+	out := make([]byte, n)
+	// One src.Int63() call yields 63 random bits, enough for letterIdxMax candidate indices.
+	bits, left := src.Int63(), letterIdxMax
+	for pos := n - 1; pos >= 0; bits, left = bits>>letterIdxBits, left-1 {
+		if left == 0 {
+			bits, left = src.Int63(), letterIdxMax
+		}
+		if idx := int(bits & letterIdxMask); idx < len(letterBytes) { // indices past the alphabet are skipped
+			out[pos] = letterBytes[idx]
+			pos--
+		}
+	}
+	return string(out)
+}
+
+// ICProxyErrorCodeToGrpcErrorCode maps an inter-container error code to a gRPC code; unmapped codes are logged and become codes.Internal.
+func ICProxyErrorCodeToGrpcErrorCode(ctx context.Context, icErr ic.ErrorCodeCodes) codes.Code {
+	switch icErr {
+	case ic.ErrorCode_INVALID_PARAMETERS:
+		return codes.InvalidArgument
+	case ic.ErrorCode_UNSUPPORTED_REQUEST:
+		return codes.Unavailable
+	case ic.ErrorCode_DEADLINE_EXCEEDED:
+		return codes.DeadlineExceeded
+	}
+	logger.Warnw(ctx, "cannnot-map-ic-error-code-to-grpc-error-code", log.Fields{"err": icErr})
+	return codes.Internal
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go
new file mode 100644
index 0000000..ce0b791
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adapters
+
+import (
+ "context"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+)
+
+// IAdapter represents the set of APIs a voltha adapter has to support; every method takes the request context as its first argument.
+type IAdapter interface {
+ Adapter_descriptor(ctx context.Context) error
+ Device_types(ctx context.Context) (*voltha.DeviceTypes, error)
+ Health(ctx context.Context) (*voltha.HealthStatus, error)
+ Adopt_device(ctx context.Context, device *voltha.Device) error
+ Reconcile_device(ctx context.Context, device *voltha.Device) error
+ Abandon_device(ctx context.Context, device *voltha.Device) error
+ Disable_device(ctx context.Context, device *voltha.Device) error
+ Reenable_device(ctx context.Context, device *voltha.Device) error
+ Reboot_device(ctx context.Context, device *voltha.Device) error
+ Self_test_device(ctx context.Context, device *voltha.Device) error
+ Delete_device(ctx context.Context, device *voltha.Device) error
+ Get_device_details(ctx context.Context, device *voltha.Device) error
+ Update_flows_bulk(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error
+ Update_flows_incrementally(ctx context.Context, device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error
+ Update_pm_config(ctx context.Context, device *voltha.Device, pm_configs *voltha.PmConfigs) error
+ Receive_packet_out(ctx context.Context, deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error // msg is the OpenFlow packet-out to emit on egress_port_no
+ Suppress_event(ctx context.Context, filter *voltha.EventFilter) error
+ Unsuppress_event(ctx context.Context, filter *voltha.EventFilter) error
+ Get_ofp_device_info(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error)
+ Process_inter_adapter_message(ctx context.Context, msg *ic.InterAdapterMessage) error
+ Download_image(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Get_image_download_status(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Cancel_image_download(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Activate_image_update(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Revert_image_update(ctx context.Context, device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Enable_port(ctx context.Context, deviceId string, port *voltha.Port) error
+ Disable_port(ctx context.Context, deviceId string, port *voltha.Port) error
+ Child_device_lost(ctx context.Context, parentDeviceId string, parentPortNo uint32, onuID uint32) error // identifies the lost child by parent device, parent port and ONU id
+ Start_omci_test(ctx context.Context, device *voltha.Device, request *voltha.OmciTestRequest) (*voltha.TestResponse, error)
+ Get_ext_value(ctx context.Context, deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error)
+}