blob: 486d5603cffb92691ec25d721bca6a8783f79bd1 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package controller
17
18import (
19 "context"
20 "strconv"
21 "time"
22
23 "voltha-go-controller/internal/pkg/intf"
24 "voltha-go-controller/internal/pkg/of"
25 "voltha-go-controller/internal/pkg/tasks"
26 "voltha-go-controller/internal/pkg/util"
27 "github.com/opencord/voltha-lib-go/v7/pkg/log"
28 "github.com/opencord/voltha-protos/v5/go/common"
29 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
30 "github.com/opencord/voltha-protos/v5/go/voltha"
31)
32
33var (
34 rcvdGroups map[uint32]*ofp.OfpGroupDesc
35 groupsToAdd []*of.Group
36 groupsToMod []*of.Group
37)
38
39// AuditTablesTask structure
40type AuditTablesTask struct {
41 taskID uint8
42 ctx context.Context
43 device *Device
44 stop bool
45 timestamp string
46}
47
48// NewAuditTablesTask is constructor for AuditTablesTask
49func NewAuditTablesTask(device *Device) *AuditTablesTask {
50 var att AuditTablesTask
51 att.device = device
52 att.stop = false
53 tstamp := (time.Now()).Format(time.RFC3339Nano)
54 att.timestamp = tstamp
55 return &att
56}
57
58// Name returns name of the task
59func (att *AuditTablesTask) Name() string {
60 return "Audit Table Task"
61}
62
63// TaskID to return task id of the task
64func (att *AuditTablesTask) TaskID() uint8 {
65 return att.taskID
66}
67
68// Timestamp to return timestamp for the task
69func (att *AuditTablesTask) Timestamp() string {
70 return att.timestamp
71}
72
73// Stop to stop the task
74func (att *AuditTablesTask) Stop() {
75 att.stop = true
76}
77
78// Start is called by the framework and is responsible for implementing
79// the actual task.
80func (att *AuditTablesTask) Start(ctx context.Context, taskID uint8) error {
81 logger.Warnw(ctx, "Audit Table Task Triggered", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
82 att.taskID = taskID
83 att.ctx = ctx
84 var errInfo error
85 var err error
86
87 // Audit the meters
88 if err = att.AuditMeters(); err != nil {
89 logger.Errorw(ctx, "Audit Meters Failed", log.Fields{"Reason": err.Error()})
90 errInfo = err
91 }
92
93 // Audit the Groups
94 if rcvdGroups, err = att.AuditGroups(); err != nil {
95 logger.Errorw(ctx, "Audit Groups Failed", log.Fields{"Reason": err.Error()})
96 errInfo = err
97 }
98
99 // Audit the flows
100 if err = att.AuditFlows(); err != nil {
101 logger.Errorw(ctx, "Audit Flows Failed", log.Fields{"Reason": err.Error()})
102 errInfo = err
103 }
104
105 // Triggering deletion of excess groups from device after the corresponding flows are removed
106 // to avoid flow dependency error during group deletion
107 logger.Infow(ctx, "Excess Groups", log.Fields{"Groups": rcvdGroups})
108 att.DelExcessGroups(rcvdGroups)
109 logger.Warnw(ctx, "Audit Table Task Completed", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
110 return errInfo
111
112}
113
114// AuditMeters : Audit the meters which includes fetching the existing meters at the
115// voltha and identifying the delta between the ones held here and the
116// ones held at VOLTHA. The delta must be cleaned up to keep both the
117// components in sync
118func (att *AuditTablesTask) AuditMeters() error {
119
120 if att.stop {
121 return tasks.ErrTaskCancelError
122 }
123 var vc voltha.VolthaServiceClient
124 if vc = att.device.VolthaClient(); vc == nil {
125 logger.Error(ctx, "Fetch Device Meters Failed: Voltha Client Unavailable")
126 return nil
127 }
128
129 //-----------------------------
130 // Perform the audit of meters
131 // Fetch the meters
132 ms, err := vc.ListLogicalDeviceMeters(att.ctx, &voltha.ID{Id: att.device.ID})
133 if err != nil {
134 logger.Warnw(ctx, "Audit of flows failed", log.Fields{"Reason": err.Error()})
135 return err
136 }
137
138 // Build the map for easy and faster processing
139 rcvdMeters := make(map[uint32]*ofp.OfpMeterStats)
140 for _, m := range ms.Items {
141 rcvdMeters[m.Stats.MeterId] = m.Stats
142 }
143
144 // Verify all meters that are in the controller but not in the device
145 missingMeters := []*of.Meter{}
146 for _, meter := range att.device.meters {
147
148 if att.stop {
149 break
150 }
151 logger.Debugw(ctx, "Auditing Meter", log.Fields{"Id": meter.ID})
152
153 if _, ok := rcvdMeters[meter.ID]; ok {
154 // The meter exists in the device too. Just remove it from
155 // the received meters
156 delete(rcvdMeters, meter.ID)
157 } else {
158 // The flow exists at the controller but not at the device
159 // Push the flow to the device
160 logger.Debugw(ctx, "Adding Meter To Missing Meters", log.Fields{"Id": meter.ID})
161 missingMeters = append(missingMeters, meter)
162 }
163 }
164 if !att.stop {
165 att.AddMissingMeters(missingMeters)
166 att.DelExcessMeters(rcvdMeters)
167 } else {
168 err = tasks.ErrTaskCancelError
169 }
170 return err
171}
172
173// AddMissingMeters adds the missing meters detected by AuditMeters
174func (att *AuditTablesTask) AddMissingMeters(meters []*of.Meter) {
175 logger.Debugw(ctx, "Adding missing meters", log.Fields{"Number": len(meters)})
176 for _, meter := range meters {
177 meterMod, err := of.MeterUpdate(att.device.ID, of.MeterCommandAdd, meter)
178 if err != nil {
179 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
180 continue
181 }
182 if vc := att.device.VolthaClient(); vc != nil {
183 if _, err = vc.UpdateLogicalDeviceMeterTable(att.ctx, meterMod); err != nil {
184 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
185 }
186 } else {
187 logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
188 }
189 }
190}
191
192// DelExcessMeters to delete excess meters
193func (att *AuditTablesTask) DelExcessMeters(meters map[uint32]*ofp.OfpMeterStats) {
194 logger.Debugw(ctx, "Deleting Excess Meters", log.Fields{"Number": len(meters)})
195 for _, meter := range meters {
196 meterMod := &ofp.OfpMeterMod{}
197 meterMod.Command = ofp.OfpMeterModCommand_OFPMC_DELETE
198 meterMod.MeterId = meter.MeterId
199 meterUpd := &ofp.MeterModUpdate{Id: att.device.ID, MeterMod: meterMod}
200 if vc := att.device.VolthaClient(); vc != nil {
201 if _, err := vc.UpdateLogicalDeviceMeterTable(att.ctx, meterUpd); err != nil {
202 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
203 }
204 } else {
205 logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
206 }
207 }
208}
209
210// AuditFlows audit the flows which includes fetching the existing meters at the
211// voltha and identifying the delta between the ones held here and the
212// ones held at VOLTHA. The delta must be cleaned up to keep both the
213// components in sync
214func (att *AuditTablesTask) AuditFlows() error {
215
216 if att.stop {
217 return tasks.ErrTaskCancelError
218 }
219
220 var vc voltha.VolthaServiceClient
221 if vc = att.device.VolthaClient(); vc == nil {
222 logger.Error(ctx, "Flow Audit Failed: Voltha Client Unavailable")
223 return nil
224 }
225
226 // ---------------------------------
227 // Perform the audit of flows first
228 // Retrieve the flows from the device
229 f, err := vc.ListLogicalDeviceFlows(att.ctx, &common.ID{Id: att.device.ID})
230 if err != nil {
231 logger.Warnw(ctx, "Audit of flows failed", log.Fields{"Reason": err.Error()})
232 return err
233 }
234
235 defaultSuccessFlowStatus := intf.FlowStatus{
236 Device: att.device.ID,
237 FlowModType: of.CommandAdd,
238 Status: 0,
239 Reason: "",
240 }
241
242 // Build the map for easy and faster processing
243 rcvdFlows := make(map[uint64]*ofp.OfpFlowStats)
244 flowsToAdd := &of.VoltFlow{}
245 flowsToAdd.SubFlows = make(map[uint64]*of.VoltSubFlow)
246 for _, flow := range f.Items {
247 rcvdFlows[flow.Cookie] = flow
248 }
249
250 att.device.flowLock.Lock()
251 // Verify all flows that are in the controller but not in the device
252 for _, flow := range att.device.flows {
253
254 if att.stop {
255 break
256 }
257
258 logger.Debugw(ctx, "Auditing Flow", log.Fields{"Cookie": flow.Cookie})
259 if _, ok := rcvdFlows[flow.Cookie]; ok {
260 // The flow exists in the device too. Just remove it from
261 // the received flows & trigger flow success indication unless
262 // the flow in del failure/pending state
263
264 if flow.State != of.FlowDelFailure && flow.State != of.FlowDelPending {
265 delete(rcvdFlows, flow.Cookie)
266 }
267 defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
268
269 logger.Infow(ctx, "Triggering Internal Flow Notification", log.Fields{"Flow Status": defaultSuccessFlowStatus})
270 GetController().ProcessFlowModResultIndication(defaultSuccessFlowStatus)
271 } else {
272 // The flow exists at the controller but not at the device
273 // Push the flow to the device
274 logger.Debugw(ctx, "Adding Flow To Missing Flows", log.Fields{"Cookie": flow.Cookie})
275 flowsToAdd.SubFlows[flow.Cookie] = flow
276 }
277 }
278 att.device.flowLock.Unlock()
279
280 if !att.stop {
281 // The flows remaining in the received flows are the excess flows at
282 // the device. Delete those flows
283 att.DelExcessFlows(rcvdFlows)
284 // Add the flows missing at the device
285 att.AddMissingFlows(flowsToAdd)
286 } else {
287 err = tasks.ErrTaskCancelError
288 }
289 return err
290}
291
292// AddMissingFlows : The flows missing from the device are reinstalled att the audit
293// The flows are added into a VoltFlow structure.
294func (att *AuditTablesTask) AddMissingFlows(mflow *of.VoltFlow) {
295 logger.Debugw(ctx, "Add Missing Flows", log.Fields{"Number": len(mflow.SubFlows)})
296 mflow.Command = of.CommandAdd
297 ofFlows := of.ProcessVoltFlow(att.device.ID, mflow.Command, mflow.SubFlows)
298 var vc voltha.VolthaServiceClient
299 var bwConsumedInfo of.BwAvailDetails
300 if vc = att.device.VolthaClient(); vc == nil {
301 logger.Error(ctx, "Update Flow Table Failed: Voltha Client Unavailable")
302 return
303 }
304 for _, flow := range ofFlows {
305 var dbFlow *of.VoltSubFlow
306 var present bool
307 if flow.FlowMod != nil {
308 if dbFlow, present = att.device.GetFlow(flow.FlowMod.Cookie); !present {
309 logger.Warn(ctx, "Flow Removed from DB. Ignoring Add Missing Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.FlowMod.Cookie})
310 continue
311 }
312 }
313 var err error
314 if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flow); err != nil {
315 logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Reason": err.Error()})
316 }
317 att.device.triggerFlowResultNotification(flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err)
318 }
319}
320
321// DelExcessFlows delete the excess flows held at the VOLTHA
322func (att *AuditTablesTask) DelExcessFlows(flows map[uint64]*ofp.OfpFlowStats) {
323 logger.Debugw(ctx, "Deleting Excess Flows", log.Fields{"Number of Flows": len(flows)})
324
325 var vc voltha.VolthaServiceClient
326 if vc = att.device.VolthaClient(); vc == nil {
327 logger.Error(ctx, "Delete Excess Flows Failed: Voltha Client Unavailable")
328 return
329 }
330
331 // Let's cycle through the flows to delete the excess flows
332 for _, flow := range flows {
333
334 if _, present := att.device.GetFlow(flow.Cookie); present {
335 logger.Warn(ctx, "Flow Present in DB. Ignoring Delete Excess Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
336 continue
337 }
338
339 logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
340 // Create the flowMod structure and fill it out
341 flowMod := &ofp.OfpFlowMod{}
342 flowMod.Cookie = flow.Cookie
343 flowMod.TableId = flow.TableId
344 flowMod.Command = ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT
345 flowMod.IdleTimeout = flow.IdleTimeout
346 flowMod.HardTimeout = flow.HardTimeout
347 flowMod.Priority = flow.Priority
348 flowMod.BufferId = of.DefaultBufferID
349 flowMod.OutPort = of.DefaultOutPort
350 flowMod.OutGroup = of.DefaultOutGroup
351 flowMod.Flags = flow.Flags
352 flowMod.Match = flow.Match
353 flowMod.Instructions = flow.Instructions
354
355 // Create FlowTableUpdate
356 flowUpdate := &ofp.FlowTableUpdate{
357 Id: att.device.ID,
358 FlowMod: flowMod,
359 }
360
361 var err error
362 if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
363 logger.Errorw(ctx, "Flow Audit Delete Failed", log.Fields{"Reason": err.Error()})
364 }
365 att.device.triggerFlowResultNotification(flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err)
366 }
367}
368
369// AuditGroups audit the groups which includes fetching the existing groups at the
370// voltha and identifying the delta between the ones held here and the
371// ones held at VOLTHA. The delta must be cleaned up to keep both the
372// components in sync
373func (att *AuditTablesTask) AuditGroups() (map[uint32]*ofp.OfpGroupDesc, error) {
374
375 // Build the map for easy and faster processing
376 rcvdGroups = make(map[uint32]*ofp.OfpGroupDesc)
377
378 if att.stop {
379 return rcvdGroups, tasks.ErrTaskCancelError
380 }
381
382 var vc voltha.VolthaServiceClient
383 if vc = att.device.VolthaClient(); vc == nil {
384 logger.Error(ctx, "Group Audit Failed: Voltha Client Unavailable")
385 return rcvdGroups, nil
386 }
387
388 // ---------------------------------
389 // Perform the audit of groups first
390 // Retrieve the groups from the device
391 g, err := vc.ListLogicalDeviceFlowGroups(att.ctx, &common.ID{Id: att.device.ID})
392 if err != nil {
393 logger.Warnw(ctx, "Audit of groups failed", log.Fields{"Reason": err.Error()})
394 return rcvdGroups, err
395 }
396
397 groupsToAdd = []*of.Group{}
398 groupsToMod = []*of.Group{}
399 for _, group := range g.Items {
400 rcvdGroups[group.Desc.GroupId] = group.Desc
401 }
402 logger.Infow(ctx, "Received Groups", log.Fields{"Groups": rcvdGroups})
403
404 // Verify all groups that are in the controller but not in the device
405 att.device.groups.Range(att.compareGroupEntries)
406
407 if !att.stop {
408 // Add the groups missing at the device
409 logger.Infow(ctx, "Missing Groups", log.Fields{"Groups": groupsToAdd})
410 att.AddMissingGroups(groupsToAdd)
411
412 // Update groups with group member mismatch
413 logger.Infow(ctx, "Modify Groups", log.Fields{"Groups": groupsToMod})
414 att.UpdateMismatchGroups(groupsToMod)
415
416 // Note: Excess groups will be deleted after ensuring the connected
417 // flows are also removed as part fo audit flows
418 } else {
419 err = tasks.ErrTaskCancelError
420 }
421 // The groups remaining in the received groups are the excess groups at
422 // the device
423 return rcvdGroups, err
424}
425
426// compareGroupEntries to compare the group entries
427func (att *AuditTablesTask) compareGroupEntries(key, value interface{}) bool {
428
429 if att.stop {
430 return false
431 }
432
433 groupID := key.(uint32)
434 dbGroup := value.(*of.Group)
435 logger.Debugw(ctx, "Auditing Group", log.Fields{"Groupid": groupID})
436 if rcvdGrp, ok := rcvdGroups[groupID]; ok {
437 // The group exists in the device too.
438 // Compare the group members and add to modify list if required
439 compareGroupMembers(dbGroup, rcvdGrp)
440 delete(rcvdGroups, groupID)
441 } else {
442 // The group exists at the controller but not at the device
443 // Push the group to the device
444 logger.Debugw(ctx, "Adding Group To Missing Groups", log.Fields{"GroupId": groupID})
445 groupsToAdd = append(groupsToAdd, value.(*of.Group))
446 }
447 return true
448}
449
450func compareGroupMembers(refGroup *of.Group, rcvdGroup *ofp.OfpGroupDesc) {
451
452 portList := []uint32{}
453 refPortList := []uint32{}
454
455 //Collect port list from response Group Mod structure
456 //If PON is configured even for one group, then only PON shall be considered for compared for all groups
457 for _, bucket := range rcvdGroup.Buckets {
458 for _, actionBucket := range bucket.Actions {
459 if actionBucket.Type == ofp.OfpActionType_OFPAT_OUTPUT {
460 action := actionBucket.GetOutput()
461 portList = append(portList, action.Port)
462 }
463 }
464 }
465
466 refPortList = append(refPortList, refGroup.Buckets...)
467
468 //Is port list differs, trigger group update
469 if !util.IsSliceSame(refPortList, portList) {
470 groupsToMod = append(groupsToMod, refGroup)
471 }
472}
473
474//AddMissingGroups - addmissing groups to Voltha
475func (att *AuditTablesTask) AddMissingGroups(groupList []*of.Group) {
476 att.PushGroups(groupList, of.GroupCommandAdd)
477}
478
479//UpdateMismatchGroups - updates mismatched groups to Voltha
480func (att *AuditTablesTask) UpdateMismatchGroups(groupList []*of.Group) {
481 att.PushGroups(groupList, of.GroupCommandMod)
482}
483
484// PushGroups - The groups missing/to be updated in the device are reinstalled att the audit
485func (att *AuditTablesTask) PushGroups(groupList []*of.Group, grpCommand of.GroupCommand) {
486 logger.Debugw(ctx, "Pushing Groups", log.Fields{"Number": len(groupList), "Command": grpCommand})
487
488 var vc voltha.VolthaServiceClient
489 if vc = att.device.VolthaClient(); vc == nil {
490 logger.Error(ctx, "Update Group Table Failed: Voltha Client Unavailable")
491 return
492 }
493 for _, group := range groupList {
494 group.Command = grpCommand
495 groupUpdate := of.CreateGroupTableUpdate(group)
496 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
497 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
498 }
499 }
500}
501
502// DelExcessGroups - Delete the excess groups held at the VOLTHA
503func (att *AuditTablesTask) DelExcessGroups(groups map[uint32]*ofp.OfpGroupDesc) {
504 logger.Debugw(ctx, "Deleting Excess Groups", log.Fields{"Number of Groups": len(groups)})
505
506 var vc voltha.VolthaServiceClient
507 if vc = att.device.VolthaClient(); vc == nil {
508 logger.Error(ctx, "Delete Excess Groups Failed: Voltha Client Unavailable")
509 return
510 }
511
512 // Let's cycle through the groups to delete the excess groups
513 for _, groupDesc := range groups {
514 logger.Debugw(ctx, "Deleting Group", log.Fields{"GroupId": groupDesc.GroupId})
515 group := &of.Group{}
516 group.Device = att.device.ID
517 group.GroupID = groupDesc.GroupId
518
519 //Group Members should be deleted before triggered group delete
520 group.Command = of.GroupCommandMod
521 groupUpdate := of.CreateGroupTableUpdate(group)
522 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
523 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
524 }
525
526 group.Command = of.GroupCommandDel
527 groupUpdate = of.CreateGroupTableUpdate(group)
528 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
529 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
530 }
531 }
532}