blob: 193c3fb54be802315096719202d49b3666ef09b4 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package controller
17
18import (
19 "context"
20 "encoding/json"
21 "errors"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053022 "fmt"
Naveen Sampath04696f72022-06-13 15:19:14 +053023 "strconv"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053024 "strings"
Naveen Sampath04696f72022-06-13 15:19:14 +053025 "sync"
26 "time"
vinokuma926cb3e2023-03-29 11:41:06 +053027 infraerror "voltha-go-controller/internal/pkg/errorcodes"
Naveen Sampath04696f72022-06-13 15:19:14 +053028
29 "voltha-go-controller/database"
30 "voltha-go-controller/internal/pkg/holder"
31 "voltha-go-controller/internal/pkg/intf"
32 "voltha-go-controller/internal/pkg/of"
vinokuma926cb3e2023-03-29 11:41:06 +053033
Naveen Sampath04696f72022-06-13 15:19:14 +053034 //"voltha-go-controller/internal/pkg/vpagent"
35 "voltha-go-controller/internal/pkg/tasks"
36 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053037 "voltha-go-controller/log"
vinokuma926cb3e2023-03-29 11:41:06 +053038
Naveen Sampath04696f72022-06-13 15:19:14 +053039 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
40 "github.com/opencord/voltha-protos/v5/go/voltha"
41)
42
// PortState is the operational state of a device port, stored as a string.
type PortState string

const (
	// PortStateDown is the state of a port that is not up; NewDevicePort
	// creates every port in this state.
	PortStateDown PortState = "DOWN"
	// PortStateUp is the state of a port that is up.
	PortStateUp PortState = "UP"
	// DefaultMaxFlowQueues is the flow hash value NewDevice falls back to
	// when no flow hash can be read from the database.
	DefaultMaxFlowQueues = 67
	// ErrDuplicateFlow is the error text returned by AddFlow when a flow with
	// the same cookie already exists on the device.
	ErrDuplicateFlow string = "duplicate flow"
	// Unknown_Port_ID is the error text returned by the port lookups when the
	// requested port is not known to the device.
	Unknown_Port_ID = "unknown port id"
	// Duplicate_Port is the error text returned by AddPort when the port id
	// or port name already exists on the device.
	Duplicate_Port = "duplicate port"
)
60
// DevicePort describes a single port of a device as tracked by the
// controller. It is serialized to JSON when written to the database.
type DevicePort struct {
	// Name is the port name; it is the key in Device.PortsByName.
	Name string
	// State is the current operational state of the port.
	State PortState
	// Version is stamped by WritePortToDb from database.PresentVersionMap
	// before the port is persisted.
	Version string
	// HwAddr is the hardware address rendered as colon-separated hex.
	HwAddr string
	tasks.Tasks
	// CurrSpeed and MaxSpeed are copied from the OpenFlow port description.
	CurrSpeed uint32
	MaxSpeed  uint32
	// ID is the port number; it is the key in Device.PortsByID.
	ID uint32
}
72
73// NewDevicePort is the constructor for DevicePort
Tinoj Joseph429b9d92022-11-16 18:51:05 +053074func NewDevicePort(mp *ofp.OfpPort) *DevicePort {
Naveen Sampath04696f72022-06-13 15:19:14 +053075 var port DevicePort
76
Tinoj Joseph429b9d92022-11-16 18:51:05 +053077 port.ID = mp.PortNo
78 port.Name = mp.Name
79
80 //port.HwAddr = strings.Trim(strings.Join(strings.Fields(fmt.Sprint("%02x", mp.HwAddr)), ":"), "[]")
81 port.HwAddr = strings.Trim(strings.ReplaceAll(fmt.Sprintf("%02x", mp.HwAddr), " ", ":"), "[]")
82 port.CurrSpeed = mp.CurrSpeed
83 port.MaxSpeed = mp.MaxSpeed
Naveen Sampath04696f72022-06-13 15:19:14 +053084 port.State = PortStateDown
85 return &port
86}
87
// UniIDFlowQueue maintains flows in a queue. It embeds tasks.Tasks and is
// identified by ID, the key under which it is stored in Device.flowQueue.
type UniIDFlowQueue struct {
	tasks.Tasks
	// ID is the identifier this queue was created with.
	ID uint32
}
93
94// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
95func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
96 var flowQueue UniIDFlowQueue
97 flowQueue.ID = id
98 return &flowQueue
99}
100
// DeviceState is the state of a device as tracked by the controller, stored
// as a string.
type DeviceState string

