blob: 12583826513e333f91573d4e05ac5bd96c30d23f [file] [log] [blame]
khenaidoob6238b32020-04-07 12:07:36 -04001/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package kafka
17
18import (
19 "context"
20 "fmt"
21 "github.com/buraksezer/consistent"
22 "github.com/cespare/xxhash"
23 "github.com/golang/protobuf/proto"
24 "github.com/opencord/voltha-lib-go/v3/pkg/db"
25 "github.com/opencord/voltha-lib-go/v3/pkg/log"
26 "github.com/opencord/voltha-protos/v3/go/voltha"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29 "sync"
30)
31
32const (
33 // All the values below can be tuned to get optimal data distribution. The numbers below seems to work well when
34 // supporting 1000-10000 devices and 1 - 20 replicas of a service
35
36 // Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
37 DefaultPartitionCount = 1117
38
39 // Represents how many times a node is replicated on the consistent ring.
40 DefaultReplicationFactor = 117
41
42 // Load is used to calculate average load.
43 DefaultLoad = 1.1
44)
45
46type Endpoint string // Endpoint of a service instance. When using kafka, this is the topic name of a service
47type ReplicaID int32 // The replication ID of a service instance
48
49type EndpointManager interface {
50
51 // GetEndpoint is called to get the endpoint to communicate with for a specific device and service type. For
52 // now this will return the topic name
53 GetEndpoint(deviceID string, serviceType string) (Endpoint, error)
54
55 // IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
56 // devices owned by that service need to be reconciled
57 IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error)
58
Matteo Scandolo87d71c02020-04-09 13:15:44 -070059 // GetReplicaAssignment returns the replica number of the service that owns the deviceID. This is used by the
khenaidoob6238b32020-04-07 12:07:36 -040060 // test only
Matteo Scandolo87d71c02020-04-09 13:15:44 -070061 GetReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error)
khenaidoob6238b32020-04-07 12:07:36 -040062}
63
64type service struct {
65 id string // Id of the service. The same id is used for all replicas
66 totalReplicas int32
67 replicas map[ReplicaID]Endpoint
68 consistentRing *consistent.Consistent
69}
70
71type endpointManager struct {
72 partitionCount int
73 replicationFactor int
74 load float64
75 backend *db.Backend
76 services map[string]*service
77 servicesLock sync.RWMutex
78 deviceTypeServiceMap map[string]string
79 deviceTypeServiceMapLock sync.RWMutex
80}
81
82type EndpointManagerOption func(*endpointManager)
83
84func PartitionCount(count int) EndpointManagerOption {
85 return func(args *endpointManager) {
86 args.partitionCount = count
87 }
88}
89
90func ReplicationFactor(replicas int) EndpointManagerOption {
91 return func(args *endpointManager) {
92 args.replicationFactor = replicas
93 }
94}
95
96func Load(load float64) EndpointManagerOption {
97 return func(args *endpointManager) {
98 args.load = load
99 }
100}
101
102func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
103 tm := &endpointManager{
104 partitionCount: DefaultPartitionCount,
105 replicationFactor: DefaultReplicationFactor,
106 load: DefaultLoad,
107 backend: backend,
108 services: make(map[string]*service),
109 deviceTypeServiceMap: make(map[string]string),
110 }
111
112 for _, option := range opts {
113 option(tm)
114 }
115 return tm
116}
117
118func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
119 return newEndpointManager(backend, opts...)
120}
121
122func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) {
123 logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
124 owner, err := ep.getOwner(deviceID, serviceType)
125 if err != nil {
126 return "", err
127 }
128 m, ok := owner.(Member)
129 if !ok {
130 return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
131 }
132 endpoint := m.getEndPoint()
133 if endpoint == "" {
134 return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
135 }
136 logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
137 return endpoint, nil
138}
139
140func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
141 logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
142 owner, err := ep.getOwner(deviceID, serviceType)
143 if err != nil {
144 return false, nil
145 }
146 m, ok := owner.(Member)
147 if !ok {
148 return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
149 }
150 return m.getReplica() == ReplicaID(replicaNumber), nil
151}
152
Matteo Scandolo87d71c02020-04-09 13:15:44 -0700153func (ep *endpointManager) GetReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) {
khenaidoob6238b32020-04-07 12:07:36 -0400154 owner, err := ep.getOwner(deviceID, serviceType)
155 if err != nil {
156 return 0, nil
157 }
158 m, ok := owner.(Member)
159 if !ok {
160 return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
161 }
162 return m.getReplica(), nil
163}
164
165func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) {
166 serv, dType, err := ep.getServiceAndDeviceType(serviceType)
167 if err != nil {
168 return nil, err
169 }
170 key := ep.makeKey(deviceID, dType, serviceType)
171 return serv.consistentRing.LocateKey(key), nil
172}
173
174func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) {
175 // Check whether service exist
176 ep.servicesLock.RLock()
177 serv, serviceExist := ep.services[serviceType]
178 ep.servicesLock.RUnlock()
179
180 // Load the service and device types if needed
181 if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
182 if err := ep.loadServices(); err != nil {
183 return nil, "", err
184 }
185
186 // Check whether the service exists now
187 ep.servicesLock.RLock()
188 serv, serviceExist = ep.services[serviceType]
189 ep.servicesLock.RUnlock()
190 if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
191 return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
192 }
193 }
194
195 ep.deviceTypeServiceMapLock.RLock()
196 defer ep.deviceTypeServiceMapLock.RUnlock()
197 for dType, sType := range ep.deviceTypeServiceMap {
198 if sType == serviceType {
199 return serv, dType, nil
200 }
201 }
202 return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
203}
204
205func (ep *endpointManager) getConsistentConfig() consistent.Config {
206 return consistent.Config{
207 PartitionCount: ep.partitionCount,
208 ReplicationFactor: ep.replicationFactor,
209 Load: ep.load,
210 Hasher: hasher{},
211 }
212}
213
214// loadServices loads the services (adapters) and device types in memory. Because of the small size of the data and
215// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
216// instead of watching for updates in the dB and acting on it.
217func (ep *endpointManager) loadServices() error {
218 ep.servicesLock.Lock()
219 defer ep.servicesLock.Unlock()
220 ep.deviceTypeServiceMapLock.Lock()
221 defer ep.deviceTypeServiceMapLock.Unlock()
222
223 if ep.backend == nil {
224 return status.Error(codes.Aborted, "backend-not-set")
225 }
226 ep.services = make(map[string]*service)
227 ep.deviceTypeServiceMap = make(map[string]string)
228
229 // Load the adapters
230 blobs, err := ep.backend.List(context.Background(), "adapters")
231 if err != nil {
232 return err
233 }
234
235 // Data is marshalled as proto bytes in the data store
236 for _, blob := range blobs {
237 data := blob.Value.([]byte)
238 adapter := &voltha.Adapter{}
239 if err := proto.Unmarshal(data, adapter); err != nil {
240 return err
241 }
242 // A valid adapter should have the vendorID set
243 if adapter.Vendor != "" {
244 if _, ok := ep.services[adapter.Type]; !ok {
245 ep.services[adapter.Type] = &service{
246 id: adapter.Type,
247 totalReplicas: adapter.TotalReplicas,
248 replicas: make(map[ReplicaID]Endpoint),
249 consistentRing: consistent.New(nil, ep.getConsistentConfig()),
250 }
251
252 }
253 currentReplica := ReplicaID(adapter.CurrentReplica)
254 endpoint := Endpoint(adapter.Endpoint)
255 ep.services[adapter.Type].replicas[currentReplica] = endpoint
256 ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
257 }
258 }
259 // Load the device types
260 blobs, err = ep.backend.List(context.Background(), "device_types")
261 if err != nil {
262 return err
263 }
264 for _, blob := range blobs {
265 data := blob.Value.([]byte)
266 deviceType := &voltha.DeviceType{}
267 if err := proto.Unmarshal(data, deviceType); err != nil {
268 return err
269 }
270 if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
271 ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
272 }
273 }
274
275 // Log the loaded data in debug mode to facilitate trouble shooting
276 if logger.V(log.DebugLevel) {
277 for key, val := range ep.services {
278 members := val.consistentRing.GetMembers()
279 logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
280 for _, m := range members {
281 n := m.(Member)
282 logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
283 }
284 }
285 logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
286 }
287 return nil
288}
289
290// makeKey creates the string that the hash function uses to create the hash
291func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
292 return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
293}
294
295// The consistent package requires a hasher function
296type hasher struct{}
297
298// Sum64 provides the hasher function. Based upon numerous testing scenarios, the xxhash package seems to provide the
299// best distribution compare to other hash packages
300func (h hasher) Sum64(data []byte) uint64 {
301 return xxhash.Sum64(data)
302}
303
304// Member represents a member on the consistent ring
305type Member interface {
306 String() string
307 getReplica() ReplicaID
308 getEndPoint() Endpoint
309 getID() string
310 getServiceType() string
311}
312
313// member implements the Member interface
314type member struct {
315 id string
316 serviceType string
317 vendor string
318 version string
319 replica ReplicaID
320 endpoint Endpoint
321}
322
323func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
324 return &member{
325 id: ID,
326 serviceType: serviceType,
327 vendor: vendor,
328 version: version,
329 replica: replica,
330 endpoint: endPoint,
331 }
332}
333
334func (m *member) String() string {
335 return string(m.endpoint)
336}
337
338func (m *member) getReplica() ReplicaID {
339 return m.replica
340}
341
342func (m *member) getEndPoint() Endpoint {
343 return m.endpoint
344}
345
346func (m *member) getID() string {
347 return m.id
348}
349
350func (m *member) getServiceType() string {
351 return m.serviceType
352}