[SEBA-617} adding http server, kafka producer and putting message on kafka bus

Change-Id: I15c19781174ab2523f5754cdfa00e055f9ca88c0
diff --git a/Dockerfile b/Dockerfile
index 3594da6..1e59813 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -19,6 +19,10 @@
 RUN mkdir /app
 ADD . /app/
 WORKDIR /app
+RUN apk add git
+RUN apk add gcc
+RUN apk add build-base 
+RUN go get github.com/Shopify/sarama
 RUN CGO_ENABLED=0 GOOS=linux go build -o main .
 
 FROM alpine:3.9.4
diff --git a/main.go b/main.go
index abfec3c..583f668 100644
--- a/main.go
+++ b/main.go
@@ -16,12 +16,78 @@
 
 import (
 	"fmt"
+	"net/http"
 	"os"
 	"os/signal"
+	"io/ioutil"
+	"github.com/Shopify/sarama"
 )
 
+var (
+//	broker  = [2]string{"voltha-kafka.default.svc.cluster.local","9092"}
+	importerTopic = "importer"
+
+)
+
+var DataProducer sarama.AsyncProducer
+
+func kafkaInit() {
+	config := sarama.NewConfig()
+        config.Producer.RequiredAcks = sarama.WaitForAll
+	config.Producer.Retry.Max = 10
+	config.Producer.Return.Successes = true
+	producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
+	if err != nil {
+		panic(err)
+	}
+	DataProducer = producer
+	defer func() {
+		if err := producer.Close(); err != nil {
+			panic(err)
+		}
+	}()
+}
+
+func handle_events(w http.ResponseWriter, r *http.Request) {
+	signals := make(chan os.Signal, 1)
+	signal.Notify(signals, os.Interrupt)
+
+	if(r.Method ==  "POST"){
+		Body, err := ioutil.ReadAll(r.Body)
+		if err != nil {
+			fmt.Println("Error getting HTTP data",err)
+		}
+		defer  r.Body.Close()
+		fmt.Printf("%s\n",Body)
+		message :=&sarama.ProducerMessage{
+                        Topic: importerTopic,
+                        Value: sarama.StringEncoder(Body),
+                }
+		select {
+		case DataProducer.Input() <- message:
+
+		case <-signals:
+	        DataProducer.AsyncClose() // Trigger a shutdown of the producer.
+		}
+	}
+}
+
+func runServer() {
+	fmt.Println("Starting HTTP Server")
+	http.HandleFunc("/", handle_events)
+	http.ListenAndServe(":8080", nil)
+}
+
+func init() {
+	fmt.Println("Connecting to broker: ")
+	fmt.Println("Listening to  http server")
+}
+
+
 func main() {
 	fmt.Println("Starting Device-management Container")
+	go kafkaInit()
+	go runServer()
 	quit := make(chan os.Signal)
 	signal.Notify(quit, os.Interrupt)