VOL-1596 Add Support for handling multicast groups in OpenOLT Adapter.
VOL-1595 Add Support for handling multicast flows in OpenOLT Adapter.

Depends voltha-protos from the patch below:
https://gerrit.opencord.org/#/c/16690/

Change-Id: I1cc9900bd6400bb31aed11beda674138838a21d2
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 3376809..b8af453 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -28,16 +28,16 @@
 	"sync"
 	"time"
 
-	"github.com/opencord/voltha-lib-go/v2/pkg/flows"
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	tp "github.com/opencord/voltha-lib-go/v2/pkg/techprofile"
+	"github.com/opencord/voltha-lib-go/v3/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	tp "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
 	rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
-	"github.com/opencord/voltha-protos/v2/go/common"
-	ic "github.com/opencord/voltha-protos/v2/go/inter_container"
-	ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
-	openoltpb2 "github.com/opencord/voltha-protos/v2/go/openolt"
-	tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
-	"github.com/opencord/voltha-protos/v2/go/voltha"
+	"github.com/opencord/voltha-protos/v3/go/common"
+	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	openoltpb2 "github.com/opencord/voltha-protos/v3/go/openolt"
+	tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
 
 	//deepcopy "github.com/getlantern/deepcopy"
 	"github.com/EagleChen/mapmutex"
@@ -57,6 +57,9 @@
 	//DhcpFlow flow category
 	DhcpFlow = "DHCP_FLOW"
 
+	//MulticastFlow flow category
+	MulticastFlow = "MULTICAST_FLOW"
+
 	//IgmpFlow flow category
 	IgmpFlow = "IGMP_FLOW"
 
@@ -90,6 +93,8 @@
 	Upstream = "upstream"
 	//Downstream constant
 	Downstream = "downstream"
+	//Multicast constant
+	Multicast = "multicast"
 	//PacketTagType constant
 	PacketTagType = "pkt_tag_type"
 	//Untagged constant
@@ -103,6 +108,8 @@
 
 	//EthType constant
 	EthType = "eth_type"
+	//EthDst constant
+	EthDst = "eth_dst"
 	//TPID constant
 	TPID = "tpid"
 	//IPProto constant
@@ -128,6 +135,8 @@
 	TunnelID = "tunnel_id"
 	//Output constant
 	Output = "output"
+	//GroupID constant
+	GroupID = "group_id"
 	// Actions
 
 	//PopVlan constant
@@ -152,6 +161,13 @@
 	PortNo = "portNo"
 	//AllocID constant
 	AllocID = "allocId"
+
+	//NoneOnuID constant
+	NoneOnuID = -1
+	//NoneUniID constant
+	NoneUniID = -1
+	//NoneGemPortID constant
+	NoneGemPortID = -1
 )
 
 type gemPortKey struct {
@@ -183,6 +199,11 @@
 	flowMetadata *voltha.FlowMetadata
 }
 
+type queueInfoBrief struct {
+	gemPortID       uint32
+	servicePriority uint32
+}
+
 //OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
 type OpenOltFlowMgr struct {
 	techprofile        map[uint32]tp.TechProfileIf
@@ -195,7 +216,8 @@
 	lockCache          sync.RWMutex
 	pendingFlowDelete  sync.Map
 	// The mapmutex.Mutex can be fine tuned to use mapmutex.NewCustomizedMapMutex
-	perUserFlowHandleLock *mapmutex.Mutex
+	perUserFlowHandleLock    *mapmutex.Mutex
+	interfaceToMcastQueueMap map[uint32]*queueInfoBrief /*pon interface -> multicast queue map. Required to assign GEM to a bucket during group population*/
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -228,6 +250,9 @@
 	flowMgr.lockCache = sync.RWMutex{}
 	flowMgr.pendingFlowDelete = sync.Map{}
 	flowMgr.perUserFlowHandleLock = mapmutex.NewMapMutex()
+	flowMgr.interfaceToMcastQueueMap = make(map[uint32]*queueInfoBrief)
+	//load interface to multicast queue map from kv store
+	flowMgr.loadInterfaceToMulticastQueueMap()
 	log.Info("Initialization of  flow manager success!!")
 	return &flowMgr
 }
