blob: dac137ca81de3390890869482a50205eb9619654 [file] [log] [blame]
khenaidood948f772021-08-11 17:49:24 -04001/*
2 * Copyright 2021-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package adapter
18
19import (
20 "context"
21 "fmt"
22 "sync"
23
24 "github.com/buraksezer/consistent"
25 "github.com/cespare/xxhash"
26 "github.com/golang/protobuf/proto"
27 "github.com/opencord/voltha-lib-go/v7/pkg/db"
28 "github.com/opencord/voltha-lib-go/v7/pkg/log"
29 "github.com/opencord/voltha-protos/v5/go/voltha"
30 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/status"
32)
33
// Tuning knobs for the consistent-hashing ring. All of them can be adjusted to get a better data distribution;
// the defaults below seem to work well when supporting 1000-10000 devices and 1-20 replicas of an adapter.
const (
	// DefaultPartitionCount is the number of partitions the keys are distributed among. Prime numbers are
	// good at distributing keys uniformly.
	DefaultPartitionCount = 1117

	// DefaultReplicationFactor is how many times a node is replicated on the consistent ring.
	DefaultReplicationFactor = 117

	// DefaultLoad is used to calculate the average load.
	DefaultLoad = 1.1
)
47
type (
	// Endpoint is the gRPC endpoint of an adapter instance.
	Endpoint string

	// ReplicaID is the replication ID of an adapter instance.
	ReplicaID int32
)
50
51type EndpointManager interface {
52
53 // Registers an adapter
54 RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error
55
56 // GetEndpoint is called to get the endpoint to communicate with for a specific device and device type.
57 GetEndpoint(ctx context.Context, deviceID string, deviceType string) (Endpoint, error)
58
59 // IsDeviceOwnedByAdapter is invoked when a specific adapter (adapter type + replicaNumber) is restarted and
60 // devices owned by that adapter need to be reconciled
61 IsDeviceOwnedByAdapter(ctx context.Context, deviceID string, adapterType string, replicaNumber int32) (bool, error)
62
63 // GetReplicaAssignment returns the replica number of the adapter that owns the deviceID. This is used by the
64 // test only
65 GetReplicaAssignment(ctx context.Context, deviceID string, adapterType string) (ReplicaID, error)
66}
67
68type adapterService struct {
69 adapterType string // Type of the adapter. The same type applies for all replicas of that adapter
70 totalReplicas int32
71 replicas map[ReplicaID]Endpoint
72 consistentRing *consistent.Consistent
73}
74
75type endpointManager struct {
76 partitionCount int
77 replicationFactor int
78 load float64
79 backend *db.Backend
80 adapterServices map[string]*adapterService
81 adapterServicesLock sync.RWMutex
82 deviceTypeToAdapterServiceMap map[string]string
83 deviceTypeToAdapterServiceMapLock sync.RWMutex
84}
85
86type EndpointManagerOption func(*endpointManager)
87
88func PartitionCount(count int) EndpointManagerOption {
89 return func(args *endpointManager) {
90 args.partitionCount = count
91 }
92}
93
94func ReplicationFactor(replicas int) EndpointManagerOption {
95 return func(args *endpointManager) {
96 args.replicationFactor = replicas
97 }
98}
99
100func Load(load float64) EndpointManagerOption {
101 return func(args *endpointManager) {
102 args.load = load
103 }
104}
105
106func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
107 tm := &endpointManager{
108 partitionCount: DefaultPartitionCount,
109 replicationFactor: DefaultReplicationFactor,
110 load: DefaultLoad,
111 backend: backend,
112 adapterServices: make(map[string]*adapterService),
113 deviceTypeToAdapterServiceMap: make(map[string]string),
114 }
115
116 for _, option := range opts {
117 option(tm)
118 }
119 return tm
120}
121
122func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
123 return newEndpointManager(backend, opts...)
124}
125
126func (ep *endpointManager) GetEndpoint(ctx context.Context, deviceID string, deviceType string) (Endpoint, error) {
127 logger.Debugw(ctx, "getting-endpoint", log.Fields{"device-id": deviceID, "device-type": deviceType})
128 owner, err := ep.getOwnerByDeviceType(ctx, deviceID, deviceType)
129 if err != nil {
130 return "", err
131 }
132 m, ok := owner.(Member)
133 if !ok {
134 return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
135 }
136 endpoint := m.getEndPoint()
137 if endpoint == "" {
138 return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", deviceType)
139 }
140 logger.Debugw(ctx, "returning-endpoint", log.Fields{"device-id": deviceID, "device-type": deviceType, "endpoint": endpoint})
141 return endpoint, nil
142}
143
144func (ep *endpointManager) IsDeviceOwnedByAdapter(ctx context.Context, deviceID string, adapterType string, replicaNumber int32) (bool, error) {
145 logger.Debugw(ctx, "device-ownership", log.Fields{"device-id": deviceID, "adapter-type": adapterType, "replica-number": replicaNumber})
146
147 serv, err := ep.getOwnerByAdapterType(ctx, deviceID, adapterType)
148 if err != nil {
149 return false, err
150 }
151 m, ok := serv.(Member)
152 if !ok {
153 return false, status.Errorf(codes.Aborted, "invalid-member-%v", serv)
154 }
155 return m.getReplica() == ReplicaID(replicaNumber), nil
156}
157
158func (ep *endpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, adapterType string) (ReplicaID, error) {
159 owner, err := ep.getOwnerByAdapterType(ctx, deviceID, adapterType)
160 if err != nil {
161 return 0, nil
162 }
163 m, ok := owner.(Member)
164 if !ok {
165 return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
166 }
167 return m.getReplica(), nil
168}
169
170func (ep *endpointManager) getOwnerByDeviceType(ctx context.Context, deviceID string, deviceType string) (consistent.Member, error) {
171 serv, err := ep.getAdapterService(ctx, deviceType)
172 if err != nil {
173 return nil, err
174 }
175 key := ep.makeKey(deviceID, deviceType, serv.adapterType)
176 return serv.consistentRing.LocateKey(key), nil
177}
178
179func (ep *endpointManager) getOwnerByAdapterType(ctx context.Context, deviceID string, adapterType string) (consistent.Member, error) {
180 // Check whether the adapter exist
181 ep.adapterServicesLock.RLock()
182 serv, adapterExist := ep.adapterServices[adapterType]
183 ep.adapterServicesLock.RUnlock()
184
185 if !adapterExist {
186 // Sync from the dB
187 if err := ep.loadAdapterServices(ctx); err != nil {
188 return nil, err
189 }
190 // Check again
191 ep.adapterServicesLock.RLock()
192 serv, adapterExist = ep.adapterServices[adapterType]
193 ep.adapterServicesLock.RUnlock()
194 if !adapterExist {
195 return nil, fmt.Errorf("adapter-type-not-exist-%s", adapterType)
196 }
197 }
198
199 // Get the device type
200 deviceType := ""
201 ep.deviceTypeToAdapterServiceMapLock.RLock()
202 for dType, aType := range ep.deviceTypeToAdapterServiceMap {
203 if aType == adapterType {
204 deviceType = dType
205 break
206 }
207 }
208 ep.deviceTypeToAdapterServiceMapLock.RUnlock()
209
210 if deviceType == "" {
211 return nil, fmt.Errorf("device-type-not-exist-for-adapter-type-%s", adapterType)
212 }
213
214 owner := serv.consistentRing.LocateKey(ep.makeKey(deviceID, deviceType, serv.adapterType))
215 m, ok := owner.(Member)
216 if !ok {
217 return nil, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
218 }
219 return m, nil
220}
221
222func (ep *endpointManager) getAdapterService(ctx context.Context, deviceType string) (*adapterService, error) {
223 // First get the adapter type for that device type
224 adapterType := ""
225 ep.deviceTypeToAdapterServiceMapLock.RLock()
226 for dType, aType := range ep.deviceTypeToAdapterServiceMap {
227 if dType == deviceType {
228 adapterType = aType
229 break
230 }
231 }
232 ep.deviceTypeToAdapterServiceMapLock.RUnlock()
233
234 // Check whether the adapter exist
235 adapterExist := false
236 var aServ *adapterService
237 if adapterType != "" {
238 ep.adapterServicesLock.RLock()
239 aServ, adapterExist = ep.adapterServices[adapterType]
240 ep.adapterServicesLock.RUnlock()
241 }
242
243 // Load the service and device types if not found, i.e. sync up with the dB
244 if !adapterExist || aServ == nil || int(aServ.totalReplicas) != len(aServ.consistentRing.GetMembers()) {
245 if err := ep.loadAdapterServices(ctx); err != nil {
246 return nil, err
247 }
248
249 // Get the adapter type if it was empty before
250 if adapterType == "" {
251 ep.deviceTypeToAdapterServiceMapLock.RLock()
252 for dType, aType := range ep.deviceTypeToAdapterServiceMap {
253 if dType == deviceType {
254 adapterType = aType
255 break
256 }
257 }
258 ep.deviceTypeToAdapterServiceMapLock.RUnlock()
259 }
260 // Error put if the adapter type is not set
261 if adapterType == "" {
262 return nil, fmt.Errorf("adapter-service-not-found-for-device-type-%s", deviceType)
263 }
264
265 // Get the service
266 ep.adapterServicesLock.RLock()
267 aServ, adapterExist = ep.adapterServices[adapterType]
268 ep.adapterServicesLock.RUnlock()
269 }
270
271 // Sanity check
272 if !adapterExist || aServ == nil || int(aServ.totalReplicas) != len(aServ.consistentRing.GetMembers()) {
273 return nil, fmt.Errorf("adapter-service-not-found-for-device-type-%s", deviceType)
274 }
275
276 return aServ, nil
277}
278
279func (ep *endpointManager) getConsistentConfig() consistent.Config {
280 return consistent.Config{
281 PartitionCount: ep.partitionCount,
282 ReplicationFactor: ep.replicationFactor,
283 Load: ep.load,
284 Hasher: hasher{},
285 }
286}
287
288// loadAdapterServices loads the services (adapters) and device types in memory. Because of the small size of the data and
289// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
290// instead of watching for updates in the dB and acting on it.
291func (ep *endpointManager) loadAdapterServices(ctx context.Context) error {
292 ep.adapterServicesLock.Lock()
293 defer ep.adapterServicesLock.Unlock()
294 ep.deviceTypeToAdapterServiceMapLock.Lock()
295 defer ep.deviceTypeToAdapterServiceMapLock.Unlock()
296
297 if ep.backend == nil {
298 return status.Error(codes.Aborted, "backend-not-set")
299 }
300
301 ep.adapterServices = make(map[string]*adapterService)
302 ep.deviceTypeToAdapterServiceMap = make(map[string]string)
303
304 // Load the adapters
305 blobs, err := ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "adapters")
306 if err != nil {
307 return err
308 }
309
310 // Data is marshalled as proto bytes in the data store
311 for _, blob := range blobs {
312 data := blob.Value.([]byte)
313 adapter := &voltha.Adapter{}
314 if err := proto.Unmarshal(data, adapter); err != nil {
315 return err
316 }
317 // A valid adapter should have the vendorID set
318 if err := ep.setupAdapterWithLock(ctx, adapter); err != nil {
319 logger.Errorw(ctx, "missing vendor id", log.Fields{"adapter": adapter})
320 }
321 }
322 // Load the device types
323 blobs, err = ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "device_types")
324 if err != nil {
325 return err
326 }
327 for _, blob := range blobs {
328 data := blob.Value.([]byte)
329 deviceType := &voltha.DeviceType{}
330 if err := proto.Unmarshal(data, deviceType); err != nil {
331 return err
332 }
333 ep.addDeviceTypeWithLock(deviceType)
334 }
335
336 ep.printServices(ctx)
337 return nil
338}
339
340func (ep *endpointManager) printServices(ctx context.Context) {
341 if logger.V(log.DebugLevel) {
342 for key, val := range ep.adapterServices {
343 members := val.consistentRing.GetMembers()
344 logger.Debugw(ctx, "adapter-service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
345 for _, m := range members {
346 n := m.(Member)
347 logger.Debugw(ctx, "adapter-instance-registered", log.Fields{"service-id": n.getID(), "adapter-type": n.getAdapterType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
348 }
349 }
350 logger.Debugw(ctx, "device-types", log.Fields{"device-types": ep.deviceTypeToAdapterServiceMap})
351 }
352}
353
354func (ep *endpointManager) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
355 ep.adapterServicesLock.Lock()
356 defer ep.adapterServicesLock.Unlock()
357 ep.deviceTypeToAdapterServiceMapLock.Lock()
358 defer ep.deviceTypeToAdapterServiceMapLock.Unlock()
359
360 if err := ep.setupAdapterWithLock(ctx, adapter); err != nil {
361 return err
362 }
363 ep.addDeviceTypesWithLock(deviceTypes)
364 ep.printServices(ctx)
365 return nil
366}
367
368func (ep *endpointManager) setupAdapterWithLock(ctx context.Context, adapter *voltha.Adapter) error {
369 // Build the consistent ring for that adapter
370 if adapter.Vendor != "" {
371 if _, ok := ep.adapterServices[adapter.Type]; !ok {
372 ep.adapterServices[adapter.Type] = &adapterService{
373 adapterType: adapter.Type,
374 totalReplicas: adapter.TotalReplicas,
375 replicas: make(map[ReplicaID]Endpoint),
376 consistentRing: consistent.New(nil, ep.getConsistentConfig()),
377 }
378
379 }
380 currentReplica := ReplicaID(adapter.CurrentReplica)
381 endpoint := Endpoint(adapter.Endpoint)
382 ep.adapterServices[adapter.Type].replicas[currentReplica] = endpoint
383 ep.adapterServices[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
384 } else {
385 logger.Errorw(ctx, "missing-vendor-id", log.Fields{"adapter": adapter})
386 return fmt.Errorf("missing vendor id for %s adapter", adapter.Id)
387 }
388 return nil
389}
390
391func (ep *endpointManager) addDeviceTypesWithLock(deviceTypes *voltha.DeviceTypes) {
392 // Update the device types
393 for _, deviceType := range deviceTypes.Items {
394 if _, ok := ep.deviceTypeToAdapterServiceMap[deviceType.Id]; !ok {
395 ep.deviceTypeToAdapterServiceMap[deviceType.Id] = deviceType.AdapterType
396 }
397 }
398}
399
400func (ep *endpointManager) addDeviceTypeWithLock(deviceType *voltha.DeviceType) {
401 if _, ok := ep.deviceTypeToAdapterServiceMap[deviceType.Id]; !ok {
402 ep.deviceTypeToAdapterServiceMap[deviceType.Id] = deviceType.AdapterType
403 }
404}
405
406// makeKey creates the string that the hash function uses to create the hash
407// In most cases, a deviceType is the same as a serviceType. It is being differentiated here to allow a
408// serviceType to support multiple device types
409func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
410 return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
411}
412
413// The consistent package requires a hasher function
414type hasher struct{}
415
416// Sum64 provides the hasher function. Based upon numerous testing scenarios, the xxhash package seems to provide the
417// best distribution compare to other hash packages
418func (h hasher) Sum64(data []byte) uint64 {
419 return xxhash.Sum64(data)
420}
421
422// Member represents a member on the consistent ring
423type Member interface {
424 String() string
425 getReplica() ReplicaID
426 getEndPoint() Endpoint
427 getID() string
428 getAdapterType() string
429}
430
431// member implements the Member interface
432type member struct {
433 id string
434 adapterType string
435 vendor string
436 version string
437 replica ReplicaID
438 endpoint Endpoint
439}
440
441func newMember(id string, adapterType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
442 return &member{
443 id: id,
444 adapterType: adapterType,
445 vendor: vendor,
446 version: version,
447 replica: replica,
448 endpoint: endPoint,
449 }
450}
451
452func (m *member) String() string {
453 return string(m.endpoint)
454}
455
456func (m *member) getReplica() ReplicaID {
457 return m.replica
458}
459
460func (m *member) getEndPoint() Endpoint {
461 return m.endpoint
462}
463
464func (m *member) getID() string {
465 return m.id
466}
467
468func (m *member) getAdapterType() string {
469 return m.adapterType
470}