[SEBA-751} Add API for subscribe/unsubscribe and set frequency APIs
Change-Id: If224851b143dcb335918cbc06d66e5fdfd99bab1
diff --git a/main.go b/main.go
index 6394ba7..771039e 100644
--- a/main.go
+++ b/main.go
@@ -48,34 +48,75 @@
gRPCserver *grpc.Server
dataproducer sarama.AsyncProducer
devicechan chan *importer.DeviceInfo
+ freqchan chan uint32
}
+
+func (s *Server) GetEventList(c context.Context, info *importer.DeviceInfo) (*importer.EventList, error) {
+ fmt.Println("Received GetEventList\n")
+ eventstobesubscribed:= new(importer.EventList)
+ eventstobesubscribed.EventIpAddress = info.IpAddress
+ eventstobesubscribed.Events = append(eventstobesubscribed.Events,"ResourceAdded","ResourceRemoved","Alert")
+ return eventstobesubscribed, nil
+}
+
+func (s *Server) SetFrequency(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
+ fmt.Println("Received SetFrequency\n")
+ device := s.devicemap[info.IpAddress]
+ device.freq = info.Frequency
+ //Inform scheduler frquency has changed
+ s.freqchan <- info.Frequency
+ return &empty.Empty{}, nil
+}
+
+func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.EventList) (*empty.Empty, error) {
+ fmt.Println("Received SubsrcribeEvents\n")
+ //Call API to subscribe events
+ ip_address := subeventlist.EventIpAddress
+ for _, event := range subeventlist.Events {
+ rtn, id := add_subscription(ip_address, event)
+ if rtn {
+ s.devicemap[ip_address].subscriptions[event] = id
+ fmt.Println("subscription added", event, id)
+ }
+ }
+ return &empty.Empty{}, nil
+}
+
+func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.EventList) (*empty.Empty, error) {
+ fmt.Println("Received UnSubsrcribeEvents\n")
+ ip_address := unsubeventlist.EventIpAddress
+ //Call API to unsubscribe events
+ for _, event := range unsubeventlist.Events {
+ rtn := remove_subscription(ip_address, s.devicemap[ip_address].subscriptions[event] )
+ if rtn {
+ delete(s.devicemap[ip_address].subscriptions, event)
+ fmt.Println("subscription removed", event)
+ }
+ }
+
+ return &empty.Empty{}, nil
+}
+
+
func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
d := device {
subscriptions: make(map[string]uint),
freq: info.Frequency,
}
s.devicemap[info.IpAddress] = &d
- s.devicechan <- info
- return &empty.Empty{}, nil
-}
-func(s *Server) subscribeevents() {
+ ip_address:= info.IpAddress
+ fmt.Println("Configuring %s ...", ip_address)
+ // call subscription function with info.IpAddress
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
- for {
- select {
- case info:= <-s.devicechan:
- ip_address:= info.IpAddress
- fmt.Println("Configuring %s ...", ip_address)
- // call subscription function with info.IpAddress
- for _, event := range default_events {
- rtn, id := add_subscription(ip_address, event)
- if rtn {
- s.devicemap[ip_address].subscriptions[event] = id
- fmt.Println("subscription added", event, id)
- }
- }
+ for _, event := range default_events {
+ rtn, id := add_subscription(ip_address, event)
+ if rtn {
+ s.devicemap[ip_address].subscriptions[event] = id
+ fmt.Println("subscription added", event, id)
}
}
+ return &empty.Empty{}, nil
}
func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
@@ -160,11 +201,11 @@
s := Server {
devicemap: make(map[string]*device),
devicechan: make(chan *importer.DeviceInfo),
+ freqchan: make(chan uint32),
}
go s.kafkaInit()
go s.runServer()
go s.startgrpcserver()
- go s.subscribeevents()
quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)