@@ -239,6 +264,9 @@
 	} else if direction == Downstream {
 		log.Debug("downstream flow, not shifting id")
 		return uint64(flowID), nil
+	} else if direction == Multicast {
+		log.Debug("multicast flow, shifting id")
+		return 0x2<<15 | uint64(flowID), nil
 	} else {
 		log.Debug("Unrecognized direction")
 		return 0, fmt.Errorf("unrecognized direction %s", direction)
@@ -427,6 +455,25 @@
 		return err
 	}
 
+	if sq.direction == tp_pb.Direction_DOWNSTREAM {
+		multicastTrafficQueues := f.techprofile[sq.intfID].GetMulticastTrafficQueues(sq.tpInst)
+		if len(multicastTrafficQueues) > 0 {
+			if _, present := f.interfaceToMcastQueueMap[sq.intfID]; !present {
+				//assumed that there is only one queue per PON for the multicast service
+				//the default queue with multicastQueuePerPonPort.Priority per a pon interface is used for multicast service
+				//just put it in interfaceToMcastQueueMap to use for building group members
+				multicastQueuePerPonPort := multicastTrafficQueues[0]
+				f.interfaceToMcastQueueMap[sq.intfID] = &queueInfoBrief{
+					gemPortID:       multicastQueuePerPonPort.GemportId,
+					servicePriority: multicastQueuePerPonPort.Priority,
+				}
+				//also store the queue info in kv store
+				f.resourceMgr.AddMcastQueueForIntf(sq.intfID,
+					multicastQueuePerPonPort.GemportId,
+					multicastQueuePerPonPort.Priority)
+			}
+		}
+	}
 	return nil
 }
 
@@ -998,6 +1045,7 @@
 	classifier.DstPort, _ = classifierInfo[UDPDst].(uint32)
 	classifier.DstIp, _ = classifierInfo[Ipv4Dst].(uint32)
 	classifier.SrcIp, _ = classifierInfo[Ipv4Src].(uint32)
+	classifier.DstMac, _ = classifierInfo[EthDst].([]uint8)
 	if pktTagType, ok := classifierInfo[PacketTagType].(string); ok {
 		classifier.PktTagType = pktTagType
 
@@ -1505,6 +1553,12 @@
 func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowDirection string) {
 
 	log.Debugw("clearFlowFromResourceManager", log.Fields{"flowDirection": flowDirection, "flow": *flow})
+
+	if flowDirection == Multicast {
+		f.clearMulticastFlowFromResourceManager(flow)
+		return
+	}
+
 	var updatedFlows []rsrcMgr.FlowInfo
 	var flowID uint32
 	var onuID, uniID int32
@@ -1570,6 +1624,61 @@
 	}
 }
 
