blob: d60855a50b3905df24b0238be24264aded56415c [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package controller
17
18import (
19 "context"
20 "encoding/json"
21 "errors"
22 infraerror "voltha-go-controller/internal/pkg/errorcodes"
23 "strconv"
24 "sync"
25 "time"
26
27 "voltha-go-controller/database"
28 "voltha-go-controller/internal/pkg/holder"
29 "voltha-go-controller/internal/pkg/intf"
30 "voltha-go-controller/internal/pkg/of"
31 //"voltha-go-controller/internal/pkg/vpagent"
32 "voltha-go-controller/internal/pkg/tasks"
33 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053034 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053035 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
36 "github.com/opencord/voltha-protos/v5/go/voltha"
37)
38
// PortState is the state of a device port as tracked by this package.
type PortState string

const (
	// PortStateDown is the state every newly created port starts in.
	PortStateDown PortState = "DOWN"
	// PortStateUp is the state of a port that has been reported live.
	PortStateUp PortState = "UP"
	// DefaultMaxFlowQueues is the flow hash given to a device when no
	// flow hash can be read from the database.
	DefaultMaxFlowQueues = 67
	// ErrDuplicateFlow is the error text returned when a flow with the
	// same cookie is already cached on the device.
	ErrDuplicateFlow string = "Duplicate Flow"
)
52
53// DevicePort structure
54type DevicePort struct {
55 tasks.Tasks
56 Name string
57 ID uint32
58 State PortState
59 Version string
60}
61
62// NewDevicePort is the constructor for DevicePort
63func NewDevicePort(id uint32, name string) *DevicePort {
64 var port DevicePort
65
66 port.ID = id
67 port.Name = name
68 port.State = PortStateDown
69 return &port
70}
71
72// UniIDFlowQueue structure which maintains flows in queue.
73type UniIDFlowQueue struct {
74 tasks.Tasks
75 ID uint32
76}
77
78// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
79func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
80 var flowQueue UniIDFlowQueue
81 flowQueue.ID = id
82 return &flowQueue
83}
84
// DeviceState is the lifecycle state of a Device as tracked by this package.
type DeviceState string

