First Commit of Voltha-Go-Controller from Radisys
Change-Id: I8e2e908e7ab09a4fe3d86849da18b6d69dcf4ab0
diff --git a/internal/pkg/controller/addflows.go b/internal/pkg/controller/addflows.go
new file mode 100644
index 0000000..0364341
--- /dev/null
+++ b/internal/pkg/controller/addflows.go
@@ -0,0 +1,217 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ infraerror "voltha-go-controller/internal/pkg/errorcodes"
+ infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
+ "time"
+
+ "voltha-go-controller/internal/pkg/of"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+const (
+ // MaxRetryCount is the maximum number of retries attempted after a failed flow delete
+ MaxRetryCount int = 1
+)
+
+// AddFlowsTask is the task that applies the command of a VoltFlow to a device
+type AddFlowsTask struct {
+ taskID uint8 // assigned by Start
+ ctx context.Context // set by NewAddFlowsTask and replaced by Start
+ flow *of.VoltFlow // flow whose SubFlows are processed by Start
+ device *Device // device the sub flows are applied to
+ timestamp string // creation time in RFC3339Nano format
+}
+
+// NewAddFlowsTask is constructor for AddFlowsTask
+func NewAddFlowsTask(ctx context.Context, flow *of.VoltFlow, device *Device) *AddFlowsTask {
+ var aft AddFlowsTask
+ aft.device = device
+ aft.flow = flow
+ aft.ctx = ctx
+ tstamp := (time.Now()).Format(time.RFC3339Nano)
+ aft.timestamp = tstamp
+ return &aft
+}
+
+// Name logs the cookie of every sub flow of the task and returns the task name
+func (aft *AddFlowsTask) Name() string {
+	for _, subFlow := range aft.flow.SubFlows {
+		logger.Infow(ctx, "Flow Cookies", log.Fields{"Cookie": subFlow.Cookie})
+	}
+	return "Add Flows Task"
+}
+
+// TaskID returns the task ID that was stored by Start
+func (aft *AddFlowsTask) TaskID() uint8 {
+ return aft.taskID
+}
+
+// Timestamp returns the creation time of the task as formatted by NewAddFlowsTask
+func (aft *AddFlowsTask) Timestamp() string {
+ return aft.timestamp
+}
+
+// Stop is a no-op: the add flows task keeps no stop flag and cannot be cancelled through it
+func (aft *AddFlowsTask) Stop() {
+}
+
+// Start records every sub flow in the device's local flow cache, pushes the
+// resulting flow mods to VOLTHA and reports each outcome through a flow
+// notification. It always returns nil; failures are logged, not returned.
+func (aft *AddFlowsTask) Start(ctx context.Context, taskID uint8) error {
+	var err error
+	aft.taskID = taskID
+	aft.ctx = ctx
+	flowsToProcess := make(map[uint64]*of.VoltSubFlow)
+	flowsPresent := 0
+	// Add/delete the flows locally first before passing them to the actual device
+	for _, flow := range aft.flow.SubFlows {
+		logger.Infow(ctx, "Flow Mod Request", log.Fields{"Cookie": flow.Cookie, "Oper": aft.flow.Command, "Port": aft.flow.PortID})
+		if aft.flow.Command == of.CommandAdd {
+			flow.State = of.FlowAddPending
+			if addErr := aft.device.AddFlow(flow); addErr != nil {
+				logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": addErr.Error()})
+				// If flow already exists in cache, check for flow state
+				// If Success: Trigger success Flow Indication
+				// If Failure: Continue process, so add-retry happens
+				if addErr.Error() == ErrDuplicateFlow {
+					// The lookup result is checked so that a flow missing from the cache cannot cause a nil dereference
+					if dbFlow, ok := aft.device.GetFlow(flow.Cookie); ok && dbFlow.State == of.FlowAddSuccess {
+						aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+						flowsPresent++
+					}
+				}
+			}
+			flowsToProcess[flow.Cookie] = flow
+		} else {
+			dbFlow, ok := aft.device.GetFlow(flow.Cookie)
+			if !ok {
+				logger.Warnw(ctx, "Delete Flow Error: Flow Does not Exist", log.Fields{"Cookie": flow.Cookie, "Device": aft.device.ID})
+			} else {
+				// dbFlow.State = of.FlowDelPending
+				// aft.device.AddFlowToDb(dbFlow)
+				flowsToProcess[flow.Cookie] = dbFlow
+			}
+			aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+		}
+	}
+	// NOTE(review): the check below only logs; the flows are still pushed further down - confirm intent
+	if flowsPresent == len(aft.flow.SubFlows) {
+		logger.Warn(ctx, "All Flows already present in database. Skipping Flow Push to SB")
+	}
+
+	// PortName and PortID are used for validation of PortID, whether it is still valid and associated with old PortName or
+	// PortID got assigned to another PortName. If the condition met, skip these flow update to voltha core
+	if aft.flow.PortName != "" && aft.flow.PortID != 0 {
+		portName, _ := aft.device.GetPortName(aft.flow.PortID)
+		if aft.flow.PortName != portName && portName != "" {
+			for _, flow := range aft.flow.SubFlows {
+				logger.Errorw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
+				if aft.flow.Command == of.CommandDel {
+					aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+				}
+			}
+			return nil
+		}
+	}
+
+	if !aft.device.isSBOperAllowed(aft.flow.ForceAction) {
+		for _, flow := range aft.flow.SubFlows {
+			logger.Errorw(ctx, "Skipping Flow Table Update", log.Fields{"Reason": "Device State not UP", "State": aft.device.State, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
+		}
+		return nil
+	}
+
+	flows := of.ProcessVoltFlow(aft.device.ID, aft.flow.Command, flowsToProcess)
+	for _, flow := range flows {
+		attempt := 0
+		if vc := aft.device.VolthaClient(); vc != nil {
+			for {
+				if _, err = vc.UpdateLogicalDeviceFlowTable(aft.ctx, flow); err != nil {
+					logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Reason": err.Error(), "Operation": aft.flow.Command})
+					statusCode, _ := infraerror.GetErrorInfo(err)
+					// Retry on flow delete failure once.
+					// Do NOT retry incase of failure with reason: Entry Not Found
+					if aft.flow.Command == of.CommandDel && statusCode != uint32(infraerrorcode.ErrNotExists) {
+						if attempt != MaxRetryCount {
+							logger.Errorw(ctx, "Retrying Flow Delete", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Attempt": attempt})
+							attempt++
+							continue
+						}
+						logger.Errorw(ctx, "Flow Delete failed even aft max retries", log.Fields{"Flow": flow, "Attempt": attempt})
+					}
+				}
+				break
+			}
+			// Hand over the result of the last attempt so that a failed push is not reported as a success
+			aft.device.triggerFlowNotification(flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, err)
+		} else {
+			logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
+		}
+	}
+	return nil
+}
+
+// isFlowOperSuccess reports whether statusCode counts as a success for the
+// operation oper: ErrOk always does, ErrAlreadyExists does for an add and
+// ErrNotExists does for a delete.
+func isFlowOperSuccess(statusCode uint32, oper of.Command) bool {
+	switch infraerrorcode.ErrorCode(statusCode) {
+	case infraerrorcode.ErrOk:
+		return true
+	case infraerrorcode.ErrAlreadyExists:
+		return oper == of.CommandAdd
+	case infraerrorcode.ErrNotExists:
+		return oper == of.CommandDel
+	default:
+		return false
+	}
+}
+
+// func getBwAvailInfo(bwAvailInfo []*voltha.ResponseMsg) of.BwAvailDetails {
+// var bwInfo of.BwAvailDetails
+// // convert the bw details sent from olt to a struct
+// // received msg format:
+// // additional_data[Data{ResponseMsg
+// //{"key":"prevBW","value":"111111"},
+// //{"key":"presentBW","value":"10000"}]
+// if len(bwAvailInfo) > 1 {
+// prevBwResp := bwAvailInfo[0]
+// if prevBwResp.Key == of.PrevBwInfo {
+// _, err := strconv.Atoi(prevBwResp.Val)
+// if err == nil {
+// bwInfo.PrevBw = prevBwResp.Val
+// }
+// }
+
+// presentBwResp := bwAvailInfo[1]
+// if presentBwResp.Key == of.PresentBwInfo {
+// _, err := strconv.Atoi(presentBwResp.Val)
+// if err == nil {
+// bwInfo.PresentBw = presentBwResp.Val
+// }
+// }
+// if bwInfo.PresentBw == bwInfo.PrevBw {
+// return of.BwAvailDetails{}
+// }
+// logger.Infow(ctx, "Bandwidth-consumed-info", log.Fields{"BwConsumed": bwInfo})
+// }
+// return bwInfo
+// }
diff --git a/internal/pkg/controller/auditdevice.go b/internal/pkg/controller/auditdevice.go
new file mode 100644
index 0000000..ea00185
--- /dev/null
+++ b/internal/pkg/controller/auditdevice.go
@@ -0,0 +1,210 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "time"
+
+ "voltha-go-controller/internal/pkg/tasks"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+)
+
+// AuditEventType is the type of the event recorded on an AuditDevice task
+type AuditEventType uint8
+
+const (
+ // AuditEventDeviceDisc constant
+ AuditEventDeviceDisc AuditEventType = 0
+ // AuditEventDeviceStateChange constant
+ AuditEventDeviceStateChange AuditEventType = 1
+)
+
+const (
+ // NNIPortID is the port number of the NNI port, which the device audit processes before all other ports
+ NNIPortID uint32 = 0x1000000
+)
+
+// AuditDevice is the task that audits the ports of a device against the ports listed by VOLTHA
+type AuditDevice struct {
+ taskID uint8 // assigned by Start
+ ctx context.Context // assigned by Start
+ device *Device // device whose ports are audited
+ stop bool // set by Stop; checked by Start to cancel the audit
+ timestamp string // creation time in RFC3339Nano format
+ event AuditEventType // event passed to NewAuditDevice
+}
+
+// NewAuditDevice is constructor for AuditDevice
+func NewAuditDevice(device *Device, event AuditEventType) *AuditDevice {
+ var ad AuditDevice
+ ad.device = device
+ ad.stop = false
+ tstamp := (time.Now()).Format(time.RFC3339Nano)
+ ad.timestamp = tstamp
+ ad.event = event
+ return &ad
+}
+
+// Name returns the fixed name of the device audit task
+func (ad *AuditDevice) Name() string {
+ return "Device Audit Task"
+}
+
+// TaskID returns the task id that was stored by Start
+func (ad *AuditDevice) TaskID() uint8 {
+ return ad.taskID
+}
+
+// Timestamp returns the creation time of the task as formatted by NewAuditDevice
+func (ad *AuditDevice) Timestamp() string {
+ return ad.timestamp
+}
+
+// Stop requests cancellation of the task by setting the stop flag that Start checks
+func (ad *AuditDevice) Stop() {
+ ad.stop = true
+}
+
+// Start audits the ports of the device: it fetches the logical device ports
+// from VOLTHA, reconciles the state of the ports known at VGC, then adds the
+// ports missing at VGC and deletes the excess ones.
+func (ad *AuditDevice) Start(ctx context.Context, taskID uint8) error {
+	logger.Warnw(ctx, "Audit Device Task Triggered", log.Fields{"Context": ctx, "taskId": taskID, "Device": ad.device.ID})
+	ad.taskID = taskID
+	ad.ctx = ctx
+	if ad.stop {
+		logger.Errorw(ctx, "Audit Device Task Cancelled", log.Fields{"Context": ad.ctx, "Task": ad.taskID})
+		return tasks.ErrTaskCancelError
+	}
+
+	// VolthaClient may return nil; bail out instead of calling a method on a nil client
+	vc := ad.device.VolthaClient()
+	if vc == nil {
+		logger.Error(ctx, "Audit Device Failed: Voltha Client Unavailable")
+		return nil
+	}
+	ofpps, err := vc.ListLogicalDevicePorts(ad.ctx, &common.ID{Id: ad.device.ID})
+	if err != nil {
+		return err
+	}
+
+	// Compute the difference between the ports received and ports at VGC
+	// First build a map of all the received ports under missing ports. We
+	// will eliminate the ports that are in the device from the missing ports
+	// so that the elements remaining are missing ports. The ones that are
+	// not in missing ports are added to excess ports which should be deleted
+	// from the VGC.
+	missingPorts := make(map[uint32]*ofp.OfpPort)
+	for _, ofpp := range ofpps.Items {
+		missingPorts[ofpp.OfpPort.PortNo] = ofpp.OfpPort
+	}
+	var excessPorts []uint32
+	GetController().SetAuditFlags(ad.device)
+	processPortState := func(id uint32, vgcPort *DevicePort) {
+		logger.Debugw(ctx, "Process Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
+		if ofpPort, ok := missingPorts[id]; ok {
+			if ((vgcPort.State == PortStateDown) && (ofpPort.State == uint32(ofp.OfpPortState_OFPPS_LIVE))) || ((vgcPort.State == PortStateUp) && (ofpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE))) {
+				// The port state at VGC differs from the received one; apply the received state
+				logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
+				ad.device.ProcessPortState(ofpPort.PortNo, ofpPort.State)
+			} else {
+				//To ensure the flows are in sync with port status and no mismatch due to reboot,
+				// repush/delete flows based on current port status
+				logger.Infow(ctx, "Port State Processing", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
+				ad.device.ProcessPortStateAfterReboot(ofpPort.PortNo, ofpPort.State)
+			}
+			delete(missingPorts, id) // present in the received list and at VGC, so not missing
+		} else {
+			// This port is missing from the received list. This is an
+			// excess port at VGC. This must be added to excess ports
+			excessPorts = append(excessPorts, id)
+		}
+		logger.Debugw(ctx, "Processed Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
+	}
+
+	// 1st process the NNI port before all other ports so that the device state can be updated.
+	if vgcPort, ok := ad.device.PortsByID[NNIPortID]; ok {
+		logger.Info(ctx, "Processing NNI port state")
+		processPortState(NNIPortID, vgcPort)
+	}
+	for id, vgcPort := range ad.device.PortsByID {
+		if id == NNIPortID {
+			//NNI port already processed
+			continue
+		}
+		if ad.stop {
+			break
+		}
+		processPortState(id, vgcPort)
+	}
+	GetController().ResetAuditFlags(ad.device)
+	if ad.stop {
+		logger.Errorw(ctx, "Audit Device Task Cancelled", log.Fields{"Context": ad.ctx, "Task": ad.taskID})
+		return tasks.ErrTaskCancelError
+	}
+	ad.AddMissingPorts(missingPorts)
+	ad.DelExcessPorts(excessPorts)
+	ad.device.deviceAuditInProgress = false
+	logger.Warnw(ctx, "Audit Device Task Completed", log.Fields{"Context": ctx, "taskId": taskID, "Device": ad.device.ID})
+	return nil
+}
+
+// AddMissingPorts adds to the device every port held in mps, the NNI port
+// first, and processes the port state of the ports reported as live.
+func (ad *AuditDevice) AddMissingPorts(mps map[uint32]*ofp.OfpPort) {
+	logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
+
+	addPort := func(port *ofp.OfpPort) {
+		logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": port.PortNo, "Port Name": port.Name})
+		logger.Infow(ctx, "Calling AddPort", log.Fields{"No": port.PortNo, "Name": port.Name})
+		// An AddPort failure is only logged; the port state is still processed below
+		err := ad.device.AddPort(port.PortNo, port.Name)
+		if err != nil {
+			logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": port.PortNo, "Name": port.Name, "Reason": err})
+		}
+		if port.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
+			ad.device.ProcessPortState(port.PortNo, port.State)
+		}
+		logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": port.PortNo, "Port Name": port.Name})
+	}
+
+	// The NNI port goes first so that the flow provisioning for UNIs can be enabled
+	if nni, found := mps[NNIPortID]; found {
+		logger.Info(ctx, "Adding Missing NNI port")
+		addPort(nni)
+	}
+	for portNo, port := range mps {
+		if portNo == NNIPortID {
+			continue
+		}
+		addPort(port)
+	}
+}
+
+// DelExcessPorts deletes every port listed in eps from the device; failures are only logged
+func (ad *AuditDevice) DelExcessPorts(eps []uint32) {
+	logger.Debugw(ctx, "Device Audit - Delete Excess Ports", log.Fields{"NumPorts": len(eps)})
+	for i := range eps {
+		portID := eps[i]
+		logger.Infow(ctx, "Device Audit - Deleting Port", log.Fields{"PortId": portID})
+		if err := ad.device.DelPort(portID); err != nil {
+			logger.Warnw(ctx, "DelPort Failed", log.Fields{"PortId": portID, "Reason": err})
+		}
+	}
+}
diff --git a/internal/pkg/controller/audittables.go b/internal/pkg/controller/audittables.go
new file mode 100644
index 0000000..486d560
--- /dev/null
+++ b/internal/pkg/controller/audittables.go
@@ -0,0 +1,532 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "strconv"
+ "time"
+
+ "voltha-go-controller/internal/pkg/intf"
+ "voltha-go-controller/internal/pkg/of"
+ "voltha-go-controller/internal/pkg/tasks"
+ "voltha-go-controller/internal/pkg/util"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+var ( // NOTE(review): package-level state used by every AuditTablesTask - confirm group audits never run concurrently
+ rcvdGroups map[uint32]*ofp.OfpGroupDesc // groups received from VOLTHA; re-created by AuditGroups, matched entries removed by compareGroupEntries
+ groupsToAdd []*of.Group // reset by AuditGroups, appended to by compareGroupEntries
+ groupsToMod []*of.Group // reset by AuditGroups, appended to by compareGroupMembers
+)
+
+// AuditTablesTask is the task that audits the meters, groups and flows of a device against VOLTHA
+type AuditTablesTask struct {
+ taskID uint8 // assigned by Start
+ ctx context.Context // assigned by Start and passed to the VOLTHA client calls
+ device *Device // device whose tables are audited
+ stop bool // set by Stop; checked by the audit steps to cancel
+ timestamp string // creation time in RFC3339Nano format
+}
+
+// NewAuditTablesTask is constructor for AuditTablesTask
+func NewAuditTablesTask(device *Device) *AuditTablesTask {
+ var att AuditTablesTask
+ att.device = device
+ att.stop = false
+ tstamp := (time.Now()).Format(time.RFC3339Nano)
+ att.timestamp = tstamp
+ return &att
+}
+
+// Name returns the fixed name of the table audit task
+func (att *AuditTablesTask) Name() string {
+ return "Audit Table Task"
+}
+
+// TaskID returns the task id that was stored by Start
+func (att *AuditTablesTask) TaskID() uint8 {
+ return att.taskID
+}
+
+// Timestamp returns the creation time of the task as formatted by NewAuditTablesTask
+func (att *AuditTablesTask) Timestamp() string {
+ return att.timestamp
+}
+
+// Stop requests cancellation of the task by setting the stop flag that the audit steps check
+func (att *AuditTablesTask) Stop() {
+ att.stop = true
+}
+
+// Start is called by the framework and runs the audit of the meters, the
+// groups and the flows in that order. A failing step is logged and the next
+// step still runs; the error of the last failing step is returned.
+func (att *AuditTablesTask) Start(ctx context.Context, taskID uint8) error {
+	logger.Warnw(ctx, "Audit Table Task Triggered", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
+	att.taskID = taskID
+	att.ctx = ctx
+	var errInfo error
+
+	// Audit the meters
+	if err := att.AuditMeters(); err != nil {
+		logger.Errorw(ctx, "Audit Meters Failed", log.Fields{"Reason": err.Error()})
+		errInfo = err
+	}
+
+	// Audit the groups; the excess groups are kept in a local instead of re-assigning package-level state
+	excessGroups, err := att.AuditGroups()
+	if err != nil {
+		logger.Errorw(ctx, "Audit Groups Failed", log.Fields{"Reason": err.Error()})
+		errInfo = err
+	}
+
+	// Audit the flows
+	if err := att.AuditFlows(); err != nil {
+		logger.Errorw(ctx, "Audit Flows Failed", log.Fields{"Reason": err.Error()})
+		errInfo = err
+	}
+
+	// Triggering deletion of excess groups from device after the corresponding flows are removed
+	// to avoid flow dependency error during group deletion
+	logger.Infow(ctx, "Excess Groups", log.Fields{"Groups": excessGroups})
+	att.DelExcessGroups(excessGroups)
+	logger.Warnw(ctx, "Audit Table Task Completed", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
+	return errInfo
+}
+
+// AuditMeters fetches the meters held at VOLTHA and compares them with the
+// meters held by the controller. Meters missing at VOLTHA are added and
+// meters only present at VOLTHA are deleted. It returns the fetch error,
+// ErrTaskCancelError when the task was stopped, or nil.
+func (att *AuditTablesTask) AuditMeters() error {
+	if att.stop {
+		return tasks.ErrTaskCancelError
+	}
+	var vc voltha.VolthaServiceClient
+	if vc = att.device.VolthaClient(); vc == nil {
+		// A missing client is logged but not reported as an error
+		logger.Error(ctx, "Fetch Device Meters Failed: Voltha Client Unavailable")
+		return nil
+	}
+
+	//-----------------------------
+	// Perform the audit of meters
+	// Fetch the meters
+	ms, err := vc.ListLogicalDeviceMeters(att.ctx, &voltha.ID{Id: att.device.ID})
+	if err != nil {
+		logger.Warnw(ctx, "Audit of meters failed", log.Fields{"Reason": err.Error()})
+		return err
+	}
+
+	// Build the map for easy and faster processing
+	rcvdMeters := make(map[uint32]*ofp.OfpMeterStats)
+	for _, m := range ms.Items {
+		rcvdMeters[m.Stats.MeterId] = m.Stats
+	}
+
+	// Verify all meters that are in the controller but not in the device
+	missingMeters := []*of.Meter{}
+	for _, meter := range att.device.meters {
+		if att.stop {
+			break
+		}
+		logger.Debugw(ctx, "Auditing Meter", log.Fields{"Id": meter.ID})
+
+		if _, ok := rcvdMeters[meter.ID]; ok {
+			// The meter exists in the device too. Just remove it from
+			// the received meters
+			delete(rcvdMeters, meter.ID)
+		} else {
+			// The meter exists at the controller but not at the device
+			// Push the meter to the device
+			logger.Debugw(ctx, "Adding Meter To Missing Meters", log.Fields{"Id": meter.ID})
+			missingMeters = append(missingMeters, meter)
+		}
+	}
+	// Reconcile only when the task was not stopped while the meters were compared
+	if !att.stop {
+		att.AddMissingMeters(missingMeters)
+		att.DelExcessMeters(rcvdMeters)
+	} else {
+		err = tasks.ErrTaskCancelError
+	}
+	return err
+}
+
+// AddMissingMeters builds an add request for each of the given meters and
+// sends it to VOLTHA. Failures are logged and the remaining meters are still
+// processed.
+func (att *AuditTablesTask) AddMissingMeters(meters []*of.Meter) {
+	logger.Debugw(ctx, "Adding missing meters", log.Fields{"Number": len(meters)})
+	for _, m := range meters {
+		meterMod, err := of.MeterUpdate(att.device.ID, of.MeterCommandAdd, m)
+		if err != nil {
+			logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
+			continue
+		}
+		if vc := att.device.VolthaClient(); vc == nil {
+			logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
+		} else if _, err = vc.UpdateLogicalDeviceMeterTable(att.ctx, meterMod); err != nil {
+			logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
+		}
+	}
+}
+
+// DelExcessMeters sends a delete request to VOLTHA for every meter in the
+// given map; failures are only logged.
+func (att *AuditTablesTask) DelExcessMeters(meters map[uint32]*ofp.OfpMeterStats) {
+	logger.Debugw(ctx, "Deleting Excess Meters", log.Fields{"Number": len(meters)})
+	for _, stats := range meters {
+		meterMod := &ofp.OfpMeterMod{
+			Command: ofp.OfpMeterModCommand_OFPMC_DELETE,
+			MeterId: stats.MeterId,
+		}
+		meterUpd := &ofp.MeterModUpdate{Id: att.device.ID, MeterMod: meterMod}
+		if vc := att.device.VolthaClient(); vc == nil {
+			logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
+		} else if _, err := vc.UpdateLogicalDeviceMeterTable(att.ctx, meterUpd); err != nil {
+			logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
+		}
+	}
+}
+
+// AuditFlows audits the flows, which includes fetching the existing flows at
+// VOLTHA and identifying the delta between the ones held here and the
+// ones held at VOLTHA. The delta is cleaned up to keep both the
+// components in sync
+func (att *AuditTablesTask) AuditFlows() error {
+
+ if att.stop {
+ return tasks.ErrTaskCancelError
+ }
+
+ var vc voltha.VolthaServiceClient
+ if vc = att.device.VolthaClient(); vc == nil {
+ logger.Error(ctx, "Flow Audit Failed: Voltha Client Unavailable")
+ return nil // a missing client is logged but not reported as an error
+ }
+
+ // ---------------------------------
+ // Perform the audit of flows first
+ // Retrieve the flows from the device
+ f, err := vc.ListLogicalDeviceFlows(att.ctx, &common.ID{Id: att.device.ID})
+ if err != nil {
+ logger.Warnw(ctx, "Audit of flows failed", log.Fields{"Reason": err.Error()})
+ return err
+ }
+
+ defaultSuccessFlowStatus := intf.FlowStatus{ // status reused for every flow found on both sides; only Cookie is changed per flow
+ Device: att.device.ID,
+ FlowModType: of.CommandAdd,
+ Status: 0,
+ Reason: "",
+ }
+
+ // Build the map for easy and faster processing
+ rcvdFlows := make(map[uint64]*ofp.OfpFlowStats)
+ flowsToAdd := &of.VoltFlow{}
+ flowsToAdd.SubFlows = make(map[uint64]*of.VoltSubFlow)
+ for _, flow := range f.Items {
+ rcvdFlows[flow.Cookie] = flow
+ }
+
+ att.device.flowLock.Lock() // held while att.device.flows is iterated; released right after the loop
+ // Verify all flows that are in the controller but not in the device
+ for _, flow := range att.device.flows {
+
+ if att.stop {
+ break // leaves the loop only, so the Unlock below still runs
+ }
+
+ logger.Debugw(ctx, "Auditing Flow", log.Fields{"Cookie": flow.Cookie})
+ if _, ok := rcvdFlows[flow.Cookie]; ok {
+ // The flow exists in the device too. Remove it from the received
+ // flows unless it is in del failure/pending state, then trigger an
+ // internal flow success indication for it in either case
+
+ if flow.State != of.FlowDelFailure && flow.State != of.FlowDelPending {
+ delete(rcvdFlows, flow.Cookie)
+ }
+ defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
+
+ logger.Infow(ctx, "Triggering Internal Flow Notification", log.Fields{"Flow Status": defaultSuccessFlowStatus})
+ GetController().ProcessFlowModResultIndication(defaultSuccessFlowStatus) // NOTE(review): called while flowLock is held - confirm it never takes flowLock itself
+ } else {
+ // The flow exists at the controller but not at the device
+ // Push the flow to the device
+ logger.Debugw(ctx, "Adding Flow To Missing Flows", log.Fields{"Cookie": flow.Cookie})
+ flowsToAdd.SubFlows[flow.Cookie] = flow
+ }
+ }
+ att.device.flowLock.Unlock()
+
+ if !att.stop {
+ // The flows remaining in the received flows are the excess flows at
+ // the device. Delete those flows
+ att.DelExcessFlows(rcvdFlows)
+ // Add the flows missing at the device
+ att.AddMissingFlows(flowsToAdd)
+ } else {
+ err = tasks.ErrTaskCancelError
+ }
+ return err
+}
+
+// AddMissingFlows pushes to VOLTHA the flows that are held by the controller but
+// missing at the device and reports every result through a flow result notification.
+func (att *AuditTablesTask) AddMissingFlows(mflow *of.VoltFlow) {
+	logger.Debugw(ctx, "Add Missing Flows", log.Fields{"Number": len(mflow.SubFlows)})
+	mflow.Command = of.CommandAdd
+	ofFlows := of.ProcessVoltFlow(att.device.ID, mflow.Command, mflow.SubFlows)
+	var vc voltha.VolthaServiceClient
+	if vc = att.device.VolthaClient(); vc == nil {
+		logger.Error(ctx, "Update Flow Table Failed: Voltha Client Unavailable")
+		return
+	}
+	for _, flow := range ofFlows {
+		if flow.FlowMod == nil { // no flow mod to push and no cookie to report a result for
+			logger.Warnw(ctx, "Flow Mod Missing. Ignoring Add Missing Flow", log.Fields{"Device": att.device.ID})
+			continue
+		}
+		dbFlow, present := att.device.GetFlow(flow.FlowMod.Cookie)
+		if !present {
+			logger.Warnw(ctx, "Flow Removed from DB. Ignoring Add Missing Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.FlowMod.Cookie})
+			continue
+		}
+		_, err := vc.UpdateLogicalDeviceFlowTable(att.ctx, flow)
+		if err != nil {
+			logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Reason": err.Error()})
+		}
+		att.device.triggerFlowResultNotification(flow.FlowMod.Cookie, dbFlow, of.CommandAdd, of.BwAvailDetails{}, err)
+	}
+}
+
+// DelExcessFlows deletes from VOLTHA the received flows that are not present in the
+// controller's flow cache and reports every result through a flow result notification.
+func (att *AuditTablesTask) DelExcessFlows(flows map[uint64]*ofp.OfpFlowStats) {
+	logger.Debugw(ctx, "Deleting Excess Flows", log.Fields{"Number of Flows": len(flows)})
+
+	var vc voltha.VolthaServiceClient
+	if vc = att.device.VolthaClient(); vc == nil {
+		logger.Error(ctx, "Delete Excess Flows Failed: Voltha Client Unavailable")
+		return
+	}
+
+	// Let's cycle through the flows to delete the excess flows
+	for _, flow := range flows {
+		if _, present := att.device.GetFlow(flow.Cookie); present {
+			logger.Warnw(ctx, "Flow Present in DB. Ignoring Delete Excess Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
+			continue
+		}
+
+		logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
+		// Create the flowMod structure and fill it out from the received flow
+		flowMod := &ofp.OfpFlowMod{
+			Cookie:       flow.Cookie,
+			TableId:      flow.TableId,
+			Command:      ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT,
+			IdleTimeout:  flow.IdleTimeout,
+			HardTimeout:  flow.HardTimeout,
+			Priority:     flow.Priority,
+			BufferId:     of.DefaultBufferID,
+			OutPort:      of.DefaultOutPort,
+			OutGroup:     of.DefaultOutGroup,
+			Flags:        flow.Flags,
+			Match:        flow.Match,
+			Instructions: flow.Instructions,
+		}
+
+		// Create FlowTableUpdate
+		flowUpdate := &ofp.FlowTableUpdate{
+			Id:      att.device.ID,
+			FlowMod: flowMod,
+		}
+		_, err := vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate)
+		if err != nil {
+			logger.Errorw(ctx, "Flow Audit Delete Failed", log.Fields{"Reason": err.Error()})
+		}
+		att.device.triggerFlowResultNotification(flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err)
+	}
+}
+
+// AuditGroups audits the groups, which includes fetching the existing groups at
+// VOLTHA and identifying the delta between the ones held here and the
+// ones held at VOLTHA. Missing and mismatching groups are pushed here; the
+// excess groups are returned to the caller and not deleted by this method
+func (att *AuditTablesTask) AuditGroups() (map[uint32]*ofp.OfpGroupDesc, error) {
+
+ // Build the map for easy and faster processing
+ rcvdGroups = make(map[uint32]*ofp.OfpGroupDesc) // package-level map, also read and modified by compareGroupEntries
+
+ if att.stop {
+ return rcvdGroups, tasks.ErrTaskCancelError
+ }
+
+ var vc voltha.VolthaServiceClient
+ if vc = att.device.VolthaClient(); vc == nil {
+ logger.Error(ctx, "Group Audit Failed: Voltha Client Unavailable")
+ return rcvdGroups, nil // a missing client is logged but not reported as an error
+ }
+
+ // ---------------------------------
+ // Perform the audit of groups first
+ // Retrieve the groups from the device
+ g, err := vc.ListLogicalDeviceFlowGroups(att.ctx, &common.ID{Id: att.device.ID})
+ if err != nil {
+ logger.Warnw(ctx, "Audit of groups failed", log.Fields{"Reason": err.Error()})
+ return rcvdGroups, err
+ }
+
+ groupsToAdd = []*of.Group{}
+ groupsToMod = []*of.Group{}
+ for _, group := range g.Items {
+ rcvdGroups[group.Desc.GroupId] = group.Desc
+ }
+ logger.Infow(ctx, "Received Groups", log.Fields{"Groups": rcvdGroups})
+
+ // Verify all groups that are in the controller but not in the device
+ att.device.groups.Range(att.compareGroupEntries) // fills groupsToAdd/groupsToMod and removes matched groups from rcvdGroups
+
+ if !att.stop {
+ // Add the groups missing at the device
+ logger.Infow(ctx, "Missing Groups", log.Fields{"Groups": groupsToAdd})
+ att.AddMissingGroups(groupsToAdd)
+
+ // Update groups with group member mismatch
+ logger.Infow(ctx, "Modify Groups", log.Fields{"Groups": groupsToMod})
+ att.UpdateMismatchGroups(groupsToMod)
+
+ // Note: Excess groups will be deleted after ensuring the connected
+ // flows are also removed as part of audit flows
+ } else {
+ err = tasks.ErrTaskCancelError
+ }
+ // The groups remaining in the received groups are the excess groups at
+ // the device
+ return rcvdGroups, err
+}
+
+// compareGroupEntries is the Range callback used by AuditGroups for every
+// group held by the controller. A group also present in rcvdGroups is
+// checked for member mismatch and removed from rcvdGroups; any other group
+// is appended to groupsToAdd. It returns false when the task is stopped.
+func (att *AuditTablesTask) compareGroupEntries(key, value interface{}) bool {
+	if att.stop {
+		return false
+	}
+	groupID := key.(uint32)
+	dbGroup := value.(*of.Group)
+	logger.Debugw(ctx, "Auditing Group", log.Fields{"Groupid": groupID})
+	rcvdGrp, found := rcvdGroups[groupID]
+	if !found {
+		// The group exists at the controller but not at the device
+		logger.Debugw(ctx, "Adding Group To Missing Groups", log.Fields{"GroupId": groupID})
+		groupsToAdd = append(groupsToAdd, dbGroup)
+		return true
+	}
+	// The group exists in the device too: compare the members, then drop it from the received groups
+	compareGroupMembers(dbGroup, rcvdGrp)
+	delete(rcvdGroups, groupID)
+	return true
+}
+
+// compareGroupMembers collects the output ports found in the buckets of
+// rcvdGroup and compares them with the buckets of refGroup using
+// util.IsSliceSame; on a mismatch refGroup is appended to groupsToMod.
+func compareGroupMembers(refGroup *of.Group, rcvdGroup *ofp.OfpGroupDesc) {
+	// Collect the port list from the received group description
+	rcvdPorts := []uint32{}
+	for _, bucket := range rcvdGroup.Buckets {
+		for _, act := range bucket.Actions {
+			if act.Type != ofp.OfpActionType_OFPAT_OUTPUT {
+				continue
+			}
+			rcvdPorts = append(rcvdPorts, act.GetOutput().Port)
+		}
+	}
+
+	// Compare against a copy so that refGroup.Buckets itself is never handed to the helper
+	refPorts := append([]uint32{}, refGroup.Buckets...)
+	if util.IsSliceSame(refPorts, rcvdPorts) {
+		return
+	}
+	// The port lists differ, so the group has to be updated
+	groupsToMod = append(groupsToMod, refGroup)
+}
+
+// AddMissingGroups pushes the given groups to VOLTHA with the group add command
+func (att *AuditTablesTask) AddMissingGroups(groupList []*of.Group) {
+ att.PushGroups(groupList, of.GroupCommandAdd)
+}
+
+// UpdateMismatchGroups pushes the given groups to VOLTHA with the group modify command
+func (att *AuditTablesTask) UpdateMismatchGroups(groupList []*of.Group) {
+ att.PushGroups(groupList, of.GroupCommandMod)
+}
+
+// PushGroups - The groups missing/to be updated in the device are reinstalled att the audit
+func (att *AuditTablesTask) PushGroups(groupList []*of.Group, grpCommand of.GroupCommand) {
+ logger.Debugw(ctx, "Pushing Groups", log.Fields{"Number": len(groupList), "Command": grpCommand})
+
+ var vc voltha.VolthaServiceClient
+ if vc = att.device.VolthaClient(); vc == nil {
+ logger.Error(ctx, "Update Group Table Failed: Voltha Client Unavailable")
+ return
+ }
+ for _, group := range groupList {
+ group.Command = grpCommand
+ groupUpdate := of.CreateGroupTableUpdate(group)
+ if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
+ logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
+ }
+ }
+}
+
+// DelExcessGroups - Delete the excess groups held at the VOLTHA
+func (att *AuditTablesTask) DelExcessGroups(groups map[uint32]*ofp.OfpGroupDesc) {
+ logger.Debugw(ctx, "Deleting Excess Groups", log.Fields{"Number of Groups": len(groups)})
+
+ var vc voltha.VolthaServiceClient
+ if vc = att.device.VolthaClient(); vc == nil {
+ logger.Error(ctx, "Delete Excess Groups Failed: Voltha Client Unavailable")
+ return
+ }
+
+ // Let's cycle through the groups to delete the excess groups
+ for _, groupDesc := range groups {
+ logger.Debugw(ctx, "Deleting Group", log.Fields{"GroupId": groupDesc.GroupId})
+ group := &of.Group{}
+ group.Device = att.device.ID
+ group.GroupID = groupDesc.GroupId
+
+ //Group Members should be deleted before triggered group delete
+ group.Command = of.GroupCommandMod
+ groupUpdate := of.CreateGroupTableUpdate(group)
+ if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
+ logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
+ }
+
+ group.Command = of.GroupCommandDel
+ groupUpdate = of.CreateGroupTableUpdate(group)
+ if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
+ logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
+ }
+ }
+}
diff --git a/internal/pkg/controller/changeevent.go b/internal/pkg/controller/changeevent.go
new file mode 100644
index 0000000..95f6b07
--- /dev/null
+++ b/internal/pkg/controller/changeevent.go
@@ -0,0 +1,92 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "errors"
+ "time"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+)
+
+// ChangeEventTask is a task that applies one OpenFlow change event to a device.
+// Start only acts on port-status events (add / delete / modify).
+type ChangeEventTask struct {
+ // taskID is the identifier handed to Start.
+ taskID uint8
+ // ctx is set by the constructor and overwritten by Start.
+ ctx context.Context
+ // event is the change event to process.
+ event *ofp.ChangeEvent
+ // device is the device the port change is applied to.
+ device *Device
+ // timestamp is the creation time formatted as RFC3339Nano.
+ timestamp string
+}
+
+// NewChangeEventTask is constructor for ChangeEventTask
+func NewChangeEventTask(ctx context.Context, event *ofp.ChangeEvent, device *Device) *ChangeEventTask {
+ var cet ChangeEventTask
+ cet.device = device
+ cet.event = event
+ cet.ctx = ctx
+ tstamp := (time.Now()).Format(time.RFC3339Nano)
+ cet.timestamp = tstamp
+ return &cet
+}
+
+// Name returns the fixed, human-readable name of this task type.
+func (cet *ChangeEventTask) Name() string {
+ return "Change Event Task"
+}
+
+// TaskID returns the identifier stored by Start (zero until Start has run).
+func (cet *ChangeEventTask) TaskID() uint8 {
+ return cet.taskID
+}
+
+// Timestamp returns the creation time recorded by NewChangeEventTask.
+func (cet *ChangeEventTask) Timestamp() string {
+ return cet.timestamp
+}
+
+// Stop is a no-op for this task.
+func (cet *ChangeEventTask) Stop() {
+}
+
+// Start processes the change event held by the task.
+//
+// Only port-status events are handled:
+//   - OFPPR_ADD:    the port is added to the device and, if it is reported
+//     LIVE, its state is processed as well.
+//   - OFPPR_DELETE: the port is removed from the device.
+//   - OFPPR_MODIFY: the port update is processed.
+//
+// It returns an error when the event is not a port-status event or when the
+// port-status message carries no port description; otherwise nil.
+func (cet *ChangeEventTask) Start(ctx context.Context, taskID uint8) error {
+	cet.taskID = taskID
+	cet.ctx = ctx
+
+	// The generated protobuf getters are nil-safe, so a missing event, status
+	// or description is rejected here instead of causing a nil dereference.
+	status, ok := cet.event.GetEvent().(*ofp.ChangeEvent_PortStatus)
+	if !ok || status == nil || status.PortStatus.GetDesc() == nil {
+		return errors.New("Invalid message received")
+	}
+
+	desc := status.PortStatus.Desc
+	portNo, portName, state := desc.PortNo, desc.Name, desc.State
+	reason := status.PortStatus.Reason
+
+	logger.Infow(ctx, "Process Port Change Event", log.Fields{"Port No": portNo, "Port Name": portName, "State": state, "Reason": reason})
+	switch reason {
+	case ofp.OfpPortReason_OFPPR_ADD:
+		// A failed add is logged only; the state is still processed as before.
+		if err := cet.device.AddPort(portNo, portName); err != nil {
+			logger.Warnw(ctx, "AddPort Failed", log.Fields{"Port No": portNo, "Error": err})
+		}
+		if state == uint32(ofp.OfpPortState_OFPPS_LIVE) {
+			cet.device.ProcessPortState(portNo, state)
+		}
+	case ofp.OfpPortReason_OFPPR_DELETE:
+		if err := cet.device.DelPort(portNo); err != nil {
+			logger.Warnw(ctx, "DelPort Failed", log.Fields{"Port No": portNo, "Error": err})
+		}
+	case ofp.OfpPortReason_OFPPR_MODIFY:
+		cet.device.ProcessPortUpdate(portName, portNo, state)
+	}
+	logger.Infow(ctx, "Processed Port Change Event", log.Fields{"Port No": portNo, "Port Name": portName, "State": state, "Reason": reason})
+	return nil
+}
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
new file mode 100644
index 0000000..ae34133
--- /dev/null
+++ b/internal/pkg/controller/controller.go
@@ -0,0 +1,523 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package controller
+
+import (
+ "context"
+ "errors"
+ "sync"
+ "time"
+
+ "encoding/hex"
+
+ "voltha-go-controller/database"
+ errorCodes "voltha-go-controller/internal/pkg/errorcodes"
+ "voltha-go-controller/internal/pkg/intf"
+ "voltha-go-controller/internal/pkg/of"
+ "voltha-go-controller/internal/pkg/tasks"
+ "voltha-go-controller/internal/pkg/util"
+ "voltha-go-controller/internal/pkg/vpagent"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+// logger is the package logger; it is registered in init.
+var logger log.CLogger
+
+// ctx is a placeholder context passed to the logging calls in this package.
+var ctx = context.TODO()
+
+// init registers the package logger (JSON format, error level) and panics if
+// the registration fails.
+func init() {
+ // Setup this package so that its log level can be modified at run time
+ var err error
+ logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+ if err != nil {
+ panic(err)
+ }
+}
+
+// db is the database handle used by this package; it is assigned in NewController.
+var db database.DBIntf
+
+// deviceTableSyncDuration is the interval between device table sync-up runs.
+// It is 15 minutes unless overridden through SetDeviceTableSyncDuration.
+var deviceTableSyncDuration = 15 * time.Minute
+
+// SetDeviceTableSyncDuration sets the interval between device table sync-up
+// runs. duration is expressed in minutes and is stored without validation.
+func SetDeviceTableSyncDuration(duration int) {
+	minutes := time.Duration(duration)
+	deviceTableSyncDuration = minutes * time.Minute
+}
+
+// VoltController holds the devices known to the controller together with the
+// application callback interface and per-device bookkeeping.
+type VoltController struct {
+ // rebootLock guards rebootInProgressDevices.
+ rebootLock sync.Mutex
+ // rebootInProgressDevices holds the IDs of devices flagged as rebooting.
+ rebootInProgressDevices map[string]string
+ // devices maps a device ID to its Device object.
+ devices map[string]*Device
+ // NOTE(review): deviceLock is not taken by any method visible in this file;
+ // presumably intended to guard devices - confirm.
+ deviceLock sync.RWMutex
+ // vagent maps a VP agent endpoint key to its agent.
+ vagent map[string]*vpagent.VPAgent
+ ctx context.Context
+ // app is the application that receives the indications forwarded by the controller.
+ app intf.App
+ RebootFlow bool
+ // BlockedDeviceList is keyed by device serial number.
+ BlockedDeviceList *util.ConcurrentMap
+ // deviceTaskQueue maps a device ID to its *tasks.Tasks controller queue.
+ deviceTaskQueue *util.ConcurrentMap
+}
+
+// vcontroller is the controller instance created by NewController and
+// returned by GetController.
+var vcontroller *VoltController
+
+// NewController is the constructor for VoltController
+func NewController(ctx context.Context, app intf.App) intf.IVPClientAgent {
+ var controller VoltController
+
+ controller.rebootInProgressDevices = make(map[string]string)
+ controller.devices = make(map[string]*Device)
+ controller.deviceLock = sync.RWMutex{}
+ controller.ctx = ctx
+ controller.app = app
+ controller.BlockedDeviceList = util.NewConcurrentMap()
+ controller.deviceTaskQueue = util.NewConcurrentMap()
+ db = database.GetDatabase()
+ vcontroller = &controller
+ return &controller
+}
+
+// AddDevice creates the Device described by config, registers it with the
+// controller and the application, restores its meters, groups, flows and
+// ports from the database, raises the connect indication and finally attaches
+// the packet-out channel. The new device is returned.
+func (v *VoltController) AddDevice(config *intf.VPClientCfg) intf.IVPClient {
+	dev := NewDevice(config.DeviceID, config.SerialNum, config.VolthaClient, config.SouthBoundID)
+	v.devices[config.DeviceID] = dev
+	v.app.AddDevice(dev.ID, dev.SerialNum, config.SouthBoundID)
+
+	// Rebuild the in-memory tables before the device is announced as connected.
+	dev.RestoreMetersFromDb()
+	dev.RestoreGroupsFromDb()
+	dev.RestoreFlowsFromDb()
+	dev.RestorePortsFromDb()
+
+	dev.ConnectInd(context.TODO(), intf.DeviceDisc)
+	dev.packetOutChannel = config.PacketOutChannel
+
+	logger.Warnw(ctx, "Added device", log.Fields{"Device": config.DeviceID, "SerialNo": dev.SerialNum, "State": dev.State})
+	return dev
+}
+
+// DelDevice removes the device with the given ID from the controller, deletes
+// it, notifies the application and cancels the device context so that the
+// device tables sync routine stops.
+//
+// The application is notified even when the device is unknown. The cancel
+// function is only invoked for a known device that actually has one, which
+// avoids a nil dereference for unknown IDs.
+func (v *VoltController) DelDevice(id string) {
+	d, ok := v.devices[id]
+	if ok {
+		delete(v.devices, id)
+		d.Delete()
+	}
+	v.app.DelDevice(id)
+	if ok && d.cancel != nil {
+		d.cancel() // To stop the device tables sync routine
+	}
+	logger.Warnw(ctx, "Deleted device", log.Fields{"Device": id})
+}
+
+// AddControllerTask appends task to the controller-level task queue of the
+// given device, creating the queue on first use.
+func (v *VoltController) AddControllerTask(device string, task tasks.Task) {
+	var queue *tasks.Tasks
+	if existing, ok := v.deviceTaskQueue.Get(device); ok {
+		queue = existing.(*tasks.Tasks)
+	} else {
+		queue = tasks.NewTasks(context.TODO())
+		v.deviceTaskQueue.Set(device, queue)
+	}
+	queue.AddTask(task)
+	logger.Warnw(ctx, "Task Added to Controller Task List", log.Fields{"Len": queue.NumPendingTasks(), "Total": queue.TotalTasks()})
+}
+
+// AddNewDevice is called when a new device is discovered. The addition is not
+// performed inline: an AddDeviceTask is queued on the device's controller
+// task queue.
+func (v *VoltController) AddNewDevice(config *intf.VPClientCfg) {
+	task := NewAddDeviceTask(config)
+	deviceID := config.DeviceID
+	v.AddControllerTask(deviceID, task)
+}
+
+// GetDevice returns the device registered under id, or
+// errorCodes.ErrDeviceNotFound when no such device is known.
+func (v *VoltController) GetDevice(id string) (*Device, error) {
+	if dev, found := v.devices[id]; found {
+		return dev, nil
+	}
+	return nil, errorCodes.ErrDeviceNotFound
+}
+
+// IsRebootInProgressForDevice reports whether the reboot-in-progress flag is
+// currently set for the device.
+func (v *VoltController) IsRebootInProgressForDevice(device string) bool {
+	v.rebootLock.Lock()
+	defer v.rebootLock.Unlock()
+	_, inProgress := v.rebootInProgressDevices[device]
+	return inProgress
+}
+
+// SetRebootInProgressForDevice sets the reboot-in-progress flag for the device
+// and, when the flag was newly set, resets the device cache. It always
+// returns true.
+func (v *VoltController) SetRebootInProgressForDevice(device string) bool {
+	v.rebootLock.Lock()
+	defer v.rebootLock.Unlock()
+
+	if _, already := v.rebootInProgressDevices[device]; already {
+		return true
+	}
+	v.rebootInProgressDevices[device] = device
+	logger.Warnw(ctx, "Setted Reboot-In-Progress flag", log.Fields{"Device": device})
+
+	if d, err := v.GetDevice(device); err != nil {
+		logger.Errorw(ctx, "Failed to get device", log.Fields{"Device": device, "Error": err})
+	} else {
+		d.ResetCache()
+	}
+	return true
+}
+
+// ReSetRebootInProgressForDevice clears the reboot-in-progress flag for the
+// device if it is set. It always returns true.
+func (v *VoltController) ReSetRebootInProgressForDevice(device string) bool {
+	v.rebootLock.Lock()
+	defer v.rebootLock.Unlock()
+
+	if _, flagged := v.rebootInProgressDevices[device]; flagged {
+		delete(v.rebootInProgressDevices, device)
+		logger.Warnw(ctx, "Resetted Reboot-In-Progress flag", log.Fields{"Device": device})
+	}
+	return true
+}
+
+// DeviceRebootInd forwards the reboot indication to the application and then
+// removes the device's routes, groups, meters and PON counters from the
+// database.
+//
+// The clean-up stays best effort: a failing delete is logged and the
+// remaining deletes are still attempted.
+func (v *VoltController) DeviceRebootInd(dID string, srNo string, sbID string) {
+	v.app.DeviceRebootInd(dID, srNo, sbID)
+
+	if err := db.DelAllRoutesForDevice(dID); err != nil {
+		logger.Warnw(ctx, "Delete routes from DB failed", log.Fields{"Device": dID, "Reason": err})
+	}
+	if err := db.DelAllGroup(dID); err != nil {
+		logger.Warnw(ctx, "Delete groups from DB failed", log.Fields{"Device": dID, "Reason": err})
+	}
+	if err := db.DelAllMeter(dID); err != nil {
+		logger.Warnw(ctx, "Delete meters from DB failed", log.Fields{"Device": dID, "Reason": err})
+	}
+	if err := db.DelAllPONCounters(dID); err != nil {
+		logger.Warnw(ctx, "Delete PON counters from DB failed", log.Fields{"Device": dID, "Reason": err})
+	}
+}
+
+// DeviceDisableInd forwards the device deactivation indication to the application.
+func (v *VoltController) DeviceDisableInd(dID string) {
+ v.app.DeviceDisableInd(dID)
+}
+
+// TriggerPendingProfileDeleteReq asks the application to trigger the pending
+// profile delete requests for the device.
+func (v *VoltController) TriggerPendingProfileDeleteReq(device string) {
+ v.app.TriggerPendingProfileDeleteReq(device)
+}
+
+// TriggerPendingMigrateServicesReq asks the application to trigger the pending
+// services migration requests for the device.
+func (v *VoltController) TriggerPendingMigrateServicesReq(device string) {
+ v.app.TriggerPendingMigrateServicesReq(device)
+}
+
+// SetAuditFlags sets the application's reboot flag and marks an audit as in
+// progress on the device.
+func (v *VoltController) SetAuditFlags(device *Device) {
+ v.app.SetRebootFlag(true)
+ device.auditInProgress = true
+}
+
+// ResetAuditFlags clears the application's reboot flag and the device's
+// audit-in-progress marker.
+func (v *VoltController) ResetAuditFlags(device *Device) {
+ v.app.SetRebootFlag(false)
+ device.auditInProgress = false
+}
+
+// ProcessFlowModResultIndication forwards a flow mod result notification to
+// the application.
+func (v *VoltController) ProcessFlowModResultIndication(flowStatus intf.FlowStatus) {
+ v.app.ProcessFlowModResultIndication(flowStatus)
+}
+
+// AddVPAgent registers vpa under the key vep.
+//
+// The agent map is allocated on first use so that a controller whose map was
+// never initialised does not panic on a nil-map assignment.
+func (v *VoltController) AddVPAgent(vep string, vpa *vpagent.VPAgent) {
+	if v.vagent == nil {
+		v.vagent = make(map[string]*vpagent.VPAgent)
+	}
+	v.vagent[vep] = vpa
+}
+
+// VPAgent returns the agent registered under vep, or an error when no agent
+// has been registered for that key.
+func (v *VoltController) VPAgent(vep string) (*vpagent.VPAgent, error) {
+	if agent, registered := v.vagent[vep]; registered {
+		return agent, nil
+	}
+	return nil, errors.New("VPA Not Registered")
+}
+
+// PacketOutReq hands a packet-out request to the named device. It returns the
+// lookup error when the device is unknown, otherwise whatever the device's
+// own PacketOutReq returns.
+func (v *VoltController) PacketOutReq(device string, inport string, outport string, pkt []byte, isCustomPkt bool) error {
+	logger.Debugw(ctx, "Packet Out Req", log.Fields{"Device": device, "OutPort": outport})
+	dev, err := v.GetDevice(device)
+	if err != nil {
+		return err
+	}
+	encoded := hex.EncodeToString(pkt)
+	logger.Debugw(ctx, "Packet Out Pkt", log.Fields{"Pkt": encoded})
+	return dev.PacketOutReq(inport, outport, pkt, isCustomPkt)
+}
+
+// AddFlows to add flows
+func (v *VoltController) AddFlows(port string, device string, flow *of.VoltFlow) error {
+ d, err := v.GetDevice(device)
+ if err != nil {
+ logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+ return err
+ }
+ devPort := d.GetPortByName(port)
+ if devPort == nil {
+ logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+ return errorCodes.ErrPortNotFound
+ }
+ if d.ctx == nil {
+ //FIXME: Application should know the context before it could submit task. Handle at application level
+ logger.Errorw(ctx, "Context is missing. AddFlow Operation Not added to Task", log.Fields{"Device": device})
+ return errorCodes.ErrInvalidParamInRequest
+ }
+
+ var isMigrationRequired bool
+ if flow.MigrateCookie {
+ // flow migration to new cookie must be done only during the audit. Migration for all subflows must be done if
+ // atlease one subflow with old cookie found in the device.
+ for _, subFlow := range flow.SubFlows {
+ if isMigrationRequired = d.IsFlowPresentWithOldCookie(subFlow); isMigrationRequired {
+ break
+ }
+ }
+ }
+
+ if isMigrationRequired {
+ // In this case, the flow is updated in local cache and db here.
+ // Actual flow deletion and addition at voltha will happen during flow tables audit.
+ for _, subFlow := range flow.SubFlows {
+ logger.Debugw(ctx, "Cookie Migration Required", log.Fields{"OldCookie": subFlow.OldCookie, "NewCookie": subFlow.Cookie})
+ if err := d.DelFlowWithOldCookie(subFlow); err != nil {
+ logger.Errorw(ctx, "Delete flow with old cookie failed", log.Fields{"Error": err, "OldCookie": subFlow.OldCookie})
+ }
+ if err := d.AddFlow(subFlow); err != nil {
+ logger.Errorw(ctx, "Flow Add Failed", log.Fields{"Error": err, "Cookie": subFlow.Cookie})
+ }
+ }
+ } else {
+ flow.Command = of.CommandAdd
+ d.UpdateFlows(flow, devPort)
+ for cookie := range flow.SubFlows {
+ logger.Debugw(ctx, "Flow Add added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
+ }
+ }
+ return nil
+}
+
+// DelFlows to delete flows
+func (v *VoltController) DelFlows(port string, device string, flow *of.VoltFlow) error {
+ d, err := v.GetDevice(device)
+ if err != nil {
+ logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+ return err
+ }
+ devPort := d.GetPortByName(port)
+ if devPort == nil {
+ logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+ return errorCodes.ErrPortNotFound
+ }
+ if d.ctx == nil {
+ //FIXME: Application should know the context before it could submit task. Handle at application level
+ logger.Errorw(ctx, "Context is missing. DelFlow Operation Not added to Task", log.Fields{"Device": device})
+ return errorCodes.ErrInvalidParamInRequest
+ }
+
+ var isMigrationRequired bool
+ if flow.MigrateCookie {
+ // flow migration to new cookie must be done only during the audit. Migration for all subflows must be done if
+ // atlease one subflow with old cookie found in the device.
+ for _, subFlow := range flow.SubFlows {
+ if isMigrationRequired = d.IsFlowPresentWithOldCookie(subFlow); isMigrationRequired {
+ break
+ }
+ }
+ }
+
+ if isMigrationRequired {
+ // In this case, the flow is deleted from local cache and db here.
+ // Actual flow deletion at voltha will happen during flow tables audit.
+ for _, subFlow := range flow.SubFlows {
+ logger.Debugw(ctx, "Old Cookie delete Required", log.Fields{"OldCookie": subFlow.OldCookie})
+ if err := d.DelFlowWithOldCookie(subFlow); err != nil {
+ logger.Errorw(ctx, "DelFlowWithOldCookie failed", log.Fields{"OldCookie": subFlow.OldCookie, "Error": err})
+ }
+ }
+ } else {
+ flow.Command = of.CommandDel
+ d.UpdateFlows(flow, devPort)
+ for cookie := range flow.SubFlows {
+ logger.Debugw(ctx, "Flow Del added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
+ }
+ }
+ return nil
+}
+
+// GroupUpdate submits a group update for the given port of the device.
+// It returns the lookup error for an unknown device, ErrPortNotFound for an
+// unknown port and ErrInvalidParamInRequest when the device has no context.
+func (v *VoltController) GroupUpdate(port string, device string, group *of.Group) error {
+	d, err := v.GetDevice(device)
+	if err != nil {
+		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+		return err
+	}
+
+	devPort := d.GetPortByName(port)
+	switch {
+	case devPort == nil:
+		logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+		return errorCodes.ErrPortNotFound
+	case d.ctx == nil:
+		//FIXME: Application should know the context before it could submit task. Handle at application level
+		logger.Errorw(ctx, "Context is missing. GroupMod Operation Not added to task", log.Fields{"Device": device})
+		return errorCodes.ErrInvalidParamInRequest
+	}
+
+	d.UpdateGroup(group, devPort)
+	return nil
+}
+
+// ModMeter submits the meter command for the given port of the device.
+// It returns the lookup error for an unknown device and ErrPortNotFound for
+// an unknown port.
+func (v *VoltController) ModMeter(port string, device string, command of.MeterCommand, meter *of.Meter) error {
+	d, err := v.GetDevice(device)
+	if err != nil {
+		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+		return err
+	}
+
+	if devPort := d.GetPortByName(port); devPort != nil {
+		d.ModMeter(command, meter, devPort)
+		return nil
+	}
+	logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+	return errorCodes.ErrPortNotFound
+}
+
+// PortAddInd forwards a port add indication (device, port ID, port name) to the application.
+func (v *VoltController) PortAddInd(device string, id uint32, name string) {
+ v.app.PortAddInd(device, id, name)
+}
+
+// PortDelInd forwards a port delete indication to the application.
+func (v *VoltController) PortDelInd(device string, port string) {
+ v.app.PortDelInd(device, port)
+}
+
+// PortUpdateInd forwards a port update indication (device, port name, port ID) to the application.
+func (v *VoltController) PortUpdateInd(device string, name string, id uint32) {
+ v.app.PortUpdateInd(device, name, id)
+}
+
+// PortUpInd forwards a port up indication to the application.
+func (v *VoltController) PortUpInd(device string, port string) {
+ v.app.PortUpInd(device, port)
+}
+
+// PortDownInd forwards a port down indication to the application.
+func (v *VoltController) PortDownInd(device string, port string) {
+ v.app.PortDownInd(device, port)
+}
+
+// DeviceUpInd forwards a device up indication to the application.
+func (v *VoltController) DeviceUpInd(device string) {
+ v.app.DeviceUpInd(device)
+}
+
+// DeviceDownInd forwards a device down indication to the application.
+func (v *VoltController) DeviceDownInd(device string) {
+ v.app.DeviceDownInd(device)
+}
+
+// PacketInInd forwards a packet-in indication with its payload to the application.
+func (v *VoltController) PacketInInd(device string, port string, data []byte) {
+ v.app.PacketInInd(device, port, data)
+}
+
+// GetPortState returns the state of the named port on the device. For an
+// unknown device it returns PortStateDown together with the lookup error.
+func (v *VoltController) GetPortState(device string, name string) (PortState, error) {
+	dev, err := v.GetDevice(device)
+	if err == nil {
+		return dev.GetPortState(name)
+	}
+	logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+	return PortStateDown, err
+}
+
+// UpdateMvlanProfiles asks the application to update the mvlan profiles for the device.
+func (v *VoltController) UpdateMvlanProfiles(device string) {
+ v.app.UpdateMvlanProfilesForDevice(device)
+}
+
+// GetController returns the controller instance created by NewController; it
+// is nil until NewController has been called.
+func GetController() *VoltController {
+ return vcontroller
+}
+
+/*
+// PostIndication to post indication
+func (v *VoltController) PostIndication(device string, task interface{}) error {
+ var srvTask AddServiceIndTask
+ var portTask AddPortIndTask
+ var taskCommon tasks.Task
+ var isSvcTask bool
+
+ switch data := task.(type) {
+ case *AddServiceIndTask:
+ srvTask = *data
+ taskCommon = data
+ isSvcTask = true
+ case *AddPortIndTask:
+ portTask = *data
+ taskCommon = data
+ }
+
+ d, err := v.GetDevice(device)
+ if err != nil {
+ logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+ //It means device itself it not present so just post the indication directly
+ if isSvcTask {
+ msgbus.PostAccessConfigInd(srvTask.result, d.SerialNum, srvTask.indicationType, srvTask.serviceName, 0, srvTask.reason, srvTask.trigger, srvTask.portState)
+ } else {
+ msgbus.ProcessPortInd(portTask.indicationType, d.SerialNum, portTask.portName, portTask.accessConfig, portTask.serviceList)
+ }
+ return err
+ }
+ if taskCommon != nil {
+ d.AddTask(taskCommon)
+ }
+ return nil
+}
+*/
+
+// GetTaskList returns the task list of the device. An empty, non-nil list is
+// returned when the device is unknown or has no context.
+func (v *VoltController) GetTaskList(device string) []tasks.Task {
+	d, err := v.GetDevice(device)
+	if err == nil && d.ctx != nil {
+		return d.GetTaskList()
+	}
+	logger.Errorw(ctx, "Device Not Connected/Found", log.Fields{"Device": device, "Dev Obj": d})
+	return []tasks.Task{}
+}
+
+// AddBlockedDevices adds the serial number to the blocked device list; the
+// serial number is used as both key and value.
+func (v *VoltController) AddBlockedDevices(deviceSerialNumber string) {
+ v.BlockedDeviceList.Set(deviceSerialNumber, deviceSerialNumber)
+}
+
+// DelBlockedDevices removes the serial number from the blocked device list.
+func (v *VoltController) DelBlockedDevices(deviceSerialNumber string) {
+ v.BlockedDeviceList.Remove(deviceSerialNumber)
+}
+
+// IsBlockedDevice reports whether the serial number is present in the blocked device list.
+func (v *VoltController) IsBlockedDevice(deviceSerialNumber string) bool {
+ _, ifPresent := v.BlockedDeviceList.Get(deviceSerialNumber)
+ return ifPresent
+}
diff --git a/internal/pkg/controller/controllertasks.go b/internal/pkg/controller/controllertasks.go
new file mode 100644
index 0000000..bd06ffb
--- /dev/null
+++ b/internal/pkg/controller/controllertasks.go
@@ -0,0 +1,76 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package controller
+
+import (
+ "context"
+ "time"
+
+ "voltha-go-controller/internal/pkg/intf"
+ "voltha-go-controller/internal/pkg/vpagent"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+// AddDeviceTask is a controller task that adds a newly discovered device.
+type AddDeviceTask struct {
+ // taskID is the identifier handed to Start.
+ taskID uint8
+ // ctx is set by Start; the constructor leaves it nil.
+ ctx context.Context
+ // config describes the device to add.
+ config *intf.VPClientCfg
+ // timestamp is the creation time formatted as RFC3339Nano.
+ timestamp string
+}
+
+// NewAddDeviceTask is the constructor for AddDeviceTask
+func NewAddDeviceTask(config *intf.VPClientCfg) *AddDeviceTask {
+ var adt AddDeviceTask
+ adt.config = config
+ tstamp := (time.Now()).Format(time.RFC3339Nano)
+ adt.timestamp = tstamp
+ return &adt
+}
+
+// Name returns the fixed, human-readable name of this task type.
+func (adt *AddDeviceTask) Name() string {
+ return "Add Device Task"
+}
+
+// TaskID returns the identifier stored by Start (zero until Start has run).
+func (adt *AddDeviceTask) TaskID() uint8 {
+ return adt.taskID
+}
+
+// Timestamp returns the creation time recorded by NewAddDeviceTask.
+func (adt *AddDeviceTask) Timestamp() string {
+ return adt.timestamp
+}
+
+// Stop is a no-op for this task.
+func (adt *AddDeviceTask) Stop() {
+}
+
+// Start adds the configured device to the controller and registers the
+// resulting client with the VP agent under the device ID. It always returns nil.
+func (adt *AddDeviceTask) Start(ctx context.Context, taskID uint8) error {
+	adt.taskID = taskID
+	adt.ctx = ctx
+	cfg := adt.config
+
+	logger.Infow(ctx, "Add Device Task Triggered", log.Fields{"Device": cfg.DeviceID, "SerialNum": cfg.SerialNum})
+
+	client := GetController().AddDevice(cfg)
+	vpagent.GetVPAgent().AddClientToClientMap(cfg.DeviceID, client)
+
+	logger.Infow(ctx, "Add Device Task Completed", log.Fields{"Device": cfg.DeviceID, "SerialNum": cfg.SerialNum})
+	return nil
+}
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
new file mode 100644
index 0000000..aa7bd2c
--- /dev/null
+++ b/internal/pkg/controller/device.go
@@ -0,0 +1,1042 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ infraerror "voltha-go-controller/internal/pkg/errorcodes"
+ "strconv"
+ "sync"
+ "time"
+
+ "voltha-go-controller/database"
+ "voltha-go-controller/internal/pkg/holder"
+ "voltha-go-controller/internal/pkg/intf"
+ "voltha-go-controller/internal/pkg/of"
+ //"voltha-go-controller/internal/pkg/vpagent"
+ "voltha-go-controller/internal/pkg/tasks"
+ "voltha-go-controller/internal/pkg/util"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+// PortState is the string representation of a port's operational state.
+type PortState string
+
+const (
+ // PortStateDown marks a port as down; it is the state of a newly created DevicePort.
+ PortStateDown PortState = "DOWN"
+ // PortStateUp marks a port as up.
+ PortStateUp PortState = "UP"
+ // DefaultMaxFlowQueues is the flow hash used by NewDevice when the database
+ // holds no flow hash for the device.
+ DefaultMaxFlowQueues = 67
+ // ErrDuplicateFlow is the message of the error returned by AddFlow when the
+ // flow cookie already exists on the device. Note that it is a string, not an error value.
+ ErrDuplicateFlow string = "Duplicate Flow"
+)
+
+// DevicePort describes one port of a device. It embeds tasks.Tasks.
+type DevicePort struct {
+ tasks.Tasks
+ // Name is the port name.
+ Name string
+ // ID is the port number.
+ ID uint32
+ // State is the port's state; NewDevicePort initialises it to PortStateDown.
+ State PortState
+ Version string
+}
+
+// NewDevicePort is the constructor for DevicePort
+func NewDevicePort(id uint32, name string) *DevicePort {
+ var port DevicePort
+
+ port.ID = id
+ port.Name = name
+ port.State = PortStateDown
+ return &port
+}
+
+// UniIDFlowQueue maintains flows in a queue. It embeds tasks.Tasks and is
+// identified by ID.
+type UniIDFlowQueue struct {
+ tasks.Tasks
+ // ID identifies the queue.
+ ID uint32
+}
+
+// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
+func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
+ var flowQueue UniIDFlowQueue
+ flowQueue.ID = id
+ return &flowQueue
+}
+
+// DeviceState is the string representation of a device's lifecycle state.
+type DeviceState string
+
+const (
+
+ // DeviceStateUNKNOWN is the "UNKNOWN" device state.
+ DeviceStateUNKNOWN DeviceState = "UNKNOWN"
+ // DeviceStateINIT is the "INIT" device state.
+ DeviceStateINIT DeviceState = "INIT"
+ // DeviceStateUP is the "UP" device state.
+ DeviceStateUP DeviceState = "UP"
+ // DeviceStateDOWN is the "DOWN" device state; NewDevice starts devices in this state.
+ DeviceStateDOWN DeviceState = "DOWN"
+ // DeviceStateREBOOTED is the "REBOOTED" device state.
+ DeviceStateREBOOTED DeviceState = "REBOOTED"
+ // DeviceStateDISABLED is the "DISABLED" device state.
+ DeviceStateDISABLED DeviceState = "DISABLED"
+ // DeviceStateDELETED is the "DELETED" device state.
+ DeviceStateDELETED DeviceState = "DELETED"
+)
+
+// Device is the controller's view of one device: its ports and the cached
+// flow, meter and group tables. It embeds tasks.Tasks.
+type Device struct {
+ tasks.Tasks
+ ID string
+ SerialNum string
+ // State is initialised to DeviceStateDOWN by NewDevice.
+ State DeviceState
+ PortsByID map[uint32]*DevicePort
+ PortsByName map[string]*DevicePort
+ portLock sync.RWMutex
+ vclientHolder *holder.VolthaServiceClientHolder
+ // ctx is checked for nil by the controller before flow and group requests are accepted.
+ ctx context.Context
+ // cancel is invoked by the controller's DelDevice to stop the device tables sync routine.
+ cancel context.CancelFunc
+ packetOutChannel chan *ofp.PacketOut
+ // flows is keyed by flow cookie; the flow accessors take flowLock.
+ flows map[uint64]*of.VoltSubFlow
+ flowLock sync.RWMutex
+ // meters is keyed by meter ID; the meter accessors take meterLock.
+ meters map[uint32]*of.Meter
+ meterLock sync.RWMutex
+ groups sync.Map //map[uint32]*of.Group -> [GroupId : Group]
+ auditInProgress bool
+ flowQueueLock sync.RWMutex
+ // flowHash is loaded from the database by NewDevice, which falls back to
+ // DefaultMaxFlowQueues when the lookup fails.
+ flowHash uint32
+ flowQueue map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
+ deviceAuditInProgress bool
+ SouthBoundID string
+}
+
+// NewDevice creates a Device in the DeviceStateDOWN state with empty port,
+// flow, meter and flow-queue tables.
+//
+// The flow hash is read from the database. DefaultMaxFlowQueues is used when
+// the database has no value for the device, and also when the stored value
+// cannot be decoded, so that the hash is never left at zero by a corrupt entry.
+func NewDevice(id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID string) *Device {
+	device := &Device{
+		ID:            id,
+		SerialNum:     slno,
+		State:         DeviceStateDOWN,
+		PortsByID:     make(map[uint32]*DevicePort),
+		PortsByName:   make(map[string]*DevicePort),
+		vclientHolder: vclientHldr,
+		flows:         make(map[uint64]*of.VoltSubFlow),
+		meters:        make(map[uint32]*of.Meter),
+		flowQueue:     make(map[uint32]*UniIDFlowQueue),
+		flowHash:      DefaultMaxFlowQueues,
+		SouthBoundID:  southBoundID,
+	}
+
+	// Get the flowhash from db and update the flowhash variable in the device.
+	if stored, err := db.GetFlowHash(id); err == nil {
+		var hash uint32
+		if err = json.Unmarshal([]byte(stored), &hash); err != nil {
+			// Keep the default rather than running with a zero hash.
+			logger.Errorw(ctx, "Failed to unmarshall flowhash", log.Fields{"Deviceid": id, "Reason": err})
+		} else {
+			device.flowHash = hash
+		}
+	}
+	logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
+	return device
+}
+
+// ResetCache drops the cached flows, meters and groups of the device. The
+// database is not touched.
+//
+// The flow and meter maps are replaced under the same locks the flow and
+// meter accessors use. Group entries are deleted one by one instead of
+// overwriting the sync.Map, which may be in use by other goroutines.
+func (d *Device) ResetCache() {
+	logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
+
+	d.flowLock.Lock()
+	d.flows = make(map[uint64]*of.VoltSubFlow)
+	d.flowLock.Unlock()
+
+	d.meterLock.Lock()
+	d.meters = make(map[uint32]*of.Meter)
+	d.meterLock.Unlock()
+
+	d.groups.Range(func(key, _ interface{}) bool {
+		d.groups.Delete(key)
+		return true
+	})
+}
+
+// GetFlow looks up the cached flow with the given cookie. The boolean result
+// reports whether such a flow exists.
+func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
+	d.flowLock.RLock()
+	defer d.flowLock.RUnlock()
+
+	logger.Infow(ctx, "Get Flow", log.Fields{"Cookie": cookie})
+	subFlow, found := d.flows[cookie]
+	return subFlow, found
+}
+
+// AddFlow stores the flow in the device cache and writes it to the database.
+// An error carrying the ErrDuplicateFlow message is returned when a flow with
+// the same cookie is already cached.
+func (d *Device) AddFlow(flow *of.VoltSubFlow) error {
+	d.flowLock.Lock()
+	defer d.flowLock.Unlock()
+
+	logger.Infow(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
+	_, exists := d.flows[flow.Cookie]
+	if exists {
+		return errors.New(ErrDuplicateFlow)
+	}
+	d.flows[flow.Cookie] = flow
+	d.AddFlowToDb(flow)
+	return nil
+}
+
+// AddFlowToDb serialises the flow as JSON and writes it to the database under
+// the device ID and flow cookie. Failures are logged, not returned; this
+// includes a failure to serialise the flow, which was previously ignored
+// without any trace.
+func (d *Device) AddFlowToDb(flow *of.VoltSubFlow) {
+	b, err := json.Marshal(flow)
+	if err != nil {
+		logger.Errorw(ctx, "Marshal Flow failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
+		return
+	}
+	if err = db.PutFlow(d.ID, flow.Cookie, string(b)); err != nil {
+		logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
+	}
+}
+
+// DelFlow removes the flow from the device cache and the database. An error
+// is returned when no flow with that cookie is cached.
+func (d *Device) DelFlow(flow *of.VoltSubFlow) error {
+	d.flowLock.Lock()
+	defer d.flowLock.Unlock()
+
+	if _, present := d.flows[flow.Cookie]; !present {
+		return errors.New("Flow does not Exist")
+	}
+	delete(d.flows, flow.Cookie)
+	d.DelFlowFromDb(flow.Cookie)
+	return nil
+}
+
+// DelFlowFromDb deletes the flow with the given ID (cookie) from the database
+// entry of this device. The delete stays best effort, but a failure is now
+// logged instead of being silently dropped.
+func (d *Device) DelFlowFromDb(flowID uint64) {
+	if err := db.DelFlow(d.ID, flowID); err != nil {
+		logger.Warnw(ctx, "Delete Flow from DB failed", log.Fields{"device": d.ID, "cookie": flowID, "Reason": err})
+	}
+}
+
+// IsFlowPresentWithOldCookie reports whether the flow is cached only under
+// its old cookie: the new cookie must be absent, the old cookie must be
+// non-zero and different from the new one, and a flow must be cached under
+// the old cookie.
+func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
+	d.flowLock.RLock()
+	defer d.flowLock.RUnlock()
+
+	if _, hasNew := d.flows[flow.Cookie]; hasNew {
+		return false
+	}
+	if flow.OldCookie == 0 || flow.Cookie == flow.OldCookie {
+		return false
+	}
+	if _, hasOld := d.flows[flow.OldCookie]; !hasOld {
+		return false
+	}
+	logger.Infow(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
+	return true
+}
+
+// DelFlowWithOldCookie removes the flow cached under the flow's old cookie
+// from the device cache and the database. An error is returned when no flow
+// is cached under the old cookie.
+func (d *Device) DelFlowWithOldCookie(flow *of.VoltSubFlow) error {
+	d.flowLock.Lock()
+	defer d.flowLock.Unlock()
+
+	oldCookie := flow.OldCookie
+	if _, present := d.flows[oldCookie]; !present {
+		return errors.New("Flow does not Exist")
+	}
+	logger.Infow(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
+		log.Fields{"OldCookie": flow.OldCookie})
+	delete(d.flows, oldCookie)
+	d.DelFlowFromDb(oldCookie)
+	return nil
+}
+
+// RestoreFlowsFromDb reads the device's flows from the database and adds them
+// to the flow cache. Entries whose value is not a []byte are skipped with a
+// warning. A failure to read from the database is logged (it used to be
+// ignored); whatever entries were returned are still processed.
+func (d *Device) RestoreFlowsFromDb() {
+	flows, err := db.GetFlows(d.ID)
+	if err != nil {
+		logger.Warnw(ctx, "Get Flows from DB failed", log.Fields{"device": d.ID, "Reason": err})
+	}
+	for _, flow := range flows {
+		b, ok := flow.Value.([]byte)
+		if !ok {
+			logger.Warn(ctx, "The value type is not []byte")
+			continue
+		}
+		d.CreateFlowFromString(b)
+	}
+}
+
+// CreateFlowFromString decodes a JSON-encoded flow and adds it to the flow
+// cache unless a flow with the same cookie is already cached. Decode failures
+// and duplicates are logged as warnings.
+func (d *Device) CreateFlowFromString(b []byte) {
+	var flow of.VoltSubFlow
+	if err := json.Unmarshal(b, &flow); err != nil {
+		logger.Warn(ctx, "Unmarshal failed")
+		return
+	}
+	if _, exists := d.flows[flow.Cookie]; exists {
+		logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
+		return
+	}
+	logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
+	d.flows[flow.Cookie] = &flow
+}
+
+// ----------------------------------------------------------
+// Database related functionality
+// Group operations at the device which include update and delete
+
+// UpdateGroupEntry stores the group in the device's group cache under its
+// group ID, replacing any existing entry, and writes it to the database.
+func (d *Device) UpdateGroupEntry(group *of.Group) {
+
+ logger.Infow(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
+ d.groups.Store(group.GroupID, group)
+ d.AddGroupToDb(group)
+}
+
+// AddGroupToDb serialises the group as JSON and writes it to the database
+// under the device ID and group ID. Failures are logged, not returned; this
+// includes a failure to serialise the group, which was previously ignored
+// without any trace.
+func (d *Device) AddGroupToDb(group *of.Group) {
+	b, err := json.Marshal(group)
+	if err != nil {
+		logger.Errorw(ctx, "Marshal Group failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
+		return
+	}
+	logger.Infow(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
+	if err = db.PutGroup(d.ID, group.GroupID, string(b)); err != nil {
+		logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
+	}
+}
+
+// DelGroupEntry removes the group from the device's group cache and from the
+// database. Nothing happens when the group is not cached.
+func (d *Device) DelGroupEntry(group *of.Group) {
+	if _, cached := d.groups.Load(group.GroupID); !cached {
+		return
+	}
+	d.groups.Delete(group.GroupID)
+	d.DelGroupFromDb(group.GroupID)
+}
+
+// DelGroupFromDb deletes the group with the given ID from this device's
+// records in the database. The delete is best-effort: a failure is logged
+// and not propagated.
+func (d *Device) DelGroupFromDb(groupID uint32) {
+	if err := db.DelGroup(d.ID, groupID); err != nil {
+		// Previously the error was discarded entirely.
+		logger.Warnw(ctx, "Delete Group from DB failed", log.Fields{"device": d.ID, "groupID": groupID, "Reason": err})
+	}
+}
+
+// RestoreGroupsFromDb reads the groups stored in the database for this
+// device and hands each stored value to CreateGroupFromString. Entries whose
+// value is not a []byte are skipped with a warning.
+func (d *Device) RestoreGroupsFromDb() {
+	logger.Info(ctx, "Restoring Groups")
+	groups, err := db.GetGroups(d.ID)
+	if err != nil {
+		// The error used to be discarded; log it and still process whatever
+		// was returned, as before.
+		logger.Warnw(ctx, "Get Groups from DB failed", log.Fields{"device": d.ID, "Reason": err})
+	}
+	for _, group := range groups {
+		b, ok := group.Value.([]byte)
+		if !ok {
+			logger.Warn(ctx, "The value type is not []byte")
+			continue
+		}
+		d.CreateGroupFromString(b)
+	}
+}
+
+// CreateGroupFromString decodes a JSON-encoded group and stores it in the
+// device group table under its GroupID. A decode failure or an
+// already-present GroupID is logged as a warning and the entry is ignored.
+func (d *Device) CreateGroupFromString(b []byte) {
+	var group of.Group
+	if err := json.Unmarshal(b, &group); err != nil {
+		logger.Warn(ctx, "Unmarshal failed")
+		return
+	}
+	if _, exists := d.groups.Load(group.GroupID); exists {
+		logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
+		return
+	}
+	logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
+	d.groups.Store(group.GroupID, &group)
+}
+
+// AddMeter registers the meter in the device meter table under meterLock
+// and starts a goroutine that writes it to the database. It returns an
+// error when a meter with the same ID is already registered.
+func (d *Device) AddMeter(meter *of.Meter) error {
+	d.meterLock.Lock()
+	defer d.meterLock.Unlock()
+
+	if _, exists := d.meters[meter.ID]; !exists {
+		d.meters[meter.ID] = meter
+		// The DB write runs asynchronously, outside this call.
+		go d.AddMeterToDb(meter)
+		return nil
+	}
+	return errors.New("Duplicate Meter")
+}
+
+// GetMeter looks up the meter with the given ID under a read lock. It
+// returns the meter, or nil and an error when no such meter is registered.
+func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
+	d.meterLock.RLock()
+	defer d.meterLock.RUnlock()
+
+	m, found := d.meters[id]
+	if !found {
+		return nil, errors.New("Meter Not Found")
+	}
+	return m, nil
+}
+
+// DelMeter removes the meter from the device meter table under meterLock
+// and starts a goroutine that deletes it from the database. It reports
+// whether a meter with that ID was registered.
+func (d *Device) DelMeter(meter *of.Meter) bool {
+	d.meterLock.Lock()
+	defer d.meterLock.Unlock()
+
+	if _, exists := d.meters[meter.ID]; !exists {
+		return false
+	}
+	delete(d.meters, meter.ID)
+	// The DB delete runs asynchronously, outside this call.
+	go d.DelMeterFromDb(meter.ID)
+	return true
+}
+
+// AddMeterToDb serialises the meter to JSON and writes it to the database
+// under this device's ID and the meter's ID. Serialisation and write
+// failures are logged; nothing is returned to the caller.
+func (d *Device) AddMeterToDb(meter *of.Meter) {
+	b, err := json.Marshal(meter)
+	if err != nil {
+		// Previously a marshal failure was dropped without any trace.
+		logger.Errorw(ctx, "Marshal Meter failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
+		return
+	}
+	if err = db.PutDeviceMeter(d.ID, meter.ID, string(b)); err != nil {
+		logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
+	}
+}
+
+// DelMeterFromDb deletes the meter with the given ID from this device's
+// records in the database. The delete is best-effort: a failure is logged
+// and not propagated.
+func (d *Device) DelMeterFromDb(id uint32) {
+	if err := db.DelDeviceMeter(d.ID, id); err != nil {
+		// Previously the error was discarded entirely.
+		logger.Warnw(ctx, "Delete Meter from DB failed", log.Fields{"device": d.ID, "meterID": id, "Reason": err})
+	}
+}
+
+// RestoreMetersFromDb reads the meters stored in the database for this
+// device and hands each stored value to CreateMeterFromString. Entries whose
+// value is not a []byte are skipped with a warning.
+func (d *Device) RestoreMetersFromDb() {
+	meters, err := db.GetDeviceMeters(d.ID)
+	if err != nil {
+		// The error used to be discarded; log it and still process whatever
+		// was returned, as before.
+		logger.Warnw(ctx, "Get Meters from DB failed", log.Fields{"device": d.ID, "Reason": err})
+	}
+	for _, meter := range meters {
+		b, ok := meter.Value.([]byte)
+		if !ok {
+			logger.Warn(ctx, "The value type is not []byte")
+			continue
+		}
+		d.CreateMeterFromString(b)
+	}
+}
+
+// CreateMeterFromString decodes a JSON-encoded meter and adds it to the
+// device meter table under its ID. A decode failure or an already-present
+// ID is logged as a warning and the entry is ignored.
+// NOTE(review): unlike AddMeter this does not take meterLock - confirm it is
+// only called before concurrent access starts.
+func (d *Device) CreateMeterFromString(b []byte) {
+	var meter of.Meter
+	if err := json.Unmarshal(b, &meter); err != nil {
+		logger.Warn(ctx, "Unmarshal failed")
+		return
+	}
+	if _, exists := d.meters[meter.ID]; exists {
+		logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
+		return
+	}
+	logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
+	d.meters[meter.ID] = &meter
+}
+
+// VolthaClient returns the VOLTHA service client currently held by the
+// device's client holder. Callers in this file treat a nil result as
+// "client unavailable".
+func (d *Device) VolthaClient() voltha.VolthaServiceClient {
+ return d.vclientHolder.Get()
+}
+
+// AddPort creates a port with the given ID and name as requested by the
+// device/VOLTHA, indexes it by both ID and name, writes it to the database
+// and informs the application through PortAddInd. It returns an error when
+// either the ID or the name is already in use.
+func (d *Device) AddPort(id uint32, name string) error {
+	d.portLock.Lock()
+	defer d.portLock.Unlock()
+
+	_, idTaken := d.PortsByID[id]
+	_, nameTaken := d.PortsByName[name]
+	if idTaken || nameTaken {
+		return errors.New("Duplicate port")
+	}
+
+	newPort := NewDevicePort(id, name)
+	d.PortsByID[id] = newPort
+	d.PortsByName[name] = newPort
+	d.WritePortToDb(newPort)
+	GetController().PortAddInd(d.ID, newPort.ID, newPort.Name)
+	logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
+	return nil
+}
+
+// DelPort deletes the port with the given ID as requested by the
+// device/VOLTHA. If the port is currently UP, a port-down indication is sent
+// first; then a port-delete indication is sent and the port is removed from
+// both lookup maps and from the database. It returns an error when no port
+// with that ID exists.
+func (d *Device) DelPort(id uint32) error {
+
+ // GetPortByID takes and releases the read lock itself, so the lookup and
+ // the state check below run before the write lock is acquired.
+ p := d.GetPortByID(id)
+ if p == nil {
+ return errors.New("Unknown Port")
+ }
+ if p.State == PortStateUp {
+ GetController().PortDownInd(d.ID, p.Name)
+ }
+ d.portLock.Lock()
+ defer d.portLock.Unlock()
+
+ // The delete indication is sent while holding the write lock.
+ GetController().PortDelInd(d.ID, p.Name)
+ delete(d.PortsByID, p.ID)
+ delete(d.PortsByName, p.Name)
+ d.DelPortFromDb(p.ID)
+ logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
+ return nil
+}
+
+// UpdatePortByName changes the ID of the port with the given name: the port
+// is re-indexed under the new ID, written to the database, and the
+// application is informed through PortUpdateInd. Unknown names are ignored.
+func (d *Device) UpdatePortByName(name string, port uint32) {
+	d.portLock.Lock()
+	defer d.portLock.Unlock()
+
+	if p, known := d.PortsByName[name]; known {
+		// Drop the old ID index entry before re-keying under the new ID.
+		delete(d.PortsByID, p.ID)
+		p.ID = port
+		d.PortsByID[port] = p
+		d.WritePortToDb(p)
+		GetController().PortUpdateInd(d.ID, p.Name, p.ID)
+		logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
+	}
+}
+
+// GetPortName returns the name of the port with the given ID. An unknown ID
+// is logged and reported as an error with an empty name.
+func (d *Device) GetPortName(id uint32) (string, error) {
+	d.portLock.RLock()
+	defer d.portLock.RUnlock()
+
+	p, known := d.PortsByID[id]
+	if !known {
+		logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
+		return "", errors.New("Unknown Port ID")
+	}
+	return p.Name, nil
+}
+
+// GetPortByID returns the port registered under the given ID, or nil when
+// there is none.
+func (d *Device) GetPortByID(id uint32) *DevicePort {
+	d.portLock.RLock()
+	defer d.portLock.RUnlock()
+
+	// A missing key yields the zero value of the map's pointer type, i.e. nil.
+	return d.PortsByID[id]
+}
+
+// GetPortByName returns the port registered under the given name, or nil
+// when there is none.
+func (d *Device) GetPortByName(name string) *DevicePort {
+	d.portLock.RLock()
+	defer d.portLock.RUnlock()
+
+	// A missing key yields the zero value of the map's pointer type, i.e. nil.
+	return d.PortsByName[name]
+}
+
+// GetPortState returns the state of the port with the given name. For an
+// unknown name it returns PortStateDown together with an error.
+func (d *Device) GetPortState(name string) (PortState, error) {
+	d.portLock.RLock()
+	defer d.portLock.RUnlock()
+
+	p, known := d.PortsByName[name]
+	if !known {
+		return PortStateDown, errors.New("Unknown Port ID")
+	}
+	return p.State, nil
+}
+
+// GetPortID returns the ID of the port with the given name. For an unknown
+// name it returns 0 together with an error.
+func (d *Device) GetPortID(name string) (uint32, error) {
+	d.portLock.RLock()
+	defer d.portLock.RUnlock()
+
+	p, known := d.PortsByName[name]
+	if !known {
+		return 0, errors.New("Unknown Port ID")
+	}
+	return p.ID, nil
+}
+
+// WritePortToDb stamps the port with the current schema version for the
+// device-port path, serialises it to JSON and writes it to the database
+// under this device's ID and the port's ID. Serialisation and write failures
+// are logged; nothing is returned to the caller.
+func (d *Device) WritePortToDb(port *DevicePort) {
+	port.Version = database.PresentVersionMap[database.DevicePortPath]
+	b, err := json.Marshal(port)
+	if err != nil {
+		// Previously a marshal failure was dropped without any trace.
+		logger.Errorw(ctx, "Marshal port failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
+		return
+	}
+	if err = db.PutPort(d.ID, port.ID, string(b)); err != nil {
+		logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
+	}
+}
+
+// DelPortFromDb deletes the port with the given ID from this device's
+// records in the database. The delete is best-effort: a failure is logged
+// and not propagated.
+func (d *Device) DelPortFromDb(id uint32) {
+	if err := db.DelPort(d.ID, id); err != nil {
+		// Previously the error was discarded entirely.
+		logger.Warnw(ctx, "Delete port from DB failed", log.Fields{"device": d.ID, "port": id, "Reason": err})
+	}
+}
+
+// RestorePortsFromDb reads the ports stored in the database for this device
+// and hands each stored value to CreatePortFromString. Entries whose value
+// is not a []byte are skipped with a warning.
+func (d *Device) RestorePortsFromDb() {
+	ports, err := db.GetPorts(d.ID)
+	if err != nil {
+		// The error used to be discarded; log it and still process whatever
+		// was returned, as before.
+		logger.Warnw(ctx, "Get Ports from DB failed", log.Fields{"device": d.ID, "Reason": err})
+	}
+	for _, port := range ports {
+		b, ok := port.Value.([]byte)
+		if !ok {
+			logger.Warn(ctx, "The value type is not []byte")
+			continue
+		}
+		d.CreatePortFromString(b)
+	}
+}
+
+// CreatePortFromString decodes a JSON-encoded port, indexes it by ID and by
+// name, and informs the application through PortAddInd. A decode failure or
+// an already-present port ID is logged as a warning and the entry is ignored.
+func (d *Device) CreatePortFromString(b []byte) {
+	var port DevicePort
+	if err := json.Unmarshal(b, &port); err != nil {
+		logger.Warn(ctx, "Unmarshal failed")
+		return
+	}
+	if _, exists := d.PortsByID[port.ID]; exists {
+		logger.Warnw(ctx, "Duplicate Port", log.Fields{"ID": port.ID})
+		return
+	}
+	logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
+	restored := &port
+	d.PortsByID[port.ID] = restored
+	d.PortsByName[port.Name] = restored
+	GetController().PortAddInd(d.ID, port.ID, port.Name)
+}
+
+// Delete : OLT delete functionality is yet to be implemented. Ideally all of
+// the resources should have been removed by this time; it is an error
+// scenario if the OLT still has resources associated with it.
+// For now this only calls StopAll on the device.
+func (d *Device) Delete() {
+ d.StopAll()
+}
+
+// Stop is currently a no-op; the body is intentionally empty.
+func (d *Device) Stop() {
+}
+
+// ConnectInd is called when the connection between VGC and VOLTHA is
+// restored. It derives a fresh cancellable context for the device,
+// re-initialises the device task queue with it, marks the device UP, queues
+// the device audit, table audit and pending-profiles tasks, and starts the
+// periodic table synchronisation goroutine.
+func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
+	logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
+	devCtx, cancelFn := context.WithCancel(ctx)
+	d.cancel = cancelFn
+	d.ctx = devCtx
+	d.Tasks.Initialize(devCtx)
+
+	logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
+	d.State = DeviceStateUP
+	GetController().DeviceUpInd(d.ID)
+
+	logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
+	// Queue order is preserved: device audit, table audit, pending profiles.
+	d.Tasks.AddTask(NewAuditDevice(d, AuditEventDeviceDisc))
+	d.Tasks.AddTask(NewAuditTablesTask(d))
+	d.Tasks.AddTask(NewPendingProfilesTask(d))
+
+	go d.synchronizeDeviceTables()
+}
+
+// synchronizeDeviceTables queues an audit-tables task on the device every
+// deviceTableSyncDuration until the device context is cancelled. It blocks
+// and is meant to run in its own goroutine.
+func (d *Device) synchronizeDeviceTables() {
+	// Bind to the context that is current when the loop starts. Re-reading
+	// d.ctx on every iteration would let a reconnect (which replaces d.ctx)
+	// silently re-attach this loop to the new context, leaving two loops
+	// running for one device.
+	syncCtx := d.ctx
+
+	tick := time.NewTicker(deviceTableSyncDuration)
+	// Deferred so the ticker is released on every exit path.
+	defer tick.Stop()
+
+	for {
+		select {
+		case <-syncCtx.Done():
+			logger.Warnw(syncCtx, "Context Done. Cancelling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
+			return
+		case <-tick.C:
+			d.Tasks.AddTask(NewAuditTablesTask(d))
+		}
+	}
+}
+
+// DeviceUpInd is called when the logical device state changes to UP. It
+// marks the device UP, informs the controller, and queues the device audit,
+// table audit and pending-profiles tasks.
+func (d *Device) DeviceUpInd() {
+	logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
+	d.State = DeviceStateUP
+	GetController().DeviceUpInd(d.ID)
+
+	logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
+	// Queue order is preserved: device audit, table audit, pending profiles.
+	d.Tasks.AddTask(NewAuditDevice(d, AuditEventDeviceDisc))
+	d.Tasks.AddTask(NewAuditTablesTask(d))
+	d.Tasks.AddTask(NewPendingProfilesTask(d))
+}
+
+// DeviceDownInd is called when the logical device state changes to Down.
+// It records the DOWN state on the device and forwards the indication to
+// the controller.
+func (d *Device) DeviceDownInd() {
+ logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
+ d.State = DeviceStateDOWN
+ GetController().DeviceDownInd(d.ID)
+}
+
+// DeviceRebootInd is called when the logical device is rebooted. A repeated
+// indication for a device that is already in the REBOOTED state is ignored.
+// Otherwise the device is marked REBOOTED, the controller is told that a
+// reboot is in progress and given the reboot indication, and every port is
+// reset to DOWN.
+func (d *Device) DeviceRebootInd() {
+	logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
+
+	if d.State == DeviceStateREBOOTED {
+		// The state is already REBOOTED here, so there is nothing to assign.
+		logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
+		return
+	}
+
+	d.State = DeviceStateREBOOTED
+	GetController().SetRebootInProgressForDevice(d.ID)
+	GetController().DeviceRebootInd(d.ID, d.SerialNum, d.SouthBoundID)
+	d.ReSetAllPortStates()
+}
+
+// DeviceDisabledInd is called when the logical device is disabled. It
+// records the DISABLED state on the device and forwards the indication to
+// the controller.
+func (d *Device) DeviceDisabledInd() {
+ logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
+ d.State = DeviceStateDISABLED
+ GetController().DeviceDisableInd(d.ID)
+}
+
+// ReSetAllPortStates sets every port of the device that is not already DOWN
+// to DOWN. For each such port a port-down indication is sent to the
+// application before the new state is written to the database.
+func (d *Device) ReSetAllPortStates() {
+	logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
+
+	d.portLock.Lock()
+	defer d.portLock.Unlock()
+
+	for _, p := range d.PortsByID {
+		if p.State == PortStateDown {
+			continue
+		}
+		logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": p})
+		GetController().PortDownInd(d.ID, p.Name)
+		p.State = PortStateDown
+		d.WritePortToDb(p)
+	}
+}
+
+// ReSetAllPortStatesInDb sets every port of the device that is not already
+// DOWN to DOWN and writes it to the database, without sending any
+// indication to the application.
+func (d *Device) ReSetAllPortStatesInDb() {
+	logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
+
+	d.portLock.Lock()
+	defer d.portLock.Unlock()
+
+	for _, p := range d.PortsByID {
+		if p.State == PortStateDown {
+			continue
+		}
+		logger.Infow(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": p})
+		p.State = PortStateDown
+		d.WritePortToDb(p)
+	}
+}
+
+// ProcessPortUpdate handles a change in port ID (ONU movement) for the port
+// with the given name. The ID is updated only when the port is DOWN; if the
+// port is not DOWN the update is rejected and nothing further is done. After
+// an accepted update, or when the ID is unchanged, the reported state is
+// processed through ProcessPortState. Unknown port names are ignored.
+func (d *Device) ProcessPortUpdate(portName string, port uint32, state uint32) {
+	p := d.GetPortByName(portName)
+	if p == nil {
+		return
+	}
+	if p.ID != port {
+		logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
+		if p.State != PortStateDown {
+			logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
+			return
+		}
+		d.UpdatePortByName(portName, port)
+		// A successful update is not an error; it was logged at Error level.
+		logger.Infow(ctx, "Port ID Updated", log.Fields{"Port": p})
+	}
+	d.ProcessPortState(port, state)
+}
+
+// ***Operations Performed on Port state Transitions***
+//
+// |-----------------------------------------------------------------------------|
+// | State | Action |
+// |--------------------|--------------------------------------------------------|
+// | UP | UNI - Trigger Flow addition for service configured |
+// | | NNI - Trigger Flow addition for vnets & mvlan profiles |
+// | | |
+// | DOWN | UNI - Trigger Flow deletion for service configured |
+// | | NNI - Trigger Flow deletion for vnets & mvlan profiles |
+// | | |
+// |-----------------------------------------------------------------------------|
+//
+
+// ProcessPortState handles a reported port state and acts on the transition
+// between the stored state and the new one. Indications are ignored when the
+// device is not UP, unless the port is an NNI port, and when the port ID is
+// unknown.
+func (d *Device) ProcessPortState(port uint32, state uint32) {
+	if d.State != DeviceStateUP && !util.IsNniPort(port) {
+		logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
+		return
+	}
+	p := d.GetPortByID(port)
+	if p == nil {
+		return
+	}
+	logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
+
+	// Avoid blind initialization as the current tasks in the queue will be lost
+	// Eg: Service Del followed by Port Down - The flows will be dangling
+	// Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
+	p.Tasks.CheckAndInitialize(d.ctx)
+
+	isLive := state == uint32(ofp.OfpPortState_OFPPS_LIVE)
+	switch {
+	case isLive && p.State == PortStateDown:
+		// Transition from DOWN to UP
+		logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
+		GetController().PortUpInd(d.ID, p.Name)
+		p.State = PortStateUp
+		d.WritePortToDb(p)
+	case !isLive && p.State != PortStateDown:
+		// Transition from UP to Down
+		logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
+		GetController().PortDownInd(d.ID, p.Name)
+		p.State = PortStateDown
+		d.WritePortToDb(p)
+	default:
+		logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
+	}
+}
+
+// ProcessPortStateAfterReboot re-sends the port state indication to sort out
+// configuration mismatch due to a reboot. The port's task queue is
+// re-initialised and an up or down indication is sent according to the
+// stored port state; the state argument is only logged. Indications are
+// ignored when the device is not UP, unless the port is an NNI port, and
+// when the port ID is unknown.
+func (d *Device) ProcessPortStateAfterReboot(port uint32, state uint32) {
+	if d.State != DeviceStateUP && !util.IsNniPort(port) {
+		logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
+		return
+	}
+	p := d.GetPortByID(port)
+	if p == nil {
+		return
+	}
+	logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State})
+	p.Tasks.Initialize(d.ctx)
+
+	switch p.State {
+	case PortStateUp:
+		logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
+		GetController().PortUpInd(d.ID, p.Name)
+	case PortStateDown:
+		logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
+		GetController().PortDownInd(d.ID, p.Name)
+	}
+}
+
+// ChangeEvent : a change event carries port related changes such as
+// addition, deletion or modification, where the port status change up/down
+// is indicated to the controller. The event is wrapped in a change-event
+// task and queued on the device; the returned error is always nil.
+func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
+	d.AddTask(NewChangeEventTask(d.ctx, event, d))
+	return nil
+}
+
+// PacketIn handles an incoming packet-in and delivers it to the application
+// for the actual processing. Packets whose reason is not OFPR_ACTION, or
+// whose ingress port is unknown to the device, are dropped with a warning.
+func (d *Device) PacketIn(pkt *ofp.PacketIn) {
+	logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
+	if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
+		logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
+		return
+	}
+	payload := pkt.PacketIn.Data
+	inPort := PacketInGetPort(pkt.PacketIn)
+	pName, err := d.GetPortName(inPort)
+	if err != nil {
+		logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
+		return
+	}
+	GetController().PacketInInd(d.ID, pName, payload)
+}
+
+// PacketInGetPort returns the port on which the packet-in was reported,
+// taken from the first OPENFLOW_BASIC IN_PORT match field that carries a
+// port value. It returns 0 when there is no such field, including when the
+// packet or its match is nil.
+func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
+	// The generated protobuf getters tolerate nil receivers, so a packet-in
+	// without a match (or a field without a value) no longer panics.
+	for _, field := range pkt.GetMatch().GetOxmFields() {
+		if field.GetOxmClass() != ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
+			continue
+		}
+		ofbField, ok := field.GetField().(*ofp.OfpOxmField_OfbField)
+		if !ok {
+			continue
+		}
+		if ofbField.OfbField.GetType() != ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
+			continue
+		}
+		if port, ok := ofbField.OfbField.GetValue().(*ofp.OfpOxmOfbField_Port); ok {
+			return port.Port
+		}
+	}
+	return 0
+}
+
+// PacketOutReq receives the packet-out request from the application via the
+// controller. The application identifies ports by name, so both names are
+// resolved to port IDs before the packet is handed to SendPacketOut. An
+// error is returned when either name is unknown.
+func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
+	inID, inErr := d.GetPortID(inport)
+	if inErr != nil {
+		return errors.New("Unknown inport")
+	}
+	outID, outErr := d.GetPortID(outport)
+	if outErr != nil {
+		return errors.New("Unknown outport")
+	}
+	logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
+	return d.SendPacketOut(outID, inID, data, isCustomPkt)
+}
+
+// SendPacketOut builds the OpenFlow packet-out structure, with a single
+// output action towards outport, and pushes it onto the device's packet-out
+// channel for delivery to VOLTHA. The returned error is always nil;
+// isCustomPkt is not used here.
+func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
+	outputAction := &ofp.OfpAction{
+		Type: ofp.OfpActionType_OFPAT_OUTPUT,
+		Action: &ofp.OfpAction_Output{
+			Output: &ofp.OfpActionOutput{
+				Port:   outport,
+				MaxLen: 65535,
+			},
+		},
+	}
+	pout := &ofp.PacketOut{
+		Id: d.ID,
+		PacketOut: &ofp.OfpPacketOut{
+			InPort:  inport,
+			Data:    data,
+			Actions: []*ofp.OfpAction{outputAction},
+		},
+	}
+	d.packetOutChannel <- pout
+	return nil
+}
+
+// UpdateFlows receives the flows in the form that is implemented in the VGC
+// and wraps them in an add-flows task, which transforms them to the OF
+// format when it runs. The task is queued on the device port's own queue
+// for NNI ports and when flow-hash throttling is disabled; otherwise it is
+// queued on the flow-hash queue selected by the port ID.
+func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
+	t := NewAddFlowsTask(d.ctx, flow, d)
+	logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
+
+	// NNI flows always go to the device port queue.
+	if util.IsNniPort(devPort.ID) {
+		devPort.AddTask(t)
+		return
+	}
+
+	// A nil queue means flow throttling is disabled: fall back to the
+	// device port queue.
+	flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
+	if flowQueue == nil {
+		devPort.AddTask(t)
+		return
+	}
+
+	logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
+	flowQueue.AddTask(t)
+	logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
+}
+
+// UpdateGroup wraps the group in a group-modification task bound to the
+// device context and queues it on the given device port's task queue.
+func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
+ // The task is created first; NewModGroupTask records its timestamp then.
+ task := NewModGroupTask(d.ctx, group, d)
+ logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
+ devPort.AddTask(task)
+}
+
+// ModMeter queues a meter-modification task for the given command on the
+// device port's task queue. An add request for a meter that is already
+// registered on the device is dropped without queueing anything.
+func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
+	if command == of.MeterCommandAdd {
+		_, lookupErr := d.GetMeter(meter.ID)
+		if lookupErr == nil {
+			logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
+			return
+		}
+	}
+	devPort.AddTask(NewModMeterTask(d.ctx, command, meter, d))
+}
+
+// getAndAddFlowQueueForUniID returns the flow queue for the given UNI ID,
+// selected by id modulo the device flow hash, creating it through
+// addFlowQueueForUniID when it does not exist yet. It returns nil when the
+// flow hash is 0, which means flow-hash throttling is disabled.
+func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
+	d.flowQueueLock.RLock()
+	if d.flowHash == 0 {
+		d.flowQueueLock.RUnlock()
+		return nil
+	}
+	flowHashID := id % uint32(d.flowHash)
+	queue, found := d.flowQueue[uint32(flowHashID)]
+	// The read lock is released before the creation path, which takes the
+	// write lock itself.
+	d.flowQueueLock.RUnlock()
+	if found {
+		return queue
+	}
+	logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
+
+	return d.addFlowQueueForUniID(id)
+}
+
+// addFlowQueueForUniID returns the flow queue for the given UNI ID, selected
+// by id modulo the device flow hash, creating and registering it under the
+// write lock when it does not exist yet. It returns nil when the flow hash
+// is 0 (throttling disabled).
+func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
+	d.flowQueueLock.Lock()
+	defer d.flowQueueLock.Unlock()
+
+	// The caller checked the hash under a read lock that it has since
+	// released; re-check here so a concurrent SetFlowHash(0) cannot cause a
+	// division by zero below.
+	if d.flowHash == 0 {
+		return nil
+	}
+	flowHashID := id % uint32(d.flowHash)
+
+	// Another goroutine may have created the queue between the caller's
+	// lookup and this point. Reuse it rather than replacing it, otherwise
+	// tasks already queued on it would be orphaned.
+	if existing, found := d.flowQueue[flowHashID]; found {
+		return existing
+	}
+
+	flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
+	flowQueue.Tasks.Initialize(d.ctx)
+	d.flowQueue[flowHashID] = flowQueue
+	return flowQueue
+}
+
+// SetFlowHash sets the device flow hash and writes it to the DB. Both steps
+// happen while holding the flow queue write lock.
+func (d *Device) SetFlowHash(hash uint32) {
+ d.flowQueueLock.Lock()
+ defer d.flowQueueLock.Unlock()
+
+ d.flowHash = hash
+ d.writeFlowHashToDB()
+}
+
+// writeFlowHashToDB serialises the device flow hash to JSON and writes it to
+// the database under this device's ID. Failures are logged with their cause;
+// nothing is returned to the caller.
+func (d *Device) writeFlowHashToDB() {
+	hash, err := json.Marshal(d.flowHash)
+	if err != nil {
+		logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash, "Reason": err})
+		return
+	}
+	if err := db.PutFlowHash(d.ID, string(hash)); err != nil {
+		logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash, "Reason": err})
+	}
+}
+
+// isSBOperAllowed determines whether a southbound operation is allowed
+// based on the device state and the force flag: always when the device is
+// UP, only with forceAction when it is DISABLED, and never otherwise.
+func (d *Device) isSBOperAllowed(forceAction bool) bool {
+	switch d.State {
+	case DeviceStateUP:
+		return true
+	case DeviceStateDISABLED:
+		return forceAction
+	default:
+		return false
+	}
+}
+
+// triggerFlowNotification looks up the flow for the given cookie and passes
+// it, together with the operation result, to triggerFlowResultNotification.
+func (d *Device) triggerFlowNotification(cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+ // The lookup's second result is ignored; the flow is forwarded as returned.
+ flow, _ := d.GetFlow(cookie)
+ d.triggerFlowResultNotification(cookie, flow, oper, bwDetails, err)
+}
+
+// triggerFlowResultNotification records the outcome of a flow operation and
+// reports it to the controller. The error is translated to a status code and
+// message; the stored flow is then updated (add, or failed delete) or
+// removed (successful delete), and a FlowStatus carrying the cookie, device,
+// operation, flow, status and bwDetails is passed to
+// ProcessFlowModResultIndication.
+func (d *Device) triggerFlowResultNotification(cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+
+ statusCode, statusMsg := infraerror.GetErrorInfo(err)
+ success := isFlowOperSuccess(statusCode, oper)
+
+ // updateFlow sets state and error reason on the stored flow for the
+ // cookie, if one exists, and writes it back to the DB.
+ updateFlow := func(cookie uint64, state int, reason string) {
+ if dbFlow, ok := d.GetFlow(cookie); ok {
+ dbFlow.State = uint8(state)
+ dbFlow.ErrorReason = reason
+ d.AddFlowToDb(dbFlow)
+ }
+ }
+
+ //Update flow results
+ // Add - Update Success or Failure status with reason
+ // Del - Delete entry from DB on success else update error reason
+ if oper == of.CommandAdd {
+ state := of.FlowAddSuccess
+ reason := ""
+ if !success {
+ state = of.FlowAddFailure
+ reason = statusMsg
+ }
+ updateFlow(cookie, state, reason)
+ logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
+ } else {
+ // A successful delete with a nil flow falls through both branches
+ // below and changes nothing.
+ if success && flow != nil {
+ if err := d.DelFlow(flow); err != nil {
+ logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
+ }
+ } else if !success {
+ updateFlow(cookie, of.FlowDelFailure, statusMsg)
+ }
+ }
+
+ flowResult := intf.FlowStatus{
+ Cookie: strconv.FormatUint(cookie, 10),
+ Device: d.ID,
+ FlowModType: oper,
+ Flow: flow,
+ Status: statusCode,
+ Reason: statusMsg,
+ AdditionalData: bwDetails,
+ }
+
+ logger.Infow(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
+ GetController().ProcessFlowModResultIndication(flowResult)
+}
diff --git a/internal/pkg/controller/modgroup.go b/internal/pkg/controller/modgroup.go
new file mode 100644
index 0000000..49da920
--- /dev/null
+++ b/internal/pkg/controller/modgroup.go
@@ -0,0 +1,133 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "time"
+
+ infraerror "voltha-go-controller/internal/pkg/errorcodes"
+ infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
+
+ "voltha-go-controller/internal/pkg/of"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "google.golang.org/grpc/codes"
+)
+
+// ModGroupTask is a task that applies a group modification to a device.
+type ModGroupTask struct {
+ // taskID is the ID handed to Start.
+ taskID uint8
+ // ctx is set at construction and replaced by the context passed to Start.
+ ctx context.Context
+ // group is the group to be modified.
+ group *of.Group
+ // device is the device the group belongs to.
+ device *Device
+ // timestamp is the task creation time in RFC3339Nano format.
+ timestamp string
+}
+
+//NewModGroupTask - Initializes new group task
+func NewModGroupTask(ctx context.Context, group *of.Group, device *Device) *ModGroupTask {
+ var grp ModGroupTask
+ grp.device = device
+ grp.group = group
+ grp.ctx = ctx
+ tstamp := (time.Now()).Format(time.RFC3339Nano)
+ grp.timestamp = tstamp
+ return &grp
+}
+
+// Name returns the fixed display name of the group modification task.
+func (grp *ModGroupTask) Name() string {
+ return "Group Mod Task"
+}
+
+// TaskID returns the task ID recorded by Start; it is zero before Start has
+// been called.
+func (grp *ModGroupTask) TaskID() uint8 {
+ return grp.taskID
+}
+
+// Timestamp returns the creation time of the task, formatted as RFC3339Nano
+// by the constructor.
+func (grp *ModGroupTask) Timestamp() string {
+ return grp.timestamp
+}
+
+// Stop is a no-op for the group modification task; the body is intentionally
+// empty.
+func (grp *ModGroupTask) Stop() {
+}
+
+// Start runs the group modification task. It first updates the device's
+// local group table and database (store for add/modify, remove for delete),
+// then, if southbound operations are allowed for the device state, sends the
+// group table update to VOLTHA, retrying on failure.
+//
+// It returns nil when the southbound update is skipped or the VOLTHA client
+// is unavailable; otherwise it returns the error of the last RPC attempt.
+// Note that this includes an "already exists" error on add, even though
+// that case stops the retries as a success.
+func (grp *ModGroupTask) Start(ctx context.Context, taskID uint8) error {
+ var err error
+ grp.taskID = taskID
+ grp.ctx = ctx
+ // i counts the attempts made so far.
+ i := 0
+
+ // processGroupModResult reports whether the RPC outcome should end the
+ // retry loop: true on success, or when an add failed only because the
+ // group already exists.
+ processGroupModResult := func(err error) bool {
+
+ statusCode, statusMsg := infraerror.GetErrorInfo(err)
+
+ if infraerrorcode.ErrorCode(statusCode) != infraerrorcode.ErrOk {
+
+ if grp.group.Command == of.GroupCommandAdd && (codes.Code(statusCode) == codes.AlreadyExists) {
+ logger.Warnw(ctx, "Update Group Table Failed - Ignoring since Group Already exists",
+ log.Fields{"groupId": grp.group.GroupID, "groupOp": grp.group.Command, "Status": statusCode, "errorReason": statusMsg})
+ return true
+ }
+ logger.Errorw(ctx, "Update Group Table Failed",
+ log.Fields{"groupId": grp.group.GroupID, "groupOp": grp.group.Command, "Status": statusCode, "errorReason": statusMsg})
+ return false
+ }
+ logger.Infow(ctx, "Group Mod Result", log.Fields{"groupID": grp.group.GroupID, "Error Code": statusCode})
+ return true
+
+ }
+
+ // The local table and DB are updated before, and regardless of, the
+ // southbound call.
+ if grp.group.Command != of.GroupCommandDel {
+ grp.group.State = of.GroupOperPending
+ grp.device.UpdateGroupEntry(grp.group)
+ } else {
+ grp.device.DelGroupEntry(grp.group)
+ }
+
+ if !grp.device.isSBOperAllowed(grp.group.ForceAction) {
+ logger.Errorw(ctx, "Skipping Group Table Update", log.Fields{"Reason": "Device State not UP", "State": grp.device.State, "GroupID": grp.group.GroupID, "Operation": grp.group.Command})
+ return nil
+ }
+
+ groupUpdate := of.CreateGroupTableUpdate(grp.group)
+ if vc := grp.device.VolthaClient(); vc != nil {
+
+ //Retry on group mod failure
+ //Retry attempts = 3
+ //Delay between retry = 100ms. Total Possible Delay = 200ms
+ for {
+ logger.Infow(ctx, "Group Mod Triggered", log.Fields{"GroupId": grp.group.GroupID, "Attempt": i})
+ _, err = vc.UpdateLogicalDeviceFlowGroupTable(grp.ctx, groupUpdate)
+ if isSuccess := processGroupModResult(err); isSuccess {
+ break
+ }
+ i++
+ if i < 3 {
+ // NOTE(review): this sleep does not observe ctx cancellation.
+ time.Sleep(100 * time.Millisecond)
+ continue
+ }
+ logger.Errorw(ctx, "Update Group Table Failed on all 3 attempts. Dropping request", log.Fields{"GroupId": grp.group.GroupID, "Bucket": grp.group.Buckets})
+ break
+
+ }
+ return err
+ }
+ logger.Error(ctx, "Update Group Flow Table Failed: Voltha Client Unavailable")
+ return nil
+}
diff --git a/internal/pkg/controller/modmeter.go b/internal/pkg/controller/modmeter.go
new file mode 100644
index 0000000..04b1e04
--- /dev/null
+++ b/internal/pkg/controller/modmeter.go
@@ -0,0 +1,124 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "time"
+
+ "voltha-go-controller/internal/pkg/of"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+// ModMeterTask is a task that applies a meter command to a device.
+type ModMeterTask struct {
+ // taskID is the ID handed to Start.
+ taskID uint8
+ // ctx is set at construction and replaced by the context passed to Start.
+ ctx context.Context
+ // command is the meter command to apply; Start treats anything other
+ // than MeterCommandAdd as a delete.
+ command of.MeterCommand
+ // meter is the meter the command applies to.
+ meter *of.Meter
+ // device is the device the meter belongs to.
+ device *Device
+ // timestamp is the task creation time in RFC3339Nano format.
+ timestamp string
+}
+
+// NewModMeterTask is the constructor for ModMeterTask
+func NewModMeterTask(ctx context.Context, command of.MeterCommand, meter *of.Meter, device *Device) *ModMeterTask {
+ var mmt ModMeterTask
+ mmt.device = device
+ mmt.meter = meter
+ mmt.ctx = ctx
+ mmt.command = command
+ tstamp := (time.Now()).Format(time.RFC3339Nano)
+ mmt.timestamp = tstamp
+ return &mmt
+}
+
+// Name returns the fixed display name of the meter modification task.
+func (mmt *ModMeterTask) Name() string {
+	// Previously returned "Add Flows Task", copied from AddFlowsTask, which
+	// made meter tasks indistinguishable from flow tasks by name.
+	return "Mod Meter Task"
+}
+
+// TaskID returns the task ID recorded by Start; it is zero before Start has
+// been called.
+func (mmt *ModMeterTask) TaskID() uint8 {
+ return mmt.taskID
+}
+
+// Timestamp returns the creation time of the task, formatted as RFC3339Nano
+// by the constructor.
+func (mmt *ModMeterTask) Timestamp() string {
+ return mmt.timestamp
+}
+
+// Stop is a no-op for the meter modification task; the body is intentionally
+// empty.
+func (mmt *ModMeterTask) Stop() {
+}
+
+// Start runs the meter modification task. It first applies the command to
+// the device's local meter table (add, or delete for any other command) and
+// stops with nil if that is a no-op (duplicate add, or delete of an unknown
+// meter). If the device is UP it then builds the meter update and sends it
+// to VOLTHA.
+//
+// It returns nil when the update is skipped or the VOLTHA client is
+// unavailable, and otherwise the error from building or sending the update.
+func (mmt *ModMeterTask) Start(ctx context.Context, taskID uint8) error {
+ mmt.taskID = taskID
+ mmt.ctx = ctx
+
+ //Temp commenting Sync response handling
+ //triggerMeterNotification := func(err error) {
+
+ // statusCode, statusMsg := infraerror.GetErrorInfo(err)
+
+ // if mmt.command == of.MeterCommandAdd && infraerrorcode.ErrorCode(statusCode) != infraerrorcode.ErrOk {
+ // mmt.meter.State = of.MeterOperFailure
+ // mmt.meter.ErrorReason = statusMsg
+
+ // logger.Errorw(ctx, "Update Meter Table Failed",
+ // log.Fields{"meterId": mmt.meter.ID, "meterOp": mmt.command, "Status": statusCode, "errorReason": statusMsg})
+ // go mmt.device.AddMeterToDb(mmt.meter)
+ // } else {
+ // log.Infow("Meter Mod Result", log.Fields{"meterID": mmt.meter.ID, "Error Code": statusCode})
+ // }
+ // }
+
+ // Add/delete the meter locally first, before passing it to the actual device
+ if mmt.command == of.MeterCommandAdd {
+ mmt.meter.State = of.MeterOperPending
+ if err := mmt.device.AddMeter(mmt.meter); err != nil {
+ // Meter already exists so we don't have to do anything here
+ return nil
+ }
+ } else {
+ if !mmt.device.DelMeter(mmt.meter) {
+ // Meter doesn't exist so we don't have to do anything here
+ return nil
+ }
+ }
+
+ // The local table has already been changed at this point; only the
+ // southbound update is skipped when the device is not UP.
+ if mmt.device.State != DeviceStateUP {
+ logger.Errorw(ctx, "Update Meter Table Failed: Device State DOWN", log.Fields{"Reason": "Device State DOWN", "Meter": mmt.meter.ID})
+ return nil
+ }
+ meterMod, err := of.MeterUpdate(mmt.device.ID, mmt.command, mmt.meter)
+ if err != nil {
+ logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
+ return err
+ }
+
+ if vc := mmt.device.VolthaClient(); vc != nil {
+
+ if _, err = vc.UpdateLogicalDeviceMeterTable(mmt.ctx, meterMod); err != nil {
+ logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
+ }
+ //triggerMeterNotification(err)
+ return err
+ }
+
+ logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
+ return nil
+}
diff --git a/internal/pkg/controller/pendingprofiles.go b/internal/pkg/controller/pendingprofiles.go
new file mode 100644
index 0000000..6258f36
--- /dev/null
+++ b/internal/pkg/controller/pendingprofiles.go
@@ -0,0 +1,98 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+ "context"
+ "time"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+// PendingProfilesTask is the task object for one device; its methods are Name, TaskID, Timestamp, Stop and Start.
+type PendingProfilesTask struct {
+ taskID uint8 // overwritten with the taskID argument each time Start runs
+ ctx context.Context // overwritten with the ctx argument each time Start runs
+ device *Device // device the task operates on; supplied to NewPendingProfilesTask
+ ts string // creation time, formatted with time.RFC3339Nano by NewPendingProfilesTask
+}
+
+// NewPendingProfilesTask is constructor for PendingProfilesTask
+func NewPendingProfilesTask(device *Device) *PendingProfilesTask {
+ var ppt PendingProfilesTask
+ ppt.device = device
+ ppt.ts = (time.Now()).Format(time.RFC3339Nano)
+ return &ppt
+}
+
+// Name returns the fixed, human-readable name shared by every PendingProfilesTask.
+func (ppt *PendingProfilesTask) Name() string {
+ return "Pending Profiles Task"
+}
+
+// TaskID returns the id stored by the most recent Start call; it is zero until Start has run.
+func (ppt *PendingProfilesTask) TaskID() uint8 {
+ return ppt.taskID
+}
+
+// Timestamp returns the creation time string (RFC3339Nano) that NewPendingProfilesTask stored in ts.
+func (ppt *PendingProfilesTask) Timestamp() string {
+ return ppt.ts
+}
+
+// Stop is a no-op: this task implements no stop handling.
+func (ppt *PendingProfilesTask) Stop() {
+}
+
+// Start is called by the framework and is responsible for implementing
+// the actual task.
+func (ppt *PendingProfilesTask) Start(ctx context.Context, taskID uint8) error {
+ logger.Warnw(ctx, "Pending Profiles Task Triggered", log.Fields{"Context": ctx, "taskID": taskID, "Device": ppt.device.ID})
+ ppt.taskID = taskID
+ ppt.ctx = ctx
+ var errInfo error
+
+ GetController().SetAuditFlags(ppt.device)
+
+ //Trigger Pending Service Delete Tasks
+ logger.Warnw(ctx, "Pending Service Delete Task Triggered", log.Fields{"Device": ppt.device.ID})
+ GetController().TriggerPendingProfileDeleteReq(ppt.device.ID)
+ logger.Warnw(ctx, "Pending Service Delete Task Completed", log.Fields{"Device": ppt.device.ID})
+
+ //Trigger Pending Migrate Services Tasks
+ logger.Warnw(ctx, "Pending Migrate Services Task Triggered", log.Fields{"Device": ppt.device.ID})
+ GetController().TriggerPendingMigrateServicesReq(ppt.device.ID)
+ logger.Warnw(ctx, "Pending Migrate Services Task Completed", log.Fields{"Device": ppt.device.ID})
+
+ GetController().ResetAuditFlags(ppt.device)
+
+ // Updating Mvlan Profile
+ logger.Warnw(ctx, "Pending Update Mvlan Task Triggered", log.Fields{"Device": ppt.device.ID})
+ if err := ppt.UpdateMvlanProfiles(); err != nil {
+ logger.Errorw(ctx, "Update Mvlan Profile Failed", log.Fields{"Reason": err.Error()})
+ errInfo = err
+ }
+ logger.Warnw(ctx, "Pending Update Mvlan Task Completed", log.Fields{"Device": ppt.device.ID})
+
+ logger.Warnw(ctx, "Pending Profiles Task Completed", log.Fields{"Context": ctx, "taskID": taskID, "Device": ppt.device.ID})
+ return errInfo
+}
+
+// UpdateMvlanProfiles asks the controller to update Mvlan profiles for this task's device; it always returns nil.
+func (ppt *PendingProfilesTask) UpdateMvlanProfiles() error {
+ GetController().UpdateMvlanProfiles(ppt.device.ID)
+ return nil
+}
diff --git a/internal/pkg/controller/utils.go b/internal/pkg/controller/utils.go
new file mode 100644
index 0000000..c07ac59
--- /dev/null
+++ b/internal/pkg/controller/utils.go
@@ -0,0 +1,283 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package controller
+
+import (
+ "fmt"
+ "strings"
+ "sync"
+)
+
+var mu sync.Mutex // guards xid; GetXid holds it around the increment
+var xid uint32 = 1 // last id handed out; GetXid increments before returning, so the first id returned is 2
+
+// GetXid increments the package-level xid counter while holding mu and returns the new value.
+func GetXid() uint32 {
+ mu.Lock()
+ defer mu.Unlock()
+ xid++
+ return xid
+}
+
+// PadString right-pads value with NUL bytes up to padSize bytes. A value already at or past padSize is returned unchanged (strings.Repeat panics on a negative count).
+func PadString(value string, padSize int) string {
+	nullsNeeded := padSize - len(value)
+	if nullsNeeded <= 0 {
+		return value
+	}
+	return fmt.Sprintf("%s%s", value, strings.Repeat("\x00", nullsNeeded))
+}
+
+/*
+// extractAction for extract action
+func extractAction(action ofp.IAction) *openflow_13.OfpAction {
+ var ofpAction openflow_13.OfpAction
+ switch action.GetType() {
+ case ofp.OFPATOutput:
+ var outputAction openflow_13.OfpAction_Output
+ loxiOutputAction := action.(*ofp.ActionOutput)
+ var output openflow_13.OfpActionOutput
+ output.Port = uint32(loxiOutputAction.GetPort())
+ /*
+ var maxLen uint16
+ maxLen = loxiOutputAction.GetMaxLen()
+ output.MaxLen = uint32(maxLen)
+
+ */
+/*
+ output.MaxLen = 0
+ outputAction.Output = &output
+ ofpAction.Action = &outputAction
+ ofpAction.Type = openflow_13.OfpActionType_OFPAT_OUTPUT
+ case ofp.OFPATCopyTtlOut: //CopyTtlOut
+ case ofp.OFPATCopyTtlIn: //CopyTtlIn
+ case ofp.OFPATSetMplsTtl: //SetMplsTtl
+ case ofp.OFPATDecMplsTtl: //DecMplsTtl
+ case ofp.OFPATPushVLAN: //PushVlan
+ var pushVlan openflow_13.OfpAction_Push
+ loxiPushAction := action.(*ofp.ActionPushVlan)
+ var push openflow_13.OfpActionPush
+ push.Ethertype = uint32(loxiPushAction.Ethertype) //TODO This should be available in the fields
+ pushVlan.Push = &push
+ ofpAction.Type = openflow_13.OfpActionType_OFPAT_PUSH_VLAN
+ ofpAction.Action = &pushVlan
+ case ofp.OFPATPopVLAN: //PopVlan
+ ofpAction.Type = openflow_13.OfpActionType_OFPAT_POP_VLAN
+ case ofp.OFPATPushMpls: //PushMpls
+ case ofp.OFPATPopMpls: //PopMpls
+ case ofp.OFPATSetQueue: //SetQueue
+ case ofp.OFPATGroup: //ActionGroup
+ case ofp.OFPATSetNwTtl: //SetNwTtl
+ case ofp.OFPATDecNwTtl: //DecNwTtl
+ case ofp.OFPATSetField: //SetField
+ ofpAction.Type = openflow_13.OfpActionType_OFPAT_SET_FIELD
+ var ofpActionForSetField openflow_13.OfpAction_SetField
+ var ofpActionSetField openflow_13.OfpActionSetField
+ var ofpOxmField openflow_13.OfpOxmField
+ ofpOxmField.OxmClass = openflow_13.OfpOxmClass_OFPXMC_OPENFLOW_BASIC
+ var ofpOxmFieldForOfbField openflow_13.OfpOxmField_OfbField
+ var ofpOxmOfbField openflow_13.OfpOxmOfbField
+ loxiSetField := action.(*ofp.ActionSetField)
+ oxmName := loxiSetField.Field.GetOXMName()
+ switch oxmName {
+ //TODO handle set field with other fields
+ case "vlan_vid":
+ ofpOxmOfbField.Type = openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID
+ var vlanVid openflow_13.OfpOxmOfbField_VlanVid
+ var VlanVid = loxiSetField.Field.GetOXMValue().(uint16)
+ vlanVid.VlanVid = uint32(VlanVid)
+
+ ofpOxmOfbField.Value = &vlanVid
+ }
+ ofpOxmFieldForOfbField.OfbField = &ofpOxmOfbField
+ ofpOxmField.Field = &ofpOxmFieldForOfbField
+ ofpActionSetField.Field = &ofpOxmField
+ ofpActionForSetField.SetField = &ofpActionSetField
+ ofpAction.Action = &ofpActionForSetField
+
+ case ofp.OFPATPushPbb: //PushPbb
+ case ofp.OFPATPopPbb: //PopPbb
+ case ofp.OFPATExperimenter: //Experimenter
+
+ }
+ return &ofpAction
+
+}
+
+// parseOxm for parsing OxmOfb field
+func parseOxm(ofbField *openflow_13.OfpOxmOfbField, DeviceID string) (goloxi.IOxm, uint16) {
+ switch ofbField.Type {
+ case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT:
+ ofpInPort := ofp.NewOxmInPort()
+ val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_Port)
+ ofpInPort.Value = ofp.Port(val.Port)
+ return ofpInPort, 4
+ case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE:
+ ofpEthType := ofp.NewOxmEthType()
+ val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_EthType)
+ ofpEthType.Value = ofp.EthernetType(val.EthType)
+ return ofpEthType, 2
+ case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT:
+ ofpInPhyPort := ofp.NewOxmInPhyPort()
+ val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_PhysicalPort)
+ ofpInPhyPort.Value = ofp.Port(val.PhysicalPort)
+ return ofpInPhyPort, 4
+ case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO:
+ ofpIPProto := ofp.NewOxmIpProto()
+ val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_IpProto)
+ ofpIPProto.Value = ofp.IpPrototype(val.IpProto)
+ return ofpIPProto, 1
+ case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC:
+ ofpUDPSrc := ofp.NewOxmUdpSrc()
+ val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_UdpSrc)
+ ofpUDPSrc.Value = uint16(val.UdpSrc)
+ return ofpUDPSrc, 2
+ case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST:
+ ofpUDPDst := ofp.NewOxmUdpDst()
+ val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_UdpDst)
+ ofpUDPDst.Value = uint16(val.UdpDst)
+ return ofpUDPDst, 2
+ case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
+ ofpVlanVid := ofp.NewOxmVlanVid()
+ val := ofbField.GetValue()
+ if val == nil {
+ ofpVlanVid.Value = uint16(0)
+ return ofpVlanVid, 2
+ }
+ vlanID := val.(*openflow_13.OfpOxmOfbField_VlanVid)
+ if ofbField.HasMask {
+ ofpVlanVidMasked := ofp.NewOxmVlanVidMasked()
+ valMask := ofbField.GetMask()
+ vlanMask := valMask.(*openflow_13.OfpOxmOfbField_VlanVidMask)
+ if vlanID.VlanVid == 4096 && vlanMask.VlanVidMask == 4096 {
+ ofpVlanVidMasked.Value = uint16(vlanID.VlanVid)
+ ofpVlanVidMasked.ValueMask = uint16(vlanMask.VlanVidMask)
+ } else {
+ ofpVlanVidMasked.Value = uint16(vlanID.VlanVid) | 0x1000
+ ofpVlanVidMasked.ValueMask = uint16(vlanMask.VlanVidMask)
+
+ }
+ return ofpVlanVidMasked, 4
+ }
+ ofpVlanVid.Value = uint16(vlanID.VlanVid) | 0x1000
+ return ofpVlanVid, 2
+ case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_METADATA:
+ ofpMetadata := ofp.NewOxmMetadata()
+ val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_TableMetadata)
+ ofpMetadata.Value = val.TableMetadata
+ return ofpMetadata, 8
+ default:
+ }
+ return nil, 0
+}
+
+// parseInstructions for parsing of instructions
+func parseInstructions(ofpInstruction *openflow_13.OfpInstruction, DeviceID string) (ofp.IInstruction, uint16) {
+ instType := ofpInstruction.Type
+ data := ofpInstruction.GetData()
+ switch instType {
+ case ofp.OFPITWriteMetadata:
+ instruction := ofp.NewInstructionWriteMetadata()
+ instruction.Len = 24
+ metadata := data.(*openflow_13.OfpInstruction_WriteMetadata).WriteMetadata
+ instruction.Metadata = uint64(metadata.Metadata)
+ return instruction, 24
+ case ofp.OFPITMeter:
+ instruction := ofp.NewInstructionMeter()
+ instruction.Len = 8
+ meter := data.(*openflow_13.OfpInstruction_Meter).Meter
+ instruction.MeterId = meter.MeterId
+ return instruction, 8
+ case ofp.OFPITGotoTable:
+ instruction := ofp.NewInstructionGotoTable()
+ instruction.Len = 8
+ gotoTable := data.(*openflow_13.OfpInstruction_GotoTable).GotoTable
+ instruction.TableId = uint8(gotoTable.TableId)
+ return instruction, 8
+ case ofp.OFPITApplyActions:
+ instruction := ofp.NewInstructionApplyActions()
+ var instructionSize uint16
+ instructionSize = 8
+ //ofpActions := ofpInstruction.GetActions().Actions
+ var actions []goloxi.IAction
+ for _, ofpAction := range ofpInstruction.GetActions().Actions {
+ action, actionSize := parseAction(ofpAction, DeviceID)
+ actions = append(actions, action)
+ instructionSize += actionSize
+
+ }
+ instruction.Actions = actions
+ instruction.SetLen(instructionSize)
+ return instruction, instructionSize
+ }
+ //shouldn't have reached here :<
+ return nil, 0
+}
+
+// parseAction for parsing of actions
+func parseAction(ofpAction *openflow_13.OfpAction, DeviceID string) (goloxi.IAction, uint16) {
+ switch ofpAction.Type {
+ case openflow_13.OfpActionType_OFPAT_OUTPUT:
+ ofpOutputAction := ofpAction.GetOutput()
+ outputAction := ofp.NewActionOutput()
+ outputAction.Port = ofp.Port(ofpOutputAction.Port)
+ outputAction.MaxLen = uint16(ofpOutputAction.MaxLen)
+ outputAction.Len = 16
+ return outputAction, 16
+ case openflow_13.OfpActionType_OFPAT_PUSH_VLAN:
+ ofpPushVlanAction := ofp.NewActionPushVlan()
+ ofpPushVlanAction.Ethertype = uint16(ofpAction.GetPush().Ethertype)
+ ofpPushVlanAction.Len = 8
+ return ofpPushVlanAction, 8
+ case openflow_13.OfpActionType_OFPAT_POP_VLAN:
+ ofpPopVlanAction := ofp.NewActionPopVlan()
+ ofpPopVlanAction.Len = 8
+ return ofpPopVlanAction, 8
+ case openflow_13.OfpActionType_OFPAT_SET_FIELD:
+ ofpActionSetField := ofpAction.GetSetField()
+ setFieldAction := ofp.NewActionSetField()
+
+ iOxm, _ := parseOxm(ofpActionSetField.GetField().GetOfbField(), DeviceID)
+ setFieldAction.Field = iOxm
+ setFieldAction.Len = 16
+ return setFieldAction, 16
+ default:
+ }
+ return nil, 0
+}
+
+// parsePortStats for parsing of port stats
+func parsePortStats(port *voltha.LogicalPort) *ofp.PortStatsEntry {
+ stats := port.OfpPortStats
+ port.OfpPort.GetPortNo()
+ var entry ofp.PortStatsEntry
+ entry.SetPortNo(ofp.Port(port.OfpPort.GetPortNo()))
+ entry.SetRxPackets(stats.GetRxPackets())
+ entry.SetTxPackets(stats.GetTxPackets())
+ entry.SetRxBytes(stats.GetRxBytes())
+ entry.SetTxBytes(stats.GetTxBytes())
+ entry.SetRxDropped(stats.GetRxDropped())
+ entry.SetTxDropped(stats.GetTxDropped())
+ entry.SetRxErrors(stats.GetRxErrors())
+ entry.SetTxErrors(stats.GetTxErrors())
+ entry.SetRxFrameErr(stats.GetRxFrameErr())
+ entry.SetRxOverErr(stats.GetRxOverErr())
+ entry.SetRxCrcErr(stats.GetRxCrcErr())
+ entry.SetCollisions(stats.GetCollisions())
+ entry.SetDurationSec(stats.GetDurationSec())
+ entry.SetDurationNsec(stats.GetDurationNsec())
+ return &entry
+}*/