SEBA-761 implemented persistent storage using local directory to save and restore device information across pod deployments / read pv mount path from an env variable

Change-Id: Ibb7c235a464d2f4388232d393a6ad836c2d45d73
diff --git a/main.go b/main.go
index f6a6948..c0d7346 100644
--- a/main.go
+++ b/main.go
@@ -29,6 +29,8 @@
         importer "./proto"
         log "github.com/Sirupsen/logrus"
 	"time"
+	"encoding/json"
+	"path"
 )
 
 //globals
@@ -37,16 +39,16 @@
 
 var (
 	importerTopic = "importer"
-
 )
 
 var DataProducer sarama.AsyncProducer
 
-
 var    vendor_default_events = map[string][]string{
         "edgecore": {"ResourceAdded","ResourceRemoved","Alert"},
     }
 var redfish_services = [...]string{"/Chassis", "/Systems","/EthernetSwitches"}
+var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
+var subscriptionListPath string
 
 type scheduler struct  {
 	getdata time.Ticker
@@ -54,18 +56,20 @@
 }
 
 type device struct  {
-	subscriptions map[string]uint
-	datacollector scheduler
-	freqchan   chan uint32
-        vendor string
-	protocol string
+	Subscriptions	map[string]string	`json:"ss"`
+	Freq		uint32			`json:"freq"`
+	Datacollector	scheduler		`json:"-"`
+	Freqchan	chan uint32		`json:"-"`
+	Vendor		string			`json:"vendor"`
+	Protocol	string			`json:"protocol"`
 }
 
 type Server struct {
-	devicemap  map[string]*device
-	gRPCserver   *grpc.Server
-	dataproducer sarama.AsyncProducer
-	devicechan   chan *importer.DeviceInfo
+	devicemap	map[string]*device
+	gRPCserver	*grpc.Server
+	dataproducer	sarama.AsyncProducer
+	httpclient	*http.Client
+	devicechan	chan *importer.DeviceInfo
 }
 
 func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.SupportedEventList, error) {
@@ -77,7 +81,7 @@
 
 func (s *Server) SetFrequency(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
 	fmt.Println("Received SetFrequency")
-	s.devicemap[info.IpAddress].freqchan <- info.Frequency
+	s.devicemap[info.IpAddress].Freqchan <- info.Frequency
 	return &empty.Empty{}, nil
 }
 
@@ -85,47 +89,43 @@
 	fmt.Println("Received SubsrcribeEvents\n")
 	//Call API to subscribe events
 	ip_address := subeventlist.EventIpAddress
+	f := get_subscription_list(ip_address)
 	for _, event := range subeventlist.Events {
-                if _, ok := s.devicemap[ip_address].subscriptions[event]; !ok {
-                 rtn, id := add_subscription(ip_address, event)
-                        if rtn {
-                                 s.devicemap[ip_address].subscriptions[event] = id
-                                fmt.Println("subscription added", event, id)
-                        }
-                } else {
-                                log.WithFields(log.Fields{
-                                  "Event": event,
-                                }).Info("Already Subscribed")
-                }
+		if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
+			s.add_subscription(ip_address, event, f)
+		} else {
+			log.WithFields(log.Fields{
+				"Event": event,
+				}).Info("Already Subscribed")
+		}
 	}
