SEBA-761 implemented persistent storage using local directory to save and restore device information across pod deployments / read pv mount path from an env variable

Change-Id: Ibb7c235a464d2f4388232d393a6ad836c2d45d73
diff --git a/data_collector.go b/data_collector.go
index f68f280..77a67d9 100644
--- a/data_collector.go
+++ b/data_collector.go
@@ -26,8 +26,7 @@
 func (s *Server) get_status(ip string, service string) (rtn bool, data []string) {
 	rtn = false
 
-//	uri := "https://"+ip + REDFISH_ROOT + service
-	uri := s.devicemap[ip].protocol+"://"+ip + REDFISH_ROOT + service
+	uri := s.devicemap[ip].Protocol + "://"+ip + REDFISH_ROOT + service
         fmt.Printf("%q", uri)
 	resp, err := http.Get(uri)
 	if err != nil {
@@ -44,7 +43,7 @@
 		matches := re.FindAllString(memberstr, -1)
 		for _, match := range matches {
 			m := strings.Trim(match, "[]")
-			uri = s.devicemap[ip].protocol +"://"+ip + strings.TrimPrefix(m, "@odata.id:")
+			uri = s.devicemap[ip].Protocol +"://"+ip + strings.TrimPrefix(m, "@odata.id:")
 			fmt.Println("Printing URI")
 			fmt.Println(uri)
 			resp, err = http.Get(uri)
@@ -58,8 +57,8 @@
 					data = append(data, string(b))
 					rtn = true
 				}
+				defer resp.Body.Close()
 			}
-			defer resp.Body.Close()
 		}
 	}
 	return 
diff --git a/event_subscriber.go b/event_subscriber.go
index b2f0112..0fb3294 100644
--- a/event_subscriber.go
+++ b/event_subscriber.go
@@ -20,16 +20,13 @@
 	"net/http"
 	"bytes"
 	"regexp"
-	"strconv"
-	"time"
 	"os"
 )
 
 const RF_SUBSCRIPTION = "/EventService/Subscriptions"
 
