VOL-1596 Add Support for handling multicast groups in OpenOLT Adapter.
VOL-1595 Add Support for handling multicast flows in OpenOLT Adapter.

Depends on the voltha-protos changes from the patch below:
https://gerrit.opencord.org/#/c/16690/

Change-Id: I1cc9900bd6400bb31aed11beda674138838a21d2
diff --git a/adaptercore/resourcemanager/resourcemanager.go b/adaptercore/resourcemanager/resourcemanager.go
index 21f21c9..0e3da4d 100755
--- a/adaptercore/resourcemanager/resourcemanager.go
+++ b/adaptercore/resourcemanager/resourcemanager.go
@@ -24,12 +24,12 @@
 	"strconv"
 	"strings"
 
-	"github.com/opencord/voltha-lib-go/v2/pkg/db"
-	"github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	ponrmgr "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
-	ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 )
 
 const (
@@ -48,6 +48,20 @@
 	OnuPacketINPath = "onu_packetin/{%d,%d,%d}"
 	//FlowIDsForGem flowids_per_gem/<intfid>
 	FlowIDsForGem = "flowids_per_gem/{%d}"
+	//McastQueuesForIntf multicast queues for pon interfaces
+	McastQueuesForIntf = "mcast_qs_for_int"
+	//FlowGroup flow_groups/<flow_group_id>
+	// A group is stored under this path on the KV store after it has been installed to the device.
+	// It should also be deleted after it has been removed from the device accordingly.
+	FlowGroup = "flow_groups/{%d}"
+	//FlowGroupCached flow_groups_cached/<flow_group_id>
+	// When a group add request is received, we create the group without setting any members on it, since we
+	// cannot set members on a group until it is associated with a multicast flow. This is a BAL limitation.
+	// Once the related multicast flow has been created, we perform the set-members operation for the group.
+	// That is why we need to keep the members of a group until the multicast flow creation request arrives.
+	// We preserve such groups under the "flow_groups_cached" directory in the KV store temporarily. After the
+	// members have been set, we remove the group from the cached group store.
+	FlowGroupCached = "flow_groups_cached/{%d}"
 )
 
 // FlowInfo holds the flow information
@@ -74,6 +88,12 @@
 	LogicalPort uint32
 }
 
+// GroupInfo holds group information in the form that is marshalled to JSON and kept in the KV store
+type GroupInfo struct {
+	GroupID  uint32   // id of the flow group
+	OutPorts []uint32 // output ports of the group, as collected by AddFlowGroupToKVStore
+}
+
 // OpenOltResourceMgr holds resource related information as provided below for each field
 type OpenOltResourceMgr struct {
 	DeviceID    string      // OLT device id
@@ -1269,9 +1289,9 @@
 		log.Error("failed to get data from kv store")
 		return nil, err
 	}
