blob: 334ce41005506510863b23b68730ad8df7edcdc3 [file] [log] [blame]
Naveen Sampath04696f72022-06-13 15:19:14 +05301/*
2* Copyright 2022-present Open Networking Foundation
3* Licensed under the Apache License, Version 2.0 (the "License");
4* you may not use this file except in compliance with the License.
5* You may obtain a copy of the License at
6*
7* http://www.apache.org/licenses/LICENSE-2.0
8*
9* Unless required by applicable law or agreed to in writing, software
10* distributed under the License is distributed on an "AS IS" BASIS,
11* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12* See the License for the specific language governing permissions and
13* limitations under the License.
14*/
15
16package controller
17
18import (
19 "context"
20 "strconv"
21 "time"
22
23 "voltha-go-controller/internal/pkg/intf"
24 "voltha-go-controller/internal/pkg/of"
25 "voltha-go-controller/internal/pkg/tasks"
26 "voltha-go-controller/internal/pkg/util"
Tinoj Joseph1d108322022-07-13 10:07:39 +053027 "voltha-go-controller/log"
Naveen Sampath04696f72022-06-13 15:19:14 +053028 "github.com/opencord/voltha-protos/v5/go/common"
29 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
30 "github.com/opencord/voltha-protos/v5/go/voltha"
31)
32
33var (
34 rcvdGroups map[uint32]*ofp.OfpGroupDesc
35 groupsToAdd []*of.Group
36 groupsToMod []*of.Group
37)
38
39// AuditTablesTask structure
40type AuditTablesTask struct {
41 taskID uint8
42 ctx context.Context
43 device *Device
44 stop bool
45 timestamp string
46}
47
48// NewAuditTablesTask is constructor for AuditTablesTask
49func NewAuditTablesTask(device *Device) *AuditTablesTask {
50 var att AuditTablesTask
51 att.device = device
52 att.stop = false
53 tstamp := (time.Now()).Format(time.RFC3339Nano)
54 att.timestamp = tstamp
55 return &att
56}
57
58// Name returns name of the task
59func (att *AuditTablesTask) Name() string {
60 return "Audit Table Task"
61}
62
63// TaskID to return task id of the task
64func (att *AuditTablesTask) TaskID() uint8 {
65 return att.taskID
66}
67
68// Timestamp to return timestamp for the task
69func (att *AuditTablesTask) Timestamp() string {
70 return att.timestamp
71}
72
73// Stop to stop the task
74func (att *AuditTablesTask) Stop() {
75 att.stop = true
76}
77
78// Start is called by the framework and is responsible for implementing
79// the actual task.
80func (att *AuditTablesTask) Start(ctx context.Context, taskID uint8) error {
81 logger.Warnw(ctx, "Audit Table Task Triggered", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
82 att.taskID = taskID
83 att.ctx = ctx
84 var errInfo error
85 var err error
86
Tinoj Josephaf37ce82022-12-28 11:59:43 +053087 // Audit ports
88 if err = att.AuditPorts(); err != nil {
89 logger.Errorw(ctx, "Audit Ports Failed", log.Fields{"Reason": err.Error()})
90 errInfo = err
91 }
92
Naveen Sampath04696f72022-06-13 15:19:14 +053093 // Audit the meters
94 if err = att.AuditMeters(); err != nil {
95 logger.Errorw(ctx, "Audit Meters Failed", log.Fields{"Reason": err.Error()})
96 errInfo = err
97 }
98
99 // Audit the Groups
100 if rcvdGroups, err = att.AuditGroups(); err != nil {
101 logger.Errorw(ctx, "Audit Groups Failed", log.Fields{"Reason": err.Error()})
102 errInfo = err
103 }
104
105 // Audit the flows
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530106 if err = att.AuditFlows(ctx); err != nil {
Naveen Sampath04696f72022-06-13 15:19:14 +0530107 logger.Errorw(ctx, "Audit Flows Failed", log.Fields{"Reason": err.Error()})
108 errInfo = err
109 }
110
111 // Triggering deletion of excess groups from device after the corresponding flows are removed
112 // to avoid flow dependency error during group deletion
113 logger.Infow(ctx, "Excess Groups", log.Fields{"Groups": rcvdGroups})
114 att.DelExcessGroups(rcvdGroups)
115 logger.Warnw(ctx, "Audit Table Task Completed", log.Fields{"Context": ctx, "taskId": taskID, "Device": att.device.ID})
116 return errInfo
117
118}
119
120// AuditMeters : Audit the meters which includes fetching the existing meters at the
121// voltha and identifying the delta between the ones held here and the
122// ones held at VOLTHA. The delta must be cleaned up to keep both the
123// components in sync
124func (att *AuditTablesTask) AuditMeters() error {
125
126 if att.stop {
127 return tasks.ErrTaskCancelError
128 }
129 var vc voltha.VolthaServiceClient
130 if vc = att.device.VolthaClient(); vc == nil {
131 logger.Error(ctx, "Fetch Device Meters Failed: Voltha Client Unavailable")
132 return nil
133 }
134
135 //-----------------------------
136 // Perform the audit of meters
137 // Fetch the meters
138 ms, err := vc.ListLogicalDeviceMeters(att.ctx, &voltha.ID{Id: att.device.ID})
139 if err != nil {
140 logger.Warnw(ctx, "Audit of flows failed", log.Fields{"Reason": err.Error()})
141 return err
142 }
143
144 // Build the map for easy and faster processing
145 rcvdMeters := make(map[uint32]*ofp.OfpMeterStats)
146 for _, m := range ms.Items {
147 rcvdMeters[m.Stats.MeterId] = m.Stats
148 }
149
150 // Verify all meters that are in the controller but not in the device
151 missingMeters := []*of.Meter{}
152 for _, meter := range att.device.meters {
153
154 if att.stop {
155 break
156 }
157 logger.Debugw(ctx, "Auditing Meter", log.Fields{"Id": meter.ID})
158
159 if _, ok := rcvdMeters[meter.ID]; ok {
160 // The meter exists in the device too. Just remove it from
161 // the received meters
162 delete(rcvdMeters, meter.ID)
163 } else {
164 // The flow exists at the controller but not at the device
165 // Push the flow to the device
166 logger.Debugw(ctx, "Adding Meter To Missing Meters", log.Fields{"Id": meter.ID})
167 missingMeters = append(missingMeters, meter)
168 }
169 }
170 if !att.stop {
171 att.AddMissingMeters(missingMeters)
172 att.DelExcessMeters(rcvdMeters)
173 } else {
174 err = tasks.ErrTaskCancelError
175 }
176 return err
177}
178
179// AddMissingMeters adds the missing meters detected by AuditMeters
180func (att *AuditTablesTask) AddMissingMeters(meters []*of.Meter) {
181 logger.Debugw(ctx, "Adding missing meters", log.Fields{"Number": len(meters)})
182 for _, meter := range meters {
183 meterMod, err := of.MeterUpdate(att.device.ID, of.MeterCommandAdd, meter)
184 if err != nil {
185 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
186 continue
187 }
188 if vc := att.device.VolthaClient(); vc != nil {
189 if _, err = vc.UpdateLogicalDeviceMeterTable(att.ctx, meterMod); err != nil {
190 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
191 }
192 } else {
193 logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
194 }
195 }
196}
197
198// DelExcessMeters to delete excess meters
199func (att *AuditTablesTask) DelExcessMeters(meters map[uint32]*ofp.OfpMeterStats) {
200 logger.Debugw(ctx, "Deleting Excess Meters", log.Fields{"Number": len(meters)})
201 for _, meter := range meters {
202 meterMod := &ofp.OfpMeterMod{}
203 meterMod.Command = ofp.OfpMeterModCommand_OFPMC_DELETE
204 meterMod.MeterId = meter.MeterId
205 meterUpd := &ofp.MeterModUpdate{Id: att.device.ID, MeterMod: meterMod}
206 if vc := att.device.VolthaClient(); vc != nil {
207 if _, err := vc.UpdateLogicalDeviceMeterTable(att.ctx, meterUpd); err != nil {
208 logger.Errorw(ctx, "Update Meter Table Failed", log.Fields{"Reason": err.Error()})
209 }
210 } else {
211 logger.Error(ctx, "Update Meter Table Failed: Voltha Client Unavailable")
212 }
213 }
214}
215
216// AuditFlows audit the flows which includes fetching the existing meters at the
217// voltha and identifying the delta between the ones held here and the
218// ones held at VOLTHA. The delta must be cleaned up to keep both the
219// components in sync
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530220func (att *AuditTablesTask) AuditFlows(cntx context.Context) error {
Naveen Sampath04696f72022-06-13 15:19:14 +0530221
222 if att.stop {
223 return tasks.ErrTaskCancelError
224 }
225
226 var vc voltha.VolthaServiceClient
227 if vc = att.device.VolthaClient(); vc == nil {
228 logger.Error(ctx, "Flow Audit Failed: Voltha Client Unavailable")
229 return nil
230 }
231
232 // ---------------------------------
233 // Perform the audit of flows first
234 // Retrieve the flows from the device
235 f, err := vc.ListLogicalDeviceFlows(att.ctx, &common.ID{Id: att.device.ID})
236 if err != nil {
237 logger.Warnw(ctx, "Audit of flows failed", log.Fields{"Reason": err.Error()})
238 return err
239 }
240
241 defaultSuccessFlowStatus := intf.FlowStatus{
242 Device: att.device.ID,
243 FlowModType: of.CommandAdd,
244 Status: 0,
245 Reason: "",
246 }
247
248 // Build the map for easy and faster processing
249 rcvdFlows := make(map[uint64]*ofp.OfpFlowStats)
250 flowsToAdd := &of.VoltFlow{}
251 flowsToAdd.SubFlows = make(map[uint64]*of.VoltSubFlow)
252 for _, flow := range f.Items {
253 rcvdFlows[flow.Cookie] = flow
254 }
255
256 att.device.flowLock.Lock()
257 // Verify all flows that are in the controller but not in the device
258 for _, flow := range att.device.flows {
259
260 if att.stop {
261 break
262 }
263
264 logger.Debugw(ctx, "Auditing Flow", log.Fields{"Cookie": flow.Cookie})
265 if _, ok := rcvdFlows[flow.Cookie]; ok {
266 // The flow exists in the device too. Just remove it from
267 // the received flows & trigger flow success indication unless
268 // the flow in del failure/pending state
269
270 if flow.State != of.FlowDelFailure && flow.State != of.FlowDelPending {
271 delete(rcvdFlows, flow.Cookie)
272 }
273 defaultSuccessFlowStatus.Cookie = strconv.FormatUint(flow.Cookie, 10)
274
275 logger.Infow(ctx, "Triggering Internal Flow Notification", log.Fields{"Flow Status": defaultSuccessFlowStatus})
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530276 GetController().ProcessFlowModResultIndication(cntx, defaultSuccessFlowStatus)
Naveen Sampath04696f72022-06-13 15:19:14 +0530277 } else {
278 // The flow exists at the controller but not at the device
279 // Push the flow to the device
280 logger.Debugw(ctx, "Adding Flow To Missing Flows", log.Fields{"Cookie": flow.Cookie})
281 flowsToAdd.SubFlows[flow.Cookie] = flow
282 }
283 }
284 att.device.flowLock.Unlock()
285
286 if !att.stop {
287 // The flows remaining in the received flows are the excess flows at
288 // the device. Delete those flows
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530289 att.DelExcessFlows(cntx, rcvdFlows)
Naveen Sampath04696f72022-06-13 15:19:14 +0530290 // Add the flows missing at the device
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530291 att.AddMissingFlows(cntx, flowsToAdd)
Naveen Sampath04696f72022-06-13 15:19:14 +0530292 } else {
293 err = tasks.ErrTaskCancelError
294 }
295 return err
296}
297
298// AddMissingFlows : The flows missing from the device are reinstalled att the audit
299// The flows are added into a VoltFlow structure.
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530300func (att *AuditTablesTask) AddMissingFlows(cntx context.Context, mflow *of.VoltFlow) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530301 logger.Debugw(ctx, "Add Missing Flows", log.Fields{"Number": len(mflow.SubFlows)})
302 mflow.Command = of.CommandAdd
303 ofFlows := of.ProcessVoltFlow(att.device.ID, mflow.Command, mflow.SubFlows)
304 var vc voltha.VolthaServiceClient
305 var bwConsumedInfo of.BwAvailDetails
306 if vc = att.device.VolthaClient(); vc == nil {
307 logger.Error(ctx, "Update Flow Table Failed: Voltha Client Unavailable")
308 return
309 }
310 for _, flow := range ofFlows {
311 var dbFlow *of.VoltSubFlow
312 var present bool
313 if flow.FlowMod != nil {
314 if dbFlow, present = att.device.GetFlow(flow.FlowMod.Cookie); !present {
Tinoj Joseph1d108322022-07-13 10:07:39 +0530315 logger.Warnw(ctx, "Flow Removed from DB. Ignoring Add Missing Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.FlowMod.Cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530316 continue
317 }
318 }
319 var err error
320 if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flow); err != nil {
321 logger.Errorw(ctx, "Update Flow Table Failed", log.Fields{"Reason": err.Error()})
322 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530323 att.device.triggerFlowResultNotification(cntx, flow.FlowMod.Cookie, dbFlow, of.CommandAdd, bwConsumedInfo, err)
Naveen Sampath04696f72022-06-13 15:19:14 +0530324 }
325}
326
327// DelExcessFlows delete the excess flows held at the VOLTHA
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530328func (att *AuditTablesTask) DelExcessFlows(cntx context.Context, flows map[uint64]*ofp.OfpFlowStats) {
Naveen Sampath04696f72022-06-13 15:19:14 +0530329 logger.Debugw(ctx, "Deleting Excess Flows", log.Fields{"Number of Flows": len(flows)})
330
331 var vc voltha.VolthaServiceClient
332 if vc = att.device.VolthaClient(); vc == nil {
333 logger.Error(ctx, "Delete Excess Flows Failed: Voltha Client Unavailable")
334 return
335 }
336
337 // Let's cycle through the flows to delete the excess flows
338 for _, flow := range flows {
339
340 if _, present := att.device.GetFlow(flow.Cookie); present {
Tinoj Joseph1d108322022-07-13 10:07:39 +0530341 logger.Warnw(ctx, "Flow Present in DB. Ignoring Delete Excess Flow", log.Fields{"Device": att.device.ID, "Cookie": flow.Cookie})
Naveen Sampath04696f72022-06-13 15:19:14 +0530342 continue
343 }
344
345 logger.Debugw(ctx, "Deleting Flow", log.Fields{"Cookie": flow.Cookie})
346 // Create the flowMod structure and fill it out
347 flowMod := &ofp.OfpFlowMod{}
348 flowMod.Cookie = flow.Cookie
349 flowMod.TableId = flow.TableId
350 flowMod.Command = ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT
351 flowMod.IdleTimeout = flow.IdleTimeout
352 flowMod.HardTimeout = flow.HardTimeout
353 flowMod.Priority = flow.Priority
354 flowMod.BufferId = of.DefaultBufferID
355 flowMod.OutPort = of.DefaultOutPort
356 flowMod.OutGroup = of.DefaultOutGroup
357 flowMod.Flags = flow.Flags
358 flowMod.Match = flow.Match
359 flowMod.Instructions = flow.Instructions
360
361 // Create FlowTableUpdate
362 flowUpdate := &ofp.FlowTableUpdate{
363 Id: att.device.ID,
364 FlowMod: flowMod,
365 }
366
367 var err error
368 if _, err = vc.UpdateLogicalDeviceFlowTable(att.ctx, flowUpdate); err != nil {
369 logger.Errorw(ctx, "Flow Audit Delete Failed", log.Fields{"Reason": err.Error()})
370 }
Tinoj Joseph07cc5372022-07-18 22:53:51 +0530371 att.device.triggerFlowResultNotification(cntx, flow.Cookie, nil, of.CommandDel, of.BwAvailDetails{}, err)
Naveen Sampath04696f72022-06-13 15:19:14 +0530372 }
373}
374
375// AuditGroups audit the groups which includes fetching the existing groups at the
376// voltha and identifying the delta between the ones held here and the
377// ones held at VOLTHA. The delta must be cleaned up to keep both the
378// components in sync
379func (att *AuditTablesTask) AuditGroups() (map[uint32]*ofp.OfpGroupDesc, error) {
380
381 // Build the map for easy and faster processing
382 rcvdGroups = make(map[uint32]*ofp.OfpGroupDesc)
383
384 if att.stop {
385 return rcvdGroups, tasks.ErrTaskCancelError
386 }
387
388 var vc voltha.VolthaServiceClient
389 if vc = att.device.VolthaClient(); vc == nil {
390 logger.Error(ctx, "Group Audit Failed: Voltha Client Unavailable")
391 return rcvdGroups, nil
392 }
393
394 // ---------------------------------
395 // Perform the audit of groups first
396 // Retrieve the groups from the device
397 g, err := vc.ListLogicalDeviceFlowGroups(att.ctx, &common.ID{Id: att.device.ID})
398 if err != nil {
399 logger.Warnw(ctx, "Audit of groups failed", log.Fields{"Reason": err.Error()})
400 return rcvdGroups, err
401 }
402
403 groupsToAdd = []*of.Group{}
404 groupsToMod = []*of.Group{}
405 for _, group := range g.Items {
406 rcvdGroups[group.Desc.GroupId] = group.Desc
407 }
408 logger.Infow(ctx, "Received Groups", log.Fields{"Groups": rcvdGroups})
409
410 // Verify all groups that are in the controller but not in the device
411 att.device.groups.Range(att.compareGroupEntries)
412
413 if !att.stop {
414 // Add the groups missing at the device
415 logger.Infow(ctx, "Missing Groups", log.Fields{"Groups": groupsToAdd})
416 att.AddMissingGroups(groupsToAdd)
417
418 // Update groups with group member mismatch
419 logger.Infow(ctx, "Modify Groups", log.Fields{"Groups": groupsToMod})
420 att.UpdateMismatchGroups(groupsToMod)
421
422 // Note: Excess groups will be deleted after ensuring the connected
423 // flows are also removed as part fo audit flows
424 } else {
425 err = tasks.ErrTaskCancelError
426 }
427 // The groups remaining in the received groups are the excess groups at
428 // the device
429 return rcvdGroups, err
430}
431
432// compareGroupEntries to compare the group entries
433func (att *AuditTablesTask) compareGroupEntries(key, value interface{}) bool {
434
435 if att.stop {
436 return false
437 }
438
439 groupID := key.(uint32)
440 dbGroup := value.(*of.Group)
441 logger.Debugw(ctx, "Auditing Group", log.Fields{"Groupid": groupID})
442 if rcvdGrp, ok := rcvdGroups[groupID]; ok {
443 // The group exists in the device too.
444 // Compare the group members and add to modify list if required
445 compareGroupMembers(dbGroup, rcvdGrp)
446 delete(rcvdGroups, groupID)
447 } else {
448 // The group exists at the controller but not at the device
449 // Push the group to the device
450 logger.Debugw(ctx, "Adding Group To Missing Groups", log.Fields{"GroupId": groupID})
451 groupsToAdd = append(groupsToAdd, value.(*of.Group))
452 }
453 return true
454}
455
456func compareGroupMembers(refGroup *of.Group, rcvdGroup *ofp.OfpGroupDesc) {
457
458 portList := []uint32{}
459 refPortList := []uint32{}
460
461 //Collect port list from response Group Mod structure
462 //If PON is configured even for one group, then only PON shall be considered for compared for all groups
463 for _, bucket := range rcvdGroup.Buckets {
464 for _, actionBucket := range bucket.Actions {
465 if actionBucket.Type == ofp.OfpActionType_OFPAT_OUTPUT {
466 action := actionBucket.GetOutput()
467 portList = append(portList, action.Port)
468 }
469 }
470 }
471
472 refPortList = append(refPortList, refGroup.Buckets...)
473
474 //Is port list differs, trigger group update
475 if !util.IsSliceSame(refPortList, portList) {
476 groupsToMod = append(groupsToMod, refGroup)
477 }
478}
479
480//AddMissingGroups - addmissing groups to Voltha
481func (att *AuditTablesTask) AddMissingGroups(groupList []*of.Group) {
482 att.PushGroups(groupList, of.GroupCommandAdd)
483}
484
485//UpdateMismatchGroups - updates mismatched groups to Voltha
486func (att *AuditTablesTask) UpdateMismatchGroups(groupList []*of.Group) {
487 att.PushGroups(groupList, of.GroupCommandMod)
488}
489
490// PushGroups - The groups missing/to be updated in the device are reinstalled att the audit
491func (att *AuditTablesTask) PushGroups(groupList []*of.Group, grpCommand of.GroupCommand) {
492 logger.Debugw(ctx, "Pushing Groups", log.Fields{"Number": len(groupList), "Command": grpCommand})
493
494 var vc voltha.VolthaServiceClient
495 if vc = att.device.VolthaClient(); vc == nil {
496 logger.Error(ctx, "Update Group Table Failed: Voltha Client Unavailable")
497 return
498 }
499 for _, group := range groupList {
500 group.Command = grpCommand
501 groupUpdate := of.CreateGroupTableUpdate(group)
502 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
503 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
504 }
505 }
506}
507
508// DelExcessGroups - Delete the excess groups held at the VOLTHA
509func (att *AuditTablesTask) DelExcessGroups(groups map[uint32]*ofp.OfpGroupDesc) {
510 logger.Debugw(ctx, "Deleting Excess Groups", log.Fields{"Number of Groups": len(groups)})
511
512 var vc voltha.VolthaServiceClient
513 if vc = att.device.VolthaClient(); vc == nil {
514 logger.Error(ctx, "Delete Excess Groups Failed: Voltha Client Unavailable")
515 return
516 }
517
518 // Let's cycle through the groups to delete the excess groups
519 for _, groupDesc := range groups {
520 logger.Debugw(ctx, "Deleting Group", log.Fields{"GroupId": groupDesc.GroupId})
521 group := &of.Group{}
522 group.Device = att.device.ID
523 group.GroupID = groupDesc.GroupId
524
525 //Group Members should be deleted before triggered group delete
526 group.Command = of.GroupCommandMod
527 groupUpdate := of.CreateGroupTableUpdate(group)
528 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
529 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
530 }
531
532 group.Command = of.GroupCommandDel
533 groupUpdate = of.CreateGroupTableUpdate(group)
534 if _, err := vc.UpdateLogicalDeviceFlowGroupTable(att.ctx, groupUpdate); err != nil {
535 logger.Errorw(ctx, "Update Group Table Failed", log.Fields{"Reason": err.Error()})
536 }
537 }
538}
Tinoj Josephaf37ce82022-12-28 11:59:43 +0530539
540func (att *AuditTablesTask) AuditPorts() error {
541
542 if att.stop {
543 return tasks.ErrTaskCancelError
544 }
545
546 var vc voltha.VolthaServiceClient
547 if vc = att.device.VolthaClient(); vc == nil {
548 logger.Error(ctx, "Flow Audit Failed: Voltha Client Unavailable")
549 return nil
550 }
551 ofpps, err := vc.ListLogicalDevicePorts(att.ctx, &common.ID{Id: att.device.ID})
552 if err != nil {
553 return err
554 }
555
556 // Compute the difference between the ports received and ports at VGC
557 // First build a map of all the received ports under missing ports. We
558 // will eliminate the ports that are in the device from the missing ports
559 // so that the elements remaining are missing ports. The ones that are
560 // not in missing ports are added to excess ports which should be deleted
561 // from the VGC.
562 missingPorts := make(map[uint32]*ofp.OfpPort)
563 for _, ofpp := range ofpps.Items {
564 missingPorts[ofpp.OfpPort.PortNo] = ofpp.OfpPort
565 }
566
567 var excessPorts []uint32
568 processPortState := func(id uint32, vgcPort *DevicePort) {
569 logger.Debugw(ctx, "Process Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
570
571 if ofpPort, ok := missingPorts[id]; ok {
572 if ((vgcPort.State == PortStateDown) && (ofpPort.State == uint32(ofp.OfpPortState_OFPPS_LIVE))) || ((vgcPort.State == PortStateUp) && (ofpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE))) {
573 // This port exists in the received list and the map at
574 // VGC. This is common so delete it
575 logger.Infow(ctx, "Port State Mismatch", log.Fields{"Port": vgcPort.ID, "OfpPort": ofpPort.PortNo, "ReceivedState": ofpPort.State, "CurrentState": vgcPort.State})
576 att.device.ProcessPortState(ctx, ofpPort.PortNo, ofpPort.State)
577 }
578 delete(missingPorts, id)
579 } else {
580 // This port is missing from the received list. This is an
581 // excess port at VGC. This must be added to excess ports
582 excessPorts = append(excessPorts, id)
583 }
584 logger.Debugw(ctx, "Processed Port State Ind", log.Fields{"Port No": vgcPort.ID, "Port Name": vgcPort.Name})
585
586 }
587 // 1st process the NNI port before all other ports so that the device state can be updated.
588 if vgcPort, ok := att.device.PortsByID[NNIPortID]; ok {
589 logger.Info(ctx, "Processing NNI port state")
590 processPortState(NNIPortID, vgcPort)
591 }
592
593 for id, vgcPort := range att.device.PortsByID {
594 if id == NNIPortID {
595 //NNI port already processed
596 continue
597 }
598 if att.stop {
599 break
600 }
601 processPortState(id, vgcPort)
602 }
603
604 if att.stop {
605 logger.Errorw(ctx, "Audit Device Task Cancelled", log.Fields{"Context": att.ctx, "Task": att.taskID})
606 return tasks.ErrTaskCancelError
607 }
608 att.AddMissingPorts(ctx, missingPorts)
609 att.DelExcessPorts(ctx, excessPorts)
610 return nil
611}
612
613// AddMissingPorts to add the missing ports
614func (att *AuditTablesTask) AddMissingPorts(cntx context.Context, mps map[uint32]*ofp.OfpPort) {
615 logger.Debugw(ctx, "Device Audit - Add Missing Ports", log.Fields{"NumPorts": len(mps)})
616
617 addMissingPort := func(mp *ofp.OfpPort) {
618 logger.Debugw(ctx, "Process Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
619
620 // Error is ignored as it only drops duplicate ports
621 logger.Infow(ctx, "Calling AddPort", log.Fields{"No": mp.PortNo, "Name": mp.Name})
622 if err := att.device.AddPort(cntx, mp); err != nil {
623 logger.Warnw(ctx, "AddPort Failed", log.Fields{"No": mp.PortNo, "Name": mp.Name, "Reason": err})
624 }
625 if mp.State == uint32(ofp.OfpPortState_OFPPS_LIVE) {
626 att.device.ProcessPortState(cntx, mp.PortNo, mp.State)
627 }
628 logger.Debugw(ctx, "Processed Port Add Ind", log.Fields{"Port No": mp.PortNo, "Port Name": mp.Name})
629
630 }
631
632 // 1st process the NNI port before all other ports so that the flow provisioning for UNIs can be enabled
633 if mp, ok := mps[NNIPortID]; ok {
634 logger.Info(ctx, "Adding Missing NNI port")
635 addMissingPort(mp)
636 }
637
638 for portNo, mp := range mps {
639 if portNo != NNIPortID {
640 addMissingPort(mp)
641 }
642 }
643}
644
645// DelExcessPorts to delete the excess ports
646func (att *AuditTablesTask) DelExcessPorts(cntx context.Context, eps []uint32) {
647 logger.Debugw(ctx, "Device Audit - Delete Excess Ports", log.Fields{"NumPorts": len(eps)})
648 for _, id := range eps {
649 // Now delete the port from the device @ VGC
650 logger.Infow(ctx, "Device Audit - Deleting Port", log.Fields{"PortId": id})
651 if err := att.device.DelPort(cntx, id); err != nil {
652 logger.Warnw(ctx, "DelPort Failed", log.Fields{"PortId": id, "Reason": err})
653 }
654 }
655}
656