/*
* Copyright 2022-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
 */
15
16package controller
17
18import (
19 "context"
20 "errors"
21 "sync"
22 "time"
23
24 "encoding/hex"
25
26 "voltha-go-controller/database"
27 errorCodes "voltha-go-controller/internal/pkg/errorcodes"
28 "voltha-go-controller/internal/pkg/intf"
29 "voltha-go-controller/internal/pkg/of"
30 "voltha-go-controller/internal/pkg/tasks"
31 "voltha-go-controller/internal/pkg/util"
32 "voltha-go-controller/internal/pkg/vpagent"
33
34 "github.com/opencord/voltha-lib-go/v7/pkg/log"
35)
36
37var logger log.CLogger
38var ctx = context.TODO()
39
40func init() {
41 // Setup this package so that it's log level can be modified at run time
42 var err error
43 logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
44 if err != nil {
45 panic(err)
46 }
47}
48
// db is the package-wide database handle. NewController assigns it from
// database.GetDatabase().
var db database.DBIntf
50
// deviceTableSyncDuration is the interval between device table sync up
// activities. It starts out at 15 minutes.
var deviceTableSyncDuration = 15 * time.Minute

// SetDeviceTableSyncDuration sets the interval between device table sync up
// activities. duration is expressed in minutes.
func SetDeviceTableSyncDuration(duration int) {
	deviceTableSyncDuration = time.Minute * time.Duration(duration)
}
58
// VoltController holds the controller-wide state: the known devices, the
// devices with a reboot in progress, the registered VP agents, the blocked
// device list and the per-device controller task queues.
type VoltController struct {
	// rebootLock guards rebootInProgressDevices.
	rebootLock sync.Mutex
	// rebootInProgressDevices records devices that have a reboot in progress;
	// the device string is used as both key and value.
	rebootInProgressDevices map[string]string
	// devices maps a device ID to its Device.
	devices map[string]*Device
	// NOTE(review): deviceLock is not referenced in this part of the file;
	// presumably intended to guard devices - confirm.
	deviceLock sync.RWMutex
	// vagent maps a "vep" key to its registered VP agent.
	vagent map[string]*vpagent.VPAgent
	// ctx is the context handed to NewController.
	ctx context.Context
	// app is the application that receives the indications forwarded by the
	// controller.
	app intf.App
	// NOTE(review): RebootFlow is not referenced in this part of the file.
	RebootFlow bool
	// BlockedDeviceList holds the serial numbers of blocked devices.
	BlockedDeviceList *util.ConcurrentMap
	// deviceTaskQueue maps a device to its *tasks.Tasks controller task queue.
	deviceTaskQueue *util.ConcurrentMap
}
72
73var vcontroller *VoltController
74
75// NewController is the constructor for VoltController
76func NewController(ctx context.Context, app intf.App) intf.IVPClientAgent {
77 var controller VoltController
78
79 controller.rebootInProgressDevices = make(map[string]string)
80 controller.devices = make(map[string]*Device)
81 controller.deviceLock = sync.RWMutex{}
82 controller.ctx = ctx
83 controller.app = app
84 controller.BlockedDeviceList = util.NewConcurrentMap()
85 controller.deviceTaskQueue = util.NewConcurrentMap()
86 db = database.GetDatabase()
87 vcontroller = &controller
88 return &controller
89}
90
91// AddDevice to add device
92func (v *VoltController) AddDevice(config *intf.VPClientCfg) intf.IVPClient {
93
94 d := NewDevice(config.DeviceID, config.SerialNum, config.VolthaClient, config.SouthBoundID)
95 v.devices[config.DeviceID] = d
96 v.app.AddDevice(d.ID, d.SerialNum, config.SouthBoundID)
97
98 d.RestoreMetersFromDb()
99 d.RestoreGroupsFromDb()
100 d.RestoreFlowsFromDb()
101 d.RestorePortsFromDb()
102 d.ConnectInd(context.TODO(), intf.DeviceDisc)
103 d.packetOutChannel = config.PacketOutChannel
104
105 logger.Warnw(ctx, "Added device", log.Fields{"Device": config.DeviceID, "SerialNo": d.SerialNum, "State": d.State})
106
107 return d
108}
109
110// DelDevice to delete device
111func (v *VoltController) DelDevice(id string) {
112 d, ok := v.devices[id]
113 if ok {
114 delete(v.devices, id)
115 d.Delete()
116 }
117 v.app.DelDevice(id)
118 d.cancel() // To stop the device tables sync routine
119 logger.Warnw(ctx, "Deleted device", log.Fields{"Device": id})
120}
121
122//AddControllerTask - add task to controller queue
123func (v *VoltController) AddControllerTask(device string, task tasks.Task) {
124 var taskQueueIntf interface{}
125 var taskQueue *tasks.Tasks
126 var found bool
127 if taskQueueIntf, found = v.deviceTaskQueue.Get(device); !found {
128 taskQueue = tasks.NewTasks(context.TODO())
129 v.deviceTaskQueue.Set(device, taskQueue)
130 } else {
131 taskQueue = taskQueueIntf.(*tasks.Tasks)
132 }
133 taskQueue.AddTask(task)
134 logger.Warnw(ctx, "Task Added to Controller Task List", log.Fields{"Len": taskQueue.NumPendingTasks(), "Total": taskQueue.TotalTasks()})
135}
136
137//AddNewDevice - called when new device is discovered. This will be
138//processed as part of controller queue
139func (v *VoltController) AddNewDevice(config *intf.VPClientCfg) {
140 adt := NewAddDeviceTask(config)
141 v.AddControllerTask(config.DeviceID, adt)
142}
143
144// GetDevice to get device info
145func (v *VoltController) GetDevice(id string) (*Device, error) {
146 d, ok := v.devices[id]
147 if ok {
148 return d, nil
149 }
150 return nil, errorCodes.ErrDeviceNotFound
151}
152
153// IsRebootInProgressForDevice to check if reboot is in progress for the device
154func (v *VoltController) IsRebootInProgressForDevice(device string) bool {
155 v.rebootLock.Lock()
156 defer v.rebootLock.Unlock()
157 _, ok := v.rebootInProgressDevices[device]
158 return ok
159}
160
161// SetRebootInProgressForDevice to set reboot in progress for the device
162func (v *VoltController) SetRebootInProgressForDevice(device string) bool {
163 v.rebootLock.Lock()
164 defer v.rebootLock.Unlock()
165 _, ok := v.rebootInProgressDevices[device]
166 if ok {
167 return true
168 }
169 v.rebootInProgressDevices[device] = device
170 logger.Warnw(ctx, "Setted Reboot-In-Progress flag", log.Fields{"Device": device})
171
172 d, err := v.GetDevice(device)
173 if err == nil {
174 d.ResetCache()
175 } else {
176 logger.Errorw(ctx, "Failed to get device", log.Fields{"Device": device, "Error": err})
177 }
178
179 return true
180}
181
182// ReSetRebootInProgressForDevice to reset reboot in progress for the device
183func (v *VoltController) ReSetRebootInProgressForDevice(device string) bool {
184 v.rebootLock.Lock()
185 defer v.rebootLock.Unlock()
186 _, ok := v.rebootInProgressDevices[device]
187 if !ok {
188 return true
189 }
190 delete(v.rebootInProgressDevices, device)
191 logger.Warnw(ctx, "Resetted Reboot-In-Progress flag", log.Fields{"Device": device})
192 return true
193}
194
195// DeviceRebootInd is device reboot indication
196func (v *VoltController) DeviceRebootInd(dID string, srNo string, sbID string) {
197 v.app.DeviceRebootInd(dID, srNo, sbID)
198 _ = db.DelAllRoutesForDevice(dID)
199 _ = db.DelAllGroup(dID)
200 _ = db.DelAllMeter(dID)
201 _ = db.DelAllPONCounters(dID)
202}
203
204// DeviceDisableInd is device deactivation indication
205func (v *VoltController) DeviceDisableInd(dID string) {
206 v.app.DeviceDisableInd(dID)
207}
208
209//TriggerPendingProfileDeleteReq - trigger pending profile delete requests
210func (v *VoltController) TriggerPendingProfileDeleteReq(device string) {
211 v.app.TriggerPendingProfileDeleteReq(device)
212}
213
214//TriggerPendingMigrateServicesReq - trigger pending services migration requests
215func (v *VoltController) TriggerPendingMigrateServicesReq(device string) {
216 v.app.TriggerPendingMigrateServicesReq(device)
217}
218
219// SetAuditFlags to set the audit flags
220func (v *VoltController) SetAuditFlags(device *Device) {
221 v.app.SetRebootFlag(true)
222 device.auditInProgress = true
223}
224
225// ResetAuditFlags to reset the audit flags
226func (v *VoltController) ResetAuditFlags(device *Device) {
227 v.app.SetRebootFlag(false)
228 device.auditInProgress = false
229}
230
231//ProcessFlowModResultIndication - send flow mod result notification
232func (v *VoltController) ProcessFlowModResultIndication(flowStatus intf.FlowStatus) {
233 v.app.ProcessFlowModResultIndication(flowStatus)
234}
235
236// AddVPAgent to add the vpagent
237func (v *VoltController) AddVPAgent(vep string, vpa *vpagent.VPAgent) {
238 v.vagent[vep] = vpa
239}
240
241// VPAgent to get vpagent info
242func (v *VoltController) VPAgent(vep string) (*vpagent.VPAgent, error) {
243 vpa, ok := v.vagent[vep]
244 if ok {
245 return vpa, nil
246 }
247 return nil, errors.New("VPA Not Registered")
248}
249
250// PacketOutReq for packet out request
251func (v *VoltController) PacketOutReq(device string, inport string, outport string, pkt []byte, isCustomPkt bool) error {
252 logger.Debugw(ctx, "Packet Out Req", log.Fields{"Device": device, "OutPort": outport})
253 d, err := v.GetDevice(device)
254 if err != nil {
255 return err
256 }
257 logger.Debugw(ctx, "Packet Out Pkt", log.Fields{"Pkt": hex.EncodeToString(pkt)})
258 return d.PacketOutReq(inport, outport, pkt, isCustomPkt)
259}
260
261// AddFlows to add flows
262func (v *VoltController) AddFlows(port string, device string, flow *of.VoltFlow) error {
263 d, err := v.GetDevice(device)
264 if err != nil {
265 logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
266 return err
267 }
268 devPort := d.GetPortByName(port)
269 if devPort == nil {
270 logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
271 return errorCodes.ErrPortNotFound
272 }
273 if d.ctx == nil {
274 //FIXME: Application should know the context before it could submit task. Handle at application level
275 logger.Errorw(ctx, "Context is missing. AddFlow Operation Not added to Task", log.Fields{"Device": device})
276 return errorCodes.ErrInvalidParamInRequest
277 }
278
279 var isMigrationRequired bool
280 if flow.MigrateCookie {
281 // flow migration to new cookie must be done only during the audit. Migration for all subflows must be done if
282 // atlease one subflow with old cookie found in the device.
283 for _, subFlow := range flow.SubFlows {
284 if isMigrationRequired = d.IsFlowPresentWithOldCookie(subFlow); isMigrationRequired {
285 break
286 }
287 }
288 }
289
290 if isMigrationRequired {
291 // In this case, the flow is updated in local cache and db here.
292 // Actual flow deletion and addition at voltha will happen during flow tables audit.
293 for _, subFlow := range flow.SubFlows {
294 logger.Debugw(ctx, "Cookie Migration Required", log.Fields{"OldCookie": subFlow.OldCookie, "NewCookie": subFlow.Cookie})
295 if err := d.DelFlowWithOldCookie(subFlow); err != nil {
296 logger.Errorw(ctx, "Delete flow with old cookie failed", log.Fields{"Error": err, "OldCookie": subFlow.OldCookie})
297 }
298 if err := d.AddFlow(subFlow); err != nil {
299 logger.Errorw(ctx, "Flow Add Failed", log.Fields{"Error": err, "Cookie": subFlow.Cookie})
300 }
301 }
302 } else {
303 flow.Command = of.CommandAdd
304 d.UpdateFlows(flow, devPort)
305 for cookie := range flow.SubFlows {
306 logger.Debugw(ctx, "Flow Add added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
307 }
308 }
309 return nil
310}
311
312// DelFlows to delete flows
313func (v *VoltController) DelFlows(port string, device string, flow *of.VoltFlow) error {
314 d, err := v.GetDevice(device)
315 if err != nil {
316 logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
317 return err
318 }
319 devPort := d.GetPortByName(port)
320 if devPort == nil {
321 logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
322 return errorCodes.ErrPortNotFound
323 }
324 if d.ctx == nil {
325 //FIXME: Application should know the context before it could submit task. Handle at application level
326 logger.Errorw(ctx, "Context is missing. DelFlow Operation Not added to Task", log.Fields{"Device": device})
327 return errorCodes.ErrInvalidParamInRequest
328 }
329
330 var isMigrationRequired bool
331 if flow.MigrateCookie {
332 // flow migration to new cookie must be done only during the audit. Migration for all subflows must be done if
333 // atlease one subflow with old cookie found in the device.
334 for _, subFlow := range flow.SubFlows {
335 if isMigrationRequired = d.IsFlowPresentWithOldCookie(subFlow); isMigrationRequired {
336 break
337 }
338 }
339 }
340
341 if isMigrationRequired {
342 // In this case, the flow is deleted from local cache and db here.
343 // Actual flow deletion at voltha will happen during flow tables audit.
344 for _, subFlow := range flow.SubFlows {
345 logger.Debugw(ctx, "Old Cookie delete Required", log.Fields{"OldCookie": subFlow.OldCookie})
346 if err := d.DelFlowWithOldCookie(subFlow); err != nil {
347 logger.Errorw(ctx, "DelFlowWithOldCookie failed", log.Fields{"OldCookie": subFlow.OldCookie, "Error": err})
348 }
349 }
350 } else {
351 flow.Command = of.CommandDel
352 d.UpdateFlows(flow, devPort)
353 for cookie := range flow.SubFlows {
354 logger.Debugw(ctx, "Flow Del added to queue", log.Fields{"Cookie": cookie, "Device": device, "Port": port})
355 }
356 }
357 return nil
358}
359
360// GroupUpdate for group update
361func (v *VoltController) GroupUpdate(port string, device string, group *of.Group) error {
362 d, err := v.GetDevice(device)
363 if err != nil {
364 logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
365 return err
366 }
367
368 devPort := d.GetPortByName(port)
369 if devPort == nil {
370 logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
371 return errorCodes.ErrPortNotFound
372 }
373
374 if d.ctx == nil {
375 //FIXME: Application should know the context before it could submit task. Handle at application level
376 logger.Errorw(ctx, "Context is missing. GroupMod Operation Not added to task", log.Fields{"Device": device})
377 return errorCodes.ErrInvalidParamInRequest
378 }
379
380 d.UpdateGroup(group, devPort)
381 return nil
382}
383
384// ModMeter to get mod meter info
385func (v *VoltController) ModMeter(port string, device string, command of.MeterCommand, meter *of.Meter) error {
386 d, err := v.GetDevice(device)
387 if err != nil {
388 logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
389 return err
390 }
391
392 devPort := d.GetPortByName(port)
393 if devPort == nil {
394 logger.Errorw(ctx, "Port Not Found", log.Fields{"Device": device})
395 return errorCodes.ErrPortNotFound
396 }
397
398 d.ModMeter(command, meter, devPort)
399 return nil
400}
401
402// PortAddInd for port add indication
403func (v *VoltController) PortAddInd(device string, id uint32, name string) {
404 v.app.PortAddInd(device, id, name)
405}
406
407// PortDelInd for port delete indication
408func (v *VoltController) PortDelInd(device string, port string) {
409 v.app.PortDelInd(device, port)
410}
411
412// PortUpdateInd for port update indication
413func (v *VoltController) PortUpdateInd(device string, name string, id uint32) {
414 v.app.PortUpdateInd(device, name, id)
415}
416
417// PortUpInd for port up indication
418func (v *VoltController) PortUpInd(device string, port string) {
419 v.app.PortUpInd(device, port)
420}
421
422// PortDownInd for port down indication
423func (v *VoltController) PortDownInd(device string, port string) {
424 v.app.PortDownInd(device, port)
425}
426
427// DeviceUpInd for device up indication
428func (v *VoltController) DeviceUpInd(device string) {
429 v.app.DeviceUpInd(device)
430}
431
432// DeviceDownInd for device down indication
433func (v *VoltController) DeviceDownInd(device string) {
434 v.app.DeviceDownInd(device)
435}
436
437// PacketInInd for packet in indication
438func (v *VoltController) PacketInInd(device string, port string, data []byte) {
439 v.app.PacketInInd(device, port, data)
440}
441
442// GetPortState to get port status
443func (v *VoltController) GetPortState(device string, name string) (PortState, error) {
444 d, err := v.GetDevice(device)
445 if err != nil {
446 logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
447 return PortStateDown, err
448 }
449 return d.GetPortState(name)
450}
451
452// UpdateMvlanProfiles for update mvlan profiles
453func (v *VoltController) UpdateMvlanProfiles(device string) {
454 v.app.UpdateMvlanProfilesForDevice(device)
455}
456
457// GetController to get controller
458func GetController() *VoltController {
459 return vcontroller
460}
461
/*
// PostIndication to post indication
func (v *VoltController) PostIndication(device string, task interface{}) error {
	var srvTask AddServiceIndTask
	var portTask AddPortIndTask
	var taskCommon tasks.Task
	var isSvcTask bool

	switch data := task.(type) {
	case *AddServiceIndTask:
		srvTask = *data
		taskCommon = data
		isSvcTask = true
	case *AddPortIndTask:
		portTask = *data
		taskCommon = data
	}

	d, err := v.GetDevice(device)
	if err != nil {
		logger.Errorw(ctx, "Device Not Found", log.Fields{"Device": device})
		//It means device itself is not present so just post the indication directly
		if isSvcTask {
			msgbus.PostAccessConfigInd(srvTask.result, d.SerialNum, srvTask.indicationType, srvTask.serviceName, 0, srvTask.reason, srvTask.trigger, srvTask.portState)
		} else {
			msgbus.ProcessPortInd(portTask.indicationType, d.SerialNum, portTask.portName, portTask.accessConfig, portTask.serviceList)
		}
		return err
	}
	if taskCommon != nil {
		d.AddTask(taskCommon)
	}
	return nil
}
*/
497
498// GetTaskList to get the task list
499func (v *VoltController) GetTaskList(device string) []tasks.Task {
500 d, err := v.GetDevice(device)
501 if err != nil || d.ctx == nil {
502 logger.Errorw(ctx, "Device Not Connected/Found", log.Fields{"Device": device, "Dev Obj": d})
503 return []tasks.Task{}
504 }
505 return d.GetTaskList()
506
507}
508
509// AddBlockedDevices to add devices to blocked devices list
510func (v *VoltController) AddBlockedDevices(deviceSerialNumber string) {
511 v.BlockedDeviceList.Set(deviceSerialNumber, deviceSerialNumber)
512}
513
514// DelBlockedDevices to remove device from blocked device list
515func (v *VoltController) DelBlockedDevices(deviceSerialNumber string) {
516 v.BlockedDeviceList.Remove(deviceSerialNumber)
517}
518
519// IsBlockedDevice to check if device is blocked
520func (v *VoltController) IsBlockedDevice(deviceSerialNumber string) bool {
521 _, ifPresent := v.BlockedDeviceList.Get(deviceSerialNumber)
522 return ifPresent
523}