blob: e045fc98aa51d7ee3a82526158f716f063f1b0e5 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
khenaidoo19d7b632018-10-30 10:49:50 -040020 "fmt"
khenaidoob9203542018-09-17 22:56:37 -040021 "github.com/gogo/protobuf/proto"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/db/model"
24 "github.com/opencord/voltha-go/protos/core_adapter"
khenaidoo19d7b632018-10-30 10:49:50 -040025 ofp "github.com/opencord/voltha-go/protos/openflow_13"
khenaidoob9203542018-09-17 22:56:37 -040026 "github.com/opencord/voltha-go/protos/voltha"
khenaidoo19d7b632018-10-30 10:49:50 -040027 fu "github.com/opencord/voltha-go/rw_core/utils"
khenaidoob9203542018-09-17 22:56:37 -040028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
khenaidoo19d7b632018-10-30 10:49:50 -040030 "reflect"
31 "sync"
khenaidoob9203542018-09-17 22:56:37 -040032)
33
34type DeviceAgent struct {
khenaidoo9a468962018-09-19 15:33:13 -040035 deviceId string
36 lastData *voltha.Device
37 adapterProxy *AdapterProxy
38 deviceMgr *DeviceManager
39 clusterDataProxy *model.Proxy
khenaidoo92e62c52018-10-03 14:02:54 -040040 deviceProxy *model.Proxy
khenaidoo9a468962018-09-19 15:33:13 -040041 exitChannel chan int
khenaidoo19d7b632018-10-30 10:49:50 -040042 flowProxy *model.Proxy
43 groupProxy *model.Proxy
khenaidoo92e62c52018-10-03 14:02:54 -040044 lockDevice sync.RWMutex
khenaidoob9203542018-09-17 22:56:37 -040045}
46
khenaidoo4d4802d2018-10-04 21:59:49 -040047//newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
48//preprovisioning
khenaidoo9a468962018-09-19 15:33:13 -040049func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
khenaidoob9203542018-09-17 22:56:37 -040050 var agent DeviceAgent
khenaidoob9203542018-09-17 22:56:37 -040051 agent.adapterProxy = ap
khenaidoo92e62c52018-10-03 14:02:54 -040052 cloned := (proto.Clone(device)).(*voltha.Device)
53 cloned.Id = CreateDeviceId()
54 cloned.AdminState = voltha.AdminState_PREPROVISIONED
khenaidoo19d7b632018-10-30 10:49:50 -040055 cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
56 cloned.Flows = &ofp.Flows{Items: nil}
57 if !device.GetRoot() && device.ProxyAddress != nil {
58 // Set the default vlan ID to the one specified by the parent adapter. It can be
59 // overwritten by the child adapter during a device update request
60 cloned.Vlan = device.ProxyAddress.ChannelId
61 }
khenaidoo92e62c52018-10-03 14:02:54 -040062 agent.deviceId = cloned.Id
63 agent.lastData = cloned
khenaidoob9203542018-09-17 22:56:37 -040064 agent.deviceMgr = deviceMgr
65 agent.exitChannel = make(chan int, 1)
khenaidoo9a468962018-09-19 15:33:13 -040066 agent.clusterDataProxy = cdProxy
khenaidoo92e62c52018-10-03 14:02:54 -040067 agent.lockDevice = sync.RWMutex{}
khenaidoob9203542018-09-17 22:56:37 -040068 return &agent
69}
70
khenaidoo4d4802d2018-10-04 21:59:49 -040071// start save the device to the data model and registers for callbacks on that device
khenaidoob9203542018-09-17 22:56:37 -040072func (agent *DeviceAgent) start(ctx context.Context) {
khenaidoo92e62c52018-10-03 14:02:54 -040073 agent.lockDevice.Lock()
74 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -040075 log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
76 // Add the initial device to the local model
khenaidoo9a468962018-09-19 15:33:13 -040077 if added := agent.clusterDataProxy.Add("/devices", agent.lastData, ""); added == nil {
khenaidoob9203542018-09-17 22:56:37 -040078 log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
79 }
khenaidoo92e62c52018-10-03 14:02:54 -040080 agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
khenaidoo92e62c52018-10-03 14:02:54 -040081 agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
khenaidoo19d7b632018-10-30 10:49:50 -040082
83 agent.flowProxy = agent.clusterDataProxy.Root.GetProxy(
84 fmt.Sprintf("/devices/%s/flows", agent.deviceId),
85 false)
86 agent.groupProxy = agent.clusterDataProxy.Root.GetProxy(
87 fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
88 false)
89
90 agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
91 //agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
92
khenaidoob9203542018-09-17 22:56:37 -040093 log.Debug("device-agent-started")
94}
95
khenaidoo4d4802d2018-10-04 21:59:49 -040096// stop stops the device agent. Not much to do for now
97func (agent *DeviceAgent) stop(ctx context.Context) {
khenaidoo92e62c52018-10-03 14:02:54 -040098 agent.lockDevice.Lock()
99 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400100 log.Debug("stopping-device-agent")
101 agent.exitChannel <- 1
102 log.Debug("device-agent-stopped")
103}
104
khenaidoo19d7b632018-10-30 10:49:50 -0400105// GetDevice retrieves the latest device information from the data model
khenaidoo92e62c52018-10-03 14:02:54 -0400106func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
107 agent.lockDevice.Lock()
108 defer agent.lockDevice.Unlock()
109 if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
110 if d, ok := device.(*voltha.Device); ok {
111 cloned := proto.Clone(d).(*voltha.Device)
112 return cloned, nil
113 }
114 }
115 return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
116}
117
khenaidoo4d4802d2018-10-04 21:59:49 -0400118// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
khenaidoo92e62c52018-10-03 14:02:54 -0400119// This function is meant so that we do not have duplicate code all over the device agent functions
120func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
121 if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
122 if d, ok := device.(*voltha.Device); ok {
123 cloned := proto.Clone(d).(*voltha.Device)
124 return cloned, nil
125 }
126 }
127 return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
128}
129
khenaidoo4d4802d2018-10-04 21:59:49 -0400130// enableDevice activates a preprovisioned or disable device
khenaidoob9203542018-09-17 22:56:37 -0400131func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400132 agent.lockDevice.Lock()
133 defer agent.lockDevice.Unlock()
134 log.Debugw("enableDevice", log.Fields{"id": agent.deviceId})
135 if device, err := agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400136 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
137 } else {
khenaidoo92e62c52018-10-03 14:02:54 -0400138 if device.AdminState == voltha.AdminState_ENABLED {
139 log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceId})
140 //TODO: Needs customized error message
141 return nil
142 }
khenaidoo4d4802d2018-10-04 21:59:49 -0400143 //TODO: if parent device is disabled then do not enable device
khenaidoo92e62c52018-10-03 14:02:54 -0400144 // Verify whether we need to adopt the device the first time
145 // TODO: A state machine for these state transitions would be better (we just have to handle
146 // a limited set of states now or it may be an overkill)
147 if device.AdminState == voltha.AdminState_PREPROVISIONED {
148 // First send the request to an Adapter and wait for a response
149 if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
150 log.Debugw("adoptDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
khenaidoob9203542018-09-17 22:56:37 -0400151 return err
152 }
khenaidoo92e62c52018-10-03 14:02:54 -0400153 } else {
154 // First send the request to an Adapter and wait for a response
155 if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
156 log.Debugw("renableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
157 return err
158 }
159 }
160 // Received an Ack (no error found above). Now update the device in the model to the expected state
161 cloned := proto.Clone(device).(*voltha.Device)
162 cloned.AdminState = voltha.AdminState_ENABLED
163 cloned.OperStatus = voltha.OperStatus_ACTIVATING
164 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
165 return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
khenaidoob9203542018-09-17 22:56:37 -0400166 }
167 }
168 return nil
169}
170
khenaidoo19d7b632018-10-30 10:49:50 -0400171func (agent *DeviceAgent) updateFlows(flows []*ofp.OfpFlowStats) error {
172 agent.lockDevice.Lock()
173 defer agent.lockDevice.Unlock()
174 log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows})
175 var oldData *voltha.Flows
176 if storedData, err := agent.getDeviceWithoutLock(); err != nil {
177 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
178 } else {
179 oldData = proto.Clone(storedData.Flows).(*voltha.Flows)
180 log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows, "old": oldData})
181 // store the changed data
182 storedData.Flows.Items = flows
183 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
184 if afterUpdate == nil {
185 return status.Errorf(codes.Internal, "%s", agent.deviceId)
186 }
187
188 // For now, force the callback to occur
189 go agent.flowTableUpdated(oldData, &ofp.Flows{Items: flows})
190 return nil
191 }
192}
193
194func (agent *DeviceAgent) updateGroups(groups []*ofp.OfpGroupEntry) error {
195 agent.lockDevice.Lock()
196 defer agent.lockDevice.Unlock()
197 var oldData *voltha.FlowGroups
198 log.Debugw("updateGroups", log.Fields{"deviceId": agent.deviceId, "groups": groups})
199 if storedData, err := agent.getDeviceWithoutLock(); err != nil {
200 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
201 } else {
202 oldData = proto.Clone(storedData.FlowGroups).(*voltha.FlowGroups)
203 // store the changed data
204 storedData.FlowGroups.Items = groups
205 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
206 if afterUpdate == nil {
207 return status.Errorf(codes.Internal, "%s", agent.deviceId)
208 }
209
210 // For now, force the callback to occur
211 go agent.groupTableUpdated(oldData, &ofp.FlowGroups{Items: groups})
212 return nil
213 }
214}
215
khenaidoo4d4802d2018-10-04 21:59:49 -0400216//disableDevice disable a device
khenaidoo92e62c52018-10-03 14:02:54 -0400217func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
218 agent.lockDevice.Lock()
219 //defer agent.lockDevice.Unlock()
220 log.Debugw("disableDevice", log.Fields{"id": agent.deviceId})
221 // Get the most up to date the device info
222 if device, err := agent.getDeviceWithoutLock(); err != nil {
223 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
224 } else {
225 if device.AdminState == voltha.AdminState_DISABLED {
226 log.Debugw("device-already-disabled", log.Fields{"id": agent.deviceId})
227 //TODO: Needs customized error message
228 agent.lockDevice.Unlock()
229 return nil
230 }
231 // First send the request to an Adapter and wait for a response
232 if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
233 log.Debugw("disableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
234 agent.lockDevice.Unlock()
235 return err
236 }
237 // Received an Ack (no error found above). Now update the device in the model to the expected state
238 cloned := proto.Clone(device).(*voltha.Device)
239 cloned.AdminState = voltha.AdminState_DISABLED
240 // Set the state of all ports on that device to disable
241 for _, port := range cloned.Ports {
242 port.AdminState = voltha.AdminState_DISABLED
243 port.OperStatus = voltha.OperStatus_UNKNOWN
244 }
245 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
246 agent.lockDevice.Unlock()
247 return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
248 }
249 agent.lockDevice.Unlock()
250 //TODO: callback will be invoked to handle this state change
251 //For now force the state transition to happen
252 if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
253 log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
254 return err
255 }
256 }
257 return nil
258}
259
khenaidoo4d4802d2018-10-04 21:59:49 -0400260func (agent *DeviceAgent) rebootDevice(ctx context.Context) error {
261 agent.lockDevice.Lock()
262 defer agent.lockDevice.Unlock()
263 log.Debugw("rebootDevice", log.Fields{"id": agent.deviceId})
264 // Get the most up to date the device info
265 if device, err := agent.getDeviceWithoutLock(); err != nil {
266 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
267 } else {
268 if device.AdminState != voltha.AdminState_DISABLED {
269 log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
270 //TODO: Needs customized error message
271 return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
272 }
273 // First send the request to an Adapter and wait for a response
274 if err := agent.adapterProxy.RebootDevice(ctx, device); err != nil {
275 log.Debugw("rebootDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
276 return err
277 }
278 }
279 return nil
280}
281
282func (agent *DeviceAgent) deleteDevice(ctx context.Context) error {
283 agent.lockDevice.Lock()
284 log.Debugw("deleteDevice", log.Fields{"id": agent.deviceId})
285 // Get the most up to date the device info
286 if device, err := agent.getDeviceWithoutLock(); err != nil {
287 agent.lockDevice.Unlock()
288 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
289 } else {
290 if device.AdminState != voltha.AdminState_DISABLED {
291 log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
292 //TODO: Needs customized error message
293 agent.lockDevice.Unlock()
294 return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
295 }
296 // Send the request to an Adapter and wait for a response
297 if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
298 log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
299 agent.lockDevice.Unlock()
300 return err
301 }
302 // Set the device Admin state to DELETED in order to trigger the callback to delete
303 // child devices, if any
304 // Received an Ack (no error found above). Now update the device in the model to the expected state
305 cloned := proto.Clone(device).(*voltha.Device)
306 cloned.AdminState = voltha.AdminState_DELETED
307 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
308 agent.lockDevice.Unlock()
309 return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
310 }
311 agent.lockDevice.Unlock()
312 //TODO: callback will be invoked to handle this state change
313 //For now force the state transition to happen
314 if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
315 log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
316 return err
317 }
318
319 }
320 return nil
321}
322
323// getPorts retrieves the ports information of the device based on the port type.
khenaidoo92e62c52018-10-03 14:02:54 -0400324func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
325 log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
khenaidoob9203542018-09-17 22:56:37 -0400326 ports := &voltha.Ports{}
khenaidoo19d7b632018-10-30 10:49:50 -0400327 if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
khenaidoob9203542018-09-17 22:56:37 -0400328 for _, port := range device.Ports {
khenaidoo92e62c52018-10-03 14:02:54 -0400329 if port.Type == portType {
khenaidoob9203542018-09-17 22:56:37 -0400330 ports.Items = append(ports.Items, port)
331 }
332 }
333 }
334 return ports
335}
336
khenaidoo4d4802d2018-10-04 21:59:49 -0400337// getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
338// parent device
khenaidoob9203542018-09-17 22:56:37 -0400339func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*core_adapter.SwitchCapability, error) {
340 log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceId})
khenaidoo19d7b632018-10-30 10:49:50 -0400341 if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
khenaidoob9203542018-09-17 22:56:37 -0400342 return nil, err
343 } else {
344 var switchCap *core_adapter.SwitchCapability
345 var err error
346 if switchCap, err = agent.adapterProxy.GetOfpDeviceInfo(ctx, device); err != nil {
347 log.Debugw("getSwitchCapability-error", log.Fields{"id": device.Id, "error": err})
348 return nil, err
349 }
350 return switchCap, nil
351 }
352}
353
khenaidoo4d4802d2018-10-04 21:59:49 -0400354// getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
355// device
khenaidoob9203542018-09-17 22:56:37 -0400356func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*core_adapter.PortCapability, error) {
357 log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceId})
khenaidoo19d7b632018-10-30 10:49:50 -0400358 if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
khenaidoob9203542018-09-17 22:56:37 -0400359 return nil, err
360 } else {
361 var portCap *core_adapter.PortCapability
362 var err error
363 if portCap, err = agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo); err != nil {
364 log.Debugw("getPortCapability-error", log.Fields{"id": device.Id, "error": err})
365 return nil, err
366 }
367 return portCap, nil
368 }
369}
370
khenaidoo4d4802d2018-10-04 21:59:49 -0400371// TODO: implement when callback from the data model is ready
372// processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
khenaidoo92e62c52018-10-03 14:02:54 -0400373func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
374 log.Debug("!!!!!!!!!!!!!!!!!!!!!!!!!")
375 log.Debugw("processUpdate", log.Fields{"deviceId": agent.deviceId, "args": args})
376 return nil
377}
378
khenaidoob9203542018-09-17 22:56:37 -0400379func (agent *DeviceAgent) updateDevice(device *voltha.Device) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400380 agent.lockDevice.Lock()
381 //defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400382 log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
383 // Get the dev info from the model
khenaidoo92e62c52018-10-03 14:02:54 -0400384 if storedData, err := agent.getDeviceWithoutLock(); err != nil {
385 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400386 return status.Errorf(codes.NotFound, "%s", device.Id)
387 } else {
388 // store the changed data
khenaidoo92e62c52018-10-03 14:02:54 -0400389 cloned := proto.Clone(device).(*voltha.Device)
khenaidoo9a468962018-09-19 15:33:13 -0400390 afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
khenaidoo92e62c52018-10-03 14:02:54 -0400391 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400392 if afterUpdate == nil {
393 return status.Errorf(codes.Internal, "%s", device.Id)
394 }
395 // Perform the state transition
396 if err := agent.deviceMgr.processTransition(storedData, cloned); err != nil {
397 log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
398 return err
399 }
400 return nil
401 }
402}
403
khenaidoo92e62c52018-10-03 14:02:54 -0400404func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
405 agent.lockDevice.Lock()
406 //defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400407 // Work only on latest data
khenaidoo92e62c52018-10-03 14:02:54 -0400408 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
409 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400410 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
411 } else {
412 // clone the device
khenaidoo92e62c52018-10-03 14:02:54 -0400413 cloned := proto.Clone(storeDevice).(*voltha.Device)
414 // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
415 if s, ok := voltha.ConnectStatus_ConnectStatus_value[connStatus.String()]; ok {
416 log.Debugw("updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
417 cloned.ConnectStatus = connStatus
khenaidoob9203542018-09-17 22:56:37 -0400418 }
khenaidoo92e62c52018-10-03 14:02:54 -0400419 if s, ok := voltha.OperStatus_OperStatus_value[operStatus.String()]; ok {
420 log.Debugw("updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
421 cloned.OperStatus = operStatus
khenaidoob9203542018-09-17 22:56:37 -0400422 }
khenaidoo92e62c52018-10-03 14:02:54 -0400423 log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
khenaidoob9203542018-09-17 22:56:37 -0400424 // Store the device
khenaidoo92e62c52018-10-03 14:02:54 -0400425 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
426 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400427 return status.Errorf(codes.Internal, "%s", agent.deviceId)
428 }
khenaidoo92e62c52018-10-03 14:02:54 -0400429 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400430 // Perform the state transition
khenaidoo92e62c52018-10-03 14:02:54 -0400431 if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
432 log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
433 return err
434 }
435 return nil
436 }
437}
438
439func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
440 agent.lockDevice.Lock()
441 //defer agent.lockDevice.Unlock()
442 // Work only on latest data
443 // TODO: Get list of ports from device directly instead of the entire device
444 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
445 agent.lockDevice.Unlock()
446 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
447 } else {
448 // clone the device
449 cloned := proto.Clone(storeDevice).(*voltha.Device)
450 // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
451 if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
452 agent.lockDevice.Unlock()
453 return status.Errorf(codes.InvalidArgument, "%s", portType)
454 }
455 for _, port := range cloned.Ports {
456 if port.Type == portType && port.PortNo == portNo {
457 port.OperStatus = operStatus
458 // Set the admin status to ENABLED if the operational status is ACTIVE
459 // TODO: Set by northbound system?
460 if operStatus == voltha.OperStatus_ACTIVE {
461 port.AdminState = voltha.AdminState_ENABLED
462 }
463 break
464 }
465 }
466 log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
467 // Store the device
468 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
469 agent.lockDevice.Unlock()
470 return status.Errorf(codes.Internal, "%s", agent.deviceId)
471 }
472 agent.lockDevice.Unlock()
473 // Perform the state transition
474 if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400475 log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
476 return err
477 }
478 return nil
479 }
480}
481
482func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400483 agent.lockDevice.Lock()
484 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400485 log.Debug("updatePmConfigs")
486 // Work only on latest data
khenaidoo92e62c52018-10-03 14:02:54 -0400487 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400488 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
489 } else {
490 // clone the device
khenaidoo92e62c52018-10-03 14:02:54 -0400491 cloned := proto.Clone(storeDevice).(*voltha.Device)
492 cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
khenaidoob9203542018-09-17 22:56:37 -0400493 // Store the device
khenaidoo92e62c52018-10-03 14:02:54 -0400494 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
khenaidoob9203542018-09-17 22:56:37 -0400495 if afterUpdate == nil {
496 return status.Errorf(codes.Internal, "%s", agent.deviceId)
497 }
498 return nil
499 }
500}
501
502func (agent *DeviceAgent) addPort(port *voltha.Port) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400503 agent.lockDevice.Lock()
504 defer agent.lockDevice.Unlock()
505 log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
khenaidoob9203542018-09-17 22:56:37 -0400506 // Work only on latest data
khenaidoo92e62c52018-10-03 14:02:54 -0400507 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400508 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
509 } else {
510 // clone the device
khenaidoo92e62c52018-10-03 14:02:54 -0400511 cloned := proto.Clone(storeDevice).(*voltha.Device)
khenaidoob9203542018-09-17 22:56:37 -0400512 if cloned.Ports == nil {
513 // First port
khenaidoo92e62c52018-10-03 14:02:54 -0400514 log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
khenaidoob9203542018-09-17 22:56:37 -0400515 cloned.Ports = make([]*voltha.Port, 0)
516 }
khenaidoo92e62c52018-10-03 14:02:54 -0400517 cp := proto.Clone(port).(*voltha.Port)
518 // Set the admin state of the port to ENABLE if the operational state is ACTIVE
519 // TODO: Set by northbound system?
520 if cp.OperStatus == voltha.OperStatus_ACTIVE {
521 cp.AdminState = voltha.AdminState_ENABLED
522 }
523 cloned.Ports = append(cloned.Ports, cp)
khenaidoob9203542018-09-17 22:56:37 -0400524 // Store the device
khenaidoo92e62c52018-10-03 14:02:54 -0400525 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
526 if afterUpdate == nil {
527 return status.Errorf(codes.Internal, "%s", agent.deviceId)
528 }
529 return nil
530 }
531}
532
533func (agent *DeviceAgent) addPeerPort(port *voltha.Port_PeerPort) error {
534 agent.lockDevice.Lock()
535 defer agent.lockDevice.Unlock()
536 log.Debug("addPeerPort")
537 // Work only on latest data
538 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
539 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
540 } else {
541 // clone the device
542 cloned := proto.Clone(storeDevice).(*voltha.Device)
543 // Get the peer port on the device based on the port no
544 for _, peerPort := range cloned.Ports {
545 if peerPort.PortNo == port.PortNo { // found port
546 cp := proto.Clone(port).(*voltha.Port_PeerPort)
547 peerPort.Peers = append(peerPort.Peers, cp)
548 log.Debugw("found-peer", log.Fields{"portNo": port.PortNo, "deviceId": agent.deviceId})
549 break
550 }
551 }
khenaidoo4d4802d2018-10-04 21:59:49 -0400552 //To track an issue when adding peer-port.
553 log.Debugw("before-peer-added", log.Fields{"device": cloned})
khenaidoo92e62c52018-10-03 14:02:54 -0400554 // Store the device
555 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
khenaidoob9203542018-09-17 22:56:37 -0400556 if afterUpdate == nil {
557 return status.Errorf(codes.Internal, "%s", agent.deviceId)
558 }
khenaidoo4d4802d2018-10-04 21:59:49 -0400559 //To track an issue when adding peer-port.
560 if d, ok := afterUpdate.(*voltha.Device); ok {
561 log.Debugw("after-peer-added", log.Fields{"device": d})
562 } else {
563 log.Debug("after-peer-added-incorrect-type", log.Fields{"type": reflect.ValueOf(afterUpdate).Type()})
564 }
565
khenaidoob9203542018-09-17 22:56:37 -0400566 return nil
567 }
568}
569
khenaidoo19d7b632018-10-30 10:49:50 -0400570//flowTableUpdated is the callback after flows have been updated in the model to push them
571//to the adapters
572func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
573 log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
574
575 agent.lockDevice.Lock()
576 defer agent.lockDevice.Unlock()
577
578 var previousData *voltha.Flows
579 var latestData *voltha.Flows
580
581 var ok bool
582 if previousData, ok = args[0].(*ofp.Flows); !ok {
583 log.Errorw("invalid-args", log.Fields{"args0": args[0]})
584 return nil
585 }
586 if latestData, ok = args[1].(*ofp.Flows); !ok {
587 log.Errorw("invalid-args", log.Fields{"args1": args[1]})
588 return nil
589 }
590
591 // Sanity check - should not happen as this is already handled in logical device agent
592 if reflect.DeepEqual(previousData.Items, latestData.Items) {
593 log.Debugw("flow-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
594 return nil
595 }
596
597 var device *voltha.Device
598 var err error
599 if device, err = agent.getDeviceWithoutLock(); err != nil {
600 log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
601 return nil
602 }
603 groups := device.FlowGroups
604
605 // Send update to adapters
606 // Check whether the device supports incremental flow changes
607 // Assume false for test
608 acceptsAddRemoveFlowUpdates := false
609 if !acceptsAddRemoveFlowUpdates {
610 if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
611 log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
612 return err
613 }
614 return nil
615 }
616 // Incremental flow changes accepted
617 var toAdd []*ofp.OfpFlowStats
618 var toDelete []*ofp.OfpFlowStats
619
620 for _, flow := range latestData.Items {
621 if fu.FindFlowById(previousData.Items, flow) == -1 { // did not exist before
622 toAdd = append(toAdd, flow)
623 }
624 }
625 for _, flow := range previousData.Items {
626 if fu.FindFlowById(latestData.Items, flow) == -1 { // does not exist now
627 toDelete = append(toDelete, flow)
628 }
629 }
630 flowChanges := &ofp.FlowChanges{
631 ToAdd: &voltha.Flows{Items: toAdd},
632 ToRemove: &voltha.Flows{Items: toDelete},
633 }
634 // Send an empty group changes as it would be dealt with a call to groupTableUpdated
635 groupChanges := &ofp.FlowGroupChanges{}
636
637 // Send changes only
638 if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
639 log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
640 return err
641 }
642
643 return nil
644}
645
646//groupTableUpdated is the callback after group table has been updated in the model to push them
647//to the adapters
648func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
649 log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
650
651 agent.lockDevice.Lock()
652 defer agent.lockDevice.Unlock()
653
654 var previousData *voltha.FlowGroups
655 var latestData *voltha.FlowGroups
656
657 var ok bool
658 if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
659 log.Errorw("invalid-args", log.Fields{"args0": args[0]})
660 return nil
661 }
662 if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
663 log.Errorw("invalid-args", log.Fields{"args1": args[1]})
664 return nil
665 }
666
667 // Sanity check - should not happen as this is already handled in logical device agent
668 if reflect.DeepEqual(previousData.Items, latestData.Items) {
669 log.Debugw("group-table-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
670 return nil
671 }
672
673 var device *voltha.Device
674 var err error
675 if device, err = agent.getDeviceWithoutLock(); err != nil {
676 log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
677 return nil
678 }
679 flows := device.Flows
680
681 // Send update to adapters
682 // Check whether the device supports incremental flow changes
683 // Assume false for test
684 acceptsAddRemoveFlowUpdates := false
685 if !acceptsAddRemoveFlowUpdates {
686 if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
687 log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
688 return err
689 }
690 return nil
691 }
692
693 // Incremental group changes accepted
694 var toAdd []*ofp.OfpGroupEntry
695 var toDelete []*ofp.OfpGroupEntry
696 var toUpdate []*ofp.OfpGroupEntry
697
698 for _, group := range latestData.Items {
699 if idx := fu.FindGroup(previousData.Items, group.Desc.GroupId); idx == -1 { // did not exist before
700 toAdd = append(toAdd, group)
701 } else { // existed before
702 if previousData.Items[idx].String() != group.String() { // there is a change
703 toUpdate = append(toUpdate, group)
704 }
705 }
706 }
707 for _, group := range previousData.Items {
708 if fu.FindGroup(latestData.Items, group.Desc.GroupId) == -1 { // does not exist now
709 toDelete = append(toDelete, group)
710 }
711 }
712 groupChanges := &ofp.FlowGroupChanges{
713 ToAdd: &voltha.FlowGroups{Items: toAdd},
714 ToRemove: &voltha.FlowGroups{Items: toDelete},
715 ToUpdate: &voltha.FlowGroups{Items: toUpdate},
716 }
717 // Send an empty flow changes as it should have been dealt with a call to flowTableUpdated
718 flowChanges := &ofp.FlowChanges{}
719
720 // Send changes only
721 if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
722 log.Debugw("update-incremental-group-error", log.Fields{"id": agent.lastData.Id, "error": err})
723 return err
724 }
725 return nil
726}
727
khenaidoob9203542018-09-17 22:56:37 -0400728// TODO: A generic device update by attribute
729func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
khenaidoo92e62c52018-10-03 14:02:54 -0400730 agent.lockDevice.Lock()
731 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400732 if value == nil {
733 return
734 }
735 var storeDevice *voltha.Device
736 var err error
khenaidoo92e62c52018-10-03 14:02:54 -0400737 if storeDevice, err = agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400738 return
739 }
740 updated := false
741 s := reflect.ValueOf(storeDevice).Elem()
742 if s.Kind() == reflect.Struct {
743 // exported field
744 f := s.FieldByName(name)
745 if f.IsValid() && f.CanSet() {
746 switch f.Kind() {
747 case reflect.String:
748 f.SetString(value.(string))
749 updated = true
750 case reflect.Uint32:
751 f.SetUint(uint64(value.(uint32)))
752 updated = true
753 case reflect.Bool:
754 f.SetBool(value.(bool))
755 updated = true
756 }
757 }
758 }
khenaidoo92e62c52018-10-03 14:02:54 -0400759 log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
khenaidoob9203542018-09-17 22:56:37 -0400760 // Save the data
khenaidoo92e62c52018-10-03 14:02:54 -0400761 cloned := proto.Clone(storeDevice).(*voltha.Device)
762 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
khenaidoob9203542018-09-17 22:56:37 -0400763 log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
764 }
765 return
766}