blob: 3ac5600c6948bec7fad40aef822ab893e63f7ef8 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package controller
17
18import (
19 "context"
20 "encoding/json"
21 "errors"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053022 "fmt"
Naveen Sampath04696f72022-06-13 15:19:14 +053023 "strconv"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053024 "strings"
Naveen Sampath04696f72022-06-13 15:19:14 +053025 "sync"
26 "time"
vinokuma926cb3e2023-03-29 11:41:06 +053027 infraerror "voltha-go-controller/internal/pkg/errorcodes"
Naveen Sampath04696f72022-06-13 15:19:14 +053028
29 "voltha-go-controller/database"
30 "voltha-go-controller/internal/pkg/holder"
31 "voltha-go-controller/internal/pkg/intf"
32 "voltha-go-controller/internal/pkg/of"
vinokuma926cb3e2023-03-29 11:41:06 +053033
Naveen Sampath04696f72022-06-13 15:19:14 +053034 //"voltha-go-controller/internal/pkg/vpagent"
35 "voltha-go-controller/internal/pkg/tasks"
36 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053037 "voltha-go-controller/log"
vinokuma926cb3e2023-03-29 11:41:06 +053038
Naveen Sampath04696f72022-06-13 15:19:14 +053039 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
40 "github.com/opencord/voltha-protos/v5/go/voltha"
41)
42
// PortState is the string-valued operational state of a device port.
type PortState string

// Operational states a DevicePort can be in.
const (
	// PortStateDown is the state assigned to a newly constructed port.
	PortStateDown PortState = "DOWN"
	// PortStateUp marks a port that is up.
	PortStateUp PortState = "UP"
)

const (
	// DefaultMaxFlowQueues is the flow hash NewDevice uses when no stored
	// value can be read from the DB.
	DefaultMaxFlowQueues = 67
	// ErrDuplicateFlow is the error text returned by AddFlow when a flow
	// with the same cookie is already present.
	ErrDuplicateFlow string = "Duplicate Flow"
)
56
57// DevicePort structure
58type DevicePort struct {
vinokuma926cb3e2023-03-29 11:41:06 +053059 Name string
60 State PortState
61 Version string
62 HwAddr string
Naveen Sampath04696f72022-06-13 15:19:14 +053063 tasks.Tasks
Tinoj Joseph429b9d92022-11-16 18:51:05 +053064 CurrSpeed uint32
65 MaxSpeed uint32
vinokuma926cb3e2023-03-29 11:41:06 +053066 ID uint32
Naveen Sampath04696f72022-06-13 15:19:14 +053067}
68
69// NewDevicePort is the constructor for DevicePort
Tinoj Joseph429b9d92022-11-16 18:51:05 +053070func NewDevicePort(mp *ofp.OfpPort) *DevicePort {
Naveen Sampath04696f72022-06-13 15:19:14 +053071 var port DevicePort
72
Tinoj Joseph429b9d92022-11-16 18:51:05 +053073 port.ID = mp.PortNo
74 port.Name = mp.Name
75
76 //port.HwAddr = strings.Trim(strings.Join(strings.Fields(fmt.Sprint("%02x", mp.HwAddr)), ":"), "[]")
77 port.HwAddr = strings.Trim(strings.ReplaceAll(fmt.Sprintf("%02x", mp.HwAddr), " ", ":"), "[]")
78 port.CurrSpeed = mp.CurrSpeed
79 port.MaxSpeed = mp.MaxSpeed
Naveen Sampath04696f72022-06-13 15:19:14 +053080 port.State = PortStateDown
81 return &port
82}
83
84// UniIDFlowQueue structure which maintains flows in queue.
85type UniIDFlowQueue struct {
86 tasks.Tasks
87 ID uint32
88}
89
90// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
91func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
92 var flowQueue UniIDFlowQueue
93 flowQueue.ID = id
94 return &flowQueue
95}
96
// DeviceState is the string-valued lifecycle state of a Device.
type DeviceState string

