blob: d1d7ca9d52bc648b4af33b0503c7cea7de266246 [file] [log] [blame]
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +00001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000018 importer "./proto"
19 "bufio"
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000020 "bytes"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000021 "crypto/tls"
22 "fmt"
23 "github.com/Shopify/sarama"
24 log "github.com/Sirupsen/logrus"
25 "golang.org/x/net/context"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/status"
28 "net"
29 "net/http"
30 "os"
31 "os/exec"
32 "os/signal"
33 "strconv"
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000034 "strings"
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000035)
36
// Redfish service root path and the JSON content type string.
var (
	REDFISH_ROOT = "/redfish/v1"
	CONTENT_TYPE = "application/json"
)

// EVENTS_MAP translates the short event keywords accepted by the "sub" and
// "unsub" commands into the event type names passed to the importer service.
var EVENTS_MAP = map[string]string{
	"add":    "ResourceAdded",
	"rm":     "ResourceRemoved",
	"alert":  "Alert",
	"update": "Update",
}

// Defaults and mutable session state.
var (
	// default_address is the importer gRPC endpoint dialed by main.
	default_address = "localhost:31085"
	default_port    = "8888"
	default_vendor  = "edgecore"
	default_freq    uint64 = 180
	// attach_device_ip holds the "ip:port" of the most recent attach command.
	attach_device_ip = ""
	// importerTopic is the Kafka topic consumed by topicListener.
	importerTopic = "importer"
)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000051var DataConsumer sarama.Consumer
52
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000053var cc importer.DeviceManagementClient
54var ctx context.Context
55var conn *grpc.ClientConn
nickhuang6b31f8f2019-09-26 02:02:14 +000056
57/*///////////////////////////////////////////////////////////////////////*/
58// Allows user to register the device for data collection and frequency.
59//
60//
61/*///////////////////////////////////////////////////////////////////////*/
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000062func Attach(aip string, avendor string, afreq uint32) error {
63 fmt.Println("Received Attach\n")
64 var default_protocol string = "https"
65 deviceinfo := new(importer.DeviceInfo)
66 deviceinfo.IpAddress = aip
67 deviceinfo.Vendor = avendor
68 deviceinfo.Frequency = afreq
69 deviceinfo.Protocol = default_protocol
70 _, err := cc.SendDeviceInfo(ctx, deviceinfo)
nickhuang6b31f8f2019-09-26 02:02:14 +000071
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000072 return err
nickhuang6b31f8f2019-09-26 02:02:14 +000073}
74
75/*///////////////////////////////////////////////////////////////////////*/
76// Allows user to change the frequency of data collection
77//
78//
79/*///////////////////////////////////////////////////////////////////////*/
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000080func UpdateFreq(ip_address string, wd uint32) error {
81 fmt.Println("Received Period\n")
82 freqinfo := new(importer.FreqInfo)
83 freqinfo.Frequency = wd
84 freqinfo.IpAddress = ip_address
85 _, err := cc.SetFrequency(ctx, freqinfo)
nickhuang6b31f8f2019-09-26 02:02:14 +000086
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000087 return err
nickhuang6b31f8f2019-09-26 02:02:14 +000088}
89
90/*///////////////////////////////////////////////////////////////////////*/
91// Allows user to unsubscribe events
92//
93//
94/*///////////////////////////////////////////////////////////////////////*/
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000095func Subscribe(ip_address string, Giveneventlist []string) error {
96 fmt.Println("Received Subscribe\n")
97 giveneventlist := new(importer.GivenEventList)
98 giveneventlist.Events = Giveneventlist
99 giveneventlist.EventIpAddress = ip_address
100 _, err := cc.SubsrcribeGivenEvents(ctx, giveneventlist)
nickhuang6b31f8f2019-09-26 02:02:14 +0000101
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000102 return err
nickhuang6b31f8f2019-09-26 02:02:14 +0000103}
104
105/*///////////////////////////////////////////////////////////////////////*/
106// Allows user to unsubscribe events
107//
108//
109/*///////////////////////////////////////////////////////////////////////*/
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000110func UnSubscribe(ip_address string, Giveneventlist []string) error {
111 fmt.Println("Received UnSubscribe\n")
112 giveneventlist := new(importer.GivenEventList)
113 giveneventlist.Events = Giveneventlist
114 giveneventlist.EventIpAddress = ip_address
115 _, err := cc.UnSubsrcribeGivenEvents(ctx, giveneventlist)
nickhuang6b31f8f2019-09-26 02:02:14 +0000116
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000117 return err
nickhuang6b31f8f2019-09-26 02:02:14 +0000118}
119
120/*///////////////////////////////////////////////////////////////////////*/
121// Allows user to get the events supported by device
122//
123//
124/*///////////////////////////////////////////////////////////////////////*/
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000125func GetEventSupportList(vendor string) (error, []string) {
126 fmt.Println("Received GetEventSupportList\n")
127 vendorinfo := new(importer.VendorInfo)
128 vendorinfo.Vendor = vendor
129 var ret_msg *importer.EventList
130 ret_msg, err := cc.GetEventList(ctx, vendorinfo)
dileepbk86ef0102019-11-13 00:08:33 +0000131 if err != nil {
132 return err, nil
133 } else {
134 return err, ret_msg.Events
135 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000136}
137
138/*///////////////////////////////////////////////////////////////////////*/
139// Allows user to get the current events subscribed by device
140//
141//
142/*///////////////////////////////////////////////////////////////////////*/
143func GetEventCurrentDeviceList(ip_address string) (error, []string) {
144 fmt.Println("Received GetEventCurrentDeviceList\n")
145 currentdeviceinfo := new(importer.Device)
146 currentdeviceinfo.IpAddress = ip_address
147 var ret_msg *importer.EventList
148 ret_msg, err := cc.GetCurrentEventList(ctx, currentdeviceinfo)
dileepbk86ef0102019-11-13 00:08:33 +0000149 if err != nil {
150 return err, nil
151 } else {
152 return err, ret_msg.Events
153 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000154}
155
156/*///////////////////////////////////////////////////////////////////////*/
157// Allows user to get the current events subscribed by device
158//
159//
160/*///////////////////////////////////////////////////////////////////////*/
161func ClearCurrentDeviceEventList(ip_address string) error {
162 fmt.Println("Received ClearCurrentDeviceEventList\n")
163 currentdeviceinfo := new(importer.Device)
164 currentdeviceinfo.IpAddress = ip_address
165 _, err := cc.ClearCurrentEventList(ctx, currentdeviceinfo)
166
167 return err
nickhuang6b31f8f2019-09-26 02:02:14 +0000168}
169
dileepbk86ef0102019-11-13 00:08:33 +0000170/*///////////////////////////////////////////////////////////////////////*/
171// Allows user to get the current devices that are monitored
172//
173//
174/*///////////////////////////////////////////////////////////////////////*/
175func GetCurrentDevices() (error, []string) {
176 fmt.Println("Testing GetCurrentDevices\n")
177 empty := new(importer.Empty)
178 var ret_msg *importer.DeviceList
179 ret_msg, err := cc.GetCurrentDevices(ctx, empty)
180 if err != nil {
181 return err, nil
182 } else {
183 return err, ret_msg.Ip
184 }
185}
186
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000187func init() {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000188 Formatter := new(log.TextFormatter)
189 Formatter.TimestampFormat = "02-01-2006 15:04:05"
190 Formatter.FullTimestamp = true
191 log.SetFormatter(Formatter)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000192}
193
194func topicListener(topic *string, master sarama.Consumer) {
195 log.Info("Starting topicListener for ", *topic)
196 consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
197 if err != nil {
198 log.Error("topicListener panic, topic=[%s]: %s", *topic, err.Error())
199 os.Exit(1)
200 }
201 signals := make(chan os.Signal, 1)
202 signal.Notify(signals, os.Interrupt)
203 doneCh := make(chan struct{})
204 go func() {
205 for {
206 select {
207 case err := <-consumer.Errors():
208 log.Error("Consumer error: %s", err.Err)
209 case msg := <-consumer.Messages():
210 log.Info("Got message on topic=[%s]: %s", *topic, string(msg.Value))
211 case <-signals:
212 log.Warn("Interrupt is detected")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000213 os.Exit(1)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000214 }
215 }
216 }()
217 <-doneCh
218}
219
220func kafkainit() {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000221 cmd := exec.Command("/bin/sh", "kafka_ip.sh")
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000222 var kafkaIP string
223 var out bytes.Buffer
224 cmd.Stdout = &out
225 err := cmd.Run()
226 if err != nil {
227 log.Info(err)
228 os.Exit(1)
229 }
230
231 kafkaIP = out.String()
232 kafkaIP = strings.TrimSuffix(kafkaIP, "\n")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000233 kafkaIP = kafkaIP + ":9092"
234 fmt.Println("IP address of kafka-cord-0:", kafkaIP)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000235 config := sarama.NewConfig()
236 config.Consumer.Return.Errors = true
237 master, err := sarama.NewConsumer([]string{kafkaIP}, config)
238 if err != nil {
239 panic(err)
240 }
241 DataConsumer = master
242
243 go topicListener(&importerTopic, master)
244}
245func main() {
nickhuang6b31f8f2019-09-26 02:02:14 +0000246 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
247 fmt.Println("Launching server...")
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000248 log.Info("kafkaInit starting")
249 kafkainit()
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000250
nickhuang6b31f8f2019-09-26 02:02:14 +0000251 ln, err := net.Listen("tcp", ":9999")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000252 if err != nil {
253 fmt.Println("could not listen")
254 log.Fatal("did not listen: %v", err)
255 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000256 defer ln.Close()
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000257
nickhuang6b31f8f2019-09-26 02:02:14 +0000258 connS, err := ln.Accept()
259 if err != nil {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000260 fmt.Println("Accept error")
261 log.Fatal("Accept error: %v", err)
262 } else {
nickhuang6b31f8f2019-09-26 02:02:14 +0000263
264 conn, err = grpc.Dial(default_address, grpc.WithInsecure())
265 if err != nil {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000266 fmt.Println("could not connect")
nickhuang6b31f8f2019-09-26 02:02:14 +0000267 log.Fatal("did not connect: %v", err)
268 }
269 defer conn.Close()
270
271 cc = importer.NewDeviceManagementClient(conn)
272 ctx = context.Background()
273
nickhuang6b31f8f2019-09-26 02:02:14 +0000274 loop := true
275
276 for loop == true {
277 cmd, _ := bufio.NewReader(connS).ReadString('\n')
278
279 cmd = strings.TrimSuffix(cmd, "\n")
280 s := strings.Split(cmd, ":")
281 newmessage := "cmd error!!"
282 cmd = s[0]
283
284 switch string(cmd) {
285
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000286 case "attach":
nickhuang6b31f8f2019-09-26 02:02:14 +0000287 cmd_size := len(s)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000288 var err error
289 var uafreq uint64
290 if cmd_size == 5 {
291 aip := s[1]
292 aport := s[2]
293 avendor := s[3]
294 afreq := s[4]
295 uafreq, err = strconv.ParseUint(afreq, 10, 64)
nickhuang6b31f8f2019-09-26 02:02:14 +0000296
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000297 if err != nil {
298 fmt.Print("ParseUint error!!\n")
nickhuang6b31f8f2019-09-26 02:02:14 +0000299 }
300
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000301 attach_device_ip = aip + ":" + aport
302
303 err = Attach(attach_device_ip, avendor, uint32(uafreq))
304 if err != nil {
305 errStatus, _ := status.FromError(err)
306 fmt.Println(errStatus.Message())
307 fmt.Println(errStatus.Code())
308 fmt.Print("attach error!!\n")
309 newmessage = errStatus.Message()
310
311 } else {
312 fmt.Print("attatch IP:\n", attach_device_ip)
nickhuang6b31f8f2019-09-26 02:02:14 +0000313 newmessage = attach_device_ip
314 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000315 } else {
316 fmt.Print("Need IP addres,port,vendor,freqs !!\n")
nickhuang6b31f8f2019-09-26 02:02:14 +0000317 newmessage = "Need IP address !!"
318
319 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000320
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000321 case "period":
nickhuang6b31f8f2019-09-26 02:02:14 +0000322 cmd_size := len(s)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000323 fmt.Print("cmd_size period %d", cmd_size)
324 if cmd_size == 4 {
325 fip := s[1]
326 fport := s[2]
327 pv := s[3]
328 fmt.Print("pv:", pv)
329 u, err := strconv.ParseUint(pv, 10, 64)
330
331 if err != nil {
332 fmt.Print("ParseUint error!!\n")
333 } else {
334 wd := uint32(u)
335 ip_address := fip + ":" + fport
336 err = UpdateFreq(ip_address, wd)
nickhuang6b31f8f2019-09-26 02:02:14 +0000337
338 if err != nil {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000339 errStatus, _ := status.FromError(err)
340 fmt.Println(errStatus.Message())
341 fmt.Println(errStatus.Code())
342 newmessage = errStatus.Message()
343 fmt.Print("period error!!\n")
344 } else {
345 newmessage = strings.ToUpper(cmd)
nickhuang6b31f8f2019-09-26 02:02:14 +0000346 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000347 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000348 } else {
349 fmt.Print("Need period value !!\n")
nickhuang6b31f8f2019-09-26 02:02:14 +0000350 newmessage = "Need period value !!"
351 }
352
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000353 case "sub", "unsub":
nickhuang6b31f8f2019-09-26 02:02:14 +0000354 cmd_size := len(s)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000355 fmt.Print("cmd is :", cmd, cmd_size)
356 if cmd_size > 6 || cmd_size < 0 {
nickhuang6b31f8f2019-09-26 02:02:14 +0000357 fmt.Print("error event !!")
358 newmessage = "error event !!"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000359 } else {
360 ip := s[1]
361 port := s[2]
362 ip_address := ip + ":" + port
nickhuang6b31f8f2019-09-26 02:02:14 +0000363 var events_list []string
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000364 for i := 3; i < cmd_size; i++ {
nickhuang6b31f8f2019-09-26 02:02:14 +0000365 if value, ok := EVENTS_MAP[s[i]]; ok {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000366 events_list = append(events_list, value)
nickhuang6b31f8f2019-09-26 02:02:14 +0000367 } else {
368 fmt.Println("key not found")
369 }
370 }
371
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000372 if string(cmd) == "sub" {
373 err = Subscribe(ip_address, events_list)
374 if err != nil {
375 errStatus, _ := status.FromError(err)
376 fmt.Println(errStatus.Message())
377 fmt.Println(errStatus.Code())
378 newmessage = errStatus.Message()
379 fmt.Print("sub error!!")
380 } else {
381 newmessage = strings.ToUpper(cmd)
nickhuang6b31f8f2019-09-26 02:02:14 +0000382 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000383 } else {
384 err = UnSubscribe(ip_address, events_list)
385 if err != nil {
386 errStatus, _ := status.FromError(err)
387 fmt.Println(errStatus.Message())
388 fmt.Println(errStatus.Code())
389 newmessage = errStatus.Message()
390 fmt.Print("unsub error!!")
391 } else {
392 newmessage = strings.ToUpper(cmd)
393 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000394 }
395 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000396
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000397 case "showeventlist":
398 cmd_size := len(s)
399 fmt.Print("cmd is :", cmd, cmd_size)
400 if cmd_size > 3 || cmd_size < 0 {
401 fmt.Print("error event !!")
402 newmessage = "error event !!"
403 } else {
404 vendor := s[1]
405 err, supportlist := GetEventSupportList(vendor)
nickhuang6b31f8f2019-09-26 02:02:14 +0000406
407 if err != nil {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000408 errStatus, _ := status.FromError(err)
409 fmt.Println(errStatus.Message())
410 fmt.Println(errStatus.Code())
411 newmessage = errStatus.Message()
nickhuang6b31f8f2019-09-26 02:02:14 +0000412 fmt.Print("showeventlist error!!")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000413 } else {
nickhuang6b31f8f2019-09-26 02:02:14 +0000414 fmt.Print("showeventlist ", supportlist)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000415 newmessage = strings.Join(supportlist[:], ",")
nickhuang6b31f8f2019-09-26 02:02:14 +0000416 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000417 }
418
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000419 case "showdeviceeventlist":
420 cmd_size := len(s)
421 fmt.Print("cmd is :", cmd, cmd_size)
422 if cmd_size > 4 || cmd_size < 0 {
423 fmt.Print("error event !!")
424 newmessage = "error event !!"
425 } else {
426 eip := s[1]
427 eport := s[2]
428 ip_address := eip + ":" + eport
429 err, currentlist := GetEventCurrentDeviceList(ip_address)
nickhuang6b31f8f2019-09-26 02:02:14 +0000430
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000431 if err != nil {
432 errStatus, _ := status.FromError(err)
433 fmt.Println(errStatus.Message())
434 fmt.Println(errStatus.Code())
435 newmessage = errStatus.Message()
436 fmt.Print("showdeviceeventlist error!!")
437 } else {
438 fmt.Print("showeventlist ", currentlist)
439 newmessage = strings.Join(currentlist[:], ",")
440 }
441 }
442
443 case "cleardeviceeventlist":
444 cmd_size := len(s)
445 fmt.Print("cmd is :", cmd, cmd_size)
446 if cmd_size > 4 || cmd_size < 0 {
447 fmt.Print("error event !!")
448 newmessage = "error event !!"
449 } else {
450 clip := s[1]
451 clport := s[2]
452 ip_address := clip + ":" + clport
453 err = ClearCurrentDeviceEventList(ip_address)
454 if err != nil {
455 errStatus, _ := status.FromError(err)
456 fmt.Println(errStatus.Message())
457 fmt.Println(errStatus.Code())
458 newmessage = errStatus.Message()
459 fmt.Print("cleardeviceeventlist error!!")
460 } else {
461 newmessage = strings.ToUpper(cmd)
462 }
463 }
464
dileepbk86ef0102019-11-13 00:08:33 +0000465 case "showdevices":
466 cmd_size := len(s)
467 fmt.Print("cmd is :", cmd, cmd_size)
468 if cmd_size > 2 || cmd_size < 0 {
469 fmt.Print("error event !!")
470 newmessage = "error event !!"
471 } else {
472 err, currentlist := GetCurrentDevices()
473
474 if err != nil {
475 errStatus, _ := status.FromError(err)
476 fmt.Println(errStatus.Message())
477 fmt.Println(errStatus.Code())
478 newmessage = errStatus.Message()
479 fmt.Print("showdevices error!!")
480 } else {
481 fmt.Print("showdevices ", currentlist)
482 newmessage = strings.Join(currentlist[:], ", ")
483 }
484 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000485 case "QUIT":
nickhuang6b31f8f2019-09-26 02:02:14 +0000486 loop = false
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000487 newmessage = "QUIT"
nickhuang6b31f8f2019-09-26 02:02:14 +0000488
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000489 default:
nickhuang6b31f8f2019-09-26 02:02:14 +0000490 }
491 // send string back to client
492 connS.Write([]byte(newmessage + "\n"))
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000493 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000494 }
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000495}