blob: abf31088f073e5cb296be771c6be529527e1e451 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package controller
17
18import (
19 "context"
20 "encoding/json"
21 "errors"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053022 "fmt"
Naveen Sampath04696f72022-06-13 15:19:14 +053023 "strconv"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053024 "strings"
Naveen Sampath04696f72022-06-13 15:19:14 +053025 "sync"
26 "time"
vinokuma926cb3e2023-03-29 11:41:06 +053027 infraerror "voltha-go-controller/internal/pkg/errorcodes"
Naveen Sampath04696f72022-06-13 15:19:14 +053028
29 "voltha-go-controller/database"
30 "voltha-go-controller/internal/pkg/holder"
31 "voltha-go-controller/internal/pkg/intf"
32 "voltha-go-controller/internal/pkg/of"
vinokuma926cb3e2023-03-29 11:41:06 +053033
Naveen Sampath04696f72022-06-13 15:19:14 +053034 //"voltha-go-controller/internal/pkg/vpagent"
35 "voltha-go-controller/internal/pkg/tasks"
36 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053037 "voltha-go-controller/log"
vinokuma926cb3e2023-03-29 11:41:06 +053038
Naveen Sampath04696f72022-06-13 15:19:14 +053039 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
40 "github.com/opencord/voltha-protos/v5/go/voltha"
41)
42
43// PortState type
44type PortState string
45
46const (
47 // PortStateDown constant
48 PortStateDown PortState = "DOWN"
49 // PortStateUp constant
50 PortStateUp PortState = "UP"
51 // DefaultMaxFlowQueues constant
52 DefaultMaxFlowQueues = 67
53 //ErrDuplicateFlow - indicates flow already exists in DB
Akash Sonid36d23b2023-08-18 12:51:40 +053054 ErrDuplicateFlow string = "duplicate flow"
55 //Unknown_Port_ID - indicates that the port id is unknown
56 Unknown_Port_ID = "unknown port id"
57 //Duplicate_Port - indicates the port is already exist in controller
58 Duplicate_Port = "duplicate port"
Naveen Sampath04696f72022-06-13 15:19:14 +053059)
60
61// DevicePort structure
62type DevicePort struct {
vinokuma926cb3e2023-03-29 11:41:06 +053063 Name string
64 State PortState
65 Version string
66 HwAddr string
Naveen Sampath04696f72022-06-13 15:19:14 +053067 tasks.Tasks
Tinoj Joseph429b9d92022-11-16 18:51:05 +053068 CurrSpeed uint32
69 MaxSpeed uint32
vinokuma926cb3e2023-03-29 11:41:06 +053070 ID uint32
Naveen Sampath04696f72022-06-13 15:19:14 +053071}
72
73// NewDevicePort is the constructor for DevicePort
Tinoj Joseph429b9d92022-11-16 18:51:05 +053074func NewDevicePort(mp *ofp.OfpPort) *DevicePort {
Naveen Sampath04696f72022-06-13 15:19:14 +053075 var port DevicePort
76
Tinoj Joseph429b9d92022-11-16 18:51:05 +053077 port.ID = mp.PortNo
78 port.Name = mp.Name
79
80 //port.HwAddr = strings.Trim(strings.Join(strings.Fields(fmt.Sprint("%02x", mp.HwAddr)), ":"), "[]")
81 port.HwAddr = strings.Trim(strings.ReplaceAll(fmt.Sprintf("%02x", mp.HwAddr), " ", ":"), "[]")
82 port.CurrSpeed = mp.CurrSpeed
83 port.MaxSpeed = mp.MaxSpeed
Naveen Sampath04696f72022-06-13 15:19:14 +053084 port.State = PortStateDown
85 return &port
86}
87
88// UniIDFlowQueue structure which maintains flows in queue.
89type UniIDFlowQueue struct {
90 tasks.Tasks
91 ID uint32
92}
93
94// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
95func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
96 var flowQueue UniIDFlowQueue
97 flowQueue.ID = id
98 return &flowQueue
99}
100
101// DeviceState type
102type DeviceState string
103
104const (
105
106 // DeviceStateUNKNOWN constant
107 DeviceStateUNKNOWN DeviceState = "UNKNOWN"
108 // DeviceStateINIT constant
109 DeviceStateINIT DeviceState = "INIT"
110 // DeviceStateUP constant
111 DeviceStateUP DeviceState = "UP"
112 // DeviceStateDOWN constant
113 DeviceStateDOWN DeviceState = "DOWN"
114 // DeviceStateREBOOTED constant
115 DeviceStateREBOOTED DeviceState = "REBOOTED"
116 // DeviceStateDISABLED constant
117 DeviceStateDISABLED DeviceState = "DISABLED"
118 // DeviceStateDELETED constant
119 DeviceStateDELETED DeviceState = "DELETED"
120)
121
Akash Soni6f369452023-09-19 11:18:28 +0530122type DeviceInterface interface {
123 SetFlowHash(cntx context.Context, hash uint32)
124}
125
Naveen Sampath04696f72022-06-13 15:19:14 +0530126// Device structure
127type Device struct {
vinokuma926cb3e2023-03-29 11:41:06 +0530128 ctx context.Context
129 cancel context.CancelFunc
130 vclientHolder *holder.VolthaServiceClientHolder
131 packetOutChannel chan *ofp.PacketOut
132 PortsByName map[string]*DevicePort
133 flows map[uint64]*of.VoltSubFlow
134 PortsByID map[uint32]*DevicePort
135 meters map[uint32]*of.Meter
136 flowQueue map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
137 SouthBoundID string
138 MfrDesc string
139 HwDesc string
140 SwDesc string
141 ID string
142 SerialNum string
143 State DeviceState
144 TimeStamp time.Time
145 groups sync.Map //map[uint32]*of.Group -> [GroupId : Group]
Naveen Sampath04696f72022-06-13 15:19:14 +0530146 tasks.Tasks
Naveen Sampath04696f72022-06-13 15:19:14 +0530147 portLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530148 flowLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530149 meterLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530150 flowQueueLock sync.RWMutex
151 flowHash uint32
vinokuma926cb3e2023-03-29 11:41:06 +0530152 auditInProgress bool
Naveen Sampath04696f72022-06-13 15:19:14 +0530153 deviceAuditInProgress bool
Naveen Sampath04696f72022-06-13 15:19:14 +0530154}
155
156// NewDevice is the constructor for Device
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530157func NewDevice(cntx context.Context, id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID, mfr, hwDesc, swDesc string) *Device {
Naveen Sampath04696f72022-06-13 15:19:14 +0530158 var device Device
159 device.ID = id
160 device.SerialNum = slno
161 device.State = DeviceStateDOWN
162 device.PortsByID = make(map[uint32]*DevicePort)
163 device.PortsByName = make(map[string]*DevicePort)
164 device.vclientHolder = vclientHldr
165 device.flows = make(map[uint64]*of.VoltSubFlow)
166 device.meters = make(map[uint32]*of.Meter)
167 device.flowQueue = make(map[uint32]*UniIDFlowQueue)
vinokuma926cb3e2023-03-29 11:41:06 +0530168 // Get the flowhash from db and update the flowhash variable in the device.
Naveen Sampath04696f72022-06-13 15:19:14 +0530169 device.SouthBoundID = southBoundID
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530170 device.MfrDesc = mfr
171 device.HwDesc = hwDesc
172 device.SwDesc = swDesc
173 device.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530174 flowHash, err := db.GetFlowHash(cntx, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530175 if err != nil {
176 device.flowHash = DefaultMaxFlowQueues
177 } else {
178 var hash uint32
179 err = json.Unmarshal([]byte(flowHash), &hash)
180 if err != nil {
Akash Soni6168f312023-05-18 20:57:33 +0530181 logger.Errorw(ctx, "Failed to unmarshall flowhash", log.Fields{"data": flowHash})
Naveen Sampath04696f72022-06-13 15:19:14 +0530182 } else {
183 device.flowHash = hash
184 }
185 }
186 logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
187 return &device
188}
189
190// ResetCache to reset cache
191func (d *Device) ResetCache() {
192 logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
193 d.flows = make(map[uint64]*of.VoltSubFlow)
194 d.meters = make(map[uint32]*of.Meter)
195 d.groups = sync.Map{}
196}
197
198// GetFlow - Get the flow from device obj
199func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
200 d.flowLock.RLock()
201 defer d.flowLock.RUnlock()
Naveen Sampath04696f72022-06-13 15:19:14 +0530202 flow, ok := d.flows[cookie]
Akash Sonief452f12024-12-12 18:20:28 +0530203 logger.Debugw(ctx, "Get Flow", log.Fields{"Cookie": cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530204 return flow, ok
205}
206
Tinoj Josephec742f62022-09-29 19:11:10 +0530207// GetAllFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530208func (d *Device) GetAllFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530209 d.flowLock.RLock()
210 defer d.flowLock.RUnlock()
211 var flows []*of.VoltSubFlow
Akash Soni6168f312023-05-18 20:57:33 +0530212 logger.Debugw(ctx, "Get All Flows", log.Fields{"deviceID": d.ID})
Tinoj Josephec742f62022-09-29 19:11:10 +0530213 for _, f := range d.flows {
214 flows = append(flows, f)
215 }
216 return flows
217}
218
219// GetAllPendingFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530220func (d *Device) GetAllPendingFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530221 d.flowLock.RLock()
222 defer d.flowLock.RUnlock()
223 var flows []*of.VoltSubFlow
Akash Soni6168f312023-05-18 20:57:33 +0530224 logger.Debugw(ctx, "Get All Pending Flows", log.Fields{"deviceID": d.ID})
Tinoj Josephec742f62022-09-29 19:11:10 +0530225 for _, f := range d.flows {
226 if f.State == of.FlowAddPending {
227 flows = append(flows, f)
228 }
229 }
230 return flows
231}
232
Naveen Sampath04696f72022-06-13 15:19:14 +0530233// AddFlow - Adds the flow to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530234func (d *Device) AddFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530235 d.flowLock.Lock()
236 defer d.flowLock.Unlock()
Akash Soni6168f312023-05-18 20:57:33 +0530237 logger.Debugw(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
Akash Sonief452f12024-12-12 18:20:28 +0530238 if dbFlow, ok := d.flows[flow.Cookie]; ok {
239 // In case of ONU reboot after flow delete failure, try to install flow in the device by checking for previous flow state
240 if dbFlow.State != of.FlowDelFailure {
241 return errors.New(ErrDuplicateFlow)
242 }
Naveen Sampath04696f72022-06-13 15:19:14 +0530243 }
244 d.flows[flow.Cookie] = flow
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530245 d.AddFlowToDb(cntx, flow)
Naveen Sampath04696f72022-06-13 15:19:14 +0530246 return nil
247}
248
249// AddFlowToDb is the utility to add the flow to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530250func (d *Device) AddFlowToDb(cntx context.Context, flow *of.VoltSubFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530251 if b, err := json.Marshal(flow); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530252 if err = db.PutFlow(cntx, d.ID, flow.Cookie, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530253 logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
254 }
255 }
256}
257
258// DelFlow - Deletes the flow from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530259func (d *Device) DelFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530260 d.flowLock.Lock()
261 defer d.flowLock.Unlock()
262 if _, ok := d.flows[flow.Cookie]; ok {
263 delete(d.flows, flow.Cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530264 d.DelFlowFromDb(cntx, flow.Cookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530265 return nil
266 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530267 return errors.New("flow does not exist")
Naveen Sampath04696f72022-06-13 15:19:14 +0530268}
269
270// DelFlowFromDb is utility to delete the flow from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530271func (d *Device) DelFlowFromDb(cntx context.Context, flowID uint64) {
272 _ = db.DelFlow(cntx, d.ID, flowID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530273}
274
275// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
276func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
277 d.flowLock.RLock()
278 defer d.flowLock.RUnlock()
279 if _, ok := d.flows[flow.Cookie]; ok {
280 return false
281 } else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
282 if _, ok := d.flows[flow.OldCookie]; ok {
Akash Sonief452f12024-12-12 18:20:28 +0530283 logger.Warnw(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530284 return true
285 }
286 }
287 return false
288}
289
290// DelFlowWithOldCookie is to delete flow with old cookie.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530291func (d *Device) DelFlowWithOldCookie(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530292 d.flowLock.Lock()
293 defer d.flowLock.Unlock()
294 if _, ok := d.flows[flow.OldCookie]; ok {
Akash Soni6168f312023-05-18 20:57:33 +0530295 logger.Debugw(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
Naveen Sampath04696f72022-06-13 15:19:14 +0530296 log.Fields{"OldCookie": flow.OldCookie})
297 delete(d.flows, flow.OldCookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530298 d.DelFlowFromDb(cntx, flow.OldCookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530299 return nil
300 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530301 return errors.New("flow does not exist")
Naveen Sampath04696f72022-06-13 15:19:14 +0530302}
303
304// RestoreFlowsFromDb to restore flows from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530305func (d *Device) RestoreFlowsFromDb(cntx context.Context) {
306 flows, _ := db.GetFlows(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530307 for _, flow := range flows {
308 b, ok := flow.Value.([]byte)
309 if !ok {
310 logger.Warn(ctx, "The value type is not []byte")
311 continue
312 }
313 d.CreateFlowFromString(b)
314 }
315}
316
317// CreateFlowFromString to create flow from string
318func (d *Device) CreateFlowFromString(b []byte) {
319 var flow of.VoltSubFlow
320 if err := json.Unmarshal(b, &flow); err == nil {
321 if _, ok := d.flows[flow.Cookie]; !ok {
322 logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
323 d.flows[flow.Cookie] = &flow
324 } else {
325 logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
326 }
327 } else {
328 logger.Warn(ctx, "Unmarshal failed")
329 }
330}
331
332// ----------------------------------------------------------
333// Database related functionality
334// Group operations at the device which include update and delete
335
336// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530337func (d *Device) UpdateGroupEntry(cntx context.Context, group *of.Group) {
Akash Soni6168f312023-05-18 20:57:33 +0530338 logger.Debugw(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530339 d.groups.Store(group.GroupID, group)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530340 d.AddGroupToDb(cntx, group)
Naveen Sampath04696f72022-06-13 15:19:14 +0530341}
342
343// AddGroupToDb - Utility to add the group to the device DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530344func (d *Device) AddGroupToDb(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530345 if b, err := json.Marshal(group); err == nil {
Akash Soni6168f312023-05-18 20:57:33 +0530346 logger.Debugw(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530347 if err = db.PutGroup(cntx, d.ID, group.GroupID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530348 logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
349 }
350 }
351}
352
353// DelGroupEntry - Deletes the group from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530354func (d *Device) DelGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530355 if _, ok := d.groups.Load(group.GroupID); ok {
356 d.groups.Delete(group.GroupID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530357 d.DelGroupFromDb(cntx, group.GroupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530358 }
359}
360
361// DelGroupFromDb - Utility to delete the Group from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530362func (d *Device) DelGroupFromDb(cntx context.Context, groupID uint32) {
363 _ = db.DelGroup(cntx, d.ID, groupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530364}
365
vinokuma926cb3e2023-03-29 11:41:06 +0530366// RestoreGroupsFromDb - restores all groups from DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530367func (d *Device) RestoreGroupsFromDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530368 logger.Info(ctx, "Restoring Groups")
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530369 groups, _ := db.GetGroups(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530370 for _, group := range groups {
371 b, ok := group.Value.([]byte)
372 if !ok {
373 logger.Warn(ctx, "The value type is not []byte")
374 continue
375 }
376 d.CreateGroupFromString(b)
377 }
378}
379
vinokuma926cb3e2023-03-29 11:41:06 +0530380// CreateGroupFromString - Forms group struct from json string
Naveen Sampath04696f72022-06-13 15:19:14 +0530381func (d *Device) CreateGroupFromString(b []byte) {
382 var group of.Group
383 if err := json.Unmarshal(b, &group); err == nil {
384 if _, ok := d.groups.Load(group.GroupID); !ok {
385 logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
386 d.groups.Store(group.GroupID, &group)
387 } else {
388 logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
389 }
390 } else {
391 logger.Warn(ctx, "Unmarshal failed")
392 }
393}
394
395// AddMeter to add meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530396func (d *Device) AddMeter(cntx context.Context, meter *of.Meter) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530397 d.meterLock.Lock()
398 defer d.meterLock.Unlock()
399 if _, ok := d.meters[meter.ID]; ok {
Akash Sonid36d23b2023-08-18 12:51:40 +0530400 return errors.New("duplicate meter")
Naveen Sampath04696f72022-06-13 15:19:14 +0530401 }
402 d.meters[meter.ID] = meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530403 go d.AddMeterToDb(cntx, meter)
Naveen Sampath04696f72022-06-13 15:19:14 +0530404 return nil
405}
406
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530407// UpdateMeter to update meter
408func (d *Device) UpdateMeter(cntx context.Context, meter *of.Meter) error {
vinokuma926cb3e2023-03-29 11:41:06 +0530409 d.meterLock.Lock()
410 defer d.meterLock.Unlock()
411 if _, ok := d.meters[meter.ID]; ok {
412 d.meters[meter.ID] = meter
413 d.AddMeterToDb(cntx, meter)
414 } else {
Akash Sonid36d23b2023-08-18 12:51:40 +0530415 return errors.New("meter not found for updation")
vinokuma926cb3e2023-03-29 11:41:06 +0530416 }
417 return nil
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530418}
419
Naveen Sampath04696f72022-06-13 15:19:14 +0530420// GetMeter to get meter
421func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
422 d.meterLock.RLock()
423 defer d.meterLock.RUnlock()
424 if m, ok := d.meters[id]; ok {
425 return m, nil
426 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530427 return nil, errors.New("meter not found")
Naveen Sampath04696f72022-06-13 15:19:14 +0530428}
429
430// DelMeter to delete meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530431func (d *Device) DelMeter(cntx context.Context, meter *of.Meter) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530432 d.meterLock.Lock()
433 defer d.meterLock.Unlock()
434 if _, ok := d.meters[meter.ID]; ok {
435 delete(d.meters, meter.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530436 go d.DelMeterFromDb(cntx, meter.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530437 return true
438 }
439 return false
440}
441
442// AddMeterToDb is utility to add the Group to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530443func (d *Device) AddMeterToDb(cntx context.Context, meter *of.Meter) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530444 if b, err := json.Marshal(meter); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530445 if err = db.PutDeviceMeter(cntx, d.ID, meter.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530446 logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
447 }
448 }
449}
450
451// DelMeterFromDb to delete meter from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530452func (d *Device) DelMeterFromDb(cntx context.Context, id uint32) {
453 _ = db.DelDeviceMeter(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530454}
455
456// RestoreMetersFromDb to restore meters from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530457func (d *Device) RestoreMetersFromDb(cntx context.Context) {
458 meters, _ := db.GetDeviceMeters(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530459 for _, meter := range meters {
460 b, ok := meter.Value.([]byte)
461 if !ok {
462 logger.Warn(ctx, "The value type is not []byte")
463 continue
464 }
465 d.CreateMeterFromString(b)
466 }
467}
468
469// CreateMeterFromString to create meter from string
470func (d *Device) CreateMeterFromString(b []byte) {
471 var meter of.Meter
472 if err := json.Unmarshal(b, &meter); err == nil {
473 if _, ok := d.meters[meter.ID]; !ok {
474 logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
475 d.meters[meter.ID] = &meter
476 } else {
477 logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
478 }
479 } else {
Akash Soni6168f312023-05-18 20:57:33 +0530480 logger.Warnw(ctx, "Unmarshal failed", log.Fields{"error": err, "meter": string(b)})
Naveen Sampath04696f72022-06-13 15:19:14 +0530481 }
482}
483
484// VolthaClient to get voltha client
485func (d *Device) VolthaClient() voltha.VolthaServiceClient {
486 return d.vclientHolder.Get()
487}
488
489// AddPort to add the port as requested by the device/VOLTHA
490// Inform the application if the port is successfully added
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530491func (d *Device) AddPort(cntx context.Context, mp *ofp.OfpPort) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530492 d.portLock.Lock()
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530493 id := mp.PortNo
494 name := mp.Name
Naveen Sampath04696f72022-06-13 15:19:14 +0530495 if _, ok := d.PortsByID[id]; ok {
Akash Sonief452f12024-12-12 18:20:28 +0530496 d.portLock.Unlock()
Akash Sonid36d23b2023-08-18 12:51:40 +0530497 return errors.New(Duplicate_Port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530498 }
499 if _, ok := d.PortsByName[name]; ok {
Akash Sonief452f12024-12-12 18:20:28 +0530500 d.portLock.Unlock()
Akash Sonid36d23b2023-08-18 12:51:40 +0530501 return errors.New(Duplicate_Port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530502 }
503
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530504 p := NewDevicePort(mp)
Naveen Sampath04696f72022-06-13 15:19:14 +0530505 d.PortsByID[id] = p
506 d.PortsByName[name] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530507 d.WritePortToDb(cntx, p)
Akash Sonief452f12024-12-12 18:20:28 +0530508 d.portLock.Unlock()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530509 GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530510 logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
511 return nil
512}
513
514// DelPort to delete the port as requested by the device/VOLTHA
515// Inform the application if the port is successfully deleted
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530516func (d *Device) DelPort(cntx context.Context, id uint32, portName string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530517 p := d.GetPortByID(id)
518 if p == nil {
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530519 p = d.GetPortByName(portName)
520 if p == nil {
521 return errors.New("unknown port")
522 } else {
523 logger.Infow(ctx, "Found port by name", log.Fields{"PortName": p.Name, "PortID": p.ID})
524 }
Naveen Sampath04696f72022-06-13 15:19:14 +0530525 }
526 if p.State == PortStateUp {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530527 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530528 }
Tinoj Joseph4ead4e02023-01-30 03:12:44 +0530529 GetController().PortDelInd(cntx, d.ID, p.Name)
530
Naveen Sampath04696f72022-06-13 15:19:14 +0530531 d.portLock.Lock()
532 defer d.portLock.Unlock()
533
Naveen Sampath04696f72022-06-13 15:19:14 +0530534 delete(d.PortsByID, p.ID)
535 delete(d.PortsByName, p.Name)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530536 d.DelPortFromDb(cntx, p.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530537 logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
538 return nil
539}
540
Akash Sonief452f12024-12-12 18:20:28 +0530541// CheckAndDeletePort deletes the port if the port name matches with VGC and one sent from voltha in OFPPR_DELETE
542func (d *Device) CheckAndDeletePort(cntx context.Context, portNo uint32, portName string) {
543 if p := d.GetPortByID(portNo); p != nil {
544 if p.Name != portName {
545 logger.Warnw(ctx, "Dropping Del Port event: Port name mismatch", log.Fields{"vgcPortName": p.Name, "ofpPortName": portName, "ID": p.ID})
546 return
547 }
548 if err := d.DelPort(cntx, portNo, portName); err != nil {
549 logger.Warnw(ctx, "DelPort Failed", log.Fields{"Port No": portNo, "Error": err})
550 }
551 }
552}
553
Naveen Sampath04696f72022-06-13 15:19:14 +0530554// UpdatePortByName is utility to update the port by Name
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530555func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530556 d.portLock.Lock()
557 defer d.portLock.Unlock()
558
559 p, ok := d.PortsByName[name]
560 if !ok {
561 return
562 }
563 delete(d.PortsByID, p.ID)
564 p.ID = port
565 d.PortsByID[port] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530566 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530567 GetController().PortUpdateInd(d.ID, p.Name, p.ID)
568 logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
569}
570
571// GetPortName to get the name of the port by its id
572func (d *Device) GetPortName(id uint32) (string, error) {
573 d.portLock.RLock()
574 defer d.portLock.RUnlock()
575
576 if p, ok := d.PortsByID[id]; ok {
577 return p.Name, nil
578 }
579 logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
Akash Sonid36d23b2023-08-18 12:51:40 +0530580 return "", errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530581}
582
583// GetPortByID is utility to retrieve the port by ID
584func (d *Device) GetPortByID(id uint32) *DevicePort {
585 d.portLock.RLock()
586 defer d.portLock.RUnlock()
587
588 p, ok := d.PortsByID[id]
589 if ok {
590 return p
591 }
592 return nil
593}
594
595// GetPortByName is utility to retrieve the port by Name
596func (d *Device) GetPortByName(name string) *DevicePort {
597 d.portLock.RLock()
598 defer d.portLock.RUnlock()
599
600 p, ok := d.PortsByName[name]
601 if ok {
602 return p
603 }
604 return nil
605}
606
607// GetPortState to get the state of the port by name
608func (d *Device) GetPortState(name string) (PortState, error) {
609 d.portLock.RLock()
610 defer d.portLock.RUnlock()
611
612 if p, ok := d.PortsByName[name]; ok {
613 return p.State, nil
614 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530615 return PortStateDown, errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530616}
617
618// GetPortID to get the port-id by the port name
619func (d *Device) GetPortID(name string) (uint32, error) {
620 d.portLock.RLock()
621 defer d.portLock.RUnlock()
622
623 if p, ok := d.PortsByName[name]; ok {
624 return p.ID, nil
625 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530626 return 0, errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530627}
628
629// WritePortToDb to add the port to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530630func (d *Device) WritePortToDb(ctx context.Context, port *DevicePort) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530631 port.Version = database.PresentVersionMap[database.DevicePortPath]
632 if b, err := json.Marshal(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530633 if err = db.PutPort(ctx, d.ID, port.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530634 logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
635 }
636 }
637}
638
639// DelPortFromDb to delete port from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530640func (d *Device) DelPortFromDb(cntx context.Context, id uint32) {
641 _ = db.DelPort(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530642}
643
644// RestorePortsFromDb to restore ports from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530645func (d *Device) RestorePortsFromDb(cntx context.Context) {
646 ports, _ := db.GetPorts(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530647 for _, port := range ports {
648 b, ok := port.Value.([]byte)
649 if !ok {
650 logger.Warn(ctx, "The value type is not []byte")
651 continue
652 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530653 d.CreatePortFromString(cntx, b)
Naveen Sampath04696f72022-06-13 15:19:14 +0530654 }
655}
656
657// CreatePortFromString to create port from string
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530658func (d *Device) CreatePortFromString(cntx context.Context, b []byte) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530659 var port DevicePort
660 if err := json.Unmarshal(b, &port); err == nil {
661 if _, ok := d.PortsByID[port.ID]; !ok {
662 logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
663 d.PortsByID[port.ID] = &port
664 d.PortsByName[port.Name] = &port
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530665 GetController().PortAddInd(cntx, d.ID, port.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530666 } else {
Akash Sonid36d23b2023-08-18 12:51:40 +0530667 logger.Warnw(ctx, Duplicate_Port, log.Fields{"ID": port.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530668 }
669 } else {
Akash Soni6168f312023-05-18 20:57:33 +0530670 logger.Warnw(ctx, "Unmarshal failed", log.Fields{"port": string(b)})
Naveen Sampath04696f72022-06-13 15:19:14 +0530671 }
672}
673
674// Delete : OLT Delete functionality yet to be implemented. IDeally all of the
675// resources should have been removed by this time. It is an error
676// scenario if the OLT has resources associated with it.
677func (d *Device) Delete() {
678 d.StopAll()
679}
680
681// Stop to stop the task
682func (d *Device) Stop() {
683}
684
685// ConnectInd is called when the connection between VGC and the VOLTHA is
686// restored. This will perform audit of the device post reconnection
687func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
688 logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
689 ctx1, cancel := context.WithCancel(ctx)
690 d.cancel = cancel
691 d.ctx = ctx1
692 d.Tasks.Initialize(ctx1)
693
Akash Soni6168f312023-05-18 20:57:33 +0530694 logger.Debugw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530695 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530696 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530697 GetController().DeviceUpInd(d.ID)
698
Akash Soni6168f312023-05-18 20:57:33 +0530699 logger.Debugw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530700 t := NewAuditDevice(d, AuditEventDeviceDisc)
701 d.Tasks.AddTask(t)
702
703 t1 := NewAuditTablesTask(d)
704 d.Tasks.AddTask(t1)
705
706 t2 := NewPendingProfilesTask(d)
707 d.Tasks.AddTask(t2)
708
709 go d.synchronizeDeviceTables()
710}
711
712func (d *Device) synchronizeDeviceTables() {
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530713 tick := time.NewTicker(GetController().GetDeviceTableSyncDuration())
Naveen Sampath04696f72022-06-13 15:19:14 +0530714loop:
715 for {
716 select {
717 case <-d.ctx.Done():
vinokuma926cb3e2023-03-29 11:41:06 +0530718 logger.Warnw(d.ctx, "Context Done. Canceling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
Naveen Sampath04696f72022-06-13 15:19:14 +0530719 break loop
720 case <-tick.C:
721 t1 := NewAuditTablesTask(d)
722 d.Tasks.AddTask(t1)
723 }
724 }
725 tick.Stop()
726}
727
728// DeviceUpInd is called when the logical device state changes to UP. This will perform audit of the device post reconnection
729func (d *Device) DeviceUpInd() {
730 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
731 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530732 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530733 GetController().DeviceUpInd(d.ID)
734
735 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
736 t := NewAuditDevice(d, AuditEventDeviceDisc)
737 d.Tasks.AddTask(t)
738
739 t1 := NewAuditTablesTask(d)
740 d.Tasks.AddTask(t1)
741
742 t2 := NewPendingProfilesTask(d)
743 d.Tasks.AddTask(t2)
744}
745
746// DeviceDownInd is called when the logical device state changes to Down.
747func (d *Device) DeviceDownInd() {
748 logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
749 d.State = DeviceStateDOWN
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530750 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530751 GetController().DeviceDownInd(d.ID)
752}
753
754// DeviceRebootInd is called when the logical device is rebooted.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530755func (d *Device) DeviceRebootInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530756 logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
757
758 if d.State == DeviceStateREBOOTED {
759 d.State = DeviceStateREBOOTED
760 logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
761 return
762 }
763
764 d.State = DeviceStateREBOOTED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530765 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530766 GetController().SetRebootInProgressForDevice(d.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530767 GetController().DeviceRebootInd(cntx, d.ID, d.SerialNum, d.SouthBoundID)
768 d.ReSetAllPortStates(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530769}
770
771// DeviceDisabledInd is called when the logical device is disabled
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530772func (d *Device) DeviceDisabledInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530773 logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
774 d.State = DeviceStateDISABLED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530775 d.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530776 GetController().DeviceDisableInd(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530777}
778
vinokuma926cb3e2023-03-29 11:41:06 +0530779// ReSetAllPortStates - Set all logical device port status to DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530780func (d *Device) ReSetAllPortStates(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530781 logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
782
783 d.portLock.Lock()
784 defer d.portLock.Unlock()
785
786 for _, port := range d.PortsByID {
787 if port.State != PortStateDown {
788 logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530789 GetController().PortDownInd(cntx, d.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530790 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530791 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530792 }
793 }
794}
795
vinokuma926cb3e2023-03-29 11:41:06 +0530796// ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530797func (d *Device) ReSetAllPortStatesInDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530798 logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
799
800 d.portLock.Lock()
801 defer d.portLock.Unlock()
802
803 for _, port := range d.PortsByID {
804 if port.State != PortStateDown {
Akash Soni6168f312023-05-18 20:57:33 +0530805 logger.Debugw(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
Naveen Sampath04696f72022-06-13 15:19:14 +0530806 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530807 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530808 }
809 }
810}
811
812// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
813// to update only when the port state is DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530814func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530815 if p := d.GetPortByName(portName); p != nil {
816 if p.ID != port {
Akash Sonief452f12024-12-12 18:20:28 +0530817 logger.Warnw(ctx, "Port update indication received with mismatching ID", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
818 return
819 //Do not process port update received from change event, as we will only handle port updates during polling
Naveen Sampath04696f72022-06-13 15:19:14 +0530820 }
Akash Sonief452f12024-12-12 18:20:28 +0530821 d.ProcessPortState(cntx, port, state, portName)
Naveen Sampath04696f72022-06-13 15:19:14 +0530822 }
823}
824
825// ***Operations Performed on Port state Transitions***
826//
827// |-----------------------------------------------------------------------------|
828// | State | Action |
829// |--------------------|--------------------------------------------------------|
830// | UP | UNI - Trigger Flow addition for service configured |
831// | | NNI - Trigger Flow addition for vnets & mvlan profiles |
832// | | |
833// | DOWN | UNI - Trigger Flow deletion for service configured |
834// | | NNI - Trigger Flow deletion for vnets & mvlan profiles |
835// | | |
836// |-----------------------------------------------------------------------------|
837//
838
839// ProcessPortState deals with the change in port status and taking action
840// based on the new state and the old state
Akash Sonief452f12024-12-12 18:20:28 +0530841func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32, portName string) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530842 if d.State != DeviceStateUP && !util.IsNniPort(port) {
843 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
844 return
845 }
846 if p := d.GetPortByID(port); p != nil {
847 logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
848
Akash Sonief452f12024-12-12 18:20:28 +0530849 if p.Name != portName {
850 logger.Warnw(ctx, "Dropping Port State processing: Port name does not match", log.Fields{"vgcPort": p.Name, "ofpPort": portName, "ID": port})
851 return
852 }
Naveen Sampath04696f72022-06-13 15:19:14 +0530853 // Avoid blind initialization as the current tasks in the queue will be lost
854 // Eg: Service Del followed by Port Down - The flows will be dangling
855 // Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
856 p.Tasks.CheckAndInitialize(d.ctx)
857 if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
858 // Transition from DOWN to UP
859 logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530860 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530861 p.State = PortStateUp
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530862 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530863 } else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
864 // Transition from UP to Down
865 logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530866 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530867 p.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530868 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530869 } else {
870 logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
871 }
872 }
873}
874
875// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530876func (d *Device) ProcessPortStateAfterReboot(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530877 if d.State != DeviceStateUP && !util.IsNniPort(port) {
878 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
879 return
880 }
881 if p := d.GetPortByID(port); p != nil {
882 logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State})
883 p.Tasks.Initialize(d.ctx)
884 if p.State == PortStateUp {
885 logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530886 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530887 } else if p.State == PortStateDown {
888 logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530889 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530890 }
891 }
892}
893
894// ChangeEvent : Change event brings in ports related changes such as addition/deletion
895// or modification where the port status change up/down is indicated to the
896// controller
897func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
898 cet := NewChangeEventTask(d.ctx, event, d)
899 d.AddTask(cet)
900 return nil
901}
902
903// PacketIn handle the incoming packet-in and deliver to the application for the
904// actual processing
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530905func (d *Device) PacketIn(cntx context.Context, pkt *ofp.PacketIn) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530906 logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
907 if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
908 logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
909 return
910 }
911 data := pkt.PacketIn.Data
912 port := PacketInGetPort(pkt.PacketIn)
913 if pName, err := d.GetPortName(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530914 GetController().PacketInInd(cntx, d.ID, pName, data)
Naveen Sampath04696f72022-06-13 15:19:14 +0530915 } else {
916 logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
917 }
918}
919
920// PacketInGetPort to get the port on which the packet-in is reported
921func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
922 for _, field := range pkt.Match.OxmFields {
923 if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
924 if ofbField, ok := field.Field.(*ofp.OfpOxmField_OfbField); ok {
925 if ofbField.OfbField.Type == ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
926 if port, ok := ofbField.OfbField.Value.(*ofp.OfpOxmOfbField_Port); ok {
927 return port.Port
928 }
929 }
930 }
931 }
932 }
933 return 0
934}
935
936// PacketOutReq receives the packet out request from the application via the
937// controller. The interface from the application uses name as the identity.
938func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
939 inp, err := d.GetPortID(inport)
940 if err != nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530941 return errors.New("unknown inport")
Naveen Sampath04696f72022-06-13 15:19:14 +0530942 }
943 outp, err1 := d.GetPortID(outport)
944 if err1 != nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530945 return errors.New("unknown outport")
Naveen Sampath04696f72022-06-13 15:19:14 +0530946 }
947 logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
948 return d.SendPacketOut(outp, inp, data, isCustomPkt)
949}
950
951// SendPacketOut is responsible for building the OF structure and send the
952// packet-out to the VOLTHA
953func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
954 pout := &ofp.PacketOut{}
955 pout.Id = d.ID
956 opout := &ofp.OfpPacketOut{}
957 pout.PacketOut = opout
958 opout.InPort = inport
959 opout.Data = data
960 opout.Actions = []*ofp.OfpAction{
961 {
962 Type: ofp.OfpActionType_OFPAT_OUTPUT,
963 Action: &ofp.OfpAction_Output{
964 Output: &ofp.OfpActionOutput{
965 Port: outport,
966 MaxLen: 65535,
967 },
968 },
969 },
970 }
971 d.packetOutChannel <- pout
972 return nil
973}
974
975// UpdateFlows receives the flows in the form that is implemented
976// in the VGC and transforms them to the OF format. This is handled
977// as a port of the task that is enqueued to do the same.
978func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
979 t := NewAddFlowsTask(d.ctx, flow, d)
980 logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
981 // check if port isNni , if yes flows will be added to device port queues.
982 if util.IsNniPort(devPort.ID) {
983 // Adding the flows to device port queues.
984 devPort.AddTask(t)
985 return
986 }
987 // If the flowHash is enabled then add the flows to the flowhash generated queues.
988 flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
989 if flowQueue != nil {
990 logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
991 flowQueue.AddTask(t)
992 logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
993 } else {
994 //FlowThrotling disabled, add to the device port queue
995 devPort.AddTask(t)
996 return
997 }
998}
999
1000// UpdateGroup to update group info
1001func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
1002 task := NewModGroupTask(d.ctx, group, d)
1003 logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
1004 devPort.AddTask(task)
1005}
1006
1007// ModMeter for mod meter task
1008func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
1009 if command == of.MeterCommandAdd {
1010 if _, err := d.GetMeter(meter.ID); err == nil {
1011 logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
1012 return
1013 }
1014 }
1015 t := NewModMeterTask(d.ctx, command, meter, d)
1016 devPort.AddTask(t)
1017}
1018
1019func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
1020 d.flowQueueLock.RLock()
vinokuma926cb3e2023-03-29 11:41:06 +05301021 // If flowhash is 0 that means flowhash throttling is disabled, return nil
Naveen Sampath04696f72022-06-13 15:19:14 +05301022 if d.flowHash == 0 {
1023 d.flowQueueLock.RUnlock()
1024 return nil
1025 }
1026 flowHashID := id % uint32(d.flowHash)
1027 if value, found := d.flowQueue[uint32(flowHashID)]; found {
1028 d.flowQueueLock.RUnlock()
1029 return value
1030 }
1031 d.flowQueueLock.RUnlock()
1032 logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
1033
1034 return d.addFlowQueueForUniID(id)
1035}
1036
1037func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
Naveen Sampath04696f72022-06-13 15:19:14 +05301038 d.flowQueueLock.Lock()
1039 defer d.flowQueueLock.Unlock()
1040 flowHashID := id % uint32(d.flowHash)
1041 flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
1042 flowQueue.Tasks.Initialize(d.ctx)
1043 d.flowQueue[flowHashID] = flowQueue
1044 return flowQueue
1045}
1046
1047// SetFlowHash sets the device flow hash and writes to the DB.
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301048func (d *Device) SetFlowHash(cntx context.Context, hash uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301049 d.flowQueueLock.Lock()
1050 defer d.flowQueueLock.Unlock()
1051
1052 d.flowHash = hash
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301053 d.writeFlowHashToDB(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301054}
1055
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301056func (d *Device) writeFlowHashToDB(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301057 hash, err := json.Marshal(d.flowHash)
1058 if err != nil {
Akash Soni6168f312023-05-18 20:57:33 +05301059 logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash, "error": err})
Naveen Sampath04696f72022-06-13 15:19:14 +05301060 return
1061 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301062 if err := db.PutFlowHash(cntx, d.ID, string(hash)); err != nil {
Akash Soni6168f312023-05-18 20:57:33 +05301063 logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash, "error": err})
Naveen Sampath04696f72022-06-13 15:19:14 +05301064 }
1065}
1066
vinokuma926cb3e2023-03-29 11:41:06 +05301067// isSBOperAllowed - determines if the SB operation is allowed based on device state & force flag
Naveen Sampath04696f72022-06-13 15:19:14 +05301068func (d *Device) isSBOperAllowed(forceAction bool) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +05301069 if d.State == DeviceStateUP {
1070 return true
1071 }
1072
1073 if d.State == DeviceStateDISABLED && forceAction {
1074 return true
1075 }
1076
1077 return false
1078}
1079
Akash Sonief452f12024-12-12 18:20:28 +05301080// IsFlowDelThresholdReached - check if the attempts for flow delete has reached threshold or not
1081func (d *Device) IsFlowDelThresholdReached(flowCount uint32, cookie uint64) bool {
1082 logger.Debugw(ctx, "Check flow delete threshold", log.Fields{"Cookie": cookie, "FlowCount": flowCount})
1083 return flowCount >= GetController().GetMaxFlowRetryAttempt()
Naveen Sampath04696f72022-06-13 15:19:14 +05301084}
1085
Akash Sonief452f12024-12-12 18:20:28 +05301086// IsFlowAddThresholdReached - check if the attempts for flow add has reached threshold or not
1087func (d *Device) IsFlowAddThresholdReached(flowCount uint32, cookie uint64) bool {
1088 logger.Debugw(ctx, "Check flow add threshold", log.Fields{"Cookie": cookie, "FlowCount": flowCount})
1089 return flowCount >= GetController().GetMaxFlowRetryAttempt()
1090}
1091
1092func (d *Device) UpdateFlowCount(cntx context.Context, cookie uint64) {
1093 if dbFlow, ok := d.flows[cookie]; ok {
1094 dbFlow.FlowCount++
1095 d.AddFlowToDb(cntx, dbFlow)
1096 }
1097}
1098
1099func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
1100 flow, _ := d.GetFlow(cookie)
1101 d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
1102}
1103
1104func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301105 statusCode, statusMsg := infraerror.GetErrorInfo(err)
1106 success := isFlowOperSuccess(statusCode, oper)
1107
Akash Sonief452f12024-12-12 18:20:28 +05301108 updateFlowStatus := func(cookie uint64, state int, reason string) {
1109 d.flowLock.Lock()
1110 defer d.flowLock.Unlock()
1111 if dbFlow, ok := d.flows[cookie]; ok {
Naveen Sampath04696f72022-06-13 15:19:14 +05301112 dbFlow.State = uint8(state)
1113 dbFlow.ErrorReason = reason
Akash Sonief452f12024-12-12 18:20:28 +05301114 if state == of.FlowAddSuccess {
1115 dbFlow.FlowCount = 0
1116 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301117 d.AddFlowToDb(cntx, dbFlow)
Naveen Sampath04696f72022-06-13 15:19:14 +05301118 }
1119 }
1120
vinokuma926cb3e2023-03-29 11:41:06 +05301121 // Update flow results
Naveen Sampath04696f72022-06-13 15:19:14 +05301122 // Add - Update Success or Failure status with reason
1123 // Del - Delete entry from DB on success else update error reason
1124 if oper == of.CommandAdd {
1125 state := of.FlowAddSuccess
1126 reason := ""
1127 if !success {
1128 state = of.FlowAddFailure
1129 reason = statusMsg
1130 }
Akash Sonief452f12024-12-12 18:20:28 +05301131 updateFlowStatus(cookie, state, reason)
1132 logger.Debugw(ctx, "Add flow updated to DB", log.Fields{"Cookie": cookie, "State": state})
Naveen Sampath04696f72022-06-13 15:19:14 +05301133 } else {
1134 if success && flow != nil {
Akash Sonief452f12024-12-12 18:20:28 +05301135 logger.Debugw(ctx, "Deleted flow from device and DB", log.Fields{"Cookie": cookie})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301136 if err := d.DelFlow(cntx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301137 logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
1138 }
1139 } else if !success {
Akash Sonief452f12024-12-12 18:20:28 +05301140 if d.IsFlowDelThresholdReached(flow.FlowCount, flow.Cookie) {
1141 logger.Debugw(ctx, "Deleted flow from device and DB after delete threshold reached", log.Fields{"Cookie": cookie})
1142 if err := d.DelFlow(cntx, flow); err != nil {
1143 logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
1144 }
1145 } else {
1146 updateFlowStatus(cookie, of.FlowDelFailure, statusMsg)
1147 logger.Debugw(ctx, "Delete flow updated to DB", log.Fields{"Cookie": cookie})
1148 }
Naveen Sampath04696f72022-06-13 15:19:14 +05301149 }
1150 }
1151
1152 flowResult := intf.FlowStatus{
1153 Cookie: strconv.FormatUint(cookie, 10),
1154 Device: d.ID,
1155 FlowModType: oper,
1156 Flow: flow,
1157 Status: statusCode,
1158 Reason: statusMsg,
1159 AdditionalData: bwDetails,
1160 }
1161
Akash Sonief452f12024-12-12 18:20:28 +05301162 logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
1163 GetController().ProcessFlowModResultIndication(cntx, flowResult)
Naveen Sampath04696f72022-06-13 15:19:14 +05301164}