blob: 73caa071115e5b5ecd8bbf7282a4e2f54465239f [file] [log] [blame]
Kent Hagerman3136fbd2020-05-14 10:30:45 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package device
18
19import (
20 "context"
21 "fmt"
22 "strconv"
23
24 coreutils "github.com/opencord/voltha-go/rw_core/utils"
25 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
26 "github.com/opencord/voltha-lib-go/v3/pkg/log"
27 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
28 "github.com/opencord/voltha-protos/v3/go/voltha"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
31)
32
33//updateGroupTable updates the group table of that logical device
34func (agent *LogicalAgent) updateGroupTable(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
35 logger.Debug("updateGroupTable")
36 if groupMod == nil {
37 return nil
38 }
39
Kent Hagerman3136fbd2020-05-14 10:30:45 -040040 switch groupMod.GetCommand() {
41 case ofp.OfpGroupModCommand_OFPGC_ADD:
42 return agent.groupAdd(ctx, groupMod)
43 case ofp.OfpGroupModCommand_OFPGC_DELETE:
44 return agent.groupDelete(ctx, groupMod)
45 case ofp.OfpGroupModCommand_OFPGC_MODIFY:
46 return agent.groupModify(ctx, groupMod)
47 }
48 return status.Errorf(codes.Internal, "unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, groupMod.GetCommand())
49}
50
51func (agent *LogicalAgent) groupAdd(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
52 if groupMod == nil {
53 return nil
54 }
55 logger.Debugw("groupAdd", log.Fields{"GroupId": groupMod.GroupId})
56 agent.groupLock.Lock()
57 _, ok := agent.groups[groupMod.GroupId]
58 if ok {
59 agent.groupLock.Unlock()
60 return fmt.Errorf("Group %d already exists", groupMod.GroupId)
61 }
62
63 groupEntry := fu.GroupEntryFromGroupMod(groupMod)
64 groupChunk := GroupChunk{
65 group: groupEntry,
66 }
67 //add to map
68 agent.groups[groupMod.GroupId] = &groupChunk
69 groupChunk.lock.Lock()
70 defer groupChunk.lock.Unlock()
71 agent.groupLock.Unlock()
72 //add to the kv store
73 path := fmt.Sprintf("groups/%s", agent.logicalDeviceID)
74 groupID := strconv.Itoa(int(groupMod.GroupId))
75 if err := agent.clusterDataProxy.AddWithID(ctx, path, groupID, groupEntry); err != nil {
76 logger.Errorw("failed-adding-group", log.Fields{"deviceID": agent.logicalDeviceID, "groupID": groupID, "err": err})
77 agent.groupLock.Lock()
78 delete(agent.groups, groupMod.GroupId)
79 agent.groupLock.Unlock()
80 return err
81 }
82 deviceRules := fu.NewDeviceRules()
83 deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
84 fg := fu.NewFlowsAndGroups()
85 fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
86 deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
87
88 logger.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
89
90 // Update the devices
khenaidoo0db4c812020-05-27 15:27:30 -040091 respChnls := agent.addFlowsAndGroupsToDevices(deviceRules, &voltha.FlowMetadata{})
Kent Hagerman3136fbd2020-05-14 10:30:45 -040092
93 // Wait for completion
94 go func() {
95 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
96 logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
97 //TODO: Revert flow changes
98 }
99 }()
100 return nil
101}
102
103func (agent *LogicalAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
104 logger.Debug("groupDelete")
105 if groupMod == nil {
106 return nil
107 }
108 affectedFlows := make([]*ofp.OfpFlowStats, 0)
109 affectedGroups := make([]*ofp.OfpGroupEntry, 0)
110 var groupsChanged bool
111 groupID := groupMod.GroupId
112 var err error
113 if groupID == uint32(ofp.OfpGroup_OFPG_ALL) {
114 if err := func() error {
115 agent.groupLock.Lock()
116 defer agent.groupLock.Unlock()
117 for key, groupChunk := range agent.groups {
118 //Remove from store and cache. Do this in a one time lock allocation.
119 path := fmt.Sprintf("groups/%s/%d", agent.logicalDeviceID, key)
120 if err := agent.clusterDataProxy.Remove(ctx, path); err != nil {
121 return fmt.Errorf("couldnt-deleted-group-from-store-%s", path)
122 }
123 delete(agent.groups, groupID)
124 var flows []*ofp.OfpFlowStats
125 if flows, err = agent.deleteFlowsOfGroup(ctx, key); err != nil {
126 logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": key})
127 return err
128 }
129 affectedFlows = append(affectedFlows, flows...)
130 affectedGroups = append(affectedGroups, groupChunk.group)
131 }
132 return nil
133 }(); err != nil {
134 return err
135 }
136 groupsChanged = true
137 } else {
138 agent.groupLock.RLock()
139 groupChunk, ok := agent.groups[groupID]
140 agent.groupLock.RUnlock()
141 if !ok {
142 logger.Warnw("group-not-found", log.Fields{"groupID": groupID})
143 return nil
144 }
145 groupChunk.lock.Lock()
146 defer groupChunk.lock.Unlock()
147 var flows []*ofp.OfpFlowStats
148 if flows, err = agent.deleteFlowsOfGroup(ctx, groupID); err != nil {
149 logger.Errorw("cannot-update-flow-for-group-delete", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "groupID": groupID})
150 return err
151 }
152 //remove from store
153 if err := agent.removeLogicalDeviceFlowGroup(ctx, groupID); err != nil {
154 return err
155 }
156 affectedFlows = append(affectedFlows, flows...)
157 affectedGroups = append(affectedGroups, groupChunk.group)
158 groupsChanged = true
159
160 }
161
162 if err != nil || groupsChanged {
163 var deviceRules *fu.DeviceRules
164 deviceRules, err = agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: affectedFlows}, ofp.FlowGroups{Items: affectedGroups})
165 if err != nil {
166 return err
167 }
168 logger.Debugw("rules", log.Fields{"rules": deviceRules.String()})
169
170 // Update the devices
khenaidoo0db4c812020-05-27 15:27:30 -0400171 respChnls := agent.updateFlowsAndGroupsOfDevice(deviceRules, nil)
Kent Hagerman3136fbd2020-05-14 10:30:45 -0400172
173 // Wait for completion
174 go func() {
175 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
176 logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
177 //TODO: Revert flow changes
178 }
179 }()
180 }
181 return nil
182}
183
184func (agent *LogicalAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
185 logger.Debug("groupModify")
186 if groupMod == nil {
187 return nil
188 }
189
190 groupID := groupMod.GroupId
191 agent.groupLock.RLock()
192 groupChunk, ok := agent.groups[groupID]
193 agent.groupLock.RUnlock()
194 if !ok {
195 return fmt.Errorf("group-absent:%d", groupID)
196 }
197 //Don't let any other thread to make modifications to this group till all done here.
198 groupChunk.lock.Lock()
199 defer groupChunk.lock.Unlock()
200 //replace existing group entry with new group definition
201 groupEntry := fu.GroupEntryFromGroupMod(groupMod)
202 deviceRules := fu.NewDeviceRules()
203 deviceRules.CreateEntryIfNotExist(agent.rootDeviceID)
204 fg := fu.NewFlowsAndGroups()
205 fg.AddGroup(fu.GroupEntryFromGroupMod(groupMod))
206 deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
207
208 logger.Debugw("rules", log.Fields{"rules-for-group-modify": deviceRules.String()})
209 //update KV
210 if err := agent.updateLogicalDeviceFlowGroup(ctx, groupEntry, groupChunk); err != nil {
211 logger.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
212 return err
213 }
214
215 // Update the devices
khenaidoo0db4c812020-05-27 15:27:30 -0400216 respChnls := agent.updateFlowsAndGroupsOfDevice(deviceRules, &voltha.FlowMetadata{})
Kent Hagerman3136fbd2020-05-14 10:30:45 -0400217
218 // Wait for completion
219 go func() {
220 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
221 logger.Warnw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
222 //TODO: Revert flow changes
223 }
224 }()
225 return nil
226}