blob: cd879c644268568fe5901821bed7cfb8dcabf84a [file] [log] [blame]
khenaidoob9203542018-09-17 22:56:37 -04001/*
2 * Copyright 2018-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
Kent Hagerman2b216042020-04-03 18:28:56 -040017package device
khenaidoob9203542018-09-17 22:56:37 -040018
19import (
20 "context"
Matteo Scandolo360605d2019-11-05 18:29:17 -080021 "encoding/hex"
Kent Hagerman4f355f52020-03-30 16:01:33 -040022 "errors"
khenaidoo3ab34882019-05-02 21:33:30 -040023 "fmt"
Mahir Gunyelb0343bf2021-05-11 14:14:26 -070024 "reflect"
25 "sync"
26 "time"
27
Maninder0aabf0c2021-03-17 14:55:14 +053028 "github.com/cenkalti/backoff/v3"
Maninder9a1bc0d2020-10-26 11:34:02 +053029 "github.com/gogo/protobuf/proto"
30 "github.com/golang/protobuf/ptypes"
31 "github.com/golang/protobuf/ptypes/empty"
Maninder0aabf0c2021-03-17 14:55:14 +053032 "github.com/opencord/voltha-go/rw_core/config"
Maninder9a1bc0d2020-10-26 11:34:02 +053033 "google.golang.org/grpc/codes"
34 "google.golang.org/grpc/status"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070035
Himani Chawla2ba1c9c2020-10-07 13:19:03 +053036 "github.com/opencord/voltha-go/db/model"
Kent Hagerman2b216042020-04-03 18:28:56 -040037 "github.com/opencord/voltha-go/rw_core/core/adapter"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070038 "github.com/opencord/voltha-go/rw_core/core/device/flow"
39 "github.com/opencord/voltha-go/rw_core/core/device/group"
Kent Hagerman2a07b862020-06-19 15:23:07 -040040 "github.com/opencord/voltha-go/rw_core/core/device/port"
Kent Hagerman2b216042020-04-03 18:28:56 -040041 "github.com/opencord/voltha-go/rw_core/core/device/remote"
Himani Chawla2ba1c9c2020-10-07 13:19:03 +053042 "github.com/opencord/voltha-go/rw_core/core/device/transientstate"
Scott Bakerb671a862019-10-24 10:53:40 -070043 coreutils "github.com/opencord/voltha-go/rw_core/utils"
yasin sapli5458a1c2021-06-14 22:24:38 +000044 "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
45 "github.com/opencord/voltha-lib-go/v5/pkg/log"
Maninder9a1bc0d2020-10-26 11:34:02 +053046 "github.com/opencord/voltha-protos/v4/go/common"
Salman Siddiqui1cf95042020-11-19 00:42:56 +053047 "github.com/opencord/voltha-protos/v4/go/extension"
Maninderdfadc982020-10-28 14:04:33 +053048 ic "github.com/opencord/voltha-protos/v4/go/inter_container"
49 ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
50 "github.com/opencord/voltha-protos/v4/go/voltha"
khenaidoob9203542018-09-17 22:56:37 -040051)
52
// Agent represents device agent attributes. One Agent is built per device by
// newAgent, which fills in every field below except startOnce, stopOnce,
// stopped, stopReconciling and stopReconcilingMutex (these keep their zero
// values until used).
//
// Notes on selected fields, as used in this file:
//   - deviceID is the device's ID; newAgent generates one when the supplied
//     device has an empty Id.
//   - deviceType is initialized from device.Type in newAgent and overwritten
//     with device.Adapter when the device is loaded from the KV store.
//   - dbProxy is used to Get/Set/Remove the device record keyed by deviceID.
//   - device is the agent's cached copy of the device; it is read and replaced
//     while holding the requestQueue lock (WaitForGreenLight/RequestComplete).
//   - exitChannel is closed and stopped is set to true by stop().
//   - startOnce/stopOnce make start() and stop() perform their work only once.
//   - stopReconciling and stopReconcilingMutex are not referenced in this part
//     of the file. NOTE(review): presumably used by the reconcile logic; confirm.
type Agent struct {
	deviceID             string
	parentID             string
	deviceType           string
	isRootDevice         bool
	adapterProxy         *remote.AdapterProxy
	adapterMgr           *adapter.Manager
	deviceMgr            *Manager
	dbProxy              *model.Proxy
	exitChannel          chan int
	device               *voltha.Device
	requestQueue         *coreutils.RequestQueue
	defaultTimeout       time.Duration
	startOnce            sync.Once
	stopOnce             sync.Once
	stopped              bool
	stopReconciling      chan int
	stopReconcilingMutex sync.RWMutex
	config               *config.RWCoreFlags

	flowCache            *flow.Cache
	groupCache           *group.Cache
	portLoader           *port.Loader
	transientStateLoader *transientstate.Loader
}
79
Kent Hagerman2b216042020-04-03 18:28:56 -040080//newAgent creates a new device agent. The device will be initialized when start() is called.
Kent Hagerman2a07b862020-06-19 15:23:07 -040081func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, dbPath *model.Path, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
82 deviceID := device.Id
83 if deviceID == "" {
84 deviceID = coreutils.CreateDeviceID()
Stephane Barbarie1ab43272018-12-08 21:42:13 -050085 }
Scott Baker80678602019-11-14 16:57:36 -080086
Kent Hagerman2a07b862020-06-19 15:23:07 -040087 return &Agent{
Himani Chawla2ba1c9c2020-10-07 13:19:03 +053088 deviceID: deviceID,
89 adapterProxy: ap,
90 isRootDevice: device.Root,
91 parentID: device.ParentId,
92 deviceType: device.Type,
93 deviceMgr: deviceMgr,
94 adapterMgr: deviceMgr.adapterMgr,
95 exitChannel: make(chan int, 1),
96 dbProxy: deviceProxy,
97 defaultTimeout: timeout,
98 device: proto.Clone(device).(*voltha.Device),
99 requestQueue: coreutils.NewRequestQueue(),
Maninder0aabf0c2021-03-17 14:55:14 +0530100 config: deviceMgr.config,
khenaidoo7585a962021-06-10 16:15:38 -0400101 flowCache: flow.NewCache(),
102 groupCache: group.NewCache(),
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530103 portLoader: port.NewLoader(dbPath.SubPath("ports").Proxy(deviceID)),
104 transientStateLoader: transientstate.NewLoader(dbPath.SubPath("core").Proxy("transientstate"), deviceID),
Kent Hagerman2a07b862020-06-19 15:23:07 -0400105 }
khenaidoob9203542018-09-17 22:56:37 -0400106}
107
// start initializes the agent and returns the device that was started.
//
// The work is done only on the first call (guarded by startOnce); any later
// call simply returns the agent's current device.
//
// When deviceExist is true the device already exists: deviceToCreate is used
// as the device if it is non-nil, otherwise the device is loaded from the dB
// via dbProxy (NotFound is returned when it is absent). The ports and the
// transient state are then loaded through their loaders.
//
// When deviceExist is false a new device is created from a clone of
// deviceToCreate, stored through dbProxy, and a device state change event is
// sent. deviceToCreate is dereferenced in this path.
// NOTE(review): it is assumed to be non-nil here - confirm against callers.
//
// If start does not complete successfully, the deferred cleanup calls stop(),
// which removes the device and its transient state from the KV store.
func (agent *Agent) start(ctx context.Context, deviceExist bool, deviceToCreate *voltha.Device) (*voltha.Device, error) {
	needToStart := false
	if agent.startOnce.Do(func() { needToStart = true }); !needToStart {
		return agent.getDeviceReadOnly(ctx)
	}
	// startSucceeded is flipped to true only at the very end; every early
	// return therefore triggers the cleanup below.
	var startSucceeded bool
	defer func() {
		if !startSucceeded {
			if err := agent.stop(ctx); err != nil {
				logger.Errorw(ctx, "failed-to-cleanup-after-unsuccessful-start", log.Fields{"device-id": agent.deviceID, "error": err})
			}
		}
	}()
	if deviceExist {
		device := deviceToCreate
		if device == nil {
			// Load from dB
			device = &voltha.Device{}
			have, err := agent.dbProxy.Get(ctx, agent.deviceID, device)
			if err != nil {
				return nil, err
			} else if !have {
				return nil, status.Errorf(codes.NotFound, "device-%s", agent.deviceID)
			}
		}
		// From here on the agent's device type is the adapter recorded on the device.
		agent.deviceType = device.Adapter
		agent.device = proto.Clone(device).(*voltha.Device)
		// load the ports from KV to cache
		agent.portLoader.Load(ctx)
		agent.transientStateLoader.Load(ctx)

		logger.Infow(ctx, "device-loaded-from-db", log.Fields{"device-id": agent.deviceID})
	} else {
		// Create a new device
		var desc string
		prevState := common.AdminState_UNKNOWN
		currState := common.AdminState_UNKNOWN
		operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}

		// Pointers are passed so that the values assigned below (currState, desc,
		// operStatus.Code) are the ones seen when the deferred call runs.
		defer agent.logDeviceUpdate(ctx, "createDevice", &prevState, &currState, operStatus, &desc)

		// Assumption is that AdminState is uninitialized since this is a new device,
		// so populate it (together with the ID) here before passing the device to
		// dbProxy.Set. agent.deviceID will also have been set during newAgent().
		device := (proto.Clone(deviceToCreate)).(*voltha.Device)
		device.Id = agent.deviceID
		device.AdminState = voltha.AdminState_PREPROVISIONED
		currState = device.AdminState
		if !deviceToCreate.GetRoot() && deviceToCreate.ProxyAddress != nil {
			// Set the default vlan ID to the one specified by the parent adapter. It can be
			// overwritten by the child adapter during a device update request
			device.Vlan = deviceToCreate.ProxyAddress.ChannelId
		}

		// Add the initial device to the local model
		if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
			desc = fmt.Sprintf("failed-adding-device-%s: %s", agent.deviceID, err.Error())
			return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s: %s", agent.deviceID, err)
		}
		// The result of sending the state change event is deliberately ignored.
		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, device.OperStatus, device.ConnectStatus, prevState, device, time.Now().Unix())
		operStatus.Code = common.OperationResp_OPERATION_SUCCESS
		agent.device = device
	}
	startSucceeded = true
	log.EnrichSpan(ctx, log.Fields{"device-id": agent.deviceID})
	logger.Debugw(ctx, "device-agent-started", log.Fields{"device-id": agent.deviceID})

	return agent.getDeviceReadOnly(ctx)
}
180
khenaidoo4d4802d2018-10-04 21:59:49 -0400181// stop stops the device agent. Not much to do for now
Kent Hagerman2b216042020-04-03 18:28:56 -0400182func (agent *Agent) stop(ctx context.Context) error {
khenaidoo442e7c72020-03-10 16:13:48 -0400183 needToStop := false
184 if agent.stopOnce.Do(func() { needToStop = true }); !needToStop {
185 return nil
186 }
187 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
188 return err
189 }
190 defer agent.requestQueue.RequestComplete()
khenaidoo49085352020-01-13 19:15:43 -0500191
Himani Chawlab4c25912020-11-12 17:16:38 +0530192 logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530193 // Remove the device transient loader
194 if err := agent.deleteTransientState(ctx); err != nil {
195 return err
196 }
khenaidoo0a822f92019-05-08 15:15:57 -0400197 // Remove the device from the KV store
Kent Hagermanf5a67352020-04-30 15:15:26 -0400198 if err := agent.dbProxy.Remove(ctx, agent.deviceID); err != nil {
khenaidoo442e7c72020-03-10 16:13:48 -0400199 return err
Thomas Lee Se5a44012019-11-07 20:32:24 +0530200 }
khenaidoo442e7c72020-03-10 16:13:48 -0400201
khenaidoo442e7c72020-03-10 16:13:48 -0400202 close(agent.exitChannel)
203
204 agent.stopped = true
205
Rohan Agrawal31f21802020-06-12 05:38:46 +0000206 logger.Infow(ctx, "device-agent-stopped", log.Fields{"device-id": agent.deviceID, "parent-id": agent.parentID})
khenaidoo442e7c72020-03-10 16:13:48 -0400207
208 return nil
khenaidoob9203542018-09-17 22:56:37 -0400209}
210
Scott Baker80678602019-11-14 16:57:36 -0800211// Load the most recent state from the KVStore for the device.
Kent Hagerman2b216042020-04-03 18:28:56 -0400212func (agent *Agent) reconcileWithKVStore(ctx context.Context) {
khenaidoo442e7c72020-03-10 16:13:48 -0400213 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000214 logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "error": err})
khenaidoo442e7c72020-03-10 16:13:48 -0400215 return
216 }
217 defer agent.requestQueue.RequestComplete()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000218 logger.Debug(ctx, "reconciling-device-agent-devicetype")
Scott Baker80678602019-11-14 16:57:36 -0800219 // TODO: context timeout
Kent Hagerman4f355f52020-03-30 16:01:33 -0400220 device := &voltha.Device{}
Kent Hagermanf5a67352020-04-30 15:15:26 -0400221 if have, err := agent.dbProxy.Get(ctx, agent.deviceID, device); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000222 logger.Errorw(ctx, "kv-get-failed", log.Fields{"device-id": agent.deviceID, "error": err})
Thomas Lee Se5a44012019-11-07 20:32:24 +0530223 return
Kent Hagerman4f355f52020-03-30 16:01:33 -0400224 } else if !have {
225 return // not found in kv
Thomas Lee Se5a44012019-11-07 20:32:24 +0530226 }
Kent Hagerman4f355f52020-03-30 16:01:33 -0400227
228 agent.deviceType = device.Adapter
229 agent.device = device
Kent Hagerman2a07b862020-06-19 15:23:07 -0400230 agent.portLoader.Load(ctx)
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530231 agent.transientStateLoader.Load(ctx)
232
Rohan Agrawal31f21802020-06-12 05:38:46 +0000233 logger.Debugw(ctx, "reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
Scott Baker80678602019-11-14 16:57:36 -0800234}
235
khenaidoo442e7c72020-03-10 16:13:48 -0400236// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
237// and the only action required is to publish a successful result on kafka
Rohan Agrawal31f21802020-06-12 05:38:46 +0000238func (agent *Agent) onSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530239 logger.Debugw(ctx, "response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
khenaidoo442e7c72020-03-10 16:13:48 -0400240 // TODO: Post success message onto kafka
241}
242
243// onFailure is a common callback for scenarios where we receive an error response following a request to an adapter
244// and the only action required is to publish the failed result on kafka
Rohan Agrawal31f21802020-06-12 05:38:46 +0000245func (agent *Agent) onFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
khenaidoo442e7c72020-03-10 16:13:48 -0400246 if res, ok := response.(error); ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000247 logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
khenaidoo442e7c72020-03-10 16:13:48 -0400248 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000249 logger.Errorw(ctx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
khenaidoo442e7c72020-03-10 16:13:48 -0400250 }
251 // TODO: Post failure message onto kafka
252}
253
Himani Chawlab4c25912020-11-12 17:16:38 +0530254func (agent *Agent) waitForAdapterForceDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
khenaidoo442e7c72020-03-10 16:13:48 -0400255 onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
256 defer cancel()
257 select {
258 case rpcResponse, ok := <-ch:
259 if !ok {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000260 onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
khenaidoo442e7c72020-03-10 16:13:48 -0400261 } else if rpcResponse.Err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000262 onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
khenaidoo442e7c72020-03-10 16:13:48 -0400263 } else {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000264 onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
khenaidoo442e7c72020-03-10 16:13:48 -0400265 }
266 case <-ctx.Done():
Rohan Agrawal31f21802020-06-12 05:38:46 +0000267 onFailure(ctx, rpc, ctx.Err(), reqArgs)
khenaidoo442e7c72020-03-10 16:13:48 -0400268 }
269}
270
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530271// onDeleteSuccess is a common callback for scenarios where we receive a nil response following a delete request
272// to an adapter.
273func (agent *Agent) onDeleteSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
274 logger.Debugw(ctx, "response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
275 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
276 logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
277 }
278 previousDeviceTransientState := agent.getTransientState()
279 newDevice := agent.cloneDeviceWithoutLock()
280 if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, newDevice,
281 voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, previousDeviceTransientState); err != nil {
282 logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
283 }
284}
285
286// onDeleteFailure is a common callback for scenarios where we receive an error response following a delete request
287// to an adapter and the only action required is to return the error response.
288func (agent *Agent) onDeleteFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
289 if res, ok := response.(error); ok {
290 logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
291 } else {
292 logger.Errorw(ctx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
293 }
294 //Only updating of transient state is required, no transition.
295 if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
296 logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID})
297 }
298
299}
300
Himani Chawlab4c25912020-11-12 17:16:38 +0530301func (agent *Agent) waitForAdapterResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530302 onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
303 defer cancel()
Himani Chawlab4c25912020-11-12 17:16:38 +0530304 var rpce *voltha.RPCEvent
305 defer func() {
306 if rpce != nil {
Himani Chawla606a4f02021-03-23 19:45:58 +0530307 agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
308 voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
Himani Chawlab4c25912020-11-12 17:16:38 +0530309 }
310 }()
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530311 select {
312 case rpcResponse, ok := <-ch:
313 if !ok {
Himani Chawlab4c25912020-11-12 17:16:38 +0530314 rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530315 onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
Himani Chawlab4c25912020-11-12 17:16:38 +0530316 //add failure
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530317 } else if rpcResponse.Err != nil {
Himani Chawlab4c25912020-11-12 17:16:38 +0530318 rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, rpcResponse.Err.Error(), nil)
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530319 onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
Himani Chawlab4c25912020-11-12 17:16:38 +0530320 //add failure
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530321 } else {
322 onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
323 }
324 case <-ctx.Done():
Himani Chawlab4c25912020-11-12 17:16:38 +0530325 rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, ctx.Err().Error(), nil)
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530326 onFailure(ctx, rpc, ctx.Err(), reqArgs)
327 }
328}
329
Maninder9a1bc0d2020-10-26 11:34:02 +0530330func (agent *Agent) waitForAdapterResponseAndLogDeviceUpdate(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
331 onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, prevState *common.AdminState_Types, reqArgs ...interface{}) {
332 defer cancel()
333 var desc string
334 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
khenaidoodd3324d2021-04-27 16:22:55 -0400335 defer func() {
336 currAdminState := prevState
337 if d, _ := agent.getDeviceReadOnly(ctx); d != nil {
338 currAdminState = &d.AdminState
339 }
340 agent.logDeviceUpdate(ctx, rpc, prevState, currAdminState, operStatus, &desc)
341 }()
Maninder9a1bc0d2020-10-26 11:34:02 +0530342 var rpce *voltha.RPCEvent
343 defer func() {
344 if rpce != nil {
Himani Chawla606a4f02021-03-23 19:45:58 +0530345 agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
346 voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
Maninder9a1bc0d2020-10-26 11:34:02 +0530347 }
348 }()
349
350 select {
351 case rpcResponse, ok := <-ch:
352 if !ok {
353 rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
354 onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs)
355 //add failure
356 } else if rpcResponse.Err != nil {
357 desc = rpcResponse.Err.Error()
358 rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
359 onFailure(ctx, rpc, rpcResponse.Err, reqArgs)
360 //add failure
361 } else {
362 operStatus.Code = common.OperationResp_OPERATION_SUCCESS
363 onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs)
364 }
365 case <-ctx.Done():
366 desc = ctx.Err().Error()
367 rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
368 onFailure(ctx, rpc, ctx.Err(), reqArgs)
369 }
370}
371
Kent Hagermancba2f302020-07-28 13:37:36 -0400372// getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever.
373func (agent *Agent) getDeviceReadOnly(ctx context.Context) (*voltha.Device, error) {
khenaidoo442e7c72020-03-10 16:13:48 -0400374 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
375 return nil, err
376 }
377 defer agent.requestQueue.RequestComplete()
Kent Hagermancba2f302020-07-28 13:37:36 -0400378 return agent.device, nil
khenaidoo92e62c52018-10-03 14:02:54 -0400379}
380
Kent Hagermancba2f302020-07-28 13:37:36 -0400381// getDeviceReadOnlyWithoutLock returns a device which MUST NOT be modified, but is safe to keep forever. This is very efficient.
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400382// The device lock MUST be held by the caller.
Kent Hagermancba2f302020-07-28 13:37:36 -0400383func (agent *Agent) getDeviceReadOnlyWithoutLock() *voltha.Device {
khenaidoo0db4c812020-05-27 15:27:30 -0400384 return agent.device
khenaidoo92e62c52018-10-03 14:02:54 -0400385}
386
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400387// cloneDeviceWithoutLock returns a copy of the device which is safe to modify.
388// The device lock MUST be held by the caller.
389func (agent *Agent) cloneDeviceWithoutLock() *voltha.Device {
390 return proto.Clone(agent.device).(*voltha.Device)
391}
392
// enableDevice activates a preprovisioned or a disabled device.
//
// Under the request queue lock it checks that the request may proceed and that the
// device is not already enabled, resolves the adapter for the device type, and stores
// the device with AdminState ENABLED and OperStatus ACTIVATING. It then sends the
// request to the adapter (AdoptDevice for a preprovisioned device, ReEnableDevice
// otherwise) and returns; the adapter's response is handled asynchronously by
// waitForAdapterResponseAndLogDeviceUpdate.
//
// An error is returned when the lock cannot be obtained, a precondition fails
// (FailedPrecondition), the adapter cannot be resolved, the device update fails, or
// the request cannot be sent to the adapter.
func (agent *Agent) enableDevice(ctx context.Context) error {
	var desc string
	// The operation is reported as failed unless the request reaches the adapter.
	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}

	// operStatus and desc are passed by pointer so that the deferred call sees the
	// values assigned further down.
	defer agent.logDeviceUpdate(ctx, "enableDevice", nil, nil, operStatus, &desc)

	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
		return err
	}
	logger.Debugw(ctx, "enable-device", log.Fields{"device-id": agent.deviceID})

	// Preserve the admin state the device had before this request; it is reported as
	// the previous state once the adapter has responded.
	prevDeviceState := agent.device.AdminState

	oldDevice := agent.getDeviceReadOnlyWithoutLock()

	// Every early return below releases the lock explicitly.
	if !agent.proceedWithRequest(oldDevice) {
		agent.requestQueue.RequestComplete()

		desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconcile is in progress/failed.", agent.deviceID)
		return status.Error(codes.FailedPrecondition, desc)
	}

	if oldDevice.AdminState == voltha.AdminState_ENABLED {
		logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
		agent.requestQueue.RequestComplete()
		desc = fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
		return status.Error(codes.FailedPrecondition, desc)
	}

	// First figure out which adapter will handle this device type. We do it at this stage because
	// devices are allowed to be pre-provisioned while the required adapter is not yet registered.
	// Since we now need to communicate with the adapter, we need to know which adapter will handle
	// this request.
	adapterName, err := agent.adapterMgr.GetAdapterType(oldDevice.Type)
	if err != nil {
		agent.requestQueue.RequestComplete()
		desc = err.Error()
		return err
	}

	newDevice := agent.cloneDeviceWithoutLock()
	newDevice.Adapter = adapterName

	// Update the Admin State and set the operational state to activating before sending the request to the Adapters
	newDevice.AdminState = voltha.AdminState_ENABLED
	newDevice.OperStatus = voltha.OperStatus_ACTIVATING

	// No explicit RequestComplete on this path: the lock is handed over to
	// updateDeviceAndReleaseLock.
	// NOTE(review): per its name it releases the lock on both success and failure - confirm.
	if err := agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
		desc = err.Error()
		return err
	}

	// Adopt the device if it was in pre-provision state. In all other cases, try to re-enable it.
	var ch chan *kafka.RpcResponse
	// The adapter request runs on a context derived from context.Background() with the agent's
	// default timeout, carrying over the span and metadata of the incoming context.
	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
	subCtx = coreutils.WithFromTopicMetadataFromContext(subCtx, ctx)

	if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
		ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
	} else {
		ch, err = agent.adapterProxy.ReEnableDevice(subCtx, newDevice)
	}
	if err != nil {
		cancel()
		desc = err.Error()
		return err
	}

	operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS

	// Wait for response. The goroutine takes over responsibility for calling cancel.
	go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "enableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
	return nil
}
469
Maninder9a1bc0d2020-10-26 11:34:02 +0530470func (agent *Agent) waitForAdapterFlowResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse, response coreutils.Response) {
khenaidoo442e7c72020-03-10 16:13:48 -0400471 defer cancel()
Maninder9a1bc0d2020-10-26 11:34:02 +0530472 var desc string
473 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
474 defer agent.logDeviceUpdate(ctx, rpc, nil, nil, operStatus, &desc)
475
Himani Chawlab4c25912020-11-12 17:16:38 +0530476 var rpce *voltha.RPCEvent
477 defer func() {
478 if rpce != nil {
Himani Chawla606a4f02021-03-23 19:45:58 +0530479 agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce,
480 voltha.EventCategory_COMMUNICATION, nil, time.Now().Unix())
Himani Chawlab4c25912020-11-12 17:16:38 +0530481 }
482 }()
khenaidoo442e7c72020-03-10 16:13:48 -0400483 select {
484 case rpcResponse, ok := <-ch:
485 if !ok {
Himani Chawlab4c25912020-11-12 17:16:38 +0530486 //add failure
Maninder9a1bc0d2020-10-26 11:34:02 +0530487 desc = "Response Channel Closed"
Himani Chawlab4c25912020-11-12 17:16:38 +0530488 rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, "Response Channel Closed", nil)
khenaidoo442e7c72020-03-10 16:13:48 -0400489 response.Error(status.Errorf(codes.Aborted, "channel-closed"))
490 } else if rpcResponse.Err != nil {
Himani Chawlab4c25912020-11-12 17:16:38 +0530491 //add failure
Maninder9a1bc0d2020-10-26 11:34:02 +0530492 desc = rpcResponse.Err.Error()
493 rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
khenaidoo442e7c72020-03-10 16:13:48 -0400494 response.Error(rpcResponse.Err)
495 } else {
Maninder9a1bc0d2020-10-26 11:34:02 +0530496 operStatus.Code = common.OperationResp_OPERATION_SUCCESS
khenaidoo442e7c72020-03-10 16:13:48 -0400497 response.Done()
498 }
499 case <-ctx.Done():
Maninder9a1bc0d2020-10-26 11:34:02 +0530500 desc = ctx.Err().Error()
501 rpce = agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, desc, nil)
khenaidoo442e7c72020-03-10 16:13:48 -0400502 response.Error(ctx.Err())
khenaidoo2c6a0992019-04-29 13:46:56 -0400503 }
khenaidoo2c6a0992019-04-29 13:46:56 -0400504}
505
A R Karthick5c28f552019-12-11 22:47:44 -0800506//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
507//adapters
Kent Hagerman2b216042020-04-03 18:28:56 -0400508func (agent *Agent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
Mahir Gunyel03de0d32020-06-03 01:36:59 -0700509 var flwResponse, grpResponse coreutils.Response
510 var err error
511 //if new flow list is empty then the called function returns quickly
512 if flwResponse, err = agent.addFlowsToAdapter(ctx, newFlows, flowMetadata); err != nil {
A R Karthick5c28f552019-12-11 22:47:44 -0800513 return err
514 }
Mahir Gunyel03de0d32020-06-03 01:36:59 -0700515 //if new group list is empty then the called function returns quickly
516 if grpResponse, err = agent.addGroupsToAdapter(ctx, newGroups, flowMetadata); err != nil {
517 return err
518 }
519 if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); errs != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000520 logger.Warnw(ctx, "no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
khenaidoo442e7c72020-03-10 16:13:48 -0400521 return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
khenaidoo0458db62019-06-20 08:50:36 -0400522 }
khenaidoo0458db62019-06-20 08:50:36 -0400523 return nil
524}
525
A R Karthick5c28f552019-12-11 22:47:44 -0800526//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
527//adapters
Kent Hagerman2b216042020-04-03 18:28:56 -0400528func (agent *Agent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
Mahir Gunyel03de0d32020-06-03 01:36:59 -0700529 var flwResponse, grpResponse coreutils.Response
530 var err error
531 if flwResponse, err = agent.deleteFlowsFromAdapter(ctx, flowsToDel, flowMetadata); err != nil {
A R Karthick5c28f552019-12-11 22:47:44 -0800532 return err
533 }
Mahir Gunyel03de0d32020-06-03 01:36:59 -0700534 if grpResponse, err = agent.deleteGroupsFromAdapter(ctx, groupsToDel, flowMetadata); err != nil {
535 return err
536 }
537
538 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
khenaidoo0458db62019-06-20 08:50:36 -0400539 return status.Errorf(codes.Aborted, "errors-%s", res)
540 }
541 return nil
khenaidoo0458db62019-06-20 08:50:36 -0400542}
543
A R Karthick5c28f552019-12-11 22:47:44 -0800544//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
545//also sends the updates to the adapters
Kent Hagerman2b216042020-04-03 18:28:56 -0400546func (agent *Agent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
Mahir Gunyel03de0d32020-06-03 01:36:59 -0700547 var flwResponse, grpResponse coreutils.Response
548 var err error
549 if flwResponse, err = agent.updateFlowsToAdapter(ctx, updatedFlows, flowMetadata); err != nil {
A R Karthick5c28f552019-12-11 22:47:44 -0800550 return err
551 }
Mahir Gunyel03de0d32020-06-03 01:36:59 -0700552 if grpResponse, err = agent.updateGroupsToAdapter(ctx, updatedGroups, flowMetadata); err != nil {
553 return err
554 }
555
556 if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
khenaidoo0458db62019-06-20 08:50:36 -0400557 return status.Errorf(codes.Aborted, "errors-%s", res)
558 }
559 return nil
khenaidoo19d7b632018-10-30 10:49:50 -0400560}
561
khenaidoo4d4802d2018-10-04 21:59:49 -0400562//disableDevice disable a device
Kent Hagerman2b216042020-04-03 18:28:56 -0400563func (agent *Agent) disableDevice(ctx context.Context) error {
Maninder9a1bc0d2020-10-26 11:34:02 +0530564 var desc string
565 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
566
567 prevDeviceState := agent.device.AdminState
568
569 defer agent.logDeviceUpdate(ctx, "disableDevice", nil, nil, operStatus, &desc)
570
khenaidoo442e7c72020-03-10 16:13:48 -0400571 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
Maninder9a1bc0d2020-10-26 11:34:02 +0530572 desc = err.Error()
khenaidoo442e7c72020-03-10 16:13:48 -0400573 return err
574 }
Himani Chawlab4c25912020-11-12 17:16:38 +0530575 logger.Debugw(ctx, "disable-device", log.Fields{"device-id": agent.deviceID})
khenaidoo6e55d9e2019-12-12 18:26:26 -0500576
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400577 cloned := agent.cloneDeviceWithoutLock()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500578
Maninder2195ccc2021-06-23 20:23:01 +0530579 if !agent.proceedWithRequest(cloned) {
580 agent.requestQueue.RequestComplete()
581 desc = fmt.Sprintf("deviceId:%s,Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.", agent.deviceID)
582 return status.Errorf(codes.FailedPrecondition, desc)
583 }
584
khenaidoo6e55d9e2019-12-12 18:26:26 -0500585 if cloned.AdminState == voltha.AdminState_DISABLED {
Maninder9a1bc0d2020-10-26 11:34:02 +0530586 desc = "device-already-disabled"
divyadesaicb8b59d2020-08-18 09:55:47 +0000587 logger.Debugw(ctx, "device-already-disabled", log.Fields{"device-id": agent.deviceID})
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400588 agent.requestQueue.RequestComplete()
npujar1d86a522019-11-14 17:11:16 +0530589 return nil
590 }
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530591 if cloned.AdminState == voltha.AdminState_PREPROVISIONED {
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400592 agent.requestQueue.RequestComplete()
Maninder9a1bc0d2020-10-26 11:34:02 +0530593 desc = fmt.Sprintf("deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500594 return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
npujar1d86a522019-11-14 17:11:16 +0530595 }
Maninder0aabf0c2021-03-17 14:55:14 +0530596
npujar1d86a522019-11-14 17:11:16 +0530597 // Update the Admin State and operational state before sending the request out
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400598 cloned.AdminState = voltha.AdminState_DISABLED
599 cloned.OperStatus = voltha.OperStatus_UNKNOWN
Maninder9a1bc0d2020-10-26 11:34:02 +0530600
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400601 if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
npujar1d86a522019-11-14 17:11:16 +0530602 return err
603 }
khenaidoo442e7c72020-03-10 16:13:48 -0400604
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000605 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Himani Chawlab4c25912020-11-12 17:16:38 +0530606 subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
607
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400608 ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
khenaidoo442e7c72020-03-10 16:13:48 -0400609 if err != nil {
610 cancel()
Maninder9a1bc0d2020-10-26 11:34:02 +0530611 desc = err.Error()
npujar1d86a522019-11-14 17:11:16 +0530612 return err
khenaidoo0a822f92019-05-08 15:15:57 -0400613 }
Maninder9a1bc0d2020-10-26 11:34:02 +0530614 operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
615
616 // Wait for response
617 go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "disableDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
khenaidoo0a822f92019-05-08 15:15:57 -0400618
khenaidoo92e62c52018-10-03 14:02:54 -0400619 return nil
620}
621
Kent Hagerman2b216042020-04-03 18:28:56 -0400622func (agent *Agent) rebootDevice(ctx context.Context) error {
Maninder9a1bc0d2020-10-26 11:34:02 +0530623 var desc string
624 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
625
626 prevDeviceState := agent.device.AdminState
627
628 defer agent.logDeviceUpdate(ctx, "rebootDevice", nil, nil, operStatus, &desc)
629
khenaidoo442e7c72020-03-10 16:13:48 -0400630 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
Maninder9a1bc0d2020-10-26 11:34:02 +0530631 desc = err.Error()
npujar1d86a522019-11-14 17:11:16 +0530632 return err
khenaidoo4d4802d2018-10-04 21:59:49 -0400633 }
khenaidoo442e7c72020-03-10 16:13:48 -0400634 defer agent.requestQueue.RequestComplete()
Himani Chawlab4c25912020-11-12 17:16:38 +0530635 logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID})
khenaidoo442e7c72020-03-10 16:13:48 -0400636
Kent Hagermancba2f302020-07-28 13:37:36 -0400637 device := agent.getDeviceReadOnlyWithoutLock()
Maninder2195ccc2021-06-23 20:23:01 +0530638
639 if !agent.proceedWithRequest(device) {
640 desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.", agent.deviceID)
Maninder581cf4b2021-06-16 22:42:07 +0530641 return status.Errorf(codes.FailedPrecondition, desc)
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530642 }
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000643 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Himani Chawlab4c25912020-11-12 17:16:38 +0530644 subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
645
Kent Hagerman2b216042020-04-03 18:28:56 -0400646 ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
khenaidoo442e7c72020-03-10 16:13:48 -0400647 if err != nil {
648 cancel()
Maninder9a1bc0d2020-10-26 11:34:02 +0530649 desc = err.Error()
khenaidoo442e7c72020-03-10 16:13:48 -0400650 return err
651 }
Maninder9a1bc0d2020-10-26 11:34:02 +0530652 operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
653
654 // Wait for response
655 go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "rebootDevice", ch, agent.onSuccess, agent.onFailure, &prevDeviceState)
khenaidoo4d4802d2018-10-04 21:59:49 -0400656 return nil
657}
658
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530659func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
Himani Chawlab4c25912020-11-12 17:16:38 +0530660 logger.Debugw(ctx, "delete-device-force", log.Fields{"device-id": agent.deviceID})
Maninder9a1bc0d2020-10-26 11:34:02 +0530661
662 var desc string
663 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
664
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530665 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
Maninder9a1bc0d2020-10-26 11:34:02 +0530666 desc = err.Error()
667 agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530668 return err
669 }
670 // Get the device Transient state, return err if it is DELETING
671 previousDeviceTransientState := agent.getTransientState()
672
673 if agent.isStateDeleting(previousDeviceTransientState) {
674 agent.requestQueue.RequestComplete()
Maninder9a1bc0d2020-10-26 11:34:02 +0530675 desc = fmt.Sprintf("deviceId:%s, Device Deletion is in progress",
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530676 agent.deviceID)
Maninder9a1bc0d2020-10-26 11:34:02 +0530677 agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
678 return status.Error(codes.FailedPrecondition, desc)
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530679 }
Maninder0aabf0c2021-03-17 14:55:14 +0530680
681 //Send stop Reconcile if in progress
682 agent.stopReconcile()
683
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530684 device := agent.cloneDeviceWithoutLock()
Himani Chawlab4c25912020-11-12 17:16:38 +0530685 if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
686 voltha.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530687 return err
688 }
689 previousAdminState := device.AdminState
690 if previousAdminState != ic.AdminState_PREPROVISIONED {
691 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Himani Chawlab4c25912020-11-12 17:16:38 +0530692 subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
693
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530694 ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
695 if err != nil {
696 cancel()
Maninder9a1bc0d2020-10-26 11:34:02 +0530697 desc = err.Error()
698 agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530699 return err
700 }
Maninder9a1bc0d2020-10-26 11:34:02 +0530701 // As force delete will not be dependent over the response of adapter, marking this operation as success
702 operStatus.Code = common.OperationResp_OPERATION_SUCCESS
703 agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530704 // Since it is a case of force delete, nothing needs to be done on adapter responses.
Himani Chawlab4c25912020-11-12 17:16:38 +0530705 go agent.waitForAdapterForceDeleteResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530706 agent.onFailure)
707 }
708 return nil
709}
710
Kent Hagerman2b216042020-04-03 18:28:56 -0400711func (agent *Agent) deleteDevice(ctx context.Context) error {
Himani Chawlab4c25912020-11-12 17:16:38 +0530712 logger.Debugw(ctx, "delete-device", log.Fields{"device-id": agent.deviceID})
Maninder9a1bc0d2020-10-26 11:34:02 +0530713
714 var desc string
715 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
716 prevState := agent.device.AdminState
717
718 defer agent.logDeviceUpdate(ctx, "deleteDevice", nil, nil, operStatus, &desc)
719
khenaidoo442e7c72020-03-10 16:13:48 -0400720 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
Maninder9a1bc0d2020-10-26 11:34:02 +0530721 desc = err.Error()
khenaidoo442e7c72020-03-10 16:13:48 -0400722 return err
723 }
Maninder0aabf0c2021-03-17 14:55:14 +0530724
Maninder2195ccc2021-06-23 20:23:01 +0530725 device := agent.cloneDeviceWithoutLock()
726
727 if !agent.proceedWithRequest(device) {
Maninder0aabf0c2021-03-17 14:55:14 +0530728 agent.requestQueue.RequestComplete()
Maninder2195ccc2021-06-23 20:23:01 +0530729 desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", agent.deviceID)
Maninder0aabf0c2021-03-17 14:55:14 +0530730 return status.Error(codes.FailedPrecondition, desc)
731 }
732
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530733 // Get the device Transient state, return err if it is DELETING
734 previousDeviceTransientState := agent.getTransientState()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500735
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530736 previousAdminState := device.AdminState
737 // Change the device transient state to DELETING_FROM_ADAPTER state till the device is removed from adapters.
738 currentDeviceTransientState := voltha.DeviceTransientState_DELETING_FROM_ADAPTER
khenaidoo442e7c72020-03-10 16:13:48 -0400739
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530740 if previousAdminState == ic.AdminState_PREPROVISIONED {
741 // Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device.
742 currentDeviceTransientState = voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
743 }
Himani Chawlab4c25912020-11-12 17:16:38 +0530744 if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
745 currentDeviceTransientState, previousDeviceTransientState); err != nil {
Maninder9a1bc0d2020-10-26 11:34:02 +0530746 desc = err.Error()
npujar1d86a522019-11-14 17:11:16 +0530747 return err
748 }
khenaidoo442e7c72020-03-10 16:13:48 -0400749 // If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
750 // adapter
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530751 if previousAdminState != ic.AdminState_PREPROVISIONED {
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000752 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Himani Chawlab4c25912020-11-12 17:16:38 +0530753 subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
754
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530755 ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
khenaidoo442e7c72020-03-10 16:13:48 -0400756 if err != nil {
757 cancel()
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530758 //updating of transient state is required in error
759 if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
760 logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID})
761 }
Maninder9a1bc0d2020-10-26 11:34:02 +0530762 desc = err.Error()
khenaidoo442e7c72020-03-10 16:13:48 -0400763 return err
764 }
Maninder9a1bc0d2020-10-26 11:34:02 +0530765
766 operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
767 go agent.waitForAdapterResponseAndLogDeviceUpdate(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
768 agent.onDeleteFailure, &prevState)
khenaidoo442e7c72020-03-10 16:13:48 -0400769 }
khenaidoo4d4802d2018-10-04 21:59:49 -0400770 return nil
771}
772
Kent Hagerman2b216042020-04-03 18:28:56 -0400773func (agent *Agent) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
khenaidoo442e7c72020-03-10 16:13:48 -0400774 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
775 return err
776 }
Himani Chawlab4c25912020-11-12 17:16:38 +0530777 logger.Debugw(ctx, "set-parent-id", log.Fields{"device-id": device.Id, "parent-id": parentID})
khenaidoo6e55d9e2019-12-12 18:26:26 -0500778
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400779 cloned := agent.cloneDeviceWithoutLock()
npujar1d86a522019-11-14 17:11:16 +0530780 cloned.ParentId = parentID
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400781 return agent.updateDeviceAndReleaseLock(ctx, cloned)
khenaidooad06fd72019-10-28 12:26:05 -0400782}
783
khenaidoo442e7c72020-03-10 16:13:48 -0400784// getSwitchCapability retrieves the switch capability of a parent device
Kent Hagerman2b216042020-04-03 18:28:56 -0400785func (agent *Agent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
Himani Chawlab4c25912020-11-12 17:16:38 +0530786 logger.Debugw(ctx, "get-switch-capability", log.Fields{"device-id": agent.deviceID})
khenaidoo442e7c72020-03-10 16:13:48 -0400787
Kent Hagermancba2f302020-07-28 13:37:36 -0400788 device, err := agent.getDeviceReadOnly(ctx)
khenaidoo442e7c72020-03-10 16:13:48 -0400789 if err != nil {
khenaidoob9203542018-09-17 22:56:37 -0400790 return nil, err
khenaidoob9203542018-09-17 22:56:37 -0400791 }
Kent Hagermancba2f302020-07-28 13:37:36 -0400792 ch, err := agent.adapterProxy.GetOfpDeviceInfo(ctx, device)
khenaidoo442e7c72020-03-10 16:13:48 -0400793 if err != nil {
794 return nil, err
795 }
796
797 // Wait for adapter response
798 rpcResponse, ok := <-ch
799 if !ok {
800 return nil, status.Errorf(codes.Aborted, "channel-closed")
801 }
802 if rpcResponse.Err != nil {
803 return nil, rpcResponse.Err
804 }
805 // Successful response
806 switchCap := &ic.SwitchCapability{}
807 if err := ptypes.UnmarshalAny(rpcResponse.Reply, switchCap); err != nil {
npujar1d86a522019-11-14 17:11:16 +0530808 return nil, err
809 }
810 return switchCap, nil
khenaidoob9203542018-09-17 22:56:37 -0400811}
812
Rohan Agrawal31f21802020-06-12 05:38:46 +0000813func (agent *Agent) onPacketFailure(ctx context.Context, rpc string, response interface{}, args ...interface{}) {
khenaidoo442e7c72020-03-10 16:13:48 -0400814 // packet data is encoded in the args param as the first parameter
815 var packet []byte
816 if len(args) >= 1 {
817 if pkt, ok := args[0].([]byte); ok {
818 packet = pkt
819 }
820 }
821 var errResp error
822 if err, ok := response.(error); ok {
823 errResp = err
824 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000825 logger.Warnw(ctx, "packet-out-error", log.Fields{
khenaidoo442e7c72020-03-10 16:13:48 -0400826 "device-id": agent.deviceID,
827 "error": errResp,
828 "packet": hex.EncodeToString(packet),
829 })
830}
831
Kent Hagerman2b216042020-04-03 18:28:56 -0400832func (agent *Agent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
Scott Baker80678602019-11-14 16:57:36 -0800833 // If deviceType=="" then we must have taken ownership of this device.
834 // Fixes VOL-2226 where a core would take ownership and have stale data
835 if agent.deviceType == "" {
npujar467fe752020-01-16 20:17:45 +0530836 agent.reconcileWithKVStore(ctx)
Scott Baker80678602019-11-14 16:57:36 -0800837 }
khenaidoofdbad6e2018-11-06 22:26:38 -0500838 // Send packet to adapter
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000839 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Himani Chawlab4c25912020-11-12 17:16:38 +0530840 subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
841
Kent Hagerman2b216042020-04-03 18:28:56 -0400842 ch, err := agent.adapterProxy.PacketOut(subCtx, agent.deviceType, agent.deviceID, outPort, packet)
khenaidoo442e7c72020-03-10 16:13:48 -0400843 if err != nil {
844 cancel()
845 return nil
khenaidoofdbad6e2018-11-06 22:26:38 -0500846 }
khenaidoo442e7c72020-03-10 16:13:48 -0400847 go agent.waitForAdapterResponse(subCtx, cancel, "packetOut", ch, agent.onSuccess, agent.onPacketFailure, packet.Data)
khenaidoofdbad6e2018-11-06 22:26:38 -0500848 return nil
849}
850
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400851func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
852 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
853 return err
854 }
Himani Chawlab4c25912020-11-12 17:16:38 +0530855 logger.Debugw(ctx, "update-device-using-adapter-data", log.Fields{"device-id": device.Id})
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400856
857 cloned := agent.cloneDeviceWithoutLock()
Mahir Gunyel8e2707d2019-07-25 00:36:21 -0700858 cloned.Root = device.Root
859 cloned.Vendor = device.Vendor
860 cloned.Model = device.Model
861 cloned.SerialNumber = device.SerialNumber
862 cloned.MacAddress = device.MacAddress
863 cloned.Vlan = device.Vlan
864 cloned.Reason = device.Reason
Andrea Campanella025667e2021-01-14 11:50:07 +0100865 cloned.ImageDownloads = device.ImageDownloads
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400866 return agent.updateDeviceAndReleaseLock(ctx, cloned)
khenaidoo43c82122018-11-22 18:38:28 -0500867}
868
Kent Hagerman2b216042020-04-03 18:28:56 -0400869func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
khenaidoo442e7c72020-03-10 16:13:48 -0400870 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
871 return err
872 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500873
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400874 cloned := agent.cloneDeviceWithoutLock()
npujar1d86a522019-11-14 17:11:16 +0530875 // Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400876 if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
Himani Chawlab4c25912020-11-12 17:16:38 +0530877 logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400878 cloned.ConnectStatus = connStatus
npujar1d86a522019-11-14 17:11:16 +0530879 }
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400880 if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
Himani Chawlab4c25912020-11-12 17:16:38 +0530881 logger.Debugw(ctx, "update-device-status-conn", log.Fields{"ok": ok, "val": s})
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400882 cloned.OperStatus = operStatus
npujar1d86a522019-11-14 17:11:16 +0530883 }
Himani Chawlab4c25912020-11-12 17:16:38 +0530884 logger.Debugw(ctx, "update-device-status", log.Fields{"device-id": cloned.Id, "oper-status": cloned.OperStatus, "connect-status": cloned.ConnectStatus})
npujar1d86a522019-11-14 17:11:16 +0530885 // Store the device
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400886 return agent.updateDeviceAndReleaseLock(ctx, cloned)
khenaidoo92e62c52018-10-03 14:02:54 -0400887}
888
khenaidoob9203542018-09-17 22:56:37 -0400889// TODO: A generic device update by attribute
Kent Hagerman2b216042020-04-03 18:28:56 -0400890func (agent *Agent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
khenaidoob9203542018-09-17 22:56:37 -0400891 if value == nil {
892 return
893 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500894
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400895 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
896 logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
897 return
898 }
899
900 cloned := agent.cloneDeviceWithoutLock()
khenaidoob9203542018-09-17 22:56:37 -0400901 updated := false
khenaidoo6e55d9e2019-12-12 18:26:26 -0500902 s := reflect.ValueOf(cloned).Elem()
khenaidoob9203542018-09-17 22:56:37 -0400903 if s.Kind() == reflect.Struct {
904 // exported field
905 f := s.FieldByName(name)
906 if f.IsValid() && f.CanSet() {
907 switch f.Kind() {
908 case reflect.String:
909 f.SetString(value.(string))
910 updated = true
911 case reflect.Uint32:
912 f.SetUint(uint64(value.(uint32)))
913 updated = true
914 case reflect.Bool:
915 f.SetBool(value.(bool))
916 updated = true
917 }
918 }
919 }
divyadesaicb8b59d2020-08-18 09:55:47 +0000920 logger.Debugw(ctx, "update-field-status", log.Fields{"device-id": cloned.Id, "name": name, "updated": updated})
khenaidoob9203542018-09-17 22:56:37 -0400921 // Save the data
khenaidoo6e55d9e2019-12-12 18:26:26 -0500922
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400923 if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000924 logger.Warnw(ctx, "attribute-update-failed", log.Fields{"attribute": name, "value": value})
khenaidoob9203542018-09-17 22:56:37 -0400925 }
khenaidoob9203542018-09-17 22:56:37 -0400926}
serkant.uluderya334479d2019-04-10 08:26:15 -0700927
Kent Hagerman45a13e42020-04-13 12:23:50 -0400928func (agent *Agent) simulateAlarm(ctx context.Context, simulateReq *voltha.SimulateAlarmRequest) error {
khenaidoo442e7c72020-03-10 16:13:48 -0400929 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
930 return err
931 }
932 defer agent.requestQueue.RequestComplete()
Himani Chawlab4c25912020-11-12 17:16:38 +0530933 logger.Debugw(ctx, "simulate-alarm", log.Fields{"device-id": agent.deviceID})
khenaidoo6e55d9e2019-12-12 18:26:26 -0500934
Kent Hagermancba2f302020-07-28 13:37:36 -0400935 device := agent.getDeviceReadOnlyWithoutLock()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500936
Rohan Agrawalcf12f202020-08-03 04:42:01 +0000937 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Himani Chawlab4c25912020-11-12 17:16:38 +0530938 subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
939
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400940 ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
khenaidoo442e7c72020-03-10 16:13:48 -0400941 if err != nil {
942 cancel()
npujar1d86a522019-11-14 17:11:16 +0530943 return err
serkant.uluderya334479d2019-04-10 08:26:15 -0700944 }
khenaidoo442e7c72020-03-10 16:13:48 -0400945 go agent.waitForAdapterResponse(subCtx, cancel, "simulateAlarm", ch, agent.onSuccess, agent.onFailure)
serkant.uluderya334479d2019-04-10 08:26:15 -0700946 return nil
947}
Mahir Gunyelb5851672019-07-24 10:46:26 +0300948
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400949// This function updates the device in the DB, releases the device lock, and runs any state transitions.
950// The calling function MUST hold the device lock. The caller MUST NOT modify the device after this is called.
951func (agent *Agent) updateDeviceAndReleaseLock(ctx context.Context, device *voltha.Device) error {
952 // fail early if this agent is no longer valid
Kent Hagerman4f355f52020-03-30 16:01:33 -0400953 if agent.stopped {
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400954 agent.requestQueue.RequestComplete()
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530955 return errors.New("device-agent-stopped")
Thomas Lee Se5a44012019-11-07 20:32:24 +0530956 }
Kent Hagerman4f355f52020-03-30 16:01:33 -0400957
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400958 // update in db
Kent Hagermanf5a67352020-04-30 15:15:26 -0400959 if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400960 agent.requestQueue.RequestComplete()
Kent Hagerman4f355f52020-03-30 16:01:33 -0400961 return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
Mahir Gunyelb5851672019-07-24 10:46:26 +0300962 }
divyadesaicb8b59d2020-08-18 09:55:47 +0000963 logger.Debugw(ctx, "updated-device-in-store", log.Fields{"device-id: ": agent.deviceID})
Mahir Gunyelb5851672019-07-24 10:46:26 +0300964
Kent Hagerman6031aad2020-07-29 16:36:33 -0400965 prevDevice := agent.device
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400966 // update the device
khenaidoo0db4c812020-05-27 15:27:30 -0400967 agent.device = device
Mahir Gunyelb0343bf2021-05-11 14:14:26 -0700968 //If any of the states has chenged, send the change event.
969 if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
970 _ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
971 }
Maninder0aabf0c2021-03-17 14:55:14 +0530972 deviceTransientState := agent.getTransientState()
973
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400974 // release lock before processing transition
975 agent.requestQueue.RequestComplete()
Himani Chawlab4c25912020-11-12 17:16:38 +0530976 subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400977
Himani Chawlab4c25912020-11-12 17:16:38 +0530978 if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
Maninder0aabf0c2021-03-17 14:55:14 +0530979 device, prevDevice, deviceTransientState, deviceTransientState); err != nil {
Himani Chawlab4c25912020-11-12 17:16:38 +0530980 logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
981 // Sending RPC EVENT here
982 rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
Himani Chawla606a4f02021-03-23 19:45:58 +0530983 agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
984 nil, time.Now().Unix())
Himani Chawlab4c25912020-11-12 17:16:38 +0530985
Kent Hagermanf6db9f12020-07-22 17:16:19 -0400986 }
Mahir Gunyelb5851672019-07-24 10:46:26 +0300987 return nil
988}
Mahir Gunyelfdee9212019-10-16 16:52:21 -0700989
Himani Chawla2ba1c9c2020-10-07 13:19:03 +0530990// This function updates the device transient in the DB through loader, releases the device lock, and runs any state transitions.
991// The calling function MUST hold the device lock. The caller MUST NOT modify the device after this is called.
992func (agent *Agent) updateDeviceWithTransientStateAndReleaseLock(ctx context.Context, device *voltha.Device,
993 transientState, prevTransientState voltha.DeviceTransientState_Types) error {
994 // fail early if this agent is no longer valid
995 if agent.stopped {
996 agent.requestQueue.RequestComplete()
997 return errors.New("device-agent-stopped")
998 }
999 //update device TransientState
1000 if err := agent.updateTransientState(ctx, transientState); err != nil {
1001 agent.requestQueue.RequestComplete()
1002 return err
1003 }
1004 // update in db
1005 if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
1006 //Reverting TransientState update
1007 err := agent.updateTransientState(ctx, prevTransientState)
1008 logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
Himani Chawlab4c25912020-11-12 17:16:38 +05301009 "previous-transient-state": prevTransientState, "current-transient-state": transientState})
Himani Chawla2ba1c9c2020-10-07 13:19:03 +05301010 agent.requestQueue.RequestComplete()
1011 return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
1012 }
1013
1014 logger.Debugw(ctx, "updated-device-in-store", log.Fields{"device-id: ": agent.deviceID})
1015
1016 prevDevice := agent.device
1017 // update the device
1018 agent.device = device
Mahir Gunyelb0343bf2021-05-11 14:14:26 -07001019 //If any of the states has chenged, send the change event.
1020 if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
1021 _ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
1022 }
Himani Chawla2ba1c9c2020-10-07 13:19:03 +05301023
1024 // release lock before processing transition
1025 agent.requestQueue.RequestComplete()
Himani Chawlab4c25912020-11-12 17:16:38 +05301026 subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
1027 if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
Himani Chawla2ba1c9c2020-10-07 13:19:03 +05301028 device, prevDevice, transientState, prevTransientState); err != nil {
Himani Chawlab4c25912020-11-12 17:16:38 +05301029 logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
1030 // Sending RPC EVENT here
1031 rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
Himani Chawla606a4f02021-03-23 19:45:58 +05301032 agent.deviceMgr.SendRPCEvent(ctx, "RPC_ERROR_RAISE_EVENT", rpce, voltha.EventCategory_COMMUNICATION,
1033 nil, time.Now().Unix())
Himani Chawla2ba1c9c2020-10-07 13:19:03 +05301034 }
1035 return nil
1036}
Kent Hagerman2b216042020-04-03 18:28:56 -04001037func (agent *Agent) updateDeviceReason(ctx context.Context, reason string) error {
khenaidoo442e7c72020-03-10 16:13:48 -04001038 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
1039 return err
1040 }
Maninder9a1bc0d2020-10-26 11:34:02 +05301041
Himani Chawlab4c25912020-11-12 17:16:38 +05301042 logger.Debugw(ctx, "update-device-reason", log.Fields{"device-id": agent.deviceID, "reason": reason})
khenaidoo6e55d9e2019-12-12 18:26:26 -05001043
Maninder9a1bc0d2020-10-26 11:34:02 +05301044 var desc string
1045 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
1046
1047 defer agent.logDeviceUpdate(ctx, "updateDeviceReason", nil, nil, operStatus, &desc)
1048
Kent Hagermanf6db9f12020-07-22 17:16:19 -04001049 cloned := agent.cloneDeviceWithoutLock()
npujar1d86a522019-11-14 17:11:16 +05301050 cloned.Reason = reason
Maninder9a1bc0d2020-10-26 11:34:02 +05301051 retErr := agent.updateDeviceAndReleaseLock(ctx, cloned)
1052 if retErr != nil {
1053 desc = retErr.Error()
1054 } else {
1055 operStatus.Code = common.OperationResp_OPERATION_SUCCESS
1056 desc = reason
1057 }
1058 return retErr
Mahir Gunyelfdee9212019-10-16 16:52:21 -07001059}
kesavandbc2d1622020-01-21 00:42:01 -05001060
Kent Hagerman2b216042020-04-03 18:28:56 -04001061func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
Himani Chawlab4c25912020-11-12 17:16:38 +05301062 logger.Debugw(ctx, "child-device-lost", log.Fields{"child-device-id": device.Id, "parent-device-id": agent.deviceID})
Chaitrashree G S543df3e2020-02-24 22:36:54 -05001063
Kent Hagerman2a07b862020-06-19 15:23:07 -04001064 // Remove the associated peer ports on the parent device
1065 for portID := range agent.portLoader.ListIDs() {
1066 if portHandle, have := agent.portLoader.Lock(portID); have {
1067 oldPort := portHandle.GetReadOnly()
1068 updatedPeers := make([]*voltha.Port_PeerPort, 0)
1069 for _, peerPort := range oldPort.Peers {
1070 if peerPort.DeviceId != device.Id {
1071 updatedPeers = append(updatedPeers, peerPort)
1072 }
khenaidoo442e7c72020-03-10 16:13:48 -04001073 }
Kent Hagerman2a07b862020-06-19 15:23:07 -04001074 newPort := *oldPort
1075 newPort.Peers = updatedPeers
1076 if err := portHandle.Update(ctx, &newPort); err != nil {
1077 portHandle.Unlock()
1078 return nil
1079 }
1080 portHandle.Unlock()
khenaidoo442e7c72020-03-10 16:13:48 -04001081 }
Chaitrashree G S543df3e2020-02-24 22:36:54 -05001082 }
1083
khenaidoo442e7c72020-03-10 16:13:48 -04001084 //send request to adapter
Rohan Agrawalcf12f202020-08-03 04:42:01 +00001085 subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
Himani Chawlab4c25912020-11-12 17:16:38 +05301086 subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
1087
Girish Gowdra6f9b10e2021-03-11 14:36:39 -08001088 ch, err := agent.adapterProxy.ChildDeviceLost(ctx, agent.deviceType, device)
khenaidoo442e7c72020-03-10 16:13:48 -04001089 if err != nil {
1090 cancel()
1091 return err
Chaitrashree G S543df3e2020-02-24 22:36:54 -05001092 }
khenaidoo442e7c72020-03-10 16:13:48 -04001093 go agent.waitForAdapterResponse(subCtx, cancel, "childDeviceLost", ch, agent.onSuccess, agent.onFailure)
Chaitrashree G S543df3e2020-02-24 22:36:54 -05001094 return nil
Chaitrashree G S543df3e2020-02-24 22:36:54 -05001095}
onkarkundargi87285252020-01-27 11:34:52 +05301096
Kent Hagerman2b216042020-04-03 18:28:56 -04001097func (agent *Agent) startOmciTest(ctx context.Context, omcitestrequest *voltha.OmciTestRequest) (*voltha.TestResponse, error) {
onkarkundargi87285252020-01-27 11:34:52 +05301098 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
1099 return nil, err
1100 }
1101
Kent Hagermanf6db9f12020-07-22 17:16:19 -04001102 cloned := agent.cloneDeviceWithoutLock()
Matteo Scandolod525ae32020-04-02 17:27:29 -07001103
Kent Hagermanf6db9f12020-07-22 17:16:19 -04001104 if cloned.Adapter == "" {
1105 adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
Matteo Scandolod525ae32020-04-02 17:27:29 -07001106 if err != nil {
1107 agent.requestQueue.RequestComplete()
1108 return nil, err
1109 }
Kent Hagermanf6db9f12020-07-22 17:16:19 -04001110 cloned.Adapter = adapterName
onkarkundargi87285252020-01-27 11:34:52 +05301111 }
1112
1113 // Send request to the adapter
Kent Hagermanf6db9f12020-07-22 17:16:19 -04001114 ch, err := agent.adapterProxy.StartOmciTest(ctx, cloned, omcitestrequest)
onkarkundargi87285252020-01-27 11:34:52 +05301115 agent.requestQueue.RequestComplete()
1116 if err != nil {
1117 return nil, err
1118 }
1119
1120 // Wait for the adapter response
1121 rpcResponse, ok := <-ch
1122 if !ok {
1123 return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
1124 }
1125 if rpcResponse.Err != nil {
1126 return nil, rpcResponse.Err
1127 }
1128
1129 // Unmarshal and return the response
1130 testResp := &voltha.TestResponse{}
1131 if err := ptypes.UnmarshalAny(rpcResponse.Reply, testResp); err != nil {
1132 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
1133 }
Himani Chawlab4c25912020-11-12 17:16:38 +05301134 logger.Debugw(ctx, "omci_test_request-success-device-agent", log.Fields{"test-resp": testResp})
onkarkundargi87285252020-01-27 11:34:52 +05301135 return testResp, nil
1136}
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -08001137
1138func (agent *Agent) getExtValue(ctx context.Context, pdevice *voltha.Device, cdevice *voltha.Device, valueparam *voltha.ValueSpecifier) (*voltha.ReturnValues, error) {
Himani Chawlab4c25912020-11-12 17:16:38 +05301139 logger.Debugw(ctx, "get-ext-value", log.Fields{"device-id": agent.deviceID, "onu-id": valueparam.Id, "value-type": valueparam.Value})
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -08001140 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
1141 return nil, err
1142 }
1143
1144 //send request to adapter
1145 ch, err := agent.adapterProxy.GetExtValue(ctx, pdevice, cdevice, valueparam.Id, valueparam.Value)
1146 agent.requestQueue.RequestComplete()
1147 if err != nil {
1148 return nil, err
1149 }
1150
1151 // Wait for the adapter response
1152 rpcResponse, ok := <-ch
1153 if !ok {
1154 return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
1155 }
1156 if rpcResponse.Err != nil {
1157 return nil, rpcResponse.Err
1158 }
1159
1160 // Unmarshal and return the response
1161 Resp := &voltha.ReturnValues{}
1162 if err := ptypes.UnmarshalAny(rpcResponse.Reply, Resp); err != nil {
1163 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
1164 }
Himani Chawlab4c25912020-11-12 17:16:38 +05301165 logger.Debugw(ctx, "get-ext-value-success-device-agent", log.Fields{"Resp": Resp})
Dinesh Belwalkarc1129f12020-02-27 10:41:33 -08001166 return Resp, nil
1167}
dpaul62686312020-06-23 14:17:36 +05301168
1169func (agent *Agent) setExtValue(ctx context.Context, device *voltha.Device, value *voltha.ValueSet) (*empty.Empty, error) {
Himani Chawlab4c25912020-11-12 17:16:38 +05301170 logger.Debugw(ctx, "set-ext-value", log.Fields{"device-id": value.Id})
dpaul62686312020-06-23 14:17:36 +05301171 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
1172 return nil, err
1173 }
1174
1175 //send request to adapter
1176 ch, err := agent.adapterProxy.SetExtValue(ctx, device, value)
1177 agent.requestQueue.RequestComplete()
1178 if err != nil {
1179 return nil, err
1180 }
1181
1182 // Wait for the adapter response
1183 rpcResponse, ok := <-ch
1184 if !ok {
1185 return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
1186 }
1187 if rpcResponse.Err != nil {
1188 return nil, rpcResponse.Err
1189 }
1190
1191 // Unmarshal and return the response
Himani Chawlab4c25912020-11-12 17:16:38 +05301192 logger.Debug(ctx, "set-ext-value-success-device-agent")
dpaul62686312020-06-23 14:17:36 +05301193 return &empty.Empty{}, nil
1194}
Salman Siddiqui1cf95042020-11-19 00:42:56 +05301195
1196func (agent *Agent) getSingleValue(ctx context.Context, request *extension.SingleGetValueRequest) (*extension.SingleGetValueResponse, error) {
Himani Chawlab4c25912020-11-12 17:16:38 +05301197 logger.Debugw(ctx, "get-single-value", log.Fields{"device-id": request.TargetId})
Salman Siddiqui1cf95042020-11-19 00:42:56 +05301198
1199 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
1200 return nil, err
1201 }
1202
1203 cloned := agent.cloneDeviceWithoutLock()
1204
1205 //send request to adapter
1206 ch, err := agent.adapterProxy.GetSingleValue(ctx, cloned.Adapter, request)
1207 agent.requestQueue.RequestComplete()
1208 if err != nil {
1209 return nil, err
1210 }
1211
1212 // Wait for the adapter response
1213 rpcResponse, ok := <-ch
1214 if !ok {
1215 return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
1216 }
1217
1218 if rpcResponse.Err != nil {
1219 return nil, rpcResponse.Err
1220 }
1221
1222 resp := &extension.SingleGetValueResponse{}
1223 if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
1224 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
1225 }
1226
1227 return resp, nil
1228}
1229
1230func (agent *Agent) setSingleValue(ctx context.Context, request *extension.SingleSetValueRequest) (*extension.SingleSetValueResponse, error) {
Himani Chawlab4c25912020-11-12 17:16:38 +05301231 logger.Debugw(ctx, "set-single-value", log.Fields{"device-id": request.TargetId})
Salman Siddiqui1cf95042020-11-19 00:42:56 +05301232
1233 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
1234 return nil, err
1235 }
1236
1237 cloned := agent.cloneDeviceWithoutLock()
1238
1239 //send request to adapter
1240 ch, err := agent.adapterProxy.SetSingleValue(ctx, cloned.Adapter, request)
1241 agent.requestQueue.RequestComplete()
1242 if err != nil {
1243 return nil, err
1244 }
1245
1246 // Wait for the adapter response
1247 rpcResponse, ok := <-ch
1248 if !ok {
1249 return nil, status.Errorf(codes.Aborted, "channel-closed-cloned-id-%s", agent.deviceID)
1250 }
1251
1252 if rpcResponse.Err != nil {
1253 return nil, rpcResponse.Err
1254 }
1255
1256 resp := &extension.SingleSetValueResponse{}
1257 if err := ptypes.UnmarshalAny(rpcResponse.Reply, resp); err != nil {
1258 return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
1259 }
1260
1261 return resp, nil
1262}
Maninder0aabf0c2021-03-17 14:55:14 +05301263
Maninder2195ccc2021-06-23 20:23:01 +05301264func (agent *Agent) proceedWithRequest(device *voltha.Device) bool {
1265 return !agent.isDeletionInProgress() && !agent.isInReconcileState(device)
Maninder0aabf0c2021-03-17 14:55:14 +05301266}
1267
1268func (agent *Agent) stopReconcile() {
1269 agent.stopReconcilingMutex.Lock()
1270 if agent.stopReconciling != nil {
1271 agent.stopReconciling <- 0
1272 }
1273 agent.stopReconcilingMutex.Unlock()
1274}
1275
1276func (agent *Agent) ReconcileDevice(ctx context.Context, device *voltha.Device) {
1277 var desc string
1278 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
1279
1280 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
1281 desc = err.Error()
1282 agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
1283 return
1284 }
1285
Maninder2195ccc2021-06-23 20:23:01 +05301286 if !agent.proceedWithRequest(device) {
Maninder0aabf0c2021-03-17 14:55:14 +05301287 agent.requestQueue.RequestComplete()
Maninder2195ccc2021-06-23 20:23:01 +05301288 desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
Maninder0aabf0c2021-03-17 14:55:14 +05301289 logger.Errorf(ctx, desc)
1290 agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
1291 return
1292 }
1293
1294 //set transient state to RECONCILE IN PROGRESS
1295 err := agent.updateTransientState(ctx, voltha.DeviceTransientState_RECONCILE_IN_PROGRESS)
1296 if err != nil {
1297 agent.requestQueue.RequestComplete()
1298 desc = fmt.Sprintf("Not able to set device transient state to Reconcile in progress."+
1299 "Err: %s", err.Error())
1300 logger.Errorf(ctx, desc)
1301 agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
1302 return
1303 }
1304
Maninder0aabf0c2021-03-17 14:55:14 +05301305 reconcilingBackoff := backoff.NewExponentialBackOff()
1306 reconcilingBackoff.InitialInterval = agent.config.BackoffRetryInitialInterval
1307 reconcilingBackoff.MaxElapsedTime = agent.config.BackoffRetryMaxElapsedTime
1308 reconcilingBackoff.MaxInterval = agent.config.BackoffRetryMaxInterval
1309
1310 //making here to keep lifecycle of this channel within the scope of retryReconcile
1311 agent.stopReconcilingMutex.Lock()
1312 agent.stopReconciling = make(chan int)
1313 agent.stopReconcilingMutex.Unlock()
1314
David K. Bainbridge482e4422021-06-30 12:23:42 -07001315 // defined outside the retry loop so it can be cleaned
1316 // up when the loop breaks
1317 var backoffTimer *time.Timer
1318
1319retry:
Maninder0aabf0c2021-03-17 14:55:14 +05301320 for {
David K. Bainbridge482e4422021-06-30 12:23:42 -07001321 // If the operations state of the device is RECONCILING_FAILED then we do not
1322 // want to continue to attempt reconciliation.
1323 deviceRef := agent.getDeviceReadOnlyWithoutLock()
1324 if deviceRef.OperStatus == common.OperStatus_RECONCILING_FAILED {
1325 logger.Warnw(ctx, "reconciling-failed-halting-retries",
1326 log.Fields{"device-id": device.Id})
1327 agent.requestQueue.RequestComplete()
1328 break retry
1329 }
1330
Maninder0aabf0c2021-03-17 14:55:14 +05301331 // Use an exponential back off to prevent getting into a tight loop
1332 duration := reconcilingBackoff.NextBackOff()
1333 //This case should never occur in default case as max elapsed time for backoff is 0(by default) , so it will never return stop
1334 if duration == backoff.Stop {
1335 // If we reach a maximum then warn and reset the backoff
1336 // timer and keep attempting.
1337 logger.Warnw(ctx, "maximum-reconciling-backoff-reached--resetting-backoff-timer",
1338 log.Fields{"max-reconciling-backoff": reconcilingBackoff.MaxElapsedTime,
1339 "device-id": device.Id})
1340 reconcilingBackoff.Reset()
1341 duration = reconcilingBackoff.NextBackOff()
1342 }
1343
David K. Bainbridge482e4422021-06-30 12:23:42 -07001344 backoffTimer = time.NewTimer(duration)
Maninder0aabf0c2021-03-17 14:55:14 +05301345
Maninder90670dd2021-06-29 20:05:38 +05301346 logger.Debugw(ctx, "retrying-reconciling", log.Fields{"deviceID": device.Id})
Maninder0aabf0c2021-03-17 14:55:14 +05301347 // Send a reconcile request to the adapter.
1348 ch, err := agent.adapterProxy.ReconcileDevice(ctx, agent.device)
1349 //release lock before moving further
1350 agent.requestQueue.RequestComplete()
1351 if err != nil {
1352 desc := fmt.Sprintf("Failed reconciling from adapter side. Err: %s", err.Error())
1353 agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
1354 <-backoffTimer.C
1355 // backoffTimer expired continue
1356 // Take lock back before retrying
1357 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
1358 desc = err.Error()
1359 agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
David K. Bainbridge482e4422021-06-30 12:23:42 -07001360 break retry
Maninder0aabf0c2021-03-17 14:55:14 +05301361 }
1362 continue
1363 }
1364
1365 // if return err retry if not then break loop and quit retrying reconcile
1366 if err = agent.waitForReconcileResponse(backoffTimer, ch); err != nil {
1367 desc = err.Error()
1368 logger.Errorf(ctx, desc)
1369 agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
Maninder90670dd2021-06-29 20:05:38 +05301370 <-backoffTimer.C
Maninder0aabf0c2021-03-17 14:55:14 +05301371 } else {
1372 operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_IN_PROGRESS}
1373 agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
David K. Bainbridge482e4422021-06-30 12:23:42 -07001374 break retry
Maninder0aabf0c2021-03-17 14:55:14 +05301375 }
1376
1377 // Take lock back before retrying
1378 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
1379 desc = err.Error()
1380 agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
David K. Bainbridge482e4422021-06-30 12:23:42 -07001381 break retry
1382 }
1383 }
1384
1385 // Retry loop is broken, so stop any timers and drain the channel
1386 if backoffTimer != nil && !backoffTimer.Stop() {
1387
1388 // As per documentation and stack overflow when a timer is stopped its
1389 // channel should be drained. The issue is that Stop returns false
1390 // either if the timer has already been fired "OR" if the timer can be
1391 // stopped before being fired. This means that in some cases the
1392 // channel has already be emptied so attempting to read from it means
1393 // a blocked thread. To get around this use a select so if the
1394 // channel is already empty the default case hits and we are not
1395 // blocked.
1396 select {
1397 case <-backoffTimer.C:
1398 default:
Maninder0aabf0c2021-03-17 14:55:14 +05301399 }
1400 }
1401}
1402
1403func (agent *Agent) waitForReconcileResponse(backoffTimer *time.Timer, ch chan *kafka.RpcResponse) error {
1404 select {
1405 // wait for response
1406 case resp, ok := <-ch:
1407 if !ok {
1408 //channel-closed
1409 return errors.New("channel on which reconcile response is awaited is closed")
1410 } else if resp.Err != nil {
1411 //error encountered
Maninder90670dd2021-06-29 20:05:38 +05301412 return fmt.Errorf("error encountered while retrying reconcile. Err: %s", resp.Err.Error())
Maninder0aabf0c2021-03-17 14:55:14 +05301413 }
1414
1415 //In case of success quit retrying and wait for adapter to reset operation state of device
1416 agent.stopReconcilingMutex.Lock()
1417 agent.stopReconciling = nil
1418 agent.stopReconcilingMutex.Unlock()
1419 return nil
1420
1421 //if reconciling need to be stopped
1422 case _, ok := <-agent.stopReconciling:
1423 agent.stopReconcilingMutex.Lock()
1424 agent.stopReconciling = nil
1425 agent.stopReconcilingMutex.Unlock()
1426 if !ok {
1427 //channel-closed
1428 return errors.New("channel used to notify to stop reconcile is closed")
1429 }
1430 return nil
1431 //continue if timer expired
1432 case <-backoffTimer.C:
1433 }
1434 return nil
1435}
1436
1437func (agent *Agent) reconcilingCleanup(ctx context.Context) error {
1438 var desc string
1439 operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
1440 if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
1441 desc = err.Error()
1442 agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
1443 return err
1444 }
1445 defer agent.requestQueue.RequestComplete()
1446 err := agent.updateTransientState(ctx, voltha.DeviceTransientState_NONE)
1447 if err != nil {
1448 desc = fmt.Sprintf("Not able to clear device transient state from Reconcile in progress."+
1449 "Err: %s", err.Error())
1450 logger.Errorf(ctx, desc)
1451 agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
1452 return err
1453 }
1454 operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
1455 agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
1456 return nil
1457}