SEBA-621 scheduler to collect data for Importer
Change-Id: Idf5f34ad5e99028648fefbe0605a9c78d91d2312
diff --git a/main.go b/main.go
index 771039e..9a09388 100644
--- a/main.go
+++ b/main.go
@@ -27,8 +27,13 @@
"crypto/tls"
empty "github.com/golang/protobuf/ptypes/empty"
importer "./proto"
+ "time"
)
+//globals
+const REDFISH_ROOT = "/redfish/v1"
+const CONTENT_TYPE = "application/json"
+
var (
importerTopic = "importer"
@@ -38,9 +43,17 @@
var default_events = [...]string{"ResourceAdded","ResourceRemoved","Alert"}
+var redfish_services = [...]string{"/Chassis", "/Systems","/EthernetSwitches"}
+
+type scheduler struct {
+ getdata time.Ticker
+ quit chan bool
+}
+
type device struct {
subscriptions map[string]uint
- freq uint32
+ datacollector scheduler
+ freqchan chan uint32
}
type Server struct {
@@ -48,39 +61,34 @@
gRPCserver *grpc.Server
dataproducer sarama.AsyncProducer
devicechan chan *importer.DeviceInfo
- freqchan chan uint32
}
-
func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.EventList, error) {
- fmt.Println("Received GetEventList\n")
- eventstobesubscribed:= new(importer.EventList)
- eventstobesubscribed.EventIpAddress = info.IpAddress
- eventstobesubscribed.Events = append(eventstobesubscribed.Events,"ResourceAdded","ResourceRemoved","Alert")
- return eventstobesubscribed, nil
+ fmt.Println("Received GetEventList\n")
+ eventstobesubscribed:= new(importer.EventList)
+ eventstobesubscribed.EventIpAddress = info.IpAddress
+ eventstobesubscribed.Events = append(eventstobesubscribed.Events,"ResourceAdded","ResourceRemoved","Alert")
+ return eventstobesubscribed, nil
}
func (s *Server) SetFrequency(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
- fmt.Println("Received SetFrequency\n")
- device := s.devicemap[info.IpAddress]
- device.freq = info.Frequency
- //Inform scheduler frquency has changed
- s.freqchan <- info.Frequency
- return &empty.Empty{}, nil
+ fmt.Println("Received SetFrequency")
+ s.devicemap[info.IpAddress].freqchan <- info.Frequency
+ return &empty.Empty{}, nil
}
func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.EventList) (*empty.Empty, error) {
- fmt.Println("Received SubsrcribeEvents\n")
- //Call API to subscribe events
- ip_address := subeventlist.EventIpAddress
- for _, event := range subeventlist.Events {
- rtn, id := add_subscription(ip_address, event)
- if rtn {
- s.devicemap[ip_address].subscriptions[event] = id
- fmt.Println("subscription added", event, id)
- }
- }
- return &empty.Empty{}, nil
+ fmt.Println("Received SubsrcribeEvents\n")
+ //Call API to subscribe events
+ ip_address := subeventlist.EventIpAddress
+ for _, event := range subeventlist.Events {
+ rtn, id := add_subscription(ip_address, event)
+ if rtn {
+ s.devicemap[ip_address].subscriptions[event] = id
+ fmt.Println("subscription added", event, id)
+ }
+ }
+ return &empty.Empty{}, nil
}
func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.EventList) (*empty.Empty, error) {
@@ -98,11 +106,42 @@
return &empty.Empty{}, nil
}
+func (s *Server) collect_data(ip_address string) {
+ freqchan := s.devicemap[ip_address].freqchan
+ ticker := s.devicemap[ip_address].datacollector.getdata
+ donechan := s.devicemap[ip_address].datacollector.quit
+ for {
+ select {
+ case freq := <- freqchan:
+ ticker.Stop()
+ ticker = *time.NewTicker(time.Duration(freq) * time.Second)
+ case <-ticker.C:
+ for _, service := range redfish_services {
+ rtn, data := get_status(ip_address, service)
+ if rtn {
+ for _, str := range data {
+ str = "Device IP: " + ip_address + " " + str
+ b := []byte(str)
+ s.dataproducer.Input() <- &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
+ }
+ }
+ }
+ case <-donechan:
+ ticker.Stop()
+ fmt.Println("getdata ticker stopped")
+ return
+ }
+ }
+}
func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
d := device {
subscriptions: make(map[string]uint),
- freq: info.Frequency,
+ datacollector: scheduler{
+ getdata: *time.NewTicker(time.Duration(info.Frequency) * time.Second),
+ quit: make(chan bool),
+ },
+ freqchan: make(chan uint32),
}
s.devicemap[info.IpAddress] = &d
ip_address:= info.IpAddress
@@ -116,6 +155,7 @@
fmt.Println("subscription added", event, id)
}
}
+ go s.collect_data(ip_address)
return &empty.Empty{}, nil
}
@@ -201,7 +241,6 @@
s := Server {
devicemap: make(map[string]*device),
devicechan: make(chan *importer.DeviceInfo),
- freqchan: make(chan uint32),
}
go s.kafkaInit()
go s.runServer()