blob: 0be505a510e5a5f707feecb57d5c6c00cdb85a06 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
vinokuma926cb3e2023-03-29 11:41:06 +053014 */
Naveen Sampath04696f72022-06-13 15:19:14 +053015
16package controller
17
18import (
19 "context"
20 "strconv"
21 "time"
22
23 "voltha-go-controller/internal/pkg/intf"
24 "voltha-go-controller/internal/pkg/of"
25 "voltha-go-controller/internal/pkg/tasks"
26 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053027 "voltha-go-controller/log"
vinokuma926cb3e2023-03-29 11:41:06 +053028
Naveen Sampath04696f72022-06-13 15:19:14 +053029 "github.com/opencord/voltha-protos/v5/go/common"
30 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
31 "github.com/opencord/voltha-protos/v5/go/voltha"
32)
33
34var (
35 rcvdGroups map[uint32]*ofp.OfpGroupDesc
36 groupsToAdd []*of.Group
37 groupsToMod []*of.Group
38)
39
40// AuditTablesTask structure
41type AuditTablesTask struct {
Naveen Sampath04696f72022-06-13 15:19:14 +053042 ctx context.Context
43 device *Device
Naveen Sampath04696f72022-06-13 15:19:14 +053044 timestamp string
vinokuma926cb3e2023-03-29 11:41:06 +053045 taskID uint8
46 stop bool
Naveen Sampath04696f72022-06-13 15:19:14 +053047}
48
49// NewAuditTablesTask is constructor for AuditTablesTask
50func NewAuditTablesTask(device *Device) *AuditTablesTask {
51 var att AuditTablesTask
52 att.device = device
53 att.stop = false
54 tstamp := (time.Now()).Format(time.RFC3339Nano)
55 att.timestamp = tstamp
56 return &att
57}
58
59// Name returns name of the task
60func (att *AuditTablesTask) Name() string {
61 return "Audit Table Task"
62}
63
64// TaskID to return task id of the task
65func (att *AuditTablesTask) TaskID() uint8 {
66 return att.taskID
67}
68
69// Timestamp to return timestamp for the task
70func (att *AuditTablesTask) Timestamp() string {
71 return att.timestamp
72}
73
74// Stop to stop the task
75func (att *AuditTablesTask) Stop() {
76 att.stop = true
77}
78
79// Start is called by the framework and is responsible for implementing
80// the actual task.
81func (att *AuditTablesTask) Start(ctx context.Context, taskID uint8) error {
Akash Sonie863fe42023-11-30 14:35:01 +053082 logger.Debugw(ctx, "Audit Table Task Triggered", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +053083 att.taskID = taskID
84 att.ctx = ctx
85 var errInfo error
86 var err error
87
Tinoj Josephaf37ce82022-12-28 11:59:43 +053088 // Audit ports
89 if err = att.AuditPorts(); err != nil {
90 logger.Errorw(ctx, "Audit Ports Failed", log.Fields{"Reason": err.Error()})
91 errInfo = err
92 }
93
Naveen Sampath04696f72022-06-13 15:19:14 +053094 // Audit the meters
95 if err = att.AuditMeters(); err != nil {
96 logger.Errorw(ctx, "Audit Meters Failed", log.Fields{"Reason": err.Error()})
97 errInfo = err
98 }
99
100 // Audit the Groups
101 if rcvdGroups, err = att.AuditGroups(); err != nil {
102 logger.Errorw(ctx, "Audit Groups Failed", log.Fields{"Reason": err.Error()})
103 errInfo = err
104 }
105
106 // Audit the flows
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530107 if err = att.AuditFlows(ctx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530108 logger.Errorw(ctx, "Audit Flows Failed", log.Fields{"Reason": err.Error()})
109 errInfo = err
110 }
111
112 // Triggering deletion of excess groups from device after the corresponding flows are removed
113 // to avoid flow dependency error during group deletion
Akash Soni6168f312023-05-18 20:57:33 +0530114 logger.Debugw(ctx, "Excess Groups", log.Fields{"Groups": rcvdGroups})
Naveen Sampath04696f72022-06-13 15:19:14 +0530115 att.DelExcessGroups(rcvdGroups)
Akash Sonie863fe42023-11-30 14:35:01 +0530116 logger.Debugw(ctx, "Audit Table Task Completed", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
Naveen Sampath04696f72022-06-13 15:19:14 +0530117 return errInfo
Naveen Sampath04696f72022-06-13 15:19:14 +0530118}
119
120// AuditMeters : Audit the meters which includes fetching the existing meters at the
121// voltha and identifying the delta between the ones held here and the
122// ones held at VOLTHA. The delta must be cleaned up to keep both the
123// components in sync
124func (att *AuditTablesTask) AuditMeters() error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530125 if att.stop {
126 return tasks.ErrTaskCancelError
127 }
128 var vc voltha.VolthaServiceClient
129 if vc = att.device.VolthaClient(); vc == nil {
130 logger.Error(ctx, "Fetch Device Meters Failed: Voltha Client Unavailable")
131 return nil
132 }
133
134 //-----------------------------
135 // Perform the audit of meters
136 // Fetch the meters
137 ms, err := vc.ListLogicalDeviceMeters(att.ctx, &voltha.ID{Id: att.device.ID})
138 if err != nil {
139 logger.Warnw(ctx, "Audit of flows failed", log.Fields{"Reason": err.Error()})
140 return err
141 }
142
143 // Build the map for easy and faster processing
144 rcvdMeters := make(map[uint32]*ofp.OfpMeterStats)
145 for _, m := range ms.Items {
146 rcvdMeters[m.Stats.MeterId] = m.Stats
147 }
148
149 // Verify all meters that are in the controller but not in the device
150 missingMeters := []*of.Meter{}
151 for _, meter := range att.device.meters {
Naveen Sampath04696f72022-06-13 15:19:14 +0530152 if att.stop {
153 break
154 }
155 logger.Debugw(ctx, "Auditing Meter", log.Fields{"Id": meter.ID})
156
157 if _, ok := rcvdMeters[meter.ID]; ok {
158 // The meter exists in the device too. Just remove it from
159 // the received meters
160 delete(rcvdMeters, meter.ID)
161 } else {
162 // The flow exists at the controller but not at the device
163 // Push the flow to the device
164 logger.Debugw(ctx, "Adding Meter To Missing Meters", log.Fields{"Id": meter.ID})
165 missingMeters = append(missingMeters, meter)
166 }
167 }
168 if !att.stop {
169 att.AddMissingMeters(missingMeters)
170 att.DelExcessMeters(rcvdMeters)
171 } else {
172 err = tasks.ErrTaskCancelError
173 }
174 return err
175}
176
177// AddMissingMeters adds the missing meters detected by AuditMeters
178func (att *AuditTablesTask) AddMissingMeters(meters []*of.Meter) {
179 logger.Debugw(ctx, "Adding missing meters", log.Fields{"Number": len(meters)})
180 for _, meter := range meters {
181 meterMod, err := of.MeterUpdate(att.device.ID, of.MeterCommandAdd, meter)
182 if err != nil {
183 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
184 continue
185 }
186 if vc := att.device.VolthaClient(); vc != nil {
187 if _, err = vc.UpdateLogicalDeviceMeterTable(att.ctx, meterMod); err != nil {
188 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
189 }
190 } else {
191 logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
192 }
193 }
194}
195
196// DelExcessMeters to delete excess meters
197func (att *AuditTablesTask) DelExcessMeters(meters map[uint32]*ofp.OfpMeterStats) {
198 logger.Debugw(ctx, "Deleting Excess Meters", log.Fields{"Number": len(meters)})
199 for _, meter := range meters {
200 meterMod := &ofp.OfpMeterMod{}
201 meterMod.Command = ofp.OfpMeterModCommand_OFPMC_DELETE
202 meterMod.MeterId = meter.MeterId
203 meterUpd := &ofp.MeterModUpdate{Id: att.device.ID, MeterMod: meterMod}
204 if vc := att.device.VolthaClient(); vc != nil {
205 if _, err := vc.UpdateLogicalDeviceMeterTable(att.ctx, meterUpd); err != nil {
206 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
207 }
208 } else {
209 logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
210 }
211 }
212}
213
214// AuditFlows audit the flows which includes fetching the existing meters at the
215// voltha and identifying the delta between the ones held here and the
216// ones held at VOLTHA. The delta must be cleaned up to keep both the
217// components in sync
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530218func (att *AuditTablesTask) AuditFlows(cntx context.Context) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530219 if att.stop {
220 return tasks.ErrTaskCancelError
221 }
222
223 var vc voltha.VolthaServiceClient
224 if vc = att.device.VolthaClient(); vc == nil {
225 logger.Error(ctx, "Flow Audit Failed: Voltha Client Unavailable")
226 return nil
227 }
228
229 // ---------------------------------
230 // Perform the audit of flows first
231 // Retrieve the flows from the device
232 f, err := vc.ListLogicalDeviceFlows(att.ctx, &common.ID{Id: att.device.ID})
233 if err != nil {
234 logger.Warnw(ctx, "Audit of flows failed", log.Fields{"Reason": err.Error()})
235 return err
236 }
237
238 defaultSuccessFlowStatus := intf.FlowStatus{
239 Device: att.device.ID,
240 FlowModType: of.CommandAdd,
241 Status: 0,
242 Reason: "",
243 }
244
245 // Build the map for easy and faster processing
246 rcvdFlows := make(map[uint64]*ofp.OfpFlowStats)
247 flowsToAdd := &of.VoltFlow{}
248 flowsToAdd.SubFlows = make(map[uint64]*of.VoltSubFlow)
249 for _, flow := range f.Items {
250 rcvdFlows[flow.Cookie] = flow
251 }
252
253 att.device.flowLock.Lock()
254 // Verify all flows that are in the controller but not in the device
255 for _, flow := range att.device.flows {
Naveen Sampath04696f72022-06-13 15:19:14 +0530256 if att.stop {
257 break
258 }
259
Akash Sonief452f12024-12-12 18:20:28 +0530260 logger.Debugw(ctx, "Auditing Flow", log.Fields{"Cookie": flow.Cookie, "State": flow.State})
Naveen Sampath04696f72022-06-13 15:19:14 +0530261 if _, ok := rcvdFlows[flow.Cookie]; ok {
262 // The flow exists in the device too. Just remove it from
263 // the received flows & trigger flow success indication unless
264 // the flow in del failure/pending state
265
266 if flow.State != of.FlowDelFailure && flow.State != of.FlowDelPending {
267 delete(rcvdFlows, flow.Cookie)
Akash Sonief452f12024-12-12 18:20:28 +0530268 } else {
269 // Update flow delete count since we are retrying the flow delete due to failure
270 att.device.UpdateFlowCount(cntx, flow.Cookie)
Naveen Sampath04696f72022-06-13 15:19:14 +0530271 }
272 defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
Naveen Sampath04696f72022-06-13 15:19:14 +0530273 } else {
274 // The flow exists at the controller but not at the device
275 // Push the flow to the device
276 logger.Debugw(ctx, "Adding Flow To Missing Flows", log.Fields{"Cookie": flow.Cookie})
Akash Sonief452f12024-12-12 18:20:28 +0530277 if !att.device.IsFlowAddThresholdReached(flow.FlowCount, flow.Cookie) {
278 flowsToAdd.SubFlows[flow.Cookie] = flow
279 att.device.UpdateFlowCount(cntx, flow.Cookie)
280 } else if flow.State != of.FlowDelFailure {
281 // Release the lock before deactivating service, as we acquire the same lock to delete flows
282 att.device.flowLock.Unlock()
283 // If flow add threshold has reached, deactivate the service corresponding to the UNI
284 GetController().CheckAndDeactivateService(cntx, flow, att.device.SerialNum, att.device.ID)
285 // Acquire the lock again for processing remaining flows
286 att.device.flowLock.Lock()
287 }
Naveen Sampath04696f72022-06-13 15:19:14 +0530288 }
289 }
290 att.device.flowLock.Unlock()
291
292 if !att.stop {
293 // The flows remaining in the received flows are the excess flows at
294 // the device. Delete those flows
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530295 att.DelExcessFlows(cntx, rcvdFlows)
Naveen Sampath04696f72022-06-13 15:19:14 +0530296 // Add the flows missing at the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530297 att.AddMissingFlows(cntx, flowsToAdd)
Naveen Sampath04696f72022-06-13 15:19:14 +0530298 } else {
299 err = tasks.ErrTaskCancelError
300 }
301 return err
302}
303
304// AddMissingFlows : The flows missing from the device are reinstalled att the audit
305// The flows are added into a VoltFlow structure.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530306func (att *AuditTablesTask) AddMissingFlows(cntx context.Context, mflow *of.VoltFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530307 logger.Debugw(ctx, "Add Missing Flows", log.Fields{"Number": len(mflow.SubFlows)})
308 mflow.Command = of.CommandAdd
309 ofFlows := of.ProcessVoltFlow(att.device.ID, mflow.Command, mflow.SubFlows)
310 var vc voltha.VolthaServiceClient
311 var bwConsumedInfo of.BwAvailDetails
312 if vc = att.device.VolthaClient(); vc == nil {
313 logger.Error(ctx, "Update Flow Table Failed: Voltha Client Unavailable")
314 return
315 }
316 for _, flow := range ofFlows {
Naveen Sampath04696f72022-06-13 15:19:14 +0530317 if flow.FlowMod != nil {
Akash Sonief452f12024-12-12 18:20:28 +0530318 if _, present := att.device.GetFlow(flow.FlowMod.Cookie); !present {
Tinoj Joseph1d108322022-07-13 10:07:39 +0530319 logger.Warnw(ctx, "Flow Removed from DB. Ignoring Add Missing Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.FlowMod.Cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530320 continue
321 }
322 }
323 var err error
324 if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flow); err != nil {
325 logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Reason": err.Error()})
326 }
Akash Sonief452f12024-12-12 18:20:28 +0530327 att.device.triggerFlowNotification(cntx, flow.FlowMod.Cookie, of.CommandAdd, bwConsumedInfo, err)
Naveen Sampath04696f72022-06-13 15:19:14 +0530328 }
329}
330
331// DelExcessFlows delete the excess flows held at the VOLTHA
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530332func (att *AuditTablesTask) DelExcessFlows(cntx context.Context, flows map[uint64]*ofp.OfpFlowStats) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530333 logger.Debugw(ctx, "Deleting Excess Flows", log.Fields{"Number of Flows": len(flows)})
334
335 var vc voltha.VolthaServiceClient
336 if vc = att.device.VolthaClient(); vc == nil {
337 logger.Error(ctx, "Delete Excess Flows Failed: Voltha Client Unavailable")
338 return
339 }
340
341 // Let's cycle through the flows to delete the excess flows
342 for _, flow := range flows {
Akash Sonief452f12024-12-12 18:20:28 +0530343 if dbFlow, present := att.device.GetFlow(flow.Cookie); present {
344 if dbFlow.State != of.FlowDelFailure && dbFlow.State != of.FlowDelPending {
345 logger.Warnw(ctx, "Flow Present in DB. Ignoring Delete Excess Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
346 continue
347 }
348 } else {
349 logger.Debugw(ctx, "Flow removed from DB after delete threshold reached. Ignoring Delete Excess Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
Sridhar Ravindra3ec14232024-01-01 19:11:48 +0530350 continue
351 }
352
Naveen Sampath04696f72022-06-13 15:19:14 +0530353 logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
354 // Create the flowMod structure and fill it out
355 flowMod := &ofp.OfpFlowMod{}
356 flowMod.Cookie = flow.Cookie
357 flowMod.TableId = flow.TableId
358 flowMod.Command = ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT
359 flowMod.IdleTimeout = flow.IdleTimeout
360 flowMod.HardTimeout = flow.HardTimeout
361 flowMod.Priority = flow.Priority
362 flowMod.BufferId = of.DefaultBufferID
363 flowMod.OutPort = of.DefaultOutPort
364 flowMod.OutGroup = of.DefaultOutGroup
365 flowMod.Flags = flow.Flags
366 flowMod.Match = flow.Match
367 flowMod.Instructions = flow.Instructions
368
369 // Create FlowTableUpdate
370 flowUpdate := &ofp.FlowTableUpdate{
371 Id: att.device.ID,
372 FlowMod: flowMod,
373 }
374
375 var err error
376 if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
377 logger.Errorw(ctx, "Flow Audit Delete Failed", log.Fields{"Reason": err.Error()})
378 }
Akash Sonief452f12024-12-12 18:20:28 +0530379 att.device.triggerFlowNotification(cntx, flow.Cookie, of.CommandDel, of.BwAvailDetails{}, err)
Naveen Sampath04696f72022-06-13 15:19:14 +0530380 }
381}
382
383// AuditGroups audit the groups which includes fetching the existing groups at the
384// voltha and identifying the delta between the ones held here and the
385// ones held at VOLTHA. The delta must be cleaned up to keep both the
386// components in sync
387func (att *AuditTablesTask) AuditGroups() (map[uint32]*ofp.OfpGroupDesc, error) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530388 // Build the map for easy and faster processing
389 rcvdGroups = make(map[uint32]*ofp.OfpGroupDesc)
390
391 if att.stop {
392 return rcvdGroups, tasks.ErrTaskCancelError
393 }
394
395 var vc voltha.VolthaServiceClient
396 if vc = att.device.VolthaClient(); vc == nil {
397 logger.Error(ctx, "Group Audit Failed: Voltha Client Unavailable")
398 return rcvdGroups, nil
399 }
400
401 // ---------------------------------
402 // Perform the audit of groups first
403 // Retrieve the groups from the device
404 g, err := vc.ListLogicalDeviceFlowGroups(att.ctx, &common.ID{Id: att.device.ID})
405 if err != nil {
406 logger.Warnw(ctx, "Audit of groups failed", log.Fields{"Reason": err.Error()})
407 return rcvdGroups, err
408 }
409
410 groupsToAdd = []*of.Group{}
411 groupsToMod = []*of.Group{}
412 for _, group := range g.Items {
413 rcvdGroups[group.Desc.GroupId] = group.Desc
414 }
Akash Sonie863fe42023-11-30 14:35:01 +0530415 logger.Debugw(ctx, "Received Groups", log.Fields{"Groups": rcvdGroups})
Naveen Sampath04696f72022-06-13 15:19:14 +0530416
417 // Verify all groups that are in the controller but not in the device
418 att.device.groups.Range(att.compareGroupEntries)
419
420 if !att.stop {
421 // Add the groups missing at the device
Akash Sonie863fe42023-11-30 14:35:01 +0530422 logger.Debugw(ctx, "Missing Groups", log.Fields{"Groups": groupsToAdd})
Naveen Sampath04696f72022-06-13 15:19:14 +0530423 att.AddMissingGroups(groupsToAdd)
424
425 // Update groups with group member mismatch
Akash Sonie863fe42023-11-30 14:35:01 +0530426 logger.Debugw(ctx, "Modify Groups", log.Fields{"Groups": groupsToMod})
Naveen Sampath04696f72022-06-13 15:19:14 +0530427 att.UpdateMismatchGroups(groupsToMod)
428
429 // Note: Excess groups will be deleted after ensuring the connected
430 // flows are also removed as part fo audit flows
431 } else {
432 err = tasks.ErrTaskCancelError
433 }
434 // The groups remaining in the received groups are the excess groups at
435 // the device
436 return rcvdGroups, err
437}
438
439// compareGroupEntries to compare the group entries
440func (att *AuditTablesTask) compareGroupEntries(key, value interface{}) bool {
Naveen Sampath04696f72022-06-13 15:19:14 +0530441 if att.stop {
442 return false
443 }
444
445 groupID := key.(uint32)
446 dbGroup := value.(*of.Group)
447 logger.Debugw(ctx, "Auditing Group", log.Fields{"Groupid": groupID})
448 if rcvdGrp, ok := rcvdGroups[groupID]; ok {
449 // The group exists in the device too.
450 // Compare the group members and add to modify list if required
451 compareGroupMembers(dbGroup, rcvdGrp)
452 delete(rcvdGroups, groupID)
453 } else {
454 // The group exists at the controller but not at the device
455 // Push the group to the device
456 logger.Debugw(ctx, "Adding Group To Missing Groups", log.Fields{"GroupId": groupID})
457 groupsToAdd = append(groupsToAdd, value.(*of.Group))
458 }
459 return true
460}
461
462func compareGroupMembers(refGroup *of.Group, rcvdGroup *ofp.OfpGroupDesc) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530463 portList := []uint32{}
464 refPortList := []uint32{}
465
vinokuma926cb3e2023-03-29 11:41:06 +0530466 // Collect port list from response Group Mod structure
467 // If PON is configured even for one group, then only PON shall be considered for compared for all groups
Naveen Sampath04696f72022-06-13 15:19:14 +0530468 for _, bucket := range rcvdGroup.Buckets {
469 for _, actionBucket := range bucket.Actions {
470 if actionBucket.Type == ofp.OfpActionType_OFPAT_OUTPUT {
471 action := actionBucket.GetOutput()
472 portList = append(portList, action.Port)
473 }
474 }
475 }
476
477 refPortList = append(refPortList, refGroup.Buckets...)
478
vinokuma926cb3e2023-03-29 11:41:06 +0530479 // Is port list differs, trigger group update
Naveen Sampath04696f72022-06-13 15:19:14 +0530480 if !util.IsSliceSame(refPortList, portList) {
481 groupsToMod = append(groupsToMod, refGroup)
482 }
483}
484
vinokuma926cb3e2023-03-29 11:41:06 +0530485// AddMissingGroups - addmissing groups to Voltha
Naveen Sampath04696f72022-06-13 15:19:14 +0530486func (att *AuditTablesTask) AddMissingGroups(groupList []*of.Group) {
487 att.PushGroups(groupList, of.GroupCommandAdd)
488}
489
vinokuma926cb3e2023-03-29 11:41:06 +0530490// UpdateMismatchGroups - updates mismatched groups to Voltha
Naveen Sampath04696f72022-06-13 15:19:14 +0530491func (att *AuditTablesTask) UpdateMismatchGroups(groupList []*of.Group) {
492 att.PushGroups(groupList, of.GroupCommandMod)
493}
494
495// PushGroups - The groups missing/to be updated in the device are reinstalled att the audit
496func (att *AuditTablesTask) PushGroups(groupList []*of.Group, grpCommand of.GroupCommand) {
497 logger.Debugw(ctx, "Pushing Groups", log.Fields{"Number": len(groupList), "Command": grpCommand})
498
499 var vc voltha.VolthaServiceClient
500 if vc = att.device.VolthaClient(); vc == nil {
501 logger.Error(ctx, "Update Group Table Failed: Voltha Client Unavailable")
502 return
503 }
504 for _, group := range groupList {
505 group.Command = grpCommand
506 groupUpdate := of.CreateGroupTableUpdate(group)
507 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
508 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
509 }
510 }
511}
512
513// DelExcessGroups - Delete the excess groups held at the VOLTHA
514func (att *AuditTablesTask) DelExcessGroups(groups map[uint32]*ofp.OfpGroupDesc) {
515 logger.Debugw(ctx, "Deleting Excess Groups", log.Fields{"Number of Groups": len(groups)})
516
517 var vc voltha.VolthaServiceClient
518 if vc = att.device.VolthaClient(); vc == nil {
519 logger.Error(ctx, "Delete Excess Groups Failed: Voltha Client Unavailable")
520 return
521 }
522
523 // Let's cycle through the groups to delete the excess groups
524 for _, groupDesc := range groups {
525 logger.Debugw(ctx, "Deleting Group", log.Fields{"GroupId": groupDesc.GroupId})
526 group := &of.Group{}
527 group.Device = att.device.ID
528 group.GroupID = groupDesc.GroupId
529
vinokuma926cb3e2023-03-29 11:41:06 +0530530 // Group Members should be deleted before triggered group delete
Naveen Sampath04696f72022-06-13 15:19:14 +0530531 group.Command = of.GroupCommandMod
532 groupUpdate := of.CreateGroupTableUpdate(group)
533 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
534 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
535 }
536
537 group.Command = of.GroupCommandDel
538 groupUpdate = of.CreateGroupTableUpdate(group)
539 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
540 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
541 }
542 }
543}
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530544
545func (att *AuditTablesTask) AuditPorts() error {
vinokuma926cb3e2023-03-29 11:41:06 +0530546 if att.stop {
547 return tasks.ErrTaskCancelError
548 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530549
vinokuma926cb3e2023-03-29 11:41:06 +0530550 var vc voltha.VolthaServiceClient
551 if vc = att.device.VolthaClient(); vc == nil {
552 logger.Error(ctx, "Flow Audit Failed: Voltha Client Unavailable")
553 return nil
554 }
555 ofpps, err := vc.ListLogicalDevicePorts(att.ctx, &common.ID{Id: att.device.ID})
556 if err != nil {
557 return err
558 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530559
vinokuma926cb3e2023-03-29 11:41:06 +0530560 // Compute the difference between the ports received and ports at VGC
561 // First build a map of all the received ports under missing ports. We
562 // will eliminate the ports that are in the device from the missing ports
563 // so that the elements remaining are missing ports. The ones that are
564 // not in missing ports are added to excess ports which should be deleted
565 // from the VGC.
566 missingPorts := make(map[uint32]*ofp.OfpPort)
567 for _, ofpp := range ofpps.Items {
568 missingPorts[ofpp.OfpPort.PortNo] = ofpp.OfpPort
569 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530570
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530571 excessPorts := make(map[uint32]*DevicePort)
vinokuma926cb3e2023-03-29 11:41:06 +0530572 processPortState := func(id uint32, vgcPort *DevicePort) {
573 logger.Debugw(ctx, "Process Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530574
vinokuma926cb3e2023-03-29 11:41:06 +0530575 if ofpPort, ok := missingPorts[id]; ok {
Akash Sonief452f12024-12-12 18:20:28 +0530576 if vgcPort.Name != ofpPort.Name {
577 logger.Infow(ctx, "Port Name Mismatch", log.Fields{"vgcPort": vgcPort.Name, "ofpPort": ofpPort.Name, "ID": id})
578 att.DeleteMismatchPorts(ctx, vgcPort, ofpPort.Name)
579 return
580 }
vinokuma926cb3e2023-03-29 11:41:06 +0530581 if ((vgcPort.State == PortStateDown) && (ofpPort.State == uint32(ofp.OfpPortState_OFPPS_LIVE))) || ((vgcPort.State == PortStateUp) && (ofpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE))) {
582 // This port exists in the received list and the map at
583 // VGC. This is common so delete it
584 logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
Akash Sonief452f12024-12-12 18:20:28 +0530585 att.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State, ofpPort.Name)
vinokuma926cb3e2023-03-29 11:41:06 +0530586 }
587 delete(missingPorts, id)
588 } else {
589 // This port is missing from the received list. This is an
590 // excess port at VGC. This must be added to excess ports
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530591 excessPorts[id] = vgcPort
vinokuma926cb3e2023-03-29 11:41:06 +0530592 }
593 logger.Debugw(ctx, "Processed Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
594 }
595 // 1st process the NNI port before all other ports so that the device state can be updated.
596 if vgcPort, ok := att.device.PortsByID[NNIPortID]; ok {
Akash Soni6168f312023-05-18 20:57:33 +0530597 logger.Debugw(ctx, "Processing NNI port state", log.Fields{"Port ID": vgcPort.ID, "Port Name": vgcPort.Name})
vinokuma926cb3e2023-03-29 11:41:06 +0530598 processPortState(NNIPortID, vgcPort)
599 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530600
vinokuma926cb3e2023-03-29 11:41:06 +0530601 for id, vgcPort := range att.device.PortsByID {
602 if id == NNIPortID {
603 // NNI port already processed
604 continue
605 }
606 if att.stop {
607 break
608 }
609 processPortState(id, vgcPort)
610 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530611
612 if att.stop {
Akash Soni6168f312023-05-18 20:57:33 +0530613 logger.Warnw(ctx, "Audit Device Task Canceled", log.Fields{"Context": att.ctx, "Task": att.taskID})
vinokuma926cb3e2023-03-29 11:41:06 +0530614 return tasks.ErrTaskCancelError
615 }
616 att.AddMissingPorts(ctx, missingPorts)
617 att.DelExcessPorts(ctx, excessPorts)
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530618 return nil
619}
620
621// AddMissingPorts to add the missing ports
622func (att *AuditTablesTask) AddMissingPorts(cntx context.Context, mps map[uint32]*ofp.OfpPort) {
Sridhar Ravindra3ec14232024-01-01 19:11:48 +0530623 logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530624
vinokuma926cb3e2023-03-29 11:41:06 +0530625 addMissingPort := func(mp *ofp.OfpPort) {
626 logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530627
vinokuma926cb3e2023-03-29 11:41:06 +0530628 if err := att.device.AddPort(cntx, mp); err != nil {
629 logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
630 }
631 if mp.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
Akash Sonief452f12024-12-12 18:20:28 +0530632 att.device.ProcessPortState(cntx, mp.PortNo, mp.State, mp.Name)
vinokuma926cb3e2023-03-29 11:41:06 +0530633 }
634 logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
635 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530636
vinokuma926cb3e2023-03-29 11:41:06 +0530637 // 1st process the NNI port before all other ports so that the flow provisioning for UNIs can be enabled
638 if mp, ok := mps[NNIPortID]; ok {
Akash Soni6168f312023-05-18 20:57:33 +0530639 logger.Debugw(ctx, "Adding Missing NNI port", log.Fields{"PortNo": mp.PortNo})
vinokuma926cb3e2023-03-29 11:41:06 +0530640 addMissingPort(mp)
641 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530642
vinokuma926cb3e2023-03-29 11:41:06 +0530643 for portNo, mp := range mps {
644 if portNo != NNIPortID {
645 addMissingPort(mp)
646 }
647 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530648}
649
650// DelExcessPorts to delete the excess ports
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530651func (att *AuditTablesTask) DelExcessPorts(cntx context.Context, eps map[uint32]*DevicePort) {
vinokuma926cb3e2023-03-29 11:41:06 +0530652 logger.Debugw(ctx, "Device Audit - Delete Excess Ports", log.Fields{"NumPorts": len(eps)})
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530653 for portNo, ep := range eps {
vinokuma926cb3e2023-03-29 11:41:06 +0530654 // Now delete the port from the device @ VGC
Sridhar Ravindra0bc5dc52023-12-13 19:03:30 +0530655 logger.Debugw(ctx, "Device Audit - Deleting Port", log.Fields{"PortId": portNo})
656 if err := att.device.DelPort(cntx, ep.ID, ep.Name); err != nil {
657 logger.Warnw(ctx, "DelPort Failed", log.Fields{"PortId": portNo, "Reason": err})
vinokuma926cb3e2023-03-29 11:41:06 +0530658 }
659 }
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530660}
Akash Sonief452f12024-12-12 18:20:28 +0530661
662func (att *AuditTablesTask) DeleteMismatchPorts(cntx context.Context, vgcPort *DevicePort, ofpPortName string) {
663 logger.Infow(ctx, "Deleting port in VGC due to mismatch with voltha", log.Fields{"vgcPortID": vgcPort.ID, "vgcPortName": vgcPort.Name})
664 _ = att.device.DelPort(cntx, vgcPort.ID, vgcPort.Name)
665 if p := att.device.GetPortByName(ofpPortName); p != nil {
666 logger.Infow(ctx, "Delete port by name in VGC due to mismatch with voltha", log.Fields{"portID": p.ID, "portName": p.Name})
667 _ = att.device.DelPort(cntx, p.ID, p.Name)
668 }
669}