const (
	// DeviceStateUNKNOWN is the "UNKNOWN" device state.
	DeviceStateUNKNOWN DeviceState = "UNKNOWN"
	// DeviceStateINIT is the "INIT" device state.
	DeviceStateINIT DeviceState = "INIT"
	// DeviceStateUP is set when the device is connected or reported up.
	DeviceStateUP DeviceState = "UP"
	// DeviceStateDOWN is the state a newly constructed Device starts in
	// and the state set on a device-down indication.
	DeviceStateDOWN DeviceState = "DOWN"
	// DeviceStateREBOOTED is set on a device reboot indication.
	DeviceStateREBOOTED DeviceState = "REBOOTED"
	// DeviceStateDISABLED is set on a device disabled indication.
	DeviceStateDISABLED DeviceState = "DISABLED"
	// DeviceStateDELETED is the "DELETED" device state.
	DeviceStateDELETED DeviceState = "DELETED"
)
105
106// Device structure
107type Device struct {
108 tasks.Tasks
109 ID string
110 SerialNum string
111 State DeviceState
112 PortsByID map[uint32]*DevicePort
113 PortsByName map[string]*DevicePort
114 portLock sync.RWMutex
115 vclientHolder *holder.VolthaServiceClientHolder
116 ctx context.Context
117 cancel context.CancelFunc
118 packetOutChannel chan *ofp.PacketOut
119 flows map[uint64]*of.VoltSubFlow
120 flowLock sync.RWMutex
121 meters map[uint32]*of.Meter
122 meterLock sync.RWMutex
123 groups sync.Map //map[uint32]*of.Group -> [GroupId : Group]
124 auditInProgress bool
125 flowQueueLock sync.RWMutex
126 flowHash uint32
127 flowQueue map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
128 deviceAuditInProgress bool
129 SouthBoundID string
130}
131
132// NewDevice is the constructor for Device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530133func NewDevice(cntx context.Context, id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID string) *Device {
Naveen Sampath04696f72022-06-13 15:19:14 +0530134 var device Device
135 device.ID = id
136 device.SerialNum = slno
137 device.State = DeviceStateDOWN
138 device.PortsByID = make(map[uint32]*DevicePort)
139 device.PortsByName = make(map[string]*DevicePort)
140 device.vclientHolder = vclientHldr
141 device.flows = make(map[uint64]*of.VoltSubFlow)
142 device.meters = make(map[uint32]*of.Meter)
143 device.flowQueue = make(map[uint32]*UniIDFlowQueue)
144 //Get the flowhash from db and update the flowhash variable in the device.
145 device.SouthBoundID = southBoundID
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530146 flowHash, err := db.GetFlowHash(cntx, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530147 if err != nil {
148 device.flowHash = DefaultMaxFlowQueues
149 } else {
150 var hash uint32
151 err = json.Unmarshal([]byte(flowHash), &hash)
152 if err != nil {
153 logger.Error(ctx, "Failed to unmarshall flowhash")
154 } else {
155 device.flowHash = hash
156 }
157 }
158 logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
159 return &device
160}
161
162// ResetCache to reset cache
163func (d *Device) ResetCache() {
164 logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
165 d.flows = make(map[uint64]*of.VoltSubFlow)
166 d.meters = make(map[uint32]*of.Meter)
167 d.groups = sync.Map{}
168}
169
170// GetFlow - Get the flow from device obj
171func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
172 d.flowLock.RLock()
173 defer d.flowLock.RUnlock()
174 logger.Infow(ctx, "Get Flow", log.Fields{"Cookie": cookie})
175 flow, ok := d.flows[cookie]
176 return flow, ok
177}
178
Tinoj Josephec742f62022-09-29 19:11:10 +0530179// GetAllFlows - Get the flow from device obj
180func (d *Device) GetAllFlows() ([]*of.VoltSubFlow) {
181 d.flowLock.RLock()
182 defer d.flowLock.RUnlock()
183 var flows []*of.VoltSubFlow
184 logger.Infow(ctx, "Get All Flows", log.Fields{"deviceID": d.ID})
185 for _, f := range d.flows {
186 flows = append(flows, f)
187 }
188 return flows
189}
190
191// GetAllPendingFlows - Get the flow from device obj
192func (d *Device) GetAllPendingFlows() ([]*of.VoltSubFlow) {
193 d.flowLock.RLock()
194 defer d.flowLock.RUnlock()
195 var flows []*of.VoltSubFlow
196 logger.Infow(ctx, "Get All Pending Flows", log.Fields{"deviceID": d.ID})
197 for _, f := range d.flows {
198 if f.State == of.FlowAddPending {
199 flows = append(flows, f)
200 }
201 }
202 return flows
203}
204
Naveen Sampath04696f72022-06-13 15:19:14 +0530205// AddFlow - Adds the flow to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530206func (d *Device) AddFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530207 d.flowLock.Lock()
208 defer d.flowLock.Unlock()
209 logger.Infow(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
210 if _, ok := d.flows[flow.Cookie]; ok {
211 return errors.New(ErrDuplicateFlow)
212 }
213 d.flows[flow.Cookie] = flow
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530214 d.AddFlowToDb(cntx, flow)
Naveen Sampath04696f72022-06-13 15:19:14 +0530215 return nil
216}
217
218// AddFlowToDb is the utility to add the flow to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530219func (d *Device) AddFlowToDb(cntx context.Context, flow *of.VoltSubFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530220 if b, err := json.Marshal(flow); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530221 if err = db.PutFlow(cntx, d.ID, flow.Cookie, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530222 logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
223 }
224 }
225}
226
227// DelFlow - Deletes the flow from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530228func (d *Device) DelFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530229 d.flowLock.Lock()
230 defer d.flowLock.Unlock()
231 if _, ok := d.flows[flow.Cookie]; ok {
232 delete(d.flows, flow.Cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530233 d.DelFlowFromDb(cntx, flow.Cookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530234 return nil
235 }
236 return errors.New("Flow does not Exist")
237}
238
239// DelFlowFromDb is utility to delete the flow from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530240func (d *Device) DelFlowFromDb(cntx context.Context, flowID uint64) {
241 _ = db.DelFlow(cntx, d.ID, flowID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530242}
243
244// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
245func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
246 d.flowLock.RLock()
247 defer d.flowLock.RUnlock()
248 if _, ok := d.flows[flow.Cookie]; ok {
249 return false
250 } else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
251 if _, ok := d.flows[flow.OldCookie]; ok {
252 logger.Infow(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
253 return true
254 }
255 }
256 return false
257}
258
259// DelFlowWithOldCookie is to delete flow with old cookie.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530260func (d *Device) DelFlowWithOldCookie(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530261 d.flowLock.Lock()
262 defer d.flowLock.Unlock()
263 if _, ok := d.flows[flow.OldCookie]; ok {
264 logger.Infow(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
265 log.Fields{"OldCookie": flow.OldCookie})
266 delete(d.flows, flow.OldCookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530267 d.DelFlowFromDb(cntx, flow.OldCookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530268 return nil
269 }
270 return errors.New("Flow does not Exist")
271}
272
273// RestoreFlowsFromDb to restore flows from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530274func (d *Device) RestoreFlowsFromDb(cntx context.Context) {
275 flows, _ := db.GetFlows(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530276 for _, flow := range flows {
277 b, ok := flow.Value.([]byte)
278 if !ok {
279 logger.Warn(ctx, "The value type is not []byte")
280 continue
281 }
282 d.CreateFlowFromString(b)
283 }
284}
285
286// CreateFlowFromString to create flow from string
287func (d *Device) CreateFlowFromString(b []byte) {
288 var flow of.VoltSubFlow
289 if err := json.Unmarshal(b, &flow); err == nil {
290 if _, ok := d.flows[flow.Cookie]; !ok {
291 logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
292 d.flows[flow.Cookie] = &flow
293 } else {
294 logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
295 }
296 } else {
297 logger.Warn(ctx, "Unmarshal failed")
298 }
299}
300
301// ----------------------------------------------------------
302// Database related functionality
303// Group operations at the device which include update and delete
304
305// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530306func (d *Device) UpdateGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530307
308 logger.Infow(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
309 d.groups.Store(group.GroupID, group)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530310 d.AddGroupToDb(cntx, group)
Naveen Sampath04696f72022-06-13 15:19:14 +0530311}
312
313// AddGroupToDb - Utility to add the group to the device DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530314func (d *Device) AddGroupToDb(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530315 if b, err := json.Marshal(group); err == nil {
316 logger.Infow(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530317 if err = db.PutGroup(cntx, d.ID, group.GroupID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530318 logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
319 }
320 }
321}
322
323// DelGroupEntry - Deletes the group from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530324func (d *Device) DelGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530325
326 if _, ok := d.groups.Load(group.GroupID); ok {
327 d.groups.Delete(group.GroupID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530328 d.DelGroupFromDb(cntx, group.GroupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530329 }
330}
331
332// DelGroupFromDb - Utility to delete the Group from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530333func (d *Device) DelGroupFromDb(cntx context.Context, groupID uint32) {
334 _ = db.DelGroup(cntx, d.ID, groupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530335}
336
337//RestoreGroupsFromDb - restores all groups from DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530338func (d *Device) RestoreGroupsFromDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530339 logger.Info(ctx, "Restoring Groups")
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530340 groups, _ := db.GetGroups(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530341 for _, group := range groups {
342 b, ok := group.Value.([]byte)
343 if !ok {
344 logger.Warn(ctx, "The value type is not []byte")
345 continue
346 }
347 d.CreateGroupFromString(b)
348 }
349}
350
351//CreateGroupFromString - Forms group struct from json string
352func (d *Device) CreateGroupFromString(b []byte) {
353 var group of.Group
354 if err := json.Unmarshal(b, &group); err == nil {
355 if _, ok := d.groups.Load(group.GroupID); !ok {
356 logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
357 d.groups.Store(group.GroupID, &group)
358 } else {
359 logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
360 }
361 } else {
362 logger.Warn(ctx, "Unmarshal failed")
363 }
364}
365
366// AddMeter to add meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530367func (d *Device) AddMeter(cntx context.Context, meter *of.Meter) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530368 d.meterLock.Lock()
369 defer d.meterLock.Unlock()
370 if _, ok := d.meters[meter.ID]; ok {
371 return errors.New("Duplicate Meter")
372 }
373 d.meters[meter.ID] = meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530374 go d.AddMeterToDb(cntx, meter)
Naveen Sampath04696f72022-06-13 15:19:14 +0530375 return nil
376}
377
378// GetMeter to get meter
379func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
380 d.meterLock.RLock()
381 defer d.meterLock.RUnlock()
382 if m, ok := d.meters[id]; ok {
383 return m, nil
384 }
385 return nil, errors.New("Meter Not Found")
386}
387
388// DelMeter to delete meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530389func (d *Device) DelMeter(cntx context.Context, meter *of.Meter) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530390 d.meterLock.Lock()
391 defer d.meterLock.Unlock()
392 if _, ok := d.meters[meter.ID]; ok {
393 delete(d.meters, meter.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530394 go d.DelMeterFromDb(cntx, meter.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530395 return true
396 }
397 return false
398}
399
400// AddMeterToDb is utility to add the Group to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530401func (d *Device) AddMeterToDb(cntx context.Context, meter *of.Meter) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530402 if b, err := json.Marshal(meter); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530403 if err = db.PutDeviceMeter(cntx, d.ID, meter.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530404 logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
405 }
406 }
407}
408
409// DelMeterFromDb to delete meter from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530410func (d *Device) DelMeterFromDb(cntx context.Context, id uint32) {
411 _ = db.DelDeviceMeter(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530412}
413
414// RestoreMetersFromDb to restore meters from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530415func (d *Device) RestoreMetersFromDb(cntx context.Context) {
416 meters, _ := db.GetDeviceMeters(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530417 for _, meter := range meters {
418 b, ok := meter.Value.([]byte)
419 if !ok {
420 logger.Warn(ctx, "The value type is not []byte")
421 continue
422 }
423 d.CreateMeterFromString(b)
424 }
425}
426
427// CreateMeterFromString to create meter from string
428func (d *Device) CreateMeterFromString(b []byte) {
429 var meter of.Meter
430 if err := json.Unmarshal(b, &meter); err == nil {
431 if _, ok := d.meters[meter.ID]; !ok {
432 logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
433 d.meters[meter.ID] = &meter
434 } else {
435 logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
436 }
437 } else {
438 logger.Warn(ctx, "Unmarshal failed")
439 }
440}
441
442// VolthaClient to get voltha client
443func (d *Device) VolthaClient() voltha.VolthaServiceClient {
444 return d.vclientHolder.Get()
445}
446
447// AddPort to add the port as requested by the device/VOLTHA
448// Inform the application if the port is successfully added
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530449func (d *Device) AddPort(cntx context.Context, id uint32, name string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530450 d.portLock.Lock()
451 defer d.portLock.Unlock()
452
453 if _, ok := d.PortsByID[id]; ok {
454 return errors.New("Duplicate port")
455 }
456 if _, ok := d.PortsByName[name]; ok {
457 return errors.New("Duplicate port")
458 }
459
460 p := NewDevicePort(id, name)
461 d.PortsByID[id] = p
462 d.PortsByName[name] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530463 d.WritePortToDb(cntx, p)
464 GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530465 logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
466 return nil
467}
468
469// DelPort to delete the port as requested by the device/VOLTHA
470// Inform the application if the port is successfully deleted
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530471func (d *Device) DelPort(cntx context.Context, id uint32) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530472
473 p := d.GetPortByID(id)
474 if p == nil {
475 return errors.New("Unknown Port")
476 }
477 if p.State == PortStateUp {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530478 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530479 }
480 d.portLock.Lock()
481 defer d.portLock.Unlock()
482
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530483 GetController().PortDelInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530484 delete(d.PortsByID, p.ID)
485 delete(d.PortsByName, p.Name)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530486 d.DelPortFromDb(cntx, p.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530487 logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
488 return nil
489}
490
491// UpdatePortByName is utility to update the port by Name
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530492func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530493 d.portLock.Lock()
494 defer d.portLock.Unlock()
495
496 p, ok := d.PortsByName[name]
497 if !ok {
498 return
499 }
500 delete(d.PortsByID, p.ID)
501 p.ID = port
502 d.PortsByID[port] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530503 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530504 GetController().PortUpdateInd(d.ID, p.Name, p.ID)
505 logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
506}
507
508// GetPortName to get the name of the port by its id
509func (d *Device) GetPortName(id uint32) (string, error) {
510 d.portLock.RLock()
511 defer d.portLock.RUnlock()
512
513 if p, ok := d.PortsByID[id]; ok {
514 return p.Name, nil
515 }
516 logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
517 return "", errors.New("Unknown Port ID")
518}
519
520// GetPortByID is utility to retrieve the port by ID
521func (d *Device) GetPortByID(id uint32) *DevicePort {
522 d.portLock.RLock()
523 defer d.portLock.RUnlock()
524
525 p, ok := d.PortsByID[id]
526 if ok {
527 return p
528 }
529 return nil
530}
531
532// GetPortByName is utility to retrieve the port by Name
533func (d *Device) GetPortByName(name string) *DevicePort {
534 d.portLock.RLock()
535 defer d.portLock.RUnlock()
536
537 p, ok := d.PortsByName[name]
538 if ok {
539 return p
540 }
541 return nil
542}
543
544// GetPortState to get the state of the port by name
545func (d *Device) GetPortState(name string) (PortState, error) {
546 d.portLock.RLock()
547 defer d.portLock.RUnlock()
548
549 if p, ok := d.PortsByName[name]; ok {
550 return p.State, nil
551 }
552 return PortStateDown, errors.New("Unknown Port ID")
553}
554
555// GetPortID to get the port-id by the port name
556func (d *Device) GetPortID(name string) (uint32, error) {
557 d.portLock.RLock()
558 defer d.portLock.RUnlock()
559
560 if p, ok := d.PortsByName[name]; ok {
561 return p.ID, nil
562 }
563 return 0, errors.New("Unknown Port ID")
564
565}
566
567// WritePortToDb to add the port to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530568func (d *Device) WritePortToDb(ctx context.Context, port *DevicePort) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530569 port.Version = database.PresentVersionMap[database.DevicePortPath]
570 if b, err := json.Marshal(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530571 if err = db.PutPort(ctx, d.ID, port.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530572 logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
573 }
574 }
575}
576
577// DelPortFromDb to delete port from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530578func (d *Device) DelPortFromDb(cntx context.Context, id uint32) {
579 _ = db.DelPort(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530580}
581
582// RestorePortsFromDb to restore ports from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530583func (d *Device) RestorePortsFromDb(cntx context.Context) {
584 ports, _ := db.GetPorts(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530585 for _, port := range ports {
586 b, ok := port.Value.([]byte)
587 if !ok {
588 logger.Warn(ctx, "The value type is not []byte")
589 continue
590 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530591 d.CreatePortFromString(cntx, b)
Naveen Sampath04696f72022-06-13 15:19:14 +0530592 }
593}
594
595// CreatePortFromString to create port from string
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530596func (d *Device) CreatePortFromString(cntx context.Context, b []byte) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530597 var port DevicePort
598 if err := json.Unmarshal(b, &port); err == nil {
599 if _, ok := d.PortsByID[port.ID]; !ok {
600 logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
601 d.PortsByID[port.ID] = &port
602 d.PortsByName[port.Name] = &port
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530603 GetController().PortAddInd(cntx, d.ID, port.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530604 } else {
605 logger.Warnw(ctx, "Duplicate Port", log.Fields{"ID": port.ID})
606 }
607 } else {
608 logger.Warn(ctx, "Unmarshal failed")
609 }
610}
611
612// Delete : OLT Delete functionality yet to be implemented. IDeally all of the
613// resources should have been removed by this time. It is an error
614// scenario if the OLT has resources associated with it.
615func (d *Device) Delete() {
616 d.StopAll()
617}
618
619// Stop to stop the task
620func (d *Device) Stop() {
621}
622
623// ConnectInd is called when the connection between VGC and the VOLTHA is
624// restored. This will perform audit of the device post reconnection
625func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
626 logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
627 ctx1, cancel := context.WithCancel(ctx)
628 d.cancel = cancel
629 d.ctx = ctx1
630 d.Tasks.Initialize(ctx1)
631
632 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
633 d.State = DeviceStateUP
634 GetController().DeviceUpInd(d.ID)
635
636 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
637 t := NewAuditDevice(d, AuditEventDeviceDisc)
638 d.Tasks.AddTask(t)
639
640 t1 := NewAuditTablesTask(d)
641 d.Tasks.AddTask(t1)
642
643 t2 := NewPendingProfilesTask(d)
644 d.Tasks.AddTask(t2)
645
646 go d.synchronizeDeviceTables()
647}
648
649func (d *Device) synchronizeDeviceTables() {
650
651 tick := time.NewTicker(deviceTableSyncDuration)
652loop:
653 for {
654 select {
655 case <-d.ctx.Done():
656 logger.Warnw(d.ctx, "Context Done. Cancelling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
657 break loop
658 case <-tick.C:
659 t1 := NewAuditTablesTask(d)
660 d.Tasks.AddTask(t1)
661 }
662 }
663 tick.Stop()
664}
665
666// DeviceUpInd is called when the logical device state changes to UP. This will perform audit of the device post reconnection
667func (d *Device) DeviceUpInd() {
668 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
669 d.State = DeviceStateUP
670 GetController().DeviceUpInd(d.ID)
671
672 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
673 t := NewAuditDevice(d, AuditEventDeviceDisc)
674 d.Tasks.AddTask(t)
675
676 t1 := NewAuditTablesTask(d)
677 d.Tasks.AddTask(t1)
678
679 t2 := NewPendingProfilesTask(d)
680 d.Tasks.AddTask(t2)
681}
682
683// DeviceDownInd is called when the logical device state changes to Down.
684func (d *Device) DeviceDownInd() {
685 logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
686 d.State = DeviceStateDOWN
687 GetController().DeviceDownInd(d.ID)
688}
689
690// DeviceRebootInd is called when the logical device is rebooted.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530691func (d *Device) DeviceRebootInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530692 logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
693
694 if d.State == DeviceStateREBOOTED {
695 d.State = DeviceStateREBOOTED
696 logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
697 return
698 }
699
700 d.State = DeviceStateREBOOTED
701 GetController().SetRebootInProgressForDevice(d.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530702 GetController().DeviceRebootInd(cntx, d.ID, d.SerialNum, d.SouthBoundID)
703 d.ReSetAllPortStates(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530704}
705
706// DeviceDisabledInd is called when the logical device is disabled
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530707func (d *Device) DeviceDisabledInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530708 logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
709 d.State = DeviceStateDISABLED
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530710 GetController().DeviceDisableInd(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530711}
712
713//ReSetAllPortStates - Set all logical device port status to DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530714func (d *Device) ReSetAllPortStates(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530715 logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
716
717 d.portLock.Lock()
718 defer d.portLock.Unlock()
719
720 for _, port := range d.PortsByID {
721 if port.State != PortStateDown {
722 logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530723 GetController().PortDownInd(cntx, d.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530724 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530725 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530726 }
727 }
728}
729
730//ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530731func (d *Device) ReSetAllPortStatesInDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530732 logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
733
734 d.portLock.Lock()
735 defer d.portLock.Unlock()
736
737 for _, port := range d.PortsByID {
738 if port.State != PortStateDown {
739 logger.Infow(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
740 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530741 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530742 }
743 }
744}
745
746// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
747// to update only when the port state is DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530748func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530749 if p := d.GetPortByName(portName); p != nil {
750 if p.ID != port {
751 logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
752 if p.State != PortStateDown {
753 logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
754 return
755 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530756 d.UpdatePortByName(cntx, portName, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530757 logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
758 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530759 d.ProcessPortState(cntx, port, state)
Naveen Sampath04696f72022-06-13 15:19:14 +0530760 }
761}
762
763// ***Operations Performed on Port state Transitions***
764//
765// |-----------------------------------------------------------------------------|
766// | State | Action |
767// |--------------------|--------------------------------------------------------|
768// | UP | UNI - Trigger Flow addition for service configured |
769// | | NNI - Trigger Flow addition for vnets & mvlan profiles |
770// | | |
771// | DOWN | UNI - Trigger Flow deletion for service configured |
772// | | NNI - Trigger Flow deletion for vnets & mvlan profiles |
773// | | |
774// |-----------------------------------------------------------------------------|
775//
776
777// ProcessPortState deals with the change in port status and taking action
778// based on the new state and the old state
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530779func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530780 if d.State != DeviceStateUP && !util.IsNniPort(port) {
781 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
782 return
783 }
784 if p := d.GetPortByID(port); p != nil {
785 logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
786
787 // Avoid blind initialization as the current tasks in the queue will be lost
788 // Eg: Service Del followed by Port Down - The flows will be dangling
789 // Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
790 p.Tasks.CheckAndInitialize(d.ctx)
791 if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
792 // Transition from DOWN to UP
793 logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530794 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530795 p.State = PortStateUp
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530796 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530797 } else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
798 // Transition from UP to Down
799 logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530800 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530801 p.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530802 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530803 } else {
804 logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
805 }
806 }
807}
808
809// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530810func (d *Device) ProcessPortStateAfterReboot(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530811 if d.State != DeviceStateUP && !util.IsNniPort(port) {
812 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
813 return
814 }
815 if p := d.GetPortByID(port); p != nil {
816 logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State})
817 p.Tasks.Initialize(d.ctx)
818 if p.State == PortStateUp {
819 logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530820 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530821 } else if p.State == PortStateDown {
822 logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530823 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530824 }
825 }
826}
827
828// ChangeEvent : Change event brings in ports related changes such as addition/deletion
829// or modification where the port status change up/down is indicated to the
830// controller
831func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
832 cet := NewChangeEventTask(d.ctx, event, d)
833 d.AddTask(cet)
834 return nil
835}
836
837// PacketIn handle the incoming packet-in and deliver to the application for the
838// actual processing
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530839func (d *Device) PacketIn(cntx context.Context, pkt *ofp.PacketIn) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530840 logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
841 if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
842 logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
843 return
844 }
845 data := pkt.PacketIn.Data
846 port := PacketInGetPort(pkt.PacketIn)
847 if pName, err := d.GetPortName(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530848 GetController().PacketInInd(cntx, d.ID, pName, data)
Naveen Sampath04696f72022-06-13 15:19:14 +0530849 } else {
850 logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
851 }
852}
853
854// PacketInGetPort to get the port on which the packet-in is reported
855func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
856 for _, field := range pkt.Match.OxmFields {
857 if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
858 if ofbField, ok := field.Field.(*ofp.OfpOxmField_OfbField); ok {
859 if ofbField.OfbField.Type == ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
860 if port, ok := ofbField.OfbField.Value.(*ofp.OfpOxmOfbField_Port); ok {
861 return port.Port
862 }
863 }
864 }
865 }
866 }
867 return 0
868}
869
870// PacketOutReq receives the packet out request from the application via the
871// controller. The interface from the application uses name as the identity.
872func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
873 inp, err := d.GetPortID(inport)
874 if err != nil {
875 return errors.New("Unknown inport")
876 }
877 outp, err1 := d.GetPortID(outport)
878 if err1 != nil {
879 return errors.New("Unknown outport")
880 }
881 logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
882 return d.SendPacketOut(outp, inp, data, isCustomPkt)
883}
884
885// SendPacketOut is responsible for building the OF structure and send the
886// packet-out to the VOLTHA
887func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
888 pout := &ofp.PacketOut{}
889 pout.Id = d.ID
890 opout := &ofp.OfpPacketOut{}
891 pout.PacketOut = opout
892 opout.InPort = inport
893 opout.Data = data
894 opout.Actions = []*ofp.OfpAction{
895 {
896 Type: ofp.OfpActionType_OFPAT_OUTPUT,
897 Action: &ofp.OfpAction_Output{
898 Output: &ofp.OfpActionOutput{
899 Port: outport,
900 MaxLen: 65535,
901 },
902 },
903 },
904 }
905 d.packetOutChannel <- pout
906 return nil
907}
908
909// UpdateFlows receives the flows in the form that is implemented
910// in the VGC and transforms them to the OF format. This is handled
911// as a port of the task that is enqueued to do the same.
912func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
913 t := NewAddFlowsTask(d.ctx, flow, d)
914 logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
915 // check if port isNni , if yes flows will be added to device port queues.
916 if util.IsNniPort(devPort.ID) {
917 // Adding the flows to device port queues.
918 devPort.AddTask(t)
919 return
920 }
921 // If the flowHash is enabled then add the flows to the flowhash generated queues.
922 flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
923 if flowQueue != nil {
924 logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
925 flowQueue.AddTask(t)
926 logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
927 } else {
928 //FlowThrotling disabled, add to the device port queue
929 devPort.AddTask(t)
930 return
931 }
932}
933
934// UpdateGroup to update group info
935func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
936 task := NewModGroupTask(d.ctx, group, d)
937 logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
938 devPort.AddTask(task)
939}
940
941// ModMeter for mod meter task
942func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
943 if command == of.MeterCommandAdd {
944 if _, err := d.GetMeter(meter.ID); err == nil {
945 logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
946 return
947 }
948 }
949 t := NewModMeterTask(d.ctx, command, meter, d)
950 devPort.AddTask(t)
951}
952
953func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
954 d.flowQueueLock.RLock()
955 //If flowhash is 0 that means flowhash throttling is disabled, return nil
956 if d.flowHash == 0 {
957 d.flowQueueLock.RUnlock()
958 return nil
959 }
960 flowHashID := id % uint32(d.flowHash)
961 if value, found := d.flowQueue[uint32(flowHashID)]; found {
962 d.flowQueueLock.RUnlock()
963 return value
964 }
965 d.flowQueueLock.RUnlock()
966 logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
967
968 return d.addFlowQueueForUniID(id)
969}
970
971func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
972
973 d.flowQueueLock.Lock()
974 defer d.flowQueueLock.Unlock()
975 flowHashID := id % uint32(d.flowHash)
976 flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
977 flowQueue.Tasks.Initialize(d.ctx)
978 d.flowQueue[flowHashID] = flowQueue
979 return flowQueue
980}
981
982// SetFlowHash sets the device flow hash and writes to the DB.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530983func (d *Device) SetFlowHash(cntx context.Context, hash uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530984 d.flowQueueLock.Lock()
985 defer d.flowQueueLock.Unlock()
986
987 d.flowHash = hash
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530988 d.writeFlowHashToDB(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530989}
990
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530991func (d *Device) writeFlowHashToDB(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530992 hash, err := json.Marshal(d.flowHash)
993 if err != nil {
994 logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash})
995 return
996 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530997 if err := db.PutFlowHash(cntx, d.ID, string(hash)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530998 logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash})
999 }
1000}
1001
1002//isSBOperAllowed - determins if the SB operation is allowed based on device state & force flag
1003func (d *Device) isSBOperAllowed(forceAction bool) bool {
1004
1005 if d.State == DeviceStateUP {
1006 return true
1007 }
1008
1009 if d.State == DeviceStateDISABLED && forceAction {
1010 return true
1011 }
1012
1013 return false
1014}
1015
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301016func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301017 flow, _ := d.GetFlow(cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301018 d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
Naveen Sampath04696f72022-06-13 15:19:14 +05301019}
1020
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301021func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301022
1023 statusCode, statusMsg := infraerror.GetErrorInfo(err)
1024 success := isFlowOperSuccess(statusCode, oper)
1025
1026 updateFlow := func(cookie uint64, state int, reason string) {
1027 if dbFlow, ok := d.GetFlow(cookie); ok {
1028 dbFlow.State = uint8(state)
1029 dbFlow.ErrorReason = reason
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301030 d.AddFlowToDb(cntx, dbFlow)
Naveen Sampath04696f72022-06-13 15:19:14 +05301031 }
1032 }
1033
1034 //Update flow results
1035 // Add - Update Success or Failure status with reason
1036 // Del - Delete entry from DB on success else update error reason
1037 if oper == of.CommandAdd {
1038 state := of.FlowAddSuccess
1039 reason := ""
1040 if !success {
1041 state = of.FlowAddFailure
1042 reason = statusMsg
1043 }
1044 updateFlow(cookie, state, reason)
1045 logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
1046 } else {
1047 if success && flow != nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301048 if err := d.DelFlow(cntx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301049 logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
1050 }
1051 } else if !success {
1052 updateFlow(cookie, of.FlowDelFailure, statusMsg)
1053 }
1054 }
1055
1056 flowResult := intf.FlowStatus{
1057 Cookie: strconv.FormatUint(cookie, 10),
1058 Device: d.ID,
1059 FlowModType: oper,
1060 Flow: flow,
1061 Status: statusCode,
1062 Reason: statusMsg,
1063 AdditionalData: bwDetails,
1064 }
1065
1066 logger.Infow(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301067 GetController().ProcessFlowModResultIndication(cntx, flowResult)
Naveen Sampath04696f72022-06-13 15:19:14 +05301068}