-	if value != nil {
+	if value != nil && value.Value != nil {
 		if val, err = kvstore.ToByte(value.Value); err != nil {
-			log.Error("Failed to convert to byte array", log.Fields{"error": err})
+			log.Error("Failed to convert to byte array ", log.Fields{"error": err})
 			return nil, err
 		}
 		if err = json.Unmarshal(val, &flowsForGem); err != nil {
@@ -1298,3 +1318,136 @@
 	IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
 	RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(IntfOnuIDUniID)
 }
+
+//GetMcastQueuePerInterfaceMap gets multicast queue info per pon interface. The returned map is keyed by interface
+// id with [gem, servicePriority] values as written by AddMcastQueueForIntf; it is nil when nothing is stored yet.
+func (RsrcMgr *OpenOltResourceMgr) GetMcastQueuePerInterfaceMap() (map[uint32][]uint32, error) {
+	kvPair, err := RsrcMgr.KVStore.Get(McastQueuesForIntf)
+	if err != nil {
+		log.Errorw("failed to get data from kv store", log.Fields{"error": err, "path": McastQueuesForIntf})
+		return nil, err
+	}
+	if kvPair == nil || kvPair.Value == nil {
+		return nil, nil
+	}
+	val, err := kvstore.ToByte(kvPair.Value)
+	if err != nil {
+		log.Errorw("Failed to convert to byte array", log.Fields{"error": err})
+		return nil, err
+	}
+	var mcastQueueToIntfMap map[uint32][]uint32
+	if err = json.Unmarshal(val, &mcastQueueToIntfMap); err != nil {
+		log.Errorw("Failed to unmarshall", log.Fields{"error": err})
+		return nil, err
+	}
+	return mcastQueueToIntfMap, nil
+}
+
+//AddMcastQueueForIntf adds multicast queue for pon interface. The stored interface-to-queue map is read, the entry
+// of intf is set to [gem, servicePriority] (replacing any earlier entry) and the whole map is written back.
+func (RsrcMgr *OpenOltResourceMgr) AddMcastQueueForIntf(intf uint32, gem uint32, servicePriority uint32) error {
+	path := McastQueuesForIntf // no format verbs in this path, so fmt.Sprintf is not needed
+	mcastQueues, err := RsrcMgr.GetMcastQueuePerInterfaceMap()
+	if err != nil {
+		log.Errorw("Failed to get multicast queue info for interface", log.Fields{"error": err, "intf": intf})
+		return err
+	}
+	if mcastQueues == nil {
+		mcastQueues = make(map[uint32][]uint32)
+	}
+	mcastQueues[intf] = []uint32{gem, servicePriority}
+	val, err := json.Marshal(mcastQueues)
+	if err != nil {
+		log.Errorw("Failed to marshal data", log.Fields{"error": err})
+		return err
+	}
+	if err = RsrcMgr.KVStore.Put(path, val); err != nil {
+		log.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": string(val)})
+		return err
+	}
+	log.Debugw("added multicast queue info to KV store successfully", log.Fields{"path": path, "mcastQueueInfo": mcastQueues[intf], "interfaceId": intf})
+	return nil
+}
+
+//AddFlowGroupToKVStore adds flow group into KV store.
+// The group is stored as a GroupInfo (group id plus the ports of all OUTPUT actions found in its buckets) under
+// the FlowGroupCached path when cached is true and under the FlowGroup path otherwise.
+func (RsrcMgr *OpenOltResourceMgr) AddFlowGroupToKVStore(groupEntry *ofp.OfpGroupEntry, cached bool) error {
+	path := fmt.Sprintf(FlowGroup, groupEntry.Desc.GroupId)
+	if cached {
+		path = fmt.Sprintf(FlowGroupCached, groupEntry.Desc.GroupId)
+	}
+	//build group info object
+	var outPorts []uint32
+	for _, ofBucket := range groupEntry.Desc.Buckets {
+		for _, ofAction := range ofBucket.Actions {
+			if ofAction.Type != ofp.OfpActionType_OFPAT_OUTPUT {
+				continue
+			}
+			// GetOutput yields nil when the action has no output body; skip it rather than dereference nil.
+			if output := ofAction.GetOutput(); output != nil {
+				outPorts = append(outPorts, output.Port)
+			}
+		}
+	}
+	groupInfo := GroupInfo{
+		GroupID:  groupEntry.Desc.GroupId,
+		OutPorts: outPorts,
+	}
+
+	value, err := json.Marshal(groupInfo)
+	if err != nil {
+		log.Errorw("failed to Marshal flow group object", log.Fields{"error": err, "path": path})
+		return err
+	}
+	if err = RsrcMgr.KVStore.Put(path, value); err != nil {
+		log.Errorf("Failed to update resource %s due to %s", path, err)
+		return err
+	}
+	return nil
+}
+
+//RemoveFlowGroupFromKVStore deletes the entry of the given group from the KV store, using the FlowGroupCached
+// path when cached is true and the FlowGroup path otherwise. It reports whether the delete succeeded.
+func (RsrcMgr *OpenOltResourceMgr) RemoveFlowGroupFromKVStore(groupID uint32, cached bool) bool {
+	path := fmt.Sprintf(FlowGroup, groupID)
+	if cached {
+		path = fmt.Sprintf(FlowGroupCached, groupID)
+	}
+	err := RsrcMgr.KVStore.Delete(path)
+	if err == nil {
+		return true
+	}
+	log.Errorf("Failed to remove resource %s due to %s", path, err)
+	return false
+}
+
+//GetFlowGroupFromKVStore fetches flow group from the KV store.
+// Returns (true, groupInfo, nil) if the group is fetched successfully.
+// Returns (false, {}, nil) if the group does not exist in the KV store.
+// Returns false together with the error if any problem occurs while fetching or decoding the data.
+func (RsrcMgr *OpenOltResourceMgr) GetFlowGroupFromKVStore(groupID uint32, cached bool) (bool, GroupInfo, error) {
+	var groupInfo GroupInfo
+	path := fmt.Sprintf(FlowGroup, groupID)
+	if cached {
+		path = fmt.Sprintf(FlowGroupCached, groupID)
+	}
+	kvPair, err := RsrcMgr.KVStore.Get(path)
+	if err != nil {
+		return false, groupInfo, err
+	}
+	// A missing pair or a pair without a value means the group is not stored.
+	if kvPair == nil || kvPair.Value == nil {
+		return false, groupInfo, nil
+	}
+	raw, err := kvstore.ToByte(kvPair.Value)
+	if err != nil {
+		log.Errorw("Failed to convert flow group into byte array", log.Fields{"error": err})
+		return false, groupInfo, err
+	}
+	if err = json.Unmarshal(raw, &groupInfo); err != nil {
+		log.Errorw("Failed to unmarshal", log.Fields{"error": err})
+		return false, groupInfo, err
+	}
+	return true, groupInfo, nil
+}
diff --git a/adaptercore/resourcemanager/resourcemanager_test.go b/adaptercore/resourcemanager/resourcemanager_test.go
index 1eac99f..2cc6953 100644
--- a/adaptercore/resourcemanager/resourcemanager_test.go
+++ b/adaptercore/resourcemanager/resourcemanager_test.go
@@ -26,12 +26,13 @@
 import (
 	"encoding/json"
 	"errors"
-	"github.com/opencord/voltha-lib-go/v2/pkg/db"
-	"github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v2/pkg/log"
-	ponrmgr "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
-	ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
-	"github.com/opencord/voltha-protos/v2/go/openolt"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"github.com/opencord/voltha-protos/v3/go/openolt"
 	"reflect"
 	"strconv"
 	"strings"
@@ -161,6 +162,19 @@
 			str, _ := json.Marshal(1)
 			return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
 		}
+		if strings.Contains(key, McastQueuesForIntf) {
+			log.Debug("Error Error Error Key:", McastQueuesForIntf)
+			mcastQueues := make(map[uint32][]uint32)
+			mcastQueues[10] = []uint32{4000, 0}
+			str, _ := json.Marshal(mcastQueues)
+			return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+		}
+		if strings.Contains(key, "flow_groups") && !strings.Contains(key, "1000") {
+			groupInfo := GroupInfo{GroupID: 2, OutPorts: []uint32{2}}
+			str, _ := json.Marshal(groupInfo)
+			return kvstore.NewKVPair(key, str, "mock", 3000, 1), nil
+		}
+
 		maps := make(map[string]*kvstore.KVPair)
 		maps[key] = &kvstore.KVPair{Key: key}
 		return maps[key], nil
@@ -971,3 +985,141 @@
 		})
 	}
 }
