VOL-1624 Support for tech-profile creation on the first flow that references the tp-id (in write-metadata)
Getting meter from flow itself and bug fixes
Bug fix for dhcp packet-out
Change-Id: Ia466988bfdbfe49fd9a44729a4ba4a30fd991c54
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 1b2195c..f5e5ffb 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -45,6 +45,11 @@
"google.golang.org/grpc/status"
)
+const (
+ MAX_RETRY = 10
+ MAX_TIMEOUT_IN_MS = 500
+)
+
//DeviceHandler will interact with the OLT device.
type DeviceHandler struct {
deviceID string
@@ -956,12 +961,12 @@
}
//UpdateFlowsIncrementally updates the device flow
-func (dh *DeviceHandler) UpdateFlowsIncrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges) error {
- log.Debugw("In Update_flows_incrementally", log.Fields{"deviceID": device.Id, "flows": flows, "groups": groups})
+func (dh *DeviceHandler) UpdateFlowsIncrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
+ log.Debugw("Received-incremental-flowupdate-in-device-handler", log.Fields{"deviceID": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
if flows != nil {
for _, flow := range flows.ToAdd.Items {
log.Debug("Adding flow", log.Fields{"deviceId": device.Id, "flowToAdd": flow})
- dh.flowMgr.AddFlow(flow)
+ dh.flowMgr.AddFlow(flow, flowMetadata)
}
for _, flow := range flows.ToRemove.Items {
log.Debug("Removing flow", log.Fields{"deviceId": device.Id, "flowToRemove": flow})
@@ -974,6 +979,7 @@
// dh.flowMgr.RemoveFlow(flow)
}
}
+ log.Debug("UpdateFlowsIncrementally done successfully")
return nil
}
@@ -1095,11 +1101,20 @@
}
intfID := IntfIDFromUniPortNum(uint32(egressPortNo))
onuID := OnuIDFromPortNum(uint32(egressPortNo))
- uniID := uint32(egressPortNo)
- onuPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, PortNo: uint32(egressPortNo), Pkt: packet.Data}
+ uniID := UniIDFromPortNum(uint32(egressPortNo))
+
+ gemPortID, err := dh.flowMgr.GetPacketOutGemPortID(intfID, onuID, uint32(egressPortNo))
+ if err != nil {
+ // In this case the openolt agent will receive the gemPortID as 0.
+ // The agent tries to retrieve the gemPortID in this case.
+ // This may not always succeed at the agent and packetOut may fail.
+ log.Error("failed-to-retrieve-gemport-id-for-packet-out")
+ }
+
+ onuPkt := oop.OnuPacket{IntfId: intfID, OnuId: onuID, PortNo: uint32(egressPortNo), GemportId: gemPortID, Pkt: packet.Data}
log.Debugw("sending-packet-to-onu", log.Fields{"egress_port_no": egressPortNo, "IntfId": intfID, "onuID": onuID,
- "uniID": uniID, "packet": hex.EncodeToString(packet.Data)})
+ "uniID": uniID, "gemPortID": gemPortID, "packet": hex.EncodeToString(packet.Data)})
if _, err := dh.Client.OnuPacketOut(context.Background(), &onuPkt); err != nil {
log.Errorw("Error while sending packet-out to ONU", log.Fields{"error": err})
diff --git a/adaptercore/olt_platform.go b/adaptercore/olt_platform.go
index 157f331..72eb124 100644
--- a/adaptercore/olt_platform.go
+++ b/adaptercore/olt_platform.go
@@ -177,7 +177,7 @@
}
//FlowExtractInfo fetches uniport from the flow, based on which it gets and returns ponInf, onuID and uniID
-func FlowExtractInfo(flow *ofp.OfpFlowStats, flowDirection string) (uint32, uint32, uint32, error) {
+func FlowExtractInfo(flow *ofp.OfpFlowStats, flowDirection string) (uint32, uint32, uint32, uint32, error) {
var uniPortNo uint32
var ponIntf uint32
var onuID uint32
@@ -206,12 +206,12 @@
}
if uniPortNo == 0 {
- return 0, 0, 0, errors.New("failed to extract Pon Interface, ONU Id and Uni Id from flow")
+ return 0, 0, 0, 0, errors.New("Failed to extract Pon Interface, ONU Id and Uni Id from flow")
}
ponIntf = IntfIDFromUniPortNum(uniPortNo)
onuID = OnuIDFromUniPortNum(uniPortNo)
uniID = UniIDFromPortNum(uniPortNo)
- return ponIntf, onuID, uniID, nil
+ return uniPortNo, ponIntf, onuID, uniID, nil
}
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
index abbc4e3..3d10a6f 100644
--- a/adaptercore/openolt.go
+++ b/adaptercore/openolt.go
@@ -249,15 +249,15 @@
}
//Update_flows_bulk returns
-func (oo *OpenOLT) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
+func (oo *OpenOLT) Update_flows_bulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata) error {
return errors.New("unImplemented")
}
//Update_flows_incrementally updates (add/remove) the flows on a given device
-func (oo *OpenOLT) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges) error {
- log.Debugw("Update_flows_incrementally", log.Fields{"deviceId": device.Id, "flows": flows})
+func (oo *OpenOLT) Update_flows_incrementally(device *voltha.Device, flows *openflow_13.FlowChanges, groups *openflow_13.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
+ log.Debugw("Update_flows_incrementally", log.Fields{"deviceId": device.Id, "flows": flows, "flowMetadata": flowMetadata})
if handler := oo.getDeviceHandler(device.Id); handler != nil {
- return handler.UpdateFlowsIncrementally(device, flows, groups)
+ return handler.UpdateFlowsIncrementally(device, flows, groups, flowMetadata)
}
log.Errorw("Update_flows_incrementally failed-device-handler-not-set", log.Fields{"deviceId": device.Id})
return errors.New("device-handler-not-set")
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 498f6e7..38d5c7e 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -23,6 +23,8 @@
"encoding/json"
"errors"
"fmt"
+ "math/big"
+
"github.com/opencord/voltha-go/common/log"
tp "github.com/opencord/voltha-go/common/techprofile"
"github.com/opencord/voltha-go/rw_core/utils"
@@ -30,8 +32,8 @@
ic "github.com/opencord/voltha-protos/go/inter_container"
ofp "github.com/opencord/voltha-protos/go/openflow_13"
openoltpb2 "github.com/opencord/voltha-protos/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/go/tech_profile"
"github.com/opencord/voltha-protos/go/voltha"
- "math/big"
//deepcopy "github.com/getlantern/deepcopy"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -46,6 +48,9 @@
//EapolFlow flow category
EapolFlow = "EAPOL_FLOW"
+ //DhcpFlow flow category
+ DhcpFlow = "DHCP_FLOW"
+
//IPProtoDhcp flow category
IPProtoDhcp = 17
@@ -117,6 +122,14 @@
PushVlan = "push_vlan"
//TrapToHost constant
TrapToHost = "trap_to_host"
+ //MaxMeterBand constant
+ MaxMeterBand = 2
+ //VlanPCPMask constant
+ VlanPCPMask = 0xFF
+ //VlanvIDMask constant
+ VlanvIDMask = 0xFFF
+ //MaxPonPorts constant
+ MaxPonPorts = 16
)
type onuInfo struct {
@@ -159,6 +172,7 @@
var flowMgr OpenOltFlowMgr
flowMgr.deviceHandler = dh
flowMgr.resourceMgr = rsrcMgr
+ flowMgr.techprofile = make([]*tp.TechProfileMgr, MaxPonPorts)
if err := flowMgr.populateTechProfilePerPonPort(); err != nil {
log.Error("Error while populating tech profile mgr\n")
return nil
@@ -197,138 +211,337 @@
log.Debugw("updated Stored flow info", log.Fields{"storedDeviceFlows": f.storedDeviceFlows})
}
-func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats) {
- var allocID []uint32
+func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpId uint32, UsMeterId uint32, DsMeterId uint32, flowMetadata *voltha.FlowMetadata) {
+ var allocId []uint32
var gemPorts []uint32
- var uni string
+ var gemPort uint32
+ var TpInst *tp.TechProfile
- log.Infow("Dividing flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "portNo": portNo, "classifier": classifierInfo, "action": actionInfo})
-
- log.Infow("sorting flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "portNo": portNo,
- "classifierInfo": classifierInfo, "actionInfo": actionInfo})
-
+ log.Infow("Dividing flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "portNo": portNo,
+ "classifier": classifierInfo, "action": actionInfo, "UsMeterId": UsMeterId, "DsMeterId": DsMeterId, "TpId": TpId})
// only create tcont/gemports if there is actually an onu id. otherwise BAL throws an error. Usually this
// is because the flow is an NNI flow and there would be no onu resources associated with it
// TODO: properly deal with NNI flows
- if onuID > 0 {
- uni = getUniPortPath(intfID, onuID, uniID)
- log.Debugw("Uni port name", log.Fields{"uni": uni})
- allocID, gemPorts = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, flow.GetTableId())
- if allocID == nil || gemPorts == nil {
- log.Error("alloc-id-gem-ports-unavailable")
- return
- }
- log.Debugw("Generated required alloc and gemport ids", log.Fields{"alloc_id": allocID, "gemPorts": gemPorts})
- } else {
+ if onuID <= 0 {
log.Errorw("No onu id for flow", log.Fields{"portNo": portNo, "classifer": classifierInfo, "action": actionInfo})
return
}
- /* Flows can't be added specific to gemport unless p-bits are received.
- * Hence adding flows for all gemports
+ uni := getUniPortPath(intfID, onuID, uniID)
+ log.Debugw("Uni port name", log.Fields{"uni": uni})
+ allocId, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpId, UsMeterId, DsMeterId, flowMetadata)
+ if allocId == nil || gemPorts == nil || TpInst == nil {
+ log.Error("alloc-id-gem-ports-tp-unavailable")
+ return
+ }
+
+ /* Flows can be added specific to gemport if p-bits are received.
+ * If no pbit mentioned then adding flows for all gemports
*/
- for _, gemPort := range gemPorts {
- if ipProto, ok := classifierInfo[IPProto]; ok {
- if ipProto.(uint32) == IPProtoDhcp {
- log.Info("Adding DHCP flow")
- f.addDHCPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
- } else if ipProto.(uint32) == IPProtoIgmp {
- log.Info("igmp flow add ignored, not implemented yet")
+
+ args := make(map[string]uint32)
+ args["intfId"] = intfID
+ args["onuId"] = onuID
+ args["uniId"] = uniID
+ args["portNo"] = portNo
+ args["allocId"] = allocId[0]
+
+ if ipProto, ok := classifierInfo[IPProto]; ok {
+ if ipProto.(uint32) == IPProtoDhcp {
+ log.Info("Adding DHCP flow")
+ if pcp, ok := classifierInfo[VlanPcp]; ok {
+ gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+ tp_pb.Direction_UPSTREAM,
+ pcp.(uint32))
+ //Adding DHCP upstream flow
+ f.addDHCPTrapFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocId[0], gemPort)
} else {
- log.Errorw("Invalid-Classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
- //return errors.New("Invalid-Classifier-to-handle")
+ //Adding DHCP upstream flow to all gemports
+ installFlowOnAllGemports(f.addDHCPTrapFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, DhcpFlow)
}
- } else if ethType, ok := classifierInfo[EthType]; ok {
- if ethType.(uint32) == EapEthType {
- log.Info("Adding EAPOL flow")
- f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID[0], gemPort, DefaultMgmtVlan)
- if vlan := getSubscriberVlan(utils.GetInPort(flow)); vlan != 0 {
- f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID[0], gemPort, vlan)
- }
- // Send Techprofile download event to child device in go routine as it takes time
- go f.sendTPDownloadMsgToChild(intfID, onuID, uniID, uni)
- }
- if ethType == LldpEthType {
- log.Info("Adding LLDP flow")
- addLLDPFlow(flow, portNo)
- }
- } else if _, ok := actionInfo[PushVlan]; ok {
- log.Info("Adding upstream data rule")
- f.addUpstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
- } else if _, ok := actionInfo[PopVlan]; ok {
- log.Info("Adding Downstream data rule")
- f.addDownstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocID[0], gemPort)
+
+ } else if ipProto == IgmpProto {
+ log.Info("igmp flow add ignored, not implemented yet")
} else {
- log.Errorw("Invalid-flow-type-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo, "flow": flow})
+ log.Errorw("Invalid-Classifier-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo})
+ //return errors.New("Invalid-Classifier-to-handle")
}
+ } else if ethType, ok := classifierInfo[EthType]; ok {
+ if ethType.(uint32) == EapEthType {
+ log.Info("Adding EAPOL flow")
+ var vlanId uint32
+ if val, ok := classifierInfo[VlanVid]; ok {
+ vlanId = (val.(uint32)) & VlanvIDMask
+ } else {
+ vlanId = DefaultMgmtVlan
+ }
+ if pcp, ok := classifierInfo[VlanPcp]; ok {
+ gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+ tp_pb.Direction_UPSTREAM,
+ pcp.(uint32))
+
+ f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocId[0], gemPort, vlanId)
+ } else {
+ installFlowOnAllGemports(nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, EapolFlow, vlanId)
+ }
+ // Send Techprofile download event to child device in go routine as it takes time
+ go f.sendTPDownloadMsgToChild(intfID, onuID, uniID, uni, TpId)
+ }
+ if ethType == LldpEthType {
+ log.Info("Adding LLDP flow")
+ addLLDPFlow(flow, portNo)
+ }
+ } else if _, ok := actionInfo[PushVlan]; ok {
+ log.Info("Adding upstream data rule")
+ if pcp, ok := classifierInfo[VlanPcp]; ok {
+ gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+ tp_pb.Direction_UPSTREAM,
+ pcp.(uint32))
+ //Adding HSIA upstream flow
+ f.addUpstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocId[0], gemPort)
+ } else {
+ //Adding HSIA upstream flow to all gemports
+ installFlowOnAllGemports(f.addUpstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
+ }
+ } else if _, ok := actionInfo[PopVlan]; ok {
+ log.Info("Adding Downstream data rule")
+ if pcp, ok := classifierInfo[VlanPcp]; ok {
+ gemPort = f.techprofile[intfID].GetGemportIDForPbit(TpInst,
+ tp_pb.Direction_UPSTREAM,
+ pcp.(uint32))
+ //Adding HSIA downstream flow
+ f.addDownstreamDataFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, allocId[0], gemPort)
+ } else {
+ //Adding HSIA downstream flow to all gemports
+ installFlowOnAllGemports(f.addDownstreamDataFlow, nil, args, classifierInfo, actionInfo, flow, gemPorts, HsiaFlow)
+ }
+ } else {
+ log.Errorw("Invalid-flow-type-to-handle", log.Fields{"classifier": classifierInfo, "action": actionInfo, "flow": flow})
}
}
+func (f *OpenOltFlowMgr) CreateSchedulerQueues(Dir tp_pb.Direction, IntfId uint32, OnuId uint32, UniId uint32, UniPort uint32, TpInst *tp.TechProfile, MeterId uint32, flowMetadata *voltha.FlowMetadata) error {
+
+ log.Debugw("CreateSchedulerQueues", log.Fields{"Dir": Dir, "IntfId": IntfId, "OnuId": OnuId,
+ "UniId": UniId, "MeterId": MeterId, "TpInst": *TpInst, "flowMetadata": flowMetadata})
+
+ if MeterId == 0 { // This should never happen
+ log.Error("Invalid meter id")
+ return errors.New("Invalid meter id")
+ }
+
+ /* Let's make a simple assumption that if the meter-id is present on the KV store,
+ * then the scheduler and queues configuration is applied on the OLT device
+ * in the given direction.
+ */
+ var Direction string
+ var SchedCfg *tp_pb.SchedulerConfig
+ if Dir == tp_pb.Direction_UPSTREAM {
+ Direction = "upstream"
+ } else if Dir == tp_pb.Direction_DOWNSTREAM {
+ Direction = "downstream"
+ }
+ KvStoreMeter, err := f.resourceMgr.GetMeterIdForOnu(Direction, IntfId, OnuId, UniId)
+ if err != nil {
+ log.Error("Failed to get meter for intf %d, onuid %d, uniid %d", IntfId, OnuId, UniId)
+ return err
+ }
+ if KvStoreMeter != nil {
+ if KvStoreMeter.MeterId == MeterId {
+ log.Debug("Scheduler already created for upstream")
+ return nil
+ } else {
+ log.Errorw("Dynamic meter update not supported", log.Fields{"KvStoreMeterId": KvStoreMeter.MeterId, "MeterId-in-flow": MeterId})
+ return errors.New("Invalid-meter-id-in-flow")
+ }
+ }
+ log.Debugw("Meter-does-not-exist-Creating-new", log.Fields{"MeterId": MeterId, "Direction": Direction})
+ if Dir == tp_pb.Direction_UPSTREAM {
+ SchedCfg = f.techprofile[IntfId].GetUsScheduler(TpInst)
+ } else if Dir == tp_pb.Direction_DOWNSTREAM {
+ SchedCfg = f.techprofile[IntfId].GetDsScheduler(TpInst)
+ }
+ var meterConfig *ofp.OfpMeterConfig
+ if flowMetadata != nil {
+ for _, meter := range flowMetadata.Meters {
+ if MeterId == meter.MeterId {
+ meterConfig = meter
+ log.Debugw("Found-meter-config-from-flowmetadata", log.Fields{"meterConfig": meterConfig})
+ break
+ }
+ }
+ } else {
+ log.Error("Flow-metadata-is-not-present-in-flow")
+ }
+ if meterConfig == nil {
+ log.Errorw("Could-not-get-meterbands-from-flowMetadata", log.Fields{"flowMetadata": flowMetadata, "MeterId": MeterId})
+ return errors.New("Failed-to-get-meter-from-flowMetadata")
+ } else if len(meterConfig.Bands) < MaxMeterBand {
+ log.Errorw("Invalid-number-of-bands-in-meter", log.Fields{"Bands": meterConfig.Bands, "MeterId": MeterId})
+ return errors.New("Invalid-number-of-bands-in-meter")
+ }
+ cir := meterConfig.Bands[0].Rate
+ cbs := meterConfig.Bands[0].BurstSize
+ eir := meterConfig.Bands[1].Rate
+ ebs := meterConfig.Bands[1].BurstSize
+ pir := cir + eir
+ pbs := cbs + ebs
+ TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
+
+ TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[IntfId].GetTrafficScheduler(TpInst, SchedCfg, TrafficShaping)}
+
+ log.Debugw("Sending Traffic scheduler create to device", log.Fields{"Direction": Direction, "TrafficScheds": TrafficSched})
+ if _, err := f.deviceHandler.Client.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+ IntfId: IntfId, OnuId: OnuId,
+ UniId: UniId, PortNo: UniPort,
+ TrafficScheds: TrafficSched}); err != nil {
+ log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+ return err
+ }
+ // On receiving the CreateTrafficQueues request, the driver should create corresponding
+ // downstream queues.
+ trafficQueues := f.techprofile[IntfId].GetTrafficQueues(TpInst, Dir)
+ log.Debugw("Sending Traffic Queues create to device", log.Fields{"Direction": Direction, "TrafficQueues": trafficQueues})
+ if _, err := f.deviceHandler.Client.CreateTrafficQueues(context.Background(),
+ &tp_pb.TrafficQueues{IntfId: IntfId, OnuId: OnuId,
+ UniId: UniId, PortNo: UniPort,
+ TrafficQueues: trafficQueues}); err != nil {
+ log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+ return err
+ }
+
+ /* After we successfully applied the scheduler configuration on the OLT device,
+ * store the meter id on the KV store, for further reference.
+ */
+ if err := f.resourceMgr.UpdateMeterIdForOnu(Direction, IntfId, OnuId, UniId, meterConfig); err != nil {
+ log.Error("Failed to update meter id for onu %d, meterid %d", OnuId, MeterId)
+ return err
+ }
+ log.Debugw("updated-meter-info into KV store successfully", log.Fields{"Direction": Direction,
+ "Meter": meterConfig})
+ return nil
+}
+
+func (f *OpenOltFlowMgr) RemoveSchedulerQueues(Dir tp_pb.Direction, IntfId uint32, OnuId uint32, UniId uint32, UniPort uint32, TpInst *tp.TechProfile) error {
+
+ var Direction string
+ var SchedCfg *tp_pb.SchedulerConfig
+ var err error
+ log.Debugw("Removing schedulers and Queues in OLT", log.Fields{"Direction": Dir, "IntfId": IntfId, "OnuId": OnuId, "UniId": UniId, "UniPort": UniPort})
+ if Dir == tp_pb.Direction_UPSTREAM {
+ SchedCfg = f.techprofile[IntfId].GetUsScheduler(TpInst)
+ Direction = "upstream"
+ } else if Dir == tp_pb.Direction_DOWNSTREAM {
+ SchedCfg = f.techprofile[IntfId].GetDsScheduler(TpInst)
+ Direction = "downstream"
+ }
+
+ KVStoreMeter, err := f.resourceMgr.GetMeterIdForOnu(Direction, IntfId, OnuId, UniId)
+ if err != nil {
+ log.Errorf("Failed to get Meter for Onu %d", OnuId)
+ return err
+ }
+ if KVStoreMeter == nil {
+ log.Debugw("No-meter-has-been-installed-yet", log.Fields{"direction": Direction, "IntfId": IntfId, "OnuId": OnuId, "UniId": UniId})
+ return nil
+ }
+ cir := KVStoreMeter.Bands[0].Rate
+ cbs := KVStoreMeter.Bands[0].BurstSize
+ eir := KVStoreMeter.Bands[1].Rate
+ ebs := KVStoreMeter.Bands[1].BurstSize
+ pir := cir + eir
+ pbs := cbs + ebs
+
+ TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
+
+ TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[IntfId].GetTrafficScheduler(TpInst, SchedCfg, TrafficShaping)}
+ TrafficQueues := f.techprofile[IntfId].GetTrafficQueues(TpInst, Dir)
+
+ if _, err = f.deviceHandler.Client.RemoveTrafficQueues(context.Background(),
+ &tp_pb.TrafficQueues{IntfId: IntfId, OnuId: OnuId,
+ UniId: UniId, PortNo: UniPort,
+ TrafficQueues: TrafficQueues}); err != nil {
+ log.Error("Failed to remove traffic queues")
+ return err
+ } else {
+ log.Debug("Removed traffic queues successfully")
+ }
+ if _, err = f.deviceHandler.Client.RemoveTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+ IntfId: IntfId, OnuId: OnuId,
+ UniId: UniId, PortNo: UniPort,
+ TrafficScheds: TrafficSched}); err != nil {
+ log.Error("failed to remove traffic schedulers")
+ return err
+ } else {
+ log.Debug("Removed traffic schedulers successfully")
+ }
+
+ /* After we successfully remove the scheduler configuration on the OLT device,
+ * delete the meter id on the KV store.
+ */
+ err = f.resourceMgr.RemoveMeterIdForOnu(Direction, IntfId, OnuId, UniId)
+ if err != nil {
+ log.Errorf("Failed to remove meter for onu %d, meter id %d", OnuId, KVStoreMeter.MeterId)
+ }
+ log.Debugw("Removed-meter-from-KV-store successfully", log.Fields{"MeterId": KVStoreMeter.MeterId, "dir": Direction})
+ return err
+}
+
// This function allocates tconts and GEM ports for an ONU, currently one TCONT is supported per ONU
-func (f *OpenOltFlowMgr) createTcontGemports(intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, tableID uint32) ([]uint32, []uint32) {
+func (f *OpenOltFlowMgr) createTcontGemports(intfID uint32, onuID uint32, uniID uint32, uni string, uniPort uint32, TpID uint32, UsMeterID uint32, DsMeterID uint32, flowMetadata *voltha.FlowMetadata) ([]uint32, []uint32, *tp.TechProfile) {
var allocID []uint32
var gemPortIDs []uint32
//If we already have allocated earlier for this onu, render them
- if tcontID := f.resourceMgr.GetCurrentAllocIDForOnu(intfID, onuID, uniID); tcontID != 0 {
- allocID = append(allocID, tcontID)
+ if tcontId := f.resourceMgr.GetCurrentAllocIDForOnu(intfID, onuID, uniID); tcontId != 0 {
+ allocID = append(allocID, tcontId)
}
gemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(intfID, onuID, uniID)
- if len(allocID) != 0 && len(gemPortIDs) != 0 {
- log.Debug("Rendered Tcont and GEM ports from resource manager", log.Fields{"intfId": intfID, "onuId": onuID, "uniPort": uniID,
- "allocID": allocID, "gemPortIDs": gemPortIDs})
- return allocID, gemPortIDs
+
+ tpPath := f.getTPpath(intfID, uni, TpID)
+ // Check tech profile instance already exists for derived port name
+ tech_profile_instance, err := f.techprofile[intfID].GetTPInstanceFromKVStore(TpID, tpPath)
+ if err != nil { // This should not happen, something wrong in KV backend transaction
+ log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": TpID, "path": tpPath})
+ return nil, nil, nil
}
+
log.Debug("Creating New TConts and Gem ports", log.Fields{"pon": intfID, "onu": onuID, "uni": uniID})
- //FIXME: If table id is <= 63 using 64 as table id
- if tableID < tp.DEFAULT_TECH_PROFILE_TABLE_ID {
- tableID = tp.DEFAULT_TECH_PROFILE_TABLE_ID
- }
- tpPath := f.getTPpath(intfID, uni)
- // Check tech profile instance already exists for derived port name
- techProfileInstance, err := f.techprofile[intfID].GetTPInstanceFromKVStore(tableID, tpPath)
- if err != nil { // This should not happen, something wrong in KV backend transaction
- log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tableID": tableID, "path": tpPath})
- return nil, nil
- }
- if techProfileInstance == nil {
+ if tech_profile_instance == nil {
log.Info("Creating tech profile instance", log.Fields{"path": tpPath})
- techProfileInstance = f.techprofile[intfID].CreateTechProfInstance(tableID, uni, intfID)
- if techProfileInstance == nil {
+ tech_profile_instance = f.techprofile[intfID].CreateTechProfInstance(TpID, uni, intfID)
+ if tech_profile_instance == nil {
log.Error("Tech-profile-instance-creation-failed")
- return nil, nil
+ return nil, nil, nil
}
+ f.resourceMgr.UpdateTechProfileIdForOnu(intfID, onuID, uniID, TpID)
} else {
log.Debugw("Tech-profile-instance-already-exist-for-given port-name", log.Fields{"uni": uni})
}
- // Get upstream and downstream scheduler protos
- usScheduler := f.techprofile[intfID].GetUsScheduler(techProfileInstance)
- dsScheduler := f.techprofile[intfID].GetDsScheduler(techProfileInstance)
- // Get TCONTS protos
- tconts := f.techprofile[intfID].GetTconts(techProfileInstance, usScheduler, dsScheduler)
- if len(tconts) == 0 {
- log.Error("TCONTS not found ")
- return nil, nil
+ if UsMeterID != 0 {
+ if err := f.CreateSchedulerQueues(tp_pb.Direction_UPSTREAM, intfID, onuID, uniID, uniPort, tech_profile_instance, UsMeterID, flowMetadata); err != nil {
+ log.Errorw("CreateSchedulerQueues Failed-upstream", log.Fields{"error": err, "meterID": UsMeterID})
+ return nil, nil, nil
+ }
}
- log.Debugw("Sending Create tcont to device",
- log.Fields{"onu": onuID, "uni": uniID, "portNo": "", "tconts": tconts})
- if _, err := f.deviceHandler.Client.CreateTconts(context.Background(),
- &openoltpb2.Tconts{IntfId: intfID,
- OnuId: onuID,
- UniId: uniID,
- PortNo: uniPort,
- Tconts: tconts}); err != nil {
- log.Errorw("Error while creating TCONT in device", log.Fields{"error": err})
- return nil, nil
+ if DsMeterID != 0 {
+ if err := f.CreateSchedulerQueues(tp_pb.Direction_DOWNSTREAM, intfID, onuID, uniID, uniPort, tech_profile_instance, DsMeterID, flowMetadata); err != nil {
+ log.Errorw("CreateSchedulerQueues Failed-downstream", log.Fields{"error": err, "meterID": DsMeterID})
+ return nil, nil, nil
+ }
}
- allocID = append(allocID, techProfileInstance.UsScheduler.AllocID)
- for _, gem := range techProfileInstance.UpstreamGemPortAttributeList {
- gemPortIDs = append(gemPortIDs, gem.GemportID)
+ if len(allocID) == 0 { // Created TCONT first time
+ allocID = append(allocID, tech_profile_instance.UsScheduler.AllocID)
+ }
+ if len(gemPortIDs) == 0 { // Create GEM ports first time
+ for _, gem := range tech_profile_instance.UpstreamGemPortAttributeList {
+ gemPortIDs = append(gemPortIDs, gem.GemportID)
+ }
}
log.Debugw("Allocated Tcont and GEM ports", log.Fields{"allocID": allocID, "gemports": gemPortIDs})
// Send Tconts and GEM ports to KV store
f.storeTcontsGEMPortsIntoKVStore(intfID, onuID, uniID, allocID, gemPortIDs)
- return allocID, gemPortIDs
+ return allocID, gemPortIDs, tech_profile_instance
}
func (f *OpenOltFlowMgr) storeTcontsGEMPortsIntoKVStore(intfID uint32, onuID uint32, uniID uint32, allocID []uint32, gemPortIDs []uint32) {
@@ -352,19 +565,22 @@
}
func (f *OpenOltFlowMgr) populateTechProfilePerPonPort() error {
+ var tpCount int = 0
for _, techRange := range f.resourceMgr.DevInfo.Ranges {
- for intfID := range techRange.IntfIds {
- f.techprofile = append(f.techprofile, f.resourceMgr.ResourceMgrs[uint32(intfID)].TechProfileMgr)
+ for _, intfId := range techRange.IntfIds {
+ f.techprofile[intfId] = f.resourceMgr.ResourceMgrs[uint32(intfId)].TechProfileMgr
+ tpCount++
+ log.Debugw("Init tech profile done", log.Fields{"intfId": intfId})
}
}
//Make sure we have as many tech_profiles as there are pon ports on the device
- if len(f.techprofile) != int(f.resourceMgr.DevInfo.GetPonPorts()) {
+ if tpCount != int(f.resourceMgr.DevInfo.GetPonPorts()) {
log.Errorw("Error while populating techprofile",
- log.Fields{"numofTech": len(f.techprofile), "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
- return errors.New("error while populating techprofile mgrs")
+ log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
+ return errors.New("Error while populating techprofile mgrs")
}
- log.Infow("Populated techprofile per ponport successfully",
- log.Fields{"numofTech": len(f.techprofile), "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
+ log.Infow("Populated techprofile for ponports successfully",
+ log.Fields{"numofTech": tpCount, "numPonPorts": f.resourceMgr.DevInfo.GetPonPorts()})
return nil
}
@@ -386,13 +602,18 @@
downlinkClassifier[PacketTagType] = DoubleTag
log.Debugw("Adding downstream data flow", log.Fields{"downlinkClassifier": downlinkClassifier,
"downlinkAction": downlinkAction})
- // Ignore private VLAN flow given by decomposer, cannot do anything with this flow
- if uint32(downlinkClassifier[METADATA].(uint64)) == MkUniPortNum(intfID, onuID, uniID) &&
- downlinkClassifier[VlanVid] == (uint32(ofp.OfpVlanId_OFPVID_PRESENT)|4000) {
- log.Infow("EAPOL DL flow , Already added ,ignoring it", log.Fields{"downlinkClassifier": downlinkClassifier,
- "downlinkAction": downlinkAction})
- return
+ // Ignore Downlink trap flow given by core, cannot do anything with this flow
+ if vlan, exists := downlinkClassifier[VlanVid]; exists {
+ if vlan.(uint32) == (uint32(ofp.OfpVlanId_OFPVID_PRESENT) | 4000) { //private VLAN given by core
+ if metadata, exists := downlinkClassifier[METADATA]; exists { // inport is filled in metadata by core
+ if uint32(metadata.(uint64)) == MkUniPortNum(intfID, onuID, uniID) {
+ log.Infow("Ignoring DL trap device flow from core", log.Fields{"flow": logicalFlow})
+ return
+ }
+ }
+ }
}
+
/* Already this info available classifier? */
downlinkAction[PopVlan] = true
downlinkAction[VlanVid] = downlinkClassifier[VlanVid]
@@ -413,9 +634,13 @@
log.Debugw("Adding HSIA flow", log.Fields{"intfId": intfID, "onuId": onuID, "uniId": uniID, "classifier": classifier,
"action": action, "direction": direction, "allocId": allocID, "gemPortId": gemPortID,
"logicalFlow": *logicalFlow})
- flowCategory := "HSIA"
+ var vlan_pit uint32 = 0
+ if _, ok := classifier[VlanPcp]; ok {
+ vlan_pit = classifier[VlanPcp].(uint32)
+ log.Debugw("Found pbit in the flow", log.Fields{"vlan_pit": vlan_pit})
+ }
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
- flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, flowStoreCookie, flowCategory)
+ flowId, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, HsiaFlow, vlan_pit)
if err != nil {
log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
return
@@ -436,7 +661,7 @@
flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
UniId: int32(uniID),
- FlowId: flowID,
+ FlowId: flowId,
FlowType: direction,
AllocId: int32(allocID),
NetworkIntfId: int32(networkIntfID),
@@ -448,7 +673,7 @@
PortNo: portNo}
if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
- flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, "HSIA", flowID)
+ flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, HsiaFlow, flowId)
if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
flow.OnuId,
flow.UniId,
@@ -477,7 +702,7 @@
flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
- flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, flowStoreCookie, "")
+ flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0 /*classifier[VLAN_PCP].(uint32)*/)
if err != nil {
log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
@@ -545,7 +770,7 @@
uplinkAction[TrapToHost] = true
flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
//Add Uplink EAPOL Flow
- uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, flowStoreCookie, "")
+ uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0)
if err != nil {
log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
return
@@ -592,8 +817,8 @@
return
}
}
-
- if vlanID == DefaultMgmtVlan {
+ // Dummy Downstream flow due to BAL 2.6 limitation
+ {
/* Add Downstream EAPOL Flow, Only for first EAP flow (BAL
# requirement)
# On one of the platforms (Broadcom BAL), when same DL classifier
@@ -613,19 +838,20 @@
log.Debugw("specialVlanEAPOLDlFlow:", log.Fields{"dl_vlan": specialVlanDlFlow})
// Fill Classfier
downlinkClassifier[PacketTagType] = SingleTag
+ downlinkClassifier[EthType] = uint32(EapEthType)
downlinkClassifier[VlanVid] = uint32(specialVlanDlFlow)
// Fill action
downlinkAction[PushVlan] = true
downlinkAction[VlanVid] = vlanID
flowStoreCookie := getFlowStoreCookie(downlinkClassifier, gemPortID)
- downlinkFlowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, flowStoreCookie, "")
+ downlinkFlowId, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0)
if err != nil {
log.Errorw("flowId unavailable for DL EAPOL",
log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
return
}
log.Debugw("Creating DL EAPOL flow",
- log.Fields{"dl_classifier": downlinkClassifier, "dl_action": downlinkAction, "downlinkFlowId": downlinkFlowID})
+ log.Fields{"dl_classifier": downlinkClassifier, "dl_action": downlinkAction, "downlinkFlowId": downlinkFlowId})
if classifierProto = makeOpenOltClassifierField(downlinkClassifier); classifierProto == nil {
log.Error("Error in making classifier protobuf for downlink flow")
return
@@ -638,7 +864,7 @@
downstreamFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
OnuId: int32(onuID),
UniId: int32(uniID),
- FlowId: downlinkFlowID,
+ FlowId: downlinkFlowId,
FlowType: DOWNSTREAM,
AllocId: int32(allocID),
NetworkIntfId: int32(networkIntfID),
@@ -651,7 +877,7 @@
if ok := f.addFlowToDevice(logicalFlow, &downstreamFlow); ok {
log.Debug("EAPOL DL flow added to device successfully")
flowCategory := ""
- flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowID)
+ flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowId)
if err := f.updateFlowInfoToKVStore(downstreamFlow.AccessIntfId,
downstreamFlow.OnuId,
downstreamFlow.UniId,
@@ -662,9 +888,6 @@
return
}
}
- } else {
- log.Infow("EAPOL flow with non-default mgmt vlan is not supported", log.Fields{"vlanId": vlanID})
- return
}
log.Debugw("Added EAPOL flows to device successfully", log.Fields{"flow": logicalFlow})
}
@@ -678,13 +901,17 @@
classifier.IpProto = ipProto.(uint32)
}
if vlanID, ok := classifierInfo[VlanVid]; ok {
- classifier.OVid = (vlanID.(uint32)) & 0xFFF
+ classifier.OVid = (vlanID.(uint32)) & VlanvIDMask
}
- if metadata, ok := classifierInfo[METADATA]; ok { // TODO: Revisit
+ if metadata, ok := classifierInfo[METADATA]; ok {
classifier.IVid = uint32(metadata.(uint64))
}
if vlanPcp, ok := classifierInfo[VlanPcp]; ok {
- classifier.OPbits = vlanPcp.(uint32)
+ // NOTE(review): vlanPcp is the interface value read from classifierInfo (it is
+ // type-asserted to uint32 in the else branch). Comparing an interface value with the
+ // untyped constant 0 compares against an int, so this test is false whenever the stored
+ // value is a uint32 and a p-bit of 0 takes the else branch. Confirm the intended
+ // handling of p-bit 0 before changing the comparison.
+ if vlanPcp == 0 {
+ classifier.OPbits = VlanPCPMask
+ } else {
+ classifier.OPbits = (vlanPcp.(uint32)) & VlanPCPMask
+ }
}
if udpSrc, ok := classifierInfo[UDPSrc]; ok {
classifier.SrcPort = udpSrc.(uint32)
@@ -732,14 +959,8 @@
return &action
}
-func (f *OpenOltFlowMgr) getTPpath(intfID uint32, uni string) string {
- /*
- FIXME
- Should get Table id form the flow, as of now hardcoded to DEFAULT_TECH_PROFILE_TABLE_ID (64)
- 'tp_path' contains the suffix part of the tech_profile_instance path. The prefix to the 'tp_path' should be set to
- TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
- */
- return f.techprofile[intfID].GetTechProfileInstanceKVPath(tp.DEFAULT_TECH_PROFILE_TABLE_ID, uni)
+// getTPpath returns the result of GetTechProfileInstanceKVPath(TpID, uni) on the
+// tech-profile manager registered for PON interface intfID.
+func (f *OpenOltFlowMgr) getTPpath(intfID uint32, uni string, TpID uint32) string {
+ return f.techprofile[intfID].GetTechProfileInstanceKVPath(TpID, uni)
}
func getFlowStoreCookie(classifier map[string]interface{}, gemPortID uint32) uint64 {
@@ -832,6 +1053,7 @@
if deviceFlow.AccessIntfId != -1 {
intfID = uint32(deviceFlow.AccessIntfId)
} else {
+ // REVIST : Why ponport is given as network port?
intfID = uint32(deviceFlow.NetworkIntfId)
}
@@ -911,12 +1133,6 @@
return nil
}
-func getSubscriberVlan(inPort uint32) uint32 {
- /* For EAPOL case we will use default VLAN , so will implement later if required */
- log.Info("unimplemented inport %v", inPort)
- return 0
-}
-
func (f *OpenOltFlowMgr) clearFlowsAndSchedulerForLogicalPort(childDevice *voltha.Device, logicalPort *voltha.LogicalPort) {
log.Info("unimplemented device %v, logicalport %v", childDevice, logicalPort)
}
@@ -930,7 +1146,7 @@
func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowID uint32, flowDirection string) {
log.Debugw("clearFlowFromResourceManager", log.Fields{"flowID": flowID, "flowDirection": flowDirection, "flow": *flow})
- ponIntf, onuID, uniID, err := FlowExtractInfo(flow, flowDirection)
+ portNum, ponIntf, onuID, uniID, err := FlowExtractInfo(flow, flowDirection)
if err != nil {
log.Error(err)
return
@@ -965,13 +1181,36 @@
// For ex: Case of HSIA where same flow is shared
// between DS and US.
f.updateFlowInfoToKVStore(int32(ponIntf), int32(onuID), int32(uniID), flowID, &updatedFlows)
- return
+ if len(updatedFlows) == 0 {
+ log.Debugw("Releasing flow Id to resource manager", log.Fields{"ponIntf": ponIntf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
+ f.resourceMgr.FreeFlowID(ponIntf, onuID, uniID, flowID)
+ }
}
- log.Debugw("Releasing flow Id to resource manager", log.Fields{"ponIntf": ponIntf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
- f.resourceMgr.FreeFlowID(ponIntf, onuID, uniID, flowID)
flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(ponIntf, onuID, uniID)
if len(flowIds) == 0 {
- /* TODO: Remove Upstream and Downstream Schedulers */
+		log.Debugf("Flow count for subscriber %d is zero", onuID)
+		// No flow IDs remain for this UNI: fetch its tech-profile instance so the
+		// upstream and downstream scheduler queues can be removed.
+		kvstoreTpId := f.resourceMgr.GetTechProfileIdForOnu(ponIntf, onuID, uniID)
+		if kvstoreTpId == 0 {
+			log.Warnw("Could-not-find-techprofile-tableid-for-uni", log.Fields{"ponIntf": ponIntf, "onuId": onuID, "uniId": uniID})
+			return
+		}
+		uni := getUniPortPath(ponIntf, onuID, uniID)
+		tpPath := f.getTPpath(ponIntf, uni, kvstoreTpId)
+		log.Debugw("Getting-techprofile-instance-for-subscriber", log.Fields{"TP-PATH": tpPath})
+		techprofileInst, err := f.techprofile[ponIntf].GetTPInstanceFromKVStore(kvstoreTpId, tpPath)
+		if err != nil { // This should not happen, something wrong in KV backend transaction
+			// Log the tech-profile ID that was actually looked up, plus the underlying error.
+			log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": kvstoreTpId, "path": tpPath, "error": err})
+			return
+		}
+		if techprofileInst == nil {
+			log.Errorw("Tech-profile-instance-does-not-exist-in-KV Store", log.Fields{"tpPath": tpPath})
+			return
+		}
+
+		f.RemoveSchedulerQueues(tp_pb.Direction_UPSTREAM, ponIntf, onuID, uniID, portNum, techprofileInst)
+		f.RemoveSchedulerQueues(tp_pb.Direction_DOWNSTREAM, ponIntf, onuID, uniID, portNum, techprofileInst)
+	} else {
+		// Structured fields, no format verbs: use Debugw rather than Debugf.
+		log.Debugw("Flow ids for subscriber", log.Fields{"onu": onuID, "flows": flowIds})
}
}
@@ -1013,15 +1252,101 @@
}
// AddFlow add flow to device
-func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats) {
+func (f *OpenOltFlowMgr) AddFlow(flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) {
classifierInfo := make(map[string]interface{})
actionInfo := make(map[string]interface{})
- log.Debug("Adding Flow", log.Fields{"flow": flow})
+ var UsMeterID uint32
+ var DsMeterID uint32
+
+ log.Debug("Adding Flow", log.Fields{"flow": flow, "flowMetadata": flowMetadata})
for _, field := range utils.GetOfbFields(flow) {
- f.updateClassifierInfo(field, classifierInfo)
+		// Copy each supported match field into classifierInfo under its classifier key.
+		// The messages carry structured fields, so they are logged with Debugw.
+		switch field.Type {
+		case utils.ETH_TYPE:
+			classifierInfo[EthType] = field.GetEthType()
+			log.Debugw("field-type-eth-type", log.Fields{"classifierInfo[ETH_TYPE]": classifierInfo[EthType]})
+		case utils.IP_PROTO:
+			classifierInfo[IPProto] = field.GetIpProto()
+			log.Debugw("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto]})
+		case utils.IN_PORT:
+			classifierInfo[InPort] = field.GetPort()
+			log.Debugw("field-type-in-port", log.Fields{"classifierInfo[IN_PORT]": classifierInfo[InPort]})
+		case utils.VLAN_VID:
+			classifierInfo[VlanVid] = field.GetVlanVid()
+			log.Debugw("field-type-vlan-vid", log.Fields{"classifierInfo[VLAN_VID]": classifierInfo[VlanVid]})
+		case utils.VLAN_PCP:
+			classifierInfo[VlanPcp] = field.GetVlanPcp()
+			log.Debugw("field-type-vlan-pcp", log.Fields{"classifierInfo[VLAN_PCP]": classifierInfo[VlanPcp]})
+		case utils.UDP_DST:
+			classifierInfo[UDPDst] = field.GetUdpDst()
+			log.Debugw("field-type-udp-dst", log.Fields{"classifierInfo[UDP_DST]": classifierInfo[UDPDst]})
+		case utils.UDP_SRC:
+			classifierInfo[UDPSrc] = field.GetUdpSrc()
+			log.Debugw("field-type-udp-src", log.Fields{"classifierInfo[UDP_SRC]": classifierInfo[UDPSrc]})
+		case utils.IPV4_DST:
+			classifierInfo[Ipv4Dst] = field.GetIpv4Dst()
+			log.Debugw("field-type-ipv4-dst", log.Fields{"classifierInfo[IPV4_DST]": classifierInfo[Ipv4Dst]})
+		case utils.IPV4_SRC:
+			classifierInfo[Ipv4Src] = field.GetIpv4Src()
+			log.Debugw("field-type-ipv4-src", log.Fields{"classifierInfo[IPV4_SRC]": classifierInfo[Ipv4Src]})
+		case utils.METADATA:
+			classifierInfo[METADATA] = field.GetTableMetadata()
+			log.Debugw("field-type-metadata", log.Fields{"classifierInfo[METADATA]": classifierInfo[METADATA]})
+		case utils.TUNNEL_ID:
+			classifierInfo[TunnelID] = field.GetTunnelId()
+			log.Debugw("field-type-tunnelId", log.Fields{"classifierInfo[TUNNEL_ID]": classifierInfo[TunnelID]})
+		default:
+			// An unsupported match field aborts the whole AddFlow call.
+			log.Errorw("Un supported field type", log.Fields{"type": field.Type})
+			return
+		}
}
for _, action := range utils.GetActions(flow) {
- f.updateFlowActionInfo(action, actionInfo, classifierInfo)
+		// Translate each supported OpenFlow action into actionInfo. Values that are only
+		// logged are passed without type assertions so that a missing key (for example no
+		// in_port match in the flow) cannot panic inside a log statement.
+		switch action.Type {
+		case utils.OUTPUT:
+			out := action.GetOutput()
+			if out == nil {
+				log.Error("Invalid output port in action")
+				return
+			}
+			actionInfo[OUTPUT] = out.GetPort()
+			log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[OUTPUT]})
+		case utils.POP_VLAN:
+			actionInfo[PopVlan] = true
+			log.Debugw("action-type-pop-vlan", log.Fields{"in_port": classifierInfo[InPort]})
+		case utils.PUSH_VLAN:
+			if out := action.GetPush(); out != nil {
+				if tpid := out.GetEthertype(); tpid != 0x8100 {
+					// Log the rejected ethertype itself; actionInfo[PushVlan] is not an
+					// int32, so asserting it here would panic.
+					log.Errorw("Invalid ethertype in push action", log.Fields{"ethertype": tpid})
+				} else {
+					actionInfo[PushVlan] = true
+					actionInfo[TPID] = tpid
+					log.Debugw("action-type-push-vlan",
+						log.Fields{"push_tpid": actionInfo[TPID], "in_port": classifierInfo[InPort]})
+				}
+			}
+		case utils.SET_FIELD:
+			out := action.GetSetField()
+			if out == nil {
+				continue
+			}
+			field := out.GetField()
+			if field == nil {
+				continue
+			}
+			if ofClass := field.GetOxmClass(); ofClass != ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
+				log.Errorw("Invalid openflow class", log.Fields{"class": ofClass})
+				return
+			}
+			ofbField := field.GetOfbField()
+			if ofbField == nil {
+				continue
+			}
+			// Only set-vlan-vid is handled; any other set-field type is logged and ignored.
+			if fieldtype := ofbField.GetType(); fieldtype != ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID {
+				log.Errorw("unsupported-action-set-field-type", log.Fields{"type": fieldtype})
+				continue
+			}
+			vlan := ofbField.GetVlanVid()
+			if vlan == 0 {
+				log.Error("No Invalid vlan id in set vlan-vid action")
+				continue
+			}
+			actionInfo[VlanVid] = vlan & 0xfff
+			log.Debugw("action-set-vlan-vid", log.Fields{"actionInfo[VLAN_VID]": actionInfo[VlanVid]})
+		default:
+			// An unsupported action aborts the whole AddFlow call.
+			log.Errorw("Un supported action type", log.Fields{"type": action.Type})
+			return
+		}
}
/* Controller bound trap flows */
if isControllerFlow := IsControllerBoundFlow(actionInfo[OUTPUT].(uint32)); isControllerFlow {
@@ -1035,7 +1360,10 @@
log.Error("upstream pon-to-controller-flow, NO-inport-in-tunnelid")
return
}
- }
+ } /*else {
+ log.Debugw("Trap on NNI flow currently not supported", log.Fields{"flow": *flow})
+ return
+ }*/
} else {
log.Debug("Non-Controller flows, getting uniport from tunnelid")
// Downstream flow from NNI to PON port , Use tunnel ID as new OUT port / UNI port
@@ -1061,7 +1389,7 @@
}
}
log.Infow("Flow ports", log.Fields{"classifierInfo_inport": classifierInfo[InPort], "action_output": actionInfo[OUTPUT]})
- portNo, intfID, onuID, uniID := ExtractAccessFromFlow(classifierInfo[InPort].(uint32), actionInfo[OUTPUT].(uint32))
+ portNo, intfId, onuId, uniId := ExtractAccessFromFlow(classifierInfo[InPort].(uint32), actionInfo[OUTPUT].(uint32))
if ipProto, ok := classifierInfo[IPProto]; ok {
if ipProto.(uint32) == IPProtoDhcp {
if udpSrc, ok := classifierInfo[UDPSrc]; ok {
@@ -1073,102 +1401,40 @@
}
}
}
- f.divideAndAddFlow(intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow)
-}
-
-func (f *OpenOltFlowMgr) updateClassifierInfo(field *ofp.OfpOxmOfbField, classifierInfo map[string]interface{}) {
- if field.Type == utils.ETH_TYPE {
- classifierInfo[EthType] = field.GetEthType()
- log.Debug("field-type-eth-type", log.Fields{"classifierInfo[ETH_TYPE]": classifierInfo[EthType].(uint32)})
- } else if field.Type == utils.IP_PROTO {
- classifierInfo[IPProto] = field.GetIpProto()
- log.Debug("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
- } else if field.Type == utils.IN_PORT {
- classifierInfo[InPort] = field.GetPort()
- log.Debug("field-type-in-port", log.Fields{"classifierInfo[IN_PORT]": classifierInfo[InPort].(uint32)})
- } else if field.Type == utils.VLAN_VID {
- classifierInfo[VlanVid] = field.GetVlanVid()
- log.Debug("field-type-vlan-vid", log.Fields{"classifierInfo[VLAN_VID]": classifierInfo[VlanVid].(uint32)})
- } else if field.Type == utils.VLAN_PCP {
- classifierInfo[VlanPcp] = field.GetVlanPcp()
- log.Debug("field-type-vlan-pcp", log.Fields{"classifierInfo[VLAN_PCP]": classifierInfo[VlanPcp].(uint32)})
- } else if field.Type == utils.UDP_DST {
- classifierInfo[UDPDst] = field.GetUdpDst()
- log.Debug("field-type-udp-dst", log.Fields{"classifierInfo[UDP_DST]": classifierInfo[UDPDst].(uint32)})
- } else if field.Type == utils.UDP_SRC {
- classifierInfo[UDPSrc] = field.GetUdpSrc()
- log.Debug("field-type-udp-src", log.Fields{"classifierInfo[UDP_SRC]": classifierInfo[UDPSrc].(uint32)})
- } else if field.Type == utils.IPV4_DST {
- classifierInfo[Ipv4Dst] = field.GetIpv4Dst()
- log.Debug("field-type-ipv4-dst", log.Fields{"classifierInfo[IPV4_DST]": classifierInfo[Ipv4Dst].(uint32)})
- } else if field.Type == utils.IPV4_SRC {
- classifierInfo[Ipv4Src] = field.GetIpv4Src()
- log.Debug("field-type-ipv4-src", log.Fields{"classifierInfo[IPV4_SRC]": classifierInfo[Ipv4Src].(uint32)})
- } else if field.Type == utils.METADATA {
- classifierInfo[METADATA] = field.GetTableMetadata()
- log.Debug("field-type-metadata", log.Fields{"classifierInfo[METADATA]": classifierInfo[METADATA].(uint64)})
- } else if field.Type == utils.TUNNEL_ID {
- classifierInfo[TunnelID] = field.GetTunnelId()
- log.Debug("field-type-tunnelId", log.Fields{"classifierInfo[TUNNEL_ID]": classifierInfo[TunnelID].(uint64)})
- } else {
- log.Errorw("Un supported field type", log.Fields{"type": field.Type})
+ /* Metadata 8 bytes:
+ Most Significant 2 Bytes = Inner VLAN
+ Next 2 Bytes = Tech Profile ID(TPID)
+ Least Significant 4 Bytes = Port ID
+ Flow METADATA carries Tech-Profile (TP) ID and is mandatory in all
+ subscriber related flows.
+ */
+ metadata := utils.GetMetadataFromWriteMetadataAction(flow)
+ if metadata == 0 {
+ log.Error("Metadata is not present in flow which is mandatory")
return
}
-}
-
-func (f *OpenOltFlowMgr) updateFlowActionInfo(action *ofp.OfpAction, actionInfo map[string]interface{}, classifierInfo map[string]interface{}) {
- if action.Type == utils.OUTPUT {
- if out := action.GetOutput(); out != nil {
- actionInfo[OUTPUT] = out.GetPort()
- log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[OUTPUT].(uint32)})
- } else {
- log.Error("Invalid output port in action")
- return
- }
- } else if action.Type == utils.POP_VLAN {
- actionInfo[PopVlan] = true
- log.Debugw("action-type-pop-vlan", log.Fields{"in_port": classifierInfo[InPort].(uint32)})
- } else if action.Type == utils.PUSH_VLAN {
- if out := action.GetPush(); out != nil {
- if tpid := out.GetEthertype(); tpid != 0x8100 {
- log.Errorw("Invalid ethertype in push action", log.Fields{"ethertype": actionInfo[PushVlan].(int32)})
- } else {
- actionInfo[PushVlan] = true
- actionInfo[TPID] = tpid
- log.Debugw("action-type-push-vlan",
- log.Fields{"push_tpid": actionInfo[TPID].(uint32), "in_port": classifierInfo[InPort].(uint32)})
- }
- }
- } else if action.Type == utils.SET_FIELD {
- if out := action.GetSetField(); out != nil {
- if field := out.GetField(); field != nil {
- if ofClass := field.GetOxmClass(); ofClass != ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
- log.Errorw("Invalid openflow class", log.Fields{"class": ofClass})
- return
- }
- /*log.Debugw("action-type-set-field",log.Fields{"field": field, "in_port": classifierInfo[IN_PORT].(uint32)})*/
- if ofbField := field.GetOfbField(); ofbField != nil {
- if fieldtype := ofbField.GetType(); fieldtype == ofp.OxmOfbFieldTypes_OFPXMT_OFB_VLAN_VID {
- if vlan := ofbField.GetVlanVid(); vlan != 0 {
- actionInfo[VlanVid] = vlan & 0xfff
- log.Debugw("action-set-vlan-vid", log.Fields{"actionInfo[VLAN_VID]": actionInfo[VlanVid].(uint32)})
- } else {
- log.Error("No Invalid vlan id in set vlan-vid action")
- }
- } else {
- log.Errorw("unsupported-action-set-field-type", log.Fields{"type": fieldtype})
- }
- }
- }
- }
- } else {
- log.Errorw("Un supported action type", log.Fields{"type": action.Type})
+	// The tech-profile ID carried in the flow metadata must agree with the one already
+	// recorded for this PON/ONU/UNI, if any; a different ID is rejected.
+	TpID := utils.GetTechProfileIDFromWriteMetaData(metadata)
+	kvstoreTpId := f.resourceMgr.GetTechProfileIdForOnu(intfId, onuId, uniId)
+	if kvstoreTpId == 0 {
+		log.Debugf("tpid-not-present-in-kvstore, using tp id %d from flow metadata", TpID)
+	} else if kvstoreTpId != uint32(TpID) {
+		// Structured fields: use Errorw rather than Error.
+		log.Errorw(" Tech-profile-updates-not-supported", log.Fields{"Tpid-in-flow": TpID, "kvstore-TpId": kvstoreTpId})
+		return
+	}
+	log.Debugw("TPID for this subcriber", log.Fields{"TpId": TpID, "pon": intfId, "onuId": onuId, "uniId": uniId})
+	// The flow's meter is taken as the upstream or the downstream meter depending on the
+	// direction implied by the output port; the other meter ID stays 0.
+	if IsUpstream(actionInfo[OUTPUT].(uint32)) {
+		UsMeterID = utils.GetMeterIdFromFlow(flow)
+		log.Debugw("Upstream-flow-meter-id", log.Fields{"UsMeterID": UsMeterID})
+	} else {
+		DsMeterID = utils.GetMeterIdFromFlow(flow)
+		log.Debugw("Downstream-flow-meter-id", log.Fields{"DsMeterID": DsMeterID})
+	}
+	f.divideAndAddFlow(intfId, onuId, uniId, portNo, classifierInfo, actionInfo, flow, uint32(TpID), UsMeterID, DsMeterID, flowMetadata)
}
//sendTPDownloadMsgToChild send payload
-func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(intfID uint32, onuID uint32, uniID uint32, uni string) error {
+func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
onuDevice, err := f.getOnuChildDevice(intfID, onuID)
if err != nil {
@@ -1177,7 +1443,7 @@
}
log.Debugw("Got child device from OLT device handler", log.Fields{"device": *onuDevice})
- tpPath := f.getTPpath(intfID, uni)
+ tpPath := f.getTPpath(intfID, uni, TpID)
tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID, Path: tpPath}
log.Infow("Sending Load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"msg": *tpDownloadMsg})
sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(context.Background(),
@@ -1280,6 +1546,31 @@
return gemPortID, err
}
+// installFlowOnAllGemports calls the matching installer once for every GEM port in gemPorts.
+//
+// HsiaFlow and DhcpFlow are installed through f1; EapolFlow is installed through f2 using
+// the first element of vlanId. The interface, ONU and UNI identifiers, the port number and
+// the alloc ID are read from args under the keys "intfId", "onuId", "uniId", "portNo" and
+// "allocId" (a missing key reads as 0). Any other FlowType is logged and processing stops.
+// An EapolFlow request without a vlanId is logged and nothing is installed.
+func installFlowOnAllGemports(
+	f1 func(intfId uint32, onuId uint32, uniId uint32,
+		portNo uint32, classifier map[string]interface{}, action map[string]interface{},
+		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32),
+	f2 func(intfId uint32, onuId uint32, uniId uint32, portNo uint32,
+		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32),
+	args map[string]uint32,
+	classifier map[string]interface{}, action map[string]interface{},
+	logicalFlow *ofp.OfpFlowStats,
+	gemPorts []uint32,
+	FlowType string,
+	vlanId ...uint32) {
+	log.Debugw("Installing flow on all GEM ports", log.Fields{"FlowType": FlowType, "gemPorts": gemPorts, "vlan": vlanId})
+	// vlanId is variadic; indexing it when the caller passed none would panic.
+	if FlowType == EapolFlow && len(vlanId) == 0 {
+		log.Errorw("No vlan id given for EAPOL flow", log.Fields{"FlowType": FlowType, "gemPorts": gemPorts})
+		return
+	}
+	for _, gemPortID := range gemPorts {
+		switch FlowType {
+		case HsiaFlow, DhcpFlow:
+			f1(args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID)
+		case EapolFlow:
+			f2(args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortID, vlanId[0])
+		default:
+			log.Errorw("Unrecognized Flow Type", log.Fields{"FlowType": FlowType})
+			return
+		}
+	}
+}
+
func (f *OpenOltFlowMgr) addDHCPTrapFlowOnNNI(logicalFlow *ofp.OfpFlowStats, classifier map[string]interface{}, portNo uint32) {
log.Debug("Adding trap-dhcp-of-nni-flow")
action := make(map[string]interface{})
@@ -1306,7 +1597,7 @@
log.Debug("Flow-exists--not-re-adding")
return
}
- flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), flowStoreCookie, "")
+ flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
if err != nil {
log.Errorw("Flow id unavailable for DHCP traponNNI flow", log.Fields{"error": err})
return
diff --git a/adaptercore/resourcemanager/resourcemanager.go b/adaptercore/resourcemanager/resourcemanager.go
index b7d7737..dc5300a 100755
--- a/adaptercore/resourcemanager/resourcemanager.go
+++ b/adaptercore/resourcemanager/resourcemanager.go
@@ -28,6 +28,7 @@
ponrmgr "github.com/opencord/voltha-go/common/ponresourcemanager"
"github.com/opencord/voltha-go/db/kvstore"
"github.com/opencord/voltha-go/db/model"
+ ofp "github.com/opencord/voltha-protos/go/openflow_13"
"github.com/opencord/voltha-protos/go/openolt"
)
@@ -37,6 +38,8 @@
// BasePathKvStore - service/voltha/openolt/<device_id>
const BasePathKvStore = "service/voltha/openolt/{%s}"
+const TP_ID_PATH_SUFFIX = "tp_id/{%d,%d,%d}" // tp_id/<(pon_id, onu_id, uni_id)>
+const METER_ID_PATH_SUFFIX = "meter_id/{%d,%d,%d}/{%s}" // meter_id/<(pon_id, onu_id, uni_id)>/<direction>
// FlowInfo holds the flow information
type FlowInfo struct {
Flow *openolt.Flow
@@ -58,7 +61,7 @@
ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
}
-func newKVClient(storeType, address string, timeout uint32) (kvstore.Client, error) {
+func newKVClient(storeType string, address string, timeout uint32) (kvstore.Client, error) {
log.Infow("kv-store-type", log.Fields{"store": storeType})
switch storeType {
case "consul":
@@ -413,8 +416,9 @@
// GetFlowID return flow ID for a given pon interface id, onu id and uni id
func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ponIntfID uint32, ONUID uint32, uniID uint32,
+ gemportID uint32,
flowStoreCookie uint64,
- flowCategory string) (uint32, error) {
+ flowCategory string, vlanPcp ...uint32) (uint32, error) {
var err error
FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, ONUID, uniID)
@@ -425,13 +429,18 @@
FlowInfo := RsrcMgr.GetFlowIDInfo(ponIntfID, ONUID, uniID, uint32(flowID))
if FlowInfo != nil {
for _, Info := range *FlowInfo {
- if flowCategory != "" && Info.FlowCategory == flowCategory {
- log.Debug("Found flow matching with flow category", log.Fields{"flowId": flowID, "flowCategory": flowCategory})
- return flowID, nil
+ if int32(gemportID) == Info.Flow.GemportId && flowCategory != "" && Info.FlowCategory == flowCategory {
+ log.Debug("Found flow matching with flow catagory", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
+ if Info.FlowCategory == "HSIA_FLOW" && Info.Flow.Classifier.OPbits == vlanPcp[0] {
+ log.Debug("Found matching vlan pcp ", log.Fields{"flowId": flowID, "Vlanpcp": vlanPcp[0]})
+ return flowID, nil
+ }
}
- if flowStoreCookie != 0 && Info.FlowStoreCookie == flowStoreCookie {
- log.Debug("Found flow matching with flowStore cookie", log.Fields{"flowId": flowID, "flowStoreCookie": flowStoreCookie})
- return flowID, nil
+ if int32(gemportID) == Info.Flow.GemportId && flowStoreCookie != 0 && Info.FlowStoreCookie == flowStoreCookie {
+ if flowCategory != "" && Info.FlowCategory == flowCategory {
+ log.Debug("Found flow matching with flow catagory", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
+ return flowID, nil
+ }
}
}
}
@@ -694,3 +703,109 @@
}
return false
}
+
+// GetTechProfileIdForOnu returns the tech-profile ID stored in the KV store under
+// tp_id/{IntfId,OnuId,UniId}. It returns 0 when no entry exists or when the entry cannot
+// be read or decoded; failures are logged, not returned.
+func (RMgr *OpenOltResourceMgr) GetTechProfileIdForOnu(IntfId uint32, OnuId uint32, UniId uint32) uint32 {
+	Path := fmt.Sprintf(TP_ID_PATH_SUFFIX, IntfId, OnuId, UniId)
+	var Data uint32
+	Value, err := RMgr.KVStore.Get(Path)
+	if err != nil {
+		// Include the underlying error so a KV backend failure can be diagnosed.
+		log.Errorw("Failed to get TP id from kvstore", log.Fields{"path": Path, "error": err})
+		return Data
+	}
+	if Value != nil {
+		Val, err := kvstore.ToByte(Value.Value)
+		if err != nil {
+			log.Errorw("Failed to convert into byte array", log.Fields{"error": err})
+			return Data
+		}
+		if err = json.Unmarshal(Val, &Data); err != nil {
+			// Structured fields: use Errorw rather than Error.
+			log.Errorw("Failed to unmarshal", log.Fields{"error": err})
+			return Data
+		}
+	}
+	log.Debugf("Getting TP id %d from path %s", Data, Path)
+	return Data
+}
+
+// RemoveTechProfileIdForOnu deletes the KV store entry tp_id/{IntfId,OnuId,UniId} and
+// returns the error reported by the KV store, if any.
+func (RMgr *OpenOltResourceMgr) RemoveTechProfileIdForOnu(IntfId uint32, OnuId uint32, UniId uint32) error {
+	IntfOnuUniId := fmt.Sprintf(TP_ID_PATH_SUFFIX, IntfId, OnuId, UniId)
+	if err := RMgr.KVStore.Delete(IntfOnuUniId); err != nil {
+		// The message contains a %s verb, so it must go through Errorf.
+		log.Errorf("Failed to delete techprofile id resource %s in KV store", IntfOnuUniId)
+		return err
+	}
+	return nil
+}
+
+// UpdateTechProfileIdForOnu writes TpId, JSON-encoded, to the KV store entry
+// tp_id/{IntfId,OnuId,UniId}. It returns the encoding or KV store error, or nil on success.
+func (RMgr *OpenOltResourceMgr) UpdateTechProfileIdForOnu(IntfId uint32, OnuId uint32,
+	UniId uint32, TpId uint32) error {
+	kvPath := fmt.Sprintf(TP_ID_PATH_SUFFIX, IntfId, OnuId, UniId)
+	log.Debugf("updating tp id %d on path %s", TpId, kvPath)
+
+	encoded, marshalErr := json.Marshal(TpId)
+	if marshalErr != nil {
+		log.Error("failed to Marshal")
+		return marshalErr
+	}
+	if putErr := RMgr.KVStore.Put(kvPath, encoded); putErr != nil {
+		log.Errorf("Failed to update resource %s", kvPath)
+		return putErr
+	}
+	return nil
+}
+
+// UpdateMeterIdForOnu writes MeterConfig, JSON-encoded, to the KV store entry
+// meter_id/{IntfId,OnuId,UniId}/{Direction}. It returns an error when MeterConfig is nil,
+// when encoding fails, or when the KV store write fails.
+func (RMgr *OpenOltResourceMgr) UpdateMeterIdForOnu(Direction string, IntfId uint32, OnuId uint32,
+	UniId uint32, MeterConfig *ofp.OfpMeterConfig) error {
+	IntfOnuUniId := fmt.Sprintf(METER_ID_PATH_SUFFIX, IntfId, OnuId, UniId, Direction)
+	if MeterConfig == nil {
+		// Dereferencing a nil config would panic; report it to the caller instead.
+		log.Errorf("Meter config is nil, nothing to store at %s", IntfOnuUniId)
+		return fmt.Errorf("meter config is nil for %s", IntfOnuUniId)
+	}
+	Value, err := json.Marshal(*MeterConfig)
+	if err != nil {
+		log.Error("failed to Marshal meter config")
+		return err
+	}
+	if err = RMgr.KVStore.Put(IntfOnuUniId, Value); err != nil {
+		log.Errorf("Failed to store meter into KV store %s", IntfOnuUniId)
+		return err
+	}
+	return nil
+}
+
+// GetMeterIdForOnu reads the meter config stored in the KV store entry
+// meter_id/{IntfId,OnuId,UniId}/{Direction}.
+//
+// It returns (nil, nil) when no entry exists and (nil, err) when the entry cannot be read
+// or decoded, so the config is non-nil only when it was actually loaded.
+func (RMgr *OpenOltResourceMgr) GetMeterIdForOnu(Direction string, IntfId uint32, OnuId uint32, UniId uint32) (*ofp.OfpMeterConfig, error) {
+	Path := fmt.Sprintf(METER_ID_PATH_SUFFIX, IntfId, OnuId, UniId, Direction)
+	Value, err := RMgr.KVStore.Get(Path)
+	if err != nil {
+		// Do not hand back a zero-valued config alongside an error.
+		log.Errorf("Failed to get Meter config from kvstore for path %s", Path)
+		return nil, err
+	}
+	if Value == nil {
+		log.Debug("meter-does-not-exists-in-KVStore")
+		return nil, nil
+	}
+	log.Debugw("Found meter in KV store", log.Fields{"Direction": Direction})
+	Val, err := kvstore.ToByte(Value.Value)
+	if err != nil {
+		log.Errorw("Failed to convert into byte array", log.Fields{"error": err})
+		return nil, err
+	}
+	var meterConfig ofp.OfpMeterConfig
+	if err = json.Unmarshal(Val, &meterConfig); err != nil {
+		log.Errorw("Failed to unmarshal meterconfig", log.Fields{"error": err})
+		return nil, err
+	}
+	return &meterConfig, nil
+}
+
+// RemoveMeterIdForOnu deletes the KV store entry meter_id/{IntfId,OnuId,UniId}/{Direction}
+// and returns the error reported by the KV store, or nil on success.
+func (RMgr *OpenOltResourceMgr) RemoveMeterIdForOnu(Direction string, IntfId uint32, OnuId uint32, UniId uint32) error {
+	kvPath := fmt.Sprintf(METER_ID_PATH_SUFFIX, IntfId, OnuId, UniId, Direction)
+	delErr := RMgr.KVStore.Delete(kvPath)
+	if delErr == nil {
+		return nil
+	}
+	log.Errorf("Failed to delete meter id %s from kvstore ", kvPath)
+	return delErr
+}