blob: f4e31434ec10fe47ac9427c740350e58c014f09f [file] [log] [blame]
Girish Gowdra64503432020-01-07 10:59:10 +05301/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package core
18
19import (
20 "context"
21 "encoding/hex"
22 "encoding/json"
23 "errors"
24 "fmt"
Girish Gowdra64503432020-01-07 10:59:10 +053025 "io"
26 "io/ioutil"
27 "os"
28 "strconv"
29 "sync"
30 "syscall"
31 "time"
Orhan Kupusoglu66b00d82020-03-13 12:06:33 +030032
33 "github.com/cenkalti/backoff/v3"
34 "github.com/opencord/openolt-scale-tester/config"
35 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
36 "github.com/opencord/voltha-lib-go/v3/pkg/log"
37 "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
38 oop "github.com/opencord/voltha-protos/v3/go/openolt"
39 "google.golang.org/grpc"
40 "google.golang.org/grpc/codes"
41 "google.golang.org/grpc/status"
Girish Gowdra64503432020-01-07 10:59:10 +053042)
43
const (
	// ReasonOk is the value stored in an ONU record's Reason field when the
	// ONU was activated successfully.
	ReasonOk = "OK"

	// TechProfileKVPath is the fmt template for the KV-store key of a tech
	// profile. %s is filled with the PON technology and %d with the
	// tech-profile table ID, giving for example
	// service/voltha/technology_profiles/xgspon/<tech_profile_tableID>.
	TechProfileKVPath = "service/voltha/technology_profiles/%s/%d"
)
48
// OnuDeviceKey is the comparable key used to index ONU records: the ONU ID
// together with the ID of the PON interface the ONU sits on.
type OnuDeviceKey struct {
	onuID, ponInfID uint32
}
53
54type OpenOltManager struct {
55 ipPort string
56 deviceInfo *oop.DeviceInfo
57 OnuDeviceMap map[OnuDeviceKey]*OnuDevice `json:"onuDeviceMap"`
58 TechProfile map[uint32]*techprofile.TechProfileIf
59 clientConn *grpc.ClientConn
60 openOltClient oop.OpenoltClient
61 testConfig *config.OpenOltScaleTesterConfig
62 rsrMgr *OpenOltResourceMgr
63 lockRsrAlloc sync.RWMutex
Girish Gowdraaeceb842020-08-21 12:10:39 -070064 lockOpenOltManager sync.RWMutex
Girish Gowdra64503432020-01-07 10:59:10 +053065}
66
67func init() {
68 _, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
69}
70
71func NewOpenOltManager(ipPort string) *OpenOltManager {
72 log.Infow("initialized openolt manager with ipPort", log.Fields{"ipPort": ipPort})
73 return &OpenOltManager{
74 ipPort: ipPort,
75 OnuDeviceMap: make(map[OnuDeviceKey]*OnuDevice),
76 lockRsrAlloc: sync.RWMutex{},
Girish Gowdraaeceb842020-08-21 12:10:39 -070077 lockOpenOltManager: sync.RWMutex{},
Girish Gowdra64503432020-01-07 10:59:10 +053078 }
79}
80
81func (om *OpenOltManager) readAndLoadTPsToEtcd() {
82 var byteValue []byte
83 var err error
84 // Verify that etcd is up before starting the application.
85 etcdIpPort := "http://" + om.testConfig.KVStoreHost + ":" + strconv.Itoa(om.testConfig.KVStorePort)
86 client, err := kvstore.NewEtcdClient(etcdIpPort, 5)
87 if err != nil || client == nil {
88 log.Fatal("error-initializing-etcd-client")
89 return
90 }
91
92 // Load TPs to etcd for each of the specified tech-profiles
93 for _, tpID := range om.testConfig.TpIDList {
94 // Below should translate to something like "/app/ATT-64.json"
95 // The TP file should exist.
96 tpFilePath := "/app/" + om.testConfig.WorkflowName + "-" + strconv.Itoa(tpID) + ".json"
97 // Open our jsonFile
98 jsonFile, err := os.Open(tpFilePath)
99 // if we os.Open returns an error then handle it
100 if err != nil {
101 log.Fatalw("could-not-find-tech-profile", log.Fields{"err": err, "tpFile": tpFilePath})
102 }
103 log.Debugw("tp-file-opened-successfully", log.Fields{"tpFile": tpFilePath})
104
105 // read our opened json file as a byte array.
106 if byteValue, err = ioutil.ReadAll(jsonFile); err != nil {
107 log.Fatalw("could-not-read-tp-file", log.Fields{"err": err, "tpFile": tpFilePath})
108 }
109
110 var tp techprofile.TechProfile
111
112 if err = json.Unmarshal(byteValue, &tp); err != nil {
113 log.Fatalw("could-not-unmarshal-tp", log.Fields{"err": err, "tpFile": tpFilePath})
114 } else {
115 log.Infow("tp-read-from-file", log.Fields{"tp": tp, "tpFile": tpFilePath})
116 }
117 kvPath := fmt.Sprintf(TechProfileKVPath, om.deviceInfo.Technology, tpID)
118 tpJson, err := json.Marshal(tp)
Orhan Kupusoglu66b00d82020-03-13 12:06:33 +0300119 err = client.Put(context.Background(), kvPath, tpJson)
Girish Gowdra64503432020-01-07 10:59:10 +0530120 if err != nil {
121 log.Fatalw("tp-put-to-etcd-failed", log.Fields{"tpPath": kvPath, "err": err})
122 }
123 // verify the PUT succeeded.
Orhan Kupusoglu66b00d82020-03-13 12:06:33 +0300124 kvResult, err := client.Get(context.Background(), kvPath)
Girish Gowdra64503432020-01-07 10:59:10 +0530125 if kvResult == nil {
126 log.Fatal("tp-not-found-on-kv-after-load", log.Fields{"key": kvPath, "err": err})
127 } else {
128 var KvTpIns techprofile.TechProfile
129 var resPtr = &KvTpIns
130 if value, err := kvstore.ToByte(kvResult.Value); err == nil {
131 if err = json.Unmarshal(value, resPtr); err != nil {
132 log.Fatal("error-unmarshal-kv-result", log.Fields{"err": err, "key": kvPath, "value": value})
133 } else {
134 log.Infow("verified-ok-that-tp-load-was-good", log.Fields{"tpID": tpID, "kvPath": kvPath})
135 _ = jsonFile.Close()
136 continue
137 }
138 }
139 }
140 }
141}
142
143func (om *OpenOltManager) Start(testConfig *config.OpenOltScaleTesterConfig) error {
144 var err error
145 om.testConfig = testConfig
146
147 // Establish gRPC connection with the device
148 if om.clientConn, err = grpc.Dial(om.ipPort, grpc.WithInsecure(), grpc.WithBlock()); err != nil {
149 log.Errorw("Failed to dial device", log.Fields{"ipPort": om.ipPort, "err": err})
150 return err
151 }
152 om.openOltClient = oop.NewOpenoltClient(om.clientConn)
153
154 // Populate Device Info
155 if deviceInfo, err := om.populateDeviceInfo(); err != nil {
156 log.Error("error fetching device info", log.Fields{"err": err, "deviceInfo": deviceInfo})
157 return err
158 }
159
160 // Read and load TPs to etcd.
161 om.readAndLoadTPsToEtcd()
162
163 log.Info("etcd-up-and-running--tp-loaded-successfully")
164
165 if om.rsrMgr = NewResourceMgr("ABCD", om.testConfig.KVStoreHost+":"+strconv.Itoa(om.testConfig.KVStorePort),
166 "etcd", "openolt", om.deviceInfo); om.rsrMgr == nil {
167 log.Error("Error while instantiating resource manager")
168 return errors.New("instantiating resource manager failed")
169 }
170
171 om.TechProfile = make(map[uint32]*techprofile.TechProfileIf)
172 if err = om.populateTechProfilePerPonPort(); err != nil {
173 log.Error("Error while populating tech profile mgr\n")
174 return errors.New("error-loading-tech-profile-per-ponPort")
175 }
176
177 // Start reading indications
178 go om.readIndications()
179
180 // Provision OLT NNI Trap flows as needed by the Workflow
Thiyagarajan Subramanic4f8da82020-02-05 16:08:26 +0530181 if err = ProvisionNniTrapFlow(om.openOltClient, om.testConfig, om.rsrMgr); err != nil {
182 log.Error("failed-to-add-nni-trap-flow", log.Fields{"err": err})
Girish Gowdra64503432020-01-07 10:59:10 +0530183 }
184
185 // Provision ONUs one by one
186 go om.provisionONUs()
187
188 return nil
189
190}
191
192func (om *OpenOltManager) populateDeviceInfo() (*oop.DeviceInfo, error) {
193 var err error
194
195 if om.deviceInfo, err = om.openOltClient.GetDeviceInfo(context.Background(), new(oop.Empty)); err != nil {
196 log.Errorw("Failed to fetch device info", log.Fields{"err": err})
197 return nil, err
198 }
199
200 if om.deviceInfo == nil {
201 log.Errorw("Device info is nil", log.Fields{})
202 return nil, errors.New("failed to get device info from OLT")
203 }
204
205 log.Debugw("Fetched device info", log.Fields{"deviceInfo": om.deviceInfo})
206
207 return om.deviceInfo, nil
208}
209
210func (om *OpenOltManager) provisionONUs() {
211 var numOfONUsPerPon uint
Girish Gowdraaeceb842020-08-21 12:10:39 -0700212 var i, j, k, onuID uint32
Girish Gowdra64503432020-01-07 10:59:10 +0530213 var err error
Girish Gowdraaeceb842020-08-21 12:10:39 -0700214 var onuWg sync.WaitGroup
215
216 defer func() {
217 // Stop the process once the job is done
218 _ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
219 }()
220
221 // If the number of ONUs to provision is not a power of 2, stop execution
222 // This is needed for ensure even distribution of ONUs across all PONs
223 if !isPowerOfTwo(om.testConfig.NumOfOnu) {
224 log.Errorw("num-of-onus-to-provision-is-not-a-power-of-2", log.Fields{"numOfOnus": om.testConfig.NumOfOnu})
225 return
Girish Gowdra64503432020-01-07 10:59:10 +0530226 }
Girish Gowdraaeceb842020-08-21 12:10:39 -0700227
228 // Number of ONUs to provision should not be less than the number of PON ports.
229 // We need at least one ONU per PON
230 if om.testConfig.NumOfOnu < uint(om.deviceInfo.PonPorts) {
231 log.Errorw("num-of-onu-is-less-than-num-of-pon-port", log.Fields{"numOfOnus":om.testConfig.NumOfOnu, "numOfPon": om.deviceInfo.PonPorts})
232 return
233 }
234
235 numOfONUsPerPon = om.testConfig.NumOfOnu / uint(om.deviceInfo.PonPorts)
Girish Gowdra64503432020-01-07 10:59:10 +0530236 totalOnusToProvision := numOfONUsPerPon * uint(om.deviceInfo.PonPorts)
237 log.Infow("***** all-onu-provision-started ******",
238 log.Fields{"totalNumOnus": totalOnusToProvision,
239 "numOfOnusPerPon": numOfONUsPerPon,
240 "numOfPons": om.deviceInfo.PonPorts})
Girish Gowdra64503432020-01-07 10:59:10 +0530241
Girish Gowdraaeceb842020-08-21 12:10:39 -0700242 // These are the number of ONUs that will be provisioned per PON port per batch.
243 // Such number of ONUs will be chosen across all PON ports per batch
244 var onusPerIterationPerPonPort uint32 = 4
245
246 // If the total number of ONUs per PON is lesser than the default ONU to provision per pon port per batch
247 // then keep halving the ONU to provision per pon port per batch until we reach an acceptable number
248 // Note: the least possible value for onusPerIterationPerPonPort is 1
249 for uint32(numOfONUsPerPon) < onusPerIterationPerPonPort {
250 onusPerIterationPerPonPort /= 2
Girish Gowdra64503432020-01-07 10:59:10 +0530251 }
Girish Gowdra64503432020-01-07 10:59:10 +0530252
Girish Gowdraaeceb842020-08-21 12:10:39 -0700253 startTime := time.Now()
254 // Start provisioning the ONUs
255 for i = 0; i < uint32(numOfONUsPerPon)/onusPerIterationPerPonPort; i++ {
256 for j = 0; j < om.deviceInfo.PonPorts; j++ {
257 for k = 0; k < onusPerIterationPerPonPort; k++ {
258 om.lockRsrAlloc.Lock()
259 sn := GenerateNextONUSerialNumber()
260 om.lockRsrAlloc.Unlock()
261 log.Debugw("provisioning onu", log.Fields{"onuID": j, "ponPort": i, "serialNum": sn})
262 if onuID, err = om.rsrMgr.GetONUID(j); err != nil {
263 log.Errorw("error getting onu id", log.Fields{"err": err})
264 continue
265 }
266 log.Infow("onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
267
268 onuWg.Add(1)
269 go om.activateONU(j, onuID, sn, om.stringifySerialNumber(sn), &onuWg)
270 }
271 }
272 // Wait for the group of ONUs to complete processing before going to next batch of ONUs
273 onuWg.Wait()
274 }
275 endTime := time.Now()
276 log.Info("******** all-onu-provisioning-completed *******")
277 totalTime := endTime.Sub(startTime)
278 out := time.Time{}.Add(totalTime)
279 log.Infof("****** Total Time to provision all the ONUs is => %s", out.Format("15:04:05"))
280
281 // TODO: We need to dump the results at the end. But below json marshall does not work. We will need custom Marshal function.
Girish Gowdra64503432020-01-07 10:59:10 +0530282 /*
283 e, err := json.Marshal(om)
284 if err != nil {
285 fmt.Println(err)
286 return
287 }
288 fmt.Println(string(e))
289 */
Girish Gowdra64503432020-01-07 10:59:10 +0530290}
291
Girish Gowdraaeceb842020-08-21 12:10:39 -0700292func (om *OpenOltManager) activateONU(intfID uint32, onuID uint32, serialNum *oop.SerialNumber, serialNumber string, onuWg *sync.WaitGroup) {
Girish Gowdra64503432020-01-07 10:59:10 +0530293 log.Debugw("activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
294 // TODO: need resource manager
295 var pir uint32 = 1000000
296 var onuDevice = OnuDevice{
297 SerialNum: serialNumber,
298 OnuID: onuID,
299 PonIntf: intfID,
300 openOltClient: om.openOltClient,
301 testConfig: om.testConfig,
302 rsrMgr: om.rsrMgr,
Girish Gowdraaeceb842020-08-21 12:10:39 -0700303 onuWg: onuWg,
Girish Gowdra64503432020-01-07 10:59:10 +0530304 }
305 var err error
306 onuDeviceKey := OnuDeviceKey{onuID: onuID, ponInfID: intfID}
307 Onu := oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: serialNum, Pir: pir}
308 now := time.Now()
309 nanos := now.UnixNano()
310 milliStart := nanos / 1000000
311 onuDevice.OnuProvisionStartTime = time.Unix(0, nanos)
312 if _, err = om.openOltClient.ActivateOnu(context.Background(), &Onu); err != nil {
313 st, _ := status.FromError(err)
314 if st.Code() == codes.AlreadyExists {
315 log.Debug("ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
Girish Gowdra64503432020-01-07 10:59:10 +0530316 } else {
317 nanos = now.UnixNano()
318 milliEnd := nanos / 1000000
319 onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
320 onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
321 log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu, "err ": err})
322 onuDevice.Reason = err.Error()
Girish Gowdra64503432020-01-07 10:59:10 +0530323 }
324 } else {
325 nanos = now.UnixNano()
326 milliEnd := nanos / 1000000
327 onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
328 onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
329 onuDevice.Reason = ReasonOk
330 log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
331 }
332
Girish Gowdraaeceb842020-08-21 12:10:39 -0700333 om.lockOpenOltManager.Lock()
Girish Gowdra64503432020-01-07 10:59:10 +0530334 om.OnuDeviceMap[onuDeviceKey] = &onuDevice
Girish Gowdraaeceb842020-08-21 12:10:39 -0700335 om.lockOpenOltManager.Unlock()
Girish Gowdra64503432020-01-07 10:59:10 +0530336
337 // If ONU activation was success provision the ONU
338 if err == nil {
Girish Gowdraaeceb842020-08-21 12:10:39 -0700339 om.lockOpenOltManager.RLock()
340 go om.OnuDeviceMap[onuDeviceKey].Start()
341 om.lockOpenOltManager.RUnlock()
342
Girish Gowdra64503432020-01-07 10:59:10 +0530343 }
344}
345
346func (om *OpenOltManager) stringifySerialNumber(serialNum *oop.SerialNumber) string {
347 if serialNum != nil {
348 return string(serialNum.VendorId) + om.stringifyVendorSpecific(serialNum.VendorSpecific)
349 }
350 return ""
351}
352
353func (om *OpenOltManager) stringifyVendorSpecific(vendorSpecific []byte) string {
354 tmp := fmt.Sprintf("%x", (uint32(vendorSpecific[0])>>4)&0x0f) +
355 fmt.Sprintf("%x", uint32(vendorSpecific[0]&0x0f)) +
356 fmt.Sprintf("%x", (uint32(vendorSpecific[1])>>4)&0x0f) +
357 fmt.Sprintf("%x", (uint32(vendorSpecific[1]))&0x0f) +
358 fmt.Sprintf("%x", (uint32(vendorSpecific[2])>>4)&0x0f) +
359 fmt.Sprintf("%x", (uint32(vendorSpecific[2]))&0x0f) +
360 fmt.Sprintf("%x", (uint32(vendorSpecific[3])>>4)&0x0f) +
361 fmt.Sprintf("%x", (uint32(vendorSpecific[3]))&0x0f)
362 return tmp
363}
364
365// readIndications to read the indications from the OLT device
366func (om *OpenOltManager) readIndications() {
367 defer log.Errorw("Indications ended", log.Fields{})
368 indications, err := om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
369 if err != nil {
370 log.Errorw("Failed to read indications", log.Fields{"err": err})
371 return
372 }
373 if indications == nil {
374 log.Errorw("Indications is nil", log.Fields{})
375 return
376 }
377
378 // Create an exponential backoff around re-enabling indications. The
379 // maximum elapsed time for the back off is set to 0 so that we will
380 // continue to retry. The max interval defaults to 1m, but is set
381 // here for code clarity
382 indicationBackoff := backoff.NewExponentialBackOff()
383 indicationBackoff.MaxElapsedTime = 0
384 indicationBackoff.MaxInterval = 1 * time.Minute
385 for {
386 indication, err := indications.Recv()
387 if err == io.EOF {
388 log.Infow("EOF for indications", log.Fields{"err": err})
389 // Use an exponential back off to prevent getting into a tight loop
390 duration := indicationBackoff.NextBackOff()
391 if duration == backoff.Stop {
392 // If we reach a maximum then warn and reset the backoff
393 // timer and keep attempting.
394 log.Warnw("Maximum indication backoff reached, resetting backoff timer",
395 log.Fields{"max_indication_backoff": indicationBackoff.MaxElapsedTime})
396 indicationBackoff.Reset()
397 }
398 time.Sleep(indicationBackoff.NextBackOff())
399 indications, err = om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
400 if err != nil {
401 log.Errorw("Failed to read indications", log.Fields{"err": err})
402 return
403 }
404 continue
405 }
406 if err != nil {
407 log.Infow("Failed to read from indications", log.Fields{"err": err})
408 break
409 }
410 // Reset backoff if we have a successful receive
411 indicationBackoff.Reset()
412 om.handleIndication(indication)
413
414 }
415}
416
417func (om *OpenOltManager) handleIndication(indication *oop.Indication) {
418 switch indication.Data.(type) {
419 case *oop.Indication_OltInd:
420 log.Info("received olt indication")
421 case *oop.Indication_IntfInd:
422 intfInd := indication.GetIntfInd()
423 log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
424 case *oop.Indication_IntfOperInd:
425 intfOperInd := indication.GetIntfOperInd()
426 if intfOperInd.GetType() == "nni" {
427 log.Info("received interface oper indication for nni port")
428 } else if intfOperInd.GetType() == "pon" {
429 log.Info("received interface oper indication for pon port")
430 }
431 /*
432 case *oop.Indication_OnuDiscInd:
433 onuDiscInd := indication.GetOnuDiscInd()
434 log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
435 */
436 case *oop.Indication_OnuInd:
437 onuInd := indication.GetOnuInd()
438 log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
439 case *oop.Indication_OmciInd:
440 omciInd := indication.GetOmciInd()
441 log.Debugw("Received Omci indication ", log.Fields{"IntfId": omciInd.IntfId, "OnuId": omciInd.OnuId, "pkt": hex.EncodeToString(omciInd.Pkt)})
442 case *oop.Indication_PktInd:
443 pktInd := indication.GetPktInd()
Orhan Kupusoglu66b00d82020-03-13 12:06:33 +0300444 log.Infow("Received packet indication ", log.Fields{"PktInd": pktInd})
Girish Gowdra64503432020-01-07 10:59:10 +0530445 /*
446 case *oop.Indication_PortStats:
447 portStats := indication.GetPortStats()
448 log.Infow("Received port stats", log.Fields{"portStats": portStats})
449 case *oop.Indication_FlowStats:
450 flowStats := indication.GetFlowStats()
451 log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
452 */
453 case *oop.Indication_AlarmInd:
454 alarmInd := indication.GetAlarmInd()
455 log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
456 }
457}
458
459func (om *OpenOltManager) populateTechProfilePerPonPort() error {
460 var tpCount int
461 for _, techRange := range om.deviceInfo.Ranges {
462 for _, intfID := range techRange.IntfIds {
463 om.TechProfile[intfID] = &(om.rsrMgr.ResourceMgrs[intfID].TechProfileMgr)
464 tpCount++
465 log.Debugw("Init tech profile done", log.Fields{"intfID": intfID})
466 }
467 }
468 //Make sure we have as many tech_profiles as there are pon ports on the device
469 if tpCount != int(om.deviceInfo.GetPonPorts()) {
470 log.Errorw("Error while populating techprofile",
471 log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
472 return errors.New("error while populating techprofile mgrs")
473 }
474 log.Infow("Populated techprofile for ponports successfully",
475 log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
476 return nil
477}
Girish Gowdraaeceb842020-08-21 12:10:39 -0700478
// isPowerOfTwo reports whether numOfOnus is an exact power of two
// (1, 2, 4, 8, ...). Zero is not a power of two.
func isPowerOfTwo(numOfOnus uint) bool {
	// A power of two has exactly one bit set, so clearing its lowest set bit
	// leaves zero. Zero itself passes that bit test and is excluded explicitly.
	return numOfOnus != 0 && numOfOnus&(numOfOnus-1) == 0
}