blob: bc0e3ba394981fe21977d2bef79dd3a91a4abc43 [file] [log] [blame]
Dinesh Belwalkar01217962019-05-23 21:51:16 +00001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000018 importer "./proto"
19 "crypto/tls"
20 "encoding/json"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000021 "fmt"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000022 "github.com/Shopify/sarama"
23 log "github.com/Sirupsen/logrus"
24 empty "github.com/golang/protobuf/ptypes/empty"
25 "golang.org/x/net/context"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29 "io/ioutil"
Dinesh Belwalkar41229602019-06-21 16:58:06 +000030 "net"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000031 "net/http"
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +000032 "os"
33 "os/signal"
mc862dad02019-08-06 20:52:51 +000034 "path"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000035 "time"
Dinesh Belwalkar01217962019-05-23 21:51:16 +000036)
37
// Package-level constants.
const (
	// REDFISH_ROOT is the URL path prefix "/redfish/v1".
	REDFISH_ROOT = "/redfish/v1"
	// CONTENT_TYPE is the MIME type string "application/json".
	CONTENT_TYPE = "application/json"
)
41
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000042var (
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000043 importerTopic = "importer"
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +000044)
45
46var DataProducer sarama.AsyncProducer
47
// vendor_default_events maps a vendor name to the event names that
// SendDeviceInfo subscribes to when a device of that vendor is registered.
// A vendor missing from this map is rejected by SendDeviceInfo and
// GetEventList.
var vendor_default_events = map[string][]string{
	"edgecore": []string{"ResourceAdded", "ResourceRemoved", "Alert"},
}
var (
	// redfish_services lists the service paths that collect_data passes to
	// get_status on every ticker tick.
	redfish_services = [...]string{"/Chassis", "/Systems", "/EthernetSwitches"}

	// pvmount is the value of DEVICE_MANAGEMENT_PVMOUNT read at package
	// initialisation. When it is empty, get_subscription_list returns nil
	// and main skips init_data_persistence.
	pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")

	// subscriptionListPath is set by init_data_persistence to
	// pvmount + "/subscriptions"; it stays empty until then.
	subscriptionListPath string
)
mce7028402019-07-18 04:10:01 +000054
// scheduler bundles the polling ticker and the stop channel consumed by a
// device's collect_data goroutine.
type scheduler struct {
	// getdata ticks each time collect_data should poll the device.
	getdata *time.Ticker
	// quit, when it delivers a value, makes collect_data stop the ticker and
	// return.
	quit chan bool
}
59
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000060type device struct {
61 Subscriptions map[string]string `json:"ss"`
62 Freq uint32 `json:"freq"`
63 Datacollector scheduler `json:"-"`
64 Freqchan chan uint32 `json:"-"`
65 Vendor string `json:"vendor"`
66 Protocol string `json:"protocol"`
Dinesh Belwalkar41229602019-06-21 16:58:06 +000067}
68
69type Server struct {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000070 devicemap map[string]*device
71 gRPCserver *grpc.Server
72 dataproducer sarama.AsyncProducer
73 httpclient *http.Client
74 devicechan chan *importer.DeviceInfo
Dinesh Belwalkar41229602019-06-21 16:58:06 +000075}
76
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000077func (s *Server) ClearCurrentEventList(c context.Context, info *importer.Device) (*empty.Empty, error) {
78 fmt.Println("Received GetCurrentEventList\n")
79 ip_address := info.IpAddress
80 _, found := s.devicemap[ip_address]
81 if !found {
82 return nil, status.Errorf(codes.NotFound, "Device not registered")
83 }
84 f := get_subscription_list(ip_address)
85 for event, _ := range s.devicemap[ip_address].Subscriptions {
86 rtn := s.remove_subscription(ip_address, event, f)
87 if !rtn {
88 log.WithFields(log.Fields{
89 "Event": event,
90 }).Info("Error removing event")
91 }
92 }
93 if f != nil {
94 f.Close()
95 }
96 return &empty.Empty{}, nil
97}
98
99func (s *Server) GetCurrentEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
100 fmt.Println("Received ClearCurrentEventList\n")
101 _, found := s.devicemap[info.IpAddress]
102 if !found {
103 return nil, status.Errorf(codes.NotFound, "Device not registered")
104 }
105 currentevents := new(importer.EventList)
106 for event, _ := range s.devicemap[info.IpAddress].Subscriptions {
107 currentevents.Events = append(currentevents.Events, event)
108 }
109 return currentevents, nil
110}
111
112func (s *Server) GetEventList(c context.Context, info *importer.VendorInfo) (*importer.EventList, error) {
mce7028402019-07-18 04:10:01 +0000113 fmt.Println("Received GetEventList\n")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000114 _, found := vendor_default_events[info.Vendor]
115 if !found {
116 return nil, status.Errorf(codes.NotFound, "Invalid Vendor Provided")
117 }
118 eventstobesubscribed := new(importer.EventList)
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000119 eventstobesubscribed.Events = vendor_default_events[info.Vendor]
mce7028402019-07-18 04:10:01 +0000120 return eventstobesubscribed, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000121}
122
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000123func (s *Server) SetFrequency(c context.Context, info *importer.FreqInfo) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +0000124 fmt.Println("Received SetFrequency")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000125 _, found := s.devicemap[info.IpAddress]
126 if !found {
127 return nil, status.Errorf(codes.NotFound, "Device not registered")
128 }
129
mc862dad02019-08-06 20:52:51 +0000130 s.devicemap[info.IpAddress].Freqchan <- info.Frequency
mce7028402019-07-18 04:10:01 +0000131 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000132}
133
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000134func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.GivenEventList) (*empty.Empty, error) {
mce7028402019-07-18 04:10:01 +0000135 fmt.Println("Received SubsrcribeEvents\n")
136 //Call API to subscribe events
137 ip_address := subeventlist.EventIpAddress
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000138 _, found := s.devicemap[ip_address]
139 if !found {
140 return nil, status.Errorf(codes.NotFound, "Device not registered")
141 }
142 if len(subeventlist.Events) <= 0 {
143 return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
144 }
mc862dad02019-08-06 20:52:51 +0000145 f := get_subscription_list(ip_address)
mce7028402019-07-18 04:10:01 +0000146 for _, event := range subeventlist.Events {
mc862dad02019-08-06 20:52:51 +0000147 if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000148 rtn := s.add_subscription(ip_address, event, f)
149 if !rtn {
150 log.WithFields(log.Fields{
151 "Event": event,
152 }).Info("Error adding event")
153 }
mc862dad02019-08-06 20:52:51 +0000154 } else {
155 log.WithFields(log.Fields{
156 "Event": event,
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000157 }).Info("Already Subscribed")
mc862dad02019-08-06 20:52:51 +0000158 }
mce7028402019-07-18 04:10:01 +0000159 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000160 if f != nil {
161 f.Close()
162 }
mce7028402019-07-18 04:10:01 +0000163 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000164}
165
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000166func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.GivenEventList) (*empty.Empty, error) {
mc862dad02019-08-06 20:52:51 +0000167 fmt.Println("Received UnSubsrcribeEvents\n")
168 ip_address := unsubeventlist.EventIpAddress
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000169 _, found := s.devicemap[ip_address]
170 if !found {
171 return nil, status.Errorf(codes.NotFound, "Device not registered")
172 }
173
174 if len(unsubeventlist.Events) <= 0 {
175 return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
176 }
mc862dad02019-08-06 20:52:51 +0000177 //Call API to unsubscribe events
178 f := get_subscription_list(ip_address)
179 for _, event := range unsubeventlist.Events {
180 if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000181 rtn := s.remove_subscription(ip_address, event, f)
182 if !rtn {
183 log.WithFields(log.Fields{
184 "Event": event,
185 }).Info("Error removing event")
186 }
187 } else {
188 log.WithFields(log.Fields{
189 "Event": event,
190 }).Info("was not Subscribed")
191 }
192 }
193 if f != nil {
194 f.Close()
195 }
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000196
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000197 return &empty.Empty{}, nil
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000198}
199
mce7028402019-07-18 04:10:01 +0000200func (s *Server) collect_data(ip_address string) {
mc862dad02019-08-06 20:52:51 +0000201 freqchan := s.devicemap[ip_address].Freqchan
202 ticker := s.devicemap[ip_address].Datacollector.getdata
203 donechan := s.devicemap[ip_address].Datacollector.quit
mce7028402019-07-18 04:10:01 +0000204 for {
205 select {
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000206 case freq := <-freqchan:
mce7028402019-07-18 04:10:01 +0000207 ticker.Stop()
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000208 if freq > 0 {
209 ticker = time.NewTicker(time.Duration(freq) * time.Second)
210 }
211 case err := <-s.dataproducer.Errors():
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000212 fmt.Println("Failed to produce message:", err)
mce7028402019-07-18 04:10:01 +0000213 case <-ticker.C:
214 for _, service := range redfish_services {
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000215 rtn, data := s.get_status(ip_address, service)
mce7028402019-07-18 04:10:01 +0000216 if rtn {
217 for _, str := range data {
218 str = "Device IP: " + ip_address + " " + str
mc862dad02019-08-06 20:52:51 +0000219 fmt.Printf("collected data %s\n ...", str)
mce7028402019-07-18 04:10:01 +0000220 b := []byte(str)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000221 msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000222 select {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000223 case s.dataproducer.Input() <- msg:
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000224 fmt.Println("Produce message")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000225 default:
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000226 }
mce7028402019-07-18 04:10:01 +0000227 }
228 }
229 }
230 case <-donechan:
231 ticker.Stop()
232 fmt.Println("getdata ticker stopped")
233 return
234 }
235 }
236}
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000237
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000238func (s *Server) SendDeviceInfo(c context.Context, info *importer.DeviceInfo) (*empty.Empty, error) {
239 d := device{
mc862dad02019-08-06 20:52:51 +0000240 Subscriptions: make(map[string]string),
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000241 Freq: info.Frequency,
mc862dad02019-08-06 20:52:51 +0000242 Datacollector: scheduler{
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000243 getdata: time.NewTicker(time.Duration(info.Frequency) * time.Second),
244 quit: make(chan bool),
mce7028402019-07-18 04:10:01 +0000245 },
mc862dad02019-08-06 20:52:51 +0000246 Freqchan: make(chan uint32),
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000247 Vendor: info.Vendor,
mc862dad02019-08-06 20:52:51 +0000248 Protocol: info.Protocol,
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000249 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000250 _, found := s.devicemap[info.IpAddress]
251 if found {
252 return nil, status.Errorf(codes.AlreadyExists, "Device Already registered")
253 }
254
255 _, vendorfound := vendor_default_events[info.Vendor]
256 if !vendorfound {
257 return nil, status.Errorf(codes.NotFound, "Vendor Not Found")
258 }
259
260 //default_events := [...]string{}
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000261 s.devicemap[info.IpAddress] = &d
mc862dad02019-08-06 20:52:51 +0000262 fmt.Printf("size of devicemap %d\n", len(s.devicemap))
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000263 ip_address := info.IpAddress
mc862dad02019-08-06 20:52:51 +0000264 fmt.Printf("Configuring %s\n", ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000265 // call subscription function with info.IpAddress
mc862dad02019-08-06 20:52:51 +0000266
Dinesh Belwalkara0493ad2019-07-22 19:58:42 +0000267 default_events := vendor_default_events[info.Vendor]
mc862dad02019-08-06 20:52:51 +0000268
269 f := get_subscription_list(ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000270 for _, event := range default_events {
mc862dad02019-08-06 20:52:51 +0000271 s.add_subscription(ip_address, event, f)
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000272 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000273 if f != nil {
274 f.Close()
275 }
mce7028402019-07-18 04:10:01 +0000276 go s.collect_data(ip_address)
Dinesh Belwalkarf57ee2e2019-07-11 17:46:00 +0000277 return &empty.Empty{}, nil
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000278}
mc6a9f01a2019-06-26 21:31:23 +0000279
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000280func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000281 fmt.Printf("Listening %s\n", grpcport)
282 g = grpc.NewServer()
283 l, e = net.Listen("tcp", grpcport)
284 return
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000285}
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000286func (s *Server) startgrpcserver() error {
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000287 fmt.Println("starting gRPC Server")
288 grpcport := ":50051"
289 listener, gserver, err := NewGrpcServer(grpcport)
290 if err != nil {
291 fmt.Println("Failed to create gRPC server: %v", err)
292 return err
293 }
294 s.gRPCserver = gserver
295 importer.RegisterDeviceManagementServer(gserver, s)
296 if err := gserver.Serve(listener); err != nil {
297 fmt.Println("Failed to run gRPC server: %v", err)
298 return err
299 }
300 return nil
301
302}
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000303func (s *Server) kafkaCloseProducer() {
304 if err := s.dataproducer.Close(); err != nil {
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000305 panic(err)
306 }
307
308}
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000309func (s *Server) kafkaInit() {
310 fmt.Println("Starting kafka init to Connect to broker: ")
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000311 config := sarama.NewConfig()
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000312 config.Producer.RequiredAcks = sarama.WaitForAll
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000313 config.Producer.Retry.Max = 10
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000314 producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
315 if err != nil {
316 panic(err)
317 }
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000318 s.dataproducer = producer
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000319}
320
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000321func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000322 signals := make(chan os.Signal, 1)
323 signal.Notify(signals, os.Interrupt)
324
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000325 fmt.Println(" IN Handle Event ")
326 if r.Method == "POST" {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000327 Body, err := ioutil.ReadAll(r.Body)
328 if err != nil {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000329 fmt.Println("Error getting HTTP data", err)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000330 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000331 defer r.Body.Close()
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000332 fmt.Println("Received Event Message ")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000333 fmt.Printf("%s\n", Body)
334 message := &sarama.ProducerMessage{
335 Topic: importerTopic,
336 Value: sarama.StringEncoder(Body),
337 }
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000338 s.dataproducer.Input() <- message
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000339 }
340}
341
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000342func (s *Server) runServer() {
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000343 fmt.Println("Starting HTTP Server")
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000344 http.HandleFunc("/", s.handle_events)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000345 http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000346}
347
mc862dad02019-08-06 20:52:51 +0000348func (s *Server) init_data_persistence() {
349 subscriptionListPath = pvmount + "/subscriptions"
350 if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
351 fmt.Println(err)
352 } else {
353 lists, err := ioutil.ReadDir(subscriptionListPath)
354 if err != nil {
355 fmt.Println(err)
356 } else {
357 for _, list := range lists {
358 b, err := ioutil.ReadFile(path.Join(subscriptionListPath, list.Name()))
359 if err != nil {
360 fmt.Println(err)
361 } else {
362 ip := list.Name()
363 d := device{}
364 json.Unmarshal(b, &d)
365 s.devicemap[ip] = &d
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000366 s.devicemap[ip].Datacollector.getdata = time.NewTicker(time.Duration(s.devicemap[ip].Freq) * time.Second)
mc862dad02019-08-06 20:52:51 +0000367 s.devicemap[ip].Datacollector.quit = make(chan bool)
368 s.devicemap[ip].Freqchan = make(chan uint32)
369 go s.collect_data(ip)
370 }
371 }
372 }
373 }
374}
375
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000376func init() {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000377 Formatter := new(log.TextFormatter)
378 Formatter.TimestampFormat = "02-01-2006 15:04:05"
379 Formatter.FullTimestamp = true
380 log.SetFormatter(Formatter)
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000381 fmt.Println("Connecting to broker: ")
382 fmt.Println("Listening to http server")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000383 log.Info("log Connecting to broker:")
384 log.Info("log Listening to http server ")
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000385 //sarama.Logger = log.New()
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000386}
387
mc862dad02019-08-06 20:52:51 +0000388func get_subscription_list(ip string) *os.File {
389 if pvmount == "" {
390 return nil
391 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000392 f, err := os.OpenFile(subscriptionListPath+"/"+ip, os.O_CREATE|os.O_RDWR, 0664)
mc862dad02019-08-06 20:52:51 +0000393 if err != nil {
394 fmt.Println(err)
395 }
396 return f
397}
Dinesh Belwalkarb6b2b302019-06-06 17:30:44 +0000398
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000399func main() {
400 fmt.Println("Starting Device-management Container")
mc862dad02019-08-06 20:52:51 +0000401
402 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
403 client := &http.Client{
404 Timeout: 10 * time.Second,
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000405 }
mc862dad02019-08-06 20:52:51 +0000406
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000407 s := Server{
408 devicemap: make(map[string]*device),
409 devicechan: make(chan *importer.DeviceInfo),
410 httpclient: client,
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000411 }
mc862dad02019-08-06 20:52:51 +0000412
413 s.kafkaInit()
Dinesh Belwalkar41229602019-06-21 16:58:06 +0000414 go s.runServer()
415 go s.startgrpcserver()
mc862dad02019-08-06 20:52:51 +0000416
417 if pvmount != "" {
418 s.init_data_persistence()
419 }
420
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000421 quit := make(chan os.Signal)
422 signal.Notify(quit, os.Interrupt)
423
424 select {
425 case sig := <-quit:
426 fmt.Println("Shutting down:", sig)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000427 s.kafkaCloseProducer()
Dinesh Belwalkar8af8d7d2019-05-29 21:00:50 +0000428 }
Dinesh Belwalkar01217962019-05-23 21:51:16 +0000429}