Code changes for handling HSIA flows
Changes for techprofile-download-req msg to onu adapter and bug fixes

Change-Id: I8b235dd03e3c5db7b134fc16d59b3cb67993b52f
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 18efb33..6bcc3ee 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -22,12 +22,13 @@
 	"encoding/json"
 	"errors"
 	"fmt"
-	"math/big"
-	rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
 	"github.com/opencord/voltha-go/common/log"
 	tp "github.com/opencord/voltha-go/common/techprofile"
 	fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
+	rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
 	ofp "github.com/opencord/voltha-protos/go/openflow_13"
+	"math/big"
+	//	ic "github.com/opencord/voltha-protos/go/inter_container"
 	openolt_pb2 "github.com/opencord/voltha-protos/go/openolt"
 	voltha "github.com/opencord/voltha-protos/go/voltha"
 )
@@ -255,21 +256,93 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) addDownstreamDataFlow(intfid uint32, onuid uint32, uniid uint32, portno uint32, classifier map[string]interface{}, action map[string]interface{}, logicalflow *ofp.OfpFlowStats, allocid uint32, gemportid uint32) {
-
-	log.Info("Unimplemented")
-
-	return
-
+func (f *OpenOltFlowMgr) addUpstreamDataFlow(intfId uint32, onuId uint32, uniId uint32,
+	portNo uint32, uplinkClassifier map[string]interface{},
+	uplinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
+	allocId uint32, gemportId uint32) {
+	uplinkClassifier[PACKET_TAG_TYPE] = SINGLE_TAG
+	log.Debugw("Adding upstream data flow", log.Fields{"uplinkClassifier": uplinkClassifier, "uplinkAction": uplinkAction})
+	f.addHSIAFlow(intfId, onuId, uniId, portNo, uplinkClassifier, uplinkAction,
+		UPSTREAM, logicalFlow, allocId, gemportId)
+	/* TODO: Install Secondary EAP on the subscriber vlan */
 }
 
-func (f *OpenOltFlowMgr) addUpstreamDataFlow(intfid uint32, onuid uint32, uniid uint32, portno uint32, classifier map[string]interface{}, action map[string]interface{}, logicalflow *ofp.OfpFlowStats, allocid uint32, gemportid uint32) {
-
-	log.Info("Unimplemented")
-	return
-
+func (f *OpenOltFlowMgr) addDownstreamDataFlow(intfId uint32, onuId uint32, uniId uint32,
+	portNo uint32, downlinkClassifier map[string]interface{},
+	downlinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
+	allocId uint32, gemportId uint32) {
+	downlinkClassifier[PACKET_TAG_TYPE] = DOUBLE_TAG
+	log.Debugw("Adding downstream data flow", log.Fields{"downlinkClassifier": downlinkClassifier,
+		"downlinkAction": downlinkAction})
+	if uint32(downlinkClassifier[METADATA].(uint64)) == MkUniPortNum(intfId, onuId, uniId) &&
+		downlinkClassifier[VLAN_VID] == (uint32(ofp.OfpVlanId_OFPVID_PRESENT)|4000) {
+		log.Infow("EAPOL DL flow , Already added ,ignoring it", log.Fields{"downlinkClassifier": downlinkClassifier,
+			"downlinkAction": downlinkAction})
+		return
+	}
+	/* Already this info available classifier? */
+	downlinkAction[POP_VLAN] = true
+	downlinkAction[VLAN_VID] = downlinkClassifier[VLAN_VID]
+	f.addHSIAFlow(intfId, onuId, uniId, portNo, downlinkClassifier, downlinkAction,
+		DOWNSTREAM, logicalFlow, allocId, gemportId)
 }
 
+func (f *OpenOltFlowMgr) addHSIAFlow(intfId uint32, onuId uint32, uniId uint32, portNo uint32, classifier map[string]interface{},
+	action map[string]interface{}, direction string, logicalFlow *ofp.OfpFlowStats,
+	allocId uint32, gemPortId uint32) {
+	/* One of the OLT platform (Broadcom BAL) requires that symmetric
+	   flows require the same flow_id to be used across UL and DL.
+	   Since HSIA flow is the only symmetric flow currently, we need to
+	   re-use the flow_id across both direction. The 'flow_category'
+	   takes priority over flow_cookie to find any available HSIA_FLOW
+	   id for the ONU.
+	*/
+	log.Debugw("Adding HSIA flow", log.Fields{"intfId": intfId, "onuId": onuId, "uniId": uniId, "classifier": classifier,
+		"action": action, "direction": direction, "allocId": allocId, "gemPortId": gemPortId,
+		"logicalFlow": *logicalFlow})
+	flowStoreCookie := getFlowStoreCookie(classifier, gemPortId)
+	flowId, err := f.resourceMgr.GetFlowID(intfId, onuId, uniId, flowStoreCookie, "HSIA")
+	if err != nil {
+		log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
+		return
+	}
+	var classifierProto *openolt_pb2.Classifier
+	var actionProto *openolt_pb2.Action
+	if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
+		log.Error("Error in making classifier protobuf for hsia flow")
+		return
+	}
+	log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
+	if actionProto = makeOpenOltActionField(action); actionProto == nil {
+		log.Errorw("Error in making action protobuf for hsia flow", log.Fields{"direction": direction})
+		return
+	}
+	log.Debugw("Created action proto", log.Fields{"action": *actionProto})
+	flow := openolt_pb2.Flow{AccessIntfId: int32(intfId),
+		OnuId:         int32(onuId),
+		UniId:         int32(uniId),
+		FlowId:        flowId,
+		FlowType:      direction,
+		AllocId:       int32(allocId),
+		NetworkIntfId: DEFAULT_NETWORK_INTERFACE_ID, // one NNI port is supported now
+		GemportId:     int32(gemPortId),
+		Classifier:    classifierProto,
+		Action:        actionProto,
+		Priority:      int32(logicalFlow.Priority),
+		Cookie:        logicalFlow.Cookie,
+		PortNo:        portNo}
+	if ok := f.addFlowToDevice(&flow); ok {
+		log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
+		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, "HSIA")
+		if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
+			flow.OnuId,
+			flow.UniId,
+			flow.FlowId, flowsToKVStore); err != nil {
+			log.Errorw("Error uploading HSIA  flow into KV store", log.Fields{"flow": flow, "direction": direction, "error": err})
+			return
+		}
+	}
+}
 func (f *OpenOltFlowMgr) addDHCPTrapFlow(intfId uint32, onuId uint32, uniId uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32) {
 	return
 }
