khenaidoo | b6238b3 | 2020-04-07 12:07:36 -0400 | [diff] [blame^] | 1 | /* |
| 2 | * Copyright 2020-present Open Networking Foundation |
| 3 | |
| 4 | * Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | * you may not use this file except in compliance with the License. |
| 6 | * You may obtain a copy of the License at |
| 7 | |
| 8 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | |
| 10 | * Unless required by applicable law or agreed to in writing, software |
| 11 | * distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | * See the License for the specific language governing permissions and |
| 14 | * limitations under the License. |
| 15 | */ |
| 16 | package kafka |
| 17 | |
| 18 | import ( |
| 19 | "context" |
| 20 | "fmt" |
| 21 | "github.com/buraksezer/consistent" |
| 22 | "github.com/cespare/xxhash" |
| 23 | "github.com/golang/protobuf/proto" |
| 24 | "github.com/opencord/voltha-lib-go/v3/pkg/db" |
| 25 | "github.com/opencord/voltha-lib-go/v3/pkg/log" |
| 26 | "github.com/opencord/voltha-protos/v3/go/voltha" |
| 27 | "google.golang.org/grpc/codes" |
| 28 | "google.golang.org/grpc/status" |
| 29 | "sync" |
| 30 | ) |
| 31 | |
// Tuning knobs for the consistent-hashing rings. They can be adjusted to get an optimal data distribution; the
// values below have been observed to work well when supporting 1000-10000 devices and 1-20 replicas of a service.
const (
	// DefaultReplicationFactor is how many times a node (service replica) is replicated on the consistent ring.
	DefaultReplicationFactor = 117

	// DefaultPartitionCount is the number of partitions keys are distributed among. A prime number is used
	// because it helps distribute keys uniformly.
	DefaultPartitionCount = 1117

	// DefaultLoad is the load factor the ring uses to calculate the average load.
	DefaultLoad = 1.1
)
| 45 | |
type (
	// Endpoint is the endpoint of a service instance. When kafka is used, it is the topic name of the service.
	Endpoint string

	// ReplicaID is the replication ID of a service instance.
	ReplicaID int32
)
| 48 | |
| 49 | type EndpointManager interface { |
| 50 | |
| 51 | // GetEndpoint is called to get the endpoint to communicate with for a specific device and service type. For |
| 52 | // now this will return the topic name |
| 53 | GetEndpoint(deviceID string, serviceType string) (Endpoint, error) |
| 54 | |
| 55 | // IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and |
| 56 | // devices owned by that service need to be reconciled |
| 57 | IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) |
| 58 | |
| 59 | // getReplicaAssignment returns the replica number of the service that owns the deviceID. This is used by the |
| 60 | // test only |
| 61 | getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) |
| 62 | } |
| 63 | |
| 64 | type service struct { |
| 65 | id string // Id of the service. The same id is used for all replicas |
| 66 | totalReplicas int32 |
| 67 | replicas map[ReplicaID]Endpoint |
| 68 | consistentRing *consistent.Consistent |
| 69 | } |
| 70 | |
| 71 | type endpointManager struct { |
| 72 | partitionCount int |
| 73 | replicationFactor int |
| 74 | load float64 |
| 75 | backend *db.Backend |
| 76 | services map[string]*service |
| 77 | servicesLock sync.RWMutex |
| 78 | deviceTypeServiceMap map[string]string |
| 79 | deviceTypeServiceMapLock sync.RWMutex |
| 80 | } |
| 81 | |
| 82 | type EndpointManagerOption func(*endpointManager) |
| 83 | |
| 84 | func PartitionCount(count int) EndpointManagerOption { |
| 85 | return func(args *endpointManager) { |
| 86 | args.partitionCount = count |
| 87 | } |
| 88 | } |
| 89 | |
| 90 | func ReplicationFactor(replicas int) EndpointManagerOption { |
| 91 | return func(args *endpointManager) { |
| 92 | args.replicationFactor = replicas |
| 93 | } |
| 94 | } |
| 95 | |
| 96 | func Load(load float64) EndpointManagerOption { |
| 97 | return func(args *endpointManager) { |
| 98 | args.load = load |
| 99 | } |
| 100 | } |
| 101 | |
| 102 | func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager { |
| 103 | tm := &endpointManager{ |
| 104 | partitionCount: DefaultPartitionCount, |
| 105 | replicationFactor: DefaultReplicationFactor, |
| 106 | load: DefaultLoad, |
| 107 | backend: backend, |
| 108 | services: make(map[string]*service), |
| 109 | deviceTypeServiceMap: make(map[string]string), |
| 110 | } |
| 111 | |
| 112 | for _, option := range opts { |
| 113 | option(tm) |
| 114 | } |
| 115 | return tm |
| 116 | } |
| 117 | |
| 118 | func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager { |
| 119 | return newEndpointManager(backend, opts...) |
| 120 | } |
| 121 | |
| 122 | func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) { |
| 123 | logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType}) |
| 124 | owner, err := ep.getOwner(deviceID, serviceType) |
| 125 | if err != nil { |
| 126 | return "", err |
| 127 | } |
| 128 | m, ok := owner.(Member) |
| 129 | if !ok { |
| 130 | return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner) |
| 131 | } |
| 132 | endpoint := m.getEndPoint() |
| 133 | if endpoint == "" { |
| 134 | return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType) |
| 135 | } |
| 136 | logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint}) |
| 137 | return endpoint, nil |
| 138 | } |
| 139 | |
| 140 | func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) { |
| 141 | logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber}) |
| 142 | owner, err := ep.getOwner(deviceID, serviceType) |
| 143 | if err != nil { |
| 144 | return false, nil |
| 145 | } |
| 146 | m, ok := owner.(Member) |
| 147 | if !ok { |
| 148 | return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner) |
| 149 | } |
| 150 | return m.getReplica() == ReplicaID(replicaNumber), nil |
| 151 | } |
| 152 | |
| 153 | func (ep *endpointManager) getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) { |
| 154 | owner, err := ep.getOwner(deviceID, serviceType) |
| 155 | if err != nil { |
| 156 | return 0, nil |
| 157 | } |
| 158 | m, ok := owner.(Member) |
| 159 | if !ok { |
| 160 | return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner) |
| 161 | } |
| 162 | return m.getReplica(), nil |
| 163 | } |
| 164 | |
| 165 | func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) { |
| 166 | serv, dType, err := ep.getServiceAndDeviceType(serviceType) |
| 167 | if err != nil { |
| 168 | return nil, err |
| 169 | } |
| 170 | key := ep.makeKey(deviceID, dType, serviceType) |
| 171 | return serv.consistentRing.LocateKey(key), nil |
| 172 | } |
| 173 | |
| 174 | func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) { |
| 175 | // Check whether service exist |
| 176 | ep.servicesLock.RLock() |
| 177 | serv, serviceExist := ep.services[serviceType] |
| 178 | ep.servicesLock.RUnlock() |
| 179 | |
| 180 | // Load the service and device types if needed |
| 181 | if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) { |
| 182 | if err := ep.loadServices(); err != nil { |
| 183 | return nil, "", err |
| 184 | } |
| 185 | |
| 186 | // Check whether the service exists now |
| 187 | ep.servicesLock.RLock() |
| 188 | serv, serviceExist = ep.services[serviceType] |
| 189 | ep.servicesLock.RUnlock() |
| 190 | if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) { |
| 191 | return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType) |
| 192 | } |
| 193 | } |
| 194 | |
| 195 | ep.deviceTypeServiceMapLock.RLock() |
| 196 | defer ep.deviceTypeServiceMapLock.RUnlock() |
| 197 | for dType, sType := range ep.deviceTypeServiceMap { |
| 198 | if sType == serviceType { |
| 199 | return serv, dType, nil |
| 200 | } |
| 201 | } |
| 202 | return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType) |
| 203 | } |
| 204 | |
| 205 | func (ep *endpointManager) getConsistentConfig() consistent.Config { |
| 206 | return consistent.Config{ |
| 207 | PartitionCount: ep.partitionCount, |
| 208 | ReplicationFactor: ep.replicationFactor, |
| 209 | Load: ep.load, |
| 210 | Hasher: hasher{}, |
| 211 | } |
| 212 | } |
| 213 | |
| 214 | // loadServices loads the services (adapters) and device types in memory. Because of the small size of the data and |
| 215 | // the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected, |
| 216 | // instead of watching for updates in the dB and acting on it. |
| 217 | func (ep *endpointManager) loadServices() error { |
| 218 | ep.servicesLock.Lock() |
| 219 | defer ep.servicesLock.Unlock() |
| 220 | ep.deviceTypeServiceMapLock.Lock() |
| 221 | defer ep.deviceTypeServiceMapLock.Unlock() |
| 222 | |
| 223 | if ep.backend == nil { |
| 224 | return status.Error(codes.Aborted, "backend-not-set") |
| 225 | } |
| 226 | ep.services = make(map[string]*service) |
| 227 | ep.deviceTypeServiceMap = make(map[string]string) |
| 228 | |
| 229 | // Load the adapters |
| 230 | blobs, err := ep.backend.List(context.Background(), "adapters") |
| 231 | if err != nil { |
| 232 | return err |
| 233 | } |
| 234 | |
| 235 | // Data is marshalled as proto bytes in the data store |
| 236 | for _, blob := range blobs { |
| 237 | data := blob.Value.([]byte) |
| 238 | adapter := &voltha.Adapter{} |
| 239 | if err := proto.Unmarshal(data, adapter); err != nil { |
| 240 | return err |
| 241 | } |
| 242 | // A valid adapter should have the vendorID set |
| 243 | if adapter.Vendor != "" { |
| 244 | if _, ok := ep.services[adapter.Type]; !ok { |
| 245 | ep.services[adapter.Type] = &service{ |
| 246 | id: adapter.Type, |
| 247 | totalReplicas: adapter.TotalReplicas, |
| 248 | replicas: make(map[ReplicaID]Endpoint), |
| 249 | consistentRing: consistent.New(nil, ep.getConsistentConfig()), |
| 250 | } |
| 251 | |
| 252 | } |
| 253 | currentReplica := ReplicaID(adapter.CurrentReplica) |
| 254 | endpoint := Endpoint(adapter.Endpoint) |
| 255 | ep.services[adapter.Type].replicas[currentReplica] = endpoint |
| 256 | ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica)) |
| 257 | } |
| 258 | } |
| 259 | // Load the device types |
| 260 | blobs, err = ep.backend.List(context.Background(), "device_types") |
| 261 | if err != nil { |
| 262 | return err |
| 263 | } |
| 264 | for _, blob := range blobs { |
| 265 | data := blob.Value.([]byte) |
| 266 | deviceType := &voltha.DeviceType{} |
| 267 | if err := proto.Unmarshal(data, deviceType); err != nil { |
| 268 | return err |
| 269 | } |
| 270 | if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok { |
| 271 | ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter |
| 272 | } |
| 273 | } |
| 274 | |
| 275 | // Log the loaded data in debug mode to facilitate trouble shooting |
| 276 | if logger.V(log.DebugLevel) { |
| 277 | for key, val := range ep.services { |
| 278 | members := val.consistentRing.GetMembers() |
| 279 | logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())}) |
| 280 | for _, m := range members { |
| 281 | n := m.(Member) |
| 282 | logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()}) |
| 283 | } |
| 284 | } |
| 285 | logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap}) |
| 286 | } |
| 287 | return nil |
| 288 | } |
| 289 | |
| 290 | // makeKey creates the string that the hash function uses to create the hash |
| 291 | func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte { |
| 292 | return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID)) |
| 293 | } |
| 294 | |
| 295 | // The consistent package requires a hasher function |
| 296 | type hasher struct{} |
| 297 | |
| 298 | // Sum64 provides the hasher function. Based upon numerous testing scenarios, the xxhash package seems to provide the |
| 299 | // best distribution compare to other hash packages |
| 300 | func (h hasher) Sum64(data []byte) uint64 { |
| 301 | return xxhash.Sum64(data) |
| 302 | } |
| 303 | |
| 304 | // Member represents a member on the consistent ring |
| 305 | type Member interface { |
| 306 | String() string |
| 307 | getReplica() ReplicaID |
| 308 | getEndPoint() Endpoint |
| 309 | getID() string |
| 310 | getServiceType() string |
| 311 | } |
| 312 | |
| 313 | // member implements the Member interface |
| 314 | type member struct { |
| 315 | id string |
| 316 | serviceType string |
| 317 | vendor string |
| 318 | version string |
| 319 | replica ReplicaID |
| 320 | endpoint Endpoint |
| 321 | } |
| 322 | |
| 323 | func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member { |
| 324 | return &member{ |
| 325 | id: ID, |
| 326 | serviceType: serviceType, |
| 327 | vendor: vendor, |
| 328 | version: version, |
| 329 | replica: replica, |
| 330 | endpoint: endPoint, |
| 331 | } |
| 332 | } |
| 333 | |
| 334 | func (m *member) String() string { |
| 335 | return string(m.endpoint) |
| 336 | } |
| 337 | |
| 338 | func (m *member) getReplica() ReplicaID { |
| 339 | return m.replica |
| 340 | } |
| 341 | |
| 342 | func (m *member) getEndPoint() Endpoint { |
| 343 | return m.endpoint |
| 344 | } |
| 345 | |
| 346 | func (m *member) getID() string { |
| 347 | return m.id |
| 348 | } |
| 349 | |
| 350 | func (m *member) getServiceType() string { |
| 351 | return m.serviceType |
| 352 | } |