SEBA-621 scheduler to collect data for Importer

Change-Id: Idf5f34ad5e99028648fefbe0605a9c78d91d2312
diff --git a/data_collector.go b/data_collector.go
new file mode 100644
index 0000000..b5b4d49
--- /dev/null
+++ b/data_collector.go
@@ -0,0 +1,62 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"net/http"
+	"fmt"
+	"encoding/json"
+	"regexp"
+	"strings"
+	"io/ioutil"
+)
+
+func get_status(ip string, service string) (rtn bool, data []string) {
+	rtn = false
+
+	uri := ip + REDFISH_ROOT + service
+	resp, err := http.Get(uri)
+	if err != nil {
+		fmt.Println(err)
+		return
+	}
+	body := make(map[string]interface{})
+	json.NewDecoder(resp.Body).Decode(&body)
+	resp.Body.Close()
+
+	if members, ok := body["Members"]; ok {
+		re := regexp.MustCompile(`\[([^\[\]]*)\]`)
+		memberstr := fmt.Sprintf("%v", members)
+		matches := re.FindAllString(memberstr, -1)
+		for _, match := range matches {
+			m := strings.Trim(match, "[]")
+			uri = ip + strings.TrimPrefix(m, "@odata.id:")
+			resp, err = http.Get(uri)
+		        if err != nil {
+			        fmt.Println(err)
+			} else {
+				b, err := ioutil.ReadAll(resp.Body)
+			        if err != nil {
+				        fmt.Println(err)
+				} else {
+					data = append(data, string(b))
+					rtn = true
+				}
+			}
+			defer resp.Body.Close()
+		}
+	}
+	return 
+}
diff --git a/event_subscriber.go b/event_subscriber.go
index a44d32f..ba7a2e9 100644
--- a/event_subscriber.go
+++ b/event_subscriber.go
@@ -25,8 +25,7 @@
 	"os"
 )
 
