// Copyright 2018 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15package main
16
17import (
18 "fmt"
19 "os"
20 "os/signal"
21 "os/exec"
22 "github.com/Shopify/sarama"
23 "google.golang.org/grpc"
24 "golang.org/x/net/context"
25 importer "./proto"
26 log "github.com/Sirupsen/logrus"
27 "time"
28 "bytes"
29 "strings"
30
31)
32
// Fixed settings used by this client.
const (
	// address is the host:port that main dials for the gRPC connection.
	address = "localhost:31085"

	// The values below are copied into the DeviceInfo message that main
	// sends. A previously used device_ip, "192.168.3.44:9888", is noted
	// here for reference.
	device_ip = "192.168.4.27:8888"
	vendor    = "edgecore"
	protocol  = "https"
	freq      = 180
)
41var importerTopic = "importer"
42var DataConsumer sarama.Consumer
43
44func init() {
45 Formatter := new(log.TextFormatter)
46 Formatter.TimestampFormat = "02-01-2006 15:04:05"
47 Formatter.FullTimestamp = true
48 log.SetFormatter(Formatter)
49}
50
51func topicListener(topic *string, master sarama.Consumer) {
52 log.Info("Starting topicListener for ", *topic)
53 consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
54 if err != nil {
55 log.Error("topicListener panic, topic=[%s]: %s", *topic, err.Error())
56 os.Exit(1)
57 }
58 signals := make(chan os.Signal, 1)
59 signal.Notify(signals, os.Interrupt)
60 doneCh := make(chan struct{})
61 go func() {
62 for {
63 select {
64 case err := <-consumer.Errors():
65 log.Error("Consumer error: %s", err.Err)
66 case msg := <-consumer.Messages():
67 log.Info("Got message on topic=[%s]: %s", *topic, string(msg.Value))
68 case <-signals:
69 log.Warn("Interrupt is detected")
70 doneCh <- struct{}{}
71 }
72 }
73 }()
74 <-doneCh
75}
76
77func kafkainit() {
78 cmd := exec.Command("/bin/sh","kafka_ip.sh")
79 var kafkaIP string
80 var out bytes.Buffer
81 cmd.Stdout = &out
82 err := cmd.Run()
83 if err != nil {
84 log.Info(err)
85 os.Exit(1)
86 }
87
88 kafkaIP = out.String()
89 kafkaIP = strings.TrimSuffix(kafkaIP, "\n")
90 kafkaIP = kafkaIP +":9092"
91 fmt.Println("IP address of kafka-cord-0:",kafkaIP)
92 config := sarama.NewConfig()
93 config.Consumer.Return.Errors = true
94 master, err := sarama.NewConsumer([]string{kafkaIP}, config)
95 if err != nil {
96 panic(err)
97 }
98 DataConsumer = master
99
100 go topicListener(&importerTopic, master)
101}
102func main() {
103 log.Info("kafkaInit starting")
104 kafkainit()
105 // Set up a connection to the server.
106 fmt.Println("Starting connection")
107 conn, err := grpc.Dial(address, grpc.WithInsecure())
108 if err != nil {
109 fmt.Println("could not connect")
110 log.Fatal("did not connect: %v", err)
111 }
112 defer conn.Close()
113 c := importer.NewDeviceManagementClient(conn)
114
115 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
116 defer cancel()
117 deviceinfo := new(importer.DeviceInfo)
118 deviceinfo.IpAddress = device_ip
119 deviceinfo.Vendor = vendor
120 deviceinfo.Frequency = freq
121 deviceinfo.Protocol = protocol
122 _, err = c.SendDeviceInfo(ctx, deviceinfo)
123 if err != nil {
124 log.Fatal("could not SendDeviceInfo: %v", err)
125 }
126 quit := make(chan os.Signal)
127 signal.Notify(quit, os.Interrupt)
128
129 select {
130 case sig := <-quit:
131 fmt.Println("Shutting down:", sig)
132 DataConsumer.Close()
133 panic(err)
134 }
135
136}