Code changes for handling HSIA flows
Changes for techprofile-download-req msg to onu adapter and bug fixes
Change-Id: I8b235dd03e3c5db7b134fc16d59b3cb67993b52f
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 18efb33..6bcc3ee 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -22,12 +22,13 @@
"encoding/json"
"errors"
"fmt"
- "math/big"
- rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
"github.com/opencord/voltha-go/common/log"
tp "github.com/opencord/voltha-go/common/techprofile"
fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
+ rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
ofp "github.com/opencord/voltha-protos/go/openflow_13"
+ "math/big"
+ // ic "github.com/opencord/voltha-protos/go/inter_container"
openolt_pb2 "github.com/opencord/voltha-protos/go/openolt"
voltha "github.com/opencord/voltha-protos/go/voltha"
)
@@ -255,21 +256,93 @@
return nil
}
-func (f *OpenOltFlowMgr) addDownstreamDataFlow(intfid uint32, onuid uint32, uniid uint32, portno uint32, classifier map[string]interface{}, action map[string]interface{}, logicalflow *ofp.OfpFlowStats, allocid uint32, gemportid uint32) {
-
- log.Info("Unimplemented")
-
- return
-
+func (f *OpenOltFlowMgr) addUpstreamDataFlow(intfId uint32, onuId uint32, uniId uint32,
+ portNo uint32, uplinkClassifier map[string]interface{},
+ uplinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
+ allocId uint32, gemportId uint32) {
+ uplinkClassifier[PACKET_TAG_TYPE] = SINGLE_TAG
+ log.Debugw("Adding upstream data flow", log.Fields{"uplinkClassifier": uplinkClassifier, "uplinkAction": uplinkAction})
+ f.addHSIAFlow(intfId, onuId, uniId, portNo, uplinkClassifier, uplinkAction,
+ UPSTREAM, logicalFlow, allocId, gemportId)
+ /* TODO: Install Secondary EAP on the subscriber vlan */
}
-func (f *OpenOltFlowMgr) addUpstreamDataFlow(intfid uint32, onuid uint32, uniid uint32, portno uint32, classifier map[string]interface{}, action map[string]interface{}, logicalflow *ofp.OfpFlowStats, allocid uint32, gemportid uint32) {
-
- log.Info("Unimplemented")
- return
-
+func (f *OpenOltFlowMgr) addDownstreamDataFlow(intfId uint32, onuId uint32, uniId uint32,
+ portNo uint32, downlinkClassifier map[string]interface{},
+ downlinkAction map[string]interface{}, logicalFlow *ofp.OfpFlowStats,
+ allocId uint32, gemportId uint32) {
+ downlinkClassifier[PACKET_TAG_TYPE] = DOUBLE_TAG
+ log.Debugw("Adding downstream data flow", log.Fields{"downlinkClassifier": downlinkClassifier,
+ "downlinkAction": downlinkAction})
+ if uint32(downlinkClassifier[METADATA].(uint64)) == MkUniPortNum(intfId, onuId, uniId) &&
+ downlinkClassifier[VLAN_VID] == (uint32(ofp.OfpVlanId_OFPVID_PRESENT)|4000) {
+ log.Infow("EAPOL DL flow , Already added ,ignoring it", log.Fields{"downlinkClassifier": downlinkClassifier,
+ "downlinkAction": downlinkAction})
+ return
+ }
+ /* Already this info available classifier? */
+ downlinkAction[POP_VLAN] = true
+ downlinkAction[VLAN_VID] = downlinkClassifier[VLAN_VID]
+ f.addHSIAFlow(intfId, onuId, uniId, portNo, downlinkClassifier, downlinkAction,
+ DOWNSTREAM, logicalFlow, allocId, gemportId)
}
+func (f *OpenOltFlowMgr) addHSIAFlow(intfId uint32, onuId uint32, uniId uint32, portNo uint32, classifier map[string]interface{},
+ action map[string]interface{}, direction string, logicalFlow *ofp.OfpFlowStats,
+ allocId uint32, gemPortId uint32) {
+ /* One of the OLT platform (Broadcom BAL) requires that symmetric
+ flows require the same flow_id to be used across UL and DL.
+ Since HSIA flow is the only symmetric flow currently, we need to
+ re-use the flow_id across both direction. The 'flow_category'
+ takes priority over flow_cookie to find any available HSIA_FLOW
+ id for the ONU.
+ */
+ log.Debugw("Adding HSIA flow", log.Fields{"intfId": intfId, "onuId": onuId, "uniId": uniId, "classifier": classifier,
+ "action": action, "direction": direction, "allocId": allocId, "gemPortId": gemPortId,
+ "logicalFlow": *logicalFlow})
+ flowStoreCookie := getFlowStoreCookie(classifier, gemPortId)
+ flowId, err := f.resourceMgr.GetFlowID(intfId, onuId, uniId, flowStoreCookie, "HSIA")
+ if err != nil {
+ log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
+ return
+ }
+ var classifierProto *openolt_pb2.Classifier
+ var actionProto *openolt_pb2.Action
+ if classifierProto = makeOpenOltClassifierField(classifier); classifierProto == nil {
+ log.Error("Error in making classifier protobuf for hsia flow")
+ return
+ }
+ log.Debugw("Created classifier proto", log.Fields{"classifier": *classifierProto})
+ if actionProto = makeOpenOltActionField(action); actionProto == nil {
+ log.Errorw("Error in making action protobuf for hsia flow", log.Fields{"direction": direction})
+ return
+ }
+ log.Debugw("Created action proto", log.Fields{"action": *actionProto})
+ flow := openolt_pb2.Flow{AccessIntfId: int32(intfId),
+ OnuId: int32(onuId),
+ UniId: int32(uniId),
+ FlowId: flowId,
+ FlowType: direction,
+ AllocId: int32(allocId),
+ NetworkIntfId: DEFAULT_NETWORK_INTERFACE_ID, // one NNI port is supported now
+ GemportId: int32(gemPortId),
+ Classifier: classifierProto,
+ Action: actionProto,
+ Priority: int32(logicalFlow.Priority),
+ Cookie: logicalFlow.Cookie,
+ PortNo: portNo}
+ if ok := f.addFlowToDevice(&flow); ok {
+ log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
+ flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, "HSIA")
+ if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
+ flow.OnuId,
+ flow.UniId,
+ flow.FlowId, flowsToKVStore); err != nil {
+ log.Errorw("Error uploading HSIA flow into KV store", log.Fields{"flow": flow, "direction": direction, "error": err})
+ return
+ }
+ }
+}
func (f *OpenOltFlowMgr) addDHCPTrapFlow(intfId uint32, onuId uint32, uniId uint32, portNo uint32, classifier map[string]interface{}, action map[string]interface{}, logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32) {
return
}
@@ -295,7 +368,8 @@
//Add Uplink EAPOL Flow
uplinkFlowId, err := f.resourceMgr.GetFlowID(intfId, onuId, uniId, flowStoreCookie, "")
if err != nil {
- log.Errorw("Error allocating flowID for UL EAPOL", log.Fields{"intfId": intfId, "onuId": onuId, "flowStoreCookie": flowStoreCookie})
+ log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfId, "onuId": onuId, "flowStoreCookie": flowStoreCookie})
+ return
}
var classifierProto *openolt_pb2.Classifier
var actionProto *openolt_pb2.Action
@@ -363,7 +437,7 @@
flowStoreCookie := getFlowStoreCookie(downlinkClassifier, gemPortId)
downlinkFlowId, err := f.resourceMgr.GetFlowID(intfId, onuId, uniId, flowStoreCookie, "")
if err != nil {
- log.Errorw("Error allocating flowID for DL EAPOL",
+ log.Errorw("flowId unavailable for DL EAPOL",
log.Fields{"intfId": intfId, "onuId": onuId, "flowStoreCookie": flowStoreCookie})
return
}
@@ -420,8 +494,8 @@
if vlanId, ok := classifierInfo[VLAN_VID]; ok {
classifier.OVid = vlanId.(uint32)
}
- if metadata, ok := classifierInfo[METADATA]; ok {
- classifier.IVid = metadata.(uint32)
+ if metadata, ok := classifierInfo[METADATA]; ok { // TODO: Revisit
+ classifier.IVid = uint32(metadata.(uint64))
}
if vlanPcp, ok := classifierInfo[VLAN_PCP]; ok {
classifier.OPbits = vlanPcp.(uint32)
@@ -577,14 +651,17 @@
return fmt.Sprintf("pon-{%d}/onu-{%d}/uni-{%d}", intfId, onuId, uniId)
}
-func (f *OpenOltFlowMgr) getOnuChildDevice(intfId uint32, onuId uint32) *voltha.Device {
- parentPortNo := IntfIdToPortNo(intfId, voltha.Port_PON_OLT)
- ChildDevice := f.deviceHandler.GetChildDevice(parentPortNo, onuId)
- if ChildDevice == nil {
- log.Errorw("could-not-find-child-device", log.Fields{"parent_port_no": parentPortNo, "onuId": onuId})
- return nil
+func (f *OpenOltFlowMgr) getOnuChildDevice(intfId uint32, onuId uint32) (*voltha.Device, error) {
+ log.Debugw("GetChildDevice", log.Fields{"pon port": intfId, "onuId": onuId})
+ //parentPortNo := IntfIdToPortNo(intfId, voltha.Port_PON_OLT)
+ parentPortNo := intfId
+ onuDevice := f.deviceHandler.GetChildDevice(parentPortNo, onuId)
+ if onuDevice == nil {
+ log.Errorw("onu not found", log.Fields{"intfId": parentPortNo, "onuId": onuId})
+ return nil, errors.New("onu not found")
}
- return ChildDevice
+ log.Debugw("Successfully received child device from core", log.Fields{"child_device": *onuDevice})
+ return onuDevice, nil
}
func findNextFlow(flow *ofp.OfpFlowStats) *ofp.OfpFlowStats {
@@ -652,11 +729,11 @@
return
}
} else if action.Type == fd.POP_VLAN {
- if gotoTable := fd.GetGotoTableId(flow); gotoTable == 0 {
+ /*if gotoTable := fd.GetGotoTableId(flow); gotoTable == 0 {
log.Infow("action-type-pop-vlan, being taken care of by ONU", log.Fields{"flow": flow})
actionInfo[POP_VLAN] = false
return
- }
+ }*/
actionInfo[POP_VLAN] = true
log.Debugw("action-type-pop-vlan", log.Fields{"in_port": classifierInfo[IN_PORT].(uint32)})
} else if action.Type == fd.PUSH_VLAN {
@@ -697,14 +774,14 @@
return
}
}
- if gotoTable := fd.GetGotoTableId(flow); gotoTable != 0 {
+ /*if gotoTable := fd.GetGotoTableId(flow); gotoTable != 0 {
if actionInfo[POP_VLAN] == nil {
log.Infow("Go to table - action-type-pop-vlan, being taken care of by ONU", log.Fields{"flow": flow})
return
}
- }
- /* NOTE: This condition will be true when core decompose and provides flows to respective devices separately */
- if _, ok := actionInfo[OUTPUT]; ok == false {
+ }*/
+ /* NOTE: This condition will be false when core decompose and provides flows to respective devices separately */
+ /* if _,ok := actionInfo[OUTPUT]; ok == false{
log.Debug("action-go-to-table, get next flow for outport details")
if _, ok := classifierInfo[METADATA]; ok {
if next_flow := findNextFlow(flow); next_flow == nil {
@@ -721,28 +798,40 @@
}
}
}
- }
+ }*/
log.Infow("Flow ports", log.Fields{"classifierInfo_inport": classifierInfo[IN_PORT], "action_output": actionInfo[OUTPUT]})
portNo, intfId, onuId, uniId := ExtractAccessFromFlow(classifierInfo[IN_PORT].(uint32), actionInfo[OUTPUT].(uint32))
f.divideAndAddFlow(intfId, onuId, uniId, portNo, classifierInfo, actionInfo, flow)
}
-func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(intfId uint32, onuId uint32, uniId uint32, uni string) bool {
+func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(intfId uint32, onuId uint32, uniId uint32, uni string) error {
- onuDevice := f.getOnuChildDevice(intfId, onuId)
- if onuDevice == nil {
- log.Error("Error while fetching Child device from core", log.Fields{"onuId": onuId})
- return false
+ onuDevice, err := f.getOnuChildDevice(intfId, onuId)
+ if err != nil {
+ log.Errorw("Error while fetching Child device from core", log.Fields{"onuId": onuId})
+ return err
}
- log.Debugw("Retrived child device from OLT device handler", log.Fields{"device": *onuDevice})
- tpPath := f.getTPpath(intfId, uni)
- log.Infow("Load-tech-profile-request-to-brcm-handler", log.Fields{"path": tpPath})
- /* TODO : "Send Event message to ONU device via proxy to download tech profile in go routine
- msg = {'proxy_address': onuDevice.proxyAddress, 'uniId': uniId,
- 'event': 'download_tech_profile', 'event_data': tp_path}
- //Send the event message to the ONU adapter
- self.adapter_agent.publish_inter_adapter_message(onu_device.id,msg)
- */
- return true
-
+ log.Debugw("Got child device from OLT device handler", log.Fields{"device": *onuDevice})
+ /* TODO: uncomment once voltha-proto is ready with changes */
+ /*
+ tpPath := f.getTPpath(intfId, uni)
+ tpDownloadMsg := &ic.TechProfileDownload{UniId: uniId, Path: tpPath}
+ var tpDownloadMsg interface{}
+ log.Infow("Sending Load-tech-profile-request-to-brcm-onu-adapter",log.Fields{"msg": *tpDownloadMsg})
+ sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(context.Background(),
+ tpDownloadMsg,
+ //ic.InterAdapterMessageType_TECH_PROFILE_DOWNLOAD_REQUEST,
+ ic.InterAdapterMessageType_OMCI_REQUEST,
+ f.deviceHandler.deviceType,
+ onuDevice.Type,
+ onuDevice.Id,
+ onuDevice.ProxyAddress.DeviceId, "")
+ if sendErr != nil {
+ log.Errorw("send techprofile-download request error", log.Fields{"fromAdapter": f.deviceHandler.deviceType,
+ "toAdapter": onuDevice.Type, "onuId": onuDevice.Id,
+ "proxyDeviceId": onuDevice.ProxyAddress.DeviceId})
+ return sendErr
+ }
+ log.Debugw("success Sending Load-tech-profile-request-to-brcm-onu-adapter",log.Fields{"msg":tpDownloadMsg})*/
+ return nil
}