-const REDFISH_PATH = "/redfish/v1/EventService/Subscriptions/"
-const CONTENT_TYPE = "application/json"
+const RF_SUBSCRIPTION = "/EventService/Subscriptions"
 
 func add_subscription(ip string, event string) (rtn bool, id uint) {
 	rtn = false
@@ -38,7 +37,7 @@
 	subscrpt_info["Destination"] = "https://" + destip
 	subscrpt_info["EventTypes"] = []string{event}
 	sRequestJson, err := json.Marshal(subscrpt_info)
-	uri := ip + REDFISH_PATH
+	uri := ip + REDFISH_ROOT + RF_SUBSCRIPTION
 	client := http.Client{Timeout: 10 * time.Second}
 	resp, err := client.Post(uri, CONTENT_TYPE, bytes.NewBuffer(sRequestJson))
 	if err != nil {
@@ -67,7 +66,7 @@
 }
 
 func remove_subscription(ip string, id uint) bool {
-	uri := ip + REDFISH_PATH + strconv.Itoa(int(id))
+	uri := ip + REDFISH_ROOT + RF_SUBSCRIPTION + strconv.Itoa(int(id))
 	req, _ := http.NewRequest("DELETE", uri, nil)
 	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
diff --git a/main.go b/main.go
index 771039e..9a09388 100644
--- a/main.go
+++ b/main.go
@@ -27,8 +27,13 @@
 	"crypto/tls"
 	empty "github.com/golang/protobuf/ptypes/empty"
         importer "./proto"
+	"time"
 )
 
+//globals
+const REDFISH_ROOT = "/redfish/v1"
+const CONTENT_TYPE = "application/json"
+
 var (
 	importerTopic = "importer"
 
@@ -38,9 +43,17 @@
 
 var default_events = [...]string{"ResourceAdded","ResourceRemoved","Alert"}
 
+var redfish_services = [...]string{"/Chassis", "/Systems","/EthernetSwitches"}
+
+type scheduler struct  {
+	getdata time.Ticker
+	quit chan bool
+}
+
 type device struct  {
 	subscriptions map[string]uint
-	freq uint32
+	datacollector scheduler
+	freqchan   chan uint32
 }
 
 type Server struct {
@@ -48,39 +61,34 @@
 	gRPCserver   *grpc.Server
 	dataproducer sarama.AsyncProducer
 	devicechan   chan *importer.DeviceInfo
-	freqchan   chan uint32
 }
 
-
 func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.EventList, error) {
-        fmt.Println("Received GetEventList\n")
-       eventstobesubscribed:= new(importer.EventList)
-       eventstobesubscribed.EventIpAddress = info.IpAddress
-       eventstobesubscribed.Events = append(eventstobesubscribed.Events,"ResourceAdded","ResourceRemoved","Alert")
-       return eventstobesubscribed, nil
+	fmt.Println("Received GetEventList\n")
+	eventstobesubscribed:= new(importer.EventList)
+	eventstobesubscribed.EventIpAddress = info.IpAddress
+	eventstobesubscribed.Events = append(eventstobesubscribed.Events,"ResourceAdded","ResourceRemoved","Alert")
+	return eventstobesubscribed, nil
 }
 
 func (s *Server) SetFrequency(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
-        fmt.Println("Received SetFrequency\n")
-       device := s.devicemap[info.IpAddress]
-       device.freq = info.Frequency
-       //Inform scheduler frquency has changed
-       s.freqchan <- info.Frequency
-       return &empty.Empty{}, nil
+	fmt.Println("Received SetFrequency")
+	s.devicemap[info.IpAddress].freqchan <- info.Frequency
+	return &empty.Empty{}, nil
 }
 
 func (s *Server) SubsrcribeGivenEvents(c context.Context,  subeventlist *importer.EventList) (*empty.Empty, error) {
-        fmt.Println("Received SubsrcribeEvents\n")
-       //Call API to subscribe events
-       ip_address := subeventlist.EventIpAddress
-        for _, event := range subeventlist.Events {
-                rtn, id := add_subscription(ip_address, event)
-                if rtn {
-                        s.devicemap[ip_address].subscriptions[event] = id
-                        fmt.Println("subscription added", event, id)
-                }
-        }
-       return &empty.Empty{}, nil
+	fmt.Println("Received SubsrcribeEvents\n")
+	//Call API to subscribe events
+	ip_address := subeventlist.EventIpAddress
+	for _, event := range subeventlist.Events {
+		rtn, id := add_subscription(ip_address, event)
+		if rtn {
+			s.devicemap[ip_address].subscriptions[event] = id
+			fmt.Println("subscription added", event, id)
+		}
+	}
+	return &empty.Empty{}, nil
 }
 
 func (s *Server) UnSubsrcribeGivenEvents(c context.Context,  unsubeventlist *importer.EventList) (*empty.Empty, error) {
@@ -98,11 +106,42 @@
        return &empty.Empty{}, nil
 }
 
+func (s *Server) collect_data(ip_address string) {
+	freqchan := s.devicemap[ip_address].freqchan
+	ticker := s.devicemap[ip_address].datacollector.getdata
+	donechan := s.devicemap[ip_address].datacollector.quit
+	for {
+		select {
+		case freq := <- freqchan:
+			ticker.Stop()
+			ticker = *time.NewTicker(time.Duration(freq) * time.Second)
+		case <-ticker.C:
+			for _, service := range redfish_services {
+				rtn, data := get_status(ip_address, service)
+				if rtn {
+					for _, str := range data {
+						str = "Device IP: " + ip_address + " " + str
+						b := []byte(str)
+						s.dataproducer.Input() <- &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
+					}
+				}
+			}
+		case <-donechan:
+			ticker.Stop()
+			fmt.Println("getdata ticker stopped")
+			return
+		}
+	}
+}
 
 func (s *Server) SendDeviceInfo(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
 	d := device {
 		subscriptions: make(map[string]uint),
-		freq:	info.Frequency,
+		datacollector: scheduler{
+			getdata: *time.NewTicker(time.Duration(info.Frequency) * time.Second),
+			quit: make(chan bool),
+		},
+		freqchan:        make(chan uint32),
 	}
 	s.devicemap[info.IpAddress] = &d
 	ip_address:= info.IpAddress
@@ -116,6 +155,7 @@
 			fmt.Println("subscription added", event, id)
 		}
 	}
+	go s.collect_data(ip_address)
 	return &empty.Empty{}, nil
 }
 
@@ -201,7 +241,6 @@
 	s := Server {
 		devicemap:	make(map[string]*device),
 		devicechan:	make(chan *importer.DeviceInfo),
-		freqchan:        make(chan uint32),
 	}
 	go s.kafkaInit()
 	go s.runServer()