[SEBA-618] create protofile and API for importer to expose
	This commit adds
		-protofile
		-grpc server
		-service to register device
		-put event data received from http server to kafka bus
		- server structure for all methods and databse to store device information

Change-Id: Ic56d42553c132305bf485188b4234cb1dfcba511
diff --git a/Dockerfile b/Dockerfile
index 1e59813..a03a39f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -15,14 +15,29 @@
 # docker build -t opencord/device-management:latest .
 # docker build -t 10.90.0.101:30500/opencord/kafka-topic-exporter:latest .
 
-FROM golang:1.12-alpine3.9 AS build-env  
+FROM golang:1.12 AS build-env
 RUN mkdir /app
 ADD . /app/
 WORKDIR /app
-RUN apk add git
-RUN apk add gcc
-RUN apk add build-base 
-RUN go get github.com/Shopify/sarama
+ENV PROTOC_VERSION 3.6.1
+ENV PROTOC_SHA256SUM 6003de742ea3fcf703cfec1cd4a3380fd143081a2eb0e559065563496af27807
+RUN apt-get update && apt-get install -y \
+	git \
+	gcc \
+	curl \
+	unzip 
+RUN curl -L -o /tmp/protoc-${PROTOC_VERSION}-linux-x86_64.zip https://github.com/google/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip 
+RUN mkdir /tmp/protoc3
+RUN echo "$PROTOC_SHA256SUM  /tmp/protoc-${PROTOC_VERSION}-linux-x86_64.zip" | sha256sum -c - \
+ &&  unzip /tmp/protoc-${PROTOC_VERSION}-linux-x86_64.zip -d /tmp/protoc3 \
+ && mv /tmp/protoc3/bin/* /usr/local/bin/ \
+ && mv /tmp/protoc3/include/* /usr/local/include/
+RUN go get -u google.golang.org/grpc \
+    && go get github.com/golang/protobuf/proto \
+    && go get -v github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway \
+    && go get -v github.com/golang/protobuf/protoc-gen-go \
+    && go get github.com/Shopify/sarama \
+    &&  protoc -I proto -I${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --go_out=plugins=grpc:proto/  proto/*.proto
 RUN CGO_ENABLED=0 GOOS=linux go build -o main .
 
 FROM alpine:3.9.4
diff --git a/main.go b/main.go
index 583f668..477568f 100644
--- a/main.go
+++ b/main.go
@@ -16,22 +16,80 @@
 
 import (
 	"fmt"
+	"net"
 	"net/http"
 	"os"
 	"os/signal"
 	"io/ioutil"
 	"github.com/Shopify/sarama"
+	"google.golang.org/grpc"
+	"golang.org/x/net/context"
+	empty "github.com/golang/protobuf/ptypes/empty"
+        importer "./proto"
 )
 
 var (
-//	broker  = [2]string{"voltha-kafka.default.svc.cluster.local","9092"}
 	importerTopic = "importer"
 
 )
 
 var DataProducer sarama.AsyncProducer
 
-func kafkaInit() {
+type device struct  {
+	subscription []string
+	freq uint32
+}
+
+type Server struct {
+	devicemap  map[string]*device
+	gRPCserver   *grpc.Server
+	dataproducer sarama.AsyncProducer
+	devicechan   chan *importer.DeviceInfo
+}
+
+func (s *Server) SendDeviceInfo(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
+	d := device {
+		freq:	info.Frequency,
+	}
+	s.devicemap[info.IpAddress] = &d
+	s.devicechan <- info
+	return &empty.Empty{}, nil
+}
+func(s *Server) subscribeevents() {
+	for {
+		select {
+			case info:= <-s.devicechan:
+				ip_address:= info.IpAddress
+			fmt.Println("Configuring  %s ...", ip_address)
+			// call subscription function with info.IpAddress
+		}
+	}
+}
+func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
+        fmt.Println("Listening %s ...", grpcport)
+        g = grpc.NewServer()
+        l, e = net.Listen("tcp", grpcport)
+        return
+}
+func (s *Server) startgrpcserver()error {
+	fmt.Println("starting gRPC Server")
+	grpcport := ":50051"
+	listener, gserver, err := NewGrpcServer(grpcport)
+	if err != nil {
+		fmt.Println("Failed to create gRPC server: %v", err)
+		return err
+	}
+	s.gRPCserver = gserver
+	importer.RegisterDeviceManagementServer(gserver, s)
+	if err := gserver.Serve(listener); err != nil {
+		fmt.Println("Failed to run gRPC server: %v", err)
+		return err
+	}
+	return nil
+
+}
+func (s *Server) kafkaInit() {
+	fmt.Println("Starting kafka init to Connect to broker: ")
 	config := sarama.NewConfig()
         config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.Retry.Max = 10
@@ -40,7 +98,7 @@
 	if err != nil {
 		panic(err)
 	}
-	DataProducer = producer
+	s.dataproducer = producer
 	defer func() {
 		if err := producer.Close(); err != nil {
 			panic(err)
@@ -48,7 +106,7 @@
 	}()
 }
 
-func handle_events(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
 	signals := make(chan os.Signal, 1)
 	signal.Notify(signals, os.Interrupt)
 
@@ -64,17 +122,17 @@
                         Value: sarama.StringEncoder(Body),
                 }
 		select {
-		case DataProducer.Input() <- message:
+		case s.dataproducer.Input() <- message:
 
 		case <-signals:
-	        DataProducer.AsyncClose() // Trigger a shutdown of the producer.
+	        s.dataproducer.AsyncClose() // Trigger a shutdown of the producer.
 		}
 	}
 }
 
-func runServer() {
+func (s *Server) runServer() {
 	fmt.Println("Starting HTTP Server")
-	http.HandleFunc("/", handle_events)
+	http.HandleFunc("/", s.handle_events)
 	http.ListenAndServe(":8080", nil)
 }
 
@@ -86,8 +144,14 @@
 
 func main() {
 	fmt.Println("Starting Device-management Container")
-	go kafkaInit()
-	go runServer()
+	s := Server {
+		devicemap:	make(map[string]*device),
+		devicechan:	make(chan *importer.DeviceInfo),
+	}
+	go s.kafkaInit()
+	go s.runServer()
+	go s.startgrpcserver()
+	go s.subscribeevents()
 	quit := make(chan os.Signal)
 	signal.Notify(quit, os.Interrupt)
 
diff --git a/proto/importer.proto b/proto/importer.proto
new file mode 100644
index 0000000..d2cab50
--- /dev/null
+++ b/proto/importer.proto
@@ -0,0 +1,21 @@
+syntax = "proto3";
+  
+
+package importer;
+
+import "google/protobuf/empty.proto";
+//import "google/api/annotations.proto";
+
+
+message DeviceInfo {
+    string ip_address = 1;
+    uint32 frequency = 2;
+}
+
+
+service device_management {
+    rpc SendDeviceInfo(DeviceInfo)
+        returns (google.protobuf.Empty) {}
+}
+
+