blob: 4f05c882cd5eaa41b1e976f156fed6bf8591e6a1 [file] [log] [blame]
Dinesh Belwalkar01217962019-05-23 21:51:16 +00001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000018 "crypto/tls"
19 "encoding/json"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000020 "fmt"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000021 "github.com/Shopify/sarama"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000022 empty "github.com/golang/protobuf/ptypes/empty"
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000023 "github.com/opencord/device-management/proto"
24 log "github.com/sirupsen/logrus"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000025 "golang.org/x/net/context"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29 "io/ioutil"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000030 "net"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000031 "net/http"
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000032 "os"
33 "os/signal"
mc862dad02019-08-06 20:52:51 +000034 "path"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000035 "time"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000036)
37
// Package-wide constants.
const (
	// RF_DEFAULT_PROTOCOL is the URL scheme prefix "https://".
	RF_DEFAULT_PROTOCOL = "https://"

	// RF_DATA_COLLECT_THRESHOLD is the lower bound for a non-zero data
	// collection frequency: SetFrequency and SendDeviceList reject any
	// value greater than 0 and below this threshold.
	RF_DATA_COLLECT_THRESHOLD = 5

	// RF_DATA_COLLECT_DUMMY_INTERVAL is the interval, in seconds, of the
	// placeholder ticker created (and stopped right away) for a device
	// whose collection frequency is 0.
	RF_DATA_COLLECT_DUMMY_INTERVAL = 1000

	// CONTENT_TYPE is the MIME type string "application/json".
	CONTENT_TYPE = "application/json"
)
43
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000044var (
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000045 importerTopic = "importer"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000046)
47
48var DataProducer sarama.AsyncProducer
49
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000050var redfish_resources = [...]string{"/redfish/v1/Chassis", "/redfish/v1/Systems", "/redfish/v1/EthernetSwitches"}
mc862dad02019-08-06 20:52:51 +000051var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
52var subscriptionListPath string
mce7028402019-07-18 04:10:01 +000053
// scheduler groups the ticker and the signalling channels that drive one
// device's collect_data goroutine.
type scheduler struct {
	// getdata fires whenever the device should be polled.
	getdata *time.Ticker

	// quit receives a value to ask collect_data to stop.
	quit chan bool

	// getdataend receives a value from collect_data right before it returns.
	getdataend chan bool
}
59
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000060type device struct {
61 Subscriptions map[string]string `json:"ss"`
62 Freq uint32 `json:"freq"`
63 Datacollector scheduler `json:"-"`
64 Freqchan chan uint32 `json:"-"`
65 Eventtypes []string `json:"eventtypes"`
66 Datafile *os.File `json:"-"`
Dinesh Belwalkar41229602019-06-21 16:58:06 +000067}
68
69type Server struct {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000070 devicemap map[string]*device
71 gRPCserver *grpc.Server
72 dataproducer sarama.AsyncProducer
73 httpclient *http.Client
Dinesh Belwalkar41229602019-06-21 16:58:06 +000074}
75
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000076func (s *Server) ClearCurrentEventList(c context.Context, info *importer.Device) (*empty.Empty, error) {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000077 fmt.Println("Received ClearCurrentEventList")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000078 ip_address := info.IpAddress
79 _, found := s.devicemap[ip_address]
80 if !found {
81 return nil, status.Errorf(codes.NotFound, "Device not registered")
82 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000083 for event, _ := range s.devicemap[ip_address].Subscriptions {
mc20a4b5f2019-10-16 20:28:24 +000084 rtn := s.remove_subscription(ip_address, event)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000085 if !rtn {
86 log.WithFields(log.Fields{
87 "Event": event,
88 }).Info("Error removing event")
89 }
90 }
mc20a4b5f2019-10-16 20:28:24 +000091 s.update_data_file(ip_address)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000092 return &empty.Empty{}, nil
93}
94
95func (s *Server) GetCurrentEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000096 fmt.Println("Received GetCurrentEventList")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000097 _, found := s.devicemap[info.IpAddress]
98 if !found {
99 return nil, status.Errorf(codes.NotFound, "Device not registered")
100 }
101 currentevents := new(importer.EventList)
102 for event, _ := range s.devicemap[info.IpAddress].Subscriptions {
103 currentevents.Events = append(currentevents.Events, event)
104 }
105 return currentevents, nil
106}
107
mc20a4b5f2019-10-16 20:28:24 +0000108func (s *Server) GetEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000109 fmt.Println("Received GetEventList")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000110 eventstobesubscribed := new(importer.EventList)
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000111 // eventstobesubscribed.Events = s.devicemap[info.IpAddress].Eventtypes
mc20a4b5f2019-10-16 20:28:24 +0000112 eventstobesubscribed.Events = s.get_event_types(info.IpAddress)
113 if eventstobesubscribed.Events == nil {
114 return nil, status.Errorf(codes.NotFound, "No events found")
115 }
mce7028402019-07-18 04:10:01 +0000116 return eventstobesubscribed, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000117}
118
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000119func (s *Server) SetFrequency(c context.Context, info *importer.FreqInfo) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +0000120 fmt.Println("Received SetFrequency")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000121 _, found := s.devicemap[info.IpAddress]
122 if !found {
123 return nil, status.Errorf(codes.NotFound, "Device not registered")
124 }
mc20a4b5f2019-10-16 20:28:24 +0000125 if info.Frequency > 0 && info.Frequency < RF_DATA_COLLECT_THRESHOLD {
126 return nil, status.Errorf(codes.InvalidArgument, "Invalid frequency")
127 }
mc862dad02019-08-06 20:52:51 +0000128 s.devicemap[info.IpAddress].Freqchan <- info.Frequency
mc20a4b5f2019-10-16 20:28:24 +0000129 s.devicemap[info.IpAddress].Freq = info.Frequency
130 s.update_data_file(info.IpAddress)
mce7028402019-07-18 04:10:01 +0000131 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000132}
133
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000134func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.GivenEventList) (*empty.Empty, error) {
mc20a4b5f2019-10-16 20:28:24 +0000135 errstring := ""
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000136 fmt.Println("Received SubsrcribeEvents")
mce7028402019-07-18 04:10:01 +0000137 //Call API to subscribe events
138 ip_address := subeventlist.EventIpAddress
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000139 _, found := s.devicemap[ip_address]
140 if !found {
141 return nil, status.Errorf(codes.NotFound, "Device not registered")
142 }
143 if len(subeventlist.Events) <= 0 {
144 return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
145 }
mce7028402019-07-18 04:10:01 +0000146 for _, event := range subeventlist.Events {
mc862dad02019-08-06 20:52:51 +0000147 if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
mc20a4b5f2019-10-16 20:28:24 +0000148 rtn := s.add_subscription(ip_address, event)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000149 if !rtn {
mc20a4b5f2019-10-16 20:28:24 +0000150 errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000151 log.WithFields(log.Fields{
152 "Event": event,
153 }).Info("Error adding event")
154 }
mc862dad02019-08-06 20:52:51 +0000155 } else {
mc20a4b5f2019-10-16 20:28:24 +0000156 errstring = errstring + "event " + event + " already subscribed\n"
mc862dad02019-08-06 20:52:51 +0000157 log.WithFields(log.Fields{
158 "Event": event,
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000159 }).Info("Already Subscribed")
mc862dad02019-08-06 20:52:51 +0000160 }
mce7028402019-07-18 04:10:01 +0000161 }
mc20a4b5f2019-10-16 20:28:24 +0000162 s.update_data_file(ip_address)
163 if errstring != "" {
164 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000165 }
mce7028402019-07-18 04:10:01 +0000166 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000167}
168
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000169func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.GivenEventList) (*empty.Empty, error) {
mc20a4b5f2019-10-16 20:28:24 +0000170 errstring := ""
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000171 fmt.Println("Received UnSubsrcribeEvents")
mc862dad02019-08-06 20:52:51 +0000172 ip_address := unsubeventlist.EventIpAddress
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000173 _, found := s.devicemap[ip_address]
174 if !found {
175 return nil, status.Errorf(codes.NotFound, "Device not registered")
176 }
177
178 if len(unsubeventlist.Events) <= 0 {
179 return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
180 }
mc862dad02019-08-06 20:52:51 +0000181 //Call API to unsubscribe events
mc862dad02019-08-06 20:52:51 +0000182 for _, event := range unsubeventlist.Events {
183 if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
mc20a4b5f2019-10-16 20:28:24 +0000184 rtn := s.remove_subscription(ip_address, event)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000185 if !rtn {
mc20a4b5f2019-10-16 20:28:24 +0000186 errstring = errstring + "failed to unsubscribe event " + ip_address + " " + event + "\n"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000187 log.WithFields(log.Fields{
188 "Event": event,
189 }).Info("Error removing event")
190 }
191 } else {
mc20a4b5f2019-10-16 20:28:24 +0000192 errstring = errstring + "event " + event + " not found\n"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000193 log.WithFields(log.Fields{
194 "Event": event,
195 }).Info("was not Subscribed")
196 }
197 }
mc20a4b5f2019-10-16 20:28:24 +0000198 s.update_data_file(ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000199
mc20a4b5f2019-10-16 20:28:24 +0000200 if errstring != "" {
201 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
202 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000203 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000204}
205
mc20a4b5f2019-10-16 20:28:24 +0000206func (s *Server) update_data_file(ip_address string) {
207 f := s.devicemap[ip_address].Datafile
208 if f != nil {
209 b, err := json.Marshal(s.devicemap[ip_address])
210 if err != nil {
211 fmt.Println(err)
212 } else {
213 f.Truncate(0)
214 f.Seek(0, 0)
215 n, err := f.Write(b)
216 if err != nil {
217 fmt.Println("err wrote", n, "bytes")
218 fmt.Println(err)
219 }
220 }
221 } else {
222 fmt.Println("file handle is nil", ip_address)
223 }
224}
225
mce7028402019-07-18 04:10:01 +0000226func (s *Server) collect_data(ip_address string) {
mc862dad02019-08-06 20:52:51 +0000227 freqchan := s.devicemap[ip_address].Freqchan
228 ticker := s.devicemap[ip_address].Datacollector.getdata
229 donechan := s.devicemap[ip_address].Datacollector.quit
mce7028402019-07-18 04:10:01 +0000230 for {
231 select {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000232 case freq := <-freqchan:
mce7028402019-07-18 04:10:01 +0000233 ticker.Stop()
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000234 if freq > 0 {
235 ticker = time.NewTicker(time.Duration(freq) * time.Second)
mc20a4b5f2019-10-16 20:28:24 +0000236 s.devicemap[ip_address].Datacollector.getdata = ticker
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000237 }
238 case err := <-s.dataproducer.Errors():
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000239 fmt.Println("Failed to produce message:", err)
mce7028402019-07-18 04:10:01 +0000240 case <-ticker.C:
mc20a4b5f2019-10-16 20:28:24 +0000241 for _, resource := range redfish_resources {
242 data := s.get_status(ip_address, resource)
243 for _, str := range data {
244 str = "Device IP: " + ip_address + " " + str
245 fmt.Printf("collected data %s\n ...", str)
246 b := []byte(str)
247 msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
248 select {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000249 case s.dataproducer.Input() <- msg:
mc20a4b5f2019-10-16 20:28:24 +0000250 fmt.Println("Produce message")
251 default:
mce7028402019-07-18 04:10:01 +0000252 }
253 }
254 }
255 case <-donechan:
256 ticker.Stop()
257 fmt.Println("getdata ticker stopped")
mc20a4b5f2019-10-16 20:28:24 +0000258 s.devicemap[ip_address].Datacollector.getdataend <- true
mce7028402019-07-18 04:10:01 +0000259 return
260 }
261 }
262}
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000263
mc20a4b5f2019-10-16 20:28:24 +0000264func (s *Server) DeleteDeviceList(c context.Context, list *importer.DeviceListByIp) (*empty.Empty, error) {
265 fmt.Println("DeleteDeviceList received")
266 errstring := ""
267 for _, ip := range list.Ip {
268 if _, ok := s.devicemap[ip]; !ok {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000269 fmt.Printf("Device not found %s\n ", ip)
mc20a4b5f2019-10-16 20:28:24 +0000270 errstring = errstring + "Device " + ip + " not found\n"
271 continue
272 }
273 for event, _ := range s.devicemap[ip].Subscriptions {
274 rtn := s.remove_subscription(ip, event)
275 if !rtn {
276 log.WithFields(log.Fields{
277 "Event": event,
278 }).Info("Error removing event")
279 }
280 }
281 fmt.Println("deleting device", ip)
282 s.devicemap[ip].Datacollector.quit <- true
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000283
mc20a4b5f2019-10-16 20:28:24 +0000284 f := s.devicemap[ip].Datafile
285 if f != nil {
286 fmt.Println("deleteing file", f.Name())
287 err := f.Close()
288 if err != nil {
289 fmt.Println("error closing file ", f.Name(), err)
290 errstring = errstring + "error closing file " + f.Name() + "\n"
291 }
292 err = os.Remove(f.Name())
293 if err != nil {
294 fmt.Println("error deleting file ", f.Name(), err)
295 }
296 } else {
297 errstring = errstring + "file " + ip + " not found\n"
298 }
299 <-s.devicemap[ip].Datacollector.getdataend
300 delete(s.devicemap, ip)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000301 }
mc20a4b5f2019-10-16 20:28:24 +0000302 if errstring != "" {
303 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000304 }
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000305 return &empty.Empty{}, nil
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000306}
mc6a9f01a2019-06-26 21:31:23 +0000307
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000308func (s *Server) SendDeviceList(c context.Context, list *importer.DeviceList) (*empty.Empty, error) {
mc20a4b5f2019-10-16 20:28:24 +0000309 errstring := ""
310 for _, dev := range list.Device {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000311 ip_address := dev.IpAddress
mc20a4b5f2019-10-16 20:28:24 +0000312 if _, ok := s.devicemap[dev.IpAddress]; ok {
313 fmt.Printf("Device %s already exists", ip_address)
314 errstring = errstring + "Device " + ip_address + " already exists\n"
315 continue
316 }
317
318 if dev.Frequency > 0 && dev.Frequency < RF_DATA_COLLECT_THRESHOLD {
319 fmt.Printf("Device %s data collection frequency %d out of range", ip_address, dev.Frequency)
320 errstring = errstring + "Device " + ip_address + " data collection frequency out of range\n"
321 continue
322 }
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000323 d := device{
mc20a4b5f2019-10-16 20:28:24 +0000324 Subscriptions: make(map[string]string),
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000325 Freq: dev.Frequency,
mc20a4b5f2019-10-16 20:28:24 +0000326 Datacollector: scheduler{
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000327 quit: make(chan bool),
mc20a4b5f2019-10-16 20:28:24 +0000328 getdataend: make(chan bool),
329 },
330 Freqchan: make(chan uint32),
331 }
332 s.devicemap[ip_address] = &d
333 fmt.Printf("Configuring %s\n", ip_address)
334
335 /* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
336 freq := dev.Frequency
337 if freq == 0 {
338 freq = RF_DATA_COLLECT_DUMMY_INTERVAL
339 }
340 s.devicemap[ip_address].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
341 if dev.Frequency == 0 {
342 s.devicemap[ip_address].Datacollector.getdata.Stop()
343 }
344
345 eventtypes := s.get_event_types(ip_address)
346 if eventtypes != nil {
347 for _, event := range eventtypes {
348 s.devicemap[ip_address].Eventtypes = append(s.devicemap[ip_address].Eventtypes, event)
349 if s.add_subscription(ip_address, event) == false {
350 errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
351 }
352 }
353 }
354 go s.collect_data(ip_address)
355 s.devicemap[ip_address].Datafile = get_data_file(ip_address)
356 s.update_data_file(ip_address)
357 }
358 if errstring != "" {
359 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
360 }
361 return &empty.Empty{}, nil
362}
363
364func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceListByIp, error) {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000365 fmt.Println("In Received GetCurrentDevices")
dileepbk86ef0102019-11-13 00:08:33 +0000366
367 if len(s.devicemap) == 0 {
368 return nil, status.Errorf(codes.NotFound, "Devices not registered")
369 }
mc20a4b5f2019-10-16 20:28:24 +0000370 dl := new(importer.DeviceListByIp)
dileepbk86ef0102019-11-13 00:08:33 +0000371 for k, v := range s.devicemap {
372 if v != nil {
373 fmt.Printf("IpAdd[%s] \n", k)
374 dl.Ip = append(dl.Ip, k)
375 }
376 }
377 return dl, nil
378}
379
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000380func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000381 fmt.Printf("Listening %s\n", grpcport)
382 g = grpc.NewServer()
383 l, e = net.Listen("tcp", grpcport)
384 return
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000385}
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000386func (s *Server) startgrpcserver() error {
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000387 fmt.Println("starting gRPC Server")
388 grpcport := ":50051"
389 listener, gserver, err := NewGrpcServer(grpcport)
390 if err != nil {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000391 fmt.Println("Failed to create gRPC server: ", err)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000392 return err
393 }
394 s.gRPCserver = gserver
395 importer.RegisterDeviceManagementServer(gserver, s)
396 if err := gserver.Serve(listener); err != nil {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000397 fmt.Println("Failed to run gRPC server: ", err)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000398 return err
399 }
400 return nil
401
402}
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000403func (s *Server) kafkaCloseProducer() {
404 if err := s.dataproducer.Close(); err != nil {
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000405 panic(err)
406 }
407
408}
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000409func (s *Server) kafkaInit() {
410 fmt.Println("Starting kafka init to Connect to broker: ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000411 config := sarama.NewConfig()
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000412 config.Producer.RequiredAcks = sarama.WaitForAll
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000413 config.Producer.Retry.Max = 10
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000414 producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
415 if err != nil {
416 panic(err)
417 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000418 s.dataproducer = producer
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000419}
420
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000421func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000422 signals := make(chan os.Signal, 1)
423 signal.Notify(signals, os.Interrupt)
424
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000425 fmt.Println(" IN Handle Event ")
426 if r.Method == "POST" {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000427 Body, err := ioutil.ReadAll(r.Body)
428 if err != nil {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000429 fmt.Println("Error getting HTTP data", err)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000430 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000431 defer r.Body.Close()
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000432 fmt.Println("Received Event Message ")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000433 fmt.Printf("%s\n", Body)
434 message := &sarama.ProducerMessage{
435 Topic: importerTopic,
436 Value: sarama.StringEncoder(Body),
437 }
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000438 s.dataproducer.Input() <- message
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000439 }
440}
441
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000442func (s *Server) runServer() {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000443 fmt.Println("Starting HTTP Server")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000444 http.HandleFunc("/", s.handle_events)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000445 http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000446}
447
mc862dad02019-08-06 20:52:51 +0000448func (s *Server) init_data_persistence() {
mc20a4b5f2019-10-16 20:28:24 +0000449 fmt.Println("Retrieving persisted data")
mc862dad02019-08-06 20:52:51 +0000450 subscriptionListPath = pvmount + "/subscriptions"
451 if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
452 fmt.Println(err)
453 } else {
mc20a4b5f2019-10-16 20:28:24 +0000454 files, err := ioutil.ReadDir(subscriptionListPath)
mc862dad02019-08-06 20:52:51 +0000455 if err != nil {
456 fmt.Println(err)
457 } else {
mc20a4b5f2019-10-16 20:28:24 +0000458 for _, f := range files {
459 b, err := ioutil.ReadFile(path.Join(subscriptionListPath, f.Name()))
mc862dad02019-08-06 20:52:51 +0000460 if err != nil {
461 fmt.Println(err)
mc20a4b5f2019-10-16 20:28:24 +0000462 } else if f.Size() > 0 {
463 ip := f.Name()
mc862dad02019-08-06 20:52:51 +0000464 d := device{}
465 json.Unmarshal(b, &d)
466 s.devicemap[ip] = &d
mc20a4b5f2019-10-16 20:28:24 +0000467 freq := s.devicemap[ip].Freq
468
469 /* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
470 if freq == 0 {
471 freq = RF_DATA_COLLECT_DUMMY_INTERVAL
472 }
473 s.devicemap[ip].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
474 if s.devicemap[ip].Freq == 0 {
475 s.devicemap[ip].Datacollector.getdata.Stop()
476 }
477
mc862dad02019-08-06 20:52:51 +0000478 s.devicemap[ip].Datacollector.quit = make(chan bool)
mc20a4b5f2019-10-16 20:28:24 +0000479 s.devicemap[ip].Datacollector.getdataend = make(chan bool)
mc862dad02019-08-06 20:52:51 +0000480 s.devicemap[ip].Freqchan = make(chan uint32)
mc20a4b5f2019-10-16 20:28:24 +0000481 s.devicemap[ip].Datafile = get_data_file(ip)
mc862dad02019-08-06 20:52:51 +0000482 go s.collect_data(ip)
483 }
484 }
485 }
486 }
487}
488
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000489func init() {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000490 Formatter := new(log.TextFormatter)
491 Formatter.TimestampFormat = "02-01-2006 15:04:05"
492 Formatter.FullTimestamp = true
493 log.SetFormatter(Formatter)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000494 fmt.Println("Connecting to broker: ")
495 fmt.Println("Listening to http server")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000496 log.Info("log Connecting to broker:")
497 log.Info("log Listening to http server ")
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000498 //sarama.Logger = log.New()
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000499}
500
mc20a4b5f2019-10-16 20:28:24 +0000501func get_data_file(ip string) *os.File {
mc862dad02019-08-06 20:52:51 +0000502 if pvmount == "" {
503 return nil
504 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000505 f, err := os.OpenFile(subscriptionListPath+"/"+ip, os.O_CREATE|os.O_RDWR, 0664)
mc862dad02019-08-06 20:52:51 +0000506 if err != nil {
507 fmt.Println(err)
508 }
509 return f
510}
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000511
mc20a4b5f2019-10-16 20:28:24 +0000512func (s *Server) close_data_files() {
513 for ip, _ := range s.devicemap {
514 s.devicemap[ip].Datafile.Close()
515 }
516}
517
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000518func main() {
519 fmt.Println("Starting Device-management Container")
mc862dad02019-08-06 20:52:51 +0000520
521 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
522 client := &http.Client{
523 Timeout: 10 * time.Second,
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000524 }
mc862dad02019-08-06 20:52:51 +0000525
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000526 s := Server{
527 devicemap: make(map[string]*device),
528 httpclient: client,
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000529 }
mc862dad02019-08-06 20:52:51 +0000530
531 s.kafkaInit()
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000532 go s.runServer()
533 go s.startgrpcserver()
mc862dad02019-08-06 20:52:51 +0000534
535 if pvmount != "" {
536 s.init_data_persistence()
537 }
538
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000539 quit := make(chan os.Signal)
540 signal.Notify(quit, os.Interrupt)
541
542 select {
543 case sig := <-quit:
544 fmt.Println("Shutting down:", sig)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000545 s.kafkaCloseProducer()
mc20a4b5f2019-10-16 20:28:24 +0000546 s.close_data_files()
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000547 }
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000548}