blob: 9a093889d3de5341f8cf58c60582b36c4426baaa [file] [log] [blame]
Dinesh Belwalkar01217962019-05-23 21:51:16 +00001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
18 "fmt"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000019 "net"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000020 "net/http"
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000021 "os"
22 "os/signal"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000023 "io/ioutil"
24 "github.com/Shopify/sarama"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000025 "google.golang.org/grpc"
26 "golang.org/x/net/context"
mc6a9f01a2019-06-26 21:31:23 +000027 "crypto/tls"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000028 empty "github.com/golang/protobuf/ptypes/empty"
29 importer "./proto"
mce7028402019-07-18 04:10:01 +000030 "time"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000031)
32
mce7028402019-07-18 04:10:01 +000033//globals
34const REDFISH_ROOT = "/redfish/v1"
35const CONTENT_TYPE = "application/json"
36
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000037var (
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000038 importerTopic = "importer"
39
40)
41
42var DataProducer sarama.AsyncProducer
43
mc6a9f01a2019-06-26 21:31:23 +000044var default_events = [...]string{"ResourceAdded","ResourceRemoved","Alert"}
45
mce7028402019-07-18 04:10:01 +000046var redfish_services = [...]string{"/Chassis", "/Systems","/EthernetSwitches"}
47
48type scheduler struct {
49 getdata time.Ticker
50 quit chan bool
51}
52
Dinesh Belwalkar41229602019-06-21 16:58:06 +000053type device struct {
mc6a9f01a2019-06-26 21:31:23 +000054 subscriptions map[string]uint
mce7028402019-07-18 04:10:01 +000055 datacollector scheduler
56 freqchan chan uint32
Dinesh Belwalkar41229602019-06-21 16:58:06 +000057}
58
59type Server struct {
60 devicemap map[string]*device
61 gRPCserver *grpc.Server
62 dataproducer sarama.AsyncProducer
63 devicechan chan *importer.DeviceInfo
64}
65
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +000066func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.EventList, error) {
mce7028402019-07-18 04:10:01 +000067 fmt.Println("Received GetEventList\n")
68 eventstobesubscribed:= new(importer.EventList)
69 eventstobesubscribed.EventIpAddress = info.IpAddress
70 eventstobesubscribed.Events = append(eventstobesubscribed.Events,"ResourceAdded","ResourceRemoved","Alert")
71 return eventstobesubscribed, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +000072}
73
74func (s *Server) SetFrequency(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +000075 fmt.Println("Received SetFrequency")
76 s.devicemap[info.IpAddress].freqchan <- info.Frequency
77 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +000078}
79
80func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.EventList) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +000081 fmt.Println("Received SubsrcribeEvents\n")
82 //Call API to subscribe events
83 ip_address := subeventlist.EventIpAddress
84 for _, event := range subeventlist.Events {
85 rtn, id := add_subscription(ip_address, event)
86 if rtn {
87 s.devicemap[ip_address].subscriptions[event] = id
88 fmt.Println("subscription added", event, id)
89 }
90 }
91 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +000092}
93
94func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.EventList) (*empty.Empty, error) {
95 fmt.Println("Received UnSubsrcribeEvents\n")
96 ip_address := unsubeventlist.EventIpAddress
97 //Call API to unsubscribe events
98 for _, event := range unsubeventlist.Events {
99 rtn := remove_subscription(ip_address, s.devicemap[ip_address].subscriptions[event] )
100 if rtn {
101 delete(s.devicemap[ip_address].subscriptions, event)
102 fmt.Println("subscription removed", event)
103 }
104 }
105
106 return &empty.Empty{}, nil
107}
108
mce7028402019-07-18 04:10:01 +0000109func (s *Server) collect_data(ip_address string) {
110 freqchan := s.devicemap[ip_address].freqchan
111 ticker := s.devicemap[ip_address].datacollector.getdata
112 donechan := s.devicemap[ip_address].datacollector.quit
113 for {
114 select {
115 case freq := <- freqchan:
116 ticker.Stop()
117 ticker = *time.NewTicker(time.Duration(freq) * time.Second)
118 case <-ticker.C:
119 for _, service := range redfish_services {
120 rtn, data := get_status(ip_address, service)
121 if rtn {
122 for _, str := range data {
123 str = "Device IP: " + ip_address + " " + str
124 b := []byte(str)
125 s.dataproducer.Input() <- &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
126 }
127 }
128 }
129 case <-donechan:
130 ticker.Stop()
131 fmt.Println("getdata ticker stopped")
132 return
133 }
134 }
135}
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000136
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000137func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
138 d := device {
mc6a9f01a2019-06-26 21:31:23 +0000139 subscriptions: make(map[string]uint),
mce7028402019-07-18 04:10:01 +0000140 datacollector: scheduler{
141 getdata: *time.NewTicker(time.Duration(info.Frequency) * time.Second),
142 quit: make(chan bool),
143 },
144 freqchan: make(chan uint32),
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000145 }
146 s.devicemap[info.IpAddress] = &d
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000147 ip_address:= info.IpAddress
148 fmt.Println("Configuring %s ...", ip_address)
149 // call subscription function with info.IpAddress
mc6a9f01a2019-06-26 21:31:23 +0000150 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000151 for _, event := range default_events {
152 rtn, id := add_subscription(ip_address, event)
153 if rtn {
154 s.devicemap[ip_address].subscriptions[event] = id
155 fmt.Println("subscription added", event, id)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000156 }
157 }
mce7028402019-07-18 04:10:01 +0000158 go s.collect_data(ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000159 return &empty.Empty{}, nil
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000160}
mc6a9f01a2019-06-26 21:31:23 +0000161
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000162func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
163 fmt.Println("Listening %s ...", grpcport)
164 g = grpc.NewServer()
165 l, e = net.Listen("tcp", grpcport)
166 return
167}
168func (s *Server) startgrpcserver()error {
169 fmt.Println("starting gRPC Server")
170 grpcport := ":50051"
171 listener, gserver, err := NewGrpcServer(grpcport)
172 if err != nil {
173 fmt.Println("Failed to create gRPC server: %v", err)
174 return err
175 }
176 s.gRPCserver = gserver
177 importer.RegisterDeviceManagementServer(gserver, s)
178 if err := gserver.Serve(listener); err != nil {
179 fmt.Println("Failed to run gRPC server: %v", err)
180 return err
181 }
182 return nil
183
184}
185func (s *Server) kafkaInit() {
186 fmt.Println("Starting kafka init to Connect to broker: ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000187 config := sarama.NewConfig()
188 config.Producer.RequiredAcks = sarama.WaitForAll
189 config.Producer.Retry.Max = 10
190 config.Producer.Return.Successes = true
191 producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
192 if err != nil {
193 panic(err)
194 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000195 s.dataproducer = producer
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000196 defer func() {
197 if err := producer.Close(); err != nil {
198 panic(err)
199 }
200 }()
201}
202
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000203func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000204 signals := make(chan os.Signal, 1)
205 signal.Notify(signals, os.Interrupt)
206
207 if(r.Method == "POST"){
208 Body, err := ioutil.ReadAll(r.Body)
209 if err != nil {
210 fmt.Println("Error getting HTTP data",err)
211 }
212 defer r.Body.Close()
213 fmt.Printf("%s\n",Body)
214 message :=&sarama.ProducerMessage{
215 Topic: importerTopic,
216 Value: sarama.StringEncoder(Body),
217 }
218 select {
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000219 case s.dataproducer.Input() <- message:
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000220
221 case <-signals:
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000222 s.dataproducer.AsyncClose() // Trigger a shutdown of the producer.
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000223 }
224 }
225}
226
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000227func (s *Server) runServer() {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000228 fmt.Println("Starting HTTP Server")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000229 http.HandleFunc("/", s.handle_events)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000230 http.ListenAndServe(":8080", nil)
231}
232
233func init() {
234 fmt.Println("Connecting to broker: ")
235 fmt.Println("Listening to http server")
236}
237
238
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000239func main() {
240 fmt.Println("Starting Device-management Container")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000241 s := Server {
242 devicemap: make(map[string]*device),
243 devicechan: make(chan *importer.DeviceInfo),
244 }
245 go s.kafkaInit()
246 go s.runServer()
247 go s.startgrpcserver()
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000248 quit := make(chan os.Signal)
249 signal.Notify(quit, os.Interrupt)
250
251 select {
252 case sig := <-quit:
253 fmt.Println("Shutting down:", sig)
254 }
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000255}