blob: c0d734615d95ac21a725a85b91c2674e2fd75c99 [file] [log] [blame]
Dinesh Belwalkar01217962019-05-23 21:51:16 +00001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
18 "fmt"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000019 "net"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000020 "net/http"
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000021 "os"
22 "os/signal"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000023 "io/ioutil"
24 "github.com/Shopify/sarama"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000025 "google.golang.org/grpc"
26 "golang.org/x/net/context"
mc6a9f01a2019-06-26 21:31:23 +000027 "crypto/tls"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000028 empty "github.com/golang/protobuf/ptypes/empty"
29 importer "./proto"
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000030 log "github.com/Sirupsen/logrus"
mce7028402019-07-18 04:10:01 +000031 "time"
mc862dad02019-08-06 20:52:51 +000032 "encoding/json"
33 "path"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000034)
35
mce7028402019-07-18 04:10:01 +000036//globals
37const REDFISH_ROOT = "/redfish/v1"
38const CONTENT_TYPE = "application/json"
39
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000040var (
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000041 importerTopic = "importer"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000042)
43
44var DataProducer sarama.AsyncProducer
45
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000046var vendor_default_events = map[string][]string{
47 "edgecore": {"ResourceAdded","ResourceRemoved","Alert"},
48 }
mce7028402019-07-18 04:10:01 +000049var redfish_services = [...]string{"/Chassis", "/Systems","/EthernetSwitches"}
mc862dad02019-08-06 20:52:51 +000050var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
51var subscriptionListPath string
mce7028402019-07-18 04:10:01 +000052
53type scheduler struct {
54 getdata time.Ticker
55 quit chan bool
56}
57
Dinesh Belwalkar41229602019-06-21 16:58:06 +000058type device struct {
mc862dad02019-08-06 20:52:51 +000059 Subscriptions map[string]string `json:"ss"`
60 Freq uint32 `json:"freq"`
61 Datacollector scheduler `json:"-"`
62 Freqchan chan uint32 `json:"-"`
63 Vendor string `json:"vendor"`
64 Protocol string `json:"protocol"`
Dinesh Belwalkar41229602019-06-21 16:58:06 +000065}
66
67type Server struct {
mc862dad02019-08-06 20:52:51 +000068 devicemap map[string]*device
69 gRPCserver *grpc.Server
70 dataproducer sarama.AsyncProducer
71 httpclient *http.Client
72 devicechan chan *importer.DeviceInfo
Dinesh Belwalkar41229602019-06-21 16:58:06 +000073}
74
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000075func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.SupportedEventList, error) {
mce7028402019-07-18 04:10:01 +000076 fmt.Println("Received GetEventList\n")
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +000077 eventstobesubscribed:= new(importer.SupportedEventList)
78 eventstobesubscribed.Events = vendor_default_events[info.Vendor]
mce7028402019-07-18 04:10:01 +000079 return eventstobesubscribed, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +000080}
81
82func (s *Server) SetFrequency(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +000083 fmt.Println("Received SetFrequency")
mc862dad02019-08-06 20:52:51 +000084 s.devicemap[info.IpAddress].Freqchan <- info.Frequency
mce7028402019-07-18 04:10:01 +000085 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +000086}
87
88func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.EventList) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +000089 fmt.Println("Received SubsrcribeEvents\n")
90 //Call API to subscribe events
91 ip_address := subeventlist.EventIpAddress
mc862dad02019-08-06 20:52:51 +000092 f := get_subscription_list(ip_address)
mce7028402019-07-18 04:10:01 +000093 for _, event := range subeventlist.Events {
mc862dad02019-08-06 20:52:51 +000094 if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
95 s.add_subscription(ip_address, event, f)
96 } else {
97 log.WithFields(log.Fields{
98 "Event": event,
99 }).Info("Already Subscribed")
100 }
mce7028402019-07-18 04:10:01 +0000101 }
mc862dad02019-08-06 20:52:51 +0000102 if f != nil { f.Close() }
mce7028402019-07-18 04:10:01 +0000103 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000104}
105
106func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.EventList) (*empty.Empty, error) {
mc862dad02019-08-06 20:52:51 +0000107 fmt.Println("Received UnSubsrcribeEvents\n")
108 ip_address := unsubeventlist.EventIpAddress
109 //Call API to unsubscribe events
110 f := get_subscription_list(ip_address)
111 for _, event := range unsubeventlist.Events {
112 if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
113 s.remove_subscription(ip_address, event, f)
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000114 } else {
115 log.WithFields(log.Fields{
116 "Event": event,
117 }).Info("was not Subscribed")
118 }
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000119 }
mc862dad02019-08-06 20:52:51 +0000120 if f != nil { f.Close() }
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000121
122 return &empty.Empty{}, nil
123}
124
mce7028402019-07-18 04:10:01 +0000125func (s *Server) collect_data(ip_address string) {
mc862dad02019-08-06 20:52:51 +0000126 freqchan := s.devicemap[ip_address].Freqchan
127 ticker := s.devicemap[ip_address].Datacollector.getdata
128 donechan := s.devicemap[ip_address].Datacollector.quit
mce7028402019-07-18 04:10:01 +0000129 for {
130 select {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000131 case freq := <-freqchan:
mce7028402019-07-18 04:10:01 +0000132 ticker.Stop()
133 ticker = *time.NewTicker(time.Duration(freq) * time.Second)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000134 case err := <-s.dataproducer.Errors():
135 fmt.Println("Failed to produce message:", err)
mce7028402019-07-18 04:10:01 +0000136 case <-ticker.C:
137 for _, service := range redfish_services {
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000138 rtn, data := s.get_status(ip_address, service)
mce7028402019-07-18 04:10:01 +0000139 if rtn {
140 for _, str := range data {
141 str = "Device IP: " + ip_address + " " + str
mc862dad02019-08-06 20:52:51 +0000142 fmt.Printf("collected data %s\n ...", str)
mce7028402019-07-18 04:10:01 +0000143 b := []byte(str)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000144 msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000145 select {
mc862dad02019-08-06 20:52:51 +0000146 // TODO: this is blocking, maybe a timer?
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000147 case s.dataproducer.Input() <- msg:
148 fmt.Println("Produce message")
149 }
mce7028402019-07-18 04:10:01 +0000150 }
151 }
152 }
153 case <-donechan:
154 ticker.Stop()
155 fmt.Println("getdata ticker stopped")
156 return
157 }
158 }
159}
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000160
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000161func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
162 d := device {
mc862dad02019-08-06 20:52:51 +0000163 Subscriptions: make(map[string]string),
164 Freq: info.Frequency,
165 Datacollector: scheduler{
mce7028402019-07-18 04:10:01 +0000166 getdata: *time.NewTicker(time.Duration(info.Frequency) * time.Second),
167 quit: make(chan bool),
168 },
mc862dad02019-08-06 20:52:51 +0000169 Freqchan: make(chan uint32),
170 Vendor: info.Vendor,
171 Protocol: info.Protocol,
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000172 }
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000173 //default_events := [...]string{}
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000174 s.devicemap[info.IpAddress] = &d
mc862dad02019-08-06 20:52:51 +0000175 fmt.Printf("size of devicemap %d\n", len(s.devicemap))
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000176 ip_address:= info.IpAddress
mc862dad02019-08-06 20:52:51 +0000177 fmt.Printf("Configuring %s\n", ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000178 // call subscription function with info.IpAddress
mc862dad02019-08-06 20:52:51 +0000179
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000180 default_events := vendor_default_events[info.Vendor]
mc862dad02019-08-06 20:52:51 +0000181
182 f := get_subscription_list(ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000183 for _, event := range default_events {
mc862dad02019-08-06 20:52:51 +0000184 s.add_subscription(ip_address, event, f)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000185 }
mc862dad02019-08-06 20:52:51 +0000186 if f != nil { f.Close() }
mce7028402019-07-18 04:10:01 +0000187 go s.collect_data(ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000188 return &empty.Empty{}, nil
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000189}
mc6a9f01a2019-06-26 21:31:23 +0000190
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000191func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
mc862dad02019-08-06 20:52:51 +0000192 fmt.Printf("Listening %s\n", grpcport)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000193 g = grpc.NewServer()
194 l, e = net.Listen("tcp", grpcport)
195 return
196}
197func (s *Server) startgrpcserver()error {
198 fmt.Println("starting gRPC Server")
199 grpcport := ":50051"
200 listener, gserver, err := NewGrpcServer(grpcport)
201 if err != nil {
202 fmt.Println("Failed to create gRPC server: %v", err)
203 return err
204 }
205 s.gRPCserver = gserver
206 importer.RegisterDeviceManagementServer(gserver, s)
207 if err := gserver.Serve(listener); err != nil {
208 fmt.Println("Failed to run gRPC server: %v", err)
209 return err
210 }
211 return nil
212
213}
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000214func (s *Server) kafkaCloseProducer(){
215 if err :=s.dataproducer.Close(); err != nil {
216 panic(err)
217 }
218
219}
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000220func (s *Server) kafkaInit() {
221 fmt.Println("Starting kafka init to Connect to broker: ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000222 config := sarama.NewConfig()
223 config.Producer.RequiredAcks = sarama.WaitForAll
224 config.Producer.Retry.Max = 10
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000225 producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
226 if err != nil {
227 panic(err)
228 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000229 s.dataproducer = producer
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000230}
231
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000232func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000233 signals := make(chan os.Signal, 1)
234 signal.Notify(signals, os.Interrupt)
235
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000236 fmt.Println(" IN Handle Event ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000237 if(r.Method == "POST"){
238 Body, err := ioutil.ReadAll(r.Body)
239 if err != nil {
240 fmt.Println("Error getting HTTP data",err)
241 }
242 defer r.Body.Close()
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000243 fmt.Println("Received Event Message ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000244 fmt.Printf("%s\n",Body)
245 message :=&sarama.ProducerMessage{
246 Topic: importerTopic,
247 Value: sarama.StringEncoder(Body),
248 }
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000249 s.dataproducer.Input() <- message
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000250 }
251}
252
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000253func (s *Server) runServer() {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000254 fmt.Println("Starting HTTP Server")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000255 http.HandleFunc("/", s.handle_events)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000256 http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000257}
258
mc862dad02019-08-06 20:52:51 +0000259func (s *Server) init_data_persistence() {
260 subscriptionListPath = pvmount + "/subscriptions"
261 if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
262 fmt.Println(err)
263 } else {
264 lists, err := ioutil.ReadDir(subscriptionListPath)
265 if err != nil {
266 fmt.Println(err)
267 } else {
268 for _, list := range lists {
269 b, err := ioutil.ReadFile(path.Join(subscriptionListPath, list.Name()))
270 if err != nil {
271 fmt.Println(err)
272 } else {
273 ip := list.Name()
274 d := device{}
275 json.Unmarshal(b, &d)
276 s.devicemap[ip] = &d
277 s.devicemap[ip].Datacollector.getdata = *time.NewTicker(time.Duration(s.devicemap[ip].Freq) * time.Second)
278 s.devicemap[ip].Datacollector.quit = make(chan bool)
279 s.devicemap[ip].Freqchan = make(chan uint32)
280 go s.collect_data(ip)
281 }
282 }
283 }
284 }
285}
286
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000287func init() {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000288 Formatter := new(log.TextFormatter)
289 Formatter.TimestampFormat = "02-01-2006 15:04:05"
290 Formatter.FullTimestamp = true
291 log.SetFormatter(Formatter)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000292 fmt.Println("Connecting to broker: ")
293 fmt.Println("Listening to http server")
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000294 log.Info("log Connecting to broker:")
295 log.Info("log Listening to http server ")
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000296 //sarama.Logger = log.New()
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000297}
298
mc862dad02019-08-06 20:52:51 +0000299func get_subscription_list(ip string) *os.File {
300 if pvmount == "" {
301 return nil
302 }
303 f, err := os.OpenFile(subscriptionListPath + "/" + ip, os.O_CREATE|os.O_RDWR, 0664)
304 if err != nil {
305 fmt.Println(err)
306 }
307 return f
308}
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000309
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000310func main() {
311 fmt.Println("Starting Device-management Container")
mc862dad02019-08-06 20:52:51 +0000312
313 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
314 client := &http.Client{
315 Timeout: 10 * time.Second,
316 }
317
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000318 s := Server {
319 devicemap: make(map[string]*device),
320 devicechan: make(chan *importer.DeviceInfo),
mc862dad02019-08-06 20:52:51 +0000321 httpclient: client,
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000322 }
mc862dad02019-08-06 20:52:51 +0000323
324 s.kafkaInit()
325//TODO: check if we should keep this as goroutines?
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000326 go s.runServer()
327 go s.startgrpcserver()
mc862dad02019-08-06 20:52:51 +0000328
329 if pvmount != "" {
330 s.init_data_persistence()
331 }
332
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000333 quit := make(chan os.Signal)
334 signal.Notify(quit, os.Interrupt)
335
336 select {
337 case sig := <-quit:
338 fmt.Println("Shutting down:", sig)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000339 s.kafkaCloseProducer()
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000340 }
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000341}