blob: d968713c82b3b3f154f662eadc23cae24d672d2a [file] [log] [blame]
Himani Chawla6c38e572021-03-23 19:44:25 +05301/*
2 * Copyright 2020-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package events
18
19import (
20 "context"
21 "github.com/golang/protobuf/ptypes"
Girish Gowdra4c60c672021-07-26 13:30:57 -070022 "github.com/opencord/voltha-lib-go/v6/pkg/kafka"
23 "github.com/opencord/voltha-lib-go/v6/pkg/log"
24 mock_kafka "github.com/opencord/voltha-lib-go/v6/pkg/mocks/kafka"
Himani Chawla6c38e572021-03-23 19:44:25 +053025 "github.com/opencord/voltha-protos/v4/go/common"
26 "github.com/opencord/voltha-protos/v4/go/voltha"
27 "github.com/stretchr/testify/assert"
28 "strconv"
29 "testing"
30 "time"
31)
32
33const waitForKafkaEventsTimeout = 20 * time.Second
34const waitForEventProxyTimeout = 10 * time.Second
35
36func waitForEventProxyToStop(ep *EventProxy, resp chan string) {
37 timer := time.NewTimer(waitForEventProxyTimeout)
38 defer timer.Stop()
39 for {
40 select {
41 case <-time.After(2 * time.Millisecond):
42 if ep.eventQueue.insertPosition == nil {
43 resp <- "ok"
44 return
45 }
46 case <-timer.C:
47 resp <- "timer expired"
48 return
49 }
50
51 }
52}
53func waitForKafkaEvents(kc kafka.Client, topic *kafka.Topic, numEvents int, resp chan string) {
54 kafkaChnl, err := kc.Subscribe(context.Background(), topic)
55 if err != nil {
56 resp <- err.Error()
57 return
58 }
59 defer func() {
60 if kafkaChnl != nil {
61 if err = kc.UnSubscribe(context.Background(), topic, kafkaChnl); err != nil {
62 logger.Errorw(context.Background(), "unsubscribe-failed", log.Fields{"error": err})
63 }
64 }
65 }()
66 timer := time.NewTimer(waitForKafkaEventsTimeout)
67 defer timer.Stop()
68 count := 0
69loop:
70 for {
71 select {
72 case msg := <-kafkaChnl:
73 if msg.Body != nil {
74 event := voltha.Event{}
75 if err := ptypes.UnmarshalAny(msg.Body, &event); err == nil {
76 count += 1
77 if count == numEvents {
78 resp <- "ok"
79 break loop
80 }
81 }
82 }
83 case <-timer.C:
84 resp <- "timer expired"
85 break loop
86 }
87 }
88}
89
90func createAndSendEvent(proxy *EventProxy, ID string) error {
91 eventMsg := &voltha.RPCEvent{
92 Rpc: "dummy",
93 OperationId: ID,
94 ResourceId: "dummy",
95 Service: "dummy",
96 StackId: "dummy",
97 Status: &common.OperationResp{
98 Code: common.OperationResp_OPERATION_FAILURE,
99 },
100 Description: "dummy",
101 Context: nil,
102 }
103 var event voltha.Event
104 raisedTS := time.Now().Unix()
105 event.Header, _ = proxy.getEventHeader("RPC_ERROR_RAISE_EVENT", voltha.EventCategory_COMMUNICATION, nil, voltha.EventType_RPC_EVENT, raisedTS)
106 event.EventType = &voltha.Event_RpcEvent{RpcEvent: eventMsg}
107 err := proxy.SendRPCEvent(context.Background(), "RPC_ERROR_RAISE_EVENT", eventMsg, voltha.EventCategory_COMMUNICATION,
108 nil, time.Now().Unix())
109 return err
110}
111
112func TestEventProxyReceiveAndSendMessage(t *testing.T) {
113 // Init Kafka client
114 log.SetAllLogLevel(log.FatalLevel)
115 cTkc := mock_kafka.NewKafkaClient()
116 topic := kafka.Topic{Name: "myTopic"}
117
118 numEvents := 10
119 resp := make(chan string)
120 go waitForKafkaEvents(cTkc, &topic, numEvents, resp)
121
122 // Init Event Proxy
123 ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
124 go ep.Start()
125 time.Sleep(1 * time.Millisecond)
126 for i := 0; i < numEvents; i++ {
127 go func(ID int) {
128 err := createAndSendEvent(ep, strconv.Itoa(ID))
129 assert.Nil(t, err)
130 }(i)
131 }
132 val := <-resp
133 assert.Equal(t, val, "ok")
134 go ep.Stop()
135 go waitForEventProxyToStop(ep, resp)
136 val = <-resp
137 assert.Equal(t, val, "ok")
138}
139
140func TestEventProxyStopWhileSendingEvents(t *testing.T) {
141 // Init Kafka client
142 log.SetAllLogLevel(log.FatalLevel)
143 cTkc := mock_kafka.NewKafkaClient()
144 topic := kafka.Topic{Name: "myTopic"}
145
146 numEvents := 10
147 resp := make(chan string)
148 // Init Event Proxy
149 ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
150 go ep.Start()
151 time.Sleep(1 * time.Millisecond)
152 for i := 0; i < numEvents; i++ {
153 go func(ID int) {
154 err := createAndSendEvent(ep, strconv.Itoa(ID))
155 assert.Nil(t, err)
156 }(i)
157 }
158 // In this case we cannot guarantee how many events are send before
159 // sending the last event(stopping event proxy), any event send before Stop would be received.
160 go ep.Stop()
161 go waitForEventProxyToStop(ep, resp)
162 val := <-resp
163 assert.Equal(t, val, "ok")
164}
165
166func TestEventProxyStopWhenNoEventsSend(t *testing.T) {
167 // Init Kafka client
168 log.SetAllLogLevel(log.FatalLevel)
169 cTkc := mock_kafka.NewKafkaClient()
170 topic := kafka.Topic{Name: "myTopic"}
171 resp := make(chan string)
172 // Init Event Proxy
173 ep := NewEventProxy(MsgClient(cTkc), MsgTopic(topic))
174 go ep.Start()
175 time.Sleep(1 * time.Millisecond)
176 go ep.Stop()
177 go waitForEventProxyToStop(ep, resp)
178 val := <-resp
179 assert.Equal(t, val, "ok")
180}