blob: a3527c812fd0469519abfe9f143930f1b4af56ed [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package controller
17
18import (
19 "context"
20 "encoding/json"
21 "errors"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053022 "fmt"
Naveen Sampath04696f72022-06-13 15:19:14 +053023 infraerror "voltha-go-controller/internal/pkg/errorcodes"
24 "strconv"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053025 "strings"
Naveen Sampath04696f72022-06-13 15:19:14 +053026 "sync"
27 "time"
28
29 "voltha-go-controller/database"
30 "voltha-go-controller/internal/pkg/holder"
31 "voltha-go-controller/internal/pkg/intf"
32 "voltha-go-controller/internal/pkg/of"
33 //"voltha-go-controller/internal/pkg/vpagent"
34 "voltha-go-controller/internal/pkg/tasks"
35 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053036 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053037 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
38 "github.com/opencord/voltha-protos/v5/go/voltha"
39)
40
// PortState is the textual state of a device port.
type PortState string

// States a DevicePort can be in.
const (
	// PortStateDown is the DOWN port state.
	PortStateDown PortState = "DOWN"
	// PortStateUp is the UP port state.
	PortStateUp PortState = "UP"
)

const (
	// DefaultMaxFlowQueues is the flow hash used for a device when no value
	// can be read from the database.
	DefaultMaxFlowQueues = 67
	// ErrDuplicateFlow is the error text reported when a flow with the same
	// cookie already exists on the device.
	ErrDuplicateFlow string = "Duplicate Flow"
)
54
55// DevicePort structure
56type DevicePort struct {
57 tasks.Tasks
Tinoj Joseph429b9d92022-11-16 18:51:05 +053058 Name string
59 ID uint32
60 State PortState
61 Version string
62 HwAddr string
63 CurrSpeed uint32
64 MaxSpeed uint32
Naveen Sampath04696f72022-06-13 15:19:14 +053065}
66
67// NewDevicePort is the constructor for DevicePort
Tinoj Joseph429b9d92022-11-16 18:51:05 +053068func NewDevicePort(mp *ofp.OfpPort) *DevicePort {
Naveen Sampath04696f72022-06-13 15:19:14 +053069 var port DevicePort
70
Tinoj Joseph429b9d92022-11-16 18:51:05 +053071 port.ID = mp.PortNo
72 port.Name = mp.Name
73
74 //port.HwAddr = strings.Trim(strings.Join(strings.Fields(fmt.Sprint("%02x", mp.HwAddr)), ":"), "[]")
75 port.HwAddr = strings.Trim(strings.ReplaceAll(fmt.Sprintf("%02x", mp.HwAddr), " ", ":"), "[]")
76 port.CurrSpeed = mp.CurrSpeed
77 port.MaxSpeed = mp.MaxSpeed
Naveen Sampath04696f72022-06-13 15:19:14 +053078 port.State = PortStateDown
79 return &port
80}
81
82// UniIDFlowQueue structure which maintains flows in queue.
83type UniIDFlowQueue struct {
84 tasks.Tasks
85 ID uint32
86}
87
88// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
89func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
90 var flowQueue UniIDFlowQueue
91 flowQueue.ID = id
92 return &flowQueue
93}
94
// DeviceState is the textual state of a device as tracked by the controller.
type DeviceState string

