blob: 70c2df65a80ab2fc75ce99d755d8aa9f0b935f0c [file] [log] [blame]
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +00001// Copyright 2018 Open Networking Foundation
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15package main
16
17import (
18 "fmt"
nickhuang6b31f8f2019-09-26 02:02:14 +000019 "net"
20 "bufio"
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000021 "os"
22 "os/signal"
23 "os/exec"
24 "github.com/Shopify/sarama"
25 "google.golang.org/grpc"
26 "golang.org/x/net/context"
27 importer "./proto"
28 log "github.com/Sirupsen/logrus"
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000029 "bytes"
30 "strings"
nickhuang6b31f8f2019-09-26 02:02:14 +000031 "net/http"
32 "crypto/tls"
33 "strconv"
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000034)
35
var (
	// REDFISH_ROOT is the root path of the Redfish service tree.
	REDFISH_ROOT = "/redfish/v1"

	// CONTENT_TYPE is the media type string for JSON payloads.
	CONTENT_TYPE = "application/json"

	// EVENTS_MAP translates the short event keywords accepted on the
	// command connection into the event type names sent to the importer.
	EVENTS_MAP = map[string]string{
		"add":    "ResourceAdded",
		"rm":     "ResourceRemoved",
		"alert":  "Alert",
		"update": "Update",
	}
)
43
var (
	// default_address is the host:port of the importer gRPC service.
	default_address = "localhost:31085"
	// default_port is the device port used when "attach" gives only an IP.
	default_port = "8888"
	// default_vendor is the vendor name used when "attach" gives only an IP.
	default_vendor = "edgecore"
	// default_freq is the collection frequency used when "attach" gives only an IP.
	default_freq uint64 = 180
	// attach_device_ip is the "ip:port" key of the device targeted by the
	// most recent "attach" command; later commands act on this device.
	attach_device_ip string
	// importerTopic is the Kafka topic consumed by topicListener.
	importerTopic = "importer"
)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +000050var DataConsumer sarama.Consumer
51
nickhuang6b31f8f2019-09-26 02:02:14 +000052var cc importer.DeviceManagementClient
53var ctx context.Context
54var conn * grpc.ClientConn
55
56type Device struct {
57 deviceinfo * importer.DeviceInfo
58 eventlist * importer.EventList
59}
60
61var devicemap map[string]* Device
62
63/*///////////////////////////////////////////////////////////////////////*/
64// Allows user to register the device for data collection and frequency.
65//
66//
67/*///////////////////////////////////////////////////////////////////////*/
68func (s * Device) Attach(aip string, avendor string, afreq uint32) (error, string) {
69 fmt.Println("Received Attach\n")
70 var default_protocol string = "https"
71
72 s.deviceinfo = new(importer.DeviceInfo)
73 s.eventlist = new(importer.EventList)
74 s.deviceinfo.IpAddress = aip
75 s.deviceinfo.Vendor = avendor
76 s.deviceinfo.Frequency = afreq
77 s.deviceinfo.Protocol = default_protocol
78 _, err := cc.SendDeviceInfo(ctx, s.deviceinfo)
79
80 if err != nil {
81 return err ,"attach error!!"
82 }else{
83 return nil,""
84 }
85}
86
87/*///////////////////////////////////////////////////////////////////////*/
88// Allows user to change the frequency of data collection
89//
90//
91/*///////////////////////////////////////////////////////////////////////*/
92func (s * Device) UpdateFreq(wd uint32)(error, string) {
93 fmt.Println("Received Period\n")
94 s.deviceinfo.Frequency = wd
95 _, err := cc.SetFrequency(ctx, s.deviceinfo)
96
97 if err != nil {
98 return err, "period error!!"
99 }else{
100 return nil,""
101 }
102}
103
104/*///////////////////////////////////////////////////////////////////////*/
105// Allows user to unsubscribe events
106//
107//
108/*///////////////////////////////////////////////////////////////////////*/
109func (s * Device) Subscribe(eventlist []string) (error, string) {
110 fmt.Println("Received Subscribe\n")
111 s.eventlist.Events = eventlist
112 s.eventlist.EventIpAddress = s.deviceinfo.IpAddress
113 _, err := cc.SubsrcribeGivenEvents(ctx, s.eventlist)
114
115 if err != nil {
116 return err, "sub error!!"
117 }else{
118 return nil,""
119 }
120}
121
122/*///////////////////////////////////////////////////////////////////////*/
123// Allows user to unsubscribe events
124//
125//
126/*///////////////////////////////////////////////////////////////////////*/
127func (s * Device) UnSubscribe(eventlist []string) (error, string) {
128 fmt.Println("Received UnSubscribe\n")
129 s.eventlist.Events = eventlist
130 s.eventlist.EventIpAddress = s.deviceinfo.IpAddress
131 _, err := cc.UnSubsrcribeGivenEvents(ctx, s.eventlist)
132
133 if err != nil {
134 return err, "unsub error!!"
135 }else{
136 return nil,""
137 }
138}
139
140/*///////////////////////////////////////////////////////////////////////*/
141// Allows user to get the events supported by device
142//
143//
144/*///////////////////////////////////////////////////////////////////////*/
145func (s * Device) GetEventSupportList() (error, []string) {
146 fmt.Println("Received GetEventSupportList\n")
147 var ret_msg * importer.SupportedEventList
148 ret_msg, err :=cc.GetEventList(ctx, devicemap[s.deviceinfo.IpAddress].deviceinfo);
149 if err != nil {
150 return err,ret_msg.Events
151 }else{
152 fmt.Println("show all event subs:", ret_msg)
153 return nil , ret_msg.Events
154 }
155}
156
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000157func init() {
158 Formatter := new(log.TextFormatter)
159 Formatter.TimestampFormat = "02-01-2006 15:04:05"
160 Formatter.FullTimestamp = true
161 log.SetFormatter(Formatter)
162}
163
164func topicListener(topic *string, master sarama.Consumer) {
165 log.Info("Starting topicListener for ", *topic)
166 consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
167 if err != nil {
168 log.Error("topicListener panic, topic=[%s]: %s", *topic, err.Error())
169 os.Exit(1)
170 }
171 signals := make(chan os.Signal, 1)
172 signal.Notify(signals, os.Interrupt)
173 doneCh := make(chan struct{})
174 go func() {
175 for {
176 select {
177 case err := <-consumer.Errors():
178 log.Error("Consumer error: %s", err.Err)
179 case msg := <-consumer.Messages():
180 log.Info("Got message on topic=[%s]: %s", *topic, string(msg.Value))
181 case <-signals:
182 log.Warn("Interrupt is detected")
nickhuang6b31f8f2019-09-26 02:02:14 +0000183 os.Exit(1)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000184 }
185 }
186 }()
187 <-doneCh
188}
189
190func kafkainit() {
191 cmd := exec.Command("/bin/sh","kafka_ip.sh")
192 var kafkaIP string
193 var out bytes.Buffer
194 cmd.Stdout = &out
195 err := cmd.Run()
196 if err != nil {
197 log.Info(err)
198 os.Exit(1)
199 }
200
201 kafkaIP = out.String()
202 kafkaIP = strings.TrimSuffix(kafkaIP, "\n")
203 kafkaIP = kafkaIP +":9092"
204 fmt.Println("IP address of kafka-cord-0:",kafkaIP)
205 config := sarama.NewConfig()
206 config.Consumer.Return.Errors = true
207 master, err := sarama.NewConsumer([]string{kafkaIP}, config)
208 if err != nil {
209 panic(err)
210 }
211 DataConsumer = master
212
213 go topicListener(&importerTopic, master)
214}
215func main() {
nickhuang6b31f8f2019-09-26 02:02:14 +0000216 http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
217 fmt.Println("Launching server...")
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000218 log.Info("kafkaInit starting")
219 kafkainit()
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000220
nickhuang6b31f8f2019-09-26 02:02:14 +0000221 ln, err := net.Listen("tcp", ":9999")
222 if err != nil {
223 fmt.Println("could not listen")
224 log.Fatal("did not listen: %v", err)
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000225 }
nickhuang6b31f8f2019-09-26 02:02:14 +0000226 defer ln.Close()
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000227
nickhuang6b31f8f2019-09-26 02:02:14 +0000228 connS, err := ln.Accept()
229 if err != nil {
230 fmt.Println("Accept error")
231 log.Fatal("Accept error: %v", err)
232 }else{
233
234 conn, err = grpc.Dial(default_address, grpc.WithInsecure())
235 if err != nil {
236 fmt.Println("could not connect")
237 log.Fatal("did not connect: %v", err)
238 }
239 defer conn.Close()
240
241 cc = importer.NewDeviceManagementClient(conn)
242 ctx = context.Background()
243
244 devicemap = make(map[string] *Device)
245 loop := true
246
247 for loop == true {
248 cmd, _ := bufio.NewReader(connS).ReadString('\n')
249
250 cmd = strings.TrimSuffix(cmd, "\n")
251 s := strings.Split(cmd, ":")
252 newmessage := "cmd error!!"
253 cmd = s[0]
254
255 switch string(cmd) {
256
257 case "attach" :
258 cmd_size := len(s)
259 var err error
260 var aport string = default_port
261 var avendor string = default_vendor
262 var uafreq uint64 = default_freq
263
264 if (cmd_size == 2 || cmd_size == 5){
265 aip := s[1]
266 if(cmd_size == 5){
267 aport = s[2]
268 avendor = s[3]
269 afreq := s[4]
270 uafreq, err = strconv.ParseUint(afreq, 10, 64)
271
272 if err != nil {
273 fmt.Print("ParseUint error!!")
274 }
275
276 attach_device_ip = aip + ":" + aport
277 }else{
278 attach_device_ip = aip + ":" + default_port
279 }
280
281 if (devicemap[attach_device_ip] == nil){
282 dev := new (Device)
283 err, newmessage = dev.Attach(attach_device_ip, avendor, uint32(uafreq))
284 if err != nil {
285 fmt.Print("attach error!!")
286 }else{
287 fmt.Print("attatch IP:", attach_device_ip)
288 newmessage = attach_device_ip
289 devicemap[attach_device_ip] = dev
290 }
291 }else{
292 fmt.Print("Change attach IP to %v", attach_device_ip)
293 newmessage = attach_device_ip
294 }
295 }else{
296 fmt.Print("Need IP address !!")
297 newmessage = "Need IP address !!"
298
299 }
300 break
301
302 case "period" :
303 cmd_size := len(s)
304 if (cmd_size == 2 ){
305 if (devicemap[attach_device_ip] != nil){
306 pv := s[1]
307 fmt.Print("pv:", pv)
308 u, err := strconv.ParseUint(pv, 10, 64)
309
310 if err != nil {
311 fmt.Print("ParseUint error!!")
312 }else{
313 wd := uint32(u)
314 dev := devicemap[attach_device_ip]
315 err, newmessage = dev.UpdateFreq(wd)
316
317 if err != nil {
318 fmt.Print("period error!!")
319 }else{
320 newmessage = strings.ToUpper(cmd)
321 }
322 }
323 }else{
324 fmt.Print("need attach first!!")
325 newmessage = "need attach first!!"
326 }
327 }else{
328 fmt.Print("Need period value !!")
329 newmessage = "Need period value !!"
330 }
331
332 break
333
334 case "sub","unsub" :
335 cmd_size := len(s)
336 fmt.Print("cmd is :", cmd)
337 if(cmd_size > 4 || cmd_size <0){
338 fmt.Print("error event !!")
339 newmessage = "error event !!"
340 }else{
341 var events_list []string
342 for i := 1; i < cmd_size; i++ {
343 if value, ok := EVENTS_MAP[s[i]]; ok {
344 events_list = append(events_list,value)
345 } else {
346 fmt.Println("key not found")
347 }
348 }
349
350 if (devicemap[attach_device_ip] != nil){
351 dev := devicemap[attach_device_ip]
352 if(string(cmd) == "sub"){
353 err, newmessage = dev.Subscribe(events_list)
354 if err != nil {
355 fmt.Print("sub error!!")
356 newmessage = "sub error!!"
357 }else{
358 newmessage = strings.ToUpper(cmd)
359 }
360 }else{
361 err, newmessage = dev.UnSubscribe(events_list)
362 if err != nil {
363 fmt.Print("unsub error!!")
364 newmessage = "unsub error!!"
365 }else{
366 newmessage = strings.ToUpper(cmd)
367 }
368 }
369 }else{
370 fmt.Print("need attach first !!")
371 newmessage = "need attach first !!"
372 }
373 }
374 break
375
376 case "showeventlist" :
377 if (devicemap[attach_device_ip] != nil){
378 dev := devicemap[attach_device_ip]
379 err, supportlist := dev.GetEventSupportList()
380
381 if err != nil {
382 fmt.Print("showeventlist error!!")
383 }else{
384 fmt.Print("showeventlist ", supportlist)
385 newmessage = strings.Join(supportlist[:],",")
386 }
387 }else{
388 fmt.Print("need attach first !!")
389 newmessage = "need attach first !!"
390 }
391
392 break
393
394 case "QUIT" :
395 loop = false
396 newmessage="QUIT"
397 break
398
399 default :
400 break
401 }
402 // send string back to client
403 connS.Write([]byte(newmessage + "\n"))
404 }
405 }
Dinesh Belwalkare1e85ad2019-07-31 23:06:47 +0000406}