blob: 94f336c8b5ae468a811beec348a410248568ca83 [file] [log] [blame]
khenaidoo2bc48282019-07-16 18:13:46 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Kent Hagerman2b216042020-04-03 18:28:56 -040016package device
khenaidoo2bc48282019-07-16 18:13:46 -040017
18import (
khenaidoo6e55d9e2019-12-12 18:26:26 -050019 "context"
David Bainbridged1afd662020-03-26 18:27:41 -070020 "math/rand"
Neha Sharmad1387da2020-05-07 20:07:28 +000021 "strconv"
David Bainbridged1afd662020-03-26 18:27:41 -070022 "sync"
23 "testing"
24 "time"
25
Kent Hagermanf5a67352020-04-30 15:15:26 -040026 "github.com/gogo/protobuf/proto"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070027 "github.com/opencord/voltha-go/db/model"
Kent Hagermanf5a67352020-04-30 15:15:26 -040028 "github.com/opencord/voltha-go/rw_core/config"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070029 "github.com/opencord/voltha-go/rw_core/core/adapter"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070030 tst "github.com/opencord/voltha-go/rw_core/test"
Maninderdfadc982020-10-28 14:04:33 +053031 com "github.com/opencord/voltha-lib-go/v4/pkg/adapters/common"
32 "github.com/opencord/voltha-lib-go/v4/pkg/db"
Himani Chawlab4c25912020-11-12 17:16:38 +053033 "github.com/opencord/voltha-lib-go/v4/pkg/events"
Maninderdfadc982020-10-28 14:04:33 +053034 fu "github.com/opencord/voltha-lib-go/v4/pkg/flows"
35 "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
36 mock_etcd "github.com/opencord/voltha-lib-go/v4/pkg/mocks/etcd"
37 mock_kafka "github.com/opencord/voltha-lib-go/v4/pkg/mocks/kafka"
38 ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
39 "github.com/opencord/voltha-protos/v4/go/voltha"
khenaidoo6e55d9e2019-12-12 18:26:26 -050040 "github.com/phayes/freeport"
khenaidoo2bc48282019-07-16 18:13:46 -040041 "github.com/stretchr/testify/assert"
khenaidoo2bc48282019-07-16 18:13:46 -040042)
43
khenaidoo6e55d9e2019-12-12 18:26:26 -050044type LDATest struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040045 etcdServer *mock_etcd.EtcdServer
46 deviceMgr *Manager
47 kmp kafka.InterContainerProxy
48 logicalDeviceMgr *LogicalManager
49 kClient kafka.Client
Himani Chawlab4c25912020-11-12 17:16:38 +053050 kEventClient kafka.Client
Kent Hagerman2b216042020-04-03 18:28:56 -040051 kvClientPort int
52 oltAdapterName string
53 onuAdapterName string
54 coreInstanceID string
55 defaultTimeout time.Duration
56 maxTimeout time.Duration
57 logicalDevice *voltha.LogicalDevice
Kent Hagerman2a07b862020-06-19 15:23:07 -040058 logicalPorts map[uint32]*voltha.LogicalPort
Kent Hagerman2b216042020-04-03 18:28:56 -040059 deviceIds []string
60 done chan int
khenaidoo6e55d9e2019-12-12 18:26:26 -050061}
62
Rohan Agrawal31f21802020-06-12 05:38:46 +000063func newLDATest(ctx context.Context) *LDATest {
khenaidoo6e55d9e2019-12-12 18:26:26 -050064 test := &LDATest{}
65 // Start the embedded etcd server
66 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +000067 test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer(ctx, "voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
khenaidoo6e55d9e2019-12-12 18:26:26 -050068 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +000069 logger.Fatal(ctx, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -050070 }
71 // Create the kafka client
Matteo Scandolod525ae32020-04-02 17:27:29 -070072 test.kClient = mock_kafka.NewKafkaClient()
Himani Chawlab4c25912020-11-12 17:16:38 +053073 test.kEventClient = mock_kafka.NewKafkaClient()
khenaidoo6e55d9e2019-12-12 18:26:26 -050074 test.oltAdapterName = "olt_adapter_mock"
75 test.onuAdapterName = "onu_adapter_mock"
76 test.coreInstanceID = "rw-da-test"
77 test.defaultTimeout = 5 * time.Second
78 test.maxTimeout = 20 * time.Second
79 test.done = make(chan int)
80 test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
81 test.logicalDevice = &voltha.LogicalDevice{
82 Desc: &ofp.OfpDesc{
83 HwDesc: "olt_adapter_mock",
84 SwDesc: "olt_adapter_mock",
85 SerialNum: com.GetRandomSerialNumber(),
86 },
87 SwitchFeatures: &ofp.OfpSwitchFeatures{
88 NBuffers: 256,
89 NTables: 2,
90 Capabilities: uint32(ofp.OfpCapabilities_OFPC_FLOW_STATS |
91 ofp.OfpCapabilities_OFPC_TABLE_STATS |
92 ofp.OfpCapabilities_OFPC_PORT_STATS |
93 ofp.OfpCapabilities_OFPC_GROUP_STATS),
94 },
95 RootDeviceId: test.deviceIds[0],
Kent Hagerman2a07b862020-06-19 15:23:07 -040096 }
97 test.logicalPorts = map[uint32]*voltha.LogicalPort{
98 1: {
99 Id: "1001",
100 DeviceId: test.deviceIds[0],
101 DevicePortNo: 1,
102 RootPort: true,
103 OfpPort: &ofp.OfpPort{
104 PortNo: 1,
105 Name: "port1",
106 Config: 4,
107 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500108 },
Kent Hagerman2a07b862020-06-19 15:23:07 -0400109 },
110 2: {
111 Id: "1002",
112 DeviceId: test.deviceIds[1],
113 DevicePortNo: 2,
114 RootPort: false,
115 OfpPort: &ofp.OfpPort{
116 PortNo: 2,
117 Name: "port2",
118 Config: 4,
119 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500120 },
Kent Hagerman2a07b862020-06-19 15:23:07 -0400121 },
122 3: {
123 Id: "1003",
124 DeviceId: test.deviceIds[2],
125 DevicePortNo: 3,
126 RootPort: false,
127 OfpPort: &ofp.OfpPort{
128 PortNo: 3,
129 Name: "port3",
130 Config: 4,
131 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500132 },
133 },
134 }
135 return test
136}
137
Rohan Agrawal31f21802020-06-12 05:38:46 +0000138func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
khenaidoo6e55d9e2019-12-12 18:26:26 -0500139 cfg := config.NewRWCoreFlags()
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700140 cfg.CoreTopic = "rw_core"
Himani Chawlab4c25912020-11-12 17:16:38 +0530141 cfg.EventTopic = "voltha.events"
khenaidoo442e7c72020-03-10 16:13:48 -0400142 cfg.DefaultRequestTimeout = lda.defaultTimeout
Neha Sharmad1387da2020-05-07 20:07:28 +0000143 cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500144 grpcPort, err := freeport.GetFreePort()
145 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000146 logger.Fatal(ctx, "Cannot get a freeport for grpc")
khenaidoo6e55d9e2019-12-12 18:26:26 -0500147 }
Neha Sharmad1387da2020-05-07 20:07:28 +0000148 cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000149 client := tst.SetupKVClient(ctx, cfg, lda.coreInstanceID)
Kent Hagerman2b216042020-04-03 18:28:56 -0400150 backend := &db.Backend{
151 Client: client,
152 StoreType: cfg.KVStoreType,
Neha Sharmad1387da2020-05-07 20:07:28 +0000153 Address: cfg.KVStoreAddress,
Kent Hagerman2b216042020-04-03 18:28:56 -0400154 Timeout: cfg.KVStoreTimeout,
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700155 LivenessChannelInterval: cfg.LiveProbeInterval / 2}
Kent Hagerman2b216042020-04-03 18:28:56 -0400156 lda.kmp = kafka.NewInterContainerProxy(
Neha Sharmad1387da2020-05-07 20:07:28 +0000157 kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
Kent Hagerman2b216042020-04-03 18:28:56 -0400158 kafka.MsgClient(lda.kClient),
David Bainbridge9ae13132020-06-22 17:28:01 -0700159 kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
Kent Hagerman2b216042020-04-03 18:28:56 -0400160
161 endpointMgr := kafka.NewEndpointManager(backend)
Kent Hagermanf5a67352020-04-30 15:15:26 -0400162 proxy := model.NewDBPath(backend)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000163 adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
Himani Chawlab4c25912020-11-12 17:16:38 +0530164 eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
Maninder0aabf0c2021-03-17 14:55:14 +0530165 lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg, lda.coreInstanceID, eventProxy)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000166 if err = lda.kmp.Start(ctx); err != nil {
167 logger.Fatal(ctx, "Cannot start InterContainerProxy")
Thomas Lee Se5a44012019-11-07 20:32:24 +0530168 }
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400169 adapterMgr.Start(context.Background())
khenaidoo6e55d9e2019-12-12 18:26:26 -0500170}
171
Rohan Agrawal31f21802020-06-12 05:38:46 +0000172func (lda *LDATest) stopAll(ctx context.Context) {
khenaidoo6e55d9e2019-12-12 18:26:26 -0500173 if lda.kClient != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000174 lda.kClient.Stop(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500175 }
Kent Hagerman2b216042020-04-03 18:28:56 -0400176 if lda.kmp != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000177 lda.kmp.Stop(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500178 }
179 if lda.etcdServer != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000180 tst.StopEmbeddedEtcdServer(ctx, lda.etcdServer)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500181 }
Himani Chawlab4c25912020-11-12 17:16:38 +0530182 if lda.kEventClient != nil {
183 lda.kEventClient.Stop(ctx)
184 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500185}
186
Kent Hagerman2b216042020-04-03 18:28:56 -0400187func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {
188 lDeviceMgr := lda.logicalDeviceMgr
189 deviceMgr := lda.deviceMgr
khenaidoo6e55d9e2019-12-12 18:26:26 -0500190 clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
191 clonedLD.Id = com.GetRandomString(10)
192 clonedLD.DatapathId = rand.Uint64()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000193 lDeviceAgent := newLogicalAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500194 lDeviceAgent.logicalDevice = clonedLD
Kent Hagerman2a07b862020-06-19 15:23:07 -0400195 for _, port := range lda.logicalPorts {
196 clonedPort := proto.Clone(port).(*voltha.LogicalPort)
197 handle, created, err := lDeviceAgent.portLoader.LockOrCreate(context.Background(), clonedPort)
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400198 if err != nil {
199 panic(err)
200 }
201 handle.Unlock()
202 if !created {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400203 t.Errorf("port %d already exists", clonedPort.OfpPort.PortNo)
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400204 }
205 }
Kent Hagermanf5a67352020-04-30 15:15:26 -0400206 err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
Thomas Lee Se5a44012019-11-07 20:32:24 +0530207 assert.Nil(t, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500208 lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
209 return lDeviceAgent
210}
211
Kent Hagerman2b216042020-04-03 18:28:56 -0400212func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalAgent, globalWG *sync.WaitGroup) {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400213 originalLogicalPorts := ldAgent.listLogicalDevicePorts(context.Background())
214 assert.NotNil(t, originalLogicalPorts)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500215 var localWG sync.WaitGroup
216
217 // Change the state of the first port to FAILED
218 localWG.Add(1)
219 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400220 err := ldAgent.updatePortState(context.Background(), 1, voltha.OperStatus_FAILED)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500221 assert.Nil(t, err)
222 localWG.Done()
223 }()
224
225 // Change the state of the second port to TESTING
226 localWG.Add(1)
227 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400228 err := ldAgent.updatePortState(context.Background(), 2, voltha.OperStatus_TESTING)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500229 assert.Nil(t, err)
230 localWG.Done()
231 }()
232
233 // Change the state of the third port to UNKNOWN and then back to ACTIVE
234 localWG.Add(1)
235 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400236 err := ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_UNKNOWN)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500237 assert.Nil(t, err)
Kent Hagerman2a07b862020-06-19 15:23:07 -0400238 err = ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_ACTIVE)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500239 assert.Nil(t, err)
240 localWG.Done()
241 }()
242
243 // Add a meter to the logical device
244 meterMod := &ofp.OfpMeterMod{
245 Command: ofp.OfpMeterModCommand_OFPMC_ADD,
246 Flags: rand.Uint32(),
247 MeterId: rand.Uint32(),
248 Bands: []*ofp.OfpMeterBandHeader{
249 {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
250 Rate: rand.Uint32(),
251 BurstSize: rand.Uint32(),
252 Data: nil,
253 },
254 },
255 }
256 localWG.Add(1)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000257 ctx := context.Background()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500258 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000259 err := ldAgent.meterAdd(ctx, meterMod)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500260 assert.Nil(t, err)
261 localWG.Done()
262 }()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500263 // wait for go routines to be done
264 localWG.Wait()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000265 meterEntry := fu.MeterEntryFromMeterMod(ctx, meterMod)
Mahir Gunyeladdb66a2020-04-29 18:08:50 -0700266
Kent Hagerman433a31a2020-05-20 19:04:48 -0400267 meterHandle, have := ldAgent.meterLoader.Lock(meterMod.MeterId)
268 assert.Equal(t, have, true)
269 if have {
270 assert.True(t, proto.Equal(meterEntry, meterHandle.GetReadOnly()))
271 meterHandle.Unlock()
272 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500273
Kent Hagerman2a07b862020-06-19 15:23:07 -0400274 expectedLogicalPorts := make(map[uint32]*voltha.LogicalPort)
275 for _, port := range originalLogicalPorts {
276 clonedPort := proto.Clone(port).(*voltha.LogicalPort)
277 switch clonedPort.OfpPort.PortNo {
278 case 1:
279 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
280 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
281 case 2:
282 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
283 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
284 case 3:
285 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
286 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
287 }
288 expectedLogicalPorts[clonedPort.OfpPort.PortNo] = clonedPort
289 }
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400290
Kent Hagerman2a07b862020-06-19 15:23:07 -0400291 updatedLogicalDevicePorts := ldAgent.listLogicalDevicePorts(ctx)
292 assert.Equal(t, len(expectedLogicalPorts), len(updatedLogicalDevicePorts))
293 for _, p := range updatedLogicalDevicePorts {
294 assert.True(t, proto.Equal(p, expectedLogicalPorts[p.OfpPort.PortNo]))
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400295 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500296 globalWG.Done()
297}
298
Himani Chawla40af2702021-01-27 15:06:30 +0530299func (lda *LDATest) stopLogicalAgentAndCheckEventQueueIsEmpty(ctx context.Context, t *testing.T, ldAgent *LogicalAgent) {
300 queueIsEmpty := false
301 err := ldAgent.stop(ctx)
302 assert.Nil(t, err)
303 qp := ldAgent.orderedEvents.assignQueuePosition()
304 if qp.prev != nil { // we will be definitely hitting this case as we pushed events on the queue before
305 // If previous channel is closed which it should be now,
306 // only then we can know that queue is empty.
307 _, ok := <-qp.prev
308 if !ok {
309 queueIsEmpty = true
310 } else {
311 queueIsEmpty = false
312 }
313 } else {
314 queueIsEmpty = true
315 }
316 close(qp.next)
317 assert.True(t, queueIsEmpty)
318}
319
320func (lda *LDATest) updateLogicalDevice(t *testing.T, ldAgent *LogicalAgent) {
321 originalLogicalPorts := ldAgent.listLogicalDevicePorts(context.Background())
322 assert.NotNil(t, originalLogicalPorts)
323
324 // Change the state of the first port to FAILED
325 err := ldAgent.updatePortState(context.Background(), 1, voltha.OperStatus_FAILED)
326 assert.Nil(t, err)
327
328 // Change the state of the second port to TESTING
329 err = ldAgent.updatePortState(context.Background(), 2, voltha.OperStatus_TESTING)
330 assert.Nil(t, err)
331
332 // Change the state of the third port to ACTIVE
333 err = ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_ACTIVE)
334 assert.Nil(t, err)
335
336}
337
khenaidoo6e55d9e2019-12-12 18:26:26 -0500338func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000339 ctx := context.Background()
340 lda := newLDATest(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500341 assert.NotNil(t, lda)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000342 defer lda.stopAll(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500343
344 // Start the Core
Rohan Agrawal31f21802020-06-12 05:38:46 +0000345 lda.startCore(ctx, false)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500346
347 var wg sync.WaitGroup
khenaidoo442e7c72020-03-10 16:13:48 -0400348 numConCurrentLogicalDeviceAgents := 3
khenaidoo6e55d9e2019-12-12 18:26:26 -0500349 for i := 0; i < numConCurrentLogicalDeviceAgents; i++ {
350 wg.Add(1)
351 a := lda.createLogicalDeviceAgent(t)
352 go lda.updateLogicalDeviceConcurrently(t, a, &wg)
353 }
354
355 wg.Wait()
356}
Himani Chawla40af2702021-01-27 15:06:30 +0530357
358func TestLogicalAgentStopWithEventsInQueue(t *testing.T) {
359 ctx := context.Background()
360 lda := newLDATest(ctx)
361 assert.NotNil(t, lda)
362 defer lda.stopAll(ctx)
363
364 // Start the Core
365 lda.startCore(ctx, false)
366
367 a := lda.createLogicalDeviceAgent(t)
368 lda.updateLogicalDevice(t, a)
369 lda.stopLogicalAgentAndCheckEventQueueIsEmpty(ctx, t, a)
370}