+//clearMulticastFlowFromResourceManager  removes a multicast flow from the KV store and
+// clears resources reserved for this multicast flow
+func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(flow *ofp.OfpFlowStats) {
+	classifierInfo := make(map[string]interface{})
+	formulateClassifierInfoFromFlow(classifierInfo, flow)
+	inPort, err := f.getInPortOfMulticastFlow(classifierInfo)
+
+	if err != nil {
+		log.Warnw("No inPort found. Cannot release resources of the multicast flow.", log.Fields{"flowId:": flow.Id})
+		return
+	}
+
+	networkInterfaceID := IntfIDFromNniPortNum(inPort)
+	var onuID = int32(NoneOnuID)
+	var uniID = int32(NoneUniID)
+	var flowID uint32
+	var updatedFlows []rsrcMgr.FlowInfo
+
+	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(networkInterfaceID, onuID, uniID)
+
+	for _, flowID = range flowIds {
+		flowInfo := f.resourceMgr.GetFlowIDInfo(networkInterfaceID, onuID, uniID, flowID)
+		if flowInfo == nil {
+			log.Debugw("No multicast FlowInfo found in the KV store",
+				log.Fields{"Intf": networkInterfaceID, "onuID": onuID, "uniID": uniID, "flowID": flowID})
+			continue
+		}
+		updatedFlows = nil
+		for _, flow := range *flowInfo {
+			updatedFlows = append(updatedFlows, flow)
+		}
+		for i, storedFlow := range updatedFlows {
+			if flow.Id == storedFlow.LogicalFlowID {
+				removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
+				log.Debugw("Multicast flow to be deleted", log.Fields{"flow": storedFlow})
+				//remove from device
+				if ok := f.removeFlowFromDevice(&removeFlowMessage); !ok {
+					log.Errorw("Failed to remove multicast flow from device", log.Fields{"flowId": flow.Id})
+					return
+				}
+				log.Debugw("Multicast flow removed from device successfully", log.Fields{"flowId": flow.Id})
+				//Remove the Flow from FlowInfo
+				updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+				if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID), NoneOnuID, NoneUniID, flowID, &updatedFlows); err != nil {
+					log.Error("Failed to delete multicast flow from the KV store", log.Fields{"flow": storedFlow, "err": err})
+					return
+				}
+				//release flow id
+				log.Debugw("Releasing multicast flow id", log.Fields{"flowId": flowID, "interfaceID": networkInterfaceID})
+				f.resourceMgr.FreeFlowID(uint32(networkInterfaceID), NoneOnuID, NoneUniID, flowID)
+			}
+		}
+	}
+}
+
 //RemoveFlow removes the flow from the device
 func (f *OpenOltFlowMgr) RemoveFlow(flow *ofp.OfpFlowStats) {
 	log.Debugw("Removing Flow", log.Fields{"flow": flow})
@@ -1587,12 +1696,14 @@
 			}
 		}
 	}
-	if IsUpstream(actionInfo[Output].(uint32)) {
+
+	if flows.HasGroup(flow) {
+		direction = Multicast
+	} else if IsUpstream(actionInfo[Output].(uint32)) {
 		direction = Upstream
 	} else {
 		direction = Downstream
 	}
-
 	f.clearFlowFromResourceManager(flow, direction) //TODO: Take care of the limitations
 
 	return
@@ -1650,6 +1761,12 @@
 		return
 	}
 
+	if flows.HasGroup(flow) {
+		// handle multicast flow
+		f.handleFlowWithGroup(actionInfo, classifierInfo, flow)
+		return
+	}
+
 	/* Controller bound trap flows */
 	err = formulateControllerBoundTrapFlowInfo(actionInfo, classifierInfo, flow)
 	if err != nil {
@@ -1723,6 +1840,283 @@
 	}
 }
 
