[SEBA-618] create protofile and API for importer to expose
This commit adds
-protofile
-grpc server
-service to register device
-put event data received from http server to kafka bus
- server structure for all methods and databse to store device information
Change-Id: Ic56d42553c132305bf485188b4234cb1dfcba511
diff --git a/Dockerfile b/Dockerfile
index 1e59813..a03a39f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -15,14 +15,29 @@
# docker build -t opencord/device-management:latest .
# docker build -t 10.90.0.101:30500/opencord/kafka-topic-exporter:latest .
-FROM golang:1.12-alpine3.9 AS build-env
+FROM golang:1.12 AS build-env
RUN mkdir /app
ADD . /app/
WORKDIR /app
-RUN apk add git
-RUN apk add gcc
-RUN apk add build-base
-RUN go get github.com/Shopify/sarama
+ENV PROTOC_VERSION 3.6.1
+ENV PROTOC_SHA256SUM 6003de742ea3fcf703cfec1cd4a3380fd143081a2eb0e559065563496af27807
+RUN apt-get update && apt-get install -y \
+ git \
+ gcc \
+ curl \
+ unzip
+RUN curl -L -o /tmp/protoc-${PROTOC_VERSION}-linux-x86_64.zip https://github.com/google/protobuf/releases/download/v${PROTOC_VERSION}/protoc-${PROTOC_VERSION}-linux-x86_64.zip
+RUN mkdir /tmp/protoc3
+RUN echo "$PROTOC_SHA256SUM /tmp/protoc-${PROTOC_VERSION}-linux-x86_64.zip" | sha256sum -c - \
+ && unzip /tmp/protoc-${PROTOC_VERSION}-linux-x86_64.zip -d /tmp/protoc3 \
+ && mv /tmp/protoc3/bin/* /usr/local/bin/ \
+ && mv /tmp/protoc3/include/* /usr/local/include/
+RUN go get -u google.golang.org/grpc \
+ && go get github.com/golang/protobuf/proto \
+ && go get -v github.com/grpc-ecosystem/grpc-gateway/protoc-gen-grpc-gateway \
+ && go get -v github.com/golang/protobuf/protoc-gen-go \
+ && go get github.com/Shopify/sarama \
+ && protoc -I proto -I${GOPATH}/src/github.com/grpc-ecosystem/grpc-gateway/third_party/googleapis --go_out=plugins=grpc:proto/ proto/*.proto
RUN CGO_ENABLED=0 GOOS=linux go build -o main .
FROM alpine:3.9.4
diff --git a/main.go b/main.go
index 583f668..477568f 100644
--- a/main.go
+++ b/main.go
@@ -16,22 +16,80 @@
import (
"fmt"
+ "net"
"net/http"
"os"
"os/signal"
"io/ioutil"
"github.com/Shopify/sarama"
+ "google.golang.org/grpc"
+ "golang.org/x/net/context"
+ empty "github.com/golang/protobuf/ptypes/empty"
+ importer "./proto"
)
var (
-// broker = [2]string{"voltha-kafka.default.svc.cluster.local","9092"}
importerTopic = "importer"
)
var DataProducer sarama.AsyncProducer
-func kafkaInit() {
+type device struct {
+ subscription []string
+ freq uint32
+}
+
+type Server struct {
+ devicemap map[string]*device
+ gRPCserver *grpc.Server
+ dataproducer sarama.AsyncProducer
+ devicechan chan *importer.DeviceInfo
+}
+
+func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
+ d := device {
+ freq: info.Frequency,
+ }
+ s.devicemap[info.IpAddress] = &d
+ s.devicechan <- info
+ return &empty.Empty{}, nil
+}
+func(s *Server) subscribeevents() {
+ for {
+ select {
+ case info:= <-s.devicechan:
+ ip_address:= info.IpAddress
+ fmt.Println("Configuring %s ...", ip_address)
+ // call subscription function with info.IpAddress
+ }
+ }
+}
+func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
+ fmt.Println("Listening %s ...", grpcport)
+ g = grpc.NewServer()
+ l, e = net.Listen("tcp", grpcport)
+ return
+}
+func (s *Server) startgrpcserver()error {
+ fmt.Println("starting gRPC Server")
+ grpcport := ":50051"
+ listener, gserver, err := NewGrpcServer(grpcport)
+ if err != nil {
+ fmt.Println("Failed to create gRPC server: %v", err)
+ return err
+ }
+ s.gRPCserver = gserver
+ importer.RegisterDeviceManagementServer(gserver, s)
+ if err := gserver.Serve(listener); err != nil {
+ fmt.Println("Failed to run gRPC server: %v", err)
+ return err
+ }
+ return nil
+
+}
+func (s *Server) kafkaInit() {
+ fmt.Println("Starting kafka init to Connect to broker: ")
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10
@@ -40,7 +98,7 @@
if err != nil {
panic(err)
}
- DataProducer = producer
+ s.dataproducer = producer
defer func() {
if err := producer.Close(); err != nil {
panic(err)
@@ -48,7 +106,7 @@
}()
}
-func handle_events(w http.ResponseWriter, r *http.Request) {
+func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
@@ -64,17 +122,17 @@
Value: sarama.StringEncoder(Body),
}
select {
- case DataProducer.Input() <- message:
+ case s.dataproducer.Input() <- message:
case <-signals:
- DataProducer.AsyncClose() // Trigger a shutdown of the producer.
+ s.dataproducer.AsyncClose() // Trigger a shutdown of the producer.
}
}
}
-func runServer() {
+func (s *Server) runServer() {
fmt.Println("Starting HTTP Server")
- http.HandleFunc("/", handle_events)
+ http.HandleFunc("/", s.handle_events)
http.ListenAndServe(":8080", nil)
}
@@ -86,8 +144,14 @@
func main() {
fmt.Println("Starting Device-management Container")
- go kafkaInit()
- go runServer()
+ s := Server {
+ devicemap: make(map[string]*device),
+ devicechan: make(chan *importer.DeviceInfo),
+ }
+ go s.kafkaInit()
+ go s.runServer()
+ go s.startgrpcserver()
+ go s.subscribeevents()
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)
diff --git a/proto/importer.proto b/proto/importer.proto
new file mode 100644
index 0000000..d2cab50
--- /dev/null
+++ b/proto/importer.proto
@@ -0,0 +1,21 @@
+syntax = "proto3";
+
+
+package importer;
+
+import "google/protobuf/empty.proto";
+//import "google/api/annotations.proto";
+
+
+message DeviceInfo {
+ string ip_address = 1;
+ uint32 frequency = 2;
+}
+
+
+service device_management {
+ rpc SendDeviceInfo(DeviceInfo)
+ returns (google.protobuf.Empty) {}
+}
+
+