blob: c847f535540a0160e954867562eb50d99aa569d9 [file] [log] [blame]
Dinesh Belwalkar6c0bc752020-04-24 23:47:53 +00001// Copyright 2018-present Open Networking Foundation
2// Copyright 2018-present Edgecore Networks Corporation
Dinesh Belwalkar01217962019-05-23 21:51:16 +00003//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16package main
17
18import (
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000019 "crypto/tls"
20 "encoding/json"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000021 "fmt"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000022 "github.com/Shopify/sarama"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000023 empty "github.com/golang/protobuf/ptypes/empty"
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000024 "github.com/opencord/device-management/proto"
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +000025 logrus "github.com/sirupsen/logrus"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000026 "golang.org/x/net/context"
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/codes"
29 "google.golang.org/grpc/status"
30 "io/ioutil"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000031 "net"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000032 "net/http"
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000033 "os"
34 "os/signal"
mc862dad02019-08-06 20:52:51 +000035 "path"
mccd7e9502019-12-16 22:04:13 +000036 "strconv"
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +000037 "strings"
38 "time"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000039)
40
mce7028402019-07-18 04:10:01 +000041//globals
mc20a4b5f2019-10-16 20:28:24 +000042const RF_DEFAULT_PROTOCOL = "https://"
43const RF_DATA_COLLECT_THRESHOLD = 5
44const RF_DATA_COLLECT_DUMMY_INTERVAL = 1000
mce7028402019-07-18 04:10:01 +000045const CONTENT_TYPE = "application/json"
46
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000047var (
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000048 importerTopic = "importer"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000049)
50
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000051var redfish_resources = [...]string{"/redfish/v1/Chassis", "/redfish/v1/Systems", "/redfish/v1/EthernetSwitches"}
mc862dad02019-08-06 20:52:51 +000052var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
53var subscriptionListPath string
mce7028402019-07-18 04:10:01 +000054
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000055type scheduler struct {
56 getdata *time.Ticker
57 quit chan bool
mc20a4b5f2019-10-16 20:28:24 +000058 getdataend chan bool
mce7028402019-07-18 04:10:01 +000059}
60
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000061type device struct {
62 Subscriptions map[string]string `json:"ss"`
63 Freq uint32 `json:"freq"`
64 Datacollector scheduler `json:"-"`
65 Freqchan chan uint32 `json:"-"`
66 Eventtypes []string `json:"eventtypes"`
67 Datafile *os.File `json:"-"`
Dinesh Belwalkar41229602019-06-21 16:58:06 +000068}
69
70type Server struct {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +000071 devicemap map[string]*device
72 gRPCserver *grpc.Server
73 dataproducer sarama.AsyncProducer
74 httpclient *http.Client
Dinesh Belwalkar41229602019-06-21 16:58:06 +000075}
76
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000077func (s *Server) ClearCurrentEventList(c context.Context, info *importer.Device) (*empty.Empty, error) {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +000078 logrus.Info("Received ClearCurrentEventList")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000079 ip_address := info.IpAddress
mccd7e9502019-12-16 22:04:13 +000080 if msg, ok := s.validate_ip(ip_address, true, true); !ok {
81 return nil, status.Errorf(codes.InvalidArgument, msg)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000082 }
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +000083 for event := range s.devicemap[ip_address].Subscriptions {
mc20a4b5f2019-10-16 20:28:24 +000084 rtn := s.remove_subscription(ip_address, event)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000085 if !rtn {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +000086 logrus.WithFields(logrus.Fields{
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000087 "Event": event,
88 }).Info("Error removing event")
89 }
90 }
mc20a4b5f2019-10-16 20:28:24 +000091 s.update_data_file(ip_address)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000092 return &empty.Empty{}, nil
93}
94
95func (s *Server) GetCurrentEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +000096 logrus.Info("Received GetCurrentEventList")
mccd7e9502019-12-16 22:04:13 +000097 if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
98 return nil, status.Errorf(codes.InvalidArgument, msg)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000099 }
100 currentevents := new(importer.EventList)
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000101 for event := range s.devicemap[info.IpAddress].Subscriptions {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000102 currentevents.Events = append(currentevents.Events, event)
103 }
104 return currentevents, nil
105}
106
mc20a4b5f2019-10-16 20:28:24 +0000107func (s *Server) GetEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000108 logrus.Info("Received GetEventList")
mccd7e9502019-12-16 22:04:13 +0000109 if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
110 return nil, status.Errorf(codes.InvalidArgument, msg)
111 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000112 eventstobesubscribed := new(importer.EventList)
mccd7e9502019-12-16 22:04:13 +0000113 eventstobesubscribed.Events = s.devicemap[info.IpAddress].Eventtypes
mc20a4b5f2019-10-16 20:28:24 +0000114 if eventstobesubscribed.Events == nil {
mccd7e9502019-12-16 22:04:13 +0000115 return nil, status.Errorf(codes.NotFound, "No events found\n")
mc20a4b5f2019-10-16 20:28:24 +0000116 }
mce7028402019-07-18 04:10:01 +0000117 return eventstobesubscribed, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000118}
119
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000120func (s *Server) SetFrequency(c context.Context, info *importer.FreqInfo) (*empty.Empty, error) {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000121 logrus.Info("Received SetFrequency")
mccd7e9502019-12-16 22:04:13 +0000122 if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
123 return nil, status.Errorf(codes.InvalidArgument, msg)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000124 }
mc20a4b5f2019-10-16 20:28:24 +0000125 if info.Frequency > 0 && info.Frequency < RF_DATA_COLLECT_THRESHOLD {
mccd7e9502019-12-16 22:04:13 +0000126 return nil, status.Errorf(codes.InvalidArgument, "Invalid interval\n")
mc20a4b5f2019-10-16 20:28:24 +0000127 }
mc862dad02019-08-06 20:52:51 +0000128 s.devicemap[info.IpAddress].Freqchan <- info.Frequency
mc20a4b5f2019-10-16 20:28:24 +0000129 s.devicemap[info.IpAddress].Freq = info.Frequency
130 s.update_data_file(info.IpAddress)
mce7028402019-07-18 04:10:01 +0000131 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000132}
133
mccd7e9502019-12-16 22:04:13 +0000134func (s *Server) SubscribeGivenEvents(c context.Context, subeventlist *importer.GivenEventList) (*empty.Empty, error) {
mc20a4b5f2019-10-16 20:28:24 +0000135 errstring := ""
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000136 logrus.Info("Received SubsrcribeEvents")
mce7028402019-07-18 04:10:01 +0000137 //Call API to subscribe events
138 ip_address := subeventlist.EventIpAddress
mccd7e9502019-12-16 22:04:13 +0000139 if msg, ok := s.validate_ip(ip_address, true, true); !ok {
140 return nil, status.Errorf(codes.InvalidArgument, msg)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000141 }
142 if len(subeventlist.Events) <= 0 {
mccd7e9502019-12-16 22:04:13 +0000143 return nil, status.Errorf(codes.InvalidArgument, "Event list is empty\n")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000144 }
mce7028402019-07-18 04:10:01 +0000145 for _, event := range subeventlist.Events {
mc862dad02019-08-06 20:52:51 +0000146 if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
mccd7e9502019-12-16 22:04:13 +0000147 supported := false
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000148 for _, e := range s.devicemap[ip_address].Eventtypes {
mccd7e9502019-12-16 22:04:13 +0000149 if e == event {
150 supported = true
151 rtn := s.add_subscription(ip_address, event)
152 if !rtn {
153 errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000154 logrus.WithFields(logrus.Fields{
mccd7e9502019-12-16 22:04:13 +0000155 "Event": event,
156 }).Info("Error adding event")
157 }
158 break
159 }
160 }
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000161 if !supported {
mccd7e9502019-12-16 22:04:13 +0000162 errstring = errstring + "event " + event + " not supported\n"
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000163 logrus.WithFields(logrus.Fields{
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000164 "Event": event,
mccd7e9502019-12-16 22:04:13 +0000165 }).Info("not supported")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000166 }
mc862dad02019-08-06 20:52:51 +0000167 } else {
mc20a4b5f2019-10-16 20:28:24 +0000168 errstring = errstring + "event " + event + " already subscribed\n"
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000169 logrus.WithFields(logrus.Fields{
mc862dad02019-08-06 20:52:51 +0000170 "Event": event,
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000171 }).Info("Already Subscribed")
mc862dad02019-08-06 20:52:51 +0000172 }
mce7028402019-07-18 04:10:01 +0000173 }
mc20a4b5f2019-10-16 20:28:24 +0000174 s.update_data_file(ip_address)
175 if errstring != "" {
176 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000177 }
mce7028402019-07-18 04:10:01 +0000178 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000179}
180
mccd7e9502019-12-16 22:04:13 +0000181func (s *Server) UnsubscribeGivenEvents(c context.Context, unsubeventlist *importer.GivenEventList) (*empty.Empty, error) {
mc20a4b5f2019-10-16 20:28:24 +0000182 errstring := ""
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000183 logrus.Info("Received UnSubsrcribeEvents")
mc862dad02019-08-06 20:52:51 +0000184 ip_address := unsubeventlist.EventIpAddress
mccd7e9502019-12-16 22:04:13 +0000185 if msg, ok := s.validate_ip(ip_address, true, true); !ok {
186 return nil, status.Errorf(codes.InvalidArgument, msg)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000187 }
188
189 if len(unsubeventlist.Events) <= 0 {
mccd7e9502019-12-16 22:04:13 +0000190 return nil, status.Errorf(codes.InvalidArgument, "Event list is empty\n")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000191 }
mc862dad02019-08-06 20:52:51 +0000192 //Call API to unsubscribe events
mc862dad02019-08-06 20:52:51 +0000193 for _, event := range unsubeventlist.Events {
194 if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
mc20a4b5f2019-10-16 20:28:24 +0000195 rtn := s.remove_subscription(ip_address, event)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000196 if !rtn {
mc20a4b5f2019-10-16 20:28:24 +0000197 errstring = errstring + "failed to unsubscribe event " + ip_address + " " + event + "\n"
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000198 logrus.WithFields(logrus.Fields{
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000199 "Event": event,
200 }).Info("Error removing event")
201 }
202 } else {
mc20a4b5f2019-10-16 20:28:24 +0000203 errstring = errstring + "event " + event + " not found\n"
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000204 logrus.WithFields(logrus.Fields{
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000205 "Event": event,
206 }).Info("was not Subscribed")
207 }
208 }
mc20a4b5f2019-10-16 20:28:24 +0000209 s.update_data_file(ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000210
mc20a4b5f2019-10-16 20:28:24 +0000211 if errstring != "" {
212 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
213 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000214 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000215}
216
mc20a4b5f2019-10-16 20:28:24 +0000217func (s *Server) update_data_file(ip_address string) {
218 f := s.devicemap[ip_address].Datafile
219 if f != nil {
220 b, err := json.Marshal(s.devicemap[ip_address])
221 if err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000222 logrus.Errorf("Update_data_file %s", err)
mc20a4b5f2019-10-16 20:28:24 +0000223 } else {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000224 err := f.Truncate(0)
225 if err != nil {
226 logrus.Errorf("err Trunate %s", err)
227 return
228 }
229 pos, err := f.Seek(0, 0)
230 if err != nil {
231 logrus.Errorf("err Seek %s", err)
232 return
233 }
234 fmt.Println("moved back to", pos)
mc20a4b5f2019-10-16 20:28:24 +0000235 n, err := f.Write(b)
236 if err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000237 logrus.Errorf("err wrote %d bytes", n)
238 logrus.Errorf("write error to file %s", err)
mc20a4b5f2019-10-16 20:28:24 +0000239 }
240 }
241 } else {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000242 logrus.Errorf("file handle is nil %s", ip_address)
mc20a4b5f2019-10-16 20:28:24 +0000243 }
244}
245
mce7028402019-07-18 04:10:01 +0000246func (s *Server) collect_data(ip_address string) {
mc862dad02019-08-06 20:52:51 +0000247 freqchan := s.devicemap[ip_address].Freqchan
248 ticker := s.devicemap[ip_address].Datacollector.getdata
249 donechan := s.devicemap[ip_address].Datacollector.quit
mce7028402019-07-18 04:10:01 +0000250 for {
251 select {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000252 case freq := <-freqchan:
mce7028402019-07-18 04:10:01 +0000253 ticker.Stop()
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000254 if freq > 0 {
255 ticker = time.NewTicker(time.Duration(freq) * time.Second)
mc20a4b5f2019-10-16 20:28:24 +0000256 s.devicemap[ip_address].Datacollector.getdata = ticker
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000257 }
258 case err := <-s.dataproducer.Errors():
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000259 logrus.Errorf("Failed to produce message:%s", err)
mce7028402019-07-18 04:10:01 +0000260 case <-ticker.C:
mc20a4b5f2019-10-16 20:28:24 +0000261 for _, resource := range redfish_resources {
262 data := s.get_status(ip_address, resource)
263 for _, str := range data {
264 str = "Device IP: " + ip_address + " " + str
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000265 logrus.Infof("collected data %s", str)
mc20a4b5f2019-10-16 20:28:24 +0000266 b := []byte(str)
267 msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
Dinesh Belwalkar396b6522020-02-06 22:11:53 +0000268 s.dataproducer.Input() <- msg
269 logrus.Info("Produce message")
mce7028402019-07-18 04:10:01 +0000270 }
271 }
272 case <-donechan:
273 ticker.Stop()
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000274 logrus.Info("getdata ticker stopped")
mc20a4b5f2019-10-16 20:28:24 +0000275 s.devicemap[ip_address].Datacollector.getdataend <- true
mce7028402019-07-18 04:10:01 +0000276 return
277 }
278 }
279}
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000280
mc20a4b5f2019-10-16 20:28:24 +0000281func (s *Server) DeleteDeviceList(c context.Context, list *importer.DeviceListByIp) (*empty.Empty, error) {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000282 logrus.Info("DeleteDeviceList received")
mc20a4b5f2019-10-16 20:28:24 +0000283 errstring := ""
284 for _, ip := range list.Ip {
285 if _, ok := s.devicemap[ip]; !ok {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000286 logrus.Infof("Device not found %s ", ip)
mc20a4b5f2019-10-16 20:28:24 +0000287 errstring = errstring + "Device " + ip + " not found\n"
288 continue
289 }
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000290 for event := range s.devicemap[ip].Subscriptions {
mc20a4b5f2019-10-16 20:28:24 +0000291 rtn := s.remove_subscription(ip, event)
292 if !rtn {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000293 logrus.WithFields(logrus.Fields{
mc20a4b5f2019-10-16 20:28:24 +0000294 "Event": event,
295 }).Info("Error removing event")
296 }
297 }
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000298 logrus.Infof("deleting device %s", ip)
mc20a4b5f2019-10-16 20:28:24 +0000299 s.devicemap[ip].Datacollector.quit <- true
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000300
mc20a4b5f2019-10-16 20:28:24 +0000301 f := s.devicemap[ip].Datafile
302 if f != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000303 logrus.Infof("deleteing file %s", f.Name())
mc20a4b5f2019-10-16 20:28:24 +0000304 err := f.Close()
305 if err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000306 logrus.Errorf("error closing file %s %s", f.Name(), err)
mc20a4b5f2019-10-16 20:28:24 +0000307 errstring = errstring + "error closing file " + f.Name() + "\n"
308 }
309 err = os.Remove(f.Name())
310 if err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000311 logrus.Errorf("error deleting file %s Error:%s ", f.Name(), err)
mc20a4b5f2019-10-16 20:28:24 +0000312 }
313 } else {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000314 logrus.Errorf("File not found %s", errstring+"file "+ip+" not found")
mc20a4b5f2019-10-16 20:28:24 +0000315 }
316 <-s.devicemap[ip].Datacollector.getdataend
317 delete(s.devicemap, ip)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000318 }
mc20a4b5f2019-10-16 20:28:24 +0000319 if errstring != "" {
320 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000321 }
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000322 return &empty.Empty{}, nil
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000323}
mc6a9f01a2019-06-26 21:31:23 +0000324
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000325func (s *Server) SendDeviceList(c context.Context, list *importer.DeviceList) (*empty.Empty, error) {
mc20a4b5f2019-10-16 20:28:24 +0000326 errstring := ""
327 for _, dev := range list.Device {
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000328 ip_address := dev.IpAddress
mccd7e9502019-12-16 22:04:13 +0000329 if msg, ok := s.validate_ip(ip_address, false, false); !ok {
330 errstring = errstring + msg
mc20a4b5f2019-10-16 20:28:24 +0000331 continue
332 }
mc20a4b5f2019-10-16 20:28:24 +0000333 if dev.Frequency > 0 && dev.Frequency < RF_DATA_COLLECT_THRESHOLD {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000334 logrus.Errorf("Device %s data collection interval %d out of range", ip_address, dev.Frequency)
mc20a4b5f2019-10-16 20:28:24 +0000335 errstring = errstring + "Device " + ip_address + " data collection frequency out of range\n"
336 continue
337 }
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000338 d := device{
mc20a4b5f2019-10-16 20:28:24 +0000339 Subscriptions: make(map[string]string),
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000340 Freq: dev.Frequency,
mc20a4b5f2019-10-16 20:28:24 +0000341 Datacollector: scheduler{
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000342 quit: make(chan bool),
mc20a4b5f2019-10-16 20:28:24 +0000343 getdataend: make(chan bool),
344 },
345 Freqchan: make(chan uint32),
346 }
347 s.devicemap[ip_address] = &d
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000348 logrus.Infof("Configuring %s", ip_address)
mc20a4b5f2019-10-16 20:28:24 +0000349
350 /* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
351 freq := dev.Frequency
352 if freq == 0 {
353 freq = RF_DATA_COLLECT_DUMMY_INTERVAL
354 }
355 s.devicemap[ip_address].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
356 if dev.Frequency == 0 {
357 s.devicemap[ip_address].Datacollector.getdata.Stop()
358 }
359
360 eventtypes := s.get_event_types(ip_address)
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000361 for _, event := range eventtypes {
362 s.devicemap[ip_address].Eventtypes = append(s.devicemap[ip_address].Eventtypes, event)
363 if !s.add_subscription(ip_address, event) {
364 errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
mc20a4b5f2019-10-16 20:28:24 +0000365 }
366 }
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000367
mc20a4b5f2019-10-16 20:28:24 +0000368 go s.collect_data(ip_address)
369 s.devicemap[ip_address].Datafile = get_data_file(ip_address)
370 s.update_data_file(ip_address)
371 }
372 if errstring != "" {
373 return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
374 }
375 return &empty.Empty{}, nil
376}
377
378func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceListByIp, error) {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000379 logrus.Infof("In Received GetCurrentDevices")
dileepbk86ef0102019-11-13 00:08:33 +0000380
381 if len(s.devicemap) == 0 {
mccd7e9502019-12-16 22:04:13 +0000382 return nil, status.Errorf(codes.NotFound, "No Device found\n")
dileepbk86ef0102019-11-13 00:08:33 +0000383 }
mc20a4b5f2019-10-16 20:28:24 +0000384 dl := new(importer.DeviceListByIp)
dileepbk86ef0102019-11-13 00:08:33 +0000385 for k, v := range s.devicemap {
386 if v != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000387 logrus.Infof("IpAdd[%s] \n", k)
dileepbk86ef0102019-11-13 00:08:33 +0000388 dl.Ip = append(dl.Ip, k)
389 }
390 }
391 return dl, nil
392}
393
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000394func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000395 logrus.Infof("Listening %s\n", grpcport)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000396 g = grpc.NewServer()
397 l, e = net.Listen("tcp", grpcport)
398 return
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000399}
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000400func (s *Server) startgrpcserver() {
401 logrus.Info("starting gRPC Server")
Scott Bakerbdb962b2020-04-03 10:53:36 -0700402 listener, gserver, err := NewGrpcServer(GlobalConfig.LocalGrpc)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000403 if err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000404 logrus.Errorf("Failed to create gRPC server: %s ", err)
405 panic(err)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000406 }
407 s.gRPCserver = gserver
408 importer.RegisterDeviceManagementServer(gserver, s)
409 if err := gserver.Serve(listener); err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000410 logrus.Errorf("Failed to run gRPC server: %s ", err)
411 panic(err)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000412 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000413
414}
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000415func (s *Server) kafkaCloseProducer() {
416 if err := s.dataproducer.Close(); err != nil {
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000417 panic(err)
418 }
419
420}
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000421func (s *Server) kafkaInit() {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000422 logrus.Info("Starting kafka init to Connect to broker: ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000423 config := sarama.NewConfig()
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000424 config.Producer.RequiredAcks = sarama.WaitForAll
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000425 config.Producer.Retry.Max = 10
Scott Bakerbdb962b2020-04-03 10:53:36 -0700426 producer, err := sarama.NewAsyncProducer([]string{GlobalConfig.Kafka}, config)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000427 if err != nil {
428 panic(err)
429 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000430 s.dataproducer = producer
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000431}
432
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000433func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000434 signals := make(chan os.Signal, 1)
435 signal.Notify(signals, os.Interrupt)
436
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000437 logrus.Info(" IN Handle Event ")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000438 if r.Method == "POST" {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000439 Body, err := ioutil.ReadAll(r.Body)
440 if err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000441 logrus.Errorf("Error getting HTTP data %s", err)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000442 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000443 defer r.Body.Close()
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000444 logrus.Info("Received Event Message ")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000445 fmt.Printf("%s\n", Body)
446 message := &sarama.ProducerMessage{
447 Topic: importerTopic,
448 Value: sarama.StringEncoder(Body),
449 }
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000450 s.dataproducer.Input() <- message
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000451 }
452}
453
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000454func (s *Server) runServer() {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000455 logrus.Info("Starting HTTP Server")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000456 http.HandleFunc("/", s.handle_events)
Scott Bakerbdb962b2020-04-03 10:53:36 -0700457 err := http.ListenAndServeTLS(GlobalConfig.Local, "https-server.crt", "https-server.key", nil)
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000458 if err != nil {
459 panic(err)
460 }
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000461}
462
mccd7e9502019-12-16 22:04:13 +0000463/* validate_ip() verifies if the ip and port are valid and already registered then return the truth value of the desired state specified by the following 2 switches,
464 want_registered: 'true' if the fact of an ip is registered is the desired state
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000465 include_port: 'true' further checks if <ip>:<port#> does exist in the devicemap in case an ip is found registered
mccd7e9502019-12-16 22:04:13 +0000466*/
467func (s *Server) validate_ip(ip_address string, want_registered bool, include_port bool) (msg string, ok bool) {
468 msg = ""
469 ok = false
470 if !strings.Contains(ip_address, ":") {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000471 logrus.Errorf("Incorrect IP address %s, expected format <ip>:<port #>", ip_address)
mccd7e9502019-12-16 22:04:13 +0000472 msg = "Incorrect IP address format (<ip>:<port #>)\n"
473 return
474 }
475 splits := strings.Split(ip_address, ":")
476 ip, port := splits[0], splits[1]
477 if net.ParseIP(ip) == nil {
Scott Bakerbdb962b2020-04-03 10:53:36 -0700478 // also check to see if it's a valid hostname
479 if _, err := net.LookupIP(ip); err != nil {
480 logrus.Errorf("Invalid IP address %s", ip)
481 msg = "Invalid IP address " + ip + "\n"
482 return
483 }
mccd7e9502019-12-16 22:04:13 +0000484 }
485 if _, err := strconv.Atoi(port); err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000486 logrus.Errorf("Port # %s is not an integer", port)
mccd7e9502019-12-16 22:04:13 +0000487 msg = "Port # " + port + " needs to be an integer\n"
488 return
489 }
490 for k := range s.devicemap {
491 if strings.HasPrefix(k, ip) {
492 if !want_registered {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000493 logrus.Errorf("Device ip %s already registered", ip)
mccd7e9502019-12-16 22:04:13 +0000494 msg = "Device ip " + ip + " already registered\n"
495 return
496 } else if include_port {
497 if _, found := s.devicemap[ip_address]; found {
498 ok = true
499 return
500 } else {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000501 logrus.Errorf("Device %s not registered", ip_address)
mccd7e9502019-12-16 22:04:13 +0000502 msg = "Device " + ip_address + " not registered\n"
503 return
504 }
505 } else {
506 ok = true
507 return
508 }
509 }
510 }
511 if want_registered {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000512 logrus.Errorf("Device %s not registered", ip_address)
mccd7e9502019-12-16 22:04:13 +0000513 msg = "Device " + ip_address + " not registered\n"
514 return
515 }
516 ok = true
517 return
518}
519
mc862dad02019-08-06 20:52:51 +0000520func (s *Server) init_data_persistence() {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000521 logrus.Info("Retrieving persisted data")
mc862dad02019-08-06 20:52:51 +0000522 subscriptionListPath = pvmount + "/subscriptions"
523 if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000524 logrus.Errorf("MkdirAll %s", err)
mc862dad02019-08-06 20:52:51 +0000525 } else {
mc20a4b5f2019-10-16 20:28:24 +0000526 files, err := ioutil.ReadDir(subscriptionListPath)
mc862dad02019-08-06 20:52:51 +0000527 if err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000528 logrus.Errorf("ReadDir %s", err)
mc862dad02019-08-06 20:52:51 +0000529 } else {
mc20a4b5f2019-10-16 20:28:24 +0000530 for _, f := range files {
531 b, err := ioutil.ReadFile(path.Join(subscriptionListPath, f.Name()))
mc862dad02019-08-06 20:52:51 +0000532 if err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000533 logrus.Errorf("Readfile %s", err)
mc20a4b5f2019-10-16 20:28:24 +0000534 } else if f.Size() > 0 {
535 ip := f.Name()
mc862dad02019-08-06 20:52:51 +0000536 d := device{}
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000537 err := json.Unmarshal(b, &d)
538 if err != nil {
539 logrus.Errorf("Unmarshal %s", err)
540 return
541 }
mc862dad02019-08-06 20:52:51 +0000542 s.devicemap[ip] = &d
mc20a4b5f2019-10-16 20:28:24 +0000543 freq := s.devicemap[ip].Freq
544
545 /* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
546 if freq == 0 {
547 freq = RF_DATA_COLLECT_DUMMY_INTERVAL
548 }
549 s.devicemap[ip].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
550 if s.devicemap[ip].Freq == 0 {
551 s.devicemap[ip].Datacollector.getdata.Stop()
552 }
553
mc862dad02019-08-06 20:52:51 +0000554 s.devicemap[ip].Datacollector.quit = make(chan bool)
mc20a4b5f2019-10-16 20:28:24 +0000555 s.devicemap[ip].Datacollector.getdataend = make(chan bool)
mc862dad02019-08-06 20:52:51 +0000556 s.devicemap[ip].Freqchan = make(chan uint32)
mc20a4b5f2019-10-16 20:28:24 +0000557 s.devicemap[ip].Datafile = get_data_file(ip)
mc862dad02019-08-06 20:52:51 +0000558 go s.collect_data(ip)
559 }
560 }
561 }
562 }
563}
564
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000565func init() {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000566 Formatter := new(logrus.TextFormatter)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000567 Formatter.TimestampFormat = "02-01-2006 15:04:05"
568 Formatter.FullTimestamp = true
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000569 logrus.SetFormatter(Formatter)
570 logrus.Info("log Connecting to broker:")
571 logrus.Info("log Listening to http server ")
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000572 //sarama.Logger = log.New()
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000573}
574
mc20a4b5f2019-10-16 20:28:24 +0000575func get_data_file(ip string) *os.File {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000576 logrus.Info("get_data_file")
mc862dad02019-08-06 20:52:51 +0000577 if pvmount == "" {
578 return nil
579 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000580 f, err := os.OpenFile(subscriptionListPath+"/"+ip, os.O_CREATE|os.O_RDWR, 0664)
mc862dad02019-08-06 20:52:51 +0000581 if err != nil {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000582 logrus.Errorf("Openfile err %s", err)
mc862dad02019-08-06 20:52:51 +0000583 }
584 return f
585}
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000586
mc20a4b5f2019-10-16 20:28:24 +0000587func (s *Server) close_data_files() {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000588 for ip := range s.devicemap {
mc20a4b5f2019-10-16 20:28:24 +0000589 s.devicemap[ip].Datafile.Close()
590 }
591}
592
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000593func main() {
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000594 logrus.Info("Starting Device-management Container")
mc862dad02019-08-06 20:52:51 +0000595
Scott Bakerbdb962b2020-04-03 10:53:36 -0700596 ParseCommandLine()
597 ProcessGlobalOptions()
598 ShowGlobalOptions()
599
mc862dad02019-08-06 20:52:51 +0000600 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
601 client := &http.Client{
602 Timeout: 10 * time.Second,
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000603 }
mc862dad02019-08-06 20:52:51 +0000604
Dinesh Belwalkar72ecafb2019-12-12 00:08:56 +0000605 s := Server{
606 devicemap: make(map[string]*device),
607 httpclient: client,
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000608 }
mc862dad02019-08-06 20:52:51 +0000609
610 s.kafkaInit()
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000611 go s.runServer()
612 go s.startgrpcserver()
mc862dad02019-08-06 20:52:51 +0000613
614 if pvmount != "" {
615 s.init_data_persistence()
616 }
617
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000618 quit := make(chan os.Signal, 10)
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000619 signal.Notify(quit, os.Interrupt)
620
Dinesh Belwalkara6ba07d2020-01-10 23:22:34 +0000621 sig := <-quit
622 logrus.Infof("Shutting down:%d", sig)
623 s.kafkaCloseProducer()
624 s.close_data_files()
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000625}