blob: 57474a07932d43ad6632703196c10118ecea700a [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package controller
17
18import (
19 "context"
20 "encoding/json"
21 "errors"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053022 "fmt"
Naveen Sampath04696f72022-06-13 15:19:14 +053023 "strconv"
Tinoj Joseph429b9d92022-11-16 18:51:05 +053024 "strings"
Naveen Sampath04696f72022-06-13 15:19:14 +053025 "sync"
26 "time"
vinokuma926cb3e2023-03-29 11:41:06 +053027 infraerror "voltha-go-controller/internal/pkg/errorcodes"
Naveen Sampath04696f72022-06-13 15:19:14 +053028
29 "voltha-go-controller/database"
30 "voltha-go-controller/internal/pkg/holder"
31 "voltha-go-controller/internal/pkg/intf"
32 "voltha-go-controller/internal/pkg/of"
vinokuma926cb3e2023-03-29 11:41:06 +053033
Naveen Sampath04696f72022-06-13 15:19:14 +053034 //"voltha-go-controller/internal/pkg/vpagent"
35 "voltha-go-controller/internal/pkg/tasks"
36 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053037 "voltha-go-controller/log"
vinokuma926cb3e2023-03-29 11:41:06 +053038
Naveen Sampath04696f72022-06-13 15:19:14 +053039 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
40 "github.com/opencord/voltha-protos/v5/go/voltha"
41)
42
43// PortState type
44type PortState string
45
46const (
47 // PortStateDown constant
48 PortStateDown PortState = "DOWN"
49 // PortStateUp constant
50 PortStateUp PortState = "UP"
51 // DefaultMaxFlowQueues constant
52 DefaultMaxFlowQueues = 67
53 //ErrDuplicateFlow - indicates flow already exists in DB
Akash Sonid36d23b2023-08-18 12:51:40 +053054 ErrDuplicateFlow string = "duplicate flow"
55 //Unknown_Port_ID - indicates that the port id is unknown
56 Unknown_Port_ID = "unknown port id"
57 //Duplicate_Port - indicates the port is already exist in controller
58 Duplicate_Port = "duplicate port"
Naveen Sampath04696f72022-06-13 15:19:14 +053059)
60
61// DevicePort structure
62type DevicePort struct {
vinokuma926cb3e2023-03-29 11:41:06 +053063 Name string
64 State PortState
65 Version string
66 HwAddr string
Naveen Sampath04696f72022-06-13 15:19:14 +053067 tasks.Tasks
Tinoj Joseph429b9d92022-11-16 18:51:05 +053068 CurrSpeed uint32
69 MaxSpeed uint32
vinokuma926cb3e2023-03-29 11:41:06 +053070 ID uint32
Naveen Sampath04696f72022-06-13 15:19:14 +053071}
72
73// NewDevicePort is the constructor for DevicePort
Tinoj Joseph429b9d92022-11-16 18:51:05 +053074func NewDevicePort(mp *ofp.OfpPort) *DevicePort {
Naveen Sampath04696f72022-06-13 15:19:14 +053075 var port DevicePort
76
Tinoj Joseph429b9d92022-11-16 18:51:05 +053077 port.ID = mp.PortNo
78 port.Name = mp.Name
79
80 //port.HwAddr = strings.Trim(strings.Join(strings.Fields(fmt.Sprint("%02x", mp.HwAddr)), ":"), "[]")
81 port.HwAddr = strings.Trim(strings.ReplaceAll(fmt.Sprintf("%02x", mp.HwAddr), " ", ":"), "[]")
82 port.CurrSpeed = mp.CurrSpeed
83 port.MaxSpeed = mp.MaxSpeed
Naveen Sampath04696f72022-06-13 15:19:14 +053084 port.State = PortStateDown
85 return &port
86}
87
88// UniIDFlowQueue structure which maintains flows in queue.
89type UniIDFlowQueue struct {
90 tasks.Tasks
91 ID uint32
92}
93
94// NewUniIDFlowQueue is the constructor for UniIDFlowQueue.
95func NewUniIDFlowQueue(id uint32) *UniIDFlowQueue {
96 var flowQueue UniIDFlowQueue
97 flowQueue.ID = id
98 return &flowQueue
99}
100
101// DeviceState type
102type DeviceState string
103
104const (
105
106 // DeviceStateUNKNOWN constant
107 DeviceStateUNKNOWN DeviceState = "UNKNOWN"
108 // DeviceStateINIT constant
109 DeviceStateINIT DeviceState = "INIT"
110 // DeviceStateUP constant
111 DeviceStateUP DeviceState = "UP"
112 // DeviceStateDOWN constant
113 DeviceStateDOWN DeviceState = "DOWN"
114 // DeviceStateREBOOTED constant
115 DeviceStateREBOOTED DeviceState = "REBOOTED"
116 // DeviceStateDISABLED constant
117 DeviceStateDISABLED DeviceState = "DISABLED"
118 // DeviceStateDELETED constant
119 DeviceStateDELETED DeviceState = "DELETED"
120)
121
122// Device structure
123type Device struct {
vinokuma926cb3e2023-03-29 11:41:06 +0530124 ctx context.Context
125 cancel context.CancelFunc
126 vclientHolder *holder.VolthaServiceClientHolder
127 packetOutChannel chan *ofp.PacketOut
128 PortsByName map[string]*DevicePort
129 flows map[uint64]*of.VoltSubFlow
130 PortsByID map[uint32]*DevicePort
131 meters map[uint32]*of.Meter
132 flowQueue map[uint32]*UniIDFlowQueue // key is hash ID generated and value is UniIDFlowQueue.
133 SouthBoundID string
134 MfrDesc string
135 HwDesc string
136 SwDesc string
137 ID string
138 SerialNum string
139 State DeviceState
140 TimeStamp time.Time
141 groups sync.Map //map[uint32]*of.Group -> [GroupId : Group]
Naveen Sampath04696f72022-06-13 15:19:14 +0530142 tasks.Tasks
Naveen Sampath04696f72022-06-13 15:19:14 +0530143 portLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530144 flowLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530145 meterLock sync.RWMutex
Naveen Sampath04696f72022-06-13 15:19:14 +0530146 flowQueueLock sync.RWMutex
147 flowHash uint32
vinokuma926cb3e2023-03-29 11:41:06 +0530148 auditInProgress bool
Naveen Sampath04696f72022-06-13 15:19:14 +0530149 deviceAuditInProgress bool
Naveen Sampath04696f72022-06-13 15:19:14 +0530150}
151
152// NewDevice is the constructor for Device
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530153func NewDevice(cntx context.Context, id string, slno string, vclientHldr *holder.VolthaServiceClientHolder, southBoundID, mfr, hwDesc, swDesc string) *Device {
Naveen Sampath04696f72022-06-13 15:19:14 +0530154 var device Device
155 device.ID = id
156 device.SerialNum = slno
157 device.State = DeviceStateDOWN
158 device.PortsByID = make(map[uint32]*DevicePort)
159 device.PortsByName = make(map[string]*DevicePort)
160 device.vclientHolder = vclientHldr
161 device.flows = make(map[uint64]*of.VoltSubFlow)
162 device.meters = make(map[uint32]*of.Meter)
163 device.flowQueue = make(map[uint32]*UniIDFlowQueue)
vinokuma926cb3e2023-03-29 11:41:06 +0530164 // Get the flowhash from db and update the flowhash variable in the device.
Naveen Sampath04696f72022-06-13 15:19:14 +0530165 device.SouthBoundID = southBoundID
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530166 device.MfrDesc = mfr
167 device.HwDesc = hwDesc
168 device.SwDesc = swDesc
169 device.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530170 flowHash, err := db.GetFlowHash(cntx, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530171 if err != nil {
172 device.flowHash = DefaultMaxFlowQueues
173 } else {
174 var hash uint32
175 err = json.Unmarshal([]byte(flowHash), &hash)
176 if err != nil {
Akash Soni6168f312023-05-18 20:57:33 +0530177 logger.Errorw(ctx, "Failed to unmarshall flowhash", log.Fields{"data": flowHash})
Naveen Sampath04696f72022-06-13 15:19:14 +0530178 } else {
179 device.flowHash = hash
180 }
181 }
182 logger.Infow(ctx, "Flow hash for device", log.Fields{"Deviceid": id, "hash": device.flowHash})
183 return &device
184}
185
186// ResetCache to reset cache
187func (d *Device) ResetCache() {
188 logger.Warnw(ctx, "Resetting flows, meters and groups cache", log.Fields{"Device": d.ID})
189 d.flows = make(map[uint64]*of.VoltSubFlow)
190 d.meters = make(map[uint32]*of.Meter)
191 d.groups = sync.Map{}
192}
193
194// GetFlow - Get the flow from device obj
195func (d *Device) GetFlow(cookie uint64) (*of.VoltSubFlow, bool) {
196 d.flowLock.RLock()
197 defer d.flowLock.RUnlock()
Akash Soni6168f312023-05-18 20:57:33 +0530198 logger.Debugw(ctx, "Get Flow", log.Fields{"Cookie": cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530199 flow, ok := d.flows[cookie]
200 return flow, ok
201}
202
Tinoj Josephec742f62022-09-29 19:11:10 +0530203// GetAllFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530204func (d *Device) GetAllFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530205 d.flowLock.RLock()
206 defer d.flowLock.RUnlock()
207 var flows []*of.VoltSubFlow
Akash Soni6168f312023-05-18 20:57:33 +0530208 logger.Debugw(ctx, "Get All Flows", log.Fields{"deviceID": d.ID})
Tinoj Josephec742f62022-09-29 19:11:10 +0530209 for _, f := range d.flows {
210 flows = append(flows, f)
211 }
212 return flows
213}
214
215// GetAllPendingFlows - Get the flow from device obj
vinokuma926cb3e2023-03-29 11:41:06 +0530216func (d *Device) GetAllPendingFlows() []*of.VoltSubFlow {
Tinoj Josephec742f62022-09-29 19:11:10 +0530217 d.flowLock.RLock()
218 defer d.flowLock.RUnlock()
219 var flows []*of.VoltSubFlow
Akash Soni6168f312023-05-18 20:57:33 +0530220 logger.Debugw(ctx, "Get All Pending Flows", log.Fields{"deviceID": d.ID})
Tinoj Josephec742f62022-09-29 19:11:10 +0530221 for _, f := range d.flows {
222 if f.State == of.FlowAddPending {
223 flows = append(flows, f)
224 }
225 }
226 return flows
227}
228
Naveen Sampath04696f72022-06-13 15:19:14 +0530229// AddFlow - Adds the flow to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530230func (d *Device) AddFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530231 d.flowLock.Lock()
232 defer d.flowLock.Unlock()
Akash Soni6168f312023-05-18 20:57:33 +0530233 logger.Debugw(ctx, "AddFlow to device", log.Fields{"Cookie": flow.Cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530234 if _, ok := d.flows[flow.Cookie]; ok {
235 return errors.New(ErrDuplicateFlow)
236 }
237 d.flows[flow.Cookie] = flow
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530238 d.AddFlowToDb(cntx, flow)
Naveen Sampath04696f72022-06-13 15:19:14 +0530239 return nil
240}
241
242// AddFlowToDb is the utility to add the flow to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530243func (d *Device) AddFlowToDb(cntx context.Context, flow *of.VoltSubFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530244 if b, err := json.Marshal(flow); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530245 if err = db.PutFlow(cntx, d.ID, flow.Cookie, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530246 logger.Errorw(ctx, "Write Flow to DB failed", log.Fields{"device": d.ID, "cookie": flow.Cookie, "Reason": err})
247 }
248 }
249}
250
251// DelFlow - Deletes the flow from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530252func (d *Device) DelFlow(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530253 d.flowLock.Lock()
254 defer d.flowLock.Unlock()
255 if _, ok := d.flows[flow.Cookie]; ok {
256 delete(d.flows, flow.Cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530257 d.DelFlowFromDb(cntx, flow.Cookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530258 return nil
259 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530260 return errors.New("flow does not exist")
Naveen Sampath04696f72022-06-13 15:19:14 +0530261}
262
263// DelFlowFromDb is utility to delete the flow from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530264func (d *Device) DelFlowFromDb(cntx context.Context, flowID uint64) {
265 _ = db.DelFlow(cntx, d.ID, flowID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530266}
267
268// IsFlowPresentWithOldCookie is to check whether there is any flow with old cookie.
269func (d *Device) IsFlowPresentWithOldCookie(flow *of.VoltSubFlow) bool {
270 d.flowLock.RLock()
271 defer d.flowLock.RUnlock()
272 if _, ok := d.flows[flow.Cookie]; ok {
273 return false
274 } else if flow.OldCookie != 0 && flow.Cookie != flow.OldCookie {
275 if _, ok := d.flows[flow.OldCookie]; ok {
Akash Soni6168f312023-05-18 20:57:33 +0530276 logger.Debugw(ctx, "Flow present with old cookie", log.Fields{"OldCookie": flow.OldCookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530277 return true
278 }
279 }
280 return false
281}
282
283// DelFlowWithOldCookie is to delete flow with old cookie.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530284func (d *Device) DelFlowWithOldCookie(cntx context.Context, flow *of.VoltSubFlow) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530285 d.flowLock.Lock()
286 defer d.flowLock.Unlock()
287 if _, ok := d.flows[flow.OldCookie]; ok {
Akash Soni6168f312023-05-18 20:57:33 +0530288 logger.Debugw(ctx, "Flow was added before vgc upgrade. Trying to delete with old cookie",
Naveen Sampath04696f72022-06-13 15:19:14 +0530289 log.Fields{"OldCookie": flow.OldCookie})
290 delete(d.flows, flow.OldCookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530291 d.DelFlowFromDb(cntx, flow.OldCookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530292 return nil
293 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530294 return errors.New("flow does not exist")
Naveen Sampath04696f72022-06-13 15:19:14 +0530295}
296
297// RestoreFlowsFromDb to restore flows from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530298func (d *Device) RestoreFlowsFromDb(cntx context.Context) {
299 flows, _ := db.GetFlows(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530300 for _, flow := range flows {
301 b, ok := flow.Value.([]byte)
302 if !ok {
303 logger.Warn(ctx, "The value type is not []byte")
304 continue
305 }
306 d.CreateFlowFromString(b)
307 }
308}
309
310// CreateFlowFromString to create flow from string
311func (d *Device) CreateFlowFromString(b []byte) {
312 var flow of.VoltSubFlow
313 if err := json.Unmarshal(b, &flow); err == nil {
314 if _, ok := d.flows[flow.Cookie]; !ok {
315 logger.Debugw(ctx, "Adding Flow From Db", log.Fields{"Cookie": flow.Cookie})
316 d.flows[flow.Cookie] = &flow
317 } else {
318 logger.Warnw(ctx, "Duplicate Flow", log.Fields{"Cookie": flow.Cookie})
319 }
320 } else {
321 logger.Warn(ctx, "Unmarshal failed")
322 }
323}
324
325// ----------------------------------------------------------
326// Database related functionality
327// Group operations at the device which include update and delete
328
329// UpdateGroupEntry - Adds/Updates the group to the device and also to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530330func (d *Device) UpdateGroupEntry(cntx context.Context, group *of.Group) {
Akash Soni6168f312023-05-18 20:57:33 +0530331 logger.Debugw(ctx, "Update Group to device", log.Fields{"ID": group.GroupID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530332 d.groups.Store(group.GroupID, group)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530333 d.AddGroupToDb(cntx, group)
Naveen Sampath04696f72022-06-13 15:19:14 +0530334}
335
336// AddGroupToDb - Utility to add the group to the device DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530337func (d *Device) AddGroupToDb(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530338 if b, err := json.Marshal(group); err == nil {
Akash Soni6168f312023-05-18 20:57:33 +0530339 logger.Debugw(ctx, "Adding Group to DB", log.Fields{"grp": group, "Json": string(b)})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530340 if err = db.PutGroup(cntx, d.ID, group.GroupID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530341 logger.Errorw(ctx, "Write Group to DB failed", log.Fields{"device": d.ID, "groupID": group.GroupID, "Reason": err})
342 }
343 }
344}
345
346// DelGroupEntry - Deletes the group from the device and the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530347func (d *Device) DelGroupEntry(cntx context.Context, group *of.Group) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530348 if _, ok := d.groups.Load(group.GroupID); ok {
349 d.groups.Delete(group.GroupID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530350 d.DelGroupFromDb(cntx, group.GroupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530351 }
352}
353
354// DelGroupFromDb - Utility to delete the Group from the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530355func (d *Device) DelGroupFromDb(cntx context.Context, groupID uint32) {
356 _ = db.DelGroup(cntx, d.ID, groupID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530357}
358
vinokuma926cb3e2023-03-29 11:41:06 +0530359// RestoreGroupsFromDb - restores all groups from DB
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530360func (d *Device) RestoreGroupsFromDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530361 logger.Info(ctx, "Restoring Groups")
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530362 groups, _ := db.GetGroups(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530363 for _, group := range groups {
364 b, ok := group.Value.([]byte)
365 if !ok {
366 logger.Warn(ctx, "The value type is not []byte")
367 continue
368 }
369 d.CreateGroupFromString(b)
370 }
371}
372
vinokuma926cb3e2023-03-29 11:41:06 +0530373// CreateGroupFromString - Forms group struct from json string
Naveen Sampath04696f72022-06-13 15:19:14 +0530374func (d *Device) CreateGroupFromString(b []byte) {
375 var group of.Group
376 if err := json.Unmarshal(b, &group); err == nil {
377 if _, ok := d.groups.Load(group.GroupID); !ok {
378 logger.Debugw(ctx, "Adding Group From Db", log.Fields{"GroupId": group.GroupID})
379 d.groups.Store(group.GroupID, &group)
380 } else {
381 logger.Warnw(ctx, "Duplicate Group", log.Fields{"GroupId": group.GroupID})
382 }
383 } else {
384 logger.Warn(ctx, "Unmarshal failed")
385 }
386}
387
388// AddMeter to add meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530389func (d *Device) AddMeter(cntx context.Context, meter *of.Meter) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530390 d.meterLock.Lock()
391 defer d.meterLock.Unlock()
392 if _, ok := d.meters[meter.ID]; ok {
Akash Sonid36d23b2023-08-18 12:51:40 +0530393 return errors.New("duplicate meter")
Naveen Sampath04696f72022-06-13 15:19:14 +0530394 }
395 d.meters[meter.ID] = meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530396 go d.AddMeterToDb(cntx, meter)
Naveen Sampath04696f72022-06-13 15:19:14 +0530397 return nil
398}
399
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530400// UpdateMeter to update meter
401func (d *Device) UpdateMeter(cntx context.Context, meter *of.Meter) error {
vinokuma926cb3e2023-03-29 11:41:06 +0530402 d.meterLock.Lock()
403 defer d.meterLock.Unlock()
404 if _, ok := d.meters[meter.ID]; ok {
405 d.meters[meter.ID] = meter
406 d.AddMeterToDb(cntx, meter)
407 } else {
Akash Sonid36d23b2023-08-18 12:51:40 +0530408 return errors.New("meter not found for updation")
vinokuma926cb3e2023-03-29 11:41:06 +0530409 }
410 return nil
Sridhar Ravindra2d2ef4e2023-02-08 16:43:38 +0530411}
412
Naveen Sampath04696f72022-06-13 15:19:14 +0530413// GetMeter to get meter
414func (d *Device) GetMeter(id uint32) (*of.Meter, error) {
415 d.meterLock.RLock()
416 defer d.meterLock.RUnlock()
417 if m, ok := d.meters[id]; ok {
418 return m, nil
419 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530420 return nil, errors.New("meter not found")
Naveen Sampath04696f72022-06-13 15:19:14 +0530421}
422
423// DelMeter to delete meter
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530424func (d *Device) DelMeter(cntx context.Context, meter *of.Meter) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530425 d.meterLock.Lock()
426 defer d.meterLock.Unlock()
427 if _, ok := d.meters[meter.ID]; ok {
428 delete(d.meters, meter.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530429 go d.DelMeterFromDb(cntx, meter.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530430 return true
431 }
432 return false
433}
434
435// AddMeterToDb is utility to add the Group to the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530436func (d *Device) AddMeterToDb(cntx context.Context, meter *of.Meter) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530437 if b, err := json.Marshal(meter); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530438 if err = db.PutDeviceMeter(cntx, d.ID, meter.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530439 logger.Errorw(ctx, "Write Meter to DB failed", log.Fields{"device": d.ID, "meterID": meter.ID, "Reason": err})
440 }
441 }
442}
443
444// DelMeterFromDb to delete meter from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530445func (d *Device) DelMeterFromDb(cntx context.Context, id uint32) {
446 _ = db.DelDeviceMeter(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530447}
448
449// RestoreMetersFromDb to restore meters from db
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530450func (d *Device) RestoreMetersFromDb(cntx context.Context) {
451 meters, _ := db.GetDeviceMeters(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530452 for _, meter := range meters {
453 b, ok := meter.Value.([]byte)
454 if !ok {
455 logger.Warn(ctx, "The value type is not []byte")
456 continue
457 }
458 d.CreateMeterFromString(b)
459 }
460}
461
462// CreateMeterFromString to create meter from string
463func (d *Device) CreateMeterFromString(b []byte) {
464 var meter of.Meter
465 if err := json.Unmarshal(b, &meter); err == nil {
466 if _, ok := d.meters[meter.ID]; !ok {
467 logger.Debugw(ctx, "Adding Meter From Db", log.Fields{"ID": meter.ID})
468 d.meters[meter.ID] = &meter
469 } else {
470 logger.Warnw(ctx, "Duplicate Meter", log.Fields{"ID": meter.ID})
471 }
472 } else {
Akash Soni6168f312023-05-18 20:57:33 +0530473 logger.Warnw(ctx, "Unmarshal failed", log.Fields{"error": err, "meter": string(b)})
Naveen Sampath04696f72022-06-13 15:19:14 +0530474 }
475}
476
477// VolthaClient to get voltha client
478func (d *Device) VolthaClient() voltha.VolthaServiceClient {
479 return d.vclientHolder.Get()
480}
481
482// AddPort to add the port as requested by the device/VOLTHA
483// Inform the application if the port is successfully added
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530484func (d *Device) AddPort(cntx context.Context, mp *ofp.OfpPort) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530485 d.portLock.Lock()
486 defer d.portLock.Unlock()
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530487 id := mp.PortNo
488 name := mp.Name
Naveen Sampath04696f72022-06-13 15:19:14 +0530489 if _, ok := d.PortsByID[id]; ok {
Akash Sonid36d23b2023-08-18 12:51:40 +0530490 return errors.New(Duplicate_Port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530491 }
492 if _, ok := d.PortsByName[name]; ok {
Akash Sonid36d23b2023-08-18 12:51:40 +0530493 return errors.New(Duplicate_Port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530494 }
495
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530496 p := NewDevicePort(mp)
Naveen Sampath04696f72022-06-13 15:19:14 +0530497 d.PortsByID[id] = p
498 d.PortsByName[name] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530499 d.WritePortToDb(cntx, p)
500 GetController().PortAddInd(cntx, d.ID, p.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530501 logger.Infow(ctx, "Added Port", log.Fields{"Device": d.ID, "Port": id})
502 return nil
503}
504
505// DelPort to delete the port as requested by the device/VOLTHA
506// Inform the application if the port is successfully deleted
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530507func (d *Device) DelPort(cntx context.Context, id uint32) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530508 p := d.GetPortByID(id)
509 if p == nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530510 return errors.New("unknown port")
Naveen Sampath04696f72022-06-13 15:19:14 +0530511 }
512 if p.State == PortStateUp {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530513 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530514 }
Tinoj Joseph4ead4e02023-01-30 03:12:44 +0530515 GetController().PortDelInd(cntx, d.ID, p.Name)
516
Naveen Sampath04696f72022-06-13 15:19:14 +0530517 d.portLock.Lock()
518 defer d.portLock.Unlock()
519
Naveen Sampath04696f72022-06-13 15:19:14 +0530520 delete(d.PortsByID, p.ID)
521 delete(d.PortsByName, p.Name)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530522 d.DelPortFromDb(cntx, p.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530523 logger.Infow(ctx, "Deleted Port", log.Fields{"Device": d.ID, "Port": id})
524 return nil
525}
526
527// UpdatePortByName is utility to update the port by Name
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530528func (d *Device) UpdatePortByName(cntx context.Context, name string, port uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530529 d.portLock.Lock()
530 defer d.portLock.Unlock()
531
532 p, ok := d.PortsByName[name]
533 if !ok {
534 return
535 }
536 delete(d.PortsByID, p.ID)
537 p.ID = port
538 d.PortsByID[port] = p
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530539 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530540 GetController().PortUpdateInd(d.ID, p.Name, p.ID)
541 logger.Infow(ctx, "Updated Port", log.Fields{"Device": d.ID, "Port": p.ID, "PortName": name})
542}
543
544// GetPortName to get the name of the port by its id
545func (d *Device) GetPortName(id uint32) (string, error) {
546 d.portLock.RLock()
547 defer d.portLock.RUnlock()
548
549 if p, ok := d.PortsByID[id]; ok {
550 return p.Name, nil
551 }
552 logger.Errorw(ctx, "Port not found", log.Fields{"port": id})
Akash Sonid36d23b2023-08-18 12:51:40 +0530553 return "", errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530554}
555
556// GetPortByID is utility to retrieve the port by ID
557func (d *Device) GetPortByID(id uint32) *DevicePort {
558 d.portLock.RLock()
559 defer d.portLock.RUnlock()
560
561 p, ok := d.PortsByID[id]
562 if ok {
563 return p
564 }
565 return nil
566}
567
568// GetPortByName is utility to retrieve the port by Name
569func (d *Device) GetPortByName(name string) *DevicePort {
570 d.portLock.RLock()
571 defer d.portLock.RUnlock()
572
573 p, ok := d.PortsByName[name]
574 if ok {
575 return p
576 }
577 return nil
578}
579
580// GetPortState to get the state of the port by name
581func (d *Device) GetPortState(name string) (PortState, error) {
582 d.portLock.RLock()
583 defer d.portLock.RUnlock()
584
585 if p, ok := d.PortsByName[name]; ok {
586 return p.State, nil
587 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530588 return PortStateDown, errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530589}
590
591// GetPortID to get the port-id by the port name
592func (d *Device) GetPortID(name string) (uint32, error) {
593 d.portLock.RLock()
594 defer d.portLock.RUnlock()
595
596 if p, ok := d.PortsByName[name]; ok {
597 return p.ID, nil
598 }
Akash Sonid36d23b2023-08-18 12:51:40 +0530599 return 0, errors.New(Unknown_Port_ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530600}
601
602// WritePortToDb to add the port to the database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530603func (d *Device) WritePortToDb(ctx context.Context, port *DevicePort) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530604 port.Version = database.PresentVersionMap[database.DevicePortPath]
605 if b, err := json.Marshal(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530606 if err = db.PutPort(ctx, d.ID, port.ID, string(b)); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530607 logger.Errorw(ctx, "Write port to DB failed", log.Fields{"device": d.ID, "port": port.ID, "Reason": err})
608 }
609 }
610}
611
612// DelPortFromDb to delete port from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530613func (d *Device) DelPortFromDb(cntx context.Context, id uint32) {
614 _ = db.DelPort(cntx, d.ID, id)
Naveen Sampath04696f72022-06-13 15:19:14 +0530615}
616
617// RestorePortsFromDb to restore ports from database
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530618func (d *Device) RestorePortsFromDb(cntx context.Context) {
619 ports, _ := db.GetPorts(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530620 for _, port := range ports {
621 b, ok := port.Value.([]byte)
622 if !ok {
623 logger.Warn(ctx, "The value type is not []byte")
624 continue
625 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530626 d.CreatePortFromString(cntx, b)
Naveen Sampath04696f72022-06-13 15:19:14 +0530627 }
628}
629
630// CreatePortFromString to create port from string
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530631func (d *Device) CreatePortFromString(cntx context.Context, b []byte) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530632 var port DevicePort
633 if err := json.Unmarshal(b, &port); err == nil {
634 if _, ok := d.PortsByID[port.ID]; !ok {
635 logger.Debugw(ctx, "Adding Port From Db", log.Fields{"ID": port.ID})
636 d.PortsByID[port.ID] = &port
637 d.PortsByName[port.Name] = &port
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530638 GetController().PortAddInd(cntx, d.ID, port.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530639 } else {
Akash Sonid36d23b2023-08-18 12:51:40 +0530640 logger.Warnw(ctx, Duplicate_Port, log.Fields{"ID": port.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530641 }
642 } else {
Akash Soni6168f312023-05-18 20:57:33 +0530643 logger.Warnw(ctx, "Unmarshal failed", log.Fields{"port": string(b)})
Naveen Sampath04696f72022-06-13 15:19:14 +0530644 }
645}
646
647// Delete : OLT Delete functionality yet to be implemented. IDeally all of the
648// resources should have been removed by this time. It is an error
649// scenario if the OLT has resources associated with it.
650func (d *Device) Delete() {
651 d.StopAll()
652}
653
654// Stop to stop the task
655func (d *Device) Stop() {
656}
657
658// ConnectInd is called when the connection between VGC and the VOLTHA is
659// restored. This will perform audit of the device post reconnection
660func (d *Device) ConnectInd(ctx context.Context, discType intf.DiscoveryType) {
661 logger.Warnw(ctx, "Audit Upon Connection Establishment", log.Fields{"Device": d.ID, "State": d.State})
662 ctx1, cancel := context.WithCancel(ctx)
663 d.cancel = cancel
664 d.ctx = ctx1
665 d.Tasks.Initialize(ctx1)
666
Akash Soni6168f312023-05-18 20:57:33 +0530667 logger.Debugw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530668 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530669 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530670 GetController().DeviceUpInd(d.ID)
671
Akash Soni6168f312023-05-18 20:57:33 +0530672 logger.Debugw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530673 t := NewAuditDevice(d, AuditEventDeviceDisc)
674 d.Tasks.AddTask(t)
675
676 t1 := NewAuditTablesTask(d)
677 d.Tasks.AddTask(t1)
678
679 t2 := NewPendingProfilesTask(d)
680 d.Tasks.AddTask(t2)
681
682 go d.synchronizeDeviceTables()
683}
684
685func (d *Device) synchronizeDeviceTables() {
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530686 tick := time.NewTicker(GetController().GetDeviceTableSyncDuration())
Naveen Sampath04696f72022-06-13 15:19:14 +0530687loop:
688 for {
689 select {
690 case <-d.ctx.Done():
vinokuma926cb3e2023-03-29 11:41:06 +0530691 logger.Warnw(d.ctx, "Context Done. Canceling Periodic Audit", log.Fields{"Context": ctx, "Device": d.ID, "DeviceSerialNum": d.SerialNum})
Naveen Sampath04696f72022-06-13 15:19:14 +0530692 break loop
693 case <-tick.C:
694 t1 := NewAuditTablesTask(d)
695 d.Tasks.AddTask(t1)
696 }
697 }
698 tick.Stop()
699}
700
701// DeviceUpInd is called when the logical device state changes to UP. This will perform audit of the device post reconnection
702func (d *Device) DeviceUpInd() {
703 logger.Warnw(ctx, "Device State change Ind: UP", log.Fields{"Device": d.ID})
704 d.State = DeviceStateUP
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530705 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530706 GetController().DeviceUpInd(d.ID)
707
708 logger.Warnw(ctx, "Device State change Ind: UP, trigger Audit Tasks", log.Fields{"Device": d.ID})
709 t := NewAuditDevice(d, AuditEventDeviceDisc)
710 d.Tasks.AddTask(t)
711
712 t1 := NewAuditTablesTask(d)
713 d.Tasks.AddTask(t1)
714
715 t2 := NewPendingProfilesTask(d)
716 d.Tasks.AddTask(t2)
717}
718
719// DeviceDownInd is called when the logical device state changes to Down.
720func (d *Device) DeviceDownInd() {
721 logger.Warnw(ctx, "Device State change Ind: Down", log.Fields{"Device": d.ID})
722 d.State = DeviceStateDOWN
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530723 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530724 GetController().DeviceDownInd(d.ID)
725}
726
727// DeviceRebootInd is called when the logical device is rebooted.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530728func (d *Device) DeviceRebootInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530729 logger.Warnw(ctx, "Device State change Ind: Rebooted", log.Fields{"Device": d.ID})
730
731 if d.State == DeviceStateREBOOTED {
732 d.State = DeviceStateREBOOTED
733 logger.Warnw(ctx, "Ignoring Device State change Ind: REBOOT, Device Already in REBOOT state", log.Fields{"Device": d.ID, "SeralNo": d.SerialNum})
734 return
735 }
736
737 d.State = DeviceStateREBOOTED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530738 d.TimeStamp = time.Now()
Naveen Sampath04696f72022-06-13 15:19:14 +0530739 GetController().SetRebootInProgressForDevice(d.ID)
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530740 GetController().DeviceRebootInd(cntx, d.ID, d.SerialNum, d.SouthBoundID)
741 d.ReSetAllPortStates(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +0530742}
743
744// DeviceDisabledInd is called when the logical device is disabled
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530745func (d *Device) DeviceDisabledInd(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530746 logger.Warnw(ctx, "Device State change Ind: Disabled", log.Fields{"Device": d.ID})
747 d.State = DeviceStateDISABLED
Tinoj Joseph429b9d92022-11-16 18:51:05 +0530748 d.TimeStamp = time.Now()
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530749 GetController().DeviceDisableInd(cntx, d.ID)
Naveen Sampath04696f72022-06-13 15:19:14 +0530750}
751
vinokuma926cb3e2023-03-29 11:41:06 +0530752// ReSetAllPortStates - Set all logical device port status to DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530753func (d *Device) ReSetAllPortStates(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530754 logger.Warnw(ctx, "Resetting all Ports State to DOWN", log.Fields{"Device": d.ID, "State": d.State})
755
756 d.portLock.Lock()
757 defer d.portLock.Unlock()
758
759 for _, port := range d.PortsByID {
760 if port.State != PortStateDown {
761 logger.Infow(ctx, "Resetting Port State to DOWN", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530762 GetController().PortDownInd(cntx, d.ID, port.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530763 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530764 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530765 }
766 }
767}
768
vinokuma926cb3e2023-03-29 11:41:06 +0530769// ReSetAllPortStatesInDb - Set all logical device port status to DOWN in DB and skip indication to application
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530770func (d *Device) ReSetAllPortStatesInDb(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530771 logger.Warnw(ctx, "Resetting all Ports State to DOWN In DB", log.Fields{"Device": d.ID, "State": d.State})
772
773 d.portLock.Lock()
774 defer d.portLock.Unlock()
775
776 for _, port := range d.PortsByID {
777 if port.State != PortStateDown {
Akash Soni6168f312023-05-18 20:57:33 +0530778 logger.Debugw(ctx, "Resetting Port State to DOWN and Write to DB", log.Fields{"Device": d.ID, "Port": port})
Naveen Sampath04696f72022-06-13 15:19:14 +0530779 port.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530780 d.WritePortToDb(cntx, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530781 }
782 }
783}
784
785// ProcessPortUpdate deals with the change in port id (ONU movement) and taking action
786// to update only when the port state is DOWN
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530787func (d *Device) ProcessPortUpdate(cntx context.Context, portName string, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530788 if p := d.GetPortByName(portName); p != nil {
789 if p.ID != port {
790 logger.Infow(ctx, "Port ID update indication", log.Fields{"Port": p.Name, "Old PortID": p.ID, "New Port ID": port})
791 if p.State != PortStateDown {
792 logger.Errorw(ctx, "Port ID update failed. Port State UP", log.Fields{"Port": p})
793 return
794 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530795 d.UpdatePortByName(cntx, portName, port)
Naveen Sampath04696f72022-06-13 15:19:14 +0530796 logger.Errorw(ctx, "Port ID Updated", log.Fields{"Port": p})
797 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530798 d.ProcessPortState(cntx, port, state)
Naveen Sampath04696f72022-06-13 15:19:14 +0530799 }
800}
801
802// ***Operations Performed on Port state Transitions***
803//
804// |-----------------------------------------------------------------------------|
805// | State | Action |
806// |--------------------|--------------------------------------------------------|
807// | UP | UNI - Trigger Flow addition for service configured |
808// | | NNI - Trigger Flow addition for vnets & mvlan profiles |
809// | | |
810// | DOWN | UNI - Trigger Flow deletion for service configured |
811// | | NNI - Trigger Flow deletion for vnets & mvlan profiles |
812// | | |
813// |-----------------------------------------------------------------------------|
814//
815
816// ProcessPortState deals with the change in port status and taking action
817// based on the new state and the old state
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530818func (d *Device) ProcessPortState(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530819 if d.State != DeviceStateUP && !util.IsNniPort(port) {
820 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
821 return
822 }
823 if p := d.GetPortByID(port); p != nil {
824 logger.Infow(ctx, "Port State Processing", log.Fields{"Received": state, "Current": p.State})
825
826 // Avoid blind initialization as the current tasks in the queue will be lost
827 // Eg: Service Del followed by Port Down - The flows will be dangling
828 // Eg: NNI Down followed by NNI UP - Mcast data flows will be dangling
829 p.Tasks.CheckAndInitialize(d.ctx)
830 if state == uint32(ofp.OfpPortState_OFPPS_LIVE) && p.State == PortStateDown {
831 // Transition from DOWN to UP
832 logger.Infow(ctx, "Port State Change to UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530833 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530834 p.State = PortStateUp
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530835 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530836 } else if (state != uint32(ofp.OfpPortState_OFPPS_LIVE)) && (p.State != PortStateDown) {
837 // Transition from UP to Down
838 logger.Infow(ctx, "Port State Change to Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530839 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530840 p.State = PortStateDown
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530841 d.WritePortToDb(cntx, p)
Naveen Sampath04696f72022-06-13 15:19:14 +0530842 } else {
843 logger.Warnw(ctx, "Dropping Port Ind: No Change in Port State", log.Fields{"PortName": p.Name, "ID": port, "Device": d.ID, "PortState": p.State, "IncomingState": state})
844 }
845 }
846}
847
848// ProcessPortStateAfterReboot - triggers the port state indication to sort out configu mismatch due to reboot
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530849func (d *Device) ProcessPortStateAfterReboot(cntx context.Context, port uint32, state uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530850 if d.State != DeviceStateUP && !util.IsNniPort(port) {
851 logger.Warnw(ctx, "Ignore Port State Processing - Device not UP", log.Fields{"Device": d.ID, "Port": port, "DeviceState": d.State})
852 return
853 }
854 if p := d.GetPortByID(port); p != nil {
855 logger.Infow(ctx, "Port State Processing after Reboot", log.Fields{"Received": state, "Current": p.State})
856 p.Tasks.Initialize(d.ctx)
857 if p.State == PortStateUp {
858 logger.Infow(ctx, "Port State: UP", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530859 GetController().PortUpInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530860 } else if p.State == PortStateDown {
861 logger.Infow(ctx, "Port State: Down", log.Fields{"Device": d.ID, "Port": port})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530862 GetController().PortDownInd(cntx, d.ID, p.Name)
Naveen Sampath04696f72022-06-13 15:19:14 +0530863 }
864 }
865}
866
867// ChangeEvent : Change event brings in ports related changes such as addition/deletion
868// or modification where the port status change up/down is indicated to the
869// controller
870func (d *Device) ChangeEvent(event *ofp.ChangeEvent) error {
871 cet := NewChangeEventTask(d.ctx, event, d)
872 d.AddTask(cet)
873 return nil
874}
875
876// PacketIn handle the incoming packet-in and deliver to the application for the
877// actual processing
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530878func (d *Device) PacketIn(cntx context.Context, pkt *ofp.PacketIn) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530879 logger.Debugw(ctx, "Received a Packet-In", log.Fields{"Device": d.ID})
880 if pkt.PacketIn.Reason != ofp.OfpPacketInReason_OFPR_ACTION {
881 logger.Warnw(ctx, "Unsupported PacketIn Reason", log.Fields{"Reason": pkt.PacketIn.Reason})
882 return
883 }
884 data := pkt.PacketIn.Data
885 port := PacketInGetPort(pkt.PacketIn)
886 if pName, err := d.GetPortName(port); err == nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530887 GetController().PacketInInd(cntx, d.ID, pName, data)
Naveen Sampath04696f72022-06-13 15:19:14 +0530888 } else {
889 logger.Warnw(ctx, "Unknown Port", log.Fields{"Reason": err.Error()})
890 }
891}
892
893// PacketInGetPort to get the port on which the packet-in is reported
894func PacketInGetPort(pkt *ofp.OfpPacketIn) uint32 {
895 for _, field := range pkt.Match.OxmFields {
896 if field.OxmClass == ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC {
897 if ofbField, ok := field.Field.(*ofp.OfpOxmField_OfbField); ok {
898 if ofbField.OfbField.Type == ofp.OxmOfbFieldTypes_OFPXMT_OFB_IN_PORT {
899 if port, ok := ofbField.OfbField.Value.(*ofp.OfpOxmOfbField_Port); ok {
900 return port.Port
901 }
902 }
903 }
904 }
905 }
906 return 0
907}
908
909// PacketOutReq receives the packet out request from the application via the
910// controller. The interface from the application uses name as the identity.
911func (d *Device) PacketOutReq(outport string, inport string, data []byte, isCustomPkt bool) error {
912 inp, err := d.GetPortID(inport)
913 if err != nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530914 return errors.New("unknown inport")
Naveen Sampath04696f72022-06-13 15:19:14 +0530915 }
916 outp, err1 := d.GetPortID(outport)
917 if err1 != nil {
Akash Sonid36d23b2023-08-18 12:51:40 +0530918 return errors.New("unknown outport")
Naveen Sampath04696f72022-06-13 15:19:14 +0530919 }
920 logger.Debugw(ctx, "Sending packet out", log.Fields{"Device": d.ID, "Inport": inport, "Outport": outport})
921 return d.SendPacketOut(outp, inp, data, isCustomPkt)
922}
923
924// SendPacketOut is responsible for building the OF structure and send the
925// packet-out to the VOLTHA
926func (d *Device) SendPacketOut(outport uint32, inport uint32, data []byte, isCustomPkt bool) error {
927 pout := &ofp.PacketOut{}
928 pout.Id = d.ID
929 opout := &ofp.OfpPacketOut{}
930 pout.PacketOut = opout
931 opout.InPort = inport
932 opout.Data = data
933 opout.Actions = []*ofp.OfpAction{
934 {
935 Type: ofp.OfpActionType_OFPAT_OUTPUT,
936 Action: &ofp.OfpAction_Output{
937 Output: &ofp.OfpActionOutput{
938 Port: outport,
939 MaxLen: 65535,
940 },
941 },
942 },
943 }
944 d.packetOutChannel <- pout
945 return nil
946}
947
948// UpdateFlows receives the flows in the form that is implemented
949// in the VGC and transforms them to the OF format. This is handled
950// as a port of the task that is enqueued to do the same.
951func (d *Device) UpdateFlows(flow *of.VoltFlow, devPort *DevicePort) {
952 t := NewAddFlowsTask(d.ctx, flow, d)
953 logger.Debugw(ctx, "Port Context", log.Fields{"Ctx": devPort.GetContext()})
954 // check if port isNni , if yes flows will be added to device port queues.
955 if util.IsNniPort(devPort.ID) {
956 // Adding the flows to device port queues.
957 devPort.AddTask(t)
958 return
959 }
960 // If the flowHash is enabled then add the flows to the flowhash generated queues.
961 flowQueue := d.getAndAddFlowQueueForUniID(uint32(devPort.ID))
962 if flowQueue != nil {
963 logger.Debugw(ctx, "flowHashQId", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID})
964 flowQueue.AddTask(t)
965 logger.Debugw(ctx, "Tasks Info", log.Fields{"uniid": devPort.ID, "flowhash": flowQueue.ID, "Total": flowQueue.TotalTasks(), "Pending": flowQueue.NumPendingTasks()})
966 } else {
967 //FlowThrotling disabled, add to the device port queue
968 devPort.AddTask(t)
969 return
970 }
971}
972
973// UpdateGroup to update group info
974func (d *Device) UpdateGroup(group *of.Group, devPort *DevicePort) {
975 task := NewModGroupTask(d.ctx, group, d)
976 logger.Debugw(ctx, "NNI Port Context", log.Fields{"Ctx": devPort.GetContext()})
977 devPort.AddTask(task)
978}
979
980// ModMeter for mod meter task
981func (d *Device) ModMeter(command of.MeterCommand, meter *of.Meter, devPort *DevicePort) {
982 if command == of.MeterCommandAdd {
983 if _, err := d.GetMeter(meter.ID); err == nil {
984 logger.Debugw(ctx, "Meter already added", log.Fields{"ID": meter.ID})
985 return
986 }
987 }
988 t := NewModMeterTask(d.ctx, command, meter, d)
989 devPort.AddTask(t)
990}
991
992func (d *Device) getAndAddFlowQueueForUniID(id uint32) *UniIDFlowQueue {
993 d.flowQueueLock.RLock()
vinokuma926cb3e2023-03-29 11:41:06 +0530994 // If flowhash is 0 that means flowhash throttling is disabled, return nil
Naveen Sampath04696f72022-06-13 15:19:14 +0530995 if d.flowHash == 0 {
996 d.flowQueueLock.RUnlock()
997 return nil
998 }
999 flowHashID := id % uint32(d.flowHash)
1000 if value, found := d.flowQueue[uint32(flowHashID)]; found {
1001 d.flowQueueLock.RUnlock()
1002 return value
1003 }
1004 d.flowQueueLock.RUnlock()
1005 logger.Debugw(ctx, "Flow queue not found creating one", log.Fields{"uniid": id, "hash": flowHashID})
1006
1007 return d.addFlowQueueForUniID(id)
1008}
1009
1010func (d *Device) addFlowQueueForUniID(id uint32) *UniIDFlowQueue {
Naveen Sampath04696f72022-06-13 15:19:14 +05301011 d.flowQueueLock.Lock()
1012 defer d.flowQueueLock.Unlock()
1013 flowHashID := id % uint32(d.flowHash)
1014 flowQueue := NewUniIDFlowQueue(uint32(flowHashID))
1015 flowQueue.Tasks.Initialize(d.ctx)
1016 d.flowQueue[flowHashID] = flowQueue
1017 return flowQueue
1018}
1019
1020// SetFlowHash sets the device flow hash and writes to the DB.
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301021func (d *Device) SetFlowHash(cntx context.Context, hash uint32) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301022 d.flowQueueLock.Lock()
1023 defer d.flowQueueLock.Unlock()
1024
1025 d.flowHash = hash
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301026 d.writeFlowHashToDB(cntx)
Naveen Sampath04696f72022-06-13 15:19:14 +05301027}
1028
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301029func (d *Device) writeFlowHashToDB(cntx context.Context) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301030 hash, err := json.Marshal(d.flowHash)
1031 if err != nil {
Akash Soni6168f312023-05-18 20:57:33 +05301032 logger.Errorw(ctx, "failed to marshal flow hash", log.Fields{"hash": d.flowHash, "error": err})
Naveen Sampath04696f72022-06-13 15:19:14 +05301033 return
1034 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301035 if err := db.PutFlowHash(cntx, d.ID, string(hash)); err != nil {
Akash Soni6168f312023-05-18 20:57:33 +05301036 logger.Errorw(ctx, "Failed to add flow hash to DB", log.Fields{"device": d.ID, "hash": d.flowHash, "error": err})
Naveen Sampath04696f72022-06-13 15:19:14 +05301037 }
1038}
1039
vinokuma926cb3e2023-03-29 11:41:06 +05301040// isSBOperAllowed - determines if the SB operation is allowed based on device state & force flag
Naveen Sampath04696f72022-06-13 15:19:14 +05301041func (d *Device) isSBOperAllowed(forceAction bool) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +05301042 if d.State == DeviceStateUP {
1043 return true
1044 }
1045
1046 if d.State == DeviceStateDISABLED && forceAction {
1047 return true
1048 }
1049
1050 return false
1051}
1052
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301053func (d *Device) triggerFlowNotification(cntx context.Context, cookie uint64, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301054 flow, _ := d.GetFlow(cookie)
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301055 d.triggerFlowResultNotification(cntx, cookie, flow, oper, bwDetails, err)
Naveen Sampath04696f72022-06-13 15:19:14 +05301056}
1057
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301058func (d *Device) triggerFlowResultNotification(cntx context.Context, cookie uint64, flow *of.VoltSubFlow, oper of.Command, bwDetails of.BwAvailDetails, err error) {
Naveen Sampath04696f72022-06-13 15:19:14 +05301059 statusCode, statusMsg := infraerror.GetErrorInfo(err)
1060 success := isFlowOperSuccess(statusCode, oper)
1061
1062 updateFlow := func(cookie uint64, state int, reason string) {
1063 if dbFlow, ok := d.GetFlow(cookie); ok {
1064 dbFlow.State = uint8(state)
1065 dbFlow.ErrorReason = reason
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301066 d.AddFlowToDb(cntx, dbFlow)
Naveen Sampath04696f72022-06-13 15:19:14 +05301067 }
1068 }
1069
vinokuma926cb3e2023-03-29 11:41:06 +05301070 // Update flow results
Naveen Sampath04696f72022-06-13 15:19:14 +05301071 // Add - Update Success or Failure status with reason
1072 // Del - Delete entry from DB on success else update error reason
1073 if oper == of.CommandAdd {
1074 state := of.FlowAddSuccess
1075 reason := ""
1076 if !success {
1077 state = of.FlowAddFailure
1078 reason = statusMsg
1079 }
1080 updateFlow(cookie, state, reason)
1081 logger.Debugw(ctx, "Updated Flow to DB", log.Fields{"Cookie": cookie, "State": state})
1082 } else {
1083 if success && flow != nil {
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301084 if err := d.DelFlow(cntx, flow); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +05301085 logger.Warnw(ctx, "Delete Flow Error", log.Fields{"Cookie": flow.Cookie, "Reason": err.Error()})
1086 }
1087 } else if !success {
1088 updateFlow(cookie, of.FlowDelFailure, statusMsg)
1089 }
1090 }
1091
1092 flowResult := intf.FlowStatus{
1093 Cookie: strconv.FormatUint(cookie, 10),
1094 Device: d.ID,
1095 FlowModType: oper,
1096 Flow: flow,
1097 Status: statusCode,
1098 Reason: statusMsg,
1099 AdditionalData: bwDetails,
1100 }
1101
1102 logger.Infow(ctx, "Sending Flow Notification", log.Fields{"Cookie": cookie, "Error Code": statusCode, "FlowOp": oper})
Tinoj Joseph07cc5372022-07-18 22:53:51 +05301103 GetController().ProcessFlowModResultIndication(cntx, flowResult)
Naveen Sampath04696f72022-06-13 15:19:14 +05301104}