blob: c17926c688005eddc19bb248723499634892892b [file] [log] [blame]
Dinesh Belwalkar01217962019-05-23 21:51:16 +00001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000018 "crypto/tls"
19 "encoding/json"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000020 "fmt"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000021 "github.com/Shopify/sarama"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000022 empty "github.com/golang/protobuf/ptypes/empty"
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000023 "github.com/opencord/device-management/proto"
24 log "github.com/sirupsen/logrus"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000025 "golang.org/x/net/context"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29 "io/ioutil"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000030 "net"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000031 "net/http"
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000032 "os"
33 "os/signal"
mc862dad02019-08-06 20:52:51 +000034 "path"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000035 "time"
mccd7e9502019-12-16 22:04:13 +000036 "strings"
37 "strconv"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000038)
39
// Package-wide constants.
const (
	// RF_DEFAULT_PROTOCOL is the URL scheme prefix; presumably prepended to
	// device addresses when building Redfish URLs (not used in this file).
	RF_DEFAULT_PROTOCOL = "https://"

	// RF_DATA_COLLECT_THRESHOLD is the smallest non-zero data collection
	// interval, in seconds, accepted by SetFrequency and SendDeviceList.
	RF_DATA_COLLECT_THRESHOLD = 5

	// RF_DATA_COLLECT_DUMMY_INTERVAL is the interval, in seconds, given to the
	// placeholder ticker that is created (and stopped at once) for a device
	// whose collection interval is 0.
	RF_DATA_COLLECT_DUMMY_INTERVAL = 1000

	// CONTENT_TYPE is a MIME type string; presumably used for HTTP request
	// bodies elsewhere in the package (not used in this file).
	CONTENT_TYPE = "application/json"
)
45
// importerTopic is the Kafka topic to which collected device data and
// received event messages are published.
var importerTopic = "importer"
49
50var DataProducer sarama.AsyncProducer
51
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000052var redfish_resources = [...]string{"/redfish/v1/Chassis", "/redfish/v1/Systems", "/redfish/v1/EthernetSwitches"}
mc862dad02019-08-06 20:52:51 +000053var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
54var subscriptionListPath string
mce7028402019-07-18 04:10:01 +000055
// scheduler holds the ticker and the signalling channels that drive one
// device's collect_data goroutine.
type scheduler struct {
	// getdata fires whenever the device should be polled.
	getdata *time.Ticker
	// quit receives a value from DeleteDeviceList to ask collect_data to stop.
	quit chan bool
	// getdataend receives a value from collect_data just before it returns.
	getdataend chan bool
}
61
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000062type device struct {
63 Subscriptions map[string]string `json:"ss"`
64 Freq uint32 `json:"freq"`
65 Datacollector scheduler `json:"-"`
66 Freqchan chan uint32 `json:"-"`
67 Eventtypes []string `json:"eventtypes"`
68 Datafile *os.File `json:"-"`
Dinesh Belwalkar41229602019-06-21 16:58:06 +000069}
70
71type Server struct {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000072 devicemap map[string]*device
73 gRPCserver *grpc.Server
74 dataproducer sarama.AsyncProducer
75 httpclient *http.Client
Dinesh Belwalkar41229602019-06-21 16:58:06 +000076}
77
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000078func (s *Server) ClearCurrentEventList(c context.Context, info *importer.Device) (*empty.Empty, error) {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000079 fmt.Println("Received ClearCurrentEventList")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000080 ip_address := info.IpAddress
mccd7e9502019-12-16 22:04:13 +000081 if msg, ok := s.validate_ip(ip_address, true, true); !ok {
82 return nil, status.Errorf(codes.InvalidArgument, msg)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000083 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000084 for event, _ := range s.devicemap[ip_address].Subscriptions {
mc20a4b5f2019-10-16 20:28:24 +000085 rtn := s.remove_subscription(ip_address, event)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000086 if !rtn {
87 log.WithFields(log.Fields{
88 "Event": event,
89 }).Info("Error removing event")
90 }
91 }
mc20a4b5f2019-10-16 20:28:24 +000092 s.update_data_file(ip_address)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000093 return &empty.Empty{}, nil
94}
95
96func (s *Server) GetCurrentEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
mccd7e9502019-12-16 22:04:13 +000097 fmt.Println("Received GetCurrentEventList\n")
98 if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
99 return nil, status.Errorf(codes.InvalidArgument, msg)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000100 }
101 currentevents := new(importer.EventList)
102 for event, _ := range s.devicemap[info.IpAddress].Subscriptions {
103 currentevents.Events = append(currentevents.Events, event)
104 }
105 return currentevents, nil
106}
107
mc20a4b5f2019-10-16 20:28:24 +0000108func (s *Server) GetEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
mccd7e9502019-12-16 22:04:13 +0000109 fmt.Println("Received GetEventList\n")
110 if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
111 return nil, status.Errorf(codes.InvalidArgument, msg)
112 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000113 eventstobesubscribed := new(importer.EventList)
mccd7e9502019-12-16 22:04:13 +0000114// eventstobesubscribed.Events = s.devicemap[info.IpAddress].Eventtypes
115 eventstobesubscribed.Events = s.devicemap[info.IpAddress].Eventtypes
mc20a4b5f2019-10-16 20:28:24 +0000116 if eventstobesubscribed.Events == nil {
mccd7e9502019-12-16 22:04:13 +0000117 return nil, status.Errorf(codes.NotFound, "No events found\n")
mc20a4b5f2019-10-16 20:28:24 +0000118 }
mce7028402019-07-18 04:10:01 +0000119 return eventstobesubscribed, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000120}
121
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000122func (s *Server) SetFrequency(c context.Context, info *importer.FreqInfo) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +0000123 fmt.Println("Received SetFrequency")
mccd7e9502019-12-16 22:04:13 +0000124 if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
125 return nil, status.Errorf(codes.InvalidArgument, msg)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000126 }
mc20a4b5f2019-10-16 20:28:24 +0000127 if info.Frequency > 0 && info.Frequency < RF_DATA_COLLECT_THRESHOLD {
mccd7e9502019-12-16 22:04:13 +0000128 return nil, status.Errorf(codes.InvalidArgument, "Invalid interval\n")
mc20a4b5f2019-10-16 20:28:24 +0000129 }
mc862dad02019-08-06 20:52:51 +0000130 s.devicemap[info.IpAddress].Freqchan <- info.Frequency
mc20a4b5f2019-10-16 20:28:24 +0000131 s.devicemap[info.IpAddress].Freq = info.Frequency
132 s.update_data_file(info.IpAddress)
mce7028402019-07-18 04:10:01 +0000133 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000134}
135
mccd7e9502019-12-16 22:04:13 +0000136func (s *Server) SubscribeGivenEvents(c context.Context, subeventlist *importer.GivenEventList) (*empty.Empty, error) {
mc20a4b5f2019-10-16 20:28:24 +0000137 errstring := ""
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000138 fmt.Println("Received SubsrcribeEvents")
mce7028402019-07-18 04:10:01 +0000139 //Call API to subscribe events
140 ip_address := subeventlist.EventIpAddress
mccd7e9502019-12-16 22:04:13 +0000141 if msg, ok := s.validate_ip(ip_address, true, true); !ok {
142 return nil, status.Errorf(codes.InvalidArgument, msg)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000143 }
144 if len(subeventlist.Events) <= 0 {
mccd7e9502019-12-16 22:04:13 +0000145 return nil, status.Errorf(codes.InvalidArgument, "Event list is empty\n")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000146 }
mce7028402019-07-18 04:10:01 +0000147 for _, event := range subeventlist.Events {
mc862dad02019-08-06 20:52:51 +0000148 if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
mccd7e9502019-12-16 22:04:13 +0000149 supported := false
150 for _, e := range s.devicemap[ip_address].Eventtypes{
151 if e == event {
152 supported = true
153 rtn := s.add_subscription(ip_address, event)
154 if !rtn {
155 errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
156 log.WithFields(log.Fields{
157 "Event": event,
158 }).Info("Error adding event")
159 }
160 break
161 }
162 }
163 if supported == false {
164 errstring = errstring + "event " + event + " not supported\n"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000165 log.WithFields(log.Fields{
166 "Event": event,
mccd7e9502019-12-16 22:04:13 +0000167 }).Info("not supported")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000168 }
mc862dad02019-08-06 20:52:51 +0000169 } else {
mc20a4b5f2019-10-16 20:28:24 +0000170 errstring = errstring + "event " + event + " already subscribed\n"
mc862dad02019-08-06 20:52:51 +0000171 log.WithFields(log.Fields{
172 "Event": event,
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000173 }).Info("Already Subscribed")
mc862dad02019-08-06 20:52:51 +0000174 }
mce7028402019-07-18 04:10:01 +0000175 }
mc20a4b5f2019-10-16 20:28:24 +0000176 s.update_data_file(ip_address)
177 if errstring != "" {
178 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000179 }
mce7028402019-07-18 04:10:01 +0000180 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000181}
182
mccd7e9502019-12-16 22:04:13 +0000183func (s *Server) UnsubscribeGivenEvents(c context.Context, unsubeventlist *importer.GivenEventList) (*empty.Empty, error) {
mc20a4b5f2019-10-16 20:28:24 +0000184 errstring := ""
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000185 fmt.Println("Received UnSubsrcribeEvents")
mc862dad02019-08-06 20:52:51 +0000186 ip_address := unsubeventlist.EventIpAddress
mccd7e9502019-12-16 22:04:13 +0000187 if msg, ok := s.validate_ip(ip_address, true, true); !ok {
188 return nil, status.Errorf(codes.InvalidArgument, msg)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000189 }
190
191 if len(unsubeventlist.Events) <= 0 {
mccd7e9502019-12-16 22:04:13 +0000192 return nil, status.Errorf(codes.InvalidArgument, "Event list is empty\n")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000193 }
mc862dad02019-08-06 20:52:51 +0000194 //Call API to unsubscribe events
mc862dad02019-08-06 20:52:51 +0000195 for _, event := range unsubeventlist.Events {
196 if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
mc20a4b5f2019-10-16 20:28:24 +0000197 rtn := s.remove_subscription(ip_address, event)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000198 if !rtn {
mc20a4b5f2019-10-16 20:28:24 +0000199 errstring = errstring + "failed to unsubscribe event " + ip_address + " " + event + "\n"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000200 log.WithFields(log.Fields{
201 "Event": event,
202 }).Info("Error removing event")
203 }
204 } else {
mc20a4b5f2019-10-16 20:28:24 +0000205 errstring = errstring + "event " + event + " not found\n"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000206 log.WithFields(log.Fields{
207 "Event": event,
208 }).Info("was not Subscribed")
209 }
210 }
mc20a4b5f2019-10-16 20:28:24 +0000211 s.update_data_file(ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000212
mc20a4b5f2019-10-16 20:28:24 +0000213 if errstring != "" {
214 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
215 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000216 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000217}
218
mc20a4b5f2019-10-16 20:28:24 +0000219func (s *Server) update_data_file(ip_address string) {
220 f := s.devicemap[ip_address].Datafile
221 if f != nil {
222 b, err := json.Marshal(s.devicemap[ip_address])
223 if err != nil {
224 fmt.Println(err)
225 } else {
226 f.Truncate(0)
227 f.Seek(0, 0)
228 n, err := f.Write(b)
229 if err != nil {
230 fmt.Println("err wrote", n, "bytes")
231 fmt.Println(err)
232 }
233 }
234 } else {
235 fmt.Println("file handle is nil", ip_address)
236 }
237}
238
mce7028402019-07-18 04:10:01 +0000239func (s *Server) collect_data(ip_address string) {
mc862dad02019-08-06 20:52:51 +0000240 freqchan := s.devicemap[ip_address].Freqchan
241 ticker := s.devicemap[ip_address].Datacollector.getdata
242 donechan := s.devicemap[ip_address].Datacollector.quit
mce7028402019-07-18 04:10:01 +0000243 for {
244 select {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000245 case freq := <-freqchan:
mce7028402019-07-18 04:10:01 +0000246 ticker.Stop()
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000247 if freq > 0 {
248 ticker = time.NewTicker(time.Duration(freq) * time.Second)
mc20a4b5f2019-10-16 20:28:24 +0000249 s.devicemap[ip_address].Datacollector.getdata = ticker
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000250 }
251 case err := <-s.dataproducer.Errors():
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000252 fmt.Println("Failed to produce message:", err)
mce7028402019-07-18 04:10:01 +0000253 case <-ticker.C:
mc20a4b5f2019-10-16 20:28:24 +0000254 for _, resource := range redfish_resources {
255 data := s.get_status(ip_address, resource)
256 for _, str := range data {
257 str = "Device IP: " + ip_address + " " + str
258 fmt.Printf("collected data %s\n ...", str)
259 b := []byte(str)
260 msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
261 select {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000262 case s.dataproducer.Input() <- msg:
mc20a4b5f2019-10-16 20:28:24 +0000263 fmt.Println("Produce message")
264 default:
mce7028402019-07-18 04:10:01 +0000265 }
266 }
267 }
268 case <-donechan:
269 ticker.Stop()
270 fmt.Println("getdata ticker stopped")
mc20a4b5f2019-10-16 20:28:24 +0000271 s.devicemap[ip_address].Datacollector.getdataend <- true
mce7028402019-07-18 04:10:01 +0000272 return
273 }
274 }
275}
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000276
mc20a4b5f2019-10-16 20:28:24 +0000277func (s *Server) DeleteDeviceList(c context.Context, list *importer.DeviceListByIp) (*empty.Empty, error) {
278 fmt.Println("DeleteDeviceList received")
279 errstring := ""
280 for _, ip := range list.Ip {
281 if _, ok := s.devicemap[ip]; !ok {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000282 fmt.Printf("Device not found %s\n ", ip)
mc20a4b5f2019-10-16 20:28:24 +0000283 errstring = errstring + "Device " + ip + " not found\n"
284 continue
285 }
286 for event, _ := range s.devicemap[ip].Subscriptions {
287 rtn := s.remove_subscription(ip, event)
288 if !rtn {
289 log.WithFields(log.Fields{
290 "Event": event,
291 }).Info("Error removing event")
292 }
293 }
294 fmt.Println("deleting device", ip)
295 s.devicemap[ip].Datacollector.quit <- true
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000296
mc20a4b5f2019-10-16 20:28:24 +0000297 f := s.devicemap[ip].Datafile
298 if f != nil {
299 fmt.Println("deleteing file", f.Name())
300 err := f.Close()
301 if err != nil {
302 fmt.Println("error closing file ", f.Name(), err)
303 errstring = errstring + "error closing file " + f.Name() + "\n"
304 }
305 err = os.Remove(f.Name())
306 if err != nil {
307 fmt.Println("error deleting file ", f.Name(), err)
308 }
309 } else {
310 errstring = errstring + "file " + ip + " not found\n"
311 }
312 <-s.devicemap[ip].Datacollector.getdataend
313 delete(s.devicemap, ip)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000314 }
mc20a4b5f2019-10-16 20:28:24 +0000315 if errstring != "" {
316 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000317 }
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000318 return &empty.Empty{}, nil
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000319}
mc6a9f01a2019-06-26 21:31:23 +0000320
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000321func (s *Server) SendDeviceList(c context.Context, list *importer.DeviceList) (*empty.Empty, error) {
mc20a4b5f2019-10-16 20:28:24 +0000322 errstring := ""
323 for _, dev := range list.Device {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000324 ip_address := dev.IpAddress
mccd7e9502019-12-16 22:04:13 +0000325 if msg, ok := s.validate_ip(ip_address, false, false); !ok {
326 errstring = errstring + msg
mc20a4b5f2019-10-16 20:28:24 +0000327 continue
328 }
mc20a4b5f2019-10-16 20:28:24 +0000329 if dev.Frequency > 0 && dev.Frequency < RF_DATA_COLLECT_THRESHOLD {
mccd7e9502019-12-16 22:04:13 +0000330 fmt.Printf("Device %s data collection interval %d out of range", ip_address, dev.Frequency)
mc20a4b5f2019-10-16 20:28:24 +0000331 errstring = errstring + "Device " + ip_address + " data collection frequency out of range\n"
332 continue
333 }
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000334 d := device{
mc20a4b5f2019-10-16 20:28:24 +0000335 Subscriptions: make(map[string]string),
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000336 Freq: dev.Frequency,
mc20a4b5f2019-10-16 20:28:24 +0000337 Datacollector: scheduler{
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000338 quit: make(chan bool),
mc20a4b5f2019-10-16 20:28:24 +0000339 getdataend: make(chan bool),
340 },
341 Freqchan: make(chan uint32),
342 }
343 s.devicemap[ip_address] = &d
344 fmt.Printf("Configuring %s\n", ip_address)
345
346 /* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
347 freq := dev.Frequency
348 if freq == 0 {
349 freq = RF_DATA_COLLECT_DUMMY_INTERVAL
350 }
351 s.devicemap[ip_address].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
352 if dev.Frequency == 0 {
353 s.devicemap[ip_address].Datacollector.getdata.Stop()
354 }
355
356 eventtypes := s.get_event_types(ip_address)
357 if eventtypes != nil {
358 for _, event := range eventtypes {
359 s.devicemap[ip_address].Eventtypes = append(s.devicemap[ip_address].Eventtypes, event)
360 if s.add_subscription(ip_address, event) == false {
361 errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
362 }
363 }
364 }
365 go s.collect_data(ip_address)
366 s.devicemap[ip_address].Datafile = get_data_file(ip_address)
367 s.update_data_file(ip_address)
368 }
369 if errstring != "" {
370 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
371 }
372 return &empty.Empty{}, nil
373}
374
375func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceListByIp, error) {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000376 fmt.Println("In Received GetCurrentDevices")
dileepbk86ef0102019-11-13 00:08:33 +0000377
378 if len(s.devicemap) == 0 {
mccd7e9502019-12-16 22:04:13 +0000379 return nil, status.Errorf(codes.NotFound, "No Device found\n")
dileepbk86ef0102019-11-13 00:08:33 +0000380 }
mc20a4b5f2019-10-16 20:28:24 +0000381 dl := new(importer.DeviceListByIp)
dileepbk86ef0102019-11-13 00:08:33 +0000382 for k, v := range s.devicemap {
383 if v != nil {
384 fmt.Printf("IpAdd[%s] \n", k)
385 dl.Ip = append(dl.Ip, k)
386 }
387 }
388 return dl, nil
389}
390
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000391func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000392 fmt.Printf("Listening %s\n", grpcport)
393 g = grpc.NewServer()
394 l, e = net.Listen("tcp", grpcport)
395 return
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000396}
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000397func (s *Server) startgrpcserver() error {
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000398 fmt.Println("starting gRPC Server")
399 grpcport := ":50051"
400 listener, gserver, err := NewGrpcServer(grpcport)
401 if err != nil {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000402 fmt.Println("Failed to create gRPC server: ", err)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000403 return err
404 }
405 s.gRPCserver = gserver
406 importer.RegisterDeviceManagementServer(gserver, s)
407 if err := gserver.Serve(listener); err != nil {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000408 fmt.Println("Failed to run gRPC server: ", err)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000409 return err
410 }
411 return nil
412
413}
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000414func (s *Server) kafkaCloseProducer() {
415 if err := s.dataproducer.Close(); err != nil {
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000416 panic(err)
417 }
418
419}
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000420func (s *Server) kafkaInit() {
421 fmt.Println("Starting kafka init to Connect to broker: ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000422 config := sarama.NewConfig()
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000423 config.Producer.RequiredAcks = sarama.WaitForAll
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000424 config.Producer.Retry.Max = 10
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000425 producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
426 if err != nil {
427 panic(err)
428 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000429 s.dataproducer = producer
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000430}
431
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000432func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000433 signals := make(chan os.Signal, 1)
434 signal.Notify(signals, os.Interrupt)
435
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000436 fmt.Println(" IN Handle Event ")
437 if r.Method == "POST" {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000438 Body, err := ioutil.ReadAll(r.Body)
439 if err != nil {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000440 fmt.Println("Error getting HTTP data", err)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000441 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000442 defer r.Body.Close()
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000443 fmt.Println("Received Event Message ")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000444 fmt.Printf("%s\n", Body)
445 message := &sarama.ProducerMessage{
446 Topic: importerTopic,
447 Value: sarama.StringEncoder(Body),
448 }
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000449 s.dataproducer.Input() <- message
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000450 }
451}
452
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000453func (s *Server) runServer() {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000454 fmt.Println("Starting HTTP Server")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000455 http.HandleFunc("/", s.handle_events)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000456 http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000457}
458
mccd7e9502019-12-16 22:04:13 +0000459/* validate_ip() verifies if the ip and port are valid and already registered then return the truth value of the desired state specified by the following 2 switches,
460 want_registered: 'true' if the fact of an ip is registered is the desired state
461 include_port: 'true' further checks if <ip>:<port#> does exist in the devicemap in case an ip is found registered
462*/
463func (s *Server) validate_ip(ip_address string, want_registered bool, include_port bool) (msg string, ok bool) {
464 msg = ""
465 ok = false
466 if !strings.Contains(ip_address, ":") {
467 fmt.Printf("Incorrect IP address %s, expected format <ip>:<port #>", ip_address)
468 msg = "Incorrect IP address format (<ip>:<port #>)\n"
469 return
470 }
471 splits := strings.Split(ip_address, ":")
472 ip, port := splits[0], splits[1]
473 if net.ParseIP(ip) == nil {
474 fmt.Printf("Invalid IP address %s", ip)
475 msg = "Invalid IP address " + ip + "\n"
476 return
477 }
478 if _, err := strconv.Atoi(port); err != nil {
479 fmt.Printf("Port # %s is not an integer", port)
480 msg = "Port # " + port + " needs to be an integer\n"
481 return
482 }
483 for k := range s.devicemap {
484 if strings.HasPrefix(k, ip) {
485 if !want_registered {
486 fmt.Printf("Device ip %s already registered", ip)
487 msg = "Device ip " + ip + " already registered\n"
488 return
489 } else if include_port {
490 if _, found := s.devicemap[ip_address]; found {
491 ok = true
492 return
493 } else {
494 fmt.Printf("Device %s not registered", ip_address)
495 msg = "Device " + ip_address + " not registered\n"
496 return
497 }
498 } else {
499 ok = true
500 return
501 }
502 }
503 }
504 if want_registered {
505 fmt.Printf("Device %s not registered", ip_address)
506 msg = "Device " + ip_address + " not registered\n"
507 return
508 }
509 ok = true
510 return
511}
512
mc862dad02019-08-06 20:52:51 +0000513func (s *Server) init_data_persistence() {
mc20a4b5f2019-10-16 20:28:24 +0000514 fmt.Println("Retrieving persisted data")
mc862dad02019-08-06 20:52:51 +0000515 subscriptionListPath = pvmount + "/subscriptions"
516 if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
517 fmt.Println(err)
518 } else {
mc20a4b5f2019-10-16 20:28:24 +0000519 files, err := ioutil.ReadDir(subscriptionListPath)
mc862dad02019-08-06 20:52:51 +0000520 if err != nil {
521 fmt.Println(err)
522 } else {
mc20a4b5f2019-10-16 20:28:24 +0000523 for _, f := range files {
524 b, err := ioutil.ReadFile(path.Join(subscriptionListPath, f.Name()))
mc862dad02019-08-06 20:52:51 +0000525 if err != nil {
526 fmt.Println(err)
mc20a4b5f2019-10-16 20:28:24 +0000527 } else if f.Size() > 0 {
528 ip := f.Name()
mc862dad02019-08-06 20:52:51 +0000529 d := device{}
530 json.Unmarshal(b, &d)
531 s.devicemap[ip] = &d
mc20a4b5f2019-10-16 20:28:24 +0000532 freq := s.devicemap[ip].Freq
533
534 /* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
535 if freq == 0 {
536 freq = RF_DATA_COLLECT_DUMMY_INTERVAL
537 }
538 s.devicemap[ip].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
539 if s.devicemap[ip].Freq == 0 {
540 s.devicemap[ip].Datacollector.getdata.Stop()
541 }
542
mc862dad02019-08-06 20:52:51 +0000543 s.devicemap[ip].Datacollector.quit = make(chan bool)
mc20a4b5f2019-10-16 20:28:24 +0000544 s.devicemap[ip].Datacollector.getdataend = make(chan bool)
mc862dad02019-08-06 20:52:51 +0000545 s.devicemap[ip].Freqchan = make(chan uint32)
mc20a4b5f2019-10-16 20:28:24 +0000546 s.devicemap[ip].Datafile = get_data_file(ip)
mc862dad02019-08-06 20:52:51 +0000547 go s.collect_data(ip)
548 }
549 }
550 }
551 }
552}
553
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000554func init() {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000555 Formatter := new(log.TextFormatter)
556 Formatter.TimestampFormat = "02-01-2006 15:04:05"
557 Formatter.FullTimestamp = true
558 log.SetFormatter(Formatter)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000559 fmt.Println("Connecting to broker: ")
560 fmt.Println("Listening to http server")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000561 log.Info("log Connecting to broker:")
562 log.Info("log Listening to http server ")
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000563 //sarama.Logger = log.New()
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000564}
565
mc20a4b5f2019-10-16 20:28:24 +0000566func get_data_file(ip string) *os.File {
mc862dad02019-08-06 20:52:51 +0000567 if pvmount == "" {
568 return nil
569 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000570 f, err := os.OpenFile(subscriptionListPath+"/"+ip, os.O_CREATE|os.O_RDWR, 0664)
mc862dad02019-08-06 20:52:51 +0000571 if err != nil {
572 fmt.Println(err)
573 }
574 return f
575}
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000576
mc20a4b5f2019-10-16 20:28:24 +0000577func (s *Server) close_data_files() {
578 for ip, _ := range s.devicemap {
579 s.devicemap[ip].Datafile.Close()
580 }
581}
582
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000583func main() {
584 fmt.Println("Starting Device-management Container")
mc862dad02019-08-06 20:52:51 +0000585
586 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
587 client := &http.Client{
588 Timeout: 10 * time.Second,
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000589 }
mc862dad02019-08-06 20:52:51 +0000590
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000591 s := Server{
592 devicemap: make(map[string]*device),
593 httpclient: client,
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000594 }
mc862dad02019-08-06 20:52:51 +0000595
596 s.kafkaInit()
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000597 go s.runServer()
598 go s.startgrpcserver()
mc862dad02019-08-06 20:52:51 +0000599
600 if pvmount != "" {
601 s.init_data_persistence()
602 }
603
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000604 quit := make(chan os.Signal)
605 signal.Notify(quit, os.Interrupt)
606
607 select {
608 case sig := <-quit:
609 fmt.Println("Shutting down:", sig)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000610 s.kafkaCloseProducer()
mc20a4b5f2019-10-16 20:28:24 +0000611 s.close_data_files()
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000612 }
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000613}