blob: 9a1ff1cd552174de2b677b100a94fba8b9fc68d8 [file] [log] [blame]
khenaidoo21d51152019-02-01 13:48:37 -05001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
npujar1d86a522019-11-14 17:11:16 +053016
Kent Hagerman2b216042020-04-03 18:28:56 -040017package adapter
khenaidoo21d51152019-02-01 13:48:37 -050018
19import (
20 "context"
khenaidood948f772021-08-11 17:49:24 -040021 "errors"
khenaidoo21d51152019-02-01 13:48:37 -050022 "fmt"
npujar1d86a522019-11-14 17:11:16 +053023 "sync"
Kent Hagerman16ce36a2019-12-17 13:40:53 -050024 "time"
npujar1d86a522019-11-14 17:11:16 +053025
khenaidood948f772021-08-11 17:49:24 -040026 "github.com/opencord/voltha-lib-go/v7/pkg/db"
27 vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
khenaidoo9beaaf12021-10-19 17:32:01 -040028 "github.com/opencord/voltha-protos/v5/go/adapter_service"
khenaidood948f772021-08-11 17:49:24 -040029 "github.com/opencord/voltha-protos/v5/go/common"
khenaidoo9beaaf12021-10-19 17:32:01 -040030 "github.com/opencord/voltha-protos/v5/go/core_adapter"
khenaidood948f772021-08-11 17:49:24 -040031
khenaidoo21d51152019-02-01 13:48:37 -050032 "github.com/gogo/protobuf/proto"
Kent Hagerman45a13e42020-04-13 12:23:50 -040033 "github.com/golang/protobuf/ptypes/empty"
sbarbari17d7e222019-11-05 10:02:29 -050034 "github.com/opencord/voltha-go/db/model"
khenaidood948f772021-08-11 17:49:24 -040035 "github.com/opencord/voltha-lib-go/v7/pkg/log"
36 "github.com/opencord/voltha-lib-go/v7/pkg/probe"
37 "github.com/opencord/voltha-protos/v5/go/voltha"
Kent Hagerman45a13e42020-04-13 12:23:50 -040038 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/status"
khenaidoo21d51152019-02-01 13:48:37 -050040)
41
Kent Hagerman2b216042020-04-03 18:28:56 -040042// Manager represents adapter manager attributes
43type Manager struct {
khenaidood948f772021-08-11 17:49:24 -040044 adapterAgents map[string]*agent
45 adapterEndpoints map[Endpoint]*agent
46 deviceTypes map[string]*voltha.DeviceType
47 adapterDbProxy *model.Proxy
48 deviceTypeDbProxy *model.Proxy
49 onAdapterRestart vgrpc.RestartedHandler
50 endpointMgr EndpointManager
51 lockAdapterAgentsMap sync.RWMutex
52 lockDeviceTypesMap sync.RWMutex
53 lockAdapterEndPointsMap sync.RWMutex
54 liveProbeInterval time.Duration
khenaidoo25057da2021-12-08 14:40:45 -050055 coreEndpoint string
Girish Gowdra11ddb232022-05-26 12:19:59 -070056 rollingUpdateMap map[string]bool
57 rollingUpdateLock sync.RWMutex
58 rxStreamCloseChMap map[string]chan bool
59 rxStreamCloseChLock sync.RWMutex
khenaidoo21d51152019-02-01 13:48:37 -050060}
61
khenaidood948f772021-08-11 17:49:24 -040062// SetAdapterRestartedCallback is used to set the callback that needs to be invoked on an adapter restart
63func (aMgr *Manager) SetAdapterRestartedCallback(onAdapterRestart vgrpc.RestartedHandler) {
Kent Hagerman2b216042020-04-03 18:28:56 -040064 aMgr.onAdapterRestart = onAdapterRestart
65}
66
khenaidood948f772021-08-11 17:49:24 -040067func NewAdapterManager(
khenaidoo25057da2021-12-08 14:40:45 -050068 coreEndpoint string,
khenaidood948f772021-08-11 17:49:24 -040069 dbPath *model.Path,
70 coreInstanceID string,
71 backend *db.Backend,
72 liveProbeInterval time.Duration,
73) *Manager {
74 return &Manager{
Girish Gowdra11ddb232022-05-26 12:19:59 -070075 adapterDbProxy: dbPath.Proxy("adapters"),
76 deviceTypeDbProxy: dbPath.Proxy("device_types"),
77 deviceTypes: make(map[string]*voltha.DeviceType),
78 adapterAgents: make(map[string]*agent),
79 adapterEndpoints: make(map[Endpoint]*agent),
80 endpointMgr: NewEndpointManager(backend),
81 liveProbeInterval: liveProbeInterval,
82 coreEndpoint: coreEndpoint,
83 rollingUpdateMap: make(map[string]bool),
84 rxStreamCloseChMap: make(map[string]chan bool),
khenaidood948f772021-08-11 17:49:24 -040085 }
86}
87
88func (aMgr *Manager) Start(ctx context.Context, serviceName string) {
89 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusPreparing)
90 logger.Infow(ctx, "starting-service", log.Fields{"service": serviceName})
khenaidoo21d51152019-02-01 13:48:37 -050091
92 // Load the existing adapterAgents and device types - this will also ensure the correct paths have been
93 // created if there are no data in the dB to start
Rohan Agrawal31f21802020-06-12 05:38:46 +000094 err := aMgr.loadAdaptersAndDevicetypesInMemory(ctx)
Thomas Lee Se5a44012019-11-07 20:32:24 +053095 if err != nil {
khenaidood948f772021-08-11 17:49:24 -040096 logger.Fatalw(ctx, "failed-to-load-adapters-and-device-types-in-memory", log.Fields{"service": serviceName, "error": err})
Thomas Lee Se5a44012019-11-07 20:32:24 +053097 }
khenaidoo21d51152019-02-01 13:48:37 -050098
khenaidood948f772021-08-11 17:49:24 -040099 probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
100 logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
khenaidoo21d51152019-02-01 13:48:37 -0500101}
102
khenaidood948f772021-08-11 17:49:24 -0400103func (aMgr *Manager) Stop(ctx context.Context) {
104 // Stop all adapters
105 aMgr.lockAdapterAgentsMap.RLock()
106 defer aMgr.lockAdapterAgentsMap.RUnlock()
khenaidooa46458b2021-12-15 16:50:44 -0500107 var wg sync.WaitGroup
khenaidood948f772021-08-11 17:49:24 -0400108 for _, adapterAgent := range aMgr.adapterAgents {
khenaidooa46458b2021-12-15 16:50:44 -0500109 // Run the agent stop in its own go routine to notify to the
110 // adapters that the Core is no longer a client
111 wg.Add(1)
112 go func(agt *agent) {
113 agt.stop(ctx)
114 wg.Done()
115 }(adapterAgent)
Thomas Lee Se5a44012019-11-07 20:32:24 +0530116 }
khenaidooa46458b2021-12-15 16:50:44 -0500117 // Wait for all tests to complete
118 wg.Wait()
khenaidoo21d51152019-02-01 13:48:37 -0500119}
120
khenaidood948f772021-08-11 17:49:24 -0400121func (aMgr *Manager) GetAdapterEndpoint(ctx context.Context, deviceID string, deviceType string) (string, error) {
122 endPoint, err := aMgr.endpointMgr.GetEndpoint(ctx, deviceID, deviceType)
123 if err != nil {
124 return "", err
125 }
126 return string(endPoint), nil
127}
128
129func (aMgr *Manager) GetAdapterWithEndpoint(ctx context.Context, endPoint string) (*voltha.Adapter, error) {
130 aMgr.lockAdapterEndPointsMap.RLock()
131 agent, have := aMgr.adapterEndpoints[Endpoint(endPoint)]
132 aMgr.lockAdapterEndPointsMap.RUnlock()
Kent Hagerman16ce36a2019-12-17 13:40:53 -0500133
134 if have {
khenaidood948f772021-08-11 17:49:24 -0400135 return agent.getAdapter(ctx), nil
Kent Hagerman16ce36a2019-12-17 13:40:53 -0500136 }
khenaidood948f772021-08-11 17:49:24 -0400137
138 return nil, errors.New("Not found")
139}
140
141func (aMgr *Manager) GetAdapterNameWithEndpoint(ctx context.Context, endPoint string) (string, error) {
142 aMgr.lockAdapterEndPointsMap.RLock()
143 agent, have := aMgr.adapterEndpoints[Endpoint(endPoint)]
144 aMgr.lockAdapterEndPointsMap.RUnlock()
145
146 if have {
147 return agent.adapter.Id, nil
148 }
149
150 return "", errors.New("Not found")
151}
152
khenaidoo9beaaf12021-10-19 17:32:01 -0400153func (aMgr *Manager) GetAdapterClient(_ context.Context, endpoint string) (adapter_service.AdapterServiceClient, error) {
khenaidood948f772021-08-11 17:49:24 -0400154 if endpoint == "" {
155 return nil, errors.New("endpoint-cannot-be-empty")
156 }
157 aMgr.lockAdapterEndPointsMap.RLock()
158 defer aMgr.lockAdapterEndPointsMap.RUnlock()
159
160 if agent, have := aMgr.adapterEndpoints[Endpoint(endpoint)]; have {
161 return agent.getClient()
162 }
163
164 return nil, fmt.Errorf("Endpoint-not-found-%s", endpoint)
Kent Hagerman16ce36a2019-12-17 13:40:53 -0500165}
166
Rohan Agrawal31f21802020-06-12 05:38:46 +0000167func (aMgr *Manager) addAdapter(ctx context.Context, adapter *voltha.Adapter, saveToDb bool) error {
khenaidood948f772021-08-11 17:49:24 -0400168 aMgr.lockAdapterAgentsMap.Lock()
169 aMgr.lockAdapterEndPointsMap.Lock()
170 defer aMgr.lockAdapterEndPointsMap.Unlock()
171 defer aMgr.lockAdapterAgentsMap.Unlock()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000172 logger.Debugw(ctx, "adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
Matteo Scandolod525ae32020-04-02 17:27:29 -0700173 "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
khenaidoo21d51152019-02-01 13:48:37 -0500174 if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
khenaidoo21d51152019-02-01 13:48:37 -0500175 if saveToDb {
176 // Save the adapter to the KV store - first check if it already exist
khenaidood948f772021-08-11 17:49:24 -0400177 if have, err := aMgr.adapterDbProxy.Get(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, &voltha.Adapter{}); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000178 logger.Errorw(ctx, "failed-to-get-adapters-from-cluster-proxy", log.Fields{"error": err})
Thomas Lee Se5a44012019-11-07 20:32:24 +0530179 return err
Kent Hagerman4f355f52020-03-30 16:01:33 -0400180 } else if !have {
khenaidood948f772021-08-11 17:49:24 -0400181 if err := aMgr.adapterDbProxy.Set(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, adapter); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000182 logger.Errorw(ctx, "failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
Matteo Scandolod525ae32020-04-02 17:27:29 -0700183 "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
Thomas Lee Se5a44012019-11-07 20:32:24 +0530184 return err
185 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000186 logger.Debugw(ctx, "adapter-saved-to-KV-Store", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
Matteo Scandolod525ae32020-04-02 17:27:29 -0700187 "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
188 } else {
Girish Kumar3e8ee212020-08-19 17:50:11 +0000189 logger.Warnw(ctx, "adding-adapter-already-in-KV-store", log.Fields{
Matteo Scandolod525ae32020-04-02 17:27:29 -0700190 "adapterName": adapter.Id,
191 "adapterReplica": adapter.CurrentReplica,
192 })
khenaidoo21d51152019-02-01 13:48:37 -0500193 }
194 }
Kent Hagermand9cc2e92019-11-04 13:28:15 -0500195 clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
khenaidood948f772021-08-11 17:49:24 -0400196 // Use a muted adapter restart handler which is invoked by the corresponding gRPC client on an adapter restart.
197 // This handler just log the restart event. The actual action taken following an adapter restart
198 // will be done when an adapter re-registers itself.
khenaidoo25057da2021-12-08 14:40:45 -0500199 aMgr.adapterAgents[adapter.Id] = newAdapterAgent(aMgr.coreEndpoint, clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
khenaidood948f772021-08-11 17:49:24 -0400200 aMgr.adapterEndpoints[Endpoint(adapter.Endpoint)] = aMgr.adapterAgents[adapter.Id]
khenaidoo21d51152019-02-01 13:48:37 -0500201 }
Thomas Lee Se5a44012019-11-07 20:32:24 +0530202 return nil
khenaidoo21d51152019-02-01 13:48:37 -0500203}
204
Girish Gowdra11ddb232022-05-26 12:19:59 -0700205func (aMgr *Manager) updateAdapter(ctx context.Context, adapter *voltha.Adapter, saveToDb bool) error {
206 aMgr.lockAdapterAgentsMap.Lock()
207 defer aMgr.lockAdapterAgentsMap.Unlock()
208 logger.Debugw(ctx, "updating-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
209 "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint,
210 "version": adapter.Version})
211 if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
212 logger.Errorw(ctx, "adapter-does-not-exist", log.Fields{"adapterName": adapter.Id})
213 return fmt.Errorf("does-not-exist")
214 }
215 if saveToDb {
216 // Update the adapter to the KV store
217 if err := aMgr.adapterDbProxy.Set(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, adapter); err != nil {
218 logger.Errorw(ctx, "failed-to-update-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
219 "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas,
220 "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas,
221 "version": adapter.Version})
222 return err
223 }
224 logger.Debugw(ctx, "adapter-updated-to-KV-Store", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
225 "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint,
226 "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas, "version": adapter.Version})
227 }
228 clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
229 // Use a muted adapter restart handler which is invoked by the corresponding gRPC client on an adapter restart.
230 // This handler just log the restart event. The actual action taken following an adapter restart
231 // will be done when an adapter re-registers itself.
232 aMgr.adapterAgents[adapter.Id] = newAdapterAgent(aMgr.coreEndpoint, clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
233 aMgr.adapterEndpoints[Endpoint(adapter.Endpoint)] = aMgr.adapterAgents[adapter.Id]
234 return nil
235}
236
Rohan Agrawal31f21802020-06-12 05:38:46 +0000237func (aMgr *Manager) addDeviceTypes(ctx context.Context, deviceTypes *voltha.DeviceTypes, saveToDb bool) error {
khenaidoo21d51152019-02-01 13:48:37 -0500238 if deviceTypes == nil {
Thomas Lee Se5a44012019-11-07 20:32:24 +0530239 return fmt.Errorf("no-device-type")
khenaidoo21d51152019-02-01 13:48:37 -0500240 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000241 logger.Debugw(ctx, "adding-device-types", log.Fields{"deviceTypes": deviceTypes})
khenaidood948f772021-08-11 17:49:24 -0400242 aMgr.lockAdapterAgentsMap.Lock()
243 defer aMgr.lockAdapterAgentsMap.Unlock()
244 aMgr.lockDeviceTypesMap.Lock()
245 defer aMgr.lockDeviceTypesMap.Unlock()
Kent Hagermand9cc2e92019-11-04 13:28:15 -0500246
Matteo Scandolod525ae32020-04-02 17:27:29 -0700247 // create an in memory map to fetch the entire voltha.DeviceType from a device.Type string
248 for _, deviceType := range deviceTypes.Items {
249 aMgr.deviceTypes[deviceType.Id] = deviceType
250 }
251
khenaidoo21d51152019-02-01 13:48:37 -0500252 if saveToDb {
Kent Hagermand9cc2e92019-11-04 13:28:15 -0500253 // Save the device types to the KV store
khenaidoo21d51152019-02-01 13:48:37 -0500254 for _, deviceType := range deviceTypes.Items {
khenaidood948f772021-08-11 17:49:24 -0400255 if have, err := aMgr.deviceTypeDbProxy.Get(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, &voltha.DeviceType{}); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000256 logger.Errorw(ctx, "Failed-to--device-types-from-cluster-data-proxy", log.Fields{"error": err})
Thomas Lee Se5a44012019-11-07 20:32:24 +0530257 return err
Kent Hagerman4f355f52020-03-30 16:01:33 -0400258 } else if !have {
khenaidoo21d51152019-02-01 13:48:37 -0500259 // Does not exist - save it
260 clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
khenaidood948f772021-08-11 17:49:24 -0400261 if err := aMgr.deviceTypeDbProxy.Set(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, clonedDType); err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000262 logger.Errorw(ctx, "Failed-to-add-device-types-to-cluster-data-proxy", log.Fields{"error": err})
Thomas Lee Se5a44012019-11-07 20:32:24 +0530263 return err
264 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000265 logger.Debugw(ctx, "device-type-saved-to-KV-Store", log.Fields{"deviceType": deviceType})
khenaidoo21d51152019-02-01 13:48:37 -0500266 }
267 }
268 }
Thomas Lee Se5a44012019-11-07 20:32:24 +0530269 return nil
khenaidoo21d51152019-02-01 13:48:37 -0500270}
271
khenaidood948f772021-08-11 17:49:24 -0400272//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
273func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory(ctx context.Context) error {
274 // Load the adapters
275 var adapters []*voltha.Adapter
276 if err := aMgr.adapterDbProxy.List(log.WithSpanFromContext(context.Background(), ctx), &adapters); err != nil {
277 logger.Errorw(ctx, "Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
278 return err
279 }
280
281 logger.Debugw(ctx, "retrieved-adapters", log.Fields{"count": len(adapters)})
282
283 if len(adapters) != 0 {
284 for _, adapter := range adapters {
285 if err := aMgr.addAdapter(ctx, adapter, false); err != nil {
286 logger.Errorw(ctx, "failed-to-add-adapter", log.Fields{"adapterId": adapter.Id})
287 } else {
288 logger.Debugw(ctx, "adapter-added-successfully", log.Fields{"adapterId": adapter.Id})
289 }
khenaidoo21d51152019-02-01 13:48:37 -0500290 }
291 }
khenaidoo21d51152019-02-01 13:48:37 -0500292
khenaidood948f772021-08-11 17:49:24 -0400293 // Load the device types
294 var deviceTypes []*voltha.DeviceType
295 if err := aMgr.deviceTypeDbProxy.List(log.WithSpanFromContext(context.Background(), ctx), &deviceTypes); err != nil {
296 logger.Errorw(ctx, "Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
297 return err
khenaidoo21d51152019-02-01 13:48:37 -0500298 }
khenaidood948f772021-08-11 17:49:24 -0400299
300 logger.Debugw(ctx, "retrieved-devicetypes", log.Fields{"count": len(deviceTypes)})
301
302 if len(deviceTypes) != 0 {
303 dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
304 for _, dType := range deviceTypes {
305 logger.Debugw(ctx, "found-existing-device-types", log.Fields{"deviceTypes": deviceTypes})
306 dTypes.Items = append(dTypes.Items, dType)
307 }
308 if err := aMgr.addDeviceTypes(ctx, dTypes, false); err != nil {
309 logger.Errorw(ctx, "failed-to-add-device-type", log.Fields{"deviceTypes": deviceTypes})
310 } else {
311 logger.Debugw(ctx, "device-type-added-successfully", log.Fields{"deviceTypes": deviceTypes})
312 }
313 }
314
315 // Start the adapter agents - this will trigger the connection to the adapter
316 aMgr.lockAdapterAgentsMap.RLock()
317 defer aMgr.lockAdapterAgentsMap.RUnlock()
318 for _, adapterAgent := range aMgr.adapterAgents {
319 subCtx := log.WithSpanFromContext(context.Background(), ctx)
320 if err := adapterAgent.start(subCtx); err != nil {
321 logger.Errorw(ctx, "failed-to-start-adapter", log.Fields{"adapter-endpoint": adapterAgent.adapterAPIEndPoint})
322 }
323 }
324
325 logger.Debug(ctx, "no-existing-device-type-found")
326
khenaidoo21d51152019-02-01 13:48:37 -0500327 return nil
328}
329
khenaidoo9beaaf12021-10-19 17:32:01 -0400330func (aMgr *Manager) RegisterAdapter(ctx context.Context, registration *core_adapter.AdapterRegistration) (*empty.Empty, error) {
khenaidood948f772021-08-11 17:49:24 -0400331 adapter := registration.Adapter
332 deviceTypes := registration.DTypes
333 logger.Infow(ctx, "RegisterAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
khenaidoo21d51152019-02-01 13:48:37 -0500334
Matteo Scandolo57ee9aa2020-04-17 09:20:42 -0700335 if adapter.Type == "" {
Girish Kumar3e8ee212020-08-19 17:50:11 +0000336 logger.Errorw(ctx, "adapter-not-specifying-type", log.Fields{
Matteo Scandolo57ee9aa2020-04-17 09:20:42 -0700337 "adapterId": adapter.Id,
338 "vendor": adapter.Vendor,
339 "type": adapter.Type,
340 })
341 return nil, status.Error(codes.InvalidArgument, "adapter-not-specifying-type")
342 }
343
khenaidood948f772021-08-11 17:49:24 -0400344 if adpt, _ := aMgr.getAdapter(ctx, adapter.Id); adpt != nil {
khenaidood948f772021-08-11 17:49:24 -0400345 agt, err := aMgr.getAgent(ctx, adpt.Id)
346 if err != nil {
347 logger.Errorw(ctx, "no-adapter-agent", log.Fields{"error": err})
348 return nil, err
349 }
Girish Gowdra11ddb232022-05-26 12:19:59 -0700350 if adapter.Version != adpt.Version {
351 // Rolling update scenario - could be downgrade or upgrade
352 logger.Infow(ctx, "rolling-update",
353 log.Fields{"adapter": adpt.Id, "endpoint": adpt.Endpoint, "old-version": adpt.Version, "new-version": adapter.Version})
354 // Stop the gRPC connection to the old adapter
355 agt.stop(ctx)
356 if err = aMgr.updateAdapter(ctx, adapter, true); err != nil {
357 return nil, err
358 }
359 aMgr.SetRollingUpdate(ctx, adapter.Endpoint, true)
360 } else {
361 // Adapter registered and version is the same. The adapter may have restarted.
362 // Trigger the reconcile process for that adapter
363 logger.Warnw(ctx, "adapter-restarted", log.Fields{"adapter": adpt.Id, "endpoint": adpt.Endpoint})
364 agt.resetConnection(ctx)
365 }
khenaidood948f772021-08-11 17:49:24 -0400366
npujar1d86a522019-11-14 17:11:16 +0530367 go func() {
khenaidood948f772021-08-11 17:49:24 -0400368 err := aMgr.onAdapterRestart(log.WithSpanFromContext(context.Background(), ctx), adpt.Endpoint)
npujar1d86a522019-11-14 17:11:16 +0530369 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000370 logger.Errorw(ctx, "unable-to-restart-adapter", log.Fields{"error": err})
npujar1d86a522019-11-14 17:11:16 +0530371 }
372 }()
khenaidood948f772021-08-11 17:49:24 -0400373 return &empty.Empty{}, nil
khenaidoo21d51152019-02-01 13:48:37 -0500374 }
375 // Save the adapter and the device types
Rohan Agrawal31f21802020-06-12 05:38:46 +0000376 if err := aMgr.addAdapter(ctx, adapter, true); err != nil {
377 logger.Errorw(ctx, "failed-to-add-adapter", log.Fields{"error": err})
Thomas Lee Se5a44012019-11-07 20:32:24 +0530378 return nil, err
379 }
Rohan Agrawal31f21802020-06-12 05:38:46 +0000380 if err := aMgr.addDeviceTypes(ctx, deviceTypes, true); err != nil {
381 logger.Errorw(ctx, "failed-to-add-device-types", log.Fields{"error": err})
Thomas Lee Se5a44012019-11-07 20:32:24 +0530382 return nil, err
383 }
khenaidoo21d51152019-02-01 13:48:37 -0500384
Rohan Agrawal31f21802020-06-12 05:38:46 +0000385 logger.Debugw(ctx, "adapter-registered", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
Matteo Scandolod525ae32020-04-02 17:27:29 -0700386 "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
khenaidoo21d51152019-02-01 13:48:37 -0500387
khenaidood948f772021-08-11 17:49:24 -0400388 // Setup the endpoints for this adapter
389 if err := aMgr.endpointMgr.RegisterAdapter(ctx, adapter, deviceTypes); err != nil {
390 logger.Errorw(ctx, "failed-to-register-adapter", log.Fields{"error": err})
391 }
392
393 // Start adapter instance - this will trigger the connection to the adapter
394 if agent, err := aMgr.getAgent(ctx, adapter.Id); agent != nil {
395 subCtx := log.WithSpanFromContext(context.Background(), ctx)
396 if err := agent.start(subCtx); err != nil {
397 logger.Errorw(ctx, "failed-to-start-adapter", log.Fields{"error": err})
398 return nil, err
399 }
400 } else {
401 logger.Fatalw(ctx, "adapter-absent", log.Fields{"error": err, "adapter": adapter.Id})
402 }
403
404 return &empty.Empty{}, nil
khenaidoo21d51152019-02-01 13:48:37 -0500405}
406
Girish Gowdra11ddb232022-05-26 12:19:59 -0700407func (aMgr *Manager) StartAdapterWithEndPoint(ctx context.Context, endpoint string) error {
408 aMgr.lockAdapterAgentsMap.RLock()
409 defer aMgr.lockAdapterAgentsMap.RUnlock()
410 subCtx := log.WithSpanFromContext(context.Background(), ctx)
411 for _, adapterAgent := range aMgr.adapterAgents {
412 if adapterAgent.adapter.Endpoint == endpoint {
413 if err := adapterAgent.start(subCtx); err != nil {
414 logger.Errorw(subCtx, "failed-to-start-adapter", log.Fields{"error": err})
415 return err
416 }
417 return nil
418 }
419 }
420 logger.Errorw(ctx, "adapter-agent-not-found-for-endpoint", log.Fields{"endpoint": endpoint})
421 return fmt.Errorf("adapter-agent-not-found-for-endpoint-%s", endpoint)
422}
423
khenaidood948f772021-08-11 17:49:24 -0400424func (aMgr *Manager) GetAdapterTypeByVendorID(vendorID string) (string, error) {
425 aMgr.lockDeviceTypesMap.RLock()
426 defer aMgr.lockDeviceTypesMap.RUnlock()
427 for _, dType := range aMgr.deviceTypes {
428 for _, v := range dType.VendorIds {
429 if v == vendorID {
430 return dType.AdapterType, nil
431 }
432 }
433 }
434 return "", fmt.Errorf("vendor id %s not found", vendorID)
435}
436
437// GetAdapterType returns the name of the device adapter that services this device type
Kent Hagerman2b216042020-04-03 18:28:56 -0400438func (aMgr *Manager) GetAdapterType(deviceType string) (string, error) {
khenaidood948f772021-08-11 17:49:24 -0400439 aMgr.lockDeviceTypesMap.Lock()
440 defer aMgr.lockDeviceTypesMap.Unlock()
441 for _, dt := range aMgr.deviceTypes {
442 if deviceType == dt.Id {
443 return dt.AdapterType, nil
Matteo Scandolod525ae32020-04-02 17:27:29 -0700444 }
khenaidoo21d51152019-02-01 13:48:37 -0500445 }
Kent Hagerman45a13e42020-04-13 12:23:50 -0400446 return "", fmt.Errorf("adapter-not-registered-for-device-type %s", deviceType)
khenaidoo21d51152019-02-01 13:48:37 -0500447}
448
Kent Hagerman45a13e42020-04-13 12:23:50 -0400449// ListDeviceTypes returns all the device types known to the system
Rohan Agrawal31f21802020-06-12 05:38:46 +0000450func (aMgr *Manager) ListDeviceTypes(ctx context.Context, _ *empty.Empty) (*voltha.DeviceTypes, error) {
451 logger.Debug(ctx, "ListDeviceTypes")
khenaidood948f772021-08-11 17:49:24 -0400452 aMgr.lockDeviceTypesMap.Lock()
453 defer aMgr.lockDeviceTypesMap.Unlock()
Kent Hagermanc2c73ff2019-11-20 16:22:32 -0500454
Matteo Scandolod525ae32020-04-02 17:27:29 -0700455 deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypes))
Matteo Scandolod525ae32020-04-02 17:27:29 -0700456 for _, deviceType := range aMgr.deviceTypes {
457 deviceTypes = append(deviceTypes, deviceType)
Kent Hagermanc2c73ff2019-11-20 16:22:32 -0500458 }
Kent Hagerman45a13e42020-04-13 12:23:50 -0400459 return &voltha.DeviceTypes{Items: deviceTypes}, nil
Kent Hagermanc2c73ff2019-11-20 16:22:32 -0500460}
461
Kent Hagerman45a13e42020-04-13 12:23:50 -0400462// GetDeviceType returns the device type proto definition given the name of the device type
khenaidood948f772021-08-11 17:49:24 -0400463func (aMgr *Manager) GetDeviceType(ctx context.Context, deviceType *common.ID) (*voltha.DeviceType, error) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000464 logger.Debugw(ctx, "GetDeviceType", log.Fields{"typeid": deviceType.Id})
khenaidood948f772021-08-11 17:49:24 -0400465 aMgr.lockDeviceTypesMap.Lock()
466 defer aMgr.lockDeviceTypesMap.Unlock()
Kent Hagermanc2c73ff2019-11-20 16:22:32 -0500467
Kent Hagerman45a13e42020-04-13 12:23:50 -0400468 dType, exist := aMgr.deviceTypes[deviceType.Id]
469 if !exist {
470 return nil, status.Errorf(codes.NotFound, "device_type-%s", deviceType.Id)
khenaidoo21d51152019-02-01 13:48:37 -0500471 }
Kent Hagerman45a13e42020-04-13 12:23:50 -0400472 return dType, nil
khenaidoo21d51152019-02-01 13:48:37 -0500473}
khenaidood948f772021-08-11 17:49:24 -0400474
475// ListAdapters returns the contents of all adapters known to the system
476func (aMgr *Manager) ListAdapters(ctx context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
477 logger.Debug(ctx, "Listing adapters")
478 result := &voltha.Adapters{Items: []*voltha.Adapter{}}
479 aMgr.lockAdapterAgentsMap.RLock()
480 defer aMgr.lockAdapterAgentsMap.RUnlock()
481 for _, adapterAgent := range aMgr.adapterAgents {
482 if a := adapterAgent.getAdapter(ctx); a != nil {
483 result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
484 }
485 }
486 logger.Debugw(ctx, "Listing adapters", log.Fields{"result": result})
487 return result, nil
488}
489
Girish Gowdra11ddb232022-05-26 12:19:59 -0700490func (aMgr *Manager) GetRollingUpdate(ctx context.Context, endpoint string) (bool, bool) {
491 aMgr.rollingUpdateLock.RLock()
492 defer aMgr.rollingUpdateLock.RUnlock()
493 val, ok := aMgr.rollingUpdateMap[endpoint]
494 return val, ok
495}
496
497func (aMgr *Manager) SetRollingUpdate(ctx context.Context, endpoint string, status bool) {
498 aMgr.rollingUpdateLock.Lock()
499 defer aMgr.rollingUpdateLock.Unlock()
500 if res, ok := aMgr.rollingUpdateMap[endpoint]; ok {
501 logger.Warnw(ctx, "possible duplicate rolling update - overwriting", log.Fields{"old-status": res, "endpoint": endpoint})
502 }
503 aMgr.rollingUpdateMap[endpoint] = status
504}
505
506func (aMgr *Manager) DeleteRollingUpdate(ctx context.Context, endpoint string) {
507 aMgr.rollingUpdateLock.Lock()
508 defer aMgr.rollingUpdateLock.Unlock()
509 delete(aMgr.rollingUpdateMap, endpoint)
510}
511
512func (aMgr *Manager) RegisterOnRxStreamCloseChMap(ctx context.Context, endpoint string) {
513 aMgr.rxStreamCloseChLock.Lock()
514 defer aMgr.rxStreamCloseChLock.Unlock()
515 if _, ok := aMgr.rxStreamCloseChMap[endpoint]; ok {
516 logger.Warnw(ctx, "duplicate entry on rxStreamCloseChMap - overwriting", log.Fields{"endpoint": endpoint})
517 // First close the old channel
518 close(aMgr.rxStreamCloseChMap[endpoint])
519 }
520 aMgr.rxStreamCloseChMap[endpoint] = make(chan bool, 1)
521}
522
523func (aMgr *Manager) SignalOnRxStreamCloseCh(ctx context.Context, endpoint string) {
524 var closeCh chan bool
525 ok := false
526 aMgr.rxStreamCloseChLock.RLock()
527 if closeCh, ok = aMgr.rxStreamCloseChMap[endpoint]; !ok {
528 logger.Infow(ctx, "no entry on rxStreamCloseChMap", log.Fields{"endpoint": endpoint})
529 aMgr.rxStreamCloseChLock.RUnlock()
530 return
531 }
532 aMgr.rxStreamCloseChLock.RUnlock()
533
534 // close the rx channel
535 closeCh <- true
536
537 aMgr.rxStreamCloseChLock.Lock()
538 defer aMgr.rxStreamCloseChLock.Unlock()
539 delete(aMgr.rxStreamCloseChMap, endpoint)
540}
541
542func (aMgr *Manager) WaitOnRxStreamCloseCh(ctx context.Context, endpoint string) {
543 var closeCh chan bool
544 ok := false
545 aMgr.rxStreamCloseChLock.RLock()
546 if closeCh, ok = aMgr.rxStreamCloseChMap[endpoint]; !ok {
547 logger.Warnw(ctx, "no entry on rxStreamCloseChMap", log.Fields{"endpoint": endpoint})
548 aMgr.rxStreamCloseChLock.RUnlock()
549 return
550 }
551 aMgr.rxStreamCloseChLock.RUnlock()
552
553 select {
554 case <-closeCh:
555 logger.Infow(ctx, "rx stream closed for endpoint", log.Fields{"endpoint": endpoint})
556 case <-time.After(60 * time.Second):
557 logger.Warnw(ctx, "timeout waiting for rx stream close", log.Fields{"endpoint": endpoint})
558 }
559}
560
khenaidood948f772021-08-11 17:49:24 -0400561func (aMgr *Manager) getAgent(ctx context.Context, adapterID string) (*agent, error) {
562 aMgr.lockAdapterAgentsMap.RLock()
563 defer aMgr.lockAdapterAgentsMap.RUnlock()
564 if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
565 return adapterAgent, nil
566 }
567 return nil, errors.New("Not found")
568}
569
570func (aMgr *Manager) getAdapter(ctx context.Context, adapterID string) (*voltha.Adapter, error) {
571 aMgr.lockAdapterAgentsMap.RLock()
572 defer aMgr.lockAdapterAgentsMap.RUnlock()
573 if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
574 return adapterAgent.getAdapter(ctx), nil
575 }
576 return nil, errors.New("Not found")
577}
578
579// mutedAdapterRestartedHandler will be invoked by the grpc client on an adapter restart.
580// Since the Adapter will re-register itself and that will trigger the reconcile process,
581// therefore this handler does nothing, other than logging the event.
582func (aMgr *Manager) mutedAdapterRestartedHandler(ctx context.Context, endpoint string) error {
583 logger.Infow(ctx, "muted-adapter-restarted", log.Fields{"endpoint": endpoint})
584 return nil
585}
586
587func (aMgr *Manager) WaitUntilConnectionsToAdaptersAreUp(ctx context.Context, connectionRetryInterval time.Duration) error {
588 logger.Infow(ctx, "waiting-for-adapters-to-be-up", log.Fields{"retry-interval": connectionRetryInterval})
589 for {
590 aMgr.lockAdapterAgentsMap.Lock()
591 numAdapters := len(aMgr.adapterAgents)
592 if numAdapters == 0 {
593 // No adapter registered yet
594 aMgr.lockAdapterAgentsMap.Unlock()
595 logger.Info(ctx, "no-adapter-registered")
596 return nil
597 }
598 // A case of Core restart
599 agentsUp := true
600 adapterloop:
601 for _, agt := range aMgr.adapterAgents {
602 agentsUp = agentsUp && agt.IsConnectionUp()
603 if !agentsUp {
604 break adapterloop
605 }
606 }
607 aMgr.lockAdapterAgentsMap.Unlock()
608 if agentsUp {
609 logger.Infow(ctx, "adapter-connections-ready", log.Fields{"adapter-count": numAdapters})
610 return nil
611 }
612 logger.Warnw(ctx, "adapter-connections-not-ready", log.Fields{"adapter-count": numAdapters})
613 select {
614 case <-time.After(connectionRetryInterval):
615 logger.Infow(ctx, "retrying-adapter-connections", log.Fields{"adapter-count": numAdapters})
616 continue
617 case <-ctx.Done():
618 logger.Errorw(ctx, "context-timeout", log.Fields{"adapter-count": numAdapters, "err": ctx.Err()})
619 return ctx.Err()
620 }
621 }
622}