blob: f6a69486b7bee01189ea8a7aa61e263718d3823c [file] [log] [blame]
Dinesh Belwalkar01217962019-05-23 21:51:16 +00001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
18 "fmt"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000019 "net"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000020 "net/http"
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000021 "os"
22 "os/signal"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000023 "io/ioutil"
24 "github.com/Shopify/sarama"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000025 "google.golang.org/grpc"
26 "golang.org/x/net/context"
mc6a9f01a2019-06-26 21:31:23 +000027 "crypto/tls"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000028 empty "github.com/golang/protobuf/ptypes/empty"
29 importer "./proto"
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000030 log "github.com/Sirupsen/logrus"
mce7028402019-07-18 04:10:01 +000031 "time"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000032)
33
mce7028402019-07-18 04:10:01 +000034//globals
35const REDFISH_ROOT = "/redfish/v1"
36const CONTENT_TYPE = "application/json"
37
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000038var (
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000039 importerTopic = "importer"
40
41)
42
43var DataProducer sarama.AsyncProducer
44
mc6a9f01a2019-06-26 21:31:23 +000045
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000046var vendor_default_events = map[string][]string{
47 "edgecore": {"ResourceAdded","ResourceRemoved","Alert"},
48 }
mce7028402019-07-18 04:10:01 +000049var redfish_services = [...]string{"/Chassis", "/Systems","/EthernetSwitches"}
50
51type scheduler struct {
52 getdata time.Ticker
53 quit chan bool
54}
55
Dinesh Belwalkar41229602019-06-21 16:58:06 +000056type device struct {
mc6a9f01a2019-06-26 21:31:23 +000057 subscriptions map[string]uint
mce7028402019-07-18 04:10:01 +000058 datacollector scheduler
59 freqchan chan uint32
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000060 vendor string
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000061 protocol string
Dinesh Belwalkar41229602019-06-21 16:58:06 +000062}
63
64type Server struct {
65 devicemap map[string]*device
66 gRPCserver *grpc.Server
67 dataproducer sarama.AsyncProducer
68 devicechan chan *importer.DeviceInfo
69}
70
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000071func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.SupportedEventList, error) {
mce7028402019-07-18 04:10:01 +000072 fmt.Println("Received GetEventList\n")
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000073 eventstobesubscribed:= new(importer.SupportedEventList)
74 eventstobesubscribed.Events = vendor_default_events[info.Vendor]
mce7028402019-07-18 04:10:01 +000075 return eventstobesubscribed, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +000076}
77
78func (s *Server) SetFrequency(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +000079 fmt.Println("Received SetFrequency")
80 s.devicemap[info.IpAddress].freqchan <- info.Frequency
81 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +000082}
83
84func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.EventList) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +000085 fmt.Println("Received SubsrcribeEvents\n")
86 //Call API to subscribe events
87 ip_address := subeventlist.EventIpAddress
88 for _, event := range subeventlist.Events {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000089 if _, ok := s.devicemap[ip_address].subscriptions[event]; !ok {
90 rtn, id := add_subscription(ip_address, event)
91 if rtn {
92 s.devicemap[ip_address].subscriptions[event] = id
93 fmt.Println("subscription added", event, id)
94 }
95 } else {
96 log.WithFields(log.Fields{
97 "Event": event,
98 }).Info("Already Subscribed")
99 }
mce7028402019-07-18 04:10:01 +0000100 }
101 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000102}
103
104func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.EventList) (*empty.Empty, error) {
105 fmt.Println("Received UnSubsrcribeEvents\n")
106 ip_address := unsubeventlist.EventIpAddress
107 //Call API to unsubscribe events
108 for _, event := range unsubeventlist.Events {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000109 if _, ok := s.devicemap[ip_address].subscriptions[event]; ok {
110 rtn := remove_subscription(ip_address, s.devicemap[ip_address].subscriptions[event] )
111 if rtn {
112 delete(s.devicemap[ip_address].subscriptions, event)
113 fmt.Println("subscription removed", event)
114 }
115 } else {
116 log.WithFields(log.Fields{
117 "Event": event,
118 }).Info("was not Subscribed")
119 }
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000120 }
121
122 return &empty.Empty{}, nil
123}
124
mce7028402019-07-18 04:10:01 +0000125func (s *Server) collect_data(ip_address string) {
126 freqchan := s.devicemap[ip_address].freqchan
127 ticker := s.devicemap[ip_address].datacollector.getdata
128 donechan := s.devicemap[ip_address].datacollector.quit
129 for {
130 select {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000131 case freq := <-freqchan:
mce7028402019-07-18 04:10:01 +0000132 ticker.Stop()
133 ticker = *time.NewTicker(time.Duration(freq) * time.Second)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000134 case err := <-s.dataproducer.Errors():
135 fmt.Println("Failed to produce message:", err)
mce7028402019-07-18 04:10:01 +0000136 case <-ticker.C:
137 for _, service := range redfish_services {
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000138 rtn, data := s.get_status(ip_address, service)
mce7028402019-07-18 04:10:01 +0000139 if rtn {
140 for _, str := range data {
141 str = "Device IP: " + ip_address + " " + str
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000142 fmt.Println("collected data %s ...", str)
mce7028402019-07-18 04:10:01 +0000143 b := []byte(str)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000144 msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
145 // check if needs to add for select case
146 select {
147 case s.dataproducer.Input() <- msg:
148 fmt.Println("Produce message")
149 }
mce7028402019-07-18 04:10:01 +0000150 }
151 }
152 }
153 case <-donechan:
154 ticker.Stop()
155 fmt.Println("getdata ticker stopped")
156 return
157 }
158 }
159}
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000160
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000161func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
162 d := device {
mc6a9f01a2019-06-26 21:31:23 +0000163 subscriptions: make(map[string]uint),
mce7028402019-07-18 04:10:01 +0000164 datacollector: scheduler{
165 getdata: *time.NewTicker(time.Duration(info.Frequency) * time.Second),
166 quit: make(chan bool),
167 },
168 freqchan: make(chan uint32),
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000169 vendor: info.Vendor,
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000170 protocol: info.Protocol,
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000171 }
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000172 //default_events := [...]string{}
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000173 s.devicemap[info.IpAddress] = &d
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000174 fmt.Println("size of devicemap %d", len(s.devicemap))
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000175 ip_address:= info.IpAddress
176 fmt.Println("Configuring %s ...", ip_address)
177 // call subscription function with info.IpAddress
mc6a9f01a2019-06-26 21:31:23 +0000178 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000179 default_events := vendor_default_events[info.Vendor]
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000180 for _, event := range default_events {
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000181 if _, ok := s.devicemap[ip_address].subscriptions[event]; !ok {
182 rtn, id := add_subscription(info.Protocol+"://"+ip_address, event)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000183 if rtn {
184 s.devicemap[ip_address].subscriptions[event] = id
185 fmt.Println("subscription added", event, id)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000186 }
187 } else {
188 log.WithFields(log.Fields{
189 "Event": event,
190 }).Info("was Subscribed")
191 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000192 }
mce7028402019-07-18 04:10:01 +0000193 go s.collect_data(ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000194 return &empty.Empty{}, nil
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000195}
mc6a9f01a2019-06-26 21:31:23 +0000196
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000197func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
198 fmt.Println("Listening %s ...", grpcport)
199 g = grpc.NewServer()
200 l, e = net.Listen("tcp", grpcport)
201 return
202}
203func (s *Server) startgrpcserver()error {
204 fmt.Println("starting gRPC Server")
205 grpcport := ":50051"
206 listener, gserver, err := NewGrpcServer(grpcport)
207 if err != nil {
208 fmt.Println("Failed to create gRPC server: %v", err)
209 return err
210 }
211 s.gRPCserver = gserver
212 importer.RegisterDeviceManagementServer(gserver, s)
213 if err := gserver.Serve(listener); err != nil {
214 fmt.Println("Failed to run gRPC server: %v", err)
215 return err
216 }
217 return nil
218
219}
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000220func (s *Server) kafkaCloseProducer(){
221 if err :=s.dataproducer.Close(); err != nil {
222 panic(err)
223 }
224
225}
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000226func (s *Server) kafkaInit() {
227 fmt.Println("Starting kafka init to Connect to broker: ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000228 config := sarama.NewConfig()
229 config.Producer.RequiredAcks = sarama.WaitForAll
230 config.Producer.Retry.Max = 10
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000231 producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
232 if err != nil {
233 panic(err)
234 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000235 s.dataproducer = producer
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000236}
237
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000238func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000239 signals := make(chan os.Signal, 1)
240 signal.Notify(signals, os.Interrupt)
241
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000242 fmt.Println(" IN Handle Event ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000243 if(r.Method == "POST"){
244 Body, err := ioutil.ReadAll(r.Body)
245 if err != nil {
246 fmt.Println("Error getting HTTP data",err)
247 }
248 defer r.Body.Close()
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000249 fmt.Println("Received Event Message ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000250 fmt.Printf("%s\n",Body)
251 message :=&sarama.ProducerMessage{
252 Topic: importerTopic,
253 Value: sarama.StringEncoder(Body),
254 }
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000255 s.dataproducer.Input() <- message
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000256 }
257}
258
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000259func (s *Server) runServer() {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000260 fmt.Println("Starting HTTP Server")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000261 http.HandleFunc("/", s.handle_events)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000262 http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000263}
264
265func init() {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000266 Formatter := new(log.TextFormatter)
267 Formatter.TimestampFormat = "02-01-2006 15:04:05"
268 Formatter.FullTimestamp = true
269 log.SetFormatter(Formatter)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000270 fmt.Println("Connecting to broker: ")
271 fmt.Println("Listening to http server")
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000272 log.Info("log Connecting to broker:")
273 log.Info("log Listening to http server ")
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000274 //sarama.Logger = log.New()
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000275}
276
277
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000278func main() {
279 fmt.Println("Starting Device-management Container")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000280 s := Server {
281 devicemap: make(map[string]*device),
282 devicechan: make(chan *importer.DeviceInfo),
283 }
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000284 // check if we should keep this as go routines
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000285 go s.kafkaInit()
286 go s.runServer()
287 go s.startgrpcserver()
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000288 quit := make(chan os.Signal)
289 signal.Notify(quit, os.Interrupt)
290
291 select {
292 case sig := <-quit:
293 fmt.Println("Shutting down:", sig)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000294 s.kafkaCloseProducer()
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000295 }
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000296}