+	if f != nil { f.Close() }
 	return &empty.Empty{}, nil
 }
 
 func (s *Server) UnSubsrcribeGivenEvents(c context.Context,  unsubeventlist *importer.EventList) (*empty.Empty, error) {
-       fmt.Println("Received UnSubsrcribeEvents\n")
-       ip_address := unsubeventlist.EventIpAddress
-       //Call API to unsubscribe events
-        for _, event := range unsubeventlist.Events {
-                if _, ok := s.devicemap[ip_address].subscriptions[event]; ok {
-                 rtn := remove_subscription(ip_address, s.devicemap[ip_address].subscriptions[event] )
-                        if rtn {
-                                delete(s.devicemap[ip_address].subscriptions, event)
-                                fmt.Println("subscription removed", event)
-                        }
+	fmt.Println("Received UnSubsrcribeEvents\n")
+	ip_address := unsubeventlist.EventIpAddress
+	//Call API to unsubscribe events
+	f := get_subscription_list(ip_address)
+	for _, event := range unsubeventlist.Events {
+		if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
+			s.remove_subscription(ip_address, event, f)
                 } else {
                                 log.WithFields(log.Fields{
                                   "Event": event,
                                 }).Info("was not Subscribed")
                         }
         }
+	if f != nil { f.Close() }
 
        return &empty.Empty{}, nil
 }
 
 func (s *Server) collect_data(ip_address string) {
-	freqchan := s.devicemap[ip_address].freqchan
-	ticker := s.devicemap[ip_address].datacollector.getdata
-	donechan := s.devicemap[ip_address].datacollector.quit
+	freqchan := s.devicemap[ip_address].Freqchan
+	ticker := s.devicemap[ip_address].Datacollector.getdata
+	donechan := s.devicemap[ip_address].Datacollector.quit
 	for {
 		select {
 		case freq := <-freqchan:
@@ -139,11 +139,11 @@
 				if rtn {
 					for _, str := range data {
 						str = "Device IP: " + ip_address + " " + str
-						fmt.Println("collected data  %s ...", str)
+						fmt.Printf("collected data  %s\n ...", str)
 						b := []byte(str)
 						msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
-						// check if needs to add for select case
 						select {
+						// TODO: this is blocking, maybe a timer?
 						case  s.dataproducer.Input() <- msg:
 							fmt.Println("Produce message")
 						}
@@ -160,42 +160,36 @@
 
 func (s *Server) SendDeviceInfo(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
 	d := device {
-		subscriptions: make(map[string]uint),
-		datacollector: scheduler{
+		Subscriptions: make(map[string]string),
+		Freq: info.Frequency,
+		Datacollector: scheduler{
 			getdata: *time.NewTicker(time.Duration(info.Frequency) * time.Second),
 			quit: make(chan bool),
 		},
-		freqchan:        make(chan uint32),
-                vendor: info.Vendor,
-		protocol: info.Protocol,
+		Freqchan: make(chan uint32),
+		Vendor: info.Vendor,
+		Protocol: info.Protocol,
 	}
         //default_events := [...]string{}
 	s.devicemap[info.IpAddress] = &d
-	fmt.Println("size of devicemap %d", len(s.devicemap))
+	fmt.Printf("size of devicemap %d\n", len(s.devicemap))
 	ip_address:= info.IpAddress
-	fmt.Println("Configuring  %s ...", ip_address)
+	fmt.Printf("Configuring  %s\n", ip_address)
 	// call subscription function with info.IpAddress
-	http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
+
 	default_events := vendor_default_events[info.Vendor]
+
+	f := get_subscription_list(ip_address)
 	for _, event := range default_events {
-	if _, ok := s.devicemap[ip_address].subscriptions[event]; !ok {
-		rtn, id := add_subscription(info.Protocol+"://"+ip_address, event)
-		if rtn {
-			s.devicemap[ip_address].subscriptions[event] = id
-			fmt.Println("subscription added", event, id)
-			}
-                } else {
-                                log.WithFields(log.Fields{
-                                  "Event": event,
-                                }).Info("was  Subscribed")
-                        }
+		s.add_subscription(ip_address, event, f)
 	}
+	if f != nil { f.Close() }
 	go s.collect_data(ip_address)
 	return &empty.Empty{}, nil
 }
 
 func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
-        fmt.Println("Listening %s ...", grpcport)
+        fmt.Printf("Listening %s\n", grpcport)
         g = grpc.NewServer()
         l, e = net.Listen("tcp", grpcport)
         return
@@ -262,6 +256,34 @@
 	http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
 }
 
+func (s *Server) init_data_persistence() {
+	subscriptionListPath = pvmount + "/subscriptions"
+	if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
+		fmt.Println(err)
+	} else {
+		lists, err := ioutil.ReadDir(subscriptionListPath)
+		if err != nil {
+			fmt.Println(err)
+		} else {
+			for _, list := range lists {
+				b, err := ioutil.ReadFile(path.Join(subscriptionListPath, list.Name()))
+				if err != nil {
+					fmt.Println(err)
+				} else {
+					ip := list.Name()
+					d := device{}
+					json.Unmarshal(b, &d)
+					s.devicemap[ip] = &d
+					s.devicemap[ip].Datacollector.getdata = *time.NewTicker(time.Duration(s.devicemap[ip].Freq) * time.Second)
+					s.devicemap[ip].Datacollector.quit = make(chan bool)
+					s.devicemap[ip].Freqchan = make(chan uint32)
+					go s.collect_data(ip)
+				}
+			}
+		}
+	}
+}
+
 func init() {
         Formatter := new(log.TextFormatter)
         Formatter.TimestampFormat = "02-01-2006 15:04:05"
@@ -274,17 +296,40 @@
 	//sarama.Logger = log.New()
 }
 
+func get_subscription_list(ip string) *os.File {
+	if pvmount == "" {
+		return nil
+	}
+	f, err := os.OpenFile(subscriptionListPath + "/" + ip, os.O_CREATE|os.O_RDWR, 0664)
+	if err != nil {
+		fmt.Println(err)
+	}
+	return f
+}
 
 func main() {
 	fmt.Println("Starting Device-management Container")
+
+	http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
+	client := &http.Client{
+		Timeout: 10 * time.Second,
+		}
+
 	s := Server {
 		devicemap:	make(map[string]*device),
 		devicechan:	make(chan *importer.DeviceInfo),
+		httpclient:	client,
 	}
-	// check if we should keep this as go routines
-	go s.kafkaInit()
+
+	s.kafkaInit()
+//TODO: check if we should keep this as goroutines?
 	go s.runServer()
 	go s.startgrpcserver()
+
+	if pvmount != "" {
+		s.init_data_persistence()
+	}
+
 	quit := make(chan os.Signal)
 	signal.Notify(quit, os.Interrupt)