VOL-1596 Add Support for handling multicast groups in OpenOLT Adapter.
VOL-1595 Add Support for handling multicast flows in OpenOLT Adapter.

Depends on the voltha-protos changes from the patch below:
https://gerrit.opencord.org/#/c/16690/

Change-Id: I1cc9900bd6400bb31aed11beda674138838a21d2
diff --git a/adaptercore/resourcemanager/resourcemanager.go b/adaptercore/resourcemanager/resourcemanager.go
index 21f21c9..0e3da4d 100755
--- a/adaptercore/resourcemanager/resourcemanager.go
+++ b/adaptercore/resourcemanager/resourcemanager.go
@@ -24,12 +24,12 @@
 	"strconv"
 	"strings"
 
-	"github.com/opencord/voltha-lib-go/v2/pkg/db"
-	"github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	ponrmgr "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
-	ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 )
 
 const (
@@ -48,6 +48,20 @@
 	OnuPacketINPath = "onu_packetin/{%d,%d,%d}"
 	//FlowIDsForGem flowids_per_gem/<intfid>
 	FlowIDsForGem = "flowids_per_gem/{%d}"
+	//McastQueuesForIntf multicast queues for pon interfaces
+	McastQueuesForIntf = "mcast_qs_for_int"
+	//FlowGroup flow_groups/<flow_group_id>
+	// A group is stored under this path on the KV store after it has been installed to the device.
+	// It should also be deleted after it has been removed from the device accordingly.
+	FlowGroup = "flow_groups/{%d}"
+	//FlowGroupCached flow_groups_cached/<flow_group_id>
+	// When a group add request is received, we create the group without setting any members to it since we cannot
+	// set any members to a group until it is associated with a multicast flow. It is a BAL limitation.
+	// When the related multicast flow has been created we perform set members operation for the group.
+	// That is why we need to keep the members of a group until the multicast flow creation request comes.
+	// We preserve the groups under the "flow_groups_cached" directory in the KV store temporarily. Having set members,
+	// we remove the group from the cached group store.
+	FlowGroupCached = "flow_groups_cached/{%d}"
 )
 
 // FlowInfo holds the flow information
@@ -74,6 +88,12 @@
 	LogicalPort uint32
 }
 
+// GroupInfo holds the group information that is stored in the KV store for a flow group
+type GroupInfo struct {
+	GroupID  uint32   // ID of the group, taken from the group entry's Desc.GroupId
+	OutPorts []uint32 // ports of the OUTPUT actions found in the buckets of the group
+}
+
 // OpenOltResourceMgr holds resource related information as provided below for each field
 type OpenOltResourceMgr struct {
 	DeviceID    string      // OLT device id
@@ -1269,9 +1289,9 @@
 		log.Error("failed to get data from kv store")
 		return nil, err
 	}
