blob: 8c9c8eedb08da9ceb563c66f5d3e0cc95c796547 [file] [log] [blame]
khenaidoo2bc48282019-07-16 18:13:46 -04001/*
Joey Armstrong5f51f2e2023-01-17 17:06:26 -05002 * Copyright 2019-2023 Open Networking Foundation (ONF) and the ONF Contributors
khenaidoo2bc48282019-07-16 18:13:46 -04003 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Kent Hagerman2b216042020-04-03 18:28:56 -040016package device
khenaidoo2bc48282019-07-16 18:13:46 -040017
18import (
khenaidoo6e55d9e2019-12-12 18:26:26 -050019 "context"
David Bainbridged1afd662020-03-26 18:27:41 -070020 "math/rand"
Neha Sharmad1387da2020-05-07 20:07:28 +000021 "strconv"
David Bainbridged1afd662020-03-26 18:27:41 -070022 "sync"
23 "testing"
24 "time"
25
Kent Hagermanf5a67352020-04-30 15:15:26 -040026 "github.com/gogo/protobuf/proto"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070027 "github.com/opencord/voltha-go/db/model"
Kent Hagermanf5a67352020-04-30 15:15:26 -040028 "github.com/opencord/voltha-go/rw_core/config"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070029 "github.com/opencord/voltha-go/rw_core/core/adapter"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070030 tst "github.com/opencord/voltha-go/rw_core/test"
khenaidood948f772021-08-11 17:49:24 -040031 com "github.com/opencord/voltha-lib-go/v7/pkg/adapters/common"
32 "github.com/opencord/voltha-lib-go/v7/pkg/db"
33 "github.com/opencord/voltha-lib-go/v7/pkg/events"
34 fu "github.com/opencord/voltha-lib-go/v7/pkg/flows"
35 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
36 mock_etcd "github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd"
37 mock_kafka "github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka"
38 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
39 "github.com/opencord/voltha-protos/v5/go/voltha"
khenaidoo6e55d9e2019-12-12 18:26:26 -050040 "github.com/phayes/freeport"
khenaidoo2bc48282019-07-16 18:13:46 -040041 "github.com/stretchr/testify/assert"
khenaidoo2bc48282019-07-16 18:13:46 -040042)
43
khenaidoo6e55d9e2019-12-12 18:26:26 -050044type LDATest struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040045 etcdServer *mock_etcd.EtcdServer
46 deviceMgr *Manager
Kent Hagerman2b216042020-04-03 18:28:56 -040047 logicalDeviceMgr *LogicalManager
48 kClient kafka.Client
Himani Chawlab4c25912020-11-12 17:16:38 +053049 kEventClient kafka.Client
Kent Hagerman2b216042020-04-03 18:28:56 -040050 kvClientPort int
51 oltAdapterName string
52 onuAdapterName string
53 coreInstanceID string
khenaidood948f772021-08-11 17:49:24 -040054 internalTimeout time.Duration
55 rpcTimeout time.Duration
Kent Hagerman2b216042020-04-03 18:28:56 -040056 logicalDevice *voltha.LogicalDevice
Kent Hagerman2a07b862020-06-19 15:23:07 -040057 logicalPorts map[uint32]*voltha.LogicalPort
Kent Hagerman2b216042020-04-03 18:28:56 -040058 deviceIds []string
59 done chan int
khenaidoo6e55d9e2019-12-12 18:26:26 -050060}
61
Rohan Agrawal31f21802020-06-12 05:38:46 +000062func newLDATest(ctx context.Context) *LDATest {
khenaidoo6e55d9e2019-12-12 18:26:26 -050063 test := &LDATest{}
64 // Start the embedded etcd server
65 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +000066 test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer(ctx, "voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
khenaidoo6e55d9e2019-12-12 18:26:26 -050067 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +000068 logger.Fatal(ctx, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -050069 }
70 // Create the kafka client
Matteo Scandolod525ae32020-04-02 17:27:29 -070071 test.kClient = mock_kafka.NewKafkaClient()
Himani Chawlab4c25912020-11-12 17:16:38 +053072 test.kEventClient = mock_kafka.NewKafkaClient()
khenaidoo6e55d9e2019-12-12 18:26:26 -050073 test.oltAdapterName = "olt_adapter_mock"
74 test.onuAdapterName = "onu_adapter_mock"
75 test.coreInstanceID = "rw-da-test"
khenaidood948f772021-08-11 17:49:24 -040076 test.internalTimeout = 5 * time.Second
77 test.rpcTimeout = 20 * time.Second
khenaidoo6e55d9e2019-12-12 18:26:26 -050078 test.done = make(chan int)
79 test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
80 test.logicalDevice = &voltha.LogicalDevice{
81 Desc: &ofp.OfpDesc{
82 HwDesc: "olt_adapter_mock",
83 SwDesc: "olt_adapter_mock",
84 SerialNum: com.GetRandomSerialNumber(),
85 },
86 SwitchFeatures: &ofp.OfpSwitchFeatures{
87 NBuffers: 256,
88 NTables: 2,
89 Capabilities: uint32(ofp.OfpCapabilities_OFPC_FLOW_STATS |
90 ofp.OfpCapabilities_OFPC_TABLE_STATS |
91 ofp.OfpCapabilities_OFPC_PORT_STATS |
92 ofp.OfpCapabilities_OFPC_GROUP_STATS),
93 },
94 RootDeviceId: test.deviceIds[0],
Kent Hagerman2a07b862020-06-19 15:23:07 -040095 }
96 test.logicalPorts = map[uint32]*voltha.LogicalPort{
97 1: {
98 Id: "1001",
99 DeviceId: test.deviceIds[0],
100 DevicePortNo: 1,
101 RootPort: true,
102 OfpPort: &ofp.OfpPort{
103 PortNo: 1,
104 Name: "port1",
105 Config: 4,
106 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500107 },
Kent Hagerman2a07b862020-06-19 15:23:07 -0400108 },
109 2: {
110 Id: "1002",
111 DeviceId: test.deviceIds[1],
112 DevicePortNo: 2,
113 RootPort: false,
114 OfpPort: &ofp.OfpPort{
115 PortNo: 2,
116 Name: "port2",
117 Config: 4,
118 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500119 },
Kent Hagerman2a07b862020-06-19 15:23:07 -0400120 },
121 3: {
122 Id: "1003",
123 DeviceId: test.deviceIds[2],
124 DevicePortNo: 3,
125 RootPort: false,
126 OfpPort: &ofp.OfpPort{
127 PortNo: 3,
128 Name: "port3",
129 Config: 4,
130 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500131 },
132 },
133 }
134 return test
135}
136
Rohan Agrawal31f21802020-06-12 05:38:46 +0000137func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
David K. Bainbridge6080c172021-07-24 00:22:28 +0000138 cfg := &config.RWCoreFlags{}
139 cfg.ParseCommandArguments([]string{})
Himani Chawlab4c25912020-11-12 17:16:38 +0530140 cfg.EventTopic = "voltha.events"
khenaidood948f772021-08-11 17:49:24 -0400141 cfg.InternalTimeout = lda.internalTimeout
Neha Sharmad1387da2020-05-07 20:07:28 +0000142 cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500143 grpcPort, err := freeport.GetFreePort()
144 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000145 logger.Fatal(ctx, "Cannot get a freeport for grpc")
khenaidoo6e55d9e2019-12-12 18:26:26 -0500146 }
khenaidood948f772021-08-11 17:49:24 -0400147 cfg.GrpcNBIAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000148 client := tst.SetupKVClient(ctx, cfg, lda.coreInstanceID)
Kent Hagerman2b216042020-04-03 18:28:56 -0400149 backend := &db.Backend{
150 Client: client,
151 StoreType: cfg.KVStoreType,
Neha Sharmad1387da2020-05-07 20:07:28 +0000152 Address: cfg.KVStoreAddress,
Kent Hagerman2b216042020-04-03 18:28:56 -0400153 Timeout: cfg.KVStoreTimeout,
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700154 LivenessChannelInterval: cfg.LiveProbeInterval / 2}
Kent Hagerman2b216042020-04-03 18:28:56 -0400155
Kent Hagermanf5a67352020-04-30 15:15:26 -0400156 proxy := model.NewDBPath(backend)
khenaidoo25057da2021-12-08 14:40:45 -0500157 adapterMgr := adapter.NewAdapterManager("test-endpoint", proxy, lda.coreInstanceID, backend, 5)
Himani Chawlab4c25912020-11-12 17:16:38 +0530158 eventProxy := events.NewEventProxy(events.MsgClient(lda.kEventClient), events.MsgTopic(kafka.Topic{Name: cfg.EventTopic}))
khenaidood948f772021-08-11 17:49:24 -0400159 lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, cfg, lda.coreInstanceID, eventProxy)
160 adapterMgr.Start(context.Background(), "logical-test")
khenaidoo6e55d9e2019-12-12 18:26:26 -0500161}
162
Rohan Agrawal31f21802020-06-12 05:38:46 +0000163func (lda *LDATest) stopAll(ctx context.Context) {
khenaidoo6e55d9e2019-12-12 18:26:26 -0500164 if lda.kClient != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000165 lda.kClient.Stop(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500166 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500167 if lda.etcdServer != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000168 tst.StopEmbeddedEtcdServer(ctx, lda.etcdServer)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500169 }
Himani Chawlab4c25912020-11-12 17:16:38 +0530170 if lda.kEventClient != nil {
171 lda.kEventClient.Stop(ctx)
172 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500173}
174
Kent Hagerman2b216042020-04-03 18:28:56 -0400175func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {
176 lDeviceMgr := lda.logicalDeviceMgr
177 deviceMgr := lda.deviceMgr
khenaidoo6e55d9e2019-12-12 18:26:26 -0500178 clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
179 clonedLD.Id = com.GetRandomString(10)
180 clonedLD.DatapathId = rand.Uint64()
khenaidood948f772021-08-11 17:49:24 -0400181 lDeviceAgent := newLogicalAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.internalTimeout)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500182 lDeviceAgent.logicalDevice = clonedLD
Kent Hagerman2a07b862020-06-19 15:23:07 -0400183 for _, port := range lda.logicalPorts {
184 clonedPort := proto.Clone(port).(*voltha.LogicalPort)
185 handle, created, err := lDeviceAgent.portLoader.LockOrCreate(context.Background(), clonedPort)
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400186 if err != nil {
187 panic(err)
188 }
189 handle.Unlock()
190 if !created {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400191 t.Errorf("port %d already exists", clonedPort.OfpPort.PortNo)
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400192 }
193 }
Kent Hagermanf5a67352020-04-30 15:15:26 -0400194 err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
Thomas Lee Se5a44012019-11-07 20:32:24 +0530195 assert.Nil(t, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500196 lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
197 return lDeviceAgent
198}
199
Kent Hagerman2b216042020-04-03 18:28:56 -0400200func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalAgent, globalWG *sync.WaitGroup) {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400201 originalLogicalPorts := ldAgent.listLogicalDevicePorts(context.Background())
202 assert.NotNil(t, originalLogicalPorts)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500203 var localWG sync.WaitGroup
204
205 // Change the state of the first port to FAILED
206 localWG.Add(1)
207 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400208 err := ldAgent.updatePortState(context.Background(), 1, voltha.OperStatus_FAILED)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500209 assert.Nil(t, err)
210 localWG.Done()
211 }()
212
213 // Change the state of the second port to TESTING
214 localWG.Add(1)
215 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400216 err := ldAgent.updatePortState(context.Background(), 2, voltha.OperStatus_TESTING)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500217 assert.Nil(t, err)
218 localWG.Done()
219 }()
220
221 // Change the state of the third port to UNKNOWN and then back to ACTIVE
222 localWG.Add(1)
223 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400224 err := ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_UNKNOWN)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500225 assert.Nil(t, err)
Kent Hagerman2a07b862020-06-19 15:23:07 -0400226 err = ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_ACTIVE)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500227 assert.Nil(t, err)
228 localWG.Done()
229 }()
230
231 // Add a meter to the logical device
232 meterMod := &ofp.OfpMeterMod{
233 Command: ofp.OfpMeterModCommand_OFPMC_ADD,
234 Flags: rand.Uint32(),
235 MeterId: rand.Uint32(),
236 Bands: []*ofp.OfpMeterBandHeader{
237 {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
238 Rate: rand.Uint32(),
239 BurstSize: rand.Uint32(),
240 Data: nil,
241 },
242 },
243 }
244 localWG.Add(1)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000245 ctx := context.Background()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500246 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000247 err := ldAgent.meterAdd(ctx, meterMod)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500248 assert.Nil(t, err)
249 localWG.Done()
250 }()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500251 // wait for go routines to be done
252 localWG.Wait()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000253 meterEntry := fu.MeterEntryFromMeterMod(ctx, meterMod)
Mahir Gunyeladdb66a2020-04-29 18:08:50 -0700254
khenaidoo1a0d6222021-06-30 16:48:44 -0400255 meterHandle, have := ldAgent.meterLoader.Lock(meterMod.MeterId)
Kent Hagerman433a31a2020-05-20 19:04:48 -0400256 assert.Equal(t, have, true)
257 if have {
258 assert.True(t, proto.Equal(meterEntry, meterHandle.GetReadOnly()))
259 meterHandle.Unlock()
260 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500261
Kent Hagerman2a07b862020-06-19 15:23:07 -0400262 expectedLogicalPorts := make(map[uint32]*voltha.LogicalPort)
263 for _, port := range originalLogicalPorts {
264 clonedPort := proto.Clone(port).(*voltha.LogicalPort)
265 switch clonedPort.OfpPort.PortNo {
266 case 1:
267 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
268 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
269 case 2:
270 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
271 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
272 case 3:
273 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
274 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
275 }
276 expectedLogicalPorts[clonedPort.OfpPort.PortNo] = clonedPort
277 }
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400278
Kent Hagerman2a07b862020-06-19 15:23:07 -0400279 updatedLogicalDevicePorts := ldAgent.listLogicalDevicePorts(ctx)
280 assert.Equal(t, len(expectedLogicalPorts), len(updatedLogicalDevicePorts))
281 for _, p := range updatedLogicalDevicePorts {
282 assert.True(t, proto.Equal(p, expectedLogicalPorts[p.OfpPort.PortNo]))
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400283 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500284 globalWG.Done()
285}
286
Himani Chawla40af2702021-01-27 15:06:30 +0530287func (lda *LDATest) stopLogicalAgentAndCheckEventQueueIsEmpty(ctx context.Context, t *testing.T, ldAgent *LogicalAgent) {
288 queueIsEmpty := false
289 err := ldAgent.stop(ctx)
290 assert.Nil(t, err)
291 qp := ldAgent.orderedEvents.assignQueuePosition()
292 if qp.prev != nil { // we will be definitely hitting this case as we pushed events on the queue before
293 // If previous channel is closed which it should be now,
294 // only then we can know that queue is empty.
295 _, ok := <-qp.prev
296 if !ok {
297 queueIsEmpty = true
298 } else {
299 queueIsEmpty = false
300 }
301 } else {
302 queueIsEmpty = true
303 }
304 close(qp.next)
305 assert.True(t, queueIsEmpty)
306}
307
308func (lda *LDATest) updateLogicalDevice(t *testing.T, ldAgent *LogicalAgent) {
309 originalLogicalPorts := ldAgent.listLogicalDevicePorts(context.Background())
310 assert.NotNil(t, originalLogicalPorts)
311
312 // Change the state of the first port to FAILED
313 err := ldAgent.updatePortState(context.Background(), 1, voltha.OperStatus_FAILED)
314 assert.Nil(t, err)
315
316 // Change the state of the second port to TESTING
317 err = ldAgent.updatePortState(context.Background(), 2, voltha.OperStatus_TESTING)
318 assert.Nil(t, err)
319
320 // Change the state of the third port to ACTIVE
321 err = ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_ACTIVE)
322 assert.Nil(t, err)
323
324}
325
khenaidoo6e55d9e2019-12-12 18:26:26 -0500326func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000327 ctx := context.Background()
328 lda := newLDATest(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500329 assert.NotNil(t, lda)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000330 defer lda.stopAll(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500331
332 // Start the Core
Rohan Agrawal31f21802020-06-12 05:38:46 +0000333 lda.startCore(ctx, false)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500334
335 var wg sync.WaitGroup
khenaidoo442e7c72020-03-10 16:13:48 -0400336 numConCurrentLogicalDeviceAgents := 3
khenaidoo6e55d9e2019-12-12 18:26:26 -0500337 for i := 0; i < numConCurrentLogicalDeviceAgents; i++ {
338 wg.Add(1)
339 a := lda.createLogicalDeviceAgent(t)
340 go lda.updateLogicalDeviceConcurrently(t, a, &wg)
341 }
342
343 wg.Wait()
344}
Himani Chawla40af2702021-01-27 15:06:30 +0530345
346func TestLogicalAgentStopWithEventsInQueue(t *testing.T) {
347 ctx := context.Background()
348 lda := newLDATest(ctx)
349 assert.NotNil(t, lda)
350 defer lda.stopAll(ctx)
351
352 // Start the Core
353 lda.startCore(ctx, false)
354
355 a := lda.createLogicalDeviceAgent(t)
356 lda.updateLogicalDevice(t, a)
357 lda.stopLogicalAgentAndCheckEventQueueIsEmpty(ctx, t, a)
358}