[SEBA-617} adding http server, kafka producer and putting message on kafka bus
Change-Id: I15c19781174ab2523f5754cdfa00e055f9ca88c0
diff --git a/Dockerfile b/Dockerfile
index 3594da6..1e59813 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -19,6 +19,10 @@
RUN mkdir /app
ADD . /app/
WORKDIR /app
+RUN apk add git
+RUN apk add gcc
+RUN apk add build-base
+RUN go get github.com/Shopify/sarama
RUN CGO_ENABLED=0 GOOS=linux go build -o main .
FROM alpine:3.9.4
diff --git a/main.go b/main.go
index abfec3c..583f668 100644
--- a/main.go
+++ b/main.go
@@ -16,12 +16,78 @@
import (
"fmt"
+ "net/http"
"os"
"os/signal"
+ "io/ioutil"
+ "github.com/Shopify/sarama"
)
+var (
+// broker = [2]string{"voltha-kafka.default.svc.cluster.local","9092"}
+ importerTopic = "importer"
+
+)
+
+var DataProducer sarama.AsyncProducer
+
+func kafkaInit() {
+ config := sarama.NewConfig()
+ config.Producer.RequiredAcks = sarama.WaitForAll
+ config.Producer.Retry.Max = 10
+ config.Producer.Return.Successes = true
+ producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
+ if err != nil {
+ panic(err)
+ }
+ DataProducer = producer
+ defer func() {
+ if err := producer.Close(); err != nil {
+ panic(err)
+ }
+ }()
+}
+
+func handle_events(w http.ResponseWriter, r *http.Request) {
+ signals := make(chan os.Signal, 1)
+ signal.Notify(signals, os.Interrupt)
+
+ if(r.Method == "POST"){
+ Body, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ fmt.Println("Error getting HTTP data",err)
+ }
+ defer r.Body.Close()
+ fmt.Printf("%s\n",Body)
+ message :=&sarama.ProducerMessage{
+ Topic: importerTopic,
+ Value: sarama.StringEncoder(Body),
+ }
+ select {
+ case DataProducer.Input() <- message:
+
+ case <-signals:
+ DataProducer.AsyncClose() // Trigger a shutdown of the producer.
+ }
+ }
+}
+
+func runServer() {
+ fmt.Println("Starting HTTP Server")
+ http.HandleFunc("/", handle_events)
+ http.ListenAndServe(":8080", nil)
+}
+
+func init() {
+ fmt.Println("Connecting to broker: ")
+ fmt.Println("Listening to http server")
+}
+
+
func main() {
fmt.Println("Starting Device-management Container")
+ go kafkaInit()
+ go runServer()
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)