blob: 05582ad474ecaafa92eb33af83b2874cabdfeb01 [file] [log] [blame]
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +00001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000018 importer "./proto"
19 "bufio"
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000020 "bytes"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000021 "crypto/tls"
22 "fmt"
23 "github.com/Shopify/sarama"
24 log "github.com/Sirupsen/logrus"
25 "golang.org/x/net/context"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/status"
28 "net"
29 "net/http"
30 "os"
31 "os/exec"
32 "os/signal"
33 "strconv"
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000034 "strings"
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000035)
36
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000037var REDFISH_ROOT = "/redfish/v1"
38var CONTENT_TYPE = "application/json"
nickhuang6b31f8f2019-09-26 02:02:14 +000039var EVENTS_MAP = map[string]string{
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000040 "add": "ResourceAdded",
41 "rm": "ResourceRemoved",
42 "alert": "Alert",
43 "update": "Update"}
nickhuang6b31f8f2019-09-26 02:02:14 +000044
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000045var default_address string = "localhost:31085"
46var default_port string = "8888"
47var default_vendor string = "edgecore"
48var default_freq uint64 = 180
49var attach_device_ip string = ""
50var importerTopic = "importer"
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000051var DataConsumer sarama.Consumer
52
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000053var cc importer.DeviceManagementClient
54var ctx context.Context
55var conn *grpc.ClientConn
nickhuang6b31f8f2019-09-26 02:02:14 +000056
57/*///////////////////////////////////////////////////////////////////////*/
58// Allows user to register the device for data collection and frequency.
59//
60//
61/*///////////////////////////////////////////////////////////////////////*/
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000062func Attach(aip string, avendor string, afreq uint32) error {
63 fmt.Println("Received Attach\n")
64 var default_protocol string = "https"
65 deviceinfo := new(importer.DeviceInfo)
66 deviceinfo.IpAddress = aip
67 deviceinfo.Vendor = avendor
68 deviceinfo.Frequency = afreq
69 deviceinfo.Protocol = default_protocol
70 _, err := cc.SendDeviceInfo(ctx, deviceinfo)
nickhuang6b31f8f2019-09-26 02:02:14 +000071
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000072 return err
nickhuang6b31f8f2019-09-26 02:02:14 +000073}
74
75/*///////////////////////////////////////////////////////////////////////*/
76// Allows user to change the frequency of data collection
77//
78//
79/*///////////////////////////////////////////////////////////////////////*/
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000080func UpdateFreq(ip_address string, wd uint32) error {
81 fmt.Println("Received Period\n")
82 freqinfo := new(importer.FreqInfo)
83 freqinfo.Frequency = wd
84 freqinfo.IpAddress = ip_address
85 _, err := cc.SetFrequency(ctx, freqinfo)
nickhuang6b31f8f2019-09-26 02:02:14 +000086
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000087 return err
nickhuang6b31f8f2019-09-26 02:02:14 +000088}
89
90/*///////////////////////////////////////////////////////////////////////*/
91// Allows user to unsubscribe events
92//
93//
94/*///////////////////////////////////////////////////////////////////////*/
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +000095func Subscribe(ip_address string, Giveneventlist []string) error {
96 fmt.Println("Received Subscribe\n")
97 giveneventlist := new(importer.GivenEventList)
98 giveneventlist.Events = Giveneventlist
99 giveneventlist.EventIpAddress = ip_address
100 _, err := cc.SubsrcribeGivenEvents(ctx, giveneventlist)
nickhuang6b31f8f2019-09-26 02:02:14 +0000101
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000102 return err
nickhuang6b31f8f2019-09-26 02:02:14 +0000103}
104
105/*///////////////////////////////////////////////////////////////////////*/
106// Allows user to unsubscribe events
107//
108//
109/*///////////////////////////////////////////////////////////////////////*/
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000110func UnSubscribe(ip_address string, Giveneventlist []string) error {
111 fmt.Println("Received UnSubscribe\n")
112 giveneventlist := new(importer.GivenEventList)
113 giveneventlist.Events = Giveneventlist
114 giveneventlist.EventIpAddress = ip_address
115 _, err := cc.UnSubsrcribeGivenEvents(ctx, giveneventlist)
nickhuang6b31f8f2019-09-26 02:02:14 +0000116
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000117 return err
nickhuang6b31f8f2019-09-26 02:02:14 +0000118}
119
120/*///////////////////////////////////////////////////////////////////////*/
121// Allows user to get the events supported by device
122//
123//
124/*///////////////////////////////////////////////////////////////////////*/
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000125func GetEventSupportList(vendor string) (error, []string) {
126 fmt.Println("Received GetEventSupportList\n")
127 vendorinfo := new(importer.VendorInfo)
128 vendorinfo.Vendor = vendor
129 var ret_msg *importer.EventList
130 ret_msg, err := cc.GetEventList(ctx, vendorinfo)
131
132 return err, ret_msg.Events
133}
134
135/*///////////////////////////////////////////////////////////////////////*/
136// Allows user to get the current events subscribed by device
137//
138//
139/*///////////////////////////////////////////////////////////////////////*/
140func GetEventCurrentDeviceList(ip_address string) (error, []string) {
141 fmt.Println("Received GetEventCurrentDeviceList\n")
142 currentdeviceinfo := new(importer.Device)
143 currentdeviceinfo.IpAddress = ip_address
144 var ret_msg *importer.EventList
145 ret_msg, err := cc.GetCurrentEventList(ctx, currentdeviceinfo)
146
147 return err, ret_msg.Events
148}
149
150/*///////////////////////////////////////////////////////////////////////*/
151// Allows user to get the current events subscribed by device
152//
153//
154/*///////////////////////////////////////////////////////////////////////*/
155func ClearCurrentDeviceEventList(ip_address string) error {
156 fmt.Println("Received ClearCurrentDeviceEventList\n")
157 currentdeviceinfo := new(importer.Device)
158 currentdeviceinfo.IpAddress = ip_address
159 _, err := cc.ClearCurrentEventList(ctx, currentdeviceinfo)
160
161 return err
nickhuang6b31f8f2019-09-26 02:02:14 +0000162}
163
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000164func init() {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000165 Formatter := new(log.TextFormatter)
166 Formatter.TimestampFormat = "02-01-2006 15:04:05"
167 Formatter.FullTimestamp = true
168 log.SetFormatter(Formatter)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000169}
170
171func topicListener(topic *string, master sarama.Consumer) {
172 log.Info("Starting topicListener for ", *topic)
173 consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
174 if err != nil {
175 log.Error("topicListener panic, topic=[%s]: %s", *topic, err.Error())
176 os.Exit(1)
177 }
178 signals := make(chan os.Signal, 1)
179 signal.Notify(signals, os.Interrupt)
180 doneCh := make(chan struct{})
181 go func() {
182 for {
183 select {
184 case err := <-consumer.Errors():
185 log.Error("Consumer error: %s", err.Err)
186 case msg := <-consumer.Messages():
187 log.Info("Got message on topic=[%s]: %s", *topic, string(msg.Value))
188 case <-signals:
189 log.Warn("Interrupt is detected")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000190 os.Exit(1)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000191 }
192 }
193 }()
194 <-doneCh
195}
196
197func kafkainit() {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000198 cmd := exec.Command("/bin/sh", "kafka_ip.sh")
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000199 var kafkaIP string
200 var out bytes.Buffer
201 cmd.Stdout = &out
202 err := cmd.Run()
203 if err != nil {
204 log.Info(err)
205 os.Exit(1)
206 }
207
208 kafkaIP = out.String()
209 kafkaIP = strings.TrimSuffix(kafkaIP, "\n")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000210 kafkaIP = kafkaIP + ":9092"
211 fmt.Println("IP address of kafka-cord-0:", kafkaIP)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000212 config := sarama.NewConfig()
213 config.Consumer.Return.Errors = true
214 master, err := sarama.NewConsumer([]string{kafkaIP}, config)
215 if err != nil {
216 panic(err)
217 }
218 DataConsumer = master
219
220 go topicListener(&importerTopic, master)
221}
222func main() {
nickhuang6b31f8f2019-09-26 02:02:14 +0000223 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
224 fmt.Println("Launching server...")
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000225 log.Info("kafkaInit starting")
226 kafkainit()
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000227
nickhuang6b31f8f2019-09-26 02:02:14 +0000228 ln, err := net.Listen("tcp", ":9999")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000229 if err != nil {
230 fmt.Println("could not listen")
231 log.Fatal("did not listen: %v", err)
232 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000233 defer ln.Close()
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000234
nickhuang6b31f8f2019-09-26 02:02:14 +0000235 connS, err := ln.Accept()
236 if err != nil {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000237 fmt.Println("Accept error")
238 log.Fatal("Accept error: %v", err)
239 } else {
nickhuang6b31f8f2019-09-26 02:02:14 +0000240
241 conn, err = grpc.Dial(default_address, grpc.WithInsecure())
242 if err != nil {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000243 fmt.Println("could not connect")
nickhuang6b31f8f2019-09-26 02:02:14 +0000244 log.Fatal("did not connect: %v", err)
245 }
246 defer conn.Close()
247
248 cc = importer.NewDeviceManagementClient(conn)
249 ctx = context.Background()
250
nickhuang6b31f8f2019-09-26 02:02:14 +0000251 loop := true
252
253 for loop == true {
254 cmd, _ := bufio.NewReader(connS).ReadString('\n')
255
256 cmd = strings.TrimSuffix(cmd, "\n")
257 s := strings.Split(cmd, ":")
258 newmessage := "cmd error!!"
259 cmd = s[0]
260
261 switch string(cmd) {
262
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000263 case "attach":
nickhuang6b31f8f2019-09-26 02:02:14 +0000264 cmd_size := len(s)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000265 var err error
266 var uafreq uint64
267 if cmd_size == 5 {
268 aip := s[1]
269 aport := s[2]
270 avendor := s[3]
271 afreq := s[4]
272 uafreq, err = strconv.ParseUint(afreq, 10, 64)
nickhuang6b31f8f2019-09-26 02:02:14 +0000273
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000274 if err != nil {
275 fmt.Print("ParseUint error!!\n")
nickhuang6b31f8f2019-09-26 02:02:14 +0000276 }
277
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000278 attach_device_ip = aip + ":" + aport
279
280 err = Attach(attach_device_ip, avendor, uint32(uafreq))
281 if err != nil {
282 errStatus, _ := status.FromError(err)
283 fmt.Println(errStatus.Message())
284 fmt.Println(errStatus.Code())
285 fmt.Print("attach error!!\n")
286 newmessage = errStatus.Message()
287
288 } else {
289 fmt.Print("attatch IP:\n", attach_device_ip)
nickhuang6b31f8f2019-09-26 02:02:14 +0000290 newmessage = attach_device_ip
291 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000292 } else {
293 fmt.Print("Need IP addres,port,vendor,freqs !!\n")
nickhuang6b31f8f2019-09-26 02:02:14 +0000294 newmessage = "Need IP address !!"
295
296 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000297
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000298 case "period":
nickhuang6b31f8f2019-09-26 02:02:14 +0000299 cmd_size := len(s)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000300 fmt.Print("cmd_size period %d", cmd_size)
301 if cmd_size == 4 {
302 fip := s[1]
303 fport := s[2]
304 pv := s[3]
305 fmt.Print("pv:", pv)
306 u, err := strconv.ParseUint(pv, 10, 64)
307
308 if err != nil {
309 fmt.Print("ParseUint error!!\n")
310 } else {
311 wd := uint32(u)
312 ip_address := fip + ":" + fport
313 err = UpdateFreq(ip_address, wd)
nickhuang6b31f8f2019-09-26 02:02:14 +0000314
315 if err != nil {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000316 errStatus, _ := status.FromError(err)
317 fmt.Println(errStatus.Message())
318 fmt.Println(errStatus.Code())
319 newmessage = errStatus.Message()
320 fmt.Print("period error!!\n")
321 } else {
322 newmessage = strings.ToUpper(cmd)
nickhuang6b31f8f2019-09-26 02:02:14 +0000323 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000324 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000325 } else {
326 fmt.Print("Need period value !!\n")
nickhuang6b31f8f2019-09-26 02:02:14 +0000327 newmessage = "Need period value !!"
328 }
329
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000330 case "sub", "unsub":
nickhuang6b31f8f2019-09-26 02:02:14 +0000331 cmd_size := len(s)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000332 fmt.Print("cmd is :", cmd, cmd_size)
333 if cmd_size > 6 || cmd_size < 0 {
nickhuang6b31f8f2019-09-26 02:02:14 +0000334 fmt.Print("error event !!")
335 newmessage = "error event !!"
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000336 } else {
337 ip := s[1]
338 port := s[2]
339 ip_address := ip + ":" + port
nickhuang6b31f8f2019-09-26 02:02:14 +0000340 var events_list []string
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000341 for i := 3; i < cmd_size; i++ {
nickhuang6b31f8f2019-09-26 02:02:14 +0000342 if value, ok := EVENTS_MAP[s[i]]; ok {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000343 events_list = append(events_list, value)
nickhuang6b31f8f2019-09-26 02:02:14 +0000344 } else {
345 fmt.Println("key not found")
346 }
347 }
348
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000349 if string(cmd) == "sub" {
350 err = Subscribe(ip_address, events_list)
351 if err != nil {
352 errStatus, _ := status.FromError(err)
353 fmt.Println(errStatus.Message())
354 fmt.Println(errStatus.Code())
355 newmessage = errStatus.Message()
356 fmt.Print("sub error!!")
357 } else {
358 newmessage = strings.ToUpper(cmd)
nickhuang6b31f8f2019-09-26 02:02:14 +0000359 }
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000360 } else {
361 err = UnSubscribe(ip_address, events_list)
362 if err != nil {
363 errStatus, _ := status.FromError(err)
364 fmt.Println(errStatus.Message())
365 fmt.Println(errStatus.Code())
366 newmessage = errStatus.Message()
367 fmt.Print("unsub error!!")
368 } else {
369 newmessage = strings.ToUpper(cmd)
370 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000371 }
372 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000373
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000374 case "showeventlist":
375 cmd_size := len(s)
376 fmt.Print("cmd is :", cmd, cmd_size)
377 if cmd_size > 3 || cmd_size < 0 {
378 fmt.Print("error event !!")
379 newmessage = "error event !!"
380 } else {
381 vendor := s[1]
382 err, supportlist := GetEventSupportList(vendor)
nickhuang6b31f8f2019-09-26 02:02:14 +0000383
384 if err != nil {
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000385 errStatus, _ := status.FromError(err)
386 fmt.Println(errStatus.Message())
387 fmt.Println(errStatus.Code())
388 newmessage = errStatus.Message()
nickhuang6b31f8f2019-09-26 02:02:14 +0000389 fmt.Print("showeventlist error!!")
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000390 } else {
nickhuang6b31f8f2019-09-26 02:02:14 +0000391 fmt.Print("showeventlist ", supportlist)
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000392 newmessage = strings.Join(supportlist[:], ",")
nickhuang6b31f8f2019-09-26 02:02:14 +0000393 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000394 }
395
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000396 case "showdeviceeventlist":
397 cmd_size := len(s)
398 fmt.Print("cmd is :", cmd, cmd_size)
399 if cmd_size > 4 || cmd_size < 0 {
400 fmt.Print("error event !!")
401 newmessage = "error event !!"
402 } else {
403 eip := s[1]
404 eport := s[2]
405 ip_address := eip + ":" + eport
406 err, currentlist := GetEventCurrentDeviceList(ip_address)
nickhuang6b31f8f2019-09-26 02:02:14 +0000407
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000408 if err != nil {
409 errStatus, _ := status.FromError(err)
410 fmt.Println(errStatus.Message())
411 fmt.Println(errStatus.Code())
412 newmessage = errStatus.Message()
413 fmt.Print("showdeviceeventlist error!!")
414 } else {
415 fmt.Print("showeventlist ", currentlist)
416 newmessage = strings.Join(currentlist[:], ",")
417 }
418 }
419
420 case "cleardeviceeventlist":
421 cmd_size := len(s)
422 fmt.Print("cmd is :", cmd, cmd_size)
423 if cmd_size > 4 || cmd_size < 0 {
424 fmt.Print("error event !!")
425 newmessage = "error event !!"
426 } else {
427 clip := s[1]
428 clport := s[2]
429 ip_address := clip + ":" + clport
430 err = ClearCurrentDeviceEventList(ip_address)
431 if err != nil {
432 errStatus, _ := status.FromError(err)
433 fmt.Println(errStatus.Message())
434 fmt.Println(errStatus.Code())
435 newmessage = errStatus.Message()
436 fmt.Print("cleardeviceeventlist error!!")
437 } else {
438 newmessage = strings.ToUpper(cmd)
439 }
440 }
441
442 case "QUIT":
nickhuang6b31f8f2019-09-26 02:02:14 +0000443 loop = false
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000444 newmessage = "QUIT"
nickhuang6b31f8f2019-09-26 02:02:14 +0000445
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000446 default:
nickhuang6b31f8f2019-09-26 02:02:14 +0000447 }
448 // send string back to client
449 connS.Write([]byte(newmessage + "\n"))
Dinesh Belwalkarb5db83f2019-10-24 17:27:58 +0000450 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000451 }
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000452}