VOL-1594 - Add Support for handling IGMP flows in OpenOLT Adapter

Change-Id: I56dc90620c557647a1f7fd6e94b534c5ab669ce7
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index e34283a..cb746c2 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -57,6 +57,9 @@
 	//DhcpFlow flow category
 	DhcpFlow = "DHCP_FLOW"
 
+	//IgmpFlow flow category
+	IgmpFlow = "IGMP_FLOW"
+
 	//IPProtoDhcp flow category
 	IPProtoDhcp = 17
 
@@ -67,6 +70,8 @@
 	EapEthType = 0x888e
 	//LldpEthType lldp ethtype value
 	LldpEthType = 0x88cc
+	//IPv4EthType IPv4 ethernet type value
+	IPv4EthType = 0x800
 
 	//IgmpProto proto value
 	IgmpProto = 2
@@ -725,6 +730,7 @@
 		}
 	}
 }
+
 func (f *OpenOltFlowMgr) addDHCPTrapFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
 
 	var dhcpFlow openoltpb2.Flow
@@ -802,6 +808,90 @@
 	return
 }
 
+//addIGMPTrapFlow creates IGMP trap-to-host flow
+func (f *OpenOltFlowMgr) addIGMPTrapFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
+	action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32) {
+	f.addUpstreamTrapFlow(intfID, onuID, uniID, portNo, classifier, action, logicalFlow, allocID, gemPortID, IgmpFlow)
+}
+
+//addUpstreamTrapFlow creates a trap-to-host flow
+func (f *OpenOltFlowMgr) addUpstreamTrapFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
+	action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, flowType string) {
+
+	var flow openoltpb2.Flow
+	var actionProto *openoltpb2.Action
+	var classifierProto *openoltpb2.Classifier
+
+	networkIntfID, err := getNniIntfID(classifier, action)
+	if err != nil {
+		log.Error("Failed to get nniIntf ID")
+		return
+	}
+
+	// Clear the action map
+	for k := range action {
+		delete(action, k)
+	}
+
+	action[TrapToHost] = true
+	classifier[PacketTagType] = SingleTag
+	delete(classifier, VlanVid)
+
+	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkIntfID), int32(onuID), int32(uniID), flowStoreCookie); present {
+		log.Debug("Flow-exists--not-re-adding")
+		return
+	}
+
+	flowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, flowType, 0, 0 /*classifier[VLAN_PCP].(uint32)*/)
+
+	if err != nil {
+		log.Errorw("flowId unavailable for upstream trap flow", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie, "flowType": flowType})
+		return
+	}
+
+	log.Debugw("Creating upstream trap flow", log.Fields{"ul_classifier": classifier, "ul_action": action, "uplinkFlowId": flowID, "flowType": flowType})
+
+	if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
+		log.Error("Error in making classifier protobuf for ul flow")
+		return
+	}
+	log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
+	if actionProto = makeOpenOltActionField(action); actionProto == nil {
+		log.Error("Error in making action protobuf for ul flow")
+		return
+	}
+
+	flow = openoltpb2.Flow{AccessIntfId: int32(intfID),
+		OnuId:         int32(onuID),
+		UniId:         int32(uniID),
+		FlowId:        flowID,
+		FlowType:      Upstream,
+		AllocId:       int32(allocID),
+		NetworkIntfId: int32(networkIntfID),
+		GemportId:     int32(gemPortID),
+		Classifier:    classifierProto,
+		Action:        actionProto,
+		Priority:      int32(logicalFlow.Priority),
+		Cookie:        logicalFlow.Cookie,
+		PortNo:        portNo}
+
+	if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
+		log.Debugf("%s UL flow added to device successfully", flowType)
+
+		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, flowType, flowID, logicalFlow.Id)
+		if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
+			flow.OnuId,
+			flow.UniId,
+			flow.FlowId, flowsToKVStore); err != nil {
+			log.Errorw("Error uploading UL flow into KV store", log.Fields{"flow": flow, "error": err})
+			return
+		}
+	}
+
+	return
+}
+
 // Add EAPOL flow to  device with mac, vlanId as classifier for upstream and downstream
 func (f *OpenOltFlowMgr) addEAPOLFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) {
 	log.Debugw("Adding EAPOL to device", log.Fields{"intfId": intfID, "onuId": onuID, "portNo": portNo, "allocId": allocID, "gemPortId": gemPortID, "vlanId": vlanID, "flow": logicalFlow})
@@ -1523,6 +1613,22 @@
 	}
 }
 
+//isIgmpTrapDownstreamFlow return true if the flow is a downsteam IGMP trap-to-host flow; false otherwise
+func isIgmpTrapDownstreamFlow(classifierInfo map[string]interface{}) bool {
+	if portType := IntfIDToPortTypeName(classifierInfo[InPort].(uint32)); portType == voltha.Port_ETHERNET_NNI {
+		if ethType, ok := classifierInfo[EthType]; ok {
+			if ethType.(uint32) == IPv4EthType {
+				if ipProto, ok := classifierInfo[IPProto]; ok {
+					if ipProto.(uint32) == IgmpProto {
+						return true
+					}
+				}
+			}
+		}
+	}
+	return false
+}
+
 // AddFlow add flow to device
 func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
 	classifierInfo := make(map[string]interface{})
@@ -1568,6 +1674,11 @@
 			}
 		}
 	}