-func add_subscription(ip string, event string) (rtn bool, id uint) {
+func (s *Server) add_subscription(ip string, event string, f *os.File) (rtn bool) {
 	rtn = false
-	id = 0
 
 	destip := os.Getenv("EVENT_NOTIFICATION_DESTIP") + ":" + os.Getenv("DEVICE_MANAGEMENT_DESTPORT")
 	subscrpt_info := map[string]interface{}{"Context":"TBD","Protocol":"Redfish"}
@@ -37,8 +34,8 @@
 	subscrpt_info["Destination"] = "https://" + destip
 	subscrpt_info["EventTypes"] = []string{event}
 	sRequestJson, err := json.Marshal(subscrpt_info)
-	uri := ip + REDFISH_ROOT + RF_SUBSCRIPTION
-	client := http.Client{Timeout: 10 * time.Second}
+	uri := s.devicemap[ip].Protocol + "://" + ip + REDFISH_ROOT + RF_SUBSCRIPTION
+	client := s.httpclient
 	resp, err := client.Post(uri, CONTENT_TYPE, bytes.NewBuffer(sRequestJson))
 	if err != nil {
 		fmt.Println(err)
@@ -54,19 +51,37 @@
 		fmt.Println("Add ", event, " subscription failed. HTTP response status: ",  resp.Status)
 		return
 	}
-
 	rtn = true
 	loc := resp.Header["Location"]
 	re := regexp.MustCompile(`/(\w+)$`)
 	match := re.FindStringSubmatch(loc[0])
-	idint, _ := strconv.Atoi(match[1])
-	id = uint(idint)
-	fmt.Println("Subscription", event, "id", id, "was successfully added")
+	s.devicemap[ip].Subscriptions[event] = match[1]
+
+	if f != nil {
+		b, err := json.Marshal(s.devicemap[ip])
+fmt.Println(string(b))
+		if err != nil {
+			fmt.Println(err)
+		} else {
+			f.Truncate(0)
+			f.Seek(0, 0)
+			n, err := f.Write(b)
+			if err != nil {
+				fmt.Println("err wrote", n, "bytes")
+				fmt.Println(err)
+			}
+		}
+	} else {
+		fmt.Println("file handle is nil")
+	}
+
+	fmt.Println("Subscription", event, "id", match[1], "was successfully added")
 	return
 }
 
-func remove_subscription(ip string, id uint) bool {
-	uri := ip + REDFISH_ROOT + RF_SUBSCRIPTION + strconv.Itoa(int(id))
+func (s *Server) remove_subscription(ip string, event string, f *os.File) bool {
+	id := s.devicemap[ip].Subscriptions[event]
+	uri := s.devicemap[ip].Protocol + "://" + ip + REDFISH_ROOT + RF_SUBSCRIPTION + "/" + id
 	req, _ := http.NewRequest("DELETE", uri, nil)
 	resp, err := http.DefaultClient.Do(req)
 	if err != nil {
@@ -83,6 +98,24 @@
 		fmt.Println("Remove subscription failed. HTTP response status:", resp.Status)
 		return false
 	}
+	delete(s.devicemap[ip].Subscriptions, event)
+
+	if f != nil {
+		b, err := json.Marshal(s.devicemap[ip])
+		if err != nil {
+			fmt.Println(err)
+		} else {
+			f.Truncate(0)
+			f.Seek(0, 0)
+			n, err := f.Write(b)
+			if err != nil {
+				fmt.Println("!!!!! err wrote", n, "bytes")
+				fmt.Println(err)
+			} else {
+				fmt.Println("wrote", n, "bytes")
+			}
+		}
+	}
 	fmt.Println("Subscription id", id, "was successfully removed")
 	return true
 }
diff --git a/main.go b/main.go
index f6a6948..c0d7346 100644
--- a/main.go
+++ b/main.go
@@ -29,6 +29,8 @@
         importer "./proto"
         log "github.com/Sirupsen/logrus"
 	"time"
+	"encoding/json"
+	"path"
 )
 
 //globals
@@ -37,16 +39,16 @@
 
 var (
 	importerTopic = "importer"
-
 )
 
 var DataProducer sarama.AsyncProducer
 
-
 var    vendor_default_events = map[string][]string{
         "edgecore": {"ResourceAdded","ResourceRemoved","Alert"},
     }
 var redfish_services = [...]string{"/Chassis", "/Systems","/EthernetSwitches"}
+var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
+var subscriptionListPath string
 
 type scheduler struct  {
 	getdata time.Ticker
@@ -54,18 +56,20 @@
 }
 
 type device struct  {
-	subscriptions map[string]uint
-	datacollector scheduler
-	freqchan   chan uint32
-        vendor string
-	protocol string
+	Subscriptions	map[string]string	`json:"ss"`
+	Freq		uint32			`json:"freq"`
+	Datacollector	scheduler		`json:"-"`
+	Freqchan	chan uint32		`json:"-"`
+	Vendor		string			`json:"vendor"`
+	Protocol	string			`json:"protocol"`
 }
 
 type Server struct {
-	devicemap  map[string]*device
-	gRPCserver   *grpc.Server
-	dataproducer sarama.AsyncProducer
-	devicechan   chan *importer.DeviceInfo
+	devicemap	map[string]*device
+	gRPCserver	*grpc.Server
+	dataproducer	sarama.AsyncProducer
+	httpclient	*http.Client
+	devicechan	chan *importer.DeviceInfo
 }
 
 func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.SupportedEventList, error) {
@@ -77,7 +81,7 @@
 
 func (s *Server) SetFrequency(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
 	fmt.Println("Received SetFrequency")
-	s.devicemap[info.IpAddress].freqchan <- info.Frequency
+	s.devicemap[info.IpAddress].Freqchan <- info.Frequency
 	return &empty.Empty{}, nil
 }
 
@@ -85,47 +89,43 @@
 	fmt.Println("Received SubsrcribeEvents\n")
 	//Call API to subscribe events
 	ip_address := subeventlist.EventIpAddress
+	f := get_subscription_list(ip_address)
 	for _, event := range subeventlist.Events {
-                if _, ok := s.devicemap[ip_address].subscriptions[event]; !ok {
-                 rtn, id := add_subscription(ip_address, event)
-                        if rtn {
-                                 s.devicemap[ip_address].subscriptions[event] = id
-                                fmt.Println("subscription added", event, id)
-                        }
-                } else {
-                                log.WithFields(log.Fields{
-                                  "Event": event,
-                                }).Info("Already Subscribed")
-                }
+		if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
+			s.add_subscription(ip_address, event, f)
+		} else {
+			log.WithFields(log.Fields{
+				"Event": event,
+				}).Info("Already Subscribed")
+		}
 	}