+// handleFlowWithGroup adds multicast flow to the device.
+func (f *OpenOltFlowMgr) handleFlowWithGroup(actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
+	classifierInfo[PacketTagType] = DoubleTag
+	log.Debugw("add-multicast-flow", log.Fields{"classifierInfo": classifierInfo, "actionInfo": actionInfo})
+
+	inPort, err := f.getInPortOfMulticastFlow(classifierInfo)
+	if err != nil {
+		log.Warnw("No inPort found. Ignoring multicast flow.", log.Fields{"flowId:": flow.Id})
+		return
+	}
+	//replace ipDst with ethDst
+	if ipv4Dst, ok := classifierInfo[Ipv4Dst]; ok &&
+		flows.IsMulticastIp(ipv4Dst.(uint32)) {
+		// replace ipv4_dst classifier with eth_dst
+		multicastMac := flows.ConvertToMulticastMacBytes(ipv4Dst.(uint32))
+		delete(classifierInfo, Ipv4Dst)
+		delete(classifierInfo, EthType)
+		classifierInfo[EthDst] = multicastMac
+		log.Debugw("multicast-ip-to-mac-conversion-success", log.Fields{"ip:": ipv4Dst.(uint32), "mac:": multicastMac})
+	}
+
+	var onuID = NoneOnuID
+	var uniID = NoneUniID
+	var gemPortID = NoneGemPortID
+
+	networkInterfaceID := IntfIDFromNniPortNum(inPort)
+
+	var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+		log.Debugw("multicast-flow-exists--not-re-adding", log.Fields{"classifierInfo": classifierInfo})
+		return
+	}
+	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
+	if err != nil {
+		log.Errorw("Flow id unavailable for multicast flow", log.Fields{"error": err})
+		return
+	}
+	var classifierProto *openoltpb2.Classifier
+	if classifierProto = makeOpenOltClassifierField(classifierInfo); classifierProto == nil {
+		log.Error("Error in making classifier protobuf for multicast flow")
+		return
+	}
+	groupID := actionInfo[GroupID].(uint32)
+	multicastFlow := openoltpb2.Flow{
+		FlowId:        flowID,
+		FlowType:      Multicast,
+		NetworkIntfId: int32(networkInterfaceID),
+		GroupId:       groupID,
+		Classifier:    classifierProto,
+		Priority:      int32(flow.Priority),
+		Cookie:        flow.Cookie}
+
+	if ok := f.addFlowToDevice(flow, &multicastFlow); ok {
+		log.Debug("multicast flow added to device successfully")
+		//get cached group
+		group, _, err := f.GetFlowGroupFromKVStore(groupID, true)
+		if err == nil {
+			//calling groupAdd to set group members after multicast flow creation
+			if f.ModifyGroup(group) {
+				//cached group can be removed now
+				f.resourceMgr.RemoveFlowGroupFromKVStore(groupID, true)
+			}
+		}
+
+		flowsToKVStore := f.getUpdatedFlowInfo(&multicastFlow, flowStoreCookie, MulticastFlow, flowID, flow.Id)
+		if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
+			int32(onuID),
+			int32(uniID),
+			flowID, flowsToKVStore); err != nil {
+			log.Errorw("Error uploading multicast flow into KV store", log.Fields{"flow": multicastFlow, "error": err})
+		}
+	}
+	return
+}
+
+//getInPortOfMulticastFlow return inPort criterion if exists; returns NNI interface of the device otherwise
+func (f *OpenOltFlowMgr) getInPortOfMulticastFlow(classifierInfo map[string]interface{}) (uint32, error) {
+	if _, ok := classifierInfo[InPort]; ok {
+		return classifierInfo[InPort].(uint32), nil
+	}
+	// find first NNI port of the device
+	nniPorts, e := f.resourceMgr.GetNNIFromKVStore()
+	if e == nil && len(nniPorts) > 0 {
+		return nniPorts[0], nil
+	}
+	return 0, errors.New("cannot find NNI port of device")
+}
+
+// AddGroup add or update the group
+func (f *OpenOltFlowMgr) AddGroup(group *ofp.OfpGroupEntry) {
+	log.Infow("add-group", log.Fields{"group": group})
+	if group == nil {
+		log.Warn("skipping nil group")
+		return
+	}
+
+	groupToOlt := openoltpb2.Group{
+		GroupId: group.Desc.GroupId,
+		Command: openoltpb2.Group_SET_MEMBERS,
+		Action:  f.buildGroupAction(),
+	}
+
+	log.Debugw("Sending group to device", log.Fields{"groupToOlt": groupToOlt})
+	_, err := f.deviceHandler.Client.PerformGroupOperation(context.Background(), &groupToOlt)
+	if err != nil {
+		log.Errorw("add-group operation failed", log.Fields{"err": err, "groupToOlt": groupToOlt})
+		return
+	}
+	// group members not created yet. So let's store the group
+	if err := f.resourceMgr.AddFlowGroupToKVStore(group, true); err != nil {
+		log.Errorw("Group cannot be stored in KV store", log.Fields{"groupId": group.Desc.GroupId, "err": err})
+	} else {
+		log.Debugw("add-group operation performed on the device successfully ", log.Fields{"groupToOlt": groupToOlt})
+	}
+}
+
+//buildGroupAction creates and returns a group action
+func (f *OpenOltFlowMgr) buildGroupAction() *openoltpb2.Action {
+	var actionCmd openoltpb2.ActionCmd
+	var action openoltpb2.Action
+	action.Cmd = &actionCmd
+	//pop outer vlan
+	action.Cmd.RemoveOuterTag = true
+	return &action
+}
+
+// ModifyGroup updates the group
+func (f *OpenOltFlowMgr) ModifyGroup(group *ofp.OfpGroupEntry) bool {
+	log.Infow("modify-group", log.Fields{"group": group})
+	if group == nil || group.Desc == nil {
+		log.Warn("cannot modify group; group is nil")
+		return false
+	}
+
+	new := f.buildGroup(group.Desc.GroupId, group.Desc.Buckets)
+	//get existing members of the group
+	val, groupExists, err := f.GetFlowGroupFromKVStore(group.Desc.GroupId, false)
+
+	if err != nil {
+		log.Errorw("Failed to retrieve the group from the store. Cannot modify group.",
+			log.Fields{"groupId": group.Desc.GroupId, "err": err})
+		return false
+	}
+
+	var current *openoltpb2.Group
+	if groupExists {
+		// group already exists
+		current = f.buildGroup(group.Desc.GroupId, val.Desc.GetBuckets())
+		log.Debugw("modify-group: group exists.", log.Fields{"current": val, "new": group})
+	} else {
+		current = f.buildGroup(group.Desc.GroupId, nil)
+	}
+
+	log.Debugw("modify-group: comparing current and new.", log.Fields{"current": current, "new": new})
+	// check if the buckets are identical
+	bucketsIdentical := f.bucketsIdentical(current, new)
+
+	isSuccess := true
+	if !bucketsIdentical {
+		groupToOlt := openoltpb2.Group{
+			GroupId: group.Desc.GroupId,
+			Command: openoltpb2.Group_SET_MEMBERS,
+			Members: new.Members,
+			Action:  f.buildGroupAction(),
+		}
+
+		if err := f.callGroupAdd(&groupToOlt); err != nil {
+			log.Warnw("One of the group add/remove operations has failed. Cannot save group modifications",
+				log.Fields{"group": group})
+			isSuccess = false
+		}
+	}
+
+	if isSuccess {
+		if err := f.resourceMgr.AddFlowGroupToKVStore(group, false); err != nil {
+			log.Errorw("Failed to save the group into kv store", log.Fields{"groupId": group.Desc.GroupId})
+		}
+		log.Debugw("modify-group was success. Storing the group", log.Fields{"group": group, "existingGroup": current})
+	}
+	return isSuccess
+}
+
+//bucketsIdentical returns true if groups are identical; false otherwise
+func (f *OpenOltFlowMgr) bucketsIdentical(current *openoltpb2.Group, new *openoltpb2.Group) bool {
+	if current.GroupId == new.GroupId &&
+		len(new.Members) == len(current.Members) {
+		diff := f.findDiff(current, new)
+		if diff == nil || len(diff) < 1 {
+			log.Infow("modify-group: current and new buckets are the same. Won't send SET_MEMBERS again.",
+				log.Fields{"groupId:": current.GroupId})
+			return true
+		}
+	}
+	return false
+}
+
+//findDiff compares group members and finds members which only exists in groups2
+func (f *OpenOltFlowMgr) findDiff(group1 *openoltpb2.Group, group2 *openoltpb2.Group) []*openoltpb2.GroupMember {
+	var members []*openoltpb2.GroupMember
+	for _, bucket := range group2.Members {
+		if !f.contains(group1.Members, bucket) {
+			// bucket does not exist and must be added
+			members = append(members, bucket)
+		}
+	}
+	return members
+}
+
+//contains returns true if the members list contains the given member; false otherwise
+func (f *OpenOltFlowMgr) contains(members []*openoltpb2.GroupMember, member *openoltpb2.GroupMember) bool {
+	for _, groupMember := range members {
+		if groupMember.InterfaceId == member.InterfaceId {
+			return true
+		}
+	}
+	return false
+}
+
+//callGroupAdd call GroupAdd operation of openolt proto
+func (f *OpenOltFlowMgr) callGroupAdd(group *openoltpb2.Group) error {
+	log.Debugw("Sending group to device", log.Fields{"groupToOlt": group, "command": group.Command})
+	_, err := f.deviceHandler.Client.PerformGroupOperation(context.Background(), group)
+	if err != nil {
+		log.Errorw("group operation failed", log.Fields{"err": err, "groupToOlt": group})
+	}
+	return err
+}
+
+//buildGroup build openoltpb2.Group from given group id and bucket list
+func (f *OpenOltFlowMgr) buildGroup(groupID uint32, buckets []*ofp.OfpBucket) *openoltpb2.Group {
+	group := openoltpb2.Group{
+		GroupId: groupID}
+	// create members of the group
+	if buckets != nil {
+		for _, ofBucket := range buckets {
+			member := f.buildMember(ofBucket)
+			if member != nil && !f.contains(group.Members, member) {
+				group.Members = append(group.Members, member)
+			}
+		}
+	}
+	return &group
+}
+
+//buildMember builds openoltpb2.GroupMember from an OpenFlow bucket
+func (f *OpenOltFlowMgr) buildMember(ofBucket *ofp.OfpBucket) *openoltpb2.GroupMember {
+	var outPort uint32
+	outPortFound := false
+	for _, ofAction := range ofBucket.Actions {
+		if ofAction.Type == ofp.OfpActionType_OFPAT_OUTPUT {
+			outPort = ofAction.GetOutput().Port
+			outPortFound = true
+		}
+	}
+
+	if !outPortFound {
+		log.Debugw("bucket skipped since no out port found in it",
+			log.Fields{"ofBucket": ofBucket})
+		return nil
+	}
+	interfaceID := IntfIDFromUniPortNum(outPort)
+	log.Debugw("got associated interface id of the port", log.Fields{"portNumber:": outPort, "interfaceId:": interfaceID})
+	if groupInfo, ok := f.interfaceToMcastQueueMap[interfaceID]; ok {
+		member := openoltpb2.GroupMember{
+			InterfaceId:   interfaceID,
+			InterfaceType: openoltpb2.GroupMember_PON,
+			GemPortId:     groupInfo.gemPortID,
+			Priority:      groupInfo.servicePriority,
+		}
+		//add member to the group
+		return &member
+	}
+	log.Warnf("bucket skipped since interface-2-gem mapping cannot be found",
+		log.Fields{"ofBucket": ofBucket})
+	return nil
+}
+
 //sendTPDownloadMsgToChild send payload
 func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
 
