[SEBA-627]Importer testing demo test code

Change-Id: I28048ad021f0df8b112c2c6f52f6d3602741bf61
diff --git a/main.go b/main.go
index 4e7f52c..f6a6948 100644
--- a/main.go
+++ b/main.go
@@ -58,6 +58,7 @@
 	datacollector scheduler
 	freqchan   chan uint32
         vendor string
+	protocol string
 }
 
 type Server struct {
@@ -130,14 +131,22 @@
 		case freq := <-freqchan:
 			ticker.Stop()
 			ticker = *time.NewTicker(time.Duration(freq) * time.Second)
+		case  err := <-s.dataproducer.Errors():
+			fmt.Println("Failed to produce message:", err)
 		case <-ticker.C:
 			for _, service := range redfish_services {
-				rtn, data := get_status(ip_address, service)
+				rtn, data := s.get_status(ip_address, service)
 				if rtn {
 					for _, str := range data {
 						str = "Device IP: " + ip_address + " " + str
+						fmt.Println("collected data  %s ...", str)
 						b := []byte(str)
-						s.dataproducer.Input() <- &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
+						msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
+						// check if needs to add for select case
+						select {
+						case  s.dataproducer.Input() <- msg:
+							fmt.Println("Produce message")
+						}
 					}
 				}
 			}
@@ -158,20 +167,28 @@
 		},
 		freqchan:        make(chan uint32),
                 vendor: info.Vendor,
+		protocol: info.Protocol,
 	}
         //default_events := [...]string{}
 	s.devicemap[info.IpAddress] = &d
+	fmt.Println("size of devicemap %d", len(s.devicemap))
 	ip_address:= info.IpAddress
 	fmt.Println("Configuring  %s ...", ip_address)
 	// call subscription function with info.IpAddress
 	http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
 	default_events := vendor_default_events[info.Vendor]
 	for _, event := range default_events {
-		rtn, id := add_subscription(ip_address, event)
+	if _, ok := s.devicemap[ip_address].subscriptions[event]; !ok {
+		rtn, id := add_subscription(info.Protocol+"://"+ip_address, event)
 		if rtn {
 			s.devicemap[ip_address].subscriptions[event] = id
 			fmt.Println("subscription added", event, id)
-		}
+			}
+                } else {
+                                log.WithFields(log.Fields{
+                                  "Event": event,
+                                }).Info("was  Subscribed")
+                        }
 	}
 	go s.collect_data(ip_address)
 	return &empty.Empty{}, nil
@@ -200,52 +217,49 @@
 	return nil
 
 }
+func (s *Server) kafkaCloseProducer(){
+	if err :=s.dataproducer.Close(); err != nil {
+		panic(err)
+	}
+
+}
 func (s *Server) kafkaInit() {
 	fmt.Println("Starting kafka init to Connect to broker: ")
 	config := sarama.NewConfig()
         config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.Retry.Max = 10
-	config.Producer.Return.Successes = true
 	producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
 	if err != nil {
 		panic(err)
 	}
 	s.dataproducer = producer
-	defer func() {
-		if err := producer.Close(); err != nil {
-			panic(err)
-		}
-	}()
 }
 
 func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
 	signals := make(chan os.Signal, 1)
 	signal.Notify(signals, os.Interrupt)
 
+        fmt.Println(" IN Handle Event  ")
 	if(r.Method ==  "POST"){
 		Body, err := ioutil.ReadAll(r.Body)
 		if err != nil {
 			fmt.Println("Error getting HTTP data",err)
 		}
 		defer  r.Body.Close()
+		fmt.Println("Received Event Message ")
 		fmt.Printf("%s\n",Body)
 		message :=&sarama.ProducerMessage{
                         Topic: importerTopic,
                         Value: sarama.StringEncoder(Body),
                 }
-		select {
-		case s.dataproducer.Input() <- message:
-
-		case <-signals:
-	        s.dataproducer.AsyncClose() // Trigger a shutdown of the producer.
-		}
+		s.dataproducer.Input() <- message
 	}
 }
 
 func (s *Server) runServer() {
 	fmt.Println("Starting HTTP Server")
 	http.HandleFunc("/", s.handle_events)
-	http.ListenAndServe(":8080", nil)
+	http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
 }
 
 func init() {
@@ -257,6 +271,7 @@
 	fmt.Println("Listening to  http server")
         log.Info("log Connecting to broker:")
         log.Info("log Listening to  http server ")
+	//sarama.Logger = log.New()
 }
 
 
@@ -266,6 +281,7 @@
 		devicemap:	make(map[string]*device),
 		devicechan:	make(chan *importer.DeviceInfo),
 	}
+	// check if we should keep this as go routines
 	go s.kafkaInit()
 	go s.runServer()
 	go s.startgrpcserver()
@@ -275,5 +291,6 @@
 	select {
 	case sig := <-quit:
 		fmt.Println("Shutting down:", sig)
+		s.kafkaCloseProducer()
 	}
 }