[VOL-2588] Simplify TT case
Change-Id: Ia22dbda21b0702ac0444a17ae3e5063c7723e395
diff --git a/core/att_workflow.go b/core/att_workflow.go
index 947a38d..cf5cbae 100644
--- a/core/att_workflow.go
+++ b/core/att_workflow.go
@@ -21,10 +21,10 @@
"strings"
"github.com/opencord/openolt-scale-tester/config"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
- oop "github.com/opencord/voltha-protos/v2/go/openolt"
- tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -42,7 +42,7 @@
var flowID []uint32
var err error
- if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(uint32(config.NniIntfID),
+ if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
ponresourcemanager.FLOW_ID, 1); err != nil {
return err
}
@@ -67,7 +67,7 @@
if err != nil {
log.Errorw("Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
- rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(uint32(config.NniIntfID),
+ rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
ponresourcemanager.FLOW_ID, flowID)
return err
}
@@ -80,7 +80,7 @@
var flowID []uint32
var err error
- if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(uint32(config.NniIntfID),
+ if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
ponresourcemanager.FLOW_ID, 1); err != nil {
return err
}
@@ -105,7 +105,7 @@
if err != nil {
log.Errorw("Failed to Add DHCP IPV6 to device", log.Fields{"err": err, "deviceFlow": flow})
- rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(uint32(config.NniIntfID),
+ rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
ponresourcemanager.FLOW_ID, flowID)
return err
}
@@ -216,12 +216,12 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
ponresourcemanager.FLOW_ID, 1); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
if err := AddFlow(subs, EapolFlow, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
ponresourcemanager.FLOW_ID, flowID)
return err
}
@@ -247,12 +247,12 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
ponresourcemanager.FLOW_ID, 1); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
if err := AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
ponresourcemanager.FLOW_ID, flowID)
return err
}
@@ -278,12 +278,12 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
ponresourcemanager.FLOW_ID, 1); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
if err := AddFlow(subs, DhcpFlowIPV6, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
ponresourcemanager.FLOW_ID, flowID)
return err
}
@@ -314,7 +314,7 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
ponresourcemanager.FLOW_ID, 1); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
@@ -328,7 +328,7 @@
log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
}
if errUs != nil && errDs != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
ponresourcemanager.FLOW_ID, flowID)
}
if errUs != nil || errDs != nil {
@@ -343,3 +343,23 @@
}
return nil
}
+
+func (att AttWorkFlow) ProvisionVoipFlow(subs *Subscriber) error {
+ log.Info("att-workflow-does-not-support-voip-yet--nothing-to-do")
+ return nil
+}
+
+func (att AttWorkFlow) ProvisionVodFlow(subs *Subscriber) error {
+ log.Info("att-workflow-does-not-support-vod-yet--nothing-to-do")
+ return nil
+}
+
+func (att AttWorkFlow) ProvisionMgmtFlow(subs *Subscriber) error {
+ log.Info("att-workflow-does-not-support-mgmt-yet--nothing-to-do")
+ return nil
+}
+
+func (att AttWorkFlow) ProvisionMulticastFlow(subs *Subscriber) error {
+ log.Info("att-workflow-does-not-support-multicast-yet--nothing-to-do")
+ return nil
+}
diff --git a/core/dt_workflow.go b/core/dt_workflow.go
index b2ad300..a06b29b 100644
--- a/core/dt_workflow.go
+++ b/core/dt_workflow.go
@@ -21,10 +21,10 @@
"strings"
"github.com/opencord/openolt-scale-tester/config"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
- oop "github.com/opencord/voltha-protos/v2/go/openolt"
- tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
"golang.org/x/net/context"
)
@@ -156,12 +156,12 @@
for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
if pbitSet == '1' {
pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
- if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(uint32(subs.PonIntf),
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
ponresourcemanager.FLOW_ID, 1); err != nil {
return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
} else {
if err := AddFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID, pcp); err != nil {
- subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(uint32(subs.PonIntf),
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
ponresourcemanager.FLOW_ID, flowID)
return err
}
@@ -174,3 +174,23 @@
}
return nil
}
+
+func (dt DtWorkFlow) ProvisionVoipFlow(subs *Subscriber) error {
+ log.Info("dt-workflow-does-not-support-voip-yet--nothing-to-do")
+ return nil
+}
+
+func (dt DtWorkFlow) ProvisionVodFlow(subs *Subscriber) error {
+ log.Info("dt-workflow-does-not-support-vod-yet--nothing-to-do")
+ return nil
+}
+
+func (dt DtWorkFlow) ProvisionMgmtFlow(subs *Subscriber) error {
+ log.Info("dt-workflow-does-not-support-mgmt-yet--nothing-to-do")
+ return nil
+}
+
+func (dt DtWorkFlow) ProvisionMulticastFlow(subs *Subscriber) error {
+ log.Info("dt-workflow-does-not-support-multicast-yet--nothing-to-do")
+ return nil
+}
diff --git a/core/olt_manager.go b/core/olt_manager.go
index 6d18b7e..78dd97e 100644
--- a/core/olt_manager.go
+++ b/core/olt_manager.go
@@ -22,15 +22,6 @@
"encoding/json"
"errors"
"fmt"
- "github.com/cenkalti/backoff/v3"
- "github.com/opencord/openolt-scale-tester/config"
- "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- "github.com/opencord/voltha-lib-go/v2/pkg/techprofile"
- oop "github.com/opencord/voltha-protos/v2/go/openolt"
- "google.golang.org/grpc"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
"io"
"io/ioutil"
"os"
@@ -38,6 +29,16 @@
"sync"
"syscall"
"time"
+
+ "github.com/cenkalti/backoff/v3"
+ "github.com/opencord/openolt-scale-tester/config"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
)
const (
@@ -114,12 +115,12 @@
}
kvPath := fmt.Sprintf(TechProfileKVPath, om.deviceInfo.Technology, tpID)
tpJson, err := json.Marshal(tp)
- err = client.Put(kvPath, tpJson, 2)
+ err = client.Put(context.Background(), kvPath, tpJson)
if err != nil {
log.Fatalw("tp-put-to-etcd-failed", log.Fields{"tpPath": kvPath, "err": err})
}
// verify the PUT succeeded.
- kvResult, err := client.Get(kvPath, 2)
+ kvResult, err := client.Get(context.Background(), kvPath)
if kvResult == nil {
log.Fatal("tp-not-found-on-kv-after-load", log.Fields{"key": kvPath, "err": err})
} else {
@@ -406,7 +407,7 @@
log.Debugw("Received Omci indication ", log.Fields{"IntfId": omciInd.IntfId, "OnuId": omciInd.OnuId, "pkt": hex.EncodeToString(omciInd.Pkt)})
case *oop.Indication_PktInd:
pktInd := indication.GetPktInd()
- log.Infow("Received pakcet indication ", log.Fields{"PktInd": pktInd})
+ log.Infow("Received packet indication ", log.Fields{"PktInd": pktInd})
/*
case *oop.Indication_PortStats:
portStats := indication.GetPortStats()
diff --git a/core/onu_manager.go b/core/onu_manager.go
index bd1265c..6fa9201 100644
--- a/core/onu_manager.go
+++ b/core/onu_manager.go
@@ -17,11 +17,12 @@
package core
import (
- "github.com/opencord/openolt-scale-tester/config"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- oop "github.com/opencord/voltha-protos/v2/go/openolt"
"strconv"
"time"
+
+ "github.com/opencord/openolt-scale-tester/config"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
)
func init() {
@@ -72,7 +73,7 @@
log.Infow("subscriber-provision-started-from-onu-manager", log.Fields{"subsName": subsName})
// Start provisioning the subscriber
- go subs.Start(onuCh)
+ go subs.Start(onuCh, onu.testConfig.IsGroupTest)
// Wait for subscriber provision to complete
<-onuCh
diff --git a/core/resource_manager.go b/core/resource_manager.go
index 1bb5aaf..3be01bc 100644
--- a/core/resource_manager.go
+++ b/core/resource_manager.go
@@ -23,9 +23,10 @@
"strconv"
"strings"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- ponrmgr "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
- "github.com/opencord/voltha-protos/v2/go/openolt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+ "github.com/opencord/voltha-protos/v3/go/openolt"
+ "golang.org/x/net/context"
)
func init() {
@@ -130,7 +131,7 @@
// After we have initialized resource ranges, initialize the
// resource pools accordingly.
for _, PONRMgr := range RsrcMgrsByTech {
- _ = PONRMgr.InitDeviceResourcePool()
+ _ = PONRMgr.InitDeviceResourcePool(context.Background())
}
log.Info("Initialization of resource manager success!")
return &ResourceMgr
@@ -147,7 +148,7 @@
log.Debugf("Resource range pool init for technology %s", ponRMgr.Technology)
// first load from KV profiles
- status := ponRMgr.InitResourceRangesFromKVStore()
+ status := ponRMgr.InitResourceRangesFromKVStore(context.Background())
if !status {
log.Debugf("Failed to load resource ranges from KV store for tech %s", ponRMgr.Technology)
}
@@ -298,7 +299,7 @@
self.resource_mgrs[pon_intf_id].assert_resource_limits(uni_id, PONResourceManager.UNI_ID)
*/
for _, rsrcMgr := range RsrcMgr.ResourceMgrs {
- if err := rsrcMgr.ClearDeviceResourcePool(); err != nil {
+ if err := rsrcMgr.ClearDeviceResourcePool(context.Background()); err != nil {
log.Debug("Failed to clear device resource pool")
return err
}
@@ -315,7 +316,7 @@
return 0, err
}
// Get ONU id for a provided pon interface ID.
- ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ponIntfID,
+ ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(context.Background(), ponIntfID,
ponrmgr.ONU_ID, 1)
if err != nil {
log.Errorf("Failed to get resource for interface %d for type %s",
@@ -323,7 +324,7 @@
return 0, err
}
if ONUID != nil {
- RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
+ RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(context.Background(), fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
return ONUID[0], err
}
@@ -337,7 +338,7 @@
var err error
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", intfID, onuID, uniID)
- AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(IntfOnuIDUniID)
+ AllocID := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(context.Background(), IntfOnuIDUniID)
if AllocID != nil {
// Since we support only one alloc_id for the ONU at the moment,
// return the first alloc_id in the list, if available, for that
@@ -345,7 +346,7 @@
log.Debugw("Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
return AllocID[0]
}
- AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(intfID,
+ AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(context.Background(), intfID,
ponrmgr.ALLOC_ID, 1)
if AllocID == nil || err != nil {
@@ -354,7 +355,7 @@
}
// update the resource map on KV store with the list of alloc_id
// allocated for the pon_intf_onu_id tuple
- err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(IntfOnuIDUniID, AllocID)
+ err = RsrcMgr.ResourceMgrs[intfID].UpdateAllocIdsForOnu(context.Background(), IntfOnuIDUniID, AllocID)
if err != nil {
log.Error("Failed to update Alloc ID")
return 0
@@ -375,12 +376,12 @@
var err error
IntfOnuIDUniID := fmt.Sprintf("%d,%d,%d", ponPort, onuID, uniID)
- GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(IntfOnuIDUniID)
+ GEMPortList := RsrcMgr.ResourceMgrs[ponPort].GetCurrentGEMPortIDsForOnu(context.Background(), IntfOnuIDUniID)
if GEMPortList != nil {
return GEMPortList, nil
}
- GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(ponPort,
+ GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(context.Background(), ponPort,
ponrmgr.GEMPORT_ID, NumOfPorts)
if err != nil && GEMPortList == nil {
log.Errorf("Failed to get gem port id for %s", IntfOnuIDUniID)
@@ -389,7 +390,7 @@
// update the resource map on KV store with the list of gemport_id
// allocated for the pon_intf_onu_id tuple
- err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(IntfOnuIDUniID,
+ err = RsrcMgr.ResourceMgrs[ponPort].UpdateGEMPortIDsForOnu(context.Background(), IntfOnuIDUniID,
GEMPortList)
if err != nil {
log.Errorf("Failed to update GEM ports to kv store for %s", IntfOnuIDUniID)
@@ -408,29 +409,29 @@
FlowIds = append(FlowIds, FlowID)
IntfONUID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
- err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(IntfONUID, FlowID, false)
+ err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(context.Background(), IntfONUID, FlowID, false)
if err != nil {
log.Errorw("Failed to Update flow id for", log.Fields{"intf": IntfONUID})
}
- RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(IntfONUID, FlowID)
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.FLOW_ID, FlowIds)
+ RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(context.Background(), IntfONUID, FlowID)
+ RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.FLOW_ID, FlowIds)
}
// FreeFlowIDs releases the flow Ids
func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(IntfID uint32, onuID uint32,
uniID uint32, FlowID []uint32) {
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.FLOW_ID, FlowID)
+ RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.FLOW_ID, FlowID)
var IntfOnuIDUniID string
var err error
for _, flow := range FlowID {
IntfOnuIDUniID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
- err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(IntfOnuIDUniID, flow, false)
+ err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(context.Background(), IntfOnuIDUniID, flow, false)
if err != nil {
log.Errorw("Failed to Update flow id for", log.Fields{"intf": IntfOnuIDUniID})
}
- RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(IntfOnuIDUniID, flow)
+ RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(context.Background(), IntfOnuIDUniID, flow)
}
}
@@ -439,7 +440,7 @@
func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(IntfID uint32, allocID uint32) {
allocIDs := make([]uint32, 0)
allocIDs = append(allocIDs, allocID)
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.ALLOC_ID, allocIDs)
+ RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.ALLOC_ID, allocIDs)
}
// FreeGemPortID frees GemPortID on the PON resource pool and also frees the gemPortID association
@@ -447,5 +448,5 @@
func (RsrcMgr *OpenOltResourceMgr) FreeGemPortID(IntfID uint32, gemPortID uint32) {
gemPortIDs := make([]uint32, 0)
gemPortIDs = append(gemPortIDs, gemPortID)
- RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
+ RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(context.Background(), IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
}
diff --git a/core/subscriber_manager.go b/core/subscriber_manager.go
index 5610d2c..d84fad9 100644
--- a/core/subscriber_manager.go
+++ b/core/subscriber_manager.go
@@ -18,10 +18,12 @@
import (
"fmt"
+
"github.com/opencord/openolt-scale-tester/config"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- "github.com/opencord/voltha-lib-go/v2/pkg/techprofile"
- oop "github.com/opencord/voltha-protos/v2/go/openolt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
+ "golang.org/x/net/context"
)
func init() {
@@ -86,7 +88,8 @@
RsrMgr *OpenOltResourceMgr
}
-func (subs *Subscriber) Start(onuCh chan bool) {
+func (subs *Subscriber) Start(onuCh chan bool, isGroup bool) {
+ var err error
log.Infow("workflow-deploy-started-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
@@ -94,9 +97,9 @@
for _, tpID := range subs.TestConfig.TpIDList {
uniPortName := fmt.Sprintf(UniPortName, subs.PonIntf, subs.OnuID, subs.UniID)
- if subs.TpInstance[tpID] =
- subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.CreateTechProfInstance(
- uint32(tpID), uniPortName, subs.PonIntf); subs.TpInstance[tpID] == nil {
+ if subs.TpInstance[tpID], err =
+ subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.CreateTechProfInstance(context.Background(),
+ uint32(tpID), uniPortName, subs.PonIntf); err != nil {
log.Errorw("error-creating-tp-instance-for-subs",
log.Fields{"subsName": subs.SubscriberName, "onuID": subs.OnuID, "tpID": tpID})
@@ -107,7 +110,7 @@
}
}
- DeployWorkflow(subs)
+ DeployWorkflow(subs, isGroup)
log.Infow("workflow-deploy-completed-for-subscriber", log.Fields{"subsName": subs.SubscriberName})
diff --git a/core/tt_workflow.go b/core/tt_workflow.go
new file mode 100644
index 0000000..862da20
--- /dev/null
+++ b/core/tt_workflow.go
@@ -0,0 +1,588 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package core
+
+import (
+ "errors"
+ "strings"
+ "sync/atomic"
+
+ "github.com/opencord/openolt-scale-tester/config"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+var lastPonIntf *uint32 = new(uint32)
+
+func init() {
+ _, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
+}
+
+// A dummy struct to comply with the WorkFlow interface.
+type TtWorkFlow struct {
+}
+
+func AddTtDhcpIPV4Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
+ var flowID []uint32
+ var err error
+
+ if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
+ ponresourcemanager.FLOW_ID, 1); err != nil {
+ return err
+ }
+
+ // DHCP IPV4
+ flowClassifier := &oop.Classifier{EthType: 2048, IpProto: 17, SrcPort: 67, DstPort: 68, PktTagType: "double_tag"}
+ actionCmd := &oop.ActionCmd{TrapToHost: true}
+ actionInfo := &oop.Action{Cmd: actionCmd}
+
+ flow := oop.Flow{AccessIntfId: -1, OnuId: -1, UniId: -1, FlowId: flowID[0],
+ FlowType: "downstream", AllocId: -1, GemportId: -1,
+ Classifier: flowClassifier, Action: actionInfo,
+ Priority: 1000, PortNo: uint32(config.NniIntfID)}
+
+ _, err = oo.FlowAdd(context.Background(), &flow)
+
+ st, _ := status.FromError(err)
+ if st.Code() == codes.AlreadyExists {
+ log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+ return nil
+ }
+
+ if err != nil {
+ log.Errorw("Failed to Add DHCP IPv4 to device", log.Fields{"err": err, "deviceFlow": flow})
+ rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
+ ponresourcemanager.FLOW_ID, flowID)
+ return err
+ }
+ log.Debugw("DHCP IPV4 added to device successfully ", log.Fields{"flow": flow})
+
+ return nil
+}
+
+func AddTtDhcpIPV6Flow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
+ log.Info("tt-workflow-does-not-require-dhcp-ipv6-support--nothing-to-do")
+ return nil
+}
+
+func ProvisionTtNniTrapFlow(oo oop.OpenoltClient, config *config.OpenOltScaleTesterConfig, rsrMgr *OpenOltResourceMgr) error {
+ _ = AddTtDhcpIPV4Flow(oo, config, rsrMgr)
+ return nil
+}
+
+func FormatTtClassfierAction(flowType string, direction string, subs *Subscriber) (oop.Classifier, oop.Action) {
+ var flowClassifier oop.Classifier
+ var actionCmd oop.ActionCmd
+ var actionInfo oop.Action
+
+ if direction == Upstream {
+ switch flowType {
+ case IgmpFlow:
+ flowClassifier.EthType = IPv4EthType
+ flowClassifier.IpProto = IgmpProto
+ flowClassifier.SrcPort = 0
+ flowClassifier.DstPort = 0
+ flowClassifier.PktTagType = SingleTag
+ actionCmd.TrapToHost = true
+ actionInfo.Cmd = &actionCmd
+ case HsiaFlow:
+ actionCmd.AddOuterTag = true
+ actionInfo.Cmd = &actionCmd
+ actionInfo.IVid = 33
+ actionInfo.OVid = 7
+ flowClassifier.IPbits = 255
+ flowClassifier.OVid = 33
+ flowClassifier.PktTagType = SingleTag
+ case VoipFlow:
+ actionCmd.AddOuterTag = true
+ actionInfo.Cmd = &actionCmd
+ actionInfo.OPbits = 7
+ actionInfo.IVid = 63
+ actionInfo.OVid = 10
+ flowClassifier.IPbits = 255
+ flowClassifier.OPbits = 7
+ flowClassifier.OVid = 63
+ flowClassifier.PktTagType = SingleTag
+ case VodFlow:
+ actionCmd.AddOuterTag = true
+ actionInfo.Cmd = &actionCmd
+ actionInfo.OPbits = 5
+ actionInfo.IVid = 55
+ actionInfo.OVid = 555
+ flowClassifier.IPbits = 255
+ flowClassifier.OPbits = 5
+ flowClassifier.OVid = 55
+ flowClassifier.PktTagType = SingleTag
+ case MgmtFlow:
+ actionCmd.AddOuterTag = true
+ actionInfo.Cmd = &actionCmd
+ actionInfo.OPbits = 7
+ actionInfo.IVid = 75
+ actionInfo.OVid = 575
+ flowClassifier.IPbits = 255
+ flowClassifier.OPbits = 7
+ flowClassifier.OVid = 75
+ flowClassifier.PktTagType = SingleTag
+ default:
+ log.Errorw("Unsupported TT flow type", log.Fields{"flowtype": flowType,
+ "direction": direction})
+ }
+ } else if direction == Downstream {
+ switch flowType {
+ case IgmpFlow:
+ log.Errorw("Downstream IGMP flows are not required instead we have "+
+ "IGMP trap flows already installed", log.Fields{"flowtype": flowType,
+ "direction": direction})
+ case HsiaFlow:
+ actionCmd.RemoveOuterTag = true
+ actionInfo.Cmd = &actionCmd
+ actionInfo.IVid = 33
+ flowClassifier.IPbits = 255
+ flowClassifier.OPbits = 255
+ flowClassifier.IVid = 33
+ flowClassifier.OVid = 7
+ flowClassifier.PktTagType = DoubleTag
+ case VoipFlow:
+ actionCmd.RemoveOuterTag = true
+ actionInfo.Cmd = &actionCmd
+ actionInfo.IPbits = 7
+ actionInfo.IVid = 63
+ flowClassifier.IPbits = 255
+ flowClassifier.OPbits = 255
+ flowClassifier.IVid = 63
+ flowClassifier.OVid = 10
+ flowClassifier.DstMac = GenerateMac(true)
+ flowClassifier.PktTagType = DoubleTag
+ case VodFlow:
+ actionCmd.RemoveOuterTag = true
+ actionInfo.Cmd = &actionCmd
+ actionInfo.IPbits = 5
+ actionInfo.IVid = 55
+ flowClassifier.IPbits = 255
+ flowClassifier.OPbits = 255
+ flowClassifier.IVid = 55
+ flowClassifier.OVid = 555
+ flowClassifier.DstMac = GenerateMac(true)
+ flowClassifier.PktTagType = DoubleTag
+ case MgmtFlow:
+ actionCmd.RemoveOuterTag = true
+ actionInfo.Cmd = &actionCmd
+ actionInfo.IPbits = 7
+ actionInfo.IVid = 75
+ flowClassifier.IPbits = 255
+ flowClassifier.OPbits = 255
+ flowClassifier.IVid = 75
+ flowClassifier.OVid = 575
+ flowClassifier.DstMac = GenerateMac(true)
+ flowClassifier.PktTagType = DoubleTag
+ default:
+ log.Errorw("Unsupported TT flow type", log.Fields{"flowtype": flowType,
+ "direction": direction})
+ }
+ }
+ return flowClassifier, actionInfo
+}
+
+func AddTtFlow(subs *Subscriber, flowType string, direction string, flowID uint32,
+ allocID uint32, gemID uint32, pcp uint32) error {
+ log.Infow("add-flow", log.Fields{"WorkFlow": subs.TestConfig.WorkflowName, "FlowType": flowType,
+ "direction": direction, "flowID": flowID})
+ var err error
+
+ flowClassifier, actionInfo := FormatTtClassfierAction(flowType, direction, subs)
+ // Update the o_pbit for which this flow has to be classified
+ flowClassifier.OPbits = pcp
+ flow := oop.Flow{AccessIntfId: int32(subs.PonIntf), OnuId: int32(subs.OnuID),
+ UniId: int32(subs.UniID), FlowId: flowID,
+ FlowType: direction, AllocId: int32(allocID), GemportId: int32(gemID),
+ Classifier: &flowClassifier, Action: &actionInfo,
+ Priority: 1000, PortNo: subs.UniPortNo}
+
+ _, err = subs.OpenOltClient.FlowAdd(context.Background(), &flow)
+
+ st, _ := status.FromError(err)
+ if st.Code() == codes.AlreadyExists {
+ log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+ return nil
+ }
+
+ if err != nil {
+ log.Errorw("Failed to Add flow to device", log.Fields{"err": err, "deviceFlow": flow})
+ return errors.New(ReasonCodeToReasonString(FLOW_ADD_FAILED))
+ }
+ log.Debugw("Flow added to device successfully ", log.Fields{"flow": flow})
+
+ return nil
+}
+
+func (tt TtWorkFlow) ProvisionScheds(subs *Subscriber) error {
+ var trafficSched []*tp_pb.TrafficScheduler
+
+ log.Info("provisioning-scheds")
+
+ if trafficSched = getTrafficSched(subs, tp_pb.Direction_DOWNSTREAM); trafficSched == nil {
+ log.Error("ds-traffic-sched-is-nil")
+ return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+ }
+
+ log.Debugw("Sending Traffic scheduler create to device",
+ log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficScheds": trafficSched})
+ if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+ IntfId: subs.PonIntf, OnuId: subs.OnuID,
+ UniId: subs.UniID, PortNo: subs.UniPortNo,
+ TrafficScheds: trafficSched}); err != nil {
+ log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+ return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+ }
+
+ if trafficSched = getTrafficSched(subs, tp_pb.Direction_UPSTREAM); trafficSched == nil {
+ log.Error("us-traffic-sched-is-nil")
+ return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+ }
+
+ log.Debugw("Sending Traffic scheduler create to device",
+ log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficScheds": trafficSched})
+ if _, err := subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+ IntfId: subs.PonIntf, OnuId: subs.OnuID,
+ UniId: subs.UniID, PortNo: subs.UniPortNo,
+ TrafficScheds: trafficSched}); err != nil {
+ log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+ return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+ }
+ return nil
+}
+
+func (tt TtWorkFlow) ProvisionQueues(subs *Subscriber) error {
+ log.Info("provisioning-queues")
+
+ var trafficQueues []*tp_pb.TrafficQueue
+ if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_DOWNSTREAM); trafficQueues == nil {
+ log.Error("Failed to create traffic queues")
+ return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+ }
+
+ // On receiving the CreateTrafficQueues request, the driver should create corresponding
+ // downstream queues.
+ log.Debugw("Sending Traffic Queues create to device",
+ log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficQueues": trafficQueues})
+ if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
+ &tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
+ UniId: subs.UniID, PortNo: subs.UniPortNo,
+ TrafficQueues: trafficQueues}); err != nil {
+ log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+ return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+ }
+
+ if trafficQueues = getTrafficQueues(subs, tp_pb.Direction_UPSTREAM); trafficQueues == nil {
+ log.Error("Failed to create traffic queues")
+ return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+ }
+
+ // On receiving the CreateTrafficQueues request, the driver should create corresponding
+ // upstream queues.
+ log.Debugw("Sending Traffic Queues create to device",
+ log.Fields{"Direction": tp_pb.Direction_UPSTREAM, "TrafficQueues": trafficQueues})
+ if _, err := subs.OpenOltClient.CreateTrafficQueues(context.Background(),
+ &tp_pb.TrafficQueues{IntfId: subs.PonIntf, OnuId: subs.OnuID,
+ UniId: subs.UniID, PortNo: subs.UniPortNo,
+ TrafficQueues: trafficQueues}); err != nil {
+ log.Errorw("Failed to create traffic queues in device", log.Fields{"error": err})
+ return errors.New(ReasonCodeToReasonString(QUEUE_CREATION_FAILED))
+ }
+
+ return nil
+}
+
+func (tt TtWorkFlow) ProvisionEapFlow(subs *Subscriber) error {
+ log.Info("tt-workflow-does-not-support-eap-yet--nothing-to-do")
+ return nil
+}
+
+func (tt TtWorkFlow) ProvisionDhcpIPV4Flow(subs *Subscriber) error {
+ log.Info("tt-workflow-does-not-require-dhcp-ipv4-yet--nothing-to-do")
+ return nil
+}
+
+func (tt TtWorkFlow) ProvisionDhcpIPV6Flow(subs *Subscriber) error {
+ log.Info("tt-workflow-does-not-require-dhcp-ipv6-support--nothing-to-do")
+ return nil
+}
+
+func (tt TtWorkFlow) ProvisionIgmpFlow(subs *Subscriber) error {
+ log.Info("tt-workflow-does-not-require-igmp-support--nothing-to-do")
+ return nil
+}
+
+func (tt TtWorkFlow) ProvisionHsiaFlow(subs *Subscriber) error {
+ var err error
+ var flowID []uint32
+ var gemPortIDs []uint32
+
+ var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
+ for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
+ gemPortIDs = append(gemPortIDs, gem.GemportID)
+ }
+
+ for idx, gemID := range gemPortIDs {
+ pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
+ for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
+ if pbitSet == '1' {
+ pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, 1); err != nil {
+ return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+ } else {
+ var errUs, errDs error
+ if errUs = AddTtFlow(subs, HsiaFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+ log.Errorw("failed to install US HSIA flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errDs = AddTtFlow(subs, HsiaFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+ log.Errorw("failed to install DS HSIA flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errUs != nil && errDs != nil {
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, flowID)
+ }
+ if errUs != nil || errDs != nil {
+ if errUs != nil {
+ return errUs
+ }
+ return errDs
+ }
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func (tt TtWorkFlow) ProvisionVoipFlow(subs *Subscriber) error {
+ var err error
+ var flowID []uint32
+ var gemPortIDs []uint32
+
+ var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
+ for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
+ gemPortIDs = append(gemPortIDs, gem.GemportID)
+ }
+
+ for idx, gemID := range gemPortIDs {
+ pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
+ for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
+ if pbitSet == '1' {
+ pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, 1); err != nil {
+ return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+ } else {
+ var errUs, errDs, errDhcp error
+ if errUs = AddTtFlow(subs, VoipFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+ log.Errorw("failed to install US VOIP flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errDs = AddTtFlow(subs, VoipFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+ log.Errorw("failed to install DS VOIP flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); errDhcp != nil {
+ log.Errorw("failed to install US VOIP-DHCP flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errUs != nil && errDs != nil && errDhcp != nil {
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, flowID)
+ }
+ if errUs != nil || errDs != nil || errDhcp != nil {
+ if errUs != nil {
+ return errUs
+ }
+ if errDs != nil {
+ return errDs
+ }
+ return errDhcp
+ }
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func (tt TtWorkFlow) ProvisionVodFlow(subs *Subscriber) error {
+ var err error
+ var flowID []uint32
+ var gemPortIDs []uint32
+
+ var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
+ for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
+ gemPortIDs = append(gemPortIDs, gem.GemportID)
+ }
+
+ for idx, gemID := range gemPortIDs {
+ pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
+ for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
+ if pbitSet == '1' {
+ pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, 1); err != nil {
+ return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+ } else {
+ var errUs, errDs, errDhcp, errIgmp error
+ if errUs = AddTtFlow(subs, VodFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+ log.Errorw("failed to install US VOIP flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errDs = AddTtFlow(subs, VodFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+ log.Errorw("failed to install DS VOIP flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); errDhcp != nil {
+ log.Errorw("failed to install US VOIP-DHCP flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errIgmp = AddTtFlow(subs, IgmpFlow, Upstream, flowID[0], allocID, gemID, pcp); errIgmp != nil {
+ log.Errorw("failed to install US VOIP-IGMP flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errUs != nil && errDs != nil && errDhcp != nil && errIgmp != nil {
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, flowID)
+ }
+ if errUs != nil || errDs != nil || errDhcp != nil || errIgmp != nil {
+ if errUs != nil {
+ return errUs
+ }
+ if errDs != nil {
+ return errDs
+ }
+ if errDhcp != nil {
+ return errDhcp
+ }
+ return errIgmp
+ }
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func (tt TtWorkFlow) ProvisionMgmtFlow(subs *Subscriber) error {
+ var err error
+ var flowID []uint32
+ var gemPortIDs []uint32
+
+ var allocID = subs.TpInstance[subs.TestConfig.TpIDList[0]].UsScheduler.AllocID
+ for _, gem := range subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList {
+ gemPortIDs = append(gemPortIDs, gem.GemportID)
+ }
+
+ for idx, gemID := range gemPortIDs {
+ pBitMap := subs.TpInstance[subs.TestConfig.TpIDList[0]].UpstreamGemPortAttributeList[idx].PbitMap
+ for pos, pbitSet := range strings.TrimPrefix(pBitMap, "0b") {
+ if pbitSet == '1' {
+ pcp := uint32(len(strings.TrimPrefix(pBitMap, "0b"))) - 1 - uint32(pos)
+ if flowID, err = subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].GetResourceID(context.Background(), uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, 1); err != nil {
+ return errors.New(ReasonCodeToReasonString(FLOW_ID_GENERATION_FAILED))
+ } else {
+ var errUs, errDs, errDhcp error
+ if errUs = AddTtFlow(subs, MgmtFlow, Upstream, flowID[0], allocID, gemID, pcp); errUs != nil {
+ log.Errorw("failed to install US MGMT flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errDs = AddTtFlow(subs, MgmtFlow, Downstream, flowID[0], allocID, gemID, pcp); errDs != nil {
+ log.Errorw("failed to install DS MGMT flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errDhcp = AddFlow(subs, DhcpFlowIPV4, Upstream, flowID[0], allocID, gemID, pcp); errDhcp != nil {
+ log.Errorw("failed to install US MGMT-DHCP flow",
+ log.Fields{"onuID": subs.OnuID, "uniID": subs.UniID, "intf": subs.PonIntf})
+ }
+ if errUs != nil && errDs != nil && errDhcp != nil {
+ subs.RsrMgr.ResourceMgrs[uint32(subs.PonIntf)].FreeResourceID(context.Background(), uint32(subs.PonIntf),
+ ponresourcemanager.FLOW_ID, flowID)
+ }
+ if errUs != nil || errDs != nil || errDhcp != nil {
+ if errUs != nil {
+ return errUs
+ }
+ if errDs != nil {
+ return errDs
+ }
+ return errDhcp
+ }
+ }
+ }
+ }
+ }
+ return nil
+}
+
+func (tt TtWorkFlow) ProvisionMulticastFlow(subs *Subscriber) error {
+ var grp GroupData
+ var err error
+
+ numOfONUsPerPon := uint32(subs.TestConfig.NumOfOnu / uint(subs.RsrMgr.deviceInfo.GetPonPorts()))
+
+ grp.Subs = *subs
+ grp.Weight = 20
+ grp.Priority = 0
+ grp.OnuID = 6666
+ grp.UniID = 6666
+ grp.AllocID = 0
+ grp.GemPortID = 4069
+ grp.SchedPolicy = tp_pb.SchedulingPolicy_WRR
+
+ log.Debugw("Group data", log.Fields{"OnuID": subs.OnuID, "GroupID": grp.GroupID, "numOfONUsPerPon": numOfONUsPerPon})
+
+ grp.GroupID = subs.OnuID
+
+ if subs.PonIntf == 0 {
+ grp.AddGroup = true
+ grp.AddFlow = true
+ } else {
+ grp.AddFlow = false
+ grp.AddGroup = false
+ }
+
+ if subs.PonIntf == atomic.LoadUint32(lastPonIntf) {
+ _ = atomic.AddUint32(lastPonIntf, 1)
+ grp.AddSched = true
+ grp.AddQueue = true
+ } else {
+ grp.AddSched = false
+ grp.AddQueue = false
+ }
+
+ grp.AddMember = true
+
+ err = AddMulticastQueueFlow(&grp)
+
+ if err != nil {
+ log.Errorw("Failed to add multicast flow", log.Fields{"error": err})
+ }
+
+ return err
+}
diff --git a/core/utils.go b/core/utils.go
index a99096e..b239443 100644
--- a/core/utils.go
+++ b/core/utils.go
@@ -19,8 +19,8 @@
import (
"fmt"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- "github.com/opencord/voltha-protos/v2/go/openolt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-protos/v3/go/openolt"
)
type DtStagKey struct {
@@ -31,12 +31,14 @@
var DtStag map[DtStagKey]uint32
var DtCtag map[uint32]uint32
var AttCtag map[uint32]uint32
+var TtCtag map[uint32]uint32
func init() {
_, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
AttCtag = make(map[uint32]uint32)
DtCtag = make(map[uint32]uint32)
DtStag = make(map[DtStagKey]uint32)
+ TtCtag = make(map[uint32]uint32)
}
const (
@@ -97,6 +99,18 @@
return DtCtag[ponIntf]
}
+func GetTtCtag(ponIntf uint32) uint32 {
+ var currCtag uint32
+ var ok bool
+ if currCtag, ok = TtCtag[ponIntf]; !ok {
+ // Start with ctag 1
+ TtCtag[ponIntf] = 1
+ return TtCtag[ponIntf]
+ }
+ TtCtag[ponIntf] = currCtag + 1
+ return TtCtag[ponIntf]
+}
+
func GetAttStag(ponIntf uint32) uint32 {
// start with stag 2
return ponIntf + 2
@@ -115,6 +129,11 @@
return DtStag[key]
}
// GetTtStag returns the s-tag for the given PON interface in the TT
// workflow: interface N maps to s-tag N+2 (s-tags start at 2).
func GetTtStag(ponIntf uint32) uint32 {
	const sTagBase = 2
	return sTagBase + ponIntf
}
+
// TODO: More workflow support to be added here
func GetCtag(workFlowName string, ponIntf uint32) uint32 {
switch workFlowName {
@@ -122,6 +141,8 @@
return GetAttCtag(ponIntf)
case "DT":
return GetDtCtag(ponIntf)
+ case "TT":
+ return GetTtCtag(ponIntf)
default:
log.Errorw("unknown-workflowname", log.Fields{"workflow": workFlowName})
}
@@ -134,6 +155,8 @@
return GetAttStag(ponIntf)
case "DT":
return GetDtStag(ponIntf, onuID, uniID)
+ case "TT":
+ return GetTtStag(ponIntf)
default:
log.Errorw("unknown-workflowname", log.Fields{"workflow": workFlowName})
}
diff --git a/core/workflow_manager.go b/core/workflow_manager.go
index 8ab8551..1112a7e 100644
--- a/core/workflow_manager.go
+++ b/core/workflow_manager.go
@@ -18,9 +18,10 @@
import (
"errors"
+
"github.com/opencord/openolt-scale-tester/config"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- oop "github.com/opencord/voltha-protos/v2/go/openolt"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
)
func init() {
@@ -35,46 +36,72 @@
ProvisionDhcpIPV6Flow(subs *Subscriber) error
ProvisionIgmpFlow(subs *Subscriber) error
ProvisionHsiaFlow(subs *Subscriber) error
+ ProvisionVoipFlow(subs *Subscriber) error
+ ProvisionVodFlow(subs *Subscriber) error
+ ProvisionMgmtFlow(subs *Subscriber) error
+ ProvisionMulticastFlow(subs *Subscriber) error
// TODO: Add new items here as needed.
}
-func DeployWorkflow(subs *Subscriber) {
+func DeployWorkflow(subs *Subscriber, isGroup bool) {
var wf = getWorkFlow(subs)
- // TODO: Catch and log errors for below items if needed.
- if err := wf.ProvisionScheds(subs); err != nil {
- subs.Reason = err.Error()
- return
- }
+ if isGroup {
+ if err := wf.ProvisionMulticastFlow(subs); err != nil {
+ subs.Reason = err.Error()
+ return
+ }
+ } else {
+ // TODO: Catch and log errors for below items if needed.
+ if err := wf.ProvisionScheds(subs); err != nil {
+ subs.Reason = err.Error()
+ return
+ }
- if err := wf.ProvisionQueues(subs); err != nil {
- subs.Reason = err.Error()
- return
- }
+ if err := wf.ProvisionQueues(subs); err != nil {
+ subs.Reason = err.Error()
+ return
+ }
- if err := wf.ProvisionEapFlow(subs); err != nil {
- subs.Reason = err.Error()
- return
- }
+ if err := wf.ProvisionEapFlow(subs); err != nil {
+ subs.Reason = err.Error()
+ return
+ }
- if err := wf.ProvisionDhcpIPV4Flow(subs); err != nil {
- subs.Reason = err.Error()
- return
- }
+ if err := wf.ProvisionDhcpIPV4Flow(subs); err != nil {
+ subs.Reason = err.Error()
+ return
+ }
- if err := wf.ProvisionDhcpIPV6Flow(subs); err != nil {
- subs.Reason = err.Error()
- return
- }
+ if err := wf.ProvisionDhcpIPV6Flow(subs); err != nil {
+ subs.Reason = err.Error()
+ return
+ }
- if err := wf.ProvisionIgmpFlow(subs); err != nil {
- subs.Reason = err.Error()
- return
- }
+ if err := wf.ProvisionIgmpFlow(subs); err != nil {
+ subs.Reason = err.Error()
+ return
+ }
- if err := wf.ProvisionHsiaFlow(subs); err != nil {
- subs.Reason = err.Error()
- return
+ if err := wf.ProvisionHsiaFlow(subs); err != nil {
+ subs.Reason = err.Error()
+ return
+ }
+
+ if err := wf.ProvisionVoipFlow(subs); err != nil {
+ subs.Reason = err.Error()
+ return
+ }
+
+ if err := wf.ProvisionVodFlow(subs); err != nil {
+ subs.Reason = err.Error()
+ return
+ }
+
+ if err := wf.ProvisionMgmtFlow(subs); err != nil {
+ subs.Reason = err.Error()
+ return
+ }
}
subs.Reason = ReasonCodeToReasonString(SUBSCRIBER_PROVISION_SUCCESS)
@@ -88,6 +115,9 @@
case "DT":
log.Info("chosen-dt-workflow")
return DtWorkFlow{}
+ case "TT":
+ log.Info("chosen-tt-workflow")
+ return TtWorkFlow{}
// TODO: Add new workflow here
default:
log.Errorw("operator-workflow-not-supported-yet", log.Fields{"workflowName": subs.TestConfig.WorkflowName})
@@ -109,6 +139,11 @@
log.Error("error-installing-flow", log.Fields{"err": err})
return err
}
+ case "TT":
+ if err := ProvisionTtNniTrapFlow(oo, config, rsrMgr); err != nil {
+ log.Error("error-installing-flow", log.Fields{"err": err})
+ return err
+ }
// TODO: Add new items here
default:
log.Errorw("operator-workflow-not-supported-yet", log.Fields{"workflowName": config.WorkflowName})
diff --git a/core/workflow_utils.go b/core/workflow_utils.go
index 4376760..4ab7356 100644
--- a/core/workflow_utils.go
+++ b/core/workflow_utils.go
@@ -18,12 +18,13 @@
import (
"errors"
+ "math/rand"
"github.com/opencord/openolt-scale-tester/config"
- "github.com/opencord/voltha-lib-go/v2/pkg/log"
- "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
- oop "github.com/opencord/voltha-protos/v2/go/openolt"
- tp_pb "github.com/opencord/voltha-protos/v2/go/tech_profile"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
+ oop "github.com/opencord/voltha-protos/v3/go/openolt"
+ tp_pb "github.com/opencord/voltha-protos/v3/go/tech_profile"
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -60,20 +61,57 @@
Untagged = "untagged"
SingleTag = "single_tag"
DoubleTag = "double_tag"
+
+ VoipFlow = "VOIP_FLOW"
+
+ VodFlow = "VOD_FLOW"
+
+ MgmtFlow = "MGMT_FLOW"
+
+ IgmpProto = 2
+ IgmpFlow = "IGMP_FLOW"
)
// Bounds used when generating MAC address bytes.
const (
	MacSize = 6    // number of bytes in a MAC address
	MacMin  = 0x0  // lowest random byte value (inclusive)
	MacMax  = 0xFF // highest byte value; note rand.Intn(MacMax-MacMin) excludes 0xFF itself
)
+
// GroupData carries everything needed to provision a multicast group for a
// subscriber: identities, queue/scheduler parameters, and flags selecting
// which provisioning steps AddMulticastQueueFlow should perform.
type GroupData struct {
	Subs        Subscriber             `json:"subscriber"` // subscriber this group operation runs on behalf of
	GroupID     uint32                 `json:"groupID"`    // multicast group identifier on the device
	Weight      uint32                 `json:"weight"`     // WRR weight for the multicast queue/scheduler
	Priority    uint32                 `json:"priority"`   // queue/scheduler priority
	OnuID       uint32                 `json:"onuID"`      // ONU ID used for the shared multicast entities
	UniID       uint32                 `json:"uniID"`      // UNI ID used for the shared multicast entities
	AllocID     uint32                 `json:"allocId"`    // alloc ID for the multicast flow
	GemPortID   uint32                 `json:"gemPortIds"` // single GEM port ID (tag name is plural for legacy reasons — NOTE(review): confirm)
	SchedPolicy tp_pb.SchedulingPolicy `json:"schedPolicy"` // scheduling policy (e.g. WRR)
	AddGroup    bool                   `json:"addGroup"`   // create the group on the device
	AddFlow     bool                   `json:"addFlow"`    // install the multicast flow
	AddSched    bool                   `json:"addSched"`   // create the downstream scheduler
	AddQueue    bool                   `json:"addQueue"`   // create the multicast traffic queue
	AddMember   bool                   `json:"addMember"`  // add this subscriber as a group member
}
+
func getTrafficSched(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficScheduler {
var SchedCfg *tp_pb.SchedulerConfig
+ var err error
if direction == tp_pb.Direction_DOWNSTREAM {
- SchedCfg = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+ SchedCfg, err = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
GetDsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
-
} else {
- SchedCfg = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+ SchedCfg, err = subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
GetUsScheduler(subs.TpInstance[subs.TestConfig.TpIDList[0]])
}
+ if err != nil {
+ log.Errorw("Failed to create traffic schedulers", log.Fields{"direction": direction, "error": err})
+ return nil
+ }
+
// hard-code for now
cir := 16000
cbs := 5000
@@ -92,10 +130,15 @@
func getTrafficQueues(subs *Subscriber, direction tp_pb.Direction) []*tp_pb.TrafficQueue {
- trafficQueues := subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
+ trafficQueues, err := subs.RsrMgr.ResourceMgrs[subs.PonIntf].TechProfileMgr.
GetTrafficQueues(subs.TpInstance[subs.TestConfig.TpIDList[0]], direction)
- return trafficQueues
+ if err == nil {
+ return trafficQueues
+ }
+
+ log.Errorw("Failed to create traffic queues", log.Fields{"direction": direction, "error": err})
+ return nil
}
func FormatClassfierAction(flowType string, direction string, subs *Subscriber) (oop.Classifier, oop.Action) {
@@ -202,7 +245,7 @@
var flowID []uint32
var err error
- if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(uint32(config.NniIntfID),
+ if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
ponresourcemanager.FLOW_ID, 1); err != nil {
return err
}
@@ -226,7 +269,7 @@
if err != nil {
log.Errorw("Failed to Add LLDP flow to device", log.Fields{"err": err, "deviceFlow": flow})
- rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(uint32(config.NniIntfID),
+ rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
ponresourcemanager.FLOW_ID, flowID)
return err
}
@@ -234,3 +277,263 @@
return nil
}
+
+func GenerateMac(isRand bool) []byte {
+ var mac []byte
+
+ if isRand {
+ for i := 0; i < MacSize; i++ {
+ mac = append(mac, byte(rand.Intn(MacMax-MacMin)+MacMin))
+ }
+ } else {
+ mac = []byte{0x12, 0xAB, 0x34, 0xCD, 0x56, 0xEF}
+ }
+
+ return mac
+}
+
+func GenerateMulticastMac(onu_id uint32, group_id uint32) []byte {
+ var mac []byte
+
+ mac = []byte{0x01, 0x00, 0x5E}
+
+ mac = append(mac, byte(onu_id%255))
+ mac = append(mac, byte(rand.Intn(MacMax-MacMin)+MacMin))
+ mac = append(mac, byte(group_id))
+
+ return mac
+}
+
+func PerformGroupOperation(grp *GroupData, groupCfg *oop.Group) (*oop.Empty, error) {
+ oo := grp.Subs.OpenOltClient
+
+ var err error
+ var res *oop.Empty
+
+ if res, err = oop.OpenoltClient.PerformGroupOperation(oo, context.Background(), groupCfg); err != nil {
+ log.Errorw("Failed to perform - PerformGroupOperation()", log.Fields{"err": err})
+ return nil, err
+ }
+
+ log.Info("Successfully called - PerformGroupOperation()")
+
+ return res, nil
+}
+
+func CreateGroup(grp *GroupData) (*oop.Empty, error) {
+ var groupCfg oop.Group
+
+ log.Infow("creating group", log.Fields{"GroupID": grp.GroupID})
+
+ groupCfg.Command = oop.Group_SET_MEMBERS
+ groupCfg.GroupId = grp.GroupID
+
+ return PerformGroupOperation(grp, &groupCfg)
+}
+
+func OpMulticastTrafficQueue(grp *GroupData, isCreating bool) (*oop.Empty, error) {
+ log.Infow("operating on multicast traffic queue", log.Fields{"Creating": isCreating, "GroupID": grp.GroupID})
+
+ oo := grp.Subs.OpenOltClient
+
+ var request tp_pb.TrafficQueues
+ request.IntfId = grp.Subs.PonIntf
+ request.OnuId = grp.Subs.OnuID
+ request.UniId = grp.Subs.UniID
+
+ var trafficQueues []*tp_pb.TrafficQueue
+
+ var trafficQueue tp_pb.TrafficQueue
+ trafficQueue.Direction = tp_pb.Direction_DOWNSTREAM
+ trafficQueue.Priority = grp.Priority
+ trafficQueue.Weight = grp.Weight
+ trafficQueue.GemportId = grp.GemPortID
+ trafficQueue.SchedPolicy = grp.SchedPolicy
+
+ trafficQueues = append(trafficQueues, &trafficQueue)
+
+ request.TrafficQueues = trafficQueues
+
+ var err error
+ var res *oop.Empty
+
+ if isCreating {
+ if res, err = oop.OpenoltClient.CreateTrafficQueues(oo, context.Background(), &request); err != nil {
+ log.Errorw("Failed to perform - CreateTrafficQueues()", log.Fields{"err": err})
+ return nil, err
+ }
+
+ log.Info("Successfully called - CreateTrafficQueues()")
+ } else {
+ if res, err = oop.OpenoltClient.RemoveTrafficQueues(oo, context.Background(), &request); err != nil {
+ log.Errorw("Failed to perform - RemoveTrafficQueues()", log.Fields{"err": err})
+ return nil, err
+ }
+
+ log.Info("Successfully called - RemoveTrafficQueues()")
+ }
+
+ return res, nil
+}
+
+func AddMulticastFlow(grp *GroupData) error {
+ log.Infow("add multicast flow", log.Fields{"GroupID": grp.GroupID})
+
+ oo := grp.Subs.OpenOltClient
+ config := grp.Subs.TestConfig
+ rsrMgr := grp.Subs.RsrMgr
+
+ var flowID []uint32
+ var err error
+
+ if flowID, err = rsrMgr.ResourceMgrs[uint32(config.NniIntfID)].GetResourceID(context.Background(), uint32(config.NniIntfID),
+ ponresourcemanager.FLOW_ID, 1); err != nil {
+ return err
+ }
+
+ flowClassifier := &oop.Classifier{
+ IPbits: 255,
+ OPbits: 255,
+ IVid: 55,
+ OVid: 255,
+ DstMac: GenerateMulticastMac(grp.Subs.OnuID, grp.GroupID),
+ PktTagType: DoubleTag}
+
+ flow := oop.Flow{AccessIntfId: int32(grp.Subs.PonIntf), OnuId: int32(grp.Subs.OnuID), UniId: int32(grp.Subs.UniID), FlowId: flowID[0],
+ FlowType: "multicast", AllocId: int32(grp.AllocID), GemportId: int32(grp.GemPortID),
+ Classifier: flowClassifier, Priority: int32(grp.Priority), PortNo: uint32(grp.Subs.UniPortNo), GroupId: uint32(grp.GroupID)}
+
+ _, err = oo.FlowAdd(context.Background(), &flow)
+
+ st, _ := status.FromError(err)
+ if st.Code() == codes.AlreadyExists {
+ log.Debugw("Flow already exists", log.Fields{"err": err, "deviceFlow": flow})
+ return nil
+ }
+
+ if err != nil {
+ log.Errorw("Failed to add multicast flow to device", log.Fields{"err": err, "deviceFlow": flow})
+ rsrMgr.ResourceMgrs[uint32(grp.Subs.PonIntf)].FreeResourceID(context.Background(), uint32(config.NniIntfID),
+ ponresourcemanager.FLOW_ID, flowID)
+ return err
+ }
+
+ log.Debugw("Multicast flow added to device successfully ", log.Fields{"flow": flow})
+
+ return nil
+}
+
+func AddMulticastSched(grp *GroupData) error {
+ log.Infow("creating multicast sched", log.Fields{"GroupID": grp.GroupID})
+
+ SchedCfg := &tp_pb.SchedulerConfig{
+ Direction: tp_pb.Direction_DOWNSTREAM,
+ AdditionalBw: tp_pb.AdditionalBW_AdditionalBW_BestEffort,
+ Priority: grp.Priority,
+ Weight: grp.Weight,
+ SchedPolicy: tp_pb.SchedulingPolicy_WRR}
+
+ // hard-code for now
+ cir := 1948
+ cbs := 31768
+ eir := 100
+ ebs := 1000
+ pir := cir + eir
+ pbs := cbs + ebs
+
+ TfShInfo := &tp_pb.TrafficShapingInfo{Cir: uint32(cir), Cbs: uint32(cbs), Pir: uint32(pir), Pbs: uint32(pbs)}
+
+ TrafficSched := []*tp_pb.TrafficScheduler{grp.Subs.RsrMgr.ResourceMgrs[grp.Subs.PonIntf].TechProfileMgr.
+ GetTrafficScheduler(grp.Subs.TpInstance[grp.Subs.TestConfig.TpIDList[0]], SchedCfg, TfShInfo)}
+
+ if TrafficSched == nil {
+ log.Error("Create scheduler for multicast traffic failed")
+ return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+ }
+
+ log.Debugw("Sending Traffic scheduler create to device",
+ log.Fields{"Direction": tp_pb.Direction_DOWNSTREAM, "TrafficScheds": TrafficSched})
+
+ if _, err := grp.Subs.OpenOltClient.CreateTrafficSchedulers(context.Background(), &tp_pb.TrafficSchedulers{
+ IntfId: grp.Subs.PonIntf, OnuId: grp.Subs.OnuID,
+ UniId: grp.Subs.UniID, PortNo: grp.Subs.UniPortNo,
+ TrafficScheds: TrafficSched}); err != nil {
+ log.Errorw("Failed to create traffic schedulers", log.Fields{"error": err})
+ return errors.New(ReasonCodeToReasonString(SCHED_CREATION_FAILED))
+ }
+
+ return nil
+}
+
+func OpMemberToGroup(grp *GroupData, isAdding bool) (*oop.Empty, error) {
+ log.Infow("operating on group", log.Fields{"Adding": isAdding})
+
+ var groupCfg oop.Group
+
+ if isAdding {
+ groupCfg.Command = oop.Group_ADD_MEMBERS
+ } else {
+ groupCfg.Command = oop.Group_REMOVE_MEMBERS
+ }
+
+ groupCfg.GroupId = grp.GroupID
+
+ var members []*oop.GroupMember
+
+ var member0 oop.GroupMember
+ member0.InterfaceId = grp.Subs.PonIntf
+ member0.GemPortId = grp.GemPortID
+ member0.Priority = grp.Priority
+ //member0.SchedPolicy = tp_pb.SchedulingPolicy_WRR
+ member0.InterfaceType = oop.GroupMember_PON
+
+ members = append(members, &member0)
+
+ groupCfg.Members = members
+
+ return PerformGroupOperation(grp, &groupCfg)
+}
+
+func AddMulticastQueueFlow(grp *GroupData) error {
+ var err error
+
+ log.Debugw("Create multicast queue flow", log.Fields{"GroupID": grp.GroupID, "AddGroup": grp.AddGroup,
+ "AddFlow": grp.AddFlow, "AddSched": grp.AddSched, "AddQueue": grp.AddQueue, "AddMember": grp.AddMember})
+
+ if grp.AddGroup {
+ if _, err = CreateGroup(grp); err != nil {
+ log.Error("Failed to add group to device")
+ return err
+ }
+ }
+
+ if grp.AddFlow {
+ if err = AddMulticastFlow(grp); err != nil {
+ log.Error("Failed to add multicast flow to device")
+ return err
+ }
+ }
+
+ if grp.AddSched {
+ if err = AddMulticastSched(grp); err != nil {
+ log.Error("Failed to add multicast sched to device")
+ return err
+ }
+ }
+
+ if grp.AddQueue {
+ if _, err = OpMulticastTrafficQueue(grp, true); err != nil {
+ log.Error("Failed to add multicast queue to device")
+ return err
+ }
+ }
+
+ if grp.AddMember {
+ if _, err = OpMemberToGroup(grp, true); err != nil {
+ log.Error("Failed to add member to group")
+ return err
+ }
+ }
+
+ return nil
+}