[SEBA-627] Importer testing demo test code
Change-Id: I28048ad021f0df8b112c2c6f52f6d3602741bf61
diff --git a/main.go b/main.go
index 4e7f52c..f6a6948 100644
--- a/main.go
+++ b/main.go
@@ -58,6 +58,7 @@
datacollector scheduler
freqchan chan uint32
vendor string
+ protocol string
}
type Server struct {
@@ -130,14 +131,22 @@
case freq := <-freqchan:
ticker.Stop()
ticker = *time.NewTicker(time.Duration(freq) * time.Second)
+ case err := <-s.dataproducer.Errors():
+ fmt.Println("Failed to produce message:", err)
case <-ticker.C:
for _, service := range redfish_services {
- rtn, data := get_status(ip_address, service)
+ rtn, data := s.get_status(ip_address, service)
if rtn {
for _, str := range data {
str = "Device IP: " + ip_address + " " + str
+ fmt.Println("collected data %s ...", str)
b := []byte(str)
- s.dataproducer.Input() <- &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
+ msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
+ // check if needs to add for select case
+ select {
+ case s.dataproducer.Input() <- msg:
+ fmt.Println("Produce message")
+ }
}
}
}
@@ -158,20 +167,28 @@
},
freqchan: make(chan uint32),
vendor: info.Vendor,
+ protocol: info.Protocol,
}
//default_events := [...]string{}
s.devicemap[info.IpAddress] = &d
+ fmt.Println("size of devicemap %d", len(s.devicemap))
ip_address:= info.IpAddress
fmt.Println("Configuring %s ...", ip_address)
// call subscription function with info.IpAddress
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
default_events := vendor_default_events[info.Vendor]
for _, event := range default_events {
- rtn, id := add_subscription(ip_address, event)
+ if _, ok := s.devicemap[ip_address].subscriptions[event]; !ok {
+ rtn, id := add_subscription(info.Protocol+"://"+ip_address, event)
if rtn {
s.devicemap[ip_address].subscriptions[event] = id
fmt.Println("subscription added", event, id)
- }
+ }
+ } else {
+ log.WithFields(log.Fields{
+ "Event": event,
+ }).Info("was Subscribed")
+ }
}
go s.collect_data(ip_address)
return &empty.Empty{}, nil
@@ -200,52 +217,49 @@
return nil
}
+func (s *Server) kafkaCloseProducer(){
+ if err :=s.dataproducer.Close(); err != nil {
+ panic(err)
+ }
+
+}
func (s *Server) kafkaInit() {
fmt.Println("Starting kafka init to Connect to broker: ")
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10
- config.Producer.Return.Successes = true
producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
if err != nil {
panic(err)
}
s.dataproducer = producer
- defer func() {
- if err := producer.Close(); err != nil {
- panic(err)
- }
- }()
}
func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
+ fmt.Println(" IN Handle Event ")
if(r.Method == "POST"){
Body, err := ioutil.ReadAll(r.Body)
if err != nil {
fmt.Println("Error getting HTTP data",err)
}
defer r.Body.Close()
+ fmt.Println("Received Event Message ")
fmt.Printf("%s\n",Body)
message :=&sarama.ProducerMessage{
Topic: importerTopic,
Value: sarama.StringEncoder(Body),
}
- select {
- case s.dataproducer.Input() <- message:
-
- case <-signals:
- s.dataproducer.AsyncClose() // Trigger a shutdown of the producer.
- }
+ s.dataproducer.Input() <- message
}
}
func (s *Server) runServer() {
fmt.Println("Starting HTTP Server")
http.HandleFunc("/", s.handle_events)
- http.ListenAndServe(":8080", nil)
+ http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
}
func init() {
@@ -257,6 +271,7 @@
fmt.Println("Listening to http server")
log.Info("log Connecting to broker:")
log.Info("log Listening to http server ")
+ //sarama.Logger = log.New()
}
@@ -266,6 +281,7 @@
devicemap: make(map[string]*device),
devicechan: make(chan *importer.DeviceInfo),
}
+ // check if we should keep this as go routines
go s.kafkaInit()
go s.runServer()
go s.startgrpcserver()
@@ -275,5 +291,6 @@
select {
case sig := <-quit:
fmt.Println("Shutting down:", sig)
+ s.kafkaCloseProducer()
}
}