// Lifecycle states a Device can be in.
const (
	// DeviceStateUNKNOWN is the "UNKNOWN" state.
	DeviceStateUNKNOWN DeviceState = "UNKNOWN"
	// DeviceStateINIT is the "INIT" state.
	DeviceStateINIT DeviceState = "INIT"
	// DeviceStateUP is set by ConnectInd and DeviceUpInd.
	DeviceStateUP DeviceState = "UP"
	// DeviceStateDOWN is the state of a freshly constructed Device and is
	// also set by DeviceDownInd.
	DeviceStateDOWN DeviceState = "DOWN"
	// DeviceStateREBOOTED is set by DeviceRebootInd.
	DeviceStateREBOOTED DeviceState = "REBOOTED"
	// DeviceStateDISABLED is set by DeviceDisabledInd.
	DeviceStateDISABLED DeviceState = "DISABLED"
	// DeviceStateDELETED is the "DELETED" state.
	DeviceStateDELETED DeviceState = "DELETED"
)
117
118// Device structure
119type Device struct {
vinokuma926cb3e2023-03-29 11:41:06 +0530120 ctx context.Context
121 cancel context.CancelFunc
122 vclientHolder *holder.VolthaServiceClientHolder
123 packetOutChannel chan *ofp.PacketOut
124 PortsByName map[string]*DevicePort
125 flows map[uint64]*of.VoltSubFlow
126 PortsByID map[uint32]*DevicePort
127 meters map[uint32]*of.Meter
128 flowQueue map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
129 SouthBoundID string
130 MfrDesc string
131 HwDesc string
132 SwDesc string
133 ID string
134 SerialNum string
135 State DeviceState
136 TimeStamp time.Time
137 groups sync.Map //map[uint32]*of.Group -> [GroupId : Group]
Naveen Sampath04696f72022-06-13 15:19:14 +0530138 tasks.Tasks
Naveen Sampath04696f72022-06-13 15:19:14 +0530139 portLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530140 flowLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530141 meterLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530142 flowQueueLock sync.RWMutex
143 flowHash uint32
vinokuma926cb3e2023-03-29 11:41:06 +0530144 auditInProgress bool
Naveen Sampath04696f72022-06-13 15:19:14 +0530145 deviceAuditInProgress bool
Naveen Sampath04696f72022-06-13 15:19:14 +0530146}
147
148// NewDevice is the constructor for Device
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530149func NewDevice(cntx context.Context, id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID, mfr, hwDesc, swDesc string) *Device {
Naveen Sampath04696f72022-06-13 15:19:14 +0530150 var device Device
151 device.ID = id
152 device.SerialNum = slno
153 device.State = DeviceStateDOWN
154 device.PortsByID = make(map[uint32]*DevicePort)
155 device.PortsByName = make(map[string]*DevicePort)
156 device.vclientHolder = vclientHldr
157 device.flows = make(map[uint64]*of.VoltSubFlow)
158 device.meters = make(map[uint32]*of.Meter)
159 device.flowQueue = make(map[uint32]*UniIDFlowQueue)
vinokuma926cb3e2023-03-29 11:41:06 +0530160 // Get the flowhash from db and update the flowhash variable in the device.
Naveen Sampath04696f72022-06-13 15:19:14 +0530161 device.SouthBoundID = southBoundID
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530162 device.MfrDesc = mfr
163 device.HwDesc = hwDesc
164 device.SwDesc = swDesc
165 device.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530166 flowHash, err := db.GetFlowHash(cntx, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530167 if err != nil {
168 device.flowHash = DefaultMaxFlowQueues
169 } else {
170 var hash uint32
171 err = json.Unmarshal([]byte(flowHash), &hash)
172 if err != nil {
173 logger.Error(ctx, "Failed to unmarshall flowhash")
174 } else {
175 device.flowHash = hash
176 }
177 }
178 logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
179 return &device
180}
181
182// ResetCache to reset cache
183func (d *Device) ResetCache() {
184 logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
185 d.flows = make(map[uint64]*of.VoltSubFlow)
186 d.meters = make(map[uint32]*of.Meter)
187 d.groups = sync.Map{}
188}
189
190// GetFlow - Get the flow from device obj
191func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
192 d.flowLock.RLock()
193 defer d.flowLock.RUnlock()
194 logger.Infow(ctx, "Get Flow", log.Fields{"Cookie": cookie})
195 flow, ok := d.flows[cookie]
196 return flow, ok
197}
198
Tinoj Josephec742f62022-09-29 19:11:10 +0530199// GetAllFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530200func (d *Device) GetAllFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530201 d.flowLock.RLock()
202 defer d.flowLock.RUnlock()
203 var flows []*of.VoltSubFlow
204 logger.Infow(ctx, "Get All Flows", log.Fields{"deviceID": d.ID})
205 for _, f := range d.flows {
206 flows = append(flows, f)
207 }
208 return flows
209}
210
211// GetAllPendingFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530212func (d *Device) GetAllPendingFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530213 d.flowLock.RLock()
214 defer d.flowLock.RUnlock()
215 var flows []*of.VoltSubFlow
216 logger.Infow(ctx, "Get All Pending Flows", log.Fields{"deviceID": d.ID})
217 for _, f := range d.flows {
218 if f.State == of.FlowAddPending {
219 flows = append(flows, f)
220 }
221 }
222 return flows
223}
224
Naveen Sampath04696f72022-06-13 15:19:14 +0530225// AddFlow - Adds the flow to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530226func (d *Device) AddFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530227 d.flowLock.Lock()
228 defer d.flowLock.Unlock()
229 logger.Infow(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
230 if _, ok := d.flows[flow.Cookie]; ok {
231 return errors.New(ErrDuplicateFlow)
232 }
233 d.flows[flow.Cookie] = flow
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530234 d.AddFlowToDb(cntx, flow)
Naveen Sampath04696f72022-06-13 15:19:14 +0530235 return nil
236}
237
238// AddFlowToDb is the utility to add the flow to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530239func (d *Device) AddFlowToDb(cntx context.Context, flow *of.VoltSubFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530240 if b, err := json.Marshal(flow); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530241 if err = db.PutFlow(cntx, d.ID, flow.Cookie, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530242 logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
243 }
244 }
245}
246
247// DelFlow - Deletes the flow from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530248func (d *Device) DelFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530249 d.flowLock.Lock()
250 defer d.flowLock.Unlock()
251 if _, ok := d.flows[flow.Cookie]; ok {
252 delete(d.flows, flow.Cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530253 d.DelFlowFromDb(cntx, flow.Cookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530254 return nil
255 }
256 return errors.New("Flow does not Exist")
257}
258
259// DelFlowFromDb is utility to delete the flow from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530260func (d *Device) DelFlowFromDb(cntx context.Context, flowID uint64) {
261 _ = db.DelFlow(cntx, d.ID, flowID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530262}
263
264// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
265func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
266 d.flowLock.RLock()
267 defer d.flowLock.RUnlock()
268 if _, ok := d.flows[flow.Cookie]; ok {
269 return false
270 } else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
271 if _, ok := d.flows[flow.OldCookie]; ok {
272 logger.Infow(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
273 return true
274 }
275 }
276 return false
277}
278
279// DelFlowWithOldCookie is to delete flow with old cookie.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530280func (d *Device) DelFlowWithOldCookie(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530281 d.flowLock.Lock()
282 defer d.flowLock.Unlock()
283 if _, ok := d.flows[flow.OldCookie]; ok {
284 logger.Infow(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
285 log.Fields{"OldCookie": flow.OldCookie})
286 delete(d.flows, flow.OldCookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530287 d.DelFlowFromDb(cntx, flow.OldCookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530288 return nil
289 }
290 return errors.New("Flow does not Exist")
291}
292
293// RestoreFlowsFromDb to restore flows from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530294func (d *Device) RestoreFlowsFromDb(cntx context.Context) {
295 flows, _ := db.GetFlows(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530296 for _, flow := range flows {
297 b, ok := flow.Value.([]byte)
298 if !ok {
299 logger.Warn(ctx, "The value type is not []byte")
300 continue
301 }
302 d.CreateFlowFromString(b)
303 }
304}
305
306// CreateFlowFromString to create flow from string
307func (d *Device) CreateFlowFromString(b []byte) {
308 var flow of.VoltSubFlow
309 if err := json.Unmarshal(b, &flow); err == nil {
310 if _, ok := d.flows[flow.Cookie]; !ok {
311 logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
312 d.flows[flow.Cookie] = &flow
313 } else {
314 logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
315 }
316 } else {
317 logger.Warn(ctx, "Unmarshal failed")
318 }
319}
320
321// ----------------------------------------------------------
322// Database related functionality
323// Group operations at the device which include update and delete
324
325// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530326func (d *Device) UpdateGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530327 logger.Infow(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
328 d.groups.Store(group.GroupID, group)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530329 d.AddGroupToDb(cntx, group)
Naveen Sampath04696f72022-06-13 15:19:14 +0530330}
331
332// AddGroupToDb - Utility to add the group to the device DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530333func (d *Device) AddGroupToDb(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530334 if b, err := json.Marshal(group); err == nil {
335 logger.Infow(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530336 if err = db.PutGroup(cntx, d.ID, group.GroupID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530337 logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
338 }
339 }
340}
341
342// DelGroupEntry - Deletes the group from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530343func (d *Device) DelGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530344 if _, ok := d.groups.Load(group.GroupID); ok {
345 d.groups.Delete(group.GroupID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530346 d.DelGroupFromDb(cntx, group.GroupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530347 }
348}
349
350// DelGroupFromDb - Utility to delete the Group from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530351func (d *Device) DelGroupFromDb(cntx context.Context, groupID uint32) {
352 _ = db.DelGroup(cntx, d.ID, groupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530353}
354
vinokuma926cb3e2023-03-29 11:41:06 +0530355// RestoreGroupsFromDb - restores all groups from DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530356func (d *Device) RestoreGroupsFromDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530357 logger.Info(ctx, "Restoring Groups")
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530358 groups, _ := db.GetGroups(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530359 for _, group := range groups {
360 b, ok := group.Value.([]byte)
361 if !ok {
362 logger.Warn(ctx, "The value type is not []byte")
363 continue
364 }
365 d.CreateGroupFromString(b)
366 }
367}
368
vinokuma926cb3e2023-03-29 11:41:06 +0530369// CreateGroupFromString - Forms group struct from json string
Naveen Sampath04696f72022-06-13 15:19:14 +0530370func (d *Device) CreateGroupFromString(b []byte) {
371 var group of.Group
372 if err := json.Unmarshal(b, &group); err == nil {
373 if _, ok := d.groups.Load(group.GroupID); !ok {
374 logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
375 d.groups.Store(group.GroupID, &group)
376 } else {
377 logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
378 }
379 } else {
380 logger.Warn(ctx, "Unmarshal failed")
381 }
382}
383
384// AddMeter to add meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530385func (d *Device) AddMeter(cntx context.Context, meter *of.Meter) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530386 d.meterLock.Lock()
387 defer d.meterLock.Unlock()
388 if _, ok := d.meters[meter.ID]; ok {
389 return errors.New("Duplicate Meter")
390 }
391 d.meters[meter.ID] = meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530392 go d.AddMeterToDb(cntx, meter)
Naveen Sampath04696f72022-06-13 15:19:14 +0530393 return nil
394}
395
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530396// UpdateMeter to update meter
397func (d *Device) UpdateMeter(cntx context.Context, meter *of.Meter) error {
vinokuma926cb3e2023-03-29 11:41:06 +0530398 d.meterLock.Lock()
399 defer d.meterLock.Unlock()
400 if _, ok := d.meters[meter.ID]; ok {
401 d.meters[meter.ID] = meter
402 d.AddMeterToDb(cntx, meter)
403 } else {
404 return errors.New("Meter not found for updation")
405 }
406 return nil
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530407}
408
Naveen Sampath04696f72022-06-13 15:19:14 +0530409// GetMeter to get meter
410func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
411 d.meterLock.RLock()
412 defer d.meterLock.RUnlock()
413 if m, ok := d.meters[id]; ok {
414 return m, nil
415 }
416 return nil, errors.New("Meter Not Found")
417}
418
419// DelMeter to delete meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530420func (d *Device) DelMeter(cntx context.Context, meter *of.Meter) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530421 d.meterLock.Lock()
422 defer d.meterLock.Unlock()
423 if _, ok := d.meters[meter.ID]; ok {
424 delete(d.meters, meter.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530425 go d.DelMeterFromDb(cntx, meter.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530426 return true
427 }
428 return false
429}
430
431// AddMeterToDb is utility to add the Group to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530432func (d *Device) AddMeterToDb(cntx context.Context, meter *of.Meter) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530433 if b, err := json.Marshal(meter); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530434 if err = db.PutDeviceMeter(cntx, d.ID, meter.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530435 logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
436 }
437 }
438}
439
440// DelMeterFromDb to delete meter from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530441func (d *Device) DelMeterFromDb(cntx context.Context, id uint32) {
442 _ = db.DelDeviceMeter(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530443}
444
445// RestoreMetersFromDb to restore meters from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530446func (d *Device) RestoreMetersFromDb(cntx context.Context) {
447 meters, _ := db.GetDeviceMeters(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530448 for _, meter := range meters {
449 b, ok := meter.Value.([]byte)
450 if !ok {
451 logger.Warn(ctx, "The value type is not []byte")
452 continue
453 }
454 d.CreateMeterFromString(b)
455 }
456}
457
458// CreateMeterFromString to create meter from string
459func (d *Device) CreateMeterFromString(b []byte) {
460 var meter of.Meter
461 if err := json.Unmarshal(b, &meter); err == nil {
462 if _, ok := d.meters[meter.ID]; !ok {
463 logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
464 d.meters[meter.ID] = &meter
465 } else {
466 logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
467 }
468 } else {
469 logger.Warn(ctx, "Unmarshal failed")
470 }
471}
472
473// VolthaClient to get voltha client
474func (d *Device) VolthaClient() voltha.VolthaServiceClient {
475 return d.vclientHolder.Get()
476}
477
478// AddPort to add the port as requested by the device/VOLTHA
479// Inform the application if the port is successfully added
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530480func (d *Device) AddPort(cntx context.Context, mp *ofp.OfpPort) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530481 d.portLock.Lock()
482 defer d.portLock.Unlock()
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530483 id := mp.PortNo
484 name := mp.Name
Naveen Sampath04696f72022-06-13 15:19:14 +0530485 if _, ok := d.PortsByID[id]; ok {
486 return errors.New("Duplicate port")
487 }
488 if _, ok := d.PortsByName[name]; ok {
489 return errors.New("Duplicate port")
490 }
491
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530492 p := NewDevicePort(mp)
Naveen Sampath04696f72022-06-13 15:19:14 +0530493 d.PortsByID[id] = p
494 d.PortsByName[name] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530495 d.WritePortToDb(cntx, p)
496 GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530497 logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
498 return nil
499}
500
501// DelPort to delete the port as requested by the device/VOLTHA
502// Inform the application if the port is successfully deleted
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530503func (d *Device) DelPort(cntx context.Context, id uint32) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530504 p := d.GetPortByID(id)
505 if p == nil {
506 return errors.New("Unknown Port")
507 }
508 if p.State == PortStateUp {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530509 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530510 }
Tinoj Joseph4ead4e02023-01-30 03:12:44 +0530511 GetController().PortDelInd(cntx, d.ID, p.Name)
512
Naveen Sampath04696f72022-06-13 15:19:14 +0530513 d.portLock.Lock()
514 defer d.portLock.Unlock()
515
Naveen Sampath04696f72022-06-13 15:19:14 +0530516 delete(d.PortsByID, p.ID)
517 delete(d.PortsByName, p.Name)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530518 d.DelPortFromDb(cntx, p.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530519 logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
520 return nil
521}
522
523// UpdatePortByName is utility to update the port by Name
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530524func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530525 d.portLock.Lock()
526 defer d.portLock.Unlock()
527
528 p, ok := d.PortsByName[name]
529 if !ok {
530 return
531 }
532 delete(d.PortsByID, p.ID)
533 p.ID = port
534 d.PortsByID[port] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530535 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530536 GetController().PortUpdateInd(d.ID, p.Name, p.ID)
537 logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
538}
539
540// GetPortName to get the name of the port by its id
541func (d *Device) GetPortName(id uint32) (string, error) {
542 d.portLock.RLock()
543 defer d.portLock.RUnlock()
544
545 if p, ok := d.PortsByID[id]; ok {
546 return p.Name, nil
547 }
548 logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
549 return "", errors.New("Unknown Port ID")
550}
551
552// GetPortByID is utility to retrieve the port by ID
553func (d *Device) GetPortByID(id uint32) *DevicePort {
554 d.portLock.RLock()
555 defer d.portLock.RUnlock()
556
557 p, ok := d.PortsByID[id]
558 if ok {
559 return p
560 }
561 return nil
562}
563
564// GetPortByName is utility to retrieve the port by Name
565func (d *Device) GetPortByName(name string) *DevicePort {
566 d.portLock.RLock()
567 defer d.portLock.RUnlock()
568
569 p, ok := d.PortsByName[name]
570 if ok {
571 return p
572 }
573 return nil
574}
575
576// GetPortState to get the state of the port by name
577func (d *Device) GetPortState(name string) (PortState, error) {
578 d.portLock.RLock()
579 defer d.portLock.RUnlock()
580
581 if p, ok := d.PortsByName[name]; ok {
582 return p.State, nil
583 }
584 return PortStateDown, errors.New("Unknown Port ID")
585}
586
587// GetPortID to get the port-id by the port name
588func (d *Device) GetPortID(name string) (uint32, error) {
589 d.portLock.RLock()
590 defer d.portLock.RUnlock()
591
592 if p, ok := d.PortsByName[name]; ok {
593 return p.ID, nil
594 }
595 return 0, errors.New("Unknown Port ID")
Naveen Sampath04696f72022-06-13 15:19:14 +0530596}
597
598// WritePortToDb to add the port to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530599func (d *Device) WritePortToDb(ctx context.Context, port *DevicePort) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530600 port.Version = database.PresentVersionMap[database.DevicePortPath]
601 if b, err := json.Marshal(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530602 if err = db.PutPort(ctx, d.ID, port.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530603 logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
604 }
605 }
606}
607
608// DelPortFromDb to delete port from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530609func (d *Device) DelPortFromDb(cntx context.Context, id uint32) {
610 _ = db.DelPort(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530611}
612
613// RestorePortsFromDb to restore ports from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530614func (d *Device) RestorePortsFromDb(cntx context.Context) {
615 ports, _ := db.GetPorts(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530616 for _, port := range ports {
617 b, ok := port.Value.([]byte)
618 if !ok {
619 logger.Warn(ctx, "The value type is not []byte")
620 continue
621 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530622 d.CreatePortFromString(cntx, b)
Naveen Sampath04696f72022-06-13 15:19:14 +0530623 }
624}
625
626// CreatePortFromString to create port from string
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530627func (d *Device) CreatePortFromString(cntx context.Context, b []byte) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530628 var port DevicePort
629 if err := json.Unmarshal(b, &port); err == nil {
630 if _, ok := d.PortsByID[port.ID]; !ok {
631 logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
632 d.PortsByID[port.ID] = &port
633 d.PortsByName[port.Name] = &port
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530634 GetController().PortAddInd(cntx, d.ID, port.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530635 } else {
636 logger.Warnw(ctx, "Duplicate Port", log.Fields{"ID": port.ID})
637 }
638 } else {
639 logger.Warn(ctx, "Unmarshal failed")
640 }
641}
642
643// Delete : OLT Delete functionality yet to be implemented. IDeally all of the
644// resources should have been removed by this time. It is an error
645// scenario if the OLT has resources associated with it.
646func (d *Device) Delete() {
647 d.StopAll()
648}
649
650// Stop to stop the task
651func (d *Device) Stop() {
652}
653
654// ConnectInd is called when the connection between VGC and the VOLTHA is
655// restored. This will perform audit of the device post reconnection
656func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
657 logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
658 ctx1, cancel := context.WithCancel(ctx)
659 d.cancel = cancel
660 d.ctx = ctx1
661 d.Tasks.Initialize(ctx1)
662
663 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
664 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530665 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530666 GetController().DeviceUpInd(d.ID)
667
668 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
669 t := NewAuditDevice(d, AuditEventDeviceDisc)
670 d.Tasks.AddTask(t)
671
672 t1 := NewAuditTablesTask(d)
673 d.Tasks.AddTask(t1)
674
675 t2 := NewPendingProfilesTask(d)
676 d.Tasks.AddTask(t2)
677
678 go d.synchronizeDeviceTables()
679}
680
681func (d *Device) synchronizeDeviceTables() {
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530682 tick := time.NewTicker(GetController().GetDeviceTableSyncDuration())
Naveen Sampath04696f72022-06-13 15:19:14 +0530683loop:
684 for {
685 select {
686 case <-d.ctx.Done():
vinokuma926cb3e2023-03-29 11:41:06 +0530687 logger.Warnw(d.ctx, "Context Done. Canceling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
Naveen Sampath04696f72022-06-13 15:19:14 +0530688 break loop
689 case <-tick.C:
690 t1 := NewAuditTablesTask(d)
691 d.Tasks.AddTask(t1)
692 }
693 }
694 tick.Stop()
695}
696
697// DeviceUpInd is called when the logical device state changes to UP. This will perform audit of the device post reconnection
698func (d *Device) DeviceUpInd() {
699 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
700 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530701 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530702 GetController().DeviceUpInd(d.ID)
703
704 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
705 t := NewAuditDevice(d, AuditEventDeviceDisc)
706 d.Tasks.AddTask(t)
707
708 t1 := NewAuditTablesTask(d)
709 d.Tasks.AddTask(t1)
710
711 t2 := NewPendingProfilesTask(d)
712 d.Tasks.AddTask(t2)
713}
714
715// DeviceDownInd is called when the logical device state changes to Down.
716func (d *Device) DeviceDownInd() {
717 logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
718 d.State = DeviceStateDOWN
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530719 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530720 GetController().DeviceDownInd(d.ID)
721}
722
723// DeviceRebootInd is called when the logical device is rebooted.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530724func (d *Device) DeviceRebootInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530725 logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
726
727 if d.State == DeviceStateREBOOTED {
728 d.State = DeviceStateREBOOTED
729 logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
730 return
731 }
732
733 d.State = DeviceStateREBOOTED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530734 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530735 GetController().SetRebootInProgressForDevice(d.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530736 GetController().DeviceRebootInd(cntx, d.ID, d.SerialNum, d.SouthBoundID)
737 d.ReSetAllPortStates(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530738}
739
740// DeviceDisabledInd is called when the logical device is disabled
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530741func (d *Device) DeviceDisabledInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530742 logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
743 d.State = DeviceStateDISABLED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530744 d.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530745 GetController().DeviceDisableInd(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530746}
747
vinokuma926cb3e2023-03-29 11:41:06 +0530748// ReSetAllPortStates - Set all logical device port status to DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530749func (d *Device) ReSetAllPortStates(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530750 logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
751
752 d.portLock.Lock()
753 defer d.portLock.Unlock()
754
755 for _, port := range d.PortsByID {
756 if port.State != PortStateDown {
757 logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530758 GetController().PortDownInd(cntx, d.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530759 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530760 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530761 }
762 }
763}
764
vinokuma926cb3e2023-03-29 11:41:06 +0530765// ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530766func (d *Device) ReSetAllPortStatesInDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530767 logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
768
769 d.portLock.Lock()
770 defer d.portLock.Unlock()
771
772 for _, port := range d.PortsByID {
773 if port.State != PortStateDown {
774 logger.Infow(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
775 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530776 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530777 }
778 }
779}
780
781// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
782// to update only when the port state is DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530783func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530784 if p := d.GetPortByName(portName); p != nil {
785 if p.ID != port {
786 logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
787 if p.State != PortStateDown {
788 logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
789 return
790 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530791 d.UpdatePortByName(cntx, portName, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530792 logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
793 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530794 d.ProcessPortState(cntx, port, state)
Naveen Sampath04696f72022-06-13 15:19:14 +0530795 }
796}
797
798// ***Operations Performed on Port state Transitions***
799//
800// |-----------------------------------------------------------------------------|
801// | State | Action |
802// |--------------------|--------------------------------------------------------|
803// | UP | UNI - Trigger Flow addition for service configured |
804// | | NNI - Trigger Flow addition for vnets & mvlan profiles |
805// | | |
806// | DOWN | UNI - Trigger Flow deletion for service configured |
807// | | NNI - Trigger Flow deletion for vnets & mvlan profiles |
808// | | |
809// |-----------------------------------------------------------------------------|
810//
811
812// ProcessPortState deals with the change in port status and taking action
813// based on the new state and the old state
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530814func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530815 if d.State != DeviceStateUP && !util.IsNniPort(port) {
816 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
817 return
818 }
819 if p := d.GetPortByID(port); p != nil {
820 logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
821
822 // Avoid blind initialization as the current tasks in the queue will be lost
823 // Eg: Service Del followed by Port Down - The flows will be dangling
824 // Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
825 p.Tasks.CheckAndInitialize(d.ctx)
826 if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
827 // Transition from DOWN to UP
828 logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530829 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530830 p.State = PortStateUp
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530831 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530832 } else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
833 // Transition from UP to Down
834 logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530835 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530836 p.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530837 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530838 } else {
839 logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
840 }
841 }
842}
843
844// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530845func (d *Device) ProcessPortStateAfterReboot(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530846 if d.State != DeviceStateUP && !util.IsNniPort(port) {
847 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
848 return
849 }
850 if p := d.GetPortByID(port); p != nil {
851 logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State})
852 p.Tasks.Initialize(d.ctx)
853 if p.State == PortStateUp {
854 logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530855 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530856 } else if p.State == PortStateDown {
857 logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530858 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530859 }
860 }
861}
862
863// ChangeEvent : Change event brings in ports related changes such as addition/deletion
864// or modification where the port status change up/down is indicated to the
865// controller
866func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
867 cet := NewChangeEventTask(d.ctx, event, d)
868 d.AddTask(cet)
869 return nil
870}
871
872// PacketIn handle the incoming packet-in and deliver to the application for the
873// actual processing
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530874func (d *Device) PacketIn(cntx context.Context, pkt *ofp.PacketIn) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530875 logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
876 if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
877 logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
878 return
879 }
880 data := pkt.PacketIn.Data
881 port := PacketInGetPort(pkt.PacketIn)
882 if pName, err := d.GetPortName(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530883 GetController().PacketInInd(cntx, d.ID, pName, data)
Naveen Sampath04696f72022-06-13 15:19:14 +0530884 } else {
885 logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
886 }
887}
888
889// PacketInGetPort to get the port on which the packet-in is reported
890func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
891 for _, field := range pkt.Match.OxmFields {
892 if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
893 if ofbField, ok := field.Field.(*ofp.OfpOxmField_OfbField); ok {
894 if ofbField.OfbField.Type == ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
895 if port, ok := ofbField.OfbField.Value.(*ofp.OfpOxmOfbField_Port); ok {
896 return port.Port
897 }
898 }
899 }
900 }
901 }
902 return 0
903}
904
905// PacketOutReq receives the packet out request from the application via the
906// controller. The interface from the application uses name as the identity.
907func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
908 inp, err := d.GetPortID(inport)
909 if err != nil {
910 return errors.New("Unknown inport")
911 }
912 outp, err1 := d.GetPortID(outport)
913 if err1 != nil {
914 return errors.New("Unknown outport")
915 }
916 logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
917 return d.SendPacketOut(outp, inp, data, isCustomPkt)
918}
919
920// SendPacketOut is responsible for building the OF structure and send the
921// packet-out to the VOLTHA
922func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
923 pout := &ofp.PacketOut{}
924 pout.Id = d.ID
925 opout := &ofp.OfpPacketOut{}
926 pout.PacketOut = opout
927 opout.InPort = inport
928 opout.Data = data
929 opout.Actions = []*ofp.OfpAction{
930 {
931 Type: ofp.OfpActionType_OFPAT_OUTPUT,
932 Action: &ofp.OfpAction_Output{
933 Output: &ofp.OfpActionOutput{
934 Port: outport,
935 MaxLen: 65535,
936 },
937 },
938 },
939 }
940 d.packetOutChannel <- pout
941 return nil
942}
943
944// UpdateFlows receives the flows in the form that is implemented
945// in the VGC and transforms them to the OF format. This is handled
946// as a port of the task that is enqueued to do the same.
947func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
948 t := NewAddFlowsTask(d.ctx, flow, d)
949 logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
950 // check if port isNni , if yes flows will be added to device port queues.
951 if util.IsNniPort(devPort.ID) {
952 // Adding the flows to device port queues.
953 devPort.AddTask(t)
954 return
955 }
956 // If the flowHash is enabled then add the flows to the flowhash generated queues.
957 flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
958 if flowQueue != nil {
959 logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
960 flowQueue.AddTask(t)
961 logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
962 } else {
963 //FlowThrotling disabled, add to the device port queue
964 devPort.AddTask(t)
965 return
966 }
967}
968
969// UpdateGroup to update group info
970func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
971 task := NewModGroupTask(d.ctx, group, d)
972 logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
973 devPort.AddTask(task)
974}
975
976// ModMeter for mod meter task
977func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
978 if command == of.MeterCommandAdd {
979 if _, err := d.GetMeter(meter.ID); err == nil {
980 logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
981 return
982 }
983 }
984 t := NewModMeterTask(d.ctx, command, meter, d)
985 devPort.AddTask(t)
986}
987
988func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
989 d.flowQueueLock.RLock()
vinokuma926cb3e2023-03-29 11:41:06 +0530990 // If flowhash is 0 that means flowhash throttling is disabled, return nil
Naveen Sampath04696f72022-06-13 15:19:14 +0530991 if d.flowHash == 0 {
992 d.flowQueueLock.RUnlock()
993 return nil
994 }
995 flowHashID := id % uint32(d.flowHash)
996 if value, found := d.flowQueue[uint32(flowHashID)]; found {
997 d.flowQueueLock.RUnlock()
998 return value
999 }
1000 d.flowQueueLock.RUnlock()
1001 logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
1002
1003 return d.addFlowQueueForUniID(id)
1004}
1005
1006func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
Naveen Sampath04696f72022-06-13 15:19:14 +05301007 d.flowQueueLock.Lock()
1008 defer d.flowQueueLock.Unlock()
1009 flowHashID := id % uint32(d.flowHash)
1010 flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
1011 flowQueue.Tasks.Initialize(d.ctx)
1012 d.flowQueue[flowHashID] = flowQueue
1013 return flowQueue
1014}
1015
1016// SetFlowHash sets the device flow hash and writes to the DB.
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301017func (d *Device) SetFlowHash(cntx context.Context, hash uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301018 d.flowQueueLock.Lock()
1019 defer d.flowQueueLock.Unlock()
1020
1021 d.flowHash = hash
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301022 d.writeFlowHashToDB(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301023}
1024
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301025func (d *Device) writeFlowHashToDB(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301026 hash, err := json.Marshal(d.flowHash)
1027 if err != nil {
1028 logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash})
1029 return
1030 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301031 if err := db.PutFlowHash(cntx, d.ID, string(hash)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301032 logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash})
1033 }
1034}
1035
vinokuma926cb3e2023-03-29 11:41:06 +05301036// isSBOperAllowed - determines if the SB operation is allowed based on device state & force flag
Naveen Sampath04696f72022-06-13 15:19:14 +05301037func (d *Device) isSBOperAllowed(forceAction bool) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +05301038 if d.State == DeviceStateUP {
1039 return true
1040 }
1041
1042 if d.State == DeviceStateDISABLED && forceAction {
1043 return true
1044 }
1045
1046 return false
1047}
1048
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301049func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301050 flow, _ := d.GetFlow(cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301051 d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
Naveen Sampath04696f72022-06-13 15:19:14 +05301052}
1053
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301054func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301055 statusCode, statusMsg := infraerror.GetErrorInfo(err)
1056 success := isFlowOperSuccess(statusCode, oper)
1057
1058 updateFlow := func(cookie uint64, state int, reason string) {
1059 if dbFlow, ok := d.GetFlow(cookie); ok {
1060 dbFlow.State = uint8(state)
1061 dbFlow.ErrorReason = reason
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301062 d.AddFlowToDb(cntx, dbFlow)
Naveen Sampath04696f72022-06-13 15:19:14 +05301063 }
1064 }
1065
vinokuma926cb3e2023-03-29 11:41:06 +05301066 // Update flow results
Naveen Sampath04696f72022-06-13 15:19:14 +05301067 // Add - Update Success or Failure status with reason
1068 // Del - Delete entry from DB on success else update error reason
1069 if oper == of.CommandAdd {
1070 state := of.FlowAddSuccess
1071 reason := ""
1072 if !success {
1073 state = of.FlowAddFailure
1074 reason = statusMsg
1075 }
1076 updateFlow(cookie, state, reason)
1077 logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
1078 } else {
1079 if success && flow != nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301080 if err := d.DelFlow(cntx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301081 logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
1082 }
1083 } else if !success {
1084 updateFlow(cookie, of.FlowDelFailure, statusMsg)
1085 }
1086 }
1087
1088 flowResult := intf.FlowStatus{
1089 Cookie: strconv.FormatUint(cookie, 10),
1090 Device: d.ID,
1091 FlowModType: oper,
1092 Flow: flow,
1093 Status: statusCode,
1094 Reason: statusMsg,
1095 AdditionalData: bwDetails,
1096 }
1097
1098 logger.Infow(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301099 GetController().ProcessFlowModResultIndication(cntx, flowResult)
Naveen Sampath04696f72022-06-13 15:19:14 +05301100}