[VOL-2941] Upgrading to latest protos and lib
Change-Id: Ie0a92172023f629744f97a7499335cef490dcc3f
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/README.md b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/README.md
deleted file mode 100644
index 13479f8..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/README.md
+++ /dev/null
@@ -1,10 +0,0 @@
-## How to Build and Run a Voltha Go language Adapter
-
-This directory is a repo for all voltha adapters written in the Go language. At this time, the simulated_olt and
-simulated_onu adapters are the only adapters using the Go language. These adapters provide basic capabilities
-which will be used for high availability and capacity testing.
-
-### Building and running the Simulated OLT and ONU Adapters
-
-Please refer to the ```BUILD.md``` file under the voltha-go repo
-
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/core_proxy.go
deleted file mode 100644
index b0f7f32..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/core_proxy.go
+++ /dev/null
@@ -1,560 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package common
-
-import (
- "context"
- "github.com/golang/protobuf/ptypes"
- a "github.com/golang/protobuf/ptypes/any"
- "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
- "github.com/opencord/voltha-protos/v2/go/voltha"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "sync"
-)
-
-type CoreProxy struct {
- kafkaICProxy *kafka.InterContainerProxy
- adapterTopic string
- coreTopic string
- deviceIdCoreMap map[string]string
- lockDeviceIdCoreMap sync.RWMutex
-}
-
-func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
- var proxy CoreProxy
- proxy.kafkaICProxy = kafkaProxy
- proxy.adapterTopic = adapterTopic
- proxy.coreTopic = coreTopic
- proxy.deviceIdCoreMap = make(map[string]string)
- proxy.lockDeviceIdCoreMap = sync.RWMutex{}
- log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
-
- return &proxy
-}
-
-func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
- if success {
- return nil
- } else {
- unpackResult := &ic.Error{}
- var err error
- if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- }
- log.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
- // TODO: Need to get the real error code
- return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
- }
-}
-
-// UpdateCoreReference adds or updates a core reference (really the topic name) for a given device Id
-func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
- ap.lockDeviceIdCoreMap.Lock()
- defer ap.lockDeviceIdCoreMap.Unlock()
- ap.deviceIdCoreMap[deviceId] = coreReference
-}
-
-// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
-func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
- ap.lockDeviceIdCoreMap.Lock()
- defer ap.lockDeviceIdCoreMap.Unlock()
- delete(ap.deviceIdCoreMap, deviceId)
-}
-
-func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
- ap.lockDeviceIdCoreMap.Lock()
- defer ap.lockDeviceIdCoreMap.Unlock()
-
- if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
- return kafka.Topic{Name: t}
- }
-
- return kafka.Topic{Name: ap.coreTopic}
-}
-
-func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
- return kafka.Topic{Name: ap.adapterTopic}
-}
-
-func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
- log.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
- rpc := "Register"
- topic := kafka.Topic{Name: ap.coreTopic}
- replyToTopic := ap.getAdapterTopic()
- args := make([]*kafka.KVArg, 2)
- args[0] = &kafka.KVArg{
- Key: "adapter",
- Value: adapter,
- }
- args[1] = &kafka.KVArg{
- Key: "deviceTypes",
- Value: deviceTypes,
- }
-
- success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
- log.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
- return unPackResponse(rpc, "", success, result)
-}
-
-func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
- log.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
- rpc := "DeviceUpdate"
- toTopic := ap.getCoreTopic(device.Id)
- args := make([]*kafka.KVArg, 1)
- args[0] = &kafka.KVArg{
- Key: "device",
- Value: device,
- }
- // Use a device specific topic as we are the only adaptercore handling requests for this device
- replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, device.Id, args...)
- log.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
- return unPackResponse(rpc, device.Id, success, result)
-}
-
-func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
- log.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
- rpc := "PortCreated"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := ap.getCoreTopic(deviceId)
- args := make([]*kafka.KVArg, 2)
- id := &voltha.ID{Id: deviceId}
- args[0] = &kafka.KVArg{
- Key: "device_id",
- Value: id,
- }
- args[1] = &kafka.KVArg{
- Key: "port",
- Value: port,
- }
-
- // Use a device specific topic as we are the only adaptercore handling requests for this device
- replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- log.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_OperStatus) error {
- log.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId})
- rpc := "PortsStateUpdate"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := ap.getCoreTopic(deviceId)
- args := make([]*kafka.KVArg, 2)
- id := &voltha.ID{Id: deviceId}
- oStatus := &ic.IntType{Val: int64(operStatus)}
-
- args[0] = &kafka.KVArg{
- Key: "device_id",
- Value: id,
- }
- args[1] = &kafka.KVArg{
- Key: "oper_status",
- Value: oStatus,
- }
-
- // Use a device specific topic as we are the only adaptercore handling requests for this device
- replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- log.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) DeleteAllPorts(ctx context.Context, deviceId string) error {
- log.Debugw("DeleteAllPorts", log.Fields{"deviceId": deviceId})
- rpc := "DeleteAllPorts"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := ap.getCoreTopic(deviceId)
- args := make([]*kafka.KVArg, 2)
- id := &voltha.ID{Id: deviceId}
-
- args[0] = &kafka.KVArg{
- Key: "device_id",
- Value: id,
- }
-
- // Use a device specific topic as we are the only adaptercore handling requests for this device
- replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- log.Debugw("DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
- connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error {
- log.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
- rpc := "DeviceStateUpdate"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := ap.getCoreTopic(deviceId)
- args := make([]*kafka.KVArg, 3)
- id := &voltha.ID{Id: deviceId}
- oStatus := &ic.IntType{Val: int64(operStatus)}
- cStatus := &ic.IntType{Val: int64(connStatus)}
-
- args[0] = &kafka.KVArg{
- Key: "device_id",
- Value: id,
- }
- args[1] = &kafka.KVArg{
- Key: "oper_status",
- Value: oStatus,
- }
- args[2] = &kafka.KVArg{
- Key: "connect_status",
- Value: cStatus,
- }
- // Use a device specific topic as we are the only adaptercore handling requests for this device
- replyToTopic := ap.getAdapterTopic()
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- log.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
- childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
- log.Debugw("ChildDeviceDetected", log.Fields{"pDeviceId": parentDeviceId, "channelId": channelId})
- rpc := "ChildDeviceDetected"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := ap.getCoreTopic(parentDeviceId)
- replyToTopic := ap.getAdapterTopic()
-
- args := make([]*kafka.KVArg, 7)
- id := &voltha.ID{Id: parentDeviceId}
- args[0] = &kafka.KVArg{
- Key: "parent_device_id",
- Value: id,
- }
- ppn := &ic.IntType{Val: int64(parentPortNo)}
- args[1] = &kafka.KVArg{
- Key: "parent_port_no",
- Value: ppn,
- }
- cdt := &ic.StrType{Val: childDeviceType}
- args[2] = &kafka.KVArg{
- Key: "child_device_type",
- Value: cdt,
- }
- channel := &ic.IntType{Val: int64(channelId)}
- args[3] = &kafka.KVArg{
- Key: "channel_id",
- Value: channel,
- }
- vId := &ic.StrType{Val: vendorId}
- args[4] = &kafka.KVArg{
- Key: "vendor_id",
- Value: vId,
- }
- sNo := &ic.StrType{Val: serialNumber}
- args[5] = &kafka.KVArg{
- Key: "serial_number",
- Value: sNo,
- }
- oId := &ic.IntType{Val: int64(onuId)}
- args[6] = &kafka.KVArg{
- Key: "onu_id",
- Value: oId,
- }
-
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- log.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
-
- if success {
- volthaDevice := &voltha.Device{}
- if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return volthaDevice, nil
- } else {
- unpackResult := &ic.Error{}
- var err error
- if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- }
- log.Debugw("ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
- }
-
-}
-
-func (ap *CoreProxy) ChildDevicesLost(ctx context.Context, parentDeviceId string) error {
- log.Debugw("ChildDevicesLost", log.Fields{"pDeviceId": parentDeviceId})
- rpc := "ChildDevicesLost"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := ap.getCoreTopic(parentDeviceId)
- replyToTopic := ap.getAdapterTopic()
-
- args := make([]*kafka.KVArg, 1)
- id := &voltha.ID{Id: parentDeviceId}
- args[0] = &kafka.KVArg{
- Key: "parent_device_id",
- Value: id,
- }
-
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- log.Debugw("ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
- return unPackResponse(rpc, parentDeviceId, success, result)
-}
-
-func (ap *CoreProxy) ChildDevicesDetected(ctx context.Context, parentDeviceId string) error {
- log.Debugw("ChildDevicesDetected", log.Fields{"pDeviceId": parentDeviceId})
- rpc := "ChildDevicesDetected"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := ap.getCoreTopic(parentDeviceId)
- replyToTopic := ap.getAdapterTopic()
-
- args := make([]*kafka.KVArg, 1)
- id := &voltha.ID{Id: parentDeviceId}
- args[0] = &kafka.KVArg{
- Key: "parent_device_id",
- Value: id,
- }
-
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- log.Debugw("ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
- return unPackResponse(rpc, parentDeviceId, success, result)
-}
-
-func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
- log.Debugw("GetDevice", log.Fields{"deviceId": deviceId})
- rpc := "GetDevice"
-
- toTopic := ap.getCoreTopic(parentDeviceId)
- replyToTopic := ap.getAdapterTopic()
-
- args := make([]*kafka.KVArg, 1)
- id := &voltha.ID{Id: deviceId}
- args[0] = &kafka.KVArg{
- Key: "device_id",
- Value: id,
- }
-
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- log.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
-
- if success {
- volthaDevice := &voltha.Device{}
- if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return volthaDevice, nil
- } else {
- unpackResult := &ic.Error{}
- var err error
- if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- }
- log.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
- }
-}
-
-func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
- log.Debugw("GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
- rpc := "GetChildDevice"
-
- toTopic := ap.getCoreTopic(parentDeviceId)
- replyToTopic := ap.getAdapterTopic()
-
- args := make([]*kafka.KVArg, 4)
- id := &voltha.ID{Id: parentDeviceId}
- args[0] = &kafka.KVArg{
- Key: "device_id",
- Value: id,
- }
-
- var cnt uint8 = 0
- for k, v := range kwargs {
- cnt += 1
- if k == "serial_number" {
- val := &ic.StrType{Val: v.(string)}
- args[cnt] = &kafka.KVArg{
- Key: k,
- Value: val,
- }
- } else if k == "onu_id" {
- val := &ic.IntType{Val: int64(v.(uint32))}
- args[cnt] = &kafka.KVArg{
- Key: k,
- Value: val,
- }
- } else if k == "parent_port_no" {
- val := &ic.IntType{Val: int64(v.(uint32))}
- args[cnt] = &kafka.KVArg{
- Key: k,
- Value: val,
- }
- }
- }
-
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- log.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
-
- if success {
- volthaDevice := &voltha.Device{}
- if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return volthaDevice, nil
- } else {
- unpackResult := &ic.Error{}
- var err error
- if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- }
- log.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
- }
-}
-
-func (ap *CoreProxy) GetChildDevices(ctx context.Context, parentDeviceId string) (*voltha.Devices, error) {
- log.Debugw("GetChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
- rpc := "GetChildDevices"
-
- toTopic := ap.getCoreTopic(parentDeviceId)
- replyToTopic := ap.getAdapterTopic()
-
- args := make([]*kafka.KVArg, 1)
- id := &voltha.ID{Id: parentDeviceId}
- args[0] = &kafka.KVArg{
- Key: "device_id",
- Value: id,
- }
-
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- log.Debugw("GetChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
-
- if success {
- volthaDevices := &voltha.Devices{}
- if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
- }
- return volthaDevices, nil
- } else {
- unpackResult := &ic.Error{}
- var err error
- if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- }
- log.Debugw("GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
- }
-}
-
-func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
- log.Debugw("SendPacketIn", log.Fields{"deviceId": deviceId, "port": port, "pktPayload": pktPayload})
- rpc := "PacketIn"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := ap.getCoreTopic(deviceId)
- replyToTopic := ap.getAdapterTopic()
-
- args := make([]*kafka.KVArg, 3)
- id := &voltha.ID{Id: deviceId}
- args[0] = &kafka.KVArg{
- Key: "device_id",
- Value: id,
- }
- portNo := &ic.IntType{Val: int64(port)}
- args[1] = &kafka.KVArg{
- Key: "port",
- Value: portNo,
- }
- pkt := &ic.Packet{Payload: pktPayload}
- args[2] = &kafka.KVArg{
- Key: "packet",
- Value: pkt,
- }
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- log.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) DeviceReasonUpdate(ctx context.Context, deviceId string, deviceReason string) error {
- log.Debugw("DeviceReasonUpdate", log.Fields{"deviceId": deviceId, "deviceReason": deviceReason})
- rpc := "DeviceReasonUpdate"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := ap.getCoreTopic(deviceId)
- replyToTopic := ap.getAdapterTopic()
-
- args := make([]*kafka.KVArg, 2)
- id := &voltha.ID{Id: deviceId}
- args[0] = &kafka.KVArg{
- Key: "device_id",
- Value: id,
- }
- reason := &ic.StrType{Val: deviceReason}
- args[1] = &kafka.KVArg{
- Key: "device_reason",
- Value: reason,
- }
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, deviceId, args...)
- log.Debugw("DeviceReason-response", log.Fields{"pDeviceId": deviceId, "success": success})
- return unPackResponse(rpc, deviceId, success, result)
-}
-
-func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
- log.Debugw("DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
- rpc := "DevicePMConfigUpdate"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := ap.getCoreTopic(pmConfigs.Id)
- replyToTopic := ap.getAdapterTopic()
-
- args := make([]*kafka.KVArg, 1)
- args[0] = &kafka.KVArg{
- Key: "device_pm_config",
- Value: pmConfigs,
- }
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
- log.Debugw("DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
- return unPackResponse(rpc, pmConfigs.Id, success, result)
-}
-
-func (ap *CoreProxy) ReconcileChildDevices(ctx context.Context, parentDeviceId string) error {
- log.Debugw("ReconcileChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
- rpc := "ReconcileChildDevices"
- // Use a device specific topic to send the request. The adapter handling the device creates a device
- // specific topic
- toTopic := ap.getCoreTopic(parentDeviceId)
- replyToTopic := ap.getAdapterTopic()
-
- args := []*kafka.KVArg{
- {Key: "parent_device_id", Value: &voltha.ID{Id: parentDeviceId}},
- }
-
- success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
- log.Debugw("ReconcileChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
- return unPackResponse(rpc, parentDeviceId, success, result)
-}
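
The removed v2 CoreProxy captures the adapter-to-core calling convention that carries over to v3: each method packs its proto arguments into kafka.KVArg entries, invokes InvokeRPC on the core topic (device-keyed where applicable), and unpacks the returned Any via unPackResponse. A minimal usage sketch against this v2 API follows; it assumes an already-started *kafka.InterContainerProxy and pre-built adapter and device-type objects, and the topic names are illustrative.

```go
package example

import (
	"context"
	"time"

	"github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
	"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
	"github.com/opencord/voltha-protos/v2/go/voltha"
)

// registerAndActivate sketches how an adapter could drive the v2 CoreProxy:
// register itself with the core, then report one of its devices as reachable
// and active. "adapter-topic" and "rwcore" are illustrative topic names.
func registerAndActivate(icProxy *kafka.InterContainerProxy, adapter *voltha.Adapter,
	deviceTypes *voltha.DeviceTypes, deviceID string) error {

	cp := common.NewCoreProxy(icProxy, "adapter-topic", "rwcore")

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	if err := cp.RegisterAdapter(ctx, adapter, deviceTypes); err != nil {
		return err
	}
	// Per-device calls are routed to the owning core once UpdateCoreReference has
	// recorded that core's topic for the device; otherwise the default core topic is used.
	return cp.DeviceStateUpdate(ctx, deviceID,
		voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
}
```
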
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go
deleted file mode 100644
index 3326191..0000000
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/kafka_inter_container_library.go
+++ /dev/null
@@ -1,841 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka
-
-import (
- "context"
- "errors"
- "fmt"
- "github.com/golang/protobuf/proto"
- "github.com/golang/protobuf/ptypes"
- "github.com/golang/protobuf/ptypes/any"
- "github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
- "reflect"
- "strings"
- "sync"
- "time"
-)
-
-// Initialize the logger - gets the default until the main function sets up the logger
-func init() {
- log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
-const (
- DefaultMaxRetries = 3
- DefaultRequestTimeout = 10000 // 10000 milliseconds - to handle a wider latency range
-)
-
-const (
- TransactionKey = "transactionID"
- FromTopic = "fromTopic"
-)
-
-var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
-var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
-
-// requestHandlerChannel represents an interface associated with a channel. Whenever an event is
-// obtained from that channel, this interface is invoked. This is used to handle
-// async requests into the Core via the kafka messaging bus
-type requestHandlerChannel struct {
- requesthandlerInterface interface{}
- ch <-chan *ic.InterContainerMessage
-}
-
-// transactionChannel represents a combination of a topic and a channel onto which a response received
-// on the kafka bus will be sent
-type transactionChannel struct {
- topic *Topic
- ch chan *ic.InterContainerMessage
-}
-
-// InterContainerProxy represents the messaging proxy
-type InterContainerProxy struct {
- kafkaHost string
- kafkaPort int
- DefaultTopic *Topic
- defaultRequestHandlerInterface interface{}
- deviceDiscoveryTopic *Topic
- kafkaClient Client
- doneCh chan int
-
- // This map is used to map a topic to an interface and channel. When a request is received
- // on that channel (registered to the topic) then that interface is invoked.
- topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
- lockTopicRequestHandlerChannelMap sync.RWMutex
-
- // This map is used to map a channel to a response topic. This channel handles all responses on that
-	// channel for that topic and forwards them to the appropriate consumer's channel, using the
- // transactionIdToChannelMap.
- topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
- lockTopicResponseChannelMap sync.RWMutex
-
-	// This map is used to map a transaction to a consumer's channel. This is used whenever a request has been
- // sent out and we are waiting for a response.
- transactionIdToChannelMap map[string]*transactionChannel
- lockTransactionIdToChannelMap sync.RWMutex
-}
-
-type InterContainerProxyOption func(*InterContainerProxy)
-
-func InterContainerHost(host string) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
- args.kafkaHost = host
- }
-}
-
-func InterContainerPort(port int) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
- args.kafkaPort = port
- }
-}
-
-func DefaultTopic(topic *Topic) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
- args.DefaultTopic = topic
- }
-}
-
-func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
- args.deviceDiscoveryTopic = topic
- }
-}
-
-func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
- args.defaultRequestHandlerInterface = handler
- }
-}
-
-func MsgClient(client Client) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
- args.kafkaClient = client
- }
-}
-
-func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
- proxy := &InterContainerProxy{
- kafkaHost: DefaultKafkaHost,
- kafkaPort: DefaultKafkaPort,
- }
-
- for _, option := range opts {
- option(proxy)
- }
-
- // Create the locks for all the maps
- proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
- proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
- proxy.lockTopicResponseChannelMap = sync.RWMutex{}
-
- return proxy, nil
-}
-
-func (kp *InterContainerProxy) Start() error {
- log.Info("Starting-Proxy")
-
- // Kafka MsgClient should already have been created. If not, output fatal error
- if kp.kafkaClient == nil {
- log.Fatal("kafka-client-not-set")
- }
-
- // Create the Done channel
- kp.doneCh = make(chan int, 1)
-
- // Start the kafka client
- if err := kp.kafkaClient.Start(); err != nil {
- log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
- return err
- }
-
- // Create the topic to response channel map
- kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
- //
- // Create the transactionId to Channel Map
- kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
-
- // Create the topic to request channel map
- kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
-
- return nil
-}
-
-func (kp *InterContainerProxy) Stop() {
- log.Info("stopping-intercontainer-proxy")
- kp.doneCh <- 1
- // TODO : Perform cleanup
- kp.kafkaClient.Stop()
- //kp.deleteAllTopicRequestHandlerChannelMap()
- //kp.deleteAllTopicResponseChannelMap()
- //kp.deleteAllTransactionIdToChannelMap()
-}
-
-// DeviceDiscovered publishes the discovered device onto the kafka messaging bus
-func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
- log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
- // Simple validation
- if deviceId == "" || deviceType == "" {
- log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
- return errors.New("invalid-parameters")
- }
- // Create the device discovery message
- header := &ic.Header{
- Id: uuid.New().String(),
- Type: ic.MessageType_DEVICE_DISCOVERED,
- FromTopic: kp.DefaultTopic.Name,
- ToTopic: kp.deviceDiscoveryTopic.Name,
- Timestamp: time.Now().UnixNano(),
- }
- body := &ic.DeviceDiscovered{
- Id: deviceId,
- DeviceType: deviceType,
- ParentId: parentId,
- Publisher: publisher,
- }
-
- var marshalledData *any.Any
- var err error
- if marshalledData, err = ptypes.MarshalAny(body); err != nil {
- log.Errorw("cannot-marshal-request", log.Fields{"error": err})
- return err
- }
- msg := &ic.InterContainerMessage{
- Header: header,
- Body: marshalledData,
- }
-
- // Send the message
- if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
- log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
- return err
- }
- return nil
-}
-
-// InvokeRPC is used to send a request to a given topic
-func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
- waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
-
- // If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
- // typically the device ID.
- responseTopic := replyToTopic
- if responseTopic == nil {
- responseTopic = kp.DefaultTopic
- }
-
- // Encode the request
- protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
- if err != nil {
- log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
- return false, nil
- }
-
- // Subscribe for response, if needed, before sending request
- var ch <-chan *ic.InterContainerMessage
- if waitForResponse {
- var err error
- if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
- log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
- }
- }
-
- // Send request - if the topic is formatted with a device Id then we will send the request using a
- // specific key, hence ensuring a single partition is used to publish the request. This ensures that the
- // subscriber on that topic will receive the request in the order it was sent. The key used is the deviceId.
- //key := GetDeviceIdFromTopic(*toTopic)
- log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
- go kp.kafkaClient.Send(protoRequest, toTopic, key)
-
- if waitForResponse {
- // Create a child context based on the parent context, if any
- var cancel context.CancelFunc
- childCtx := context.Background()
- if ctx == nil {
- ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
- } else {
- childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
- }
- defer cancel()
-
- // Wait for response as well as timeout or cancellation
- // Remove the subscription for a response on return
- defer kp.unSubscribeForResponse(protoRequest.Header.Id)
- select {
- case msg, ok := <-ch:
- if !ok {
- log.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
- protoError := &ic.Error{Reason: "channel-closed"}
- var marshalledArg *any.Any
- if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
- return false, nil // Should never happen
- }
- return false, marshalledArg
- }
- log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
- var responseBody *ic.InterContainerResponseBody
- var err error
- if responseBody, err = decodeResponse(msg); err != nil {
- log.Errorw("decode-response-error", log.Fields{"error": err})
- }
- return responseBody.Success, responseBody.Result
- case <-ctx.Done():
- log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
- // pack the error as proto any type
- protoError := &ic.Error{Reason: ctx.Err().Error()}
- var marshalledArg *any.Any
- if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
- return false, nil // Should never happen
- }
- return false, marshalledArg
- case <-childCtx.Done():
- log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
- // pack the error as proto any type
- protoError := &ic.Error{Reason: childCtx.Err().Error()}
- var marshalledArg *any.Any
- if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
- return false, nil // Should never happen
- }
- return false, marshalledArg
- case <-kp.doneCh:
- log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
- return true, nil
- }
- }
- return true, nil
-}
-
-// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
-// when a message is received on a given topic
-func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
-
- // Subscribe to receive messages for that topic
- var ch <-chan *ic.InterContainerMessage
- var err error
- if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
- //if ch, err = kp.Subscribe(topic); err != nil {
- log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
- return err
- }
-
- kp.defaultRequestHandlerInterface = handler
- kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
- // Launch a go routine to receive and process kafka messages
- go kp.waitForMessages(ch, topic, handler)
-
- return nil
-}
-
-// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
-// when a message is received on a given topic. So far there is only 1 target registered per microservice
-func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
- // Subscribe to receive messages for that topic
- var ch <-chan *ic.InterContainerMessage
- var err error
- if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
- log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
- return err
- }
- kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
-
- // Launch a go routine to receive and process kafka messages
- go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
-
- return nil
-}
-
-func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
- return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
-}
-
-// setupTopicResponseChannelMap sets up a single consumer channel that will act as a broadcast channel for all
-// responses from that topic.
-func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
- kp.lockTopicResponseChannelMap.Lock()
- defer kp.lockTopicResponseChannelMap.Unlock()
- if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
- kp.topicToResponseChannelMap[topic] = arg
- }
-}
-
-func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
- kp.lockTopicResponseChannelMap.RLock()
- defer kp.lockTopicResponseChannelMap.RUnlock()
- _, exist := kp.topicToResponseChannelMap[topic]
- return exist
-}
-
-func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
- kp.lockTopicResponseChannelMap.Lock()
- defer kp.lockTopicResponseChannelMap.Unlock()
- if _, exist := kp.topicToResponseChannelMap[topic]; exist {
- // Unsubscribe to this topic first - this will close the subscribed channel
- var err error
- if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
- log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
- }
- delete(kp.topicToResponseChannelMap, topic)
- return err
- } else {
- return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
- }
-}
-
-func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
- kp.lockTopicResponseChannelMap.Lock()
- defer kp.lockTopicResponseChannelMap.Unlock()
- var err error
- for topic, _ := range kp.topicToResponseChannelMap {
- // Unsubscribe to this topic first - this will close the subscribed channel
- if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
- log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
- }
- delete(kp.topicToResponseChannelMap, topic)
- }
- return err
-}
-
-func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
- kp.lockTopicRequestHandlerChannelMap.Lock()
- defer kp.lockTopicRequestHandlerChannelMap.Unlock()
- if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
- kp.topicToRequestHandlerChannelMap[topic] = arg
- }
-}
-
-func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
- kp.lockTopicRequestHandlerChannelMap.Lock()
- defer kp.lockTopicRequestHandlerChannelMap.Unlock()
- if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
-	// Close the kafka client first by unsubscribing from this topic
- kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
- delete(kp.topicToRequestHandlerChannelMap, topic)
- return nil
- } else {
- return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
- }
-}
-
-func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
- kp.lockTopicRequestHandlerChannelMap.Lock()
- defer kp.lockTopicRequestHandlerChannelMap.Unlock()
- var err error
- for topic, _ := range kp.topicToRequestHandlerChannelMap {
-	// Close the kafka client first by unsubscribing from this topic
- if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
- log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
- }
- delete(kp.topicToRequestHandlerChannelMap, topic)
- }
- return err
-}
-
-func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
- kp.lockTransactionIdToChannelMap.Lock()
- defer kp.lockTransactionIdToChannelMap.Unlock()
- if _, exist := kp.transactionIdToChannelMap[id]; !exist {
- kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
- }
-}
-
-func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
- kp.lockTransactionIdToChannelMap.Lock()
- defer kp.lockTransactionIdToChannelMap.Unlock()
- if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
- // Close the channel first
- close(transChannel.ch)
- delete(kp.transactionIdToChannelMap, id)
- }
-}
-
-func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
- kp.lockTransactionIdToChannelMap.Lock()
- defer kp.lockTransactionIdToChannelMap.Unlock()
- for key, value := range kp.transactionIdToChannelMap {
- if value.topic.Name == id {
- close(value.ch)
- delete(kp.transactionIdToChannelMap, key)
- }
- }
-}
-
-func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
- kp.lockTransactionIdToChannelMap.Lock()
- defer kp.lockTransactionIdToChannelMap.Unlock()
- for key, value := range kp.transactionIdToChannelMap {
- close(value.ch)
- delete(kp.transactionIdToChannelMap, key)
- }
-}
-
-func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
- // If we have any consumers on that topic we need to close them
- if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
- log.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
- }
- if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
- log.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
- }
- kp.deleteTopicTransactionIdToChannelMap(topic.Name)
-
- return kp.kafkaClient.DeleteTopic(&topic)
-}
-
-func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
- // Encode the response argument - needs to be a proto message
- if returnedVal == nil {
- return nil, nil
- }
- protoValue, ok := returnedVal.(proto.Message)
- if !ok {
- log.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
- err := errors.New("response-value-not-proto-message")
- return nil, err
- }
-
- // Marshal the returned value, if any
- var marshalledReturnedVal *any.Any
- var err error
- if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
- log.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
- return nil, err
- }
- return marshalledReturnedVal, nil
-}
-
-func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
- responseHeader := &ic.Header{
- Id: request.Header.Id,
- Type: ic.MessageType_RESPONSE,
- FromTopic: request.Header.ToTopic,
- ToTopic: request.Header.FromTopic,
- Timestamp: time.Now().Unix(),
- }
- responseBody := &ic.InterContainerResponseBody{
- Success: false,
- Result: nil,
- }
- var marshalledResponseBody *any.Any
- var err error
- // Error should never happen here
- if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
- log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
- }
-
- return &ic.InterContainerMessage{
- Header: responseHeader,
- Body: marshalledResponseBody,
- }
-
-}
-
-//encodeResponse encodes a response to send over kafka and returns an InterContainerMessage on success
-//or an error on failure
-func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
- //log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
- responseHeader := &ic.Header{
- Id: request.Header.Id,
- Type: ic.MessageType_RESPONSE,
- FromTopic: request.Header.ToTopic,
- ToTopic: request.Header.FromTopic,
- KeyTopic: request.Header.KeyTopic,
- Timestamp: time.Now().UnixNano(),
- }
-
- // Go over all returned values
- var marshalledReturnedVal *any.Any
- var err error
- for _, returnVal := range returnedValues {
- if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
- log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
- }
- break // for now we support only 1 returned value - (excluding the error)
- }
-
- responseBody := &ic.InterContainerResponseBody{
- Success: success,
- Result: marshalledReturnedVal,
- }
-
- // Marshal the response body
- var marshalledResponseBody *any.Any
- if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
- log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
- return nil, err
- }
-
- return &ic.InterContainerMessage{
- Header: responseHeader,
- Body: marshalledResponseBody,
- }, nil
-}
-
-func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
- myClassValue := reflect.ValueOf(myClass)
-	// Capitalize the first letter of funcName to work around the requirement that only exported
-	// (capitalized) methods can be invoked from another package via reflection
- funcName = strings.Title(funcName)
- m := myClassValue.MethodByName(funcName)
- if !m.IsValid() {
- return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
- }
- in := make([]reflect.Value, len(params))
- for i, param := range params {
- in[i] = reflect.ValueOf(param)
- }
- out = m.Call(in)
- return
-}
-
-func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
- arg := &KVArg{
- Key: TransactionKey,
- Value: &ic.StrType{Val: transactionId},
- }
-
- var marshalledArg *any.Any
- var err error
- if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
- log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
- return currentArgs
- }
- protoArg := &ic.Argument{
- Key: arg.Key,
- Value: marshalledArg,
- }
- return append(currentArgs, protoArg)
-}
-
-func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
- var marshalledArg *any.Any
- var err error
- if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
- log.Warnw("cannot-add-transactionId", log.Fields{"error": err})
- return currentArgs
- }
- protoArg := &ic.Argument{
- Key: FromTopic,
- Value: marshalledArg,
- }
- return append(currentArgs, protoArg)
-}
-
-func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
-
- // First extract the header to know whether this is a request - responses are handled by a different handler
- if msg.Header.Type == ic.MessageType_REQUEST {
- var out []reflect.Value
- var err error
-
- // Get the request body
- requestBody := &ic.InterContainerRequestBody{}
- if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
- log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
- } else {
- log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
-			// let the callee unpack the arguments as it's the only one that knows the real proto type
- // Augment the requestBody with the message Id as it will be used in scenarios where cores
- // are set in pairs and competing
- requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
-
- // Augment the requestBody with the From topic name as it will be used in scenarios where a container
-			// needs to send an unsolicited message to the currently requested container
- requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
-
- out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
- if err != nil {
- log.Warn(err)
- }
- }
- // Response required?
- if requestBody.ResponseRequired {
- // If we already have an error before then just return that
- var returnError *ic.Error
- var returnedValues []interface{}
- var success bool
- if err != nil {
- returnError = &ic.Error{Reason: err.Error()}
- returnedValues = make([]interface{}, 1)
- returnedValues[0] = returnError
- } else {
- returnedValues = make([]interface{}, 0)
- // Check for errors first
- lastIndex := len(out) - 1
- if out[lastIndex].Interface() != nil { // Error
- if retError, ok := out[lastIndex].Interface().(error); ok {
- if retError.Error() == ErrorTransactionNotAcquired.Error() {
- log.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
- return // Ignore - process is in competing mode and ignored transaction
- }
- returnError = &ic.Error{Reason: retError.Error()}
- returnedValues = append(returnedValues, returnError)
- } else { // Should never happen
- returnError = &ic.Error{Reason: "incorrect-error-returns"}
- returnedValues = append(returnedValues, returnError)
- }
- } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
- log.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
- return // Ignore - should not happen
- } else { // Non-error case
- success = true
- for idx, val := range out {
- //log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
- if idx != lastIndex {
- returnedValues = append(returnedValues, val.Interface())
- }
- }
- }
- }
-
- var icm *ic.InterContainerMessage
- if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
- log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
- icm = encodeDefaultFailedResponse(msg)
- }
- // To preserve ordering of messages, all messages to a given topic are sent to the same partition
- // by providing a message key. The key is encoded in the topic name. If the deviceId is not
- // present then the key will be empty, hence all messages for a given topic will be sent to all
- // partitions.
- replyTopic := &Topic{Name: msg.Header.FromTopic}
- key := msg.Header.KeyTopic
- log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
- // TODO: handle error response.
- go kp.kafkaClient.Send(icm, replyTopic, key)
- }
- } else if msg.Header.Type == ic.MessageType_RESPONSE {
- log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
- go kp.dispatchResponse(msg)
- } else {
- log.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
- }
-}
-
-func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
- // Wait for messages
- for msg := range ch {
- //log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
- go kp.handleMessage(msg, targetInterface)
- }
-}
-
-func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
- kp.lockTransactionIdToChannelMap.RLock()
- defer kp.lockTransactionIdToChannelMap.RUnlock()
- if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
- log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
- return
- }
- kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
-}
-
-// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
-// This method is built to prevent all subscribers from receiving all messages, as is the case with the Subscribe
-// API. There is one response channel waiting for kafka messages before dispatching the message to the
-// corresponding waiting channel
-func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
- log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
-
-	// Create a specific channel for this consumer. We cannot use the channel from the kafkaclient as it will
- // broadcast any message for this topic to all channels waiting on it.
- ch := make(chan *ic.InterContainerMessage)
- kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
-
- return ch, nil
-}
-
-func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
- log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
- kp.deleteFromTransactionIdToChannelMap(trnsId)
- return nil
-}
-
-func (kp *InterContainerProxy) EnableLivenessChannel(enable bool) chan bool {
- return kp.kafkaClient.EnableLivenessChannel(enable)
-}
-
-func (kp *InterContainerProxy) SendLiveness() error {
- return kp.kafkaClient.SendLiveness()
-}
-
-//encodeRequest formats a request to send over kafka and returns an InterContainerMessage on success
-//or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
- requestHeader := &ic.Header{
- Id: uuid.New().String(),
- Type: ic.MessageType_REQUEST,
- FromTopic: replyTopic.Name,
- ToTopic: toTopic.Name,
- KeyTopic: key,
- Timestamp: time.Now().UnixNano(),
- }
- requestBody := &ic.InterContainerRequestBody{
- Rpc: rpc,
- ResponseRequired: true,
- ReplyToTopic: replyTopic.Name,
- }
-
- for _, arg := range kvArgs {
- if arg == nil {
- // In case the caller sends an array with empty args
- continue
- }
- var marshalledArg *any.Any
- var err error
- // ascertain the value interface type is a proto.Message
- protoValue, ok := arg.Value.(proto.Message)
- if !ok {
- log.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
- err := errors.New("argument-value-not-proto-message")
- return nil, err
- }
- if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
- log.Warnw("cannot-marshal-request", log.Fields{"error": err})
- return nil, err
- }
- protoArg := &ic.Argument{
- Key: arg.Key,
- Value: marshalledArg,
- }
- requestBody.Args = append(requestBody.Args, protoArg)
- }
-
- var marshalledData *any.Any
- var err error
- if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
- log.Warnw("cannot-marshal-request", log.Fields{"error": err})
- return nil, err
- }
- request := &ic.InterContainerMessage{
- Header: requestHeader,
- Body: marshalledData,
- }
- return request, nil
-}
-
-func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
- // Extract the message body
- responseBody := ic.InterContainerResponseBody{}
- if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
- return nil, err
- }
- //log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
-
- return &responseBody, nil
-
-}
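
The removed inter-container library also defines the server side of the protocol: handleMessage unmarshals the InterContainerRequestBody, appends the transaction ID and the sender's topic to the argument list, dispatches via CallFuncByName to an exported method of the registered handler whose (capitalized) name matches the RPC, and encodes the returned proto message or error with encodeResponse. A sketch of a handler shaped for that dispatch follows; the method and topic names are illustrative, and the assumption is that handler methods take the raw []*ic.Argument slice and return a proto message plus an error.

```go
package example

import (
	"github.com/golang/protobuf/ptypes"
	"github.com/opencord/voltha-lib-go/v2/pkg/kafka"
	ic "github.com/opencord/voltha-protos/v2/go/inter_container"
)

// EchoHandler is a hypothetical target object. handleMessage dispatches an incoming
// RPC named "echo" to the exported Echo method via CallFuncByName, passing the
// request's argument slice (augmented with the transactionID and fromTopic args).
type EchoHandler struct{}

// Echo unpacks the "value" argument and returns it. The non-error return value is
// marshalled back into the response body by encodeReturnedValue/encodeResponse.
func (h *EchoHandler) Echo(args []*ic.Argument) (*ic.StrType, error) {
	val := &ic.StrType{}
	for _, arg := range args {
		if arg.Key == "value" {
			if err := ptypes.UnmarshalAny(arg.Value, val); err != nil {
				return nil, err
			}
		}
	}
	return val, nil
}

// register wires the handler to a topic on an already-started proxy.
func register(kp *kafka.InterContainerProxy) error {
	return kp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: "echo-service"}, &EchoHandler{})
}
```
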
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif/adapter_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/adapter_proxy_if.go
similarity index 93%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif/adapter_proxy_if.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/adapter_proxy_if.go
index 8197170..de5cfc0 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif/adapter_proxy_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/adapter_proxy_if.go
@@ -20,7 +20,7 @@
"context"
"github.com/golang/protobuf/proto"
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
)
// AdapterProxy interface for AdapterProxy implementation.
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif/core_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/core_proxy_if.go
similarity index 87%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif/core_proxy_if.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/core_proxy_if.go
index 9286c0d..9636a7d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif/core_proxy_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/core_proxy_if.go
@@ -18,8 +18,7 @@
import (
"context"
-
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
// CoreProxy interface for voltha-go coreproxy.
@@ -32,10 +31,10 @@
RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error
DeviceUpdate(ctx context.Context, device *voltha.Device) error
PortCreated(ctx context.Context, deviceID string, port *voltha.Port) error
- PortsStateUpdate(ctx context.Context, deviceID string, operStatus voltha.OperStatus_OperStatus) error
+ PortsStateUpdate(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types) error
DeleteAllPorts(ctx context.Context, deviceID string) error
DeviceStateUpdate(ctx context.Context, deviceID string,
- connStatus voltha.ConnectStatus_ConnectStatus, operStatus voltha.OperStatus_OperStatus) error
+ connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error
DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error
ChildDeviceDetected(ctx context.Context, parentDeviceID string, parentPortNo int,
@@ -48,4 +47,6 @@
GetChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error)
SendPacketIn(ctx context.Context, deviceID string, port uint32, pktPayload []byte) error
DeviceReasonUpdate(ctx context.Context, deviceID string, deviceReason string) error
+ PortStateUpdate(ctx context.Context, deviceID string, pType voltha.Port_PortType, portNo uint32,
+ operStatus voltha.OperStatus_Types) error
}
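
Two interface changes here ripple into adapter code: the enum argument types are renamed (voltha.OperStatus_OperStatus and voltha.ConnectStatus_ConnectStatus become voltha.OperStatus_Types and voltha.ConnectStatus_Types), and a per-port PortStateUpdate is added next to the bulk PortsStateUpdate. A hedged call-site sketch against the v3 interface, assuming the usual Port_ETHERNET_NNI and OperStatus_ACTIVE constants from voltha-protos:

```go
package example

import (
	"context"

	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
	"github.com/opencord/voltha-protos/v3/go/voltha"
)

// markNniUp sketches an adapter call site after the v3 upgrade: the enum
// arguments use the renamed *_Types types, and a single port can now be
// updated with PortStateUpdate instead of touching all ports at once.
func markNniUp(ctx context.Context, cp adapterif.CoreProxy, deviceID string, portNo uint32) error {
	if err := cp.DeviceStateUpdate(ctx, deviceID,
		voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE); err != nil {
		return err
	}
	return cp.PortStateUpdate(ctx, deviceID, voltha.Port_ETHERNET_NNI, portNo, voltha.OperStatus_ACTIVE)
}
```
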
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif/events_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/events_proxy_if.go
similarity index 82%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif/events_proxy_if.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/events_proxy_if.go
index b8ea9d8..c144935 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif/events_proxy_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif/events_proxy_if.go
@@ -17,7 +17,7 @@
package adapterif
import (
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
// EventProxy interface for eventproxy
@@ -33,7 +33,7 @@
)
type (
- EventType = voltha.EventType_EventType
- EventCategory = voltha.EventCategory_EventCategory
- EventSubCategory = voltha.EventSubCategory_EventSubCategory
+ EventType = voltha.EventType_Types
+ EventCategory = voltha.EventCategory_Types
+ EventSubCategory = voltha.EventSubCategory_Types
)
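As a hedged sketch of what the alias rename means for adapter code (not part of this patch): because the aliases now point at the *_Types enums, existing code keeps compiling as long as it uses the generated constants. The specific constants below are assumed from voltha-protos v3.

package example

import (
	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
	"github.com/opencord/voltha-protos/v3/go/voltha"
)

// The alias types accept the same generated enum constants as before.
var (
	defaultCategory    adapterif.EventCategory    = voltha.EventCategory_EQUIPMENT
	defaultSubCategory adapterif.EventSubCategory = voltha.EventSubCategory_ONU
	defaultEventType   adapterif.EventType        = voltha.EventType_DEVICE_EVENT
)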
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/adapter_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
similarity index 66%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/adapter_proxy.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
index 7b09a1f..bbae0ed 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/adapter_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
@@ -17,28 +17,33 @@
import (
"context"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "time"
+
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
- "time"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
)
type AdapterProxy struct {
- kafkaICProxy *kafka.InterContainerProxy
+ kafkaICProxy kafka.InterContainerProxy
adapterTopic string
coreTopic string
+ endpointMgr kafka.EndpointManager
}
-func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
- var proxy AdapterProxy
- proxy.kafkaICProxy = kafkaProxy
- proxy.adapterTopic = adapterTopic
- proxy.coreTopic = coreTopic
- log.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+ proxy := AdapterProxy{
+ kafkaICProxy: kafkaProxy,
+ adapterTopic: adapterTopic,
+ coreTopic: coreTopic,
+ endpointMgr: kafka.NewEndpointManager(backend),
+ }
+ logger.Debugw("topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
return &proxy
}
@@ -50,14 +55,14 @@
toDeviceId string,
proxyDeviceId string,
messageId string) error {
- log.Debugw("sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
+ logger.Debugw("sending-inter-adapter-message", log.Fields{"type": msgType, "from": fromAdapter,
"to": toAdapter, "toDevice": toDeviceId, "proxyDevice": proxyDeviceId})
//Marshal the message
var marshalledMsg *any.Any
var err error
if marshalledMsg, err = ptypes.MarshalAny(msg); err != nil {
- log.Warnw("cannot-marshal-msg", log.Fields{"error": err})
+ logger.Warnw("cannot-marshal-msg", log.Fields{"error": err})
return err
}
@@ -86,11 +91,15 @@
}
// Set up the required rpc arguments
- topic := kafka.Topic{Name: toAdapter}
+ endpoint, err := ap.endpointMgr.GetEndpoint(toDeviceId, toAdapter)
+ if err != nil {
+ return err
+ }
+ topic := kafka.Topic{Name: string(endpoint)}
replyToTopic := kafka.Topic{Name: fromAdapter}
rpc := "process_inter_adapter_message"
success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, proxyDeviceId, args...)
- log.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+ logger.Debugw("inter-adapter-msg-response", log.Fields{"replyTopic": replyToTopic, "success": success})
return unPackResponse(rpc, "", success, result)
}
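For illustration (not part of this patch), a minimal wiring sketch of the changed constructor: v3 takes the InterContainerProxy by interface value and additionally needs a db.Backend so the EndpointManager can resolve the per-device destination topic in SendInterAdapterMessage. The topic names below are placeholders.

package example

import (
	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
	"github.com/opencord/voltha-lib-go/v3/pkg/db"
	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
)

// buildAdapterProxy is a hypothetical helper showing the new signature.
func buildAdapterProxy(icProxy kafka.InterContainerProxy, backend *db.Backend) *common.AdapterProxy {
	return common.NewAdapterProxy(icProxy, "my-adapter", "rwcore", backend)
}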
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go
new file mode 100644
index 0000000..95a036d
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+	// Set up this package so that its log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "common"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
new file mode 100644
index 0000000..20e1a52
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
@@ -0,0 +1,620 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+import (
+ "context"
+ "sync"
+
+ "github.com/golang/protobuf/ptypes"
+ a "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+type CoreProxy struct {
+ kafkaICProxy kafka.InterContainerProxy
+ adapterTopic string
+ coreTopic string
+ deviceIdCoreMap map[string]string
+ lockDeviceIdCoreMap sync.RWMutex
+}
+
+func NewCoreProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
+ var proxy CoreProxy
+ proxy.kafkaICProxy = kafkaProxy
+ proxy.adapterTopic = adapterTopic
+ proxy.coreTopic = coreTopic
+ proxy.deviceIdCoreMap = make(map[string]string)
+ proxy.lockDeviceIdCoreMap = sync.RWMutex{}
+ logger.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+
+ return &proxy
+}
+
+func unPackResponse(rpc string, deviceId string, success bool, response *a.Any) error {
+ if success {
+ return nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw("response", log.Fields{"rpc": rpc, "deviceId": deviceId, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return status.Errorf(codes.Canceled, "%s", unpackResult.Reason)
+ }
+}
+
+// UpdateCoreReference adds or updates a core reference (really the topic name) for a given device Id
+func (ap *CoreProxy) UpdateCoreReference(deviceId string, coreReference string) {
+ ap.lockDeviceIdCoreMap.Lock()
+ defer ap.lockDeviceIdCoreMap.Unlock()
+ ap.deviceIdCoreMap[deviceId] = coreReference
+}
+
+// DeleteCoreReference removes a core reference (really the topic name) for a given device Id
+func (ap *CoreProxy) DeleteCoreReference(deviceId string) {
+ ap.lockDeviceIdCoreMap.Lock()
+ defer ap.lockDeviceIdCoreMap.Unlock()
+ delete(ap.deviceIdCoreMap, deviceId)
+}
+
+func (ap *CoreProxy) getCoreTopic(deviceId string) kafka.Topic {
+ ap.lockDeviceIdCoreMap.Lock()
+ defer ap.lockDeviceIdCoreMap.Unlock()
+
+ if t, exist := ap.deviceIdCoreMap[deviceId]; exist {
+ return kafka.Topic{Name: t}
+ }
+
+ return kafka.Topic{Name: ap.coreTopic}
+}
+
+func (ap *CoreProxy) getAdapterTopic(args ...string) kafka.Topic {
+ return kafka.Topic{Name: ap.adapterTopic}
+}
+
+func (ap *CoreProxy) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
+ logger.Debugw("registering-adapter", log.Fields{"coreTopic": ap.coreTopic, "adapterTopic": ap.adapterTopic})
+ rpc := "Register"
+ topic := kafka.Topic{Name: ap.coreTopic}
+ replyToTopic := ap.getAdapterTopic()
+ args := make([]*kafka.KVArg, 2)
+
+ if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
+ log.Fatal("totalReplicas can't be 0, since you're here you have at least one")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
+ log.Fatal("currentReplica can't be 0, it has to start from 1")
+ }
+
+ if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
+		// if the adapter does not set these fields they default to 0;
+		// in that case the adapter is not ready to be scaled, so it defaults
+		// to a single instance
+ adapter.CurrentReplica = 1
+ adapter.TotalReplicas = 1
+ }
+
+ if adapter.CurrentReplica > adapter.TotalReplicas {
+ log.Fatalf("CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+ adapter.CurrentReplica, adapter.TotalReplicas)
+ }
+
+ args[0] = &kafka.KVArg{
+ Key: "adapter",
+ Value: adapter,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "deviceTypes",
+ Value: deviceTypes,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, "", args...)
+ logger.Debugw("Register-Adapter-response", log.Fields{"replyTopic": replyToTopic, "success": success})
+ return unPackResponse(rpc, "", success, result)
+}
+
+func (ap *CoreProxy) DeviceUpdate(ctx context.Context, device *voltha.Device) error {
+ logger.Debugw("DeviceUpdate", log.Fields{"deviceId": device.Id})
+ rpc := "DeviceUpdate"
+ toTopic := ap.getCoreTopic(device.Id)
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device",
+ Value: device,
+ }
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, device.Id, args...)
+ logger.Debugw("DeviceUpdate-response", log.Fields{"deviceId": device.Id, "success": success})
+ return unPackResponse(rpc, device.Id, success, result)
+}
+
+func (ap *CoreProxy) PortCreated(ctx context.Context, deviceId string, port *voltha.Port) error {
+ logger.Debugw("PortCreated", log.Fields{"portNo": port.PortNo})
+ rpc := "PortCreated"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ args := make([]*kafka.KVArg, 2)
+ id := &voltha.ID{Id: deviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "port",
+ Value: port,
+ }
+
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw("PortCreated-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_Types) error {
+ logger.Debugw("PortsStateUpdate", log.Fields{"deviceId": deviceId})
+ rpc := "PortsStateUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ args := make([]*kafka.KVArg, 2)
+ id := &voltha.ID{Id: deviceId}
+ oStatus := &ic.IntType{Val: int64(operStatus)}
+
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "oper_status",
+ Value: oStatus,
+ }
+
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw("PortsStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) DeleteAllPorts(ctx context.Context, deviceId string) error {
+ logger.Debugw("DeleteAllPorts", log.Fields{"deviceId": deviceId})
+ rpc := "DeleteAllPorts"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ args := make([]*kafka.KVArg, 2)
+ id := &voltha.ID{Id: deviceId}
+
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw("DeleteAllPorts-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
+ connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error {
+ logger.Debugw("DeviceStateUpdate", log.Fields{"deviceId": deviceId})
+ rpc := "DeviceStateUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ args := make([]*kafka.KVArg, 3)
+ id := &voltha.ID{Id: deviceId}
+ oStatus := &ic.IntType{Val: int64(operStatus)}
+ cStatus := &ic.IntType{Val: int64(connStatus)}
+
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "oper_status",
+ Value: oStatus,
+ }
+ args[2] = &kafka.KVArg{
+ Key: "connect_status",
+ Value: cStatus,
+ }
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw("DeviceStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) ChildDeviceDetected(ctx context.Context, parentDeviceId string, parentPortNo int,
+ childDeviceType string, channelId int, vendorId string, serialNumber string, onuId int64) (*voltha.Device, error) {
+ logger.Debugw("ChildDeviceDetected", log.Fields{"pDeviceId": parentDeviceId, "channelId": channelId})
+ rpc := "ChildDeviceDetected"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 7)
+ id := &voltha.ID{Id: parentDeviceId}
+ args[0] = &kafka.KVArg{
+ Key: "parent_device_id",
+ Value: id,
+ }
+ ppn := &ic.IntType{Val: int64(parentPortNo)}
+ args[1] = &kafka.KVArg{
+ Key: "parent_port_no",
+ Value: ppn,
+ }
+ cdt := &ic.StrType{Val: childDeviceType}
+ args[2] = &kafka.KVArg{
+ Key: "child_device_type",
+ Value: cdt,
+ }
+ channel := &ic.IntType{Val: int64(channelId)}
+ args[3] = &kafka.KVArg{
+ Key: "channel_id",
+ Value: channel,
+ }
+ vId := &ic.StrType{Val: vendorId}
+ args[4] = &kafka.KVArg{
+ Key: "vendor_id",
+ Value: vId,
+ }
+ sNo := &ic.StrType{Val: serialNumber}
+ args[5] = &kafka.KVArg{
+ Key: "serial_number",
+ Value: sNo,
+ }
+ oId := &ic.IntType{Val: int64(onuId)}
+ args[6] = &kafka.KVArg{
+ Key: "onu_id",
+ Value: oId,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw("ChildDeviceDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+
+ if success {
+ volthaDevice := &voltha.Device{}
+ if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return volthaDevice, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw("ChildDeviceDetected-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
+ }
+
+}
+
+func (ap *CoreProxy) ChildDevicesLost(ctx context.Context, parentDeviceId string) error {
+ logger.Debugw("ChildDevicesLost", log.Fields{"pDeviceId": parentDeviceId})
+ rpc := "ChildDevicesLost"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ id := &voltha.ID{Id: parentDeviceId}
+ args[0] = &kafka.KVArg{
+ Key: "parent_device_id",
+ Value: id,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw("ChildDevicesLost-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ return unPackResponse(rpc, parentDeviceId, success, result)
+}
+
+func (ap *CoreProxy) ChildDevicesDetected(ctx context.Context, parentDeviceId string) error {
+ logger.Debugw("ChildDevicesDetected", log.Fields{"pDeviceId": parentDeviceId})
+ rpc := "ChildDevicesDetected"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ id := &voltha.ID{Id: parentDeviceId}
+ args[0] = &kafka.KVArg{
+ Key: "parent_device_id",
+ Value: id,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw("ChildDevicesDetected-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ return unPackResponse(rpc, parentDeviceId, success, result)
+}
+
+func (ap *CoreProxy) GetDevice(ctx context.Context, parentDeviceId string, deviceId string) (*voltha.Device, error) {
+ logger.Debugw("GetDevice", log.Fields{"deviceId": deviceId})
+ rpc := "GetDevice"
+
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ id := &voltha.ID{Id: deviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw("GetDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+
+ if success {
+ volthaDevice := &voltha.Device{}
+ if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return volthaDevice, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw("GetDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
+ }
+}
+
+func (ap *CoreProxy) GetChildDevice(ctx context.Context, parentDeviceId string, kwargs map[string]interface{}) (*voltha.Device, error) {
+ logger.Debugw("GetChildDevice", log.Fields{"parentDeviceId": parentDeviceId, "kwargs": kwargs})
+ rpc := "GetChildDevice"
+
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 4)
+ id := &voltha.ID{Id: parentDeviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+
+ var cnt uint8 = 0
+ for k, v := range kwargs {
+ cnt += 1
+ if k == "serial_number" {
+ val := &ic.StrType{Val: v.(string)}
+ args[cnt] = &kafka.KVArg{
+ Key: k,
+ Value: val,
+ }
+ } else if k == "onu_id" {
+ val := &ic.IntType{Val: int64(v.(uint32))}
+ args[cnt] = &kafka.KVArg{
+ Key: k,
+ Value: val,
+ }
+ } else if k == "parent_port_no" {
+ val := &ic.IntType{Val: int64(v.(uint32))}
+ args[cnt] = &kafka.KVArg{
+ Key: k,
+ Value: val,
+ }
+ }
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw("GetChildDevice-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+
+ if success {
+ volthaDevice := &voltha.Device{}
+ if err := ptypes.UnmarshalAny(result, volthaDevice); err != nil {
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return volthaDevice, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
+ }
+}
+
+func (ap *CoreProxy) GetChildDevices(ctx context.Context, parentDeviceId string) (*voltha.Devices, error) {
+ logger.Debugw("GetChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
+ rpc := "GetChildDevices"
+
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ id := &voltha.ID{Id: parentDeviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw("GetChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+
+ if success {
+ volthaDevices := &voltha.Devices{}
+ if err := ptypes.UnmarshalAny(result, volthaDevices); err != nil {
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return volthaDevices, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw("GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
+
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(unpackResult.Code), unpackResult.Reason)
+ }
+}
+
+func (ap *CoreProxy) SendPacketIn(ctx context.Context, deviceId string, port uint32, pktPayload []byte) error {
+ logger.Debugw("SendPacketIn", log.Fields{"deviceId": deviceId, "port": port, "pktPayload": pktPayload})
+ rpc := "PacketIn"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 3)
+ id := &voltha.ID{Id: deviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ portNo := &ic.IntType{Val: int64(port)}
+ args[1] = &kafka.KVArg{
+ Key: "port",
+ Value: portNo,
+ }
+ pkt := &ic.Packet{Payload: pktPayload}
+ args[2] = &kafka.KVArg{
+ Key: "packet",
+ Value: pkt,
+ }
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw("SendPacketIn-response", log.Fields{"pDeviceId": deviceId, "success": success})
+ return unPackResponse(rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) DeviceReasonUpdate(ctx context.Context, deviceId string, deviceReason string) error {
+ logger.Debugw("DeviceReasonUpdate", log.Fields{"deviceId": deviceId, "deviceReason": deviceReason})
+ rpc := "DeviceReasonUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 2)
+ id := &voltha.ID{Id: deviceId}
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: id,
+ }
+ reason := &ic.StrType{Val: deviceReason}
+ args[1] = &kafka.KVArg{
+ Key: "device_reason",
+ Value: reason,
+ }
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw("DeviceReason-response", log.Fields{"pDeviceId": deviceId, "success": success})
+ return unPackResponse(rpc, deviceId, success, result)
+}
+
+func (ap *CoreProxy) DevicePMConfigUpdate(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+ logger.Debugw("DevicePMConfigUpdate", log.Fields{"pmConfigs": pmConfigs})
+ rpc := "DevicePMConfigUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(pmConfigs.Id)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := make([]*kafka.KVArg, 1)
+ args[0] = &kafka.KVArg{
+ Key: "device_pm_config",
+ Value: pmConfigs,
+ }
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, pmConfigs.Id, args...)
+ logger.Debugw("DevicePMConfigUpdate-response", log.Fields{"pDeviceId": pmConfigs.Id, "success": success})
+ return unPackResponse(rpc, pmConfigs.Id, success, result)
+}
+
+func (ap *CoreProxy) ReconcileChildDevices(ctx context.Context, parentDeviceId string) error {
+ logger.Debugw("ReconcileChildDevices", log.Fields{"parentDeviceId": parentDeviceId})
+ rpc := "ReconcileChildDevices"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(parentDeviceId)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := []*kafka.KVArg{
+ {Key: "parent_device_id", Value: &voltha.ID{Id: parentDeviceId}},
+ }
+
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, parentDeviceId, args...)
+ logger.Debugw("ReconcileChildDevices-response", log.Fields{"pDeviceId": parentDeviceId, "success": success})
+ return unPackResponse(rpc, parentDeviceId, success, result)
+}
+
+func (ap *CoreProxy) PortStateUpdate(ctx context.Context, deviceId string, pType voltha.Port_PortType, portNum uint32,
+ operStatus voltha.OperStatus_Types) error {
+ logger.Debugw("PortStateUpdate", log.Fields{"deviceId": deviceId, "portType": pType, "portNo": portNum, "operation_status": operStatus})
+ rpc := "PortStateUpdate"
+ // Use a device specific topic to send the request. The adapter handling the device creates a device
+ // specific topic
+ toTopic := ap.getCoreTopic(deviceId)
+ args := make([]*kafka.KVArg, 4)
+ deviceID := &voltha.ID{Id: deviceId}
+ portNo := &ic.IntType{Val: int64(portNum)}
+ portType := &ic.IntType{Val: int64(pType)}
+ oStatus := &ic.IntType{Val: int64(operStatus)}
+
+ args[0] = &kafka.KVArg{
+ Key: "device_id",
+ Value: deviceID,
+ }
+ args[1] = &kafka.KVArg{
+ Key: "oper_status",
+ Value: oStatus,
+ }
+ args[2] = &kafka.KVArg{
+ Key: "port_type",
+ Value: portType,
+ }
+ args[3] = &kafka.KVArg{
+ Key: "port_no",
+ Value: portNo,
+ }
+
+ // Use a device specific topic as we are the only adaptercore handling requests for this device
+ replyToTopic := ap.getAdapterTopic()
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceId, args...)
+ logger.Debugw("PortStateUpdate-response", log.Fields{"deviceId": deviceId, "success": success})
+ return unPackResponse(rpc, deviceId, success, result)
+}
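As a hedged sketch (not part of this patch) of what the new replica validation in RegisterAdapter expects: a scaled adapter sets both CurrentReplica (starting at 1) and TotalReplicas, while leaving both at 0 makes the proxy default them to a single instance. Field names other than CurrentReplica/TotalReplicas are assumed from voltha-protos v3.

package example

import (
	"context"

	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
	"github.com/opencord/voltha-protos/v3/go/voltha"
)

// registerScaledAdapter is a hypothetical example of registering replica 1 of 2.
func registerScaledAdapter(ctx context.Context, cp *common.CoreProxy, deviceTypes *voltha.DeviceTypes) error {
	adapter := &voltha.Adapter{
		Id:             "myadapter_1",
		Vendor:         "example",
		CurrentReplica: 1,
		TotalReplicas:  2,
	}
	return cp.RegisterAdapter(ctx, adapter, deviceTypes)
}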
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go
similarity index 66%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/events_proxy.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go
index ab6b0d0..da9c9eb 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/events_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/events_proxy.go
@@ -23,10 +23,11 @@
"strings"
"time"
- "github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
type EventProxy struct {
@@ -60,7 +61,11 @@
return fmt.Sprintf("Voltha.openolt.%s.%s", eventName, strconv.FormatInt(time.Now().UnixNano(), 10))
}
-func (ep *EventProxy) getEventHeader(eventName string, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, eventType adapterif.EventType, raisedTs int64) *voltha.EventHeader {
+func (ep *EventProxy) getEventHeader(eventName string,
+ category adapterif.EventCategory,
+ subCategory adapterif.EventSubCategory,
+ eventType adapterif.EventType,
+ raisedTs int64) (*voltha.EventHeader, error) {
var header voltha.EventHeader
if strings.Contains(eventName, "_") {
eventName = strings.Join(strings.Split(eventName, "_")[:len(strings.Split(eventName, "_"))-2], "_")
@@ -73,27 +78,42 @@
header.SubCategory = subCategory
header.Type = eventType
header.TypeVersion = adapterif.EventTypeVersion
- header.RaisedTs = float32(raisedTs)
- header.ReportedTs = float32(time.Now().UnixNano())
- return &header
+
+ // raisedTs is in nanoseconds
+ timestamp, err := ptypes.TimestampProto(time.Unix(0, raisedTs))
+ if err != nil {
+ return nil, err
+ }
+ header.RaisedTs = timestamp
+
+ timestamp, err = ptypes.TimestampProto(time.Now())
+ if err != nil {
+ return nil, err
+ }
+ header.ReportedTs = timestamp
+
+ return &header, nil
}
/* Send out device events*/
func (ep *EventProxy) SendDeviceEvent(deviceEvent *voltha.DeviceEvent, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
if deviceEvent == nil {
- log.Error("Recieved empty device event")
+ logger.Error("Recieved empty device event")
return errors.New("Device event nil")
}
var event voltha.Event
var de voltha.Event_DeviceEvent
+ var err error
de.DeviceEvent = deviceEvent
- event.Header = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs)
- event.EventType = &de
- if err := ep.sendEvent(&event); err != nil {
- log.Errorw("Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
+ if event.Header, err = ep.getEventHeader(deviceEvent.DeviceEventName, category, subCategory, voltha.EventType_DEVICE_EVENT, raisedTs); err != nil {
return err
}
- log.Infow("Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ event.EventType = &de
+ if err := ep.sendEvent(&event); err != nil {
+ logger.Errorw("Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
+ return err
+ }
+ logger.Infow("Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
"SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
"ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
"DeviceEventName": deviceEvent.DeviceEventName})
@@ -105,19 +125,22 @@
// SendKpiEvent is to send kpi events to voltha.event topic
func (ep *EventProxy) SendKpiEvent(id string, kpiEvent *voltha.KpiEvent2, category adapterif.EventCategory, subCategory adapterif.EventSubCategory, raisedTs int64) error {
if kpiEvent == nil {
- log.Error("Recieved empty kpi event")
+ logger.Error("Recieved empty kpi event")
return errors.New("KPI event nil")
}
var event voltha.Event
var de voltha.Event_KpiEvent2
+ var err error
de.KpiEvent2 = kpiEvent
- event.Header = ep.getEventHeader(id, category, subCategory, voltha.EventType_KPI_EVENT2, raisedTs)
- event.EventType = &de
- if err := ep.sendEvent(&event); err != nil {
- log.Errorw("Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
+ if event.Header, err = ep.getEventHeader(id, category, subCategory, voltha.EventType_KPI_EVENT2, raisedTs); err != nil {
return err
}
- log.Infow("Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+ event.EventType = &de
+ if err := ep.sendEvent(&event); err != nil {
+ logger.Errorw("Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
+ return err
+ }
+ logger.Infow("Successfully sent kpi event to KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
"SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
"ReportedTs": event.Header.ReportedTs, "KpiEventName": "STATS_EVENT"})
@@ -131,7 +154,7 @@
if err := ep.kafkaClient.Send(event, &ep.eventTopic); err != nil {
return err
}
- log.Debugw("Sent event to kafka", log.Fields{"event": event})
+ logger.Debugw("Sent event to kafka", log.Fields{"event": event})
return nil
}
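For illustration (not part of this patch), a short sketch of the caller-side effect of the getEventHeader change: callers still pass raisedTs as UnixNano, and the proxy now converts it (and the reported time) to protobuf Timestamps instead of float32 seconds. The enum constants are assumed from voltha-protos v3.

package example

import (
	"time"

	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
	"github.com/opencord/voltha-protos/v3/go/voltha"
)

// raiseDeviceEvent is a hypothetical helper sending a device event now.
func raiseDeviceEvent(ep *common.EventProxy, deviceEvent *voltha.DeviceEvent) error {
	return ep.SendDeviceEvent(deviceEvent,
		voltha.EventCategory_EQUIPMENT,
		voltha.EventSubCategory_ONU,
		time.Now().UnixNano())
}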
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/performance_metrics.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/performance_metrics.go
similarity index 97%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/performance_metrics.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/performance_metrics.go
index bcb45f8..7697c05 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/performance_metrics.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/performance_metrics.go
@@ -17,7 +17,7 @@
package common
import (
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
type PmMetrics struct {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/request_handler.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go
similarity index 62%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/request_handler.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go
index dfcaf1e..843b95c 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/request_handler.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/request_handler.go
@@ -17,15 +17,16 @@
import (
"errors"
+
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
- "github.com/opencord/voltha-lib-go/v2/pkg/adapters"
- "github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
- "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
- "github.com/opencord/voltha-protos/v2/go/openflow_13"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
+ "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
@@ -59,7 +60,7 @@
func (rhp *RequestHandlerProxy) Adopt_device(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -70,23 +71,23 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
}
- log.Debugw("Adopt_device", log.Fields{"deviceId": device.Id})
+ logger.Debugw("Adopt_device", log.Fields{"deviceId": device.Id})
//Update the core reference for that device
rhp.coreProxy.UpdateCoreReference(device.Id, fromTopic.Val)
@@ -101,7 +102,7 @@
func (rhp *RequestHandlerProxy) Reconcile_device(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -113,17 +114,17 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
@@ -144,7 +145,7 @@
func (rhp *RequestHandlerProxy) Disable_device(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -156,17 +157,17 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
@@ -182,7 +183,7 @@
func (rhp *RequestHandlerProxy) Reenable_device(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -194,17 +195,17 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
@@ -220,7 +221,7 @@
func (rhp *RequestHandlerProxy) Reboot_device(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -232,17 +233,17 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
@@ -263,7 +264,7 @@
func (rhp *RequestHandlerProxy) Delete_device(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 3 {
- log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -275,17 +276,17 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
case kafka.FromTopic:
if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
- log.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
return nil, err
}
}
@@ -304,9 +305,9 @@
}
func (rhp *RequestHandlerProxy) Update_flows_bulk(args []*ic.Argument) (*empty.Empty, error) {
- log.Debug("Update_flows_bulk")
+ logger.Debug("Update_flows_bulk")
if len(args) < 5 {
- log.Warn("Update_flows_bulk-invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("Update_flows_bulk-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -319,32 +320,32 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case "flows":
if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
- log.Warnw("cannot-unmarshal-flows", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-flows", log.Fields{"error": err})
return nil, err
}
case "groups":
if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
- log.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
return nil, err
}
case "flow_metadata":
if err := ptypes.UnmarshalAny(arg.Value, flowMetadata); err != nil {
- log.Warnw("cannot-unmarshal-metadata", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-metadata", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- log.Debugw("Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
+ logger.Debugw("Update_flows_bulk", log.Fields{"flows": flows, "groups": groups})
//Invoke the bulk flow update API of the adapter
if err := rhp.adapter.Update_flows_bulk(device, flows, groups, flowMetadata); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
@@ -353,9 +354,9 @@
}
func (rhp *RequestHandlerProxy) Update_flows_incrementally(args []*ic.Argument) (*empty.Empty, error) {
- log.Debug("Update_flows_incrementally")
+ logger.Debug("Update_flows_incrementally")
if len(args) < 5 {
- log.Warn("Update_flows_incrementally-invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("Update_flows_incrementally-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -368,32 +369,32 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case "flow_changes":
if err := ptypes.UnmarshalAny(arg.Value, flows); err != nil {
- log.Warnw("cannot-unmarshal-flows", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-flows", log.Fields{"error": err})
return nil, err
}
case "group_changes":
if err := ptypes.UnmarshalAny(arg.Value, groups); err != nil {
- log.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-groups", log.Fields{"error": err})
return nil, err
}
case "flow_metadata":
if err := ptypes.UnmarshalAny(arg.Value, flowMetadata); err != nil {
- log.Warnw("cannot-unmarshal-metadata", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-metadata", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- log.Debugw("Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
+ logger.Debugw("Update_flows_incrementally", log.Fields{"flows": flows, "groups": groups})
//Invoke the incremental flow update API of the adapter
if err := rhp.adapter.Update_flows_incrementally(device, flows, groups, flowMetadata); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
@@ -402,9 +403,9 @@
}
func (rhp *RequestHandlerProxy) Update_pm_config(args []*ic.Argument) (*empty.Empty, error) {
- log.Debug("Update_pm_config")
+ logger.Debug("Update_pm_config")
if len(args) < 2 {
- log.Warn("Update_pm_config-invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("Update_pm_config-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -415,22 +416,22 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case "pm_configs":
if err := ptypes.UnmarshalAny(arg.Value, pmConfigs); err != nil {
- log.Warnw("cannot-unmarshal-pm-configs", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-pm-configs", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- log.Debugw("Update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
+ logger.Debugw("Update_pm_config", log.Fields{"deviceId": device.Id, "pmConfigs": pmConfigs})
//Invoke the pm config update API of the adapter
if err := rhp.adapter.Update_pm_config(device, pmConfigs); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
@@ -439,9 +440,9 @@
}
func (rhp *RequestHandlerProxy) Receive_packet_out(args []*ic.Argument) (*empty.Empty, error) {
- log.Debugw("Receive_packet_out", log.Fields{"args": args})
+ logger.Debugw("Receive_packet_out", log.Fields{"args": args})
if len(args) < 3 {
- log.Warn("Receive_packet_out-invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("Receive_packet_out-invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -453,27 +454,27 @@
switch arg.Key {
case "deviceId":
if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
- log.Warnw("cannot-unmarshal-deviceId", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-deviceId", log.Fields{"error": err})
return nil, err
}
case "outPort":
if err := ptypes.UnmarshalAny(arg.Value, egressPort); err != nil {
- log.Warnw("cannot-unmarshal-egressPort", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-egressPort", log.Fields{"error": err})
return nil, err
}
case "packet":
if err := ptypes.UnmarshalAny(arg.Value, packet); err != nil {
- log.Warnw("cannot-unmarshal-packet", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-packet", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- log.Debugw("Receive_packet_out", log.Fields{"deviceId": deviceId.Val, "outPort": egressPort, "packet": packet})
+ logger.Debugw("Receive_packet_out", log.Fields{"deviceId": deviceId.Val, "outPort": egressPort, "packet": packet})
	//Invoke the Receive_packet_out API on the adapter
if err := rhp.adapter.Receive_packet_out(deviceId.Val, int(egressPort.Val), packet); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
@@ -491,7 +492,7 @@
func (rhp *RequestHandlerProxy) Get_ofp_device_info(args []*ic.Argument) (*ic.SwitchCapability, error) {
if len(args) < 2 {
- log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -501,31 +502,31 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- log.Debugw("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
+ logger.Debugw("Get_ofp_device_info", log.Fields{"deviceId": device.Id})
var cap *ic.SwitchCapability
var err error
if cap, err = rhp.adapter.Get_ofp_device_info(device); err != nil {
return nil, status.Errorf(codes.NotFound, "%s", err.Error())
}
- log.Debugw("Get_ofp_device_info", log.Fields{"cap": cap})
+ logger.Debugw("Get_ofp_device_info", log.Fields{"cap": cap})
return cap, nil
}
func (rhp *RequestHandlerProxy) Get_ofp_port_info(args []*ic.Argument) (*ic.PortCapability, error) {
if len(args) < 3 {
- log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -536,22 +537,22 @@
switch arg.Key {
case "device":
if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case "port_no":
if err := ptypes.UnmarshalAny(arg.Value, pNo); err != nil {
- log.Warnw("cannot-unmarshal-port-no", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-port-no", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- log.Debugw("Get_ofp_port_info", log.Fields{"deviceId": device.Id, "portNo": pNo.Val})
+ logger.Debugw("Get_ofp_port_info", log.Fields{"deviceId": device.Id, "portNo": pNo.Val})
var cap *ic.PortCapability
var err error
if cap, err = rhp.adapter.Get_ofp_port_info(device, pNo.Val); err != nil {
@@ -562,7 +563,7 @@
func (rhp *RequestHandlerProxy) Process_inter_adapter_message(args []*ic.Argument) (*empty.Empty, error) {
if len(args) < 2 {
- log.Warn("invalid-number-of-args", log.Fields{"args": args})
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
err := errors.New("invalid-number-of-args")
return nil, err
}
@@ -572,18 +573,18 @@
switch arg.Key {
case "msg":
if err := ptypes.UnmarshalAny(arg.Value, iaMsg); err != nil {
- log.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
return nil, err
}
case kafka.TransactionKey:
if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
- log.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-transaction-ID", log.Fields{"error": err})
return nil, err
}
}
}
- log.Debugw("Process_inter_adapter_message", log.Fields{"msgId": iaMsg.Header.Id})
+ logger.Debugw("Process_inter_adapter_message", log.Fields{"msgId": iaMsg.Header.Id})
//Invoke the inter adapter API on the handler
if err := rhp.adapter.Process_inter_adapter_message(iaMsg); err != nil {
@@ -612,3 +613,125 @@
func (rhp *RequestHandlerProxy) Revert_image_update(args []*ic.Argument) (*voltha.ImageDownload, error) {
return &voltha.ImageDownload{}, nil
}
+
+func (rhp *RequestHandlerProxy) Enable_port(args []*ic.Argument) error {
+ logger.Debugw("enable_port", log.Fields{"args": args})
+ deviceId, port, err := rhp.getEnableDisableParams(args)
+ if err != nil {
+ logger.Warnw("enable_port", log.Fields{"args": args, "deviceId": deviceId, "port": port})
+ return err
+ }
+ return rhp.adapter.Enable_port(deviceId, port)
+}
+
+func (rhp *RequestHandlerProxy) Disable_port(args []*ic.Argument) error {
+ logger.Debugw("disable_port", log.Fields{"args": args})
+ deviceId, port, err := rhp.getEnableDisableParams(args)
+ if err != nil {
+ logger.Warnw("disable_port", log.Fields{"args": args, "deviceId": deviceId, "port": port})
+ return err
+ }
+ return rhp.adapter.Disable_port(deviceId, port)
+}
+
+func (rhp *RequestHandlerProxy) getEnableDisableParams(args []*ic.Argument) (string, *voltha.Port, error) {
+ logger.Debugw("getEnableDisableParams", log.Fields{"args": args})
+ if len(args) < 3 {
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ return "", nil, errors.New("invalid-number-of-args")
+ }
+ deviceId := &ic.StrType{}
+ port := &voltha.Port{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "deviceId":
+ if err := ptypes.UnmarshalAny(arg.Value, deviceId); err != nil {
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return "", nil, err
+ }
+ case "port":
+ if err := ptypes.UnmarshalAny(arg.Value, port); err != nil {
+ logger.Warnw("cannot-unmarshal-port", log.Fields{"error": err})
+ return "", nil, err
+ }
+ }
+ }
+ return deviceId.Val, port, nil
+}
+
+func (rhp *RequestHandlerProxy) Child_device_lost(args []*ic.Argument) error {
+ if len(args) < 4 {
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ return errors.New("invalid-number-of-args")
+ }
+
+ pDeviceId := &ic.StrType{}
+ pPortNo := &ic.IntType{}
+ onuID := &ic.IntType{}
+ fromTopic := &ic.StrType{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "pDeviceId":
+ if err := ptypes.UnmarshalAny(arg.Value, pDeviceId); err != nil {
+ logger.Warnw("cannot-unmarshal-parent-deviceId", log.Fields{"error": err})
+ return err
+ }
+ case "pPortNo":
+ if err := ptypes.UnmarshalAny(arg.Value, pPortNo); err != nil {
+ logger.Warnw("cannot-unmarshal-port", log.Fields{"error": err})
+ return err
+ }
+ case "onuID":
+ if err := ptypes.UnmarshalAny(arg.Value, onuID); err != nil {
+			logger.Warnw("cannot-unmarshal-onuID", log.Fields{"error": err})
+ return err
+ }
+ case kafka.FromTopic:
+ if err := ptypes.UnmarshalAny(arg.Value, fromTopic); err != nil {
+ logger.Warnw("cannot-unmarshal-from-topic", log.Fields{"error": err})
+ return err
+ }
+ }
+ }
+ //Update the core reference for that device
+ rhp.coreProxy.UpdateCoreReference(pDeviceId.Val, fromTopic.Val)
+ //Invoke the Child_device_lost API on the adapter
+ if err := rhp.adapter.Child_device_lost(pDeviceId.Val, uint32(pPortNo.Val), uint32(onuID.Val)); err != nil {
+ return status.Errorf(codes.NotFound, "%s", err.Error())
+ }
+ return nil
+}
+
+func (rhp *RequestHandlerProxy) Start_omci_test(args []*ic.Argument) (*ic.TestResponse, error) {
+ if len(args) < 2 {
+ logger.Warn("invalid-number-of-args", log.Fields{"args": args})
+ err := errors.New("invalid-number-of-args")
+ return nil, err
+ }
+
+ // TODO: See related comment in voltha-go:adapter_proxy_go:startOmciTest()
+ // Second argument should perhaps be uuid instead of omcitestrequest
+
+ device := &voltha.Device{}
+ request := &voltha.OmciTestRequest{}
+ for _, arg := range args {
+ switch arg.Key {
+ case "device":
+ if err := ptypes.UnmarshalAny(arg.Value, device); err != nil {
+ logger.Warnw("cannot-unmarshal-device", log.Fields{"error": err})
+ return nil, err
+ }
+ case "omcitestrequest":
+ if err := ptypes.UnmarshalAny(arg.Value, request); err != nil {
+ logger.Warnw("cannot-unmarshal-omcitestrequest", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ }
+ logger.Debugw("Start_omci_test", log.Fields{"device-id": device.Id, "req": request})
+ result, err := rhp.adapter.Start_omci_test(device, request)
+ if err != nil {
+ return nil, status.Errorf(codes.NotFound, "%s", err.Error())
+ }
+ return result, nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/utils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go
similarity index 78%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/utils.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go
index d3c562a..94e8bd6 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/common/utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/utils.go
@@ -17,6 +17,9 @@
import (
"fmt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "google.golang.org/grpc/codes"
"math/rand"
"time"
)
@@ -71,3 +74,17 @@
}
return string(b)
}
+
+func ICProxyErrorCodeToGrpcErrorCode(icErr ic.ErrorCodeCodes) codes.Code {
+ switch icErr {
+ case ic.ErrorCode_INVALID_PARAMETERS:
+ return codes.InvalidArgument
+ case ic.ErrorCode_UNSUPPORTED_REQUEST:
+ return codes.Unavailable
+ case ic.ErrorCode_DEADLINE_EXCEEDED:
+ return codes.DeadlineExceeded
+ default:
+		logger.Warnw("cannot-map-ic-error-code-to-grpc-error-code", log.Fields{"err": icErr})
+ return codes.Internal
+ }
+}
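
As a usage sketch (not part of this change), a caller that has unpacked an ic.Error from a Core response can map it onto a gRPC status with the helper above. The Code and Reason fields on ic.Error are assumed to match how this library unpacks responses elsewhere:

    import (
        "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
        ic "github.com/opencord/voltha-protos/v3/go/inter_container"
        "google.golang.org/grpc/status"
    )

    // icErrorToGrpcStatus is a hypothetical helper: map the inter-container
    // error code onto the nearest gRPC code and keep the reason as the message.
    func icErrorToGrpcStatus(icErr *ic.Error) error {
        return status.Error(common.ICProxyErrorCodeToGrpcErrorCode(icErr.Code), icErr.Reason)
    }
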
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/iAdapter.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go
similarity index 81%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/iAdapter.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go
index be98f17..112fb94 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/adapters/iAdapter.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/iAdapter.go
@@ -16,9 +16,9 @@
package adapters
import (
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
- "github.com/opencord/voltha-protos/v2/go/openflow_13"
- "github.com/opencord/voltha-protos/v2/go/voltha"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
)
//IAdapter represents the set of APIs a voltha adapter has to support.
@@ -39,8 +39,8 @@
Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error
Update_pm_config(device *voltha.Device, pm_configs *voltha.PmConfigs) error
Receive_packet_out(deviceId string, egress_port_no int, msg *openflow_13.OfpPacketOut) error
- Suppress_alarm(filter *voltha.AlarmFilter) error
- Unsuppress_alarm(filter *voltha.AlarmFilter) error
+ Suppress_event(filter *voltha.EventFilter) error
+ Unsuppress_event(filter *voltha.EventFilter) error
Get_ofp_device_info(device *voltha.Device) (*ic.SwitchCapability, error)
Get_ofp_port_info(device *voltha.Device, port_no int64) (*ic.PortCapability, error)
Process_inter_adapter_message(msg *ic.InterAdapterMessage) error
@@ -49,4 +49,8 @@
Cancel_image_download(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
Activate_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
Revert_image_update(device *voltha.Device, request *voltha.ImageDownload) (*voltha.ImageDownload, error)
+ Enable_port(deviceId string, port *voltha.Port) error
+ Disable_port(deviceId string, port *voltha.Port) error
+ Child_device_lost(parentDeviceId string, parentPortNo uint32, onuID uint32) error
+ Start_omci_test(device *voltha.Device, request *voltha.OmciTestRequest) (*voltha.TestResponse, error)
}
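
The four methods appended to the interface are new in v3, so an existing adapter must grow handlers (or stubs) for them before it compiles against this version. A minimal sketch, assuming a hypothetical adapter type named exampleAdapter:

    import "github.com/opencord/voltha-protos/v3/go/voltha"

    // exampleAdapter is illustrative only; a real adapter already implements
    // the rest of IAdapter.
    type exampleAdapter struct{}

    func (a *exampleAdapter) Enable_port(deviceId string, port *voltha.Port) error  { return nil }
    func (a *exampleAdapter) Disable_port(deviceId string, port *voltha.Port) error { return nil }

    func (a *exampleAdapter) Child_device_lost(parentDeviceId string, parentPortNo uint32, onuID uint32) error {
        // Drop any state held for the lost ONU; returning nil acknowledges the loss.
        return nil
    }

    func (a *exampleAdapter) Start_omci_test(device *voltha.Device, request *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
        // A real adapter would trigger the OMCI self-test here and report its outcome.
        return &voltha.TestResponse{}, nil
    }
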
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
new file mode 100644
index 0000000..faa86ed
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/backend.go
@@ -0,0 +1,269 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package db
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ // Default Minimal Interval for posting alive state of backend kvstore on Liveness Channel
+ DefaultLivenessChannelInterval = time.Second * 30
+)
+
+// Backend structure holds details for accessing the kv store
+type Backend struct {
+ sync.RWMutex
+ Client kvstore.Client
+ StoreType string
+ Host string
+ Port int
+ Timeout int
+ PathPrefix string
+ alive bool // Is this backend connection alive?
+ liveness chan bool // channel to post alive state
+ LivenessChannelInterval time.Duration // regularly push alive state beyond this interval
+ lastLivenessTime time.Time // Instant of last alive state push
+}
+
+// NewBackend creates a new instance of a Backend structure
+func NewBackend(storeType string, host string, port int, timeout int, pathPrefix string) *Backend {
+ var err error
+
+ b := &Backend{
+ StoreType: storeType,
+ Host: host,
+ Port: port,
+ Timeout: timeout,
+ LivenessChannelInterval: DefaultLivenessChannelInterval,
+ PathPrefix: pathPrefix,
+ alive: false, // connection considered down at start
+ }
+
+ address := host + ":" + strconv.Itoa(port)
+ if b.Client, err = b.newClient(address, timeout); err != nil {
+ logger.Errorw("failed-to-create-kv-client",
+ log.Fields{
+ "type": storeType, "host": host, "port": port,
+ "timeout": timeout, "prefix": pathPrefix,
+ "error": err.Error(),
+ })
+ }
+
+ return b
+}
+
+func (b *Backend) newClient(address string, timeout int) (kvstore.Client, error) {
+ switch b.StoreType {
+ case "consul":
+ return kvstore.NewConsulClient(address, timeout)
+ case "etcd":
+ return kvstore.NewEtcdClient(address, timeout)
+ }
+ return nil, errors.New("unsupported-kv-store")
+}
+
+func (b *Backend) makePath(key string) string {
+ path := fmt.Sprintf("%s/%s", b.PathPrefix, key)
+ return path
+}
+
+func (b *Backend) updateLiveness(alive bool) {
+ // Periodically push stream of liveness data to the channel,
+	// so that in a live state, the core does not time out and
+ // send a forced liveness message. Push alive state if the
+ // last push to channel was beyond livenessChannelInterval
+ if b.liveness != nil {
+
+ if b.alive != alive {
+ logger.Debug("update-liveness-channel-reason-change")
+ b.liveness <- alive
+ b.lastLivenessTime = time.Now()
+ } else if time.Since(b.lastLivenessTime) > b.LivenessChannelInterval {
+ logger.Debug("update-liveness-channel-reason-interval")
+ b.liveness <- alive
+ b.lastLivenessTime = time.Now()
+ }
+ }
+
+ // Emit log message only for alive state change
+ if b.alive != alive {
+ logger.Debugw("change-kvstore-alive-status", log.Fields{"alive": alive})
+ b.alive = alive
+ }
+}
+
+// Perform a dummy Key Lookup on kvstore to test Connection Liveness and
+// post on Liveness channel
+func (b *Backend) PerformLivenessCheck(ctx context.Context) bool {
+ alive := b.Client.IsConnectionUp(ctx)
+ logger.Debugw("kvstore-liveness-check-result", log.Fields{"alive": alive})
+
+ b.updateLiveness(alive)
+ return alive
+}
+
+// Enable the liveness monitor channel. This channel will report
+// a "true" or "false" on every kvstore operation which indicates whether
+// or not the connection is still Live. This channel is then picked up
+// by the service (i.e. rw_core / ro_core) to update readiness status
+// and/or take other actions.
+func (b *Backend) EnableLivenessChannel() chan bool {
+ logger.Debug("enable-kvstore-liveness-channel")
+
+ if b.liveness == nil {
+ logger.Debug("create-kvstore-liveness-channel")
+
+		// Channel size of 10 to avoid any possibility of blocking under load conditions
+ b.liveness = make(chan bool, 10)
+
+ // Post initial alive state
+ b.liveness <- b.alive
+ b.lastLivenessTime = time.Now()
+ }
+
+ return b.liveness
+}
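
For context, a consumer of this channel (for example a readiness probe in rw_core) would look roughly like the sketch below; the fallback to an explicit check after one quiet interval is an assumption, not part of this library:

    import (
        "context"
        "time"

        "github.com/opencord/voltha-lib-go/v3/pkg/db"
    )

    // monitorKvstore updates readiness from pushed liveness events and forces
    // a check if nothing has been pushed for one interval.
    func monitorKvstore(ctx context.Context, backend *db.Backend) {
        liveness := backend.EnableLivenessChannel()
        for {
            select {
            case alive := <-liveness:
                _ = alive // update the service readiness state here
            case <-time.After(backend.LivenessChannelInterval):
                backend.PerformLivenessCheck(ctx)
            case <-ctx.Done():
                return
            }
        }
    }
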
+
+// Extract Alive status of Kvstore based on type of error
+func (b *Backend) isErrorIndicatingAliveKvstore(err error) bool {
+ // Alive unless observed an error indicating so
+ alive := true
+
+ if err != nil {
+
+ // timeout indicates kvstore not reachable/alive
+ if err == context.DeadlineExceeded {
+ alive = false
+ }
+
+ // Need to analyze client-specific errors based on backend type
+ if b.StoreType == "etcd" {
+
+ // For etcd backend, consider not-alive only for errors indicating
+			// timed-out requests or an unavailable/corrupted cluster. For all remaining
+			// error codes listed in https://godoc.org/google.golang.org/grpc/codes#Code,
+			// we would not infer a not-alive backend because such an error may also
+			// occur due to bad client requests or sequences of operations
+ switch status.Code(err) {
+ case codes.DeadlineExceeded:
+ fallthrough
+ case codes.Unavailable:
+ fallthrough
+ case codes.DataLoss:
+ alive = false
+ }
+
+ //} else {
+ // TODO: Implement for consul backend; would it be needed ever?
+ }
+ }
+
+ return alive
+}
+
+// List retrieves one or more items that match the specified key
+func (b *Backend) List(ctx context.Context, key string) (map[string]*kvstore.KVPair, error) {
+ b.Lock()
+ defer b.Unlock()
+
+ formattedPath := b.makePath(key)
+ logger.Debugw("listing-key", log.Fields{"key": key, "path": formattedPath})
+
+ pair, err := b.Client.List(ctx, formattedPath)
+
+ b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
+
+ return pair, err
+}
+
+// Get retrieves an item that matches the specified key
+func (b *Backend) Get(ctx context.Context, key string) (*kvstore.KVPair, error) {
+ b.Lock()
+ defer b.Unlock()
+
+ formattedPath := b.makePath(key)
+ logger.Debugw("getting-key", log.Fields{"key": key, "path": formattedPath})
+
+ pair, err := b.Client.Get(ctx, formattedPath)
+
+ b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
+
+ return pair, err
+}
+
+// Put stores an item value under the specified key
+func (b *Backend) Put(ctx context.Context, key string, value interface{}) error {
+ b.Lock()
+ defer b.Unlock()
+
+ formattedPath := b.makePath(key)
+ logger.Debugw("putting-key", log.Fields{"key": key, "value": value, "path": formattedPath})
+
+ err := b.Client.Put(ctx, formattedPath, value)
+
+ b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
+
+ return err
+}
+
+// Delete removes an item under the specified key
+func (b *Backend) Delete(ctx context.Context, key string) error {
+ b.Lock()
+ defer b.Unlock()
+
+ formattedPath := b.makePath(key)
+ logger.Debugw("deleting-key", log.Fields{"key": key, "path": formattedPath})
+
+ err := b.Client.Delete(ctx, formattedPath)
+
+ b.updateLiveness(b.isErrorIndicatingAliveKvstore(err))
+
+ return err
+}
+
+// CreateWatch starts watching events for the specified key
+func (b *Backend) CreateWatch(ctx context.Context, key string, withPrefix bool) chan *kvstore.Event {
+ b.Lock()
+ defer b.Unlock()
+
+ formattedPath := b.makePath(key)
+ logger.Debugw("creating-key-watch", log.Fields{"key": key, "path": formattedPath})
+
+ return b.Client.Watch(ctx, formattedPath, withPrefix)
+}
+
+// DeleteWatch stops watching events for the specified key
+func (b *Backend) DeleteWatch(key string, ch chan *kvstore.Event) {
+ b.Lock()
+ defer b.Unlock()
+
+ formattedPath := b.makePath(key)
+ logger.Debugw("deleting-key-watch", log.Fields{"key": key, "path": formattedPath})
+
+ b.Client.CloseWatch(formattedPath, ch)
+}
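
Taken together, a service constructs the Backend once and then uses the context-aware CRUD methods; every key is placed under PathPrefix by makePath. A short usage sketch, with the store type, address and prefix chosen purely for illustration:

    import (
        "context"
        "time"

        "github.com/opencord/voltha-lib-go/v3/pkg/db"
        "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
    )

    func exampleBackendUsage() (*kvstore.KVPair, error) {
        // Placeholder KV store coordinates.
        backend := db.NewBackend("etcd", "etcd-service", 2379, 5, "service/voltha")

        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()

        // Stored as "service/voltha/devices/device-1234" because of the prefix.
        if err := backend.Put(ctx, "devices/device-1234", []byte("config")); err != nil {
            return nil, err
        }
        return backend.Get(ctx, "devices/device-1234")
    }
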
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/common.go
new file mode 100644
index 0000000..1cf2e1c
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package db
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+	// Set up this package so that its log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "db"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
similarity index 73%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/client.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
index 97fbec9..b9cb1ee 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/client.go
@@ -15,9 +15,7 @@
*/
package kvstore
-import (
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
-)
+import "context"
const (
// Default timeout in seconds when making a kvstore request
@@ -43,10 +41,6 @@
Lease int64
}
-func init() {
- log.AddPackage(log.JSON, log.WarnLevel, nil)
-}
-
// NewKVPair creates a new KVPair object
func NewKVPair(key string, value interface{}, session string, lease int64, version int64) *KVPair {
kv := new(KVPair)
@@ -79,18 +73,18 @@
// Client represents the set of APIs a KV Client must implement
type Client interface {
- List(key string, timeout int) (map[string]*KVPair, error)
- Get(key string, timeout int) (*KVPair, error)
- Put(key string, value interface{}, timeout int) error
- Delete(key string, timeout int) error
- Reserve(key string, value interface{}, ttl int64) (interface{}, error)
- ReleaseReservation(key string) error
- ReleaseAllReservations() error
- RenewReservation(key string) error
- Watch(key string) chan *Event
- AcquireLock(lockName string, timeout int) error
+ List(ctx context.Context, key string) (map[string]*KVPair, error)
+ Get(ctx context.Context, key string) (*KVPair, error)
+ Put(ctx context.Context, key string, value interface{}) error
+ Delete(ctx context.Context, key string) error
+ Reserve(ctx context.Context, key string, value interface{}, ttl int64) (interface{}, error)
+ ReleaseReservation(ctx context.Context, key string) error
+ ReleaseAllReservations(ctx context.Context) error
+ RenewReservation(ctx context.Context, key string) error
+ Watch(ctx context.Context, key string, withPrefix bool) chan *Event
+ AcquireLock(ctx context.Context, lockName string, timeout int) error
ReleaseLock(lockName string) error
- IsConnectionUp(timeout int) bool // timeout in second
+	IsConnectionUp(ctx context.Context) bool // the passed context controls the timeout
CloseWatch(key string, ch chan *Event)
Close()
}
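
Every method that previously took a timeout in seconds now takes a context, so the caller owns the deadline. A migration sketch; the five-second value simply mirrors an old per-call timeout and is an arbitrary choice:

    import (
        "context"
        "time"

        "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
    )

    // getWithTimeout shows the v3 calling convention for any kvstore.Client.
    func getWithTimeout(client kvstore.Client, key string) (*kvstore.KVPair, error) {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        return client.Get(ctx, key)
    }
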
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/common.go
new file mode 100644
index 0000000..aa7aeb0
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kvstore
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+	// Set up this package so that its log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "kvstore"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/consulclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
similarity index 80%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/consulclient.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
index a94de4d..bdf2d10 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/consulclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/consulclient.go
@@ -19,7 +19,7 @@
"bytes"
"context"
"errors"
- log "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ log "github.com/opencord/voltha-lib-go/v3/pkg/log"
"sync"
"time"
//log "ciena.com/coordinator/common"
@@ -53,7 +53,7 @@
config.WaitTime = duration
consul, err := consulapi.NewClient(config)
if err != nil {
- log.Error(err)
+ logger.Error(err)
return nil, err
}
@@ -64,23 +64,23 @@
}
// IsConnectionUp returns whether the connection to the Consul KV store is up
-func (c *ConsulClient) IsConnectionUp(timeout int) bool {
- log.Error("Unimplemented function")
+func (c *ConsulClient) IsConnectionUp(ctx context.Context) bool {
+ logger.Error("Unimplemented function")
return false
}
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) List(key string, timeout int) (map[string]*KVPair, error) {
- duration := GetDuration(timeout)
+func (c *ConsulClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
+ deadline, _ := ctx.Deadline()
kv := c.consul.KV()
var queryOptions consulapi.QueryOptions
- queryOptions.WaitTime = duration
+ queryOptions.WaitTime = GetDuration(deadline.Second())
// For now we ignore meta data
kvps, _, err := kv.List(key, &queryOptions)
if err != nil {
- log.Error(err)
+ logger.Error(err)
return nil, err
}
m := make(map[string]*KVPair)
@@ -92,17 +92,16 @@
// Get returns a key-value pair for a given key. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Get(key string, timeout int) (*KVPair, error) {
+func (c *ConsulClient) Get(ctx context.Context, key string) (*KVPair, error) {
- duration := GetDuration(timeout)
-
+ deadline, _ := ctx.Deadline()
kv := c.consul.KV()
var queryOptions consulapi.QueryOptions
- queryOptions.WaitTime = duration
+ queryOptions.WaitTime = GetDuration(deadline.Second())
// For now we ignore meta data
kvp, _, err := kv.Get(key, &queryOptions)
if err != nil {
- log.Error(err)
+ logger.Error(err)
return nil, err
}
if kvp != nil {
@@ -115,13 +114,13 @@
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the consul API
// accepts only a []byte as a value for a put operation. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Put(key string, value interface{}, timeout int) error {
+func (c *ConsulClient) Put(ctx context.Context, key string, value interface{}) error {
// Validate that we can create a byte array from the value as consul API expects a byte array
var val []byte
var er error
if val, er = ToByte(value); er != nil {
- log.Error(er)
+ logger.Error(er)
return er
}
@@ -133,7 +132,7 @@
defer c.writeLock.Unlock()
_, err := kv.Put(&kvp, &writeOptions)
if err != nil {
- log.Error(err)
+ logger.Error(err)
return err
}
return nil
@@ -141,14 +140,14 @@
// Delete removes a key from the KV store. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Delete(key string, timeout int) error {
+func (c *ConsulClient) Delete(ctx context.Context, key string) error {
kv := c.consul.KV()
var writeOptions consulapi.WriteOptions
c.writeLock.Lock()
defer c.writeLock.Unlock()
_, err := kv.Delete(key, &writeOptions)
if err != nil {
- log.Error(err)
+ logger.Error(err)
return err
}
return nil
@@ -156,11 +155,11 @@
func (c *ConsulClient) deleteSession() {
if c.sessionID != "" {
- log.Debug("cleaning-up-session")
+ logger.Debug("cleaning-up-session")
session := c.consul.Session()
_, err := session.Destroy(c.sessionID, nil)
if err != nil {
- log.Errorw("error-cleaning-session", log.Fields{"session": c.sessionID, "error": err})
+ logger.Errorw("error-cleaning-session", log.Fields{"session": c.sessionID, "error": err})
}
}
c.sessionID = ""
@@ -177,17 +176,17 @@
for {
id, meta, err := session.Create(entry, nil)
if err != nil {
- log.Errorw("create-session-error", log.Fields{"error": err})
+ logger.Errorw("create-session-error", log.Fields{"error": err})
if retries == 0 {
return nil, "", err
}
} else if meta.RequestTime == 0 {
- log.Errorw("create-session-bad-meta-data", log.Fields{"meta-data": meta})
+ logger.Errorw("create-session-bad-meta-data", log.Fields{"meta-data": meta})
if retries == 0 {
return nil, "", errors.New("bad-meta-data")
}
} else if id == "" {
- log.Error("create-session-nil-id")
+ logger.Error("create-session-nil-id")
if retries == 0 {
return nil, "", errors.New("ID-nil")
}
@@ -198,7 +197,7 @@
if retries > 0 {
retries--
}
- log.Debug("retrying-session-create-after-a-second-delay")
+ logger.Debug("retrying-session-create-after-a-second-delay")
time.Sleep(time.Duration(1) * time.Second)
}
}
@@ -219,13 +218,13 @@
// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
// then the value assigned to that key will be returned.
-func (c *ConsulClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
+func (c *ConsulClient) Reserve(ctx context.Context, key string, value interface{}, ttl int64) (interface{}, error) {
// Validate that we can create a byte array from the value as consul API expects a byte array
var val []byte
var er error
if val, er = ToByte(value); er != nil {
- log.Error(er)
+ logger.Error(er)
return nil, er
}
@@ -238,17 +237,17 @@
reservationSuccessful := false
defer func() {
if !reservationSuccessful {
- log.Debug("deleting-session")
+ logger.Debug("deleting-session")
c.deleteSession()
}
}()
session, sessionID, err := c.createSession(ttl, -1)
if err != nil {
- log.Errorw("no-session-created", log.Fields{"error": err})
+ logger.Errorw("no-session-created", log.Fields{"error": err})
return "", errors.New("no-session-created")
}
- log.Debugw("session-created", log.Fields{"session-id": sessionID})
+ logger.Debugw("session-created", log.Fields{"session-id": sessionID})
c.sessionID = sessionID
c.session = session
@@ -257,19 +256,19 @@
kvp := consulapi.KVPair{Key: key, Value: val, Session: c.sessionID}
result, _, err := kv.Acquire(&kvp, nil)
if err != nil {
- log.Errorw("error-acquiring-keys", log.Fields{"error": err})
+ logger.Errorw("error-acquiring-keys", log.Fields{"error": err})
return nil, err
}
- log.Debugw("key-acquired", log.Fields{"key": key, "status": result})
+ logger.Debugw("key-acquired", log.Fields{"key": key, "status": result})
	// Irrespective of whether we were successful in acquiring the key, let's read it back and see if it's ours.
- m, err := c.Get(key, defaultKVGetTimeout)
+ m, err := c.Get(ctx, key)
if err != nil {
return nil, err
}
if m != nil {
- log.Debugw("response-received", log.Fields{"key": m.Key, "m.value": string(m.Value.([]byte)), "value": value})
+ logger.Debugw("response-received", log.Fields{"key": m.Key, "m.value": string(m.Value.([]byte)), "value": value})
if m.Key == key && isEqual(m.Value, value) {
// My reservation is successful - register it. For now, support is only for 1 reservation per key
// per session.
@@ -286,7 +285,7 @@
}
// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
-func (c *ConsulClient) ReleaseAllReservations() error {
+func (c *ConsulClient) ReleaseAllReservations(ctx context.Context) error {
kv := c.consul.KV()
var kvp consulapi.KVPair
var result bool
@@ -299,11 +298,11 @@
kvp = consulapi.KVPair{Key: key, Value: value.([]byte), Session: c.sessionID}
result, _, err = kv.Release(&kvp, nil)
if err != nil {
- log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
+ logger.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
return err
}
if !result {
- log.Errorw("cannot-release-reservation", log.Fields{"key": key})
+ logger.Errorw("cannot-release-reservation", log.Fields{"key": key})
}
delete(c.keyReservations, key)
}
@@ -311,7 +310,7 @@
}
// ReleaseReservation releases reservation for a specific key.
-func (c *ConsulClient) ReleaseReservation(key string) error {
+func (c *ConsulClient) ReleaseReservation(ctx context.Context, key string) error {
var ok bool
var reservedValue interface{}
c.writeLock.Lock()
@@ -337,7 +336,7 @@
// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
// period specified when reserving the key
-func (c *ConsulClient) RenewReservation(key string) error {
+func (c *ConsulClient) RenewReservation(ctx context.Context, key string) error {
	// In the case of Consul, renewing the reservation of a reserved key only requires renewing the client session.
c.writeLock.Lock()
@@ -361,7 +360,7 @@
// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
// listen to receive Events.
-func (c *ConsulClient) Watch(key string) chan *Event {
+func (c *ConsulClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
// Create a new channel
ch := make(chan *Event, maxClientChannelBufferSize)
@@ -390,14 +389,14 @@
c.writeLock.Lock()
defer c.writeLock.Unlock()
if watchedChannelsContexts, ok = c.watchedChannelsContext[key]; !ok {
- log.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key": key})
+ logger.Errorw("key-has-no-watched-context-or-channel", log.Fields{"key": key})
return
}
// Look for the channels
var pos = -1
for i, chCtxMap := range watchedChannelsContexts {
if chCtxMap.channel == ch {
- log.Debug("channel-found")
+ logger.Debug("channel-found")
chCtxMap.cancel()
//close the channel
close(ch)
@@ -409,7 +408,7 @@
if pos >= 0 {
c.watchedChannelsContext[key] = append(c.watchedChannelsContext[key][:pos], c.watchedChannelsContext[key][pos+1:]...)
}
- log.Debugw("watched-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannelsContext[key]})
+ logger.Debugw("watched-channel-exiting", log.Fields{"key": key, "channel": c.watchedChannelsContext[key]})
}
func (c *ConsulClient) isKVEqual(kv1 *consulapi.KVPair, kv2 *consulapi.KVPair) bool {
@@ -430,7 +429,7 @@
}
func (c *ConsulClient) listenForKeyChange(watchContext context.Context, key string, ch chan *Event) {
- log.Debugw("start-watching-channel", log.Fields{"key": key, "channel": ch})
+ logger.Debugw("start-watching-channel", log.Fields{"key": key, "channel": ch})
defer c.CloseWatch(key, ch)
duration := GetDuration(defaultKVGetTimeout)
@@ -441,7 +440,7 @@
// Get the existing value, if any
previousKVPair, meta, err := kv.Get(key, &queryOptions)
if err != nil {
- log.Debug(err)
+ logger.Debug(err)
}
lastIndex := meta.LastIndex
@@ -456,30 +455,30 @@
pair, meta, err = kv.Get(key, waitOptions)
select {
case <-watchContext.Done():
- log.Debug("done-event-received-exiting")
+ logger.Debug("done-event-received-exiting")
return
default:
if err != nil {
- log.Warnw("error-from-watch", log.Fields{"error": err})
+ logger.Warnw("error-from-watch", log.Fields{"error": err})
ch <- NewEvent(CONNECTIONDOWN, key, []byte(""), -1)
} else {
- log.Debugw("index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
+ logger.Debugw("index-state", log.Fields{"lastindex": lastIndex, "newindex": meta.LastIndex, "key": key})
}
}
if err != nil {
- log.Debug(err)
+ logger.Debug(err)
// On error, block for 10 milliseconds to prevent endless loop
time.Sleep(10 * time.Millisecond)
} else if meta.LastIndex <= lastIndex {
- log.Info("no-index-change-or-negative")
+ logger.Info("no-index-change-or-negative")
} else {
- log.Debugw("update-received", log.Fields{"pair": pair})
+ logger.Debugw("update-received", log.Fields{"pair": pair})
if pair == nil {
ch <- NewEvent(DELETE, key, []byte(""), -1)
} else if !c.isKVEqual(pair, previousKVPair) {
// Push the change onto the channel if the data has changed
// For now just assume it's a PUT change
- log.Debugw("pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
+ logger.Debugw("pair-details", log.Fields{"session": pair.Session, "key": pair.Key, "value": pair.Value})
ch <- NewEvent(PUT, pair.Key, pair.Value, -1)
}
previousKVPair = pair
@@ -500,11 +499,11 @@
// Clear the sessionID
if _, err := c.consul.Session().Destroy(c.sessionID, &writeOptions); err != nil {
- log.Errorw("error-closing-client", log.Fields{"error": err})
+ logger.Errorw("error-closing-client", log.Fields{"error": err})
}
}
-func (c *ConsulClient) AcquireLock(lockName string, timeout int) error {
+func (c *ConsulClient) AcquireLock(ctx context.Context, lockName string, timeout int) error {
return nil
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
similarity index 73%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/etcdclient.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
index 3ae767c..d38f0f6 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
@@ -19,28 +19,25 @@
"context"
"errors"
"fmt"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "sync"
+
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
v3Client "go.etcd.io/etcd/clientv3"
v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
- "sync"
)
// EtcdClient represents the Etcd KV store client
type EtcdClient struct {
- ectdAPI *v3Client.Client
- leaderRev v3Client.Client
- keyReservations map[string]*v3Client.LeaseID
- watchedChannels sync.Map
- writeLock sync.Mutex
- lockToMutexMap map[string]*v3Concurrency.Mutex
- lockToSessionMap map[string]*v3Concurrency.Session
- lockToMutexLock sync.Mutex
+ ectdAPI *v3Client.Client
+ keyReservations map[string]*v3Client.LeaseID
+ watchedChannels sync.Map
+ keyReservationsLock sync.RWMutex
+ lockToMutexMap map[string]*v3Concurrency.Mutex
+ lockToSessionMap map[string]*v3Concurrency.Session
+ lockToMutexLock sync.Mutex
}
-// Connection Timeout in Seconds
-var connTimeout int = 2
-
// NewEtcdClient returns a new client for the Etcd KV store
func NewEtcdClient(addr string, timeout int) (*EtcdClient, error) {
duration := GetDuration(timeout)
@@ -50,7 +47,7 @@
DialTimeout: duration,
})
if err != nil {
- log.Error(err)
+ logger.Error(err)
return nil, err
}
@@ -64,25 +61,21 @@
// IsConnectionUp returns whether the connection to the Etcd KV store is up. If a timeout occurs then
// it is assumed the connection is down or unreachable.
-func (c *EtcdClient) IsConnectionUp(timeout int) bool {
+func (c *EtcdClient) IsConnectionUp(ctx context.Context) bool {
// Let's try to get a non existent key. If the connection is up then there will be no error returned.
- if _, err := c.Get("non-existent-key", timeout); err != nil {
+ if _, err := c.Get(ctx, "non-existent-key"); err != nil {
return false
}
+ //cancel()
return true
}
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) List(key string, timeout int) (map[string]*KVPair, error) {
- duration := GetDuration(timeout)
-
- ctx, cancel := context.WithTimeout(context.Background(), duration)
-
+func (c *EtcdClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
- cancel()
if err != nil {
- log.Error(err)
+ logger.Error(err)
return nil, err
}
m := make(map[string]*KVPair)
@@ -94,15 +87,12 @@
// Get returns a key-value pair for a given key. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Get(key string, timeout int) (*KVPair, error) {
- duration := GetDuration(timeout)
-
- ctx, cancel := context.WithTimeout(context.Background(), duration)
+func (c *EtcdClient) Get(ctx context.Context, key string) (*KVPair, error) {
resp, err := c.ectdAPI.Get(ctx, key)
- cancel()
+
if err != nil {
- log.Error(err)
+ logger.Error(err)
return nil, err
}
for _, ev := range resp.Kvs {
@@ -115,7 +105,7 @@
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
// accepts only a string as a value for a put operation. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Put(key string, value interface{}, timeout int) error {
+func (c *EtcdClient) Put(ctx context.Context, key string, value interface{}) error {
// Validate that we can convert value to a string as etcd API expects a string
var val string
@@ -124,32 +114,28 @@
return fmt.Errorf("unexpected-type-%T", value)
}
- duration := GetDuration(timeout)
-
- ctx, cancel := context.WithTimeout(context.Background(), duration)
-
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
-
var err error
// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
// that KV key permanent instead of automatically removing it after a lease expiration
- if leaseID, ok := c.keyReservations[key]; ok {
+ c.keyReservationsLock.RLock()
+ leaseID, ok := c.keyReservations[key]
+ c.keyReservationsLock.RUnlock()
+ if ok {
_, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
} else {
_, err = c.ectdAPI.Put(ctx, key, val)
}
- cancel()
+
if err != nil {
switch err {
case context.Canceled:
- log.Warnw("context-cancelled", log.Fields{"error": err})
+ logger.Warnw("context-cancelled", log.Fields{"error": err})
case context.DeadlineExceeded:
- log.Warnw("context-deadline-exceeded", log.Fields{"error": err})
+ logger.Warnw("context-deadline-exceeded", log.Fields{"error": err})
case v3rpcTypes.ErrEmptyKey:
- log.Warnw("etcd-client-error", log.Fields{"error": err})
+ logger.Warnw("etcd-client-error", log.Fields{"error": err})
default:
- log.Warnw("bad-endpoints", log.Fields{"error": err})
+ logger.Warnw("bad-endpoints", log.Fields{"error": err})
}
return err
}
@@ -158,23 +144,14 @@
// Delete removes a key from the KV store. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Delete(key string, timeout int) error {
-
- duration := GetDuration(timeout)
-
- ctx, cancel := context.WithTimeout(context.Background(), duration)
-
- defer cancel()
-
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
+func (c *EtcdClient) Delete(ctx context.Context, key string) error {
// delete the key
if _, err := c.ectdAPI.Delete(ctx, key); err != nil {
- log.Errorw("failed-to-delete-key", log.Fields{"key": key, "error": err})
+ logger.Errorw("failed-to-delete-key", log.Fields{"key": key, "error": err})
return err
}
- log.Debugw("key(s)-deleted", log.Fields{"key": key})
+ logger.Debugw("key(s)-deleted", log.Fields{"key": key})
return nil
}
@@ -183,7 +160,7 @@
// defines how long that reservation is valid. When TTL expires the key is unreserved by the KV store itself.
// If the key is acquired then the value returned will be the value passed in. If the key is already acquired
// then the value assigned to that key will be returned.
-func (c *EtcdClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
+func (c *EtcdClient) Reserve(ctx context.Context, key string, value interface{}, ttl int64) (interface{}, error) {
// Validate that we can convert value to a string as etcd API expects a string
var val string
var er error
@@ -191,28 +168,22 @@
return nil, fmt.Errorf("unexpected-type%T", value)
}
- duration := GetDuration(connTimeout)
-
- // Create a lease
- ctx, cancel := context.WithTimeout(context.Background(), duration)
- defer cancel()
-
resp, err := c.ectdAPI.Grant(ctx, ttl)
if err != nil {
- log.Error(err)
+ logger.Error(err)
return nil, err
}
// Register the lease id
- c.writeLock.Lock()
+ c.keyReservationsLock.Lock()
c.keyReservations[key] = &resp.ID
- c.writeLock.Unlock()
+ c.keyReservationsLock.Unlock()
// Revoke lease if reservation is not successful
reservationSuccessful := false
defer func() {
if !reservationSuccessful {
- if err = c.ReleaseReservation(key); err != nil {
- log.Error("cannot-release-lease")
+ if err = c.ReleaseReservation(context.Background(), key); err != nil {
+ logger.Error("cannot-release-lease")
}
}
}()
@@ -241,7 +212,7 @@
}
} else {
// Read the Key to ensure this is our Key
- m, err := c.Get(key, defaultKVGetTimeout)
+ m, err := c.Get(ctx, key)
if err != nil {
return nil, err
}
@@ -260,17 +231,14 @@
}
// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
-func (c *EtcdClient) ReleaseAllReservations() error {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
- duration := GetDuration(connTimeout)
- ctx, cancel := context.WithTimeout(context.Background(), duration)
- defer cancel()
+func (c *EtcdClient) ReleaseAllReservations(ctx context.Context) error {
+ c.keyReservationsLock.Lock()
+ defer c.keyReservationsLock.Unlock()
for key, leaseID := range c.keyReservations {
_, err := c.ectdAPI.Revoke(ctx, *leaseID)
if err != nil {
- log.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
+ logger.Errorw("cannot-release-reservation", log.Fields{"key": key, "error": err})
return err
}
delete(c.keyReservations, key)
@@ -279,24 +247,21 @@
}
// ReleaseReservation releases reservation for a specific key.
-func (c *EtcdClient) ReleaseReservation(key string) error {
+func (c *EtcdClient) ReleaseReservation(ctx context.Context, key string) error {
// Get the leaseid using the key
- log.Debugw("Release-reservation", log.Fields{"key": key})
+ logger.Debugw("Release-reservation", log.Fields{"key": key})
var ok bool
var leaseID *v3Client.LeaseID
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
+ c.keyReservationsLock.Lock()
+ defer c.keyReservationsLock.Unlock()
if leaseID, ok = c.keyReservations[key]; !ok {
return nil
}
- duration := GetDuration(connTimeout)
- ctx, cancel := context.WithTimeout(context.Background(), duration)
- defer cancel()
if leaseID != nil {
_, err := c.ectdAPI.Revoke(ctx, *leaseID)
if err != nil {
- log.Error(err)
+ logger.Error(err)
return err
}
delete(c.keyReservations, key)
@@ -306,23 +271,22 @@
// RenewReservation renews a reservation. A reservation will go stale after the specified TTL (Time To Live)
// period specified when reserving the key
-func (c *EtcdClient) RenewReservation(key string) error {
+func (c *EtcdClient) RenewReservation(ctx context.Context, key string) error {
// Get the leaseid using the key
var ok bool
var leaseID *v3Client.LeaseID
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
- if leaseID, ok = c.keyReservations[key]; !ok {
+ c.keyReservationsLock.RLock()
+ leaseID, ok = c.keyReservations[key]
+ c.keyReservationsLock.RUnlock()
+
+ if !ok {
return errors.New("key-not-reserved")
}
- duration := GetDuration(connTimeout)
- ctx, cancel := context.WithTimeout(context.Background(), duration)
- defer cancel()
if leaseID != nil {
_, err := c.ectdAPI.KeepAliveOnce(ctx, *leaseID)
if err != nil {
- log.Errorw("lease-may-have-expired", log.Fields{"error": err})
+ logger.Errorw("lease-may-have-expired", log.Fields{"error": err})
return err
}
} else {
@@ -333,10 +297,15 @@
// Watch provides the watch capability on a given key. It returns a channel onto which the callee needs to
// listen to receive Events.
-func (c *EtcdClient) Watch(key string) chan *Event {
+func (c *EtcdClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
w := v3Client.NewWatcher(c.ectdAPI)
- ctx, cancel := context.WithCancel(context.Background())
- channel := w.Watch(ctx, key)
+ ctx, cancel := context.WithCancel(ctx)
+ var channel v3Client.WatchChan
+ if withPrefix {
+ channel = w.Watch(ctx, key, v3Client.WithPrefix())
+ } else {
+ channel = w.Watch(ctx, key)
+ }
// Create a new channel
ch := make(chan *Event, maxClientChannelBufferSize)
@@ -349,7 +318,7 @@
// Changing the log field (from channelMaps) as the underlying logger cannot format the map of channels into a
// json format.
- log.Debugw("watched-channels", log.Fields{"len": len(channelMaps)})
+ logger.Debugw("watched-channels", log.Fields{"len": len(channelMaps)})
// Launch a go routine to listen for updates
go c.listenForKeyChange(channel, ch, cancel)
@@ -402,21 +371,19 @@
// Get the array of channels mapping
var watchedChannels []map[chan *Event]v3Client.Watcher
var ok bool
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
if watchedChannels, ok = c.getChannelMaps(key); !ok {
- log.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
+ logger.Warnw("key-has-no-watched-channels", log.Fields{"key": key})
return
}
// Look for the channels
var pos = -1
for i, chMap := range watchedChannels {
if t, ok := chMap[ch]; ok {
- log.Debug("channel-found")
+ logger.Debug("channel-found")
// Close the etcd watcher before the client channel. This should close the etcd channel as well
if err := t.Close(); err != nil {
- log.Errorw("watcher-cannot-be-closed", log.Fields{"key": key, "error": err})
+ logger.Errorw("watcher-cannot-be-closed", log.Fields{"key": key, "error": err})
}
pos = i
break
@@ -428,11 +395,11 @@
if pos >= 0 {
channelMaps = c.removeChannelMap(key, pos)
}
- log.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": channelMaps})
+ logger.Infow("watcher-channel-exiting", log.Fields{"key": key, "channel": channelMaps})
}
func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event, cancel context.CancelFunc) {
- log.Debug("start-listening-on-channel ...")
+ logger.Debug("start-listening-on-channel ...")
defer cancel()
defer close(ch)
for resp := range channel {
@@ -440,7 +407,7 @@
ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value, ev.Kv.Version)
}
}
- log.Debug("stop-listening-on-channel ...")
+ logger.Debug("stop-listening-on-channel ...")
}
func getEventType(event *v3Client.Event) int {
@@ -455,10 +422,8 @@
// Close closes the KV store client
func (c *EtcdClient) Close() {
- c.writeLock.Lock()
- defer c.writeLock.Unlock()
if err := c.ectdAPI.Close(); err != nil {
- log.Errorw("error-closing-client", log.Fields{"error": err})
+ logger.Errorw("error-closing-client", log.Fields{"error": err})
}
}
@@ -490,14 +455,11 @@
return lock, session
}
-func (c *EtcdClient) AcquireLock(lockName string, timeout int) error {
- duration := GetDuration(timeout)
- ctx, cancel := context.WithTimeout(context.Background(), duration)
- defer cancel()
+func (c *EtcdClient) AcquireLock(ctx context.Context, lockName string, timeout int) error {
session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
mu := v3Concurrency.NewMutex(session, "/devicelock_"+lockName)
if err := mu.Lock(context.Background()); err != nil {
- cancel()
+ //cancel()
return err
}
c.addLockName(lockName, mu, session)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/kvutils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/kvutils.go
similarity index 100%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore/kvutils.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/kvutils.go
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/client.go
similarity index 92%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/client.go
index 488bf9f..9abad93 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/client.go
@@ -18,7 +18,7 @@
import (
"time"
- ca "github.com/opencord/voltha-protos/v2/go/inter_container"
+ ca "github.com/opencord/voltha-protos/v3/go/inter_container"
)
const (
@@ -66,7 +66,9 @@
DeleteTopic(topic *Topic) error
Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
+ SubscribeForMetadata(func(fromTopic string, timestamp int64))
Send(msg interface{}, topic *Topic, keys ...string) error
SendLiveness() error
EnableLivenessChannel(enable bool) chan bool
+ EnableHealthinessChannel(enable bool) chan bool
}
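
SubscribeForMetadata and EnableHealthinessChannel are new in v3. As a sketch, assuming the surrounding interface is the package's Client type, a service could register a metadata callback to record when each topic was last heard from; the map and its (missing) locking are illustrative only, and the callback may run on the client's receive goroutine:

    import "github.com/opencord/voltha-lib-go/v3/pkg/kafka"

    // trackTopicActivity remembers the latest timestamp seen per topic.
    func trackTopicActivity(client kafka.Client, lastSeen map[string]int64) {
        client.SubscribeForMetadata(func(fromTopic string, timestamp int64) {
            lastSeen[fromTopic] = timestamp
        })
    }
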
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
new file mode 100644
index 0000000..149c150
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+	// Set up this package so that its log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "kafka"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
new file mode 100644
index 0000000..1258382
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "context"
+ "fmt"
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "sync"
+)
+
+const (
+	// All the values below can be tuned to get optimal data distribution. The numbers below seem to work well when
+ // supporting 1000-10000 devices and 1 - 20 replicas of a service
+
+ // Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+ DefaultPartitionCount = 1117
+
+ // Represents how many times a node is replicated on the consistent ring.
+ DefaultReplicationFactor = 117
+
+ // Load is used to calculate average load.
+ DefaultLoad = 1.1
+)
+
+type Endpoint string // Endpoint of a service instance. When using kafka, this is the topic name of a service
+type ReplicaID int32 // The replication ID of a service instance
+
+type EndpointManager interface {
+
+ // GetEndpoint is called to get the endpoint to communicate with for a specific device and service type. For
+ // now this will return the topic name
+ GetEndpoint(deviceID string, serviceType string) (Endpoint, error)
+
+ // IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
+ // devices owned by that service need to be reconciled
+ IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error)
+
+	// GetReplicaAssignment returns the replica number of the service that owns the deviceID. This is used by
+	// tests only
+ GetReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error)
+}
+
+type service struct {
+ id string // Id of the service. The same id is used for all replicas
+ totalReplicas int32
+ replicas map[ReplicaID]Endpoint
+ consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+ partitionCount int
+ replicationFactor int
+ load float64
+ backend *db.Backend
+ services map[string]*service
+ servicesLock sync.RWMutex
+ deviceTypeServiceMap map[string]string
+ deviceTypeServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.partitionCount = count
+ }
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.replicationFactor = replicas
+ }
+}
+
+func Load(load float64) EndpointManagerOption {
+ return func(args *endpointManager) {
+ args.load = load
+ }
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+ tm := &endpointManager{
+ partitionCount: DefaultPartitionCount,
+ replicationFactor: DefaultReplicationFactor,
+ load: DefaultLoad,
+ backend: backend,
+ services: make(map[string]*service),
+ deviceTypeServiceMap: make(map[string]string),
+ }
+
+ for _, option := range opts {
+ option(tm)
+ }
+ return tm
+}
+
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+ return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) {
+ logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return "", err
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ endpoint := m.getEndPoint()
+ if endpoint == "" {
+ return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
+ }
+ logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+ return endpoint, nil
+}
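
Putting the pieces together, a sketch of resolving the topic for a device. The KV store coordinates and the "openolt" service type are placeholder assumptions, and the adapters/device_types entries must already exist under the prefix for the lookup to succeed:

    import (
        "github.com/opencord/voltha-lib-go/v3/pkg/db"
        "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
    )

    // resolveTopic returns the endpoint (topic) owning the given device.
    func resolveTopic(deviceID string) (kafka.Endpoint, error) {
        backend := db.NewBackend("etcd", "etcd-service", 2379, 5, "service/voltha")
        em := kafka.NewEndpointManager(backend)
        return em.GetEndpoint(deviceID, "openolt")
    }
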
+
+func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+ logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return false, nil
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) GetReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) {
+ owner, err := ep.getOwner(deviceID, serviceType)
+ if err != nil {
+ return 0, nil
+ }
+ m, ok := owner.(Member)
+ if !ok {
+ return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+ }
+ return m.getReplica(), nil
+}
+
+func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) {
+ serv, dType, err := ep.getServiceAndDeviceType(serviceType)
+ if err != nil {
+ return nil, err
+ }
+ key := ep.makeKey(deviceID, dType, serviceType)
+ return serv.consistentRing.LocateKey(key), nil
+}
+
+func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) {
+ // Check whether service exist
+ ep.servicesLock.RLock()
+ serv, serviceExist := ep.services[serviceType]
+ ep.servicesLock.RUnlock()
+
+ // Load the service and device types if needed
+ if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+ if err := ep.loadServices(); err != nil {
+ return nil, "", err
+ }
+
+ // Check whether the service exists now
+ ep.servicesLock.RLock()
+ serv, serviceExist = ep.services[serviceType]
+ ep.servicesLock.RUnlock()
+ if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+ return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+ }
+ }
+
+ ep.deviceTypeServiceMapLock.RLock()
+ defer ep.deviceTypeServiceMapLock.RUnlock()
+ for dType, sType := range ep.deviceTypeServiceMap {
+ if sType == serviceType {
+ return serv, dType, nil
+ }
+ }
+ return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+}
+
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+ return consistent.Config{
+ PartitionCount: ep.partitionCount,
+ ReplicationFactor: ep.replicationFactor,
+ Load: ep.load,
+ Hasher: hasher{},
+ }
+}
+
+// loadServices loads the services (adapters) and device types into memory. Because the data set is small and is
+// stored as binary protobuf in the DB, it is simpler to reload all the data whenever an inconsistency is detected,
+// rather than watching for updates in the DB and acting on them.
+func (ep *endpointManager) loadServices() error {
+ ep.servicesLock.Lock()
+ defer ep.servicesLock.Unlock()
+ ep.deviceTypeServiceMapLock.Lock()
+ defer ep.deviceTypeServiceMapLock.Unlock()
+
+ if ep.backend == nil {
+ return status.Error(codes.Aborted, "backend-not-set")
+ }
+ ep.services = make(map[string]*service)
+ ep.deviceTypeServiceMap = make(map[string]string)
+
+ // Load the adapters
+ blobs, err := ep.backend.List(context.Background(), "adapters")
+ if err != nil {
+ return err
+ }
+
+ // Data is marshalled as proto bytes in the data store
+ for _, blob := range blobs {
+ data := blob.Value.([]byte)
+ adapter := &voltha.Adapter{}
+ if err := proto.Unmarshal(data, adapter); err != nil {
+ return err
+ }
+ // A valid adapter should have the vendorID set
+ if adapter.Vendor != "" {
+ if _, ok := ep.services[adapter.Type]; !ok {
+ ep.services[adapter.Type] = &service{
+ id: adapter.Type,
+ totalReplicas: adapter.TotalReplicas,
+ replicas: make(map[ReplicaID]Endpoint),
+ consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+ }
+
+ }
+ currentReplica := ReplicaID(adapter.CurrentReplica)
+ endpoint := Endpoint(adapter.Endpoint)
+ ep.services[adapter.Type].replicas[currentReplica] = endpoint
+ ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+ }
+ }
+ // Load the device types
+ blobs, err = ep.backend.List(context.Background(), "device_types")
+ if err != nil {
+ return err
+ }
+ for _, blob := range blobs {
+ data := blob.Value.([]byte)
+ deviceType := &voltha.DeviceType{}
+ if err := proto.Unmarshal(data, deviceType); err != nil {
+ return err
+ }
+ if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
+ ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
+ }
+ }
+
+	// Log the loaded data in debug mode to facilitate troubleshooting
+ if logger.V(log.DebugLevel) {
+ for key, val := range ep.services {
+ members := val.consistentRing.GetMembers()
+			logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(members)})
+ for _, m := range members {
+ n := m.(Member)
+ logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+ }
+ }
+ logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+ }
+ return nil
+}
+
+// makeKey creates the string that the hash function uses to create the hash
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+ return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
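+
+// For example (illustrative values only), a device "1234" of type "olt-device-type" served by the
+// "openolt" service hashes the key "openolt_olt-device-type_1234".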
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hash function. In numerous testing scenarios, the xxhash package provided the best key
+// distribution compared to other hash packages.
+func (h hasher) Sum64(data []byte) uint64 {
+ return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+ String() string
+ getReplica() ReplicaID
+ getEndPoint() Endpoint
+ getID() string
+ getServiceType() string
+}
+
+// member implements the Member interface
+type member struct {
+ id string
+ serviceType string
+ vendor string
+ version string
+ replica ReplicaID
+ endpoint Endpoint
+}
+
+func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+ return &member{
+ id: ID,
+ serviceType: serviceType,
+ vendor: vendor,
+ version: version,
+ replica: replica,
+ endpoint: endPoint,
+ }
+}
+
+func (m *member) String() string {
+ return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+ return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+ return m.endpoint
+}
+
+func (m *member) getID() string {
+ return m.id
+}
+
+func (m *member) getServiceType() string {
+ return m.serviceType
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
new file mode 100644
index 0000000..fc2334d
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -0,0 +1,994 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "reflect"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+)
+
+const (
+ DefaultMaxRetries = 3
+ DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
+)
+
+const (
+ TransactionKey = "transactionID"
+ FromTopic = "fromTopic"
+)
+
+var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
+var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
+
+// requestHandlerChannel associates a request handler interface with a channel. Whenever a message is
+// received on that channel, the interface is invoked. This is used to handle
+// async requests into the Core via the kafka messaging bus.
+type requestHandlerChannel struct {
+ requesthandlerInterface interface{}
+ ch <-chan *ic.InterContainerMessage
+}
+
+// transactionChannel combines a topic with the channel onto which a response received
+// on the kafka bus will be forwarded.
+type transactionChannel struct {
+ topic *Topic
+ ch chan *ic.InterContainerMessage
+}
+
+type InterContainerProxy interface {
+ Start() error
+ Stop()
+ GetDefaultTopic() *Topic
+ DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
+ InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
+ InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
+ SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
+ SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
+ UnSubscribeFromRequestHandler(topic Topic) error
+ DeleteTopic(topic Topic) error
+ EnableLivenessChannel(enable bool) chan bool
+ SendLiveness() error
+}
+
+// interContainerProxy represents the messaging proxy
+type interContainerProxy struct {
+ kafkaHost string
+ kafkaPort int
+ defaultTopic *Topic
+ defaultRequestHandlerInterface interface{}
+ deviceDiscoveryTopic *Topic
+ kafkaClient Client
+ doneCh chan struct{}
+ doneOnce sync.Once
+
+ // This map is used to map a topic to an interface and channel. When a request is received
+ // on that channel (registered to the topic) then that interface is invoked.
+ topicToRequestHandlerChannelMap map[string]*requestHandlerChannel
+ lockTopicRequestHandlerChannelMap sync.RWMutex
+
+ // This map is used to map a channel to a response topic. This channel handles all responses on that
+ // channel for that topic and forward them to the appropriate consumers channel, using the
+ // transactionIdToChannelMap.
+ topicToResponseChannelMap map[string]<-chan *ic.InterContainerMessage
+ lockTopicResponseChannelMap sync.RWMutex
+
+ // This map is used to map a transaction to a consumers channel. This is used whenever a request has been
+ // sent out and we are waiting for a response.
+ transactionIdToChannelMap map[string]*transactionChannel
+ lockTransactionIdToChannelMap sync.RWMutex
+}
+
+type InterContainerProxyOption func(*interContainerProxy)
+
+func InterContainerHost(host string) InterContainerProxyOption {
+ return func(args *interContainerProxy) {
+ args.kafkaHost = host
+ }
+}
+
+func InterContainerPort(port int) InterContainerProxyOption {
+ return func(args *interContainerProxy) {
+ args.kafkaPort = port
+ }
+}
+
+func DefaultTopic(topic *Topic) InterContainerProxyOption {
+ return func(args *interContainerProxy) {
+ args.defaultTopic = topic
+ }
+}
+
+func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
+ return func(args *interContainerProxy) {
+ args.deviceDiscoveryTopic = topic
+ }
+}
+
+func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
+ return func(args *interContainerProxy) {
+ args.defaultRequestHandlerInterface = handler
+ }
+}
+
+func MsgClient(client Client) InterContainerProxyOption {
+ return func(args *interContainerProxy) {
+ args.kafkaClient = client
+ }
+}
+
+func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
+ proxy := &interContainerProxy{
+ kafkaHost: DefaultKafkaHost,
+ kafkaPort: DefaultKafkaPort,
+ doneCh: make(chan struct{}),
+ }
+
+ for _, option := range opts {
+ option(proxy)
+ }
+
+ return proxy
+}
+
+func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
+ return newInterContainerProxy(opts...)
+}
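+
+// Illustrative construction sketch (not part of the library): wiring a proxy with the functional
+// options defined above. The host, port, topic name and client value are assumptions.
+func exampleNewProxy(client Client) InterContainerProxy {
+	return NewInterContainerProxy(
+		InterContainerHost("kafka"),
+		InterContainerPort(9092),
+		DefaultTopic(&Topic{Name: "openolt"}),
+		MsgClient(client),
+	)
+}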
+
+func (kp *interContainerProxy) Start() error {
+ logger.Info("Starting-Proxy")
+
+ // Kafka MsgClient should already have been created. If not, output fatal error
+ if kp.kafkaClient == nil {
+ logger.Fatal("kafka-client-not-set")
+ }
+
+ // Start the kafka client
+ if err := kp.kafkaClient.Start(); err != nil {
+ logger.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
+ return err
+ }
+
+ // Create the topic to response channel map
+ kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
+ //
+ // Create the transactionId to Channel Map
+ kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
+
+ // Create the topic to request channel map
+ kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
+
+ return nil
+}
+
+func (kp *interContainerProxy) Stop() {
+ logger.Info("stopping-intercontainer-proxy")
+ kp.doneOnce.Do(func() { close(kp.doneCh) })
+ // TODO : Perform cleanup
+ kp.kafkaClient.Stop()
+ err := kp.deleteAllTopicRequestHandlerChannelMap()
+ if err != nil {
+ logger.Errorw("failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+ }
+ err = kp.deleteAllTopicResponseChannelMap()
+ if err != nil {
+ logger.Errorw("failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+ }
+ kp.deleteAllTransactionIdToChannelMap()
+}
+
+func (kp *interContainerProxy) GetDefaultTopic() *Topic {
+ return kp.defaultTopic
+}
+
+// DeviceDiscovered publishes the discovered device onto the kafka messaging bus
+func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
+ logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
+ // Simple validation
+ if deviceId == "" || deviceType == "" {
+ logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
+ return errors.New("invalid-parameters")
+ }
+ // Create the device discovery message
+ header := &ic.Header{
+ Id: uuid.New().String(),
+ Type: ic.MessageType_DEVICE_DISCOVERED,
+ FromTopic: kp.defaultTopic.Name,
+ ToTopic: kp.deviceDiscoveryTopic.Name,
+ Timestamp: time.Now().UnixNano(),
+ }
+ body := &ic.DeviceDiscovered{
+ Id: deviceId,
+ DeviceType: deviceType,
+ ParentId: parentId,
+ Publisher: publisher,
+ }
+
+ var marshalledData *any.Any
+ var err error
+ if marshalledData, err = ptypes.MarshalAny(body); err != nil {
+ logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
+ return err
+ }
+ msg := &ic.InterContainerMessage{
+ Header: header,
+ Body: marshalledData,
+ }
+
+ // Send the message
+ if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
+ logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
+ return err
+ }
+ return nil
+}
+
+// InvokeAsyncRPC is used to make an RPC request asynchronously
+func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+ waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
+
+ logger.Debugw("InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
+	// If a replyToTopic is provided then use it, otherwise fall back to the default topic. The replyToTopic is
+	// typically the device ID.
+ responseTopic := replyToTopic
+ if responseTopic == nil {
+ responseTopic = kp.GetDefaultTopic()
+ }
+
+ chnl := make(chan *RpcResponse)
+
+ go func() {
+
+ // once we're done,
+ // close the response channel
+ defer close(chnl)
+
+ var err error
+ var protoRequest *ic.InterContainerMessage
+
+ // Encode the request
+ protoRequest, err = encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
+ if err != nil {
+ logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+ chnl <- NewResponse(RpcFormattingError, err, nil)
+ return
+ }
+
+ // Subscribe for response, if needed, before sending request
+ var ch <-chan *ic.InterContainerMessage
+ if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
+ logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+ chnl <- NewResponse(RpcTransportError, err, nil)
+ return
+ }
+
+		// Send request - if a key is provided (typically the device Id) then the request is published using that
+		// key, hence ensuring a single partition is used to publish the request. This ensures that the
+		// subscriber on that topic will receive the requests in the order they were sent.
+ logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+
+		// if the message cannot be sent on kafka, publish an error response and close the channel
+ if err = kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
+ chnl <- NewResponse(RpcTransportError, err, nil)
+ return
+ }
+
+		// if the client is not waiting for a response, send the ack and close the channel
+ chnl <- NewResponse(RpcSent, nil, nil)
+ if !waitForResponse {
+ return
+ }
+
+ defer func() {
+ // Remove the subscription for a response on return
+ if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
+ logger.Warnw("invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
+ }
+ }()
+
+ // Wait for response as well as timeout or cancellation
+ select {
+ case msg, ok := <-ch:
+ if !ok {
+ logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
+				// no response will arrive on a closed channel; stop waiting
+				return
+			}
+ logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+ if responseBody, err := decodeResponse(msg); err != nil {
+ chnl <- NewResponse(RpcReply, err, nil)
+ } else {
+ if responseBody.Success {
+ chnl <- NewResponse(RpcReply, nil, responseBody.Result)
+ } else {
+ // response body contains an error
+ unpackErr := &ic.Error{}
+ if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
+ chnl <- NewResponse(RpcReply, err, nil)
+ } else {
+ chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
+ }
+ }
+ }
+ case <-ctx.Done():
+ logger.Errorw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+ err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
+ chnl <- NewResponse(RpcTimeout, err, nil)
+ case <-kp.doneCh:
+ chnl <- NewResponse(RpcSystemClosing, nil, nil)
+ logger.Warnw("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+ }
+ }()
+ return chnl
+}
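+
+// Illustrative caller sketch (not part of the library): draining the channel returned by
+// InvokeAsyncRPC. The first message is the RpcSent acknowledgement; the final one carries the
+// reply, an error, or a timeout. The topic names, rpc name and key are assumptions.
+func exampleInvokeAsync(ctx context.Context, proxy InterContainerProxy, deviceID string) {
+	toTopic := &Topic{Name: "openolt"}
+	replyTo := &Topic{Name: deviceID}
+	for resp := range proxy.InvokeAsyncRPC(ctx, "get_device_info", toTopic, replyTo, true, deviceID) {
+		logger.Debugw("async-rpc-update", log.Fields{"device-id": deviceID, "response": resp})
+	}
+}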
+
+// InvokeRPC is used to send a request to a given topic
+func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+ waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
+
+	// If a replyToTopic is provided then use it, otherwise fall back to the default topic. The replyToTopic is
+	// typically the device ID.
+ responseTopic := replyToTopic
+ if responseTopic == nil {
+ responseTopic = kp.defaultTopic
+ }
+
+ // Encode the request
+ protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, key, kvArgs...)
+ if err != nil {
+ logger.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+ return false, nil
+ }
+
+ // Subscribe for response, if needed, before sending request
+ var ch <-chan *ic.InterContainerMessage
+ if waitForResponse {
+ var err error
+ if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
+ logger.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+ }
+ }
+
+	// Send request - if a key is provided (typically the device Id) then the request is published using that
+	// key, hence ensuring a single partition is used to publish the request. This ensures that the
+	// subscriber on that topic will receive the requests in the order they were sent.
+ //key := GetDeviceIdFromTopic(*toTopic)
+ logger.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+ go func() {
+ if err := kp.kafkaClient.Send(protoRequest, toTopic, key); err != nil {
+ logger.Errorw("send-failed", log.Fields{
+ "topic": toTopic,
+ "key": key,
+ "error": err})
+ }
+ }()
+
+ if waitForResponse {
+ // Create a child context based on the parent context, if any
+ var cancel context.CancelFunc
+ childCtx := context.Background()
+ if ctx == nil {
+ ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
+ } else {
+ childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
+ }
+ defer cancel()
+
+ // Wait for response as well as timeout or cancellation
+ // Remove the subscription for a response on return
+ defer func() {
+ if err := kp.unSubscribeForResponse(protoRequest.Header.Id); err != nil {
+ logger.Errorw("response-unsubscribe-failed", log.Fields{
+ "id": protoRequest.Header.Id,
+ "error": err})
+ }
+ }()
+ select {
+ case msg, ok := <-ch:
+ if !ok {
+ logger.Warnw("channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+ protoError := &ic.Error{Reason: "channel-closed"}
+ var marshalledArg *any.Any
+ if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+ return false, nil // Should never happen
+ }
+ return false, marshalledArg
+ }
+ logger.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+ var responseBody *ic.InterContainerResponseBody
+ var err error
+			if responseBody, err = decodeResponse(msg); err != nil {
+				logger.Errorw("decode-response-error", log.Fields{"error": err})
+				return false, nil
+			}
+ return responseBody.Success, responseBody.Result
+ case <-ctx.Done():
+ logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+ // pack the error as proto any type
+ protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+
+ var marshalledArg *any.Any
+ if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+ return false, nil // Should never happen
+ }
+ return false, marshalledArg
+ case <-childCtx.Done():
+ logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
+ // pack the error as proto any type
+ protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+
+ var marshalledArg *any.Any
+ if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+ return false, nil // Should never happen
+ }
+ return false, marshalledArg
+ case <-kp.doneCh:
+ logger.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+ return true, nil
+ }
+ }
+ return true, nil
+}
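+
+// Illustrative caller sketch (not part of the library): a synchronous invocation carrying one
+// proto argument. The rpc name, topics, key and argument name are assumptions; each KVArg value
+// must be a proto.Message, as enforced by encodeRequest below.
+func exampleInvokeSync(ctx context.Context, proxy InterContainerProxy, deviceID string, device proto.Message) (bool, *any.Any) {
+	return proxy.InvokeRPC(ctx, "adopt_device", &Topic{Name: "openolt"}, &Topic{Name: deviceID}, true, deviceID,
+		&KVArg{Key: "device", Value: device})
+}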
+
+// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
+// when a message is received on a given topic
+func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
+
+ // Subscribe to receive messages for that topic
+ var ch <-chan *ic.InterContainerMessage
+ var err error
+ if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
+ //if ch, err = kp.Subscribe(topic); err != nil {
+ logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+ return err
+ }
+
+ kp.defaultRequestHandlerInterface = handler
+ kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
+ // Launch a go routine to receive and process kafka messages
+ go kp.waitForMessages(ch, topic, handler)
+
+ return nil
+}
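+
+// Illustrative handler sketch (not part of the library): a hypothetical target object whose
+// exported methods are looked up by the rpc name (see CallFuncByName below) and receive the raw
+// proto arguments to unpack. The method name, argument handling and return types are assumptions.
+type exampleRequestHandler struct{}
+
+func (h *exampleRequestHandler) Get_device_info(args []*ic.Argument) (*ic.StrType, error) {
+	// A real handler would unmarshal the expected proto types out of args.
+	return &ic.StrType{Val: "ok"}, nil
+}
+
+func exampleSubscribe(proxy InterContainerProxy) error {
+	return proxy.SubscribeWithRequestHandlerInterface(Topic{Name: "openolt"}, &exampleRequestHandler{})
+}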
+
+// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
+// when a message is received on that topic. So far only one target is registered per microservice.
+func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
+ // Subscribe to receive messages for that topic
+ var ch <-chan *ic.InterContainerMessage
+ var err error
+ if ch, err = kp.kafkaClient.Subscribe(&topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
+ logger.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+ return err
+ }
+ kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
+
+ // Launch a go routine to receive and process kafka messages
+ go kp.waitForMessages(ch, topic, kp.defaultRequestHandlerInterface)
+
+ return nil
+}
+
+func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
+ return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
+}
+
+func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
+ kp.lockTopicResponseChannelMap.Lock()
+ defer kp.lockTopicResponseChannelMap.Unlock()
+ if _, exist := kp.topicToResponseChannelMap[topic]; exist {
+		// Unsubscribe from this topic first - this will close the subscribed channel
+ var err error
+ if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ logger.Errorw("unsubscribing-error", log.Fields{"topic": topic})
+ }
+ delete(kp.topicToResponseChannelMap, topic)
+ return err
+ } else {
+ return fmt.Errorf("%s-Topic-not-found", topic)
+ }
+}
+
+// nolint: unused
+func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
+ logger.Debug("delete-all-topic-response-channel")
+ kp.lockTopicResponseChannelMap.Lock()
+ defer kp.lockTopicResponseChannelMap.Unlock()
+ var unsubscribeFailTopics []string
+ for topic := range kp.topicToResponseChannelMap {
+		// Unsubscribe from this topic first - this will close the subscribed channel
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
+ logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToResponseChannelMap, topic)
+ }
+ }
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
+}
+
+func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
+ kp.lockTopicRequestHandlerChannelMap.Lock()
+ defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+ if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
+ kp.topicToRequestHandlerChannelMap[topic] = arg
+ }
+}
+
+func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
+ kp.lockTopicRequestHandlerChannelMap.Lock()
+ defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+ if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
+		// Close the kafka client first by unsubscribing from this topic
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ return err
+ }
+ delete(kp.topicToRequestHandlerChannelMap, topic)
+ return nil
+ } else {
+ return fmt.Errorf("%s-Topic-not-found", topic)
+ }
+}
+
+// nolint: unused
+func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+ logger.Debug("delete-all-topic-request-channel")
+ kp.lockTopicRequestHandlerChannelMap.Lock()
+ defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+ var unsubscribeFailTopics []string
+ for topic := range kp.topicToRequestHandlerChannelMap {
+		// Close the kafka client first by unsubscribing from this topic
+ if err := kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+ unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
+ logger.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+ // Do not return. Continue to try to unsubscribe to other topics.
+ } else {
+ // Only delete from channel map if successfully unsubscribed.
+ delete(kp.topicToRequestHandlerChannelMap, topic)
+ }
+ }
+ if len(unsubscribeFailTopics) > 0 {
+ return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+ }
+ return nil
+}
+
+func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
+ kp.lockTransactionIdToChannelMap.Lock()
+ defer kp.lockTransactionIdToChannelMap.Unlock()
+ if _, exist := kp.transactionIdToChannelMap[id]; !exist {
+ kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
+ }
+}
+
+func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
+ kp.lockTransactionIdToChannelMap.Lock()
+ defer kp.lockTransactionIdToChannelMap.Unlock()
+ if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
+ // Close the channel first
+ close(transChannel.ch)
+ delete(kp.transactionIdToChannelMap, id)
+ }
+}
+
+func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
+ kp.lockTransactionIdToChannelMap.Lock()
+ defer kp.lockTransactionIdToChannelMap.Unlock()
+ for key, value := range kp.transactionIdToChannelMap {
+ if value.topic.Name == id {
+ close(value.ch)
+ delete(kp.transactionIdToChannelMap, key)
+ }
+ }
+}
+
+// nolint: unused
+func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
+ logger.Debug("delete-all-transaction-id-channel-map")
+ kp.lockTransactionIdToChannelMap.Lock()
+ defer kp.lockTransactionIdToChannelMap.Unlock()
+ for key, value := range kp.transactionIdToChannelMap {
+ close(value.ch)
+ delete(kp.transactionIdToChannelMap, key)
+ }
+}
+
+func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
+ // If we have any consumers on that topic we need to close them
+ if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
+ logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
+ }
+ if err := kp.deleteFromTopicRequestHandlerChannelMap(topic.Name); err != nil {
+ logger.Errorw("delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
+ }
+ kp.deleteTopicTransactionIdToChannelMap(topic.Name)
+
+ return kp.kafkaClient.DeleteTopic(&topic)
+}
+
+func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
+ // Encode the response argument - needs to be a proto message
+ if returnedVal == nil {
+ return nil, nil
+ }
+ protoValue, ok := returnedVal.(proto.Message)
+ if !ok {
+ logger.Warnw("response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
+ err := errors.New("response-value-not-proto-message")
+ return nil, err
+ }
+
+ // Marshal the returned value, if any
+ var marshalledReturnedVal *any.Any
+ var err error
+ if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
+ logger.Warnw("cannot-marshal-returned-val", log.Fields{"error": err})
+ return nil, err
+ }
+ return marshalledReturnedVal, nil
+}
+
+func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
+ responseHeader := &ic.Header{
+ Id: request.Header.Id,
+ Type: ic.MessageType_RESPONSE,
+ FromTopic: request.Header.ToTopic,
+ ToTopic: request.Header.FromTopic,
+ Timestamp: time.Now().UnixNano(),
+ }
+ responseBody := &ic.InterContainerResponseBody{
+ Success: false,
+ Result: nil,
+ }
+ var marshalledResponseBody *any.Any
+ var err error
+ // Error should never happen here
+ if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
+ logger.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
+ }
+
+ return &ic.InterContainerMessage{
+ Header: responseHeader,
+ Body: marshalledResponseBody,
+ }
+
+}
+
+//encodeResponse encodes a response to send over kafka and returns an InterContainerMessage on success
+//or an error on failure
+func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
+ //logger.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
+ responseHeader := &ic.Header{
+ Id: request.Header.Id,
+ Type: ic.MessageType_RESPONSE,
+ FromTopic: request.Header.ToTopic,
+ ToTopic: request.Header.FromTopic,
+ KeyTopic: request.Header.KeyTopic,
+ Timestamp: time.Now().UnixNano(),
+ }
+
+ // Go over all returned values
+ var marshalledReturnedVal *any.Any
+ var err error
+
+ // for now we support only 1 returned value - (excluding the error)
+ if len(returnedValues) > 0 {
+ if marshalledReturnedVal, err = encodeReturnedValue(returnedValues[0]); err != nil {
+ logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
+ }
+ }
+
+ responseBody := &ic.InterContainerResponseBody{
+ Success: success,
+ Result: marshalledReturnedVal,
+ }
+
+ // Marshal the response body
+ var marshalledResponseBody *any.Any
+ if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
+ logger.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
+ return nil, err
+ }
+
+ return &ic.InterContainerMessage{
+ Header: responseHeader,
+ Body: marshalledResponseBody,
+ }, nil
+}
+
+func CallFuncByName(myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
+ myClassValue := reflect.ValueOf(myClass)
+	// Capitalize the first letter of funcName to work around the requirement that only exported
+	// (capitalized) methods can be invoked from a different package
+ funcName = strings.Title(funcName)
+ m := myClassValue.MethodByName(funcName)
+ if !m.IsValid() {
+ return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
+ }
+ in := make([]reflect.Value, len(params))
+ for i, param := range params {
+ in[i] = reflect.ValueOf(param)
+ }
+ out = m.Call(in)
+ return
+}
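+
+// For example (illustrative only), an incoming rpc named "adopt_device" resolves to a handler
+// method named "Adopt_device": strings.Title upper-cases only the leading letter here, since
+// underscores are not treated as word separators.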
+
+func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+ arg := &KVArg{
+ Key: TransactionKey,
+ Value: &ic.StrType{Val: transactionId},
+ }
+
+ var marshalledArg *any.Any
+ var err error
+ if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
+ logger.Warnw("cannot-add-transactionId", log.Fields{"error": err})
+ return currentArgs
+ }
+ protoArg := &ic.Argument{
+ Key: arg.Key,
+ Value: marshalledArg,
+ }
+ return append(currentArgs, protoArg)
+}
+
+func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+ var marshalledArg *any.Any
+ var err error
+ if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
+		logger.Warnw("cannot-add-fromTopic", log.Fields{"error": err})
+ return currentArgs
+ }
+ protoArg := &ic.Argument{
+ Key: FromTopic,
+ Value: marshalledArg,
+ }
+ return append(currentArgs, protoArg)
+}
+
+func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
+
+ // First extract the header to know whether this is a request - responses are handled by a different handler
+ if msg.Header.Type == ic.MessageType_REQUEST {
+ var out []reflect.Value
+ var err error
+
+ // Get the request body
+ requestBody := &ic.InterContainerRequestBody{}
+ if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
+ logger.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
+ } else {
+ logger.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
+			// let the callee unpack the arguments as it is the only one that knows the real proto type
+ // Augment the requestBody with the message Id as it will be used in scenarios where cores
+ // are set in pairs and competing
+ requestBody.Args = kp.addTransactionId(msg.Header.Id, requestBody.Args)
+
+			// Augment the requestBody with the From topic name as it will be used in scenarios where a container
+			// needs to send an unsolicited message to the requesting container
+ requestBody.Args = kp.addFromTopic(msg.Header.FromTopic, requestBody.Args)
+
+ out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
+ if err != nil {
+ logger.Warn(err)
+ }
+ }
+ // Response required?
+ if requestBody.ResponseRequired {
+ // If we already have an error before then just return that
+ var returnError *ic.Error
+ var returnedValues []interface{}
+ var success bool
+ if err != nil {
+ returnError = &ic.Error{Reason: err.Error()}
+ returnedValues = make([]interface{}, 1)
+ returnedValues[0] = returnError
+ } else {
+ returnedValues = make([]interface{}, 0)
+ // Check for errors first
+ lastIndex := len(out) - 1
+ if out[lastIndex].Interface() != nil { // Error
+ if retError, ok := out[lastIndex].Interface().(error); ok {
+ if retError.Error() == ErrorTransactionNotAcquired.Error() {
+ logger.Debugw("Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
+ return // Ignore - process is in competing mode and ignored transaction
+ }
+ returnError = &ic.Error{Reason: retError.Error()}
+ returnedValues = append(returnedValues, returnError)
+ } else { // Should never happen
+ returnError = &ic.Error{Reason: "incorrect-error-returns"}
+ returnedValues = append(returnedValues, returnError)
+ }
+ } else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
+ logger.Warnw("Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
+ return // Ignore - should not happen
+ } else { // Non-error case
+ success = true
+ for idx, val := range out {
+ //logger.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
+ if idx != lastIndex {
+ returnedValues = append(returnedValues, val.Interface())
+ }
+ }
+ }
+ }
+
+ var icm *ic.InterContainerMessage
+ if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
+ logger.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
+ icm = encodeDefaultFailedResponse(msg)
+ }
+			// To preserve ordering of messages, all messages for a given key are sent to the same partition
+			// by providing a message key. The key is carried in the message header (KeyTopic). If the deviceId is
+			// not present then the key will be empty, hence messages for the topic may be spread across
+			// partitions.
+ replyTopic := &Topic{Name: msg.Header.FromTopic}
+ key := msg.Header.KeyTopic
+ logger.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
+ // TODO: handle error response.
+ go func() {
+ if err := kp.kafkaClient.Send(icm, replyTopic, key); err != nil {
+ logger.Errorw("send-reply-failed", log.Fields{
+ "topic": replyTopic,
+ "key": key,
+ "error": err})
+ }
+ }()
+ }
+ } else if msg.Header.Type == ic.MessageType_RESPONSE {
+ logger.Debugw("response-received", log.Fields{"msg-header": msg.Header})
+ go kp.dispatchResponse(msg)
+ } else {
+ logger.Warnw("unsupported-message-received", log.Fields{"msg-header": msg.Header})
+ }
+}
+
+func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+ // Wait for messages
+ for msg := range ch {
+ //logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
+ go kp.handleMessage(msg, targetInterface)
+ }
+}
+
+func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
+ kp.lockTransactionIdToChannelMap.RLock()
+ defer kp.lockTransactionIdToChannelMap.RUnlock()
+ if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
+ logger.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
+ return
+ }
+ kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
+}
+
+// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
+// This method is built to prevent all subscribers from receiving all messages, as is the case with the Subscribe
+// API. There is one response channel waiting for kafka messages; it dispatches each message to the
+// corresponding waiting channel.
+func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
+ logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
+
+	// Create a specific channel for this consumer. We cannot use the channel from the kafka client as it will
+ // broadcast any message for this topic to all channels waiting on it.
+ // Set channel size to 1 to prevent deadlock, see VOL-2708
+ ch := make(chan *ic.InterContainerMessage, 1)
+ kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
+
+ return ch, nil
+}
+
+func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
+ logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
+ kp.deleteFromTransactionIdToChannelMap(trnsId)
+ return nil
+}
+
+func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
+ return kp.kafkaClient.EnableLivenessChannel(enable)
+}
+
+func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
+ return kp.kafkaClient.EnableHealthinessChannel(enable)
+}
+
+func (kp *interContainerProxy) SendLiveness() error {
+ return kp.kafkaClient.SendLiveness()
+}
+
+//encodeRequest formats a request to send over kafka and returns an InterContainerMessage on success
+//or an error on failure
+func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+ requestHeader := &ic.Header{
+ Id: uuid.New().String(),
+ Type: ic.MessageType_REQUEST,
+ FromTopic: replyTopic.Name,
+ ToTopic: toTopic.Name,
+ KeyTopic: key,
+ Timestamp: time.Now().UnixNano(),
+ }
+ requestBody := &ic.InterContainerRequestBody{
+ Rpc: rpc,
+ ResponseRequired: true,
+ ReplyToTopic: replyTopic.Name,
+ }
+
+ for _, arg := range kvArgs {
+ if arg == nil {
+ // In case the caller sends an array with empty args
+ continue
+ }
+ var marshalledArg *any.Any
+ var err error
+ // ascertain the value interface type is a proto.Message
+ protoValue, ok := arg.Value.(proto.Message)
+ if !ok {
+ logger.Warnw("argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
+ err := errors.New("argument-value-not-proto-message")
+ return nil, err
+ }
+ if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
+ logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
+ return nil, err
+ }
+ protoArg := &ic.Argument{
+ Key: arg.Key,
+ Value: marshalledArg,
+ }
+ requestBody.Args = append(requestBody.Args, protoArg)
+ }
+
+ var marshalledData *any.Any
+ var err error
+ if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
+ logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
+ return nil, err
+ }
+ request := &ic.InterContainerMessage{
+ Header: requestHeader,
+ Body: marshalledData,
+ }
+ return request, nil
+}
+
+func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
+ // Extract the message body
+ responseBody := ic.InterContainerResponseBody{}
+ if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, err
+ }
+ //logger.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
+
+ return &responseBody, nil
+
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
similarity index 74%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
index a251c56..deb72fd 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
@@ -16,25 +16,22 @@
package kafka
import (
+ "context"
"errors"
"fmt"
- "github.com/Shopify/sarama"
- scc "github.com/bsm/sarama-cluster"
- "github.com/golang/protobuf/proto"
- "github.com/google/uuid"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- ic "github.com/opencord/voltha-protos/v2/go/inter_container"
"strings"
"sync"
"time"
+
+ "github.com/Shopify/sarama"
+ scc "github.com/bsm/sarama-cluster"
+ "github.com/eapache/go-resiliency/breaker"
+ "github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
)
-func init() {
- log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
-type returnErrorFunction func() error
-
// consumerChannels represents one or more consumers listening on a kafka topic. Once a message is received on that
// topic, the consumer(s) broadcasts the message to all the listening channels. The consumer can be a partition
//consumer or a group consumer
@@ -43,10 +40,12 @@
channels []chan *ic.InterContainerMessage
}
+// static check to ensure SaramaClient implements Client
+var _ Client = &SaramaClient{}
+
// SaramaClient represents the messaging proxy
type SaramaClient struct {
cAdmin sarama.ClusterAdmin
- client sarama.Client
KafkaHost string
KafkaPort int
producer sarama.AsyncProducer
@@ -69,6 +68,7 @@
numReplicas int
autoCreateTopic bool
doneCh chan int
+ metadataCallback func(fromTopic string, timestamp int64)
topicToConsumerChannelMap map[string]*consumerChannels
lockTopicToConsumerChannelMap sync.RWMutex
topicLockMap map[string]*sync.RWMutex
@@ -79,6 +79,8 @@
livenessChannelInterval time.Duration
lastLivenessTime time.Time
started bool
+ healthy bool
+ healthiness chan bool
}
type SaramaClientOption func(*SaramaClient)
@@ -229,14 +231,15 @@
client.lockOfTopicLockMap = sync.RWMutex{}
client.lockOfGroupConsumers = sync.RWMutex{}
- // alive until proven otherwise
+ // healthy and alive until proven otherwise
client.alive = true
+ client.healthy = true
return client
}
func (sc *SaramaClient) Start() error {
- log.Info("Starting-kafka-sarama-client")
+ logger.Info("Starting-kafka-sarama-client")
// Create the Done channel
sc.doneCh = make(chan int, 1)
@@ -252,20 +255,20 @@
// Create the Cluster Admin
if err = sc.createClusterAdmin(); err != nil {
- log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
+ logger.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
return err
}
// Create the Publisher
if err := sc.createPublisher(); err != nil {
- log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
+ logger.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
return err
}
if sc.consumerType == DefaultConsumerType {
// Create the master consumers
if err := sc.createConsumer(); err != nil {
- log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
+ logger.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
return err
}
}
@@ -273,7 +276,7 @@
// Create the topic to consumers/channel map
sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
- log.Info("kafka-sarama-client-started")
+ logger.Info("kafka-sarama-client-started")
sc.started = true
@@ -281,7 +284,7 @@
}
func (sc *SaramaClient) Stop() {
- log.Info("stopping-sarama-client")
+ logger.Info("stopping-sarama-client")
sc.started = false
@@ -290,33 +293,33 @@
if sc.producer != nil {
if err := sc.producer.Close(); err != nil {
- log.Errorw("closing-producer-failed", log.Fields{"error": err})
+ logger.Errorw("closing-producer-failed", log.Fields{"error": err})
}
}
if sc.consumer != nil {
if err := sc.consumer.Close(); err != nil {
- log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
+ logger.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
}
}
for key, val := range sc.groupConsumers {
- log.Debugw("closing-group-consumer", log.Fields{"topic": key})
+ logger.Debugw("closing-group-consumer", log.Fields{"topic": key})
if err := val.Close(); err != nil {
- log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
+ logger.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
}
}
if sc.cAdmin != nil {
if err := sc.cAdmin.Close(); err != nil {
- log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
+ logger.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
}
}
//TODO: Clear the consumers map
//sc.clearConsumerChannelMap()
- log.Info("sarama-client-stopped")
+ logger.Info("sarama-client-stopped")
}
//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
@@ -333,15 +336,15 @@
if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
if err == sarama.ErrTopicAlreadyExists {
// Not an error
- log.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-already-exist", log.Fields{"topic": topic.Name})
return nil
}
- log.Errorw("create-topic-failure", log.Fields{"error": err})
+ logger.Errorw("create-topic-failure", log.Fields{"error": err})
return err
}
// TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
// do so.
- log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
+ logger.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
return nil
}
@@ -363,16 +366,16 @@
if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
if err == sarama.ErrUnknownTopicOrPartition {
// Not an error as does not exist
- log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
return nil
}
- log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
+ logger.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
return err
}
// Clear the topic from the consumer channel. This will also close any consumers listening on that topic.
if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
- log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
+ logger.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
return err
}
return nil
@@ -384,11 +387,11 @@
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
- log.Debugw("subscribe", log.Fields{"topic": topic.Name})
+ logger.Debugw("subscribe", log.Fields{"topic": topic.Name})
	// If a consumer already exists for that topic then reuse it
if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
- log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
// Create a channel specific for that consumers and add it to the consumers channel map
ch := make(chan *ic.InterContainerMessage)
sc.addChannelToConsumerChannelMap(topic, ch)
@@ -403,12 +406,12 @@
if sc.consumerType == PartitionConsumer {
if sc.autoCreateTopic {
if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
- log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
}
if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, getOffset(kvArgs...)); err != nil {
- log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
} else if sc.consumerType == GroupCustomer {
@@ -416,7 +419,7 @@
// does not consume from a precreated topic in some scenarios
//if sc.autoCreateTopic {
// if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
- // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+ // logger.Errorw("create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
// return nil, err
// }
//}
@@ -430,12 +433,12 @@
groupId = sc.consumerGroupPrefix + topic.Name
}
if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId, getOffset(kvArgs...)); err != nil {
- log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ logger.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
} else {
- log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
+ logger.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
return nil, errors.New("unknown-consumer-type")
}
@@ -447,17 +450,21 @@
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
- log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
+ logger.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
var err error
if err = sc.removeChannelFromConsumerChannelMap(*topic, ch); err != nil {
- log.Errorw("failed-removing-channel", log.Fields{"error": err})
+ logger.Errorw("failed-removing-channel", log.Fields{"error": err})
}
if err = sc.deleteFromGroupConsumers(topic.Name); err != nil {
- log.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
+ logger.Errorw("failed-deleting-group-consumer", log.Fields{"error": err})
}
return err
}
+func (sc *SaramaClient) SubscribeForMetadata(callback func(fromTopic string, timestamp int64)) {
+ sc.metadataCallback = callback
+}
+
func (sc *SaramaClient) updateLiveness(alive bool) {
// Post a consistent stream of liveness data to the channel,
// so that in a live state, the core does not timeout and
@@ -465,11 +472,11 @@
// events to the channel is rate-limited by livenessChannelInterval.
if sc.liveness != nil {
if sc.alive != alive {
- log.Info("update-liveness-channel-because-change")
+ logger.Info("update-liveness-channel-because-change")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
- } else if time.Now().Sub(sc.lastLivenessTime) > sc.livenessChannelInterval {
- log.Info("update-liveness-channel-because-interval")
+ } else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
+ logger.Info("update-liveness-channel-because-interval")
sc.liveness <- alive
sc.lastLivenessTime = time.Now()
}
@@ -477,11 +484,68 @@
// Only emit a log message when the state changes
if sc.alive != alive {
- log.Info("set-client-alive", log.Fields{"alive": alive})
+ logger.Info("set-client-alive", log.Fields{"alive": alive})
sc.alive = alive
}
}
+// Once unhealthy, we never go back
+func (sc *SaramaClient) setUnhealthy() {
+ sc.healthy = false
+ if sc.healthiness != nil {
+ logger.Infow("set-client-unhealthy", log.Fields{"healthy": sc.healthy})
+ sc.healthiness <- sc.healthy
+ }
+}
+
+func (sc *SaramaClient) isLivenessError(err error) bool {
+ // Sarama producers and consumers encapsulate the error inside
+ // a ProducerError or ConsumerError struct.
+ if prodError, ok := err.(*sarama.ProducerError); ok {
+ err = prodError.Err
+ } else if consumerError, ok := err.(*sarama.ConsumerError); ok {
+ err = consumerError.Err
+ }
+
+	// Sarama-Cluster will wrap the error in a ClusterError struct,
+	// which we cannot compare by reference. To handle that, the
+	// best we can do is compare the error strings.
+
+ switch err.Error() {
+ case context.DeadlineExceeded.Error():
+ logger.Info("is-liveness-error-timeout")
+ return true
+ case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
+ logger.Info("is-liveness-error-no-brokers")
+ return true
+ case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
+ logger.Info("is-liveness-error-shutting-down")
+ return true
+ case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
+ logger.Info("is-liveness-error-not-available")
+ return true
+ case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
+ logger.Info("is-liveness-error-circuit-breaker-open")
+ return true
+ }
+
+ if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
+ logger.Info("is-liveness-error-connection-refused")
+ return true
+ }
+
+ if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
+ logger.Info("is-liveness-error-io-timeout")
+ return true
+ }
+
+ // Other errors shouldn't trigger a loss of liveness
+
+ logger.Infow("is-liveness-error-ignored", log.Fields{"err": err})
+
+ return false
+}
+
// send formats and sends the request onto the kafka messaging bus.
func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
@@ -490,15 +554,15 @@
var ok bool
// ascertain the value interface type is a proto.Message
if protoMsg, ok = msg.(proto.Message); !ok {
- log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
- return errors.New(fmt.Sprintf("not-a-proto-msg-%s", msg))
+ logger.Warnw("message-not-proto-message", log.Fields{"msg": msg})
+ return fmt.Errorf("not-a-proto-msg-%s", msg)
}
var marshalled []byte
var err error
// Create the Sarama producer message
if marshalled, err = proto.Marshal(protoMsg); err != nil {
- log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
+ logger.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
return err
}
key := ""
@@ -517,11 +581,11 @@
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
case ok := <-sc.producer.Successes():
- log.Debugw("message-sent", log.Fields{"status": ok.Topic})
+ logger.Debugw("message-sent", log.Fields{"status": ok.Topic})
sc.updateLiveness(true)
case notOk := <-sc.producer.Errors():
- log.Debugw("error-sending", log.Fields{"status": notOk})
- if strings.Contains(notOk.Error(), "Failed to produce") {
+ logger.Debugw("error-sending", log.Fields{"status": notOk})
+ if sc.isLivenessError(notOk) {
sc.updateLiveness(false)
}
return notOk
@@ -535,10 +599,10 @@
// by the service (i.e. rw_core / ro_core) to update readiness status
// and/or take other actions.
func (sc *SaramaClient) EnableLivenessChannel(enable bool) chan bool {
- log.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
+ logger.Infow("kafka-enable-liveness-channel", log.Fields{"enable": enable})
if enable {
if sc.liveness == nil {
- log.Info("kafka-create-liveness-channel")
+ logger.Info("kafka-create-liveness-channel")
// At least 1, so we can immediately post to it without blocking
// Setting a bigger number (10) allows the monitor to fall behind
// without blocking others. The monitor shouldn't really fall
@@ -555,6 +619,30 @@
return sc.liveness
}
+// EnableHealthinessChannel enables the Healthiness monitor channel. This channel
+// will report "false" if the kafka consumers die, or some other catastrophic
+// problem occurs that would require re-creating the client.
+func (sc *SaramaClient) EnableHealthinessChannel(enable bool) chan bool {
+ logger.Infow("kafka-enable-healthiness-channel", log.Fields{"enable": enable})
+ if enable {
+ if sc.healthiness == nil {
+ logger.Info("kafka-create-healthiness-channel")
+ // At least 1, so we can immediately post to it without blocking
+ // Setting a bigger number (10) allows the monitor to fall behind
+ // without blocking others. The monitor shouldn't really fall
+ // behind...
+ sc.healthiness = make(chan bool, 10)
+ // post initial state to the channel
+ sc.healthiness <- sc.healthy
+ }
+ } else {
+ // TODO: Think about whether we need the ability to turn off
+ // healthiness monitoring
+ panic("Turning off healthiness reporting is not supported")
+ }
+ return sc.healthiness
+}
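
A sketch of how a service might watch both channels. monitorKafka is hypothetical, and it assumes the package's Client interface exposes EnableLivenessChannel and EnableHealthinessChannel (the SaramaClient above implements both):

```go
package monitor

import (
	"fmt"

	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
)

// monitorKafka is an illustrative consumer of the two channels: liveness
// toggles with broker connectivity, while a "false" on healthiness means the
// consumers have died and the client has to be re-created.
func monitorKafka(client kafka.Client) {
	liveness := client.EnableLivenessChannel(true)
	healthiness := client.EnableHealthinessChannel(true)
	for {
		select {
		case alive := <-liveness:
			fmt.Println("kafka liveness:", alive)
		case healthy := <-healthiness:
			if !healthy {
				fmt.Println("kafka client unhealthy; re-create the client")
				return
			}
		}
	}
}
```
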
+
// send an empty message on the liveness channel to check whether connectivity has
// been restored.
func (sc *SaramaClient) SendLiveness() error {
@@ -573,11 +661,11 @@
// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
select {
case ok := <-sc.producer.Successes():
- log.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
+ logger.Debugw("liveness-message-sent", log.Fields{"status": ok.Topic})
sc.updateLiveness(true)
case notOk := <-sc.producer.Errors():
- log.Debugw("liveness-error-sending", log.Fields{"status": notOk})
- if strings.Contains(notOk.Error(), "Failed to produce") {
+ logger.Debugw("liveness-error-sending", log.Fields{"status": notOk})
+ if sc.isLivenessError(notOk) {
sc.updateLiveness(false)
}
return notOk
@@ -614,7 +702,7 @@
var cAdmin sarama.ClusterAdmin
var err error
if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
- log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
+ logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
return err
}
sc.cAdmin = cAdmin
@@ -649,14 +737,6 @@
}
}
-func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
- if _, exist := sc.topicToConsumerChannelMap[id]; exist {
- delete(sc.topicToConsumerChannelMap, id)
- }
-}
-
func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
sc.lockTopicToConsumerChannelMap.RLock()
defer sc.lockTopicToConsumerChannelMap.RUnlock()
@@ -674,7 +754,7 @@
consumerCh.channels = append(consumerCh.channels, ch)
return
}
- log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
+ logger.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
}
//closeConsumers closes a list of sarama consumers. The consumers can either be partition consumers or group consumers
@@ -684,7 +764,7 @@
// Is it a partition consumer?
if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
if errTemp := partionConsumer.Close(); errTemp != nil {
- log.Debugw("partition!!!", log.Fields{"err": errTemp})
+ logger.Debugw("partition!!!", log.Fields{"err": errTemp})
if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
// This can occur on race condition
err = nil
@@ -714,7 +794,7 @@
consumerCh.channels = removeChannel(consumerCh.channels, ch)
// If there are no more channels then we can close the consumers itself
if len(consumerCh.channels) == 0 {
- log.Debugw("closing-consumers", log.Fields{"topic": topic})
+ logger.Debugw("closing-consumers", log.Fields{"topic": topic})
err := closeConsumers(consumerCh.consumers)
//err := consumerCh.consumers.Close()
delete(sc.topicToConsumerChannelMap, topic.Name)
@@ -722,7 +802,7 @@
}
return nil
}
- log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+ logger.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
return errors.New("topic-does-not-exist")
}
@@ -743,28 +823,10 @@
delete(sc.topicToConsumerChannelMap, topic.Name)
return err
}
- log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+ logger.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
return nil
}
-func (sc *SaramaClient) clearConsumerChannelMap() error {
- sc.lockTopicToConsumerChannelMap.Lock()
- defer sc.lockTopicToConsumerChannelMap.Unlock()
- var err error
- for topic, consumerCh := range sc.topicToConsumerChannelMap {
- for _, ch := range consumerCh.channels {
- // Channel will be closed in the removeChannel method
- removeChannel(consumerCh.channels, ch)
- }
- if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
- err = errTemp
- }
- //err = consumerCh.consumers.Close()
- delete(sc.topicToConsumerChannelMap, topic)
- }
- return err
-}
-
//createPublisher creates the publisher which is used to send a message onto kafka
func (sc *SaramaClient) createPublisher() error {
// This Creates the publisher
@@ -782,12 +844,12 @@
brokers := []string{kafkaFullAddr}
if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
- log.Errorw("error-starting-publisher", log.Fields{"error": err})
+ logger.Errorw("error-starting-publisher", log.Fields{"error": err})
return err
} else {
sc.producer = producer
}
- log.Info("Kafka-publisher-created")
+ logger.Info("Kafka-publisher-created")
return nil
}
@@ -803,12 +865,12 @@
brokers := []string{kafkaFullAddr}
if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
- log.Errorw("error-starting-consumers", log.Fields{"error": err})
+ logger.Errorw("error-starting-consumers", log.Fields{"error": err})
return err
} else {
sc.consumer = consumer
}
- log.Info("Kafka-consumers-created")
+ logger.Info("Kafka-consumers-created")
return nil
}
@@ -832,10 +894,10 @@
var err error
if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
- log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ logger.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
- log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+ logger.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
//sc.groupConsumers[topic.Name] = consumer
sc.addToGroupConsumers(topic.Name, consumer)
@@ -847,93 +909,106 @@
func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
// Need to go over all channels and publish messages to them - do we need to copy msg?
sc.lockTopicToConsumerChannelMap.RLock()
- defer sc.lockTopicToConsumerChannelMap.RUnlock()
for _, ch := range consumerCh.channels {
go func(c chan *ic.InterContainerMessage) {
c <- protoMessage
}(ch)
}
+ sc.lockTopicToConsumerChannelMap.RUnlock()
+
+ if callback := sc.metadataCallback; callback != nil {
+ callback(protoMessage.Header.FromTopic, protoMessage.Header.Timestamp)
+ }
}
func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
- log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
+ logger.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
for {
select {
case err, ok := <-consumer.Errors():
if ok {
- log.Warnw("partition-consumers-error", log.Fields{"error": err})
+ if sc.isLivenessError(err) {
+ sc.updateLiveness(false)
+ logger.Warnw("partition-consumers-error", log.Fields{"error": err})
+ }
} else {
// Channel is closed
break startloop
}
case msg, ok := <-consumer.Messages():
- //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
+ //logger.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
if !ok {
// channel is closed
break startloop
}
msgBody := msg.Value
+ sc.updateLiveness(true)
+ logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
- log.Warnw("partition-invalid-message", log.Fields{"error": err})
+ logger.Warnw("partition-invalid-message", log.Fields{"error": err})
continue
}
go sc.dispatchToConsumers(consumerChnls, icm)
case <-sc.doneCh:
- log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
+ logger.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
break startloop
}
}
- log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
+ logger.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
+ sc.setUnhealthy()
}
func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
- log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+ logger.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
startloop:
for {
select {
case err, ok := <-consumer.Errors():
if ok {
- sc.updateLiveness(false)
- log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
+ if sc.isLivenessError(err) {
+ sc.updateLiveness(false)
+ }
+ logger.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
} else {
- log.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
+ logger.Warnw("group-consumers-closed-err", log.Fields{"topic": topic.Name})
// channel is closed
break startloop
}
case msg, ok := <-consumer.Messages():
if !ok {
- log.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
+ logger.Warnw("group-consumers-closed-msg", log.Fields{"topic": topic.Name})
// Channel closed
break startloop
}
sc.updateLiveness(true)
- log.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+ logger.Debugw("message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
msgBody := msg.Value
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
- log.Warnw("invalid-message", log.Fields{"error": err})
+ logger.Warnw("invalid-message", log.Fields{"error": err})
continue
}
go sc.dispatchToConsumers(consumerChnls, icm)
consumer.MarkOffset(msg, "")
case ntf := <-consumer.Notifications():
- log.Debugw("group-received-notification", log.Fields{"notification": ntf})
+ logger.Debugw("group-received-notification", log.Fields{"notification": ntf})
case <-sc.doneCh:
- log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
+ logger.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
break startloop
}
}
- log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
+ logger.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
+ sc.setUnhealthy()
}
func (sc *SaramaClient) startConsumers(topic *Topic) error {
- log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
+ logger.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
var consumerCh *consumerChannels
if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
- log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
+ logger.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
return errors.New("consumers-not-exist")
}
// For each consumer listening for that topic, start a consumption loop
@@ -943,7 +1018,7 @@
} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
} else {
- log.Errorw("invalid-consumer", log.Fields{"topic": topic})
+ logger.Errorw("invalid-consumer", log.Fields{"topic": topic})
return errors.New("invalid-consumer")
}
}
@@ -957,7 +1032,7 @@
var err error
if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
- log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
@@ -978,7 +1053,13 @@
sc.addTopicToConsumerChannelMap(topic.Name, cc)
//Start consumers to listen on that specific topic
- go sc.startConsumers(topic)
+ go func() {
+ if err := sc.startConsumers(topic); err != nil {
+ logger.Errorw("start-consumers-failed", log.Fields{
+ "topic": topic,
+ "error": err})
+ }
+ }()
return consumerListeningChannel, nil
}
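
The replacement of `go sc.startConsumers(topic)` above follows a general pattern: never discard the error returned by a function launched with `go`. A trivial, self-contained illustration (startWork is hypothetical):

```go
package main

import (
	"errors"
	"log"
	"time"
)

func startWork() error { return errors.New("boom") }

func main() {
	// Wrap the call so a returned error is logged instead of being silently
	// dropped, which is what a bare "go startWork()" would do.
	go func() {
		if err := startWork(); err != nil {
			log.Printf("start-work-failed: %v", err)
		}
	}()
	time.Sleep(100 * time.Millisecond) // give the goroutine time to run
}
```
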
@@ -990,7 +1071,7 @@
var pConsumer *scc.Consumer
var err error
if pConsumer, err = sc.createGroupConsumer(topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
- log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
@@ -1005,16 +1086,22 @@
sc.addTopicToConsumerChannelMap(topic.Name, cc)
//Start consumers to listen on that specific topic
- go sc.startConsumers(topic)
+ go func() {
+ if err := sc.startConsumers(topic); err != nil {
+ logger.Errorw("start-consumers-failed", log.Fields{
+ "topic": topic,
+ "error": err})
+ }
+ }()
return consumerListeningChannel, nil
}
func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
- log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
+ logger.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
partitionList, err := sc.consumer.Partitions(topic.Name)
if err != nil {
- log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
@@ -1022,7 +1109,7 @@
for _, partition := range partitionList {
var pConsumer sarama.PartitionConsumer
if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
- log.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+ logger.Warnw("consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
pConsumers = append(pConsumers, pConsumer)
@@ -1037,7 +1124,7 @@
if channel == ch {
channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
close(channel)
- log.Debug("channel-closed")
+ logger.Debug("channel-closed")
return channels[:len(channels)-1]
}
}
@@ -1059,7 +1146,7 @@
consumer := sc.groupConsumers[topic]
delete(sc.groupConsumers, topic)
if err := consumer.Close(); err != nil {
- log.Errorw("failure-closing-consumer", log.Fields{"error": err})
+ logger.Errorw("failure-closing-consumer", log.Fields{"error": err})
return err
}
}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/utils.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/utils.go
similarity index 79%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/utils.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/utils.go
index 0cb9535..bdc615f 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/kafka/utils.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/utils.go
@@ -15,7 +15,10 @@
*/
package kafka
-import "strings"
+import (
+ "github.com/golang/protobuf/ptypes/any"
+ "strings"
+)
const (
TopicSeparator = "_"
@@ -36,6 +39,31 @@
Value interface{}
}
+type RpcMType int
+
+const (
+ RpcFormattingError RpcMType = iota
+ RpcSent
+ RpcReply
+ RpcTimeout
+ RpcTransportError
+ RpcSystemClosing
+)
+
+type RpcResponse struct {
+ MType RpcMType
+ Err error
+ Reply *any.Any
+}
+
+func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
+ return &RpcResponse{
+ MType: messageType,
+ Err: err,
+ Reply: body,
+ }
+}
+
// TODO: Remove and provide a better way to get the device id
// GetDeviceIdFromTopic extracts the deviceId from the topic name. The topic name is formatted either as:
// <any string> or <any string>_<deviceId>. The device Id is 24 characters long.
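
The new RpcResponse/NewResponse helpers above can be exercised roughly as follows. This is a sketch; the empty.Empty body is just a stand-in for a real reply proto:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/golang/protobuf/ptypes"
	"github.com/golang/protobuf/ptypes/empty"
	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
)

func main() {
	// A successful reply carries the packed proto body and no error.
	body, err := ptypes.MarshalAny(&empty.Empty{})
	if err != nil {
		panic(err)
	}
	reply := kafka.NewResponse(kafka.RpcReply, nil, body)

	// A transport failure carries no body, only the error and matching type.
	failure := kafka.NewResponse(kafka.RpcTransportError, errors.New("broker unreachable"), nil)

	fmt.Println(reply.MType, failure.MType, failure.Err)
}
```
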
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/log/log.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
similarity index 86%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/log/log.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
index fe3a4e0..d0169bd 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/log/log.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/log/log.go
@@ -50,17 +50,17 @@
"strings"
)
+type LogLevel int8
+
const (
// DebugLevel logs a message at debug level
- DebugLevel = iota
+ DebugLevel = LogLevel(iota)
// InfoLevel logs a message at info level
InfoLevel
// WarnLevel logs a message at warning level
WarnLevel
// ErrorLevel logs a message at error level
ErrorLevel
- // PanicLevel logs a message, then panics.
- PanicLevel
// FatalLevel logs a message, then calls os.Exit(1).
FatalLevel
)
@@ -108,7 +108,10 @@
Warningf(string, ...interface{})
// V reports whether verbosity level l is at least the requested verbose level.
- V(l int) bool
+ V(l LogLevel) bool
+
+ // GetLogLevel returns the log level of this specific logger
+ GetLogLevel() LogLevel
}
// Fields is used as key-value pairs for structured logging
@@ -121,11 +124,12 @@
var cfgs map[string]zp.Config
type logger struct {
- log *zp.SugaredLogger
- parent *zp.Logger
+ log *zp.SugaredLogger
+ parent *zp.Logger
+ packageName string
}
-func intToAtomicLevel(l int) zp.AtomicLevel {
+func logLevelToAtomicLevel(l LogLevel) zp.AtomicLevel {
switch l {
case DebugLevel:
return zp.NewAtomicLevelAt(zc.DebugLevel)
@@ -135,15 +139,13 @@
return zp.NewAtomicLevelAt(zc.WarnLevel)
case ErrorLevel:
return zp.NewAtomicLevelAt(zc.ErrorLevel)
- case PanicLevel:
- return zp.NewAtomicLevelAt(zc.PanicLevel)
case FatalLevel:
return zp.NewAtomicLevelAt(zc.FatalLevel)
}
return zp.NewAtomicLevelAt(zc.ErrorLevel)
}
-func intToLevel(l int) zc.Level {
+func logLevelToLevel(l LogLevel) zc.Level {
switch l {
case DebugLevel:
return zc.DebugLevel
@@ -153,15 +155,13 @@
return zc.WarnLevel
case ErrorLevel:
return zc.ErrorLevel
- case PanicLevel:
- return zc.PanicLevel
case FatalLevel:
return zc.FatalLevel
}
return zc.ErrorLevel
}
-func levelToInt(l zc.Level) int {
+func levelToLogLevel(l zc.Level) LogLevel {
switch l {
case zc.DebugLevel:
return DebugLevel
@@ -171,17 +171,47 @@
return WarnLevel
case zc.ErrorLevel:
return ErrorLevel
- case zc.PanicLevel:
- return PanicLevel
- case FatalLevel:
+ case zc.FatalLevel:
return FatalLevel
}
return ErrorLevel
}
-func getDefaultConfig(outputType string, level int, defaultFields Fields) zp.Config {
+func StringToLogLevel(l string) (LogLevel, error) {
+ switch strings.ToUpper(l) {
+ case "DEBUG":
+ return DebugLevel, nil
+ case "INFO":
+ return InfoLevel, nil
+ case "WARN":
+ return WarnLevel, nil
+ case "ERROR":
+ return ErrorLevel, nil
+ case "FATAL":
+ return FatalLevel, nil
+ }
+ return 0, errors.New("Given LogLevel is invalid : " + l)
+}
+
+func LogLevelToString(l LogLevel) (string, error) {
+ switch l {
+ case DebugLevel:
+ return "DEBUG", nil
+ case InfoLevel:
+ return "INFO", nil
+ case WarnLevel:
+ return "WARN", nil
+ case ErrorLevel:
+ return "ERROR", nil
+ case FatalLevel:
+ return "FATAL", nil
+ }
+ return "", errors.New("Given LogLevel is invalid " + string(l))
+}
+
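
A quick sketch of the two new string conversions, round-tripping a level name that might come from a CLI flag:

```go
package main

import (
	"fmt"

	"github.com/opencord/voltha-lib-go/v3/pkg/log"
)

func main() {
	// StringToLogLevel is case-insensitive thanks to strings.ToUpper above.
	level, err := log.StringToLogLevel("warn")
	if err != nil {
		panic(err)
	}
	// LogLevelToString gives back the canonical upper-case name.
	name, _ := log.LogLevelToString(level)
	fmt.Println(level, name) // prints the numeric level and "WARN"
}
```
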
+func getDefaultConfig(outputType string, level LogLevel, defaultFields Fields) zp.Config {
return zp.Config{
- Level: intToAtomicLevel(level),
+ Level: logLevelToAtomicLevel(level),
Encoding: outputType,
Development: true,
OutputPaths: []string{"stdout"},
@@ -191,6 +221,7 @@
LevelKey: "level",
MessageKey: "msg",
TimeKey: "ts",
+ CallerKey: "caller",
StacktraceKey: "stacktrace",
LineEnding: zc.DefaultLineEnding,
EncodeLevel: zc.LowercaseLevelEncoder,
@@ -203,11 +234,11 @@
// SetDefaultLogger needs to be invoked before the logger API can be invoked. This function
// initializes the default logger (zap's sugaredlogger)
-func SetDefaultLogger(outputType string, level int, defaultFields Fields) (Logger, error) {
+func SetDefaultLogger(outputType string, level LogLevel, defaultFields Fields) (Logger, error) {
// Build a custom config using zap
cfg = getDefaultConfig(outputType, level, defaultFields)
- l, err := cfg.Build()
+ l, err := cfg.Build(zp.AddCallerSkip(1))
if err != nil {
return nil, err
}
@@ -229,7 +260,7 @@
// be available to it, notably log tracing with filename.functionname.linenumber annotation.
//
// pkgNames parameter should be used for testing only as this function detects the caller's package.
-func AddPackage(outputType string, level int, defaultFields Fields, pkgNames ...string) (Logger, error) {
+func AddPackage(outputType string, level LogLevel, defaultFields Fields, pkgNames ...string) (Logger, error) {
if cfgs == nil {
cfgs = make(map[string]zp.Config)
}
@@ -252,14 +283,15 @@
cfgs[pkgName] = getDefaultConfig(outputType, level, defaultFields)
- l, err := cfgs[pkgName].Build()
+ l, err := cfgs[pkgName].Build(zp.AddCallerSkip(1))
if err != nil {
return nil, err
}
loggers[pkgName] = &logger{
- log: l.Sugar(),
- parent: l,
+ log: l.Sugar(),
+ parent: l,
+ packageName: pkgName,
}
return loggers[pkgName], nil
}
@@ -273,15 +305,14 @@
}
cfg.InitialFields[k] = v
}
- l, err := cfg.Build()
+ l, err := cfg.Build(zp.AddCallerSkip(1))
if err != nil {
return err
}
- loggers[pkgName] = &logger{
- log: l.Sugar(),
- parent: l,
- }
+ // Update the existing zap logger instance
+ loggers[pkgName].log = l.Sugar()
+ loggers[pkgName].parent = l
}
return nil
}
@@ -297,19 +328,16 @@
return keys
}
-// UpdateLogger deletes the logger associated with a caller's package and creates a new logger with the
-// defaultFields. If a calling package is holding on to a Logger reference obtained from AddPackage invocation, then
-// that package needs to invoke UpdateLogger if it needs to make changes to the default fields and obtain a new logger
-// reference
-func UpdateLogger(defaultFields Fields) (Logger, error) {
+// UpdateLogger updates the logger associated with a caller's package with supplied defaultFields
+func UpdateLogger(defaultFields Fields) error {
pkgName, _, _, _ := getCallerInfo()
if _, exist := loggers[pkgName]; !exist {
- return nil, errors.New(fmt.Sprintf("package-%s-not-registered", pkgName))
+ return fmt.Errorf("package-%s-not-registered", pkgName)
}
// Build a new logger
if _, exist := cfgs[pkgName]; !exist {
- return nil, errors.New(fmt.Sprintf("config-%s-not-registered", pkgName))
+ return fmt.Errorf("config-%s-not-registered", pkgName)
}
cfg := cfgs[pkgName]
@@ -319,20 +347,19 @@
}
cfg.InitialFields[k] = v
}
- l, err := cfg.Build()
+ l, err := cfg.Build(zp.AddCallerSkip(1))
if err != nil {
- return nil, err
+ return err
}
- // Set the logger
- loggers[pkgName] = &logger{
- log: l.Sugar(),
- parent: l,
- }
- return loggers[pkgName], nil
+ // Update the existing zap logger instance
+ loggers[pkgName].log = l.Sugar()
+ loggers[pkgName].parent = l
+
+ return nil
}
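
With the signature change above, UpdateLogger mutates the package's existing logger in place and returns only an error. A minimal usage sketch (the field values are hypothetical):

```go
package main

import (
	"github.com/opencord/voltha-lib-go/v3/pkg/log"
)

func main() {
	// Register a logger for this (the calling) package first.
	if _, err := log.AddPackage(log.JSON, log.InfoLevel, nil); err != nil {
		panic(err)
	}
	// Callers no longer need to re-fetch a Logger after updating fields.
	if err := log.UpdateLogger(log.Fields{"device-id": "example-device"}); err != nil {
		panic(err)
	}
}
```
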
-func setLevel(cfg zp.Config, level int) {
+func setLevel(cfg zp.Config, level LogLevel) {
switch level {
case DebugLevel:
cfg.Level.SetLevel(zc.DebugLevel)
@@ -342,8 +369,6 @@
cfg.Level.SetLevel(zc.WarnLevel)
case ErrorLevel:
cfg.Level.SetLevel(zc.ErrorLevel)
- case PanicLevel:
- cfg.Level.SetLevel(zc.PanicLevel)
case FatalLevel:
cfg.Level.SetLevel(zc.FatalLevel)
default:
@@ -353,7 +378,7 @@
//SetPackageLogLevel dynamically sets the log level of a given package to level. This is typically invoked at an
// application level during debugging
-func SetPackageLogLevel(packageName string, level int) {
+func SetPackageLogLevel(packageName string, level LogLevel) {
// Get proper config
if cfg, ok := cfgs[packageName]; ok {
setLevel(cfg, level)
@@ -361,7 +386,7 @@
}
//SetAllLogLevel sets the log level of all registered packages to level
-func SetAllLogLevel(level int) {
+func SetAllLogLevel(level LogLevel) {
// Get proper config
for _, cfg := range cfgs {
setLevel(cfg, level)
@@ -369,7 +394,7 @@
}
//GetPackageLogLevel returns the current log level of a package.
-func GetPackageLogLevel(packageName ...string) (int, error) {
+func GetPackageLogLevel(packageName ...string) (LogLevel, error) {
var name string
if len(packageName) == 1 {
name = packageName[0]
@@ -377,21 +402,21 @@
name, _, _, _ = getCallerInfo()
}
if cfg, ok := cfgs[name]; ok {
- return levelToInt(cfg.Level.Level()), nil
+ return levelToLogLevel(cfg.Level.Level()), nil
}
- return 0, errors.New(fmt.Sprintf("unknown-package-%s", name))
+ return 0, fmt.Errorf("unknown-package-%s", name)
}
//GetDefaultLogLevel gets the log level used for packages that don't have specific loggers
-func GetDefaultLogLevel() int {
- return levelToInt(cfg.Level.Level())
+func GetDefaultLogLevel() LogLevel {
+ return levelToLogLevel(cfg.Level.Level())
}
//SetLogLevel sets the log level for the logger corresponding to the caller's package
-func SetLogLevel(level int) error {
+func SetLogLevel(level LogLevel) error {
pkgName, _, _, _ := getCallerInfo()
if _, exist := cfgs[pkgName]; !exist {
- return errors.New(fmt.Sprintf("unregistered-package-%s", pkgName))
+ return fmt.Errorf("unregistered-package-%s", pkgName)
}
cfg := cfgs[pkgName]
setLevel(cfg, level)
@@ -399,7 +424,7 @@
}
//SetDefaultLogLevel sets the log level used for packages that don't have specific loggers
-func SetDefaultLogLevel(level int) {
+func SetDefaultLogLevel(level LogLevel) {
setLevel(cfg, level)
}
@@ -474,11 +499,11 @@
}
func getPackageLevelSugaredLogger() *zp.SugaredLogger {
- pkgName, fileName, funcName, line := getCallerInfo()
+ pkgName, _, _, _ := getCallerInfo()
if _, exist := loggers[pkgName]; exist {
- return loggers[pkgName].log.With("caller", fmt.Sprintf("%s.%s:%d", fileName, funcName, line))
+ return loggers[pkgName].log
}
- return defaultLogger.log.With("caller", fmt.Sprintf("%s.%s:%d", fileName, funcName, line))
+ return defaultLogger.log
}
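
The caller annotation now comes from zap itself (the CallerKey encoder field plus AddCallerSkip(1)) rather than a hand-built "caller" field. A standalone zap sketch of what AddCallerSkip does; logw is a stand-in for the library's wrapper methods:

```go
package main

import (
	"go.uber.org/zap"
)

func main() {
	// AddCallerSkip(1) makes zap report the caller one frame above the
	// wrapper, which is why the library no longer composes the caller
	// string by hand.
	base, err := zap.NewProduction(zap.AddCallerSkip(1))
	if err != nil {
		panic(err)
	}
	defer base.Sync()
	sugar := base.Sugar()
	logw := func(msg string) { sugar.Infow(msg) } // wrapper, one frame deep
	logw("hello") // the "caller" field points at this line, not at the wrapper
}
```
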
func getPackageLevelLogger() Logger {
@@ -628,8 +653,13 @@
}
// V reports whether verbosity level l is at least the requested verbose level.
-func (l logger) V(level int) bool {
- return l.parent.Core().Enabled(intToLevel(level))
+func (l logger) V(level LogLevel) bool {
+ return l.parent.Core().Enabled(logLevelToLevel(level))
+}
+
+// GetLogLevel returns the current level of the logger
+func (l logger) GetLogLevel() LogLevel {
+ return levelToLogLevel(cfgs[l.packageName].Level.Level())
}
// With returns a logger initialized with the key-value pairs
@@ -758,6 +788,11 @@
}
// V reports whether verbosity level l is at least the requested verbose level.
-func V(level int) bool {
+func V(level LogLevel) bool {
return getPackageLevelLogger().V(level)
}
+
+//GetLogLevel returns the log level of the invoking package
+func GetLogLevel() LogLevel {
+ return getPackageLevelLogger().GetLogLevel()
+}
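
Putting the typed-level API together, a service's start-up might now look roughly like this sketch (the instance field value is hypothetical; log.JSON and log.Fields are used as elsewhere in this change):

```go
package main

import (
	"github.com/opencord/voltha-lib-go/v3/pkg/log"
)

func main() {
	// Level parameters are now log.LogLevel instead of bare ints.
	if _, err := log.SetDefaultLogger(log.JSON, log.InfoLevel, log.Fields{"instance": "example"}); err != nil {
		panic(err)
	}
	logger, err := log.AddPackage(log.JSON, log.DebugLevel, nil)
	if err != nil {
		panic(err)
	}
	logger.Infow("started", log.Fields{"level": logger.GetLogLevel()})

	// Raise every registered package to WARN at run time.
	log.SetAllLogLevel(log.WarnLevel)
}
```
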
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/common.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/common.go
new file mode 100644
index 0000000..211419d
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package probe
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+ // Set up this package so that its log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "probe"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/probe/probe.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/probe.go
similarity index 90%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/probe/probe.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/probe.go
index 7e6dbf9..e89d5bc 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/probe/probe.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/probe/probe.go
@@ -18,7 +18,7 @@
import (
"context"
"fmt"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
"net/http"
"sync"
)
@@ -118,7 +118,7 @@
for _, name := range names {
if _, ok := p.status[name]; !ok {
p.status[name] = ServiceStatusUnknown
- log.Debugw("probe-service-registered", log.Fields{"service-name": name})
+ logger.Debugw("probe-service-registered", log.Fields{"service-name": name})
}
}
@@ -161,7 +161,7 @@
} else {
p.isHealthy = defaultHealthFunc(p.status)
}
- log.Debugw("probe-service-status-updated",
+ logger.Debugw("probe-service-status-updated",
log.Fields{
"service-name": name,
"status": status.String(),
@@ -231,15 +231,26 @@
p.mutex.RLock()
defer p.mutex.RUnlock()
w.Header().Set("Content-Type", "application/json")
- w.Write([]byte("{"))
+ if _, err := w.Write([]byte("{")); err != nil {
+ logger.Errorw("write-response", log.Fields{"error": err})
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
comma := ""
for c, s := range p.status {
- w.Write([]byte(fmt.Sprintf("%s\"%s\": \"%s\"", comma, c, s.String())))
+ if _, err := w.Write([]byte(fmt.Sprintf("%s\"%s\": \"%s\"", comma, c, s.String()))); err != nil {
+ logger.Errorw("write-response", log.Fields{"error": err})
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
comma = ", "
}
- w.Write([]byte("}"))
+ if _, err := w.Write([]byte("}")); err != nil {
+ logger.Errorw("write-response", log.Fields{"error": err})
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
w.WriteHeader(http.StatusOK)
-
}
// ListenAndServe implements 3 HTTP endpoints on the given port for healthz, readz, and detailz. Returns only on error
@@ -258,7 +269,7 @@
Addr: address,
Handler: mux,
}
- log.Fatal(s.ListenAndServe())
+ logger.Fatal(s.ListenAndServe())
}
func (p *Probe) IsReady() bool {
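
For context, the detailz handler modified above serializes the per-service status map by hand; querying it from outside might look like the sketch below. The listen address and the /detailz path are assumptions based on the ListenAndServe comment above, and the printed body is only an example:

```go
package main

import (
	"fmt"
	"io/ioutil"
	"net/http"
)

func main() {
	// Assumes a VOLTHA component's probe server is listening on this address.
	resp, err := http.Get("http://localhost:8080/detailz")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		panic(err)
	}
	// One JSON object with one status string per registered service,
	// e.g. {"kafka": "Running", "etcd": "Unknown"}
	fmt.Println(string(body))
}
```
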
diff --git a/vendor/github.com/opencord/voltha-lib-go/v2/pkg/version/version.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/version/version.go
similarity index 100%
rename from vendor/github.com/opencord/voltha-lib-go/v2/pkg/version/version.go
rename to vendor/github.com/opencord/voltha-lib-go/v3/pkg/version/version.go