-	if value != nil {
+	if value != nil && value.Value != nil {
 		if val, err = kvstore.ToByte(value.Value); err != nil {
-			log.Error("Failed to convert to byte array", log.Fields{"error": err})
+			log.Error("Failed to convert to byte array ", log.Fields{"error": err})
 			return nil, err
 		}
 		if err = json.Unmarshal(val, &flowsForGem); err != nil {
@@ -1298,3 +1318,136 @@
 	IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
 	RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(IntfOnuIDUniID)
 }
+
+//GetMcastQueuePerInterfaceMap returns the multicast queue map (keyed by pon interface) read from the KV store; the map is nil if nothing is stored
+func (RsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap() (map[uint32][]uint32, error) {
+	path := McastQueuesForIntf // the key contains no format verbs, so fmt.Sprintf is not needed
+	kvPair, err := RsrcMgr.KVStore.Get(path)
+	if err != nil {
+		log.Errorw("failed to get data from kv store", log.Fields{"error": err, "path": path})
+		return nil, err
+	}
+	if kvPair == nil || kvPair.Value == nil {
+		return nil, nil
+	}
+	val, err := kvstore.ToByte(kvPair.Value)
+	if err != nil {
+		log.Errorw("Failed to convert to byte array", log.Fields{"error": err})
+		return nil, err
+	}
+	var mcastQueueToIntfMap map[uint32][]uint32
+	if err = json.Unmarshal(val, &mcastQueueToIntfMap); err != nil {
+		log.Errorw("Failed to unmarshall", log.Fields{"error": err})
+		return nil, err
+	}
+	return mcastQueueToIntfMap, nil
+}
+
+//AddMcastQueueForIntf adds multicast queue for pon interface
+func (RsrcMgr *OpenOltResourceMgr) AddMcastQueueForIntf(intf uint32, gem uint32, servicePriority uint32) error {
+	var val []byte
+	path := fmt.Sprintf(McastQueuesForIntf)
+
+	mcastQueues, err := RsrcMgr.GetMcastQueuePerInterfaceMap()
+	if err != nil {
+		log.Errorw("Failed to get multicast queue info for interface", log.Fields{"error": err, "intf": intf})
+		return err
+	}
+	if mcastQueues == nil {
+		mcastQueues = make(map[uint32][]uint32)
+	}
+	mcastQueues[intf] = []uint32{gem, servicePriority}
+	if val, err = json.Marshal(mcastQueues); err != nil {
+		log.Errorw("Failed to marshal data", log.Fields{"error": err})
+		return err
+	}
+	if err = RsrcMgr.KVStore.Put(path, val); err != nil {
+		log.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
+		return err
+	}
+	log.Debugw("added multicast queue info to KV store successfully", log.Fields{"path": path, "mcastQueueInfo": mcastQueues[intf], "interfaceId": intf})
+	return nil
+}
+
+//AddFlowGroupToKVStore stores the ID and the output ports of the given group entry in the KV store as a GroupInfo.
+// The group is stored under FlowGroupCached if cached is true, under FlowGroup otherwise.
+func (RsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(groupEntry *ofp.OfpGroupEntry, cached bool) error {
+	if groupEntry == nil || groupEntry.Desc == nil {
+		return fmt.Errorf("flow group entry or its description is nil")
+	}
+	groupID := groupEntry.Desc.GroupId
+	var path string
+	if cached {
+		path = fmt.Sprintf(FlowGroupCached, groupID)
+	} else {
+		path = fmt.Sprintf(FlowGroup, groupID)
+	}
+	//build group info object from the ports of the output actions in the buckets
+	var outPorts []uint32
+	for _, ofBucket := range groupEntry.Desc.Buckets {
+		for _, ofAction := range ofBucket.Actions {
+			if ofAction.Type != ofp.OfpActionType_OFPAT_OUTPUT {
+				continue
+			}
+			// the protobuf getter may return nil if the action carries no output payload
+			if output := ofAction.GetOutput(); output != nil {
+				outPorts = append(outPorts, output.Port)
+			}
+		}
+	}
+	value, err := json.Marshal(GroupInfo{GroupID: groupID, OutPorts: outPorts})
+	if err != nil {
+		log.Errorw("failed to Marshal flow group object", log.Fields{"error": err, "groupID": groupID})
+		return err
+	}
+	if err = RsrcMgr.KVStore.Put(path, value); err != nil {
+		log.Errorw("Failed to update resource", log.Fields{"error": err, "path": path})
+		return err
+	}
+	return nil
+}
+
+//RemoveFlowGroupFromKVStore removes flow group from KV store
+func (RsrcMgr *OpenOltResourceMgr) RemoveFlowGroupFromKVStore(groupID uint32, cached bool) bool {
+	var path string
+	if cached {
+		path = fmt.Sprintf(FlowGroupCached, groupID)
+	} else {
+		path = fmt.Sprintf(FlowGroup, groupID)
+	}
+	if err := RsrcMgr.KVStore.Delete(path); err != nil {
+		log.Errorf("Failed to remove resource %s due to %s", path, err)
+		return false
+	}
+	return true
+}
+
+//GetFlowGroupFromKVStore fetches a flow group from the KV store; cached selects the FlowGroupCached path.
+// Returns (true, groupInfo, nil) if the group is fetched successfully, (false, {}, nil) if the group does not
+// exist in the KV store and (false, groupInfo, err) if any problem occurs while fetching or decoding the data.
+func (RsrcMgr *OpenOltResourceMgr) GetFlowGroupFromKVStore(groupID uint32, cached bool) (bool, GroupInfo, error) {
+	var info GroupInfo
+	key := fmt.Sprintf(FlowGroup, groupID)
+	if cached {
+		key = fmt.Sprintf(FlowGroupCached, groupID)
+	}
+	entry, err := RsrcMgr.KVStore.Get(key)
+	if err != nil {
+		return false, info, err
+	}
+	// a missing entry is not an error, it is reported through the boolean result
+	if entry == nil || entry.Value == nil {
+		return false, info, nil
+	}
+	raw, err := kvstore.ToByte(entry.Value)
+	if err != nil {
+		log.Errorw("Failed to convert flow group into byte array", log.Fields{"error": err})
+		return false, info, err
+	}
+	// the stored value is decoded as the JSON encoding of a GroupInfo
+	if err = json.Unmarshal(raw, &info); err != nil {
+		log.Errorw("Failed to unmarshal", log.Fields{"error": err})
+		return false, info, err
+	}
+	return true, info, nil
+}