/*
 * Copyright 2021-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package adapter
17
18import (
19 "context"
20 "fmt"
21 "math"
22 "strconv"
23 "testing"
24 "time"
25
26 "github.com/golang/protobuf/proto"
27 "github.com/google/uuid"
28 "github.com/opencord/voltha-lib-go/v7/pkg/db"
29 "github.com/opencord/voltha-lib-go/v7/pkg/log"
30 "github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd"
31 "github.com/opencord/voltha-protos/v5/go/voltha"
32 "github.com/phayes/freeport"
33 "github.com/stretchr/testify/assert"
34 "google.golang.org/grpc/codes"
35 "google.golang.org/grpc/status"
36)
37
38type EPTest struct {
39 etcdServer *etcd.EtcdServer
40 backend *db.Backend
41 maxReplicas int
42 minReplicas int
43}
44
45func newEPTest(minReplicas, maxReplicas int) *EPTest {
46 ctx := context.Background()
47 test := &EPTest{
48 minReplicas: minReplicas,
49 maxReplicas: maxReplicas,
50 }
51
52 // Create backend
53 if err := test.initBackend(); err != nil {
54 logger.Fatalw(ctx, "setting-backend-failed", log.Fields{"error": err})
55 }
56
57 // Populate backend with data
58 if err := test.populateBackend(); err != nil {
59 logger.Fatalw(ctx, "populating-db-failed", log.Fields{"error": err})
60 }
61 return test
62}
63
64func (ep *EPTest) initBackend() error {
65 ctx := context.Background()
66 configName := "voltha-go.adapter.ep.test"
67 storageDir := "voltha-go.adapter.ep.etcd"
68 logLevel := "error"
69 timeout := 5 * time.Second
70
71 kvClientPort, err := freeport.GetFreePort()
72 if err != nil {
73 return err
74 }
75 peerPort, err := freeport.GetFreePort()
76 if err != nil {
77 return err
78 }
79 ep.etcdServer = etcd.StartEtcdServer(ctx, etcd.MKConfig(ctx, configName, kvClientPort, peerPort, storageDir, logLevel))
80 if ep.etcdServer == nil {
81 return status.Error(codes.Internal, "Embedded server failed to start")
82 }
83
84 ep.backend = db.NewBackend(ctx, "etcd", "127.0.0.1"+":"+strconv.Itoa(kvClientPort), timeout, "service/voltha")
85 return nil
86}
87
88func (ep *EPTest) stopAll() {
89 if ep.etcdServer != nil {
90 ep.etcdServer.Stop(context.Background())
91 }
92}
93
94func (ep *EPTest) populateBackend() error {
95 // Add an adapter with multiple replicas
96 adapterPrefix := "adapter_brcm_openomci_onu"
97 numReplicas := ep.maxReplicas
98 for i := 0; i < numReplicas; i++ {
99 adapter := &voltha.Adapter{
100 Id: fmt.Sprintf("%s_%d", adapterPrefix, i),
101 Vendor: "VOLTHA OpenONU",
102 Version: "2.4.0-dev0",
103 Type: adapterPrefix,
104 CurrentReplica: int32(i),
105 TotalReplicas: int32(numReplicas),
106 Endpoint: fmt.Sprintf("%s_%d", adapterPrefix, i),
107 }
108 adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
109 blob, err := proto.Marshal(adapter)
110 if err != nil {
111 return err
112 }
113 if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
114 return err
115 }
116 }
117
118 // Add an adapter with minreplicas
119 adapterPrefix = "adapter_openolt"
120 numReplicas = ep.minReplicas
121 for i := 0; i < numReplicas; i++ {
122 adapter := &voltha.Adapter{
123 Id: fmt.Sprintf("%s_%d", adapterPrefix, i),
124 Vendor: "VOLTHA OpenOLT",
125 Version: "2.3.1-dev",
126 Type: adapterPrefix,
127 CurrentReplica: int32(i),
128 TotalReplicas: int32(numReplicas),
129 Endpoint: fmt.Sprintf("%s_%d", adapterPrefix, i),
130 }
131 adapterKVKey := fmt.Sprintf("%s/%d", adapterPrefix, i)
132 blob, err := proto.Marshal(adapter)
133 if err != nil {
134 return err
135 }
136 if err := ep.backend.Put(context.Background(), "adapters/"+adapterKVKey, blob); err != nil {
137 return err
138 }
139 }
140
141 // Add the brcm_openomci_onu device type
142 dType := "brcm_openomci_onu"
143 adapterName := "adapter_brcm_openomci_onu"
144 deviceType := &voltha.DeviceType{
145 Id: dType,
146 VendorIds: []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM", "ARPX", "DACM", "ERSN", "HWTC", "CIGG"},
147 AdapterType: adapterName,
148 AcceptsAddRemoveFlowUpdates: true,
149 }
150 blob, err := proto.Marshal(deviceType)
151 if err != nil {
152 return err
153 }
154 if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
155 return err
156 }
157
158 // Add the openolt device type
159 dType = "openolt"
160 adapterName = "adapter_openolt"
161 deviceType = &voltha.DeviceType{
162 Id: dType,
163 AdapterType: adapterName,
164 AcceptsAddRemoveFlowUpdates: true,
165 }
166 blob, err = proto.Marshal(deviceType)
167 if err != nil {
168 return err
169 }
170 if err := ep.backend.Put(context.Background(), "device_types/"+deviceType.Id, blob); err != nil {
171 return err
172 }
173 return nil
174}
175
// getMeanAndStdDeviation returns the arithmetic mean and the population
// standard deviation (divisor N, not N-1) of the first replicas entries of val.
//
// replicas is clamped to len(val) so an oversized count cannot index past the
// slice. When there is nothing to measure (replicas <= 0 or val is empty) both
// results are 0 instead of the NaN a division by zero would produce.
func getMeanAndStdDeviation(val []int, replicas int) (float64, float64) {
	if replicas > len(val) {
		replicas = len(val)
	}
	if replicas <= 0 {
		return 0, 0
	}
	samples := val[:replicas]
	count := float64(replicas)

	var sum float64
	for _, v := range samples {
		sum += float64(v)
	}
	mean := sum / count

	// Accumulate the squared distance of every sample from the mean.
	var squaredDiffs float64
	for _, v := range samples {
		squaredDiffs += math.Pow(float64(v)-mean, 2)
	}
	return mean, math.Sqrt(squaredDiffs / count)
}
189
190func (ep *EPTest) testEndpointManagerAPIs(t *testing.T, tm EndpointManager, adapterType string, deviceType string, replicas int) {
191 ctx := context.Background()
192 // Map of device ids to topic
193 deviceIDs := make(map[string]Endpoint)
194 numDevices := 1000
195 total := make([]int, replicas)
196 for i := 0; i < numDevices; i++ {
197 deviceID := uuid.New().String()
198 endpoint, err := tm.GetEndpoint(ctx, deviceID, deviceType)
199 if err != nil {
200 logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
201 }
202 deviceIDs[deviceID] = endpoint
203 replicaID, err := tm.GetReplicaAssignment(ctx, deviceID, adapterType)
204 if err != nil {
205 logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
206 }
207 total[replicaID]++
208 }
209
210 mean, sdtDev := getMeanAndStdDeviation(total, replicas)
211 fmt.Printf("Device distributions => devices:%d adapter_replicas:%d mean:%d standard_deviation:%d, distributions:%v\n", numDevices, replicas, int(mean), int(sdtDev), total)
212
213 // Verify that we get the same topic for a given device ID, irrespective of the number of iterations
214 numIterations := 10
215 for i := 0; i < numIterations; i++ {
216 for deviceID, expectedEndpoint := range deviceIDs {
217 endpointByAdapterType, err := tm.GetEndpoint(ctx, deviceID, deviceType)
218 if err != nil {
219 logger.Fatalw(ctx, "error-getting-endpoint", log.Fields{"error": err})
220 }
221 assert.Equal(t, expectedEndpoint, endpointByAdapterType)
222 }
223 }
224
225 // Verify that a device belong to the correct node
226 for deviceID := range deviceIDs {
227 replicaID, err := tm.GetReplicaAssignment(ctx, deviceID, adapterType)
228 if err != nil {
229 logger.Fatalw(ctx, "error-getting-topic", log.Fields{"error": err})
230 }
231 for k := 0; k < replicas; k++ {
232 owned, err := tm.IsDeviceOwnedByAdapter(ctx, deviceID, adapterType, int32(k))
233 if err != nil {
234 logger.Fatalw(ctx, "error-verifying-device-ownership", log.Fields{"error": err})
235 }
236 assert.Equal(t, ReplicaID(k) == replicaID, owned)
237 }
238 }
239}
240
241func TestEndpointManagerSuite(t *testing.T) {
242 tmt := newEPTest(1, 10)
243 assert.NotNil(t, tmt)
244
245 tm := NewEndpointManager(
246 tmt.backend,
247 PartitionCount(1117),
248 ReplicationFactor(200),
249 Load(1.1))
250
251 defer tmt.stopAll()
252
253 //1. Test APIs with multiple replicas
254 tmt.testEndpointManagerAPIs(t, tm, "adapter_brcm_openomci_onu", "brcm_openomci_onu", tmt.maxReplicas)
255
256 //2. Test APIs with single replica
257 tmt.testEndpointManagerAPIs(t, tm, "adapter_openolt", "openolt", tmt.minReplicas)
258}