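SEBA-621 Add per-device scheduler to collect data for Importer

SendDeviceInfo now starts one collector goroutine per device. On each
tick the collector queries the /Chassis, /Systems and /EthernetSwitches
Redfish services via get_status and publishes every returned string,
prefixed with the device IP, to the importer topic. The frequency
channel moves from Server to the per-device struct, so SetFrequency
replaces only that device's ticker; a quit channel stops the collector.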

Change-Id: Idf5f34ad5e99028648fefbe0605a9c78d91d2312
diff --git a/main.go b/main.go
index 771039e..9a09388 100644
--- a/main.go
+++ b/main.go
@@ -27,8 +27,13 @@
 	"crypto/tls"
 	empty "github.com/golang/protobuf/ptypes/empty"
         importer "./proto"
+	"time"
 )
 
+//globals
+const REDFISH_ROOT = "/redfish/v1"
+const CONTENT_TYPE = "application/json"
+
 var (
 	importerTopic = "importer"
 
@@ -38,9 +43,17 @@
 
 var default_events = [...]string{"ResourceAdded","ResourceRemoved","Alert"}
 
+var redfish_services = [...]string{"/Chassis", "/Systems","/EthernetSwitches"}
+
+type scheduler struct  {
+	getdata time.Ticker
+	quit chan bool
+}
+
 type device struct  {
 	subscriptions map[string]uint
-	freq uint32
+	datacollector scheduler
+	freqchan   chan uint32
 }
 
 type Server struct {
@@ -48,39 +61,34 @@
 	gRPCserver   *grpc.Server
 	dataproducer sarama.AsyncProducer
 	devicechan   chan *importer.DeviceInfo
-	freqchan   chan uint32
 }
 
-
 func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.EventList, error) {
-        fmt.Println("Received GetEventList\n")
-       eventstobesubscribed:= new(importer.EventList)
-       eventstobesubscribed.EventIpAddress = info.IpAddress
-       eventstobesubscribed.Events = append(eventstobesubscribed.Events,"ResourceAdded","ResourceRemoved","Alert")
-       return eventstobesubscribed, nil
+	fmt.Println("Received GetEventList\n")
+	eventstobesubscribed:= new(importer.EventList)
+	eventstobesubscribed.EventIpAddress = info.IpAddress
+	eventstobesubscribed.Events = append(eventstobesubscribed.Events,"ResourceAdded","ResourceRemoved","Alert")
+	return eventstobesubscribed, nil
 }
 
 func (s *Server) SetFrequency(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
-        fmt.Println("Received SetFrequency\n")
-       device := s.devicemap[info.IpAddress]
-       device.freq = info.Frequency
-       //Inform scheduler frquency has changed
-       s.freqchan <- info.Frequency
-       return &empty.Empty{}, nil
+	fmt.Println("Received SetFrequency")
+	s.devicemap[info.IpAddress].freqchan <- info.Frequency
+	return &empty.Empty{}, nil
 }
 
 func (s *Server) SubsrcribeGivenEvents(c context.Context,  subeventlist *importer.EventList) (*empty.Empty, error) {
-        fmt.Println("Received SubsrcribeEvents\n")
-       //Call API to subscribe events
-       ip_address := subeventlist.EventIpAddress
-        for _, event := range subeventlist.Events {
-                rtn, id := add_subscription(ip_address, event)
-                if rtn {
-                        s.devicemap[ip_address].subscriptions[event] = id
-                        fmt.Println("subscription added", event, id)
-                }
-        }
-       return &empty.Empty{}, nil
+	fmt.Println("Received SubsrcribeEvents\n")
+	//Call API to subscribe events
+	ip_address := subeventlist.EventIpAddress
+	for _, event := range subeventlist.Events {
+		rtn, id := add_subscription(ip_address, event)
+		if rtn {
+			s.devicemap[ip_address].subscriptions[event] = id
+			fmt.Println("subscription added", event, id)
+		}
+	}
+	return &empty.Empty{}, nil
 }
 
 func (s *Server) UnSubsrcribeGivenEvents(c context.Context,  unsubeventlist *importer.EventList) (*empty.Empty, error) {
@@ -98,11 +106,42 @@
        return &empty.Empty{}, nil
 }
 
+func (s *Server) collect_data(ip_address string) {
+	freqchan := s.devicemap[ip_address].freqchan
+	ticker := s.devicemap[ip_address].datacollector.getdata
+	donechan := s.devicemap[ip_address].datacollector.quit
+	for {
+		select {
+		case freq := <- freqchan:
+			ticker.Stop()
+			ticker = *time.NewTicker(time.Duration(freq) * time.Second)
+		case <-ticker.C:
+			for _, service := range redfish_services {
+				rtn, data := get_status(ip_address, service)
+				if rtn {
+					for _, str := range data {
+						str = "Device IP: " + ip_address + " " + str
+						b := []byte(str)
+						s.dataproducer.Input() <- &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
+					}
+				}
+			}
+		case <-donechan:
+			ticker.Stop()
+			fmt.Println("getdata ticker stopped")
+			return
+		}
+	}
+}
 
 func (s *Server) SendDeviceInfo(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
 	d := device {
 		subscriptions: make(map[string]uint),
-		freq:	info.Frequency,
+		datacollector: scheduler{
+			getdata: *time.NewTicker(time.Duration(info.Frequency) * time.Second),
+			quit: make(chan bool),
+		},
+		freqchan:        make(chan uint32),
 	}
 	s.devicemap[info.IpAddress] = &d
 	ip_address:= info.IpAddress
@@ -116,6 +155,7 @@
 			fmt.Println("subscription added", event, id)
 		}
 	}
+	go s.collect_data(ip_address)
 	return &empty.Empty{}, nil
 }
 
@@ -201,7 +241,6 @@
 	s := Server {
 		devicemap:	make(map[string]*device),
 		devicechan:	make(chan *importer.DeviceInfo),
-		freqchan:        make(chan uint32),
 	}
 	go s.kafkaInit()
 	go s.runServer()