blob: 4e7f52cb8deda708f1a987053213ca4c4f44bdd5 [file] [log] [blame]
Dinesh Belwalkar01217962019-05-23 21:51:16 +00001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
18 "fmt"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000019 "net"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000020 "net/http"
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000021 "os"
22 "os/signal"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000023 "io/ioutil"
24 "github.com/Shopify/sarama"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000025 "google.golang.org/grpc"
26 "golang.org/x/net/context"
mc6a9f01a2019-06-26 21:31:23 +000027 "crypto/tls"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000028 empty "github.com/golang/protobuf/ptypes/empty"
29 importer "./proto"
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000030 log "github.com/Sirupsen/logrus"
mce7028402019-07-18 04:10:01 +000031 "time"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000032)
33
// Package-level string constants.
const (
	// REDFISH_ROOT is the path prefix "/redfish/v1".
	REDFISH_ROOT = "/redfish/v1"
	// CONTENT_TYPE is the media type string "application/json".
	CONTENT_TYPE = "application/json"
)
37
// importerTopic is the topic name placed on every sarama.ProducerMessage
// built in this file.
var importerTopic = "importer"
42
43var DataProducer sarama.AsyncProducer
44
mc6a9f01a2019-06-26 21:31:23 +000045
var (
	// vendor_default_events maps a vendor name to the event types that are
	// subscribed when a device of that vendor is registered, and that
	// GetEventList reports for it.
	vendor_default_events = map[string][]string{
		"edgecore": {"ResourceAdded", "ResourceRemoved", "Alert"},
	}

	// redfish_services lists the service paths that collect_data passes to
	// get_status on every polling tick.
	redfish_services = [...]string{"/Chassis", "/Systems", "/EthernetSwitches"}
)
50
// scheduler bundles the two values the per-device polling loop selects on.
type scheduler struct {
	// getdata is the ticker whose channel triggers one polling round.
	// NOTE(review): it is stored by value, so it always holds a copy of the
	// Ticker returned by time.NewTicker — confirm Stop on such a copy behaves
	// as intended.
	getdata time.Ticker

	// quit is received from by the polling loop to make it return.
	quit chan bool
}
55
Dinesh Belwalkar41229602019-06-21 16:58:06 +000056type device struct {
mc6a9f01a2019-06-26 21:31:23 +000057 subscriptions map[string]uint
mce7028402019-07-18 04:10:01 +000058 datacollector scheduler
59 freqchan chan uint32
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000060 vendor string
Dinesh Belwalkar41229602019-06-21 16:58:06 +000061}
62
63type Server struct {
64 devicemap map[string]*device
65 gRPCserver *grpc.Server
66 dataproducer sarama.AsyncProducer
67 devicechan chan *importer.DeviceInfo
68}
69
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000070func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.SupportedEventList, error) {
mce7028402019-07-18 04:10:01 +000071 fmt.Println("Received GetEventList\n")
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000072 eventstobesubscribed:= new(importer.SupportedEventList)
73 eventstobesubscribed.Events = vendor_default_events[info.Vendor]
mce7028402019-07-18 04:10:01 +000074 return eventstobesubscribed, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +000075}
76
77func (s *Server) SetFrequency(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +000078 fmt.Println("Received SetFrequency")
79 s.devicemap[info.IpAddress].freqchan <- info.Frequency
80 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +000081}
82
83func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.EventList) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +000084 fmt.Println("Received SubsrcribeEvents\n")
85 //Call API to subscribe events
86 ip_address := subeventlist.EventIpAddress
87 for _, event := range subeventlist.Events {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000088 if _, ok := s.devicemap[ip_address].subscriptions[event]; !ok {
89 rtn, id := add_subscription(ip_address, event)
90 if rtn {
91 s.devicemap[ip_address].subscriptions[event] = id
92 fmt.Println("subscription added", event, id)
93 }
94 } else {
95 log.WithFields(log.Fields{
96 "Event": event,
97 }).Info("Already Subscribed")
98 }
mce7028402019-07-18 04:10:01 +000099 }
100 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000101}
102
103func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.EventList) (*empty.Empty, error) {
104 fmt.Println("Received UnSubsrcribeEvents\n")
105 ip_address := unsubeventlist.EventIpAddress
106 //Call API to unsubscribe events
107 for _, event := range unsubeventlist.Events {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000108 if _, ok := s.devicemap[ip_address].subscriptions[event]; ok {
109 rtn := remove_subscription(ip_address, s.devicemap[ip_address].subscriptions[event] )
110 if rtn {
111 delete(s.devicemap[ip_address].subscriptions, event)
112 fmt.Println("subscription removed", event)
113 }
114 } else {
115 log.WithFields(log.Fields{
116 "Event": event,
117 }).Info("was not Subscribed")
118 }
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000119 }
120
121 return &empty.Empty{}, nil
122}
123
mce7028402019-07-18 04:10:01 +0000124func (s *Server) collect_data(ip_address string) {
125 freqchan := s.devicemap[ip_address].freqchan
126 ticker := s.devicemap[ip_address].datacollector.getdata
127 donechan := s.devicemap[ip_address].datacollector.quit
128 for {
129 select {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000130 case freq := <-freqchan:
mce7028402019-07-18 04:10:01 +0000131 ticker.Stop()
132 ticker = *time.NewTicker(time.Duration(freq) * time.Second)
133 case <-ticker.C:
134 for _, service := range redfish_services {
135 rtn, data := get_status(ip_address, service)
136 if rtn {
137 for _, str := range data {
138 str = "Device IP: " + ip_address + " " + str
139 b := []byte(str)
140 s.dataproducer.Input() <- &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
141 }
142 }
143 }
144 case <-donechan:
145 ticker.Stop()
146 fmt.Println("getdata ticker stopped")
147 return
148 }
149 }
150}
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000151
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000152func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
153 d := device {
mc6a9f01a2019-06-26 21:31:23 +0000154 subscriptions: make(map[string]uint),
mce7028402019-07-18 04:10:01 +0000155 datacollector: scheduler{
156 getdata: *time.NewTicker(time.Duration(info.Frequency) * time.Second),
157 quit: make(chan bool),
158 },
159 freqchan: make(chan uint32),
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000160 vendor: info.Vendor,
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000161 }
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000162 //default_events := [...]string{}
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000163 s.devicemap[info.IpAddress] = &d
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000164 ip_address:= info.IpAddress
165 fmt.Println("Configuring %s ...", ip_address)
166 // call subscription function with info.IpAddress
mc6a9f01a2019-06-26 21:31:23 +0000167 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000168 default_events := vendor_default_events[info.Vendor]
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000169 for _, event := range default_events {
170 rtn, id := add_subscription(ip_address, event)
171 if rtn {
172 s.devicemap[ip_address].subscriptions[event] = id
173 fmt.Println("subscription added", event, id)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000174 }
175 }
mce7028402019-07-18 04:10:01 +0000176 go s.collect_data(ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000177 return &empty.Empty{}, nil
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000178}
mc6a9f01a2019-06-26 21:31:23 +0000179
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000180func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
181 fmt.Println("Listening %s ...", grpcport)
182 g = grpc.NewServer()
183 l, e = net.Listen("tcp", grpcport)
184 return
185}
186func (s *Server) startgrpcserver()error {
187 fmt.Println("starting gRPC Server")
188 grpcport := ":50051"
189 listener, gserver, err := NewGrpcServer(grpcport)
190 if err != nil {
191 fmt.Println("Failed to create gRPC server: %v", err)
192 return err
193 }
194 s.gRPCserver = gserver
195 importer.RegisterDeviceManagementServer(gserver, s)
196 if err := gserver.Serve(listener); err != nil {
197 fmt.Println("Failed to run gRPC server: %v", err)
198 return err
199 }
200 return nil
201
202}
203func (s *Server) kafkaInit() {
204 fmt.Println("Starting kafka init to Connect to broker: ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000205 config := sarama.NewConfig()
206 config.Producer.RequiredAcks = sarama.WaitForAll
207 config.Producer.Retry.Max = 10
208 config.Producer.Return.Successes = true
209 producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
210 if err != nil {
211 panic(err)
212 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000213 s.dataproducer = producer
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000214 defer func() {
215 if err := producer.Close(); err != nil {
216 panic(err)
217 }
218 }()
219}
220
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000221func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000222 signals := make(chan os.Signal, 1)
223 signal.Notify(signals, os.Interrupt)
224
225 if(r.Method == "POST"){
226 Body, err := ioutil.ReadAll(r.Body)
227 if err != nil {
228 fmt.Println("Error getting HTTP data",err)
229 }
230 defer r.Body.Close()
231 fmt.Printf("%s\n",Body)
232 message :=&sarama.ProducerMessage{
233 Topic: importerTopic,
234 Value: sarama.StringEncoder(Body),
235 }
236 select {
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000237 case s.dataproducer.Input() <- message:
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000238
239 case <-signals:
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000240 s.dataproducer.AsyncClose() // Trigger a shutdown of the producer.
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000241 }
242 }
243}
244
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000245func (s *Server) runServer() {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000246 fmt.Println("Starting HTTP Server")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000247 http.HandleFunc("/", s.handle_events)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000248 http.ListenAndServe(":8080", nil)
249}
250
251func init() {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000252 Formatter := new(log.TextFormatter)
253 Formatter.TimestampFormat = "02-01-2006 15:04:05"
254 Formatter.FullTimestamp = true
255 log.SetFormatter(Formatter)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000256 fmt.Println("Connecting to broker: ")
257 fmt.Println("Listening to http server")
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000258 log.Info("log Connecting to broker:")
259 log.Info("log Listening to http server ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000260}
261
262
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000263func main() {
264 fmt.Println("Starting Device-management Container")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000265 s := Server {
266 devicemap: make(map[string]*device),
267 devicechan: make(chan *importer.DeviceInfo),
268 }
269 go s.kafkaInit()
270 go s.runServer()
271 go s.startgrpcserver()
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000272 quit := make(chan os.Signal)
273 signal.Notify(quit, os.Interrupt)
274
275 select {
276 case sig := <-quit:
277 fmt.Println("Shutting down:", sig)
278 }
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000279}