blob: efd14890aa6c7ae20b0b98c612c5ca6397acc332 [file] [log] [blame]
Girish Gowdra64503432020-01-07 10:59:10 +05301/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package core
18
19import (
20 "context"
21 "encoding/hex"
22 "encoding/json"
23 "errors"
24 "fmt"
25 "github.com/cenkalti/backoff/v3"
26 "github.com/opencord/openolt-scale-tester/config"
27 "github.com/opencord/voltha-lib-go/v2/pkg/db/kvstore"
28 "github.com/opencord/voltha-lib-go/v2/pkg/log"
29 "github.com/opencord/voltha-lib-go/v2/pkg/techprofile"
30 oop "github.com/opencord/voltha-protos/v2/go/openolt"
31 "google.golang.org/grpc"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/status"
34 "io"
35 "io/ioutil"
36 "os"
37 "strconv"
38 "sync"
39 "syscall"
40 "time"
41)
42
43const (
44 ReasonOk = "OK"
45 TechProfileKVPath = "service/voltha/technology_profiles/%s/%d" // service/voltha/technology_profiles/xgspon/<tech_profile_tableID>
Thiyagarajan Subramanib83b0432020-01-08 13:43:28 +053046 DTWorkFlow = "DT"
Girish Gowdra64503432020-01-07 10:59:10 +053047)
48
49type OnuDeviceKey struct {
50 onuID uint32
51 ponInfID uint32
52}
53
54type OpenOltManager struct {
55 ipPort string
56 deviceInfo *oop.DeviceInfo
57 OnuDeviceMap map[OnuDeviceKey]*OnuDevice `json:"onuDeviceMap"`
58 TechProfile map[uint32]*techprofile.TechProfileIf
59 clientConn *grpc.ClientConn
60 openOltClient oop.OpenoltClient
61 testConfig *config.OpenOltScaleTesterConfig
62 rsrMgr *OpenOltResourceMgr
63 lockRsrAlloc sync.RWMutex
64}
65
66func init() {
67 _, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
68}
69
70func NewOpenOltManager(ipPort string) *OpenOltManager {
71 log.Infow("initialized openolt manager with ipPort", log.Fields{"ipPort": ipPort})
72 return &OpenOltManager{
73 ipPort: ipPort,
74 OnuDeviceMap: make(map[OnuDeviceKey]*OnuDevice),
75 lockRsrAlloc: sync.RWMutex{},
76 }
77}
78
79func (om *OpenOltManager) readAndLoadTPsToEtcd() {
80 var byteValue []byte
81 var err error
82 // Verify that etcd is up before starting the application.
83 etcdIpPort := "http://" + om.testConfig.KVStoreHost + ":" + strconv.Itoa(om.testConfig.KVStorePort)
84 client, err := kvstore.NewEtcdClient(etcdIpPort, 5)
85 if err != nil || client == nil {
86 log.Fatal("error-initializing-etcd-client")
87 return
88 }
89
90 // Load TPs to etcd for each of the specified tech-profiles
91 for _, tpID := range om.testConfig.TpIDList {
92 // Below should translate to something like "/app/ATT-64.json"
93 // The TP file should exist.
94 tpFilePath := "/app/" + om.testConfig.WorkflowName + "-" + strconv.Itoa(tpID) + ".json"
95 // Open our jsonFile
96 jsonFile, err := os.Open(tpFilePath)
97 // if we os.Open returns an error then handle it
98 if err != nil {
99 log.Fatalw("could-not-find-tech-profile", log.Fields{"err": err, "tpFile": tpFilePath})
100 }
101 log.Debugw("tp-file-opened-successfully", log.Fields{"tpFile": tpFilePath})
102
103 // read our opened json file as a byte array.
104 if byteValue, err = ioutil.ReadAll(jsonFile); err != nil {
105 log.Fatalw("could-not-read-tp-file", log.Fields{"err": err, "tpFile": tpFilePath})
106 }
107
108 var tp techprofile.TechProfile
109
110 if err = json.Unmarshal(byteValue, &tp); err != nil {
111 log.Fatalw("could-not-unmarshal-tp", log.Fields{"err": err, "tpFile": tpFilePath})
112 } else {
113 log.Infow("tp-read-from-file", log.Fields{"tp": tp, "tpFile": tpFilePath})
114 }
115 kvPath := fmt.Sprintf(TechProfileKVPath, om.deviceInfo.Technology, tpID)
116 tpJson, err := json.Marshal(tp)
117 err = client.Put(kvPath, tpJson, 2)
118 if err != nil {
119 log.Fatalw("tp-put-to-etcd-failed", log.Fields{"tpPath": kvPath, "err": err})
120 }
121 // verify the PUT succeeded.
122 kvResult, err := client.Get(kvPath, 2)
123 if kvResult == nil {
124 log.Fatal("tp-not-found-on-kv-after-load", log.Fields{"key": kvPath, "err": err})
125 } else {
126 var KvTpIns techprofile.TechProfile
127 var resPtr = &KvTpIns
128 if value, err := kvstore.ToByte(kvResult.Value); err == nil {
129 if err = json.Unmarshal(value, resPtr); err != nil {
130 log.Fatal("error-unmarshal-kv-result", log.Fields{"err": err, "key": kvPath, "value": value})
131 } else {
132 log.Infow("verified-ok-that-tp-load-was-good", log.Fields{"tpID": tpID, "kvPath": kvPath})
133 _ = jsonFile.Close()
134 continue
135 }
136 }
137 }
138 }
139}
140
141func (om *OpenOltManager) Start(testConfig *config.OpenOltScaleTesterConfig) error {
142 var err error
143 om.testConfig = testConfig
144
145 // Establish gRPC connection with the device
146 if om.clientConn, err = grpc.Dial(om.ipPort, grpc.WithInsecure(), grpc.WithBlock()); err != nil {
147 log.Errorw("Failed to dial device", log.Fields{"ipPort": om.ipPort, "err": err})
148 return err
149 }
150 om.openOltClient = oop.NewOpenoltClient(om.clientConn)
151
152 // Populate Device Info
153 if deviceInfo, err := om.populateDeviceInfo(); err != nil {
154 log.Error("error fetching device info", log.Fields{"err": err, "deviceInfo": deviceInfo})
155 return err
156 }
157
158 // Read and load TPs to etcd.
159 om.readAndLoadTPsToEtcd()
160
161 log.Info("etcd-up-and-running--tp-loaded-successfully")
162
163 if om.rsrMgr = NewResourceMgr("ABCD", om.testConfig.KVStoreHost+":"+strconv.Itoa(om.testConfig.KVStorePort),
164 "etcd", "openolt", om.deviceInfo); om.rsrMgr == nil {
165 log.Error("Error while instantiating resource manager")
166 return errors.New("instantiating resource manager failed")
167 }
168
169 om.TechProfile = make(map[uint32]*techprofile.TechProfileIf)
170 if err = om.populateTechProfilePerPonPort(); err != nil {
171 log.Error("Error while populating tech profile mgr\n")
172 return errors.New("error-loading-tech-profile-per-ponPort")
173 }
174
175 // Start reading indications
176 go om.readIndications()
177
178 // Provision OLT NNI Trap flows as needed by the Workflow
Thiyagarajan Subramanib83b0432020-01-08 13:43:28 +0530179 if om.testConfig.WorkflowName != DTWorkFlow {
180 if err = ProvisionNniTrapFlow(om.openOltClient, om.testConfig, om.rsrMgr); err != nil {
181 log.Error("failed-to-add-nni-trap-flow", log.Fields{"err": err})
182 }
Girish Gowdra64503432020-01-07 10:59:10 +0530183 }
184
185 // Provision ONUs one by one
186 go om.provisionONUs()
187
188 return nil
189
190}
191
192func (om *OpenOltManager) populateDeviceInfo() (*oop.DeviceInfo, error) {
193 var err error
194
195 if om.deviceInfo, err = om.openOltClient.GetDeviceInfo(context.Background(), new(oop.Empty)); err != nil {
196 log.Errorw("Failed to fetch device info", log.Fields{"err": err})
197 return nil, err
198 }
199
200 if om.deviceInfo == nil {
201 log.Errorw("Device info is nil", log.Fields{})
202 return nil, errors.New("failed to get device info from OLT")
203 }
204
205 log.Debugw("Fetched device info", log.Fields{"deviceInfo": om.deviceInfo})
206
207 return om.deviceInfo, nil
208}
209
210func (om *OpenOltManager) provisionONUs() {
211 var numOfONUsPerPon uint
212 var i, j, onuID uint32
213 var err error
214 oltChan := make(chan bool)
215 numOfONUsPerPon = om.testConfig.NumOfOnu / uint(om.deviceInfo.PonPorts)
216 if oddONUs := om.testConfig.NumOfOnu % uint(om.deviceInfo.PonPorts); oddONUs > 0 {
217 log.Warnw("Odd number ONUs left out of provisioning", log.Fields{"oddONUs": oddONUs})
218 }
219 totalOnusToProvision := numOfONUsPerPon * uint(om.deviceInfo.PonPorts)
220 log.Infow("***** all-onu-provision-started ******",
221 log.Fields{"totalNumOnus": totalOnusToProvision,
222 "numOfOnusPerPon": numOfONUsPerPon,
223 "numOfPons": om.deviceInfo.PonPorts})
224 for i = 0; i < om.deviceInfo.PonPorts; i++ {
225 for j = 0; j < uint32(numOfONUsPerPon); j++ {
226 // TODO: More work with ONU provisioning
227 om.lockRsrAlloc.Lock()
228 sn := GenerateNextONUSerialNumber()
229 om.lockRsrAlloc.Unlock()
230 log.Debugw("provisioning onu", log.Fields{"onuID": j, "ponPort": i, "serialNum": sn})
231 if onuID, err = om.rsrMgr.GetONUID(i); err != nil {
232 log.Errorw("error getting onu id", log.Fields{"err": err})
233 continue
234 }
235 log.Infow("onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
236 go om.activateONU(i, onuID, sn, om.stringifySerialNumber(sn), oltChan)
237 // Wait for complete ONU provision to succeed, including provisioning the subscriber
238 <-oltChan
239 log.Infow("onu-provision-completed-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
240
241 // Sleep for configured time before provisioning next ONU
242 time.Sleep(time.Duration(om.testConfig.TimeIntervalBetweenSubs))
243 }
244 }
245 log.Info("******** all-onu-provisioning-completed *******")
246
247 // TODO: We need to dump the results at the end. But below json marshall does not work
248 // We will need custom Marshal function.
249 /*
250 e, err := json.Marshal(om)
251 if err != nil {
252 fmt.Println(err)
253 return
254 }
255 fmt.Println(string(e))
256 */
257
258 // Stop the process once the job is done
259 _ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
260}
261
262func (om *OpenOltManager) activateONU(intfID uint32, onuID uint32, serialNum *oop.SerialNumber, serialNumber string, oltCh chan bool) {
263 log.Debugw("activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
264 // TODO: need resource manager
265 var pir uint32 = 1000000
266 var onuDevice = OnuDevice{
267 SerialNum: serialNumber,
268 OnuID: onuID,
269 PonIntf: intfID,
270 openOltClient: om.openOltClient,
271 testConfig: om.testConfig,
272 rsrMgr: om.rsrMgr,
273 }
274 var err error
275 onuDeviceKey := OnuDeviceKey{onuID: onuID, ponInfID: intfID}
276 Onu := oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: serialNum, Pir: pir}
277 now := time.Now()
278 nanos := now.UnixNano()
279 milliStart := nanos / 1000000
280 onuDevice.OnuProvisionStartTime = time.Unix(0, nanos)
281 if _, err = om.openOltClient.ActivateOnu(context.Background(), &Onu); err != nil {
282 st, _ := status.FromError(err)
283 if st.Code() == codes.AlreadyExists {
284 log.Debug("ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
285 oltCh <- false
286 } else {
287 nanos = now.UnixNano()
288 milliEnd := nanos / 1000000
289 onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
290 onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
291 log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu, "err ": err})
292 onuDevice.Reason = err.Error()
293 oltCh <- false
294 }
295 } else {
296 nanos = now.UnixNano()
297 milliEnd := nanos / 1000000
298 onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
299 onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
300 onuDevice.Reason = ReasonOk
301 log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
302 }
303
304 om.OnuDeviceMap[onuDeviceKey] = &onuDevice
305
306 // If ONU activation was success provision the ONU
307 if err == nil {
308 // start provisioning the ONU
309 go om.OnuDeviceMap[onuDeviceKey].Start(oltCh)
310 }
311}
312
313func (om *OpenOltManager) stringifySerialNumber(serialNum *oop.SerialNumber) string {
314 if serialNum != nil {
315 return string(serialNum.VendorId) + om.stringifyVendorSpecific(serialNum.VendorSpecific)
316 }
317 return ""
318}
319
320func (om *OpenOltManager) stringifyVendorSpecific(vendorSpecific []byte) string {
321 tmp := fmt.Sprintf("%x", (uint32(vendorSpecific[0])>>4)&0x0f) +
322 fmt.Sprintf("%x", uint32(vendorSpecific[0]&0x0f)) +
323 fmt.Sprintf("%x", (uint32(vendorSpecific[1])>>4)&0x0f) +
324 fmt.Sprintf("%x", (uint32(vendorSpecific[1]))&0x0f) +
325 fmt.Sprintf("%x", (uint32(vendorSpecific[2])>>4)&0x0f) +
326 fmt.Sprintf("%x", (uint32(vendorSpecific[2]))&0x0f) +
327 fmt.Sprintf("%x", (uint32(vendorSpecific[3])>>4)&0x0f) +
328 fmt.Sprintf("%x", (uint32(vendorSpecific[3]))&0x0f)
329 return tmp
330}
331
332// readIndications to read the indications from the OLT device
333func (om *OpenOltManager) readIndications() {
334 defer log.Errorw("Indications ended", log.Fields{})
335 indications, err := om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
336 if err != nil {
337 log.Errorw("Failed to read indications", log.Fields{"err": err})
338 return
339 }
340 if indications == nil {
341 log.Errorw("Indications is nil", log.Fields{})
342 return
343 }
344
345 // Create an exponential backoff around re-enabling indications. The
346 // maximum elapsed time for the back off is set to 0 so that we will
347 // continue to retry. The max interval defaults to 1m, but is set
348 // here for code clarity
349 indicationBackoff := backoff.NewExponentialBackOff()
350 indicationBackoff.MaxElapsedTime = 0
351 indicationBackoff.MaxInterval = 1 * time.Minute
352 for {
353 indication, err := indications.Recv()
354 if err == io.EOF {
355 log.Infow("EOF for indications", log.Fields{"err": err})
356 // Use an exponential back off to prevent getting into a tight loop
357 duration := indicationBackoff.NextBackOff()
358 if duration == backoff.Stop {
359 // If we reach a maximum then warn and reset the backoff
360 // timer and keep attempting.
361 log.Warnw("Maximum indication backoff reached, resetting backoff timer",
362 log.Fields{"max_indication_backoff": indicationBackoff.MaxElapsedTime})
363 indicationBackoff.Reset()
364 }
365 time.Sleep(indicationBackoff.NextBackOff())
366 indications, err = om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
367 if err != nil {
368 log.Errorw("Failed to read indications", log.Fields{"err": err})
369 return
370 }
371 continue
372 }
373 if err != nil {
374 log.Infow("Failed to read from indications", log.Fields{"err": err})
375 break
376 }
377 // Reset backoff if we have a successful receive
378 indicationBackoff.Reset()
379 om.handleIndication(indication)
380
381 }
382}
383
384func (om *OpenOltManager) handleIndication(indication *oop.Indication) {
385 switch indication.Data.(type) {
386 case *oop.Indication_OltInd:
387 log.Info("received olt indication")
388 case *oop.Indication_IntfInd:
389 intfInd := indication.GetIntfInd()
390 log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
391 case *oop.Indication_IntfOperInd:
392 intfOperInd := indication.GetIntfOperInd()
393 if intfOperInd.GetType() == "nni" {
394 log.Info("received interface oper indication for nni port")
395 } else if intfOperInd.GetType() == "pon" {
396 log.Info("received interface oper indication for pon port")
397 }
398 /*
399 case *oop.Indication_OnuDiscInd:
400 onuDiscInd := indication.GetOnuDiscInd()
401 log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
402 */
403 case *oop.Indication_OnuInd:
404 onuInd := indication.GetOnuInd()
405 log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
406 case *oop.Indication_OmciInd:
407 omciInd := indication.GetOmciInd()
408 log.Debugw("Received Omci indication ", log.Fields{"IntfId": omciInd.IntfId, "OnuId": omciInd.OnuId, "pkt": hex.EncodeToString(omciInd.Pkt)})
409 case *oop.Indication_PktInd:
410 pktInd := indication.GetPktInd()
411 log.Infow("Received pakcet indication ", log.Fields{"PktInd": pktInd})
412 /*
413 case *oop.Indication_PortStats:
414 portStats := indication.GetPortStats()
415 log.Infow("Received port stats", log.Fields{"portStats": portStats})
416 case *oop.Indication_FlowStats:
417 flowStats := indication.GetFlowStats()
418 log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
419 */
420 case *oop.Indication_AlarmInd:
421 alarmInd := indication.GetAlarmInd()
422 log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
423 }
424}
425
426func (om *OpenOltManager) populateTechProfilePerPonPort() error {
427 var tpCount int
428 for _, techRange := range om.deviceInfo.Ranges {
429 for _, intfID := range techRange.IntfIds {
430 om.TechProfile[intfID] = &(om.rsrMgr.ResourceMgrs[intfID].TechProfileMgr)
431 tpCount++
432 log.Debugw("Init tech profile done", log.Fields{"intfID": intfID})
433 }
434 }
435 //Make sure we have as many tech_profiles as there are pon ports on the device
436 if tpCount != int(om.deviceInfo.GetPonPorts()) {
437 log.Errorw("Error while populating techprofile",
438 log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
439 return errors.New("error while populating techprofile mgrs")
440 }
441 log.Infow("Populated techprofile for ponports successfully",
442 log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
443 return nil
444}