blob: a0d6c4a10c3c1b26f0dd1ec622e4f7bda32b24bb [file] [log] [blame]
Kent Hagerman3136fbd2020-05-14 10:30:45 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
20 "context"
21 "fmt"
22 "strconv"
23
24 coreutils "github.com/opencord/voltha-go/rw_core/utils"
25 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
26 "github.com/opencord/voltha-lib-go/v3/pkg/log"
27 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
28 "github.com/opencord/voltha-protos/v3/go/voltha"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
31)
32
33//updateGroupTable updates the group table of that logical device
34func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
35 logger.Debug("updateGroupTable")
36 if groupMod == nil {
37 return nil
38 }
39
40 if err := agent.generateDeviceRoutesIfNeeded(ctx); err != nil {
41 return err
42 }
43
44 switch groupMod.GetCommand() {
45 case ofp.OfpGroupModCommand_OFPGC_ADD:
46 return agent.groupAdd(ctx, groupMod)
47 case ofp.OfpGroupModCommand_OFPGC_DELETE:
48 return agent.groupDelete(ctx, groupMod)
49 case ofp.OfpGroupModCommand_OFPGC_MODIFY:
50 return agent.groupModify(ctx, groupMod)
51 }
52 return status.Errorf(codes.Internal, "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, groupMod.GetCommand())
53}
54
55func (agent *LogicalAgent) groupAdd(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
56 if groupMod == nil {
57 return nil
58 }
59 logger.Debugw("groupAdd", log.Fields{"GroupId": groupMod.GroupId})
60 agent.groupLock.Lock()
61 _, ok := agent.groups[groupMod.GroupId]
62 if ok {
63 agent.groupLock.Unlock()
64 return fmt.Errorf("Group %d already exists", groupMod.GroupId)
65 }
66
67 groupEntry := fu.GroupEntryFromGroupMod(groupMod)
68 groupChunk := GroupChunk{
69 group: groupEntry,
70 }
71 //add to map
72 agent.groups[groupMod.GroupId] = &groupChunk
73 groupChunk.lock.Lock()
74 defer groupChunk.lock.Unlock()
75 agent.groupLock.Unlock()
76 //add to the kv store
77 path := fmt.Sprintf("groups/%s", agent.logicalDeviceID)
78 groupID := strconv.Itoa(int(groupMod.GroupId))
79 if err := agent.clusterDataProxy.AddWithID(ctx, path, groupID, groupEntry); err != nil {
80 logger.Errorw("failed-adding-group", log.Fields{"deviceID": agent.logicalDeviceID, "groupID": groupID, "err": err})
81 agent.groupLock.Lock()
82 delete(agent.groups, groupMod.GroupId)
83 agent.groupLock.Unlock()
84 return err
85 }
86 deviceRules := fu.NewDeviceRules()
87 deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
88 fg := fu.NewFlowsAndGroups()
89 fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
90 deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
91
92 logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
93
94 // Update the devices
95 respChnls := agent.addFlowsAndGroupsToDevices(ctx, deviceRules, &voltha.FlowMetadata{})
96
97 // Wait for completion
98 go func() {
99 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
100 logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
101 //TODO: Revert flow changes
102 }
103 }()
104 return nil
105}
106
107func (agent *LogicalAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
108 logger.Debug("groupDelete")
109 if groupMod == nil {
110 return nil
111 }
112 affectedFlows := make([]*ofp.OfpFlowStats, 0)
113 affectedGroups := make([]*ofp.OfpGroupEntry, 0)
114 var groupsChanged bool
115 groupID := groupMod.GroupId
116 var err error
117 if groupID == uint32(ofp.OfpGroup_OFPG_ALL) {
118 if err := func() error {
119 agent.groupLock.Lock()
120 defer agent.groupLock.Unlock()
121 for key, groupChunk := range agent.groups {
122 //Remove from store and cache. Do this in a one time lock allocation.
123 path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, key)
124 if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
125 return fmt.Errorf("couldnt-deleted-group-from-store-%s", path)
126 }
127 delete(agent.groups, groupID)
128 var flows []*ofp.OfpFlowStats
129 if flows, err = agent.deleteFlowsOfGroup(ctx, key); err != nil {
130 logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": key})
131 return err
132 }
133 affectedFlows = append(affectedFlows, flows...)
134 affectedGroups = append(affectedGroups, groupChunk.group)
135 }
136 return nil
137 }(); err != nil {
138 return err
139 }
140 groupsChanged = true
141 } else {
142 agent.groupLock.RLock()
143 groupChunk, ok := agent.groups[groupID]
144 agent.groupLock.RUnlock()
145 if !ok {
146 logger.Warnw("group-not-found", log.Fields{"groupID": groupID})
147 return nil
148 }
149 groupChunk.lock.Lock()
150 defer groupChunk.lock.Unlock()
151 var flows []*ofp.OfpFlowStats
152 if flows, err = agent.deleteFlowsOfGroup(ctx, groupID); err != nil {
153 logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": groupID})
154 return err
155 }
156 //remove from store
157 if err := agent.removeLogicalDeviceFlowGroup(ctx, groupID); err != nil {
158 return err
159 }
160 affectedFlows = append(affectedFlows, flows...)
161 affectedGroups = append(affectedGroups, groupChunk.group)
162 groupsChanged = true
163
164 }
165
166 if err != nil || groupsChanged {
167 var deviceRules *fu.DeviceRules
168 deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: affectedFlows}, ofp.FlowGroups{Items: affectedGroups})
169 if err != nil {
170 return err
171 }
172 logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
173
174 // Update the devices
175 respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, nil)
176
177 // Wait for completion
178 go func() {
179 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
180 logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
181 //TODO: Revert flow changes
182 }
183 }()
184 }
185 return nil
186}
187
188func (agent *LogicalAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
189 logger.Debug("groupModify")
190 if groupMod == nil {
191 return nil
192 }
193
194 groupID := groupMod.GroupId
195 agent.groupLock.RLock()
196 groupChunk, ok := agent.groups[groupID]
197 agent.groupLock.RUnlock()
198 if !ok {
199 return fmt.Errorf("group-absent:%d", groupID)
200 }
201 //Don't let any other thread to make modifications to this group till all done here.
202 groupChunk.lock.Lock()
203 defer groupChunk.lock.Unlock()
204 //replace existing group entry with new group definition
205 groupEntry := fu.GroupEntryFromGroupMod(groupMod)
206 deviceRules := fu.NewDeviceRules()
207 deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
208 fg := fu.NewFlowsAndGroups()
209 fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
210 deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
211
212 logger.Debugw("rules", log.Fields{"rules-for-group-modify": deviceRules.String()})
213 //update KV
214 if err := agent.updateLogicalDeviceFlowGroup(ctx, groupEntry, groupChunk); err != nil {
215 logger.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
216 return err
217 }
218
219 // Update the devices
220 respChnls := agent.updateFlowsAndGroupsOfDevice(ctx, deviceRules, &voltha.FlowMetadata{})
221
222 // Wait for completion
223 go func() {
224 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
225 logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
226 //TODO: Revert flow changes
227 }
228 }()
229 return nil
230}