First Commit of Voltha-Go-Controller from Radisys

Change-Id: I8e2e908e7ab09a4fe3d86849da18b6d69dcf4ab0
diff --git a/internal/pkg/controller/addflows.go b/internal/pkg/controller/addflows.go
new file mode 100644
index 0000000..0364341
--- /dev/null
+++ b/internal/pkg/controller/addflows.go
@@ -0,0 +1,217 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	infraerror "voltha-go-controller/internal/pkg/errorcodes"
+	infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
+	"time"
+
+	"voltha-go-controller/internal/pkg/of"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+const (
+	//MaxRetryCount - Maximum retry attempts on failure
+	MaxRetryCount int = 1
+)
+
+// AddFlowsTask structure
+type AddFlowsTask struct {
+	taskID    uint8
+	ctx       context.Context
+	flow      *of.VoltFlow
+	device    *Device
+	timestamp string
+}
+
+// NewAddFlowsTask is constructor for AddFlowsTask
+func NewAddFlowsTask(ctx context.Context, flow *of.VoltFlow, device *Device) *AddFlowsTask {
+	var aft AddFlowsTask
+	aft.device = device
+	aft.flow = flow
+	aft.ctx = ctx
+	tstamp := (time.Now()).Format(time.RFC3339Nano)
+	aft.timestamp = tstamp
+	return &aft
+}
+
+// Name to add flow task
+func (aft *AddFlowsTask) Name() string {
+	for _, flow := range aft.flow.SubFlows {
+		logger.Infow(ctx, "Flow Cookies", log.Fields{"Cookie": flow.Cookie})
+	}
+	return "Add Flows Task"
+}
+
+// TaskID to return task ID
+func (aft *AddFlowsTask) TaskID() uint8 {
+	return aft.taskID
+}
+
+// Timestamp to return timestamp
+func (aft *AddFlowsTask) Timestamp() string {
+	return aft.timestamp
+}
+
+// Stop to stop the add flow task
+func (aft *AddFlowsTask) Stop() {
+}
+
+// Start to start adding flow task
+func (aft *AddFlowsTask) Start(ctx context.Context, taskID uint8) error {
+	var err error
+	aft.taskID = taskID
+	aft.ctx = ctx
+	flowsToProcess := make(map[uint64]*of.VoltSubFlow)
+	flowsPresent := 0
+	// First add/delete the flows first locally before passing them to actual device
+	for _, flow := range aft.flow.SubFlows {
+		logger.Infow(ctx, "Flow Mod Request", log.Fields{"Cookie": flow.Cookie, "Oper": aft.flow.Command, "Port": aft.flow.PortID})
+		if aft.flow.Command == of.CommandAdd {
+			flow.State = of.FlowAddPending
+			if err := aft.device.AddFlow(flow); err != nil {
+				logger.Warnw(ctx, "Add Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
+
+				// If flow already exists in cache, check for flow state
+				// If Success: Trigger success FLow Indication
+				// if Failure: Continue process, so add-retry happens
+				if err.Error() == ErrDuplicateFlow {
+					dbFlow, _ := aft.device.GetFlow(flow.Cookie)
+					if dbFlow.State == of.FlowAddSuccess {
+						aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+						flowsPresent++
+					}
+				}
+			}
+			flowsToProcess[flow.Cookie] = flow
+		} else {
+			dbFlow, ok := aft.device.GetFlow(flow.Cookie)
+			if !ok {
+				logger.Warnw(ctx, "Delete Flow Error: Flow Does not Exist", log.Fields{"Cookie": flow.Cookie, "Device": aft.device.ID})
+			} else {
+				// dbFlow.State = of.FlowDelPending
+				// aft.device.AddFlowToDb(dbFlow)
+				flowsToProcess[flow.Cookie] = dbFlow
+			}
+			aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+		}
+	}
+
+	if flowsPresent == len(aft.flow.SubFlows) {
+		logger.Warn(ctx, "All Flows already present in database. Skipping Flow Push to SB")
+	}
+
+	// PortName and PortID are used for validation of PortID, whether it is still valid and associated with old PortName or
+	// PortID got assigned to another PortName. If the condition met, skip these flow update to voltha core
+	if aft.flow.PortName != "" && aft.flow.PortID != 0 {
+		portName, _ := aft.device.GetPortName(aft.flow.PortID)
+		if aft.flow.PortName != portName && portName != "" {
+			for _, flow := range aft.flow.SubFlows {
+				logger.Errorw(ctx, "Skip Flow Update", log.Fields{"Reason": "Port Deleted", "PortName": aft.flow.PortName, "PortNo": aft.flow.PortID, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
+				if aft.flow.Command == of.CommandDel {
+					aft.device.triggerFlowNotification(flow.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+				}
+			}
+			return nil
+		}
+	}
+
+	if !aft.device.isSBOperAllowed(aft.flow.ForceAction) {
+		for _, flow := range aft.flow.SubFlows {
+			logger.Errorw(ctx, "Skipping Flow Table Update", log.Fields{"Reason": "Device State not UP", "State": aft.device.State, "Cookie": flow.Cookie, "Operation": aft.flow.Command})
+		}
+		return nil
+	}
+
+	flows := of.ProcessVoltFlow(aft.device.ID, aft.flow.Command, flowsToProcess)
+	for _, flow := range flows {
+		attempt := 0
+		if vc := aft.device.VolthaClient(); vc != nil {
+			for {
+				if _, err = vc.UpdateLogicalDeviceFlowTable(aft.ctx, flow); err != nil {
+					logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Reason": err.Error(), "Operation": aft.flow.Command})
+					statusCode, _ := infraerror.GetErrorInfo(err)
+
+					// Retry on flow delete failure once.
+					// Do NOT retry incase of failure with reason: Entry Not Found
+					if aft.flow.Command == of.CommandDel && statusCode != uint32(infraerrorcode.ErrNotExists) {
+						if attempt != MaxRetryCount {
+							logger.Errorw(ctx, "Retrying Flow Delete", log.Fields{"Cookie": flow.GetFlowMod().Cookie, "Attempt": attempt})
+							attempt++
+							continue
+						}
+						logger.Errorw(ctx, "Flow Delete failed even aft max retries", log.Fields{"Flow": flow, "Attempt": attempt})
+					}
+				}
+				break
+			}
+			aft.device.triggerFlowNotification(flow.FlowMod.Cookie, aft.flow.Command, of.BwAvailDetails{}, nil)
+
+		} else {
+			logger.Errorw(ctx, "Update Flow Table Failed: Voltha Client Unavailable", log.Fields{"Flow": flow})
+		}
+	}
+	return nil
+}
+
+func isFlowOperSuccess(statusCode uint32, oper of.Command) bool {
+	volthaErrorCode := infraerrorcode.ErrorCode(statusCode)
+
+	if volthaErrorCode == infraerrorcode.ErrOk {
+		return true
+	}
+
+	if oper == of.CommandAdd && volthaErrorCode == infraerrorcode.ErrAlreadyExists {
+		return true
+
+	} else if oper == of.CommandDel && volthaErrorCode == infraerrorcode.ErrNotExists {
+		return true
+	}
+	return false
+}
+
+// func getBwAvailInfo(bwAvailInfo []*voltha.ResponseMsg) of.BwAvailDetails {
+// 	var bwInfo of.BwAvailDetails
+// 	// convert the bw details sent from olt to a struct
+// 	// received msg format:
+// 	// additional_data[Data{ResponseMsg
+// 	//{"key":"prevBW","value":"111111"},
+// 	//{"key":"presentBW","value":"10000"}]
+// 	if len(bwAvailInfo) > 1 {
+// 		prevBwResp := bwAvailInfo[0]
+// 		if prevBwResp.Key == of.PrevBwInfo {
+// 			_, err := strconv.Atoi(prevBwResp.Val)
+// 			if err == nil {
+// 				bwInfo.PrevBw = prevBwResp.Val
+// 			}
+// 		}
+
+// 		presentBwResp := bwAvailInfo[1]
+// 		if presentBwResp.Key == of.PresentBwInfo {
+// 			_, err := strconv.Atoi(prevBwResp.Val)
+// 			if err == nil {
+// 				bwInfo.PresentBw = presentBwResp.Val
+// 			}
+// 		}
+// 		if bwInfo.PresentBw == bwInfo.PrevBw {
+// 			return of.BwAvailDetails{}
+// 		}
+// 		logger.Infow(ctx, "Bandwidth-consumed-info", log.Fields{"BwConsumed": bwInfo})
+// 	}
+// 	return bwInfo
+// }
diff --git a/internal/pkg/controller/auditdevice.go b/internal/pkg/controller/auditdevice.go
new file mode 100644
index 0000000..ea00185
--- /dev/null
+++ b/internal/pkg/controller/auditdevice.go
@@ -0,0 +1,210 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"time"
+
+	"voltha-go-controller/internal/pkg/tasks"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+)
+
+// AuditEventType type
+type AuditEventType uint8
+
+const (
+	// AuditEventDeviceDisc constant
+	AuditEventDeviceDisc AuditEventType = 0
+	// AuditEventDeviceStateChange constant
+	AuditEventDeviceStateChange AuditEventType = 1
+)
+
+const (
+	// NNIPortID NNI port id
+	NNIPortID uint32 = 0x1000000
+)
+
+// AuditDevice structure
+type AuditDevice struct {
+	taskID    uint8
+	ctx       context.Context
+	device    *Device
+	stop      bool
+	timestamp string
+	event     AuditEventType
+}
+
+// NewAuditDevice is constructor for AuditDevice
+func NewAuditDevice(device *Device, event AuditEventType) *AuditDevice {
+	var ad AuditDevice
+	ad.device = device
+	ad.stop = false
+	tstamp := (time.Now()).Format(time.RFC3339Nano)
+	ad.timestamp = tstamp
+	ad.event = event
+	return &ad
+}
+
+// Name returns the task name
+func (ad *AuditDevice) Name() string {
+	return "Device Audit Task"
+}
+
+// TaskID returns the task id
+func (ad *AuditDevice) TaskID() uint8 {
+	return ad.taskID
+}
+
+// Timestamp returns the timestamp for the task
+func (ad *AuditDevice) Timestamp() string {
+	return ad.timestamp
+}
+
+// Stop to stop the task
+func (ad *AuditDevice) Stop() {
+	ad.stop = true
+}
+
+// Start to start the task
+func (ad *AuditDevice) Start(ctx context.Context, taskID uint8) error {
+	logger.Warnw(ctx, "Audit Device Task Triggered", log.Fields{"Context": ctx, "taskId": taskID, "Device": ad.device.ID})
+	ad.taskID = taskID
+	ad.ctx = ctx
+
+	if ad.stop {
+		logger.Errorw(ctx, "Audit Device Task Cancelled", log.Fields{"Context": ad.ctx, "Task": ad.taskID})
+		return tasks.ErrTaskCancelError
+	}
+
+	ofpps, err := ad.device.VolthaClient().ListLogicalDevicePorts(ad.ctx, &common.ID{Id: ad.device.ID})
+	if err != nil {
+		return err
+	}
+
+	// Compute the difference between the ports received and ports at VGC
+	// First build a map of all the received ports under missing ports. We
+	// will eliminate the ports that are in the device from the missing ports
+	// so that the elements remaining are missing ports. The ones that are
+	// not in missing ports are added to excess ports which should be deleted
+	// from the VGC.
+	missingPorts := make(map[uint32]*ofp.OfpPort)
+	for _, ofpp := range ofpps.Items {
+		missingPorts[ofpp.OfpPort.PortNo] = ofpp.OfpPort
+	}
+
+	var excessPorts []uint32
+	GetController().SetAuditFlags(ad.device)
+
+	processPortState := func(id uint32, vgcPort *DevicePort) {
+		logger.Debugw(ctx, "Process Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
+
+		if ofpPort, ok := missingPorts[id]; ok {
+			if ((vgcPort.State == PortStateDown) && (ofpPort.State == uint32(ofp.OfpPortState_OFPPS_LIVE))) || ((vgcPort.State == PortStateUp) && (ofpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE))) {
+				// This port exists in the received list and the map at
+				// VGC. This is common so delete it
+				logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
+				ad.device.ProcessPortState(ofpPort.PortNo, ofpPort.State)
+			} else {
+				//To ensure the flows are in sync with port status and no mismatch due to reboot,
+				// repush/delete flows based on current port status
+				logger.Infow(ctx, "Port State Processing", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
+				ad.device.ProcessPortStateAfterReboot(ofpPort.PortNo, ofpPort.State)
+			}
+			delete(missingPorts, id)
+		} else {
+			// This port is missing from the received list. This is an
+			// excess port at VGC. This must be added to excess ports
+			excessPorts = append(excessPorts, id)
+		}
+		logger.Debugw(ctx, "Processed Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
+
+	}
+
+	// 1st process the NNI port before all other ports so that the device state can be updated.
+	if vgcPort, ok := ad.device.PortsByID[NNIPortID]; ok {
+		logger.Info(ctx, "Processing NNI port state")
+		processPortState(NNIPortID, vgcPort)
+	}
+
+	for id, vgcPort := range ad.device.PortsByID {
+		if id == NNIPortID {
+			//NNI port already processed
+			continue
+		}
+		if ad.stop {
+			break
+		}
+		processPortState(id, vgcPort)
+	}
+	GetController().ResetAuditFlags(ad.device)
+
+	if ad.stop {
+		logger.Errorw(ctx, "Audit Device Task Cancelled", log.Fields{"Context": ad.ctx, "Task": ad.taskID})
+		return tasks.ErrTaskCancelError
+	}
+	ad.AddMissingPorts(missingPorts)
+	ad.DelExcessPorts(excessPorts)
+	ad.device.deviceAuditInProgress = false
+	logger.Warnw(ctx, "Audit Device Task Completed", log.Fields{"Context": ctx, "taskId": taskID, "Device": ad.device.ID})
+	return nil
+}
+
+// AddMissingPorts to add the missing ports
+func (ad *AuditDevice) AddMissingPorts(mps map[uint32]*ofp.OfpPort) {
+	logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
+
+	addMissingPort := func(mp *ofp.OfpPort) {
+		logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
+
+		// Error is ignored as it only drops duplicate ports
+		logger.Infow(ctx, "Calling AddPort", log.Fields{"No": mp.PortNo, "Name": mp.Name})
+		if err := ad.device.AddPort(mp.PortNo, mp.Name); err != nil {
+			logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
+		}
+		if mp.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
+			ad.device.ProcessPortState(mp.PortNo, mp.State)
+		}
+		logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
+
+	}
+
+	// 1st process the NNI port before all other ports so that the flow provisioning for UNIs can be enabled
+	if mp, ok := mps[NNIPortID]; ok {
+		logger.Info(ctx, "Adding Missing NNI port")
+		addMissingPort(mp)
+	}
+
+	for portNo, mp := range mps {
+		if portNo != NNIPortID {
+			addMissingPort(mp)
+		}
+	}
+}
+
+// DelExcessPorts to delete the excess ports
+func (ad *AuditDevice) DelExcessPorts(eps []uint32) {
+	logger.Debugw(ctx, "Device Audit - Delete Excess Ports", log.Fields{"NumPorts": len(eps)})
+	for _, id := range eps {
+		// Now delete the port from the device @ VGC
+		logger.Infow(ctx, "Device Audit - Deleting Port", log.Fields{"PortId": id})
+		if err := ad.device.DelPort(id); err != nil {
+			logger.Warnw(ctx, "DelPort Failed", log.Fields{"PortId": id, "Reason": err})
+		}
+	}
+}
diff --git a/internal/pkg/controller/audittables.go b/internal/pkg/controller/audittables.go
new file mode 100644
index 0000000..486d560
--- /dev/null
+++ b/internal/pkg/controller/audittables.go
@@ -0,0 +1,532 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"strconv"
+	"time"
+
+	"voltha-go-controller/internal/pkg/intf"
+	"voltha-go-controller/internal/pkg/of"
+	"voltha-go-controller/internal/pkg/tasks"
+	"voltha-go-controller/internal/pkg/util"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/common"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+var (
+	rcvdGroups  map[uint32]*ofp.OfpGroupDesc
+	groupsToAdd []*of.Group
+	groupsToMod []*of.Group
+)
+
+// AuditTablesTask structure
+type AuditTablesTask struct {
+	taskID    uint8
+	ctx       context.Context
+	device    *Device
+	stop      bool
+	timestamp string
+}
+
+// NewAuditTablesTask is constructor for AuditTablesTask
+func NewAuditTablesTask(device *Device) *AuditTablesTask {
+	var att AuditTablesTask
+	att.device = device
+	att.stop = false
+	tstamp := (time.Now()).Format(time.RFC3339Nano)
+	att.timestamp = tstamp
+	return &att
+}
+
+// Name returns name of the task
+func (att *AuditTablesTask) Name() string {
+	return "Audit Table Task"
+}
+
+// TaskID to return task id of the task
+func (att *AuditTablesTask) TaskID() uint8 {
+	return att.taskID
+}
+
+// Timestamp to return timestamp for the task
+func (att *AuditTablesTask) Timestamp() string {
+	return att.timestamp
+}
+
+// Stop to stop the task
+func (att *AuditTablesTask) Stop() {
+	att.stop = true
+}
+
+// Start is called by the framework and is responsible for implementing
+// the actual task.
+func (att *AuditTablesTask) Start(ctx context.Context, taskID uint8) error {
+	logger.Warnw(ctx, "Audit Table Task Triggered", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
+	att.taskID = taskID
+	att.ctx = ctx
+	var errInfo error
+	var err error
+
+	// Audit the meters
+	if err = att.AuditMeters(); err != nil {
+		logger.Errorw(ctx, "Audit Meters Failed", log.Fields{"Reason": err.Error()})
+		errInfo = err
+	}
+
+	// Audit the Groups
+	if rcvdGroups, err = att.AuditGroups(); err != nil {
+		logger.Errorw(ctx, "Audit Groups Failed", log.Fields{"Reason": err.Error()})
+		errInfo = err
+	}
+
+	// Audit the flows
+	if err = att.AuditFlows(); err != nil {
+		logger.Errorw(ctx, "Audit Flows Failed", log.Fields{"Reason": err.Error()})
+		errInfo = err
+	}
+
+	// Triggering deletion of excess groups from device after the corresponding flows are removed
+	// to avoid flow dependency error during group deletion
+	logger.Infow(ctx, "Excess Groups", log.Fields{"Groups": rcvdGroups})
+	att.DelExcessGroups(rcvdGroups)
+	logger.Warnw(ctx, "Audit Table Task Completed", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
+	return errInfo
+
+}
+
+// AuditMeters : Audit the meters which includes fetching the existing meters at the
+// voltha and identifying the delta between the ones held here and the
+// ones held at VOLTHA. The delta must be cleaned up to keep both the
+// components in sync
+func (att *AuditTablesTask) AuditMeters() error {
+
+	if att.stop {
+		return tasks.ErrTaskCancelError
+	}
+	var vc voltha.VolthaServiceClient
+	if vc = att.device.VolthaClient(); vc == nil {
+		logger.Error(ctx, "Fetch Device Meters Failed: Voltha Client Unavailable")
+		return nil
+	}
+
+	//-----------------------------
+	// Perform the audit of meters
+	// Fetch the meters
+	ms, err := vc.ListLogicalDeviceMeters(att.ctx, &voltha.ID{Id: att.device.ID})
+	if err != nil {
+		logger.Warnw(ctx, "Audit of flows failed", log.Fields{"Reason": err.Error()})
+		return err
+	}
+
+	// Build the map for easy and faster processing
+	rcvdMeters := make(map[uint32]*ofp.OfpMeterStats)
+	for _, m := range ms.Items {
+		rcvdMeters[m.Stats.MeterId] = m.Stats
+	}
+
+	// Verify all meters that are in the controller but not in the device
+	missingMeters := []*of.Meter{}
+	for _, meter := range att.device.meters {
+
+		if att.stop {
+			break
+		}
+		logger.Debugw(ctx, "Auditing Meter", log.Fields{"Id": meter.ID})
+
+		if _, ok := rcvdMeters[meter.ID]; ok {
+			// The meter exists in the device too. Just remove it from
+			// the received meters
+			delete(rcvdMeters, meter.ID)
+		} else {
+			// The flow exists at the controller but not at the device
+			// Push the flow to the device
+			logger.Debugw(ctx, "Adding Meter To Missing Meters", log.Fields{"Id": meter.ID})
+			missingMeters = append(missingMeters, meter)
+		}
+	}
+	if !att.stop {
+		att.AddMissingMeters(missingMeters)
+		att.DelExcessMeters(rcvdMeters)
+	} else {
+		err = tasks.ErrTaskCancelError
+	}
+	return err
+}
+
+// AddMissingMeters adds the missing meters detected by AuditMeters
+func (att *AuditTablesTask) AddMissingMeters(meters []*of.Meter) {
+	logger.Debugw(ctx, "Adding missing meters", log.Fields{"Number": len(meters)})
+	for _, meter := range meters {
+		meterMod, err := of.MeterUpdate(att.device.ID, of.MeterCommandAdd, meter)
+		if err != nil {
+			logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
+			continue
+		}
+		if vc := att.device.VolthaClient(); vc != nil {
+			if _, err = vc.UpdateLogicalDeviceMeterTable(att.ctx, meterMod); err != nil {
+				logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
+			}
+		} else {
+			logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
+		}
+	}
+}
+
+// DelExcessMeters to delete excess meters
+func (att *AuditTablesTask) DelExcessMeters(meters map[uint32]*ofp.OfpMeterStats) {
+	logger.Debugw(ctx, "Deleting Excess Meters", log.Fields{"Number": len(meters)})
+	for _, meter := range meters {
+		meterMod := &ofp.OfpMeterMod{}
+		meterMod.Command = ofp.OfpMeterModCommand_OFPMC_DELETE
+		meterMod.MeterId = meter.MeterId
+		meterUpd := &ofp.MeterModUpdate{Id: att.device.ID, MeterMod: meterMod}
+		if vc := att.device.VolthaClient(); vc != nil {
+			if _, err := vc.UpdateLogicalDeviceMeterTable(att.ctx, meterUpd); err != nil {
+				logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
+			}
+		} else {
+			logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
+		}
+	}
+}
+
+// AuditFlows audit the flows which includes fetching the existing meters at the
+// voltha and identifying the delta between the ones held here and the
+// ones held at VOLTHA. The delta must be cleaned up to keep both the
+// components in sync
+func (att *AuditTablesTask) AuditFlows() error {
+
+	if att.stop {
+		return tasks.ErrTaskCancelError
+	}
+
+	var vc voltha.VolthaServiceClient
+	if vc = att.device.VolthaClient(); vc == nil {
+		logger.Error(ctx, "Flow Audit Failed: Voltha Client Unavailable")
+		return nil
+	}
+
+	// ---------------------------------
+	// Perform the audit of flows first
+	// Retrieve the flows from the device
+	f, err := vc.ListLogicalDeviceFlows(att.ctx, &common.ID{Id: att.device.ID})
+	if err != nil {
+		logger.Warnw(ctx, "Audit of flows failed", log.Fields{"Reason": err.Error()})
+		return err
+	}
+
+	defaultSuccessFlowStatus := intf.FlowStatus{
+		Device:      att.device.ID,
+		FlowModType: of.CommandAdd,
+		Status:      0,
+		Reason:      "",
+	}
+
+	// Build the map for easy and faster processing
+	rcvdFlows := make(map[uint64]*ofp.OfpFlowStats)
+	flowsToAdd := &of.VoltFlow{}
+	flowsToAdd.SubFlows = make(map[uint64]*of.VoltSubFlow)
+	for _, flow := range f.Items {
+		rcvdFlows[flow.Cookie] = flow
+	}
+
+	att.device.flowLock.Lock()
+	// Verify all flows that are in the controller but not in the device
+	for _, flow := range att.device.flows {
+
+		if att.stop {
+			break
+		}
+
+		logger.Debugw(ctx, "Auditing Flow", log.Fields{"Cookie": flow.Cookie})
+		if _, ok := rcvdFlows[flow.Cookie]; ok {
+			// The flow exists in the device too. Just remove it from
+			// the received flows & trigger flow success indication unless
+			// the flow in del failure/pending state
+
+			if flow.State != of.FlowDelFailure && flow.State != of.FlowDelPending {
+				delete(rcvdFlows, flow.Cookie)
+			}
+			defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
+
+			logger.Infow(ctx, "Triggering Internal Flow Notification", log.Fields{"Flow Status": defaultSuccessFlowStatus})
+			GetController().ProcessFlowModResultIndication(defaultSuccessFlowStatus)
+		} else {
+			// The flow exists at the controller but not at the device
+			// Push the flow to the device
+			logger.Debugw(ctx, "Adding Flow To Missing Flows", log.Fields{"Cookie": flow.Cookie})
+			flowsToAdd.SubFlows[flow.Cookie] = flow
+		}
+	}
+	att.device.flowLock.Unlock()
+
+	if !att.stop {
+		//  The flows remaining in the received flows are the excess flows at
+		// the device. Delete those flows
+		att.DelExcessFlows(rcvdFlows)
+		// Add the flows missing at the device
+		att.AddMissingFlows(flowsToAdd)
+	} else {
+		err = tasks.ErrTaskCancelError
+	}
+	return err
+}
+
+// AddMissingFlows : The flows missing from the device are reinstalled att the audit
+// The flows are added into a VoltFlow structure.
+func (att *AuditTablesTask) AddMissingFlows(mflow *of.VoltFlow) {
+	logger.Debugw(ctx, "Add Missing Flows", log.Fields{"Number": len(mflow.SubFlows)})
+	mflow.Command = of.CommandAdd
+	ofFlows := of.ProcessVoltFlow(att.device.ID, mflow.Command, mflow.SubFlows)
+	var vc voltha.VolthaServiceClient
+	var bwConsumedInfo of.BwAvailDetails
+	if vc = att.device.VolthaClient(); vc == nil {
+		logger.Error(ctx, "Update Flow Table Failed: Voltha Client Unavailable")
+		return
+	}
+	for _, flow := range ofFlows {
+		var dbFlow *of.VoltSubFlow
+		var present bool
+		if flow.FlowMod != nil {
+			if dbFlow, present = att.device.GetFlow(flow.FlowMod.Cookie); !present {
+				logger.Warn(ctx, "Flow Removed from DB. Ignoring Add Missing Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.FlowMod.Cookie})
+				continue
+			}
+		}
+		var err error
+		if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flow); err != nil {
+			logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Reason": err.Error()})
+		}
+		att.device.triggerFlowResultNotification(flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err)
+	}
+}
+
+// DelExcessFlows delete the excess flows held at the VOLTHA
+func (att *AuditTablesTask) DelExcessFlows(flows map[uint64]*ofp.OfpFlowStats) {
+	logger.Debugw(ctx, "Deleting Excess Flows", log.Fields{"Number of Flows": len(flows)})
+
+	var vc voltha.VolthaServiceClient
+	if vc = att.device.VolthaClient(); vc == nil {
+		logger.Error(ctx, "Delete Excess Flows Failed: Voltha Client Unavailable")
+		return
+	}
+
+	// Let's cycle through the flows to delete the excess flows
+	for _, flow := range flows {
+
+		if _, present := att.device.GetFlow(flow.Cookie); present {
+			logger.Warn(ctx, "Flow Present in DB. Ignoring Delete Excess Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
+			continue
+		}
+
+		logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
+		// Create the flowMod structure and fill it out
+		flowMod := &ofp.OfpFlowMod{}
+		flowMod.Cookie = flow.Cookie
+		flowMod.TableId = flow.TableId
+		flowMod.Command = ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT
+		flowMod.IdleTimeout = flow.IdleTimeout
+		flowMod.HardTimeout = flow.HardTimeout
+		flowMod.Priority = flow.Priority
+		flowMod.BufferId = of.DefaultBufferID
+		flowMod.OutPort = of.DefaultOutPort
+		flowMod.OutGroup = of.DefaultOutGroup
+		flowMod.Flags = flow.Flags
+		flowMod.Match = flow.Match
+		flowMod.Instructions = flow.Instructions
+
+		// Create FlowTableUpdate
+		flowUpdate := &ofp.FlowTableUpdate{
+			Id:      att.device.ID,
+			FlowMod: flowMod,
+		}
+
+		var err error
+		if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
+			logger.Errorw(ctx, "Flow Audit Delete Failed", log.Fields{"Reason": err.Error()})
+		}
+		att.device.triggerFlowResultNotification(flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err)
+	}
+}
+
+// AuditGroups audit the groups which includes fetching the existing groups at the
+// voltha and identifying the delta between the ones held here and the
+// ones held at VOLTHA. The delta must be cleaned up to keep both the
+// components in sync
+func (att *AuditTablesTask) AuditGroups() (map[uint32]*ofp.OfpGroupDesc, error) {
+
+	// Build the map for easy and faster processing
+	rcvdGroups = make(map[uint32]*ofp.OfpGroupDesc)
+
+	if att.stop {
+		return rcvdGroups, tasks.ErrTaskCancelError
+	}
+
+	var vc voltha.VolthaServiceClient
+	if vc = att.device.VolthaClient(); vc == nil {
+		logger.Error(ctx, "Group Audit Failed: Voltha Client Unavailable")
+		return rcvdGroups, nil
+	}
+
+	// ---------------------------------
+	// Perform the audit of groups first
+	// Retrieve the groups from the device
+	g, err := vc.ListLogicalDeviceFlowGroups(att.ctx, &common.ID{Id: att.device.ID})
+	if err != nil {
+		logger.Warnw(ctx, "Audit of groups failed", log.Fields{"Reason": err.Error()})
+		return rcvdGroups, err
+	}
+
+	groupsToAdd = []*of.Group{}
+	groupsToMod = []*of.Group{}
+	for _, group := range g.Items {
+		rcvdGroups[group.Desc.GroupId] = group.Desc
+	}
+	logger.Infow(ctx, "Received Groups", log.Fields{"Groups": rcvdGroups})
+
+	// Verify all groups that are in the controller but not in the device
+	att.device.groups.Range(att.compareGroupEntries)
+
+	if !att.stop {
+		// Add the groups missing at the device
+		logger.Infow(ctx, "Missing Groups", log.Fields{"Groups": groupsToAdd})
+		att.AddMissingGroups(groupsToAdd)
+
+		// Update groups with group member mismatch
+		logger.Infow(ctx, "Modify Groups", log.Fields{"Groups": groupsToMod})
+		att.UpdateMismatchGroups(groupsToMod)
+
+		// Note: Excess groups will be deleted after ensuring the connected
+		// flows are also removed as part fo audit flows
+	} else {
+		err = tasks.ErrTaskCancelError
+	}
+	// The groups remaining in the received groups are the excess groups at
+	// the device
+	return rcvdGroups, err
+}
+
+// compareGroupEntries to compare the group entries
+func (att *AuditTablesTask) compareGroupEntries(key, value interface{}) bool {
+
+	if att.stop {
+		return false
+	}
+
+	groupID := key.(uint32)
+	dbGroup := value.(*of.Group)
+	logger.Debugw(ctx, "Auditing Group", log.Fields{"Groupid": groupID})
+	if rcvdGrp, ok := rcvdGroups[groupID]; ok {
+		// The group exists in the device too.
+		// Compare the group members and add to modify list if required
+		compareGroupMembers(dbGroup, rcvdGrp)
+		delete(rcvdGroups, groupID)
+	} else {
+		// The group exists at the controller but not at the device
+		// Push the group to the device
+		logger.Debugw(ctx, "Adding Group To Missing Groups", log.Fields{"GroupId": groupID})
+		groupsToAdd = append(groupsToAdd, value.(*of.Group))
+	}
+	return true
+}
+
+func compareGroupMembers(refGroup *of.Group, rcvdGroup *ofp.OfpGroupDesc) {
+
+	portList := []uint32{}
+	refPortList := []uint32{}
+
+	//Collect port list from response Group Mod structure
+	//If PON is configured even for one group, then only PON shall be considered for compared for all groups
+	for _, bucket := range rcvdGroup.Buckets {
+		for _, actionBucket := range bucket.Actions {
+			if actionBucket.Type == ofp.OfpActionType_OFPAT_OUTPUT {
+				action := actionBucket.GetOutput()
+				portList = append(portList, action.Port)
+			}
+		}
+	}
+
+	refPortList = append(refPortList, refGroup.Buckets...)
+
+	//Is port list differs, trigger group update
+	if !util.IsSliceSame(refPortList, portList) {
+		groupsToMod = append(groupsToMod, refGroup)
+	}
+}
+
+//AddMissingGroups - addmissing groups to Voltha
+func (att *AuditTablesTask) AddMissingGroups(groupList []*of.Group) {
+	att.PushGroups(groupList, of.GroupCommandAdd)
+}
+
+//UpdateMismatchGroups - updates mismatched groups to Voltha
+func (att *AuditTablesTask) UpdateMismatchGroups(groupList []*of.Group) {
+	att.PushGroups(groupList, of.GroupCommandMod)
+}
+
+// PushGroups - The groups missing/to be updated in the device are reinstalled att the audit
+func (att *AuditTablesTask) PushGroups(groupList []*of.Group, grpCommand of.GroupCommand) {
+	logger.Debugw(ctx, "Pushing Groups", log.Fields{"Number": len(groupList), "Command": grpCommand})
+
+	var vc voltha.VolthaServiceClient
+	if vc = att.device.VolthaClient(); vc == nil {
+		logger.Error(ctx, "Update Group Table Failed: Voltha Client Unavailable")
+		return
+	}
+	for _, group := range groupList {
+		group.Command = grpCommand
+		groupUpdate := of.CreateGroupTableUpdate(group)
+		if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
+			logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
+		}
+	}
+}
+
+// DelExcessGroups - Delete the excess groups held at the VOLTHA
+func (att *AuditTablesTask) DelExcessGroups(groups map[uint32]*ofp.OfpGroupDesc) {
+	logger.Debugw(ctx, "Deleting Excess Groups", log.Fields{"Number of Groups": len(groups)})
+
+	var vc voltha.VolthaServiceClient
+	if vc = att.device.VolthaClient(); vc == nil {
+		logger.Error(ctx, "Delete Excess Groups Failed: Voltha Client Unavailable")
+		return
+	}
+
+	// Let's cycle through the groups to delete the excess groups
+	for _, groupDesc := range groups {
+		logger.Debugw(ctx, "Deleting Group", log.Fields{"GroupId": groupDesc.GroupId})
+		group := &of.Group{}
+		group.Device = att.device.ID
+		group.GroupID = groupDesc.GroupId
+
+		//Group Members should be deleted before triggered group delete
+		group.Command = of.GroupCommandMod
+		groupUpdate := of.CreateGroupTableUpdate(group)
+		if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
+			logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
+		}
+
+		group.Command = of.GroupCommandDel
+		groupUpdate = of.CreateGroupTableUpdate(group)
+		if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
+			logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
+		}
+	}
+}
diff --git a/internal/pkg/controller/changeevent.go b/internal/pkg/controller/changeevent.go
new file mode 100644
index 0000000..95f6b07
--- /dev/null
+++ b/internal/pkg/controller/changeevent.go
@@ -0,0 +1,92 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"errors"
+	"time"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+)
+
+// ChangeEventTask structure
+type ChangeEventTask struct {
+	taskID    uint8
+	ctx       context.Context
+	event     *ofp.ChangeEvent
+	device    *Device
+	timestamp string
+}
+
+// NewChangeEventTask is constructor for ChangeEventTask
+func NewChangeEventTask(ctx context.Context, event *ofp.ChangeEvent, device *Device) *ChangeEventTask {
+	var cet ChangeEventTask
+	cet.device = device
+	cet.event = event
+	cet.ctx = ctx
+	tstamp := (time.Now()).Format(time.RFC3339Nano)
+	cet.timestamp = tstamp
+	return &cet
+}
+
+// Name returns the name of the task
+func (cet *ChangeEventTask) Name() string {
+	return "Change Event Task"
+}
+
+// TaskID to return task id of the task
+func (cet *ChangeEventTask) TaskID() uint8 {
+	return cet.taskID
+}
+
+// Timestamp to return timestamp for the task
+func (cet *ChangeEventTask) Timestamp() string {
+	return cet.timestamp
+}
+
+// Stop to stop the task
+func (cet *ChangeEventTask) Stop() {
+}
+
+// Start to start the Change event task
+func (cet *ChangeEventTask) Start(ctx context.Context, taskID uint8) error {
+	cet.taskID = taskID
+	cet.ctx = ctx
+
+	if status, ok := cet.event.Event.(*ofp.ChangeEvent_PortStatus); ok {
+		portNo := status.PortStatus.Desc.PortNo
+		portName := status.PortStatus.Desc.Name
+		state := status.PortStatus.Desc.State
+		logger.Infow(ctx, "Process Port Change Event", log.Fields{"Port No": portNo, "Port Name": portName, "State": state, "Reason": status.PortStatus.Reason})
+		if status.PortStatus.Reason == ofp.OfpPortReason_OFPPR_ADD {
+			_ = cet.device.AddPort(portNo, portName)
+			if state == uint32(ofp.OfpPortState_OFPPS_LIVE) {
+				cet.device.ProcessPortState(portNo, state)
+			}
+		} else if status.PortStatus.Reason == ofp.OfpPortReason_OFPPR_DELETE {
+			if err := cet.device.DelPort(portNo); err != nil {
+				logger.Warnw(ctx, "DelPort Failed", log.Fields{"Port No": portNo, "Error": err})
+			}
+		} else if status.PortStatus.Reason == ofp.OfpPortReason_OFPPR_MODIFY {
+			cet.device.ProcessPortUpdate(portName, portNo, state)
+		}
+		logger.Infow(ctx, "Processed Port Change Event", log.Fields{"Port No": portNo, "Port Name": portName, "State": state, "Reason": status.PortStatus.Reason})
+		return nil
+	}
+	return errors.New("Invalid message received")
+}
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
new file mode 100644
index 0000000..ae34133
--- /dev/null
+++ b/internal/pkg/controller/controller.go
@@ -0,0 +1,523 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package controller
+
+import (
+	"context"
+	"errors"
+	"sync"
+	"time"
+
+	"encoding/hex"
+
+	"voltha-go-controller/database"
+	errorCodes "voltha-go-controller/internal/pkg/errorcodes"
+	"voltha-go-controller/internal/pkg/intf"
+	"voltha-go-controller/internal/pkg/of"
+	"voltha-go-controller/internal/pkg/tasks"
+	"voltha-go-controller/internal/pkg/util"
+	"voltha-go-controller/internal/pkg/vpagent"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+var logger log.CLogger
+var ctx = context.TODO()
+
+func init() {
+	// Setup this package so that it's log level can be modified at run time
+	var err error
+	logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+	if err != nil {
+		panic(err)
+	}
+}
+
+var db database.DBIntf
+
+var deviceTableSyncDuration = 15 * time.Minute
+
+//SetDeviceTableSyncDuration - sets interval between device table sync up activity
+//  duration - in minutes
+func SetDeviceTableSyncDuration(duration int) {
+	deviceTableSyncDuration = time.Duration(duration) * time.Minute
+}
+
+// VoltController structure
+type VoltController struct {
+	rebootLock              sync.Mutex
+	rebootInProgressDevices map[string]string
+	devices                 map[string]*Device
+	deviceLock              sync.RWMutex
+	vagent                  map[string]*vpagent.VPAgent
+	ctx                     context.Context
+	app                     intf.App
+	RebootFlow              bool
+	BlockedDeviceList       *util.ConcurrentMap
+	deviceTaskQueue         *util.ConcurrentMap
+}
+
+var vcontroller *VoltController
+
+// NewController is the constructor for VoltController
+func NewController(ctx context.Context, app intf.App) intf.IVPClientAgent {
+	var controller VoltController
+
+	controller.rebootInProgressDevices = make(map[string]string)
+	controller.devices = make(map[string]*Device)
+	controller.deviceLock = sync.RWMutex{}
+	controller.ctx = ctx
+	controller.app = app
+	controller.BlockedDeviceList = util.NewConcurrentMap()
+	controller.deviceTaskQueue = util.NewConcurrentMap()
+	db = database.GetDatabase()
+	vcontroller = &controller
+	return &controller
+}
+
+// AddDevice to add device
+func (v *VoltController) AddDevice(config *intf.VPClientCfg) intf.IVPClient {
+
+	d := NewDevice(config.DeviceID, config.SerialNum, config.VolthaClient, config.SouthBoundID)
+	v.devices[config.DeviceID] = d
+	v.app.AddDevice(d.ID, d.SerialNum, config.SouthBoundID)
+
+	d.RestoreMetersFromDb()
+	d.RestoreGroupsFromDb()
+	d.RestoreFlowsFromDb()
+	d.RestorePortsFromDb()
+	d.ConnectInd(context.TODO(), intf.DeviceDisc)
+	d.packetOutChannel = config.PacketOutChannel
+
+	logger.Warnw(ctx, "Added device", log.Fields{"Device": config.DeviceID, "SerialNo": d.SerialNum, "State": d.State})
+
+	return d
+}
+
+// DelDevice to delete device
+func (v *VoltController) DelDevice(id string) {
+	d, ok := v.devices[id]
+	if ok {
+		delete(v.devices, id)
+		d.Delete()
+	}
+	v.app.DelDevice(id)
+	d.cancel() // To stop the device tables sync routine
+	logger.Warnw(ctx, "Deleted device", log.Fields{"Device": id})
+}
+
+//AddControllerTask - add task to controller queue
+func (v *VoltController) AddControllerTask(device string, task tasks.Task) {
+	var taskQueueIntf interface{}
+	var taskQueue *tasks.Tasks
+	var found bool
+	if taskQueueIntf, found = v.deviceTaskQueue.Get(device); !found {
+		taskQueue = tasks.NewTasks(context.TODO())
+		v.deviceTaskQueue.Set(device, taskQueue)
+	} else {
+		taskQueue = taskQueueIntf.(*tasks.Tasks)
+	}
+	taskQueue.AddTask(task)
+	logger.Warnw(ctx, "Task Added to Controller Task List", log.Fields{"Len": taskQueue.NumPendingTasks(), "Total": taskQueue.TotalTasks()})
+}
+
+//AddNewDevice - called when new device is discovered. This will be
+//processed as part of controller queue
+func (v *VoltController) AddNewDevice(config *intf.VPClientCfg) {
+	adt := NewAddDeviceTask(config)
+	v.AddControllerTask(config.DeviceID, adt)
+}
+
+// GetDevice to get device info
+func (v *VoltController) GetDevice(id string) (*Device, error) {
+	d, ok := v.devices[id]
+	if ok {
+		return d, nil
+	}
+	return nil, errorCodes.ErrDeviceNotFound
+}
+
+// IsRebootInProgressForDevice to check if reboot is in progress for the device
+func (v *VoltController) IsRebootInProgressForDevice(device string) bool {
+	v.rebootLock.Lock()
+	defer v.rebootLock.Unlock()
+	_, ok := v.rebootInProgressDevices[device]
+	return ok
+}
+
+// SetRebootInProgressForDevice to set reboot in progress for the device
+func (v *VoltController) SetRebootInProgressForDevice(device string) bool {
+	v.rebootLock.Lock()
+	defer v.rebootLock.Unlock()
+	_, ok := v.rebootInProgressDevices[device]
+	if ok {
+		return true
+	}
+	v.rebootInProgressDevices[device] = device
+	logger.Warnw(ctx, "Setted Reboot-In-Progress flag", log.Fields{"Device": device})
+
+	d, err := v.GetDevice(device)
+	if err == nil {
+		d.ResetCache()
+	} else {
+		logger.Errorw(ctx, "Failed to get device", log.Fields{"Device": device, "Error": err})
+	}
+
+	return true
+}
+
+// ReSetRebootInProgressForDevice to reset reboot in progress for the device
+func (v *VoltController) ReSetRebootInProgressForDevice(device string) bool {
+	v.rebootLock.Lock()
+	defer v.rebootLock.Unlock()
+	_, ok := v.rebootInProgressDevices[device]
+	if !ok {
+		return true
+	}
+	delete(v.rebootInProgressDevices, device)
+	logger.Warnw(ctx, "Resetted Reboot-In-Progress flag", log.Fields{"Device": device})
+	return true
+}
+
+// DeviceRebootInd is device reboot indication
+func (v *VoltController) DeviceRebootInd(dID string, srNo string, sbID string) {
+	v.app.DeviceRebootInd(dID, srNo, sbID)
+	_ = db.DelAllRoutesForDevice(dID)
+	_ = db.DelAllGroup(dID)
+	_ = db.DelAllMeter(dID)
+	_ = db.DelAllPONCounters(dID)
+}
+
+// DeviceDisableInd is device deactivation indication
+func (v *VoltController) DeviceDisableInd(dID string) {
+	v.app.DeviceDisableInd(dID)
+}
+
+//TriggerPendingProfileDeleteReq - trigger pending profile delete requests
+func (v *VoltController) TriggerPendingProfileDeleteReq(device string) {
+	v.app.TriggerPendingProfileDeleteReq(device)
+}
+
+//TriggerPendingMigrateServicesReq - trigger pending services migration requests
+func (v *VoltController) TriggerPendingMigrateServicesReq(device string) {
+	v.app.TriggerPendingMigrateServicesReq(device)
+}
+
+// SetAuditFlags to set the audit flags
+func (v *VoltController) SetAuditFlags(device *Device) {
+	v.app.SetRebootFlag(true)
+	device.auditInProgress = true
+}
+
+// ResetAuditFlags to reset the audit flags
+func (v *VoltController) ResetAuditFlags(device *Device) {
+	v.app.SetRebootFlag(false)
+	device.auditInProgress = false
+}
+
+//ProcessFlowModResultIndication - send flow mod result notification
+func (v *VoltController) ProcessFlowModResultIndication(flowStatus intf.FlowStatus) {
+	v.app.ProcessFlowModResultIndication(flowStatus)
+}
+
+// AddVPAgent to add the vpagent
+func (v *VoltController) AddVPAgent(vep string, vpa *vpagent.VPAgent) {
+	v.vagent[vep] = vpa
+}
+
+// VPAgent to get vpagent info
+func (v *VoltController) VPAgent(vep string) (*vpagent.VPAgent, error) {
+	vpa, ok := v.vagent[vep]
+	if ok {
+		return vpa, nil
+	}
+	return nil, errors.New("VPA Not Registered")
+}
+
+// PacketOutReq for packet out request
+func (v *VoltController) PacketOutReq(device string, inport string, outport string, pkt []byte, isCustomPkt bool) error {
+	logger.Debugw(ctx, "Packet Out Req", log.Fields{"Device": device, "OutPort": outport})
+	d, err := v.GetDevice(device)
+	if err != nil {
+		return err
+	}
+	logger.Debugw(ctx, "Packet Out Pkt", log.Fields{"Pkt": hex.EncodeToString(pkt)})
+	return d.PacketOutReq(inport, outport, pkt, isCustomPkt)
+}
+
+// AddFlows to add flows
+func (v *VoltController) AddFlows(port string, device string, flow *of.VoltFlow) error {
+	d, err := v.GetDevice(device)
+	if err != nil {
+		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+		return err
+	}
+	devPort := d.GetPortByName(port)
+	if devPort == nil {
+		logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+		return errorCodes.ErrPortNotFound
+	}
+	if d.ctx == nil {
+		//FIXME: Application should know the context before it could submit task. Handle at application level
+		logger.Errorw(ctx, "Context is missing. AddFlow Operation Not added to Task", log.Fields{"Device": device})
+		return errorCodes.ErrInvalidParamInRequest
+	}
+
+	var isMigrationRequired bool
+	if flow.MigrateCookie {
+		// flow migration to new cookie must be done only during the audit. Migration for all subflows must be done if
+		// atlease one subflow with old cookie found in the device.
+		for _, subFlow := range flow.SubFlows {
+			if isMigrationRequired = d.IsFlowPresentWithOldCookie(subFlow); isMigrationRequired {
+				break
+			}
+		}
+	}
+
+	if isMigrationRequired {
+		// In this case, the flow is updated in local cache and db here.
+		// Actual flow deletion and addition at voltha will happen during flow tables audit.
+		for _, subFlow := range flow.SubFlows {
+			logger.Debugw(ctx, "Cookie Migration Required", log.Fields{"OldCookie": subFlow.OldCookie, "NewCookie": subFlow.Cookie})
+			if err := d.DelFlowWithOldCookie(subFlow); err != nil {
+				logger.Errorw(ctx, "Delete flow with old cookie failed", log.Fields{"Error": err, "OldCookie": subFlow.OldCookie})
+			}
+			if err := d.AddFlow(subFlow); err != nil {
+				logger.Errorw(ctx, "Flow Add Failed", log.Fields{"Error": err, "Cookie": subFlow.Cookie})
+			}
+		}
+	} else {
+		flow.Command = of.CommandAdd
+		d.UpdateFlows(flow, devPort)
+		for cookie := range flow.SubFlows {
+			logger.Debugw(ctx, "Flow Add added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
+		}
+	}
+	return nil
+}
+
+// DelFlows to delete flows
+func (v *VoltController) DelFlows(port string, device string, flow *of.VoltFlow) error {
+	d, err := v.GetDevice(device)
+	if err != nil {
+		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+		return err
+	}
+	devPort := d.GetPortByName(port)
+	if devPort == nil {
+		logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+		return errorCodes.ErrPortNotFound
+	}
+	if d.ctx == nil {
+		//FIXME: Application should know the context before it could submit task. Handle at application level
+		logger.Errorw(ctx, "Context is missing. DelFlow Operation Not added to Task", log.Fields{"Device": device})
+		return errorCodes.ErrInvalidParamInRequest
+	}
+
+	var isMigrationRequired bool
+	if flow.MigrateCookie {
+		// flow migration to new cookie must be done only during the audit. Migration for all subflows must be done if
+		// atlease one subflow with old cookie found in the device.
+		for _, subFlow := range flow.SubFlows {
+			if isMigrationRequired = d.IsFlowPresentWithOldCookie(subFlow); isMigrationRequired {
+				break
+			}
+		}
+	}
+
+	if isMigrationRequired {
+		// In this case, the flow is deleted from local cache and db here.
+		// Actual flow deletion at voltha will happen during flow tables audit.
+		for _, subFlow := range flow.SubFlows {
+			logger.Debugw(ctx, "Old Cookie delete Required", log.Fields{"OldCookie": subFlow.OldCookie})
+			if err := d.DelFlowWithOldCookie(subFlow); err != nil {
+				logger.Errorw(ctx, "DelFlowWithOldCookie failed", log.Fields{"OldCookie": subFlow.OldCookie, "Error": err})
+			}
+		}
+	} else {
+		flow.Command = of.CommandDel
+		d.UpdateFlows(flow, devPort)
+		for cookie := range flow.SubFlows {
+			logger.Debugw(ctx, "Flow Del added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
+		}
+	}
+	return nil
+}
+
+// GroupUpdate for group update
+func (v *VoltController) GroupUpdate(port string, device string, group *of.Group) error {
+	d, err := v.GetDevice(device)
+	if err != nil {
+		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+		return err
+	}
+
+	devPort := d.GetPortByName(port)
+	if devPort == nil {
+		logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+		return errorCodes.ErrPortNotFound
+	}
+
+	if d.ctx == nil {
+		//FIXME: Application should know the context before it could submit task. Handle at application level
+		logger.Errorw(ctx, "Context is missing. GroupMod Operation Not added to task", log.Fields{"Device": device})
+		return errorCodes.ErrInvalidParamInRequest
+	}
+
+	d.UpdateGroup(group, devPort)
+	return nil
+}
+
+// ModMeter to get mod meter info
+func (v *VoltController) ModMeter(port string, device string, command of.MeterCommand, meter *of.Meter) error {
+	d, err := v.GetDevice(device)
+	if err != nil {
+		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+		return err
+	}
+
+	devPort := d.GetPortByName(port)
+	if devPort == nil {
+		logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
+		return errorCodes.ErrPortNotFound
+	}
+
+	d.ModMeter(command, meter, devPort)
+	return nil
+}
+
+// PortAddInd for port add indication
+func (v *VoltController) PortAddInd(device string, id uint32, name string) {
+	v.app.PortAddInd(device, id, name)
+}
+
+// PortDelInd for port delete indication
+func (v *VoltController) PortDelInd(device string, port string) {
+	v.app.PortDelInd(device, port)
+}
+
+// PortUpdateInd for port update indication
+func (v *VoltController) PortUpdateInd(device string, name string, id uint32) {
+	v.app.PortUpdateInd(device, name, id)
+}
+
+// PortUpInd for port up indication
+func (v *VoltController) PortUpInd(device string, port string) {
+	v.app.PortUpInd(device, port)
+}
+
+// PortDownInd for port down indication
+func (v *VoltController) PortDownInd(device string, port string) {
+	v.app.PortDownInd(device, port)
+}
+
+// DeviceUpInd for device up indication
+func (v *VoltController) DeviceUpInd(device string) {
+	v.app.DeviceUpInd(device)
+}
+
+// DeviceDownInd for device down indication
+func (v *VoltController) DeviceDownInd(device string) {
+	v.app.DeviceDownInd(device)
+}
+
+// PacketInInd for packet in indication
+func (v *VoltController) PacketInInd(device string, port string, data []byte) {
+	v.app.PacketInInd(device, port, data)
+}
+
+// GetPortState to get port status
+func (v *VoltController) GetPortState(device string, name string) (PortState, error) {
+	d, err := v.GetDevice(device)
+	if err != nil {
+		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+		return PortStateDown, err
+	}
+	return d.GetPortState(name)
+}
+
+// UpdateMvlanProfiles for update mvlan profiles
+func (v *VoltController) UpdateMvlanProfiles(device string) {
+	v.app.UpdateMvlanProfilesForDevice(device)
+}
+
+// GetController to get controller
+func GetController() *VoltController {
+	return vcontroller
+}
+
+/*
+// PostIndication to post indication
+func (v *VoltController) PostIndication(device string, task interface{}) error {
+	var srvTask AddServiceIndTask
+	var portTask AddPortIndTask
+	var taskCommon tasks.Task
+	var isSvcTask bool
+
+	switch data := task.(type) {
+	case *AddServiceIndTask:
+		srvTask = *data
+		taskCommon = data
+		isSvcTask = true
+	case *AddPortIndTask:
+		portTask = *data
+		taskCommon = data
+	}
+
+	d, err := v.GetDevice(device)
+	if err != nil {
+		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
+		//It means device itself it not present so just post the indication directly
+		if isSvcTask {
+			msgbus.PostAccessConfigInd(srvTask.result, d.SerialNum, srvTask.indicationType, srvTask.serviceName, 0, srvTask.reason, srvTask.trigger, srvTask.portState)
+		} else {
+			msgbus.ProcessPortInd(portTask.indicationType, d.SerialNum, portTask.portName, portTask.accessConfig, portTask.serviceList)
+		}
+		return err
+	}
+	if taskCommon != nil {
+		d.AddTask(taskCommon)
+	}
+	return nil
+}
+*/
+
+// GetTaskList to get the task list
+func (v *VoltController) GetTaskList(device string) []tasks.Task {
+	d, err := v.GetDevice(device)
+	if err != nil || d.ctx == nil {
+		logger.Errorw(ctx, "Device Not Connected/Found", log.Fields{"Device": device, "Dev Obj": d})
+		return []tasks.Task{}
+	}
+	return d.GetTaskList()
+
+}
+
+// AddBlockedDevices to add devices to blocked devices list
+func (v *VoltController) AddBlockedDevices(deviceSerialNumber string) {
+	v.BlockedDeviceList.Set(deviceSerialNumber, deviceSerialNumber)
+}
+
+// DelBlockedDevices to remove device from blocked device list
+func (v *VoltController) DelBlockedDevices(deviceSerialNumber string) {
+	v.BlockedDeviceList.Remove(deviceSerialNumber)
+}
+
+// IsBlockedDevice to check if device is blocked
+func (v *VoltController) IsBlockedDevice(deviceSerialNumber string) bool {
+	_, ifPresent := v.BlockedDeviceList.Get(deviceSerialNumber)
+	return ifPresent
+}
diff --git a/internal/pkg/controller/controllertasks.go b/internal/pkg/controller/controllertasks.go
new file mode 100644
index 0000000..bd06ffb
--- /dev/null
+++ b/internal/pkg/controller/controllertasks.go
@@ -0,0 +1,76 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package controller
+
+import (
+	"context"
+	"time"
+
+	"voltha-go-controller/internal/pkg/intf"
+	"voltha-go-controller/internal/pkg/vpagent"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+// AddDeviceTask structure
+type AddDeviceTask struct {
+	taskID    uint8
+	ctx       context.Context
+	config    *intf.VPClientCfg
+	timestamp string
+}
+
+// NewAddDeviceTask is the constructor for AddDeviceTask
+func NewAddDeviceTask(config *intf.VPClientCfg) *AddDeviceTask {
+	var adt AddDeviceTask
+	adt.config = config
+	tstamp := (time.Now()).Format(time.RFC3339Nano)
+	adt.timestamp = tstamp
+	return &adt
+}
+
+// Name returns name of the task
+func (adt *AddDeviceTask) Name() string {
+	return "Add Device Task"
+}
+
+// TaskID returns task Id of the task
+func (adt *AddDeviceTask) TaskID() uint8 {
+	return adt.taskID
+}
+
+// Timestamp returns time stamp for the task
+func (adt *AddDeviceTask) Timestamp() string {
+	return adt.timestamp
+}
+
+// Stop to stop the task
+func (adt *AddDeviceTask) Stop() {
+}
+
+// Start to start the task
+func (adt *AddDeviceTask) Start(ctx context.Context, taskID uint8) error {
+	adt.taskID = taskID
+	adt.ctx = ctx
+
+	logger.Infow(ctx, "Add Device Task Triggered", log.Fields{"Device": adt.config.DeviceID, "SerialNum": adt.config.SerialNum})
+
+	device := GetController().AddDevice(adt.config)
+	vpagent.GetVPAgent().AddClientToClientMap(adt.config.DeviceID, device)
+	logger.Infow(ctx, "Add Device Task Completed", log.Fields{"Device": adt.config.DeviceID, "SerialNum": adt.config.SerialNum})
+
+	return nil
+}
diff --git a/internal/pkg/controller/device.go b/internal/pkg/controller/device.go
new file mode 100644
index 0000000..aa7bd2c
--- /dev/null
+++ b/internal/pkg/controller/device.go
@@ -0,0 +1,1042 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	infraerror "voltha-go-controller/internal/pkg/errorcodes"
+	"strconv"
+	"sync"
+	"time"
+
+	"voltha-go-controller/database"
+	"voltha-go-controller/internal/pkg/holder"
+	"voltha-go-controller/internal/pkg/intf"
+	"voltha-go-controller/internal/pkg/of"
+	//"voltha-go-controller/internal/pkg/vpagent"
+	"voltha-go-controller/internal/pkg/tasks"
+	"voltha-go-controller/internal/pkg/util"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+// PortState type
+type PortState string
+
+const (
+	// PortStateDown constant
+	PortStateDown PortState = "DOWN"
+	// PortStateUp constant
+	PortStateUp PortState = "UP"
+	// DefaultMaxFlowQueues constant
+	DefaultMaxFlowQueues = 67
+	//ErrDuplicateFlow - indicates flow already exists in DB
+	ErrDuplicateFlow string = "Duplicate Flow"
+)
+
+// DevicePort structure
+type DevicePort struct {
+	tasks.Tasks
+	Name    string
+	ID      uint32
+	State   PortState
+	Version string
+}
+
+// NewDevicePort is the constructor for DevicePort
+func NewDevicePort(id uint32, name string) *DevicePort {
+	var port DevicePort
+
+	port.ID = id
+	port.Name = name
+	port.State = PortStateDown
+	return &port
+}
+
+// UniIDFlowQueue structure which maintains flows in queue.
+type UniIDFlowQueue struct {
+	tasks.Tasks
+	ID uint32
+}
+
+// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
+func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
+	var flowQueue UniIDFlowQueue
+	flowQueue.ID = id
+	return &flowQueue
+}
+
+// DeviceState type
+type DeviceState string
+
+const (
+
+	// DeviceStateUNKNOWN constant
+	DeviceStateUNKNOWN DeviceState = "UNKNOWN"
+	// DeviceStateINIT constant
+	DeviceStateINIT DeviceState = "INIT"
+	// DeviceStateUP constant
+	DeviceStateUP DeviceState = "UP"
+	// DeviceStateDOWN constant
+	DeviceStateDOWN DeviceState = "DOWN"
+	// DeviceStateREBOOTED constant
+	DeviceStateREBOOTED DeviceState = "REBOOTED"
+	// DeviceStateDISABLED constant
+	DeviceStateDISABLED DeviceState = "DISABLED"
+	// DeviceStateDELETED constant
+	DeviceStateDELETED DeviceState = "DELETED"
+)
+
+// Device structure
+type Device struct {
+	tasks.Tasks
+	ID                    string
+	SerialNum             string
+	State                 DeviceState
+	PortsByID             map[uint32]*DevicePort
+	PortsByName           map[string]*DevicePort
+	portLock              sync.RWMutex
+	vclientHolder         *holder.VolthaServiceClientHolder
+	ctx                   context.Context
+	cancel                context.CancelFunc
+	packetOutChannel      chan *ofp.PacketOut
+	flows                 map[uint64]*of.VoltSubFlow
+	flowLock              sync.RWMutex
+	meters                map[uint32]*of.Meter
+	meterLock             sync.RWMutex
+	groups                sync.Map //map[uint32]*of.Group -> [GroupId : Group]
+	auditInProgress       bool
+	flowQueueLock         sync.RWMutex
+	flowHash              uint32
+	flowQueue             map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
+	deviceAuditInProgress bool
+	SouthBoundID          string
+}
+
+// NewDevice is the constructor for Device
+func NewDevice(id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID string) *Device {
+	var device Device
+	device.ID = id
+	device.SerialNum = slno
+	device.State = DeviceStateDOWN
+	device.PortsByID = make(map[uint32]*DevicePort)
+	device.PortsByName = make(map[string]*DevicePort)
+	device.vclientHolder = vclientHldr
+	device.flows = make(map[uint64]*of.VoltSubFlow)
+	device.meters = make(map[uint32]*of.Meter)
+	device.flowQueue = make(map[uint32]*UniIDFlowQueue)
+	//Get the flowhash from db and update the flowhash variable in the device.
+	device.SouthBoundID = southBoundID
+	flowHash, err := db.GetFlowHash(id)
+	if err != nil {
+		device.flowHash = DefaultMaxFlowQueues
+	} else {
+		var hash uint32
+		err = json.Unmarshal([]byte(flowHash), &hash)
+		if err != nil {
+			logger.Error(ctx, "Failed to unmarshall flowhash")
+		} else {
+			device.flowHash = hash
+		}
+	}
+	logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
+	return &device
+}
+
+// ResetCache to reset cache
+func (d *Device) ResetCache() {
+	logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
+	d.flows = make(map[uint64]*of.VoltSubFlow)
+	d.meters = make(map[uint32]*of.Meter)
+	d.groups = sync.Map{}
+}
+
+// GetFlow - Get the flow from device obj
+func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
+	d.flowLock.RLock()
+	defer d.flowLock.RUnlock()
+	logger.Infow(ctx, "Get Flow", log.Fields{"Cookie": cookie})
+	flow, ok := d.flows[cookie]
+	return flow, ok
+}
+
+// AddFlow - Adds the flow to the device and also to the database
+func (d *Device) AddFlow(flow *of.VoltSubFlow) error {
+	d.flowLock.Lock()
+	defer d.flowLock.Unlock()
+	logger.Infow(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
+	if _, ok := d.flows[flow.Cookie]; ok {
+		return errors.New(ErrDuplicateFlow)
+	}
+	d.flows[flow.Cookie] = flow
+	d.AddFlowToDb(flow)
+	return nil
+}
+
+// AddFlowToDb is the utility to add the flow to the device
+func (d *Device) AddFlowToDb(flow *of.VoltSubFlow) {
+	if b, err := json.Marshal(flow); err == nil {
+		if err = db.PutFlow(d.ID, flow.Cookie, string(b)); err != nil {
+			logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
+		}
+	}
+}
+
+// DelFlow - Deletes the flow from the device and the database
+func (d *Device) DelFlow(flow *of.VoltSubFlow) error {
+	d.flowLock.Lock()
+	defer d.flowLock.Unlock()
+	if _, ok := d.flows[flow.Cookie]; ok {
+		delete(d.flows, flow.Cookie)
+		d.DelFlowFromDb(flow.Cookie)
+		return nil
+	}
+	return errors.New("Flow does not Exist")
+}
+
+// DelFlowFromDb is utility to delete the flow from the device
+func (d *Device) DelFlowFromDb(flowID uint64) {
+	_ = db.DelFlow(d.ID, flowID)
+}
+
+// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
+func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
+	d.flowLock.RLock()
+	defer d.flowLock.RUnlock()
+	if _, ok := d.flows[flow.Cookie]; ok {
+		return false
+	} else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
+		if _, ok := d.flows[flow.OldCookie]; ok {
+			logger.Infow(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
+			return true
+		}
+	}
+	return false
+}
+
+// DelFlowWithOldCookie is to delete flow with old cookie.
+func (d *Device) DelFlowWithOldCookie(flow *of.VoltSubFlow) error {
+	d.flowLock.Lock()
+	defer d.flowLock.Unlock()
+	if _, ok := d.flows[flow.OldCookie]; ok {
+		logger.Infow(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
+			log.Fields{"OldCookie": flow.OldCookie})
+		delete(d.flows, flow.OldCookie)
+		d.DelFlowFromDb(flow.OldCookie)
+		return nil
+	}
+	return errors.New("Flow does not Exist")
+}
+
+// RestoreFlowsFromDb to restore flows from database
+func (d *Device) RestoreFlowsFromDb() {
+	flows, _ := db.GetFlows(d.ID)
+	for _, flow := range flows {
+		b, ok := flow.Value.([]byte)
+		if !ok {
+			logger.Warn(ctx, "The value type is not []byte")
+			continue
+		}
+		d.CreateFlowFromString(b)
+	}
+}
+
+// CreateFlowFromString to create flow from string
+func (d *Device) CreateFlowFromString(b []byte) {
+	var flow of.VoltSubFlow
+	if err := json.Unmarshal(b, &flow); err == nil {
+		if _, ok := d.flows[flow.Cookie]; !ok {
+			logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
+			d.flows[flow.Cookie] = &flow
+		} else {
+			logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
+		}
+	} else {
+		logger.Warn(ctx, "Unmarshal failed")
+	}
+}
+
+// ----------------------------------------------------------
+// Database related functionality
+// Group operations at the device which include update and delete
+
+// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
+func (d *Device) UpdateGroupEntry(group *of.Group) {
+
+	logger.Infow(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
+	d.groups.Store(group.GroupID, group)
+	d.AddGroupToDb(group)
+}
+
+// AddGroupToDb - Utility to add the group to the device DB
+func (d *Device) AddGroupToDb(group *of.Group) {
+	if b, err := json.Marshal(group); err == nil {
+		logger.Infow(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
+		if err = db.PutGroup(d.ID, group.GroupID, string(b)); err != nil {
+			logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
+		}
+	}
+}
+
+// DelGroupEntry - Deletes the group from the device and the database
+func (d *Device) DelGroupEntry(group *of.Group) {
+
+	if _, ok := d.groups.Load(group.GroupID); ok {
+		d.groups.Delete(group.GroupID)
+		d.DelGroupFromDb(group.GroupID)
+	}
+}
+
+// DelGroupFromDb - Utility to delete the Group from the device
+func (d *Device) DelGroupFromDb(groupID uint32) {
+	_ = db.DelGroup(d.ID, groupID)
+}
+
+//RestoreGroupsFromDb - restores all groups from DB
+func (d *Device) RestoreGroupsFromDb() {
+	logger.Info(ctx, "Restoring Groups")
+	groups, _ := db.GetGroups(d.ID)
+	for _, group := range groups {
+		b, ok := group.Value.([]byte)
+		if !ok {
+			logger.Warn(ctx, "The value type is not []byte")
+			continue
+		}
+		d.CreateGroupFromString(b)
+	}
+}
+
+//CreateGroupFromString - Forms group struct from json string
+func (d *Device) CreateGroupFromString(b []byte) {
+	var group of.Group
+	if err := json.Unmarshal(b, &group); err == nil {
+		if _, ok := d.groups.Load(group.GroupID); !ok {
+			logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
+			d.groups.Store(group.GroupID, &group)
+		} else {
+			logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
+		}
+	} else {
+		logger.Warn(ctx, "Unmarshal failed")
+	}
+}
+
+// AddMeter to add meter
+func (d *Device) AddMeter(meter *of.Meter) error {
+	d.meterLock.Lock()
+	defer d.meterLock.Unlock()
+	if _, ok := d.meters[meter.ID]; ok {
+		return errors.New("Duplicate Meter")
+	}
+	d.meters[meter.ID] = meter
+	go d.AddMeterToDb(meter)
+	return nil
+}
+
+// GetMeter to get meter
+func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
+	d.meterLock.RLock()
+	defer d.meterLock.RUnlock()
+	if m, ok := d.meters[id]; ok {
+		return m, nil
+	}
+	return nil, errors.New("Meter Not Found")
+}
+
+// DelMeter to delete meter
+func (d *Device) DelMeter(meter *of.Meter) bool {
+	d.meterLock.Lock()
+	defer d.meterLock.Unlock()
+	if _, ok := d.meters[meter.ID]; ok {
+		delete(d.meters, meter.ID)
+		go d.DelMeterFromDb(meter.ID)
+		return true
+	}
+	return false
+}
+
+// AddMeterToDb is utility to add the Group to the device
+func (d *Device) AddMeterToDb(meter *of.Meter) {
+	if b, err := json.Marshal(meter); err == nil {
+		if err = db.PutDeviceMeter(d.ID, meter.ID, string(b)); err != nil {
+			logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
+		}
+	}
+}
+
+// DelMeterFromDb to delete meter from db
+func (d *Device) DelMeterFromDb(id uint32) {
+	_ = db.DelDeviceMeter(d.ID, id)
+}
+
+// RestoreMetersFromDb to restore meters from db
+func (d *Device) RestoreMetersFromDb() {
+	meters, _ := db.GetDeviceMeters(d.ID)
+	for _, meter := range meters {
+		b, ok := meter.Value.([]byte)
+		if !ok {
+			logger.Warn(ctx, "The value type is not []byte")
+			continue
+		}
+		d.CreateMeterFromString(b)
+	}
+}
+
+// CreateMeterFromString to create meter from string
+func (d *Device) CreateMeterFromString(b []byte) {
+	var meter of.Meter
+	if err := json.Unmarshal(b, &meter); err == nil {
+		if _, ok := d.meters[meter.ID]; !ok {
+			logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
+			d.meters[meter.ID] = &meter
+		} else {
+			logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
+		}
+	} else {
+		logger.Warn(ctx, "Unmarshal failed")
+	}
+}
+
+// VolthaClient to get voltha client
+func (d *Device) VolthaClient() voltha.VolthaServiceClient {
+	return d.vclientHolder.Get()
+}
+
+// AddPort to add the port as requested by the device/VOLTHA
+// Inform the application if the port is successfully added
+func (d *Device) AddPort(id uint32, name string) error {
+	d.portLock.Lock()
+	defer d.portLock.Unlock()
+
+	if _, ok := d.PortsByID[id]; ok {
+		return errors.New("Duplicate port")
+	}
+	if _, ok := d.PortsByName[name]; ok {
+		return errors.New("Duplicate port")
+	}
+
+	p := NewDevicePort(id, name)
+	d.PortsByID[id] = p
+	d.PortsByName[name] = p
+	d.WritePortToDb(p)
+	GetController().PortAddInd(d.ID, p.ID, p.Name)
+	logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
+	return nil
+}
+
+// DelPort to delete the port as requested by the device/VOLTHA
+// Inform the application if the port is successfully deleted
+func (d *Device) DelPort(id uint32) error {
+
+	p := d.GetPortByID(id)
+	if p == nil {
+		return errors.New("Unknown Port")
+	}
+	if p.State == PortStateUp {
+		GetController().PortDownInd(d.ID, p.Name)
+	}
+	d.portLock.Lock()
+	defer d.portLock.Unlock()
+
+	GetController().PortDelInd(d.ID, p.Name)
+	delete(d.PortsByID, p.ID)
+	delete(d.PortsByName, p.Name)
+	d.DelPortFromDb(p.ID)
+	logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
+	return nil
+}
+
+// UpdatePortByName is utility to update the port by Name
+func (d *Device) UpdatePortByName(name string, port uint32) {
+	d.portLock.Lock()
+	defer d.portLock.Unlock()
+
+	p, ok := d.PortsByName[name]
+	if !ok {
+		return
+	}
+	delete(d.PortsByID, p.ID)
+	p.ID = port
+	d.PortsByID[port] = p
+	d.WritePortToDb(p)
+	GetController().PortUpdateInd(d.ID, p.Name, p.ID)
+	logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
+}
+
+// GetPortName to get the name of the port by its id
+func (d *Device) GetPortName(id uint32) (string, error) {
+	d.portLock.RLock()
+	defer d.portLock.RUnlock()
+
+	if p, ok := d.PortsByID[id]; ok {
+		return p.Name, nil
+	}
+	logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
+	return "", errors.New("Unknown Port ID")
+}
+
+// GetPortByID is utility to retrieve the port by ID
+func (d *Device) GetPortByID(id uint32) *DevicePort {
+	d.portLock.RLock()
+	defer d.portLock.RUnlock()
+
+	p, ok := d.PortsByID[id]
+	if ok {
+		return p
+	}
+	return nil
+}
+
+// GetPortByName is utility to retrieve the port by Name
+func (d *Device) GetPortByName(name string) *DevicePort {
+	d.portLock.RLock()
+	defer d.portLock.RUnlock()
+
+	p, ok := d.PortsByName[name]
+	if ok {
+		return p
+	}
+	return nil
+}
+
+// GetPortState to get the state of the port by name
+func (d *Device) GetPortState(name string) (PortState, error) {
+	d.portLock.RLock()
+	defer d.portLock.RUnlock()
+
+	if p, ok := d.PortsByName[name]; ok {
+		return p.State, nil
+	}
+	return PortStateDown, errors.New("Unknown Port ID")
+}
+
+// GetPortID to get the port-id by the port name
+func (d *Device) GetPortID(name string) (uint32, error) {
+	d.portLock.RLock()
+	defer d.portLock.RUnlock()
+
+	if p, ok := d.PortsByName[name]; ok {
+		return p.ID, nil
+	}
+	return 0, errors.New("Unknown Port ID")
+
+}
+
+// WritePortToDb to add the port to the database
+func (d *Device) WritePortToDb(port *DevicePort) {
+	port.Version = database.PresentVersionMap[database.DevicePortPath]
+	if b, err := json.Marshal(port); err == nil {
+		if err = db.PutPort(d.ID, port.ID, string(b)); err != nil {
+			logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
+		}
+	}
+}
+
+// DelPortFromDb to delete port from database
+func (d *Device) DelPortFromDb(id uint32) {
+	_ = db.DelPort(d.ID, id)
+}
+
+// RestorePortsFromDb to restore ports from database
+func (d *Device) RestorePortsFromDb() {
+	ports, _ := db.GetPorts(d.ID)
+	for _, port := range ports {
+		b, ok := port.Value.([]byte)
+		if !ok {
+			logger.Warn(ctx, "The value type is not []byte")
+			continue
+		}
+		d.CreatePortFromString(b)
+	}
+}
+
+// CreatePortFromString to create port from string
+func (d *Device) CreatePortFromString(b []byte) {
+	var port DevicePort
+	if err := json.Unmarshal(b, &port); err == nil {
+		if _, ok := d.PortsByID[port.ID]; !ok {
+			logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
+			d.PortsByID[port.ID] = &port
+			d.PortsByName[port.Name] = &port
+			GetController().PortAddInd(d.ID, port.ID, port.Name)
+		} else {
+			logger.Warnw(ctx, "Duplicate Port", log.Fields{"ID": port.ID})
+		}
+	} else {
+		logger.Warn(ctx, "Unmarshal failed")
+	}
+}
+
+// Delete : OLT Delete functionality yet to be implemented. IDeally all of the
+// resources should have been removed by this time. It is an error
+// scenario if the OLT has resources associated with it.
+func (d *Device) Delete() {
+	d.StopAll()
+}
+
+// Stop to stop the task
+func (d *Device) Stop() {
+}
+
+// ConnectInd is called when the connection between VGC and the VOLTHA is
+// restored. This will perform audit of the device post reconnection
+func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
+	logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
+	ctx1, cancel := context.WithCancel(ctx)
+	d.cancel = cancel
+	d.ctx = ctx1
+	d.Tasks.Initialize(ctx1)
+
+	logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
+	d.State = DeviceStateUP
+	GetController().DeviceUpInd(d.ID)
+
+	logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
+	t := NewAuditDevice(d, AuditEventDeviceDisc)
+	d.Tasks.AddTask(t)
+
+	t1 := NewAuditTablesTask(d)
+	d.Tasks.AddTask(t1)
+
+	t2 := NewPendingProfilesTask(d)
+	d.Tasks.AddTask(t2)
+
+	go d.synchronizeDeviceTables()
+}
+
+func (d *Device) synchronizeDeviceTables() {
+
+	tick := time.NewTicker(deviceTableSyncDuration)
+loop:
+	for {
+		select {
+		case <-d.ctx.Done():
+			logger.Warnw(d.ctx, "Context Done. Cancelling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
+			break loop
+		case <-tick.C:
+			t1 := NewAuditTablesTask(d)
+			d.Tasks.AddTask(t1)
+		}
+	}
+	tick.Stop()
+}
+
+// DeviceUpInd is called when the logical device state changes to UP. This will perform audit of the device post reconnection
+func (d *Device) DeviceUpInd() {
+	logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
+	d.State = DeviceStateUP
+	GetController().DeviceUpInd(d.ID)
+
+	logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
+	t := NewAuditDevice(d, AuditEventDeviceDisc)
+	d.Tasks.AddTask(t)
+
+	t1 := NewAuditTablesTask(d)
+	d.Tasks.AddTask(t1)
+
+	t2 := NewPendingProfilesTask(d)
+	d.Tasks.AddTask(t2)
+}
+
+// DeviceDownInd is called when the logical device state changes to Down.
+func (d *Device) DeviceDownInd() {
+	logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
+	d.State = DeviceStateDOWN
+	GetController().DeviceDownInd(d.ID)
+}
+
+// DeviceRebootInd is called when the logical device is rebooted.
+func (d *Device) DeviceRebootInd() {
+	logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
+
+	if d.State == DeviceStateREBOOTED {
+		d.State = DeviceStateREBOOTED
+		logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
+		return
+	}
+
+	d.State = DeviceStateREBOOTED
+	GetController().SetRebootInProgressForDevice(d.ID)
+	GetController().DeviceRebootInd(d.ID, d.SerialNum, d.SouthBoundID)
+	d.ReSetAllPortStates()
+}
+
+// DeviceDisabledInd is called when the logical device is disabled
+func (d *Device) DeviceDisabledInd() {
+	logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
+	d.State = DeviceStateDISABLED
+	GetController().DeviceDisableInd(d.ID)
+}
+
+//ReSetAllPortStates - Set all logical device port status to DOWN
+func (d *Device) ReSetAllPortStates() {
+	logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
+
+	d.portLock.Lock()
+	defer d.portLock.Unlock()
+
+	for _, port := range d.PortsByID {
+		if port.State != PortStateDown {
+			logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
+			GetController().PortDownInd(d.ID, port.Name)
+			port.State = PortStateDown
+			d.WritePortToDb(port)
+		}
+	}
+}
+
+//ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
+func (d *Device) ReSetAllPortStatesInDb() {
+	logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
+
+	d.portLock.Lock()
+	defer d.portLock.Unlock()
+
+	for _, port := range d.PortsByID {
+		if port.State != PortStateDown {
+			logger.Infow(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
+			port.State = PortStateDown
+			d.WritePortToDb(port)
+		}
+	}
+}
+
+// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
+// to update only when the port state is DOWN
+func (d *Device) ProcessPortUpdate(portName string, port uint32, state uint32) {
+	if p := d.GetPortByName(portName); p != nil {
+		if p.ID != port {
+			logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
+			if p.State != PortStateDown {
+				logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
+				return
+			}
+			d.UpdatePortByName(portName, port)
+			logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
+		}
+		d.ProcessPortState(port, state)
+	}
+}
+
+// ***Operations Performed on Port state Transitions***
+//
+// |-----------------------------------------------------------------------------|
+// |  State             |   Action                                               |
+// |--------------------|--------------------------------------------------------|
+// | UP                 | UNI - Trigger Flow addition for service configured     |
+// |                    | NNI - Trigger Flow addition for vnets & mvlan profiles |
+// |                    |                                                        |
+// | DOWN               | UNI - Trigger Flow deletion for service configured     |
+// |                    | NNI - Trigger Flow deletion for vnets & mvlan profiles |
+// |                    |                                                        |
+// |-----------------------------------------------------------------------------|
+//
+
+// ProcessPortState deals with the change in port status and taking action
+// based on the new state and the old state
+func (d *Device) ProcessPortState(port uint32, state uint32) {
+	if d.State != DeviceStateUP && !util.IsNniPort(port) {
+		logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
+		return
+	}
+	if p := d.GetPortByID(port); p != nil {
+		logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
+
+		// Avoid blind initialization as the current tasks in the queue will be lost
+		// Eg: Service Del followed by Port Down - The flows will be dangling
+		// Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
+		p.Tasks.CheckAndInitialize(d.ctx)
+		if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
+			// Transition from DOWN to UP
+			logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
+			GetController().PortUpInd(d.ID, p.Name)
+			p.State = PortStateUp
+			d.WritePortToDb(p)
+		} else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
+			// Transition from UP to Down
+			logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
+			GetController().PortDownInd(d.ID, p.Name)
+			p.State = PortStateDown
+			d.WritePortToDb(p)
+		} else {
+			logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
+		}
+	}
+}
+
+// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
+func (d *Device) ProcessPortStateAfterReboot(port uint32, state uint32) {
+	if d.State != DeviceStateUP && !util.IsNniPort(port) {
+		logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
+		return
+	}
+	if p := d.GetPortByID(port); p != nil {
+		logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State})
+		p.Tasks.Initialize(d.ctx)
+		if p.State == PortStateUp {
+			logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
+			GetController().PortUpInd(d.ID, p.Name)
+		} else if p.State == PortStateDown {
+			logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
+			GetController().PortDownInd(d.ID, p.Name)
+		}
+	}
+}
+
+// ChangeEvent : Change event brings in ports related changes such as addition/deletion
+// or modification where the port status change up/down is indicated to the
+// controller
+func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
+	cet := NewChangeEventTask(d.ctx, event, d)
+	d.AddTask(cet)
+	return nil
+}
+
+// PacketIn handle the incoming packet-in and deliver to the application for the
+// actual processing
+func (d *Device) PacketIn(pkt *ofp.PacketIn) {
+	logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
+	if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
+		logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
+		return
+	}
+	data := pkt.PacketIn.Data
+	port := PacketInGetPort(pkt.PacketIn)
+	if pName, err := d.GetPortName(port); err == nil {
+		GetController().PacketInInd(d.ID, pName, data)
+	} else {
+		logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
+	}
+}
+
+// PacketInGetPort to get the port on which the packet-in is reported
+func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
+	for _, field := range pkt.Match.OxmFields {
+		if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
+			if ofbField, ok := field.Field.(*ofp.OfpOxmField_OfbField); ok {
+				if ofbField.OfbField.Type == ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
+					if port, ok := ofbField.OfbField.Value.(*ofp.OfpOxmOfbField_Port); ok {
+						return port.Port
+					}
+				}
+			}
+		}
+	}
+	return 0
+}
+
+// PacketOutReq receives the packet out request from the application via the
+// controller. The interface from the application uses name as the identity.
+func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
+	inp, err := d.GetPortID(inport)
+	if err != nil {
+		return errors.New("Unknown inport")
+	}
+	outp, err1 := d.GetPortID(outport)
+	if err1 != nil {
+		return errors.New("Unknown outport")
+	}
+	logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
+	return d.SendPacketOut(outp, inp, data, isCustomPkt)
+}
+
+// SendPacketOut is responsible for building the OF structure and send the
+// packet-out to the VOLTHA
+func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
+	pout := &ofp.PacketOut{}
+	pout.Id = d.ID
+	opout := &ofp.OfpPacketOut{}
+	pout.PacketOut = opout
+	opout.InPort = inport
+	opout.Data = data
+	opout.Actions = []*ofp.OfpAction{
+		{
+			Type: ofp.OfpActionType_OFPAT_OUTPUT,
+			Action: &ofp.OfpAction_Output{
+				Output: &ofp.OfpActionOutput{
+					Port:   outport,
+					MaxLen: 65535,
+				},
+			},
+		},
+	}
+	d.packetOutChannel <- pout
+	return nil
+}
+
+// UpdateFlows receives the flows in the form that is implemented
+// in the VGC and transforms them to the OF format. This is handled
+// as a port of the task that is enqueued to do the same.
+func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
+	t := NewAddFlowsTask(d.ctx, flow, d)
+	logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
+	// check if port isNni , if yes flows will be added to device port queues.
+	if util.IsNniPort(devPort.ID) {
+		// Adding the flows to device port queues.
+		devPort.AddTask(t)
+		return
+	}
+	// If the flowHash is enabled then add the flows to the flowhash generated queues.
+	flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
+	if flowQueue != nil {
+		logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
+		flowQueue.AddTask(t)
+		logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
+	} else {
+		//FlowThrotling disabled, add to the device port queue
+		devPort.AddTask(t)
+		return
+	}
+}
+
+// UpdateGroup to update group info
+func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
+	task := NewModGroupTask(d.ctx, group, d)
+	logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
+	devPort.AddTask(task)
+}
+
+// ModMeter for mod meter task
+func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
+	if command == of.MeterCommandAdd {
+		if _, err := d.GetMeter(meter.ID); err == nil {
+			logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
+			return
+		}
+	}
+	t := NewModMeterTask(d.ctx, command, meter, d)
+	devPort.AddTask(t)
+}
+
+func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
+	d.flowQueueLock.RLock()
+	//If flowhash is 0 that means flowhash throttling is disabled, return nil
+	if d.flowHash == 0 {
+		d.flowQueueLock.RUnlock()
+		return nil
+	}
+	flowHashID := id % uint32(d.flowHash)
+	if value, found := d.flowQueue[uint32(flowHashID)]; found {
+		d.flowQueueLock.RUnlock()
+		return value
+	}
+	d.flowQueueLock.RUnlock()
+	logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
+
+	return d.addFlowQueueForUniID(id)
+}
+
+func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
+
+	d.flowQueueLock.Lock()
+	defer d.flowQueueLock.Unlock()
+	flowHashID := id % uint32(d.flowHash)
+	flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
+	flowQueue.Tasks.Initialize(d.ctx)
+	d.flowQueue[flowHashID] = flowQueue
+	return flowQueue
+}
+
+// SetFlowHash sets the device flow hash and writes to the DB.
+func (d *Device) SetFlowHash(hash uint32) {
+	d.flowQueueLock.Lock()
+	defer d.flowQueueLock.Unlock()
+
+	d.flowHash = hash
+	d.writeFlowHashToDB()
+}
+
+func (d *Device) writeFlowHashToDB() {
+	hash, err := json.Marshal(d.flowHash)
+	if err != nil {
+		logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash})
+		return
+	}
+	if err := db.PutFlowHash(d.ID, string(hash)); err != nil {
+		logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash})
+	}
+}
+
+//isSBOperAllowed - determins if the SB operation is allowed based on device state & force flag
+func (d *Device) isSBOperAllowed(forceAction bool) bool {
+
+	if d.State == DeviceStateUP {
+		return true
+	}
+
+	if d.State == DeviceStateDISABLED && forceAction {
+		return true
+	}
+
+	return false
+}
+
+func (d *Device) triggerFlowNotification(cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+	flow, _ := d.GetFlow(cookie)
+	d.triggerFlowResultNotification(cookie, flow, oper, bwDetails, err)
+}
+
+func (d *Device) triggerFlowResultNotification(cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
+
+	statusCode, statusMsg := infraerror.GetErrorInfo(err)
+	success := isFlowOperSuccess(statusCode, oper)
+
+	updateFlow := func(cookie uint64, state int, reason string) {
+		if dbFlow, ok := d.GetFlow(cookie); ok {
+			dbFlow.State = uint8(state)
+			dbFlow.ErrorReason = reason
+			d.AddFlowToDb(dbFlow)
+		}
+	}
+
+	//Update flow results
+	// Add - Update Success or Failure status with reason
+	// Del - Delete entry from DB on success else update error reason
+	if oper == of.CommandAdd {
+		state := of.FlowAddSuccess
+		reason := ""
+		if !success {
+			state = of.FlowAddFailure
+			reason = statusMsg
+		}
+		updateFlow(cookie, state, reason)
+		logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
+	} else {
+		if success && flow != nil {
+			if err := d.DelFlow(flow); err != nil {
+				logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
+			}
+		} else if !success {
+			updateFlow(cookie, of.FlowDelFailure, statusMsg)
+		}
+	}
+
+	flowResult := intf.FlowStatus{
+		Cookie:         strconv.FormatUint(cookie, 10),
+		Device:         d.ID,
+		FlowModType:    oper,
+		Flow:           flow,
+		Status:         statusCode,
+		Reason:         statusMsg,
+		AdditionalData: bwDetails,
+	}
+
+	logger.Infow(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
+	GetController().ProcessFlowModResultIndication(flowResult)
+}
diff --git a/internal/pkg/controller/modgroup.go b/internal/pkg/controller/modgroup.go
new file mode 100644
index 0000000..49da920
--- /dev/null
+++ b/internal/pkg/controller/modgroup.go
@@ -0,0 +1,133 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"time"
+
+	infraerror "voltha-go-controller/internal/pkg/errorcodes"
+	infraerrorcode "voltha-go-controller/internal/pkg/errorcodes/service"
+
+	"voltha-go-controller/internal/pkg/of"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"google.golang.org/grpc/codes"
+)
+
+//ModGroupTask - Group Modification Task
+type ModGroupTask struct {
+	taskID    uint8
+	ctx       context.Context
+	group     *of.Group
+	device    *Device
+	timestamp string
+}
+
+//NewModGroupTask - Initializes new group task
+func NewModGroupTask(ctx context.Context, group *of.Group, device *Device) *ModGroupTask {
+	var grp ModGroupTask
+	grp.device = device
+	grp.group = group
+	grp.ctx = ctx
+	tstamp := (time.Now()).Format(time.RFC3339Nano)
+	grp.timestamp = tstamp
+	return &grp
+}
+
+//Name - Name of task
+func (grp *ModGroupTask) Name() string {
+	return "Group Mod Task"
+}
+
+//TaskID - Task id
+func (grp *ModGroupTask) TaskID() uint8 {
+	return grp.taskID
+}
+
+// Timestamp to return timestamp of the task
+func (grp *ModGroupTask) Timestamp() string {
+	return grp.timestamp
+}
+
+//Stop - task stop
+func (grp *ModGroupTask) Stop() {
+}
+
+//Start - task start
+func (grp *ModGroupTask) Start(ctx context.Context, taskID uint8) error {
+	var err error
+	grp.taskID = taskID
+	grp.ctx = ctx
+	i := 0
+
+	processGroupModResult := func(err error) bool {
+
+		statusCode, statusMsg := infraerror.GetErrorInfo(err)
+
+		if infraerrorcode.ErrorCode(statusCode) != infraerrorcode.ErrOk {
+
+			if grp.group.Command == of.GroupCommandAdd && (codes.Code(statusCode) == codes.AlreadyExists) {
+				logger.Warnw(ctx, "Update Group Table Failed - Ignoring since Group Already exists",
+					log.Fields{"groupId": grp.group.GroupID, "groupOp": grp.group.Command, "Status": statusCode, "errorReason": statusMsg})
+				return true
+			}
+			logger.Errorw(ctx, "Update Group Table Failed",
+				log.Fields{"groupId": grp.group.GroupID, "groupOp": grp.group.Command, "Status": statusCode, "errorReason": statusMsg})
+			return false
+		}
+		logger.Infow(ctx, "Group Mod Result", log.Fields{"groupID": grp.group.GroupID, "Error Code": statusCode})
+		return true
+
+	}
+
+	if grp.group.Command != of.GroupCommandDel {
+		grp.group.State = of.GroupOperPending
+		grp.device.UpdateGroupEntry(grp.group)
+	} else {
+		grp.device.DelGroupEntry(grp.group)
+	}
+
+	if !grp.device.isSBOperAllowed(grp.group.ForceAction) {
+		logger.Errorw(ctx, "Skipping Group Table Update", log.Fields{"Reason": "Device State not UP", "State": grp.device.State, "GroupID": grp.group.GroupID, "Operation": grp.group.Command})
+		return nil
+	}
+
+	groupUpdate := of.CreateGroupTableUpdate(grp.group)
+	if vc := grp.device.VolthaClient(); vc != nil {
+
+		//Retry on group mod failure
+		//Retry attempts = 3
+		//Delay between retry = 100ms. Total Possible Delay = 200ms
+		for {
+			logger.Infow(ctx, "Group Mod Triggered", log.Fields{"GroupId": grp.group.GroupID, "Attempt": i})
+			_, err = vc.UpdateLogicalDeviceFlowGroupTable(grp.ctx, groupUpdate)
+			if isSuccess := processGroupModResult(err); isSuccess {
+				break
+			}
+			i++
+			if i < 3 {
+				time.Sleep(100 * time.Millisecond)
+				continue
+			}
+			logger.Errorw(ctx, "Update Group Table Failed on all 3 attempts. Dropping request", log.Fields{"GroupId": grp.group.GroupID, "Bucket": grp.group.Buckets})
+			break
+
+		}
+		return err
+	}
+	logger.Error(ctx, "Update Group Flow Table Failed: Voltha Client Unavailable")
+	return nil
+}
diff --git a/internal/pkg/controller/modmeter.go b/internal/pkg/controller/modmeter.go
new file mode 100644
index 0000000..04b1e04
--- /dev/null
+++ b/internal/pkg/controller/modmeter.go
@@ -0,0 +1,124 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"time"
+
+	"voltha-go-controller/internal/pkg/of"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+// ModMeterTask structure
+type ModMeterTask struct {
+	taskID    uint8
+	ctx       context.Context
+	command   of.MeterCommand
+	meter     *of.Meter
+	device    *Device
+	timestamp string
+}
+
+// NewModMeterTask is the constructor for ModMeterTask
+func NewModMeterTask(ctx context.Context, command of.MeterCommand, meter *of.Meter, device *Device) *ModMeterTask {
+	var mmt ModMeterTask
+	mmt.device = device
+	mmt.meter = meter
+	mmt.ctx = ctx
+	mmt.command = command
+	tstamp := (time.Now()).Format(time.RFC3339Nano)
+	mmt.timestamp = tstamp
+	return &mmt
+}
+
+// Name returns name of the task
+func (mmt *ModMeterTask) Name() string {
+	return "Add Flows Task"
+}
+
+// TaskID returns task Id of the task
+func (mmt *ModMeterTask) TaskID() uint8 {
+	return mmt.taskID
+}
+
+// Timestamp returns time stamp for the task
+func (mmt *ModMeterTask) Timestamp() string {
+	return mmt.timestamp
+}
+
+// Stop to stop the task
+func (mmt *ModMeterTask) Stop() {
+}
+
+// Start to start the task
+func (mmt *ModMeterTask) Start(ctx context.Context, taskID uint8) error {
+	mmt.taskID = taskID
+	mmt.ctx = ctx
+
+	//Temp commenting Sync response handling
+	//triggerMeterNotification := func(err error) {
+
+	// 	statusCode, statusMsg := infraerror.GetErrorInfo(err)
+
+	// 	if mmt.command == of.MeterCommandAdd && infraerrorcode.ErrorCode(statusCode) != infraerrorcode.ErrOk {
+	// 		mmt.meter.State = of.MeterOperFailure
+	// 		mmt.meter.ErrorReason = statusMsg
+
+	// 		logger.Errorw(ctx, "Update Meter Table Failed",
+	// 			log.Fields{"meterId": mmt.meter.ID, "meterOp": mmt.command, "Status": statusCode, "errorReason": statusMsg})
+	// 		go mmt.device.AddMeterToDb(mmt.meter)
+	// 	} else {
+	// 		log.Infow("Meter Mod Result", log.Fields{"meterID": mmt.meter.ID, "Error Code": statusCode})
+	// 	}
+	// }
+
+	// First add/delete the flows first locally before passing them to actual device
+	if mmt.command == of.MeterCommandAdd {
+		mmt.meter.State = of.MeterOperPending
+		if err := mmt.device.AddMeter(mmt.meter); err != nil {
+			// Meter already exists so we dont have to do anything here
+			return nil
+		}
+	} else {
+		if !mmt.device.DelMeter(mmt.meter) {
+			// Meter doesn't exist so we dont have to do anything here
+			return nil
+		}
+	}
+
+	if mmt.device.State != DeviceStateUP {
+		logger.Errorw(ctx, "Update Meter Table Failed: Device State DOWN", log.Fields{"Reason": "Device State DOWN", "Meter": mmt.meter.ID})
+		return nil
+	}
+	meterMod, err := of.MeterUpdate(mmt.device.ID, mmt.command, mmt.meter)
+	if err != nil {
+		logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
+		return err
+	}
+
+	if vc := mmt.device.VolthaClient(); vc != nil {
+
+		if _, err = vc.UpdateLogicalDeviceMeterTable(mmt.ctx, meterMod); err != nil {
+			logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
+		}
+		//triggerMeterNotification(err)
+		return err
+	}
+
+	logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
+	return nil
+}
diff --git a/internal/pkg/controller/pendingprofiles.go b/internal/pkg/controller/pendingprofiles.go
new file mode 100644
index 0000000..6258f36
--- /dev/null
+++ b/internal/pkg/controller/pendingprofiles.go
@@ -0,0 +1,98 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package controller
+
+import (
+	"context"
+	"time"
+
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+)
+
+// PendingProfilesTask structure
+type PendingProfilesTask struct {
+	taskID uint8
+	ctx    context.Context
+	device *Device
+	ts     string
+}
+
+// NewPendingProfilesTask is constructor for PendingProfilesTask
+func NewPendingProfilesTask(device *Device) *PendingProfilesTask {
+	var ppt PendingProfilesTask
+	ppt.device = device
+	ppt.ts = (time.Now()).Format(time.RFC3339Nano)
+	return &ppt
+}
+
+// Name returns name of the task
+func (ppt *PendingProfilesTask) Name() string {
+	return "Pending Profiles Task"
+}
+
+// TaskID returns task id of the task
+func (ppt *PendingProfilesTask) TaskID() uint8 {
+	return ppt.taskID
+}
+
+// Timestamp returns timestamp of the task
+func (ppt *PendingProfilesTask) Timestamp() string {
+	return ppt.ts
+}
+
+// Stop to stop the task
+func (ppt *PendingProfilesTask) Stop() {
+}
+
+// Start is called by the framework and is responsible for implementing
+// the actual task.
+func (ppt *PendingProfilesTask) Start(ctx context.Context, taskID uint8) error {
+	logger.Warnw(ctx, "Pending Profiles Task Triggered", log.Fields{"Context": ctx, "taskID": taskID, "Device": ppt.device.ID})
+	ppt.taskID = taskID
+	ppt.ctx = ctx
+	var errInfo error
+
+	GetController().SetAuditFlags(ppt.device)
+
+	//Trigger Pending Service Delete Tasks
+	logger.Warnw(ctx, "Pending Service Delete Task Triggered", log.Fields{"Device": ppt.device.ID})
+	GetController().TriggerPendingProfileDeleteReq(ppt.device.ID)
+	logger.Warnw(ctx, "Pending Service Delete Task Completed", log.Fields{"Device": ppt.device.ID})
+
+	//Trigger Pending Migrate Services Tasks
+	logger.Warnw(ctx, "Pending Migrate Services Task Triggered", log.Fields{"Device": ppt.device.ID})
+	GetController().TriggerPendingMigrateServicesReq(ppt.device.ID)
+	logger.Warnw(ctx, "Pending Migrate Services Task Completed", log.Fields{"Device": ppt.device.ID})
+
+	GetController().ResetAuditFlags(ppt.device)
+
+	// Updating Mvlan Profile
+	logger.Warnw(ctx, "Pending Update Mvlan Task Triggered", log.Fields{"Device": ppt.device.ID})
+	if err := ppt.UpdateMvlanProfiles(); err != nil {
+		logger.Errorw(ctx, "Update Mvlan Profile Failed", log.Fields{"Reason": err.Error()})
+		errInfo = err
+	}
+	logger.Warnw(ctx, "Pending Update Mvlan Task Completed", log.Fields{"Device": ppt.device.ID})
+
+	logger.Warnw(ctx, "Pending Profiles Task Completed", log.Fields{"Context": ctx, "taskID": taskID, "Device": ppt.device.ID})
+	return errInfo
+}
+
+// UpdateMvlanProfiles to update the mvlan profiles
+func (ppt *PendingProfilesTask) UpdateMvlanProfiles() error {
+	GetController().UpdateMvlanProfiles(ppt.device.ID)
+	return nil
+}
diff --git a/internal/pkg/controller/utils.go b/internal/pkg/controller/utils.go
new file mode 100644
index 0000000..c07ac59
--- /dev/null
+++ b/internal/pkg/controller/utils.go
@@ -0,0 +1,283 @@
+/*
+* Copyright 2022-present Open Networking Foundation
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+
+package controller
+
+import (
+	"fmt"
+	"strings"
+	"sync"
+)
+
+var mu sync.Mutex
+var xid uint32 = 1
+
+// GetXid to get xid
+func GetXid() uint32 {
+	mu.Lock()
+	defer mu.Unlock()
+	xid++
+	return xid
+}
+
+// PadString for padding of string
+func PadString(value string, padSize int) string {
+	size := len(value)
+	nullsNeeded := padSize - size
+	null := fmt.Sprintf("%c", '\000')
+	padded := strings.Repeat(null, nullsNeeded)
+	return fmt.Sprintf("%s%s", value, padded)
+}
+
+/*
+// extractAction for extract action
+func extractAction(action ofp.IAction) *openflow_13.OfpAction {
+	var ofpAction openflow_13.OfpAction
+	switch action.GetType() {
+	case ofp.OFPATOutput:
+		var outputAction openflow_13.OfpAction_Output
+		loxiOutputAction := action.(*ofp.ActionOutput)
+		var output openflow_13.OfpActionOutput
+		output.Port = uint32(loxiOutputAction.GetPort())
+		/*
+			var maxLen uint16
+			maxLen = loxiOutputAction.GetMaxLen()
+			output.MaxLen = uint32(maxLen)
+
+		*/
+/*
+		output.MaxLen = 0
+		outputAction.Output = &output
+		ofpAction.Action = &outputAction
+		ofpAction.Type = openflow_13.OfpActionType_OFPAT_OUTPUT
+	case ofp.OFPATCopyTtlOut: //CopyTtltOut
+	case ofp.OFPATCopyTtlIn: //CopyTtlIn
+	case ofp.OFPATSetMplsTtl: //SetMplsTtl
+	case ofp.OFPATDecMplsTtl: //DecMplsTtl
+	case ofp.OFPATPushVLAN: //PushVlan
+		var pushVlan openflow_13.OfpAction_Push
+		loxiPushAction := action.(*ofp.ActionPushVlan)
+		var push openflow_13.OfpActionPush
+		push.Ethertype = uint32(loxiPushAction.Ethertype) //TODO This should be available in the fields
+		pushVlan.Push = &push
+		ofpAction.Type = openflow_13.OfpActionType_OFPAT_PUSH_VLAN
+		ofpAction.Action = &pushVlan
+	case ofp.OFPATPopVLAN: //PopVlan
+		ofpAction.Type = openflow_13.OfpActionType_OFPAT_POP_VLAN
+	case ofp.OFPATPushMpls: //PushMpls
+	case ofp.OFPATPopMpls: //PopMpls
+	case ofp.OFPATSetQueue: //SetQueue
+	case ofp.OFPATGroup: //ActionGroup
+	case ofp.OFPATSetNwTtl: //SetNwTtl
+	case ofp.OFPATDecNwTtl: //DecNwTtl
+	case ofp.OFPATSetField: //SetField
+		ofpAction.Type = openflow_13.OfpActionType_OFPAT_SET_FIELD
+		var ofpActionForSetField openflow_13.OfpAction_SetField
+		var ofpActionSetField openflow_13.OfpActionSetField
+		var ofpOxmField openflow_13.OfpOxmField
+		ofpOxmField.OxmClass = openflow_13.OfpOxmClass_OFPXMC_OPENFLOW_BASIC
+		var ofpOxmFieldForOfbField openflow_13.OfpOxmField_OfbField
+		var ofpOxmOfbField openflow_13.OfpOxmOfbField
+		loxiSetField := action.(*ofp.ActionSetField)
+		oxmName := loxiSetField.Field.GetOXMName()
+		switch oxmName {
+		//TODO handle set field sith other fields
+		case "vlan_vid":
+			ofpOxmOfbField.Type = openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID
+			var vlanVid openflow_13.OfpOxmOfbField_VlanVid
+			var VlanVid = loxiSetField.Field.GetOXMValue().(uint16)
+			vlanVid.VlanVid = uint32(VlanVid)
+
+			ofpOxmOfbField.Value = &vlanVid
+		}
+		ofpOxmFieldForOfbField.OfbField = &ofpOxmOfbField
+		ofpOxmField.Field = &ofpOxmFieldForOfbField
+		ofpActionSetField.Field = &ofpOxmField
+		ofpActionForSetField.SetField = &ofpActionSetField
+		ofpAction.Action = &ofpActionForSetField
+
+	case ofp.OFPATPushPbb: //PushPbb
+	case ofp.OFPATPopPbb: //PopPbb
+	case ofp.OFPATExperimenter: //Experimenter
+
+	}
+	return &ofpAction
+
+}
+
+// parseOxm for parsing OxmOfb field
+func parseOxm(ofbField *openflow_13.OfpOxmOfbField, DeviceID string) (goloxi.IOxm, uint16) {
+	switch ofbField.Type {
+	case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT:
+		ofpInPort := ofp.NewOxmInPort()
+		val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_Port)
+		ofpInPort.Value = ofp.Port(val.Port)
+		return ofpInPort, 4
+	case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_ETH_TYPE:
+		ofpEthType := ofp.NewOxmEthType()
+		val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_EthType)
+		ofpEthType.Value = ofp.EthernetType(val.EthType)
+		return ofpEthType, 2
+	case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_IN_PHY_PORT:
+		ofpInPhyPort := ofp.NewOxmInPhyPort()
+		val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_PhysicalPort)
+		ofpInPhyPort.Value = ofp.Port(val.PhysicalPort)
+		return ofpInPhyPort, 4
+	case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_IP_PROTO:
+		ofpIPProto := ofp.NewOxmIpProto()
+		val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_IpProto)
+		ofpIPProto.Value = ofp.IpPrototype(val.IpProto)
+		return ofpIPProto, 1
+	case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_UDP_SRC:
+		ofpUDPSrc := ofp.NewOxmUdpSrc()
+		val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_UdpSrc)
+		ofpUDPSrc.Value = uint16(val.UdpSrc)
+		return ofpUDPSrc, 2
+	case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_UDP_DST:
+		ofpUDPDst := ofp.NewOxmUdpDst()
+		val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_UdpDst)
+		ofpUDPDst.Value = uint16(val.UdpDst)
+		return ofpUDPDst, 2
+	case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID:
+		ofpVlanVid := ofp.NewOxmVlanVid()
+		val := ofbField.GetValue()
+		if val == nil {
+			ofpVlanVid.Value = uint16(0)
+			return ofpVlanVid, 2
+		}
+		vlanID := val.(*openflow_13.OfpOxmOfbField_VlanVid)
+		if ofbField.HasMask {
+			ofpVlanVidMasked := ofp.NewOxmVlanVidMasked()
+			valMask := ofbField.GetMask()
+			vlanMask := valMask.(*openflow_13.OfpOxmOfbField_VlanVidMask)
+			if vlanID.VlanVid == 4096 && vlanMask.VlanVidMask == 4096 {
+				ofpVlanVidMasked.Value = uint16(vlanID.VlanVid)
+				ofpVlanVidMasked.ValueMask = uint16(vlanMask.VlanVidMask)
+			} else {
+				ofpVlanVidMasked.Value = uint16(vlanID.VlanVid) | 0x1000
+				ofpVlanVidMasked.ValueMask = uint16(vlanMask.VlanVidMask)
+
+			}
+			return ofpVlanVidMasked, 4
+		}
+		ofpVlanVid.Value = uint16(vlanID.VlanVid) | 0x1000
+		return ofpVlanVid, 2
+	case openflow_13.OxmOfbFieldTypes_OFPXMT_OFB_METADATA:
+		ofpMetadata := ofp.NewOxmMetadata()
+		val := ofbField.GetValue().(*openflow_13.OfpOxmOfbField_TableMetadata)
+		ofpMetadata.Value = val.TableMetadata
+		return ofpMetadata, 8
+	default:
+	}
+	return nil, 0
+}
+
+// parseInstructions for parsing of instructions
+func parseInstructions(ofpInstruction *openflow_13.OfpInstruction, DeviceID string) (ofp.IInstruction, uint16) {
+	instType := ofpInstruction.Type
+	data := ofpInstruction.GetData()
+	switch instType {
+	case ofp.OFPITWriteMetadata:
+		instruction := ofp.NewInstructionWriteMetadata()
+		instruction.Len = 24
+		metadata := data.(*openflow_13.OfpInstruction_WriteMetadata).WriteMetadata
+		instruction.Metadata = uint64(metadata.Metadata)
+		return instruction, 24
+	case ofp.OFPITMeter:
+		instruction := ofp.NewInstructionMeter()
+		instruction.Len = 8
+		meter := data.(*openflow_13.OfpInstruction_Meter).Meter
+		instruction.MeterId = meter.MeterId
+		return instruction, 8
+	case ofp.OFPITGotoTable:
+		instruction := ofp.NewInstructionGotoTable()
+		instruction.Len = 8
+		gotoTable := data.(*openflow_13.OfpInstruction_GotoTable).GotoTable
+		instruction.TableId = uint8(gotoTable.TableId)
+		return instruction, 8
+	case ofp.OFPITApplyActions:
+		instruction := ofp.NewInstructionApplyActions()
+		var instructionSize uint16
+		instructionSize = 8
+		//ofpActions := ofpInstruction.GetActions().Actions
+		var actions []goloxi.IAction
+		for _, ofpAction := range ofpInstruction.GetActions().Actions {
+			action, actionSize := parseAction(ofpAction, DeviceID)
+			actions = append(actions, action)
+			instructionSize += actionSize
+
+		}
+		instruction.Actions = actions
+		instruction.SetLen(instructionSize)
+		return instruction, instructionSize
+	}
+	//shouldn't have reached here :<
+	return nil, 0
+}
+
+// parseAction for parsing of actions
+func parseAction(ofpAction *openflow_13.OfpAction, DeviceID string) (goloxi.IAction, uint16) {
+	switch ofpAction.Type {
+	case openflow_13.OfpActionType_OFPAT_OUTPUT:
+		ofpOutputAction := ofpAction.GetOutput()
+		outputAction := ofp.NewActionOutput()
+		outputAction.Port = ofp.Port(ofpOutputAction.Port)
+		outputAction.MaxLen = uint16(ofpOutputAction.MaxLen)
+		outputAction.Len = 16
+		return outputAction, 16
+	case openflow_13.OfpActionType_OFPAT_PUSH_VLAN:
+		ofpPushVlanAction := ofp.NewActionPushVlan()
+		ofpPushVlanAction.Ethertype = uint16(ofpAction.GetPush().Ethertype)
+		ofpPushVlanAction.Len = 8
+		return ofpPushVlanAction, 8
+	case openflow_13.OfpActionType_OFPAT_POP_VLAN:
+		ofpPopVlanAction := ofp.NewActionPopVlan()
+		ofpPopVlanAction.Len = 8
+		return ofpPopVlanAction, 8
+	case openflow_13.OfpActionType_OFPAT_SET_FIELD:
+		ofpActionSetField := ofpAction.GetSetField()
+		setFieldAction := ofp.NewActionSetField()
+
+		iOxm, _ := parseOxm(ofpActionSetField.GetField().GetOfbField(), DeviceID)
+		setFieldAction.Field = iOxm
+		setFieldAction.Len = 16
+		return setFieldAction, 16
+	default:
+	}
+	return nil, 0
+}
+
+// parsePortStats for parsing of port stats
+func parsePortStats(port *voltha.LogicalPort) *ofp.PortStatsEntry {
+	stats := port.OfpPortStats
+	port.OfpPort.GetPortNo()
+	var entry ofp.PortStatsEntry
+	entry.SetPortNo(ofp.Port(port.OfpPort.GetPortNo()))
+	entry.SetRxPackets(stats.GetRxPackets())
+	entry.SetTxPackets(stats.GetTxPackets())
+	entry.SetRxBytes(stats.GetRxBytes())
+	entry.SetTxBytes(stats.GetTxBytes())
+	entry.SetRxDropped(stats.GetRxDropped())
+	entry.SetTxDropped(stats.GetTxDropped())
+	entry.SetRxErrors(stats.GetRxErrors())
+	entry.SetTxErrors(stats.GetTxErrors())
+	entry.SetRxFrameErr(stats.GetRxFrameErr())
+	entry.SetRxOverErr(stats.GetRxOverErr())
+	entry.SetRxCrcErr(stats.GetRxCrcErr())
+	entry.SetCollisions(stats.GetCollisions())
+	entry.SetDurationSec(stats.GetDurationSec())
+	entry.SetDurationNsec(stats.GetDurationNsec())
+	return &entry
+}*/