blob: bdac105e3e37ed197b8d0baad6fcee96f1e9476e [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package controller
17
18import (
19 "context"
20 "encoding/json"
21 "errors"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053022 "fmt"
Naveen Sampath04696f72022-06-13 15:19:14 +053023 "strconv"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053024 "strings"
Naveen Sampath04696f72022-06-13 15:19:14 +053025 "sync"
26 "time"
vinokuma926cb3e2023-03-29 11:41:06 +053027 infraerror "voltha-go-controller/internal/pkg/errorcodes"
Naveen Sampath04696f72022-06-13 15:19:14 +053028
29 "voltha-go-controller/database"
30 "voltha-go-controller/internal/pkg/holder"
31 "voltha-go-controller/internal/pkg/intf"
32 "voltha-go-controller/internal/pkg/of"
vinokuma926cb3e2023-03-29 11:41:06 +053033
Naveen Sampath04696f72022-06-13 15:19:14 +053034 //"voltha-go-controller/internal/pkg/vpagent"
35 "voltha-go-controller/internal/pkg/tasks"
36 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053037 "voltha-go-controller/log"
vinokuma926cb3e2023-03-29 11:41:06 +053038
Naveen Sampath04696f72022-06-13 15:19:14 +053039 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
40 "github.com/opencord/voltha-protos/v5/go/voltha"
41)
42
// PortState is the string-typed state of a DevicePort.
type PortState string

// States a DevicePort can be in.
const (
	// PortStateDown is the state every port starts in (see NewDevicePort).
	PortStateDown PortState = "DOWN"
	// PortStateUp is the state checked by DelPort before raising a port-down indication.
	PortStateUp PortState = "UP"
)

// DefaultMaxFlowQueues is the flow hash a device falls back to when no
// value can be read from the database (see NewDevice).
const DefaultMaxFlowQueues = 67

// Error texts returned by the flow and port helpers of Device.
const (
	// ErrDuplicateFlow - indicates flow already exists in DB
	ErrDuplicateFlow string = "duplicate flow"
	// Unknown_Port_ID - indicates that the port id is unknown
	Unknown_Port_ID = "unknown port id"
	// Duplicate_Port - indicates the port already exists in the controller
	Duplicate_Port = "duplicate port"
)
60
61// DevicePort structure
62type DevicePort struct {
vinokuma926cb3e2023-03-29 11:41:06 +053063 Name string
64 State PortState
65 Version string
66 HwAddr string
Naveen Sampath04696f72022-06-13 15:19:14 +053067 tasks.Tasks
Tinoj Joseph429b9d92022-11-16 18:51:05 +053068 CurrSpeed uint32
69 MaxSpeed uint32
vinokuma926cb3e2023-03-29 11:41:06 +053070 ID uint32
Naveen Sampath04696f72022-06-13 15:19:14 +053071}
72
73// NewDevicePort is the constructor for DevicePort
Tinoj Joseph429b9d92022-11-16 18:51:05 +053074func NewDevicePort(mp *ofp.OfpPort) *DevicePort {
Naveen Sampath04696f72022-06-13 15:19:14 +053075 var port DevicePort
76
Tinoj Joseph429b9d92022-11-16 18:51:05 +053077 port.ID = mp.PortNo
78 port.Name = mp.Name
79
80 //port.HwAddr = strings.Trim(strings.Join(strings.Fields(fmt.Sprint("%02x", mp.HwAddr)), ":"), "[]")
81 port.HwAddr = strings.Trim(strings.ReplaceAll(fmt.Sprintf("%02x", mp.HwAddr), " ", ":"), "[]")
82 port.CurrSpeed = mp.CurrSpeed
83 port.MaxSpeed = mp.MaxSpeed
Naveen Sampath04696f72022-06-13 15:19:14 +053084 port.State = PortStateDown
85 return &port
86}
87
88// UniIDFlowQueue structure which maintains flows in queue.
89type UniIDFlowQueue struct {
90 tasks.Tasks
91 ID uint32
92}
93
94// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
95func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
96 var flowQueue UniIDFlowQueue
97 flowQueue.ID = id
98 return &flowQueue
99}
100
// DeviceState is the string-typed state of a Device.
type DeviceState string