+
+// TestOpenOltResourceMgr_AddMcastQueueForIntf stores multicast queue info for several interfaces and expects
+// AddMcastQueueForIntf to return no error for any of them.
+func TestOpenOltResourceMgr_AddMcastQueueForIntf(t *testing.T) {
+	type args struct {
+		intf            uint32
+		gem             uint32
+		servicePriority uint32
+	}
+	cases := []struct {
+		name   string
+		args   args
+		fields *fields
+	}{
+		{"AddMcastQueueForIntf-1", args{0, 4000, 0}, getResMgr()},
+		{"AddMcastQueueForIntf-2", args{1, 4000, 1}, getResMgr()},
+		{"AddMcastQueueForIntf-3", args{2, 4000, 2}, getResMgr()},
+	}
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			resMgr := testResMgrObject(tc.fields)
+			if err := resMgr.AddMcastQueueForIntf(tc.args.intf, tc.args.gem, tc.args.servicePriority); err != nil {
+				t.Errorf("%s got err= %s wants nil", tc.name, err)
+			}
+		})
+	}
+}
+
+// newGroup returns a group entry of type OFPGT_ALL with the given id. The entry has a single bucket whose
+// actions are created with fu.Output, one per element of outPorts and in the same order.
+func newGroup(groupID uint32, outPorts []uint32) *ofp.OfpGroupEntry {
+	// one action per requested port
+	var actions []*ofp.OfpAction
+	for _, port := range outPorts {
+		actions = append(actions, fu.Output(port))
+	}
+	return &ofp.OfpGroupEntry{
+		Desc: &ofp.OfpGroupDesc{
+			Type:    ofp.OfpGroupType_OFPGT_ALL,
+			GroupId: groupID,
+			Buckets: []*ofp.OfpBucket{
+				{Actions: actions},
+			},
+		},
+	}
+}
+
+// TestOpenOltResourceMgr_AddFlowGroupToKVStore stores one group with cached set to true and one with cached
+// set to false, and expects AddFlowGroupToKVStore to return no error in
+// either case.
+func TestOpenOltResourceMgr_AddFlowGroupToKVStore(t *testing.T) {
+	type args struct {
+		group  *ofp.OfpGroupEntry
+		cached bool
+	}
+	// groups under test
+	group1 := newGroup(1, []uint32{1})
+	group2 := newGroup(2, []uint32{2})
+	// test set: group 1 is added as cached, group 2 as not cached
+	cases := []struct {
+		name   string
+		args   args
+		fields *fields
+	}{
+		{"AddFlowGroupToKVStore-1", args{group1, true}, getResMgr()},
+		{"AddFlowGroupToKVStore-2", args{group2, false}, getResMgr()},
+	}
+	// run every case against the resource manager built from its fields
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			resMgr := testResMgrObject(tc.fields)
+			if err := resMgr.AddFlowGroupToKVStore(tc.args.group, tc.args.cached); err != nil {
+				t.Errorf("%s got err= %s wants nil", tc.name, err)
+			}
+		})
+	}
+}
+
+// TestOpenOltResourceMgr_RemoveFlowGroupFromKVStore removes one group with cached set to true and one with
+// cached set to false, and expects RemoveFlowGroupFromKVStore to report success both times.
+func TestOpenOltResourceMgr_RemoveFlowGroupFromKVStore(t *testing.T) {
+	type args struct {
+		groupID uint32
+		cached  bool
+	}
+	// test set
+	cases := []struct {
+		name   string
+		args   args
+		fields *fields
+	}{
+		{"RemoveFlowGroupFromKVStore-1", args{1, true}, getResMgr()},
+		{"RemoveFlowGroupFromKVStore-2", args{2, false}, getResMgr()},
+	}
+	// run every case against the resource manager built from its fields
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			resMgr := testResMgrObject(tc.fields)
+			if !resMgr.RemoveFlowGroupFromKVStore(tc.args.groupID, tc.args.cached) {
+				t.Errorf("%s got false but wants true", tc.name)
+			}
+		})
+	}
+}
+
+// TestOpenOltResourceMgr_GetFlowGroupFromKVStore expects no error from any lookup, a non-zero group id whenever
+// a group is reported as existing, and no group at all for id 1000.
+func TestOpenOltResourceMgr_GetFlowGroupFromKVStore(t *testing.T) {
+	type args struct {
+		groupID uint32
+		cached  bool
+	}
+	//define test set
+	tests := []struct {
+		name   string
+		args   args
+		fields *fields
+	}{
+		{"GetFlowGroupFromKVStore-1", args{1, true}, getResMgr()},
+		{"GetFlowGroupFromKVStore-2", args{2, false}, getResMgr()},
+		{"GetFlowGroupFromKVStore-3", args{1000, false}, getResMgr()}, // id the mock KV store holds no group for
+	}
+	//execute tests
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			RsrcMgr := testResMgrObject(tt.fields)
+			exists, groupInfo, err := RsrcMgr.GetFlowGroupFromKVStore(tt.args.groupID, tt.args.cached)
+			switch {
+			case err != nil:
+				t.Errorf("%s got error but wants nil error", tt.name)
+			case exists && groupInfo.GroupID == 0:
+				t.Errorf("%s got true and nil group info but expected not nil group info", tt.name)
+			case tt.args.groupID == 1000 && exists:
+				t.Errorf("%s got true but wants false", tt.name)
+			}
+		})
+	}
+}