blob: 7e7f42a08e80fa682fca43153db8b44531bada59 [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
19 "context"
khenaidoo19d7b632018-10-30 10:49:50 -040020 "fmt"
khenaidoob9203542018-09-17 22:56:37 -040021 "github.com/gogo/protobuf/proto"
22 "github.com/opencord/voltha-go/common/log"
23 "github.com/opencord/voltha-go/db/model"
24 "github.com/opencord/voltha-go/protos/core_adapter"
khenaidoo19d7b632018-10-30 10:49:50 -040025 ofp "github.com/opencord/voltha-go/protos/openflow_13"
khenaidoob9203542018-09-17 22:56:37 -040026 "github.com/opencord/voltha-go/protos/voltha"
khenaidoo19d7b632018-10-30 10:49:50 -040027 fu "github.com/opencord/voltha-go/rw_core/utils"
khenaidoob9203542018-09-17 22:56:37 -040028 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
khenaidoo19d7b632018-10-30 10:49:50 -040030 "reflect"
31 "sync"
khenaidoob9203542018-09-17 22:56:37 -040032)
33
34type DeviceAgent struct {
khenaidoo9a468962018-09-19 15:33:13 -040035 deviceId string
khenaidoo43c82122018-11-22 18:38:28 -050036 deviceType string
khenaidoo9a468962018-09-19 15:33:13 -040037 lastData *voltha.Device
38 adapterProxy *AdapterProxy
39 deviceMgr *DeviceManager
40 clusterDataProxy *model.Proxy
khenaidoo92e62c52018-10-03 14:02:54 -040041 deviceProxy *model.Proxy
khenaidoo9a468962018-09-19 15:33:13 -040042 exitChannel chan int
khenaidoo19d7b632018-10-30 10:49:50 -040043 flowProxy *model.Proxy
44 groupProxy *model.Proxy
khenaidoo92e62c52018-10-03 14:02:54 -040045 lockDevice sync.RWMutex
khenaidoob9203542018-09-17 22:56:37 -040046}
47
khenaidoo4d4802d2018-10-04 21:59:49 -040048//newDeviceAgent creates a new device agent along as creating a unique ID for the device and set the device state to
49//preprovisioning
khenaidoo9a468962018-09-19 15:33:13 -040050func newDeviceAgent(ap *AdapterProxy, device *voltha.Device, deviceMgr *DeviceManager, cdProxy *model.Proxy) *DeviceAgent {
khenaidoob9203542018-09-17 22:56:37 -040051 var agent DeviceAgent
khenaidoob9203542018-09-17 22:56:37 -040052 agent.adapterProxy = ap
khenaidoo92e62c52018-10-03 14:02:54 -040053 cloned := (proto.Clone(device)).(*voltha.Device)
54 cloned.Id = CreateDeviceId()
55 cloned.AdminState = voltha.AdminState_PREPROVISIONED
khenaidoo19d7b632018-10-30 10:49:50 -040056 cloned.FlowGroups = &ofp.FlowGroups{Items: nil}
57 cloned.Flows = &ofp.Flows{Items: nil}
58 if !device.GetRoot() && device.ProxyAddress != nil {
59 // Set the default vlan ID to the one specified by the parent adapter. It can be
60 // overwritten by the child adapter during a device update request
61 cloned.Vlan = device.ProxyAddress.ChannelId
62 }
khenaidoo92e62c52018-10-03 14:02:54 -040063 agent.deviceId = cloned.Id
khenaidoofdbad6e2018-11-06 22:26:38 -050064 agent.deviceType = cloned.Type
khenaidoo92e62c52018-10-03 14:02:54 -040065 agent.lastData = cloned
khenaidoob9203542018-09-17 22:56:37 -040066 agent.deviceMgr = deviceMgr
67 agent.exitChannel = make(chan int, 1)
khenaidoo9a468962018-09-19 15:33:13 -040068 agent.clusterDataProxy = cdProxy
khenaidoo92e62c52018-10-03 14:02:54 -040069 agent.lockDevice = sync.RWMutex{}
khenaidoob9203542018-09-17 22:56:37 -040070 return &agent
71}
72
khenaidoo4d4802d2018-10-04 21:59:49 -040073// start save the device to the data model and registers for callbacks on that device
khenaidoob9203542018-09-17 22:56:37 -040074func (agent *DeviceAgent) start(ctx context.Context) {
khenaidoo92e62c52018-10-03 14:02:54 -040075 agent.lockDevice.Lock()
76 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -040077 log.Debugw("starting-device-agent", log.Fields{"device": agent.lastData})
78 // Add the initial device to the local model
khenaidoo9a468962018-09-19 15:33:13 -040079 if added := agent.clusterDataProxy.Add("/devices", agent.lastData, ""); added == nil {
khenaidoob9203542018-09-17 22:56:37 -040080 log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
81 }
khenaidoo43c82122018-11-22 18:38:28 -050082 agent.deviceProxy = agent.clusterDataProxy.Root.CreateProxy("/devices/"+agent.deviceId, false)
83 agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
khenaidoo19d7b632018-10-30 10:49:50 -040084
khenaidoo43c82122018-11-22 18:38:28 -050085 agent.flowProxy = agent.clusterDataProxy.Root.CreateProxy(
khenaidoo19d7b632018-10-30 10:49:50 -040086 fmt.Sprintf("/devices/%s/flows", agent.deviceId),
87 false)
khenaidoo43c82122018-11-22 18:38:28 -050088 agent.groupProxy = agent.clusterDataProxy.Root.CreateProxy(
khenaidoo19d7b632018-10-30 10:49:50 -040089 fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
90 false)
91
92 agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
khenaidoo43c82122018-11-22 18:38:28 -050093 agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
khenaidoo19d7b632018-10-30 10:49:50 -040094
khenaidoob9203542018-09-17 22:56:37 -040095 log.Debug("device-agent-started")
96}
97
khenaidoo4d4802d2018-10-04 21:59:49 -040098// stop stops the device agent. Not much to do for now
99func (agent *DeviceAgent) stop(ctx context.Context) {
khenaidoo92e62c52018-10-03 14:02:54 -0400100 agent.lockDevice.Lock()
101 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400102 log.Debug("stopping-device-agent")
103 agent.exitChannel <- 1
104 log.Debug("device-agent-stopped")
105}
106
khenaidoo19d7b632018-10-30 10:49:50 -0400107// GetDevice retrieves the latest device information from the data model
khenaidoo92e62c52018-10-03 14:02:54 -0400108func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
109 agent.lockDevice.Lock()
110 defer agent.lockDevice.Unlock()
111 if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
112 if d, ok := device.(*voltha.Device); ok {
113 cloned := proto.Clone(d).(*voltha.Device)
114 return cloned, nil
115 }
116 }
117 return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
118}
119
khenaidoo4d4802d2018-10-04 21:59:49 -0400120// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
khenaidoo92e62c52018-10-03 14:02:54 -0400121// This function is meant so that we do not have duplicate code all over the device agent functions
122func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
123 if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
124 if d, ok := device.(*voltha.Device); ok {
125 cloned := proto.Clone(d).(*voltha.Device)
126 return cloned, nil
127 }
128 }
129 return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceId)
130}
131
khenaidoo4d4802d2018-10-04 21:59:49 -0400132// enableDevice activates a preprovisioned or disable device
khenaidoob9203542018-09-17 22:56:37 -0400133func (agent *DeviceAgent) enableDevice(ctx context.Context) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400134 agent.lockDevice.Lock()
135 defer agent.lockDevice.Unlock()
136 log.Debugw("enableDevice", log.Fields{"id": agent.deviceId})
137 if device, err := agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400138 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
139 } else {
khenaidoo92e62c52018-10-03 14:02:54 -0400140 if device.AdminState == voltha.AdminState_ENABLED {
141 log.Debugw("device-already-enabled", log.Fields{"id": agent.deviceId})
142 //TODO: Needs customized error message
143 return nil
144 }
khenaidoo4d4802d2018-10-04 21:59:49 -0400145 //TODO: if parent device is disabled then do not enable device
khenaidoo92e62c52018-10-03 14:02:54 -0400146 // Verify whether we need to adopt the device the first time
147 // TODO: A state machine for these state transitions would be better (we just have to handle
148 // a limited set of states now or it may be an overkill)
149 if device.AdminState == voltha.AdminState_PREPROVISIONED {
150 // First send the request to an Adapter and wait for a response
151 if err := agent.adapterProxy.AdoptDevice(ctx, device); err != nil {
152 log.Debugw("adoptDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
khenaidoob9203542018-09-17 22:56:37 -0400153 return err
154 }
khenaidoo92e62c52018-10-03 14:02:54 -0400155 } else {
156 // First send the request to an Adapter and wait for a response
157 if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
158 log.Debugw("renableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
159 return err
160 }
161 }
162 // Received an Ack (no error found above). Now update the device in the model to the expected state
163 cloned := proto.Clone(device).(*voltha.Device)
164 cloned.AdminState = voltha.AdminState_ENABLED
165 cloned.OperStatus = voltha.OperStatus_ACTIVATING
166 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
167 return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
khenaidoob9203542018-09-17 22:56:37 -0400168 }
169 }
170 return nil
171}
172
khenaidoo19d7b632018-10-30 10:49:50 -0400173func (agent *DeviceAgent) updateFlows(flows []*ofp.OfpFlowStats) error {
174 agent.lockDevice.Lock()
175 defer agent.lockDevice.Unlock()
176 log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows})
177 var oldData *voltha.Flows
178 if storedData, err := agent.getDeviceWithoutLock(); err != nil {
179 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
180 } else {
181 oldData = proto.Clone(storedData.Flows).(*voltha.Flows)
182 log.Debugw("updateFlows", log.Fields{"deviceId": agent.deviceId, "flows": flows, "old": oldData})
183 // store the changed data
184 storedData.Flows.Items = flows
185 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
186 if afterUpdate == nil {
187 return status.Errorf(codes.Internal, "%s", agent.deviceId)
188 }
189
190 // For now, force the callback to occur
191 go agent.flowTableUpdated(oldData, &ofp.Flows{Items: flows})
192 return nil
193 }
194}
195
196func (agent *DeviceAgent) updateGroups(groups []*ofp.OfpGroupEntry) error {
197 agent.lockDevice.Lock()
198 defer agent.lockDevice.Unlock()
199 var oldData *voltha.FlowGroups
200 log.Debugw("updateGroups", log.Fields{"deviceId": agent.deviceId, "groups": groups})
201 if storedData, err := agent.getDeviceWithoutLock(); err != nil {
202 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
203 } else {
204 oldData = proto.Clone(storedData.FlowGroups).(*voltha.FlowGroups)
205 // store the changed data
206 storedData.FlowGroups.Items = groups
207 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, storedData, false, "")
208 if afterUpdate == nil {
209 return status.Errorf(codes.Internal, "%s", agent.deviceId)
210 }
211
212 // For now, force the callback to occur
213 go agent.groupTableUpdated(oldData, &ofp.FlowGroups{Items: groups})
214 return nil
215 }
216}
217
khenaidoo4d4802d2018-10-04 21:59:49 -0400218//disableDevice disable a device
khenaidoo92e62c52018-10-03 14:02:54 -0400219func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
220 agent.lockDevice.Lock()
221 //defer agent.lockDevice.Unlock()
222 log.Debugw("disableDevice", log.Fields{"id": agent.deviceId})
223 // Get the most up to date the device info
224 if device, err := agent.getDeviceWithoutLock(); err != nil {
225 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
226 } else {
227 if device.AdminState == voltha.AdminState_DISABLED {
228 log.Debugw("device-already-disabled", log.Fields{"id": agent.deviceId})
229 //TODO: Needs customized error message
230 agent.lockDevice.Unlock()
231 return nil
232 }
233 // First send the request to an Adapter and wait for a response
234 if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
235 log.Debugw("disableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
236 agent.lockDevice.Unlock()
237 return err
238 }
239 // Received an Ack (no error found above). Now update the device in the model to the expected state
240 cloned := proto.Clone(device).(*voltha.Device)
241 cloned.AdminState = voltha.AdminState_DISABLED
242 // Set the state of all ports on that device to disable
243 for _, port := range cloned.Ports {
244 port.AdminState = voltha.AdminState_DISABLED
245 port.OperStatus = voltha.OperStatus_UNKNOWN
246 }
247 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
248 agent.lockDevice.Unlock()
249 return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
250 }
251 agent.lockDevice.Unlock()
khenaidoo92e62c52018-10-03 14:02:54 -0400252 }
253 return nil
254}
255
khenaidoo4d4802d2018-10-04 21:59:49 -0400256func (agent *DeviceAgent) rebootDevice(ctx context.Context) error {
257 agent.lockDevice.Lock()
258 defer agent.lockDevice.Unlock()
259 log.Debugw("rebootDevice", log.Fields{"id": agent.deviceId})
260 // Get the most up to date the device info
261 if device, err := agent.getDeviceWithoutLock(); err != nil {
262 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
263 } else {
264 if device.AdminState != voltha.AdminState_DISABLED {
265 log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
266 //TODO: Needs customized error message
267 return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
268 }
269 // First send the request to an Adapter and wait for a response
270 if err := agent.adapterProxy.RebootDevice(ctx, device); err != nil {
271 log.Debugw("rebootDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
272 return err
273 }
274 }
275 return nil
276}
277
278func (agent *DeviceAgent) deleteDevice(ctx context.Context) error {
279 agent.lockDevice.Lock()
280 log.Debugw("deleteDevice", log.Fields{"id": agent.deviceId})
281 // Get the most up to date the device info
282 if device, err := agent.getDeviceWithoutLock(); err != nil {
283 agent.lockDevice.Unlock()
284 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
285 } else {
khenaidoo43c82122018-11-22 18:38:28 -0500286 if (device.AdminState != voltha.AdminState_DISABLED) &&
287 (device.AdminState != voltha.AdminState_PREPROVISIONED) {
khenaidoo4d4802d2018-10-04 21:59:49 -0400288 log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
289 //TODO: Needs customized error message
290 agent.lockDevice.Unlock()
291 return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
292 }
293 // Send the request to an Adapter and wait for a response
294 if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
295 log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
296 agent.lockDevice.Unlock()
297 return err
298 }
299 // Set the device Admin state to DELETED in order to trigger the callback to delete
300 // child devices, if any
301 // Received an Ack (no error found above). Now update the device in the model to the expected state
302 cloned := proto.Clone(device).(*voltha.Device)
303 cloned.AdminState = voltha.AdminState_DELETED
304 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
305 agent.lockDevice.Unlock()
306 return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
307 }
308 agent.lockDevice.Unlock()
khenaidoo4d4802d2018-10-04 21:59:49 -0400309 }
310 return nil
311}
312
313// getPorts retrieves the ports information of the device based on the port type.
khenaidoo92e62c52018-10-03 14:02:54 -0400314func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
315 log.Debugw("getPorts", log.Fields{"id": agent.deviceId, "portType": portType})
khenaidoob9203542018-09-17 22:56:37 -0400316 ports := &voltha.Ports{}
khenaidoo19d7b632018-10-30 10:49:50 -0400317 if device, _ := agent.deviceMgr.GetDevice(agent.deviceId); device != nil {
khenaidoob9203542018-09-17 22:56:37 -0400318 for _, port := range device.Ports {
khenaidoo92e62c52018-10-03 14:02:54 -0400319 if port.Type == portType {
khenaidoob9203542018-09-17 22:56:37 -0400320 ports.Items = append(ports.Items, port)
321 }
322 }
323 }
324 return ports
325}
326
khenaidoo4d4802d2018-10-04 21:59:49 -0400327// getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
328// parent device
khenaidoob9203542018-09-17 22:56:37 -0400329func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*core_adapter.SwitchCapability, error) {
330 log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceId})
khenaidoo19d7b632018-10-30 10:49:50 -0400331 if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
khenaidoob9203542018-09-17 22:56:37 -0400332 return nil, err
333 } else {
334 var switchCap *core_adapter.SwitchCapability
335 var err error
336 if switchCap, err = agent.adapterProxy.GetOfpDeviceInfo(ctx, device); err != nil {
337 log.Debugw("getSwitchCapability-error", log.Fields{"id": device.Id, "error": err})
338 return nil, err
339 }
340 return switchCap, nil
341 }
342}
343
khenaidoo4d4802d2018-10-04 21:59:49 -0400344// getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
345// device
khenaidoob9203542018-09-17 22:56:37 -0400346func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*core_adapter.PortCapability, error) {
347 log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceId})
khenaidoo19d7b632018-10-30 10:49:50 -0400348 if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
khenaidoob9203542018-09-17 22:56:37 -0400349 return nil, err
350 } else {
351 var portCap *core_adapter.PortCapability
352 var err error
353 if portCap, err = agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo); err != nil {
354 log.Debugw("getPortCapability-error", log.Fields{"id": device.Id, "error": err})
355 return nil, err
356 }
357 return portCap, nil
358 }
359}
360
khenaidoofdbad6e2018-11-06 22:26:38 -0500361func (agent *DeviceAgent) packetOut(outPort uint32, packet *ofp.OfpPacketOut) error {
362 // Send packet to adapter
363 if err := agent.adapterProxy.packetOut(agent.deviceType, agent.deviceId, outPort, packet); err != nil {
364 log.Debugw("packet-out-error", log.Fields{"id": agent.lastData.Id, "error": err})
365 return err
366 }
367 return nil
368}
369
khenaidoo4d4802d2018-10-04 21:59:49 -0400370// processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
khenaidoo92e62c52018-10-03 14:02:54 -0400371func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
khenaidoo43c82122018-11-22 18:38:28 -0500372 //// Run this callback in its own go routine
373 go func(args ...interface{}) interface{} {
374 var previous *voltha.Device
375 var current *voltha.Device
376 var ok bool
377 if len(args) == 2 {
378 if previous, ok = args[0].(*voltha.Device); !ok {
379 log.Errorw("invalid-callback-type", log.Fields{"data": args[0]})
380 return nil
381 }
382 if current, ok = args[1].(*voltha.Device); !ok {
383 log.Errorw("invalid-callback-type", log.Fields{"data": args[1]})
384 return nil
385 }
386 } else {
387 log.Errorw("too-many-args-in-callback", log.Fields{"len": len(args)})
388 return nil
389 }
390 // Perform the state transition in it's own go routine
391 agent.deviceMgr.processTransition(previous, current)
392 return nil
393 }(args...)
394
khenaidoo92e62c52018-10-03 14:02:54 -0400395 return nil
396}
397
khenaidoob9203542018-09-17 22:56:37 -0400398func (agent *DeviceAgent) updateDevice(device *voltha.Device) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400399 agent.lockDevice.Lock()
khenaidoo43c82122018-11-22 18:38:28 -0500400 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400401 log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
khenaidoo43c82122018-11-22 18:38:28 -0500402 cloned := proto.Clone(device).(*voltha.Device)
403 afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
404 if afterUpdate == nil {
405 return status.Errorf(codes.Internal, "%s", device.Id)
khenaidoob9203542018-09-17 22:56:37 -0400406 }
khenaidoo43c82122018-11-22 18:38:28 -0500407 return nil
408}
409
410func (agent *DeviceAgent) updateDeviceWithoutLock(device *voltha.Device) error {
411 log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
412 cloned := proto.Clone(device).(*voltha.Device)
413 afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
414 if afterUpdate == nil {
415 return status.Errorf(codes.Internal, "%s", device.Id)
416 }
417 return nil
khenaidoob9203542018-09-17 22:56:37 -0400418}
419
khenaidoo92e62c52018-10-03 14:02:54 -0400420func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
421 agent.lockDevice.Lock()
422 //defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400423 // Work only on latest data
khenaidoo92e62c52018-10-03 14:02:54 -0400424 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
425 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400426 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
427 } else {
428 // clone the device
khenaidoo92e62c52018-10-03 14:02:54 -0400429 cloned := proto.Clone(storeDevice).(*voltha.Device)
430 // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
431 if s, ok := voltha.ConnectStatus_ConnectStatus_value[connStatus.String()]; ok {
432 log.Debugw("updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
433 cloned.ConnectStatus = connStatus
khenaidoob9203542018-09-17 22:56:37 -0400434 }
khenaidoo92e62c52018-10-03 14:02:54 -0400435 if s, ok := voltha.OperStatus_OperStatus_value[operStatus.String()]; ok {
436 log.Debugw("updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
437 cloned.OperStatus = operStatus
khenaidoob9203542018-09-17 22:56:37 -0400438 }
khenaidoo92e62c52018-10-03 14:02:54 -0400439 log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
khenaidoob9203542018-09-17 22:56:37 -0400440 // Store the device
khenaidoo92e62c52018-10-03 14:02:54 -0400441 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
442 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400443 return status.Errorf(codes.Internal, "%s", agent.deviceId)
444 }
khenaidoo92e62c52018-10-03 14:02:54 -0400445 agent.lockDevice.Unlock()
khenaidoo92e62c52018-10-03 14:02:54 -0400446 return nil
447 }
448}
449
450func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_OperStatus) error {
451 agent.lockDevice.Lock()
452 //defer agent.lockDevice.Unlock()
453 // Work only on latest data
454 // TODO: Get list of ports from device directly instead of the entire device
455 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
456 agent.lockDevice.Unlock()
457 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
458 } else {
459 // clone the device
460 cloned := proto.Clone(storeDevice).(*voltha.Device)
461 // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
462 if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
463 agent.lockDevice.Unlock()
464 return status.Errorf(codes.InvalidArgument, "%s", portType)
465 }
466 for _, port := range cloned.Ports {
467 if port.Type == portType && port.PortNo == portNo {
468 port.OperStatus = operStatus
469 // Set the admin status to ENABLED if the operational status is ACTIVE
470 // TODO: Set by northbound system?
471 if operStatus == voltha.OperStatus_ACTIVE {
472 port.AdminState = voltha.AdminState_ENABLED
473 }
474 break
475 }
476 }
477 log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
478 // Store the device
479 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
480 agent.lockDevice.Unlock()
481 return status.Errorf(codes.Internal, "%s", agent.deviceId)
482 }
483 agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400484 return nil
485 }
486}
487
488func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400489 agent.lockDevice.Lock()
490 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400491 log.Debug("updatePmConfigs")
492 // Work only on latest data
khenaidoo92e62c52018-10-03 14:02:54 -0400493 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400494 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
495 } else {
496 // clone the device
khenaidoo92e62c52018-10-03 14:02:54 -0400497 cloned := proto.Clone(storeDevice).(*voltha.Device)
498 cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
khenaidoob9203542018-09-17 22:56:37 -0400499 // Store the device
khenaidoo92e62c52018-10-03 14:02:54 -0400500 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
khenaidoob9203542018-09-17 22:56:37 -0400501 if afterUpdate == nil {
502 return status.Errorf(codes.Internal, "%s", agent.deviceId)
503 }
504 return nil
505 }
506}
507
508func (agent *DeviceAgent) addPort(port *voltha.Port) error {
khenaidoo92e62c52018-10-03 14:02:54 -0400509 agent.lockDevice.Lock()
510 defer agent.lockDevice.Unlock()
511 log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
khenaidoob9203542018-09-17 22:56:37 -0400512 // Work only on latest data
khenaidoo92e62c52018-10-03 14:02:54 -0400513 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400514 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
515 } else {
516 // clone the device
khenaidoo92e62c52018-10-03 14:02:54 -0400517 cloned := proto.Clone(storeDevice).(*voltha.Device)
khenaidoob9203542018-09-17 22:56:37 -0400518 if cloned.Ports == nil {
519 // First port
khenaidoo92e62c52018-10-03 14:02:54 -0400520 log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
khenaidoob9203542018-09-17 22:56:37 -0400521 cloned.Ports = make([]*voltha.Port, 0)
522 }
khenaidoo92e62c52018-10-03 14:02:54 -0400523 cp := proto.Clone(port).(*voltha.Port)
524 // Set the admin state of the port to ENABLE if the operational state is ACTIVE
525 // TODO: Set by northbound system?
526 if cp.OperStatus == voltha.OperStatus_ACTIVE {
527 cp.AdminState = voltha.AdminState_ENABLED
528 }
529 cloned.Ports = append(cloned.Ports, cp)
khenaidoob9203542018-09-17 22:56:37 -0400530 // Store the device
khenaidoo92e62c52018-10-03 14:02:54 -0400531 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
532 if afterUpdate == nil {
533 return status.Errorf(codes.Internal, "%s", agent.deviceId)
534 }
535 return nil
536 }
537}
538
539func (agent *DeviceAgent) addPeerPort(port *voltha.Port_PeerPort) error {
540 agent.lockDevice.Lock()
541 defer agent.lockDevice.Unlock()
542 log.Debug("addPeerPort")
543 // Work only on latest data
544 if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
545 return status.Errorf(codes.NotFound, "%s", agent.deviceId)
546 } else {
547 // clone the device
548 cloned := proto.Clone(storeDevice).(*voltha.Device)
549 // Get the peer port on the device based on the port no
550 for _, peerPort := range cloned.Ports {
551 if peerPort.PortNo == port.PortNo { // found port
552 cp := proto.Clone(port).(*voltha.Port_PeerPort)
553 peerPort.Peers = append(peerPort.Peers, cp)
554 log.Debugw("found-peer", log.Fields{"portNo": port.PortNo, "deviceId": agent.deviceId})
555 break
556 }
557 }
558 // Store the device
559 afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
khenaidoob9203542018-09-17 22:56:37 -0400560 if afterUpdate == nil {
561 return status.Errorf(codes.Internal, "%s", agent.deviceId)
562 }
563 return nil
564 }
565}
566
khenaidoo19d7b632018-10-30 10:49:50 -0400567//flowTableUpdated is the callback after flows have been updated in the model to push them
568//to the adapters
569func (agent *DeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
570 log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
571
572 agent.lockDevice.Lock()
573 defer agent.lockDevice.Unlock()
574
575 var previousData *voltha.Flows
576 var latestData *voltha.Flows
577
578 var ok bool
579 if previousData, ok = args[0].(*ofp.Flows); !ok {
580 log.Errorw("invalid-args", log.Fields{"args0": args[0]})
581 return nil
582 }
583 if latestData, ok = args[1].(*ofp.Flows); !ok {
584 log.Errorw("invalid-args", log.Fields{"args1": args[1]})
585 return nil
586 }
587
588 // Sanity check - should not happen as this is already handled in logical device agent
589 if reflect.DeepEqual(previousData.Items, latestData.Items) {
590 log.Debugw("flow-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
591 return nil
592 }
593
594 var device *voltha.Device
595 var err error
596 if device, err = agent.getDeviceWithoutLock(); err != nil {
597 log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
598 return nil
599 }
600 groups := device.FlowGroups
601
602 // Send update to adapters
khenaidoo43c82122018-11-22 18:38:28 -0500603 // TODO: Check whether the device supports incremental flow changes
khenaidoo19d7b632018-10-30 10:49:50 -0400604 // Assume false for test
605 acceptsAddRemoveFlowUpdates := false
606 if !acceptsAddRemoveFlowUpdates {
607 if err := agent.adapterProxy.UpdateFlowsBulk(device, latestData, groups); err != nil {
608 log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
609 return err
610 }
611 return nil
612 }
613 // Incremental flow changes accepted
614 var toAdd []*ofp.OfpFlowStats
615 var toDelete []*ofp.OfpFlowStats
616
617 for _, flow := range latestData.Items {
618 if fu.FindFlowById(previousData.Items, flow) == -1 { // did not exist before
619 toAdd = append(toAdd, flow)
620 }
621 }
622 for _, flow := range previousData.Items {
623 if fu.FindFlowById(latestData.Items, flow) == -1 { // does not exist now
624 toDelete = append(toDelete, flow)
625 }
626 }
627 flowChanges := &ofp.FlowChanges{
628 ToAdd: &voltha.Flows{Items: toAdd},
629 ToRemove: &voltha.Flows{Items: toDelete},
630 }
631 // Send an empty group changes as it would be dealt with a call to groupTableUpdated
632 groupChanges := &ofp.FlowGroupChanges{}
633
634 // Send changes only
635 if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
636 log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
637 return err
638 }
639
640 return nil
641}
642
643//groupTableUpdated is the callback after group table has been updated in the model to push them
644//to the adapters
645func (agent *DeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
646 log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
647
648 agent.lockDevice.Lock()
649 defer agent.lockDevice.Unlock()
650
651 var previousData *voltha.FlowGroups
652 var latestData *voltha.FlowGroups
653
654 var ok bool
655 if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
656 log.Errorw("invalid-args", log.Fields{"args0": args[0]})
657 return nil
658 }
659 if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
660 log.Errorw("invalid-args", log.Fields{"args1": args[1]})
661 return nil
662 }
663
664 // Sanity check - should not happen as this is already handled in logical device agent
665 if reflect.DeepEqual(previousData.Items, latestData.Items) {
666 log.Debugw("group-table-update-not-required", log.Fields{"previous": previousData.Items, "new": latestData.Items})
667 return nil
668 }
669
670 var device *voltha.Device
671 var err error
672 if device, err = agent.getDeviceWithoutLock(); err != nil {
673 log.Errorw("no-device", log.Fields{"id": agent.deviceId, "error": err})
674 return nil
675 }
676 flows := device.Flows
677
678 // Send update to adapters
khenaidoo43c82122018-11-22 18:38:28 -0500679 // TODO: Check whether the device supports incremental flow changes
khenaidoo19d7b632018-10-30 10:49:50 -0400680 // Assume false for test
681 acceptsAddRemoveFlowUpdates := false
682 if !acceptsAddRemoveFlowUpdates {
683 if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, latestData); err != nil {
684 log.Debugw("update-flows-bulk-error", log.Fields{"id": agent.lastData.Id, "error": err})
685 return err
686 }
687 return nil
688 }
689
690 // Incremental group changes accepted
691 var toAdd []*ofp.OfpGroupEntry
692 var toDelete []*ofp.OfpGroupEntry
693 var toUpdate []*ofp.OfpGroupEntry
694
695 for _, group := range latestData.Items {
696 if idx := fu.FindGroup(previousData.Items, group.Desc.GroupId); idx == -1 { // did not exist before
697 toAdd = append(toAdd, group)
698 } else { // existed before
699 if previousData.Items[idx].String() != group.String() { // there is a change
700 toUpdate = append(toUpdate, group)
701 }
702 }
703 }
704 for _, group := range previousData.Items {
705 if fu.FindGroup(latestData.Items, group.Desc.GroupId) == -1 { // does not exist now
706 toDelete = append(toDelete, group)
707 }
708 }
709 groupChanges := &ofp.FlowGroupChanges{
710 ToAdd: &voltha.FlowGroups{Items: toAdd},
711 ToRemove: &voltha.FlowGroups{Items: toDelete},
712 ToUpdate: &voltha.FlowGroups{Items: toUpdate},
713 }
714 // Send an empty flow changes as it should have been dealt with a call to flowTableUpdated
715 flowChanges := &ofp.FlowChanges{}
716
717 // Send changes only
718 if err := agent.adapterProxy.UpdateFlowsIncremental(device, flowChanges, groupChanges); err != nil {
719 log.Debugw("update-incremental-group-error", log.Fields{"id": agent.lastData.Id, "error": err})
720 return err
721 }
722 return nil
723}
724
khenaidoob9203542018-09-17 22:56:37 -0400725// TODO: A generic device update by attribute
726func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
khenaidoo92e62c52018-10-03 14:02:54 -0400727 agent.lockDevice.Lock()
728 defer agent.lockDevice.Unlock()
khenaidoob9203542018-09-17 22:56:37 -0400729 if value == nil {
730 return
731 }
732 var storeDevice *voltha.Device
733 var err error
khenaidoo92e62c52018-10-03 14:02:54 -0400734 if storeDevice, err = agent.getDeviceWithoutLock(); err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400735 return
736 }
737 updated := false
738 s := reflect.ValueOf(storeDevice).Elem()
739 if s.Kind() == reflect.Struct {
740 // exported field
741 f := s.FieldByName(name)
742 if f.IsValid() && f.CanSet() {
743 switch f.Kind() {
744 case reflect.String:
745 f.SetString(value.(string))
746 updated = true
747 case reflect.Uint32:
748 f.SetUint(uint64(value.(uint32)))
749 updated = true
750 case reflect.Bool:
751 f.SetBool(value.(bool))
752 updated = true
753 }
754 }
755 }
khenaidoo92e62c52018-10-03 14:02:54 -0400756 log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
khenaidoob9203542018-09-17 22:56:37 -0400757 // Save the data
khenaidoo92e62c52018-10-03 14:02:54 -0400758 cloned := proto.Clone(storeDevice).(*voltha.Device)
759 if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
khenaidoob9203542018-09-17 22:56:37 -0400760 log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
761 }
762 return
763}