blob: 583f66872eb53ee84760351117a00f9f413f3353 [file] [log] [blame]
Dinesh Belwalkar01217962019-05-23 21:51:16 +00001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
18 "fmt"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000019 "net/http"
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000020 "os"
21 "os/signal"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000022 "io/ioutil"
23 "github.com/Shopify/sarama"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000024)
25
// importerTopic is the Kafka topic that incoming HTTP event payloads are
// published to.
var importerTopic = "importer"
31
// DataProducer is the package-wide sarama asynchronous producer. It is
// assigned by kafkaInit and used by handle_events to publish messages.
var DataProducer sarama.AsyncProducer
33
34func kafkaInit() {
35 config := sarama.NewConfig()
36 config.Producer.RequiredAcks = sarama.WaitForAll
37 config.Producer.Retry.Max = 10
38 config.Producer.Return.Successes = true
39 producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
40 if err != nil {
41 panic(err)
42 }
43 DataProducer = producer
44 defer func() {
45 if err := producer.Close(); err != nil {
46 panic(err)
47 }
48 }()
49}
50
51func handle_events(w http.ResponseWriter, r *http.Request) {
52 signals := make(chan os.Signal, 1)
53 signal.Notify(signals, os.Interrupt)
54
55 if(r.Method == "POST"){
56 Body, err := ioutil.ReadAll(r.Body)
57 if err != nil {
58 fmt.Println("Error getting HTTP data",err)
59 }
60 defer r.Body.Close()
61 fmt.Printf("%s\n",Body)
62 message :=&sarama.ProducerMessage{
63 Topic: importerTopic,
64 Value: sarama.StringEncoder(Body),
65 }
66 select {
67 case DataProducer.Input() <- message:
68
69 case <-signals:
70 DataProducer.AsyncClose() // Trigger a shutdown of the producer.
71 }
72 }
73}
74
75func runServer() {
76 fmt.Println("Starting HTTP Server")
77 http.HandleFunc("/", handle_events)
78 http.ListenAndServe(":8080", nil)
79}
80
// init prints the start-up banner lines before main runs.
func init() {
	for _, banner := range []string{
		"Connecting to broker: ",
		"Listening to http server",
	} {
		fmt.Println(banner)
	}
}
85
86
Dinesh Belwalkar01217962019-05-23 21:51:16 +000087func main() {
88 fmt.Println("Starting Device-management Container")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000089 go kafkaInit()
90 go runServer()
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000091 quit := make(chan os.Signal)
92 signal.Notify(quit, os.Interrupt)
93
94 select {
95 case sig := <-quit:
96 fmt.Println("Shutting down:", sig)
97 }
Dinesh Belwalkar01217962019-05-23 21:51:16 +000098}