blob: 92f00bff64a94c3ac816c6d822c3b93ead446b31 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
khenaidoo19d7b632018-10-30 10:49:50 -040020 "fmt"
khenaidoob9203542018-09-17 22:56:37 -040021 "github.com/gogo/protobuf/proto"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/db/model"
24 "github.com/opencord/voltha-go/protos/core_adapter"
khenaidoo19d7b632018-10-30 10:49:50 -040025 ofp "github.com/opencord/voltha-go/protos/openflow_13"
khenaidoob9203542018-09-17 22:56:37 -040026 "github.com/opencord/voltha-go/protos/voltha"
khenaidoo19d7b632018-10-30 10:49:50 -040027 fu "github.com/opencord/voltha-go/rw_core/utils"
khenaidoob9203542018-09-17 22:56:37 -040028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
khenaidoo19d7b632018-10-30 10:49:50 -040030 "reflect"
31 "sync"
khenaidoob9203542018-09-17 22:56:37 -040032)
33
34type DeviceAgent struct {
khenaidoo9a468962018-09-19 15:33:13 -040035 deviceId string
khenaidoofdbad6e2018-11-06 22:26:38 -050036 deviceType string
khenaidoo9a468962018-09-19 15:33:13 -040037 lastData *voltha.Device
38 adapterProxy *AdapterProxy
39 deviceMgr *DeviceManager
40 clusterDataProxy *model.Proxy
khenaidoo92e62c52018-10-03 14:02:54 -040041 deviceProxy *model.Proxy
khenaidoo9a468962018-09-19 15:33:13 -040042 exitChannel chan int
khenaidoo19d7b632018-10-30 10:49:50 -040043 flowProxy *model.Proxy
44 groupProxy *model.Proxy
khenaidoo92e62c52018-10-03 14:02:54 -040045 lockDevice sync.RWMutex
khenaidoob9203542018-09-17 22:56:37 -040046}
47
khenaidoo4d4802d2018-10-04 21:59:49 -040048//newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
49//preprovisioning
khenaidoo9a468962018-09-19 15:33:13 -040050func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
khenaidoob9203542018-09-17 22:56:37 -040051 var agent DeviceAgent
khenaidoob9203542018-09-17 22:56:37 -040052 agent.adapterProxy = ap
khenaidoo92e62c52018-10-03 14:02:54 -040053 cloned := (proto.Clone(device)).(*voltha.Device)
54 cloned.Id = CreateDeviceId()
55 cloned.AdminState = voltha.AdminState_PREPROVISIONED
khenaidoo19d7b632018-10-30 10:49:50 -040056 cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
57 cloned.Flows = &ofp.Flows{Items: nil}
58 if !device.GetRoot() && device.ProxyAddress != nil {
59 // Set the default vlan ID to the one specified by the parent adapter. It can be
60 // overwritten by the child adapter during a device update request
61 cloned.Vlan = device.ProxyAddress.ChannelId
62 }
khenaidoo92e62c52018-10-03 14:02:54 -040063 agent.deviceId = cloned.Id
khenaidoofdbad6e2018-11-06 22:26:38 -050064 agent.deviceType = cloned.Type
khenaidoo92e62c52018-10-03 14:02:54 -040065 agent.lastData = cloned
khenaidoob9203542018-09-17 22:56:37 -040066 agent.deviceMgr = deviceMgr
67 agent.exitChannel = make(chan int, 1)
khenaidoo9a468962018-09-19 15:33:13 -040068 agent.clusterDataProxy = cdProxy
khenaidoo92e62c52018-10-03 14:02:54 -040069 agent.lockDevice = sync.RWMutex{}
khenaidoob9203542018-09-17 22:56:37 -040070 return &agent
71}
72
khenaidoo4d4802d2018-10-04 21:59:49 -040073// start save the device to the data model and registers for callbacks on that device
khenaidoob9203542018-09-17 22:56:37 -040074func (agent *DeviceAgent) start(ctx context.Context) {
khenaidoo92e62c52018-10-03 14:02:54 -040075 agent.lockDevice.Lock()
76 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -040077 log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
78 // Add the initial device to the local model
khenaidoo9a468962018-09-19 15:33:13 -040079 if added := agent.clusterDataProxy.Add("/devices", agent.lastData, ""); added == nil {
khenaidoob9203542018-09-17 22:56:37 -040080 log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
81 }
khenaidoo92e62c52018-10-03 14:02:54 -040082 agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
khenaidoo92e62c52018-10-03 14:02:54 -040083 agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
khenaidoo19d7b632018-10-30 10:49:50 -040084
85 agent.flowProxy = agent.clusterDataProxy.Root.GetProxy(
86 fmt.Sprintf("/devices/%s/flows", agent.deviceId),
87 false)
88 agent.groupProxy = agent.clusterDataProxy.Root.GetProxy(
89 fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
90 false)
91
92 agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
93 //agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
94
khenaidoob9203542018-09-17 22:56:37 -040095 log.Debug("device-agent-started")
96}
97
khenaidoo4d4802d2018-10-04 21:59:49 -040098// stop stops the device agent. Not much to do for now
99func (agent *DeviceAgent) stop(ctx context.Context) {
khenaidoo92e62c52018-10-03 14:02:54 -0400100 agent.lockDevice.Lock()
101 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400102 log.Debug("stopping-device-agent")
103 agent.exitChannel <- 1
104 log.Debug("device-agent-stopped")
105}
106
khenaidoo19d7b632018-10-30 10:49:50 -0400107// GetDevice retrieves the latest device information from the data model
khenaidoo92e62c52018-10-03 14:02:54 -0400108func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
109 agent.lockDevice.Lock()
110 defer agent.lockDevice.Unlock()
111 if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
112 if d, ok := device.(*voltha.Device); ok {
113 cloned := proto.Clone(d).(*voltha.Device)
114 return cloned, nil
115 }
116 }
117 return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
118}
119
khenaidoo4d4802d2018-10-04 21:59:49 -0400120// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
khenaidoo92e62c52018-10-03 14:02:54 -0400121// This function is meant so that we do not have duplicate code all over the device agent functions
122func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
123 if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
124 if d, ok := device.(*voltha.Device); ok {
125 cloned := proto.Clone(d).(*voltha.Device)
126 return cloned, nil
127 }
128 }
129 return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
130}
131
khenaidoo4d4802d2018-10-04 21:59:49 -0400132// enableDevice activates a preprovisioned or disable device
khenaidoob9203542018-09-17 22:56:37 -0400133func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400134 agent.lockDevice.Lock()
135 defer agent.lockDevice.Unlock()
136 log.Debugw("enableDevice", log.Fields{"id": agent.deviceId})
137 if device, err := agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400138 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
139 } else {
khenaidoo92e62c52018-10-03 14:02:54 -0400140 if device.AdminState == voltha.AdminState_ENABLED {
141 log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceId})
142 //TODO: Needs customized error message
143 return nil
144 }
khenaidoo4d4802d2018-10-04 21:59:49 -0400145 //TODO: if parent device is disabled then do not enable device
khenaidoo92e62c52018-10-03 14:02:54 -0400146 // Verify whether we need to adopt the device the first time
147 // TODO: A state machine for these state transitions would be better (we just have to handle
148 // a limited set of states now or it may be an overkill)
149 if device.AdminState == voltha.AdminState_PREPROVISIONED {
150 // First send the request to an Adapter and wait for a response
151 if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
152 log.Debugw("adoptDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
khenaidoob9203542018-09-17 22:56:37 -0400153 return err
154 }
khenaidoo92e62c52018-10-03 14:02:54 -0400155 } else {
156 // First send the request to an Adapter and wait for a response
157 if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
158 log.Debugw("renableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
159 return err
160 }
161 }
162 // Received an Ack (no error found above). Now update the device in the model to the expected state
163 cloned := proto.Clone(device).(*voltha.Device)
164 cloned.AdminState = voltha.AdminState_ENABLED
165 cloned.OperStatus = voltha.OperStatus_ACTIVATING
166 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
167 return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
khenaidoob9203542018-09-17 22:56:37 -0400168 }
169 }
170 return nil
171}
172
khenaidoo19d7b632018-10-30 10:49:50 -0400173func (agent *DeviceAgent) updateFlows(flows []*ofp.OfpFlowStats) error {
174 agent.lockDevice.Lock()
175 defer agent.lockDevice.Unlock()
176 log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows})
177 var oldData *voltha.Flows
178 if storedData, err := agent.getDeviceWithoutLock(); err != nil {
179 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
180 } else {
181 oldData = proto.Clone(storedData.Flows).(*voltha.Flows)
182 log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows, "old": oldData})
183 // store the changed data
184 storedData.Flows.Items = flows
185 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
186 if afterUpdate == nil {
187 return status.Errorf(codes.Internal, "%s", agent.deviceId)
188 }
189
190 // For now, force the callback to occur
191 go agent.flowTableUpdated(oldData, &ofp.Flows{Items: flows})
192 return nil
193 }
194}
195
196func (agent *DeviceAgent) updateGroups(groups []*ofp.OfpGroupEntry) error {
197 agent.lockDevice.Lock()
198 defer agent.lockDevice.Unlock()
199 var oldData *voltha.FlowGroups
200 log.Debugw("updateGroups", log.Fields{"deviceId": agent.deviceId, "groups": groups})
201 if storedData, err := agent.getDeviceWithoutLock(); err != nil {
202 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
203 } else {
204 oldData = proto.Clone(storedData.FlowGroups).(*voltha.FlowGroups)
205 // store the changed data
206 storedData.FlowGroups.Items = groups
207 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
208 if afterUpdate == nil {
209 return status.Errorf(codes.Internal, "%s", agent.deviceId)
210 }
211
212 // For now, force the callback to occur
213 go agent.groupTableUpdated(oldData, &ofp.FlowGroups{Items: groups})
214 return nil
215 }
216}
217
khenaidoo4d4802d2018-10-04 21:59:49 -0400218//disableDevice disable a device
khenaidoo92e62c52018-10-03 14:02:54 -0400219func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
220 agent.lockDevice.Lock()
221 //defer agent.lockDevice.Unlock()
222 log.Debugw("disableDevice", log.Fields{"id": agent.deviceId})
223 // Get the most up to date the device info
224 if device, err := agent.getDeviceWithoutLock(); err != nil {
225 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
226 } else {
227 if device.AdminState == voltha.AdminState_DISABLED {
228 log.Debugw("device-already-disabled", log.Fields{"id": agent.deviceId})
229 //TODO: Needs customized error message
230 agent.lockDevice.Unlock()
231 return nil
232 }
233 // First send the request to an Adapter and wait for a response
234 if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
235 log.Debugw("disableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
236 agent.lockDevice.Unlock()
237 return err
238 }
239 // Received an Ack (no error found above). Now update the device in the model to the expected state
240 cloned := proto.Clone(device).(*voltha.Device)
241 cloned.AdminState = voltha.AdminState_DISABLED
242 // Set the state of all ports on that device to disable
243 for _, port := range cloned.Ports {
244 port.AdminState = voltha.AdminState_DISABLED
245 port.OperStatus = voltha.OperStatus_UNKNOWN
246 }
247 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
248 agent.lockDevice.Unlock()
249 return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
250 }
251 agent.lockDevice.Unlock()
252 //TODO: callback will be invoked to handle this state change
253 //For now force the state transition to happen
254 if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
255 log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
256 return err
257 }
258 }
259 return nil
260}
261
khenaidoo4d4802d2018-10-04 21:59:49 -0400262func (agent *DeviceAgent) rebootDevice(ctx context.Context) error {
263 agent.lockDevice.Lock()
264 defer agent.lockDevice.Unlock()
265 log.Debugw("rebootDevice", log.Fields{"id": agent.deviceId})
266 // Get the most up to date the device info
267 if device, err := agent.getDeviceWithoutLock(); err != nil {
268 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
269 } else {
270 if device.AdminState != voltha.AdminState_DISABLED {
271 log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
272 //TODO: Needs customized error message
273 return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
274 }
275 // First send the request to an Adapter and wait for a response
276 if err := agent.adapterProxy.RebootDevice(ctx, device); err != nil {
277 log.Debugw("rebootDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
278 return err
279 }
280 }
281 return nil
282}
283
284func (agent *DeviceAgent) deleteDevice(ctx context.Context) error {
285 agent.lockDevice.Lock()
286 log.Debugw("deleteDevice", log.Fields{"id": agent.deviceId})
287 // Get the most up to date the device info
288 if device, err := agent.getDeviceWithoutLock(); err != nil {
289 agent.lockDevice.Unlock()
290 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
291 } else {
292 if device.AdminState != voltha.AdminState_DISABLED {
293 log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
294 //TODO: Needs customized error message
295 agent.lockDevice.Unlock()
296 return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
297 }
298 // Send the request to an Adapter and wait for a response
299 if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
300 log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
301 agent.lockDevice.Unlock()
302 return err
303 }
304 // Set the device Admin state to DELETED in order to trigger the callback to delete
305 // child devices, if any
306 // Received an Ack (no error found above). Now update the device in the model to the expected state
307 cloned := proto.Clone(device).(*voltha.Device)
308 cloned.AdminState = voltha.AdminState_DELETED
309 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
310 agent.lockDevice.Unlock()
311 return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
312 }
313 agent.lockDevice.Unlock()
314 //TODO: callback will be invoked to handle this state change
315 //For now force the state transition to happen
316 if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
317 log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
318 return err
319 }
320
321 }
322 return nil
323}
324
325// getPorts retrieves the ports information of the device based on the port type.
khenaidoo92e62c52018-10-03 14:02:54 -0400326func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
327 log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
khenaidoob9203542018-09-17 22:56:37 -0400328 ports := &voltha.Ports{}
khenaidoo19d7b632018-10-30 10:49:50 -0400329 if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
khenaidoob9203542018-09-17 22:56:37 -0400330 for _, port := range device.Ports {
khenaidoo92e62c52018-10-03 14:02:54 -0400331 if port.Type == portType {
khenaidoob9203542018-09-17 22:56:37 -0400332 ports.Items = append(ports.Items, port)
333 }
334 }
335 }
336 return ports
337}
338
khenaidoo4d4802d2018-10-04 21:59:49 -0400339// getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
340// parent device
khenaidoob9203542018-09-17 22:56:37 -0400341func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*core_adapter.SwitchCapability, error) {
342 log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceId})
khenaidoo19d7b632018-10-30 10:49:50 -0400343 if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
khenaidoob9203542018-09-17 22:56:37 -0400344 return nil, err
345 } else {
346 var switchCap *core_adapter.SwitchCapability
347 var err error
348 if switchCap, err = agent.adapterProxy.GetOfpDeviceInfo(ctx, device); err != nil {
349 log.Debugw("getSwitchCapability-error", log.Fields{"id": device.Id, "error": err})
350 return nil, err
351 }
352 return switchCap, nil
353 }
354}
355
khenaidoo4d4802d2018-10-04 21:59:49 -0400356// getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
357// device
khenaidoob9203542018-09-17 22:56:37 -0400358func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*core_adapter.PortCapability, error) {
359 log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceId})
khenaidoo19d7b632018-10-30 10:49:50 -0400360 if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
khenaidoob9203542018-09-17 22:56:37 -0400361 return nil, err
362 } else {
363 var portCap *core_adapter.PortCapability
364 var err error
365 if portCap, err = agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo); err != nil {
366 log.Debugw("getPortCapability-error", log.Fields{"id": device.Id, "error": err})
367 return nil, err
368 }
369 return portCap, nil
370 }
371}
372
khenaidoofdbad6e2018-11-06 22:26:38 -0500373func (agent *DeviceAgent) packetOut(outPort uint32, packet *ofp.OfpPacketOut) error {
374 // Send packet to adapter
375 if err := agent.adapterProxy.packetOut(agent.deviceType, agent.deviceId, outPort, packet); err != nil {
376 log.Debugw("packet-out-error", log.Fields{"id": agent.lastData.Id, "error": err})
377 return err
378 }
379 return nil
380}
381
382
khenaidoo4d4802d2018-10-04 21:59:49 -0400383// TODO: implement when callback from the data model is ready
384// processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
khenaidoo92e62c52018-10-03 14:02:54 -0400385func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
386 log.Debug("!!!!!!!!!!!!!!!!!!!!!!!!!")
387 log.Debugw("processUpdate", log.Fields{"deviceId": agent.deviceId, "args": args})
388 return nil
389}
390
khenaidoob9203542018-09-17 22:56:37 -0400391func (agent *DeviceAgent) updateDevice(device *voltha.Device) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400392 agent.lockDevice.Lock()
393 //defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400394 log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
395 // Get the dev info from the model
khenaidoo92e62c52018-10-03 14:02:54 -0400396 if storedData, err := agent.getDeviceWithoutLock(); err != nil {
397 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400398 return status.Errorf(codes.NotFound, "%s", device.Id)
399 } else {
400 // store the changed data
khenaidoo92e62c52018-10-03 14:02:54 -0400401 cloned := proto.Clone(device).(*voltha.Device)
khenaidoo9a468962018-09-19 15:33:13 -0400402 afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
khenaidoo92e62c52018-10-03 14:02:54 -0400403 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400404 if afterUpdate == nil {
405 return status.Errorf(codes.Internal, "%s", device.Id)
406 }
407 // Perform the state transition
408 if err := agent.deviceMgr.processTransition(storedData, cloned); err != nil {
409 log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
410 return err
411 }
412 return nil
413 }
414}
415
khenaidoo92e62c52018-10-03 14:02:54 -0400416func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
417 agent.lockDevice.Lock()
418 //defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400419 // Work only on latest data
khenaidoo92e62c52018-10-03 14:02:54 -0400420 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
421 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400422 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
423 } else {
424 // clone the device
khenaidoo92e62c52018-10-03 14:02:54 -0400425 cloned := proto.Clone(storeDevice).(*voltha.Device)
426 // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
427 if s, ok := voltha.ConnectStatus_ConnectStatus_value[connStatus.String()]; ok {
428 log.Debugw("updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
429 cloned.ConnectStatus = connStatus
khenaidoob9203542018-09-17 22:56:37 -0400430 }
khenaidoo92e62c52018-10-03 14:02:54 -0400431 if s, ok := voltha.OperStatus_OperStatus_value[operStatus.String()]; ok {
432 log.Debugw("updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
433 cloned.OperStatus = operStatus
khenaidoob9203542018-09-17 22:56:37 -0400434 }
khenaidoo92e62c52018-10-03 14:02:54 -0400435 log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
khenaidoob9203542018-09-17 22:56:37 -0400436 // Store the device
khenaidoo92e62c52018-10-03 14:02:54 -0400437 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
438 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400439 return status.Errorf(codes.Internal, "%s", agent.deviceId)
440 }
khenaidoo92e62c52018-10-03 14:02:54 -0400441 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400442 // Perform the state transition
khenaidoo92e62c52018-10-03 14:02:54 -0400443 if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
444 log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
445 return err
446 }
447 return nil
448 }
449}
450
451func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
452 agent.lockDevice.Lock()
453 //defer agent.lockDevice.Unlock()
454 // Work only on latest data
455 // TODO: Get list of ports from device directly instead of the entire device
456 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
457 agent.lockDevice.Unlock()
458 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
459 } else {
460 // clone the device
461 cloned := proto.Clone(storeDevice).(*voltha.Device)
462 // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
463 if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
464 agent.lockDevice.Unlock()
465 return status.Errorf(codes.InvalidArgument, "%s", portType)
466 }
467 for _, port := range cloned.Ports {
468 if port.Type == portType && port.PortNo == portNo {
469 port.OperStatus = operStatus
470 // Set the admin status to ENABLED if the operational status is ACTIVE
471 // TODO: Set by northbound system?
472 if operStatus == voltha.OperStatus_ACTIVE {
473 port.AdminState = voltha.AdminState_ENABLED
474 }
475 break
476 }
477 }
478 log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
479 // Store the device
480 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
481 agent.lockDevice.Unlock()
482 return status.Errorf(codes.Internal, "%s", agent.deviceId)
483 }
484 agent.lockDevice.Unlock()
485 // Perform the state transition
486 if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400487 log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
488 return err
489 }
490 return nil
491 }
492}
493
494func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400495 agent.lockDevice.Lock()
496 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400497 log.Debug("updatePmConfigs")
498 // Work only on latest data
khenaidoo92e62c52018-10-03 14:02:54 -0400499 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400500 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
501 } else {
502 // clone the device
khenaidoo92e62c52018-10-03 14:02:54 -0400503 cloned := proto.Clone(storeDevice).(*voltha.Device)
504 cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
khenaidoob9203542018-09-17 22:56:37 -0400505 // Store the device
khenaidoo92e62c52018-10-03 14:02:54 -0400506 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
khenaidoob9203542018-09-17 22:56:37 -0400507 if afterUpdate == nil {
508 return status.Errorf(codes.Internal, "%s", agent.deviceId)
509 }
510 return nil
511 }
512}
513
514func (agent *DeviceAgent) addPort(port *voltha.Port) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400515 agent.lockDevice.Lock()
516 defer agent.lockDevice.Unlock()
517 log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
khenaidoob9203542018-09-17 22:56:37 -0400518 // Work only on latest data
khenaidoo92e62c52018-10-03 14:02:54 -0400519 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400520 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
521 } else {
522 // clone the device
khenaidoo92e62c52018-10-03 14:02:54 -0400523 cloned := proto.Clone(storeDevice).(*voltha.Device)
khenaidoob9203542018-09-17 22:56:37 -0400524 if cloned.Ports == nil {
525 // First port
khenaidoo92e62c52018-10-03 14:02:54 -0400526 log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
khenaidoob9203542018-09-17 22:56:37 -0400527 cloned.Ports = make([]*voltha.Port, 0)
528 }
khenaidoo92e62c52018-10-03 14:02:54 -0400529 cp := proto.Clone(port).(*voltha.Port)
530 // Set the admin state of the port to ENABLE if the operational state is ACTIVE
531 // TODO: Set by northbound system?
532 if cp.OperStatus == voltha.OperStatus_ACTIVE {
533 cp.AdminState = voltha.AdminState_ENABLED
534 }
535 cloned.Ports = append(cloned.Ports, cp)
khenaidoob9203542018-09-17 22:56:37 -0400536 // Store the device
khenaidoo92e62c52018-10-03 14:02:54 -0400537 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
538 if afterUpdate == nil {
539 return status.Errorf(codes.Internal, "%s", agent.deviceId)
540 }
541 return nil
542 }
543}
544
545func (agent *DeviceAgent) addPeerPort(port *voltha.Port_PeerPort) error {
546 agent.lockDevice.Lock()
547 defer agent.lockDevice.Unlock()
548 log.Debug("addPeerPort")
549 // Work only on latest data
550 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
551 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
552 } else {
553 // clone the device
554 cloned := proto.Clone(storeDevice).(*voltha.Device)
555 // Get the peer port on the device based on the port no
556 for _, peerPort := range cloned.Ports {
557 if peerPort.PortNo == port.PortNo { // found port
558 cp := proto.Clone(port).(*voltha.Port_PeerPort)
559 peerPort.Peers = append(peerPort.Peers, cp)
560 log.Debugw("found-peer", log.Fields{"portNo": port.PortNo, "deviceId": agent.deviceId})
561 break
562 }
563 }
khenaidoo4d4802d2018-10-04 21:59:49 -0400564 //To track an issue when adding peer-port.
565 log.Debugw("before-peer-added", log.Fields{"device": cloned})
khenaidoo92e62c52018-10-03 14:02:54 -0400566 // Store the device
567 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
khenaidoob9203542018-09-17 22:56:37 -0400568 if afterUpdate == nil {
569 return status.Errorf(codes.Internal, "%s", agent.deviceId)
570 }
khenaidoo4d4802d2018-10-04 21:59:49 -0400571 //To track an issue when adding peer-port.
572 if d, ok := afterUpdate.(*voltha.Device); ok {
573 log.Debugw("after-peer-added", log.Fields{"device": d})
574 } else {
575 log.Debug("after-peer-added-incorrect-type", log.Fields{"type": reflect.ValueOf(afterUpdate).Type()})
576 }
577
khenaidoob9203542018-09-17 22:56:37 -0400578 return nil
579 }
580}
581
khenaidoo19d7b632018-10-30 10:49:50 -0400582//flowTableUpdated is the callback after flows have been updated in the model to push them
583//to the adapters
584func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
585 log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
586
587 agent.lockDevice.Lock()
588 defer agent.lockDevice.Unlock()
589
590 var previousData *voltha.Flows
591 var latestData *voltha.Flows
592
593 var ok bool
594 if previousData, ok = args[0].(*ofp.Flows); !ok {
595 log.Errorw("invalid-args", log.Fields{"args0": args[0]})
596 return nil
597 }
598 if latestData, ok = args[1].(*ofp.Flows); !ok {
599 log.Errorw("invalid-args", log.Fields{"args1": args[1]})
600 return nil
601 }
602
603 // Sanity check - should not happen as this is already handled in logical device agent
604 if reflect.DeepEqual(previousData.Items, latestData.Items) {
605 log.Debugw("flow-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
606 return nil
607 }
608
609 var device *voltha.Device
610 var err error
611 if device, err = agent.getDeviceWithoutLock(); err != nil {
612 log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
613 return nil
614 }
615 groups := device.FlowGroups
616
617 // Send update to adapters
618 // Check whether the device supports incremental flow changes
619 // Assume false for test
620 acceptsAddRemoveFlowUpdates := false
621 if !acceptsAddRemoveFlowUpdates {
622 if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
623 log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
624 return err
625 }
626 return nil
627 }
628 // Incremental flow changes accepted
629 var toAdd []*ofp.OfpFlowStats
630 var toDelete []*ofp.OfpFlowStats
631
632 for _, flow := range latestData.Items {
633 if fu.FindFlowById(previousData.Items, flow) == -1 { // did not exist before
634 toAdd = append(toAdd, flow)
635 }
636 }
637 for _, flow := range previousData.Items {
638 if fu.FindFlowById(latestData.Items, flow) == -1 { // does not exist now
639 toDelete = append(toDelete, flow)
640 }
641 }
642 flowChanges := &ofp.FlowChanges{
643 ToAdd: &voltha.Flows{Items: toAdd},
644 ToRemove: &voltha.Flows{Items: toDelete},
645 }
646 // Send an empty group changes as it would be dealt with a call to groupTableUpdated
647 groupChanges := &ofp.FlowGroupChanges{}
648
649 // Send changes only
650 if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
651 log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
652 return err
653 }
654
655 return nil
656}
657
658//groupTableUpdated is the callback after group table has been updated in the model to push them
659//to the adapters
660func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
661 log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
662
663 agent.lockDevice.Lock()
664 defer agent.lockDevice.Unlock()
665
666 var previousData *voltha.FlowGroups
667 var latestData *voltha.FlowGroups
668
669 var ok bool
670 if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
671 log.Errorw("invalid-args", log.Fields{"args0": args[0]})
672 return nil
673 }
674 if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
675 log.Errorw("invalid-args", log.Fields{"args1": args[1]})
676 return nil
677 }
678
679 // Sanity check - should not happen as this is already handled in logical device agent
680 if reflect.DeepEqual(previousData.Items, latestData.Items) {
681 log.Debugw("group-table-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
682 return nil
683 }
684
685 var device *voltha.Device
686 var err error
687 if device, err = agent.getDeviceWithoutLock(); err != nil {
688 log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
689 return nil
690 }
691 flows := device.Flows
692
693 // Send update to adapters
694 // Check whether the device supports incremental flow changes
695 // Assume false for test
696 acceptsAddRemoveFlowUpdates := false
697 if !acceptsAddRemoveFlowUpdates {
698 if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
699 log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
700 return err
701 }
702 return nil
703 }
704
705 // Incremental group changes accepted
706 var toAdd []*ofp.OfpGroupEntry
707 var toDelete []*ofp.OfpGroupEntry
708 var toUpdate []*ofp.OfpGroupEntry
709
710 for _, group := range latestData.Items {
711 if idx := fu.FindGroup(previousData.Items, group.Desc.GroupId); idx == -1 { // did not exist before
712 toAdd = append(toAdd, group)
713 } else { // existed before
714 if previousData.Items[idx].String() != group.String() { // there is a change
715 toUpdate = append(toUpdate, group)
716 }
717 }
718 }
719 for _, group := range previousData.Items {
720 if fu.FindGroup(latestData.Items, group.Desc.GroupId) == -1 { // does not exist now
721 toDelete = append(toDelete, group)
722 }
723 }
724 groupChanges := &ofp.FlowGroupChanges{
725 ToAdd: &voltha.FlowGroups{Items: toAdd},
726 ToRemove: &voltha.FlowGroups{Items: toDelete},
727 ToUpdate: &voltha.FlowGroups{Items: toUpdate},
728 }
729 // Send an empty flow changes as it should have been dealt with a call to flowTableUpdated
730 flowChanges := &ofp.FlowChanges{}
731
732 // Send changes only
733 if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
734 log.Debugw("update-incremental-group-error", log.Fields{"id": agent.lastData.Id, "error": err})
735 return err
736 }
737 return nil
738}
739
khenaidoob9203542018-09-17 22:56:37 -0400740// TODO: A generic device update by attribute
741func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
khenaidoo92e62c52018-10-03 14:02:54 -0400742 agent.lockDevice.Lock()
743 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400744 if value == nil {
745 return
746 }
747 var storeDevice *voltha.Device
748 var err error
khenaidoo92e62c52018-10-03 14:02:54 -0400749 if storeDevice, err = agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400750 return
751 }
752 updated := false
753 s := reflect.ValueOf(storeDevice).Elem()
754 if s.Kind() == reflect.Struct {
755 // exported field
756 f := s.FieldByName(name)
757 if f.IsValid() && f.CanSet() {
758 switch f.Kind() {
759 case reflect.String:
760 f.SetString(value.(string))
761 updated = true
762 case reflect.Uint32:
763 f.SetUint(uint64(value.(uint32)))
764 updated = true
765 case reflect.Bool:
766 f.SetBool(value.(bool))
767 updated = true
768 }
769 }
770 }
khenaidoo92e62c52018-10-03 14:02:54 -0400771 log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
khenaidoob9203542018-09-17 22:56:37 -0400772 // Save the data
khenaidoo92e62c52018-10-03 14:02:54 -0400773 cloned := proto.Clone(storeDevice).(*voltha.Device)
774 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
khenaidoob9203542018-09-17 22:56:37 -0400775 log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
776 }
777 return
778}