@@ -2228,6 +2622,9 @@
 		if field.Type == flows.ETH_TYPE {
 			classifierInfo[EthType] = field.GetEthType()
 			log.Debug("field-type-eth-type", log.Fields{"classifierInfo[ETH_TYPE]": classifierInfo[EthType].(uint32)})
+		} else if field.Type == flows.ETH_DST {
+			classifierInfo[EthDst] = field.GetEthDst()
+			log.Debug("field-type-eth-type", log.Fields{"classifierInfo[ETH_DST]": classifierInfo[EthDst].([]uint8)})
 		} else if field.Type == flows.IP_PROTO {
 			classifierInfo[IPProto] = field.GetIpProto()
 			log.Debug("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
@@ -2297,20 +2694,11 @@
 						return errors.New("invalid openflow class")
 					}
 					/*log.Debugw("action-type-set-field",log.Fields{"field": field, "in_port": classifierInfo[IN_PORT].(uint32)})*/
-					if ofbField := field.GetOfbField(); ofbField != nil {
-						if fieldtype := ofbField.GetType(); fieldtype == ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID {
-							if vlan := ofbField.GetVlanVid(); vlan != 0 {
-								actionInfo[VlanVid] = vlan & 0xfff
-								log.Debugw("action-set-vlan-vid", log.Fields{"actionInfo[VLAN_VID]": actionInfo[VlanVid].(uint32)})
-							} else {
-								log.Error("No Invalid vlan id in set vlan-vid action")
-							}
-						} else {
-							log.Errorw("unsupported-action-set-field-type", log.Fields{"type": fieldtype})
-						}
-					}
+					formulateSetFieldActionInfoFromFlow(field, actionInfo)
 				}
 			}