+	if f != nil { f.Close() }
 	return &empty.Empty{}, nil
 }
 
 func (s *Server) UnSubsrcribeGivenEvents(c context.Context,  unsubeventlist *importer.EventList) (*empty.Empty, error) {
-       fmt.Println("Received UnSubsrcribeEvents\n")
-       ip_address := unsubeventlist.EventIpAddress
-       //Call API to unsubscribe events
-        for _, event := range unsubeventlist.Events {
-                if _, ok := s.devicemap[ip_address].subscriptions[event]; ok {
-                 rtn := remove_subscription(ip_address, s.devicemap[ip_address].subscriptions[event] )
-                        if rtn {
-                                delete(s.devicemap[ip_address].subscriptions, event)
-                                fmt.Println("subscription removed", event)
-                        }
+	fmt.Println("Received UnSubsrcribeEvents\n")
+	ip_address := unsubeventlist.EventIpAddress
+	//Call API to unsubscribe events
+	f := get_subscription_list(ip_address)
+	for _, event := range unsubeventlist.Events {
+		if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
+			s.remove_subscription(ip_address, event, f)
                 } else {
                                 log.WithFields(log.Fields{
                                   "Event": event,
                                 }).Info("was not Subscribed")
                         }
         }
+	if f != nil { f.Close() }
 
        return &empty.Empty{}, nil
 }
 
 func (s *Server) collect_data(ip_address string) {
-	freqchan := s.devicemap[ip_address].freqchan
-	ticker := s.devicemap[ip_address].datacollector.getdata
-	donechan := s.devicemap[ip_address].datacollector.quit
+	freqchan := s.devicemap[ip_address].Freqchan
+	ticker := s.devicemap[ip_address].Datacollector.getdata
+	donechan := s.devicemap[ip_address].Datacollector.quit
 	for {
 		select {
 		case freq := <-freqchan:
@@ -139,11 +139,11 @@
 				if rtn {
 					for _, str := range data {
 						str = "Device IP: " + ip_address + " " + str
-						fmt.Println("collected data  %s ...", str)
+						fmt.Printf("collected data  %s\n ...", str)
 						b := []byte(str)
 						msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
-						// check if needs to add for select case
 						select {
+						// TODO: this is blocking, maybe a timer?
 						case  s.dataproducer.Input() <- msg:
 							fmt.Println("Produce message")
 						}
@@ -160,42 +160,36 @@
 
 func (s *Server) SendDeviceInfo(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
 	d := device {
-		subscriptions: make(map[string]uint),
-		datacollector: scheduler{
+		Subscriptions: make(map[string]string),
+		Freq: info.Frequency,
+		Datacollector: scheduler{
 			getdata: *time.NewTicker(time.Duration(info.Frequency) * time.Second),
 			quit: make(chan bool),
 		},
-		freqchan:        make(chan uint32),
-                vendor: info.Vendor,
-		protocol: info.Protocol,
+		Freqchan: make(chan uint32),
+		Vendor: info.Vendor,
+		Protocol: info.Protocol,
 	}
         //default_events := [...]string{}
 	s.devicemap[info.IpAddress] = &d
-	fmt.Println("size of devicemap %d", len(s.devicemap))
+	fmt.Printf("size of devicemap %d\n", len(s.devicemap))
 	ip_address:= info.IpAddress
-	fmt.Println("Configuring  %s ...", ip_address)
+	fmt.Printf("Configuring  %s\n", ip_address)
 	// call subscription function with info.IpAddress
-	http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
+
 	default_events := vendor_default_events[info.Vendor]
+
+	f := get_subscription_list(ip_address)
 	for _, event := range default_events {
-	if _, ok := s.devicemap[ip_address].subscriptions[event]; !ok {
-		rtn, id := add_subscription(info.Protocol+"://"+ip_address, event)
-		if rtn {
-			s.devicemap[ip_address].subscriptions[event] = id
-			fmt.Println("subscription added", event, id)
-			}
-                } else {
-                                log.WithFields(log.Fields{
-                                  "Event": event,
-                                }).Info("was  Subscribed")
-                        }
+		s.add_subscription(ip_address, event, f)
 	}
+	if f != nil { f.Close() }
 	go s.collect_data(ip_address)
 	return &empty.Empty{}, nil
 }
 
 func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
-        fmt.Println("Listening %s ...", grpcport)
+        fmt.Printf("Listening %s\n", grpcport)
         g = grpc.NewServer()
         l, e = net.Listen("tcp", grpcport)
         return
@@ -262,6 +256,34 @@
 	http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
 }
 
+func (s *Server) init_data_persistence() {
+	subscriptionListPath = pvmount + "/subscriptions"
+	if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
+		fmt.Println(err)
+	} else {
+		lists, err := ioutil.ReadDir(subscriptionListPath)
+		if err != nil {
+			fmt.Println(err)
+		} else {
+			for _, list := range lists {
+				b, err := ioutil.ReadFile(path.Join(subscriptionListPath, list.Name()))
+				if err != nil {
+					fmt.Println(err)
+				} else {
+					ip := list.Name()
+					d := device{}
+					json.Unmarshal(b, &d)
+					s.devicemap[ip] = &d
+					s.devicemap[ip].Datacollector.getdata = *time.NewTicker(time.Duration(s.devicemap[ip].Freq) * time.Second)
+					s.devicemap[ip].Datacollector.quit = make(chan bool)
+					s.devicemap[ip].Freqchan = make(chan uint32)
+					go s.collect_data(ip)
+				}
+			}
+		}
+	}
+}
+
 func init() {
         Formatter := new(log.TextFormatter)
         Formatter.TimestampFormat = "02-01-2006 15:04:05"
@@ -274,17 +296,40 @@
 	//sarama.Logger = log.New()
 }
 
+func get_subscription_list(ip string) *os.File {
+	if pvmount == "" {
+		return nil
+	}
+	f, err := os.OpenFile(subscriptionListPath + "/" + ip, os.O_CREATE|os.O_RDWR, 0664)
+	if err != nil {
+		fmt.Println(err)
+	}
+	return f
+}
 
 func main() {
 	fmt.Println("Starting Device-management Container")
+
+	http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
+	client := &http.Client{
+		Timeout: 10 * time.Second,
+		}
+
 	s := Server {
 		devicemap:	make(map[string]*device),
 		devicechan:	make(chan *importer.DeviceInfo),
+		httpclient:	client,
 	}
-	// check if we should keep this as go routines
-	go s.kafkaInit()
+
+	s.kafkaInit()
+//TODO: check if we should keep this as goroutines?
 	go s.runServer()
 	go s.startgrpcserver()
+
+	if pvmount != "" {
+		s.init_data_persistence()
+	}
+
 	quit := make(chan os.Signal)
 	signal.Notify(quit, os.Interrupt)