blob: b410490afe5740a90ce366dc92661692769d3b04 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package controller
17
18import (
19 "context"
20 "encoding/json"
21 "errors"
22 infraerror "voltha-go-controller/internal/pkg/errorcodes"
23 "strconv"
24 "sync"
25 "time"
26
27 "voltha-go-controller/database"
28 "voltha-go-controller/internal/pkg/holder"
29 "voltha-go-controller/internal/pkg/intf"
30 "voltha-go-controller/internal/pkg/of"
31 //"voltha-go-controller/internal/pkg/vpagent"
32 "voltha-go-controller/internal/pkg/tasks"
33 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053034 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053035 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
36 "github.com/opencord/voltha-protos/v5/go/voltha"
37)
38
39// PortState type
40type PortState string
41
42const (
43 // PortStateDown constant
44 PortStateDown PortState = "DOWN"
45 // PortStateUp constant
46 PortStateUp PortState = "UP"
47 // DefaultMaxFlowQueues constant
48 DefaultMaxFlowQueues = 67
49 //ErrDuplicateFlow - indicates flow already exists in DB
50 ErrDuplicateFlow string = "Duplicate Flow"
51)
52
53// DevicePort structure
54type DevicePort struct {
55 tasks.Tasks
56 Name string
57 ID uint32
58 State PortState
59 Version string
60}
61
62// NewDevicePort is the constructor for DevicePort
63func NewDevicePort(id uint32, name string) *DevicePort {
64 var port DevicePort
65
66 port.ID = id
67 port.Name = name
68 port.State = PortStateDown
69 return &port
70}
71
72// UniIDFlowQueue structure which maintains flows in queue.
73type UniIDFlowQueue struct {
74 tasks.Tasks
75 ID uint32
76}
77
78// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
79func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
80 var flowQueue UniIDFlowQueue
81 flowQueue.ID = id
82 return &flowQueue
83}
84
85// DeviceState type
86type DeviceState string
87
88const (
89
90 // DeviceStateUNKNOWN constant
91 DeviceStateUNKNOWN DeviceState = "UNKNOWN"
92 // DeviceStateINIT constant
93 DeviceStateINIT DeviceState = "INIT"
94 // DeviceStateUP constant
95 DeviceStateUP DeviceState = "UP"
96 // DeviceStateDOWN constant
97 DeviceStateDOWN DeviceState = "DOWN"
98 // DeviceStateREBOOTED constant
99 DeviceStateREBOOTED DeviceState = "REBOOTED"
100 // DeviceStateDISABLED constant
101 DeviceStateDISABLED DeviceState = "DISABLED"
102 // DeviceStateDELETED constant
103 DeviceStateDELETED DeviceState = "DELETED"
104)
105
106// Device structure
107type Device struct {
108 tasks.Tasks
109 ID string
110 SerialNum string
111 State DeviceState
112 PortsByID map[uint32]*DevicePort
113 PortsByName map[string]*DevicePort
114 portLock sync.RWMutex
115 vclientHolder *holder.VolthaServiceClientHolder
116 ctx context.Context
117 cancel context.CancelFunc
118 packetOutChannel chan *ofp.PacketOut
119 flows map[uint64]*of.VoltSubFlow
120 flowLock sync.RWMutex
121 meters map[uint32]*of.Meter
122 meterLock sync.RWMutex
123 groups sync.Map //map[uint32]*of.Group -> [GroupId : Group]
124 auditInProgress bool
125 flowQueueLock sync.RWMutex
126 flowHash uint32
127 flowQueue map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
128 deviceAuditInProgress bool
129 SouthBoundID string
130}
131
132// NewDevice is the constructor for Device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530133func NewDevice(cntx context.Context, id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID string) *Device {
Naveen Sampath04696f72022-06-13 15:19:14 +0530134 var device Device
135 device.ID = id
136 device.SerialNum = slno
137 device.State = DeviceStateDOWN
138 device.PortsByID = make(map[uint32]*DevicePort)
139 device.PortsByName = make(map[string]*DevicePort)
140 device.vclientHolder = vclientHldr
141 device.flows = make(map[uint64]*of.VoltSubFlow)
142 device.meters = make(map[uint32]*of.Meter)
143 device.flowQueue = make(map[uint32]*UniIDFlowQueue)
144 //Get the flowhash from db and update the flowhash variable in the device.
145 device.SouthBoundID = southBoundID
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530146 flowHash, err := db.GetFlowHash(cntx, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530147 if err != nil {
148 device.flowHash = DefaultMaxFlowQueues
149 } else {
150 var hash uint32
151 err = json.Unmarshal([]byte(flowHash), &hash)
152 if err != nil {
153 logger.Error(ctx, "Failed to unmarshall flowhash")
154 } else {
155 device.flowHash = hash
156 }
157 }
158 logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
159 return &device
160}
161
162// ResetCache to reset cache
163func (d *Device) ResetCache() {
164 logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
165 d.flows = make(map[uint64]*of.VoltSubFlow)
166 d.meters = make(map[uint32]*of.Meter)
167 d.groups = sync.Map{}
168}
169
170// GetFlow - Get the flow from device obj
171func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
172 d.flowLock.RLock()
173 defer d.flowLock.RUnlock()
174 logger.Infow(ctx, "Get Flow", log.Fields{"Cookie": cookie})
175 flow, ok := d.flows[cookie]
176 return flow, ok
177}
178
179// AddFlow - Adds the flow to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530180func (d *Device) AddFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530181 d.flowLock.Lock()
182 defer d.flowLock.Unlock()
183 logger.Infow(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
184 if _, ok := d.flows[flow.Cookie]; ok {
185 return errors.New(ErrDuplicateFlow)
186 }
187 d.flows[flow.Cookie] = flow
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530188 d.AddFlowToDb(cntx, flow)
Naveen Sampath04696f72022-06-13 15:19:14 +0530189 return nil
190}
191
192// AddFlowToDb is the utility to add the flow to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530193func (d *Device) AddFlowToDb(cntx context.Context, flow *of.VoltSubFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530194 if b, err := json.Marshal(flow); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530195 if err = db.PutFlow(cntx, d.ID, flow.Cookie, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530196 logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
197 }
198 }
199}
200
201// DelFlow - Deletes the flow from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530202func (d *Device) DelFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530203 d.flowLock.Lock()
204 defer d.flowLock.Unlock()
205 if _, ok := d.flows[flow.Cookie]; ok {
206 delete(d.flows, flow.Cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530207 d.DelFlowFromDb(cntx, flow.Cookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530208 return nil
209 }
210 return errors.New("Flow does not Exist")
211}
212
213// DelFlowFromDb is utility to delete the flow from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530214func (d *Device) DelFlowFromDb(cntx context.Context, flowID uint64) {
215 _ = db.DelFlow(cntx, d.ID, flowID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530216}
217
218// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
219func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
220 d.flowLock.RLock()
221 defer d.flowLock.RUnlock()
222 if _, ok := d.flows[flow.Cookie]; ok {
223 return false
224 } else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
225 if _, ok := d.flows[flow.OldCookie]; ok {
226 logger.Infow(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
227 return true
228 }
229 }
230 return false
231}
232
233// DelFlowWithOldCookie is to delete flow with old cookie.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530234func (d *Device) DelFlowWithOldCookie(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530235 d.flowLock.Lock()
236 defer d.flowLock.Unlock()
237 if _, ok := d.flows[flow.OldCookie]; ok {
238 logger.Infow(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
239 log.Fields{"OldCookie": flow.OldCookie})
240 delete(d.flows, flow.OldCookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530241 d.DelFlowFromDb(cntx, flow.OldCookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530242 return nil
243 }
244 return errors.New("Flow does not Exist")
245}
246
247// RestoreFlowsFromDb to restore flows from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530248func (d *Device) RestoreFlowsFromDb(cntx context.Context) {
249 flows, _ := db.GetFlows(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530250 for _, flow := range flows {
251 b, ok := flow.Value.([]byte)
252 if !ok {
253 logger.Warn(ctx, "The value type is not []byte")
254 continue
255 }
256 d.CreateFlowFromString(b)
257 }
258}
259
260// CreateFlowFromString to create flow from string
261func (d *Device) CreateFlowFromString(b []byte) {
262 var flow of.VoltSubFlow
263 if err := json.Unmarshal(b, &flow); err == nil {
264 if _, ok := d.flows[flow.Cookie]; !ok {
265 logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
266 d.flows[flow.Cookie] = &flow
267 } else {
268 logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
269 }
270 } else {
271 logger.Warn(ctx, "Unmarshal failed")
272 }
273}
274
275// ----------------------------------------------------------
276// Database related functionality
277// Group operations at the device which include update and delete
278
279// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530280func (d *Device) UpdateGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530281
282 logger.Infow(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
283 d.groups.Store(group.GroupID, group)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530284 d.AddGroupToDb(cntx, group)
Naveen Sampath04696f72022-06-13 15:19:14 +0530285}
286
287// AddGroupToDb - Utility to add the group to the device DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530288func (d *Device) AddGroupToDb(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530289 if b, err := json.Marshal(group); err == nil {
290 logger.Infow(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530291 if err = db.PutGroup(cntx, d.ID, group.GroupID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530292 logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
293 }
294 }
295}
296
297// DelGroupEntry - Deletes the group from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530298func (d *Device) DelGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530299
300 if _, ok := d.groups.Load(group.GroupID); ok {
301 d.groups.Delete(group.GroupID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530302 d.DelGroupFromDb(cntx, group.GroupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530303 }
304}
305
306// DelGroupFromDb - Utility to delete the Group from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530307func (d *Device) DelGroupFromDb(cntx context.Context, groupID uint32) {
308 _ = db.DelGroup(cntx, d.ID, groupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530309}
310
311//RestoreGroupsFromDb - restores all groups from DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530312func (d *Device) RestoreGroupsFromDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530313 logger.Info(ctx, "Restoring Groups")
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530314 groups, _ := db.GetGroups(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530315 for _, group := range groups {
316 b, ok := group.Value.([]byte)
317 if !ok {
318 logger.Warn(ctx, "The value type is not []byte")
319 continue
320 }
321 d.CreateGroupFromString(b)
322 }
323}
324
325//CreateGroupFromString - Forms group struct from json string
326func (d *Device) CreateGroupFromString(b []byte) {
327 var group of.Group
328 if err := json.Unmarshal(b, &group); err == nil {
329 if _, ok := d.groups.Load(group.GroupID); !ok {
330 logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
331 d.groups.Store(group.GroupID, &group)
332 } else {
333 logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
334 }
335 } else {
336 logger.Warn(ctx, "Unmarshal failed")
337 }
338}
339
340// AddMeter to add meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530341func (d *Device) AddMeter(cntx context.Context, meter *of.Meter) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530342 d.meterLock.Lock()
343 defer d.meterLock.Unlock()
344 if _, ok := d.meters[meter.ID]; ok {
345 return errors.New("Duplicate Meter")
346 }
347 d.meters[meter.ID] = meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530348 go d.AddMeterToDb(cntx, meter)
Naveen Sampath04696f72022-06-13 15:19:14 +0530349 return nil
350}
351
352// GetMeter to get meter
353func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
354 d.meterLock.RLock()
355 defer d.meterLock.RUnlock()
356 if m, ok := d.meters[id]; ok {
357 return m, nil
358 }
359 return nil, errors.New("Meter Not Found")
360}
361
362// DelMeter to delete meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530363func (d *Device) DelMeter(cntx context.Context, meter *of.Meter) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530364 d.meterLock.Lock()
365 defer d.meterLock.Unlock()
366 if _, ok := d.meters[meter.ID]; ok {
367 delete(d.meters, meter.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530368 go d.DelMeterFromDb(cntx, meter.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530369 return true
370 }
371 return false
372}
373
374// AddMeterToDb is utility to add the Group to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530375func (d *Device) AddMeterToDb(cntx context.Context, meter *of.Meter) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530376 if b, err := json.Marshal(meter); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530377 if err = db.PutDeviceMeter(cntx, d.ID, meter.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530378 logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
379 }
380 }
381}
382
383// DelMeterFromDb to delete meter from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530384func (d *Device) DelMeterFromDb(cntx context.Context, id uint32) {
385 _ = db.DelDeviceMeter(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530386}
387
388// RestoreMetersFromDb to restore meters from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530389func (d *Device) RestoreMetersFromDb(cntx context.Context) {
390 meters, _ := db.GetDeviceMeters(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530391 for _, meter := range meters {
392 b, ok := meter.Value.([]byte)
393 if !ok {
394 logger.Warn(ctx, "The value type is not []byte")
395 continue
396 }
397 d.CreateMeterFromString(b)
398 }
399}
400
401// CreateMeterFromString to create meter from string
402func (d *Device) CreateMeterFromString(b []byte) {
403 var meter of.Meter
404 if err := json.Unmarshal(b, &meter); err == nil {
405 if _, ok := d.meters[meter.ID]; !ok {
406 logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
407 d.meters[meter.ID] = &meter
408 } else {
409 logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
410 }
411 } else {
412 logger.Warn(ctx, "Unmarshal failed")
413 }
414}
415
416// VolthaClient to get voltha client
417func (d *Device) VolthaClient() voltha.VolthaServiceClient {
418 return d.vclientHolder.Get()
419}
420
421// AddPort to add the port as requested by the device/VOLTHA
422// Inform the application if the port is successfully added
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530423func (d *Device) AddPort(cntx context.Context, id uint32, name string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530424 d.portLock.Lock()
425 defer d.portLock.Unlock()
426
427 if _, ok := d.PortsByID[id]; ok {
428 return errors.New("Duplicate port")
429 }
430 if _, ok := d.PortsByName[name]; ok {
431 return errors.New("Duplicate port")
432 }
433
434 p := NewDevicePort(id, name)
435 d.PortsByID[id] = p
436 d.PortsByName[name] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530437 d.WritePortToDb(cntx, p)
438 GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530439 logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
440 return nil
441}
442
443// DelPort to delete the port as requested by the device/VOLTHA
444// Inform the application if the port is successfully deleted
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530445func (d *Device) DelPort(cntx context.Context, id uint32) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530446
447 p := d.GetPortByID(id)
448 if p == nil {
449 return errors.New("Unknown Port")
450 }
451 if p.State == PortStateUp {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530452 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530453 }
454 d.portLock.Lock()
455 defer d.portLock.Unlock()
456
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530457 GetController().PortDelInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530458 delete(d.PortsByID, p.ID)
459 delete(d.PortsByName, p.Name)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530460 d.DelPortFromDb(cntx, p.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530461 logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
462 return nil
463}
464
465// UpdatePortByName is utility to update the port by Name
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530466func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530467 d.portLock.Lock()
468 defer d.portLock.Unlock()
469
470 p, ok := d.PortsByName[name]
471 if !ok {
472 return
473 }
474 delete(d.PortsByID, p.ID)
475 p.ID = port
476 d.PortsByID[port] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530477 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530478 GetController().PortUpdateInd(d.ID, p.Name, p.ID)
479 logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
480}
481
482// GetPortName to get the name of the port by its id
483func (d *Device) GetPortName(id uint32) (string, error) {
484 d.portLock.RLock()
485 defer d.portLock.RUnlock()
486
487 if p, ok := d.PortsByID[id]; ok {
488 return p.Name, nil
489 }
490 logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
491 return "", errors.New("Unknown Port ID")
492}
493
494// GetPortByID is utility to retrieve the port by ID
495func (d *Device) GetPortByID(id uint32) *DevicePort {
496 d.portLock.RLock()
497 defer d.portLock.RUnlock()
498
499 p, ok := d.PortsByID[id]
500 if ok {
501 return p
502 }
503 return nil
504}
505
506// GetPortByName is utility to retrieve the port by Name
507func (d *Device) GetPortByName(name string) *DevicePort {
508 d.portLock.RLock()
509 defer d.portLock.RUnlock()
510
511 p, ok := d.PortsByName[name]
512 if ok {
513 return p
514 }
515 return nil
516}
517
518// GetPortState to get the state of the port by name
519func (d *Device) GetPortState(name string) (PortState, error) {
520 d.portLock.RLock()
521 defer d.portLock.RUnlock()
522
523 if p, ok := d.PortsByName[name]; ok {
524 return p.State, nil
525 }
526 return PortStateDown, errors.New("Unknown Port ID")
527}
528
529// GetPortID to get the port-id by the port name
530func (d *Device) GetPortID(name string) (uint32, error) {
531 d.portLock.RLock()
532 defer d.portLock.RUnlock()
533
534 if p, ok := d.PortsByName[name]; ok {
535 return p.ID, nil
536 }
537 return 0, errors.New("Unknown Port ID")
538
539}
540
541// WritePortToDb to add the port to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530542func (d *Device) WritePortToDb(ctx context.Context, port *DevicePort) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530543 port.Version = database.PresentVersionMap[database.DevicePortPath]
544 if b, err := json.Marshal(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530545 if err = db.PutPort(ctx, d.ID, port.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530546 logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
547 }
548 }
549}
550
551// DelPortFromDb to delete port from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530552func (d *Device) DelPortFromDb(cntx context.Context, id uint32) {
553 _ = db.DelPort(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530554}
555
556// RestorePortsFromDb to restore ports from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530557func (d *Device) RestorePortsFromDb(cntx context.Context) {
558 ports, _ := db.GetPorts(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530559 for _, port := range ports {
560 b, ok := port.Value.([]byte)
561 if !ok {
562 logger.Warn(ctx, "The value type is not []byte")
563 continue
564 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530565 d.CreatePortFromString(cntx, b)
Naveen Sampath04696f72022-06-13 15:19:14 +0530566 }
567}
568
569// CreatePortFromString to create port from string
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530570func (d *Device) CreatePortFromString(cntx context.Context, b []byte) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530571 var port DevicePort
572 if err := json.Unmarshal(b, &port); err == nil {
573 if _, ok := d.PortsByID[port.ID]; !ok {
574 logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
575 d.PortsByID[port.ID] = &port
576 d.PortsByName[port.Name] = &port
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530577 GetController().PortAddInd(cntx, d.ID, port.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530578 } else {
579 logger.Warnw(ctx, "Duplicate Port", log.Fields{"ID": port.ID})
580 }
581 } else {
582 logger.Warn(ctx, "Unmarshal failed")
583 }
584}
585
586// Delete : OLT Delete functionality yet to be implemented. IDeally all of the
587// resources should have been removed by this time. It is an error
588// scenario if the OLT has resources associated with it.
589func (d *Device) Delete() {
590 d.StopAll()
591}
592
593// Stop to stop the task
594func (d *Device) Stop() {
595}
596
597// ConnectInd is called when the connection between VGC and the VOLTHA is
598// restored. This will perform audit of the device post reconnection
599func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
600 logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
601 ctx1, cancel := context.WithCancel(ctx)
602 d.cancel = cancel
603 d.ctx = ctx1
604 d.Tasks.Initialize(ctx1)
605
606 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
607 d.State = DeviceStateUP
608 GetController().DeviceUpInd(d.ID)
609
610 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
611 t := NewAuditDevice(d, AuditEventDeviceDisc)
612 d.Tasks.AddTask(t)
613
614 t1 := NewAuditTablesTask(d)
615 d.Tasks.AddTask(t1)
616
617 t2 := NewPendingProfilesTask(d)
618 d.Tasks.AddTask(t2)
619
620 go d.synchronizeDeviceTables()
621}
622
623func (d *Device) synchronizeDeviceTables() {
624
625 tick := time.NewTicker(deviceTableSyncDuration)
626loop:
627 for {
628 select {
629 case <-d.ctx.Done():
630 logger.Warnw(d.ctx, "Context Done. Cancelling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
631 break loop
632 case <-tick.C:
633 t1 := NewAuditTablesTask(d)
634 d.Tasks.AddTask(t1)
635 }
636 }
637 tick.Stop()
638}
639
640// DeviceUpInd is called when the logical device state changes to UP. This will perform audit of the device post reconnection
641func (d *Device) DeviceUpInd() {
642 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
643 d.State = DeviceStateUP
644 GetController().DeviceUpInd(d.ID)
645
646 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
647 t := NewAuditDevice(d, AuditEventDeviceDisc)
648 d.Tasks.AddTask(t)
649
650 t1 := NewAuditTablesTask(d)
651 d.Tasks.AddTask(t1)
652
653 t2 := NewPendingProfilesTask(d)
654 d.Tasks.AddTask(t2)
655}
656
657// DeviceDownInd is called when the logical device state changes to Down.
658func (d *Device) DeviceDownInd() {
659 logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
660 d.State = DeviceStateDOWN
661 GetController().DeviceDownInd(d.ID)
662}
663
664// DeviceRebootInd is called when the logical device is rebooted.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530665func (d *Device) DeviceRebootInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530666 logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
667
668 if d.State == DeviceStateREBOOTED {
669 d.State = DeviceStateREBOOTED
670 logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
671 return
672 }
673
674 d.State = DeviceStateREBOOTED
675 GetController().SetRebootInProgressForDevice(d.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530676 GetController().DeviceRebootInd(cntx, d.ID, d.SerialNum, d.SouthBoundID)
677 d.ReSetAllPortStates(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530678}
679
680// DeviceDisabledInd is called when the logical device is disabled
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530681func (d *Device) DeviceDisabledInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530682 logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
683 d.State = DeviceStateDISABLED
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530684 GetController().DeviceDisableInd(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530685}
686
687//ReSetAllPortStates - Set all logical device port status to DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530688func (d *Device) ReSetAllPortStates(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530689 logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
690
691 d.portLock.Lock()
692 defer d.portLock.Unlock()
693
694 for _, port := range d.PortsByID {
695 if port.State != PortStateDown {
696 logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530697 GetController().PortDownInd(cntx, d.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530698 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530699 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530700 }
701 }
702}
703
704//ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530705func (d *Device) ReSetAllPortStatesInDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530706 logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
707
708 d.portLock.Lock()
709 defer d.portLock.Unlock()
710
711 for _, port := range d.PortsByID {
712 if port.State != PortStateDown {
713 logger.Infow(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
714 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530715 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530716 }
717 }
718}
719
720// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
721// to update only when the port state is DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530722func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530723 if p := d.GetPortByName(portName); p != nil {
724 if p.ID != port {
725 logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
726 if p.State != PortStateDown {
727 logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
728 return
729 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530730 d.UpdatePortByName(cntx, portName, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530731 logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
732 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530733 d.ProcessPortState(cntx, port, state)
Naveen Sampath04696f72022-06-13 15:19:14 +0530734 }
735}
736
737// ***Operations Performed on Port state Transitions***
738//
739// |-----------------------------------------------------------------------------|
740// | State | Action |
741// |--------------------|--------------------------------------------------------|
742// | UP | UNI - Trigger Flow addition for service configured |
743// | | NNI - Trigger Flow addition for vnets & mvlan profiles |
744// | | |
745// | DOWN | UNI - Trigger Flow deletion for service configured |
746// | | NNI - Trigger Flow deletion for vnets & mvlan profiles |
747// | | |
748// |-----------------------------------------------------------------------------|
749//
750
751// ProcessPortState deals with the change in port status and taking action
752// based on the new state and the old state
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530753func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530754 if d.State != DeviceStateUP && !util.IsNniPort(port) {
755 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
756 return
757 }
758 if p := d.GetPortByID(port); p != nil {
759 logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
760
761 // Avoid blind initialization as the current tasks in the queue will be lost
762 // Eg: Service Del followed by Port Down - The flows will be dangling
763 // Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
764 p.Tasks.CheckAndInitialize(d.ctx)
765 if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
766 // Transition from DOWN to UP
767 logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530768 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530769 p.State = PortStateUp
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530770 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530771 } else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
772 // Transition from UP to Down
773 logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530774 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530775 p.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530776 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530777 } else {
778 logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
779 }
780 }
781}
782
783// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530784func (d *Device) ProcessPortStateAfterReboot(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530785 if d.State != DeviceStateUP && !util.IsNniPort(port) {
786 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
787 return
788 }
789 if p := d.GetPortByID(port); p != nil {
790 logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State})
791 p.Tasks.Initialize(d.ctx)
792 if p.State == PortStateUp {
793 logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530794 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530795 } else if p.State == PortStateDown {
796 logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530797 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530798 }
799 }
800}
801
802// ChangeEvent : Change event brings in ports related changes such as addition/deletion
803// or modification where the port status change up/down is indicated to the
804// controller
805func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
806 cet := NewChangeEventTask(d.ctx, event, d)
807 d.AddTask(cet)
808 return nil
809}
810
811// PacketIn handle the incoming packet-in and deliver to the application for the
812// actual processing
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530813func (d *Device) PacketIn(cntx context.Context, pkt *ofp.PacketIn) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530814 logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
815 if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
816 logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
817 return
818 }
819 data := pkt.PacketIn.Data
820 port := PacketInGetPort(pkt.PacketIn)
821 if pName, err := d.GetPortName(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530822 GetController().PacketInInd(cntx, d.ID, pName, data)
Naveen Sampath04696f72022-06-13 15:19:14 +0530823 } else {
824 logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
825 }
826}
827
828// PacketInGetPort to get the port on which the packet-in is reported
829func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
830 for _, field := range pkt.Match.OxmFields {
831 if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
832 if ofbField, ok := field.Field.(*ofp.OfpOxmField_OfbField); ok {
833 if ofbField.OfbField.Type == ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
834 if port, ok := ofbField.OfbField.Value.(*ofp.OfpOxmOfbField_Port); ok {
835 return port.Port
836 }
837 }
838 }
839 }
840 }
841 return 0
842}
843
844// PacketOutReq receives the packet out request from the application via the
845// controller. The interface from the application uses name as the identity.
846func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
847 inp, err := d.GetPortID(inport)
848 if err != nil {
849 return errors.New("Unknown inport")
850 }
851 outp, err1 := d.GetPortID(outport)
852 if err1 != nil {
853 return errors.New("Unknown outport")
854 }
855 logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
856 return d.SendPacketOut(outp, inp, data, isCustomPkt)
857}
858
859// SendPacketOut is responsible for building the OF structure and send the
860// packet-out to the VOLTHA
861func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
862 pout := &ofp.PacketOut{}
863 pout.Id = d.ID
864 opout := &ofp.OfpPacketOut{}
865 pout.PacketOut = opout
866 opout.InPort = inport
867 opout.Data = data
868 opout.Actions = []*ofp.OfpAction{
869 {
870 Type: ofp.OfpActionType_OFPAT_OUTPUT,
871 Action: &ofp.OfpAction_Output{
872 Output: &ofp.OfpActionOutput{
873 Port: outport,
874 MaxLen: 65535,
875 },
876 },
877 },
878 }
879 d.packetOutChannel <- pout
880 return nil
881}
882
883// UpdateFlows receives the flows in the form that is implemented
884// in the VGC and transforms them to the OF format. This is handled
885// as a port of the task that is enqueued to do the same.
886func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
887 t := NewAddFlowsTask(d.ctx, flow, d)
888 logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
889 // check if port isNni , if yes flows will be added to device port queues.
890 if util.IsNniPort(devPort.ID) {
891 // Adding the flows to device port queues.
892 devPort.AddTask(t)
893 return
894 }
895 // If the flowHash is enabled then add the flows to the flowhash generated queues.
896 flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
897 if flowQueue != nil {
898 logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
899 flowQueue.AddTask(t)
900 logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
901 } else {
902 //FlowThrotling disabled, add to the device port queue
903 devPort.AddTask(t)
904 return
905 }
906}
907
908// UpdateGroup to update group info
909func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
910 task := NewModGroupTask(d.ctx, group, d)
911 logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
912 devPort.AddTask(task)
913}
914
915// ModMeter for mod meter task
916func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
917 if command == of.MeterCommandAdd {
918 if _, err := d.GetMeter(meter.ID); err == nil {
919 logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
920 return
921 }
922 }
923 t := NewModMeterTask(d.ctx, command, meter, d)
924 devPort.AddTask(t)
925}
926
927func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
928 d.flowQueueLock.RLock()
929 //If flowhash is 0 that means flowhash throttling is disabled, return nil
930 if d.flowHash == 0 {
931 d.flowQueueLock.RUnlock()
932 return nil
933 }
934 flowHashID := id % uint32(d.flowHash)
935 if value, found := d.flowQueue[uint32(flowHashID)]; found {
936 d.flowQueueLock.RUnlock()
937 return value
938 }
939 d.flowQueueLock.RUnlock()
940 logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
941
942 return d.addFlowQueueForUniID(id)
943}
944
945func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
946
947 d.flowQueueLock.Lock()
948 defer d.flowQueueLock.Unlock()
949 flowHashID := id % uint32(d.flowHash)
950 flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
951 flowQueue.Tasks.Initialize(d.ctx)
952 d.flowQueue[flowHashID] = flowQueue
953 return flowQueue
954}
955
956// SetFlowHash sets the device flow hash and writes to the DB.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530957func (d *Device) SetFlowHash(cntx context.Context, hash uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530958 d.flowQueueLock.Lock()
959 defer d.flowQueueLock.Unlock()
960
961 d.flowHash = hash
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530962 d.writeFlowHashToDB(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530963}
964
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530965func (d *Device) writeFlowHashToDB(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530966 hash, err := json.Marshal(d.flowHash)
967 if err != nil {
968 logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash})
969 return
970 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530971 if err := db.PutFlowHash(cntx, d.ID, string(hash)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530972 logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash})
973 }
974}
975
976//isSBOperAllowed - determins if the SB operation is allowed based on device state & force flag
977func (d *Device) isSBOperAllowed(forceAction bool) bool {
978
979 if d.State == DeviceStateUP {
980 return true
981 }
982
983 if d.State == DeviceStateDISABLED && forceAction {
984 return true
985 }
986
987 return false
988}
989
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530990func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530991 flow, _ := d.GetFlow(cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530992 d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
Naveen Sampath04696f72022-06-13 15:19:14 +0530993}
994
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530995func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530996
997 statusCode, statusMsg := infraerror.GetErrorInfo(err)
998 success := isFlowOperSuccess(statusCode, oper)
999
1000 updateFlow := func(cookie uint64, state int, reason string) {
1001 if dbFlow, ok := d.GetFlow(cookie); ok {
1002 dbFlow.State = uint8(state)
1003 dbFlow.ErrorReason = reason
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301004 d.AddFlowToDb(cntx, dbFlow)
Naveen Sampath04696f72022-06-13 15:19:14 +05301005 }
1006 }
1007
1008 //Update flow results
1009 // Add - Update Success or Failure status with reason
1010 // Del - Delete entry from DB on success else update error reason
1011 if oper == of.CommandAdd {
1012 state := of.FlowAddSuccess
1013 reason := ""
1014 if !success {
1015 state = of.FlowAddFailure
1016 reason = statusMsg
1017 }
1018 updateFlow(cookie, state, reason)
1019 logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
1020 } else {
1021 if success && flow != nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301022 if err := d.DelFlow(cntx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301023 logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
1024 }
1025 } else if !success {
1026 updateFlow(cookie, of.FlowDelFailure, statusMsg)
1027 }
1028 }
1029
1030 flowResult := intf.FlowStatus{
1031 Cookie: strconv.FormatUint(cookie, 10),
1032 Device: d.ID,
1033 FlowModType: oper,
1034 Flow: flow,
1035 Status: statusCode,
1036 Reason: statusMsg,
1037 AdditionalData: bwDetails,
1038 }
1039
1040 logger.Infow(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301041 GetController().ProcessFlowModResultIndication(cntx, flowResult)
Naveen Sampath04696f72022-06-13 15:19:14 +05301042}