+		} else if action.Type == flows.GROUP {
+			formulateGroupActionInfoFromFlow(action, actionInfo)
 		} else {
 			log.Errorw("Un supported action type", log.Fields{"type": action.Type})
 			return errors.New("un supported action type")
@@ -2319,6 +2707,30 @@
 	return nil
 }
 
+func formulateSetFieldActionInfoFromFlow(field *ofp.OfpOxmField, actionInfo map[string]interface{}) {
+	if ofbField := field.GetOfbField(); ofbField != nil {
+		if fieldtype := ofbField.GetType(); fieldtype == ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID {
+			if vlan := ofbField.GetVlanVid(); vlan != 0 {
+				actionInfo[VlanVid] = vlan & 0xfff
+				log.Debugw("action-set-vlan-vid", log.Fields{"actionInfo[VLAN_VID]": actionInfo[VlanVid].(uint32)})
+			} else {
+				log.Error("No Invalid vlan id in set vlan-vid action")
+			}
+		} else {
+			log.Errorw("unsupported-action-set-field-type", log.Fields{"type": fieldtype})
+		}
+	}
+}
+
+func formulateGroupActionInfoFromFlow(action *ofp.OfpAction, actionInfo map[string]interface{}) {
+	if action.GetGroup() == nil {
+		log.Warn("No group entry found in the group action")
+	} else {
+		actionInfo[GroupID] = action.GetGroup().GroupId
+		log.Debugw("action-group-id", log.Fields{"actionInfo[GroupID]": actionInfo[GroupID].(uint32)})
+	}
+}
+
 func formulateControllerBoundTrapFlowInfo(actionInfo, classifierInfo map[string]interface{}, flow *ofp.OfpFlowStats) error {
 	if isControllerFlow := IsControllerBoundFlow(actionInfo[Output].(uint32)); isControllerFlow {
 		log.Debug("Controller bound trap flows, getting inport from tunnelid")
@@ -2456,3 +2868,54 @@
 	}
 	return
 }
