SEBA-841 importer to parse all status from redfish server for data collection purpose / remove demotest binary
SEBA-856 SendDeviceList- This API will add all devices in the list
SEBA-858 DeleteDeviceList- This API will remove all devices in the list
decouple add/remove event subscription routines and device data file update
remove 'protocol'
SEBA-874 get rid of the 'vendor' argument called by some API's
Change-Id: Icc044dd4661c3cc14f02ad1a5f52e18116da63aa
diff --git a/data_collector.go b/data_collector.go
index 77a67d9..c626427 100644
--- a/data_collector.go
+++ b/data_collector.go
@@ -18,48 +18,90 @@
"net/http"
"fmt"
"encoding/json"
- "regexp"
- "strings"
"io/ioutil"
)
-func (s *Server) get_status(ip string, service string) (rtn bool, data []string) {
- rtn = false
+/* parse_map() parses the json structure, amap, and returns all sub-folder paths found at the 2nd level of the multiplayer structure
+*/
+func parse_map(amap map[string]interface{}, level uint, archive map[string]bool) (paths []string) {
+ level = level + 1
+ for key, val := range amap {
+ switch val.(type) {
+ case map[string]interface{}:
+ p := parse_map(val.(map[string]interface{}), level, archive)
+ paths = append(paths, p...)
+ case []interface{}:
+ p := parse_array(val.([]interface{}), level, archive)
+ paths = append(paths, p...)
+ default:
+ if level == 2 && key == "@odata.id" {
+ /* sub-folder path of a resource can be found as the value of the key '@odata.id' showing up at the 2nd level of the data read from a resource. When a path is found, it's checked against the array 'archive' to avoid duplicates. */
+ if _, ok := archive[val.(string)]; !ok {
+ archive[val.(string)] = true
+ paths = append(paths, val.(string))
+ }
+ }
+ }
+ }
+ return paths
+}
- uri := s.devicemap[ip].Protocol + "://"+ip + REDFISH_ROOT + service
- fmt.Printf("%q", uri)
- resp, err := http.Get(uri)
+/* parse_array() parses any vlaue, if in the form of an array, of a key-value pair found in the json structure, and returns any paths found.
+*/
+func parse_array(anarray []interface{}, level uint, archive map[string]bool) (paths []string) {
+ for _, val := range anarray {
+ switch val.(type) {
+ case map[string]interface{}:
+ p := parse_map(val.(map[string]interface{}), level, archive)
+ paths = append(paths, p...)
+ }
+ }
+ return paths
+}
+
+/* read_resource() reads data from the specified Redfish resource, including its sub-folders, of the specified device ip and rerutnrs the data read.
+
+Based on careful examination of the data returned from several resources sampled, it was determined that sub-folder paths can be found as the value to the key '@odata.id' showing up at the 2nd level of the data read from a resource.
+*/
+func read_resource(ip string, resource string, archive map[string]bool) (data []string) {
+ resp, err := http.Get(ip + resource)
+ if resp != nil {
+ defer resp.Body.Close()
+ }
if err != nil {
fmt.Println(err)
return
}
- body := make(map[string]interface{})
- json.NewDecoder(resp.Body).Decode(&body)
- resp.Body.Close()
-
- if members, ok := body["Members"]; ok {
- re := regexp.MustCompile(`\[([^\[\]]*)\]`)
- memberstr := fmt.Sprintf("%v", members)
- matches := re.FindAllString(memberstr, -1)
- for _, match := range matches {
- m := strings.Trim(match, "[]")
- uri = s.devicemap[ip].Protocol +"://"+ip + strings.TrimPrefix(m, "@odata.id:")
- fmt.Println("Printing URI")
- fmt.Println(uri)
- resp, err = http.Get(uri)
- if err != nil {
- fmt.Println(err)
- } else {
- b, err := ioutil.ReadAll(resp.Body)
- if err != nil {
- fmt.Println(err)
- } else {
- data = append(data, string(b))
- rtn = true
- }
- defer resp.Body.Close()
- }
- }
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println(err)
+ return
}
- return
+
+ data = append(data, string(body))
+
+ m := map[string]interface{}{}
+ err = json.Unmarshal([]byte(body), &m)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+
+ resources := parse_map(m, 0, archive)
+
+ for _, resource := range resources {
+ d := read_resource(ip, resource, archive)
+ data = append(data, d...)
+ }
+ return data
+}
+
+/* sample JSON files can be found in the samples folder */
+func (s *Server) get_status(ip string, resource string) (data []string) {
+ archive := make(map[string]bool)
+ base_ip := RF_DEFAULT_PROTOCOL + ip
+ /* 'archive' maintains a list of all resources that will be/have been visited to avoid duplicates */
+ archive[resource] = true
+ data = read_resource(base_ip, resource, archive)
+ return data
}
diff --git a/demo_test/cmd_client/Makefile b/demo_test/cmd_client/Makefile
index 4c9a95e..c9f9579 100644
--- a/demo_test/cmd_client/Makefile
+++ b/demo_test/cmd_client/Makefile
@@ -12,6 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-cmd_cl:
+cmd_cl: cmd_cl.go
go build -i -v -o $@
diff --git a/demo_test/cmd_client/Note b/demo_test/cmd_client/Note
index 4781181..b72578a 100644
--- a/demo_test/cmd_client/Note
+++ b/demo_test/cmd_client/Note
@@ -26,9 +26,7 @@
Example:
- For the first time, you need to use "attach:ipaddress:port:vendorname:freq" to set the device's IP related info.
- If successful, you will get "[ipaddress:port]CMD to send:" prompt to indicate what device you attached now.
- This attach will subscribe default 3 types events into the server.
+ Attach a device will subscribe default 3 types events into the server.
You can use the following wget to check if any events subscriptions on the device.
wget --no-check-certificate \
@@ -42,60 +40,70 @@
--------------------------------------------------------------------------------------
Test items client cmd
--------------------------------------------------------------------------------------
-set device info Example:
+register one device Example:
-Set IP 192.168.4.27 port 8888 vendor "edgecore" freq 180
- attach:192.168.4.27:8888:edgecore:180
+Set IP 192.168.4.27 port 8888 freq 180
+ attach 192.168.4.27:8888:180
--------------------------------------------------------------------------------------
-set multi-device info Example:
+register multiple devices Example:
- attach:192.168.4.27:8888:edgecore:180
- attach:192.168.3.34:8888:edgecore:180
+Set "IP 192.168.4.27 port 8888 freq 180" and "IP 192.168.4.26 port 8888 freq 120"
+ attach 192.168.4.27:8888:180 192.168.4.26:8888:120
+
+--------------------------------------------------------------------------------------
+delete devices Example:
+
+Delete "IP 192.168.4.27" and "IP 192.168.3.34"
+
+ delete 192.168.4.27:8888 192.168.4.26:8888
--------------------------------------------------------------------------------------
Get Current List of Devices monitored showdevices
Return from server: 192.168.4.26:8888,192.168.4.27:8888
--------------------------------------------------------------------------------------
UnSubscribe all events(ResourceAdded/ResourceRemoved/Alert) unsub:192.168.4.27:8888:add:rm:alert
+=======
+UnSubscribe all events(ResourceAdded/ResourceRemoved/Alert) unsub 192.168.4.27:8888:add:rm:alert
+>>>>>>> SEBA-841 importer to parse all status from redfish server for data collection purpose / remove demotest binary
--------------------------------------------------------------------------------------
-Subscribe all events(ResourceAdded/ResourceRemoved/Alert) sub:192.168.4.27:8888:add:rm:alert
+Subscribe all events(ResourceAdded/ResourceRemoved/Alert) sub 192.168.4.27:8888:add:rm:alert
--------------------------------------------------------------------------------------
Subscribe and unsubscribe an event Example:
-Subscribe ResourceAdded event sub:192.168.4.27:8888:add
-Subscribe ResourceRemoved event sub:192.168.4.27:8888:rm
-Subscribe Alert event sub:192.168.4.27:8888:alert
-Unsubscribe ResourceAdded event unsub:192.168.4.27:8888:add
-Unsubscribe ResourceRemoved event unsub:192.168.4.27:8888:rm
-Unsubscribe Alert event unsub:192.168.4.27:8888:alert
+Subscribe ResourceAdded event sub 192.168.4.27:8888:add
+Subscribe ResourceRemoved event sub 192.168.4.27:8888:rm
+Subscribe Alert event sub 192.168.4.27:8888:alert
+Unsubscribe ResourceAdded event unsub 192.168.4.27:8888:add
+Unsubscribe ResourceRemoved event unsub 192.168.4.27:8888:rm
+Unsubscribe Alert event unsub 192.168.4.27:8888:alert
--------------------------------------------------------------------------------------
Subscribe and unsubscribe multiple events, out of order Use the above commands to do test.
--------------------------------------------------------------------------------------
-Subscribe an unsupported event sub:192.168.4.27:8888:update
+Subscribe an unsupported event sub 192.168.4.27:8888:update
--------------------------------------------------------------------------------------
Subscribe to an already subscribed event Example:
- sub:192.168.4.27:8888:add
- sub:192.168.4.27:8888:add
+ sub 192.168.4.27:8888:add
+ sub 192.168.4.27:8888:add
--------------------------------------------------------------------------------------
-Unsubscribe an unsupported event unsub:192.168.4.27:8888:update
+Unsubscribe an unsupported event unsub 192.168.4.27:8888:update
--------------------------------------------------------------------------------------
Unsubscribe a supported but not-subscribed event Example:
- unsub:192.168.4.27:8888:add:rm:alert
- unsub:192.168.4.27:8888:add
- unsub:192.168.4.27:8888:rm
- unsub:192.168.4.27:8888:alert
+ unsub 192.168.4.27:8888:add:rm:alert
+ unsub 192.168.4.27:8888:add
+ unsub 192.168.4.27:8888:rm
+ unsub 192.168.4.27:8888:alert
--------------------------------------------------------------------------------------
Change polling interval Example:
Set frequecny to 30 seconds
- period:192.168.4.27:8888:30
+ period 192.168.4.27:8888:30
--------------------------------------------------------------------------------------
-Show support event list vendor showeventlist:edgecore
+Show list of supported event showeventlist 192.168.4.27:8888
-----------------------------------------------------------------------------------------------
-Show current events subscribed by device showdeviceeventlist:192.168.4.27:8888
+Show current events subscribed by device showdeviceeventlist 192.168.4.27:8888
------------------------------------------------------------------------------------------------------------
-Clear all current events subscribed by device cleardeviceeventlist:192.168.4.27:8888
+Clear all current events subscribed by device cleardeviceeventlist 192.168.4.27:8888
-------------------------------------------------------------------------------------------------------------
* During and after each test, verify the list of events subscribed wget --no-check-certificate \
-qO- https://192.168.4.27:8888/redfish/v1/EventService/Subscriptions/ \
diff --git a/demo_test/cmd_client/cmd_cl.go b/demo_test/cmd_client/cmd_cl.go
index e1ee3bb..2f8d865 100644
--- a/demo_test/cmd_client/cmd_cl.go
+++ b/demo_test/cmd_client/cmd_cl.go
@@ -18,9 +18,6 @@
import "fmt"
import "bufio"
import "os"
-import "strings"
-
-var attach_ip string = ""
func main() {
// connect to this socket
@@ -29,29 +26,14 @@
reader := bufio.NewReader(os.Stdin)
for {
// read in input from stdin
- if(attach_ip != ""){
- fmt.Printf("[%v] CMD to send :", attach_ip)
- }else{
- fmt.Print("CMD to send :")
- }
+ fmt.Print("CMD to send : ")
text, _ := reader.ReadString('\n')
-
// send to socket
fmt.Fprintf(conn, text + "\n")
- cmd := strings.TrimSuffix(text, "\n")
- s := strings.Split(cmd, ":")
- cmd = s[0]
-
- if(cmd == "attach"){
- // listen for reply
- t_attach_ip, _ := bufio.NewReader(conn).ReadString('\n')
- attach_ip = strings.TrimSuffix(t_attach_ip, "\n")
- }else{
- // listen for reply
- message, _ = bufio.NewReader(conn).ReadString('\n')
- fmt.Print("Return from server: " + message)
- }
+ // listen for reply
+ message, _ = bufio.NewReader(conn).ReadString('\n')
+ fmt.Print("Return from server: " + message)
if message == "QUIT\n"{
break
diff --git a/demo_test/test.go b/demo_test/test.go
index d1d7ca9..741da16 100644
--- a/demo_test/test.go
+++ b/demo_test/test.go
@@ -43,10 +43,6 @@
"update": "Update"}
var default_address string = "localhost:31085"
-var default_port string = "8888"
-var default_vendor string = "edgecore"
-var default_freq uint64 = 180
-var attach_device_ip string = ""
var importerTopic = "importer"
var DataConsumer sarama.Consumer
@@ -54,128 +50,10 @@
var ctx context.Context
var conn *grpc.ClientConn
-/*///////////////////////////////////////////////////////////////////////*/
-// Allows user to register the device for data collection and frequency.
-//
-//
-/*///////////////////////////////////////////////////////////////////////*/
-func Attach(aip string, avendor string, afreq uint32) error {
- fmt.Println("Received Attach\n")
- var default_protocol string = "https"
- deviceinfo := new(importer.DeviceInfo)
- deviceinfo.IpAddress = aip
- deviceinfo.Vendor = avendor
- deviceinfo.Frequency = afreq
- deviceinfo.Protocol = default_protocol
- _, err := cc.SendDeviceInfo(ctx, deviceinfo)
-
- return err
-}
-
-/*///////////////////////////////////////////////////////////////////////*/
-// Allows user to change the frequency of data collection
-//
-//
-/*///////////////////////////////////////////////////////////////////////*/
-func UpdateFreq(ip_address string, wd uint32) error {
- fmt.Println("Received Period\n")
- freqinfo := new(importer.FreqInfo)
- freqinfo.Frequency = wd
- freqinfo.IpAddress = ip_address
- _, err := cc.SetFrequency(ctx, freqinfo)
-
- return err
-}
-
-/*///////////////////////////////////////////////////////////////////////*/
-// Allows user to unsubscribe events
-//
-//
-/*///////////////////////////////////////////////////////////////////////*/
-func Subscribe(ip_address string, Giveneventlist []string) error {
- fmt.Println("Received Subscribe\n")
- giveneventlist := new(importer.GivenEventList)
- giveneventlist.Events = Giveneventlist
- giveneventlist.EventIpAddress = ip_address
- _, err := cc.SubsrcribeGivenEvents(ctx, giveneventlist)
-
- return err
-}
-
-/*///////////////////////////////////////////////////////////////////////*/
-// Allows user to unsubscribe events
-//
-//
-/*///////////////////////////////////////////////////////////////////////*/
-func UnSubscribe(ip_address string, Giveneventlist []string) error {
- fmt.Println("Received UnSubscribe\n")
- giveneventlist := new(importer.GivenEventList)
- giveneventlist.Events = Giveneventlist
- giveneventlist.EventIpAddress = ip_address
- _, err := cc.UnSubsrcribeGivenEvents(ctx, giveneventlist)
-
- return err
-}
-
-/*///////////////////////////////////////////////////////////////////////*/
-// Allows user to get the events supported by device
-//
-//
-/*///////////////////////////////////////////////////////////////////////*/
-func GetEventSupportList(vendor string) (error, []string) {
- fmt.Println("Received GetEventSupportList\n")
- vendorinfo := new(importer.VendorInfo)
- vendorinfo.Vendor = vendor
- var ret_msg *importer.EventList
- ret_msg, err := cc.GetEventList(ctx, vendorinfo)
- if err != nil {
- return err, nil
- } else {
- return err, ret_msg.Events
- }
-}
-
-/*///////////////////////////////////////////////////////////////////////*/
-// Allows user to get the current events subscribed by device
-//
-//
-/*///////////////////////////////////////////////////////////////////////*/
-func GetEventCurrentDeviceList(ip_address string) (error, []string) {
- fmt.Println("Received GetEventCurrentDeviceList\n")
- currentdeviceinfo := new(importer.Device)
- currentdeviceinfo.IpAddress = ip_address
- var ret_msg *importer.EventList
- ret_msg, err := cc.GetCurrentEventList(ctx, currentdeviceinfo)
- if err != nil {
- return err, nil
- } else {
- return err, ret_msg.Events
- }
-}
-
-/*///////////////////////////////////////////////////////////////////////*/
-// Allows user to get the current events subscribed by device
-//
-//
-/*///////////////////////////////////////////////////////////////////////*/
-func ClearCurrentDeviceEventList(ip_address string) error {
- fmt.Println("Received ClearCurrentDeviceEventList\n")
- currentdeviceinfo := new(importer.Device)
- currentdeviceinfo.IpAddress = ip_address
- _, err := cc.ClearCurrentEventList(ctx, currentdeviceinfo)
-
- return err
-}
-
-/*///////////////////////////////////////////////////////////////////////*/
-// Allows user to get the current devices that are monitored
-//
-//
-/*///////////////////////////////////////////////////////////////////////*/
func GetCurrentDevices() (error, []string) {
fmt.Println("Testing GetCurrentDevices\n")
empty := new(importer.Empty)
- var ret_msg *importer.DeviceList
+ var ret_msg *importer.DeviceListByIp
ret_msg, err := cc.GetCurrentDevices(ctx, empty)
if err != nil {
return err, nil
@@ -259,208 +137,188 @@
if err != nil {
fmt.Println("Accept error")
log.Fatal("Accept error: %v", err)
- } else {
+ }
+ conn, err = grpc.Dial(default_address, grpc.WithInsecure())
+ if err != nil {
+ fmt.Println("could not connect")
+ log.Fatal("did not connect: %v", err)
+ }
+ defer conn.Close()
- conn, err = grpc.Dial(default_address, grpc.WithInsecure())
- if err != nil {
- fmt.Println("could not connect")
- log.Fatal("did not connect: %v", err)
- }
- defer conn.Close()
+ cc = importer.NewDeviceManagementClient(conn)
+ ctx = context.Background()
- cc = importer.NewDeviceManagementClient(conn)
- ctx = context.Background()
+ loop := true
- loop := true
+ for loop == true {
+ cmdstr, _ := bufio.NewReader(connS).ReadString('\n')
+ cmdstr = strings.TrimSuffix(cmdstr, "\n")
+ s := strings.Split(cmdstr, " ")
+ newmessage := ""
+ cmd := string(s[0])
- for loop == true {
- cmd, _ := bufio.NewReader(connS).ReadString('\n')
+ switch cmd {
- cmd = strings.TrimSuffix(cmd, "\n")
- s := strings.Split(cmd, ":")
- newmessage := "cmd error!!"
- cmd = s[0]
-
- switch string(cmd) {
-
- case "attach":
- cmd_size := len(s)
- var err error
- var uafreq uint64
- if cmd_size == 5 {
- aip := s[1]
- aport := s[2]
- avendor := s[3]
- afreq := s[4]
- uafreq, err = strconv.ParseUint(afreq, 10, 64)
-
- if err != nil {
- fmt.Print("ParseUint error!!\n")
- }
-
- attach_device_ip = aip + ":" + aport
-
- err = Attach(attach_device_ip, avendor, uint32(uafreq))
- if err != nil {
- errStatus, _ := status.FromError(err)
- fmt.Println(errStatus.Message())
- fmt.Println(errStatus.Code())
- fmt.Print("attach error!!\n")
- newmessage = errStatus.Message()
-
- } else {
- fmt.Print("attatch IP:\n", attach_device_ip)
- newmessage = attach_device_ip
- }
- } else {
- fmt.Print("Need IP addres,port,vendor,freqs !!\n")
- newmessage = "Need IP address !!"
-
+ case "attach" :
+ if len(s) < 2 {
+ newmessage = newmessage + "invalid command " + cmdstr + "\n"
+ break
+ }
+ var devicelist importer.DeviceList
+ var ipattached []string
+ for _, devinfo := range s[1:] {
+ info := strings.Split(devinfo, ":")
+ if len(info) != 3 {
+ newmessage = newmessage + "invalid command " + devinfo + "\n"
+ continue
}
-
- case "period":
- cmd_size := len(s)
- fmt.Print("cmd_size period %d", cmd_size)
- if cmd_size == 4 {
- fip := s[1]
- fport := s[2]
- pv := s[3]
- fmt.Print("pv:", pv)
- u, err := strconv.ParseUint(pv, 10, 64)
-
- if err != nil {
- fmt.Print("ParseUint error!!\n")
- } else {
- wd := uint32(u)
- ip_address := fip + ":" + fport
- err = UpdateFreq(ip_address, wd)
-
- if err != nil {
- errStatus, _ := status.FromError(err)
- fmt.Println(errStatus.Message())
- fmt.Println(errStatus.Code())
- newmessage = errStatus.Message()
- fmt.Print("period error!!\n")
- } else {
- newmessage = strings.ToUpper(cmd)
- }
- }
- } else {
- fmt.Print("Need period value !!\n")
- newmessage = "Need period value !!"
+ deviceinfo := new(importer.DeviceInfo)
+ deviceinfo.IpAddress = info[0] + ":" + info[1]
+ freq, err := strconv.ParseUint(info[2], 10, 32)
+ if (err != nil) {
+ newmessage = newmessage + "invalid command " + devinfo + "\n"
+ continue
}
+ deviceinfo.Frequency = uint32(freq)
+ devicelist.Device = append(devicelist.Device, deviceinfo)
+ ipattached = append(ipattached, deviceinfo.IpAddress)
+ }
+ _, err := cc.SendDeviceList(ctx, &devicelist)
+ if err != nil {
+ errStatus, _ := status.FromError(err)
+ newmessage = newmessage + errStatus.Message() + "\n"
+ fmt.Printf("attach error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ } else {
+ ips := strings.Join(ipattached, " ")
+ newmessage = newmessage + ips + " attached\n"
+ }
+ case "delete" :
+ if len(s) < 2 {
+ newmessage = newmessage + "invalid command " + cmdstr + "\n"
+ break
+ }
+ var devicelist importer.DeviceListByIp
+ for _, ip := range s[1:] {
+ devicelist.Ip = append(devicelist.Ip, ip)
+ }
+ _, err := cc.DeleteDeviceList(ctx, &devicelist)
+ if err != nil {
+ errStatus, _ := status.FromError(err)
+ newmessage = newmessage + errStatus.Message() + "\n"
+ fmt.Printf("delete error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ } else {
+ ips := strings.Join(devicelist.Ip, " ")
+ newmessage = newmessage + ips + " deleted\n"
+ }
+ case "period" :
+ if len(s) != 2 {
+ newmessage = newmessage + "invalid command " + cmdstr + "\n"
+ break
+ }
+ args := strings.Split(s[1], ":")
+ if len(args) != 3 {
+ newmessage = newmessage + "invalid command " + s[1] + "\n"
+ break
+ }
+ ip := args[0] + ":" + args[1]
+ pv := args[2]
+ u, err := strconv.ParseUint(pv, 10, 64)
+ if err != nil {
+ fmt.Print("ParseUint error!!\n")
+ } else {
+ freqinfo := new(importer.FreqInfo)
+ freqinfo.Frequency = uint32(u)
+ freqinfo.IpAddress = ip
+ _, err := cc.SetFrequency(ctx, freqinfo)
- case "sub", "unsub":
- cmd_size := len(s)
- fmt.Print("cmd is :", cmd, cmd_size)
- if cmd_size > 6 || cmd_size < 0 {
- fmt.Print("error event !!")
- newmessage = "error event !!"
+ if err != nil {
+ errStatus, _ := status.FromError(err)
+ newmessage = newmessage + errStatus.Message()
+ fmt.Printf("period error - status code %v message %v", errStatus.Code(), errStatus.Message())
} else {
- ip := s[1]
- port := s[2]
- ip_address := ip + ":" + port
- var events_list []string
- for i := 3; i < cmd_size; i++ {
- if value, ok := EVENTS_MAP[s[i]]; ok {
- events_list = append(events_list, value)
- } else {
- fmt.Println("key not found")
- }
- }
-
- if string(cmd) == "sub" {
- err = Subscribe(ip_address, events_list)
- if err != nil {
- errStatus, _ := status.FromError(err)
- fmt.Println(errStatus.Message())
- fmt.Println(errStatus.Code())
- newmessage = errStatus.Message()
- fmt.Print("sub error!!")
- } else {
- newmessage = strings.ToUpper(cmd)
- }
- } else {
- err = UnSubscribe(ip_address, events_list)
- if err != nil {
- errStatus, _ := status.FromError(err)
- fmt.Println(errStatus.Message())
- fmt.Println(errStatus.Code())
- newmessage = errStatus.Message()
- fmt.Print("unsub error!!")
- } else {
- newmessage = strings.ToUpper(cmd)
- }
- }
+ newmessage = newmessage + "data collection interval configured to " + pv + " seconds\n"
}
-
- case "showeventlist":
- cmd_size := len(s)
- fmt.Print("cmd is :", cmd, cmd_size)
- if cmd_size > 3 || cmd_size < 0 {
- fmt.Print("error event !!")
- newmessage = "error event !!"
- } else {
- vendor := s[1]
- err, supportlist := GetEventSupportList(vendor)
-
- if err != nil {
- errStatus, _ := status.FromError(err)
- fmt.Println(errStatus.Message())
- fmt.Println(errStatus.Code())
- newmessage = errStatus.Message()
- fmt.Print("showeventlist error!!")
- } else {
- fmt.Print("showeventlist ", supportlist)
- newmessage = strings.Join(supportlist[:], ",")
- }
+ }
+ case "sub", "unsub":
+ if len(s) != 2 {
+ newmessage = newmessage + "invalid command " + cmdstr + "\n"
+ break
+ }
+ args := strings.Split(s[1], ":")
+ if len(args) < 3 {
+ newmessage = newmessage + "invalid command " + s[1] + "\n"
+ break
+ }
+ giveneventlist := new(importer.GivenEventList)
+ giveneventlist.EventIpAddress = args[0] + ":" + args[1]
+ for _, event := range args[2:] {
+ if value, ok := EVENTS_MAP[event]; ok {
+ giveneventlist.Events = append(giveneventlist.Events, value)
}
-
- case "showdeviceeventlist":
- cmd_size := len(s)
- fmt.Print("cmd is :", cmd, cmd_size)
- if cmd_size > 4 || cmd_size < 0 {
- fmt.Print("error event !!")
- newmessage = "error event !!"
- } else {
- eip := s[1]
- eport := s[2]
- ip_address := eip + ":" + eport
- err, currentlist := GetEventCurrentDeviceList(ip_address)
-
- if err != nil {
- errStatus, _ := status.FromError(err)
- fmt.Println(errStatus.Message())
- fmt.Println(errStatus.Code())
- newmessage = errStatus.Message()
- fmt.Print("showdeviceeventlist error!!")
- } else {
- fmt.Print("showeventlist ", currentlist)
- newmessage = strings.Join(currentlist[:], ",")
- }
- }
-
- case "cleardeviceeventlist":
- cmd_size := len(s)
- fmt.Print("cmd is :", cmd, cmd_size)
- if cmd_size > 4 || cmd_size < 0 {
- fmt.Print("error event !!")
- newmessage = "error event !!"
- } else {
- clip := s[1]
- clport := s[2]
- ip_address := clip + ":" + clport
- err = ClearCurrentDeviceEventList(ip_address)
- if err != nil {
- errStatus, _ := status.FromError(err)
- fmt.Println(errStatus.Message())
- fmt.Println(errStatus.Code())
- newmessage = errStatus.Message()
- fmt.Print("cleardeviceeventlist error!!")
- } else {
- newmessage = strings.ToUpper(cmd)
- }
- }
+ }
+ var err error
+ if cmd == "sub" {
+ _, err = cc.SubsrcribeGivenEvents(ctx, giveneventlist)
+ } else {
+ _, err = cc.UnSubsrcribeGivenEvents(ctx, giveneventlist)
+ }
+ if err != nil {
+ errStatus, _ := status.FromError(err)
+ newmessage = newmessage + errStatus.Message()
+ fmt.Printf("Un/subscribe error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ } else {
+ newmessage = newmessage + cmd + " successful\n"
+ }
+ case "showeventlist":
+ if len(s) != 2 {
+ newmessage = newmessage + "invalid command " + s[1] + "\n"
+ break
+ }
+ currentdeviceinfo := new(importer.Device)
+ currentdeviceinfo.IpAddress = s[1]
+ ret_msg, err := cc.GetEventList(ctx, currentdeviceinfo)
+ if err != nil {
+ errStatus, _ := status.FromError(err)
+ newmessage = errStatus.Message()
+ fmt.Printf("showeventlist error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ } else {
+ fmt.Print("showeventlist ", ret_msg.Events)
+ newmessage = strings.Join(ret_msg.Events[:], ",")
+ }
+ case "showdeviceeventlist":
+ if len(s) != 2 {
+ newmessage = newmessage + "invalid command " + s[1] + "\n"
+ break
+ }
+ currentdeviceinfo := new(importer.Device)
+ currentdeviceinfo.IpAddress = s[1]
+ ret_msg, err := cc.GetCurrentEventList(ctx, currentdeviceinfo)
+ if err != nil {
+ errStatus, _ := status.FromError(err)
+ fmt.Printf("showdeviceeventlist error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ newmessage = newmessage + errStatus.Message()
+ } else {
+ fmt.Print("showdeviceeventlist ", ret_msg.Events)
+ newmessage = strings.Join(ret_msg.Events[:], ",")
+ }
+ case "cleardeviceeventlist":
+ if len(s) != 2 {
+ newmessage = newmessage + "invalid command " + s[1] + "\n"
+ break
+ }
+ currentdeviceinfo := new(importer.Device)
+ currentdeviceinfo.IpAddress = s[1]
+ _, err := cc.ClearCurrentEventList(ctx, currentdeviceinfo)
+ if err != nil {
+ errStatus, _ := status.FromError(err)
+ newmessage = newmessage + errStatus.Message()
+ fmt.Printf("cleardeviceeventlist error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ } else {
+ newmessage = newmessage + currentdeviceinfo.IpAddress + " events cleared\n"
+ }
+ case "QUIT":
+ loop = false
+ newmessage = "QUIT"
case "showdevices":
cmd_size := len(s)
@@ -482,14 +340,10 @@
newmessage = strings.Join(currentlist[:], ", ")
}
}
- case "QUIT":
- loop = false
- newmessage = "QUIT"
-
- default:
- }
- // send string back to client
- connS.Write([]byte(newmessage + "\n"))
+ default:
+ newmessage = newmessage + "invalid command " + cmdstr + "\n"
}
+ // send string back to client
+ connS.Write([]byte(newmessage + "\n"))
}
}
diff --git a/event_subscriber.go b/event_subscriber.go
index 973a765..d4b75a3 100644
--- a/event_subscriber.go
+++ b/event_subscriber.go
@@ -21,27 +21,31 @@
"net/http"
"os"
"regexp"
+ "io/ioutil"
)
-const RF_SUBSCRIPTION = "/EventService/Subscriptions/"
+const RF_EVENTSERVICE = "/redfish/v1/EventService/"
+const RF_SUBSCRIPTION = RF_EVENTSERVICE + "Subscriptions/"
-func (s *Server) add_subscription(ip string, event string, f *os.File) (rtn bool) {
+func (s *Server) add_subscription(ip string, event string) (rtn bool) {
rtn = false
destip := os.Getenv("EVENT_NOTIFICATION_DESTIP") + ":" + os.Getenv("DEVICE_MANAGEMENT_DESTPORT")
subscrpt_info := map[string]interface{}{"Context": "TBD-" + destip, "Protocol": "Redfish"}
subscrpt_info["Name"] = event + " event subscription"
- subscrpt_info["Destination"] = "https://" + destip
+ subscrpt_info["Destination"] = RF_DEFAULT_PROTOCOL + destip
subscrpt_info["EventTypes"] = []string{event}
sRequestJson, err := json.Marshal(subscrpt_info)
- uri := s.devicemap[ip].Protocol + "://" + ip + REDFISH_ROOT + RF_SUBSCRIPTION
+ uri := RF_DEFAULT_PROTOCOL + ip + RF_SUBSCRIPTION
client := s.httpclient
resp, err := client.Post(uri, CONTENT_TYPE, bytes.NewBuffer(sRequestJson))
+ if resp != nil {
+ defer resp.Body.Close()
+ }
if err != nil {
fmt.Println(err)
return
}
- defer resp.Body.Close()
if resp.StatusCode != 201 {
result := make(map[string]interface{})
@@ -57,38 +61,22 @@
match := re.FindStringSubmatch(loc[0])
s.devicemap[ip].Subscriptions[event] = match[1]
- if f != nil {
- b, err := json.Marshal(s.devicemap[ip])
- fmt.Println(string(b))
- if err != nil {
- fmt.Println(err)
- } else {
- f.Truncate(0)
- f.Seek(0, 0)
- n, err := f.Write(b)
- if err != nil {
- fmt.Println("err wrote", n, "bytes")
- fmt.Println(err)
- }
- }
- } else {
- fmt.Println("file handle is nil")
- }
-
fmt.Println("Subscription", event, "id", match[1], "was successfully added")
return
}
-func (s *Server) remove_subscription(ip string, event string, f *os.File) bool {
+func (s *Server) remove_subscription(ip string, event string) bool {
id := s.devicemap[ip].Subscriptions[event]
- uri := s.devicemap[ip].Protocol + "://" + ip + REDFISH_ROOT + RF_SUBSCRIPTION + id
+ uri := RF_DEFAULT_PROTOCOL + ip + RF_SUBSCRIPTION + id
req, _ := http.NewRequest("DELETE", uri, nil)
resp, err := http.DefaultClient.Do(req)
+ if resp != nil {
+ defer resp.Body.Close()
+ }
if err != nil {
fmt.Println(err)
return false
}
- defer resp.Body.Close()
if code := resp.StatusCode; code < 200 && code > 299 {
result := make(map[string]interface{})
@@ -100,22 +88,36 @@
}
delete(s.devicemap[ip].Subscriptions, event)
- if f != nil {
- b, err := json.Marshal(s.devicemap[ip])
- if err != nil {
- fmt.Println(err)
- } else {
- f.Truncate(0)
- f.Seek(0, 0)
- n, err := f.Write(b)
- if err != nil {
- fmt.Println("!!!!! err wrote", n, "bytes")
- fmt.Println(err)
- } else {
- fmt.Println("wrote", n, "bytes")
- }
- }
- }
fmt.Println("Subscription id", id, "was successfully removed")
return true
}
+
+func (s *Server) get_event_types(ip string) (eventtypes []string ) {
+ resp, err := http.Get(RF_DEFAULT_PROTOCOL + ip + RF_EVENTSERVICE)
+ fmt.Println("get_event_types")
+ if resp != nil {
+ defer resp.Body.Close()
+ }
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+
+ m := map[string]interface{}{}
+ err = json.Unmarshal([]byte(body), &m)
+ if err != nil {
+ fmt.Println(err)
+ return
+ }
+ e := m["EventTypesForSubscription"].([]interface{})
+ fmt.Printf("supported event types %v\n", e)
+ for _, val := range e {
+ eventtypes = append(eventtypes, val.(string))
+ }
+ return
+}
diff --git a/main.go b/main.go
index 0510876..d01685d 100644
--- a/main.go
+++ b/main.go
@@ -36,7 +36,9 @@
)
//globals
-const REDFISH_ROOT = "/redfish/v1"
+const RF_DEFAULT_PROTOCOL = "https://"
+const RF_DATA_COLLECT_THRESHOLD = 5
+const RF_DATA_COLLECT_DUMMY_INTERVAL = 1000
const CONTENT_TYPE = "application/json"
var (
@@ -45,33 +47,30 @@
var DataProducer sarama.AsyncProducer
-var vendor_default_events = map[string][]string{
- "edgecore": {"ResourceAdded", "ResourceRemoved", "Alert"},
-}
-var redfish_services = [...]string{"/Chassis", "/Systems", "/EthernetSwitches"}
+var redfish_resources = [...]string{"/redfish/v1/Chassis", "/redfish/v1/Systems","/redfish/v1/EthernetSwitches"}
var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
var subscriptionListPath string
-type scheduler struct {
+type scheduler struct {
getdata *time.Ticker
- quit chan bool
+ quit chan bool
+ getdataend chan bool
}
-type device struct {
- Subscriptions map[string]string `json:"ss"`
- Freq uint32 `json:"freq"`
- Datacollector scheduler `json:"-"`
- Freqchan chan uint32 `json:"-"`
- Vendor string `json:"vendor"`
- Protocol string `json:"protocol"`
+type device struct {
+ Subscriptions map[string]string `json:"ss"`
+ Freq uint32 `json:"freq"`
+ Datacollector scheduler `json:"-"`
+ Freqchan chan uint32 `json:"-"`
+ Eventtypes []string `json:"eventtypes"`
+ Datafile *os.File `json:"-"`
}
type Server struct {
- devicemap map[string]*device
- gRPCserver *grpc.Server
- dataproducer sarama.AsyncProducer
- httpclient *http.Client
- devicechan chan *importer.DeviceInfo
+ devicemap map[string]*device
+ gRPCserver *grpc.Server
+ dataproducer sarama.AsyncProducer
+ httpclient *http.Client
}
func (s *Server) ClearCurrentEventList(c context.Context, info *importer.Device) (*empty.Empty, error) {
@@ -81,18 +80,15 @@
if !found {
return nil, status.Errorf(codes.NotFound, "Device not registered")
}
- f := get_subscription_list(ip_address)
for event, _ := range s.devicemap[ip_address].Subscriptions {
- rtn := s.remove_subscription(ip_address, event, f)
+ rtn := s.remove_subscription(ip_address, event)
if !rtn {
log.WithFields(log.Fields{
"Event": event,
}).Info("Error removing event")
}
}
- if f != nil {
- f.Close()
- }
+ s.update_data_file(ip_address)
return &empty.Empty{}, nil
}
@@ -109,14 +105,14 @@
return currentevents, nil
}
-func (s *Server) GetEventList(c context.Context, info *importer.VendorInfo) (*importer.EventList, error) {
+func (s *Server) GetEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
fmt.Println("Received GetEventList\n")
- _, found := vendor_default_events[info.Vendor]
- if !found {
- return nil, status.Errorf(codes.NotFound, "Invalid Vendor Provided")
- }
eventstobesubscribed := new(importer.EventList)
- eventstobesubscribed.Events = vendor_default_events[info.Vendor]
+// eventstobesubscribed.Events = s.devicemap[info.IpAddress].Eventtypes
+ eventstobesubscribed.Events = s.get_event_types(info.IpAddress)
+ if eventstobesubscribed.Events == nil {
+ return nil, status.Errorf(codes.NotFound, "No events found")
+ }
return eventstobesubscribed, nil
}
@@ -126,12 +122,17 @@
if !found {
return nil, status.Errorf(codes.NotFound, "Device not registered")
}
-
+ if info.Frequency > 0 && info.Frequency < RF_DATA_COLLECT_THRESHOLD {
+ return nil, status.Errorf(codes.InvalidArgument, "Invalid frequency")
+ }
s.devicemap[info.IpAddress].Freqchan <- info.Frequency
+ s.devicemap[info.IpAddress].Freq = info.Frequency
+ s.update_data_file(info.IpAddress)
return &empty.Empty{}, nil
}
func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.GivenEventList) (*empty.Empty, error) {
+ errstring := ""
fmt.Println("Received SubsrcribeEvents\n")
//Call API to subscribe events
ip_address := subeventlist.EventIpAddress
@@ -142,28 +143,31 @@
if len(subeventlist.Events) <= 0 {
return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
}
- f := get_subscription_list(ip_address)
for _, event := range subeventlist.Events {
if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
- rtn := s.add_subscription(ip_address, event, f)
+ rtn := s.add_subscription(ip_address, event)
if !rtn {
+ errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
log.WithFields(log.Fields{
"Event": event,
}).Info("Error adding event")
}
} else {
+ errstring = errstring + "event " + event + " already subscribed\n"
log.WithFields(log.Fields{
"Event": event,
}).Info("Already Subscribed")
}
}
- if f != nil {
- f.Close()
+ s.update_data_file(ip_address)
+ if errstring != "" {
+ return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
}
return &empty.Empty{}, nil
}
func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.GivenEventList) (*empty.Empty, error) {
+ errstring := ""
fmt.Println("Received UnSubsrcribeEvents\n")
ip_address := unsubeventlist.EventIpAddress
_, found := s.devicemap[ip_address]
@@ -175,28 +179,50 @@
return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
}
//Call API to unsubscribe events
- f := get_subscription_list(ip_address)
for _, event := range unsubeventlist.Events {
if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
- rtn := s.remove_subscription(ip_address, event, f)
+ rtn := s.remove_subscription(ip_address, event)
if !rtn {
+ errstring = errstring + "failed to unsubscribe event " + ip_address + " " + event + "\n"
log.WithFields(log.Fields{
"Event": event,
}).Info("Error removing event")
}
} else {
+ errstring = errstring + "event " + event + " not found\n"
log.WithFields(log.Fields{
"Event": event,
}).Info("was not Subscribed")
}
}
- if f != nil {
- f.Close()
- }
+ s.update_data_file(ip_address)
+ if errstring != "" {
+ return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
+ }
return &empty.Empty{}, nil
}
+func (s *Server) update_data_file(ip_address string) {
+ f := s.devicemap[ip_address].Datafile
+ if f != nil {
+ b, err := json.Marshal(s.devicemap[ip_address])
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ f.Truncate(0)
+ f.Seek(0, 0)
+ n, err := f.Write(b)
+ if err != nil {
+ fmt.Println("err wrote", n, "bytes")
+ fmt.Println(err)
+ }
+ }
+ } else {
+ fmt.Println("file handle is nil", ip_address)
+ }
+}
+
func (s *Server) collect_data(ip_address string) {
freqchan := s.devicemap[ip_address].Freqchan
ticker := s.devicemap[ip_address].Datacollector.getdata
@@ -207,83 +233,141 @@
ticker.Stop()
if freq > 0 {
ticker = time.NewTicker(time.Duration(freq) * time.Second)
+ s.devicemap[ip_address].Datacollector.getdata = ticker
}
case err := <-s.dataproducer.Errors():
fmt.Println("Failed to produce message:", err)
case <-ticker.C:
- for _, service := range redfish_services {
- rtn, data := s.get_status(ip_address, service)
- if rtn {
- for _, str := range data {
- str = "Device IP: " + ip_address + " " + str
- fmt.Printf("collected data %s\n ...", str)
- b := []byte(str)
- msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
- select {
- case s.dataproducer.Input() <- msg:
- fmt.Println("Produce message")
- default:
- }
+ for _, resource := range redfish_resources {
+ data := s.get_status(ip_address, resource)
+ for _, str := range data {
+ str = "Device IP: " + ip_address + " " + str
+ fmt.Printf("collected data %s\n ...", str)
+ b := []byte(str)
+ msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
+ select {
+ case s.dataproducer.Input() <- msg:
+ fmt.Println("Produce message")
+ default:
}
}
}
case <-donechan:
ticker.Stop()
fmt.Println("getdata ticker stopped")
+ s.devicemap[ip_address].Datacollector.getdataend <- true
return
}
}
}
-func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
- d := device{
- Subscriptions: make(map[string]string),
- Freq: info.Frequency,
- Datacollector: scheduler{
- getdata: time.NewTicker(time.Duration(info.Frequency) * time.Second),
- quit: make(chan bool),
- },
- Freqchan: make(chan uint32),
- Vendor: info.Vendor,
- Protocol: info.Protocol,
- }
- _, found := s.devicemap[info.IpAddress]
- if found {
- return nil, status.Errorf(codes.AlreadyExists, "Device Already registered")
- }
+func (s *Server) DeleteDeviceList(c context.Context, list *importer.DeviceListByIp) (*empty.Empty, error) {
+ fmt.Println("DeleteDeviceList received")
+ errstring := ""
+ for _, ip := range list.Ip {
+ if _, ok := s.devicemap[ip]; !ok {
+ fmt.Printf("Device not found ", ip)
+ errstring = errstring + "Device " + ip + " not found\n"
+ continue
+ }
+ for event, _ := range s.devicemap[ip].Subscriptions {
+ rtn := s.remove_subscription(ip, event)
+ if !rtn {
+ log.WithFields(log.Fields{
+ "Event": event,
+ }).Info("Error removing event")
+ }
+ }
+ fmt.Println("deleting device", ip)
+ s.devicemap[ip].Datacollector.quit <- true
- _, vendorfound := vendor_default_events[info.Vendor]
- if !vendorfound {
- return nil, status.Errorf(codes.NotFound, "Vendor Not Found")
+ f := s.devicemap[ip].Datafile
+ if f != nil {
+ fmt.Println("deleteing file", f.Name())
+ err := f.Close()
+ if err != nil {
+ fmt.Println("error closing file ", f.Name(), err)
+ errstring = errstring + "error closing file " + f.Name() + "\n"
+ }
+ err = os.Remove(f.Name())
+ if err != nil {
+ fmt.Println("error deleting file ", f.Name(), err)
+ }
+ } else {
+ errstring = errstring + "file " + ip + " not found\n"
+ }
+ <-s.devicemap[ip].Datacollector.getdataend
+ delete(s.devicemap, ip)
}
-
- //default_events := [...]string{}
- s.devicemap[info.IpAddress] = &d
- fmt.Printf("size of devicemap %d\n", len(s.devicemap))
- ip_address := info.IpAddress
- fmt.Printf("Configuring %s\n", ip_address)
- // call subscription function with info.IpAddress
-
- default_events := vendor_default_events[info.Vendor]
-
- f := get_subscription_list(ip_address)
- for _, event := range default_events {
- s.add_subscription(ip_address, event, f)
+ if errstring != "" {
+ return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
}
- if f != nil {
- f.Close()
- }
- go s.collect_data(ip_address)
return &empty.Empty{}, nil
}
-func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceList, error) {
+func (s *Server) SendDeviceList(c context.Context, list *importer.DeviceList) (*empty.Empty, error) {
+ errstring := ""
+ for _, dev := range list.Device {
+ ip_address:= dev.IpAddress
+ if _, ok := s.devicemap[dev.IpAddress]; ok {
+ fmt.Printf("Device %s already exists", ip_address)
+ errstring = errstring + "Device " + ip_address + " already exists\n"
+ continue
+ }
+
+ if dev.Frequency > 0 && dev.Frequency < RF_DATA_COLLECT_THRESHOLD {
+ fmt.Printf("Device %s data collection frequency %d out of range", ip_address, dev.Frequency)
+ errstring = errstring + "Device " + ip_address + " data collection frequency out of range\n"
+ continue
+ }
+ d := device {
+ Subscriptions: make(map[string]string),
+ Freq: dev.Frequency,
+ Datacollector: scheduler{
+ quit: make(chan bool),
+ getdataend: make(chan bool),
+ },
+ Freqchan: make(chan uint32),
+ }
+ s.devicemap[ip_address] = &d
+ fmt.Printf("Configuring %s\n", ip_address)
+
+ /* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
+ freq := dev.Frequency
+ if freq == 0 {
+ freq = RF_DATA_COLLECT_DUMMY_INTERVAL
+ }
+ s.devicemap[ip_address].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
+ if dev.Frequency == 0 {
+ s.devicemap[ip_address].Datacollector.getdata.Stop()
+ }
+
+ eventtypes := s.get_event_types(ip_address)
+ if eventtypes != nil {
+ for _, event := range eventtypes {
+ s.devicemap[ip_address].Eventtypes = append(s.devicemap[ip_address].Eventtypes, event)
+ if s.add_subscription(ip_address, event) == false {
+ errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
+ }
+ }
+ }
+ go s.collect_data(ip_address)
+ s.devicemap[ip_address].Datafile = get_data_file(ip_address)
+ s.update_data_file(ip_address)
+ }
+ if errstring != "" {
+ return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
+ }
+ return &empty.Empty{}, nil
+}
+
+func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceListByIp, error) {
fmt.Println("In Received GetCurrentDevices\n")
if len(s.devicemap) == 0 {
return nil, status.Errorf(codes.NotFound, "Devices not registered")
}
- dl := new(importer.DeviceList)
+ dl := new(importer.DeviceListByIp)
for k, v := range s.devicemap {
if v != nil {
fmt.Printf("IpAdd[%s] \n", k)
@@ -362,26 +446,39 @@
}
func (s *Server) init_data_persistence() {
+ fmt.Println("Retrieving persisted data")
subscriptionListPath = pvmount + "/subscriptions"
if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
fmt.Println(err)
} else {
- lists, err := ioutil.ReadDir(subscriptionListPath)
+ files, err := ioutil.ReadDir(subscriptionListPath)
if err != nil {
fmt.Println(err)
} else {
- for _, list := range lists {
- b, err := ioutil.ReadFile(path.Join(subscriptionListPath, list.Name()))
+ for _, f := range files {
+ b, err := ioutil.ReadFile(path.Join(subscriptionListPath, f.Name()))
if err != nil {
fmt.Println(err)
- } else {
- ip := list.Name()
+ } else if f.Size() > 0 {
+ ip := f.Name()
d := device{}
json.Unmarshal(b, &d)
s.devicemap[ip] = &d
- s.devicemap[ip].Datacollector.getdata = time.NewTicker(time.Duration(s.devicemap[ip].Freq) * time.Second)
+ freq := s.devicemap[ip].Freq
+
+ /* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
+ if freq == 0 {
+ freq = RF_DATA_COLLECT_DUMMY_INTERVAL
+ }
+ s.devicemap[ip].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
+ if s.devicemap[ip].Freq == 0 {
+ s.devicemap[ip].Datacollector.getdata.Stop()
+ }
+
s.devicemap[ip].Datacollector.quit = make(chan bool)
+ s.devicemap[ip].Datacollector.getdataend = make(chan bool)
s.devicemap[ip].Freqchan = make(chan uint32)
+ s.devicemap[ip].Datafile = get_data_file(ip)
go s.collect_data(ip)
}
}
@@ -401,7 +498,7 @@
//sarama.Logger = log.New()
}
-func get_subscription_list(ip string) *os.File {
+func get_data_file(ip string) *os.File {
if pvmount == "" {
return nil
}
@@ -412,6 +509,12 @@
return f
}
+func (s *Server) close_data_files() {
+ for ip, _ := range s.devicemap {
+ s.devicemap[ip].Datafile.Close()
+ }
+}
+
func main() {
fmt.Println("Starting Device-management Container")
@@ -420,10 +523,9 @@
Timeout: 10 * time.Second,
}
- s := Server{
- devicemap: make(map[string]*device),
- devicechan: make(chan *importer.DeviceInfo),
- httpclient: client,
+ s := Server {
+ devicemap: make(map[string]*device),
+ httpclient: client,
}
s.kafkaInit()
@@ -441,5 +543,6 @@
case sig := <-quit:
fmt.Println("Shutting down:", sig)
s.kafkaCloseProducer()
+ s.close_data_files()
}
}
diff --git a/proto/importer.proto b/proto/importer.proto
index e7117b1..58d8772 100644
--- a/proto/importer.proto
+++ b/proto/importer.proto
@@ -10,8 +10,6 @@
message DeviceInfo {
string ip_address = 1;
uint32 frequency = 2;
- string vendor = 3;
- string protocol = 4;
}
message GivenEventList {
@@ -28,26 +26,28 @@
uint32 Frequency = 2;
}
-message VendorInfo {
- string Vendor = 1;
-}
-
message Device {
string IpAddress = 1;
}
-message DeviceList {
- repeated string ip = 1;
-}
-
message Empty {}
+message DeviceList {
+ repeated DeviceInfo device = 1;
+}
+
+message DeviceListByIp {
+ repeated string Ip = 1;
+}
+
service device_management {
- rpc SendDeviceInfo(DeviceInfo) returns (google.protobuf.Empty) {}
+ rpc SendDeviceList(DeviceList) returns (google.protobuf.Empty) {}
+
+ rpc DeleteDeviceList(DeviceListByIp) returns (google.protobuf.Empty) {}
rpc SetFrequency(FreqInfo) returns (google.protobuf.Empty) {}
- rpc GetEventList(VendorInfo) returns (EventList) {}
+ rpc GetEventList(Device) returns (EventList) {}
rpc SubsrcribeGivenEvents(GivenEventList) returns (google.protobuf.Empty) {}
@@ -57,7 +57,7 @@
rpc ClearCurrentEventList(Device) returns (google.protobuf.Empty) {}
- rpc GetCurrentDevices(Empty) returns (DeviceList) {}
+ rpc GetCurrentDevices(Empty) returns (DeviceListByIp) {}
}
diff --git a/samples/chassis.1.json b/samples/chassis.1.json
new file mode 100644
index 0000000..c796835
--- /dev/null
+++ b/samples/chassis.1.json
@@ -0,0 +1,82 @@
+{
+ "Oem" : {
+ "Intel_RackScale" : {
+ "Location" : {
+ "Id" : null,
+ "ParentId" : null
+ },
+ "@odata.type" : "#Intel.Oem.Chassis"
+ }
+ },
+ "Links" : {
+ "Oem" : {
+ "Intel_RackScale" : {
+ "@odata.type" : "#Intel.Oem.ChassisLinks",
+ "Switches" : [
+ {
+ "@odata.id" : "/redfish/v1/EthernetSwitches/1"
+ }
+ ]
+ }
+ },
+ "Drives" : [
+ {
+ "@odata.id" : "/redfish/v1/Chassis/1/Drives/1"
+ }
+ ],
+ "ManagedBy" : [
+ {
+ "@odata.id" : "/redfish/v1/Managers/1"
+ }
+ ],
+ "Storage" : [
+ {
+ "@odata.id" : "/redfish/v1/Systems/1/Storage/1"
+ }
+ ],
+ "ComputerSystems" : [
+ {
+ "@odata.id" : "/redfish/v1/Systems/1"
+ }
+ ],
+ "@odata.type" : "#Chassis.v1_2_0.Links",
+ "Contains" : []
+ },
+ "Power" : {
+ "@odata.id" : "/redfish/v1/Chassis/1/Power"
+ },
+ "Name" : "Chassis",
+ "Id" : "1",
+ "@odata.id" : "/redfish/v1/Chassis/1",
+ "@odata.context" : "/redfish/v1/$metadata#Chassis.Chassis",
+ "@odata.type" : "#Chassis.v1_3_0.Chassis",
+ "PowerState" : "On",
+ "PartNumber" : "",
+ "Status" : {
+ "State" : "Enabled",
+ "HealthRollup" : "OK",
+ "Health" : "OK"
+ },
+ "Actions" : {
+ "#Chassis.Reset" : {
+ "target" : "/redfish/v1/Chassis/1/Actions/Chassis.Reset",
+ "ResetType@Redfish.AllowableValues" : [
+ "ForceOff",
+ "GracefulShutdown",
+ "GracefulRestart",
+ "ForceRestart"
+ ]
+ }
+ },
+ "Manufacturer" : "",
+ "SerialNumber" : "",
+ "AssetTag" : "N/A",
+ "Description" : "Chassis of ",
+ "SKU" : null,
+ "Thermal" : {
+ "@odata.id" : "/redfish/v1/Chassis/1/Thermal"
+ },
+ "ChassisType" : "Drawer",
+ "IndicatorLED" : "Lit",
+ "Model" : ""
+}
diff --git a/samples/chassis.json b/samples/chassis.json
new file mode 100644
index 0000000..317f9c8
--- /dev/null
+++ b/samples/chassis.json
@@ -0,0 +1,13 @@
+{
+ "Name" : "Chassis Collection",
+ "@odata.context" : "/redfish/v1/$metadata#Chassis.Chassis",
+ "Members@odata.count" : 1,
+ "Description" : "Collection of Chassis",
+ "Members" : [
+ {
+ "@odata.id" : "/redfish/v1/Chassis/1"
+ }
+ ],
+ "@odata.type" : "#ChassisCollection.ChassisCollection",
+ "@odata.id" : "/redfish/v1/Chassis"
+}