blob: 6394ba7a64d66ae9c2884b9863c245370e2c8052 [file] [log] [blame]
// Copyright 2018 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15package main
16
17import (
18 "fmt"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000019 "net"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000020 "net/http"
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000021 "os"
22 "os/signal"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000023 "io/ioutil"
24 "github.com/Shopify/sarama"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000025 "google.golang.org/grpc"
26 "golang.org/x/net/context"
mc6a9f01a2019-06-26 21:31:23 +000027 "crypto/tls"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000028 empty "github.com/golang/protobuf/ptypes/empty"
29 importer "./proto"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000030)
31
// importerTopic is the Kafka topic on which handle_events publishes the
// bodies of the HTTP POST requests it receives.
var importerTopic = "importer"
36
37var DataProducer sarama.AsyncProducer
38
// default_events lists the event types that subscribeevents subscribes to on
// every device it receives.
var default_events = [3]string{
	"ResourceAdded",
	"ResourceRemoved",
	"Alert",
}
40
// device is the per-device state the Server keeps in its devicemap.
type device struct {
	// subscriptions maps an event type name to the subscription id that
	// add_subscription returned for it.
	subscriptions map[string]uint

	// freq is copied from DeviceInfo.Frequency when the device is registered.
	freq uint32
}
45
46type Server struct {
47 devicemap map[string]*device
48 gRPCserver *grpc.Server
49 dataproducer sarama.AsyncProducer
50 devicechan chan *importer.DeviceInfo
51}
52
53func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
54 d := device {
mc6a9f01a2019-06-26 21:31:23 +000055 subscriptions: make(map[string]uint),
Dinesh Belwalkar41229602019-06-21 16:58:06 +000056 freq: info.Frequency,
57 }
58 s.devicemap[info.IpAddress] = &d
59 s.devicechan <- info
60 return &empty.Empty{}, nil
61}
62func(s *Server) subscribeevents() {
mc6a9f01a2019-06-26 21:31:23 +000063 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
Dinesh Belwalkar41229602019-06-21 16:58:06 +000064 for {
65 select {
mc6a9f01a2019-06-26 21:31:23 +000066 case info:= <-s.devicechan:
67 ip_address:= info.IpAddress
Dinesh Belwalkar41229602019-06-21 16:58:06 +000068 fmt.Println("Configuring %s ...", ip_address)
69 // call subscription function with info.IpAddress
mc6a9f01a2019-06-26 21:31:23 +000070 for _, event := range default_events {
71 rtn, id := add_subscription(ip_address, event)
72 if rtn {
73 s.devicemap[ip_address].subscriptions[event] = id
74 fmt.Println("subscription added", event, id)
75 }
76 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +000077 }
78 }
79}
mc6a9f01a2019-06-26 21:31:23 +000080
Dinesh Belwalkar41229602019-06-21 16:58:06 +000081func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
82 fmt.Println("Listening %s ...", grpcport)
83 g = grpc.NewServer()
84 l, e = net.Listen("tcp", grpcport)
85 return
86}
87func (s *Server) startgrpcserver()error {
88 fmt.Println("starting gRPC Server")
89 grpcport := ":50051"
90 listener, gserver, err := NewGrpcServer(grpcport)
91 if err != nil {
92 fmt.Println("Failed to create gRPC server: %v", err)
93 return err
94 }
95 s.gRPCserver = gserver
96 importer.RegisterDeviceManagementServer(gserver, s)
97 if err := gserver.Serve(listener); err != nil {
98 fmt.Println("Failed to run gRPC server: %v", err)
99 return err
100 }
101 return nil
102
103}
104func (s *Server) kafkaInit() {
105 fmt.Println("Starting kafka init to Connect to broker: ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000106 config := sarama.NewConfig()
107 config.Producer.RequiredAcks = sarama.WaitForAll
108 config.Producer.Retry.Max = 10
109 config.Producer.Return.Successes = true
110 producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
111 if err != nil {
112 panic(err)
113 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000114 s.dataproducer = producer
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000115 defer func() {
116 if err := producer.Close(); err != nil {
117 panic(err)
118 }
119 }()
120}
121
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000122func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000123 signals := make(chan os.Signal, 1)
124 signal.Notify(signals, os.Interrupt)
125
126 if(r.Method == "POST"){
127 Body, err := ioutil.ReadAll(r.Body)
128 if err != nil {
129 fmt.Println("Error getting HTTP data",err)
130 }
131 defer r.Body.Close()
132 fmt.Printf("%s\n",Body)
133 message :=&sarama.ProducerMessage{
134 Topic: importerTopic,
135 Value: sarama.StringEncoder(Body),
136 }
137 select {
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000138 case s.dataproducer.Input() <- message:
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000139
140 case <-signals:
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000141 s.dataproducer.AsyncClose() // Trigger a shutdown of the producer.
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000142 }
143 }
144}
145
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000146func (s *Server) runServer() {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000147 fmt.Println("Starting HTTP Server")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000148 http.HandleFunc("/", s.handle_events)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000149 http.ListenAndServe(":8080", nil)
150}
151
// init prints two start-up banner lines. It performs no other work; the
// broker connection and the HTTP server are set up from main.
func init() {
	banners := []string{
		"Connecting to broker: ",
		"Listening to http server",
	}
	for _, line := range banners {
		fmt.Println(line)
	}
}
156
157
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000158func main() {
159 fmt.Println("Starting Device-management Container")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000160 s := Server {
161 devicemap: make(map[string]*device),
162 devicechan: make(chan *importer.DeviceInfo),
163 }
164 go s.kafkaInit()
165 go s.runServer()
166 go s.startgrpcserver()
167 go s.subscribeevents()
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000168 quit := make(chan os.Signal)
169 signal.Notify(quit, os.Interrupt)
170
171 select {
172 case sig := <-quit:
173 fmt.Println("Shutting down:", sig)
174 }
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000175}