SEBA-621 scheduler to collect data for Importer
Change-Id: Idf5f34ad5e99028648fefbe0605a9c78d91d2312
diff --git a/data_collector.go b/data_collector.go
new file mode 100644
index 0000000..b5b4d49
--- /dev/null
+++ b/data_collector.go
@@ -0,0 +1,62 @@
+// Copyright 2018 Open Networking Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "net/http"
+ "fmt"
+ "encoding/json"
+ "regexp"
+ "strings"
+ "io/ioutil"
+)
+
+func get_status(ip string, service string) (rtn bool, data []string) {
+ rtn = false
+
+ uri := ip + REDFISH_ROOT + service
+ resp, err := http.Get(uri)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ body := make(map[string]interface{})
+ json.NewDecoder(resp.Body).Decode(&body)
+ resp.Body.Close()
+
+ if members, ok := body["Members"]; ok {
+ re := regexp.MustCompile(`\[([^\[\]]*)\]`)
+ memberstr := fmt.Sprintf("%v", members)
+ matches := re.FindAllString(memberstr, -1)
+ for _, match := range matches {
+ m := strings.Trim(match, "[]")
+ uri = ip + strings.TrimPrefix(m, "@odata.id:")
+ resp, err = http.Get(uri)
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ b, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ data = append(data, string(b))
+ rtn = true
+ }
+ }
+ defer resp.Body.Close()
+ }
+ }
+ return
+}
diff --git a/event_subscriber.go b/event_subscriber.go
index a44d32f..ba7a2e9 100644
--- a/event_subscriber.go
+++ b/event_subscriber.go
@@ -25,8 +25,7 @@
"os"
)
-const REDFISH_PATH = "/redfish/v1/EventService/Subscriptions/"
-const CONTENT_TYPE = "application/json"
+const RF_SUBSCRIPTION = "/EventService/Subscriptions"
func add_subscription(ip string, event string) (rtn bool, id uint) {
rtn = false
@@ -38,7 +37,7 @@
subscrpt_info["Destination"] = "https://" + destip
subscrpt_info["EventTypes"] = []string{event}
sRequestJson, err := json.Marshal(subscrpt_info)
- uri := ip + REDFISH_PATH
+ uri := ip + REDFISH_ROOT + RF_SUBSCRIPTION
client := http.Client{Timeout: 10 * time.Second}
resp, err := client.Post(uri, CONTENT_TYPE, bytes.NewBuffer(sRequestJson))
if err != nil {
@@ -67,7 +66,7 @@
}
func remove_subscription(ip string, id uint) bool {
- uri := ip + REDFISH_PATH + strconv.Itoa(int(id))
+ uri := ip + REDFISH_ROOT + RF_SUBSCRIPTION + strconv.Itoa(int(id))
req, _ := http.NewRequest("DELETE", uri, nil)
resp, err := http.DefaultClient.Do(req)
if err != nil {
diff --git a/main.go b/main.go
index 771039e..9a09388 100644
--- a/main.go
+++ b/main.go
@@ -27,8 +27,13 @@
"crypto/tls"
empty "github.com/golang/protobuf/ptypes/empty"
importer "./proto"
+ "time"
)
+//globals
+const REDFISH_ROOT = "/redfish/v1"
+const CONTENT_TYPE = "application/json"
+
var (
importerTopic = "importer"
@@ -38,9 +43,17 @@
var default_events = [...]string{"ResourceAdded","ResourceRemoved","Alert"}
+var redfish_services = [...]string{"/Chassis", "/Systems","/EthernetSwitches"}
+
+type scheduler struct {
+ getdata time.Ticker
+ quit chan bool
+}
+
type device struct {
subscriptions map[string]uint
- freq uint32
+ datacollector scheduler
+ freqchan chan uint32
}
type Server struct {
@@ -48,39 +61,34 @@
gRPCserver *grpc.Server
dataproducer sarama.AsyncProducer
devicechan chan *importer.DeviceInfo
- freqchan chan uint32
}
-
func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.EventList, error) {
- fmt.Println("Received GetEventList\n")
- eventstobesubscribed:= new(importer.EventList)
- eventstobesubscribed.EventIpAddress = info.IpAddress
- eventstobesubscribed.Events = append(eventstobesubscribed.Events,"ResourceAdded","ResourceRemoved","Alert")
- return eventstobesubscribed, nil
+ fmt.Println("Received GetEventList\n")
+ eventstobesubscribed:= new(importer.EventList)
+ eventstobesubscribed.EventIpAddress = info.IpAddress
+ eventstobesubscribed.Events = append(eventstobesubscribed.Events,"ResourceAdded","ResourceRemoved","Alert")
+ return eventstobesubscribed, nil
}
func (s *Server) SetFrequency(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
- fmt.Println("Received SetFrequency\n")
- device := s.devicemap[info.IpAddress]
- device.freq = info.Frequency
- //Inform scheduler frquency has changed
- s.freqchan <- info.Frequency
- return &empty.Empty{}, nil
+ fmt.Println("Received SetFrequency")
+ s.devicemap[info.IpAddress].freqchan <- info.Frequency
+ return &empty.Empty{}, nil
}
func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.EventList) (*empty.Empty, error) {
- fmt.Println("Received SubsrcribeEvents\n")
- //Call API to subscribe events
- ip_address := subeventlist.EventIpAddress
- for _, event := range subeventlist.Events {
- rtn, id := add_subscription(ip_address, event)
- if rtn {
- s.devicemap[ip_address].subscriptions[event] = id
- fmt.Println("subscription added", event, id)
- }
- }
- return &empty.Empty{}, nil
+ fmt.Println("Received SubsrcribeEvents\n")
+ //Call API to subscribe events
+ ip_address := subeventlist.EventIpAddress
+ for _, event := range subeventlist.Events {
+ rtn, id := add_subscription(ip_address, event)
+ if rtn {
+ s.devicemap[ip_address].subscriptions[event] = id
+ fmt.Println("subscription added", event, id)
+ }
+ }
+ return &empty.Empty{}, nil
}
func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.EventList) (*empty.Empty, error) {
@@ -98,11 +106,42 @@
return &empty.Empty{}, nil
}
+func (s *Server) collect_data(ip_address string) {
+ freqchan := s.devicemap[ip_address].freqchan
+ ticker := s.devicemap[ip_address].datacollector.getdata
+ donechan := s.devicemap[ip_address].datacollector.quit
+ for {
+ select {
+ case freq := <- freqchan:
+ ticker.Stop()
+ ticker = *time.NewTicker(time.Duration(freq) * time.Second)
+ case <-ticker.C:
+ for _, service := range redfish_services {
+ rtn, data := get_status(ip_address, service)
+ if rtn {
+ for _, str := range data {
+ str = "Device IP: " + ip_address + " " + str
+ b := []byte(str)
+ s.dataproducer.Input() <- &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
+ }
+ }
+ }
+ case <-donechan:
+ ticker.Stop()
+ fmt.Println("getdata ticker stopped")
+ return
+ }
+ }
+}
func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
d := device {
subscriptions: make(map[string]uint),
- freq: info.Frequency,
+ datacollector: scheduler{
+ getdata: *time.NewTicker(time.Duration(info.Frequency) * time.Second),
+ quit: make(chan bool),
+ },
+ freqchan: make(chan uint32),
}
s.devicemap[info.IpAddress] = &d
ip_address:= info.IpAddress
@@ -116,6 +155,7 @@
fmt.Println("subscription added", event, id)
}
}
+ go s.collect_data(ip_address)
return &empty.Empty{}, nil
}
@@ -201,7 +241,6 @@
s := Server {
devicemap: make(map[string]*device),
devicechan: make(chan *importer.DeviceInfo),
- freqchan: make(chan uint32),
}
go s.kafkaInit()
go s.runServer()