+	if isIgmpTrapDownstreamFlow(classifierInfo) {
+		log.Debug("trap-igmp-from-nni-flow")
+		f.addIgmpTrapFlowOnNNI(flow, classifierInfo, portNo)
+		return
+	}
 
 	f.deviceHandler.AddUniPortToOnu(intfID, onuID, portNo)
 	f.resourceMgr.AddUniPortToOnuInfo(intfID, onuID, portNo)
@@ -1862,6 +1973,106 @@
 	return
 }
 
+//getPacketTypeFromClassifiers finds and returns packet type of a flow by checking flow classifiers
+func getPacketTypeFromClassifiers(classifierInfo map[string]interface{}) string {
+	var packetType string
+	ovid, ivid := false, false
+	if vlanID, ok := classifierInfo[VlanVid].(uint32); ok {
+		vid := vlanID & VlanvIDMask
+		if vid != ReservedVlan {
+			ovid = true
+		}
+	}
+	if metadata, ok := classifierInfo[Metadata].(uint64); ok {
+		vid := uint32(metadata)
+		if vid != ReservedVlan {
+			ivid = true
+		}
+	}
+	if ovid && ivid {
+		packetType = DoubleTag
+	} else if !ovid && !ivid {
+		packetType = Untagged
+	} else {
+		packetType = SingleTag
+	}
+	return packetType
+}
+
+//addIgmpTrapFlowOnNNI adds a trap-to-host flow on NNI
+func (f *OpenOltFlowMgr) addIgmpTrapFlowOnNNI(logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
+	log.Debugw("Adding igmp-trap-of-nni-flow", log.Fields{"classifierInfo": classifier})
+	action := make(map[string]interface{})
+	classifier[PacketTagType] = getPacketTypeFromClassifiers(classifier)
+	action[TrapToHost] = true
+	/* We manage flowId resource pool on per PON port basis.
+	   Since this situation is tricky, as a hack, we pass the NNI port
+	   index (network_intf_id) as PON port Index for the flowId resource
+	   pool. Also, there is no ONU Id available for trapping packets
+	   on NNI port, use onu_id as -1 (invalid)
+	   ****************** CAVEAT *******************
+	   This logic works if the NNI Port Id falls within the same valid
+	   range of PON Port Ids. If this doesn't work for some OLT Vendor
+	   we need to have a re-look at this.
+	   *********************************************
+	*/
+	onuID := -1
+	uniID := -1
+	gemPortID := -1
+	allocID := -1
+	networkInterfaceID, err := getNniIntfID(classifier, action)
+	if err != nil {
+		log.Error("Failed to get nniIntf ID")
+		return
+	}
+	flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
+		log.Debug("igmp-flow-exists--not-re-adding")
+		return
+	}
+	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0, 0)
+	if err != nil {
+		log.Errorw("IGMP flow id unavailable for trap-on-NNI flow", log.Fields{"error": err})
+		return
+	}
+	var classifierProto *openoltpb2.Classifier
+	var actionProto *openoltpb2.Action
+	if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
+		log.Error("Error in making classifier protobuf for igmp trap on nni flow")
+		return
+	}
+	log.Debugw("Created classifier proto for the IGMP flow", log.Fields{"classifier": *classifierProto})
+	if actionProto = makeOpenOltActionField(action); actionProto == nil {
+		log.Error("Error in making action protobuf for IGMP trap on nni flow")
+		return
+	}
+	log.Debugw("Created action proto for the IGMP flow", log.Fields{"action": *actionProto})
+	downstreamflow := openoltpb2.Flow{AccessIntfId: int32(-1), // AccessIntfId not required
+		OnuId:         int32(onuID), // OnuId not required
+		UniId:         int32(uniID), // UniId not used
+		FlowId:        flowID,
+		FlowType:      Downstream,
+		AllocId:       int32(allocID), // AllocId not used
+		NetworkIntfId: int32(networkInterfaceID),
+		GemportId:     int32(gemPortID), // GemportId not used
+		Classifier:    classifierProto,
+		Action:        actionProto,
+		Priority:      int32(logicalFlow.Priority),
+		Cookie:        logicalFlow.Cookie,
+		PortNo:        portNo}
+	if ok := f.addFlowToDevice(logicalFlow, &downstreamflow); ok {
+		log.Debug("IGMP Trap on NNI flow added to device successfully")
+		flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
+		if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
+			int32(onuID),
+			int32(uniID),
+			flowID, flowsToKVStore); err != nil {
+			log.Errorw("Error uploading igmp-trap-on-nni flow into KV store", log.Fields{"flow": downstreamflow, "error": err})
+		}
+	}
+	return
+}
+
 func verifyMeterIDAndGetDirection(MeterID uint32, Dir tp_pb.Direction) (string, error) {
 	if MeterID == 0 { // This should never happen
 		log.Error("Invalid meter id")
@@ -1899,8 +2110,16 @@
 			}
 
 		} else if ipProto == IgmpProto {
-			log.Info("igmp flow add ignored, not implemented yet")
-			return
+			log.Infow("Adding Us IGMP flow", log.Fields{"intfID": intfID, "onuID": onuID, "uniID": uniID, "classifierInfo:": classifierInfo})
+			if pcp, ok := classifierInfo[VlanPcp]; ok {
+				gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+					tp_pb.Direction_UPSTREAM,
+					pcp.(uint32))
+				f.addIGMPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID, gemPort)
+			} else {
+				//Adding IGMP upstream flow to all gem ports
+				installFlowOnAllGemports(f.addIGMPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, IgmpFlow)
+			}
 		} else {
 			log.Errorw("Invalid-Classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
 			return