blob: da7b1205fb18892f227b620ccc20490d14abd81c [file] [log] [blame]
// Copyright 2018 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package main
16
17import (
18 "encoding/json"
19 "fmt"
20 "log"
21 "os"
22 "os/signal"
23 "sync"
24
25 "github.com/Shopify/sarama"
26)
27
28func ONOSListener(topic *string, master sarama.Consumer, wg sync.WaitGroup) {
29 fmt.Println("Starting ONOSListener")
30 defer wg.Done()
31 consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
32 if err != nil {
33 fmt.Println("ONOSListener panic")
34 panic(err)
35 }
36 signals := make(chan os.Signal, 1)
37 signal.Notify(signals, os.Interrupt)
38 doneCh := make(chan struct{})
39 go func() {
40 for {
41 select {
42 case err := <-consumer.Errors():
43 fmt.Println(err)
44 case msg := <-consumer.Messages():
45
46 kpi := OnosKPI{}
47
48 err := json.Unmarshal(msg.Value, &kpi)
49
50 if err != nil {
51 log.Fatal(err)
52 }
53
54 exportOnosKPI(kpi)
55
56 case <-signals:
57 fmt.Println("Interrupt is detected")
58 doneCh <- struct{}{}
59 }
60 }
61 }()
62 <-doneCh
63}