blob: 70f1e30bd5e7f0c14a666daf9c8cfe21d6f7475d [file] [log] [blame]
Himani Chawla6c38e572021-03-23 19:44:25 +05301/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package events
18
19import (
20 "context"
Himani Chawla6c38e572021-03-23 19:44:25 +053021 "strconv"
22 "testing"
23 "time"
khenaidoo26721882021-08-11 17:42:52 -040024
25 "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
26 "github.com/opencord/voltha-lib-go/v7/pkg/log"
27 mock_kafka "github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka"
28 "github.com/opencord/voltha-protos/v5/go/common"
29 "github.com/opencord/voltha-protos/v5/go/voltha"
30 "github.com/stretchr/testify/assert"
Himani Chawla6c38e572021-03-23 19:44:25 +053031)
32
// Upper bounds on how long the helpers in this file wait before they give up
// and report "timer expired".
const (
	// waitForKafkaEventsTimeout bounds the wait in waitForKafkaEvents.
	waitForKafkaEventsTimeout = 20 * time.Second
	// waitForEventProxyTimeout bounds the wait in waitForEventProxyToStop.
	waitForEventProxyTimeout = 10 * time.Second
)
35
36func waitForEventProxyToStop(ep *EventProxy, resp chan string) {
37 timer := time.NewTimer(waitForEventProxyTimeout)
38 defer timer.Stop()
39 for {
40 select {
41 case <-time.After(2 * time.Millisecond):
42 if ep.eventQueue.insertPosition == nil {
43 resp <- "ok"
44 return
45 }
46 case <-timer.C:
47 resp <- "timer expired"
48 return
49 }
50
51 }
52}
53func waitForKafkaEvents(kc kafka.Client, topic *kafka.Topic, numEvents int, resp chan string) {
54 kafkaChnl, err := kc.Subscribe(context.Background(), topic)
55 if err != nil {
56 resp <- err.Error()
57 return
58 }
59 defer func() {
60 if kafkaChnl != nil {
61 if err = kc.UnSubscribe(context.Background(), topic, kafkaChnl); err != nil {
62 logger.Errorw(context.Background(), "unsubscribe-failed", log.Fields{"error": err})
63 }
64 }
65 }()
66 timer := time.NewTimer(waitForKafkaEventsTimeout)
67 defer timer.Stop()
68 count := 0
69loop:
70 for {
71 select {
72 case msg := <-kafkaChnl:
khenaidoo26721882021-08-11 17:42:52 -040073 if _, ok := msg.(*voltha.Event); ok {
74 count += 1
75 if count == numEvents {
76 resp <- "ok"
77 break loop
Himani Chawla6c38e572021-03-23 19:44:25 +053078 }
79 }
80 case <-timer.C:
81 resp <- "timer expired"
82 break loop
83 }
84 }
85}
86
khenaidoo26721882021-08-11 17:42:52 -040087func createAndSendEvent(proxy *EventProxy, id string) error {
Himani Chawla6c38e572021-03-23 19:44:25 +053088 eventMsg := &voltha.RPCEvent{
89 Rpc: "dummy",
khenaidoo26721882021-08-11 17:42:52 -040090 OperationId: id,
Himani Chawla6c38e572021-03-23 19:44:25 +053091 ResourceId: "dummy",
92 Service: "dummy",
93 StackId: "dummy",
94 Status: &common.OperationResp{
95 Code: common.OperationResp_OPERATION_FAILURE,
96 },
97 Description: "dummy",
98 Context: nil,
99 }
100 var event voltha.Event
101 raisedTS := time.Now().Unix()
102 event.Header, _ = proxy.getEventHeader("RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil, voltha.EventType_RPC_EVENT, raisedTS)
103 event.EventType = &voltha.Event_RpcEvent{RpcEvent: eventMsg}
104 err := proxy.SendRPCEvent(context.Background(), "RPC_ERROR_RAISE_EVENT", eventMsg, voltha.EventCategory_COMMUNICATION,
105 nil, time.Now().Unix())
106 return err
107}
108
109func TestEventProxyReceiveAndSendMessage(t *testing.T) {
110 // Init Kafka client
111 log.SetAllLogLevel(log.FatalLevel)
112 cTkc := mock_kafka.NewKafkaClient()
113 topic := kafka.Topic{Name: "myTopic"}
114
115 numEvents := 10
116 resp := make(chan string)
117 go waitForKafkaEvents(cTkc, &topic, numEvents, resp)
118
119 // Init Event Proxy
120 ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
khenaidoo26721882021-08-11 17:42:52 -0400121 go func() {
122 if err := ep.Start(); err != nil {
123 logger.Fatalw(context.Background(), "failure-starting-event-proxy", log.Fields{"error": err})
124 }
125 }()
Himani Chawla6c38e572021-03-23 19:44:25 +0530126 time.Sleep(1 * time.Millisecond)
127 for i := 0; i < numEvents; i++ {
128 go func(ID int) {
129 err := createAndSendEvent(ep, strconv.Itoa(ID))
130 assert.Nil(t, err)
131 }(i)
132 }
133 val := <-resp
134 assert.Equal(t, val, "ok")
135 go ep.Stop()
136 go waitForEventProxyToStop(ep, resp)
137 val = <-resp
138 assert.Equal(t, val, "ok")
139}
140
141func TestEventProxyStopWhileSendingEvents(t *testing.T) {
142 // Init Kafka client
143 log.SetAllLogLevel(log.FatalLevel)
144 cTkc := mock_kafka.NewKafkaClient()
145 topic := kafka.Topic{Name: "myTopic"}
146
147 numEvents := 10
148 resp := make(chan string)
149 // Init Event Proxy
150 ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
khenaidoo26721882021-08-11 17:42:52 -0400151 go func() {
152 if err := ep.Start(); err != nil {
153 logger.Fatalw(context.Background(), "failure-starting-event-proxy", log.Fields{"error": err})
154 }
155 }()
Himani Chawla6c38e572021-03-23 19:44:25 +0530156 time.Sleep(1 * time.Millisecond)
157 for i := 0; i < numEvents; i++ {
158 go func(ID int) {
159 err := createAndSendEvent(ep, strconv.Itoa(ID))
160 assert.Nil(t, err)
161 }(i)
162 }
163 // In this case we cannot guarantee how many events are send before
164 // sending the last event(stopping event proxy), any event send before Stop would be received.
165 go ep.Stop()
166 go waitForEventProxyToStop(ep, resp)
167 val := <-resp
168 assert.Equal(t, val, "ok")
169}
170
171func TestEventProxyStopWhenNoEventsSend(t *testing.T) {
172 // Init Kafka client
173 log.SetAllLogLevel(log.FatalLevel)
174 cTkc := mock_kafka.NewKafkaClient()
175 topic := kafka.Topic{Name: "myTopic"}
176 resp := make(chan string)
177 // Init Event Proxy
178 ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
khenaidoo26721882021-08-11 17:42:52 -0400179 go func() {
180 if err := ep.Start(); err != nil {
181 logger.Fatalw(context.Background(), "failure-starting-event-proxy", log.Fields{"error": err})
182 }
183 }()
Himani Chawla6c38e572021-03-23 19:44:25 +0530184 time.Sleep(1 * time.Millisecond)
185 go ep.Stop()
186 go waitForEventProxyToStop(ep, resp)
187 val := <-resp
188 assert.Equal(t, val, "ok")
189}