blob: 3f22002462983596222d3917908b777529128294 [file] [log] [blame]
/*
2* Copyright 2019-present Open Networking Foundation
3*
4* Licensed under the Apache License, Version 2.0 (the "License");
5* you may not use this file except in compliance with the License.
6* You may obtain a copy of the License at
7*
8* http://www.apache.org/licenses/LICENSE-2.0
9*
10* Unless required by applicable law or agreed to in writing, software
11* distributed under the License is distributed on an "AS IS" BASIS,
12* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13* See the License for the specific language governing permissions and
14* limitations under the License.
15 */
16package core
17
18import (
19 "context"
20 "github.com/gogo/protobuf/proto"
21 "github.com/opencord/voltha-go/rw_core/config"
22 com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
23 "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
24 "github.com/opencord/voltha-lib-go/v2/pkg/log"
25 lm "github.com/opencord/voltha-lib-go/v2/pkg/mocks"
26 "github.com/opencord/voltha-protos/v2/go/voltha"
27 "github.com/phayes/freeport"
28 "github.com/stretchr/testify/assert"
29 "math/rand"
30 "strings"
31 "sync"
32 "testing"
33 "time"
34)
35
36type DATest struct {
37 etcdServer *lm.EtcdServer
38 core *Core
39 kClient kafka.Client
40 kvClientPort int
41 oltAdapterName string
42 onuAdapterName string
43 coreInstanceID string
44 defaultTimeout time.Duration
45 maxTimeout time.Duration
46 device *voltha.Device
47 done chan int
48}
49
50func newDATest() *DATest {
51 test := &DATest{}
52 // Start the embedded etcd server
53 var err error
54 test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
55 if err != nil {
56 log.Fatal(err)
57 }
58 // Create the kafka client
59 test.kClient = lm.NewKafkaClient()
60 test.oltAdapterName = "olt_adapter_mock"
61 test.onuAdapterName = "onu_adapter_mock"
62 test.coreInstanceID = "rw-da-test"
63 test.defaultTimeout = 5 * time.Second
64 test.maxTimeout = 20 * time.Second
65 test.done = make(chan int)
66 parentID := com.GetRandomString(10)
67 test.device = &voltha.Device{
68 Type: "onu_adapter_mock",
69 ParentId: parentID,
70 ParentPortNo: 1,
71 VendorId: "onu_adapter_mock",
72 Adapter: "onu_adapter_mock",
73 Vlan: 100,
74 Address: nil,
75 ProxyAddress: &voltha.Device_ProxyAddress{
76 DeviceId: parentID,
77 DeviceType: "olt_adapter_mock",
78 ChannelId: 100,
79 ChannelGroupId: 0,
80 ChannelTermination: "",
81 OnuId: 2,
82 },
83 AdminState: voltha.AdminState_PREPROVISIONED,
84 OperStatus: voltha.OperStatus_UNKNOWN,
85 Reason: "All good",
86 ConnectStatus: voltha.ConnectStatus_UNKNOWN,
87 Custom: nil,
88 Ports: []*voltha.Port{
89 {PortNo: 1, Label: "pon-1", Type: voltha.Port_PON_ONU, AdminState: voltha.AdminState_ENABLED,
90 OperStatus: voltha.OperStatus_ACTIVE, Peers: []*voltha.Port_PeerPort{{DeviceId: parentID, PortNo: 1}}},
91 {PortNo: 100, Label: "uni-100", Type: voltha.Port_ETHERNET_UNI, AdminState: voltha.AdminState_ENABLED,
92 OperStatus: voltha.OperStatus_ACTIVE},
93 },
94 }
95
96 return test
97}
98
99func (dat *DATest) startCore(inCompeteMode bool) {
100 cfg := config.NewRWCoreFlags()
101 cfg.CorePairTopic = "rw_core"
102 cfg.DefaultRequestTimeout = dat.defaultTimeout.Nanoseconds() / 1000000 //TODO: change when Core changes to Duration
103 cfg.KVStorePort = dat.kvClientPort
104 cfg.InCompetingMode = inCompeteMode
105 grpcPort, err := freeport.GetFreePort()
106 if err != nil {
107 log.Fatal("Cannot get a freeport for grpc")
108 }
109 cfg.GrpcPort = grpcPort
110 cfg.GrpcHost = "127.0.0.1"
111 setCoreCompeteMode(inCompeteMode)
112 client := setupKVClient(cfg, dat.coreInstanceID)
113 dat.core = NewCore(dat.coreInstanceID, cfg, client, dat.kClient)
114 dat.core.Start(context.Background())
115}
116
117func (dat *DATest) stopAll() {
118 if dat.kClient != nil {
119 dat.kClient.Stop()
120 }
121 if dat.core != nil {
122 dat.core.Stop(context.Background())
123 }
124 if dat.etcdServer != nil {
125 stopEmbeddedEtcdServer(dat.etcdServer)
126 }
127}
128
129func (dat *DATest) createDeviceAgent(t *testing.T) *DeviceAgent {
130 deviceMgr := dat.core.deviceMgr
131 clonedDevice := proto.Clone(dat.device).(*voltha.Device)
132 deviceAgent := newDeviceAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.clusterDataProxy, deviceMgr.defaultTimeout)
133 d, err := deviceAgent.start(context.TODO(), clonedDevice)
134 assert.Nil(t, err)
135 assert.NotNil(t, d)
136 deviceMgr.addDeviceAgentToMap(deviceAgent)
137 return deviceAgent
138}
139
140func (dat *DATest) updateDeviceConcurrently(t *testing.T, da *DeviceAgent, globalWG *sync.WaitGroup) {
141 originalDevice := da.getDevice()
142 assert.NotNil(t, originalDevice)
143 var localWG sync.WaitGroup
144
145 // Update device routine
146 var (
147 root = false
148 vendor = "onu_adapter_mock"
149 model = "go-mock"
150 serialNumber = com.GetRandomSerialNumber()
151 macAddress = strings.ToUpper(com.GetRandomMacAddress())
152 vlan = rand.Uint32()
153 reason = "testing concurrent device update"
154 portToAdd = &voltha.Port{PortNo: 101, Label: "uni-101", Type: voltha.Port_ETHERNET_UNI, AdminState: voltha.AdminState_ENABLED,
155 OperStatus: voltha.OperStatus_ACTIVE}
156 )
157 localWG.Add(1)
158 go func() {
159 deviceToUpdate := proto.Clone(originalDevice).(*voltha.Device)
160 deviceToUpdate.Root = root
161 deviceToUpdate.Vendor = vendor
162 deviceToUpdate.Model = model
163 deviceToUpdate.SerialNumber = serialNumber
164 deviceToUpdate.MacAddress = macAddress
165 deviceToUpdate.Vlan = vlan
166 deviceToUpdate.Reason = reason
167 err := da.updateDeviceUsingAdapterData(deviceToUpdate)
168 assert.Nil(t, err)
169 localWG.Done()
170 }()
171
172 // Update the device status routine
173 localWG.Add(1)
174 go func() {
175 err := da.updateDeviceStatus(voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
176 assert.Nil(t, err)
177 localWG.Done()
178 }()
179
180 // Add a port routine
181 localWG.Add(1)
182 go func() {
183 err := da.addPort(portToAdd)
184 assert.Nil(t, err)
185 localWG.Done()
186 }()
187
188 // wait for go routines to be done
189 localWG.Wait()
190
191 expectedChange := proto.Clone(originalDevice).(*voltha.Device)
192 expectedChange.OperStatus = voltha.OperStatus_ACTIVE
193 expectedChange.ConnectStatus = voltha.ConnectStatus_REACHABLE
194 expectedChange.Ports = append(expectedChange.Ports, portToAdd)
195 expectedChange.Root = root
196 expectedChange.Vendor = vendor
197 expectedChange.Model = model
198 expectedChange.SerialNumber = serialNumber
199 expectedChange.MacAddress = macAddress
200 expectedChange.Vlan = vlan
201 expectedChange.Reason = reason
202
203 updatedDevice := da.getDevice()
204 assert.NotNil(t, updatedDevice)
205 assert.True(t, proto.Equal(expectedChange, updatedDevice))
206
207 globalWG.Done()
208}
209
210func TestConcurrentDevices(t *testing.T) {
211 da := newDATest()
212 assert.NotNil(t, da)
213 defer da.stopAll()
214
215 // Start the Core
216 da.startCore(false)
217
218 var wg sync.WaitGroup
219 numConCurrentDeviceAgents := 20
220 for i := 0; i < numConCurrentDeviceAgents; i++ {
221 wg.Add(1)
222 a := da.createDeviceAgent(t)
223 go da.updateDeviceConcurrently(t, a, &wg)
224 }
225
226 wg.Wait()
227}