// States a Device can be in.
const (
	// DeviceStateUNKNOWN is the UNKNOWN device state.
	DeviceStateUNKNOWN DeviceState = "UNKNOWN"
	// DeviceStateINIT is the INIT device state.
	DeviceStateINIT DeviceState = "INIT"
	// DeviceStateUP is set by ConnectInd and DeviceUpInd.
	DeviceStateUP DeviceState = "UP"
	// DeviceStateDOWN is the state of a newly constructed device and is set
	// again by DeviceDownInd.
	DeviceStateDOWN DeviceState = "DOWN"
	// DeviceStateREBOOTED is set by DeviceRebootInd.
	DeviceStateREBOOTED DeviceState = "REBOOTED"
	// DeviceStateDISABLED is set by DeviceDisabledInd.
	DeviceStateDISABLED DeviceState = "DISABLED"
	// DeviceStateDELETED is the DELETED device state.
	DeviceStateDELETED DeviceState = "DELETED"
)
115
116// Device structure
117type Device struct {
118 tasks.Tasks
119 ID string
120 SerialNum string
121 State DeviceState
122 PortsByID map[uint32]*DevicePort
123 PortsByName map[string]*DevicePort
124 portLock sync.RWMutex
125 vclientHolder *holder.VolthaServiceClientHolder
126 ctx context.Context
127 cancel context.CancelFunc
128 packetOutChannel chan *ofp.PacketOut
129 flows map[uint64]*of.VoltSubFlow
130 flowLock sync.RWMutex
131 meters map[uint32]*of.Meter
132 meterLock sync.RWMutex
133 groups sync.Map //map[uint32]*of.Group -> [GroupId : Group]
134 auditInProgress bool
135 flowQueueLock sync.RWMutex
136 flowHash uint32
137 flowQueue map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
138 deviceAuditInProgress bool
139 SouthBoundID string
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530140 MfrDesc string
141 HwDesc string
142 SwDesc string
143 TimeStamp time.Time
Naveen Sampath04696f72022-06-13 15:19:14 +0530144}
145
146// NewDevice is the constructor for Device
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530147func NewDevice(cntx context.Context, id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID, mfr, hwDesc, swDesc string) *Device {
Naveen Sampath04696f72022-06-13 15:19:14 +0530148 var device Device
149 device.ID = id
150 device.SerialNum = slno
151 device.State = DeviceStateDOWN
152 device.PortsByID = make(map[uint32]*DevicePort)
153 device.PortsByName = make(map[string]*DevicePort)
154 device.vclientHolder = vclientHldr
155 device.flows = make(map[uint64]*of.VoltSubFlow)
156 device.meters = make(map[uint32]*of.Meter)
157 device.flowQueue = make(map[uint32]*UniIDFlowQueue)
158 //Get the flowhash from db and update the flowhash variable in the device.
159 device.SouthBoundID = southBoundID
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530160 device.MfrDesc = mfr
161 device.HwDesc = hwDesc
162 device.SwDesc = swDesc
163 device.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530164 flowHash, err := db.GetFlowHash(cntx, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530165 if err != nil {
166 device.flowHash = DefaultMaxFlowQueues
167 } else {
168 var hash uint32
169 err = json.Unmarshal([]byte(flowHash), &hash)
170 if err != nil {
171 logger.Error(ctx, "Failed to unmarshall flowhash")
172 } else {
173 device.flowHash = hash
174 }
175 }
176 logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
177 return &device
178}
179
180// ResetCache to reset cache
181func (d *Device) ResetCache() {
182 logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
183 d.flows = make(map[uint64]*of.VoltSubFlow)
184 d.meters = make(map[uint32]*of.Meter)
185 d.groups = sync.Map{}
186}
187
188// GetFlow - Get the flow from device obj
189func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
190 d.flowLock.RLock()
191 defer d.flowLock.RUnlock()
192 logger.Infow(ctx, "Get Flow", log.Fields{"Cookie": cookie})
193 flow, ok := d.flows[cookie]
194 return flow, ok
195}
196
Tinoj Josephec742f62022-09-29 19:11:10 +0530197// GetAllFlows - Get the flow from device obj
198func (d *Device) GetAllFlows() ([]*of.VoltSubFlow) {
199 d.flowLock.RLock()
200 defer d.flowLock.RUnlock()
201 var flows []*of.VoltSubFlow
202 logger.Infow(ctx, "Get All Flows", log.Fields{"deviceID": d.ID})
203 for _, f := range d.flows {
204 flows = append(flows, f)
205 }
206 return flows
207}
208
209// GetAllPendingFlows - Get the flow from device obj
210func (d *Device) GetAllPendingFlows() ([]*of.VoltSubFlow) {
211 d.flowLock.RLock()
212 defer d.flowLock.RUnlock()
213 var flows []*of.VoltSubFlow
214 logger.Infow(ctx, "Get All Pending Flows", log.Fields{"deviceID": d.ID})
215 for _, f := range d.flows {
216 if f.State == of.FlowAddPending {
217 flows = append(flows, f)
218 }
219 }
220 return flows
221}
222
Naveen Sampath04696f72022-06-13 15:19:14 +0530223// AddFlow - Adds the flow to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530224func (d *Device) AddFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530225 d.flowLock.Lock()
226 defer d.flowLock.Unlock()
227 logger.Infow(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
228 if _, ok := d.flows[flow.Cookie]; ok {
229 return errors.New(ErrDuplicateFlow)
230 }
231 d.flows[flow.Cookie] = flow
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530232 d.AddFlowToDb(cntx, flow)
Naveen Sampath04696f72022-06-13 15:19:14 +0530233 return nil
234}
235
236// AddFlowToDb is the utility to add the flow to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530237func (d *Device) AddFlowToDb(cntx context.Context, flow *of.VoltSubFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530238 if b, err := json.Marshal(flow); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530239 if err = db.PutFlow(cntx, d.ID, flow.Cookie, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530240 logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
241 }
242 }
243}
244
245// DelFlow - Deletes the flow from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530246func (d *Device) DelFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530247 d.flowLock.Lock()
248 defer d.flowLock.Unlock()
249 if _, ok := d.flows[flow.Cookie]; ok {
250 delete(d.flows, flow.Cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530251 d.DelFlowFromDb(cntx, flow.Cookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530252 return nil
253 }
254 return errors.New("Flow does not Exist")
255}
256
257// DelFlowFromDb is utility to delete the flow from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530258func (d *Device) DelFlowFromDb(cntx context.Context, flowID uint64) {
259 _ = db.DelFlow(cntx, d.ID, flowID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530260}
261
262// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
263func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
264 d.flowLock.RLock()
265 defer d.flowLock.RUnlock()
266 if _, ok := d.flows[flow.Cookie]; ok {
267 return false
268 } else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
269 if _, ok := d.flows[flow.OldCookie]; ok {
270 logger.Infow(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
271 return true
272 }
273 }
274 return false
275}
276
277// DelFlowWithOldCookie is to delete flow with old cookie.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530278func (d *Device) DelFlowWithOldCookie(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530279 d.flowLock.Lock()
280 defer d.flowLock.Unlock()
281 if _, ok := d.flows[flow.OldCookie]; ok {
282 logger.Infow(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
283 log.Fields{"OldCookie": flow.OldCookie})
284 delete(d.flows, flow.OldCookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530285 d.DelFlowFromDb(cntx, flow.OldCookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530286 return nil
287 }
288 return errors.New("Flow does not Exist")
289}
290
291// RestoreFlowsFromDb to restore flows from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530292func (d *Device) RestoreFlowsFromDb(cntx context.Context) {
293 flows, _ := db.GetFlows(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530294 for _, flow := range flows {
295 b, ok := flow.Value.([]byte)
296 if !ok {
297 logger.Warn(ctx, "The value type is not []byte")
298 continue
299 }
300 d.CreateFlowFromString(b)
301 }
302}
303
304// CreateFlowFromString to create flow from string
305func (d *Device) CreateFlowFromString(b []byte) {
306 var flow of.VoltSubFlow
307 if err := json.Unmarshal(b, &flow); err == nil {
308 if _, ok := d.flows[flow.Cookie]; !ok {
309 logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
310 d.flows[flow.Cookie] = &flow
311 } else {
312 logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
313 }
314 } else {
315 logger.Warn(ctx, "Unmarshal failed")
316 }
317}
318
319// ----------------------------------------------------------
320// Database related functionality
321// Group operations at the device which include update and delete
322
323// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530324func (d *Device) UpdateGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530325
326 logger.Infow(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
327 d.groups.Store(group.GroupID, group)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530328 d.AddGroupToDb(cntx, group)
Naveen Sampath04696f72022-06-13 15:19:14 +0530329}
330
331// AddGroupToDb - Utility to add the group to the device DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530332func (d *Device) AddGroupToDb(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530333 if b, err := json.Marshal(group); err == nil {
334 logger.Infow(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530335 if err = db.PutGroup(cntx, d.ID, group.GroupID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530336 logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
337 }
338 }
339}
340
341// DelGroupEntry - Deletes the group from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530342func (d *Device) DelGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530343
344 if _, ok := d.groups.Load(group.GroupID); ok {
345 d.groups.Delete(group.GroupID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530346 d.DelGroupFromDb(cntx, group.GroupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530347 }
348}
349
350// DelGroupFromDb - Utility to delete the Group from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530351func (d *Device) DelGroupFromDb(cntx context.Context, groupID uint32) {
352 _ = db.DelGroup(cntx, d.ID, groupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530353}
354
355//RestoreGroupsFromDb - restores all groups from DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530356func (d *Device) RestoreGroupsFromDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530357 logger.Info(ctx, "Restoring Groups")
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530358 groups, _ := db.GetGroups(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530359 for _, group := range groups {
360 b, ok := group.Value.([]byte)
361 if !ok {
362 logger.Warn(ctx, "The value type is not []byte")
363 continue
364 }
365 d.CreateGroupFromString(b)
366 }
367}
368
369//CreateGroupFromString - Forms group struct from json string
370func (d *Device) CreateGroupFromString(b []byte) {
371 var group of.Group
372 if err := json.Unmarshal(b, &group); err == nil {
373 if _, ok := d.groups.Load(group.GroupID); !ok {
374 logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
375 d.groups.Store(group.GroupID, &group)
376 } else {
377 logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
378 }
379 } else {
380 logger.Warn(ctx, "Unmarshal failed")
381 }
382}
383
384// AddMeter to add meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530385func (d *Device) AddMeter(cntx context.Context, meter *of.Meter) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530386 d.meterLock.Lock()
387 defer d.meterLock.Unlock()
388 if _, ok := d.meters[meter.ID]; ok {
389 return errors.New("Duplicate Meter")
390 }
391 d.meters[meter.ID] = meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530392 go d.AddMeterToDb(cntx, meter)
Naveen Sampath04696f72022-06-13 15:19:14 +0530393 return nil
394}
395
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530396// UpdateMeter to update meter
397func (d *Device) UpdateMeter(cntx context.Context, meter *of.Meter) error {
398 d.meterLock.Lock()
399 defer d.meterLock.Unlock()
400 if _, ok := d.meters[meter.ID]; ok {
401 d.meters[meter.ID] = meter
402 d.AddMeterToDb(cntx, meter)
403 } else {
404 return errors.New("Meter not found for updation")
405 }
406 return nil
407}
408
Naveen Sampath04696f72022-06-13 15:19:14 +0530409// GetMeter to get meter
410func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
411 d.meterLock.RLock()
412 defer d.meterLock.RUnlock()
413 if m, ok := d.meters[id]; ok {
414 return m, nil
415 }
416 return nil, errors.New("Meter Not Found")
417}
418
419// DelMeter to delete meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530420func (d *Device) DelMeter(cntx context.Context, meter *of.Meter) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530421 d.meterLock.Lock()
422 defer d.meterLock.Unlock()
423 if _, ok := d.meters[meter.ID]; ok {
424 delete(d.meters, meter.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530425 go d.DelMeterFromDb(cntx, meter.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530426 return true
427 }
428 return false
429}
430
431// AddMeterToDb is utility to add the Group to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530432func (d *Device) AddMeterToDb(cntx context.Context, meter *of.Meter) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530433 if b, err := json.Marshal(meter); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530434 if err = db.PutDeviceMeter(cntx, d.ID, meter.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530435 logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
436 }
437 }
438}
439
440// DelMeterFromDb to delete meter from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530441func (d *Device) DelMeterFromDb(cntx context.Context, id uint32) {
442 _ = db.DelDeviceMeter(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530443}
444
445// RestoreMetersFromDb to restore meters from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530446func (d *Device) RestoreMetersFromDb(cntx context.Context) {
447 meters, _ := db.GetDeviceMeters(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530448 for _, meter := range meters {
449 b, ok := meter.Value.([]byte)
450 if !ok {
451 logger.Warn(ctx, "The value type is not []byte")
452 continue
453 }
454 d.CreateMeterFromString(b)
455 }
456}
457
458// CreateMeterFromString to create meter from string
459func (d *Device) CreateMeterFromString(b []byte) {
460 var meter of.Meter
461 if err := json.Unmarshal(b, &meter); err == nil {
462 if _, ok := d.meters[meter.ID]; !ok {
463 logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
464 d.meters[meter.ID] = &meter
465 } else {
466 logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
467 }
468 } else {
469 logger.Warn(ctx, "Unmarshal failed")
470 }
471}
472
473// VolthaClient to get voltha client
474func (d *Device) VolthaClient() voltha.VolthaServiceClient {
475 return d.vclientHolder.Get()
476}
477
478// AddPort to add the port as requested by the device/VOLTHA
479// Inform the application if the port is successfully added
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530480func (d *Device) AddPort(cntx context.Context, mp *ofp.OfpPort) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530481 d.portLock.Lock()
482 defer d.portLock.Unlock()
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530483 id := mp.PortNo
484 name := mp.Name
Naveen Sampath04696f72022-06-13 15:19:14 +0530485 if _, ok := d.PortsByID[id]; ok {
486 return errors.New("Duplicate port")
487 }
488 if _, ok := d.PortsByName[name]; ok {
489 return errors.New("Duplicate port")
490 }
491
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530492 p := NewDevicePort(mp)
Naveen Sampath04696f72022-06-13 15:19:14 +0530493 d.PortsByID[id] = p
494 d.PortsByName[name] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530495 d.WritePortToDb(cntx, p)
496 GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530497 logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
498 return nil
499}
500
501// DelPort to delete the port as requested by the device/VOLTHA
502// Inform the application if the port is successfully deleted
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530503func (d *Device) DelPort(cntx context.Context, id uint32) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530504
505 p := d.GetPortByID(id)
506 if p == nil {
507 return errors.New("Unknown Port")
508 }
509 if p.State == PortStateUp {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530510 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530511 }
Tinoj Joseph4ead4e02023-01-30 03:12:44 +0530512 GetController().PortDelInd(cntx, d.ID, p.Name)
513
Naveen Sampath04696f72022-06-13 15:19:14 +0530514 d.portLock.Lock()
515 defer d.portLock.Unlock()
516
Naveen Sampath04696f72022-06-13 15:19:14 +0530517 delete(d.PortsByID, p.ID)
518 delete(d.PortsByName, p.Name)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530519 d.DelPortFromDb(cntx, p.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530520 logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
521 return nil
522}
523
524// UpdatePortByName is utility to update the port by Name
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530525func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530526 d.portLock.Lock()
527 defer d.portLock.Unlock()
528
529 p, ok := d.PortsByName[name]
530 if !ok {
531 return
532 }
533 delete(d.PortsByID, p.ID)
534 p.ID = port
535 d.PortsByID[port] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530536 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530537 GetController().PortUpdateInd(d.ID, p.Name, p.ID)
538 logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
539}
540
541// GetPortName to get the name of the port by its id
542func (d *Device) GetPortName(id uint32) (string, error) {
543 d.portLock.RLock()
544 defer d.portLock.RUnlock()
545
546 if p, ok := d.PortsByID[id]; ok {
547 return p.Name, nil
548 }
549 logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
550 return "", errors.New("Unknown Port ID")
551}
552
553// GetPortByID is utility to retrieve the port by ID
554func (d *Device) GetPortByID(id uint32) *DevicePort {
555 d.portLock.RLock()
556 defer d.portLock.RUnlock()
557
558 p, ok := d.PortsByID[id]
559 if ok {
560 return p
561 }
562 return nil
563}
564
565// GetPortByName is utility to retrieve the port by Name
566func (d *Device) GetPortByName(name string) *DevicePort {
567 d.portLock.RLock()
568 defer d.portLock.RUnlock()
569
570 p, ok := d.PortsByName[name]
571 if ok {
572 return p
573 }
574 return nil
575}
576
577// GetPortState to get the state of the port by name
578func (d *Device) GetPortState(name string) (PortState, error) {
579 d.portLock.RLock()
580 defer d.portLock.RUnlock()
581
582 if p, ok := d.PortsByName[name]; ok {
583 return p.State, nil
584 }
585 return PortStateDown, errors.New("Unknown Port ID")
586}
587
588// GetPortID to get the port-id by the port name
589func (d *Device) GetPortID(name string) (uint32, error) {
590 d.portLock.RLock()
591 defer d.portLock.RUnlock()
592
593 if p, ok := d.PortsByName[name]; ok {
594 return p.ID, nil
595 }
596 return 0, errors.New("Unknown Port ID")
597
598}
599
600// WritePortToDb to add the port to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530601func (d *Device) WritePortToDb(ctx context.Context, port *DevicePort) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530602 port.Version = database.PresentVersionMap[database.DevicePortPath]
603 if b, err := json.Marshal(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530604 if err = db.PutPort(ctx, d.ID, port.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530605 logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
606 }
607 }
608}
609
610// DelPortFromDb to delete port from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530611func (d *Device) DelPortFromDb(cntx context.Context, id uint32) {
612 _ = db.DelPort(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530613}
614
615// RestorePortsFromDb to restore ports from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530616func (d *Device) RestorePortsFromDb(cntx context.Context) {
617 ports, _ := db.GetPorts(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530618 for _, port := range ports {
619 b, ok := port.Value.([]byte)
620 if !ok {
621 logger.Warn(ctx, "The value type is not []byte")
622 continue
623 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530624 d.CreatePortFromString(cntx, b)
Naveen Sampath04696f72022-06-13 15:19:14 +0530625 }
626}
627
628// CreatePortFromString to create port from string
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530629func (d *Device) CreatePortFromString(cntx context.Context, b []byte) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530630 var port DevicePort
631 if err := json.Unmarshal(b, &port); err == nil {
632 if _, ok := d.PortsByID[port.ID]; !ok {
633 logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
634 d.PortsByID[port.ID] = &port
635 d.PortsByName[port.Name] = &port
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530636 GetController().PortAddInd(cntx, d.ID, port.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530637 } else {
638 logger.Warnw(ctx, "Duplicate Port", log.Fields{"ID": port.ID})
639 }
640 } else {
641 logger.Warn(ctx, "Unmarshal failed")
642 }
643}
644
645// Delete : OLT Delete functionality yet to be implemented. IDeally all of the
646// resources should have been removed by this time. It is an error
647// scenario if the OLT has resources associated with it.
648func (d *Device) Delete() {
649 d.StopAll()
650}
651
652// Stop to stop the task
653func (d *Device) Stop() {
654}
655
656// ConnectInd is called when the connection between VGC and the VOLTHA is
657// restored. This will perform audit of the device post reconnection
658func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
659 logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
660 ctx1, cancel := context.WithCancel(ctx)
661 d.cancel = cancel
662 d.ctx = ctx1
663 d.Tasks.Initialize(ctx1)
664
665 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
666 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530667 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530668 GetController().DeviceUpInd(d.ID)
669
670 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
671 t := NewAuditDevice(d, AuditEventDeviceDisc)
672 d.Tasks.AddTask(t)
673
674 t1 := NewAuditTablesTask(d)
675 d.Tasks.AddTask(t1)
676
677 t2 := NewPendingProfilesTask(d)
678 d.Tasks.AddTask(t2)
679
680 go d.synchronizeDeviceTables()
681}
682
683func (d *Device) synchronizeDeviceTables() {
684
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530685 tick := time.NewTicker(GetController().GetDeviceTableSyncDuration())
Naveen Sampath04696f72022-06-13 15:19:14 +0530686loop:
687 for {
688 select {
689 case <-d.ctx.Done():
690 logger.Warnw(d.ctx, "Context Done. Cancelling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
691 break loop
692 case <-tick.C:
693 t1 := NewAuditTablesTask(d)
694 d.Tasks.AddTask(t1)
695 }
696 }
697 tick.Stop()
698}
699
700// DeviceUpInd is called when the logical device state changes to UP. This will perform audit of the device post reconnection
701func (d *Device) DeviceUpInd() {
702 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
703 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530704 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530705 GetController().DeviceUpInd(d.ID)
706
707 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
708 t := NewAuditDevice(d, AuditEventDeviceDisc)
709 d.Tasks.AddTask(t)
710
711 t1 := NewAuditTablesTask(d)
712 d.Tasks.AddTask(t1)
713
714 t2 := NewPendingProfilesTask(d)
715 d.Tasks.AddTask(t2)
716}
717
718// DeviceDownInd is called when the logical device state changes to Down.
719func (d *Device) DeviceDownInd() {
720 logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
721 d.State = DeviceStateDOWN
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530722 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530723 GetController().DeviceDownInd(d.ID)
724}
725
726// DeviceRebootInd is called when the logical device is rebooted.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530727func (d *Device) DeviceRebootInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530728 logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
729
730 if d.State == DeviceStateREBOOTED {
731 d.State = DeviceStateREBOOTED
732 logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
733 return
734 }
735
736 d.State = DeviceStateREBOOTED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530737 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530738 GetController().SetRebootInProgressForDevice(d.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530739 GetController().DeviceRebootInd(cntx, d.ID, d.SerialNum, d.SouthBoundID)
740 d.ReSetAllPortStates(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530741}
742
743// DeviceDisabledInd is called when the logical device is disabled
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530744func (d *Device) DeviceDisabledInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530745 logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
746 d.State = DeviceStateDISABLED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530747 d.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530748 GetController().DeviceDisableInd(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530749}
750
751//ReSetAllPortStates - Set all logical device port status to DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530752func (d *Device) ReSetAllPortStates(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530753 logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
754
755 d.portLock.Lock()
756 defer d.portLock.Unlock()
757
758 for _, port := range d.PortsByID {
759 if port.State != PortStateDown {
760 logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530761 GetController().PortDownInd(cntx, d.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530762 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530763 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530764 }
765 }
766}
767
768//ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530769func (d *Device) ReSetAllPortStatesInDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530770 logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
771
772 d.portLock.Lock()
773 defer d.portLock.Unlock()
774
775 for _, port := range d.PortsByID {
776 if port.State != PortStateDown {
777 logger.Infow(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
778 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530779 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530780 }
781 }
782}
783
784// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
785// to update only when the port state is DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530786func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530787 if p := d.GetPortByName(portName); p != nil {
788 if p.ID != port {
789 logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
790 if p.State != PortStateDown {
791 logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
792 return
793 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530794 d.UpdatePortByName(cntx, portName, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530795 logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
796 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530797 d.ProcessPortState(cntx, port, state)
Naveen Sampath04696f72022-06-13 15:19:14 +0530798 }
799}
800
// ***Operations Performed on Port state Transitions***
//
// |-----------------------------------------------------------------------------|
// |  State             |   Action                                               |
// |--------------------|--------------------------------------------------------|
// | UP                 | UNI - Trigger Flow addition for service configured     |
// |                    | NNI - Trigger Flow addition for vnets & mvlan profiles |
// |                    |                                                        |
// | DOWN               | UNI - Trigger Flow deletion for service configured     |
// |                    | NNI - Trigger Flow deletion for vnets & mvlan profiles |
// |                    |                                                        |
// |-----------------------------------------------------------------------------|
//
814
815// ProcessPortState deals with the change in port status and taking action
816// based on the new state and the old state
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530817func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530818 if d.State != DeviceStateUP && !util.IsNniPort(port) {
819 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
820 return
821 }
822 if p := d.GetPortByID(port); p != nil {
823 logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
824
825 // Avoid blind initialization as the current tasks in the queue will be lost
826 // Eg: Service Del followed by Port Down - The flows will be dangling
827 // Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
828 p.Tasks.CheckAndInitialize(d.ctx)
829 if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
830 // Transition from DOWN to UP
831 logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530832 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530833 p.State = PortStateUp
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530834 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530835 } else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
836 // Transition from UP to Down
837 logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530838 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530839 p.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530840 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530841 } else {
842 logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
843 }
844 }
845}
846
847// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530848func (d *Device) ProcessPortStateAfterReboot(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530849 if d.State != DeviceStateUP && !util.IsNniPort(port) {
850 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
851 return
852 }
853 if p := d.GetPortByID(port); p != nil {
854 logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State})
855 p.Tasks.Initialize(d.ctx)
856 if p.State == PortStateUp {
857 logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530858 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530859 } else if p.State == PortStateDown {
860 logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530861 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530862 }
863 }
864}
865
866// ChangeEvent : Change event brings in ports related changes such as addition/deletion
867// or modification where the port status change up/down is indicated to the
868// controller
869func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
870 cet := NewChangeEventTask(d.ctx, event, d)
871 d.AddTask(cet)
872 return nil
873}
874
875// PacketIn handle the incoming packet-in and deliver to the application for the
876// actual processing
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530877func (d *Device) PacketIn(cntx context.Context, pkt *ofp.PacketIn) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530878 logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
879 if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
880 logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
881 return
882 }
883 data := pkt.PacketIn.Data
884 port := PacketInGetPort(pkt.PacketIn)
885 if pName, err := d.GetPortName(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530886 GetController().PacketInInd(cntx, d.ID, pName, data)
Naveen Sampath04696f72022-06-13 15:19:14 +0530887 } else {
888 logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
889 }
890}
891
892// PacketInGetPort to get the port on which the packet-in is reported
893func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
894 for _, field := range pkt.Match.OxmFields {
895 if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
896 if ofbField, ok := field.Field.(*ofp.OfpOxmField_OfbField); ok {
897 if ofbField.OfbField.Type == ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
898 if port, ok := ofbField.OfbField.Value.(*ofp.OfpOxmOfbField_Port); ok {
899 return port.Port
900 }
901 }
902 }
903 }
904 }
905 return 0
906}
907
908// PacketOutReq receives the packet out request from the application via the
909// controller. The interface from the application uses name as the identity.
910func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
911 inp, err := d.GetPortID(inport)
912 if err != nil {
913 return errors.New("Unknown inport")
914 }
915 outp, err1 := d.GetPortID(outport)
916 if err1 != nil {
917 return errors.New("Unknown outport")
918 }
919 logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
920 return d.SendPacketOut(outp, inp, data, isCustomPkt)
921}
922
923// SendPacketOut is responsible for building the OF structure and send the
924// packet-out to the VOLTHA
925func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
926 pout := &ofp.PacketOut{}
927 pout.Id = d.ID
928 opout := &ofp.OfpPacketOut{}
929 pout.PacketOut = opout
930 opout.InPort = inport
931 opout.Data = data
932 opout.Actions = []*ofp.OfpAction{
933 {
934 Type: ofp.OfpActionType_OFPAT_OUTPUT,
935 Action: &ofp.OfpAction_Output{
936 Output: &ofp.OfpActionOutput{
937 Port: outport,
938 MaxLen: 65535,
939 },
940 },
941 },
942 }
943 d.packetOutChannel <- pout
944 return nil
945}
946
947// UpdateFlows receives the flows in the form that is implemented
948// in the VGC and transforms them to the OF format. This is handled
949// as a port of the task that is enqueued to do the same.
950func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
951 t := NewAddFlowsTask(d.ctx, flow, d)
952 logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
953 // check if port isNni , if yes flows will be added to device port queues.
954 if util.IsNniPort(devPort.ID) {
955 // Adding the flows to device port queues.
956 devPort.AddTask(t)
957 return
958 }
959 // If the flowHash is enabled then add the flows to the flowhash generated queues.
960 flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
961 if flowQueue != nil {
962 logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
963 flowQueue.AddTask(t)
964 logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
965 } else {
966 //FlowThrotling disabled, add to the device port queue
967 devPort.AddTask(t)
968 return
969 }
970}
971
972// UpdateGroup to update group info
973func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
974 task := NewModGroupTask(d.ctx, group, d)
975 logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
976 devPort.AddTask(task)
977}
978
979// ModMeter for mod meter task
980func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
981 if command == of.MeterCommandAdd {
982 if _, err := d.GetMeter(meter.ID); err == nil {
983 logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
984 return
985 }
986 }
987 t := NewModMeterTask(d.ctx, command, meter, d)
988 devPort.AddTask(t)
989}
990
991func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
992 d.flowQueueLock.RLock()
993 //If flowhash is 0 that means flowhash throttling is disabled, return nil
994 if d.flowHash == 0 {
995 d.flowQueueLock.RUnlock()
996 return nil
997 }
998 flowHashID := id % uint32(d.flowHash)
999 if value, found := d.flowQueue[uint32(flowHashID)]; found {
1000 d.flowQueueLock.RUnlock()
1001 return value
1002 }
1003 d.flowQueueLock.RUnlock()
1004 logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
1005
1006 return d.addFlowQueueForUniID(id)
1007}
1008
1009func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
1010
1011 d.flowQueueLock.Lock()
1012 defer d.flowQueueLock.Unlock()
1013 flowHashID := id % uint32(d.flowHash)
1014 flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
1015 flowQueue.Tasks.Initialize(d.ctx)
1016 d.flowQueue[flowHashID] = flowQueue
1017 return flowQueue
1018}
1019
1020// SetFlowHash sets the device flow hash and writes to the DB.
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301021func (d *Device) SetFlowHash(cntx context.Context, hash uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301022 d.flowQueueLock.Lock()
1023 defer d.flowQueueLock.Unlock()
1024
1025 d.flowHash = hash
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301026 d.writeFlowHashToDB(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301027}
1028
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301029func (d *Device) writeFlowHashToDB(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301030 hash, err := json.Marshal(d.flowHash)
1031 if err != nil {
1032 logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash})
1033 return
1034 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301035 if err := db.PutFlowHash(cntx, d.ID, string(hash)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301036 logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash})
1037 }
1038}
1039
1040//isSBOperAllowed - determins if the SB operation is allowed based on device state & force flag
1041func (d *Device) isSBOperAllowed(forceAction bool) bool {
1042
1043 if d.State == DeviceStateUP {
1044 return true
1045 }
1046
1047 if d.State == DeviceStateDISABLED && forceAction {
1048 return true
1049 }
1050
1051 return false
1052}
1053
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301054func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301055 flow, _ := d.GetFlow(cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301056 d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
Naveen Sampath04696f72022-06-13 15:19:14 +05301057}
1058
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301059func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301060
1061 statusCode, statusMsg := infraerror.GetErrorInfo(err)
1062 success := isFlowOperSuccess(statusCode, oper)
1063
1064 updateFlow := func(cookie uint64, state int, reason string) {
1065 if dbFlow, ok := d.GetFlow(cookie); ok {
1066 dbFlow.State = uint8(state)
1067 dbFlow.ErrorReason = reason
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301068 d.AddFlowToDb(cntx, dbFlow)
Naveen Sampath04696f72022-06-13 15:19:14 +05301069 }
1070 }
1071
1072 //Update flow results
1073 // Add - Update Success or Failure status with reason
1074 // Del - Delete entry from DB on success else update error reason
1075 if oper == of.CommandAdd {
1076 state := of.FlowAddSuccess
1077 reason := ""
1078 if !success {
1079 state = of.FlowAddFailure
1080 reason = statusMsg
1081 }
1082 updateFlow(cookie, state, reason)
1083 logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
1084 } else {
1085 if success && flow != nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301086 if err := d.DelFlow(cntx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301087 logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
1088 }
1089 } else if !success {
1090 updateFlow(cookie, of.FlowDelFailure, statusMsg)
1091 }
1092 }
1093
1094 flowResult := intf.FlowStatus{
1095 Cookie: strconv.FormatUint(cookie, 10),
1096 Device: d.ID,
1097 FlowModType: oper,
1098 Flow: flow,
1099 Status: statusCode,
1100 Reason: statusMsg,
1101 AdditionalData: bwDetails,
1102 }
1103
1104 logger.Infow(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301105 GetController().ProcessFlowModResultIndication(cntx, flowResult)
Naveen Sampath04696f72022-06-13 15:19:14 +05301106}