blob: 78dd97e8905956742074ae6c7c8f8debca631883 [file] [log] [blame]
Girish Gowdra64503432020-01-07 10:59:10 +05301/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package core
18
19import (
20 "context"
21 "encoding/hex"
22 "encoding/json"
23 "errors"
24 "fmt"
Girish Gowdra64503432020-01-07 10:59:10 +053025 "io"
26 "io/ioutil"
27 "os"
28 "strconv"
29 "sync"
30 "syscall"
31 "time"
Orhan Kupusoglu66b00d82020-03-13 12:06:33 +030032
33 "github.com/cenkalti/backoff/v3"
34 "github.com/opencord/openolt-scale-tester/config"
35 "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
36 "github.com/opencord/voltha-lib-go/v3/pkg/log"
37 "github.com/opencord/voltha-lib-go/v3/pkg/techprofile"
38 oop "github.com/opencord/voltha-protos/v3/go/openolt"
39 "google.golang.org/grpc"
40 "google.golang.org/grpc/codes"
41 "google.golang.org/grpc/status"
Girish Gowdra64503432020-01-07 10:59:10 +053042)
43
44const (
45 ReasonOk = "OK"
46 TechProfileKVPath = "service/voltha/technology_profiles/%s/%d" // service/voltha/technology_profiles/xgspon/<tech_profile_tableID>
Thiyagarajan Subramanib83b0432020-01-08 13:43:28 +053047 DTWorkFlow = "DT"
Girish Gowdra64503432020-01-07 10:59:10 +053048)
49
50type OnuDeviceKey struct {
51 onuID uint32
52 ponInfID uint32
53}
54
55type OpenOltManager struct {
56 ipPort string
57 deviceInfo *oop.DeviceInfo
58 OnuDeviceMap map[OnuDeviceKey]*OnuDevice `json:"onuDeviceMap"`
59 TechProfile map[uint32]*techprofile.TechProfileIf
60 clientConn *grpc.ClientConn
61 openOltClient oop.OpenoltClient
62 testConfig *config.OpenOltScaleTesterConfig
63 rsrMgr *OpenOltResourceMgr
64 lockRsrAlloc sync.RWMutex
65}
66
67func init() {
68 _, _ = log.AddPackage(log.JSON, log.DebugLevel, nil)
69}
70
71func NewOpenOltManager(ipPort string) *OpenOltManager {
72 log.Infow("initialized openolt manager with ipPort", log.Fields{"ipPort": ipPort})
73 return &OpenOltManager{
74 ipPort: ipPort,
75 OnuDeviceMap: make(map[OnuDeviceKey]*OnuDevice),
76 lockRsrAlloc: sync.RWMutex{},
77 }
78}
79
80func (om *OpenOltManager) readAndLoadTPsToEtcd() {
81 var byteValue []byte
82 var err error
83 // Verify that etcd is up before starting the application.
84 etcdIpPort := "http://" + om.testConfig.KVStoreHost + ":" + strconv.Itoa(om.testConfig.KVStorePort)
85 client, err := kvstore.NewEtcdClient(etcdIpPort, 5)
86 if err != nil || client == nil {
87 log.Fatal("error-initializing-etcd-client")
88 return
89 }
90
91 // Load TPs to etcd for each of the specified tech-profiles
92 for _, tpID := range om.testConfig.TpIDList {
93 // Below should translate to something like "/app/ATT-64.json"
94 // The TP file should exist.
95 tpFilePath := "/app/" + om.testConfig.WorkflowName + "-" + strconv.Itoa(tpID) + ".json"
96 // Open our jsonFile
97 jsonFile, err := os.Open(tpFilePath)
98 // if we os.Open returns an error then handle it
99 if err != nil {
100 log.Fatalw("could-not-find-tech-profile", log.Fields{"err": err, "tpFile": tpFilePath})
101 }
102 log.Debugw("tp-file-opened-successfully", log.Fields{"tpFile": tpFilePath})
103
104 // read our opened json file as a byte array.
105 if byteValue, err = ioutil.ReadAll(jsonFile); err != nil {
106 log.Fatalw("could-not-read-tp-file", log.Fields{"err": err, "tpFile": tpFilePath})
107 }
108
109 var tp techprofile.TechProfile
110
111 if err = json.Unmarshal(byteValue, &tp); err != nil {
112 log.Fatalw("could-not-unmarshal-tp", log.Fields{"err": err, "tpFile": tpFilePath})
113 } else {
114 log.Infow("tp-read-from-file", log.Fields{"tp": tp, "tpFile": tpFilePath})
115 }
116 kvPath := fmt.Sprintf(TechProfileKVPath, om.deviceInfo.Technology, tpID)
117 tpJson, err := json.Marshal(tp)
Orhan Kupusoglu66b00d82020-03-13 12:06:33 +0300118 err = client.Put(context.Background(), kvPath, tpJson)
Girish Gowdra64503432020-01-07 10:59:10 +0530119 if err != nil {
120 log.Fatalw("tp-put-to-etcd-failed", log.Fields{"tpPath": kvPath, "err": err})
121 }
122 // verify the PUT succeeded.
Orhan Kupusoglu66b00d82020-03-13 12:06:33 +0300123 kvResult, err := client.Get(context.Background(), kvPath)
Girish Gowdra64503432020-01-07 10:59:10 +0530124 if kvResult == nil {
125 log.Fatal("tp-not-found-on-kv-after-load", log.Fields{"key": kvPath, "err": err})
126 } else {
127 var KvTpIns techprofile.TechProfile
128 var resPtr = &KvTpIns
129 if value, err := kvstore.ToByte(kvResult.Value); err == nil {
130 if err = json.Unmarshal(value, resPtr); err != nil {
131 log.Fatal("error-unmarshal-kv-result", log.Fields{"err": err, "key": kvPath, "value": value})
132 } else {
133 log.Infow("verified-ok-that-tp-load-was-good", log.Fields{"tpID": tpID, "kvPath": kvPath})
134 _ = jsonFile.Close()
135 continue
136 }
137 }
138 }
139 }
140}
141
142func (om *OpenOltManager) Start(testConfig *config.OpenOltScaleTesterConfig) error {
143 var err error
144 om.testConfig = testConfig
145
146 // Establish gRPC connection with the device
147 if om.clientConn, err = grpc.Dial(om.ipPort, grpc.WithInsecure(), grpc.WithBlock()); err != nil {
148 log.Errorw("Failed to dial device", log.Fields{"ipPort": om.ipPort, "err": err})
149 return err
150 }
151 om.openOltClient = oop.NewOpenoltClient(om.clientConn)
152
153 // Populate Device Info
154 if deviceInfo, err := om.populateDeviceInfo(); err != nil {
155 log.Error("error fetching device info", log.Fields{"err": err, "deviceInfo": deviceInfo})
156 return err
157 }
158
159 // Read and load TPs to etcd.
160 om.readAndLoadTPsToEtcd()
161
162 log.Info("etcd-up-and-running--tp-loaded-successfully")
163
164 if om.rsrMgr = NewResourceMgr("ABCD", om.testConfig.KVStoreHost+":"+strconv.Itoa(om.testConfig.KVStorePort),
165 "etcd", "openolt", om.deviceInfo); om.rsrMgr == nil {
166 log.Error("Error while instantiating resource manager")
167 return errors.New("instantiating resource manager failed")
168 }
169
170 om.TechProfile = make(map[uint32]*techprofile.TechProfileIf)
171 if err = om.populateTechProfilePerPonPort(); err != nil {
172 log.Error("Error while populating tech profile mgr\n")
173 return errors.New("error-loading-tech-profile-per-ponPort")
174 }
175
176 // Start reading indications
177 go om.readIndications()
178
179 // Provision OLT NNI Trap flows as needed by the Workflow
Thiyagarajan Subramanic4f8da82020-02-05 16:08:26 +0530180 if err = ProvisionNniTrapFlow(om.openOltClient, om.testConfig, om.rsrMgr); err != nil {
181 log.Error("failed-to-add-nni-trap-flow", log.Fields{"err": err})
Girish Gowdra64503432020-01-07 10:59:10 +0530182 }
183
184 // Provision ONUs one by one
185 go om.provisionONUs()
186
187 return nil
188
189}
190
191func (om *OpenOltManager) populateDeviceInfo() (*oop.DeviceInfo, error) {
192 var err error
193
194 if om.deviceInfo, err = om.openOltClient.GetDeviceInfo(context.Background(), new(oop.Empty)); err != nil {
195 log.Errorw("Failed to fetch device info", log.Fields{"err": err})
196 return nil, err
197 }
198
199 if om.deviceInfo == nil {
200 log.Errorw("Device info is nil", log.Fields{})
201 return nil, errors.New("failed to get device info from OLT")
202 }
203
204 log.Debugw("Fetched device info", log.Fields{"deviceInfo": om.deviceInfo})
205
206 return om.deviceInfo, nil
207}
208
209func (om *OpenOltManager) provisionONUs() {
210 var numOfONUsPerPon uint
211 var i, j, onuID uint32
212 var err error
213 oltChan := make(chan bool)
214 numOfONUsPerPon = om.testConfig.NumOfOnu / uint(om.deviceInfo.PonPorts)
215 if oddONUs := om.testConfig.NumOfOnu % uint(om.deviceInfo.PonPorts); oddONUs > 0 {
216 log.Warnw("Odd number ONUs left out of provisioning", log.Fields{"oddONUs": oddONUs})
217 }
218 totalOnusToProvision := numOfONUsPerPon * uint(om.deviceInfo.PonPorts)
219 log.Infow("***** all-onu-provision-started ******",
220 log.Fields{"totalNumOnus": totalOnusToProvision,
221 "numOfOnusPerPon": numOfONUsPerPon,
222 "numOfPons": om.deviceInfo.PonPorts})
223 for i = 0; i < om.deviceInfo.PonPorts; i++ {
224 for j = 0; j < uint32(numOfONUsPerPon); j++ {
225 // TODO: More work with ONU provisioning
226 om.lockRsrAlloc.Lock()
227 sn := GenerateNextONUSerialNumber()
228 om.lockRsrAlloc.Unlock()
229 log.Debugw("provisioning onu", log.Fields{"onuID": j, "ponPort": i, "serialNum": sn})
230 if onuID, err = om.rsrMgr.GetONUID(i); err != nil {
231 log.Errorw("error getting onu id", log.Fields{"err": err})
232 continue
233 }
234 log.Infow("onu-provision-started-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
235 go om.activateONU(i, onuID, sn, om.stringifySerialNumber(sn), oltChan)
236 // Wait for complete ONU provision to succeed, including provisioning the subscriber
237 <-oltChan
238 log.Infow("onu-provision-completed-from-olt-manager", log.Fields{"onuId": onuID, "ponIntf": i})
239
240 // Sleep for configured time before provisioning next ONU
241 time.Sleep(time.Duration(om.testConfig.TimeIntervalBetweenSubs))
242 }
243 }
244 log.Info("******** all-onu-provisioning-completed *******")
245
246 // TODO: We need to dump the results at the end. But below json marshall does not work
247 // We will need custom Marshal function.
248 /*
249 e, err := json.Marshal(om)
250 if err != nil {
251 fmt.Println(err)
252 return
253 }
254 fmt.Println(string(e))
255 */
256
257 // Stop the process once the job is done
258 _ = syscall.Kill(syscall.Getpid(), syscall.SIGINT)
259}
260
261func (om *OpenOltManager) activateONU(intfID uint32, onuID uint32, serialNum *oop.SerialNumber, serialNumber string, oltCh chan bool) {
262 log.Debugw("activate-onu", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum, "serialNumber": serialNumber})
263 // TODO: need resource manager
264 var pir uint32 = 1000000
265 var onuDevice = OnuDevice{
266 SerialNum: serialNumber,
267 OnuID: onuID,
268 PonIntf: intfID,
269 openOltClient: om.openOltClient,
270 testConfig: om.testConfig,
271 rsrMgr: om.rsrMgr,
272 }
273 var err error
274 onuDeviceKey := OnuDeviceKey{onuID: onuID, ponInfID: intfID}
275 Onu := oop.Onu{IntfId: intfID, OnuId: onuID, SerialNumber: serialNum, Pir: pir}
276 now := time.Now()
277 nanos := now.UnixNano()
278 milliStart := nanos / 1000000
279 onuDevice.OnuProvisionStartTime = time.Unix(0, nanos)
280 if _, err = om.openOltClient.ActivateOnu(context.Background(), &Onu); err != nil {
281 st, _ := status.FromError(err)
282 if st.Code() == codes.AlreadyExists {
283 log.Debug("ONU activation is in progress", log.Fields{"SerialNumber": serialNumber})
284 oltCh <- false
285 } else {
286 nanos = now.UnixNano()
287 milliEnd := nanos / 1000000
288 onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
289 onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
290 log.Errorw("activate-onu-failed", log.Fields{"Onu": Onu, "err ": err})
291 onuDevice.Reason = err.Error()
292 oltCh <- false
293 }
294 } else {
295 nanos = now.UnixNano()
296 milliEnd := nanos / 1000000
297 onuDevice.OnuProvisionEndTime = time.Unix(0, nanos)
298 onuDevice.OnuProvisionDurationInMs = milliEnd - milliStart
299 onuDevice.Reason = ReasonOk
300 log.Infow("activated-onu", log.Fields{"SerialNumber": serialNumber})
301 }
302
303 om.OnuDeviceMap[onuDeviceKey] = &onuDevice
304
305 // If ONU activation was success provision the ONU
306 if err == nil {
307 // start provisioning the ONU
308 go om.OnuDeviceMap[onuDeviceKey].Start(oltCh)
309 }
310}
311
312func (om *OpenOltManager) stringifySerialNumber(serialNum *oop.SerialNumber) string {
313 if serialNum != nil {
314 return string(serialNum.VendorId) + om.stringifyVendorSpecific(serialNum.VendorSpecific)
315 }
316 return ""
317}
318
319func (om *OpenOltManager) stringifyVendorSpecific(vendorSpecific []byte) string {
320 tmp := fmt.Sprintf("%x", (uint32(vendorSpecific[0])>>4)&0x0f) +
321 fmt.Sprintf("%x", uint32(vendorSpecific[0]&0x0f)) +
322 fmt.Sprintf("%x", (uint32(vendorSpecific[1])>>4)&0x0f) +
323 fmt.Sprintf("%x", (uint32(vendorSpecific[1]))&0x0f) +
324 fmt.Sprintf("%x", (uint32(vendorSpecific[2])>>4)&0x0f) +
325 fmt.Sprintf("%x", (uint32(vendorSpecific[2]))&0x0f) +
326 fmt.Sprintf("%x", (uint32(vendorSpecific[3])>>4)&0x0f) +
327 fmt.Sprintf("%x", (uint32(vendorSpecific[3]))&0x0f)
328 return tmp
329}
330
331// readIndications to read the indications from the OLT device
332func (om *OpenOltManager) readIndications() {
333 defer log.Errorw("Indications ended", log.Fields{})
334 indications, err := om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
335 if err != nil {
336 log.Errorw("Failed to read indications", log.Fields{"err": err})
337 return
338 }
339 if indications == nil {
340 log.Errorw("Indications is nil", log.Fields{})
341 return
342 }
343
344 // Create an exponential backoff around re-enabling indications. The
345 // maximum elapsed time for the back off is set to 0 so that we will
346 // continue to retry. The max interval defaults to 1m, but is set
347 // here for code clarity
348 indicationBackoff := backoff.NewExponentialBackOff()
349 indicationBackoff.MaxElapsedTime = 0
350 indicationBackoff.MaxInterval = 1 * time.Minute
351 for {
352 indication, err := indications.Recv()
353 if err == io.EOF {
354 log.Infow("EOF for indications", log.Fields{"err": err})
355 // Use an exponential back off to prevent getting into a tight loop
356 duration := indicationBackoff.NextBackOff()
357 if duration == backoff.Stop {
358 // If we reach a maximum then warn and reset the backoff
359 // timer and keep attempting.
360 log.Warnw("Maximum indication backoff reached, resetting backoff timer",
361 log.Fields{"max_indication_backoff": indicationBackoff.MaxElapsedTime})
362 indicationBackoff.Reset()
363 }
364 time.Sleep(indicationBackoff.NextBackOff())
365 indications, err = om.openOltClient.EnableIndication(context.Background(), new(oop.Empty))
366 if err != nil {
367 log.Errorw("Failed to read indications", log.Fields{"err": err})
368 return
369 }
370 continue
371 }
372 if err != nil {
373 log.Infow("Failed to read from indications", log.Fields{"err": err})
374 break
375 }
376 // Reset backoff if we have a successful receive
377 indicationBackoff.Reset()
378 om.handleIndication(indication)
379
380 }
381}
382
383func (om *OpenOltManager) handleIndication(indication *oop.Indication) {
384 switch indication.Data.(type) {
385 case *oop.Indication_OltInd:
386 log.Info("received olt indication")
387 case *oop.Indication_IntfInd:
388 intfInd := indication.GetIntfInd()
389 log.Infow("Received interface indication ", log.Fields{"InterfaceInd": intfInd})
390 case *oop.Indication_IntfOperInd:
391 intfOperInd := indication.GetIntfOperInd()
392 if intfOperInd.GetType() == "nni" {
393 log.Info("received interface oper indication for nni port")
394 } else if intfOperInd.GetType() == "pon" {
395 log.Info("received interface oper indication for pon port")
396 }
397 /*
398 case *oop.Indication_OnuDiscInd:
399 onuDiscInd := indication.GetOnuDiscInd()
400 log.Infow("Received Onu discovery indication ", log.Fields{"OnuDiscInd": onuDiscInd})
401 */
402 case *oop.Indication_OnuInd:
403 onuInd := indication.GetOnuInd()
404 log.Infow("Received Onu indication ", log.Fields{"OnuInd": onuInd})
405 case *oop.Indication_OmciInd:
406 omciInd := indication.GetOmciInd()
407 log.Debugw("Received Omci indication ", log.Fields{"IntfId": omciInd.IntfId, "OnuId": omciInd.OnuId, "pkt": hex.EncodeToString(omciInd.Pkt)})
408 case *oop.Indication_PktInd:
409 pktInd := indication.GetPktInd()
Orhan Kupusoglu66b00d82020-03-13 12:06:33 +0300410 log.Infow("Received packet indication ", log.Fields{"PktInd": pktInd})
Girish Gowdra64503432020-01-07 10:59:10 +0530411 /*
412 case *oop.Indication_PortStats:
413 portStats := indication.GetPortStats()
414 log.Infow("Received port stats", log.Fields{"portStats": portStats})
415 case *oop.Indication_FlowStats:
416 flowStats := indication.GetFlowStats()
417 log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
418 */
419 case *oop.Indication_AlarmInd:
420 alarmInd := indication.GetAlarmInd()
421 log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
422 }
423}
424
425func (om *OpenOltManager) populateTechProfilePerPonPort() error {
426 var tpCount int
427 for _, techRange := range om.deviceInfo.Ranges {
428 for _, intfID := range techRange.IntfIds {
429 om.TechProfile[intfID] = &(om.rsrMgr.ResourceMgrs[intfID].TechProfileMgr)
430 tpCount++
431 log.Debugw("Init tech profile done", log.Fields{"intfID": intfID})
432 }
433 }
434 //Make sure we have as many tech_profiles as there are pon ports on the device
435 if tpCount != int(om.deviceInfo.GetPonPorts()) {
436 log.Errorw("Error while populating techprofile",
437 log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
438 return errors.New("error while populating techprofile mgrs")
439 }
440 log.Infow("Populated techprofile for ponports successfully",
441 log.Fields{"numofTech": tpCount, "numPonPorts": om.deviceInfo.GetPonPorts()})
442 return nil
443}