blob: 477568f888d5deed5f38e46c3a46f2e2d35c90a8 [file] [log] [blame]
// Copyright 2018 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package main
16
17import (
18 "fmt"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000019 "net"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000020 "net/http"
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000021 "os"
22 "os/signal"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000023 "io/ioutil"
24 "github.com/Shopify/sarama"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000025 "google.golang.org/grpc"
26 "golang.org/x/net/context"
27 empty "github.com/golang/protobuf/ptypes/empty"
28 importer "./proto"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000029)
30
// importerTopic is the Kafka topic that HTTP-posted events are published to.
var importerTopic = "importer"
35
36var DataProducer sarama.AsyncProducer
37
// device is the per-device record kept in Server.devicemap.
type device struct {
	// subscription is a list of strings associated with the device.
	// NOTE(review): not populated anywhere in the visible code.
	subscription []string

	// freq is copied from the Frequency field of the DeviceInfo request.
	freq uint32
}
42
43type Server struct {
44 devicemap map[string]*device
45 gRPCserver *grpc.Server
46 dataproducer sarama.AsyncProducer
47 devicechan chan *importer.DeviceInfo
48}
49
50func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
51 d := device {
52 freq: info.Frequency,
53 }
54 s.devicemap[info.IpAddress] = &d
55 s.devicechan <- info
56 return &empty.Empty{}, nil
57}
58func(s *Server) subscribeevents() {
59 for {
60 select {
61 case info:= <-s.devicechan:
62 ip_address:= info.IpAddress
63 fmt.Println("Configuring %s ...", ip_address)
64 // call subscription function with info.IpAddress
65 }
66 }
67}
68func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
69 fmt.Println("Listening %s ...", grpcport)
70 g = grpc.NewServer()
71 l, e = net.Listen("tcp", grpcport)
72 return
73}
74func (s *Server) startgrpcserver()error {
75 fmt.Println("starting gRPC Server")
76 grpcport := ":50051"
77 listener, gserver, err := NewGrpcServer(grpcport)
78 if err != nil {
79 fmt.Println("Failed to create gRPC server: %v", err)
80 return err
81 }
82 s.gRPCserver = gserver
83 importer.RegisterDeviceManagementServer(gserver, s)
84 if err := gserver.Serve(listener); err != nil {
85 fmt.Println("Failed to run gRPC server: %v", err)
86 return err
87 }
88 return nil
89
90}
91func (s *Server) kafkaInit() {
92 fmt.Println("Starting kafka init to Connect to broker: ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000093 config := sarama.NewConfig()
94 config.Producer.RequiredAcks = sarama.WaitForAll
95 config.Producer.Retry.Max = 10
96 config.Producer.Return.Successes = true
97 producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
98 if err != nil {
99 panic(err)
100 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000101 s.dataproducer = producer
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000102 defer func() {
103 if err := producer.Close(); err != nil {
104 panic(err)
105 }
106 }()
107}
108
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000109func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000110 signals := make(chan os.Signal, 1)
111 signal.Notify(signals, os.Interrupt)
112
113 if(r.Method == "POST"){
114 Body, err := ioutil.ReadAll(r.Body)
115 if err != nil {
116 fmt.Println("Error getting HTTP data",err)
117 }
118 defer r.Body.Close()
119 fmt.Printf("%s\n",Body)
120 message :=&sarama.ProducerMessage{
121 Topic: importerTopic,
122 Value: sarama.StringEncoder(Body),
123 }
124 select {
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000125 case s.dataproducer.Input() <- message:
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000126
127 case <-signals:
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000128 s.dataproducer.AsyncClose() // Trigger a shutdown of the producer.
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000129 }
130 }
131}
132
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000133func (s *Server) runServer() {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000134 fmt.Println("Starting HTTP Server")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000135 http.HandleFunc("/", s.handle_events)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000136 http.ListenAndServe(":8080", nil)
137}
138
// init prints the start-up banner lines before main runs.
func init() {
	banner := []string{
		"Connecting to broker: ",
		"Listening to http server",
	}
	for _, line := range banner {
		fmt.Println(line)
	}
}
143
144
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000145func main() {
146 fmt.Println("Starting Device-management Container")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000147 s := Server {
148 devicemap: make(map[string]*device),
149 devicechan: make(chan *importer.DeviceInfo),
150 }
151 go s.kafkaInit()
152 go s.runServer()
153 go s.startgrpcserver()
154 go s.subscribeevents()
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000155 quit := make(chan os.Signal)
156 signal.Notify(quit, os.Interrupt)
157
158 select {
159 case sig := <-quit:
160 fmt.Println("Shutting down:", sig)
161 }
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000162}