VOL-2920 - Remove NBI passthrough functions.
Modified the NBIHandler to embed the device, logical device, and adapter managers, allowing the managers to implement the API functions directly, without the need for individual passthrough functions.
Also created a new event.Manager type, which is embedded in device.LogicalManager.
Also renamed device.NewDeviceManagers() to device.NewManagers().
Change-Id: I8455da79b991ee67cc16cf898b00b0c98ea97bcd
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index 08c0e04..11752e1 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -19,18 +19,18 @@
import (
"context"
"fmt"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
"sync"
"time"
- "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
-
"github.com/gogo/protobuf/proto"
+ "github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-lib-go/v3/pkg/probe"
"github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
// Manager represents adapter manager attributes
@@ -197,7 +197,8 @@
return nil
}
-func (aMgr *Manager) ListAdapters(ctx context.Context) (*voltha.Adapters, error) {
+// ListAdapters returns the contents of all adapters known to the system
+func (aMgr *Manager) ListAdapters(_ context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
result := &voltha.Adapters{Items: []*voltha.Adapter{}}
aMgr.lockAdaptersMap.RLock()
defer aMgr.lockAdaptersMap.RUnlock()
@@ -266,30 +267,31 @@
return adapterAgent.adapter.Type, nil
}
}
- return "", fmt.Errorf("Adapter-not-registered-for-device-type %s", deviceType)
+ return "", fmt.Errorf("adapter-not-registered-for-device-type %s", deviceType)
}
-func (aMgr *Manager) ListDeviceTypes() []*voltha.DeviceType {
+// ListDeviceTypes returns all the device types known to the system
+func (aMgr *Manager) ListDeviceTypes(_ context.Context, _ *empty.Empty) (*voltha.DeviceTypes, error) {
+ logger.Debug("ListDeviceTypes")
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypes))
-
for _, deviceType := range aMgr.deviceTypes {
deviceTypes = append(deviceTypes, deviceType)
}
-
- return deviceTypes
+ return &voltha.DeviceTypes{Items: deviceTypes}, nil
}
-// getDeviceType returns the device type proto definition given the name of the device type
-func (aMgr *Manager) GetDeviceType(deviceType string) *voltha.DeviceType {
+// GetDeviceType returns the device type proto definition given the name of the device type
+func (aMgr *Manager) GetDeviceType(_ context.Context, deviceType *voltha.ID) (*voltha.DeviceType, error) {
+ logger.Debugw("GetDeviceType", log.Fields{"typeid": deviceType.Id})
aMgr.lockdDeviceTypeToAdapterMap.Lock()
defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- if deviceType, exist := aMgr.deviceTypes[deviceType]; exist {
- return deviceType
+ dType, exist := aMgr.deviceTypes[deviceType.Id]
+ if !exist {
+ return nil, status.Errorf(codes.NotFound, "device_type-%s", deviceType.Id)
}
-
- return nil
+ return dType, nil
}
diff --git a/rw_core/core/api/adapter_request_handler.go b/rw_core/core/api/adapter_request_handler.go
index 7175a2b..7c03618 100644
--- a/rw_core/core/api/adapter_request_handler.go
+++ b/rw_core/core/api/adapter_request_handler.go
@@ -115,10 +115,10 @@
}
}
}
- logger.Debugw("GetDevice", log.Fields{"deviceID": pID.Id, "transactionID": transactionID.Val})
+ logger.Debugw("getDevice", log.Fields{"deviceID": pID.Id, "transactionID": transactionID.Val})
// Get the device via the device manager
- device, err := rhp.deviceMgr.GetDevice(context.TODO(), pID.Id)
+ device, err := rhp.deviceMgr.GetDevice(context.TODO(), pID)
if err != nil {
logger.Debugw("get-device-failed", log.Fields{"deviceID": pID.Id, "error": err})
}
diff --git a/rw_core/core/api/grpc_nbi_handler.go b/rw_core/core/api/grpc_nbi_handler.go
index 3e06cf7..55117d2 100755
--- a/rw_core/core/api/grpc_nbi_handler.go
+++ b/rw_core/core/api/grpc_nbi_handler.go
@@ -18,795 +18,97 @@
import (
"context"
- "encoding/hex"
"encoding/json"
"errors"
"github.com/golang/protobuf/ptypes/empty"
- da "github.com/opencord/voltha-go/common/core/northbound/grpc"
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/device"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-lib-go/v3/pkg/version"
"github.com/opencord/voltha-protos/v3/go/common"
"github.com/opencord/voltha-protos/v3/go/omci"
- "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "io"
- "sync"
)
-// Image related constants
-const (
- ImageDownload = iota
- CancelImageDownload = iota
- ActivateImage = iota
- RevertImage = iota
-)
-
-// NBIHandler represent attributes of API handler
+// NBIHandler combines the partial API implementations in various components into a complete voltha implementation
type NBIHandler struct {
- deviceMgr *device.Manager
- logicalDeviceMgr *device.LogicalManager
- adapterMgr *adapter.Manager
- packetInQueue chan openflow_13.PacketIn
- changeEventQueue chan openflow_13.ChangeEvent
- packetInQueueDone chan bool
- changeEventQueueDone chan bool
- da.DefaultAPIHandler
+ *device.Manager
+ *device.LogicalManager
+ adapterManager // *adapter.Manager
}
-// NewAPIHandler creates API handler instance
-func NewAPIHandler(deviceMgr *device.Manager, logicalDeviceMgr *device.LogicalManager, adapterMgr *adapter.Manager) *NBIHandler {
+// avoid having multiple embedded types with the same name (`<package>.Manager`s conflict)
+type adapterManager struct{ *adapter.Manager }
+
+// NewNBIHandler creates API handler instance
+func NewNBIHandler(deviceMgr *device.Manager, logicalDeviceMgr *device.LogicalManager, adapterMgr *adapter.Manager) *NBIHandler {
return &NBIHandler{
- deviceMgr: deviceMgr,
- logicalDeviceMgr: logicalDeviceMgr,
- adapterMgr: adapterMgr,
- packetInQueue: make(chan openflow_13.PacketIn, 100),
- changeEventQueue: make(chan openflow_13.ChangeEvent, 100),
- packetInQueueDone: make(chan bool, 1),
- changeEventQueueDone: make(chan bool, 1),
+ Manager: deviceMgr,
+ LogicalManager: logicalDeviceMgr,
+ adapterManager: adapterManager{adapterMgr},
}
}
-// waitForNilResponseOnSuccess is a helper function to wait for a response on channel monitorCh where an nil
-// response is expected in a successful scenario
-func waitForNilResponseOnSuccess(ctx context.Context, ch chan interface{}) (*empty.Empty, error) {
- select {
- case res := <-ch:
- if res == nil {
- return &empty.Empty{}, nil
- } else if err, ok := res.(error); ok {
- return &empty.Empty{}, err
- } else {
- logger.Warnw("unexpected-return-type", log.Fields{"result": res})
- err = status.Errorf(codes.Internal, "%s", res)
- return &empty.Empty{}, err
- }
- case <-ctx.Done():
- logger.Debug("client-timeout")
- return nil, ctx.Err()
- }
-}
-
-// ListCoreInstances returns details on the running core containers
-func (handler *NBIHandler) ListCoreInstances(ctx context.Context, empty *empty.Empty) (*voltha.CoreInstances, error) {
- logger.Debug("ListCoreInstances")
- // TODO: unused stub
- return &voltha.CoreInstances{}, status.Errorf(codes.NotFound, "no-core-instances")
-}
-
-// GetCoreInstance returns the details of a specific core container
-func (handler *NBIHandler) GetCoreInstance(ctx context.Context, id *voltha.ID) (*voltha.CoreInstance, error) {
- logger.Debugw("GetCoreInstance", log.Fields{"id": id})
- //TODO: unused stub
- return &voltha.CoreInstance{}, status.Errorf(codes.NotFound, "core-instance-%s", id.Id)
-}
-
-// GetLogicalDevicePort returns logical device port details
-func (handler *NBIHandler) GetLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
- logger.Debugw("GetLogicalDevicePort-request", log.Fields{"id": *id})
-
- return handler.logicalDeviceMgr.GetLogicalPort(ctx, id)
-}
-
-// EnableLogicalDevicePort enables logical device port
-func (handler *NBIHandler) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
- logger.Debugw("EnableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
-
- ch := make(chan interface{})
- defer close(ch)
- go handler.logicalDeviceMgr.EnableLogicalPort(ctx, id, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-// DisableLogicalDevicePort disables logical device port
-func (handler *NBIHandler) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
- logger.Debugw("DisableLogicalDevicePort-request", log.Fields{"id": id, "test": common.TestModeKeys_api_test.String()})
-
- ch := make(chan interface{})
- defer close(ch)
- go handler.logicalDeviceMgr.DisableLogicalPort(ctx, id, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-// UpdateLogicalDeviceFlowTable updates logical device flow table
-func (handler *NBIHandler) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
- logger.Debugw("UpdateLogicalDeviceFlowTable-request", log.Fields{"flow": flow, "test": common.TestModeKeys_api_test.String()})
-
- ch := make(chan interface{})
- defer close(ch)
- go handler.logicalDeviceMgr.UpdateFlowTable(ctx, flow.Id, flow.FlowMod, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-// UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
-func (handler *NBIHandler) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
- logger.Debugw("UpdateLogicalDeviceFlowGroupTable-request", log.Fields{"flow": flow, "test": common.TestModeKeys_api_test.String()})
- ch := make(chan interface{})
- defer close(ch)
- go handler.logicalDeviceMgr.UpdateGroupTable(ctx, flow.Id, flow.GroupMod, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-// GetDevice must be implemented in the read-only containers - should it also be implemented here?
-func (handler *NBIHandler) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
- logger.Debugw("GetDevice-request", log.Fields{"id": id})
- return handler.deviceMgr.GetDevice(ctx, id.Id)
-}
-
-// GetDevice must be implemented in the read-only containers - should it also be implemented here?
-
-// ListDevices retrieves the latest devices from the data model
-func (handler *NBIHandler) ListDevices(ctx context.Context, empty *empty.Empty) (*voltha.Devices, error) {
- logger.Debug("ListDevices")
- devices, err := handler.deviceMgr.ListDevices(ctx)
- if err != nil {
- logger.Errorw("Failed to list devices", log.Fields{"error": err})
- return nil, err
- }
- return devices, nil
-}
-
-// ListDeviceIds returns the list of device ids managed by a voltha core
-func (handler *NBIHandler) ListDeviceIds(ctx context.Context, empty *empty.Empty) (*voltha.IDs, error) {
- logger.Debug("ListDeviceIDs")
- return handler.deviceMgr.ListDeviceIds()
-}
-
-//ReconcileDevices is a request to a voltha core to managed a list of devices based on their IDs
-func (handler *NBIHandler) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
- logger.Debug("ReconcileDevices")
-
- ch := make(chan interface{})
- defer close(ch)
- go handler.deviceMgr.ReconcileDevices(ctx, ids, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-// GetLogicalDevice provides a cloned most up to date logical device
-func (handler *NBIHandler) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
- logger.Debugw("GetLogicalDevice-request", log.Fields{"id": id})
- return handler.logicalDeviceMgr.GetLogicalDevice(ctx, id.Id)
-}
-
-// ListLogicalDevices returns the list of all logical devices
-func (handler *NBIHandler) ListLogicalDevices(ctx context.Context, empty *empty.Empty) (*voltha.LogicalDevices, error) {
- logger.Debug("ListLogicalDevices-request")
- return handler.logicalDeviceMgr.ListLogicalDevices(ctx)
-}
-
-// ListAdapters returns the contents of all adapters known to the system
-func (handler *NBIHandler) ListAdapters(ctx context.Context, empty *empty.Empty) (*voltha.Adapters, error) {
- logger.Debug("ListAdapters")
- return handler.adapterMgr.ListAdapters(ctx)
-}
-
-// ListLogicalDeviceFlows returns the flows of logical device
-func (handler *NBIHandler) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
- logger.Debugw("ListLogicalDeviceFlows", log.Fields{"id": *id})
- return handler.logicalDeviceMgr.ListLogicalDeviceFlows(ctx, id.Id)
-}
-
-// ListLogicalDeviceFlowGroups returns logical device flow groups
-func (handler *NBIHandler) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
- logger.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"id": *id})
- return handler.logicalDeviceMgr.ListLogicalDeviceFlowGroups(ctx, id.Id)
-}
-
-// ListLogicalDevicePorts returns ports of logical device
-func (handler *NBIHandler) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
- logger.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
- return handler.logicalDeviceMgr.ListLogicalDevicePorts(ctx, id.Id)
-}
-
-// CreateDevice creates a new parent device in the data model
-func (handler *NBIHandler) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
- if device.MacAddress == "" && device.GetHostAndPort() == "" {
- logger.Errorf("No Device Info Present")
- return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
- }
- logger.Debugw("create-device", log.Fields{"device": *device})
-
- ch := make(chan interface{})
- defer close(ch)
- go handler.deviceMgr.CreateDevice(ctx, device, ch)
- select {
- case res := <-ch:
- if res != nil {
- if err, ok := res.(error); ok {
- logger.Errorw("create-device-failed", log.Fields{"error": err})
- return nil, err
- }
- if d, ok := res.(*voltha.Device); ok {
- return d, nil
- }
- }
- logger.Warnw("create-device-unexpected-return-type", log.Fields{"result": res})
- err := status.Errorf(codes.Internal, "%s", res)
- return &voltha.Device{}, err
- case <-ctx.Done():
- logger.Debug("createdevice-client-timeout")
- return &voltha.Device{}, ctx.Err()
- }
-}
-
-// EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
-func (handler *NBIHandler) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
- logger.Debugw("enabledevice", log.Fields{"id": id})
-
- ch := make(chan interface{})
- defer close(ch)
- go handler.deviceMgr.EnableDevice(ctx, id, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-// DisableDevice disables a device along with any child device it may have
-func (handler *NBIHandler) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
- logger.Debugw("disabledevice-request", log.Fields{"id": id})
-
- ch := make(chan interface{})
- defer close(ch)
- go handler.deviceMgr.DisableDevice(ctx, id, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-//RebootDevice invoked the reboot API to the corresponding adapter
-func (handler *NBIHandler) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
- logger.Debugw("rebootDevice-request", log.Fields{"id": id})
-
- ch := make(chan interface{})
- defer close(ch)
- go handler.deviceMgr.RebootDevice(ctx, id, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-// DeleteDevice removes a device from the data model
-func (handler *NBIHandler) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
- logger.Debugw("deletedevice-request", log.Fields{"id": id})
-
- ch := make(chan interface{})
- defer close(ch)
- go handler.deviceMgr.DeleteDevice(ctx, id, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-// ListDevicePorts returns the ports details for a specific device entry
-func (handler *NBIHandler) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
- logger.Debugw("listdeviceports-request", log.Fields{"id": id})
- device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
- if err != nil {
- return &voltha.Ports{}, err
- }
- ports := &voltha.Ports{}
- ports.Items = append(ports.Items, device.Ports...)
- return ports, nil
-}
-
-// ListDeviceFlows returns the flow details for a specific device entry
-func (handler *NBIHandler) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
- logger.Debugw("listdeviceflows-request", log.Fields{"id": id})
-
- device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
- if err != nil {
- return &openflow_13.Flows{}, err
- }
- flows := &openflow_13.Flows{}
- flows.Items = append(flows.Items, device.Flows.Items...)
- return flows, nil
-}
-
-// ListDeviceFlowGroups returns the flow group details for a specific device entry
-func (handler *NBIHandler) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
- logger.Debugw("ListDeviceFlowGroups", log.Fields{"deviceid": id})
-
- if device, _ := handler.deviceMgr.GetDevice(ctx, id.Id); device != nil {
- return device.GetFlowGroups(), nil
- }
- return &voltha.FlowGroups{}, status.Errorf(codes.NotFound, "device-%s", id.Id)
-}
-
-// ListDeviceGroups returns all the device groups known to the system
-func (handler *NBIHandler) ListDeviceGroups(ctx context.Context, empty *empty.Empty) (*voltha.DeviceGroups, error) {
- logger.Debug("ListDeviceGroups")
- return &voltha.DeviceGroups{}, errors.New("UnImplemented")
-}
-
-// GetDeviceGroup returns a specific device group entry
-func (handler *NBIHandler) GetDeviceGroup(ctx context.Context, id *voltha.ID) (*voltha.DeviceGroup, error) {
- logger.Debug("GetDeviceGroup")
- return &voltha.DeviceGroup{}, errors.New("UnImplemented")
-}
-
-// ListDeviceTypes returns all the device types known to the system
-func (handler *NBIHandler) ListDeviceTypes(ctx context.Context, _ *empty.Empty) (*voltha.DeviceTypes, error) {
- logger.Debug("ListDeviceTypes")
-
- return &voltha.DeviceTypes{Items: handler.adapterMgr.ListDeviceTypes()}, nil
-}
-
-// GetDeviceType returns the device type for a specific device entry
-func (handler *NBIHandler) GetDeviceType(ctx context.Context, id *voltha.ID) (*voltha.DeviceType, error) {
- logger.Debugw("GetDeviceType", log.Fields{"typeid": id})
-
- if deviceType := handler.adapterMgr.GetDeviceType(id.Id); deviceType != nil {
- return deviceType, nil
- }
- return &voltha.DeviceType{}, status.Errorf(codes.NotFound, "device_type-%s", id.Id)
-}
-
-// GetVoltha returns the contents of all components (i.e. devices, logical_devices, ...)
-func (handler *NBIHandler) GetVoltha(ctx context.Context, empty *empty.Empty) (*voltha.Voltha, error) {
-
+// GetVoltha currently just returns version information
+func (handler *NBIHandler) GetVoltha(context.Context, *empty.Empty) (*voltha.Voltha, error) {
logger.Debug("GetVoltha")
/*
* For now, encode all the version information into a JSON object and
* pass that back as "version" so the client can get all the
* information associated with the version. Long term the API should
- * better accomidate this, but for now this will work.
+ * better accommodate this, but for now this will work.
*/
data, err := json.Marshal(&version.VersionInfo)
- info := version.VersionInfo.Version
if err != nil {
logger.Warnf("Unable to encode version information as JSON: %s", err.Error())
- } else {
- info = string(data)
+ return &voltha.Voltha{Version: version.VersionInfo.Version}, nil
}
-
- return &voltha.Voltha{
- Version: info,
- }, nil
+ return &voltha.Voltha{Version: string(data)}, nil
}
-// processImageRequest is a helper method to execute an image download request
-func (handler *NBIHandler) processImageRequest(ctx context.Context, img *voltha.ImageDownload, requestType int) (*common.OperationResp, error) {
- logger.Debugw("processImageDownload", log.Fields{"img": *img, "requestType": requestType})
+var errUnimplemented = errors.New("unimplemented")
- failedresponse := &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
-
- ch := make(chan interface{})
- defer close(ch)
- switch requestType {
- case ImageDownload:
- go handler.deviceMgr.DownloadImage(ctx, img, ch)
- case CancelImageDownload:
- go handler.deviceMgr.CancelImageDownload(ctx, img, ch)
- case ActivateImage:
- go handler.deviceMgr.ActivateImage(ctx, img, ch)
- case RevertImage:
- go handler.deviceMgr.RevertImage(ctx, img, ch)
- default:
- logger.Warn("invalid-request-type", log.Fields{"requestType": requestType})
- return failedresponse, status.Errorf(codes.InvalidArgument, "%d", requestType)
- }
- select {
- case res := <-ch:
- if res != nil {
- if err, ok := res.(error); ok {
- return failedresponse, err
- }
- if opResp, ok := res.(*common.OperationResp); ok {
- return opResp, nil
- }
- }
- logger.Warnw("download-image-unexpected-return-type", log.Fields{"result": res})
- return failedresponse, status.Errorf(codes.Internal, "%s", res)
- case <-ctx.Done():
- logger.Debug("downloadImage-client-timeout")
- return &common.OperationResp{}, ctx.Err()
- }
+func (handler *NBIHandler) ListCoreInstances(context.Context, *empty.Empty) (*voltha.CoreInstances, error) {
+ return nil, errUnimplemented
}
-
-// DownloadImage execute an image download request
-func (handler *NBIHandler) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
- logger.Debugw("DownloadImage-request", log.Fields{"img": *img})
-
- return handler.processImageRequest(ctx, img, ImageDownload)
+func (handler *NBIHandler) GetCoreInstance(context.Context, *voltha.ID) (*voltha.CoreInstance, error) {
+ return nil, errUnimplemented
}
-
-// CancelImageDownload cancels image download request
-func (handler *NBIHandler) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
- logger.Debugw("cancelImageDownload-request", log.Fields{"img": *img})
- return handler.processImageRequest(ctx, img, CancelImageDownload)
+func (handler *NBIHandler) ListDeviceGroups(context.Context, *empty.Empty) (*voltha.DeviceGroups, error) {
+ return nil, errUnimplemented
}
-
-// ActivateImageUpdate activates image update request
-func (handler *NBIHandler) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
- logger.Debugw("activateImageUpdate-request", log.Fields{"img": *img})
- return handler.processImageRequest(ctx, img, ActivateImage)
+func (handler *NBIHandler) GetDeviceGroup(context.Context, *voltha.ID) (*voltha.DeviceGroup, error) {
+ return nil, errUnimplemented
}
-
-// RevertImageUpdate reverts image update
-func (handler *NBIHandler) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
- logger.Debugw("revertImageUpdate-request", log.Fields{"img": *img})
- return handler.processImageRequest(ctx, img, RevertImage)
+func (handler *NBIHandler) CreateEventFilter(context.Context, *voltha.EventFilter) (*voltha.EventFilter, error) {
+ return nil, errUnimplemented
}
-
-// GetImageDownloadStatus returns status of image download
-func (handler *NBIHandler) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- logger.Debugw("getImageDownloadStatus-request", log.Fields{"img": *img})
-
- failedresponse := &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
-
- ch := make(chan interface{})
- defer close(ch)
- go handler.deviceMgr.GetImageDownloadStatus(ctx, img, ch)
-
- select {
- case res := <-ch:
- if res != nil {
- if err, ok := res.(error); ok {
- return failedresponse, err
- }
- if downloadResp, ok := res.(*voltha.ImageDownload); ok {
- return downloadResp, nil
- }
- }
- logger.Warnw("download-image-status", log.Fields{"result": res})
- return failedresponse, status.Errorf(codes.Internal, "%s", res)
- case <-ctx.Done():
- logger.Debug("downloadImage-client-timeout")
- return failedresponse, ctx.Err()
- }
+func (handler *NBIHandler) UpdateEventFilter(context.Context, *voltha.EventFilter) (*voltha.EventFilter, error) {
+ return nil, errUnimplemented
}
-
-// GetImageDownload returns image download
-func (handler *NBIHandler) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- logger.Debugw("GetImageDownload-request", log.Fields{"img": *img})
-
- download, err := handler.deviceMgr.GetImageDownload(ctx, img)
- if err != nil {
- return &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}, err
- }
- return download, nil
+func (handler *NBIHandler) DeleteEventFilter(context.Context, *voltha.EventFilter) (*empty.Empty, error) {
+ return nil, errUnimplemented
}
-
-// ListImageDownloads returns image downloads
-func (handler *NBIHandler) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
- logger.Debugw("ListImageDownloads-request", log.Fields{"deviceId": id.Id})
-
- downloads, err := handler.deviceMgr.ListImageDownloads(ctx, id.Id)
- if err != nil {
- failedResp := &voltha.ImageDownloads{
- Items: []*voltha.ImageDownload{
- {DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN},
- },
- }
- return failedResp, err
- }
- return downloads, nil
+func (handler *NBIHandler) GetEventFilter(context.Context, *voltha.ID) (*voltha.EventFilters, error) {
+ return nil, errUnimplemented
}
-
-// GetImages returns all images for a specific device entry
-func (handler *NBIHandler) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
- logger.Debugw("GetImages", log.Fields{"deviceid": id.Id})
- device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
- if err != nil {
- return &voltha.Images{}, err
- }
- return device.GetImages(), nil
+func (handler *NBIHandler) ListEventFilters(context.Context, *empty.Empty) (*voltha.EventFilters, error) {
+ return nil, errUnimplemented
}
-
-// UpdateDevicePmConfigs updates the PM configs
-func (handler *NBIHandler) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
- logger.Debugw("UpdateDevicePmConfigs-request", log.Fields{"configs": *configs})
-
- ch := make(chan interface{})
- defer close(ch)
- go handler.deviceMgr.UpdatePmConfigs(ctx, configs, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
+func (handler *NBIHandler) SelfTest(context.Context, *voltha.ID) (*voltha.SelfTestResponse, error) {
+ return nil, errUnimplemented
}
-
-// ListDevicePmConfigs returns pm configs of device
-func (handler *NBIHandler) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
- logger.Debugw("ListDevicePmConfigs-request", log.Fields{"deviceId": *id})
- return handler.deviceMgr.ListPmConfigs(ctx, id.Id)
+func (handler *NBIHandler) Subscribe(context.Context, *voltha.OfAgentSubscriber) (*voltha.OfAgentSubscriber, error) {
+ return nil, errUnimplemented
}
-
-func (handler *NBIHandler) CreateEventFilter(ctx context.Context, filter *voltha.EventFilter) (*voltha.EventFilter, error) {
- logger.Debugw("CreateEventFilter-request", log.Fields{"filter": *filter})
- return nil, errors.New("UnImplemented")
+func (handler *NBIHandler) GetAlarmDeviceData(context.Context, *common.ID) (*omci.AlarmDeviceData, error) {
+ return nil, errUnimplemented
}
-
-func (handler *NBIHandler) UpdateEventFilter(ctx context.Context, filter *voltha.EventFilter) (*voltha.EventFilter, error) {
- logger.Debugw("UpdateEventFilter-request", log.Fields{"filter": *filter})
- return nil, errors.New("UnImplemented")
+func (handler *NBIHandler) GetMibDeviceData(context.Context, *common.ID) (*omci.MibDeviceData, error) {
+ return nil, errUnimplemented
}
-
-func (handler *NBIHandler) DeleteEventFilter(ctx context.Context, filterInfo *voltha.EventFilter) (*empty.Empty, error) {
- logger.Debugw("DeleteEventFilter-request", log.Fields{"device-id": filterInfo.DeviceId, "filter-id": filterInfo.Id})
- return nil, errors.New("UnImplemented")
-}
-
-// GetEventFilter returns all the filters present for a device
-func (handler *NBIHandler) GetEventFilter(ctx context.Context, id *voltha.ID) (*voltha.EventFilters, error) {
- logger.Debugw("GetEventFilter-request", log.Fields{"device-id": id})
- return nil, errors.New("UnImplemented")
-}
-
-// ListEventFilters returns all the filters known to the system
-func (handler *NBIHandler) ListEventFilters(ctx context.Context, empty *empty.Empty) (*voltha.EventFilters, error) {
- logger.Debug("ListEventFilter-request")
- return nil, errors.New("UnImplemented")
-}
-
-func (handler *NBIHandler) SelfTest(ctx context.Context, id *voltha.ID) (*voltha.SelfTestResponse, error) {
- logger.Debugw("SelfTest-request", log.Fields{"id": id})
- return &voltha.SelfTestResponse{}, errors.New("UnImplemented")
-}
-
-// StreamPacketsOut sends packets to adapter
-func (handler *NBIHandler) StreamPacketsOut(packets voltha.VolthaService_StreamPacketsOutServer) error {
- logger.Debugw("StreamPacketsOut-request", log.Fields{"packets": packets})
-loop:
- for {
- select {
- case <-packets.Context().Done():
- logger.Infow("StreamPacketsOut-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
- break loop
- default:
- }
-
- packet, err := packets.Recv()
-
- if err == io.EOF {
- logger.Debugw("Received-EOF", log.Fields{"packets": packets})
- break loop
- }
-
- if err != nil {
- logger.Errorw("Failed to receive packet out", log.Fields{"error": err})
- continue
- }
-
- handler.logicalDeviceMgr.PacketOut(packets.Context(), packet)
- }
-
- logger.Debugw("StreamPacketsOut-request-done", log.Fields{"packets": packets})
- return nil
-}
-
-func (handler *NBIHandler) SendPacketIn(deviceID string, transationID string, packet *openflow_13.OfpPacketIn) {
- // TODO: Augment the OF PacketIn to include the transactionId
- packetIn := openflow_13.PacketIn{Id: deviceID, PacketIn: packet}
- logger.Debugw("SendPacketIn", log.Fields{"packetIn": packetIn})
- handler.packetInQueue <- packetIn
-}
-
-type callTracker struct {
- failedPacket interface{}
-}
-type streamTracker struct {
- calls map[string]*callTracker
- sync.Mutex
-}
-
-var streamingTracker = &streamTracker{calls: make(map[string]*callTracker)}
-
-func (handler *NBIHandler) getStreamingTracker(method string, done chan<- bool) *callTracker {
- streamingTracker.Lock()
- defer streamingTracker.Unlock()
- if _, ok := streamingTracker.calls[method]; ok {
- // bail out the other packet in thread
- logger.Debugf("%s streaming call already running. Exiting it", method)
- done <- true
- logger.Debugf("Last %s exited. Continuing ...", method)
- } else {
- streamingTracker.calls[method] = &callTracker{failedPacket: nil}
- }
- return streamingTracker.calls[method]
-}
-
-func (handler *NBIHandler) flushFailedPackets(tracker *callTracker) error {
- if tracker.failedPacket != nil {
- switch tracker.failedPacket.(type) {
- case openflow_13.PacketIn:
- logger.Debug("Enqueueing last failed packetIn")
- handler.packetInQueue <- tracker.failedPacket.(openflow_13.PacketIn)
- case openflow_13.ChangeEvent:
- logger.Debug("Enqueueing last failed changeEvent")
- handler.changeEventQueue <- tracker.failedPacket.(openflow_13.ChangeEvent)
- }
- }
- return nil
-}
-
-// ReceivePacketsIn receives packets from adapter
-func (handler *NBIHandler) ReceivePacketsIn(empty *empty.Empty, packetsIn voltha.VolthaService_ReceivePacketsInServer) error {
- var streamingTracker = handler.getStreamingTracker("ReceivePacketsIn", handler.packetInQueueDone)
- logger.Debugw("ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
-
- err := handler.flushFailedPackets(streamingTracker)
- if err != nil {
- logger.Errorw("unable-to-flush-failed-packets", log.Fields{"error": err})
- }
-
-loop:
- for {
- select {
- case packet := <-handler.packetInQueue:
- logger.Debugw("sending-packet-in", log.Fields{
- "packet": hex.EncodeToString(packet.PacketIn.Data),
- })
- if err := packetsIn.Send(&packet); err != nil {
- logger.Errorw("failed-to-send-packet", log.Fields{"error": err})
- // save the last failed packet in
- streamingTracker.failedPacket = packet
- } else {
- if streamingTracker.failedPacket != nil {
- // reset last failed packet saved to avoid flush
- streamingTracker.failedPacket = nil
- }
- }
- case <-handler.packetInQueueDone:
- logger.Debug("Another ReceivePacketsIn running. Bailing out ...")
- break loop
- }
- }
-
- //TODO: Find an elegant way to get out of the above loop when the Core is stopped
- return nil
-}
-
-func (handler *NBIHandler) SendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus) {
- // TODO: validate the type of portStatus parameter
- //if _, ok := portStatus.(*openflow_13.OfpPortStatus); ok {
- //}
- event := openflow_13.ChangeEvent{Id: deviceID, Event: &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}}
- logger.Debugw("SendChangeEvent", log.Fields{"event": event})
- handler.changeEventQueue <- event
-}
-
-// ReceiveChangeEvents receives change in events
-func (handler *NBIHandler) ReceiveChangeEvents(empty *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
- var streamingTracker = handler.getStreamingTracker("ReceiveChangeEvents", handler.changeEventQueueDone)
- logger.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
-
- err := handler.flushFailedPackets(streamingTracker)
- if err != nil {
- logger.Errorw("unable-to-flush-failed-packets", log.Fields{"error": err})
- }
-
-loop:
- for {
- select {
- // Dequeue a change event
- case event := <-handler.changeEventQueue:
- logger.Debugw("sending-change-event", log.Fields{"event": event})
- if err := changeEvents.Send(&event); err != nil {
- logger.Errorw("failed-to-send-change-event", log.Fields{"error": err})
- // save last failed changeevent
- streamingTracker.failedPacket = event
- } else {
- if streamingTracker.failedPacket != nil {
- // reset last failed event saved on success to avoid flushing
- streamingTracker.failedPacket = nil
- }
- }
- case <-handler.changeEventQueueDone:
- logger.Debug("Another ReceiveChangeEvents already running. Bailing out ...")
- break loop
- }
- }
-
- return nil
-}
-
-func (handler *NBIHandler) GetChangeEventsQueueForTest() <-chan openflow_13.ChangeEvent {
- return handler.changeEventQueue
-}
-
-// Subscribe subscribing request of ofagent
-func (handler *NBIHandler) Subscribe(
- ctx context.Context,
- ofAgent *voltha.OfAgentSubscriber,
-) (*voltha.OfAgentSubscriber, error) {
- logger.Debugw("Subscribe-request", log.Fields{"ofAgent": ofAgent})
- return &voltha.OfAgentSubscriber{OfagentId: ofAgent.OfagentId, VolthaId: ofAgent.VolthaId}, nil
-}
-
-// GetAlarmDeviceData @TODO useless stub, what should this actually do?
-func (handler *NBIHandler) GetAlarmDeviceData(ctx context.Context, in *common.ID) (*omci.AlarmDeviceData, error) {
- logger.Debug("GetAlarmDeviceData-stub")
- return &omci.AlarmDeviceData{}, errors.New("UnImplemented")
-}
-
-// ListLogicalDeviceMeters returns logical device meters
-func (handler *NBIHandler) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
-
- logger.Debugw("ListLogicalDeviceMeters", log.Fields{"id": *id})
- return handler.logicalDeviceMgr.ListLogicalDeviceMeters(ctx, id.Id)
-}
-
-// GetMeterStatsOfLogicalDevice @TODO useless stub, what should this actually do?
-func (handler *NBIHandler) GetMeterStatsOfLogicalDevice(ctx context.Context, in *common.ID) (*openflow_13.MeterStatsReply, error) {
- logger.Debug("GetMeterStatsOfLogicalDevice")
- return &openflow_13.MeterStatsReply{}, errors.New("UnImplemented")
-}
-
-// GetMibDeviceData @TODO useless stub, what should this actually do?
-func (handler *NBIHandler) GetMibDeviceData(ctx context.Context, in *common.ID) (*omci.MibDeviceData, error) {
- logger.Debug("GetMibDeviceData")
- return &omci.MibDeviceData{}, errors.New("UnImplemented")
-}
-
-// SimulateAlarm sends simulate alarm request
-func (handler *NBIHandler) SimulateAlarm(
- ctx context.Context,
- in *voltha.SimulateAlarmRequest,
-) (*common.OperationResp, error) {
- logger.Debugw("SimulateAlarm-request", log.Fields{"id": in.Id})
- successResp := &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
- ch := make(chan interface{})
- defer close(ch)
- go handler.deviceMgr.SimulateAlarm(ctx, in, ch)
- return successResp, nil
-}
-
-// UpdateLogicalDeviceMeterTable - This function sends meter mod request to logical device manager and waits for response
-func (handler *NBIHandler) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
- logger.Debugw("UpdateLogicalDeviceMeterTable-request",
- log.Fields{"meter": meter, "test": common.TestModeKeys_api_test.String()})
- ch := make(chan interface{})
- defer close(ch)
- go handler.logicalDeviceMgr.UpdateMeterTable(ctx, meter.Id, meter.MeterMod, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-// GetMembership returns membership
func (handler *NBIHandler) GetMembership(context.Context, *empty.Empty) (*voltha.Membership, error) {
- return &voltha.Membership{}, errors.New("UnImplemented")
+ return nil, errUnimplemented
}
-
-// UpdateMembership updates membership
func (handler *NBIHandler) UpdateMembership(context.Context, *voltha.Membership) (*empty.Empty, error) {
- return &empty.Empty{}, errors.New("UnImplemented")
-}
-
-func (handler *NBIHandler) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
- logger.Debugw("EnablePort-request", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
- ch := make(chan interface{})
- defer close(ch)
- go handler.deviceMgr.EnablePort(ctx, port, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-func (handler *NBIHandler) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
-
- logger.Debugw("DisablePort-request", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
- ch := make(chan interface{})
- defer close(ch)
- go handler.deviceMgr.DisablePort(ctx, port, ch)
- return waitForNilResponseOnSuccess(ctx, ch)
-}
-
-func (handler *NBIHandler) StartOmciTestAction(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
- logger.Debugw("Omci_test_Request", log.Fields{"id": omcitestrequest.Id, "uuid": omcitestrequest.Uuid})
- return handler.deviceMgr.StartOmciTest(ctx, omcitestrequest)
-}
-
-func (handler *NBIHandler) GetExtValue(ctx context.Context, valueparam *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
- log.Debugw("GetExtValue-request", log.Fields{"onu-id": valueparam.Id})
- return handler.deviceMgr.GetExtValue(ctx, valueparam)
+ return nil, errUnimplemented
}
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 0579f94..e8b651d 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -1,18 +1,19 @@
/*
-* Copyright 2019-present Open Networking Foundation
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+
package api
import (
@@ -125,7 +126,7 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewProxy(backend, "/")
nb.adapterMgr = adapter.NewAdapterManager(proxy, nb.coreInstanceID, nb.kClient)
- nb.deviceMgr, nb.logicalDeviceMgr = device.NewDeviceManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CorePairTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
+ nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CorePairTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
if err = nb.adapterMgr.Start(ctx); err != nil {
logger.Fatalf("Cannot start adapterMgr: %s", err)
}
@@ -396,7 +397,7 @@
// Try to create the same device
_, err = nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
assert.NotNil(t, err)
- assert.Equal(t, "Device is already pre-provisioned", err.Error())
+ assert.Equal(t, "device is already pre-provisioned", err.Error())
// Try to create a device with invalid data
_, err = nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName})
@@ -431,7 +432,7 @@
// Try to enable the oltDevice and check the error message
_, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDeviceNoAdapter.Id})
assert.NotNil(t, err)
- assert.Equal(t, "Adapter-not-registered-for-device-type noAdapterRegistered", err.Error())
+ assert.Equal(t, "adapter-not-registered-for-device-type noAdapterRegistered", err.Error())
//Remove the device
_, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: oltDeviceNoAdapter.Id})
@@ -833,7 +834,7 @@
// Update the OLT Connection Status to REACHABLE and operation status to ACTIVE
// Normally, in a real adapter this happens after connection regain via a heartbeat mechanism with real hardware
- err = nbi.deviceMgr.UpdateDeviceStatus(getContext(), oltDevice.Id, voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
+ err = nbi.UpdateDeviceStatus(getContext(), oltDevice.Id, voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
assert.Nil(t, err)
// Verify the device connection and operation states
@@ -884,7 +885,7 @@
request = &voltha.OmciTestRequest{Id: deviceNoAdapter.Id, Uuid: "456"}
_, err = nbi.StartOmciTestAction(getContext(), request)
assert.NotNil(t, err)
- assert.Equal(t, "Adapter-not-registered-for-device-type noAdapterRegisteredOmciTest", err.Error())
+ assert.Equal(t, "adapter-not-registered-for-device-type noAdapterRegisteredOmciTest", err.Error())
//Remove the device
_, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: deviceNoAdapter.Id})
@@ -1064,7 +1065,6 @@
func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup) {
defer wg.Done()
- nb.logicalDeviceMgr.SetEventCallbacks(nbi)
// Clear any existing flows on the adapters
nb.oltAdapter.ClearFlows()
@@ -1124,7 +1124,7 @@
processedNniLogicalPorts := 0
processedUniLogicalPorts := 0
- for event := range nbi.changeEventQueue {
+ for event := range nbi.GetChangeEventsQueueForTest() {
startingVlan++
if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
ps := portStatus.PortStatus
@@ -1185,7 +1185,7 @@
nb.startCore(false)
// Set the grpc API interface - no grpc server is running in unit test
- nbi := NewAPIHandler(nb.deviceMgr, nb.logicalDeviceMgr, nb.adapterMgr)
+ nbi := NewNBIHandler(nb.deviceMgr, nb.logicalDeviceMgr, nb.adapterMgr)
// 1. Basic test with no data in Core
nb.testCoreWithoutData(t, nbi)
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index de126a2..7cf9f98 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -122,7 +122,7 @@
logger.Debugw("values", log.Fields{"kmp": core.kmp})
core.adapterMgr = adapter.NewAdapterManager(core.clusterDataProxy, core.instanceID, core.kafkaClient)
- core.deviceMgr, core.logicalDeviceMgr = device.NewDeviceManagers(core.clusterDataProxy, core.adapterMgr, core.kmp, endpointMgr, core.config.CorePairTopic, core.instanceID, core.config.DefaultCoreTimeout)
+ core.deviceMgr, core.logicalDeviceMgr = device.NewManagers(core.clusterDataProxy, core.adapterMgr, core.kmp, endpointMgr, core.config.CorePairTopic, core.instanceID, core.config.DefaultCoreTimeout)
// Start the KafkaManager. This must be done after the deviceMgr, adapterMgr, and
// logicalDeviceMgr have been created, as once the kmp is started, it will register
@@ -172,9 +172,8 @@
core.grpcServer = grpcserver.NewGrpcServer(core.config.GrpcHost, core.config.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
logger.Info("grpc-server-created")
- core.grpcNBIAPIHandler = api.NewAPIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr)
+ core.grpcNBIAPIHandler = api.NewNBIHandler(core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr)
logger.Infow("grpc-handler", log.Fields{"core_binding_key": core.config.CoreBindingKey})
- core.logicalDeviceMgr.SetEventCallbacks(core.grpcNBIAPIHandler)
// Create a function to register the core GRPC service with the GRPC server
f := func(gs *grpc.Server) {
voltha.RegisterVolthaServiceServer(
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 3857a6b..940bf1c 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -389,8 +389,8 @@
defer agent.requestQueue.RequestComplete()
device := agent.getDeviceWithoutLock()
- dType := agent.adapterMgr.GetDeviceType(device.Type)
- if dType == nil {
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -479,8 +479,8 @@
defer agent.requestQueue.RequestComplete()
device := agent.getDeviceWithoutLock()
- dType := agent.adapterMgr.GetDeviceType(device.Type)
- if dType == nil {
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -621,8 +621,8 @@
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
}
- dType := agent.adapterMgr.GetDeviceType(device.Type)
- if dType == nil {
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
@@ -1165,7 +1165,7 @@
func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
logger.Debugw("getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
ports := &voltha.Ports{}
- if device, _ := agent.deviceMgr.GetDevice(ctx, agent.deviceID); device != nil {
+ if device, _ := agent.deviceMgr.getDevice(ctx, agent.deviceID); device != nil {
for _, port := range device.Ports {
if port.Type == portType {
ports.Items = append(ports.Items, port)
@@ -1503,7 +1503,7 @@
}
}
-func (agent *Agent) simulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest) error {
+func (agent *Agent) simulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
@@ -1513,7 +1513,7 @@
cloned := agent.getDeviceWithoutLock()
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.SimulateAlarm(subCtx, cloned, simulatereq)
+ ch, err := agent.adapterProxy.SimulateAlarm(subCtx, cloned, simulateReq)
if err != nil {
cancel()
return err
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 60b7273..8b003b4 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -1,18 +1,19 @@
/*
-* Copyright 2019-present Open Networking Foundation
-*
-* Licensed under the Apache License, Version 2.0 (the "License");
-* you may not use this file except in compliance with the License.
-* You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
+
package device
import (
@@ -107,11 +108,6 @@
return test
}
-type fakeEventCallbacks struct{}
-
-func (fakeEventCallbacks) SendChangeEvent(_ string, _ *ofp.OfpPortStatus) {}
-func (fakeEventCallbacks) SendPacketIn(_ string, _ string, _ *ofp.OfpPacketIn) {}
-
func (dat *DATest) startCore(inCompeteMode bool) {
cfg := config.NewRWCoreFlags()
cfg.CorePairTopic = "rw_core"
@@ -144,8 +140,7 @@
proxy := model.NewProxy(backend, "/")
adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
- dat.deviceMgr, dat.logicalDeviceMgr = NewDeviceManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
- dat.logicalDeviceMgr.SetEventCallbacks(fakeEventCallbacks{})
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
if err = dat.kmp.Start(); err != nil {
logger.Fatal("Cannot start InterContainerProxy")
}
diff --git a/rw_core/core/device/event/common.go b/rw_core/core/device/event/common.go
new file mode 100644
index 0000000..ebb1ad3
--- /dev/null
+++ b/rw_core/core/device/event/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package event Common Logger initialization
+package event
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger // package-wide logger; assigned once by init below
+
+// init registers this package with the logging library and panics if registration fails.
+func init() {
+	var err error // its log level can then be modified at run time
+	logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "event"})
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/rw_core/core/device/event/event.go b/rw_core/core/device/event/event.go
new file mode 100644
index 0000000..c205564
--- /dev/null
+++ b/rw_core/core/device/event/event.go
@@ -0,0 +1,172 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package event
+
+import (
+ "encoding/hex"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "sync"
+)
+
+type Manager struct {
+ packetInQueue chan openflow_13.PacketIn // filled by SendPacketIn (and flushFailedPackets), drained by ReceivePacketsIn
+ packetInQueueDone chan bool // a value received here makes the ReceivePacketsIn loop exit
+ changeEventQueue chan openflow_13.ChangeEvent // filled by SendChangeEvent (and flushFailedPackets), drained by ReceiveChangeEvents
+ changeEventQueueDone chan bool // a value received here makes the ReceiveChangeEvents loop exit
+}
+
+// NewManager returns a Manager whose packet-in and change-event queues each buffer up to 100 entries
+// and whose two done channels each buffer a single signal.
+func NewManager() *Manager {
+	m := &Manager{packetInQueueDone: make(chan bool, 1), changeEventQueueDone: make(chan bool, 1)}
+	m.packetInQueue = make(chan openflow_13.PacketIn, 100)
+	m.changeEventQueue = make(chan openflow_13.ChangeEvent, 100)
+	return m
+}
+
+// SendPacketIn wraps packet in a PacketIn for deviceID and queues it for ReceivePacketsIn; transactionID is currently unused.
+func (q *Manager) SendPacketIn(deviceID string, transactionID string, packet *openflow_13.OfpPacketIn) {
+	packetIn := openflow_13.PacketIn{Id: deviceID, PacketIn: packet} // TODO: Augment the OF PacketIn to include the transactionId
+	logger.Debugw("SendPacketIn", log.Fields{"packetIn": packetIn})
+	q.packetInQueue <- packetIn
+}
+
+type callTracker struct {
+ failedPacket interface{} // last PacketIn or ChangeEvent whose Send failed; nil when none is pending
+}
+type streamTracker struct {
+ calls map[string]*callTracker // keyed by streaming method name
+ sync.Mutex // held by getStreamingTracker while it reads or adds to calls
+}
+
+var streamingTracker = &streamTracker{calls: make(map[string]*callTracker)} // package-level, so shared by every Manager
+
+// getStreamingTracker registers method on first use; on later calls it sends on done to stop the earlier stream, then returns method's tracker.
+func (q *Manager) getStreamingTracker(method string, done chan<- bool) *callTracker {
+	streamingTracker.Lock()
+	defer streamingTracker.Unlock()
+	if _, running := streamingTracker.calls[method]; !running {
+		streamingTracker.calls[method] = &callTracker{failedPacket: nil}
+		return streamingTracker.calls[method]
+	}
+	logger.Debugf("%s streaming call already running. Exiting it", method)
+	done <- true
+	logger.Debugf("Last %s exited. Continuing ...", method)
+	return streamingTracker.calls[method]
+}
+
+// flushFailedPackets puts the packet or event saved in tracker back on its queue; it always returns nil.
+func (q *Manager) flushFailedPackets(tracker *callTracker) error {
+	// A nil failedPacket matches neither case, so nothing is re-queued.
+	switch failed := tracker.failedPacket.(type) {
+	case openflow_13.PacketIn:
+		logger.Debug("Enqueueing last failed packetIn")
+		q.packetInQueue <- failed
+	case openflow_13.ChangeEvent:
+		logger.Debug("Enqueueing last failed changeEvent")
+		q.changeEventQueue <- failed
+	}
+	return nil
+}
+
+// ReceivePacketsIn streams every packet-in placed on the queue to the packetsIn gRPC stream.
+// It returns nil once a value arrives on packetInQueueDone, which getStreamingTracker sends when a later call starts.
+func (q *Manager) ReceivePacketsIn(_ *empty.Empty, packetsIn voltha.VolthaService_ReceivePacketsInServer) error {
+	// Register this call; if an earlier one was registered it is signalled to exit.
+	tracker := q.getStreamingTracker("ReceivePacketsIn", q.packetInQueueDone)
+	logger.Debugw("ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
+
+	// Put back the packet that the previous stream could not deliver, if there is one.
+	if err := q.flushFailedPackets(tracker); err != nil {
+		logger.Errorw("unable-to-flush-failed-packets", log.Fields{"error": err})
+	}
+
+	//TODO: Find an elegant way to get out of the loop below when the Core is stopped
+	for {
+		select {
+		// A later ReceivePacketsIn call asked this one to stop.
+		case <-q.packetInQueueDone:
+			logger.Debug("Another ReceivePacketsIn running. Bailing out ...")
+			return nil
+		// Forward the next queued packet-in to the client.
+		case packet := <-q.packetInQueue:
+			encoded := hex.EncodeToString(packet.PacketIn.Data)
+			logger.Debugw("sending-packet-in", log.Fields{"packet": encoded})
+			sendErr := packetsIn.Send(&packet)
+			switch {
+			case sendErr != nil:
+				logger.Errorw("failed-to-send-packet", log.Fields{"error": sendErr})
+				// save the last failed packet in
+				tracker.failedPacket = packet
+			case tracker.failedPacket != nil:
+				// reset last failed packet saved to avoid flush
+				tracker.failedPacket = nil
+			}
+		}
+	}
+}
+
+// SendChangeEvent wraps portStatus in a ChangeEvent for deviceID and queues it for ReceiveChangeEvents.
+func (q *Manager) SendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus) {
+	// TODO: validate the type of portStatus parameter
+	payload := &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}
+	event := openflow_13.ChangeEvent{Id: deviceID, Event: payload}
+	logger.Debugw("SendChangeEvent", log.Fields{"event": event})
+	q.changeEventQueue <- event
+}
+
+// ReceiveChangeEvents streams every change event placed on the queue to the changeEvents gRPC stream.
+// It returns nil once a value arrives on changeEventQueueDone, which getStreamingTracker sends when a later call starts.
+func (q *Manager) ReceiveChangeEvents(_ *empty.Empty, changeEvents voltha.VolthaService_ReceiveChangeEventsServer) error {
+	// Register this call; if an earlier one was registered it is signalled to exit.
+	tracker := q.getStreamingTracker("ReceiveChangeEvents", q.changeEventQueueDone)
+	logger.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
+
+	// Put back the event that the previous stream could not deliver, if there is one.
+	if err := q.flushFailedPackets(tracker); err != nil {
+		logger.Errorw("unable-to-flush-failed-packets", log.Fields{"error": err})
+	}
+
+	for {
+		select {
+		// A later ReceiveChangeEvents call asked this one to stop.
+		case <-q.changeEventQueueDone:
+			logger.Debug("Another ReceiveChangeEvents already running. Bailing out ...")
+			return nil
+		// Dequeue a change event
+		case event := <-q.changeEventQueue:
+			logger.Debugw("sending-change-event", log.Fields{"event": event})
+			sendErr := changeEvents.Send(&event)
+			switch {
+			case sendErr != nil:
+				logger.Errorw("failed-to-send-change-event", log.Fields{"error": sendErr})
+				// save last failed changeevent
+				tracker.failedPacket = event
+			case tracker.failedPacket != nil:
+				// reset last failed event saved on success to avoid flushing
+				tracker.failedPacket = nil
+			}
+		}
+	}
+}
+
+// GetChangeEventsQueueForTest exposes the change-event queue as a receive-only channel.
+// As the name indicates, it is meant for tests that need to observe queued events.
+func (q *Manager) GetChangeEventsQueueForTest() <-chan openflow_13.ChangeEvent { return q.changeEventQueue }
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index c872da9..7bc8e4d 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -373,7 +373,7 @@
var err error
var device *voltha.Device
- if device, err = agent.deviceMgr.GetDevice(ctx, deviceID); err != nil {
+ if device, err = agent.deviceMgr.getDevice(ctx, deviceID); err != nil {
logger.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": deviceID})
return err
}
@@ -1671,7 +1671,7 @@
defer agent.requestQueue.RequestComplete()
if agent.deviceRoutes == nil {
- agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
+ agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
}
// Get all the logical ports on that logical device
lDevice := agent.getLogicalDeviceWithoutLock()
@@ -1695,7 +1695,7 @@
defer agent.requestQueue.RequestComplete()
if agent.deviceRoutes == nil {
- agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
+ agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
}
if err := agent.deviceRoutes.AddPort(ctx, lp, agent.logicalDevice.Ports); err != nil {
return err
@@ -1737,15 +1737,15 @@
// Send the port change events to the OF controller
for _, newP := range newPorts {
- go agent.ldeviceMgr.eventCallbacks.SendChangeEvent(agent.logicalDeviceID,
+ go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
}
for _, change := range changedPorts {
- go agent.ldeviceMgr.eventCallbacks.SendChangeEvent(agent.logicalDeviceID,
+ go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_MODIFY, Desc: change.OfpPort})
}
for _, del := range deletedPorts {
- go agent.ldeviceMgr.eventCallbacks.SendChangeEvent(agent.logicalDeviceID,
+ go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
&ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: del.OfpPort})
}
@@ -1915,7 +1915,7 @@
"transactionId": transactionID,
})
packetIn := fu.MkPacketIn(port, packet)
- agent.ldeviceMgr.eventCallbacks.SendPacketIn(agent.logicalDeviceID, transactionID, packetIn)
+ agent.ldeviceMgr.SendPacketIn(agent.logicalDeviceID, transactionID, packetIn)
logger.Debugw("sending-packet-in", log.Fields{"packet": hex.EncodeToString(packetIn.Data)})
}
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 8a00a9e..64c42b5 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -483,8 +483,7 @@
proxy := model.NewProxy(backend, "/")
adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)
- lda.deviceMgr, lda.logicalDeviceMgr = NewDeviceManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
- lda.logicalDeviceMgr.SetEventCallbacks(fakeEventCallbacks{})
+ lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
if err = lda.kmp.Start(); err != nil {
logger.Fatal("Cannot start InterContainerProxy")
}
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index 5005e0c..a5c47b9 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -19,7 +19,10 @@
import (
"context"
"errors"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-go/rw_core/core/device/event"
"github.com/opencord/voltha-go/rw_core/utils"
+ "io"
"strings"
"sync"
"time"
@@ -36,9 +39,9 @@
// LogicalManager represent logical device manager attributes
type LogicalManager struct {
+ *event.Manager
logicalDeviceAgents sync.Map
deviceMgr *Manager
- eventCallbacks EventCallbacks
kafkaICProxy kafka.InterContainerProxy
clusterDataProxy *model.Proxy
exitChannel chan int
@@ -47,15 +50,6 @@
logicalDeviceLoadingInProgress map[string][]chan int
}
-type EventCallbacks interface {
- SendChangeEvent(deviceID string, portStatus *openflow_13.OfpPortStatus)
- SendPacketIn(deviceID string, transactionID string, packet *openflow_13.OfpPacketIn)
-}
-
-func (ldMgr *LogicalManager) SetEventCallbacks(callbacks EventCallbacks) {
- ldMgr.eventCallbacks = callbacks
-}
-
func (ldMgr *LogicalManager) Start(ctx context.Context) {
logger.Info("starting-logical-device-manager")
probe.UpdateStatusFromContext(ctx, "logical-device-manager", probe.ServiceStatusRunning)
@@ -69,18 +63,6 @@
logger.Info("logical-device-manager-stopped")
}
-func sendAPIResponse(ctx context.Context, ch chan interface{}, result interface{}) {
- if ctx.Err() == nil {
- // Returned response only of the ctx has not been cancelled/timeout/etc
- // Channel is automatically closed when a context is Done
- ch <- result
- logger.Debugw("sendResponse", log.Fields{"result": result})
- } else {
- // Should the transaction be reverted back?
- logger.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
- }
-}
-
func (ldMgr *LogicalManager) addLogicalDeviceAgentToMap(agent *LogicalAgent) {
if _, exist := ldMgr.logicalDeviceAgents.Load(agent.logicalDeviceID); !exist {
ldMgr.logicalDeviceAgents.Store(agent.logicalDeviceID, agent)
@@ -118,16 +100,16 @@
// GetLogicalDevice provides a cloned most up to date logical device. If device is not in memory
// it will be fetched from the dB
-func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id string) (*voltha.LogicalDevice, error) {
+func (ldMgr *LogicalManager) GetLogicalDevice(ctx context.Context, id *voltha.ID) (*voltha.LogicalDevice, error) {
-	logger.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id})
-	if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+	logger.Debugw("getlogicalDevice", log.Fields{"logicaldeviceid": id.Id})
+	if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
 		return agent.GetLogicalDevice(ctx)
 	}
-	return nil, status.Errorf(codes.NotFound, "%s", id)
+	return nil, status.Errorf(codes.NotFound, "%s", id.Id)
 }
//ListLogicalDevices returns the list of all logical devices
-func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context) (*voltha.LogicalDevices, error) {
+func (ldMgr *LogicalManager) ListLogicalDevices(ctx context.Context, _ *empty.Empty) (*voltha.LogicalDevices, error) {
logger.Debug("ListAllLogicalDevices")
var logicalDevices []*voltha.LogicalDevice
@@ -301,7 +283,7 @@
// Get the device
var device *voltha.Device
var err error
- if device, err = ldMgr.deviceMgr.GetDevice(ctx, deviceID); err != nil {
+ if device, err = ldMgr.deviceMgr.getDevice(ctx, deviceID); err != nil {
return nil, err
}
return ldMgr.getLogicalDeviceID(ctx, device)
@@ -315,7 +297,7 @@
return nil, err
}
var lDevice *voltha.LogicalDevice
- if lDevice, err = ldMgr.GetLogicalDevice(ctx, *lDeviceID); err != nil {
+ if lDevice, err = ldMgr.GetLogicalDevice(ctx, &voltha.ID{Id: *lDeviceID}); err != nil {
return nil, err
}
// Go over list of ports
@@ -328,37 +310,38 @@
}
// ListLogicalDeviceFlows returns the flows of logical device
-func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id string) (*openflow_13.Flows, error) {
- logger.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id})
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+func (ldMgr *LogicalManager) ListLogicalDeviceFlows(ctx context.Context, id *voltha.ID) (*openflow_13.Flows, error) {
+ logger.Debugw("ListLogicalDeviceFlows", log.Fields{"logicaldeviceid": id.Id})
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
return agent.ListLogicalDeviceFlows(ctx)
}
- return nil, status.Errorf(codes.NotFound, "%s", id)
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
// ListLogicalDeviceFlowGroups returns logical device flow groups
-func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id string) (*openflow_13.FlowGroups, error) {
- logger.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id})
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+func (ldMgr *LogicalManager) ListLogicalDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*openflow_13.FlowGroups, error) {
+ logger.Debugw("ListLogicalDeviceFlowGroups", log.Fields{"logicaldeviceid": id.Id})
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
return agent.ListLogicalDeviceFlowGroups(ctx)
}
- return nil, status.Errorf(codes.NotFound, "%s", id)
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
// ListLogicalDevicePorts returns logical device ports
-func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id string) (*voltha.LogicalPorts, error) {
- logger.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id})
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
+func (ldMgr *LogicalManager) ListLogicalDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.LogicalPorts, error) {
+ logger.Debugw("ListLogicalDevicePorts", log.Fields{"logicaldeviceid": id.Id})
+ if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
return agent.ListLogicalDevicePorts(ctx)
}
- return nil, status.Errorf(codes.NotFound, "%s", id)
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
-func (ldMgr *LogicalManager) GetLogicalPort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
+// GetLogicalDevicePort returns logical device port details
+func (ldMgr *LogicalManager) GetLogicalDevicePort(ctx context.Context, lPortID *voltha.LogicalPortId) (*voltha.LogicalPort, error) {
// Get the logical device where this device is attached
var err error
var lDevice *voltha.LogicalDevice
- if lDevice, err = ldMgr.GetLogicalDevice(ctx, lPortID.Id); err != nil {
+ if lDevice, err = ldMgr.GetLogicalDevice(ctx, &voltha.ID{Id: lPortID.Id}); err != nil {
return nil, err
}
// Go over list of ports
@@ -393,7 +376,7 @@
// Get logical port
var logicalPort *voltha.LogicalPort
var err error
- if logicalPort, err = ldMgr.GetLogicalPort(ctx, lPortID); err != nil {
+ if logicalPort, err = ldMgr.GetLogicalDevicePort(ctx, lPortID); err != nil {
logger.Debugw("no-logical-device-port-present", log.Fields{"logicalPortId": lPortID.PortId})
return err
}
@@ -525,72 +508,64 @@
return nil
}
-func (ldMgr *LogicalManager) UpdateFlowTable(ctx context.Context, id string, flow *openflow_13.OfpFlowMod, ch chan interface{}) {
- logger.Debugw("UpdateFlowTable", log.Fields{"logicalDeviceId": id})
- var res interface{}
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
- res = agent.updateFlowTable(ctx, flow)
- logger.Debugw("UpdateFlowTable-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", id)
+// UpdateLogicalDeviceFlowTable updates logical device flow table
+func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowTable(ctx context.Context, flow *openflow_13.FlowTableUpdate) (*empty.Empty, error) {
+ logger.Debugw("UpdateLogicalDeviceFlowTable", log.Fields{"logicalDeviceId": flow.Id})
+ agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
}
- sendAPIResponse(ctx, ch, res)
+ return &empty.Empty{}, agent.updateFlowTable(ctx, flow.FlowMod)
}
-func (ldMgr *LogicalManager) UpdateMeterTable(ctx context.Context, id string, meter *openflow_13.OfpMeterMod, ch chan interface{}) {
- logger.Debugw("UpdateMeterTable", log.Fields{"logicalDeviceId": id})
- var res interface{}
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
- res = agent.updateMeterTable(ctx, meter)
- logger.Debugw("UpdateMeterTable-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", id)
+// UpdateLogicalDeviceMeterTable updates the meter table of the logical device named in the request
+func (ldMgr *LogicalManager) UpdateLogicalDeviceMeterTable(ctx context.Context, meter *openflow_13.MeterModUpdate) (*empty.Empty, error) {
+ logger.Debugw("UpdateLogicalDeviceMeterTable", log.Fields{"logicalDeviceId": meter.Id})
+ agent := ldMgr.getLogicalDeviceAgent(ctx, meter.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", meter.Id)
}
- sendAPIResponse(ctx, ch, res)
+ return &empty.Empty{}, agent.updateMeterTable(ctx, meter.MeterMod)
}
// ListLogicalDeviceMeters returns logical device meters
-func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id string) (*openflow_13.Meters, error) {
- logger.Debugw("ListLogicalDeviceMeters", log.Fields{"logicalDeviceId": id})
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
- return agent.ListLogicalDeviceMeters(ctx)
+func (ldMgr *LogicalManager) ListLogicalDeviceMeters(ctx context.Context, id *voltha.ID) (*openflow_13.Meters, error) {
+ logger.Debugw("ListLogicalDeviceMeters", log.Fields{"logicalDeviceId": id.Id})
+ agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- return nil, status.Errorf(codes.NotFound, "%s", id)
-}
-func (ldMgr *LogicalManager) UpdateGroupTable(ctx context.Context, id string, groupMod *openflow_13.OfpGroupMod, ch chan interface{}) {
- logger.Debugw("UpdateGroupTable", log.Fields{"logicalDeviceId": id})
- var res interface{}
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id); agent != nil {
- res = agent.updateGroupTable(ctx, groupMod)
- logger.Debugw("UpdateGroupTable-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", id)
- }
- sendAPIResponse(ctx, ch, res)
+ return agent.ListLogicalDeviceMeters(ctx)
}
-func (ldMgr *LogicalManager) EnableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
- logger.Debugw("EnableLogicalPort", log.Fields{"logicalDeviceId": id})
- var res interface{}
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
- res = agent.enableLogicalPort(ctx, id.PortId)
- logger.Debugw("EnableLogicalPort-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", id.Id)
+// UpdateLogicalDeviceFlowGroupTable updates logical device flow group table
+func (ldMgr *LogicalManager) UpdateLogicalDeviceFlowGroupTable(ctx context.Context, flow *openflow_13.FlowGroupTableUpdate) (*empty.Empty, error) {
+ logger.Debugw("UpdateGroupTable", log.Fields{"logicalDeviceId": flow.Id})
+ agent := ldMgr.getLogicalDeviceAgent(ctx, flow.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", flow.Id)
}
- sendAPIResponse(ctx, ch, res)
+ return &empty.Empty{}, agent.updateGroupTable(ctx, flow.GroupMod)
}
-func (ldMgr *LogicalManager) DisableLogicalPort(ctx context.Context, id *voltha.LogicalPortId, ch chan interface{}) {
- logger.Debugw("DisableLogicalPort", log.Fields{"logicalDeviceId": id})
- var res interface{}
- if agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id); agent != nil {
- res = agent.disableLogicalPort(ctx, id.PortId)
- logger.Debugw("DisableLogicalPort-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", id.Id)
+// EnableLogicalDevicePort enables logical device port
+func (ldMgr *LogicalManager) EnableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
+ logger.Debugw("EnableLogicalDevicePort", log.Fields{"logicalDeviceId": id})
+ agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- sendAPIResponse(ctx, ch, res)
+ return &empty.Empty{}, agent.enableLogicalPort(ctx, id.PortId)
+}
+
+// DisableLogicalDevicePort disables logical device port
+func (ldMgr *LogicalManager) DisableLogicalDevicePort(ctx context.Context, id *voltha.LogicalPortId) (*empty.Empty, error) {
+ logger.Debugw("DisableLogicalDevicePort", log.Fields{"logicalDeviceId": id})
+ agent := ldMgr.getLogicalDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
+ }
+ return &empty.Empty{}, agent.disableLogicalPort(ctx, id.PortId)
}
func (ldMgr *LogicalManager) packetIn(ctx context.Context, logicalDeviceID string, port uint32, transactionID string, packet []byte) error {
@@ -603,10 +578,37 @@
return nil
}
-func (ldMgr *LogicalManager) PacketOut(ctx context.Context, packet *openflow_13.PacketOut) {
- if agent := ldMgr.getLogicalDeviceAgent(ctx, packet.Id); agent != nil {
- agent.packetOut(ctx, packet.PacketOut)
- } else {
- logger.Errorf("No logical device agent present", log.Fields{"logicalDeviceID": packet.Id})
+// StreamPacketsOut receives packet-out messages from the stream and passes each one to its logical device agent
+func (ldMgr *LogicalManager) StreamPacketsOut(packets voltha.VolthaService_StreamPacketsOutServer) error {
+ logger.Debugw("StreamPacketsOut-request", log.Fields{"packets": packets})
+loop:
+ for {
+ select {
+ case <-packets.Context().Done():
+ logger.Infow("StreamPacketsOut-context-done", log.Fields{"packets": packets, "error": packets.Context().Err()})
+ break loop
+ default:
+ }
+
+ packet, err := packets.Recv()
+
+ if err == io.EOF {
+ logger.Debugw("Received-EOF", log.Fields{"packets": packets})
+ break loop
+ }
+
+ if err != nil {
+ logger.Errorw("Failed to receive packet out", log.Fields{"error": err})
+ continue
+ }
+
+ if agent := ldMgr.getLogicalDeviceAgent(packets.Context(), packet.Id); agent != nil {
+ agent.packetOut(packets.Context(), packet.PacketOut)
+ } else {
+ logger.Errorf("No logical device agent present", log.Fields{"logicalDeviceID": packet.Id})
+ }
}
+
+ logger.Debugw("StreamPacketsOut-request-done", log.Fields{"packets": packets})
+ return nil
}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index ad2af57..b0128a5 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -19,11 +19,13 @@
import (
"context"
"errors"
+ "github.com/opencord/voltha-go/rw_core/core/device/event"
"reflect"
"runtime"
"sync"
"time"
+ "github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/device/remote"
@@ -31,6 +33,7 @@
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-lib-go/v3/pkg/probe"
+ "github.com/opencord/voltha-protos/v3/go/common"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
@@ -56,7 +59,7 @@
deviceLoadingInProgress map[string][]chan int
}
-func NewDeviceManagers(proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+func NewManagers(proxy *model.Proxy, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
exitChannel: make(chan int, 1),
rootDevices: make(map[string]bool),
@@ -69,6 +72,7 @@
deviceLoadingInProgress: make(map[string][]chan int),
}
logicalDeviceMgr := &LogicalManager{
+ Manager: event.NewManager(),
exitChannel: make(chan int, 1),
deviceMgr: deviceMgr,
kafkaICProxy: kmp,
@@ -97,18 +101,6 @@
logger.Info("device-manager-stopped")
}
-func sendResponse(ctx context.Context, ch chan interface{}, result interface{}) {
- if ctx.Err() == nil {
- // Returned response only of the ctx has not been cancelled/timeout/etc
- // Channel is automatically closed when a context is Done
- ch <- result
- logger.Debugw("sendResponse", log.Fields{"result": result})
- } else {
- // Should the transaction be reverted back?
- logger.Debugw("sendResponse-context-error", log.Fields{"context-error": ctx.Err()})
- }
-}
-
func (dMgr *Manager) addDeviceAgentToMap(agent *Agent) {
if _, exist := dMgr.deviceAgents.Load(agent.deviceID); !exist {
dMgr.deviceAgents.Store(agent.deviceID, agent)
@@ -158,17 +150,22 @@
return result
}
-func (dMgr *Manager) CreateDevice(ctx context.Context, device *voltha.Device, ch chan interface{}) {
+// CreateDevice creates a new parent device in the data model
+func (dMgr *Manager) CreateDevice(ctx context.Context, device *voltha.Device) (*voltha.Device, error) {
+ if device.MacAddress == "" && device.GetHostAndPort() == "" {
+ logger.Errorf("No Device Info Present")
+ return &voltha.Device{}, errors.New("no-device-info-present; MAC or HOSTIP&PORT")
+ }
+ logger.Debugw("create-device", log.Fields{"device": *device})
+
deviceExist, err := dMgr.isParentDeviceExist(ctx, device)
if err != nil {
logger.Errorf("Failed to fetch parent device info")
- sendResponse(ctx, ch, err)
- return
+ return nil, err
}
if deviceExist {
logger.Errorf("Device is Pre-provisioned already with same IP-Port or MAC Address")
- sendResponse(ctx, ch, errors.New("Device is already pre-provisioned"))
- return
+ return nil, errors.New("device is already pre-provisioned")
}
logger.Debugw("CreateDevice", log.Fields{"device": device, "aproxy": dMgr.adapterProxy})
@@ -179,61 +176,81 @@
device, err = agent.start(ctx, device)
if err != nil {
logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
- sendResponse(ctx, ch, err)
- return
+ return nil, err
}
dMgr.addDeviceAgentToMap(agent)
-
- sendResponse(ctx, ch, device)
+ return device, nil
}
-func (dMgr *Manager) EnableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
- logger.Debugw("EnableDevice", log.Fields{"deviceid": id})
- var res interface{}
- if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
- res = agent.enableDevice(ctx)
- logger.Debugw("EnableDevice-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", id.Id)
+// EnableDevice activates a device by invoking the adopt_device API on the appropriate adapter
+func (dMgr *Manager) EnableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ logger.Debugw("EnableDevice", log.Fields{"device-id": id.Id})
+ agent := dMgr.getDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- sendResponse(ctx, ch, res)
+ return &empty.Empty{}, agent.enableDevice(ctx)
}
-func (dMgr *Manager) DisableDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
- logger.Debugw("DisableDevice", log.Fields{"deviceid": id})
- var res interface{}
- if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
- res = agent.disableDevice(ctx)
- logger.Debugw("DisableDevice-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", id.Id)
+// DisableDevice disables a device along with any child device it may have
+func (dMgr *Manager) DisableDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ logger.Debugw("DisableDevice", log.Fields{"device-id": id.Id})
+ agent := dMgr.getDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
-
- sendResponse(ctx, ch, res)
+ return &empty.Empty{}, agent.disableDevice(ctx)
}
-func (dMgr *Manager) RebootDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
- logger.Debugw("RebootDevice", log.Fields{"deviceid": id})
- var res interface{}
- if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
- res = agent.rebootDevice(ctx)
- logger.Debugw("RebootDevice-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", id.Id)
+// RebootDevice invokes the reboot API on the corresponding adapter
+func (dMgr *Manager) RebootDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ logger.Debugw("RebootDevice", log.Fields{"device-id": id.Id})
+ agent := dMgr.getDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- sendResponse(ctx, ch, res)
+ return &empty.Empty{}, agent.rebootDevice(ctx)
}
-func (dMgr *Manager) DeleteDevice(ctx context.Context, id *voltha.ID, ch chan interface{}) {
- logger.Debugw("DeleteDevice", log.Fields{"deviceid": id})
- var res interface{}
- if agent := dMgr.getDeviceAgent(ctx, id.Id); agent != nil {
- res = agent.deleteDevice(ctx)
- logger.Debugw("DeleteDevice-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", id.Id)
+// DeleteDevice removes a device from the data model
+func (dMgr *Manager) DeleteDevice(ctx context.Context, id *voltha.ID) (*empty.Empty, error) {
+ logger.Debugw("DeleteDevice", log.Fields{"device-id": id.Id})
+ agent := dMgr.getDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- sendResponse(ctx, ch, res)
+ return &empty.Empty{}, agent.deleteDevice(ctx)
+}
+
+// ListDevicePorts returns the port details for a specific device entry
+func (dMgr *Manager) ListDevicePorts(ctx context.Context, id *voltha.ID) (*voltha.Ports, error) {
+ logger.Debugw("ListDevicePorts", log.Fields{"device-id": id.Id})
+ device, err := dMgr.getDevice(ctx, id.Id)
+ if err != nil {
+ return &voltha.Ports{}, err
+ }
+ return &voltha.Ports{Items: device.Ports}, nil
+}
+
+// ListDeviceFlows returns the flow details for a specific device entry
+func (dMgr *Manager) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*ofp.Flows, error) {
+ logger.Debugw("ListDeviceFlows", log.Fields{"device-id": id.Id})
+ device, err := dMgr.getDevice(ctx, id.Id)
+ if err != nil {
+ return &ofp.Flows{}, err
+ }
+ return device.Flows, nil
+}
+
+// ListDeviceFlowGroups returns the flow group details for a specific device entry
+func (dMgr *Manager) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
+ logger.Debugw("ListDeviceFlowGroups", log.Fields{"device-id": id.Id})
+
+ device, err := dMgr.getDevice(ctx, id.Id)
+ if err != nil {
+ return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
+ }
+ return device.GetFlowGroups(), nil
}
// stopManagingDevice stops the management of the device as well as any of its reference device and logical device.
@@ -262,9 +279,13 @@
return nil
}
-// GetDevice will returns a device, either from memory or from the dB, if present
-func (dMgr *Manager) GetDevice(ctx context.Context, id string) (*voltha.Device, error) {
- logger.Debugw("GetDevice", log.Fields{"deviceid": id})
+func (dMgr *Manager) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
+ return dMgr.getDevice(ctx, id.Id)
+}
+
+// getDevice returns a device, either from memory or from the DB, if present
+func (dMgr *Manager) getDevice(ctx context.Context, id string) (*voltha.Device, error) {
+ logger.Debugw("getDevice", log.Fields{"deviceid": id})
if agent := dMgr.getDeviceAgent(ctx, id); agent != nil {
return agent.getDevice(ctx)
}
@@ -278,7 +299,7 @@
var parentDevice *voltha.Device
var err error
- if parentDevice, err = dMgr.GetDevice(ctx, parentDeviceID); err != nil {
+ if parentDevice, err = dMgr.getDevice(ctx, parentDeviceID); err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
var childDeviceIds []string
@@ -293,7 +314,7 @@
var foundChildDevice *voltha.Device
for _, childDeviceID := range childDeviceIds {
var found bool
- if searchDevice, err := dMgr.GetDevice(ctx, childDeviceID); err == nil {
+ if searchDevice, err := dMgr.getDevice(ctx, childDeviceID); err == nil {
foundOnuID := false
if searchDevice.ProxyAddress.OnuId == uint32(onuID) {
@@ -340,7 +361,7 @@
var parentDevice *voltha.Device
var err error
- if parentDevice, err = dMgr.GetDevice(ctx, proxyAddress.DeviceId); err != nil {
+ if parentDevice, err = dMgr.getDevice(ctx, proxyAddress.DeviceId); err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
var childDeviceIds []string
@@ -354,7 +375,7 @@
var foundChildDevice *voltha.Device
for _, childDeviceID := range childDeviceIds {
- if searchDevice, err := dMgr.GetDevice(ctx, childDeviceID); err == nil {
+ if searchDevice, err := dMgr.getDevice(ctx, childDeviceID); err == nil {
if searchDevice.ProxyAddress == proxyAddress {
foundChildDevice = searchDevice
break
@@ -388,7 +409,7 @@
}
// ListDevices retrieves the latest devices from the data model
-func (dMgr *Manager) ListDevices(ctx context.Context) (*voltha.Devices, error) {
+func (dMgr *Manager) ListDevices(ctx context.Context, _ *empty.Empty) (*voltha.Devices, error) {
logger.Debug("ListDevices")
result := &voltha.Devices{}
@@ -569,17 +590,16 @@
}
// ListDeviceIds retrieves the latest device IDs information from the data model (memory data only)
-func (dMgr *Manager) ListDeviceIds() (*voltha.IDs, error) {
+func (dMgr *Manager) ListDeviceIds(_ context.Context, _ *empty.Empty) (*voltha.IDs, error) {
logger.Debug("ListDeviceIDs")
// Report only device IDs that are in the device agent map
return dMgr.listDeviceIdsFromMap(), nil
}
-//ReconcileDevices is a request to a voltha core to update its list of managed devices. This will
-//trigger loading the devices along with their children and parent in memory
-func (dMgr *Manager) ReconcileDevices(ctx context.Context, ids *voltha.IDs, ch chan interface{}) {
+// ReconcileDevices is a request to a voltha core to update its list of managed devices. This will
+// trigger loading the devices along with their children and parent in memory
+func (dMgr *Manager) ReconcileDevices(ctx context.Context, ids *voltha.IDs) (*empty.Empty, error) {
logger.Debugw("ReconcileDevices", log.Fields{"numDevices": len(ids.Items)})
- var res interface{}
if ids != nil && len(ids.Items) != 0 {
toReconcile := len(ids.Items)
reconciled := 0
@@ -592,12 +612,12 @@
}
}
if toReconcile != reconciled {
- res = status.Errorf(codes.DataLoss, "less-device-reconciled-than-requested:%d/%d", reconciled, toReconcile)
+ return nil, status.Errorf(codes.DataLoss, "less-device-reconciled-than-requested:%d/%d", reconciled, toReconcile)
}
} else {
- res = status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
+ return nil, status.Errorf(codes.InvalidArgument, "empty-list-of-ids")
}
- sendResponse(ctx, ch, res)
+ return &empty.Empty{}, nil
}
// isOkToReconcile validates whether a device is in the correct status to be reconciled
@@ -740,7 +760,7 @@
// Notify the logical device manager to setup a logical port, if needed. If the added port is an NNI or UNI
// then a logical port will be added to the logical device and the device graph generated. If the port is a
// PON port then only the device graph will be generated.
- if device, err := dMgr.GetDevice(ctx, deviceID); err == nil {
+ if device, err := dMgr.getDevice(ctx, deviceID); err == nil {
go func() {
err = dMgr.logicalDeviceMgr.updateLogicalPort(context.Background(), device, port)
if err != nil {
@@ -792,18 +812,17 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-// UpdatePmConfigs updates the PM configs. This is executed when the northbound gRPC API is invoked, typically
+// UpdateDevicePmConfigs updates the PM configs. This is executed when the northbound gRPC API is invoked, typically
// following a user action
-func (dMgr *Manager) UpdatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs, ch chan interface{}) {
- var res interface{}
- if pmConfigs.Id == "" {
- res = status.Errorf(codes.FailedPrecondition, "invalid-device-Id")
- } else if agent := dMgr.getDeviceAgent(ctx, pmConfigs.Id); agent != nil {
- res = agent.updatePmConfigs(ctx, pmConfigs)
- } else {
- res = status.Errorf(codes.NotFound, "%s", pmConfigs.Id)
+func (dMgr *Manager) UpdateDevicePmConfigs(ctx context.Context, configs *voltha.PmConfigs) (*empty.Empty, error) {
+ if configs.Id == "" {
+ return nil, status.Error(codes.FailedPrecondition, "invalid-device-Id")
}
- sendResponse(ctx, ch, res)
+ agent := dMgr.getDeviceAgent(ctx, configs.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", configs.Id)
+ }
+ return &empty.Empty{}, agent.updatePmConfigs(ctx, configs)
}
// InitPmConfigs initialize the pm configs as defined by the adapter.
@@ -817,11 +836,13 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *Manager) ListPmConfigs(ctx context.Context, deviceID string) (*voltha.PmConfigs, error) {
- if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
- return agent.listPmConfigs(ctx)
+// ListDevicePmConfigs returns the PM configs of a device
+func (dMgr *Manager) ListDevicePmConfigs(ctx context.Context, id *voltha.ID) (*voltha.PmConfigs, error) {
+ agent := dMgr.getDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", id.Id)
}
- return nil, status.Errorf(codes.NotFound, "%s", deviceID)
+ return agent.listPmConfigs(ctx)
}
func (dMgr *Manager) getSwitchCapability(ctx context.Context, deviceID string) (*ic.SwitchCapability, error) {
@@ -860,7 +881,7 @@
logger.Debugw("UpdateChildrenStatus", log.Fields{"parentDeviceid": deviceID, "operStatus": operStatus, "connStatus": connStatus})
var parentDevice *voltha.Device
var err error
- if parentDevice, err = dMgr.GetDevice(ctx, deviceID); err != nil {
+ if parentDevice, err = dMgr.getDevice(ctx, deviceID); err != nil {
return status.Errorf(codes.Aborted, "%s", err.Error())
}
var childDeviceIds []string
@@ -917,7 +938,7 @@
// Notify the logical device manager to remove all logical ports, if needed.
// At this stage the device itself may gave been deleted already at a DeleteAllPorts
// typically is part of a device deletion phase.
- if device, err := dMgr.GetDevice(ctx, deviceID); err == nil {
+ if device, err := dMgr.getDevice(ctx, deviceID); err == nil {
go func() {
err = dMgr.logicalDeviceMgr.deleteAllLogicalPorts(ctx, device)
if err != nil {
@@ -953,7 +974,7 @@
return status.Error(codes.Unimplemented, "state-change-not-implemented")
}
// Notify the logical device about the state change
- device, err := dMgr.GetDevice(ctx, deviceID)
+ device, err := dMgr.getDevice(ctx, deviceID)
if err != nil {
logger.Warnw("non-existent-device", log.Fields{"deviceId": deviceID, "error": err})
return err
@@ -973,8 +994,12 @@
if deviceType == "" && vendorID != "" {
logger.Debug("device-type-is-nil-fetching-device-type")
+ deviceTypes, err := dMgr.adapterMgr.ListDeviceTypes(ctx, nil)
+ if err != nil {
+ return nil, err
+ }
OLoop:
- for _, dType := range dMgr.adapterMgr.ListDeviceTypes() {
+ for _, dType := range deviceTypes.Items {
for _, v := range dType.VendorIds {
if v == vendorID {
deviceType = dType.Adapter
@@ -1085,7 +1110,7 @@
// Get the logical device Id based on the deviceId
var device *voltha.Device
var err error
- if device, err = dMgr.GetDevice(ctx, deviceID); err != nil {
+ if device, err = dMgr.getDevice(ctx, deviceID); err != nil {
logger.Errorw("device-not-found", log.Fields{"deviceId": deviceID})
return err
}
@@ -1171,7 +1196,7 @@
// childDevice is the parent device
return childDevice
}
- parentDevice, _ := dMgr.GetDevice(ctx, childDevice.ParentId)
+ parentDevice, _ := dMgr.getDevice(ctx, childDevice.ParentId)
return parentDevice
}
@@ -1181,7 +1206,7 @@
logger.Debug("ChildDevicesLost")
var err error
var parentDevice *voltha.Device
- if parentDevice, err = dMgr.GetDevice(ctx, parentDeviceID); err != nil {
+ if parentDevice, err = dMgr.getDevice(ctx, parentDeviceID); err != nil {
logger.Warnw("failed-getting-device", log.Fields{"deviceId": parentDeviceID, "error": err})
return err
}
@@ -1196,7 +1221,7 @@
var parentDevice *voltha.Device
var childDeviceIds []string
- if parentDevice, err = dMgr.GetDevice(ctx, parentDeviceID); err != nil {
+ if parentDevice, err = dMgr.getDevice(ctx, parentDeviceID); err != nil {
logger.Warnw("failed-getting-device", log.Fields{"deviceId": parentDeviceID, "error": err})
return err
}
@@ -1330,11 +1355,11 @@
//GetAllChildDevices is a helper method to get all the child device IDs from the device passed as parameter
func (dMgr *Manager) GetAllChildDevices(ctx context.Context, parentDeviceID string) (*voltha.Devices, error) {
logger.Debugw("GetAllChildDevices", log.Fields{"parentDeviceId": parentDeviceID})
- if parentDevice, err := dMgr.GetDevice(ctx, parentDeviceID); err == nil {
+ if parentDevice, err := dMgr.getDevice(ctx, parentDeviceID); err == nil {
childDevices := make([]*voltha.Device, 0)
if childDeviceIds, er := dMgr.getAllChildDeviceIds(parentDevice); er == nil {
for _, deviceID := range childDeviceIds {
- if d, e := dMgr.GetDevice(ctx, deviceID); e == nil && d != nil {
+ if d, e := dMgr.getDevice(ctx, deviceID); e == nil && d != nil {
childDevices = append(childDevices, d)
}
}
@@ -1354,83 +1379,84 @@
return nil
}
-func (dMgr *Manager) DownloadImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
- logger.Debugw("DownloadImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
- var res interface{}
- var err error
- if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
- if res, err = agent.downloadImage(ctx, img); err != nil {
- logger.Debugw("DownloadImage-failed", log.Fields{"err": err, "imageName": img.Name})
- res = err
- }
- } else {
- res = status.Errorf(codes.NotFound, "%s", img.Id)
+// operationFailureResp is a shared OPERATION_FAILURE response, defined once to avoid redefining it in every handler
+var operationFailureResp = &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
+
+// DownloadImage executes an image download request
+func (dMgr *Manager) DownloadImage(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ logger.Debugw("DownloadImage", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ agent := dMgr.getDeviceAgent(ctx, img.Id)
+ if agent == nil {
+ return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
- sendResponse(ctx, ch, res)
+ resp, err := agent.downloadImage(ctx, img)
+ if err != nil {
+ return operationFailureResp, err
+ }
+ return resp, nil
}
-func (dMgr *Manager) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
- logger.Debugw("CancelImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
- var res interface{}
- var err error
- if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
- if res, err = agent.cancelImageDownload(ctx, img); err != nil {
- logger.Debugw("CancelImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
- res = err
- }
- } else {
- res = status.Errorf(codes.NotFound, "%s", img.Id)
+// CancelImageDownload cancels an image download request
+func (dMgr *Manager) CancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ logger.Debugw("CancelImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ agent := dMgr.getDeviceAgent(ctx, img.Id)
+ if agent == nil {
+ return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
- sendResponse(ctx, ch, res)
+ resp, err := agent.cancelImageDownload(ctx, img)
+ if err != nil {
+ return operationFailureResp, err
+ }
+ return resp, nil
}
-func (dMgr *Manager) ActivateImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
- logger.Debugw("ActivateImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
- var res interface{}
- var err error
- if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
- if res, err = agent.activateImage(ctx, img); err != nil {
- logger.Debugw("ActivateImage-failed", log.Fields{"err": err, "imageName": img.Name})
- res = err
- }
- } else {
- res = status.Errorf(codes.NotFound, "%s", img.Id)
+// ActivateImageUpdate activates image update request
+func (dMgr *Manager) ActivateImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ logger.Debugw("ActivateImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ agent := dMgr.getDeviceAgent(ctx, img.Id)
+ if agent == nil {
+ return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
- sendResponse(ctx, ch, res)
+ resp, err := agent.activateImage(ctx, img)
+ if err != nil {
+ return operationFailureResp, err
+ }
+ return resp, nil
}
-func (dMgr *Manager) RevertImage(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
- logger.Debugw("RevertImage", log.Fields{"deviceid": img.Id, "imageName": img.Name})
- var res interface{}
- var err error
- if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
- if res, err = agent.revertImage(ctx, img); err != nil {
- logger.Debugw("RevertImage-failed", log.Fields{"err": err, "imageName": img.Name})
- res = err
- }
- } else {
- res = status.Errorf(codes.NotFound, "%s", img.Id)
+// RevertImageUpdate reverts image update
+func (dMgr *Manager) RevertImageUpdate(ctx context.Context, img *voltha.ImageDownload) (*common.OperationResp, error) {
+ logger.Debugw("RevertImageUpdate", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ agent := dMgr.getDeviceAgent(ctx, img.Id)
+ if agent == nil {
+ return operationFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
- sendResponse(ctx, ch, res)
+ resp, err := agent.revertImage(ctx, img)
+ if err != nil {
+ return operationFailureResp, err
+ }
+ return resp, nil
}
-func (dMgr *Manager) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload, ch chan interface{}) {
- logger.Debugw("GetImageDownloadStatus", log.Fields{"deviceid": img.Id, "imageName": img.Name})
- var res interface{}
- var err error
- if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
- if res, err = agent.getImageDownloadStatus(ctx, img); err != nil {
- logger.Debugw("GetImageDownloadStatus-failed", log.Fields{"err": err, "imageName": img.Name})
- res = err
- }
- } else {
- res = status.Errorf(codes.NotFound, "%s", img.Id)
+// imageDownloadFailureResp is a shared DOWNLOAD_UNKNOWN response, defined once to avoid redefining it in every handler
+var imageDownloadFailureResp = &voltha.ImageDownload{DownloadState: voltha.ImageDownload_DOWNLOAD_UNKNOWN}
+
+// GetImageDownloadStatus returns status of image download
+func (dMgr *Manager) GetImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+ logger.Debugw("GetImageDownloadStatus", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ agent := dMgr.getDeviceAgent(ctx, img.Id)
+ if agent == nil {
+ return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
- sendResponse(ctx, ch, res)
+ resp, err := agent.getImageDownloadStatus(ctx, img)
+ if err != nil {
+ return imageDownloadFailureResp, err
+ }
+ return resp, nil
}
func (dMgr *Manager) UpdateImageDownload(ctx context.Context, deviceID string, img *voltha.ImageDownload) error {
- logger.Debugw("UpdateImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
+ logger.Debugw("UpdateImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
if err := agent.updateImageDownload(ctx, img); err != nil {
logger.Debugw("UpdateImageDownload-failed", log.Fields{"err": err, "imageName": img.Name})
@@ -1442,20 +1468,42 @@
return nil
}
+// GetImageDownload returns the requested image download from the device's agent
func (dMgr *Manager) GetImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
- logger.Debugw("GetImageDownload", log.Fields{"deviceid": img.Id, "imageName": img.Name})
- if agent := dMgr.getDeviceAgent(ctx, img.Id); agent != nil {
- return agent.getImageDownload(ctx, img)
+ logger.Debugw("GetImageDownload", log.Fields{"device-id": img.Id, "imageName": img.Name})
+ agent := dMgr.getDeviceAgent(ctx, img.Id)
+ if agent == nil {
+ return imageDownloadFailureResp, status.Errorf(codes.NotFound, "%s", img.Id)
}
- return nil, status.Errorf(codes.NotFound, "%s", img.Id)
+ resp, err := agent.getImageDownload(ctx, img)
+ if err != nil {
+ return imageDownloadFailureResp, err
+ }
+ return resp, nil
}
-func (dMgr *Manager) ListImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
- logger.Debugw("ListImageDownloads", log.Fields{"deviceID": deviceID})
- if agent := dMgr.getDeviceAgent(ctx, deviceID); agent != nil {
- return agent.listImageDownloads(ctx, deviceID)
+// ListImageDownloads returns the image downloads of the device with the given id
+func (dMgr *Manager) ListImageDownloads(ctx context.Context, id *voltha.ID) (*voltha.ImageDownloads, error) {
+ logger.Debugw("ListImageDownloads", log.Fields{"device-id": id.Id})
+ agent := dMgr.getDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return &voltha.ImageDownloads{Items: []*voltha.ImageDownload{imageDownloadFailureResp}}, status.Errorf(codes.NotFound, "%s", id.Id)
}
- return nil, status.Errorf(codes.NotFound, "%s", deviceID)
+ resp, err := agent.listImageDownloads(ctx, id.Id)
+ if err != nil {
+ return &voltha.ImageDownloads{Items: []*voltha.ImageDownload{imageDownloadFailureResp}}, err
+ }
+ return resp, nil
+}
+
+// GetImages returns all images for a specific device entry
+func (dMgr *Manager) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
+ logger.Debugw("GetImages", log.Fields{"device-id": id.Id})
+ device, err := dMgr.getDevice(ctx, id.Id)
+ if err != nil {
+ return nil, err
+ }
+ return device.GetImages(), nil
}
func (dMgr *Manager) NotifyInvalidTransition(_ context.Context, device *voltha.Device) error {
@@ -1484,24 +1532,25 @@
// GetParentDeviceID returns parent device id, either from memory or from the dB, if present
func (dMgr *Manager) GetParentDeviceID(ctx context.Context, deviceID string) string {
- if device, _ := dMgr.GetDevice(ctx, deviceID); device != nil {
+ if device, _ := dMgr.getDevice(ctx, deviceID); device != nil {
logger.Infow("GetParentDeviceId", log.Fields{"deviceId": device.Id, "parentId": device.ParentId})
return device.ParentId
}
return ""
}
-func (dMgr *Manager) SimulateAlarm(ctx context.Context, simulatereq *voltha.SimulateAlarmRequest, ch chan interface{}) {
- logger.Debugw("SimulateAlarm", log.Fields{"id": simulatereq.Id, "Indicator": simulatereq.Indicator, "IntfId": simulatereq.IntfId,
- "PortTypeName": simulatereq.PortTypeName, "OnuDeviceId": simulatereq.OnuDeviceId, "InverseBitErrorRate": simulatereq.InverseBitErrorRate,
- "Drift": simulatereq.Drift, "NewEqd": simulatereq.NewEqd, "OnuSerialNumber": simulatereq.OnuSerialNumber, "Operation": simulatereq.Operation})
- var res interface{}
- if agent := dMgr.getDeviceAgent(ctx, simulatereq.Id); agent != nil {
- res = agent.simulateAlarm(ctx, simulatereq)
- logger.Debugw("SimulateAlarm-result", log.Fields{"result": res})
+func (dMgr *Manager) SimulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) (*common.OperationResp, error) {
+ logger.Debugw("SimulateAlarm", log.Fields{"id": simulateReq.Id, "Indicator": simulateReq.Indicator, "IntfId": simulateReq.IntfId,
+ "PortTypeName": simulateReq.PortTypeName, "OnuDeviceId": simulateReq.OnuDeviceId, "InverseBitErrorRate": simulateReq.InverseBitErrorRate,
+ "Drift": simulateReq.Drift, "NewEqd": simulateReq.NewEqd, "OnuSerialNumber": simulateReq.OnuSerialNumber, "Operation": simulateReq.Operation})
+ agent := dMgr.getDeviceAgent(ctx, simulateReq.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", simulateReq.Id)
}
- //TODO CLI always get successful response
- sendResponse(ctx, ch, res)
+ if err := agent.simulateAlarm(ctx, simulateReq); err != nil {
+ return nil, err
+ }
+ return &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}, nil
}
func (dMgr *Manager) UpdateDeviceReason(ctx context.Context, deviceID string, reason string) error {
@@ -1512,30 +1561,22 @@
return status.Errorf(codes.NotFound, "%s", deviceID)
}
-func (dMgr *Manager) EnablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
+func (dMgr *Manager) EnablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
logger.Debugw("EnablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
- var res interface{}
- if agent := dMgr.getDeviceAgent(ctx, port.DeviceId); agent != nil {
- res = agent.enablePort(ctx, port)
- logger.Debugw("EnablePort-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", port.DeviceId)
+ agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
}
-
- sendResponse(ctx, ch, res)
+ return &empty.Empty{}, agent.enablePort(ctx, port)
}
-func (dMgr *Manager) DisablePort(ctx context.Context, port *voltha.Port, ch chan interface{}) {
+func (dMgr *Manager) DisablePort(ctx context.Context, port *voltha.Port) (*empty.Empty, error) {
logger.Debugw("DisablePort", log.Fields{"device-id": port.DeviceId, "port-no": port.PortNo})
- var res interface{}
- if agent := dMgr.getDeviceAgent(ctx, port.DeviceId); agent != nil {
- res = agent.disablePort(ctx, port)
- logger.Debugw("DisablePort-result", log.Fields{"result": res})
- } else {
- res = status.Errorf(codes.NotFound, "%s", port.DeviceId)
+ agent := dMgr.getDeviceAgent(ctx, port.DeviceId)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", port.DeviceId)
}
-
- sendResponse(ctx, ch, res)
+ return &empty.Empty{}, agent.disablePort(ctx, port)
}
// ChildDeviceLost calls parent adapter to delete child device and all its references
@@ -1551,26 +1592,22 @@
return nil
}
-func (dMgr *Manager) StartOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
- logger.Debugw("Omci_test_Request", log.Fields{"device-id": omcitestrequest.Id, "uuid": omcitestrequest.Uuid})
- if agent := dMgr.getDeviceAgent(ctx, omcitestrequest.Id); agent != nil {
- res, err := agent.startOmciTest(ctx, omcitestrequest)
- if err != nil {
- return nil, err
- }
- logger.Debugw("Omci_test_Response_result-device-magnager", log.Fields{"result": res})
- return res, nil
+func (dMgr *Manager) StartOmciTestAction(ctx context.Context, request *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
+ logger.Debugw("StartOmciTestAction", log.Fields{"device-id": request.Id, "uuid": request.Uuid})
+ agent := dMgr.getDeviceAgent(ctx, request.Id)
+ if agent == nil {
+ return nil, status.Errorf(codes.NotFound, "%s", request.Id)
}
- return nil, status.Errorf(codes.NotFound, "%s", omcitestrequest.Id)
+ return agent.startOmciTest(ctx, request)
}
func (dMgr *Manager) GetExtValue(ctx context.Context, value *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
log.Debugw("getExtValue", log.Fields{"onu-id": value.Id})
- cDevice, err := dMgr.GetDevice(ctx, value.Id)
+ cDevice, err := dMgr.getDevice(ctx, value.Id)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}
- pDevice, err := dMgr.GetDevice(ctx, cDevice.ParentId)
+ pDevice, err := dMgr.getDevice(ctx, cDevice.ParentId)
if err != nil {
return nil, status.Errorf(codes.Aborted, "%s", err.Error())
}