VOL-2419: Add DHCP and HSIA flow add support for ATT worflow
VOL-2422: Integrate DT workflow with HSIA flow add support
Change-Id: Id77fbf6adecea759a0ef982399c30b4a4b88593b
diff --git a/core/att_workflow.go b/core/att_workflow.go
index f20d866..0bc0f69 100644
--- a/core/att_workflow.go
+++ b/core/att_workflow.go
@@ -18,6 +18,7 @@
import (
"errors"
+
"github.com/opencord/openolt-scale-tester/config"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
@@ -36,7 +37,8 @@
type AttWorkFlow struct {
}
-func ProvisionAttNniTrapFlow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
+func ProvisionAttNniTrapFlow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig,
+ rsrMgr *OpenOltResourceMgr) error {
var flowID []uint32
var err error
@@ -64,7 +66,8 @@
if err != nil {
log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
- rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(uint32(config.NniIntfID), ponresourcemanager.FLOW_ID, flowID)
+ rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(uint32(config.NniIntfID),
+ ponresourcemanager.FLOW_ID, flowID)
return err
}
log.Debugw("Flow added to device successfully ", log.Fields{"flow": flow})
@@ -72,34 +75,6 @@
return nil
}
-func getTrafficSched(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficScheduler {
- var SchedCfg *tp_pb.SchedulerConfig
-
- if direction == tp_pb.Direction_DOWNSTREAM {
- SchedCfg = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
- GetDsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
-
- } else {
- SchedCfg = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
- GetUsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
- }
-
- // hard-code for now
- cir := 16000
- cbs := 5000
- eir := 16000
- ebs := 5000
- pir := cir + eir
- pbs := cbs + ebs
-
- TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: uint32(cir), Cbs: uint32(cbs), Pir: uint32(pir), Pbs: uint32(pbs)}
-
- TrafficSched := []*tp_pb.TrafficScheduler{subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
- GetTrafficScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]], SchedCfg, TrafficShaping)}
-
- return TrafficSched
-}
-
func (att AttWorkFlow) ProvisionScheds(subs *Subscriber) error {
var trafficSched []*tp_pb.TrafficScheduler
@@ -138,14 +113,6 @@
return nil
}
-func getTrafficQueues(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficQueue {
-
- trafficQueues := subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
- GetTrafficQueues(subs.TpInstance[subs.TestConfig.TpIDList[0]], direction)
-
- return trafficQueues
-}
-
func (att AttWorkFlow) ProvisionQueues(subs *Subscriber) error {
log.Info("provisioning-queues")
@@ -188,14 +155,11 @@
}
func (att AttWorkFlow) ProvisionEapFlow(subs *Subscriber) error {
- log.Info("provisioning-eap--att")
-
var err error
var flowID []uint32
var gemPortIDs []uint32
var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
-
for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
gemPortIDs = append(gemPortIDs, gem.GemportID)
}
@@ -203,40 +167,40 @@
for _, gemID := range gemPortIDs {
if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
ponresourcemanager.FLOW_ID, 1); err != nil {
- return errors.New(ReasonCodeToReasonString(FLOW_ADD_FAILED))
+ return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+ } else {
+ if err := AddFlow(subs, EapolFlow, Upstream, flowID[0], allocID, gemID); err != nil {
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, flowID)
+ return err
+ }
}
- flowClassifier := &oop.Classifier{EthType: 34958, OVid: subs.Ctag, PktTagType: "single_tag"}
- actionCmd := &oop.ActionCmd{TrapToHost: true}
- actionInfo := &oop.Action{Cmd: actionCmd}
-
- flow := oop.Flow{AccessIntfId: int32(subs.PonIntf), OnuId: int32(subs.OnuID),
- UniId: int32(subs.UniID), FlowId: flowID[0],
- FlowType: "upstream", AllocId: int32(allocID), GemportId: int32(gemID),
- Classifier: flowClassifier, Action: actionInfo,
- Priority: 1000, PortNo: subs.UniPortNo}
-
- _, err = subs.OpenOltClient.FlowAdd(context.Background(), &flow)
-
- st, _ := status.FromError(err)
- if st.Code() == codes.AlreadyExists {
- log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
- continue
- }
-
- if err != nil {
- log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
- ponresourcemanager.FLOW_ID, flowID)
- return errors.New(ReasonCodeToReasonString(FLOW_ADD_FAILED))
- }
- log.Debugw("Flow added to device successfully ", log.Fields{"flow": flow})
}
return nil
}
func (att AttWorkFlow) ProvisionDhcpFlow(subs *Subscriber) error {
- // TODO
- log.Info("provisioning-dhcp")
+ var err error
+ var flowID []uint32
+ var gemPortIDs []uint32
+
+ var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
+ for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
+ gemPortIDs = append(gemPortIDs, gem.GemportID)
+ }
+
+ for _, gemID := range gemPortIDs {
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, 1); err != nil {
+ return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+ } else {
+ if err := AddFlow(subs, DhcpFlow, Upstream, flowID[0], allocID, gemID); err != nil {
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, flowID)
+ return err
+ }
+ }
+ }
return nil
}
@@ -246,7 +210,29 @@
}
func (att AttWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
- // TODO
- log.Info("provisioning-hsia")
+ var err error
+ var flowID []uint32
+ var gemPortIDs []uint32
+
+ var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
+ for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
+ gemPortIDs = append(gemPortIDs, gem.GemportID)
+ }
+
+ for _, gemID := range gemPortIDs {
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, 1); err != nil {
+ return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+ } else {
+ if err := AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID); err != nil {
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, flowID)
+ return err
+ }
+ if err := AddFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID); err != nil {
+ return err
+ }
+ }
+ }
return nil
}
diff --git a/core/dt_workflow.go b/core/dt_workflow.go
new file mode 100644
index 0000000..2b06b71
--- /dev/null
+++ b/core/dt_workflow.go
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+ "errors"
+
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
+ tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
+ "golang.org/x/net/context"
+)
+
+func init() {
+ _, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+// A dummy struct to comply with the WorkFlow interface.
+type DtWorkFlow struct {
+}
+
+func (dt DtWorkFlow) ProvisionScheds(subs *Subscriber) error {
+ var trafficSched []*tp_pb.TrafficScheduler
+
+ log.Info("provisioning-scheds")
+
+ if trafficSched = getTrafficSched(subs, tp_pb.Direction_DOWNSTREAM); trafficSched == nil {
+ log.Error("ds-traffic-sched-is-nil")
+ return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+ }
+
+ log.Debugw("Sending Traffic scheduler create to device",
+ log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficScheds": trafficSched})
+ if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+ IntfId: subs.PonIntf, OnuId: subs.OnuID,
+ UniId: subs.UniID, PortNo: subs.UniPortNo,
+ TrafficScheds: trafficSched}); err != nil {
+ log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+ return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+ }
+
+ if trafficSched = getTrafficSched(subs, tp_pb.Direction_UPSTREAM); trafficSched == nil {
+ log.Error("us-traffic-sched-is-nil")
+ return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+ }
+
+ log.Debugw("Sending Traffic scheduler create to device",
+ log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficScheds": trafficSched})
+ if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+ IntfId: subs.PonIntf, OnuId: subs.OnuID,
+ UniId: subs.UniID, PortNo: subs.UniPortNo,
+ TrafficScheds: trafficSched}); err != nil {
+ log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+ return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+ }
+
+ return nil
+}
+
+func (dt DtWorkFlow) ProvisionQueues(subs *Subscriber) error {
+ log.Info("provisioning-queues")
+
+ var trafficQueues []*tp_pb.TrafficQueue
+ if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_DOWNSTREAM); trafficQueues == nil {
+ log.Error("Failed to create traffic queues")
+ return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+ }
+
+ // On receiving the CreateTrafficQueues request, the driver should create corresponding
+ // downstream queues.
+ log.Debugw("Sending Traffic Queues create to device",
+ log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficQueues": trafficQueues})
+ if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
+ &tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
+ UniId: subs.UniID, PortNo: subs.UniPortNo,
+ TrafficQueues: trafficQueues}); err != nil {
+ log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+ return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+ }
+
+ if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_UPSTREAM); trafficQueues == nil {
+ log.Error("Failed to create traffic queues")
+ return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+ }
+
+ // On receiving the CreateTrafficQueues request, the driver should create corresponding
+ // upstream queues.
+ log.Debugw("Sending Traffic Queues create to device",
+ log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficQueues": trafficQueues})
+ if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
+ &tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
+ UniId: subs.UniID, PortNo: subs.UniPortNo,
+ TrafficQueues: trafficQueues}); err != nil {
+ log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+ return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+ }
+
+ return nil
+}
+
+func (dt DtWorkFlow) ProvisionEapFlow(subs *Subscriber) error {
+ log.Info("dt-workflow-does-not-require-eap-support--nothing-to-do")
+ return nil
+}
+
+func (dt DtWorkFlow) ProvisionDhcpFlow(subs *Subscriber) error {
+ log.Info("dt-workflow-does-not-require-dhcp-support--nothing-to-do")
+ return nil
+}
+
+func (dt DtWorkFlow) ProvisionIgmpFlow(subs *Subscriber) error {
+ log.Info("dt-workflow-does-not-support-igmp-yet--nothing-to-do")
+ return nil
+}
+
+func (dt DtWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
+ var err error
+ var flowID []uint32
+ var gemPortIDs []uint32
+
+ var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
+ for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
+ gemPortIDs = append(gemPortIDs, gem.GemportID)
+ }
+
+ for _, gemID := range gemPortIDs {
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, 1); err != nil {
+ return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+ } else {
+ if err := AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID); err != nil {
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, flowID)
+ return err
+ }
+ if err := AddFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID); err != nil {
+ return err
+ }
+ }
+ }
+ return nil
+}
diff --git a/core/olt_manager.go b/core/olt_manager.go
index 122c18d..efd1489 100644
--- a/core/olt_manager.go
+++ b/core/olt_manager.go
@@ -43,6 +43,7 @@
const (
ReasonOk = "OK"
TechProfileKVPath = "service/voltha/technology_profiles/%s/%d" // service/voltha/technology_profiles/xgspon/<tech_profile_tableID>
+ DTWorkFlow = "DT"
)
type OnuDeviceKey struct {
@@ -175,8 +176,10 @@
go om.readIndications()
// Provision OLT NNI Trap flows as needed by the Workflow
- if err = ProvisionNniTrapFlow(om.openOltClient, om.testConfig, om.rsrMgr); err != nil {
- log.Error("failed-to-add-nni-trap-flow", log.Fields{"err": err})
+ if om.testConfig.WorkflowName != DTWorkFlow {
+ if err = ProvisionNniTrapFlow(om.openOltClient, om.testConfig, om.rsrMgr); err != nil {
+ log.Error("failed-to-add-nni-trap-flow", log.Fields{"err": err})
+ }
}
// Provision ONUs one by one
diff --git a/core/onu_manager.go b/core/onu_manager.go
index ad8aa8b..bd1265c 100644
--- a/core/onu_manager.go
+++ b/core/onu_manager.go
@@ -62,7 +62,7 @@
PonIntf: onu.PonIntf,
UniPortNo: MkUniPortNum(onu.PonIntf, onu.OnuID, uint32(subs)),
Ctag: GetCtag(onu.testConfig.WorkflowName, onu.PonIntf),
- Stag: GetStag(onu.testConfig.WorkflowName, onu.PonIntf),
+ Stag: GetStag(onu.testConfig.WorkflowName, onu.PonIntf, onu.OnuID, uint32(subs)),
OpenOltClient: onu.openOltClient,
TestConfig: onu.testConfig,
RsrMgr: onu.rsrMgr,
diff --git a/core/subscriber_manager.go b/core/subscriber_manager.go
index 717c323..5610d2c 100644
--- a/core/subscriber_manager.go
+++ b/core/subscriber_manager.go
@@ -31,6 +31,7 @@
const (
SUBSCRIBER_PROVISION_SUCCESS = iota
TP_INSTANCE_CREATION_FAILED
+ FLOW_ID_GENERATION_FAILED
FLOW_ADD_FAILED
SCHED_CREATION_FAILED
QUEUE_CREATION_FAILED
@@ -43,6 +44,7 @@
var Reason = [...]string{
"SUBSCRIBER_PROVISION_SUCCESS",
"TP_INSTANCE_CREATION_FAILED",
+ "FLOW_ID_GENERATION_FAILED",
"FLOW_ADD_FAILED",
"SCHED_CREATION_FAILED",
"QUEUE_CREATION_FAILED",
diff --git a/core/utils.go b/core/utils.go
index 01ff500..a99096e 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -18,15 +18,25 @@
import (
"fmt"
+
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"github.com/opencord/voltha-protos/v2/go/openolt"
)
+type DtStagKey struct {
+ ponIntf, onuID, uniID uint32
+}
+
+var currDtStag uint32
+var DtStag map[DtStagKey]uint32
+var DtCtag map[uint32]uint32
var AttCtag map[uint32]uint32
func init() {
_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
AttCtag = make(map[uint32]uint32)
+ DtCtag = make(map[uint32]uint32)
+ DtStag = make(map[DtStagKey]uint32)
}
const (
@@ -75,26 +85,55 @@
return AttCtag[ponIntf]
}
+func GetDtCtag(ponIntf uint32) uint32 {
+ var currCtag uint32
+ var ok bool
+ if currCtag, ok = DtCtag[ponIntf]; !ok {
+ // Start with ctag 1
+ DtCtag[ponIntf] = 1
+ return DtCtag[ponIntf]
+ }
+ DtCtag[ponIntf] = currCtag + 1
+ return DtCtag[ponIntf]
+}
+
func GetAttStag(ponIntf uint32) uint32 {
// start with stag 2
return ponIntf + 2
}
+func GetDtStag(ponIntf uint32, onuID uint32, uniID uint32) uint32 {
+ // Dt workflow requires unique stag for each subscriber
+ key := DtStagKey{ponIntf: ponIntf, onuID: onuID, uniID: uniID}
+
+ if value, ok := DtStag[key]; ok {
+ return value
+ } else {
+ DtStag[key] = currDtStag + 1
+ currDtStag = DtStag[key]
+ }
+ return DtStag[key]
+}
+
// TODO: More workflow support to be added here
func GetCtag(workFlowName string, ponIntf uint32) uint32 {
switch workFlowName {
case "ATT":
return GetAttCtag(ponIntf)
+ case "DT":
+ return GetDtCtag(ponIntf)
default:
log.Errorw("unknown-workflowname", log.Fields{"workflow": workFlowName})
}
return 0
}
-func GetStag(workFlowName string, ponIntf uint32) uint32 {
+func GetStag(workFlowName string, ponIntf uint32, onuID uint32, uniID uint32) uint32 {
switch workFlowName {
case "ATT":
return GetAttStag(ponIntf)
+ case "DT":
+ return GetDtStag(ponIntf, onuID, uniID)
default:
log.Errorw("unknown-workflowname", log.Fields{"workflow": workFlowName})
}
diff --git a/core/workflow_manager.go b/core/workflow_manager.go
index b8674ac..95233c1 100644
--- a/core/workflow_manager.go
+++ b/core/workflow_manager.go
@@ -79,6 +79,9 @@
case "ATT":
log.Info("chosen-att-workflow")
return AttWorkFlow{}
+ case "DT":
+ log.Info("chosen-dt-workflow")
+ return DtWorkFlow{}
// TODO: Add new workflow here
default:
log.Errorw("operator-workflow-not-supported-yet", log.Fields{"workflowName": subs.TestConfig.WorkflowName})
diff --git a/core/workflow_utils.go b/core/workflow_utils.go
new file mode 100644
index 0000000..3260816
--- /dev/null
+++ b/core/workflow_utils.go
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+ "errors"
+
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ oop "github.com/opencord/voltha-protos/v2/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ //Constants utilised while forming HSIA Flow
+ HsiaFlow = "HSIA_FLOW"
+ DownStreamHsiaActionOVid = 4108 //Used in Downstream HSIA
+
+ //Constants utilised while forming DHCP Flow
+ DhcpFlow = "DHCP_FLOW"
+ IPv4EthType = 0x800 //2048
+ DhcpIPProto = 17
+ DhcpSrcPort = 68
+ DhcpDstPort = 67
+
+ //Constants utilised while forming EAPOL Flow
+ EapolFlow = "EAPOL_FLOW"
+ EapEthType = 0x888e //34958
+
+ //Direction constant
+ Upstream = "upstream"
+ Downstream = "downstream"
+
+ //PacketTagType constant
+ PacketTagType = "pkt_tag_type"
+ Untagged = "untagged"
+ SingleTag = "single_tag"
+ DoubleTag = "double_tag"
+)
+
+func getTrafficSched(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficScheduler {
+ var SchedCfg *tp_pb.SchedulerConfig
+
+ if direction == tp_pb.Direction_DOWNSTREAM {
+ SchedCfg = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+ GetDsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
+
+ } else {
+ SchedCfg = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+ GetUsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
+ }
+
+ // hard-code for now
+ cir := 16000
+ cbs := 5000
+ eir := 16000
+ ebs := 5000
+ pir := cir + eir
+ pbs := cbs + ebs
+
+ TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: uint32(cir), Cbs: uint32(cbs), Pir: uint32(pir), Pbs: uint32(pbs)}
+
+ TrafficSched := []*tp_pb.TrafficScheduler{subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+ GetTrafficScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]], SchedCfg, TrafficShaping)}
+
+ return TrafficSched
+}
+
+func getTrafficQueues(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficQueue {
+
+ trafficQueues := subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+ GetTrafficQueues(subs.TpInstance[subs.TestConfig.TpIDList[0]], direction)
+
+ return trafficQueues
+}
+
+func FormatClassfierAction(flowType string, direction string, subs *Subscriber) (oop.Classifier, oop.Action) {
+ var flowClassifier oop.Classifier
+ var actionCmd oop.ActionCmd
+ var actionInfo oop.Action
+
+ if direction == Upstream {
+ switch flowType {
+ case EapolFlow:
+ flowClassifier.EthType = EapEthType
+ flowClassifier.OVid = subs.Ctag
+ flowClassifier.PktTagType = SingleTag
+ actionCmd.TrapToHost = true
+ actionInfo.Cmd = &actionCmd
+ case DhcpFlow:
+ flowClassifier.EthType = IPv4EthType
+ flowClassifier.IpProto = DhcpIPProto
+ flowClassifier.SrcPort = DhcpSrcPort
+ flowClassifier.DstPort = DhcpDstPort
+ flowClassifier.PktTagType = SingleTag
+ actionCmd.TrapToHost = true
+ actionInfo.Cmd = &actionCmd
+ case HsiaFlow:
+ flowClassifier.OVid = subs.Ctag
+ flowClassifier.PktTagType = SingleTag
+ actionCmd.AddOuterTag = true
+ actionInfo.Cmd = &actionCmd
+ actionInfo.OVid = subs.Stag
+ default:
+ log.Errorw("Unsupported flow type", log.Fields{"flowtype": flowType,
+ "direction": direction})
+ }
+ } else if direction == Downstream {
+ switch flowType {
+ case EapolFlow:
+ log.Errorw("Downstream EAP flows are not required instead controller "+
+ "packet outs EAP response directly to onu in downstream", log.Fields{"flowtype": flowType,
+ "direction": direction})
+ case DhcpFlow:
+ log.Errorw("Downstream DHCP flows are not required instead we have "+
+ "NNI trap flows already installed", log.Fields{"flowtype": flowType,
+ "direction": direction})
+ case HsiaFlow:
+ flowClassifier.OVid = subs.Stag
+ flowClassifier.IVid = subs.Ctag
+ flowClassifier.PktTagType = DoubleTag
+ actionCmd.RemoveOuterTag = true
+ actionInfo.Cmd = &actionCmd
+ actionInfo.OVid = DownStreamHsiaActionOVid
+ default:
+ log.Errorw("Unsupported flow type", log.Fields{"flowtype": flowType,
+ "direction": direction})
+ }
+ }
+ return flowClassifier, actionInfo
+}
+
+func AddFlow(subs *Subscriber, flowType string, direction string, flowID uint32,
+ allocID uint32, gemID uint32) error {
+ log.Infow("add-flow", log.Fields{"WorkFlow": subs.TestConfig.WorkflowName, "FlowType": flowType,
+ "direction": direction, "flowID": flowID})
+ var err error
+
+ flowClassifier, actionInfo := FormatClassfierAction(flowType, direction, subs)
+ flow := oop.Flow{AccessIntfId: int32(subs.PonIntf), OnuId: int32(subs.OnuID),
+ UniId: int32(subs.UniID), FlowId: flowID,
+ FlowType: direction, AllocId: int32(allocID), GemportId: int32(gemID),
+ Classifier: &flowClassifier, Action: &actionInfo,
+ Priority: 1000, PortNo: subs.UniPortNo}
+
+ _, err = subs.OpenOltClient.FlowAdd(context.Background(), &flow)
+
+ st, _ := status.FromError(err)
+ if st.Code() == codes.AlreadyExists {
+ log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+ return nil
+ }
+
+ if err != nil {
+ log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
+ return errors.New(ReasonCodeToReasonString(FLOW_ADD_FAILED))
+ }
+ log.Debugw("Flow added to device successfully ", log.Fields{"flow": flow})
+
+ return nil
+}