+
+//loadInterfaceToMulticastQueueMap reads multicast queues per interface from the KV store
+//and put them into interfaceToMcastQueueMap.
+func (f *OpenOltFlowMgr) loadInterfaceToMulticastQueueMap() {
+	storedMulticastQueueMap, err := f.resourceMgr.GetMcastQueuePerInterfaceMap()
+	if err != nil {
+		log.Error("Failed to get pon interface to multicast queue map")
+		return
+	}
+	for intf, queueInfo := range storedMulticastQueueMap {
+		q := queueInfoBrief{
+			gemPortID:       queueInfo[0],
+			servicePriority: queueInfo[1],
+		}
+		f.interfaceToMcastQueueMap[intf] = &q
+	}
+}
+
+//GetFlowGroupFromKVStore fetches and returns flow group from the KV store. Returns (nil, false, error) if any problem occurs during
+//fetching the data. Returns (group, true, nil) if the group is fetched and returned successfully.
+//Returns (nil, false, nil) if the group does not exists in the KV store.
+func (f *OpenOltFlowMgr) GetFlowGroupFromKVStore(groupID uint32, cached bool) (*ofp.OfpGroupEntry, bool, error) {
+	exists, groupInfo, err := f.resourceMgr.GetFlowGroupFromKVStore(groupID, cached)
+	if err != nil {
+		log.Errorw("Failed to get the flow group from KV store", log.Fields{"groupId": groupID, "err": err})
+		return nil, false, errors.New("failed to retrieve the flow group")
+	}
+	if exists {
+		return newGroup(groupInfo.GroupID, groupInfo.OutPorts), exists, nil
+	}
+	return nil, exists, nil
+}
+
+func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
+	groupDesc := ofp.OfpGroupDesc{
+		Type:    ofp.OfpGroupType_OFPGT_ALL,
+		GroupId: groupID,
+	}
+	groupEntry := ofp.OfpGroupEntry{
+		Desc: &groupDesc,
+	}
+	var acts []*ofp.OfpAction
+	for i := 0; i < len(outPorts); i++ {
+		acts = append(acts, flows.Output(outPorts[i]))
+	}
+	bucket := ofp.OfpBucket{
+		Actions: acts,
+	}
+	groupDesc.Buckets = []*ofp.OfpBucket{&bucket}
+	return &groupEntry
+}