VOL-3419: Replicate voltha flows in openolt agent
- The scale-tester-app will adhere to new openolt.proto interface (version 4.0.2)
and will pass necessary information for openolt-agent to replicate the flows.
- upgrade to voltha-lib-go version 4.0.0
Change-Id: I9d862929ae8ac4468d4e93096f8cd8e16f26ec93
diff --git a/core/olt_manager.go b/core/olt_manager.go
index f4e3143..ab171ef 100644
--- a/core/olt_manager.go
+++ b/core/olt_manager.go
@@ -32,10 +32,10 @@
"github.com/cenkalti/backoff/v3"
"github.com/opencord/openolt-scale-tester/config"
- "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
- oop "github.com/opencord/voltha-protos/v3/go/openolt"
+ "github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v4/pkg/log"
+ "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+ oop "github.com/opencord/voltha-protos/v4/go/openolt"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -52,28 +52,24 @@
}
type OpenOltManager struct {
- ipPort string
- deviceInfo *oop.DeviceInfo
- OnuDeviceMap map[OnuDeviceKey]*OnuDevice `json:"onuDeviceMap"`
- TechProfile map[uint32]*techprofile.TechProfileIf
- clientConn *grpc.ClientConn
- openOltClient oop.OpenoltClient
- testConfig *config.OpenOltScaleTesterConfig
- rsrMgr *OpenOltResourceMgr
- lockRsrAlloc sync.RWMutex
+ ipPort string
+ deviceInfo *oop.DeviceInfo
+ OnuDeviceMap map[OnuDeviceKey]*OnuDevice `json:"onuDeviceMap"`
+ TechProfile map[uint32]*techprofile.TechProfileIf
+ clientConn *grpc.ClientConn
+ openOltClient oop.OpenoltClient
+ testConfig *config.OpenOltScaleTesterConfig
+ rsrMgr *OpenOltResourceMgr
+ lockRsrAlloc sync.RWMutex
lockOpenOltManager sync.RWMutex
}
-func init() {
- _, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
-}
-
func NewOpenOltManager(ipPort string) *OpenOltManager {
- log.Infow("initialized openolt manager with ipPort", log.Fields{"ipPort": ipPort})
+ logger.Infow(nil, "initialized openolt manager with ipPort", log.Fields{"ipPort": ipPort})
return &OpenOltManager{
- ipPort: ipPort,
- OnuDeviceMap: make(map[OnuDeviceKey]*OnuDevice),
- lockRsrAlloc: sync.RWMutex{},
+ ipPort: ipPort,
+ OnuDeviceMap: make(map[OnuDeviceKey]*OnuDevice),
+ lockRsrAlloc: sync.RWMutex{},
lockOpenOltManager: sync.RWMutex{},
}
}
@@ -83,9 +79,9 @@
var err error
// Verify that etcd is up before starting the application.
etcdIpPort := "http://" + om.testConfig.KVStoreHost + ":" + strconv.Itoa(om.testConfig.KVStorePort)
- client, err := kvstore.NewEtcdClient(etcdIpPort, 5)
+ client, err := kvstore.NewEtcdClient(context.Background(), etcdIpPort, 5*time.Second, log.FatalLevel)
if err != nil || client == nil {
- log.Fatal("error-initializing-etcd-client")
+ logger.Fatal(nil, "error-initializing-etcd-client")
return
}
@@ -98,40 +94,40 @@
jsonFile, err := os.Open(tpFilePath)
// if we os.Open returns an error then handle it
if err != nil {
- log.Fatalw("could-not-find-tech-profile", log.Fields{"err": err, "tpFile": tpFilePath})
+ logger.Fatalw(nil, "could-not-find-tech-profile", log.Fields{"err": err, "tpFile": tpFilePath})
}
- log.Debugw("tp-file-opened-successfully", log.Fields{"tpFile": tpFilePath})
+ logger.Debugw(nil, "tp-file-opened-successfully", log.Fields{"tpFile": tpFilePath})
// read our opened json file as a byte array.
if byteValue, err = ioutil.ReadAll(jsonFile); err != nil {
- log.Fatalw("could-not-read-tp-file", log.Fields{"err": err, "tpFile": tpFilePath})
+ logger.Fatalw(nil, "could-not-read-tp-file", log.Fields{"err": err, "tpFile": tpFilePath})
}
var tp techprofile.TechProfile
if err = json.Unmarshal(byteValue, &tp); err != nil {
- log.Fatalw("could-not-unmarshal-tp", log.Fields{"err": err, "tpFile": tpFilePath})
+ logger.Fatalw(nil, "could-not-unmarshal-tp", log.Fields{"err": err, "tpFile": tpFilePath})
} else {
- log.Infow("tp-read-from-file", log.Fields{"tp": tp, "tpFile": tpFilePath})
+ logger.Infow(nil, "tp-read-from-file", log.Fields{"tp": tp, "tpFile": tpFilePath})
}
kvPath := fmt.Sprintf(TechProfileKVPath, om.deviceInfo.Technology, tpID)
tpJson, err := json.Marshal(tp)
err = client.Put(context.Background(), kvPath, tpJson)
if err != nil {
- log.Fatalw("tp-put-to-etcd-failed", log.Fields{"tpPath": kvPath, "err": err})
+ logger.Fatalw(nil, "tp-put-to-etcd-failed", log.Fields{"tpPath": kvPath, "err": err})
}
// verify the PUT succeeded.
kvResult, err := client.Get(context.Background(), kvPath)
if kvResult == nil {
- log.Fatal("tp-not-found-on-kv-after-load", log.Fields{"key": kvPath, "err": err})
+ logger.Fatal(nil, "tp-not-found-on-kv-after-load", log.Fields{"key": kvPath, "err": err})
} else {
var KvTpIns techprofile.TechProfile
var resPtr = &KvTpIns
if value, err := kvstore.ToByte(kvResult.Value); err == nil {
if err = json.Unmarshal(value, resPtr); err != nil {
- log.Fatal("error-unmarshal-kv-result", log.Fields{"err": err, "key": kvPath, "value": value})
+ logger.Fatal(nil, "error-unmarshal-kv-result", log.Fields{"err": err, "key": kvPath, "value": value})
} else {
- log.Infow("verified-ok-that-tp-load-was-good", log.Fields{"tpID": tpID, "kvPath": kvPath})
+ logger.Infow(nil, "verified-ok-that-tp-load-was-good", log.Fields{"tpID": tpID, "kvPath": kvPath})
_ = jsonFile.Close()
continue
}
@@ -146,31 +142,31 @@
// Establish gRPC connection with the device
if om.clientConn, err = grpc.Dial(om.ipPort, grpc.WithInsecure(), grpc.WithBlock()); err != nil {
- log.Errorw("Failed to dial device", log.Fields{"ipPort": om.ipPort, "err": err})
+ logger.Errorw(nil, "Failed to dial device", log.Fields{"ipPort": om.ipPort, "err": err})
return err
}
om.openOltClient = oop.NewOpenoltClient(om.clientConn)
// Populate Device Info
if deviceInfo, err := om.populateDeviceInfo(); err != nil {
- log.Error("error fetching device info", log.Fields{"err": err, "deviceInfo": deviceInfo})
+ logger.Error(nil, "error fetching device info", log.Fields{"err": err, "deviceInfo": deviceInfo})
return err
}
// Read and load TPs to etcd.
om.readAndLoadTPsToEtcd()
- log.Info("etcd-up-and-running--tp-loaded-successfully")
+ logger.Info(nil, "etcd-up-and-running--tp-loaded-successfully")
if om.rsrMgr = NewResourceMgr("ABCD", om.testConfig.KVStoreHost+":"+strconv.Itoa(om.testConfig.KVStorePort),
"etcd", "openolt", om.deviceInfo); om.rsrMgr == nil {
- log.Error("Error while instantiating resource manager")
+ logger.Error(nil, "Error while instantiating resource manager")
return errors.New("instantiating resource manager failed")
}
om.TechProfile = make(map[uint32]*techprofile.TechProfileIf)
if err = om.populateTechProfilePerPonPort(); err != nil {
- log.Error("Error while populating tech profile mgr\n")
+ logger.Error(nil, "Error while populating tech profile mgr\n")
return errors.New("error-loading-tech-profile-per-ponPort")
}
@@ -179,7 +175,7 @@
// Provision OLT NNI Trap flows as needed by the Workflow
if err = ProvisionNniTrapFlow(om.openOltClient, om.testConfig, om.rsrMgr); err != nil {
- log.Error("failed-to-add-nni-trap-flow", log.Fields{"err": err})
+ logger.Error(nil, "failed-to-add-nni-trap-flow", log.Fields{"err": err})
}
// Provision ONUs one by one
@@ -193,16 +189,16 @@
var err error
if om.deviceInfo, err = om.openOltClient.GetDeviceInfo(context.Background(), new(oop.Empty)); err != nil {
- log.Errorw("Failed to fetch device info", log.Fields{"err": err})
+ logger.Errorw(nil, "Failed to fetch device info", log.Fields{"err": err})
return nil, err
}
if om.deviceInfo == nil {
- log.Errorw("Device info is nil", log.Fields{})
+ logger.Errorw(nil, "Device info is nil", log.Fields{})
return nil, errors.New("failed to get device info from OLT")
}
- log.Debugw("Fetched device info", log.Fields{"deviceInfo": om.deviceInfo})
+ logger.Debugw(nil, "Fetched device info", log.Fields{"deviceInfo": om.deviceInfo})
return om.deviceInfo, nil
}
@@ -221,20 +217,20 @@
// If the number of ONUs to provision is not a power of 2, stop execution
// This is needed for ensure even distribution of ONUs across all PONs
if !isPowerOfTwo(om.testConfig.NumOfOnu) {
- log.Errorw("num-of-onus-to-provision-is-not-a-power-of-2", log.Fields{"numOfOnus": om.testConfig.NumOfOnu})
+ logger.Errorw(nil, "num-of-onus-to-provision-is-not-a-power-of-2", log.Fields{"numOfOnus": om.testConfig.NumOfOnu})
return
}
// Number of ONUs to provision should not be less than the number of PON ports.
// We need at least one ONU per PON
if om.testConfig.NumOfOnu < uint(om.deviceInfo.PonPorts) {
- log.Errorw("num-of-onu-is-less-than-num-of-pon-port", log.Fields{"numOfOnus":om.testConfig.NumOfOnu, "numOfPon": om.deviceInfo.PonPorts})
+ logger.Errorw(nil, "num-of-onu-is-less-than-num-of-pon-port", log.Fields{"numOfOnus": om.testConfig.NumOfOnu, "numOfPon": om.deviceInfo.PonPorts})
return
}
numOfONUsPerPon = om.testConfig.NumOfOnu / uint(om.deviceInfo.PonPorts)
totalOnusToProvision := numOfONUsPerPon * uint(om.deviceInfo.PonPorts)
- log.Infow("***** all-onu-provision-started ******",
+ logger.Infow(nil, "***** all-onu-provision-started ******",
log.Fields{"totalNumOnus": totalOnusToProvision,
"numOfOnusPerPon": numOfONUsPerPon,
"numOfPons": om.deviceInfo.PonPorts})
@@ -258,12 +254,12 @@
om.lockRsrAlloc.Lock()
sn := GenerateNextONUSerialNumber()
om.lockRsrAlloc.Unlock()
- log.Debugw("provisioning onu", log.Fields{"onuID": j, "ponPort": i, "serialNum": sn})
+ logger.Debugw(nil, "provisioning onu", log.Fields{"onuID": j, "ponPort": i, "serialNum": sn})
if onuID, err = om.rsrMgr.GetONUID(j); err != nil {
- log.Errorw("error getting onu id", log.Fields{"err": err})
+ logger.Errorw(nil, "error getting onu id", log.Fields{"err": err})
continue
}
- log.Infow("onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
+ logger.Infow(nil, "onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
onuWg.Add(1)
go om.activateONU(j, onuID, sn, om.stringifySerialNumber(sn), &onuWg)
@@ -273,10 +269,10 @@
onuWg.Wait()
}
endTime := time.Now()
- log.Info("******** all-onu-provisioning-completed *******")
+ logger.Info(nil, "******** all-onu-provisioning-completed *******")
totalTime := endTime.Sub(startTime)
out := time.Time{}.Add(totalTime)
- log.Infof("****** Total Time to provision all the ONUs is => %s", out.Format("15:04:05"))
+ logger.Infof(nil, "****** Total Time to provision all the ONUs is => %s", out.Format("15:04:05"))
// TODO: We need to dump the results at the end. But below json marshall does not work. We will need custom Marshal function.
/*
@@ -290,7 +286,7 @@
}
func (om *OpenOltManager) activateONU(intfID uint32, onuID uint32, serialNum *oop.SerialNumber, serialNumber string, onuWg *sync.WaitGroup) {
- log.Debugw("activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
+ logger.Debugw(nil, "activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
// TODO: need resource manager
var pir uint32 = 1000000
var onuDevice = OnuDevice{
@@ -312,13 +308,13 @@
if _, err = om.openOltClient.ActivateOnu(context.Background(), &Onu); err != nil {
st, _ := status.FromError(err)
if st.Code() == codes.AlreadyExists {
- log.Debug("ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
+ logger.Debug(nil, "ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
} else {
nanos = now.UnixNano()
milliEnd := nanos / 1000000
onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
- log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu, "err ": err})
+ logger.Errorw(nil, "activate-onu-failed", log.Fields{"Onu": Onu, "err ": err})
onuDevice.Reason = err.Error()
}
} else {
@@ -327,7 +323,7 @@
onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
onuDevice.Reason = ReasonOk
- log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
+ logger.Infow(nil, "activated-onu", log.Fields{"SerialNumber": serialNumber})
}
om.lockOpenOltManager.Lock()
@@ -364,14 +360,14 @@
// readIndications to read the indications from the OLT device
func (om *OpenOltManager) readIndications() {
- defer log.Errorw("Indications ended", log.Fields{})
+ defer logger.Errorw(nil, "Indications ended", log.Fields{})
indications, err := om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
if err != nil {
- log.Errorw("Failed to read indications", log.Fields{"err": err})
+ logger.Errorw(nil, "Failed to read indications", log.Fields{"err": err})
return
}
if indications == nil {
- log.Errorw("Indications is nil", log.Fields{})
+ logger.Errorw(nil, "Indications is nil", log.Fields{})
return
}
@@ -385,26 +381,26 @@
for {
indication, err := indications.Recv()
if err == io.EOF {
- log.Infow("EOF for indications", log.Fields{"err": err})
+ logger.Infow(nil, "EOF for indications", log.Fields{"err": err})
// Use an exponential back off to prevent getting into a tight loop
duration := indicationBackoff.NextBackOff()
if duration == backoff.Stop {
// If we reach a maximum then warn and reset the backoff
// timer and keep attempting.
- log.Warnw("Maximum indication backoff reached, resetting backoff timer",
+ logger.Warnw(nil, "Maximum indication backoff reached, resetting backoff timer",
log.Fields{"max_indication_backoff": indicationBackoff.MaxElapsedTime})
indicationBackoff.Reset()
}
time.Sleep(indicationBackoff.NextBackOff())
indications, err = om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
if err != nil {
- log.Errorw("Failed to read indications", log.Fields{"err": err})
+ logger.Errorw(nil, "Failed to read indications", log.Fields{"err": err})
return
}
continue
}
if err != nil {
- log.Infow("Failed to read from indications", log.Fields{"err": err})
+ logger.Infow(nil, "Failed to read from indications", log.Fields{"err": err})
break
}
// Reset backoff if we have a successful receive
@@ -417,42 +413,42 @@
func (om *OpenOltManager) handleIndication(indication *oop.Indication) {
switch indication.Data.(type) {
case *oop.Indication_OltInd:
- log.Info("received olt indication")
+ logger.Info(nil, "received olt indication")
case *oop.Indication_IntfInd:
intfInd := indication.GetIntfInd()
- log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
+ logger.Infow(nil, "Received interface indication ", log.Fields{"InterfaceInd": intfInd})
case *oop.Indication_IntfOperInd:
intfOperInd := indication.GetIntfOperInd()
if intfOperInd.GetType() == "nni" {
- log.Info("received interface oper indication for nni port")
+ logger.Info(nil, "received interface oper indication for nni port")
} else if intfOperInd.GetType() == "pon" {
- log.Info("received interface oper indication for pon port")
+ logger.Info(nil, "received interface oper indication for pon port")
}
/*
case *oop.Indication_OnuDiscInd:
onuDiscInd := indication.GetOnuDiscInd()
- log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
+ logger.Infow(nil, "Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
*/
case *oop.Indication_OnuInd:
onuInd := indication.GetOnuInd()
- log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
+ logger.Infow(nil, "Received Onu indication ", log.Fields{"OnuInd": onuInd})
case *oop.Indication_OmciInd:
omciInd := indication.GetOmciInd()
- log.Debugw("Received Omci indication ", log.Fields{"IntfId": omciInd.IntfId, "OnuId": omciInd.OnuId, "pkt": hex.EncodeToString(omciInd.Pkt)})
+ logger.Debugw(nil, "Received Omci indication ", log.Fields{"IntfId": omciInd.IntfId, "OnuId": omciInd.OnuId, "pkt": hex.EncodeToString(omciInd.Pkt)})
case *oop.Indication_PktInd:
pktInd := indication.GetPktInd()
- log.Infow("Received packet indication ", log.Fields{"PktInd": pktInd})
+ logger.Infow(nil, "Received packet indication ", log.Fields{"PktInd": pktInd})
/*
case *oop.Indication_PortStats:
portStats := indication.GetPortStats()
- log.Infow("Received port stats", log.Fields{"portStats": portStats})
+ logger.Infow(nil, "Received port stats", log.Fields{"portStats": portStats})
case *oop.Indication_FlowStats:
flowStats := indication.GetFlowStats()
- log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
+ logger.Infow(nil, "Received flow stats", log.Fields{"FlowStats": flowStats})
*/
case *oop.Indication_AlarmInd:
alarmInd := indication.GetAlarmInd()
- log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
+ logger.Infow(nil, "Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
}
}
@@ -462,20 +458,20 @@
for _, intfID := range techRange.IntfIds {
om.TechProfile[intfID] = &(om.rsrMgr.ResourceMgrs[intfID].TechProfileMgr)
tpCount++
- log.Debugw("Init tech profile done", log.Fields{"intfID": intfID})
+ logger.Debugw(nil, "Init tech profile done", log.Fields{"intfID": intfID})
}
}
//Make sure we have as many tech_profiles as there are pon ports on the device
if tpCount != int(om.deviceInfo.GetPonPorts()) {
- log.Errorw("Error while populating techprofile",
+ logger.Errorw(nil, "Error while populating techprofile",
log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
return errors.New("error while populating techprofile mgrs")
}
- log.Infow("Populated techprofile for ponports successfully",
+ logger.Infow(nil, "Populated techprofile for ponports successfully",
log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
return nil
}
func isPowerOfTwo(numOfOnus uint) bool {
return (numOfOnus & (numOfOnus - 1)) == 0
-}
\ No newline at end of file
+}