// States a Device can be in.
const (
	// DeviceStateUNKNOWN - device state is unknown
	DeviceStateUNKNOWN DeviceState = "UNKNOWN"
	// DeviceStateINIT - device is initialising
	DeviceStateINIT DeviceState = "INIT"
	// DeviceStateUP - set by ConnectInd and DeviceUpInd
	DeviceStateUP DeviceState = "UP"
	// DeviceStateDOWN - initial state from NewDevice; also set by DeviceDownInd
	DeviceStateDOWN DeviceState = "DOWN"
	// DeviceStateREBOOTED - set by DeviceRebootInd
	DeviceStateREBOOTED DeviceState = "REBOOTED"
	// DeviceStateDISABLED - device is disabled
	DeviceStateDISABLED DeviceState = "DISABLED"
	// DeviceStateDELETED - device is deleted
	DeviceStateDELETED DeviceState = "DELETED"
)
121
// DeviceInterface is the behaviour a device exposes for changing its flow hash.
type DeviceInterface interface {
	// SetFlowHash applies hash as the device's flow hash.
	SetFlowHash(cntx context.Context, hash uint32)
}
125
Naveen Sampath04696f72022-06-13 15:19:14 +0530126// Device structure
127type Device struct {
vinokuma926cb3e2023-03-29 11:41:06 +0530128 ctx context.Context
129 cancel context.CancelFunc
130 vclientHolder *holder.VolthaServiceClientHolder
131 packetOutChannel chan *ofp.PacketOut
132 PortsByName map[string]*DevicePort
133 flows map[uint64]*of.VoltSubFlow
134 PortsByID map[uint32]*DevicePort
135 meters map[uint32]*of.Meter
136 flowQueue map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
137 SouthBoundID string
138 MfrDesc string
139 HwDesc string
140 SwDesc string
141 ID string
142 SerialNum string
143 State DeviceState
144 TimeStamp time.Time
145 groups sync.Map //map[uint32]*of.Group -> [GroupId : Group]
Naveen Sampath04696f72022-06-13 15:19:14 +0530146 tasks.Tasks
Naveen Sampath04696f72022-06-13 15:19:14 +0530147 portLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530148 flowLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530149 meterLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530150 flowQueueLock sync.RWMutex
151 flowHash uint32
vinokuma926cb3e2023-03-29 11:41:06 +0530152 auditInProgress bool
Naveen Sampath04696f72022-06-13 15:19:14 +0530153 deviceAuditInProgress bool
Naveen Sampath04696f72022-06-13 15:19:14 +0530154}
155
156// NewDevice is the constructor for Device
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530157func NewDevice(cntx context.Context, id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID, mfr, hwDesc, swDesc string) *Device {
Naveen Sampath04696f72022-06-13 15:19:14 +0530158 var device Device
159 device.ID = id
160 device.SerialNum = slno
161 device.State = DeviceStateDOWN
162 device.PortsByID = make(map[uint32]*DevicePort)
163 device.PortsByName = make(map[string]*DevicePort)
164 device.vclientHolder = vclientHldr
165 device.flows = make(map[uint64]*of.VoltSubFlow)
166 device.meters = make(map[uint32]*of.Meter)
167 device.flowQueue = make(map[uint32]*UniIDFlowQueue)
vinokuma926cb3e2023-03-29 11:41:06 +0530168 // Get the flowhash from db and update the flowhash variable in the device.
Naveen Sampath04696f72022-06-13 15:19:14 +0530169 device.SouthBoundID = southBoundID
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530170 device.MfrDesc = mfr
171 device.HwDesc = hwDesc
172 device.SwDesc = swDesc
173 device.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530174 flowHash, err := db.GetFlowHash(cntx, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530175 if err != nil {
176 device.flowHash = DefaultMaxFlowQueues
177 } else {
178 var hash uint32
179 err = json.Unmarshal([]byte(flowHash), &hash)
180 if err != nil {
Akash Soni6168f312023-05-18 20:57:33 +0530181 logger.Errorw(ctx, "Failed to unmarshall flowhash", log.Fields{"data": flowHash})
Naveen Sampath04696f72022-06-13 15:19:14 +0530182 } else {
183 device.flowHash = hash
184 }
185 }
186 logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
187 return &device
188}
189
190// ResetCache to reset cache
191func (d *Device) ResetCache() {
192 logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
193 d.flows = make(map[uint64]*of.VoltSubFlow)
194 d.meters = make(map[uint32]*of.Meter)
195 d.groups = sync.Map{}
196}
197
198// GetFlow - Get the flow from device obj
199func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
200 d.flowLock.RLock()
201 defer d.flowLock.RUnlock()
Akash Soni6168f312023-05-18 20:57:33 +0530202 logger.Debugw(ctx, "Get Flow", log.Fields{"Cookie": cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530203 flow, ok := d.flows[cookie]
204 return flow, ok
205}
206
Tinoj Josephec742f62022-09-29 19:11:10 +0530207// GetAllFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530208func (d *Device) GetAllFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530209 d.flowLock.RLock()
210 defer d.flowLock.RUnlock()
211 var flows []*of.VoltSubFlow
Akash Soni6168f312023-05-18 20:57:33 +0530212 logger.Debugw(ctx, "Get All Flows", log.Fields{"deviceID": d.ID})
Tinoj Josephec742f62022-09-29 19:11:10 +0530213 for _, f := range d.flows {
214 flows = append(flows, f)
215 }
216 return flows
217}
218
219// GetAllPendingFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530220func (d *Device) GetAllPendingFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530221 d.flowLock.RLock()
222 defer d.flowLock.RUnlock()
223 var flows []*of.VoltSubFlow
Akash Soni6168f312023-05-18 20:57:33 +0530224 logger.Debugw(ctx, "Get All Pending Flows", log.Fields{"deviceID": d.ID})
Tinoj Josephec742f62022-09-29 19:11:10 +0530225 for _, f := range d.flows {
226 if f.State == of.FlowAddPending {
227 flows = append(flows, f)
228 }
229 }
230 return flows
231}
232
Naveen Sampath04696f72022-06-13 15:19:14 +0530233// AddFlow - Adds the flow to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530234func (d *Device) AddFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530235 d.flowLock.Lock()
236 defer d.flowLock.Unlock()
Akash Soni6168f312023-05-18 20:57:33 +0530237 logger.Debugw(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530238 if _, ok := d.flows[flow.Cookie]; ok {
239 return errors.New(ErrDuplicateFlow)
240 }
241 d.flows[flow.Cookie] = flow
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530242 d.AddFlowToDb(cntx, flow)
Naveen Sampath04696f72022-06-13 15:19:14 +0530243 return nil
244}
245
246// AddFlowToDb is the utility to add the flow to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530247func (d *Device) AddFlowToDb(cntx context.Context, flow *of.VoltSubFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530248 if b, err := json.Marshal(flow); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530249 if err = db.PutFlow(cntx, d.ID, flow.Cookie, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530250 logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
251 }
252 }
253}
254
255// DelFlow - Deletes the flow from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530256func (d *Device) DelFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530257 d.flowLock.Lock()
258 defer d.flowLock.Unlock()
259 if _, ok := d.flows[flow.Cookie]; ok {
260 delete(d.flows, flow.Cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530261 d.DelFlowFromDb(cntx, flow.Cookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530262 return nil
263 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530264 return errors.New("flow does not exist")
Naveen Sampath04696f72022-06-13 15:19:14 +0530265}
266
267// DelFlowFromDb is utility to delete the flow from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530268func (d *Device) DelFlowFromDb(cntx context.Context, flowID uint64) {
269 _ = db.DelFlow(cntx, d.ID, flowID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530270}
271
272// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
273func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
274 d.flowLock.RLock()
275 defer d.flowLock.RUnlock()
276 if _, ok := d.flows[flow.Cookie]; ok {
277 return false
278 } else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
279 if _, ok := d.flows[flow.OldCookie]; ok {
Akash Soni6168f312023-05-18 20:57:33 +0530280 logger.Debugw(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530281 return true
282 }
283 }
284 return false
285}
286
287// DelFlowWithOldCookie is to delete flow with old cookie.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530288func (d *Device) DelFlowWithOldCookie(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530289 d.flowLock.Lock()
290 defer d.flowLock.Unlock()
291 if _, ok := d.flows[flow.OldCookie]; ok {
Akash Soni6168f312023-05-18 20:57:33 +0530292 logger.Debugw(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
Naveen Sampath04696f72022-06-13 15:19:14 +0530293 log.Fields{"OldCookie": flow.OldCookie})
294 delete(d.flows, flow.OldCookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530295 d.DelFlowFromDb(cntx, flow.OldCookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530296 return nil
297 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530298 return errors.New("flow does not exist")
Naveen Sampath04696f72022-06-13 15:19:14 +0530299}
300
301// RestoreFlowsFromDb to restore flows from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530302func (d *Device) RestoreFlowsFromDb(cntx context.Context) {
303 flows, _ := db.GetFlows(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530304 for _, flow := range flows {
305 b, ok := flow.Value.([]byte)
306 if !ok {
307 logger.Warn(ctx, "The value type is not []byte")
308 continue
309 }
310 d.CreateFlowFromString(b)
311 }
312}
313
314// CreateFlowFromString to create flow from string
315func (d *Device) CreateFlowFromString(b []byte) {
316 var flow of.VoltSubFlow
317 if err := json.Unmarshal(b, &flow); err == nil {
318 if _, ok := d.flows[flow.Cookie]; !ok {
319 logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
320 d.flows[flow.Cookie] = &flow
321 } else {
322 logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
323 }
324 } else {
325 logger.Warn(ctx, "Unmarshal failed")
326 }
327}
328
329// ----------------------------------------------------------
330// Database related functionality
331// Group operations at the device which include update and delete
332
333// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530334func (d *Device) UpdateGroupEntry(cntx context.Context, group *of.Group) {
Akash Soni6168f312023-05-18 20:57:33 +0530335 logger.Debugw(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530336 d.groups.Store(group.GroupID, group)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530337 d.AddGroupToDb(cntx, group)
Naveen Sampath04696f72022-06-13 15:19:14 +0530338}
339
340// AddGroupToDb - Utility to add the group to the device DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530341func (d *Device) AddGroupToDb(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530342 if b, err := json.Marshal(group); err == nil {
Akash Soni6168f312023-05-18 20:57:33 +0530343 logger.Debugw(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530344 if err = db.PutGroup(cntx, d.ID, group.GroupID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530345 logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
346 }
347 }
348}
349
350// DelGroupEntry - Deletes the group from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530351func (d *Device) DelGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530352 if _, ok := d.groups.Load(group.GroupID); ok {
353 d.groups.Delete(group.GroupID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530354 d.DelGroupFromDb(cntx, group.GroupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530355 }
356}
357
358// DelGroupFromDb - Utility to delete the Group from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530359func (d *Device) DelGroupFromDb(cntx context.Context, groupID uint32) {
360 _ = db.DelGroup(cntx, d.ID, groupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530361}
362
vinokuma926cb3e2023-03-29 11:41:06 +0530363// RestoreGroupsFromDb - restores all groups from DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530364func (d *Device) RestoreGroupsFromDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530365 logger.Info(ctx, "Restoring Groups")
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530366 groups, _ := db.GetGroups(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530367 for _, group := range groups {
368 b, ok := group.Value.([]byte)
369 if !ok {
370 logger.Warn(ctx, "The value type is not []byte")
371 continue
372 }
373 d.CreateGroupFromString(b)
374 }
375}
376
vinokuma926cb3e2023-03-29 11:41:06 +0530377// CreateGroupFromString - Forms group struct from json string
Naveen Sampath04696f72022-06-13 15:19:14 +0530378func (d *Device) CreateGroupFromString(b []byte) {
379 var group of.Group
380 if err := json.Unmarshal(b, &group); err == nil {
381 if _, ok := d.groups.Load(group.GroupID); !ok {
382 logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
383 d.groups.Store(group.GroupID, &group)
384 } else {
385 logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
386 }
387 } else {
388 logger.Warn(ctx, "Unmarshal failed")
389 }
390}
391
392// AddMeter to add meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530393func (d *Device) AddMeter(cntx context.Context, meter *of.Meter) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530394 d.meterLock.Lock()
395 defer d.meterLock.Unlock()
396 if _, ok := d.meters[meter.ID]; ok {
Akash Sonid36d23b2023-08-18 12:51:40 +0530397 return errors.New("duplicate meter")
Naveen Sampath04696f72022-06-13 15:19:14 +0530398 }
399 d.meters[meter.ID] = meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530400 go d.AddMeterToDb(cntx, meter)
Naveen Sampath04696f72022-06-13 15:19:14 +0530401 return nil
402}
403
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530404// UpdateMeter to update meter
405func (d *Device) UpdateMeter(cntx context.Context, meter *of.Meter) error {
vinokuma926cb3e2023-03-29 11:41:06 +0530406 d.meterLock.Lock()
407 defer d.meterLock.Unlock()
408 if _, ok := d.meters[meter.ID]; ok {
409 d.meters[meter.ID] = meter
410 d.AddMeterToDb(cntx, meter)
411 } else {
Akash Sonid36d23b2023-08-18 12:51:40 +0530412 return errors.New("meter not found for updation")
vinokuma926cb3e2023-03-29 11:41:06 +0530413 }
414 return nil
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530415}
416
Naveen Sampath04696f72022-06-13 15:19:14 +0530417// GetMeter to get meter
418func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
419 d.meterLock.RLock()
420 defer d.meterLock.RUnlock()
421 if m, ok := d.meters[id]; ok {
422 return m, nil
423 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530424 return nil, errors.New("meter not found")
Naveen Sampath04696f72022-06-13 15:19:14 +0530425}
426
427// DelMeter to delete meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530428func (d *Device) DelMeter(cntx context.Context, meter *of.Meter) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530429 d.meterLock.Lock()
430 defer d.meterLock.Unlock()
431 if _, ok := d.meters[meter.ID]; ok {
432 delete(d.meters, meter.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530433 go d.DelMeterFromDb(cntx, meter.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530434 return true
435 }
436 return false
437}
438
439// AddMeterToDb is utility to add the Group to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530440func (d *Device) AddMeterToDb(cntx context.Context, meter *of.Meter) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530441 if b, err := json.Marshal(meter); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530442 if err = db.PutDeviceMeter(cntx, d.ID, meter.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530443 logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
444 }
445 }
446}
447
448// DelMeterFromDb to delete meter from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530449func (d *Device) DelMeterFromDb(cntx context.Context, id uint32) {
450 _ = db.DelDeviceMeter(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530451}
452
453// RestoreMetersFromDb to restore meters from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530454func (d *Device) RestoreMetersFromDb(cntx context.Context) {
455 meters, _ := db.GetDeviceMeters(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530456 for _, meter := range meters {
457 b, ok := meter.Value.([]byte)
458 if !ok {
459 logger.Warn(ctx, "The value type is not []byte")
460 continue
461 }
462 d.CreateMeterFromString(b)
463 }
464}
465
466// CreateMeterFromString to create meter from string
467func (d *Device) CreateMeterFromString(b []byte) {
468 var meter of.Meter
469 if err := json.Unmarshal(b, &meter); err == nil {
470 if _, ok := d.meters[meter.ID]; !ok {
471 logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
472 d.meters[meter.ID] = &meter
473 } else {
474 logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
475 }
476 } else {
Akash Soni6168f312023-05-18 20:57:33 +0530477 logger.Warnw(ctx, "Unmarshal failed", log.Fields{"error": err, "meter": string(b)})
Naveen Sampath04696f72022-06-13 15:19:14 +0530478 }
479}
480
481// VolthaClient to get voltha client
482func (d *Device) VolthaClient() voltha.VolthaServiceClient {
483 return d.vclientHolder.Get()
484}
485
486// AddPort to add the port as requested by the device/VOLTHA
487// Inform the application if the port is successfully added
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530488func (d *Device) AddPort(cntx context.Context, mp *ofp.OfpPort) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530489 d.portLock.Lock()
490 defer d.portLock.Unlock()
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530491 id := mp.PortNo
492 name := mp.Name
Naveen Sampath04696f72022-06-13 15:19:14 +0530493 if _, ok := d.PortsByID[id]; ok {
Akash Sonid36d23b2023-08-18 12:51:40 +0530494 return errors.New(Duplicate_Port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530495 }
496 if _, ok := d.PortsByName[name]; ok {
Akash Sonid36d23b2023-08-18 12:51:40 +0530497 return errors.New(Duplicate_Port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530498 }
499
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530500 p := NewDevicePort(mp)
Naveen Sampath04696f72022-06-13 15:19:14 +0530501 d.PortsByID[id] = p
502 d.PortsByName[name] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530503 d.WritePortToDb(cntx, p)
504 GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530505 logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
506 return nil
507}
508
509// DelPort to delete the port as requested by the device/VOLTHA
510// Inform the application if the port is successfully deleted
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530511func (d *Device) DelPort(cntx context.Context, id uint32, portName string) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530512 p := d.GetPortByID(id)
513 if p == nil {
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530514 p = d.GetPortByName(portName)
515 if p == nil {
516 return errors.New("unknown port")
517 } else {
518 logger.Infow(ctx, "Found port by name", log.Fields{"PortName": p.Name, "PortID": p.ID})
519 }
Naveen Sampath04696f72022-06-13 15:19:14 +0530520 }
521 if p.State == PortStateUp {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530522 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530523 }
Tinoj Joseph4ead4e02023-01-30 03:12:44 +0530524 GetController().PortDelInd(cntx, d.ID, p.Name)
525
Naveen Sampath04696f72022-06-13 15:19:14 +0530526 d.portLock.Lock()
527 defer d.portLock.Unlock()
528
Naveen Sampath04696f72022-06-13 15:19:14 +0530529 delete(d.PortsByID, p.ID)
530 delete(d.PortsByName, p.Name)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530531 d.DelPortFromDb(cntx, p.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530532 logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
533 return nil
534}
535
536// UpdatePortByName is utility to update the port by Name
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530537func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530538 d.portLock.Lock()
539 defer d.portLock.Unlock()
540
541 p, ok := d.PortsByName[name]
542 if !ok {
543 return
544 }
545 delete(d.PortsByID, p.ID)
546 p.ID = port
547 d.PortsByID[port] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530548 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530549 GetController().PortUpdateInd(d.ID, p.Name, p.ID)
550 logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
551}
552
553// GetPortName to get the name of the port by its id
554func (d *Device) GetPortName(id uint32) (string, error) {
555 d.portLock.RLock()
556 defer d.portLock.RUnlock()
557
558 if p, ok := d.PortsByID[id]; ok {
559 return p.Name, nil
560 }
561 logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
Akash Sonid36d23b2023-08-18 12:51:40 +0530562 return "", errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530563}
564
565// GetPortByID is utility to retrieve the port by ID
566func (d *Device) GetPortByID(id uint32) *DevicePort {
567 d.portLock.RLock()
568 defer d.portLock.RUnlock()
569
570 p, ok := d.PortsByID[id]
571 if ok {
572 return p
573 }
574 return nil
575}
576
577// GetPortByName is utility to retrieve the port by Name
578func (d *Device) GetPortByName(name string) *DevicePort {
579 d.portLock.RLock()
580 defer d.portLock.RUnlock()
581
582 p, ok := d.PortsByName[name]
583 if ok {
584 return p
585 }
586 return nil
587}
588
589// GetPortState to get the state of the port by name
590func (d *Device) GetPortState(name string) (PortState, error) {
591 d.portLock.RLock()
592 defer d.portLock.RUnlock()
593
594 if p, ok := d.PortsByName[name]; ok {
595 return p.State, nil
596 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530597 return PortStateDown, errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530598}
599
600// GetPortID to get the port-id by the port name
601func (d *Device) GetPortID(name string) (uint32, error) {
602 d.portLock.RLock()
603 defer d.portLock.RUnlock()
604
605 if p, ok := d.PortsByName[name]; ok {
606 return p.ID, nil
607 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530608 return 0, errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530609}
610
611// WritePortToDb to add the port to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530612func (d *Device) WritePortToDb(ctx context.Context, port *DevicePort) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530613 port.Version = database.PresentVersionMap[database.DevicePortPath]
614 if b, err := json.Marshal(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530615 if err = db.PutPort(ctx, d.ID, port.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530616 logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
617 }
618 }
619}
620
621// DelPortFromDb to delete port from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530622func (d *Device) DelPortFromDb(cntx context.Context, id uint32) {
623 _ = db.DelPort(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530624}
625
626// RestorePortsFromDb to restore ports from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530627func (d *Device) RestorePortsFromDb(cntx context.Context) {
628 ports, _ := db.GetPorts(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530629 for _, port := range ports {
630 b, ok := port.Value.([]byte)
631 if !ok {
632 logger.Warn(ctx, "The value type is not []byte")
633 continue
634 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530635 d.CreatePortFromString(cntx, b)
Naveen Sampath04696f72022-06-13 15:19:14 +0530636 }
637}
638
639// CreatePortFromString to create port from string
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530640func (d *Device) CreatePortFromString(cntx context.Context, b []byte) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530641 var port DevicePort
642 if err := json.Unmarshal(b, &port); err == nil {
643 if _, ok := d.PortsByID[port.ID]; !ok {
644 logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
645 d.PortsByID[port.ID] = &port
646 d.PortsByName[port.Name] = &port
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530647 GetController().PortAddInd(cntx, d.ID, port.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530648 } else {
Akash Sonid36d23b2023-08-18 12:51:40 +0530649 logger.Warnw(ctx, Duplicate_Port, log.Fields{"ID": port.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530650 }
651 } else {
Akash Soni6168f312023-05-18 20:57:33 +0530652 logger.Warnw(ctx, "Unmarshal failed", log.Fields{"port": string(b)})
Naveen Sampath04696f72022-06-13 15:19:14 +0530653 }
654}
655
656// Delete : OLT Delete functionality yet to be implemented. IDeally all of the
657// resources should have been removed by this time. It is an error
658// scenario if the OLT has resources associated with it.
659func (d *Device) Delete() {
660 d.StopAll()
661}
662
663// Stop to stop the task
664func (d *Device) Stop() {
665}
666
667// ConnectInd is called when the connection between VGC and the VOLTHA is
668// restored. This will perform audit of the device post reconnection
669func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
670 logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
671 ctx1, cancel := context.WithCancel(ctx)
672 d.cancel = cancel
673 d.ctx = ctx1
674 d.Tasks.Initialize(ctx1)
675
Akash Soni6168f312023-05-18 20:57:33 +0530676 logger.Debugw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530677 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530678 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530679 GetController().DeviceUpInd(d.ID)
680
Akash Soni6168f312023-05-18 20:57:33 +0530681 logger.Debugw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530682 t := NewAuditDevice(d, AuditEventDeviceDisc)
683 d.Tasks.AddTask(t)
684
685 t1 := NewAuditTablesTask(d)
686 d.Tasks.AddTask(t1)
687
688 t2 := NewPendingProfilesTask(d)
689 d.Tasks.AddTask(t2)
690
691 go d.synchronizeDeviceTables()
692}
693
694func (d *Device) synchronizeDeviceTables() {
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530695 tick := time.NewTicker(GetController().GetDeviceTableSyncDuration())
Naveen Sampath04696f72022-06-13 15:19:14 +0530696loop:
697 for {
698 select {
699 case <-d.ctx.Done():
vinokuma926cb3e2023-03-29 11:41:06 +0530700 logger.Warnw(d.ctx, "Context Done. Canceling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
Naveen Sampath04696f72022-06-13 15:19:14 +0530701 break loop
702 case <-tick.C:
703 t1 := NewAuditTablesTask(d)
704 d.Tasks.AddTask(t1)
705 }
706 }
707 tick.Stop()
708}
709
710// DeviceUpInd is called when the logical device state changes to UP. This will perform audit of the device post reconnection
711func (d *Device) DeviceUpInd() {
712 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
713 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530714 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530715 GetController().DeviceUpInd(d.ID)
716
717 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
718 t := NewAuditDevice(d, AuditEventDeviceDisc)
719 d.Tasks.AddTask(t)
720
721 t1 := NewAuditTablesTask(d)
722 d.Tasks.AddTask(t1)
723
724 t2 := NewPendingProfilesTask(d)
725 d.Tasks.AddTask(t2)
726}
727
728// DeviceDownInd is called when the logical device state changes to Down.
729func (d *Device) DeviceDownInd() {
730 logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
731 d.State = DeviceStateDOWN
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530732 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530733 GetController().DeviceDownInd(d.ID)
734}
735
736// DeviceRebootInd is called when the logical device is rebooted.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530737func (d *Device) DeviceRebootInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530738 logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
739
740 if d.State == DeviceStateREBOOTED {
741 d.State = DeviceStateREBOOTED
742 logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
743 return
744 }
745
746 d.State = DeviceStateREBOOTED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530747 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530748 GetController().SetRebootInProgressForDevice(d.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530749 GetController().DeviceRebootInd(cntx, d.ID, d.SerialNum, d.SouthBoundID)
750 d.ReSetAllPortStates(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530751}
752
753// DeviceDisabledInd is called when the logical device is disabled
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530754func (d *Device) DeviceDisabledInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530755 logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
756 d.State = DeviceStateDISABLED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530757 d.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530758 GetController().DeviceDisableInd(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530759}
760
vinokuma926cb3e2023-03-29 11:41:06 +0530761// ReSetAllPortStates - Set all logical device port status to DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530762func (d *Device) ReSetAllPortStates(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530763 logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
764
765 d.portLock.Lock()
766 defer d.portLock.Unlock()
767
768 for _, port := range d.PortsByID {
769 if port.State != PortStateDown {
770 logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530771 GetController().PortDownInd(cntx, d.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530772 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530773 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530774 }
775 }
776}
777
vinokuma926cb3e2023-03-29 11:41:06 +0530778// ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530779func (d *Device) ReSetAllPortStatesInDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530780 logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
781
782 d.portLock.Lock()
783 defer d.portLock.Unlock()
784
785 for _, port := range d.PortsByID {
786 if port.State != PortStateDown {
Akash Soni6168f312023-05-18 20:57:33 +0530787 logger.Debugw(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
Naveen Sampath04696f72022-06-13 15:19:14 +0530788 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530789 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530790 }
791 }
792}
793
794// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
795// to update only when the port state is DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530796func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530797 if p := d.GetPortByName(portName); p != nil {
798 if p.ID != port {
799 logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
800 if p.State != PortStateDown {
801 logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
802 return
803 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530804 d.UpdatePortByName(cntx, portName, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530805 logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
806 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530807 d.ProcessPortState(cntx, port, state)
Naveen Sampath04696f72022-06-13 15:19:14 +0530808 }
809}
810
811// ***Operations Performed on Port state Transitions***
812//
813// |-----------------------------------------------------------------------------|
814// | State | Action |
815// |--------------------|--------------------------------------------------------|
816// | UP | UNI - Trigger Flow addition for service configured |
817// | | NNI - Trigger Flow addition for vnets & mvlan profiles |
818// | | |
819// | DOWN | UNI - Trigger Flow deletion for service configured |
820// | | NNI - Trigger Flow deletion for vnets & mvlan profiles |
821// | | |
822// |-----------------------------------------------------------------------------|
823//
824
825// ProcessPortState deals with the change in port status and taking action
826// based on the new state and the old state
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530827func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530828 if d.State != DeviceStateUP && !util.IsNniPort(port) {
829 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
830 return
831 }
832 if p := d.GetPortByID(port); p != nil {
833 logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
834
835 // Avoid blind initialization as the current tasks in the queue will be lost
836 // Eg: Service Del followed by Port Down - The flows will be dangling
837 // Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
838 p.Tasks.CheckAndInitialize(d.ctx)
839 if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
840 // Transition from DOWN to UP
841 logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530842 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530843 p.State = PortStateUp
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530844 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530845 } else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
846 // Transition from UP to Down
847 logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530848 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530849 p.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530850 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530851 } else {
852 logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
853 }
854 }
855}
856
857// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530858func (d *Device) ProcessPortStateAfterReboot(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530859 if d.State != DeviceStateUP && !util.IsNniPort(port) {
860 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
861 return
862 }
863 if p := d.GetPortByID(port); p != nil {
864 logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State})
865 p.Tasks.Initialize(d.ctx)
866 if p.State == PortStateUp {
867 logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530868 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530869 } else if p.State == PortStateDown {
870 logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530871 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530872 }
873 }
874}
875
876// ChangeEvent : Change event brings in ports related changes such as addition/deletion
877// or modification where the port status change up/down is indicated to the
878// controller
879func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
880 cet := NewChangeEventTask(d.ctx, event, d)
881 d.AddTask(cet)
882 return nil
883}
884
885// PacketIn handle the incoming packet-in and deliver to the application for the
886// actual processing
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530887func (d *Device) PacketIn(cntx context.Context, pkt *ofp.PacketIn) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530888 logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
889 if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
890 logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
891 return
892 }
893 data := pkt.PacketIn.Data
894 port := PacketInGetPort(pkt.PacketIn)
895 if pName, err := d.GetPortName(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530896 GetController().PacketInInd(cntx, d.ID, pName, data)
Naveen Sampath04696f72022-06-13 15:19:14 +0530897 } else {
898 logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
899 }
900}
901
902// PacketInGetPort to get the port on which the packet-in is reported
903func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
904 for _, field := range pkt.Match.OxmFields {
905 if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
906 if ofbField, ok := field.Field.(*ofp.OfpOxmField_OfbField); ok {
907 if ofbField.OfbField.Type == ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
908 if port, ok := ofbField.OfbField.Value.(*ofp.OfpOxmOfbField_Port); ok {
909 return port.Port
910 }
911 }
912 }
913 }
914 }
915 return 0
916}
917
918// PacketOutReq receives the packet out request from the application via the
919// controller. The interface from the application uses name as the identity.
920func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
921 inp, err := d.GetPortID(inport)
922 if err != nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530923 return errors.New("unknown inport")
Naveen Sampath04696f72022-06-13 15:19:14 +0530924 }
925 outp, err1 := d.GetPortID(outport)
926 if err1 != nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530927 return errors.New("unknown outport")
Naveen Sampath04696f72022-06-13 15:19:14 +0530928 }
929 logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
930 return d.SendPacketOut(outp, inp, data, isCustomPkt)
931}
932
933// SendPacketOut is responsible for building the OF structure and send the
934// packet-out to the VOLTHA
935func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
936 pout := &ofp.PacketOut{}
937 pout.Id = d.ID
938 opout := &ofp.OfpPacketOut{}
939 pout.PacketOut = opout
940 opout.InPort = inport
941 opout.Data = data
942 opout.Actions = []*ofp.OfpAction{
943 {
944 Type: ofp.OfpActionType_OFPAT_OUTPUT,
945 Action: &ofp.OfpAction_Output{
946 Output: &ofp.OfpActionOutput{
947 Port: outport,
948 MaxLen: 65535,
949 },
950 },
951 },
952 }
953 d.packetOutChannel <- pout
954 return nil
955}
956
957// UpdateFlows receives the flows in the form that is implemented
958// in the VGC and transforms them to the OF format. This is handled
959// as a port of the task that is enqueued to do the same.
960func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
961 t := NewAddFlowsTask(d.ctx, flow, d)
962 logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
963 // check if port isNni , if yes flows will be added to device port queues.
964 if util.IsNniPort(devPort.ID) {
965 // Adding the flows to device port queues.
966 devPort.AddTask(t)
967 return
968 }
969 // If the flowHash is enabled then add the flows to the flowhash generated queues.
970 flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
971 if flowQueue != nil {
972 logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
973 flowQueue.AddTask(t)
974 logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
975 } else {
976 //FlowThrotling disabled, add to the device port queue
977 devPort.AddTask(t)
978 return
979 }
980}
981
982// UpdateGroup to update group info
983func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
984 task := NewModGroupTask(d.ctx, group, d)
985 logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
986 devPort.AddTask(task)
987}
988
989// ModMeter for mod meter task
990func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
991 if command == of.MeterCommandAdd {
992 if _, err := d.GetMeter(meter.ID); err == nil {
993 logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
994 return
995 }
996 }
997 t := NewModMeterTask(d.ctx, command, meter, d)
998 devPort.AddTask(t)
999}
1000
1001func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
1002 d.flowQueueLock.RLock()
vinokuma926cb3e2023-03-29 11:41:06 +05301003 // If flowhash is 0 that means flowhash throttling is disabled, return nil
Naveen Sampath04696f72022-06-13 15:19:14 +05301004 if d.flowHash == 0 {
1005 d.flowQueueLock.RUnlock()
1006 return nil
1007 }
1008 flowHashID := id % uint32(d.flowHash)
1009 if value, found := d.flowQueue[uint32(flowHashID)]; found {
1010 d.flowQueueLock.RUnlock()
1011 return value
1012 }
1013 d.flowQueueLock.RUnlock()
1014 logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
1015
1016 return d.addFlowQueueForUniID(id)
1017}
1018
1019func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
Naveen Sampath04696f72022-06-13 15:19:14 +05301020 d.flowQueueLock.Lock()
1021 defer d.flowQueueLock.Unlock()
1022 flowHashID := id % uint32(d.flowHash)
1023 flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
1024 flowQueue.Tasks.Initialize(d.ctx)
1025 d.flowQueue[flowHashID] = flowQueue
1026 return flowQueue
1027}
1028
1029// SetFlowHash sets the device flow hash and writes to the DB.
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301030func (d *Device) SetFlowHash(cntx context.Context, hash uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301031 d.flowQueueLock.Lock()
1032 defer d.flowQueueLock.Unlock()
1033
1034 d.flowHash = hash
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301035 d.writeFlowHashToDB(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301036}
1037
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301038func (d *Device) writeFlowHashToDB(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301039 hash, err := json.Marshal(d.flowHash)
1040 if err != nil {
Akash Soni6168f312023-05-18 20:57:33 +05301041 logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash, "error": err})
Naveen Sampath04696f72022-06-13 15:19:14 +05301042 return
1043 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301044 if err := db.PutFlowHash(cntx, d.ID, string(hash)); err != nil {
Akash Soni6168f312023-05-18 20:57:33 +05301045 logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash, "error": err})
Naveen Sampath04696f72022-06-13 15:19:14 +05301046 }
1047}
1048
vinokuma926cb3e2023-03-29 11:41:06 +05301049// isSBOperAllowed - determines if the SB operation is allowed based on device state & force flag
Naveen Sampath04696f72022-06-13 15:19:14 +05301050func (d *Device) isSBOperAllowed(forceAction bool) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +05301051 if d.State == DeviceStateUP {
1052 return true
1053 }
1054
1055 if d.State == DeviceStateDISABLED && forceAction {
1056 return true
1057 }
1058
1059 return false
1060}
1061
Sridhar Ravindra3ec14232024-01-01 19:11:48 +05301062func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301063 flow, _ := d.GetFlow(cookie)
Sridhar Ravindra3ec14232024-01-01 19:11:48 +05301064 d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err, sendFlowNotif)
Naveen Sampath04696f72022-06-13 15:19:14 +05301065}
1066
Sridhar Ravindra3ec14232024-01-01 19:11:48 +05301067func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error, sendFlowNotif bool) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301068 statusCode, statusMsg := infraerror.GetErrorInfo(err)
1069 success := isFlowOperSuccess(statusCode, oper)
1070
1071 updateFlow := func(cookie uint64, state int, reason string) {
1072 if dbFlow, ok := d.GetFlow(cookie); ok {
1073 dbFlow.State = uint8(state)
1074 dbFlow.ErrorReason = reason
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301075 d.AddFlowToDb(cntx, dbFlow)
Naveen Sampath04696f72022-06-13 15:19:14 +05301076 }
1077 }
1078
vinokuma926cb3e2023-03-29 11:41:06 +05301079 // Update flow results
Naveen Sampath04696f72022-06-13 15:19:14 +05301080 // Add - Update Success or Failure status with reason
1081 // Del - Delete entry from DB on success else update error reason
1082 if oper == of.CommandAdd {
1083 state := of.FlowAddSuccess
1084 reason := ""
1085 if !success {
1086 state = of.FlowAddFailure
1087 reason = statusMsg
1088 }
1089 updateFlow(cookie, state, reason)
1090 logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
1091 } else {
1092 if success && flow != nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301093 if err := d.DelFlow(cntx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301094 logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
1095 }
1096 } else if !success {
1097 updateFlow(cookie, of.FlowDelFailure, statusMsg)
1098 }
1099 }
1100
1101 flowResult := intf.FlowStatus{
1102 Cookie: strconv.FormatUint(cookie, 10),
1103 Device: d.ID,
1104 FlowModType: oper,
1105 Flow: flow,
1106 Status: statusCode,
1107 Reason: statusMsg,
1108 AdditionalData: bwDetails,
1109 }
1110
Sridhar Ravindra3ec14232024-01-01 19:11:48 +05301111 if sendFlowNotif {
1112 logger.Debugw(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
1113 GetController().ProcessFlowModResultIndication(cntx, flowResult)
1114 }
Naveen Sampath04696f72022-06-13 15:19:14 +05301115}