Broke down logical_agent.go into multiple files.
Functions have only been moved, not modified, so there are no changes to existing logic.
This is to facilitate further development without having multiple devs modifying the same file.
Functions which do not belong in any file, or which would belong in many files, have been left in the logical_agent file.
Flow/meter/group map management has also been separated out (*_loader.go files).
Change-Id: Iaee6699a852e03553e8363be3f16f461c2aab8c5
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index f032a65..15eb677 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -19,9 +19,7 @@
import (
"context"
"encoding/hex"
- "errors"
"fmt"
- "strconv"
"sync"
"time"
@@ -67,27 +65,6 @@
groupLock sync.RWMutex
}
-//MeterChunk keeps a meter entry and its lock. The lock in the struct is used to syncronize the
-//modifications for the related meter.
-type MeterChunk struct {
- meter *ofp.OfpMeterEntry
- lock sync.Mutex
-}
-
-//FlowChunk keeps a flow and the lock for this flow. The lock in the struct is used to syncronize the
-//modifications for the related flow.
-type FlowChunk struct {
- flow *ofp.OfpFlowStats
- lock sync.Mutex
-}
-
-//GroupChunk keeps a group entry and its lock. The lock in the struct is used to syncronize the
-//modifications for the related group.
-type GroupChunk struct {
- group *ofp.OfpGroupEntry
- lock sync.Mutex
-}
-
func newLogicalDeviceAgent(id string, sn string, deviceID string, ldeviceMgr *LogicalManager,
deviceMgr *Manager, cdProxy *model.Proxy, timeout time.Duration) *LogicalAgent {
var agent LogicalAgent
@@ -238,333 +215,12 @@
return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice), nil
}
-// ListLogicalDeviceFlows returns logical device flows
-func (agent *LogicalAgent) ListLogicalDeviceFlows(ctx context.Context) (*ofp.Flows, error) {
- logger.Debug("ListLogicalDeviceFlows")
- var flowStats []*ofp.OfpFlowStats
- agent.flowLock.RLock()
- defer agent.flowLock.RUnlock()
- for _, flowChunk := range agent.flows {
- flowStats = append(flowStats, (proto.Clone(flowChunk.flow)).(*ofp.OfpFlowStats))
- }
- return &ofp.Flows{Items: flowStats}, nil
-}
-
-// ListLogicalDeviceMeters returns logical device meters
-func (agent *LogicalAgent) ListLogicalDeviceMeters(ctx context.Context) (*ofp.Meters, error) {
- logger.Debug("ListLogicalDeviceMeters")
-
- var meterEntries []*ofp.OfpMeterEntry
- agent.meterLock.RLock()
- defer agent.meterLock.RUnlock()
- for _, meterChunk := range agent.meters {
- meterEntries = append(meterEntries, (proto.Clone(meterChunk.meter)).(*ofp.OfpMeterEntry))
- }
- return &ofp.Meters{Items: meterEntries}, nil
-}
-
-// ListLogicalDeviceFlowGroups returns logical device flow groups
-func (agent *LogicalAgent) ListLogicalDeviceFlowGroups(ctx context.Context) (*ofp.FlowGroups, error) {
- logger.Debug("ListLogicalDeviceFlowGroups")
-
- var groupEntries []*ofp.OfpGroupEntry
- agent.groupLock.RLock()
- defer agent.groupLock.RUnlock()
- for _, value := range agent.groups {
- groupEntries = append(groupEntries, (proto.Clone(value.group)).(*ofp.OfpGroupEntry))
- }
- return &ofp.FlowGroups{Items: groupEntries}, nil
-}
-
-// ListLogicalDevicePorts returns logical device ports
-func (agent *LogicalAgent) ListLogicalDevicePorts(ctx context.Context) (*voltha.LogicalPorts, error) {
- logger.Debug("ListLogicalDevicePorts")
- logicalDevice, err := agent.GetLogicalDevice(ctx)
- if err != nil {
- return nil, err
- }
- if logicalDevice == nil {
- return &voltha.LogicalPorts{}, nil
- }
- lPorts := make([]*voltha.LogicalPort, 0)
- lPorts = append(lPorts, logicalDevice.Ports...)
- return &voltha.LogicalPorts{Items: lPorts}, nil
-}
-
-//updateLogicalDeviceFlow updates flow in the store and cache
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) updateLogicalDeviceFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowChunk *FlowChunk) error {
- path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flow.Id)
- if err := agent.clusterDataProxy.Update(ctx, path, flow); err != nil {
- return status.Errorf(codes.Internal, "failed-update-flow:%s:%d %s", agent.logicalDeviceID, flow.Id, err)
- }
- flowChunk.flow = flow
- return nil
-}
-
-//removeLogicalDeviceFlow deletes the flow from store and cache.
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) removeLogicalDeviceFlow(ctx context.Context, flowID uint64) error {
- path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-delete-flow-from-the-store-%s", path)
- }
- agent.flowLock.Lock()
- defer agent.flowLock.Unlock()
- delete(agent.flows, flowID)
- return nil
-}
-
-//updateLogicalDeviceMeter updates meter info in store and cache
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) updateLogicalDeviceMeter(ctx context.Context, meter *ofp.OfpMeterEntry, meterChunk *MeterChunk) error {
- path := fmt.Sprintf("meters/%s/%d", agent.logicalDeviceID, meter.Config.MeterId)
- if err := agent.clusterDataProxy.Update(ctx, path, meter); err != nil {
- logger.Errorw("error-updating-logical-device-with-meters", log.Fields{"error": err})
- return err
- }
- meterChunk.meter = meter
- return nil
-}
-
-//removeLogicalDeviceMeter deletes the meter from store and cache
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) removeLogicalDeviceMeter(ctx context.Context, meterID uint32) error {
- path := fmt.Sprintf("meters/%s/%d", agent.logicalDeviceID, meterID)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-delete-meter-from-store-%s", path)
- }
- agent.meterLock.Lock()
- defer agent.meterLock.Unlock()
- delete(agent.meters, meterID)
- return nil
-}
-
-//updateLogicalDeviceFlowGroup updates the flow groups in store and cache
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) updateLogicalDeviceFlowGroup(ctx context.Context, groupEntry *ofp.OfpGroupEntry, groupChunk *GroupChunk) error {
- path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, groupEntry.Desc.GroupId)
- if err := agent.clusterDataProxy.Update(ctx, path, groupEntry); err != nil {
- logger.Errorw("error-updating-logical-device-with-group", log.Fields{"error": err})
- return err
- }
- groupChunk.group = groupEntry
- return nil
-}
-
-//removeLogicalDeviceFlowGroup removes the flow groups in store and cache
-//It is assumed that the chunk lock has been acquired before this function is called
-func (agent *LogicalAgent) removeLogicalDeviceFlowGroup(ctx context.Context, groupID uint32) error {
- path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, groupID)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-delete-group-from-store-%s", path)
- }
- agent.groupLock.Lock()
- defer agent.groupLock.Unlock()
- delete(agent.groups, groupID)
- return nil
-}
-
// getLogicalDeviceWithoutLock returns a cloned logical device to a function that already holds the agent lock.
func (agent *LogicalAgent) getLogicalDeviceWithoutLock() *voltha.LogicalDevice {
logger.Debug("getLogicalDeviceWithoutLock")
return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice)
}
-func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
- logger.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
- var err error
- if port.Type == voltha.Port_ETHERNET_NNI {
- if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
- return err
- }
- agent.addLogicalPortToMap(port.PortNo, true)
- } else if port.Type == voltha.Port_ETHERNET_UNI {
- if _, err = agent.addUNILogicalPort(ctx, device, port); err != nil {
- return err
- }
- agent.addLogicalPortToMap(port.PortNo, false)
- } else {
- // Update the device routes to ensure all routes on the logical device have been calculated
- if err = agent.buildRoutes(ctx); err != nil {
- // Not an error - temporary state
- logger.Warnw("failed-to-update-routes", log.Fields{"device-id": device.Id, "port": port, "error": err})
- }
- }
- return nil
-}
-
-// setupLogicalPorts is invoked once the logical device has been created and is ready to get ports
-// added to it. While the logical device was being created we could have received requests to add
-// NNI and UNI ports which were discarded. Now is the time to add them if needed
-func (agent *LogicalAgent) setupLogicalPorts(ctx context.Context) error {
- logger.Infow("setupLogicalPorts", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- // First add any NNI ports which could have been missing
- if err := agent.setupNNILogicalPorts(ctx, agent.rootDeviceID); err != nil {
- logger.Errorw("error-setting-up-NNI-ports", log.Fields{"error": err, "deviceId": agent.rootDeviceID})
- return err
- }
-
- // Now, set up the UNI ports if needed.
- children, err := agent.deviceMgr.GetAllChildDevices(ctx, agent.rootDeviceID)
- if err != nil {
- logger.Errorw("error-getting-child-devices", log.Fields{"error": err, "deviceId": agent.rootDeviceID})
- return err
- }
- responses := make([]coreutils.Response, 0)
- for _, child := range children.Items {
- response := coreutils.NewResponse()
- responses = append(responses, response)
- go func(child *voltha.Device) {
- if err = agent.setupUNILogicalPorts(context.Background(), child); err != nil {
- logger.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
- response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
- }
- response.Done()
- }(child)
- }
- // Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
- return status.Errorf(codes.Aborted, "errors-%s", res)
- }
- return nil
-}
-
-// setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
-func (agent *LogicalAgent) setupNNILogicalPorts(ctx context.Context, deviceID string) error {
- logger.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- // Build the logical device based on information retrieved from the device adapter
- var err error
-
- var device *voltha.Device
- if device, err = agent.deviceMgr.getDevice(ctx, deviceID); err != nil {
- logger.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": deviceID})
- return err
- }
-
- //Get UNI port number
- for _, port := range device.Ports {
- if port.Type == voltha.Port_ETHERNET_NNI {
- if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
- logger.Errorw("error-adding-UNI-port", log.Fields{"error": err})
- }
- agent.addLogicalPortToMap(port.PortNo, true)
- }
- }
- return err
-}
-
-// updatePortState updates the port state of the device
-func (agent *LogicalAgent) updatePortState(ctx context.Context, deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
- logger.Infow("updatePortState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- // Get the latest logical device info
- original := agent.getLogicalDeviceWithoutLock()
- updatedPorts := clonePorts(original.Ports)
- for _, port := range updatedPorts {
- if port.DeviceId == deviceID && port.DevicePortNo == portNo {
- if operStatus == voltha.OperStatus_ACTIVE {
- port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
- } else {
- port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
- }
- // Update the logical device
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
- logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
- return err
- }
- return nil
- }
- }
- return status.Errorf(codes.NotFound, "port-%d-not-exist", portNo)
-}
-
-// updatePortsState updates the ports state related to the device
-func (agent *LogicalAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
- logger.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- // Get the latest logical device info
- original := agent.getLogicalDeviceWithoutLock()
- updatedPorts := clonePorts(original.Ports)
- for _, port := range updatedPorts {
- if port.DeviceId == device.Id {
- if state == voltha.OperStatus_ACTIVE {
- port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
- } else {
- port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
- }
- }
- }
- // Updating the logical device will trigger the poprt change events to be populated to the controller
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
- logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
- return nil
-}
-
-// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
-func (agent *LogicalAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
- logger.Infow("setupUNILogicalPort", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- // Build the logical device based on information retrieved from the device adapter
- var err error
- var added bool
- //Get UNI port number
- for _, port := range childDevice.Ports {
- if port.Type == voltha.Port_ETHERNET_UNI {
- if added, err = agent.addUNILogicalPort(ctx, childDevice, port); err != nil {
- logger.Errorw("error-adding-UNI-port", log.Fields{"error": err})
- }
- if added {
- agent.addLogicalPortToMap(port.PortNo, false)
- }
- }
- }
- return err
-}
-
-// deleteAllLogicalPorts deletes all logical ports associated with this logical device
-func (agent *LogicalAgent) deleteAllLogicalPorts(ctx context.Context) error {
- logger.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- // Get the latest logical device info
- cloned := agent.getLogicalDeviceWithoutLock()
-
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, cloned, []*voltha.LogicalPort{}); err != nil {
- logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
- return nil
-}
-
-func clonePorts(ports []*voltha.LogicalPort) []*voltha.LogicalPort {
- return proto.Clone(&voltha.LogicalPorts{Items: ports}).(*voltha.LogicalPorts).Items
-}
-
-//updateLogicalDevicePortsWithoutLock updates the
-func (agent *LogicalAgent) updateLogicalDevicePortsWithoutLock(ctx context.Context, device *voltha.LogicalDevice, newPorts []*voltha.LogicalPort) error {
- oldPorts := device.Ports
- device.Ports = newPorts
- if err := agent.updateLogicalDeviceWithoutLock(ctx, device); err != nil {
- return err
- }
- agent.portUpdated(oldPorts, newPorts)
- return nil
-}
-
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalAgent) updateLogicalDeviceWithoutLock(ctx context.Context, logicalDevice *voltha.LogicalDevice) error {
if agent.stopped {
@@ -582,547 +238,6 @@
return nil
}
-//generateDeviceRoutesIfNeeded generates the device routes if the logical device has been updated since the last time
-//that device graph was generated.
-func (agent *LogicalAgent) generateDeviceRoutesIfNeeded(ctx context.Context) error {
- agent.lockDeviceRoutes.Lock()
- defer agent.lockDeviceRoutes.Unlock()
-
- ld, err := agent.GetLogicalDevice(ctx)
- if err != nil {
- return err
- }
-
- if agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ld) {
- return nil
- }
- logger.Debug("Generation of device route required")
- if err := agent.buildRoutes(ctx); err != nil {
- // No Route is not an error
- if !errors.Is(err, route.ErrNoRoute) {
- return err
- }
- }
- return nil
-}
-
-//updateFlowTable updates the flow table of that logical device
-func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
- logger.Debug("UpdateFlowTable")
- if flow == nil {
- return nil
- }
-
- if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
- return err
- }
- switch flow.GetCommand() {
- case ofp.OfpFlowModCommand_OFPFC_ADD:
- return agent.flowAdd(ctx, flow)
- case ofp.OfpFlowModCommand_OFPFC_DELETE:
- return agent.flowDelete(ctx, flow)
- case ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT:
- return agent.flowDeleteStrict(ctx, flow)
- case ofp.OfpFlowModCommand_OFPFC_MODIFY:
- return agent.flowModify(flow)
- case ofp.OfpFlowModCommand_OFPFC_MODIFY_STRICT:
- return agent.flowModifyStrict(flow)
- }
- return status.Errorf(codes.Internal,
- "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.GetCommand())
-}
-
-//updateGroupTable updates the group table of that logical device
-func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
- logger.Debug("updateGroupTable")
- if groupMod == nil {
- return nil
- }
-
- if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
- return err
- }
-
- switch groupMod.GetCommand() {
- case ofp.OfpGroupModCommand_OFPGC_ADD:
- return agent.groupAdd(ctx, groupMod)
- case ofp.OfpGroupModCommand_OFPGC_DELETE:
- return agent.groupDelete(ctx, groupMod)
- case ofp.OfpGroupModCommand_OFPGC_MODIFY:
- return agent.groupModify(ctx, groupMod)
- }
- return status.Errorf(codes.Internal,
- "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, groupMod.GetCommand())
-}
-
-// updateMeterTable updates the meter table of that logical device
-func (agent *LogicalAgent) updateMeterTable(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
- logger.Debug("updateMeterTable")
- if meterMod == nil {
- return nil
- }
- switch meterMod.GetCommand() {
- case ofp.OfpMeterModCommand_OFPMC_ADD:
- return agent.meterAdd(ctx, meterMod)
- case ofp.OfpMeterModCommand_OFPMC_DELETE:
- return agent.meterDelete(ctx, meterMod)
- case ofp.OfpMeterModCommand_OFPMC_MODIFY:
- return agent.meterModify(ctx, meterMod)
- }
- return status.Errorf(codes.Internal,
- "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, meterMod.GetCommand())
-
-}
-
-func (agent *LogicalAgent) meterAdd(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
- logger.Debugw("meterAdd", log.Fields{"metermod": *meterMod})
- if meterMod == nil {
- return nil
- }
-
- meterEntry := fu.MeterEntryFromMeterMod(meterMod)
- agent.meterLock.Lock()
- //check if the meter already exists or not
- _, ok := agent.meters[meterMod.MeterId]
- if ok {
- logger.Infow("Meter-already-exists", log.Fields{"meter": *meterMod})
- agent.meterLock.Unlock()
- return nil
- }
-
- mChunk := MeterChunk{
- meter: meterEntry,
- }
- //Add to map and acquire the per meter lock
- agent.meters[meterMod.MeterId] = &mChunk
- mChunk.lock.Lock()
- defer mChunk.lock.Unlock()
- agent.meterLock.Unlock()
- meterID := strconv.Itoa(int(meterMod.MeterId))
- if err := agent.clusterDataProxy.AddWithID(ctx, "meters/"+agent.logicalDeviceID, meterID, meterEntry); err != nil {
- logger.Errorw("failed-adding-meter", log.Fields{"deviceID": agent.logicalDeviceID, "meterID": meterID, "err": err})
- //Revert the map
- agent.meterLock.Lock()
- delete(agent.meters, meterMod.MeterId)
- agent.meterLock.Unlock()
- return err
- }
-
- logger.Debugw("Meter-added-successfully", log.Fields{"Added-meter": meterEntry})
- return nil
-}
-
-func (agent *LogicalAgent) meterDelete(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
- logger.Debug("meterDelete", log.Fields{"meterMod": *meterMod})
- if meterMod == nil {
- return nil
- }
- agent.meterLock.RLock()
- meterChunk, ok := agent.meters[meterMod.MeterId]
- agent.meterLock.RUnlock()
- if ok {
- //Dont let anyone to do any changes to this meter until this is done.
- //And wait if someone else is already making modifications. Do this with per meter lock.
- meterChunk.lock.Lock()
- defer meterChunk.lock.Unlock()
- if err := agent.deleteFlowsOfMeter(ctx, meterMod.MeterId); err != nil {
- return err
- }
- //remove from the store and cache
- if err := agent.removeLogicalDeviceMeter(ctx, meterMod.MeterId); err != nil {
- return err
- }
- logger.Debugw("meterDelete-success", log.Fields{"meterID": meterMod.MeterId})
- } else {
- logger.Warnw("meter-not-found", log.Fields{"meterID": meterMod.MeterId})
- }
- return nil
-}
-
-func (agent *LogicalAgent) meterModify(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
- logger.Debug("meterModify")
- if meterMod == nil {
- return nil
- }
- newMeter := fu.MeterEntryFromMeterMod(meterMod)
- agent.meterLock.RLock()
- meterChunk, ok := agent.meters[newMeter.Config.MeterId]
- agent.meterLock.RUnlock()
- if !ok {
- return fmt.Errorf("no-meter-to-modify:%d", newMeter.Config.MeterId)
- }
- //Release the map lock and syncronize per meter
- meterChunk.lock.Lock()
- defer meterChunk.lock.Unlock()
- oldMeter := meterChunk.meter
- newMeter.Stats.FlowCount = oldMeter.Stats.FlowCount
-
- if err := agent.updateLogicalDeviceMeter(ctx, newMeter, meterChunk); err != nil {
- logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "meterID": newMeter.Config.MeterId})
- return err
- }
- logger.Debugw("replaced-with-new-meter", log.Fields{"oldMeter": oldMeter, "newMeter": newMeter})
- return nil
-
-}
-
-func (agent *LogicalAgent) deleteFlowsOfMeter(ctx context.Context, meterID uint32) error {
- logger.Infow("Delete-flows-matching-meter", log.Fields{"meter": meterID})
- agent.flowLock.Lock()
- defer agent.flowLock.Unlock()
- for flowID, flowChunk := range agent.flows {
- if mID := fu.GetMeterIdFromFlow(flowChunk.flow); mID != 0 && mID == meterID {
- logger.Debugw("Flow-to-be- deleted", log.Fields{"flow": flowChunk.flow})
- path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- //TODO: Think on carrying on and deleting the remaining flows, instead of returning.
- //Anyways this returns an error to controller which possibly results with a re-deletion.
- //Then how can we handle the new deletion request(Same for group deletion)?
- return fmt.Errorf("couldnt-deleted-flow-from-store-%s", path)
- }
- delete(agent.flows, flowID)
- }
- }
- return nil
-}
-
-func (agent *LogicalAgent) updateFlowCountOfMeterStats(ctx context.Context, modCommand *ofp.OfpFlowMod, flow *ofp.OfpFlowStats, revertUpdate bool) bool {
-
- flowCommand := modCommand.GetCommand()
- meterID := fu.GetMeterIdFromFlow(flow)
- logger.Debugw("Meter-id-in-flow-mod", log.Fields{"meterId": meterID})
- if meterID == 0 {
- logger.Debugw("No-meter-present-in-the-flow", log.Fields{"flow": *flow})
- return true
- }
-
- if flowCommand != ofp.OfpFlowModCommand_OFPFC_ADD && flowCommand != ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
- return true
- }
- agent.meterLock.RLock()
- meterChunk, ok := agent.meters[meterID]
- agent.meterLock.RUnlock()
- if !ok {
- logger.Debugw("Meter-is-not-present-in-logical-device", log.Fields{"meterID": meterID})
- return true
- }
-
- //acquire the meter lock
- meterChunk.lock.Lock()
- defer meterChunk.lock.Unlock()
-
- if flowCommand == ofp.OfpFlowModCommand_OFPFC_ADD {
- if revertUpdate {
- meterChunk.meter.Stats.FlowCount--
- } else {
- meterChunk.meter.Stats.FlowCount++
- }
- } else if flowCommand == ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
- if revertUpdate {
- meterChunk.meter.Stats.FlowCount++
- } else {
- meterChunk.meter.Stats.FlowCount--
- }
- }
-
- // Update store and cache
- if err := agent.updateLogicalDeviceMeter(ctx, meterChunk.meter, meterChunk); err != nil {
- logger.Debugw("unable-to-update-meter-in-db", log.Fields{"logicalDevice": agent.logicalDeviceID, "meterID": meterID})
- return false
- }
-
- logger.Debugw("updated-meter-flow-stats", log.Fields{"meterId": meterID})
- return true
-}
-
-//flowAdd adds a flow to the flow table of that logical device
-func (agent *LogicalAgent) flowAdd(ctx context.Context, mod *ofp.OfpFlowMod) error {
- logger.Debugw("flowAdd", log.Fields{"flow": mod})
- if mod == nil {
- return nil
- }
- flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
- if err != nil {
- logger.Errorw("flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
- return err
- }
- var updated bool
- var changed bool
- if changed, updated, err = agent.decomposeAndAdd(ctx, flow, mod); err != nil {
- logger.Errorw("flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
- return err
- }
- if changed && !updated {
- if dbupdated := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !dbupdated {
- return fmt.Errorf("couldnt-updated-flow-stats-%s", strconv.FormatUint(flow.Id, 10))
- }
- }
- return nil
-
-}
-
-func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, mod *ofp.OfpFlowMod) (bool, bool, error) {
- changed := false
- updated := false
- alreadyExist := true
- var flowToReplace *ofp.OfpFlowStats
-
- //if flow is not found in the map, create a new entry, otherwise get the existing one.
- agent.flowLock.Lock()
- flowChunk, ok := agent.flows[flow.Id]
- if !ok {
- flowChunk = &FlowChunk{
- flow: flow,
- }
- agent.flows[flow.Id] = flowChunk
- alreadyExist = false
- flowChunk.lock.Lock() //acquire chunk lock before releasing map lock
- defer flowChunk.lock.Unlock()
- agent.flowLock.Unlock()
- } else {
- agent.flowLock.Unlock() //release map lock before acquiring chunk lock
- flowChunk.lock.Lock()
- defer flowChunk.lock.Unlock()
- }
-
- if !alreadyExist {
- flowID := strconv.FormatUint(flow.Id, 10)
- if err := agent.clusterDataProxy.AddWithID(ctx, "logical_flows/"+agent.logicalDeviceID, flowID, flow); err != nil {
- logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": agent.logicalDeviceID, "flowID": flowID, "err": err})
- //Revert the map
- //TODO: Solve the condition:If we have two flow Adds of the same flow (at least same priority and match) in quick succession
- //then if the first one fails while the second one was waiting on the flowchunk, we will end up with an instance of flowChunk that is no longer in the map.
- agent.flowLock.Lock()
- delete(agent.flows, flow.Id)
- agent.flowLock.Unlock()
- return changed, updated, err
- }
- }
- flows := make([]*ofp.OfpFlowStats, 0)
- updatedFlows := make([]*ofp.OfpFlowStats, 0)
- checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
- if checkOverlap {
- if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 {
- // TODO: should this error be notified other than being logged?
- logger.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
- } else {
- // Add flow
- changed = true
- }
- } else {
- if alreadyExist {
- flowToReplace = flowChunk.flow
- if (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_RESET_COUNTS)) != 0 {
- flow.ByteCount = flowToReplace.ByteCount
- flow.PacketCount = flowToReplace.PacketCount
- }
- if !proto.Equal(flowToReplace, flow) {
- changed = true
- updated = true
- }
- } else {
- changed = true
- }
- }
- logger.Debugw("flowAdd-changed", log.Fields{"changed": changed, "updated": updated})
- if changed {
- updatedFlows = append(updatedFlows, flow)
- var flowMetadata voltha.FlowMetadata
- lMeters, _ := agent.ListLogicalDeviceMeters(ctx)
- if err := agent.GetMeterConfig(updatedFlows, lMeters.Items, &flowMetadata); err != nil {
- logger.Error("Meter-referred-in-flow-not-present")
- return changed, updated, err
- }
- flowGroups, _ := agent.ListLogicalDeviceFlowGroups(ctx)
- deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *flowGroups)
- if err != nil {
- return changed, updated, err
- }
-
- logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
- // Update store and cache
- if updated {
- if err := agent.updateLogicalDeviceFlow(ctx, flow, flowChunk); err != nil {
- return changed, updated, err
- }
- }
- respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &flowMetadata)
- // Create the go routines to wait
- go func() {
- // Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
- logger.Infow("failed-to-add-flows-will-attempt-deletion", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
- // Revert added flows
- if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, &flowMetadata); err != nil {
- logger.Errorw("failure-to-delete-flows-after-failed-addition", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
- }
- }
- }()
- }
- return changed, updated, nil
-}
-
-// revertAddedFlows reverts flows after the flowAdd request has failed. All flows corresponding to that flowAdd request
-// will be reverted, both from the logical devices and the devices.
-func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
- logger.Debugw("revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
-
- agent.flowLock.RLock()
- flowChunk, ok := agent.flows[addedFlow.Id]
- agent.flowLock.RUnlock()
- if !ok {
- // Not found - do nothing
- log.Debugw("flow-not-found", log.Fields{"added-flow": addedFlow})
- return nil
- }
- //Leave the map lock and syncronize per flow
- flowChunk.lock.Lock()
- defer flowChunk.lock.Unlock()
-
- if replacedFlow != nil {
- if err := agent.updateLogicalDeviceFlow(ctx, replacedFlow, flowChunk); err != nil {
- return err
- }
- } else {
- if err := agent.removeLogicalDeviceFlow(ctx, addedFlow.Id); err != nil {
- return err
- }
- }
- // Revert meters
- if changedMeterStats := agent.updateFlowCountOfMeterStats(ctx, mod, addedFlow, true); !changedMeterStats {
- return fmt.Errorf("Unable-to-revert-meterstats-for-flow-%s", strconv.FormatUint(addedFlow.Id, 10))
- }
-
- // Update the devices
- respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, metadata)
-
- // Wait for the responses
- go func() {
- // Since this action is taken following an add failure, we may also receive a failure for the revert
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
- logger.Warnw("failure-reverting-added-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
- }
- }()
-
- return nil
-}
-
-// GetMeterConfig returns meter config
-func (agent *LogicalAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
- m := make(map[uint32]bool)
- for _, flow := range flows {
- if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 && !m[flowMeterID] {
- foundMeter := false
- // Meter is present in the flow , Get from logical device
- for _, meter := range meters {
- if flowMeterID == meter.Config.MeterId {
- metadata.Meters = append(metadata.Meters, meter.Config)
- logger.Debugw("Found meter in logical device",
- log.Fields{"meterID": flowMeterID, "meter-band": meter.Config})
- m[flowMeterID] = true
- foundMeter = true
- break
- }
- }
- if !foundMeter {
- logger.Errorw("Meter-referred-by-flow-is-not-found-in-logicaldevice",
- log.Fields{"meterID": flowMeterID, "Available-meters": meters, "flow": *flow})
- return fmt.Errorf("Meter-referred-by-flow-is-not-found-in-logicaldevice.MeterId-%d", flowMeterID)
- }
- }
- }
- logger.Debugw("meter-bands-for-flows", log.Fields{"flows": len(flows), "metadata": metadata})
- return nil
-
-}
-
-//flowDelete deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowDelete(ctx context.Context, mod *ofp.OfpFlowMod) error {
- logger.Debug("flowDelete")
- if mod == nil {
- return nil
- }
-
- fs, err := fu.FlowStatsEntryFromFlowModMessage(mod)
- if err != nil {
- return err
- }
-
- //build a list of what to delete
- toDelete := make([]*ofp.OfpFlowStats, 0)
- toDeleteChunks := make([]*FlowChunk, 0)
- //Lock the map to search the matched flows
- agent.flowLock.RLock()
- for _, f := range agent.flows {
- if fu.FlowMatch(f.flow, fs) {
- toDelete = append(toDelete, f.flow)
- toDeleteChunks = append(toDeleteChunks, f)
- continue
- }
- // Check wild card match
- if fu.FlowMatchesMod(f.flow, mod) {
- toDelete = append(toDelete, f.flow)
- toDeleteChunks = append(toDeleteChunks, f)
- }
- }
- agent.flowLock.RUnlock()
- //Delete the matched flows
- if len(toDelete) > 0 {
- logger.Debugw("flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
- var meters []*ofp.OfpMeterEntry
- var flowGroups []*ofp.OfpGroupEntry
- if ofpMeters, err := agent.ListLogicalDeviceMeters(ctx); err != nil {
- meters = ofpMeters.Items
- }
-
- if groups, err := agent.ListLogicalDeviceFlowGroups(ctx); err != nil {
- flowGroups = groups.Items
- }
-
- for _, fc := range toDeleteChunks {
- if err := agent.deleteFlowAndUpdateMeterStats(ctx, mod, fc); err != nil {
- return err
- }
- }
- var flowMetadata voltha.FlowMetadata
- if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
- logger.Error("Meter-referred-in-flows-not-present")
- return err
- }
- var respChnls []coreutils.Response
- var partialRoute bool
- var deviceRules *fu.DeviceRules
- deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
- if err != nil {
- // A no route error means no route exists between the ports specified in the flow. This can happen when the
- // child device is deleted and a request to delete flows from the parent device is received
- if !errors.Is(err, route.ErrNoRoute) {
- logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": toDelete, "error": err})
- return err
- }
- partialRoute = true
- }
-
- // Update the devices
- if partialRoute {
- respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: toDelete}, &flowMetadata)
- } else {
- respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
- }
-
- // Wait for the responses
- go func() {
- // Wait for completion
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
- logger.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
- // TODO: Revert the flow deletion
- }
- }()
- }
- //TODO: send announcement on delete
- return nil
-}
-
func (agent *LogicalAgent) deleteFlowAndUpdateMeterStats(ctx context.Context, mod *ofp.OfpFlowMod, chunk *FlowChunk) error {
chunk.lock.Lock()
defer chunk.lock.Unlock()
@@ -1197,22 +312,6 @@
return responses
}
-// getUNILogicalPortNo returns the UNI logical port number specified in the flow
-func (agent *LogicalAgent) getUNILogicalPortNo(flow *ofp.OfpFlowStats) (uint32, error) {
- var uniPort uint32
- inPortNo := fu.GetInPort(flow)
- outPortNo := fu.GetOutPort(flow)
- if agent.isNNIPort(inPortNo) {
- uniPort = outPortNo
- } else if agent.isNNIPort(outPortNo) {
- uniPort = inPortNo
- }
- if uniPort != 0 {
- return uniPort, nil
- }
- return 0, status.Errorf(codes.NotFound, "no-uni-port: %v", flow)
-}
-
func (agent *LogicalAgent) deleteFlowsFromParentDevice(ctx context.Context, flows ofp.Flows, metadata *voltha.FlowMetadata) []coreutils.Response {
logger.Debugw("deleting-flows-from-parent-device", log.Fields{"logical-device-id": agent.logicalDeviceID, "flows": flows})
responses := make([]coreutils.Response, 0)
@@ -1240,746 +339,6 @@
return responses
}
-//flowDeleteStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
- logger.Debugw("flowDeleteStrict", log.Fields{"mod": mod})
- if mod == nil {
- return nil
- }
-
- flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
- if err != nil {
- return err
- }
- logger.Debugw("flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
- agent.flowLock.RLock()
- flowChunk, ok := agent.flows[flow.Id]
- agent.flowLock.RUnlock()
- if !ok {
- logger.Debugw("Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
- return nil
- }
- //Release the map lock and syncronize per flow
- flowChunk.lock.Lock()
- defer flowChunk.lock.Unlock()
-
- var meters []*ofp.OfpMeterEntry
- var flowGroups []*ofp.OfpGroupEntry
- if ofMeters, er := agent.ListLogicalDeviceMeters(ctx); er == nil {
- meters = ofMeters.Items
- }
- if ofGroups, er := agent.ListLogicalDeviceFlowGroups(ctx); er == nil {
- flowGroups = ofGroups.Items
- }
- if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !changedMeter {
- return fmt.Errorf("Cannot delete flow - %s. Meter update failed", flow)
- }
-
- var flowMetadata voltha.FlowMetadata
- flowsToDelete := []*ofp.OfpFlowStats{flowChunk.flow}
- if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
- logger.Error("meter-referred-in-flows-not-present")
- return err
- }
- var respChnls []coreutils.Response
- var partialRoute bool
- deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
- if err != nil {
- // A no route error means no route exists between the ports specified in the flow. This can happen when the
- // child device is deleted and a request to delete flows from the parent device is received
- if !errors.Is(err, route.ErrNoRoute) {
- logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": flowsToDelete, "error": err})
- return err
- }
- partialRoute = true
- }
-
- // Update the model
- if err := agent.removeLogicalDeviceFlow(ctx, flow.Id); err != nil {
- return err
- }
- // Update the devices
- if partialRoute {
- respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: flowsToDelete}, &flowMetadata)
- } else {
- respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
- }
-
- // Wait for completion
- go func() {
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
- logger.Warnw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
- //TODO: Revert flow changes
- }
- }()
-
- return nil
-}
-
-//flowModify modifies a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowModify(mod *ofp.OfpFlowMod) error {
- return errors.New("flowModify not implemented")
-}
-
-//flowModifyStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
- return errors.New("flowModifyStrict not implemented")
-}
-
-func (agent *LogicalAgent) groupAdd(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
- if groupMod == nil {
- return nil
- }
- logger.Debugw("groupAdd", log.Fields{"GroupId": groupMod.GroupId})
- agent.groupLock.Lock()
- _, ok := agent.groups[groupMod.GroupId]
- if ok {
- agent.groupLock.Unlock()
- return fmt.Errorf("Group %d already exists", groupMod.GroupId)
- }
-
- groupEntry := fu.GroupEntryFromGroupMod(groupMod)
- groupChunk := GroupChunk{
- group: groupEntry,
- }
- //add to map
- agent.groups[groupMod.GroupId] = &groupChunk
- groupChunk.lock.Lock()
- defer groupChunk.lock.Unlock()
- agent.groupLock.Unlock()
- //add to the kv store
- path := fmt.Sprintf("groups/%s", agent.logicalDeviceID)
- groupID := strconv.Itoa(int(groupMod.GroupId))
- if err := agent.clusterDataProxy.AddWithID(ctx, path, groupID, groupEntry); err != nil {
- logger.Errorw("failed-adding-group", log.Fields{"deviceID": agent.logicalDeviceID, "groupID": groupID, "err": err})
- agent.groupLock.Lock()
- delete(agent.groups, groupMod.GroupId)
- agent.groupLock.Unlock()
- return err
- }
- deviceRules := fu.NewDeviceRules()
- deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
- fg := fu.NewFlowsAndGroups()
- fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
- deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
-
- logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
-
- // Update the devices
- respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
-
- // Wait for completion
- go func() {
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
- logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
- //TODO: Revert flow changes
- }
- }()
- return nil
-}
-
-func (agent *LogicalAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
- logger.Debug("groupDelete")
- if groupMod == nil {
- return nil
- }
- affectedFlows := make([]*ofp.OfpFlowStats, 0)
- affectedGroups := make([]*ofp.OfpGroupEntry, 0)
- var groupsChanged bool
- groupID := groupMod.GroupId
- var err error
- if groupID == uint32(ofp.OfpGroup_OFPG_ALL) {
- if err := func() error {
- agent.groupLock.Lock()
- defer agent.groupLock.Unlock()
- for key, groupChunk := range agent.groups {
- //Remove from store and cache. Do this in a one time lock allocation.
- path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, key)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- return fmt.Errorf("couldnt-deleted-group-from-store-%s", path)
- }
- delete(agent.groups, groupID)
- var flows []*ofp.OfpFlowStats
- if flows, err = agent.deleteFlowsOfGroup(ctx, key); err != nil {
- logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": key})
- return err
- }
- affectedFlows = append(affectedFlows, flows...)
- affectedGroups = append(affectedGroups, groupChunk.group)
- }
- return nil
- }(); err != nil {
- return err
- }
- groupsChanged = true
- } else {
- agent.groupLock.RLock()
- groupChunk, ok := agent.groups[groupID]
- agent.groupLock.RUnlock()
- if !ok {
- logger.Warnw("group-not-found", log.Fields{"groupID": groupID})
- return nil
- }
- groupChunk.lock.Lock()
- defer groupChunk.lock.Unlock()
- var flows []*ofp.OfpFlowStats
- if flows, err = agent.deleteFlowsOfGroup(ctx, groupID); err != nil {
- logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": groupID})
- return err
- }
- //remove from store
- if err := agent.removeLogicalDeviceFlowGroup(ctx, groupID); err != nil {
- return err
- }
- affectedFlows = append(affectedFlows, flows...)
- affectedGroups = append(affectedGroups, groupChunk.group)
- groupsChanged = true
-
- }
-
- if err != nil || groupsChanged {
- var deviceRules *fu.DeviceRules
- deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: affectedFlows}, ofp.FlowGroups{Items: affectedGroups})
- if err != nil {
- return err
- }
- logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
- // Update the devices
- respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, nil)
-
- // Wait for completion
- go func() {
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
- logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
- //TODO: Revert flow changes
- }
- }()
- }
- return nil
-}
-
-func (agent *LogicalAgent) deleteFlowsOfGroup(ctx context.Context, groupID uint32) ([]*ofp.OfpFlowStats, error) {
- logger.Infow("Delete-flows-matching-group", log.Fields{"groupID": groupID})
- var flowsRemoved []*ofp.OfpFlowStats
- agent.flowLock.Lock()
- defer agent.flowLock.Unlock()
- for flowID, flowChunk := range agent.flows {
- if fu.FlowHasOutGroup(flowChunk.flow, groupID) {
- path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
- if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
- return nil, fmt.Errorf("couldnt-delete-flow-from-store-%s", path)
- }
- delete(agent.flows, flowID)
- flowsRemoved = append(flowsRemoved, flowChunk.flow)
- }
- }
- return flowsRemoved, nil
-}
-
-func (agent *LogicalAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
- logger.Debug("groupModify")
- if groupMod == nil {
- return nil
- }
-
- groupID := groupMod.GroupId
- agent.groupLock.RLock()
- groupChunk, ok := agent.groups[groupID]
- agent.groupLock.RUnlock()
- if !ok {
- return fmt.Errorf("group-absent:%d", groupID)
- }
- //Don't let any other thread to make modifications to this group till all done here.
- groupChunk.lock.Lock()
- defer groupChunk.lock.Unlock()
- //replace existing group entry with new group definition
- groupEntry := fu.GroupEntryFromGroupMod(groupMod)
- deviceRules := fu.NewDeviceRules()
- deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
- fg := fu.NewFlowsAndGroups()
- fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
- deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
-
- logger.Debugw("rules", log.Fields{"rules-for-group-modify": deviceRules.String()})
- //update KV
- if err := agent.updateLogicalDeviceFlowGroup(ctx, groupEntry, groupChunk); err != nil {
- logger.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- return err
- }
-
- // Update the devices
- respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, &voltha.FlowMetadata{})
-
- // Wait for completion
- go func() {
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
- logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
- //TODO: Revert flow changes
- }
- }()
- return nil
-}
-
-// deleteLogicalPort removes the logical port
-func (agent *LogicalAgent) deleteLogicalPort(ctx context.Context, lPort *voltha.LogicalPort) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
- logicalDevice := agent.getLogicalDeviceWithoutLock()
-
- index := -1
- for i, logicalPort := range logicalDevice.Ports {
- if logicalPort.Id == lPort.Id {
- index = i
- break
- }
- }
- if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
- if index < len(clonedPorts)-1 {
- copy(clonedPorts[index:], clonedPorts[index+1:])
- }
- clonedPorts[len(clonedPorts)-1] = nil
- clonedPorts = clonedPorts[:len(clonedPorts)-1]
- logger.Debugw("logical-port-deleted", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts); err != nil {
- logger.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- return err
- }
-
- // Remove the logical port from cache
- agent.deleteLogicalPortsFromMap([]uint32{lPort.DevicePortNo})
- // Reset the logical device routes
- go func() {
- if err := agent.buildRoutes(context.Background()); err != nil {
- logger.Warnw("device-routes-not-ready", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
- }
- }()
- }
- return nil
-}
-
-// deleteLogicalPorts removes the logical ports associated with that deviceId
-func (agent *LogicalAgent) deleteLogicalPorts(ctx context.Context, deviceID string) error {
- logger.Debugw("deleting-logical-ports", log.Fields{"device-id": deviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
- logicalDevice := agent.getLogicalDeviceWithoutLock()
- lPortstoKeep := []*voltha.LogicalPort{}
- lPortsNoToDelete := []uint32{}
- for _, logicalPort := range logicalDevice.Ports {
- if logicalPort.DeviceId != deviceID {
- lPortstoKeep = append(lPortstoKeep, logicalPort)
- } else {
- lPortsNoToDelete = append(lPortsNoToDelete, logicalPort.DevicePortNo)
- }
- }
- logger.Debugw("deleted-logical-ports", log.Fields{"ports": lPortstoKeep})
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, lPortstoKeep); err != nil {
- logger.Errorw("logical-device-update-failed", log.Fields{"logical-device-id": agent.logicalDeviceID})
- return err
- }
- // Remove the port from the cached logical ports set
- agent.deleteLogicalPortsFromMap(lPortsNoToDelete)
-
- // Reset the logical device routes
- go func() {
- if err := agent.buildRoutes(context.Background()); err != nil {
- logger.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
- }
- }()
-
- return nil
-}
-
-// enableLogicalPort enables the logical port
-func (agent *LogicalAgent) enableLogicalPort(ctx context.Context, lPortID string) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
- logicalDevice := agent.getLogicalDeviceWithoutLock()
-
- index := -1
- for i, logicalPort := range logicalDevice.Ports {
- if logicalPort.Id == lPortID {
- index = i
- break
- }
- }
- if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
- clonedPorts[index].OfpPort.Config = clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
- }
- return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
-}
-
-// disableLogicalPort disabled the logical port
-func (agent *LogicalAgent) disableLogicalPort(ctx context.Context, lPortID string) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
- // Get the most up to date logical device
- logicalDevice := agent.getLogicalDeviceWithoutLock()
- index := -1
- for i, logicalPort := range logicalDevice.Ports {
- if logicalPort.Id == lPortID {
- index = i
- break
- }
- }
- if index >= 0 {
- clonedPorts := clonePorts(logicalDevice.Ports)
- clonedPorts[index].OfpPort.Config = (clonedPorts[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
- return agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts)
- }
- return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
-}
-
-func (agent *LogicalAgent) getPreCalculatedRoute(ingress, egress uint32) ([]route.Hop, error) {
- logger.Debugw("ROUTE", log.Fields{"len": len(agent.deviceRoutes.Routes)})
- for routeLink, route := range agent.deviceRoutes.Routes {
- logger.Debugw("ROUTELINKS", log.Fields{"ingress": ingress, "egress": egress, "routelink": routeLink})
- if ingress == routeLink.Ingress && egress == routeLink.Egress {
- return route, nil
- }
- }
- return nil, status.Errorf(codes.FailedPrecondition, "no route from:%d to:%d", ingress, egress)
-}
-
-// GetRoute returns route
-func (agent *LogicalAgent) GetRoute(ctx context.Context, ingressPortNo uint32, egressPortNo uint32) ([]route.Hop, error) {
- logger.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
- routes := make([]route.Hop, 0)
-
- // Note: A port value of 0 is equivalent to a nil port
-
- // Consider different possibilities
- if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
- logger.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
- if agent.isNNIPort(ingressPortNo) {
- //This is a trap on the NNI Port
- if len(agent.deviceRoutes.Routes) == 0 {
- // If there are no routes set (usually when the logical device has only NNI port(s), then just return an
- // route with same IngressHop and EgressHop
- hop := route.Hop{DeviceID: agent.rootDeviceID, Ingress: ingressPortNo, Egress: ingressPortNo}
- routes = append(routes, hop)
- routes = append(routes, hop)
- return routes, nil
- }
- //Return a 'half' route to make the flow decomposer logic happy
- for routeLink, path := range agent.deviceRoutes.Routes {
- if agent.isNNIPort(routeLink.Egress) {
- routes = append(routes, route.Hop{}) // first hop is set to empty
- routes = append(routes, path[1])
- return routes, nil
- }
- }
- return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
- }
- //treat it as if the output port is the first NNI of the OLT
- var err error
- if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
- logger.Warnw("no-nni-port", log.Fields{"error": err})
- return nil, err
- }
- }
- //If ingress port is not specified (nil), it may be a wildcarded
- //route if egress port is OFPP_CONTROLLER or a nni logical port,
- //in which case we need to create a half-route where only the egress
- //hop is filled, the first hop is nil
- if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
- // We can use the 2nd hop of any upstream route, so just find the first upstream:
- for routeLink, path := range agent.deviceRoutes.Routes {
- if agent.isNNIPort(routeLink.Egress) {
- routes = append(routes, route.Hop{}) // first hop is set to empty
- routes = append(routes, path[1])
- return routes, nil
- }
- }
- return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
- }
- //If egress port is not specified (nil), we can also can return a "half" route
- if egressPortNo == 0 {
- for routeLink, path := range agent.deviceRoutes.Routes {
- if routeLink.Ingress == ingressPortNo {
- routes = append(routes, path[0])
- routes = append(routes, route.Hop{})
- return routes, nil
- }
- }
- return nil, fmt.Errorf("no downstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
- }
- // Return the pre-calculated route
- return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
-}
-
-//GetWildcardInputPorts filters out the logical port number from the set of logical ports on the device and
-//returns their port numbers. This function is invoked only during flow decomposition where the lock on the logical
-//device is already held. Therefore it is safe to retrieve the logical device without lock.
-func (agent *LogicalAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
- lPorts := make([]uint32, 0)
- var exclPort uint32
- if len(excludePort) == 1 {
- exclPort = excludePort[0]
- }
- lDevice := agent.getLogicalDeviceWithoutLock()
- for _, port := range lDevice.Ports {
- if port.OfpPort.PortNo != exclPort {
- lPorts = append(lPorts, port.OfpPort.PortNo)
- }
- }
- return lPorts
-}
-
-// GetDeviceRoutes returns device graph
-func (agent *LogicalAgent) GetDeviceRoutes() *route.DeviceRoutes {
- return agent.deviceRoutes
-}
-
-//rebuildRoutes rebuilds the device routes
-func (agent *LogicalAgent) buildRoutes(ctx context.Context) error {
- logger.Debugf("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
- if agent.deviceRoutes == nil {
- agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
- }
- // Get all the logical ports on that logical device
- lDevice := agent.getLogicalDeviceWithoutLock()
-
- if err := agent.deviceRoutes.ComputeRoutes(ctx, lDevice.Ports); err != nil {
- return err
- }
- if err := agent.deviceRoutes.Print(); err != nil {
- return err
- }
-
- return nil
-}
-
-//updateRoutes updates the device routes
-func (agent *LogicalAgent) updateRoutes(ctx context.Context, lp *voltha.LogicalPort) error {
- logger.Debugw("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
-
- if agent.deviceRoutes == nil {
- agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
- }
- if err := agent.deviceRoutes.AddPort(ctx, lp, agent.logicalDevice.Ports); err != nil {
- return err
- }
- if err := agent.deviceRoutes.Print(); err != nil {
- return err
- }
- return nil
-}
-
-// diff go over two lists of logical ports and return what's new, what's changed and what's removed.
-func diff(oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts map[string]*voltha.LogicalPort) {
- newPorts = make(map[string]*voltha.LogicalPort, len(newList))
- changedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
- deletedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
-
- for _, n := range newList {
- newPorts[n.Id] = n
- }
-
- for _, o := range oldList {
- if n, have := newPorts[o.Id]; have {
- delete(newPorts, o.Id) // not new
- if !proto.Equal(n, o) {
- changedPorts[n.Id] = n // changed
- }
- } else {
- deletedPorts[o.Id] = o // deleted
- }
- }
-
- return newPorts, changedPorts, deletedPorts
-}
-
-// portUpdated is invoked when a port is updated on the logical device
-func (agent *LogicalAgent) portUpdated(prevPorts, currPorts []*voltha.LogicalPort) interface{} {
- // Get the difference between the two list
- newPorts, changedPorts, deletedPorts := diff(prevPorts, currPorts)
-
- // Send the port change events to the OF controller
- for _, newP := range newPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
- &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_ADD, Desc: newP.OfpPort})
- }
- for _, change := range changedPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
- &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_MODIFY, Desc: change.OfpPort})
- }
- for _, del := range deletedPorts {
- go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
- &ofp.OfpPortStatus{Reason: ofp.OfpPortReason_OFPPR_DELETE, Desc: del.OfpPort})
- }
-
- return nil
-}
-
-// addNNILogicalPort adds an NNI port to the logical device. It returns a bool representing whether a port has been
-// added and an eror in case a valid error is encountered. If the port was successfully added it will return
-// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
-// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) (bool, error) {
- logger.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
-
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
- if agent.portExist(device, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- agent.requestQueue.RequestComplete()
- return false, nil
- }
- agent.requestQueue.RequestComplete()
-
- var portCap *ic.PortCapability
- var err error
- // First get the port capability
- if portCap, err = agent.deviceMgr.getPortCapability(ctx, device.Id, port.PortNo); err != nil {
- logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return false, err
- }
-
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
-
- defer agent.requestQueue.RequestComplete()
- // Double check again if this port has been already added since the getPortCapability could have taken a long time
- if agent.portExist(device, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- return false, nil
- }
-
- portCap.Port.RootPort = true
- lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
- lp.DeviceId = device.Id
- lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
- lp.OfpPort.PortNo = port.PortNo
- lp.OfpPort.Name = lp.Id
- lp.DevicePortNo = port.PortNo
-
- ld := agent.getLogicalDeviceWithoutLock()
-
- clonedPorts := clonePorts(ld.Ports)
- if clonedPorts == nil {
- clonedPorts = make([]*voltha.LogicalPort, 0)
- }
- clonedPorts = append(clonedPorts, lp)
-
- if err = agent.updateLogicalDevicePortsWithoutLock(ctx, ld, clonedPorts); err != nil {
- logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
- return false, err
- }
-
- // Update the device routes with this new logical port
- clonedLP := (proto.Clone(lp)).(*voltha.LogicalPort)
- go func() {
- if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
- logger.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": lp.OfpPort.PortNo, "error": err})
- }
- }()
-
- return true, nil
-}
-
-func (agent *LogicalAgent) portExist(device *voltha.Device, port *voltha.Port) bool {
- ldevice := agent.getLogicalDeviceWithoutLock()
- for _, lPort := range ldevice.Ports {
- if lPort.DeviceId == device.Id && lPort.DevicePortNo == port.PortNo && lPort.Id == port.Label {
- return true
- }
- }
- return false
-}
-
-// addUNILogicalPort adds an UNI port to the logical device. It returns a bool representing whether a port has been
-// added and an eror in case a valid error is encountered. If the port was successfully added it will return
-// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
-// scenario. This also applies to the case where the port was already added.
-func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, port *voltha.Port) (bool, error) {
- logger.Debugw("addUNILogicalPort", log.Fields{"port": port})
- if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
- logger.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
- return false, nil
- }
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
-
- if agent.portExist(childDevice, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- agent.requestQueue.RequestComplete()
- return false, nil
- }
- agent.requestQueue.RequestComplete()
- var portCap *ic.PortCapability
- var err error
- // First get the port capability
- if portCap, err = agent.deviceMgr.getPortCapability(ctx, childDevice.Id, port.PortNo); err != nil {
- logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
- return false, err
- }
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return false, err
- }
- defer agent.requestQueue.RequestComplete()
- // Double check again if this port has been already added since the getPortCapability could have taken a long time
- if agent.portExist(childDevice, port) {
- logger.Debugw("port-already-exist", log.Fields{"port": port})
- return false, nil
- }
- // Get stored logical device
- ldevice := agent.getLogicalDeviceWithoutLock()
-
- logger.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
- portCap.Port.RootPort = false
- portCap.Port.Id = port.Label
- portCap.Port.OfpPort.PortNo = port.PortNo
- portCap.Port.DeviceId = childDevice.Id
- portCap.Port.DevicePortNo = port.PortNo
- clonedPorts := clonePorts(ldevice.Ports)
- if clonedPorts == nil {
- clonedPorts = make([]*voltha.LogicalPort, 0)
- }
- clonedPorts = append(clonedPorts, portCap.Port)
- if err := agent.updateLogicalDevicePortsWithoutLock(ctx, ldevice, clonedPorts); err != nil {
- return false, err
- }
- // Update the device graph with this new logical port
- clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
-
- go func() {
- if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
- logger.Warn("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
- }
- }()
-
- return true, nil
-}
-
func (agent *LogicalAgent) packetOut(ctx context.Context, packet *ofp.OfpPacketOut) {
logger.Debugw("packet-out", log.Fields{
"packet": hex.EncodeToString(packet.Data),
@@ -2003,120 +362,3 @@
agent.ldeviceMgr.SendPacketIn(agent.logicalDeviceID, transactionID, packetIn)
logger.Debugw("sending-packet-in", log.Fields{"packet": hex.EncodeToString(packetIn.Data)})
}
-
-func (agent *LogicalAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
- agent.lockLogicalPortsNo.Lock()
- defer agent.lockLogicalPortsNo.Unlock()
- if exist := agent.logicalPortsNo[portNo]; !exist {
- agent.logicalPortsNo[portNo] = nniPort
- }
-}
-
-func (agent *LogicalAgent) deleteLogicalPortsFromMap(portsNo []uint32) {
- agent.lockLogicalPortsNo.Lock()
- defer agent.lockLogicalPortsNo.Unlock()
- for _, pNo := range portsNo {
- delete(agent.logicalPortsNo, pNo)
- }
-}
-
-func (agent *LogicalAgent) addLogicalPortsToMap(lps []*voltha.LogicalPort) {
- agent.lockLogicalPortsNo.Lock()
- defer agent.lockLogicalPortsNo.Unlock()
- for _, lp := range lps {
- if exist := agent.logicalPortsNo[lp.DevicePortNo]; !exist {
- agent.logicalPortsNo[lp.DevicePortNo] = lp.RootPort
- }
- }
-}
-
-func (agent *LogicalAgent) loadFlows(ctx context.Context) {
- agent.flowLock.Lock()
- defer agent.flowLock.Unlock()
-
- var flowList []*ofp.OfpFlowStats
- if err := agent.clusterDataProxy.List(ctx, "logical_flows/"+agent.logicalDeviceID, &flowList); err != nil {
- logger.Errorw("Failed-to-list-logicalflows-from-cluster-data-proxy", log.Fields{"error": err})
- return
- }
- for _, flow := range flowList {
- if flow != nil {
- flowsChunk := FlowChunk{
- flow: flow,
- }
- agent.flows[flow.Id] = &flowsChunk
- }
- }
-}
-
-func (agent *LogicalAgent) loadMeters(ctx context.Context) {
- agent.meterLock.Lock()
- defer agent.meterLock.Unlock()
-
- var meters []*ofp.OfpMeterEntry
- if err := agent.clusterDataProxy.List(ctx, "meters/"+agent.logicalDeviceID, &meters); err != nil {
- logger.Errorw("Failed-to-list-meters-from-proxy", log.Fields{"error": err})
- return
- }
- for _, meter := range meters {
- if meter.Config != nil {
- meterChunk := MeterChunk{
- meter: meter,
- }
- agent.meters[meter.Config.MeterId] = &meterChunk
- }
- }
-}
-
-func (agent *LogicalAgent) loadGroups(ctx context.Context) {
- agent.groupLock.Lock()
- defer agent.groupLock.Unlock()
-
- var groups []*ofp.OfpGroupEntry
- if err := agent.clusterDataProxy.List(ctx, "groups/"+agent.logicalDeviceID, &groups); err != nil {
- logger.Errorw("Failed-to-list-groups-from-proxy", log.Fields{"error": err})
- return
- }
- for _, group := range groups {
- if group.Desc != nil {
- groupChunk := GroupChunk{
- group: group,
- }
- agent.groups[group.Desc.GroupId] = &groupChunk
- }
- }
- logger.Infow("Groups-are-loaded-into-the-cache-from-store", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
-}
-
-func (agent *LogicalAgent) isNNIPort(portNo uint32) bool {
- agent.lockLogicalPortsNo.RLock()
- defer agent.lockLogicalPortsNo.RUnlock()
- if exist := agent.logicalPortsNo[portNo]; exist {
- return agent.logicalPortsNo[portNo]
- }
- return false
-}
-
-func (agent *LogicalAgent) getFirstNNIPort() (uint32, error) {
- agent.lockLogicalPortsNo.RLock()
- defer agent.lockLogicalPortsNo.RUnlock()
- for portNo, nni := range agent.logicalPortsNo {
- if nni {
- return portNo, nil
- }
- }
- return 0, status.Error(codes.NotFound, "No NNI port found")
-}
-
-//GetNNIPorts returns NNI ports.
-func (agent *LogicalAgent) GetNNIPorts() []uint32 {
- agent.lockLogicalPortsNo.RLock()
- defer agent.lockLogicalPortsNo.RUnlock()
- nniPorts := make([]uint32, 0)
- for portNo, nni := range agent.logicalPortsNo {
- if nni {
- nniPorts = append(nniPorts, portNo)
- }
- }
- return nniPorts
-}
diff --git a/rw_core/core/device/logical_agent_flow.go b/rw_core/core/device/logical_agent_flow.go
new file mode 100644
index 0000000..5d35251
--- /dev/null
+++ b/rw_core/core/device/logical_agent_flow.go
@@ -0,0 +1,405 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "strconv"
+
+ "github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-go/rw_core/route"
+ coreutils "github.com/opencord/voltha-go/rw_core/utils"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// updateFlowTable applies a flow-mod to this logical device's flow table by dispatching on the mod's command. A nil flow is a no-op.
+func (agent *LogicalAgent) updateFlowTable(ctx context.Context, flow *ofp.OfpFlowMod) error {
+ logger.Debug("UpdateFlowTable")
+ if flow == nil {
+ return nil
+ }
+
+ if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil { // no command is handled if this step fails
+ return err
+ }
+ switch flow.GetCommand() {
+ case ofp.OfpFlowModCommand_OFPFC_ADD:
+ return agent.flowAdd(ctx, flow)
+ case ofp.OfpFlowModCommand_OFPFC_DELETE:
+ return agent.flowDelete(ctx, flow)
+ case ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT:
+ return agent.flowDeleteStrict(ctx, flow)
+ case ofp.OfpFlowModCommand_OFPFC_MODIFY:
+ return agent.flowModify(flow) // currently always returns a "not implemented" error
+ case ofp.OfpFlowModCommand_OFPFC_MODIFY_STRICT:
+ return agent.flowModifyStrict(flow) // currently always returns a "not implemented" error
+ }
+ return status.Errorf(codes.Internal, // any command not matched above is rejected
+ "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, flow.GetCommand())
+}
+
+// flowAdd builds a flow-stats entry from the flow-mod and adds it through decomposeAndAdd. A nil mod is a no-op.
+func (agent *LogicalAgent) flowAdd(ctx context.Context, mod *ofp.OfpFlowMod) error {
+ logger.Debugw("flowAdd", log.Fields{"flow": mod})
+ if mod == nil {
+ return nil
+ }
+ flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
+ if err != nil {
+ logger.Errorw("flowAdd-failed", log.Fields{"flowMod": mod, "err": err})
+ return err
+ }
+ var updated bool
+ var changed bool
+ if changed, updated, err = agent.decomposeAndAdd(ctx, flow, mod); err != nil {
+ logger.Errorw("flow-decompose-and-add-failed ", log.Fields{"flowMod": mod, "err": err})
+ return err
+ }
+ if changed && !updated { // meter stats are touched only when the flow changed without replacing an existing entry
+ if dbupdated := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !dbupdated {
+ return fmt.Errorf("couldnt-updated-flow-stats-%s", strconv.FormatUint(flow.Id, 10))
+ }
+ }
+ return nil
+
+}
+
+// decomposeAndAdd caches (and, if new, persists) flow, decomposes it into device rules and sends them to the devices. It returns (changed, updated, err): updated is set only when an existing entry with the same ID differed from flow.
+func (agent *LogicalAgent) decomposeAndAdd(ctx context.Context, flow *ofp.OfpFlowStats, mod *ofp.OfpFlowMod) (bool, bool, error) {
+ changed := false
+ updated := false
+ alreadyExist := true
+ var flowToReplace *ofp.OfpFlowStats
+
+ // If the flow is not found in the map, create a new entry, otherwise get the existing one.
+ agent.flowLock.Lock()
+ flowChunk, ok := agent.flows[flow.Id]
+ if !ok {
+ flowChunk = &FlowChunk{
+ flow: flow,
+ }
+ agent.flows[flow.Id] = flowChunk
+ alreadyExist = false
+ flowChunk.lock.Lock() //acquire chunk lock before releasing map lock
+ defer flowChunk.lock.Unlock()
+ agent.flowLock.Unlock()
+ } else {
+ agent.flowLock.Unlock() //release map lock before acquiring chunk lock
+ flowChunk.lock.Lock()
+ defer flowChunk.lock.Unlock()
+ }
+
+ if !alreadyExist {
+ flowID := strconv.FormatUint(flow.Id, 10)
+ if err := agent.clusterDataProxy.AddWithID(ctx, "logical_flows/"+agent.logicalDeviceID, flowID, flow); err != nil {
+ logger.Errorw("failed-adding-flow-to-db", log.Fields{"deviceID": agent.logicalDeviceID, "flowID": flowID, "err": err})
+ //Revert the map
+ //TODO: Solve the condition:If we have two flow Adds of the same flow (at least same priority and match) in quick succession
+ //then if the first one fails while the second one was waiting on the flowchunk, we will end up with an instance of flowChunk that is no longer in the map.
+ agent.flowLock.Lock()
+ delete(agent.flows, flow.Id)
+ agent.flowLock.Unlock()
+ return changed, updated, err
+ }
+ }
+ flows := make([]*ofp.OfpFlowStats, 0)
+ updatedFlows := make([]*ofp.OfpFlowStats, 0)
+ checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
+ if checkOverlap {
+ if overlapped := fu.FindOverlappingFlows(flows, mod); len(overlapped) != 0 { // NOTE(review): flows has no entries at this point (nothing is appended above) - confirm the overlap check is meant to run against it
+ // TODO: should this error be notified other than being logged?
+ logger.Warnw("overlapped-flows", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
+ } else {
+ // Add flow
+ changed = true
+ }
+ } else {
+ if alreadyExist {
+ flowToReplace = flowChunk.flow
+ if (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_RESET_COUNTS)) != 0 { // NOTE(review): counters are copied from the old flow when RESET_COUNTS is set - confirm the polarity
+ flow.ByteCount = flowToReplace.ByteCount
+ flow.PacketCount = flowToReplace.PacketCount
+ }
+ if !proto.Equal(flowToReplace, flow) {
+ changed = true
+ updated = true
+ }
+ } else {
+ changed = true
+ }
+ }
+ logger.Debugw("flowAdd-changed", log.Fields{"changed": changed, "updated": updated})
+ if changed {
+ updatedFlows = append(updatedFlows, flow)
+ var flowMetadata voltha.FlowMetadata
+ lMeters, _ := agent.ListLogicalDeviceMeters(ctx) // the error from listing meters is ignored
+ if err := agent.GetMeterConfig(updatedFlows, lMeters.Items, &flowMetadata); err != nil {
+ logger.Error("Meter-referred-in-flow-not-present")
+ return changed, updated, err
+ }
+ flowGroups, _ := agent.ListLogicalDeviceFlowGroups(ctx) // the error from listing groups is ignored
+ deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *flowGroups)
+ if err != nil {
+ return changed, updated, err
+ }
+ logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+ // Update store and cache
+ if updated {
+ if err := agent.updateLogicalDeviceFlow(ctx, flow, flowChunk); err != nil {
+ return changed, updated, err
+ }
+ }
+ respChannels := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &flowMetadata)
+ // Wait for the device responses in the background so the caller is not blocked
+ go func() {
+ // Wait for completion
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
+ logger.Infow("failed-to-add-flows-will-attempt-deletion", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
+ // Revert added flows
+ if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, &flowMetadata); err != nil {
+ logger.Errorw("failure-to-delete-flows-after-failed-addition", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ }
+ }
+ }()
+ }
+ return changed, updated, nil
+}
+
+// revertAddedFlows undoes a failed flowAdd: it restores replacedFlow (or removes addedFlow when replacedFlow is nil), updates
+// the meter stats, and asks the devices to delete deviceRules. It returns nil without doing anything if addedFlow is no longer cached.
+func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
+ logger.Debugw("revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
+
+ agent.flowLock.RLock()
+ flowChunk, ok := agent.flows[addedFlow.Id]
+ agent.flowLock.RUnlock()
+ if !ok {
+ // Not found - do nothing
+ logger.Debugw("flow-not-found", log.Fields{"added-flow": addedFlow})
+ return nil
+ }
+ // Leave the map lock and synchronize per flow
+ flowChunk.lock.Lock()
+ defer flowChunk.lock.Unlock()
+
+ if replacedFlow != nil {
+ if err := agent.updateLogicalDeviceFlow(ctx, replacedFlow, flowChunk); err != nil {
+ return err
+ }
+ } else {
+ if err := agent.removeLogicalDeviceFlow(ctx, addedFlow.Id); err != nil {
+ return err
+ }
+ }
+ // Revert meters
+ if changedMeterStats := agent.updateFlowCountOfMeterStats(ctx, mod, addedFlow, true); !changedMeterStats {
+ return fmt.Errorf("Unable-to-revert-meterstats-for-flow-%s", strconv.FormatUint(addedFlow.Id, 10))
+ }
+
+ // Update the devices
+ respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, metadata)
+
+ // Wait for the responses in the background; a failure here is only logged
+ go func() {
+ // Since this action is taken following an add failure, we may also receive a failure for the revert
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ logger.Warnw("failure-reverting-added-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ }
+ }()
+
+ return nil
+}
+
+// flowDelete removes every cached flow that matches the flow-mod (exact or wildcard match) and asks the devices to delete the decomposed rules. A nil mod is a no-op.
+func (agent *LogicalAgent) flowDelete(ctx context.Context, mod *ofp.OfpFlowMod) error {
+ logger.Debug("flowDelete")
+ if mod == nil {
+ return nil
+ }
+
+ fs, err := fu.FlowStatsEntryFromFlowModMessage(mod)
+ if err != nil {
+ return err
+ }
+
+ //build a list of what to delete
+ toDelete := make([]*ofp.OfpFlowStats, 0)
+ toDeleteChunks := make([]*FlowChunk, 0)
+ //Lock the map to search the matched flows
+ agent.flowLock.RLock()
+ for _, f := range agent.flows {
+ if fu.FlowMatch(f.flow, fs) {
+ toDelete = append(toDelete, f.flow)
+ toDeleteChunks = append(toDeleteChunks, f)
+ continue
+ }
+ // Check wild card match
+ if fu.FlowMatchesMod(f.flow, mod) {
+ toDelete = append(toDelete, f.flow)
+ toDeleteChunks = append(toDeleteChunks, f)
+ }
+ }
+ agent.flowLock.RUnlock()
+ //Delete the matched flows
+ if len(toDelete) > 0 {
+ logger.Debugw("flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "toDelete": len(toDelete)})
+ var meters []*ofp.OfpMeterEntry
+ var flowGroups []*ofp.OfpGroupEntry
+ if ofpMeters, err := agent.ListLogicalDeviceMeters(ctx); err == nil { // use the listing only when it succeeded
+ meters = ofpMeters.Items
+ }
+
+ if groups, err := agent.ListLogicalDeviceFlowGroups(ctx); err == nil { // use the listing only when it succeeded
+ flowGroups = groups.Items
+ }
+
+ for _, fc := range toDeleteChunks {
+ if err := agent.deleteFlowAndUpdateMeterStats(ctx, mod, fc); err != nil {
+ return err
+ }
+ }
+ var flowMetadata voltha.FlowMetadata
+ if err := agent.GetMeterConfig(toDelete, meters, &flowMetadata); err != nil { // This should never happen
+ logger.Error("Meter-referred-in-flows-not-present")
+ return err
+ }
+ var respChnls []coreutils.Response
+ var partialRoute bool
+ var deviceRules *fu.DeviceRules
+ deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
+ if err != nil {
+ // A no route error means no route exists between the ports specified in the flow. This can happen when the
+ // child device is deleted and a request to delete flows from the parent device is received
+ if !errors.Is(err, route.ErrNoRoute) {
+ logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": toDelete, "error": err})
+ return err
+ }
+ partialRoute = true
+ }
+
+ // Update the devices
+ if partialRoute {
+ respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: toDelete}, &flowMetadata)
+ } else {
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+ }
+
+ // Wait for the responses in the background; a failure here is only logged
+ go func() {
+ // Wait for completion
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ logger.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ // TODO: Revert the flow deletion
+ }
+ }()
+ }
+ //TODO: send announcement on delete
+ return nil
+}
+
+// flowDeleteStrict deletes the single cached flow whose ID equals the ID derived from the flow-mod; a nil mod or an unknown flow is a no-op.
+func (agent *LogicalAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
+ logger.Debugw("flowDeleteStrict", log.Fields{"mod": mod})
+ if mod == nil {
+ return nil
+ }
+
+ flow, err := fu.FlowStatsEntryFromFlowModMessage(mod)
+ if err != nil {
+ return err
+ }
+ logger.Debugw("flow-id-in-flow-delete-strict", log.Fields{"flowID": flow.Id})
+ agent.flowLock.RLock()
+ flowChunk, ok := agent.flows[flow.Id]
+ agent.flowLock.RUnlock()
+ if !ok {
+ logger.Debugw("Skipping-flow-delete-strict-request. No-flow-found", log.Fields{"flowMod": mod})
+ return nil
+ }
+ // Release the map lock and synchronize per flow
+ flowChunk.lock.Lock()
+ defer flowChunk.lock.Unlock()
+
+ var meters []*ofp.OfpMeterEntry
+ var flowGroups []*ofp.OfpGroupEntry
+ if ofMeters, er := agent.ListLogicalDeviceMeters(ctx); er == nil {
+ meters = ofMeters.Items
+ }
+ if ofGroups, er := agent.ListLogicalDeviceFlowGroups(ctx); er == nil {
+ flowGroups = ofGroups.Items
+ }
+ if changedMeter := agent.updateFlowCountOfMeterStats(ctx, mod, flow, false); !changedMeter { // NOTE(review): last argument is false as in flowAdd, while revertAddedFlows passes true - confirm for a delete
+ return fmt.Errorf("Cannot delete flow - %s. Meter update failed", flow)
+ }
+
+ var flowMetadata voltha.FlowMetadata
+ flowsToDelete := []*ofp.OfpFlowStats{flowChunk.flow} // the cached flow, not the one rebuilt from mod, is what gets decomposed
+ if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
+ logger.Error("meter-referred-in-flows-not-present")
+ return err
+ }
+ var respChnls []coreutils.Response
+ var partialRoute bool
+ deviceRules, err := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
+ if err != nil {
+ // A no route error means no route exists between the ports specified in the flow. This can happen when the
+ // child device is deleted and a request to delete flows from the parent device is received
+ if !errors.Is(err, route.ErrNoRoute) {
+ logger.Errorw("unexpected-error-received", log.Fields{"flows-to-delete": flowsToDelete, "error": err})
+ return err
+ }
+ partialRoute = true
+ }
+
+ // Update the model
+ if err := agent.removeLogicalDeviceFlow(ctx, flow.Id); err != nil {
+ return err
+ }
+ // Update the devices
+ if partialRoute {
+ respChnls = agent.deleteFlowsFromParentDevice(ctx, ofp.Flows{Items: flowsToDelete}, &flowMetadata)
+ } else {
+ respChnls = agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, &flowMetadata)
+ }
+
+ // Wait for completion in the background; a failure here is only logged
+ go func() {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ logger.Warnw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ //TODO: Revert flow changes
+ }
+ }()
+
+ return nil
+}
+
+// flowModify is a placeholder for the flow modify command; it is not implemented and always returns an error.
+func (agent *LogicalAgent) flowModify(mod *ofp.OfpFlowMod) error {
+ return errors.New("flowModify not implemented")
+}
+
+// flowModifyStrict is a placeholder for the strict flow modify command; it is not implemented and always returns an error.
+func (agent *LogicalAgent) flowModifyStrict(mod *ofp.OfpFlowMod) error {
+ return errors.New("flowModifyStrict not implemented")
+}
diff --git a/rw_core/core/device/logical_agent_flow_loader.go b/rw_core/core/device/logical_agent_flow_loader.go
new file mode 100644
index 0000000..84d1a47
--- /dev/null
+++ b/rw_core/core/device/logical_agent_flow_loader.go
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "github.com/gogo/protobuf/proto"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// FlowChunk keeps a flow and the lock for this flow. The lock in the struct is used to synchronize the
+// modifications for the related flow.
+type FlowChunk struct {
+ flow *ofp.OfpFlowStats
+ lock sync.Mutex
+}
+
+// loadFlows fills agent.flows from the "logical_flows/<logicalDeviceID>" store path while holding the flow map lock; a List error is logged and nothing is loaded.
+func (agent *LogicalAgent) loadFlows(ctx context.Context) {
+ agent.flowLock.Lock()
+ defer agent.flowLock.Unlock()
+ var flowList []*ofp.OfpFlowStats
+ if err := agent.clusterDataProxy.List(ctx, "logical_flows/"+agent.logicalDeviceID, &flowList); err != nil {
+ logger.Errorw("Failed-to-list-logicalflows-from-cluster-data-proxy", log.Fields{"error": err})
+ return
+ }
+ for _, flow := range flowList {
+ if flow != nil { // nil entries are skipped
+ flowsChunk := FlowChunk{
+ flow: flow,
+ }
+ agent.flows[flow.Id] = &flowsChunk
+ }
+ }
+}
+
+// updateLogicalDeviceFlow writes flow to the store at "logical_flows/<logicalDeviceID>/<flowID>" and, only on success, swaps it into flowChunk.
+// It is assumed that the chunk lock has been acquired before this function is called.
+func (agent *LogicalAgent) updateLogicalDeviceFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowChunk *FlowChunk) error {
+ path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flow.Id)
+ if err := agent.clusterDataProxy.Update(ctx, path, flow); err != nil {
+ return status.Errorf(codes.Internal, "failed-update-flow:%s:%d %s", agent.logicalDeviceID, flow.Id, err)
+ }
+ flowChunk.flow = flow
+ return nil
+}
+
+// removeLogicalDeviceFlow deletes the flow from the store first and then, taking the flow map lock itself, from the agent.flows cache.
+// It is assumed that the chunk lock has been acquired before this function is called.
+func (agent *LogicalAgent) removeLogicalDeviceFlow(ctx context.Context, flowID uint64) error {
+ path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
+ if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+ return fmt.Errorf("couldnt-delete-flow-from-the-store-%s", path) // NOTE(review): the underlying err is not included in the returned error
+ }
+ agent.flowLock.Lock()
+ defer agent.flowLock.Unlock()
+ delete(agent.flows, flowID)
+ return nil
+}
+
+// ListLogicalDeviceFlows returns clones of all cached logical device flows, collected under the flow map read lock. The returned error is always nil.
+func (agent *LogicalAgent) ListLogicalDeviceFlows(ctx context.Context) (*ofp.Flows, error) {
+ logger.Debug("ListLogicalDeviceFlows")
+ var flowStats []*ofp.OfpFlowStats
+ agent.flowLock.RLock()
+ defer agent.flowLock.RUnlock()
+ for _, flowChunk := range agent.flows {
+ flowStats = append(flowStats, (proto.Clone(flowChunk.flow)).(*ofp.OfpFlowStats)) // clone so callers do not share the cached message
+ }
+ return &ofp.Flows{Items: flowStats}, nil
+}
+
+// deleteFlowsOfMeter removes, from store and cache, every flow whose meter ID equals meterID (flows with meter ID 0 are never matched). It stops at the first store error.
+func (agent *LogicalAgent) deleteFlowsOfMeter(ctx context.Context, meterID uint32) error {
+ logger.Infow("Delete-flows-matching-meter", log.Fields{"meter": meterID})
+ agent.flowLock.Lock()
+ defer agent.flowLock.Unlock()
+ for flowID, flowChunk := range agent.flows {
+ if mID := fu.GetMeterIdFromFlow(flowChunk.flow); mID != 0 && mID == meterID {
+ logger.Debugw("Flow-to-be- deleted", log.Fields{"flow": flowChunk.flow})
+ path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
+ if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+ //TODO: Think on carrying on and deleting the remaining flows, instead of returning. Anyways this returns an error to the
+ //controller which possibly results with a re-deletion. Then how can we handle the new deletion request (same for group deletion)?
+ return fmt.Errorf("couldnt-deleted-flow-from-store-%s", path)
+ }
+ delete(agent.flows, flowID)
+ }
+ }
+ return nil
+}
+
+func (agent *LogicalAgent) deleteFlowsOfGroup(ctx context.Context, groupID uint32) ([]*ofp.OfpFlowStats, error) { // removes, from store and cache, every flow that outputs to groupID and returns the removed flows
+ logger.Infow("Delete-flows-matching-group", log.Fields{"groupID": groupID})
+ var flowsRemoved []*ofp.OfpFlowStats
+ agent.flowLock.Lock()
+ defer agent.flowLock.Unlock()
+ for flowID, flowChunk := range agent.flows {
+ if fu.FlowHasOutGroup(flowChunk.flow, groupID) {
+ path := fmt.Sprintf("logical_flows/%s/%d", agent.logicalDeviceID, flowID)
+ if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+ return nil, fmt.Errorf("couldnt-delete-flow-from-store-%s", path) // flows already removed in earlier iterations are not reported on this path
+ }
+ delete(agent.flows, flowID)
+ flowsRemoved = append(flowsRemoved, flowChunk.flow)
+ }
+ }
+ return flowsRemoved, nil
+}
diff --git a/rw_core/core/device/logical_agent_group.go b/rw_core/core/device/logical_agent_group.go
new file mode 100644
index 0000000..a0d6c4a
--- /dev/null
+++ b/rw_core/core/device/logical_agent_group.go
@@ -0,0 +1,230 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+
+ coreutils "github.com/opencord/voltha-go/rw_core/utils"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// updateGroupTable applies a group-mod to this logical device's group table by dispatching on the mod's command. A nil groupMod is a no-op.
+func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
+ logger.Debug("updateGroupTable")
+ if groupMod == nil {
+ return nil
+ }
+
+ if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil { // no command is handled if this step fails
+ return err
+ }
+
+ switch groupMod.GetCommand() {
+ case ofp.OfpGroupModCommand_OFPGC_ADD:
+ return agent.groupAdd(ctx, groupMod)
+ case ofp.OfpGroupModCommand_OFPGC_DELETE:
+ return agent.groupDelete(ctx, groupMod)
+ case ofp.OfpGroupModCommand_OFPGC_MODIFY:
+ return agent.groupModify(ctx, groupMod)
+ }
+ return status.Errorf(codes.Internal, "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, groupMod.GetCommand()) // any command not matched above is rejected
+}
+
+// groupAdd caches and persists a new group and sends it to the root device; it returns an error if the group ID is already cached. A nil groupMod is a no-op.
+func (agent *LogicalAgent) groupAdd(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
+ if groupMod == nil {
+ return nil
+ }
+ logger.Debugw("groupAdd", log.Fields{"GroupId": groupMod.GroupId})
+ agent.groupLock.Lock()
+ _, ok := agent.groups[groupMod.GroupId]
+ if ok {
+ agent.groupLock.Unlock()
+ return fmt.Errorf("Group %d already exists", groupMod.GroupId)
+ }
+
+ groupEntry := fu.GroupEntryFromGroupMod(groupMod)
+ groupChunk := GroupChunk{
+ group: groupEntry,
+ }
+ //add to map
+ agent.groups[groupMod.GroupId] = &groupChunk
+ groupChunk.lock.Lock() // take the chunk lock before the map lock is released below
+ defer groupChunk.lock.Unlock()
+ agent.groupLock.Unlock()
+ //add to the kv store
+ path := fmt.Sprintf("groups/%s", agent.logicalDeviceID)
+ groupID := strconv.Itoa(int(groupMod.GroupId))
+ if err := agent.clusterDataProxy.AddWithID(ctx, path, groupID, groupEntry); err != nil {
+ logger.Errorw("failed-adding-group", log.Fields{"deviceID": agent.logicalDeviceID, "groupID": groupID, "err": err})
+ agent.groupLock.Lock() // revert the cache entry added above
+ delete(agent.groups, groupMod.GroupId)
+ agent.groupLock.Unlock()
+ return err
+ }
+ deviceRules := fu.NewDeviceRules()
+ deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
+ fg := fu.NewFlowsAndGroups()
+ fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
+ deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
+ logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
+
+ // Update the devices
+ respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
+
+ // Wait for completion in the background; a failure here is only logged
+ go func() {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ //TODO: Revert flow changes
+ }
+ }()
+ return nil
+}
+
+// groupDelete removes one group, or every group when the ID is OFPG_ALL, together with the flows that output to it, then pushes the resulting rules to the devices. A nil groupMod is a no-op.
+func (agent *LogicalAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
+ logger.Debug("groupDelete")
+ if groupMod == nil {
+ return nil
+ }
+ affectedFlows := make([]*ofp.OfpFlowStats, 0)
+ affectedGroups := make([]*ofp.OfpGroupEntry, 0)
+ var groupsChanged bool
+ groupID := groupMod.GroupId
+ var err error
+ if groupID == uint32(ofp.OfpGroup_OFPG_ALL) {
+ if err := func() error {
+ agent.groupLock.Lock()
+ defer agent.groupLock.Unlock()
+ for key, groupChunk := range agent.groups {
+ //Remove from store and cache. Do this in a one time lock allocation.
+ path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, key)
+ if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+ return fmt.Errorf("couldnt-deleted-group-from-store-%s", path)
+ }
+ delete(agent.groups, key) // delete the iterated entry; groupID is the OFPG_ALL sentinel in this branch
+ var flows []*ofp.OfpFlowStats
+ if flows, err = agent.deleteFlowsOfGroup(ctx, key); err != nil {
+ logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": key})
+ return err
+ }
+ affectedFlows = append(affectedFlows, flows...)
+ affectedGroups = append(affectedGroups, groupChunk.group)
+ }
+ return nil
+ }(); err != nil {
+ return err
+ }
+ groupsChanged = true
+ } else {
+ agent.groupLock.RLock()
+ groupChunk, ok := agent.groups[groupID]
+ agent.groupLock.RUnlock()
+ if !ok {
+ logger.Warnw("group-not-found", log.Fields{"groupID": groupID})
+ return nil
+ }
+ groupChunk.lock.Lock()
+ defer groupChunk.lock.Unlock()
+ var flows []*ofp.OfpFlowStats
+ if flows, err = agent.deleteFlowsOfGroup(ctx, groupID); err != nil {
+ logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": groupID})
+ return err
+ }
+ //remove from store
+ if err := agent.removeLogicalDeviceFlowGroup(ctx, groupID); err != nil {
+ return err
+ }
+ affectedFlows = append(affectedFlows, flows...)
+ affectedGroups = append(affectedGroups, groupChunk.group)
+ groupsChanged = true
+ }
+
+ if err != nil || groupsChanged {
+ var deviceRules *fu.DeviceRules
+ deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: affectedFlows}, ofp.FlowGroups{Items: affectedGroups})
+ if err != nil {
+ return err
+ }
+ logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+ // Update the devices
+ respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, nil)
+
+ // Wait for completion in the background; a failure here is only logged
+ go func() {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ //TODO: Revert flow changes
+ }
+ }()
+ }
+ return nil
+}
+
+// groupModify replaces an existing group's definition in store and cache and sends the new definition to the root device; it errors if the group is not cached. A nil groupMod is a no-op.
+func (agent *LogicalAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
+ logger.Debug("groupModify")
+ if groupMod == nil {
+ return nil
+ }
+ groupID := groupMod.GroupId
+ agent.groupLock.RLock()
+ groupChunk, ok := agent.groups[groupID]
+ agent.groupLock.RUnlock()
+ if !ok {
+ return fmt.Errorf("group-absent:%d", groupID)
+ }
+ //Don't let any other thread to make modifications to this group till all done here.
+ groupChunk.lock.Lock()
+ defer groupChunk.lock.Unlock()
+ //replace existing group entry with new group definition
+ groupEntry := fu.GroupEntryFromGroupMod(groupMod)
+ deviceRules := fu.NewDeviceRules()
+ deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
+ fg := fu.NewFlowsAndGroups()
+ fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
+ deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
+
+ logger.Debugw("rules", log.Fields{"rules-for-group-modify": deviceRules.String()})
+ //update KV
+ if err := agent.updateLogicalDeviceFlowGroup(ctx, groupEntry, groupChunk); err != nil {
+ logger.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ return err
+ }
+
+ // Update the devices
+ respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, &voltha.FlowMetadata{})
+
+ // Wait for completion in the background; a failure here is only logged
+ go func() {
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+ logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+ //TODO: Revert flow changes
+ }
+ }()
+ return nil
+}
diff --git a/rw_core/core/device/logical_agent_group_loader.go b/rw_core/core/device/logical_agent_group_loader.go
new file mode 100644
index 0000000..53e4076
--- /dev/null
+++ b/rw_core/core/device/logical_agent_group_loader.go
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+)
+
+// GroupChunk keeps a group entry and its lock. The lock in the struct is used to synchronize the
+// modifications for the related group.
+type GroupChunk struct {
+ group *ofp.OfpGroupEntry
+ lock sync.Mutex
+}
+
+// loadGroups fills agent.groups from the "groups/<logicalDeviceID>" store path while holding the group map lock; a List error is logged and nothing is loaded.
+func (agent *LogicalAgent) loadGroups(ctx context.Context) {
+ agent.groupLock.Lock()
+ defer agent.groupLock.Unlock()
+ var groups []*ofp.OfpGroupEntry
+ if err := agent.clusterDataProxy.List(ctx, "groups/"+agent.logicalDeviceID, &groups); err != nil {
+ logger.Errorw("Failed-to-list-groups-from-proxy", log.Fields{"error": err})
+ return
+ }
+ for _, group := range groups {
+ if group.Desc != nil { // entries without a description carry no group ID to key on and are skipped
+ groupChunk := GroupChunk{
+ group: group,
+ }
+ agent.groups[group.Desc.GroupId] = &groupChunk
+ }
+ }
+ logger.Infow("Groups-are-loaded-into-the-cache-from-store", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
+}
+
+// updateLogicalDeviceFlowGroup writes groupEntry to the store at "groups/<logicalDeviceID>/<groupID>" and, only on success, swaps it into groupChunk.
+// It is assumed that the chunk lock has been acquired before this function is called.
+func (agent *LogicalAgent) updateLogicalDeviceFlowGroup(ctx context.Context, groupEntry *ofp.OfpGroupEntry, groupChunk *GroupChunk) error {
+ path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, groupEntry.Desc.GroupId)
+ if err := agent.clusterDataProxy.Update(ctx, path, groupEntry); err != nil {
+ logger.Errorw("error-updating-logical-device-with-group", log.Fields{"error": err})
+ return err
+ }
+ groupChunk.group = groupEntry
+ return nil
+}
+
+// removeLogicalDeviceFlowGroup deletes the group from the store first and then, taking the group map lock itself, from the agent.groups cache.
+// It is assumed that the chunk lock has been acquired before this function is called.
+func (agent *LogicalAgent) removeLogicalDeviceFlowGroup(ctx context.Context, groupID uint32) error {
+ path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, groupID)
+ if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+ return fmt.Errorf("couldnt-delete-group-from-store-%s", path) // NOTE(review): the underlying err is not included in the returned error
+ }
+ agent.groupLock.Lock()
+ defer agent.groupLock.Unlock()
+ delete(agent.groups, groupID)
+ return nil
+}
+
+// ListLogicalDeviceFlowGroups returns clones of all cached flow groups, collected under the group map read lock. The returned error is always nil.
+func (agent *LogicalAgent) ListLogicalDeviceFlowGroups(ctx context.Context) (*ofp.FlowGroups, error) {
+ logger.Debug("ListLogicalDeviceFlowGroups")
+
+ var groupEntries []*ofp.OfpGroupEntry
+ agent.groupLock.RLock()
+ defer agent.groupLock.RUnlock()
+ for _, value := range agent.groups {
+ groupEntries = append(groupEntries, (proto.Clone(value.group)).(*ofp.OfpGroupEntry)) // clone so callers do not share the cached message
+ }
+ return &ofp.FlowGroups{Items: groupEntries}, nil
+}
diff --git a/rw_core/core/device/logical_agent_meter.go b/rw_core/core/device/logical_agent_meter.go
new file mode 100644
index 0000000..c211f1e
--- /dev/null
+++ b/rw_core/core/device/logical_agent_meter.go
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "fmt"
+ "strconv"
+
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// updateMeterTable routes meterMod to meterAdd, meterDelete or meterModify according to its command; a nil meterMod is a no-op and any other command yields a codes.Internal error.
+func (agent *LogicalAgent) updateMeterTable(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
+	logger.Debug("updateMeterTable")
+	if meterMod == nil {
+		return nil
+	}
+	switch command := meterMod.GetCommand(); command {
+	case ofp.OfpMeterModCommand_OFPMC_ADD:
+		return agent.meterAdd(ctx, meterMod)
+	case ofp.OfpMeterModCommand_OFPMC_DELETE:
+		return agent.meterDelete(ctx, meterMod)
+	case ofp.OfpMeterModCommand_OFPMC_MODIFY:
+		return agent.meterModify(ctx, meterMod)
+	default:
+		return status.Errorf(codes.Internal, "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, command)
+	}
+}
+
+// meterAdd caches the meter built from meterMod and persists it under "meters/<logicalDeviceID>"; a nil request or an already-known meter ID is a no-op, and a failed store write removes the cache entry again.
+func (agent *LogicalAgent) meterAdd(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
+	if meterMod == nil {
+		return nil
+	}
+	logger.Debugw("meterAdd", log.Fields{"metermod": *meterMod}) // logged after the nil check: dereferencing a nil request here would panic
+	meterEntry := fu.MeterEntryFromMeterMod(meterMod)
+	agent.meterLock.Lock()
+	//check if the meter already exists or not
+	_, ok := agent.meters[meterMod.MeterId]
+	if ok {
+		logger.Infow("Meter-already-exists", log.Fields{"meter": *meterMod})
+		agent.meterLock.Unlock()
+		return nil
+	}
+
+	mChunk := MeterChunk{
+		meter: meterEntry,
+	}
+	//Add to map and acquire the per meter lock
+	agent.meters[meterMod.MeterId] = &mChunk
+	mChunk.lock.Lock()
+	defer mChunk.lock.Unlock()
+	agent.meterLock.Unlock()
+	meterID := strconv.FormatUint(uint64(meterMod.MeterId), 10) // unsigned formatting matches the "%d" of the uint32 ID used for update/remove paths
+	if err := agent.clusterDataProxy.AddWithID(ctx, "meters/"+agent.logicalDeviceID, meterID, meterEntry); err != nil {
+		logger.Errorw("failed-adding-meter", log.Fields{"deviceID": agent.logicalDeviceID, "meterID": meterID, "err": err})
+		//Revert the map
+		agent.meterLock.Lock()
+		delete(agent.meters, meterMod.MeterId)
+		agent.meterLock.Unlock()
+		return err
+	}
+
+	logger.Debugw("Meter-added-successfully", log.Fields{"Added-meter": meterEntry})
+	return nil
+}
+
+// meterDelete removes the flows that use the meter and then the meter itself from store and cache; a nil request or an unknown meter ID is a no-op (the latter is logged as a warning).
+func (agent *LogicalAgent) meterDelete(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
+	if meterMod == nil {
+		return nil
+	}
+	logger.Debugw("meterDelete", log.Fields{"meterMod": *meterMod}) // Debugw so the fields are structured; after the nil check so *meterMod cannot panic
+	agent.meterLock.RLock()
+	meterChunk, ok := agent.meters[meterMod.MeterId]
+	agent.meterLock.RUnlock()
+	if !ok {
+		logger.Warnw("meter-not-found", log.Fields{"meterID": meterMod.MeterId})
+		return nil
+	}
+	// Take the per-meter lock: nobody may change this meter until the delete is done, and we wait for any modification already in progress.
+	meterChunk.lock.Lock()
+	defer meterChunk.lock.Unlock()
+	if err := agent.deleteFlowsOfMeter(ctx, meterMod.MeterId); err != nil {
+		return err
+	}
+	//remove from the store and cache
+	if err := agent.removeLogicalDeviceMeter(ctx, meterMod.MeterId); err != nil {
+		return err
+	}
+	logger.Debugw("meterDelete-success", log.Fields{"meterID": meterMod.MeterId})
+	return nil
+}
+
+func (agent *LogicalAgent) meterModify(ctx context.Context, meterMod *ofp.OfpMeterMod) error { // replaces a cached meter with the entry built from meterMod; a nil meterMod is a no-op
+ logger.Debug("meterModify")
+ if meterMod == nil {
+ return nil
+ }
+ newMeter := fu.MeterEntryFromMeterMod(meterMod)
+ agent.meterLock.RLock()
+ meterChunk, ok := agent.meters[newMeter.Config.MeterId]
+ agent.meterLock.RUnlock()
+ if !ok {
+ return fmt.Errorf("no-meter-to-modify:%d", newMeter.Config.MeterId)
+ }
+ //The map lock has been released above; synchronize on the per-meter lock for the rest of the update
+ meterChunk.lock.Lock()
+ defer meterChunk.lock.Unlock()
+ oldMeter := meterChunk.meter
+ newMeter.Stats.FlowCount = oldMeter.Stats.FlowCount // carry the existing flow count over to the replacement entry
+
+ if err := agent.updateLogicalDeviceMeter(ctx, newMeter, meterChunk); err != nil {
+ logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "meterID": newMeter.Config.MeterId})
+ return err
+ }
+ logger.Debugw("replaced-with-new-meter", log.Fields{"oldMeter": oldMeter, "newMeter": newMeter})
+ return nil
+}
diff --git a/rw_core/core/device/logical_agent_meter_loader.go b/rw_core/core/device/logical_agent_meter_loader.go
new file mode 100644
index 0000000..4408bf1
--- /dev/null
+++ b/rw_core/core/device/logical_agent_meter_loader.go
@@ -0,0 +1,169 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "github.com/gogo/protobuf/proto"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+)
+
+//MeterChunk keeps a meter entry and its lock. The lock in the struct is used to synchronize the
+//modifications for the related meter.
+type MeterChunk struct {
+ meter *ofp.OfpMeterEntry // the cached meter entry
+ lock sync.Mutex // per-meter lock taken by code that modifies or deletes this meter
+}
+
+func (agent *LogicalAgent) loadMeters(ctx context.Context) { // fills agent.meters from the store listing at "meters/<logicalDeviceID>"
+ agent.meterLock.Lock() // the map lock is held for the whole load, including the store read
+ defer agent.meterLock.Unlock()
+
+ var meters []*ofp.OfpMeterEntry
+ if err := agent.clusterDataProxy.List(ctx, "meters/"+agent.logicalDeviceID, &meters); err != nil {
+ logger.Errorw("Failed-to-list-meters-from-proxy", log.Fields{"error": err})
+ return // the error is only logged; the cache is left as it was
+ }
+ for _, meter := range meters {
+ if meter.Config != nil { // entries without a Config carry no meter ID to key on and are skipped
+ meterChunk := MeterChunk{
+ meter: meter,
+ }
+ agent.meters[meter.Config.MeterId] = &meterChunk
+ }
+ }
+}
+
+// updateLogicalDeviceMeter writes meter to the store under "meters/<logicalDeviceID>/<meterID>" and then caches it in meterChunk; it is assumed the chunk lock is already held by the caller.
+func (agent *LogicalAgent) updateLogicalDeviceMeter(ctx context.Context, meter *ofp.OfpMeterEntry, meterChunk *MeterChunk) error {
+	storePath := fmt.Sprintf("meters/%s/%d", agent.logicalDeviceID, meter.Config.MeterId)
+	err := agent.clusterDataProxy.Update(ctx, storePath, meter)
+	if err != nil {
+		logger.Errorw("error-updating-logical-device-with-meters", log.Fields{"error": err})
+		return err
+	}
+	meterChunk.meter = meter // the cache follows the store only after a successful write
+	return nil
+}
+
+// removeLogicalDeviceMeter deletes the meter from the store and, once that succeeded, from the agent.meters cache.
+// It is assumed that the chunk lock has been acquired before this function is called.
+func (agent *LogicalAgent) removeLogicalDeviceMeter(ctx context.Context, meterID uint32) error {
+	path := fmt.Sprintf("meters/%s/%d", agent.logicalDeviceID, meterID)
+	if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
+		return fmt.Errorf("couldnt-delete-meter-from-store-%s: %v", path, err) // keep the store's cause in the returned error
+	}
+	agent.meterLock.Lock()
+	defer agent.meterLock.Unlock()
+	delete(agent.meters, meterID)
+	return nil
+}
+
+// ListLogicalDeviceMeters returns logical device meters
+func (agent *LogicalAgent) ListLogicalDeviceMeters(ctx context.Context) (*ofp.Meters, error) {
+ logger.Debug("ListLogicalDeviceMeters")
+
+ var meterEntries []*ofp.OfpMeterEntry
+ agent.meterLock.RLock()
+ defer agent.meterLock.RUnlock()
+ for _, meterChunk := range agent.meters {
+ meterEntries = append(meterEntries, (proto.Clone(meterChunk.meter)).(*ofp.OfpMeterEntry))
+ }
+ return &ofp.Meters{Items: meterEntries}, nil
+}
+
+// GetMeterConfig appends to metadata.Meters the Config of each distinct meter referenced by flows, looked up in meters; it returns an error for the first referenced meter that is missing.
+func (agent *LogicalAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
+ m := make(map[uint32]bool) // meter IDs already appended to metadata, so each meter is added once
+ for _, flow := range flows {
+ if flowMeterID := fu.GetMeterIdFromFlow(flow); flowMeterID != 0 && !m[flowMeterID] {
+ foundMeter := false
+ // The flow references a meter: look it up among the supplied meters (meter.Config is dereferenced without a nil check)
+ for _, meter := range meters {
+ if flowMeterID == meter.Config.MeterId {
+ metadata.Meters = append(metadata.Meters, meter.Config)
+ logger.Debugw("Found meter in logical device",
+ log.Fields{"meterID": flowMeterID, "meter-band": meter.Config})
+ m[flowMeterID] = true
+ foundMeter = true
+ break
+ }
+ }
+ if !foundMeter {
+ logger.Errorw("Meter-referred-by-flow-is-not-found-in-logicaldevice",
+ log.Fields{"meterID": flowMeterID, "Available-meters": meters, "flow": *flow})
+ return fmt.Errorf("Meter-referred-by-flow-is-not-found-in-logicaldevice.MeterId-%d", flowMeterID)
+ }
+ }
+ }
+ logger.Debugw("meter-bands-for-flows", log.Fields{"flows": len(flows), "metadata": metadata})
+ return nil
+}
+
+// updateFlowCountOfMeterStats adjusts the FlowCount of the meter referenced by flow and writes it to store and cache.
+// OFPFC_ADD counts up and OFPFC_DELETE_STRICT counts down; revertUpdate inverts the direction so an earlier adjustment can be undone.
+// It returns false only when the store update fails; a flow without meter, any other command or an unknown meter returns true.
+func (agent *LogicalAgent) updateFlowCountOfMeterStats(ctx context.Context, modCommand *ofp.OfpFlowMod, flow *ofp.OfpFlowStats, revertUpdate bool) bool {
+	flowCommand := modCommand.GetCommand()
+	meterID := fu.GetMeterIdFromFlow(flow)
+	logger.Debugw("Meter-id-in-flow-mod", log.Fields{"meterId": meterID})
+	if meterID == 0 {
+		logger.Debugw("No-meter-present-in-the-flow", log.Fields{"flow": *flow})
+		return true
+	}
+
+	if flowCommand != ofp.OfpFlowModCommand_OFPFC_ADD && flowCommand != ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
+		return true
+	}
+	agent.meterLock.RLock()
+	meterChunk, ok := agent.meters[meterID]
+	agent.meterLock.RUnlock()
+	if !ok {
+		logger.Debugw("Meter-is-not-present-in-logical-device", log.Fields{"meterID": meterID})
+		return true
+	}
+
+	//acquire the meter lock
+	meterChunk.lock.Lock()
+	defer meterChunk.lock.Unlock()
+
+	// Adjust a private copy instead of the cached entry: updateLogicalDeviceMeter swaps the copy into the
+	// chunk only after the store write succeeded, so a failed write no longer leaves the cached count
+	// out of step with the store.
+	updatedMeter := proto.Clone(meterChunk.meter).(*ofp.OfpMeterEntry)
+	// Only ADD and DELETE_STRICT reach this point; the count goes up for a plain ADD or a reverted DELETE_STRICT.
+	if (flowCommand == ofp.OfpFlowModCommand_OFPFC_ADD) != revertUpdate {
+		updatedMeter.Stats.FlowCount++
+	} else {
+		updatedMeter.Stats.FlowCount--
+	}
+
+	// Update store and cache
+	if err := agent.updateLogicalDeviceMeter(ctx, updatedMeter, meterChunk); err != nil {
+		logger.Errorw("unable-to-update-meter-in-db", log.Fields{"logicalDevice": agent.logicalDeviceID, "meterID": meterID, "error": err})
+		return false
+	}
+
+	logger.Debugw("updated-meter-flow-stats", log.Fields{"meterId": meterID})
+	return true
+}
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
new file mode 100644
index 0000000..27ce04b
--- /dev/null
+++ b/rw_core/core/device/logical_agent_port.go
@@ -0,0 +1,650 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "fmt"
+
+ "github.com/gogo/protobuf/proto"
+ coreutils "github.com/opencord/voltha-go/rw_core/utils"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// ListLogicalDevicePorts returns the ports of the logical device obtained from GetLogicalDevice.
+// A lookup error is returned as-is; a nil logical device yields an empty LogicalPorts.
+func (agent *LogicalAgent) ListLogicalDevicePorts(ctx context.Context) (*voltha.LogicalPorts, error) {
+	logger.Debug("ListLogicalDevicePorts")
+	ld, err := agent.GetLogicalDevice(ctx)
+	switch {
+	case err != nil:
+		return nil, err
+	case ld == nil:
+		return &voltha.LogicalPorts{}, nil
+	}
+	items := append(make([]*voltha.LogicalPort, 0), ld.Ports...)
+	return &voltha.LogicalPorts{Items: items}, nil
+}
+
+func (agent *LogicalAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error { // adds an NNI/UNI logical port for the device port, or rebuilds routes for any other port type
+ logger.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
+ var err error
+ if port.Type == voltha.Port_ETHERNET_NNI {
+ if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
+ return err
+ }
+ agent.addLogicalPortToMap(port.PortNo, true) // the "added" result above is discarded, so this runs whenever no error occurred
+ } else if port.Type == voltha.Port_ETHERNET_UNI {
+ if _, err = agent.addUNILogicalPort(ctx, device, port); err != nil {
+ return err
+ }
+ agent.addLogicalPortToMap(port.PortNo, false) // same as above: runs whenever no error occurred
+ } else {
+ // Update the device routes to ensure all routes on the logical device have been calculated
+ if err = agent.buildRoutes(ctx); err != nil {
+ // Not an error - temporary state
+ logger.Warnw("failed-to-update-routes", log.Fields{"device-id": device.Id, "port": port, "error": err})
+ }
+ }
+ return nil
+}
+
+// setupLogicalPorts is invoked once the logical device has been created and is ready to get ports
+// added to it. While the logical device was being created we could have received requests to add
+// NNI and UNI ports which were discarded. Now is the time to add them if needed.
+func (agent *LogicalAgent) setupLogicalPorts(ctx context.Context) error {
+	logger.Infow("setupLogicalPorts", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+	// First add any NNI ports which could have been missing
+	if err := agent.setupNNILogicalPorts(ctx, agent.rootDeviceID); err != nil {
+		logger.Errorw("error-setting-up-NNI-ports", log.Fields{"error": err, "deviceId": agent.rootDeviceID})
+		return err
+	}
+
+	// Now, set up the UNI ports if needed.
+	children, err := agent.deviceMgr.GetAllChildDevices(ctx, agent.rootDeviceID)
+	if err != nil {
+		logger.Errorw("error-getting-child-devices", log.Fields{"error": err, "deviceId": agent.rootDeviceID})
+		return err
+	}
+	responses := make([]coreutils.Response, 0)
+	for _, child := range children.Items {
+		response := coreutils.NewResponse()
+		responses = append(responses, response)
+		go func(child *voltha.Device) {
+			if err := agent.setupUNILogicalPorts(context.Background(), child); err != nil { // goroutine-local err: assigning the shared outer err from every goroutine was a data race
+				logger.Errorw("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id, "error": err})
+				response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
+			}
+			response.Done()
+		}(child)
+	}
+	// Wait for completion
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, responses...); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
+	}
+	return nil
+}
+
+// setupNNILogicalPorts creates an NNI port on the logical device for every NNI interface of the given root device
+func (agent *LogicalAgent) setupNNILogicalPorts(ctx context.Context, deviceID string) error {
+	logger.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+	// NOTE(review): err is overwritten on every NNI port, so only the last port's result is returned - confirm this is intended
+	var err error
+
+	var device *voltha.Device
+	if device, err = agent.deviceMgr.getDevice(ctx, deviceID); err != nil {
+		logger.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": deviceID})
+		return err
+	}
+
+	// Add a logical port for each NNI port of the device
+	for _, port := range device.Ports {
+		if port.Type == voltha.Port_ETHERNET_NNI {
+			if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
+				logger.Errorw("error-adding-NNI-port", log.Fields{"error": err})
+			}
+			agent.addLogicalPortToMap(port.PortNo, true)
+		}
+	}
+	return err
+}
+
+// updatePortState marks the logical port mapped to (deviceID, portNo) live or link-down according to operStatus; it returns NotFound when no logical port matches
+func (agent *LogicalAgent) updatePortState(ctx context.Context, deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
+ logger.Infow("updatePortState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ // Get the latest logical device info
+ original := agent.getLogicalDeviceWithoutLock()
+ updatedPorts := clonePorts(original.Ports) // changes are made on a cloned port list
+ for _, port := range updatedPorts {
+ if port.DeviceId == deviceID && port.DevicePortNo == portNo {
+ if operStatus == voltha.OperStatus_ACTIVE {
+ port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+ } else {
+ port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+ }
+ // Update the logical device; only the first matching port is handled
+ if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
+ logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
+ return err
+ }
+ return nil
+ }
+ }
+ return status.Errorf(codes.NotFound, "port-%d-not-exist", portNo)
+}
+
+// updatePortsState marks every logical port that belongs to device live or link-down according to state, then stores the port list
+func (agent *LogicalAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.OperStatus_Types) error {
+ logger.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+ // Get the latest logical device info
+ original := agent.getLogicalDeviceWithoutLock()
+ updatedPorts := clonePorts(original.Ports) // changes are made on a cloned port list
+ for _, port := range updatedPorts {
+ if port.DeviceId == device.Id {
+ if state == voltha.OperStatus_ACTIVE {
+ port.OfpPort.Config = port.OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+ } else {
+ port.OfpPort.Config = port.OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+ port.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+ }
+ }
+ }
+ // Updating the logical device will trigger the port change events to be populated to the controller
+ if err := agent.updateLogicalDevicePortsWithoutLock(ctx, original, updatedPorts); err != nil {
+ logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
+ return err
+ }
+ return nil
+}
+
+// setupUNILogicalPorts creates a UNI port on the logical device for every UNI interface of the child device
+func (agent *LogicalAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
+ logger.Infow("setupUNILogicalPort", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ // err is overwritten on every UNI port, so the value returned below is the result of the last UNI port processed
+ var err error
+ var added bool
+ // Add a logical port for each UNI port of the child device
+ for _, port := range childDevice.Ports {
+ if port.Type == voltha.Port_ETHERNET_UNI {
+ if added, err = agent.addUNILogicalPort(ctx, childDevice, port); err != nil {
+ logger.Errorw("error-adding-UNI-port", log.Fields{"error": err})
+ }
+ if added {
+ agent.addLogicalPortToMap(port.PortNo, false)
+ }
+ }
+ }
+ return err
+}
+
+// deleteAllLogicalPorts deletes all logical ports associated with this logical device by storing an empty port list
+func (agent *LogicalAgent) deleteAllLogicalPorts(ctx context.Context) error {
+	logger.Infow("deleteAllLogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	// Get the latest logical device info
+	cloned := agent.getLogicalDeviceWithoutLock()
+
+	if err := agent.updateLogicalDevicePortsWithoutLock(ctx, cloned, []*voltha.LogicalPort{}); err != nil {
+		logger.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
+		return err
+	}
+	return nil
+}
+
+// deleteLogicalPort removes the logical port whose Id equals lPort.Id and rebuilds the routes in the background; an unknown Id is a no-op that returns nil
+func (agent *LogicalAgent) deleteLogicalPort(ctx context.Context, lPort *voltha.LogicalPort) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ logicalDevice := agent.getLogicalDeviceWithoutLock()
+
+ index := -1 // position of the first port with a matching Id, -1 when there is none
+ for i, logicalPort := range logicalDevice.Ports {
+ if logicalPort.Id == lPort.Id {
+ index = i
+ break
+ }
+ }
+ if index >= 0 {
+ clonedPorts := clonePorts(logicalDevice.Ports)
+ if index < len(clonedPorts)-1 {
+ copy(clonedPorts[index:], clonedPorts[index+1:]) // shift the tail left over the removed entry
+ }
+ clonedPorts[len(clonedPorts)-1] = nil // clear the now-duplicate last slot before truncating
+ clonedPorts = clonedPorts[:len(clonedPorts)-1]
+ logger.Debugw("logical-port-deleted", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, clonedPorts); err != nil {
+ logger.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ return err
+ }
+
+ // Remove the logical port from cache
+ agent.deleteLogicalPortsFromMap([]uint32{lPort.DevicePortNo})
+ // Reset the logical device routes; done asynchronously, a failure is only logged
+ go func() {
+ if err := agent.buildRoutes(context.Background()); err != nil {
+ logger.Warnw("device-routes-not-ready", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
+ }
+ }()
+ }
+ return nil
+}
+
+// deleteLogicalPorts removes the logical ports associated with that deviceId and rebuilds the routes in the background
+func (agent *LogicalAgent) deleteLogicalPorts(ctx context.Context, deviceID string) error {
+ logger.Debugw("deleting-logical-ports", log.Fields{"device-id": deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ logicalDevice := agent.getLogicalDeviceWithoutLock()
+ lPortstoKeep := []*voltha.LogicalPort{} // ports of other devices, stored back as the new port list
+ lPortsNoToDelete := []uint32{} // device port numbers of the ports being dropped
+ for _, logicalPort := range logicalDevice.Ports {
+ if logicalPort.DeviceId != deviceID {
+ lPortstoKeep = append(lPortstoKeep, logicalPort)
+ } else {
+ lPortsNoToDelete = append(lPortsNoToDelete, logicalPort.DevicePortNo)
+ }
+ }
+ logger.Debugw("deleted-logical-ports", log.Fields{"ports": lPortstoKeep}) // NOTE(review): logs the ports that are kept, and before the update ran - confirm intended
+ if err := agent.updateLogicalDevicePortsWithoutLock(ctx, logicalDevice, lPortstoKeep); err != nil {
+ logger.Errorw("logical-device-update-failed", log.Fields{"logical-device-id": agent.logicalDeviceID})
+ return err
+ }
+ // Remove the port from the cached logical ports set
+ agent.deleteLogicalPortsFromMap(lPortsNoToDelete)
+
+ // Reset the logical device routes; done asynchronously, a failure is only logged
+ go func() {
+ if err := agent.buildRoutes(context.Background()); err != nil {
+ logger.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+ }
+ }()
+
+ return nil
+}
+
+// enableLogicalPort clears the OFPPC_PORT_DOWN config bit of the logical port whose ID is lPortID and
+// stores the updated port list. It returns a NotFound status error when no port of the logical device
+// has that ID; request-queue and update errors are returned as they are.
+func (agent *LogicalAgent) enableLogicalPort(ctx context.Context, lPortID string) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	ld := agent.getLogicalDeviceWithoutLock()
+
+	for idx, lp := range ld.Ports {
+		if lp.Id != lPortID {
+			continue
+		}
+		// Only the first port with a matching ID is touched, and only on a cloned port list.
+		ports := clonePorts(ld.Ports)
+		ports[idx].OfpPort.Config &^= uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+		return agent.updateLogicalDevicePortsWithoutLock(ctx, ld, ports)
+	}
+	// No port matched.
+	return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
+}
+
+// disableLogicalPort sets the OFPPC_PORT_DOWN config bit of the logical port whose ID is lPortID and
+// stores the updated port list. It returns a NotFound status error when no port of the logical device
+// has that ID; request-queue and update errors are returned as they are.
+func (agent *LogicalAgent) disableLogicalPort(ctx context.Context, lPortID string) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	// Get the most up to date logical device
+	ld := agent.getLogicalDeviceWithoutLock()
+	for idx, lp := range ld.Ports {
+		if lp.Id != lPortID {
+			continue
+		}
+		// Only the first port with a matching ID is touched, and only on a cloned port list.
+		ports := clonePorts(ld.Ports)
+		ports[idx].OfpPort.Config |= uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+		return agent.updateLogicalDevicePortsWithoutLock(ctx, ld, ports)
+	}
+	// No port matched.
+	return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
+}
+
+// addNNILogicalPort adds an NNI port to the logical device. It returns a bool representing whether a port has been
+// added and an error in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) (bool, error) {
+	logger.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
+
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
+	if agent.portExist(device, port) {
+		logger.Debugw("port-already-exist", log.Fields{"port": port})
+		agent.requestQueue.RequestComplete()
+		return false, nil
+	}
+	agent.requestQueue.RequestComplete()
+
+	var portCap *ic.PortCapability
+	var err error
+	// First get the port capability; the request queue is released while this call is in flight
+	if portCap, err = agent.deviceMgr.getPortCapability(ctx, device.Id, port.PortNo); err != nil {
+		logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
+		return false, err
+	}
+
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
+
+	defer agent.requestQueue.RequestComplete()
+	// Double check again if this port has been already added since the getPortCapability could have taken a long time
+	if agent.portExist(device, port) {
+		logger.Debugw("port-already-exist", log.Fields{"port": port})
+		return false, nil
+	}
+
+	portCap.Port.RootPort = true
+	lp := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+	lp.DeviceId = device.Id
+	lp.Id = fmt.Sprintf("nni-%d", port.PortNo)
+	lp.OfpPort.PortNo = port.PortNo
+	lp.OfpPort.Name = lp.Id
+	lp.DevicePortNo = port.PortNo
+
+	ld := agent.getLogicalDeviceWithoutLock()
+
+	clonedPorts := clonePorts(ld.Ports)
+	if clonedPorts == nil {
+		clonedPorts = make([]*voltha.LogicalPort, 0)
+	}
+	clonedPorts = append(clonedPorts, lp)
+
+	if err = agent.updateLogicalDevicePortsWithoutLock(ctx, ld, clonedPorts); err != nil {
+		logger.Errorw("error-updating-logical-device", log.Fields{"error": err})
+		return false, err
+	}
+
+	// Update the device routes with this new logical port; the goroutine works only on its private clone, never on lp, which now belongs to the stored port list
+	clonedLP := (proto.Clone(lp)).(*voltha.LogicalPort)
+	go func() {
+		if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
+			logger.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "logical-port": clonedLP.OfpPort.PortNo, "error": err})
+		}
+	}()
+
+	return true, nil
+}
+
+// portExist reports whether a logical port already has the device's Id, the port's PortNo and an Id equal to the port's Label.
+func (agent *LogicalAgent) portExist(device *voltha.Device, port *voltha.Port) bool {
+	for _, lp := range agent.getLogicalDeviceWithoutLock().Ports {
+		if lp.DeviceId == device.Id && lp.DevicePortNo == port.PortNo && lp.Id == port.Label {
+			return true
+		}
+	}
+	return false
+}
+
+// addUNILogicalPort adds a UNI port to the logical device. It returns a bool representing whether a port has been
+// added and an error in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil). If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, port *voltha.Port) (bool, error) {
+	logger.Debugw("addUNILogicalPort", log.Fields{"port": port})
+	if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
+		logger.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
+		return false, nil
+	}
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
+
+	if agent.portExist(childDevice, port) {
+		logger.Debugw("port-already-exist", log.Fields{"port": port})
+		agent.requestQueue.RequestComplete()
+		return false, nil
+	}
+	agent.requestQueue.RequestComplete()
+	var portCap *ic.PortCapability
+	var err error
+	// First get the port capability; the request queue is released while this call is in flight
+	if portCap, err = agent.deviceMgr.getPortCapability(ctx, childDevice.Id, port.PortNo); err != nil {
+		logger.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
+		return false, err
+	}
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return false, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	// Double check again if this port has been already added since the getPortCapability could have taken a long time
+	if agent.portExist(childDevice, port) {
+		logger.Debugw("port-already-exist", log.Fields{"port": port})
+		return false, nil
+	}
+	// Get stored logical device
+	ldevice := agent.getLogicalDeviceWithoutLock()
+
+	logger.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
+	portCap.Port.RootPort = false
+	portCap.Port.Id = port.Label
+	portCap.Port.OfpPort.PortNo = port.PortNo
+	portCap.Port.DeviceId = childDevice.Id
+	portCap.Port.DevicePortNo = port.PortNo
+	clonedPorts := clonePorts(ldevice.Ports)
+	if clonedPorts == nil {
+		clonedPorts = make([]*voltha.LogicalPort, 0)
+	}
+	clonedPorts = append(clonedPorts, portCap.Port)
+	if err := agent.updateLogicalDevicePortsWithoutLock(ctx, ldevice, clonedPorts); err != nil {
+		return false, err
+	}
+	// Update the device graph with this new logical port; the goroutine gets its own clone
+	clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
+
+	go func() {
+		if err := agent.updateRoutes(context.Background(), clonedLP); err != nil {
+			logger.Warnw("routes-not-ready", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err}) // Warnw: Warn would print the fields as a plain argument
+		}
+	}()
+
+	return true, nil
+}
+
+func clonePorts(ports []*voltha.LogicalPort) []*voltha.LogicalPort { // returns a proto.Clone copy of the port list
+ return proto.Clone(&voltha.LogicalPorts{Items: ports}).(*voltha.LogicalPorts).Items // wrap in a LogicalPorts message so one Clone call copies all ports
+}
+
+//updateLogicalDevicePortsWithoutLock sets device.Ports to newPorts, stores the device via updateLogicalDeviceWithoutLock and, on success, calls portUpdated with the old and new port lists
+func (agent *LogicalAgent) updateLogicalDevicePortsWithoutLock(ctx context.Context, device *voltha.LogicalDevice, newPorts []*voltha.LogicalPort) error {
+ oldPorts := device.Ports
+ device.Ports = newPorts // NOTE(review): not restored to oldPorts when the update below fails - confirm callers do not reuse device afterwards
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, device); err != nil {
+ return err
+ }
+ agent.portUpdated(oldPorts, newPorts)
+ return nil
+}
+
+// diff compares two lists of logical ports by Id and returns the ports only in newList, the ports in both lists whose contents differ (new version), and the ports only in oldList.
+func diff(oldList, newList []*voltha.LogicalPort) (newPorts, changedPorts, deletedPorts map[string]*voltha.LogicalPort) {
+	newPorts = make(map[string]*voltha.LogicalPort, len(newList))
+	for _, candidate := range newList {
+		newPorts[candidate.Id] = candidate
+	}
+
+	changedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
+	deletedPorts = make(map[string]*voltha.LogicalPort, len(oldList))
+	for _, previous := range oldList {
+		current, stillPresent := newPorts[previous.Id]
+		if !stillPresent {
+			deletedPorts[previous.Id] = previous // deleted
+			continue
+		}
+		delete(newPorts, previous.Id) // not new
+		if !proto.Equal(current, previous) {
+			changedPorts[current.Id] = current // changed
+		}
+	}
+
+	return newPorts, changedPorts, deletedPorts
+}
+
+// portUpdated is invoked when a port is updated on the logical device. It diffs the previous and
+// current port lists and sends one port-status change event per added, modified or deleted port.
+// The returned value is always nil.
+func (agent *LogicalAgent) portUpdated(prevPorts, currPorts []*voltha.LogicalPort) interface{} {
+	added, modified, removed := diff(prevPorts, currPorts)
+
+	// notify sends, each in its own goroutine, a change event with the given reason for every listed port.
+	notify := func(reason ofp.OfpPortReason, ports map[string]*voltha.LogicalPort) {
+		for _, lp := range ports {
+			go agent.ldeviceMgr.SendChangeEvent(agent.logicalDeviceID,
+				&ofp.OfpPortStatus{Reason: reason, Desc: lp.OfpPort})
+		}
+	}
+
+	// Send the port change events to the OF controller
+	notify(ofp.OfpPortReason_OFPPR_ADD, added)
+	notify(ofp.OfpPortReason_OFPPR_MODIFY, modified)
+	notify(ofp.OfpPortReason_OFPPR_DELETE, removed)
+
+	return nil
+}
+
+//GetWildcardInputPorts returns the port numbers of all logical ports on the device except excludePort. When not
+//exactly one excludePort is passed, port number 0 is the one left out. This function is invoked only during flow
+//decomposition where the lock on the logical device is already held, so the device is retrieved without lock.
+func (agent *LogicalAgent) GetWildcardInputPorts(excludePort ...uint32) []uint32 {
+	var skip uint32
+	if len(excludePort) == 1 {
+		skip = excludePort[0]
+	}
+	portNos := make([]uint32, 0)
+	for _, lp := range agent.getLogicalDeviceWithoutLock().Ports {
+		// Read the port number once and keep it unless it is the excluded one.
+		if portNo := lp.OfpPort.PortNo; portNo != skip {
+			portNos = append(portNos, portNo)
+		}
+	}
+	return portNos
+}
+
+// helpers for agent.logicalPortsNo
+
+func (agent *LogicalAgent) addLogicalPortToMap(portNo uint32, nniPort bool) {
+	agent.lockLogicalPortsNo.Lock()
+	defer agent.lockLogicalPortsNo.Unlock()
+	// The map stores the NNI flag per port number: an entry already flagged as NNI is kept,
+	// while a missing or non-NNI entry takes the nniPort value.
+	agent.logicalPortsNo[portNo] = agent.logicalPortsNo[portNo] || nniPort
+}
+
+func (agent *LogicalAgent) addLogicalPortsToMap(lps []*voltha.LogicalPort) {
+	agent.lockLogicalPortsNo.Lock()
+	defer agent.lockLogicalPortsNo.Unlock()
+	for _, lp := range lps {
+		// Keyed by device port number: an entry already flagged as NNI stays flagged,
+		// any other (missing or non-NNI) entry takes the port's RootPort value.
+		agent.logicalPortsNo[lp.DevicePortNo] = agent.logicalPortsNo[lp.DevicePortNo] || lp.RootPort
+	}
+}
+
+func (agent *LogicalAgent) deleteLogicalPortsFromMap(portsNo []uint32) {
+	agent.lockLogicalPortsNo.Lock()
+	defer agent.lockLogicalPortsNo.Unlock()
+	for i := range portsNo {
+		delete(agent.logicalPortsNo, portsNo[i]) // no-op for port numbers that are not in the map
+	}
+}
+
+func (agent *LogicalAgent) isNNIPort(portNo uint32) bool {
+	agent.lockLogicalPortsNo.RLock()
+	defer agent.lockLogicalPortsNo.RUnlock()
+	// logicalPortsNo maps a port number to its NNI flag. Indexing with an unknown
+	// port number yields false, so a single lookup answers both the "not an NNI"
+	// and the "unknown port" cases.
+	return agent.logicalPortsNo[portNo]
+}
+
+func (agent *LogicalAgent) getFirstNNIPort() (uint32, error) {
+ agent.lockLogicalPortsNo.RLock()
+ defer agent.lockLogicalPortsNo.RUnlock()
+ for portNo, nni := range agent.logicalPortsNo { // map iteration order is unspecified: "first" is whichever NNI entry is visited first
+ if nni {
+ return portNo, nil
+ }
+ }
+ return 0, status.Error(codes.NotFound, "No NNI port found") // no entry is flagged as NNI
+}
+
+//GetNNIPorts returns the port numbers flagged as NNI in logicalPortsNo, in no particular order; the result is empty, never nil, when there are none.
+func (agent *LogicalAgent) GetNNIPorts() []uint32 {
+ agent.lockLogicalPortsNo.RLock()
+ defer agent.lockLogicalPortsNo.RUnlock()
+ nniPorts := make([]uint32, 0)
+ for portNo, nni := range agent.logicalPortsNo {
+ if nni {
+ nniPorts = append(nniPorts, portNo)
+ }
+ }
+ return nniPorts
+}
+
+// getUNILogicalPortNo returns the flow's port opposite its NNI port; NotFound if neither port is an NNI or that port is 0
+func (agent *LogicalAgent) getUNILogicalPortNo(flow *ofp.OfpFlowStats) (uint32, error) {
+	inPortNo, outPortNo := fu.GetInPort(flow), fu.GetOutPort(flow)
+	var uniPort uint32
+	switch {
+	case agent.isNNIPort(inPortNo):
+		uniPort = outPortNo
+	case agent.isNNIPort(outPortNo):
+		uniPort = inPortNo
+	}
+	if uniPort == 0 {
+		return 0, status.Errorf(codes.NotFound, "no-uni-port: %v", flow)
+	}
+	return uniPort, nil
+}
diff --git a/rw_core/core/device/logical_agent_route.go b/rw_core/core/device/logical_agent_route.go
new file mode 100644
index 0000000..6736160
--- /dev/null
+++ b/rw_core/core/device/logical_agent_route.go
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "github.com/opencord/voltha-go/rw_core/route"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// GetRoute returns the hops between the ingress and egress logical ports; a port number of 0 means "unspecified" and yields a half route whose missing hop is an empty route.Hop
+func (agent *LogicalAgent) GetRoute(ctx context.Context, ingressPortNo uint32, egressPortNo uint32) ([]route.Hop, error) {
+ logger.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
+ routes := make([]route.Hop, 0)
+
+ // Note: A port value of 0 is equivalent to a nil port
+
+ // Consider different possibilities. NOTE(review): the debug log below reads agent.logicalPortsNo without taking lockLogicalPortsNo - confirm that is safe
+ if egressPortNo != 0 && ((egressPortNo & 0x7fffffff) == uint32(ofp.OfpPortNo_OFPP_CONTROLLER)) {
+ logger.Debugw("controller-flow", log.Fields{"ingressPortNo": ingressPortNo, "egressPortNo": egressPortNo, "logicalPortsNo": agent.logicalPortsNo})
+ if agent.isNNIPort(ingressPortNo) {
+ //This is a trap on the NNI Port
+ if len(agent.deviceRoutes.Routes) == 0 {
+ // If there are no routes set (usually when the logical device has only NNI port(s)), then just return a
+ // route whose two hops are identical: root device, ingress port as both Ingress and Egress
+ hop := route.Hop{DeviceID: agent.rootDeviceID, Ingress: ingressPortNo, Egress: ingressPortNo}
+ routes = append(routes, hop)
+ routes = append(routes, hop)
+ return routes, nil
+ }
+ //Return a 'half' route to make the flow decomposer logic happy
+ for routeLink, path := range agent.deviceRoutes.Routes {
+ if agent.isNNIPort(routeLink.Egress) {
+ routes = append(routes, route.Hop{}) // first hop is set to empty
+ routes = append(routes, path[1])
+ return routes, nil
+ }
+ }
+ return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
+ }
+ //treat it as if the output port is the first NNI of the OLT
+ var err error
+ if egressPortNo, err = agent.getFirstNNIPort(); err != nil {
+ logger.Warnw("no-nni-port", log.Fields{"error": err})
+ return nil, err
+ }
+ }
+ //If ingress port is not specified (nil), it may be a wildcarded
+ //route if egress port is OFPP_CONTROLLER or a nni logical port,
+ //in which case we need to create a half-route where only the egress
+ //hop is filled, the first hop is nil
+ if ingressPortNo == 0 && agent.isNNIPort(egressPortNo) {
+ // We can use the 2nd hop of any upstream route, so just find the first upstream:
+ for routeLink, path := range agent.deviceRoutes.Routes {
+ if agent.isNNIPort(routeLink.Egress) {
+ routes = append(routes, route.Hop{}) // first hop is set to empty
+ routes = append(routes, path[1])
+ return routes, nil
+ }
+ }
+ return nil, fmt.Errorf("no upstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
+ }
+ //If egress port is not specified (nil), we can also return a "half" route: first hop filled, second hop empty
+ if egressPortNo == 0 {
+ for routeLink, path := range agent.deviceRoutes.Routes {
+ if routeLink.Ingress == ingressPortNo {
+ routes = append(routes, path[0])
+ routes = append(routes, route.Hop{})
+ return routes, nil
+ }
+ }
+ return nil, fmt.Errorf("no downstream route from:%d to:%d :%w", ingressPortNo, egressPortNo, route.ErrNoRoute)
+ }
+ // Return the pre-calculated route
+ return agent.getPreCalculatedRoute(ingressPortNo, egressPortNo)
+}
+
+func (agent *LogicalAgent) getPreCalculatedRoute(ingress, egress uint32) ([]route.Hop, error) {
+	logger.Debugw("ROUTE", log.Fields{"len": len(agent.deviceRoutes.Routes)})
+	for link, hops := range agent.deviceRoutes.Routes { // "hops" avoids shadowing the route package
+		logger.Debugw("ROUTELINKS", log.Fields{"ingress": ingress, "egress": egress, "routelink": link})
+		if link.Ingress == ingress && link.Egress == egress {
+			return hops, nil
+		}
+	}
+	return nil, status.Errorf(codes.FailedPrecondition, "no route from:%d to:%d", ingress, egress)
+}
+
+// GetDeviceRoutes returns the agent's device routes exactly as stored; the pointer may be nil if none have been created yet
+func (agent *LogicalAgent) GetDeviceRoutes() *route.DeviceRoutes {
+ return agent.deviceRoutes
+}
+
+//generateDeviceRoutesIfNeeded generates the device routes if the logical device has been updated since the last time
+//the routes were generated. A route.ErrNoRoute coming out of the build is deliberately not reported to the caller.
+func (agent *LogicalAgent) generateDeviceRoutesIfNeeded(ctx context.Context) error {
+	agent.lockDeviceRoutes.Lock()
+	defer agent.lockDeviceRoutes.Unlock()
+
+	ld, err := agent.GetLogicalDevice(ctx)
+	if err != nil {
+		return err
+	}
+
+	// Nothing to do when routes exist and IsUpToDate reports them current for this logical device.
+	upToDate := agent.deviceRoutes != nil && agent.deviceRoutes.IsUpToDate(ld)
+	if upToDate {
+		return nil
+	}
+	logger.Debug("Generation of device route required")
+	// No Route is not an error
+	if err := agent.buildRoutes(ctx); err != nil && !errors.Is(err, route.ErrNoRoute) {
+		return err
+	}
+	return nil
+}
+
+//buildRoutes computes the device routes from all the logical ports of the logical device, creating the
+//DeviceRoutes holder on first use. It waits for, and runs under, the agent's request queue.
+func (agent *LogicalAgent) buildRoutes(ctx context.Context) error {
+	// Debugw, not Debugf: the fields are structured key/values, not printf arguments.
+	logger.Debugw("building-routes", log.Fields{"logical-device-id": agent.logicalDeviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	if agent.deviceRoutes == nil {
+		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
+	}
+	// Get all the logical ports on that logical device
+	lDevice := agent.getLogicalDeviceWithoutLock()
+
+	if err := agent.deviceRoutes.ComputeRoutes(ctx, lDevice.Ports); err != nil {
+		return err
+	}
+	// The outcome of Print, nil or error, is the outcome of the build.
+	return agent.deviceRoutes.Print()
+}
+
+//updateRoutes adds the logical port lp to the device routes, creating the DeviceRoutes holder on first
+//use, and then calls Print on them. It waits for, and runs under, the agent's request queue.
+func (agent *LogicalAgent) updateRoutes(ctx context.Context, lp *voltha.LogicalPort) error {
+	logger.Debugw("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	if agent.deviceRoutes == nil {
+		agent.deviceRoutes = route.NewDeviceRoutes(agent.logicalDeviceID, agent.deviceMgr.getDevice)
+	}
+	allPorts := agent.logicalDevice.Ports
+	if err := agent.deviceRoutes.AddPort(ctx, lp, allPorts); err != nil {
+		return err
+	}
+	// The outcome of Print, nil or error, is the outcome of the update.
+	return agent.deviceRoutes.Print()
+}