SEBA-761 implemented persistent storage using local directory to save and restore device information across pod deployments / read pv mount path from an env variable
Change-Id: Ibb7c235a464d2f4388232d393a6ad836c2d45d73
diff --git a/main.go b/main.go
index f6a6948..c0d7346 100644
--- a/main.go
+++ b/main.go
@@ -29,6 +29,8 @@
importer "./proto"
log "github.com/Sirupsen/logrus"
"time"
+ "encoding/json"
+ "path"
)
//globals
@@ -37,16 +39,16 @@
var (
importerTopic = "importer"
-
)
var DataProducer sarama.AsyncProducer
-
var vendor_default_events = map[string][]string{
"edgecore": {"ResourceAdded","ResourceRemoved","Alert"},
}
var redfish_services = [...]string{"/Chassis", "/Systems","/EthernetSwitches"}
+var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
+var subscriptionListPath string
type scheduler struct {
getdata time.Ticker
@@ -54,18 +56,20 @@
}
type device struct {
- subscriptions map[string]uint
- datacollector scheduler
- freqchan chan uint32
- vendor string
- protocol string
+ Subscriptions map[string]string `json:"ss"`
+ Freq uint32 `json:"freq"`
+ Datacollector scheduler `json:"-"`
+ Freqchan chan uint32 `json:"-"`
+ Vendor string `json:"vendor"`
+ Protocol string `json:"protocol"`
}
type Server struct {
- devicemap map[string]*device
- gRPCserver *grpc.Server
- dataproducer sarama.AsyncProducer
- devicechan chan *importer.DeviceInfo
+ devicemap map[string]*device
+ gRPCserver *grpc.Server
+ dataproducer sarama.AsyncProducer
+ httpclient *http.Client
+ devicechan chan *importer.DeviceInfo
}
func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.SupportedEventList, error) {
@@ -77,7 +81,7 @@
func (s *Server) SetFrequency(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
fmt.Println("Received SetFrequency")
- s.devicemap[info.IpAddress].freqchan <- info.Frequency
+ s.devicemap[info.IpAddress].Freqchan <- info.Frequency
return &empty.Empty{}, nil
}
@@ -85,47 +89,43 @@
fmt.Println("Received SubsrcribeEvents\n")
//Call API to subscribe events
ip_address := subeventlist.EventIpAddress
+ f := get_subscription_list(ip_address)
for _, event := range subeventlist.Events {
- if _, ok := s.devicemap[ip_address].subscriptions[event]; !ok {
- rtn, id := add_subscription(ip_address, event)
- if rtn {
- s.devicemap[ip_address].subscriptions[event] = id
- fmt.Println("subscription added", event, id)
- }
- } else {
- log.WithFields(log.Fields{
- "Event": event,
- }).Info("Already Subscribed")
- }
+ if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
+ s.add_subscription(ip_address, event, f)
+ } else {
+ log.WithFields(log.Fields{
+ "Event": event,
+ }).Info("Already Subscribed")
+ }
}
+ if f != nil { f.Close() }
return &empty.Empty{}, nil
}
func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.EventList) (*empty.Empty, error) {
- fmt.Println("Received UnSubsrcribeEvents\n")
- ip_address := unsubeventlist.EventIpAddress
- //Call API to unsubscribe events
- for _, event := range unsubeventlist.Events {
- if _, ok := s.devicemap[ip_address].subscriptions[event]; ok {
- rtn := remove_subscription(ip_address, s.devicemap[ip_address].subscriptions[event] )
- if rtn {
- delete(s.devicemap[ip_address].subscriptions, event)
- fmt.Println("subscription removed", event)
- }
+ fmt.Println("Received UnSubsrcribeEvents\n")
+ ip_address := unsubeventlist.EventIpAddress
+ //Call API to unsubscribe events
+ f := get_subscription_list(ip_address)
+ for _, event := range unsubeventlist.Events {
+ if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
+ s.remove_subscription(ip_address, event, f)
} else {
log.WithFields(log.Fields{
"Event": event,
}).Info("was not Subscribed")
}
}
+ if f != nil { f.Close() }
return &empty.Empty{}, nil
}
func (s *Server) collect_data(ip_address string) {
- freqchan := s.devicemap[ip_address].freqchan
- ticker := s.devicemap[ip_address].datacollector.getdata
- donechan := s.devicemap[ip_address].datacollector.quit
+ freqchan := s.devicemap[ip_address].Freqchan
+ ticker := s.devicemap[ip_address].Datacollector.getdata
+ donechan := s.devicemap[ip_address].Datacollector.quit
for {
select {
case freq := <-freqchan:
@@ -139,11 +139,11 @@
if rtn {
for _, str := range data {
str = "Device IP: " + ip_address + " " + str
- fmt.Println("collected data %s ...", str)
+ fmt.Printf("collected data %s\n ...", str)
b := []byte(str)
msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
- // check if needs to add for select case
select {
+ // TODO: this is blocking, maybe a timer?
case s.dataproducer.Input() <- msg:
fmt.Println("Produce message")
}
@@ -160,42 +160,36 @@
func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
d := device {
- subscriptions: make(map[string]uint),
- datacollector: scheduler{
+ Subscriptions: make(map[string]string),
+ Freq: info.Frequency,
+ Datacollector: scheduler{
getdata: *time.NewTicker(time.Duration(info.Frequency) * time.Second),
quit: make(chan bool),
},
- freqchan: make(chan uint32),
- vendor: info.Vendor,
- protocol: info.Protocol,
+ Freqchan: make(chan uint32),
+ Vendor: info.Vendor,
+ Protocol: info.Protocol,
}
//default_events := [...]string{}
s.devicemap[info.IpAddress] = &d
- fmt.Println("size of devicemap %d", len(s.devicemap))
+ fmt.Printf("size of devicemap %d\n", len(s.devicemap))
ip_address:= info.IpAddress
- fmt.Println("Configuring %s ...", ip_address)
+ fmt.Printf("Configuring %s\n", ip_address)
// call subscription function with info.IpAddress
- http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
+
default_events := vendor_default_events[info.Vendor]
+
+ f := get_subscription_list(ip_address)
for _, event := range default_events {
- if _, ok := s.devicemap[ip_address].subscriptions[event]; !ok {
- rtn, id := add_subscription(info.Protocol+"://"+ip_address, event)
- if rtn {
- s.devicemap[ip_address].subscriptions[event] = id
- fmt.Println("subscription added", event, id)
- }
- } else {
- log.WithFields(log.Fields{
- "Event": event,
- }).Info("was Subscribed")
- }
+ s.add_subscription(ip_address, event, f)
}
+ if f != nil { f.Close() }
go s.collect_data(ip_address)
return &empty.Empty{}, nil
}
func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
- fmt.Println("Listening %s ...", grpcport)
+ fmt.Printf("Listening %s\n", grpcport)
g = grpc.NewServer()
l, e = net.Listen("tcp", grpcport)
return
@@ -262,6 +256,34 @@
http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
}
+func (s *Server) init_data_persistence() {
+ subscriptionListPath = pvmount + "/subscriptions"
+ if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
+ fmt.Println(err)
+ } else {
+ lists, err := ioutil.ReadDir(subscriptionListPath)
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ for _, list := range lists {
+ b, err := ioutil.ReadFile(path.Join(subscriptionListPath, list.Name()))
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ ip := list.Name()
+ d := device{}
+ json.Unmarshal(b, &d)
+ s.devicemap[ip] = &d
+ s.devicemap[ip].Datacollector.getdata = *time.NewTicker(time.Duration(s.devicemap[ip].Freq) * time.Second)
+ s.devicemap[ip].Datacollector.quit = make(chan bool)
+ s.devicemap[ip].Freqchan = make(chan uint32)
+ go s.collect_data(ip)
+ }
+ }
+ }
+ }
+}
+
func init() {
Formatter := new(log.TextFormatter)
Formatter.TimestampFormat = "02-01-2006 15:04:05"
@@ -274,17 +296,40 @@
//sarama.Logger = log.New()
}
+func get_subscription_list(ip string) *os.File {
+ if pvmount == "" {
+ return nil
+ }
+ f, err := os.OpenFile(subscriptionListPath + "/" + ip, os.O_CREATE|os.O_RDWR, 0664)
+ if err != nil {
+ fmt.Println(err)
+ }
+ return f
+}
func main() {
fmt.Println("Starting Device-management Container")
+
+ http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
+ client := &http.Client{
+ Timeout: 10 * time.Second,
+ }
+
s := Server {
devicemap: make(map[string]*device),
devicechan: make(chan *importer.DeviceInfo),
+ httpclient: client,
}
- // check if we should keep this as go routines
- go s.kafkaInit()
+
+ s.kafkaInit()
+//TODO: check if we should keep this as goroutines?
go s.runServer()
go s.startgrpcserver()
+
+ if pvmount != "" {
+ s.init_data_persistence()
+ }
+
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)