@@ -295,7 +368,8 @@
 	//Add Uplink EAPOL Flow
 	uplinkFlowId, err := f.resourceMgr.GetFlowID(intfId, onuId, uniId, flowStoreCookie, "")
 	if err != nil {
-		log.Errorw("Error allocating flowID for UL EAPOL", log.Fields{"intfId": intfId, "onuId": onuId, "flowStoreCookie": flowStoreCookie})
+		log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfId, "onuId": onuId, "flowStoreCookie": flowStoreCookie})
+		return
 	}
 	var classifierProto *openolt_pb2.Classifier
 	var actionProto *openolt_pb2.Action
@@ -363,7 +437,7 @@
 		flowStoreCookie := getFlowStoreCookie(downlinkClassifier, gemPortId)
 		downlinkFlowId, err := f.resourceMgr.GetFlowID(intfId, onuId, uniId, flowStoreCookie, "")
 		if err != nil {
-			log.Errorw("Error allocating flowID for DL EAPOL",
+			log.Errorw("flowId unavailable for DL EAPOL",
 				log.Fields{"intfId": intfId, "onuId": onuId, "flowStoreCookie": flowStoreCookie})
 			return
 		}
@@ -420,8 +494,8 @@
 	if vlanId, ok := classifierInfo[VLAN_VID]; ok {
 		classifier.OVid = vlanId.(uint32)
 	}
-	if metadata, ok := classifierInfo[METADATA]; ok {
-		classifier.IVid = metadata.(uint32)
+	if metadata, ok := classifierInfo[METADATA]; ok { // TODO: Revisit
+		classifier.IVid = uint32(metadata.(uint64))
 	}
 	if vlanPcp, ok := classifierInfo[VLAN_PCP]; ok {
 		classifier.OPbits = vlanPcp.(uint32)
@@ -577,14 +651,17 @@
 	return fmt.Sprintf("pon-{%d}/onu-{%d}/uni-{%d}", intfId, onuId, uniId)
 }
 
-func (f *OpenOltFlowMgr) getOnuChildDevice(intfId uint32, onuId uint32) *voltha.Device {
-	parentPortNo := IntfIdToPortNo(intfId, voltha.Port_PON_OLT)
-	ChildDevice := f.deviceHandler.GetChildDevice(parentPortNo, onuId)
-	if ChildDevice == nil {
-		log.Errorw("could-not-find-child-device", log.Fields{"parent_port_no": parentPortNo, "onuId": onuId})
-		return nil
+func (f *OpenOltFlowMgr) getOnuChildDevice(intfId uint32, onuId uint32) (*voltha.Device, error) {
+	log.Debugw("GetChildDevice", log.Fields{"pon port": intfId, "onuId": onuId})
+	//parentPortNo := IntfIdToPortNo(intfId, voltha.Port_PON_OLT)
+	parentPortNo := intfId
+	onuDevice := f.deviceHandler.GetChildDevice(parentPortNo, onuId)
+	if onuDevice == nil {
+		log.Errorw("onu not found", log.Fields{"intfId": parentPortNo, "onuId": onuId})
+		return nil, errors.New("onu not found")
 	}
-	return ChildDevice
+	log.Debugw("Successfully received child device from core", log.Fields{"child_device": *onuDevice})
+	return onuDevice, nil
 }
 
 func findNextFlow(flow *ofp.OfpFlowStats) *ofp.OfpFlowStats {
@@ -652,11 +729,11 @@
 				return
 			}
 		} else if action.Type == fd.POP_VLAN {
-			if gotoTable := fd.GetGotoTableId(flow); gotoTable == 0 {
+			/*if gotoTable := fd.GetGotoTableId(flow); gotoTable == 0 {
 				log.Infow("action-type-pop-vlan, being taken care of by ONU", log.Fields{"flow": flow})
 				actionInfo[POP_VLAN] = false
 				return
-			}
+			}*/
 			actionInfo[POP_VLAN] = true
 			log.Debugw("action-type-pop-vlan", log.Fields{"in_port": classifierInfo[IN_PORT].(uint32)})
 		} else if action.Type == fd.PUSH_VLAN {
@@ -697,14 +774,14 @@
 			return
 		}
 	}
-	if gotoTable := fd.GetGotoTableId(flow); gotoTable != 0 {
+	/*if gotoTable := fd.GetGotoTableId(flow); gotoTable != 0 {
 		if actionInfo[POP_VLAN] == nil {
 			log.Infow("Go to table - action-type-pop-vlan, being taken care of by ONU", log.Fields{"flow": flow})
 			return
 		}
-	}
-	/* NOTE: This condition will be true when core decompose and provides flows to respective devices separately */
-	if _, ok := actionInfo[OUTPUT]; ok == false {
+	}*/
+	/* NOTE: This condition will be false when core decompose and provides flows to respective devices separately */
+	/*	if _,ok := actionInfo[OUTPUT]; ok == false{
 		log.Debug("action-go-to-table, get next flow for outport details")
 		if _, ok := classifierInfo[METADATA]; ok {
 			if next_flow := findNextFlow(flow); next_flow == nil {
@@ -721,28 +798,40 @@
 				}
 			}
 		}
-	}
+	}*/
 	log.Infow("Flow ports", log.Fields{"classifierInfo_inport": classifierInfo[IN_PORT], "action_output": actionInfo[OUTPUT]})
 	portNo, intfId, onuId, uniId := ExtractAccessFromFlow(classifierInfo[IN_PORT].(uint32), actionInfo[OUTPUT].(uint32))
 	f.divideAndAddFlow(intfId, onuId, uniId, portNo, classifierInfo, actionInfo, flow)
 }
 
-func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(intfId uint32, onuId uint32, uniId uint32, uni string) bool {
+func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(intfId uint32, onuId uint32, uniId uint32, uni string) error {
 
-	onuDevice := f.getOnuChildDevice(intfId, onuId)
-	if onuDevice == nil {
-		log.Error("Error while fetching Child device from core", log.Fields{"onuId": onuId})
-		return false
+	onuDevice, err := f.getOnuChildDevice(intfId, onuId)
+	if err != nil {
+		log.Errorw("Error while fetching Child device from core", log.Fields{"onuId": onuId})
+		return err
 	}
-	log.Debugw("Retrived child device from OLT device handler", log.Fields{"device": *onuDevice})
-	tpPath := f.getTPpath(intfId, uni)
-	log.Infow("Load-tech-profile-request-to-brcm-handler", log.Fields{"path": tpPath})
-	/* TODO : "Send Event message to ONU device via proxy to download tech profile in go routine
-	            msg = {'proxy_address': onuDevice.proxyAddress, 'uniId': uniId,
-			  'event': 'download_tech_profile', 'event_data': tp_path}
-		    //Send the event message to the ONU adapter
-	            self.adapter_agent.publish_inter_adapter_message(onu_device.id,msg)
-	*/
-	return true
-
+	log.Debugw("Got child device from OLT device handler", log.Fields{"device": *onuDevice})
+	/* TODO: uncomment once voltha-proto is ready with changes */
+	/*
+	        tpPath := f.getTPpath(intfId, uni)
+	        tpDownloadMsg := &ic.TechProfileDownload{UniId: uniId, Path: tpPath}
+	        var tpDownloadMsg interface{}
+	        log.Infow("Sending Load-tech-profile-request-to-brcm-onu-adapter",log.Fields{"msg": *tpDownloadMsg})
+	        sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(context.Background(),
+	                                                           tpDownloadMsg,
+	                                                           //ic.InterAdapterMessageType_TECH_PROFILE_DOWNLOAD_REQUEST,
+	                                                           ic.InterAdapterMessageType_OMCI_REQUEST,
+	                                                           f.deviceHandler.deviceType,
+	                                                           onuDevice.Type,
+			                                           onuDevice.Id,
+	                                                           onuDevice.ProxyAddress.DeviceId, "")
+	        if sendErr != nil {
+	            log.Errorw("send techprofile-download request error", log.Fields{"fromAdapter": f.deviceHandler.deviceType,
+	                                          "toAdapter": onuDevice.Type, "onuId": onuDevice.Id,
+	                                          "proxyDeviceId": onuDevice.ProxyAddress.DeviceId})
+	            return sendErr
+	       }
+	       log.Debugw("success Sending Load-tech-profile-request-to-brcm-onu-adapter",log.Fields{"msg":tpDownloadMsg})*/
+	return nil
 }