[SEBA-751} Add API for subscribe/unsubscribe and set frequency APIs

Change-Id: If224851b143dcb335918cbc06d66e5fdfd99bab1
diff --git a/main.go b/main.go
index 6394ba7..771039e 100644
--- a/main.go
+++ b/main.go
@@ -48,34 +48,75 @@
 	gRPCserver   *grpc.Server
 	dataproducer sarama.AsyncProducer
 	devicechan   chan *importer.DeviceInfo
+	freqchan   chan uint32
 }
 
+
+func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.EventList, error) {
+        fmt.Println("Received GetEventList\n")
+       eventstobesubscribed:= new(importer.EventList)
+       eventstobesubscribed.EventIpAddress = info.IpAddress
+       eventstobesubscribed.Events = append(eventstobesubscribed.Events,"ResourceAdded","ResourceRemoved","Alert")
+       return eventstobesubscribed, nil
+}
+
+func (s *Server) SetFrequency(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
+        fmt.Println("Received SetFrequency\n")
+       device := s.devicemap[info.IpAddress]
+       device.freq = info.Frequency
+       //Inform scheduler frquency has changed
+       s.freqchan <- info.Frequency
+       return &empty.Empty{}, nil
+}
+
+func (s *Server) SubsrcribeGivenEvents(c context.Context,  subeventlist *importer.EventList) (*empty.Empty, error) {
+        fmt.Println("Received SubsrcribeEvents\n")
+       //Call API to subscribe events
+       ip_address := subeventlist.EventIpAddress
+        for _, event := range subeventlist.Events {
+                rtn, id := add_subscription(ip_address, event)
+                if rtn {
+                        s.devicemap[ip_address].subscriptions[event] = id
+                        fmt.Println("subscription added", event, id)
+                }
+        }
+       return &empty.Empty{}, nil
+}
+
+func (s *Server) UnSubsrcribeGivenEvents(c context.Context,  unsubeventlist *importer.EventList) (*empty.Empty, error) {
+       fmt.Println("Received UnSubsrcribeEvents\n")
+       ip_address := unsubeventlist.EventIpAddress
+       //Call API to unsubscribe events
+        for _, event := range unsubeventlist.Events {
+                rtn := remove_subscription(ip_address, s.devicemap[ip_address].subscriptions[event] )
+                if rtn {
+                        delete(s.devicemap[ip_address].subscriptions, event)
+                        fmt.Println("subscription removed", event)
+                }
+        }
+
+       return &empty.Empty{}, nil
+}
+
+
 func (s *Server) SendDeviceInfo(c context.Context,  info *importer.DeviceInfo) (*empty.Empty, error) {
 	d := device {
 		subscriptions: make(map[string]uint),
 		freq:	info.Frequency,
 	}
 	s.devicemap[info.IpAddress] = &d
-	s.devicechan <- info
-	return &empty.Empty{}, nil
-}
-func(s *Server) subscribeevents() {
+	ip_address:= info.IpAddress
+	fmt.Println("Configuring  %s ...", ip_address)
+	// call subscription function with info.IpAddress
 	http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
-	for {
-		select {
-		case info:= <-s.devicechan:
-			ip_address:= info.IpAddress
-			fmt.Println("Configuring  %s ...", ip_address)
-			// call subscription function with info.IpAddress
-			for _, event := range default_events {
-				rtn, id := add_subscription(ip_address, event)
-				if rtn {
-					s.devicemap[ip_address].subscriptions[event] = id
-					fmt.Println("subscription added", event, id)
-				}
-			}
+	for _, event := range default_events {
+		rtn, id := add_subscription(ip_address, event)
+		if rtn {
+			s.devicemap[ip_address].subscriptions[event] = id
+			fmt.Println("subscription added", event, id)
 		}
 	}
+	return &empty.Empty{}, nil
 }
 
 func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
@@ -160,11 +201,11 @@
 	s := Server {
 		devicemap:	make(map[string]*device),
 		devicechan:	make(chan *importer.DeviceInfo),
+		freqchan:        make(chan uint32),
 	}
 	go s.kafkaInit()
 	go s.runServer()
 	go s.startgrpcserver()
-	go s.subscribeevents()
 	quit := make(chan os.Signal)
 	signal.Notify(quit, os.Interrupt)
 
diff --git a/proto/importer.proto b/proto/importer.proto
index d2cab50..6a582a2 100644
--- a/proto/importer.proto
+++ b/proto/importer.proto
@@ -12,10 +12,23 @@
     uint32 frequency = 2;
 }
 
+message EventList {
+       string event_ip_address = 1;
+        repeated string events = 2;
+}
+
 
 service device_management {
-    rpc SendDeviceInfo(DeviceInfo)
-        returns (google.protobuf.Empty) {}
+    rpc SendDeviceInfo(DeviceInfo) returns (google.protobuf.Empty) {}
+
+    rpc SetFrequency(DeviceInfo) returns (google.protobuf.Empty) {}
+
+    rpc GetEventList(DeviceInfo) returns (EventList) {}
+
+    rpc SubsrcribeGivenEvents(EventList)  returns (google.protobuf.Empty) {}
+
+    rpc UnSubsrcribeGivenEvents(EventList) returns (google.protobuf.Empty) {}
 }
 
 
+