const (

	// DeviceStateUNKNOWN is the "UNKNOWN" device state.
	DeviceStateUNKNOWN DeviceState = "UNKNOWN"
	// DeviceStateINIT is the "INIT" device state.
	DeviceStateINIT DeviceState = "INIT"
	// DeviceStateUP is set by ConnectInd and DeviceUpInd.
	DeviceStateUP DeviceState = "UP"
	// DeviceStateDOWN is the initial state assigned by NewDevice and is also
	// set by DeviceDownInd.
	DeviceStateDOWN DeviceState = "DOWN"
	// DeviceStateREBOOTED is set by DeviceRebootInd.
	DeviceStateREBOOTED DeviceState = "REBOOTED"
	// DeviceStateDISABLED is set by DeviceDisabledInd.
	DeviceStateDISABLED DeviceState = "DISABLED"
	// DeviceStateDELETED is the "DELETED" device state.
	DeviceStateDELETED DeviceState = "DELETED"
)
121
// DeviceInterface is the behaviour required of a device whose flow hash can
// be set.
// NOTE(review): presumably implemented by *Device in another part of this
// file; the implementation is not visible here — confirm.
type DeviceInterface interface {
	// SetFlowHash sets the flow hash of the device to hash.
	SetFlowHash(cntx context.Context, hash uint32)
}
125
// Device is the controller's in-memory representation of a device. It caches
// the device's ports, flows, meters and groups and embeds tasks.Tasks for
// the tasks queued against the device.
type Device struct {
	// ctx and cancel are created by ConnectInd; ctx is watched by the
	// periodic table synchronization goroutine.
	ctx    context.Context
	cancel context.CancelFunc
	// vclientHolder supplies the VOLTHA client returned by VolthaClient.
	vclientHolder    *holder.VolthaServiceClientHolder
	packetOutChannel chan *ofp.PacketOut
	// PortsByName and PortsByID index the same DevicePort objects by name
	// and by port number.
	PortsByName map[string]*DevicePort
	// flows is keyed by flow cookie.
	flows     map[uint64]*of.VoltSubFlow
	PortsByID map[uint32]*DevicePort
	// meters is keyed by meter ID.
	meters       map[uint32]*of.Meter
	flowQueue    map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
	SouthBoundID string
	MfrDesc      string
	HwDesc       string
	SwDesc       string
	ID           string
	SerialNum    string
	// State is the current device state; TimeStamp is refreshed whenever the
	// state-change indications update State.
	State     DeviceState
	TimeStamp time.Time
	groups    sync.Map //map[uint32]*of.Group -> [GroupId : Group]
	tasks.Tasks
	// portLock, flowLock and meterLock are taken by the port, flow and meter
	// accessors respectively.
	portLock      sync.RWMutex
	flowLock      sync.RWMutex
	meterLock     sync.RWMutex
	flowQueueLock sync.RWMutex
	// flowHash is loaded from the database by NewDevice, falling back to
	// DefaultMaxFlowQueues.
	flowHash              uint32
	auditInProgress       bool
	deviceAuditInProgress bool
}
155
156// NewDevice is the constructor for Device
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530157func NewDevice(cntx context.Context, id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID, mfr, hwDesc, swDesc string) *Device {
Naveen Sampath04696f72022-06-13 15:19:14 +0530158 var device Device
159 device.ID = id
160 device.SerialNum = slno
161 device.State = DeviceStateDOWN
162 device.PortsByID = make(map[uint32]*DevicePort)
163 device.PortsByName = make(map[string]*DevicePort)
164 device.vclientHolder = vclientHldr
165 device.flows = make(map[uint64]*of.VoltSubFlow)
166 device.meters = make(map[uint32]*of.Meter)
167 device.flowQueue = make(map[uint32]*UniIDFlowQueue)
vinokuma926cb3e2023-03-29 11:41:06 +0530168 // Get the flowhash from db and update the flowhash variable in the device.
Naveen Sampath04696f72022-06-13 15:19:14 +0530169 device.SouthBoundID = southBoundID
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530170 device.MfrDesc = mfr
171 device.HwDesc = hwDesc
172 device.SwDesc = swDesc
173 device.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530174 flowHash, err := db.GetFlowHash(cntx, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530175 if err != nil {
176 device.flowHash = DefaultMaxFlowQueues
177 } else {
178 var hash uint32
179 err = json.Unmarshal([]byte(flowHash), &hash)
180 if err != nil {
Akash Soni6168f312023-05-18 20:57:33 +0530181 logger.Errorw(ctx, "Failed to unmarshall flowhash", log.Fields{"data": flowHash})
Naveen Sampath04696f72022-06-13 15:19:14 +0530182 } else {
183 device.flowHash = hash
184 }
185 }
186 logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
187 return &device
188}
189
190// ResetCache to reset cache
191func (d *Device) ResetCache() {
192 logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
193 d.flows = make(map[uint64]*of.VoltSubFlow)
194 d.meters = make(map[uint32]*of.Meter)
195 d.groups = sync.Map{}
196}
197
198// GetFlow - Get the flow from device obj
199func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
200 d.flowLock.RLock()
201 defer d.flowLock.RUnlock()
Akash Soni6168f312023-05-18 20:57:33 +0530202 logger.Debugw(ctx, "Get Flow", log.Fields{"Cookie": cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530203 flow, ok := d.flows[cookie]
204 return flow, ok
205}
206
Tinoj Josephec742f62022-09-29 19:11:10 +0530207// GetAllFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530208func (d *Device) GetAllFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530209 d.flowLock.RLock()
210 defer d.flowLock.RUnlock()
211 var flows []*of.VoltSubFlow
Akash Soni6168f312023-05-18 20:57:33 +0530212 logger.Debugw(ctx, "Get All Flows", log.Fields{"deviceID": d.ID})
Tinoj Josephec742f62022-09-29 19:11:10 +0530213 for _, f := range d.flows {
214 flows = append(flows, f)
215 }
216 return flows
217}
218
219// GetAllPendingFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530220func (d *Device) GetAllPendingFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530221 d.flowLock.RLock()
222 defer d.flowLock.RUnlock()
223 var flows []*of.VoltSubFlow
Akash Soni6168f312023-05-18 20:57:33 +0530224 logger.Debugw(ctx, "Get All Pending Flows", log.Fields{"deviceID": d.ID})
Tinoj Josephec742f62022-09-29 19:11:10 +0530225 for _, f := range d.flows {
226 if f.State == of.FlowAddPending {
227 flows = append(flows, f)
228 }
229 }
230 return flows
231}
232
Naveen Sampath04696f72022-06-13 15:19:14 +0530233// AddFlow - Adds the flow to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530234func (d *Device) AddFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530235 d.flowLock.Lock()
236 defer d.flowLock.Unlock()
Akash Soni6168f312023-05-18 20:57:33 +0530237 logger.Debugw(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530238 if _, ok := d.flows[flow.Cookie]; ok {
239 return errors.New(ErrDuplicateFlow)
240 }
241 d.flows[flow.Cookie] = flow
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530242 d.AddFlowToDb(cntx, flow)
Naveen Sampath04696f72022-06-13 15:19:14 +0530243 return nil
244}
245
246// AddFlowToDb is the utility to add the flow to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530247func (d *Device) AddFlowToDb(cntx context.Context, flow *of.VoltSubFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530248 if b, err := json.Marshal(flow); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530249 if err = db.PutFlow(cntx, d.ID, flow.Cookie, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530250 logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
251 }
252 }
253}
254
255// DelFlow - Deletes the flow from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530256func (d *Device) DelFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530257 d.flowLock.Lock()
258 defer d.flowLock.Unlock()
259 if _, ok := d.flows[flow.Cookie]; ok {
260 delete(d.flows, flow.Cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530261 d.DelFlowFromDb(cntx, flow.Cookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530262 return nil
263 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530264 return errors.New("flow does not exist")
Naveen Sampath04696f72022-06-13 15:19:14 +0530265}
266
267// DelFlowFromDb is utility to delete the flow from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530268func (d *Device) DelFlowFromDb(cntx context.Context, flowID uint64) {
269 _ = db.DelFlow(cntx, d.ID, flowID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530270}
271
272// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
273func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
274 d.flowLock.RLock()
275 defer d.flowLock.RUnlock()
276 if _, ok := d.flows[flow.Cookie]; ok {
277 return false
278 } else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
279 if _, ok := d.flows[flow.OldCookie]; ok {
Akash Soni6168f312023-05-18 20:57:33 +0530280 logger.Debugw(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530281 return true
282 }
283 }
284 return false
285}
286
287// DelFlowWithOldCookie is to delete flow with old cookie.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530288func (d *Device) DelFlowWithOldCookie(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530289 d.flowLock.Lock()
290 defer d.flowLock.Unlock()
291 if _, ok := d.flows[flow.OldCookie]; ok {
Akash Soni6168f312023-05-18 20:57:33 +0530292 logger.Debugw(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
Naveen Sampath04696f72022-06-13 15:19:14 +0530293 log.Fields{"OldCookie": flow.OldCookie})
294 delete(d.flows, flow.OldCookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530295 d.DelFlowFromDb(cntx, flow.OldCookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530296 return nil
297 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530298 return errors.New("flow does not exist")
Naveen Sampath04696f72022-06-13 15:19:14 +0530299}
300
301// RestoreFlowsFromDb to restore flows from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530302func (d *Device) RestoreFlowsFromDb(cntx context.Context) {
303 flows, _ := db.GetFlows(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530304 for _, flow := range flows {
305 b, ok := flow.Value.([]byte)
306 if !ok {
307 logger.Warn(ctx, "The value type is not []byte")
308 continue
309 }
310 d.CreateFlowFromString(b)
311 }
312}
313
314// CreateFlowFromString to create flow from string
315func (d *Device) CreateFlowFromString(b []byte) {
316 var flow of.VoltSubFlow
317 if err := json.Unmarshal(b, &flow); err == nil {
318 if _, ok := d.flows[flow.Cookie]; !ok {
319 logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
320 d.flows[flow.Cookie] = &flow
321 } else {
322 logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
323 }
324 } else {
325 logger.Warn(ctx, "Unmarshal failed")
326 }
327}
328
329// ----------------------------------------------------------
330// Database related functionality
331// Group operations at the device which include update and delete
332
333// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530334func (d *Device) UpdateGroupEntry(cntx context.Context, group *of.Group) {
Akash Soni6168f312023-05-18 20:57:33 +0530335 logger.Debugw(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530336 d.groups.Store(group.GroupID, group)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530337 d.AddGroupToDb(cntx, group)
Naveen Sampath04696f72022-06-13 15:19:14 +0530338}
339
340// AddGroupToDb - Utility to add the group to the device DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530341func (d *Device) AddGroupToDb(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530342 if b, err := json.Marshal(group); err == nil {
Akash Soni6168f312023-05-18 20:57:33 +0530343 logger.Debugw(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530344 if err = db.PutGroup(cntx, d.ID, group.GroupID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530345 logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
346 }
347 }
348}
349
350// DelGroupEntry - Deletes the group from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530351func (d *Device) DelGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530352 if _, ok := d.groups.Load(group.GroupID); ok {
353 d.groups.Delete(group.GroupID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530354 d.DelGroupFromDb(cntx, group.GroupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530355 }
356}
357
358// DelGroupFromDb - Utility to delete the Group from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530359func (d *Device) DelGroupFromDb(cntx context.Context, groupID uint32) {
360 _ = db.DelGroup(cntx, d.ID, groupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530361}
362
vinokuma926cb3e2023-03-29 11:41:06 +0530363// RestoreGroupsFromDb - restores all groups from DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530364func (d *Device) RestoreGroupsFromDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530365 logger.Info(ctx, "Restoring Groups")
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530366 groups, _ := db.GetGroups(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530367 for _, group := range groups {
368 b, ok := group.Value.([]byte)
369 if !ok {
370 logger.Warn(ctx, "The value type is not []byte")
371 continue
372 }
373 d.CreateGroupFromString(b)
374 }
375}
376
vinokuma926cb3e2023-03-29 11:41:06 +0530377// CreateGroupFromString - Forms group struct from json string
Naveen Sampath04696f72022-06-13 15:19:14 +0530378func (d *Device) CreateGroupFromString(b []byte) {
379 var group of.Group
380 if err := json.Unmarshal(b, &group); err == nil {
381 if _, ok := d.groups.Load(group.GroupID); !ok {
382 logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
383 d.groups.Store(group.GroupID, &group)
384 } else {
385 logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
386 }
387 } else {
388 logger.Warn(ctx, "Unmarshal failed")
389 }
390}
391
392// AddMeter to add meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530393func (d *Device) AddMeter(cntx context.Context, meter *of.Meter) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530394 d.meterLock.Lock()
395 defer d.meterLock.Unlock()
396 if _, ok := d.meters[meter.ID]; ok {
Akash Sonid36d23b2023-08-18 12:51:40 +0530397 return errors.New("duplicate meter")
Naveen Sampath04696f72022-06-13 15:19:14 +0530398 }
399 d.meters[meter.ID] = meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530400 go d.AddMeterToDb(cntx, meter)
Naveen Sampath04696f72022-06-13 15:19:14 +0530401 return nil
402}
403
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530404// UpdateMeter to update meter
405func (d *Device) UpdateMeter(cntx context.Context, meter *of.Meter) error {
vinokuma926cb3e2023-03-29 11:41:06 +0530406 d.meterLock.Lock()
407 defer d.meterLock.Unlock()
408 if _, ok := d.meters[meter.ID]; ok {
409 d.meters[meter.ID] = meter
410 d.AddMeterToDb(cntx, meter)
411 } else {
Akash Sonid36d23b2023-08-18 12:51:40 +0530412 return errors.New("meter not found for updation")
vinokuma926cb3e2023-03-29 11:41:06 +0530413 }
414 return nil
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530415}
416
Naveen Sampath04696f72022-06-13 15:19:14 +0530417// GetMeter to get meter
418func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
419 d.meterLock.RLock()
420 defer d.meterLock.RUnlock()
421 if m, ok := d.meters[id]; ok {
422 return m, nil
423 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530424 return nil, errors.New("meter not found")
Naveen Sampath04696f72022-06-13 15:19:14 +0530425}
426
427// DelMeter to delete meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530428func (d *Device) DelMeter(cntx context.Context, meter *of.Meter) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530429 d.meterLock.Lock()
430 defer d.meterLock.Unlock()
431 if _, ok := d.meters[meter.ID]; ok {
432 delete(d.meters, meter.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530433 go d.DelMeterFromDb(cntx, meter.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530434 return true
435 }
436 return false
437}
438
439// AddMeterToDb is utility to add the Group to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530440func (d *Device) AddMeterToDb(cntx context.Context, meter *of.Meter) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530441 if b, err := json.Marshal(meter); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530442 if err = db.PutDeviceMeter(cntx, d.ID, meter.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530443 logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
444 }
445 }
446}
447
448// DelMeterFromDb to delete meter from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530449func (d *Device) DelMeterFromDb(cntx context.Context, id uint32) {
450 _ = db.DelDeviceMeter(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530451}
452
453// RestoreMetersFromDb to restore meters from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530454func (d *Device) RestoreMetersFromDb(cntx context.Context) {
455 meters, _ := db.GetDeviceMeters(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530456 for _, meter := range meters {
457 b, ok := meter.Value.([]byte)
458 if !ok {
459 logger.Warn(ctx, "The value type is not []byte")
460 continue
461 }
462 d.CreateMeterFromString(b)
463 }
464}
465
466// CreateMeterFromString to create meter from string
467func (d *Device) CreateMeterFromString(b []byte) {
468 var meter of.Meter
469 if err := json.Unmarshal(b, &meter); err == nil {
470 if _, ok := d.meters[meter.ID]; !ok {
471 logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
472 d.meters[meter.ID] = &meter
473 } else {
474 logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
475 }
476 } else {
Akash Soni6168f312023-05-18 20:57:33 +0530477 logger.Warnw(ctx, "Unmarshal failed", log.Fields{"error": err, "meter": string(b)})
Naveen Sampath04696f72022-06-13 15:19:14 +0530478 }
479}
480
// VolthaClient returns the VOLTHA service client obtained from the client
// holder that was passed to NewDevice.
func (d *Device) VolthaClient() voltha.VolthaServiceClient {
	return d.vclientHolder.Get()
}
485
486// AddPort to add the port as requested by the device/VOLTHA
487// Inform the application if the port is successfully added
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530488func (d *Device) AddPort(cntx context.Context, mp *ofp.OfpPort) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530489 d.portLock.Lock()
490 defer d.portLock.Unlock()
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530491 id := mp.PortNo
492 name := mp.Name
Naveen Sampath04696f72022-06-13 15:19:14 +0530493 if _, ok := d.PortsByID[id]; ok {
Akash Sonid36d23b2023-08-18 12:51:40 +0530494 return errors.New(Duplicate_Port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530495 }
496 if _, ok := d.PortsByName[name]; ok {
Akash Sonid36d23b2023-08-18 12:51:40 +0530497 return errors.New(Duplicate_Port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530498 }
499
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530500 p := NewDevicePort(mp)
Naveen Sampath04696f72022-06-13 15:19:14 +0530501 d.PortsByID[id] = p
502 d.PortsByName[name] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530503 d.WritePortToDb(cntx, p)
504 GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530505 logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
506 return nil
507}
508
509// DelPort to delete the port as requested by the device/VOLTHA
510// Inform the application if the port is successfully deleted
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530511func (d *Device) DelPort(cntx context.Context, id uint32) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530512 p := d.GetPortByID(id)
513 if p == nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530514 return errors.New("unknown port")
Naveen Sampath04696f72022-06-13 15:19:14 +0530515 }
516 if p.State == PortStateUp {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530517 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530518 }
Tinoj Joseph4ead4e02023-01-30 03:12:44 +0530519 GetController().PortDelInd(cntx, d.ID, p.Name)
520
Naveen Sampath04696f72022-06-13 15:19:14 +0530521 d.portLock.Lock()
522 defer d.portLock.Unlock()
523
Naveen Sampath04696f72022-06-13 15:19:14 +0530524 delete(d.PortsByID, p.ID)
525 delete(d.PortsByName, p.Name)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530526 d.DelPortFromDb(cntx, p.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530527 logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
528 return nil
529}
530
531// UpdatePortByName is utility to update the port by Name
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530532func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530533 d.portLock.Lock()
534 defer d.portLock.Unlock()
535
536 p, ok := d.PortsByName[name]
537 if !ok {
538 return
539 }
540 delete(d.PortsByID, p.ID)
541 p.ID = port
542 d.PortsByID[port] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530543 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530544 GetController().PortUpdateInd(d.ID, p.Name, p.ID)
545 logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
546}
547
548// GetPortName to get the name of the port by its id
549func (d *Device) GetPortName(id uint32) (string, error) {
550 d.portLock.RLock()
551 defer d.portLock.RUnlock()
552
553 if p, ok := d.PortsByID[id]; ok {
554 return p.Name, nil
555 }
556 logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
Akash Sonid36d23b2023-08-18 12:51:40 +0530557 return "", errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530558}
559
560// GetPortByID is utility to retrieve the port by ID
561func (d *Device) GetPortByID(id uint32) *DevicePort {
562 d.portLock.RLock()
563 defer d.portLock.RUnlock()
564
565 p, ok := d.PortsByID[id]
566 if ok {
567 return p
568 }
569 return nil
570}
571
572// GetPortByName is utility to retrieve the port by Name
573func (d *Device) GetPortByName(name string) *DevicePort {
574 d.portLock.RLock()
575 defer d.portLock.RUnlock()
576
577 p, ok := d.PortsByName[name]
578 if ok {
579 return p
580 }
581 return nil
582}
583
584// GetPortState to get the state of the port by name
585func (d *Device) GetPortState(name string) (PortState, error) {
586 d.portLock.RLock()
587 defer d.portLock.RUnlock()
588
589 if p, ok := d.PortsByName[name]; ok {
590 return p.State, nil
591 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530592 return PortStateDown, errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530593}
594
595// GetPortID to get the port-id by the port name
596func (d *Device) GetPortID(name string) (uint32, error) {
597 d.portLock.RLock()
598 defer d.portLock.RUnlock()
599
600 if p, ok := d.PortsByName[name]; ok {
601 return p.ID, nil
602 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530603 return 0, errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530604}
605
606// WritePortToDb to add the port to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530607func (d *Device) WritePortToDb(ctx context.Context, port *DevicePort) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530608 port.Version = database.PresentVersionMap[database.DevicePortPath]
609 if b, err := json.Marshal(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530610 if err = db.PutPort(ctx, d.ID, port.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530611 logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
612 }
613 }
614}
615
616// DelPortFromDb to delete port from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530617func (d *Device) DelPortFromDb(cntx context.Context, id uint32) {
618 _ = db.DelPort(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530619}
620
621// RestorePortsFromDb to restore ports from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530622func (d *Device) RestorePortsFromDb(cntx context.Context) {
623 ports, _ := db.GetPorts(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530624 for _, port := range ports {
625 b, ok := port.Value.([]byte)
626 if !ok {
627 logger.Warn(ctx, "The value type is not []byte")
628 continue
629 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530630 d.CreatePortFromString(cntx, b)
Naveen Sampath04696f72022-06-13 15:19:14 +0530631 }
632}
633
634// CreatePortFromString to create port from string
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530635func (d *Device) CreatePortFromString(cntx context.Context, b []byte) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530636 var port DevicePort
637 if err := json.Unmarshal(b, &port); err == nil {
638 if _, ok := d.PortsByID[port.ID]; !ok {
639 logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
640 d.PortsByID[port.ID] = &port
641 d.PortsByName[port.Name] = &port
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530642 GetController().PortAddInd(cntx, d.ID, port.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530643 } else {
Akash Sonid36d23b2023-08-18 12:51:40 +0530644 logger.Warnw(ctx, Duplicate_Port, log.Fields{"ID": port.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530645 }
646 } else {
Akash Soni6168f312023-05-18 20:57:33 +0530647 logger.Warnw(ctx, "Unmarshal failed", log.Fields{"port": string(b)})
Naveen Sampath04696f72022-06-13 15:19:14 +0530648 }
649}
650
// Delete : OLT Delete functionality yet to be implemented. Ideally all of the
// resources should have been removed by this time. It is an error
// scenario if the OLT has resources associated with it.
// Currently it only calls StopAll.
// NOTE(review): StopAll is not defined in this part of the file; presumably
// it is promoted from the embedded tasks.Tasks — confirm.
func (d *Device) Delete() {
	d.StopAll()
}
657
// Stop to stop the task. The body is empty, so calling it currently has no
// effect.
func (d *Device) Stop() {
}
661
662// ConnectInd is called when the connection between VGC and the VOLTHA is
663// restored. This will perform audit of the device post reconnection
664func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
665 logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
666 ctx1, cancel := context.WithCancel(ctx)
667 d.cancel = cancel
668 d.ctx = ctx1
669 d.Tasks.Initialize(ctx1)
670
Akash Soni6168f312023-05-18 20:57:33 +0530671 logger.Debugw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530672 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530673 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530674 GetController().DeviceUpInd(d.ID)
675
Akash Soni6168f312023-05-18 20:57:33 +0530676 logger.Debugw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530677 t := NewAuditDevice(d, AuditEventDeviceDisc)
678 d.Tasks.AddTask(t)
679
680 t1 := NewAuditTablesTask(d)
681 d.Tasks.AddTask(t1)
682
683 t2 := NewPendingProfilesTask(d)
684 d.Tasks.AddTask(t2)
685
686 go d.synchronizeDeviceTables()
687}
688
689func (d *Device) synchronizeDeviceTables() {
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530690 tick := time.NewTicker(GetController().GetDeviceTableSyncDuration())
Naveen Sampath04696f72022-06-13 15:19:14 +0530691loop:
692 for {
693 select {
694 case <-d.ctx.Done():
vinokuma926cb3e2023-03-29 11:41:06 +0530695 logger.Warnw(d.ctx, "Context Done. Canceling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
Naveen Sampath04696f72022-06-13 15:19:14 +0530696 break loop
697 case <-tick.C:
698 t1 := NewAuditTablesTask(d)
699 d.Tasks.AddTask(t1)
700 }
701 }
702 tick.Stop()
703}
704
705// DeviceUpInd is called when the logical device state changes to UP. This will perform audit of the device post reconnection
706func (d *Device) DeviceUpInd() {
707 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
708 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530709 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530710 GetController().DeviceUpInd(d.ID)
711
712 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
713 t := NewAuditDevice(d, AuditEventDeviceDisc)
714 d.Tasks.AddTask(t)
715
716 t1 := NewAuditTablesTask(d)
717 d.Tasks.AddTask(t1)
718
719 t2 := NewPendingProfilesTask(d)
720 d.Tasks.AddTask(t2)
721}
722
723// DeviceDownInd is called when the logical device state changes to Down.
724func (d *Device) DeviceDownInd() {
725 logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
726 d.State = DeviceStateDOWN
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530727 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530728 GetController().DeviceDownInd(d.ID)
729}
730
731// DeviceRebootInd is called when the logical device is rebooted.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530732func (d *Device) DeviceRebootInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530733 logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
734
735 if d.State == DeviceStateREBOOTED {
736 d.State = DeviceStateREBOOTED
737 logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
738 return
739 }
740
741 d.State = DeviceStateREBOOTED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530742 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530743 GetController().SetRebootInProgressForDevice(d.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530744 GetController().DeviceRebootInd(cntx, d.ID, d.SerialNum, d.SouthBoundID)
745 d.ReSetAllPortStates(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530746}
747
748// DeviceDisabledInd is called when the logical device is disabled
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530749func (d *Device) DeviceDisabledInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530750 logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
751 d.State = DeviceStateDISABLED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530752 d.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530753 GetController().DeviceDisableInd(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530754}
755
vinokuma926cb3e2023-03-29 11:41:06 +0530756// ReSetAllPortStates - Set all logical device port status to DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530757func (d *Device) ReSetAllPortStates(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530758 logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
759
760 d.portLock.Lock()
761 defer d.portLock.Unlock()
762
763 for _, port := range d.PortsByID {
764 if port.State != PortStateDown {
765 logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530766 GetController().PortDownInd(cntx, d.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530767 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530768 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530769 }
770 }
771}
772
vinokuma926cb3e2023-03-29 11:41:06 +0530773// ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530774func (d *Device) ReSetAllPortStatesInDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530775 logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
776
777 d.portLock.Lock()
778 defer d.portLock.Unlock()
779
780 for _, port := range d.PortsByID {
781 if port.State != PortStateDown {
Akash Soni6168f312023-05-18 20:57:33 +0530782 logger.Debugw(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
Naveen Sampath04696f72022-06-13 15:19:14 +0530783 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530784 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530785 }
786 }
787}
788
789// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
790// to update only when the port state is DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530791func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530792 if p := d.GetPortByName(portName); p != nil {
793 if p.ID != port {
794 logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
795 if p.State != PortStateDown {
796 logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
797 return
798 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530799 d.UpdatePortByName(cntx, portName, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530800 logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
801 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530802 d.ProcessPortState(cntx, port, state)
Naveen Sampath04696f72022-06-13 15:19:14 +0530803 }
804}
805
806// ***Operations Performed on Port state Transitions***
807//
808// |-----------------------------------------------------------------------------|
809// | State | Action |
810// |--------------------|--------------------------------------------------------|
811// | UP | UNI - Trigger Flow addition for service configured |
812// | | NNI - Trigger Flow addition for vnets & mvlan profiles |
813// | | |
814// | DOWN | UNI - Trigger Flow deletion for service configured |
815// | | NNI - Trigger Flow deletion for vnets & mvlan profiles |
816// | | |
817// |-----------------------------------------------------------------------------|
818//
819
820// ProcessPortState deals with the change in port status and taking action
821// based on the new state and the old state
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530822func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530823 if d.State != DeviceStateUP && !util.IsNniPort(port) {
824 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
825 return
826 }
827 if p := d.GetPortByID(port); p != nil {
828 logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
829
830 // Avoid blind initialization as the current tasks in the queue will be lost
831 // Eg: Service Del followed by Port Down - The flows will be dangling
832 // Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
833 p.Tasks.CheckAndInitialize(d.ctx)
834 if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
835 // Transition from DOWN to UP
836 logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530837 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530838 p.State = PortStateUp
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530839 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530840 } else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
841 // Transition from UP to Down
842 logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530843 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530844 p.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530845 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530846 } else {
847 logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
848 }
849 }
850}
851
852// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530853func (d *Device) ProcessPortStateAfterReboot(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530854 if d.State != DeviceStateUP && !util.IsNniPort(port) {
855 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
856 return
857 }
858 if p := d.GetPortByID(port); p != nil {
859 logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State})
860 p.Tasks.Initialize(d.ctx)
861 if p.State == PortStateUp {
862 logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530863 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530864 } else if p.State == PortStateDown {
865 logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530866 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530867 }
868 }
869}
870
871// ChangeEvent : Change event brings in ports related changes such as addition/deletion
872// or modification where the port status change up/down is indicated to the
873// controller
874func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
875 cet := NewChangeEventTask(d.ctx, event, d)
876 d.AddTask(cet)
877 return nil
878}
879
880// PacketIn handle the incoming packet-in and deliver to the application for the
881// actual processing
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530882func (d *Device) PacketIn(cntx context.Context, pkt *ofp.PacketIn) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530883 logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
884 if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
885 logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
886 return
887 }
888 data := pkt.PacketIn.Data
889 port := PacketInGetPort(pkt.PacketIn)
890 if pName, err := d.GetPortName(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530891 GetController().PacketInInd(cntx, d.ID, pName, data)
Naveen Sampath04696f72022-06-13 15:19:14 +0530892 } else {
893 logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
894 }
895}
896
897// PacketInGetPort to get the port on which the packet-in is reported
898func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
899 for _, field := range pkt.Match.OxmFields {
900 if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
901 if ofbField, ok := field.Field.(*ofp.OfpOxmField_OfbField); ok {
902 if ofbField.OfbField.Type == ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
903 if port, ok := ofbField.OfbField.Value.(*ofp.OfpOxmOfbField_Port); ok {
904 return port.Port
905 }
906 }
907 }
908 }
909 }
910 return 0
911}
912
913// PacketOutReq receives the packet out request from the application via the
914// controller. The interface from the application uses name as the identity.
915func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
916 inp, err := d.GetPortID(inport)
917 if err != nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530918 return errors.New("unknown inport")
Naveen Sampath04696f72022-06-13 15:19:14 +0530919 }
920 outp, err1 := d.GetPortID(outport)
921 if err1 != nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530922 return errors.New("unknown outport")
Naveen Sampath04696f72022-06-13 15:19:14 +0530923 }
924 logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
925 return d.SendPacketOut(outp, inp, data, isCustomPkt)
926}
927
928// SendPacketOut is responsible for building the OF structure and send the
929// packet-out to the VOLTHA
930func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
931 pout := &ofp.PacketOut{}
932 pout.Id = d.ID
933 opout := &ofp.OfpPacketOut{}
934 pout.PacketOut = opout
935 opout.InPort = inport
936 opout.Data = data
937 opout.Actions = []*ofp.OfpAction{
938 {
939 Type: ofp.OfpActionType_OFPAT_OUTPUT,
940 Action: &ofp.OfpAction_Output{
941 Output: &ofp.OfpActionOutput{
942 Port: outport,
943 MaxLen: 65535,
944 },
945 },
946 },
947 }
948 d.packetOutChannel <- pout
949 return nil
950}
951
952// UpdateFlows receives the flows in the form that is implemented
953// in the VGC and transforms them to the OF format. This is handled
954// as a port of the task that is enqueued to do the same.
955func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
956 t := NewAddFlowsTask(d.ctx, flow, d)
957 logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
958 // check if port isNni , if yes flows will be added to device port queues.
959 if util.IsNniPort(devPort.ID) {
960 // Adding the flows to device port queues.
961 devPort.AddTask(t)
962 return
963 }
964 // If the flowHash is enabled then add the flows to the flowhash generated queues.
965 flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
966 if flowQueue != nil {
967 logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
968 flowQueue.AddTask(t)
969 logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
970 } else {
971 //FlowThrotling disabled, add to the device port queue
972 devPort.AddTask(t)
973 return
974 }
975}
976
977// UpdateGroup to update group info
978func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
979 task := NewModGroupTask(d.ctx, group, d)
980 logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
981 devPort.AddTask(task)
982}
983
984// ModMeter for mod meter task
985func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
986 if command == of.MeterCommandAdd {
987 if _, err := d.GetMeter(meter.ID); err == nil {
988 logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
989 return
990 }
991 }
992 t := NewModMeterTask(d.ctx, command, meter, d)
993 devPort.AddTask(t)
994}
995
996func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
997 d.flowQueueLock.RLock()
vinokuma926cb3e2023-03-29 11:41:06 +0530998 // If flowhash is 0 that means flowhash throttling is disabled, return nil
Naveen Sampath04696f72022-06-13 15:19:14 +0530999 if d.flowHash == 0 {
1000 d.flowQueueLock.RUnlock()
1001 return nil
1002 }
1003 flowHashID := id % uint32(d.flowHash)
1004 if value, found := d.flowQueue[uint32(flowHashID)]; found {
1005 d.flowQueueLock.RUnlock()
1006 return value
1007 }
1008 d.flowQueueLock.RUnlock()
1009 logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
1010
1011 return d.addFlowQueueForUniID(id)
1012}
1013
1014func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
Naveen Sampath04696f72022-06-13 15:19:14 +05301015 d.flowQueueLock.Lock()
1016 defer d.flowQueueLock.Unlock()
1017 flowHashID := id % uint32(d.flowHash)
1018 flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
1019 flowQueue.Tasks.Initialize(d.ctx)
1020 d.flowQueue[flowHashID] = flowQueue
1021 return flowQueue
1022}
1023
1024// SetFlowHash sets the device flow hash and writes to the DB.
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301025func (d *Device) SetFlowHash(cntx context.Context, hash uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301026 d.flowQueueLock.Lock()
1027 defer d.flowQueueLock.Unlock()
1028
1029 d.flowHash = hash
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301030 d.writeFlowHashToDB(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301031}
1032
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301033func (d *Device) writeFlowHashToDB(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301034 hash, err := json.Marshal(d.flowHash)
1035 if err != nil {
Akash Soni6168f312023-05-18 20:57:33 +05301036 logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash, "error": err})
Naveen Sampath04696f72022-06-13 15:19:14 +05301037 return
1038 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301039 if err := db.PutFlowHash(cntx, d.ID, string(hash)); err != nil {
Akash Soni6168f312023-05-18 20:57:33 +05301040 logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash, "error": err})
Naveen Sampath04696f72022-06-13 15:19:14 +05301041 }
1042}
1043
vinokuma926cb3e2023-03-29 11:41:06 +05301044// isSBOperAllowed - determines if the SB operation is allowed based on device state & force flag
Naveen Sampath04696f72022-06-13 15:19:14 +05301045func (d *Device) isSBOperAllowed(forceAction bool) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +05301046 if d.State == DeviceStateUP {
1047 return true
1048 }
1049
1050 if d.State == DeviceStateDISABLED && forceAction {
1051 return true
1052 }
1053
1054 return false
1055}
1056
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301057func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301058 flow, _ := d.GetFlow(cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301059 d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
Naveen Sampath04696f72022-06-13 15:19:14 +05301060}
1061
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301062func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301063 statusCode, statusMsg := infraerror.GetErrorInfo(err)
1064 success := isFlowOperSuccess(statusCode, oper)
1065
1066 updateFlow := func(cookie uint64, state int, reason string) {
1067 if dbFlow, ok := d.GetFlow(cookie); ok {
1068 dbFlow.State = uint8(state)
1069 dbFlow.ErrorReason = reason
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301070 d.AddFlowToDb(cntx, dbFlow)
Naveen Sampath04696f72022-06-13 15:19:14 +05301071 }
1072 }
1073
vinokuma926cb3e2023-03-29 11:41:06 +05301074 // Update flow results
Naveen Sampath04696f72022-06-13 15:19:14 +05301075 // Add - Update Success or Failure status with reason
1076 // Del - Delete entry from DB on success else update error reason
1077 if oper == of.CommandAdd {
1078 state := of.FlowAddSuccess
1079 reason := ""
1080 if !success {
1081 state = of.FlowAddFailure
1082 reason = statusMsg
1083 }
1084 updateFlow(cookie, state, reason)
1085 logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
1086 } else {
1087 if success && flow != nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301088 if err := d.DelFlow(cntx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301089 logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
1090 }
1091 } else if !success {
1092 updateFlow(cookie, of.FlowDelFailure, statusMsg)
1093 }
1094 }
1095
1096 flowResult := intf.FlowStatus{
1097 Cookie: strconv.FormatUint(cookie, 10),
1098 Device: d.ID,
1099 FlowModType: oper,
1100 Flow: flow,
1101 Status: statusCode,
1102 Reason: statusMsg,
1103 AdditionalData: bwDetails,
1104 }
1105
1106 logger.Infow(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301107 GetController().ProcessFlowModResultIndication(cntx, flowResult)
Naveen Sampath04696f72022-06-13 15:19:14 +05301108}