[SEBA-892] Resolve any SCA violations in Redfish Importer Errors are in Jira
Change-Id: If6cb8058d46f549dc14d6d66fee82f1bc7fb77a6
diff --git a/Dockerfile b/Dockerfile
index 8b63327..9491349 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -24,7 +24,7 @@
ENV GO111MODULE=on
ENV PROTOC_VERSION 3.6.1
ENV PROTOC_SHA256SUM 6003de742ea3fcf703cfec1cd4a3380fd143081a2eb0e559065563496af27807
-RUN apt-get update && apt-get install --no-install-recommends -y \
+RUN apt-get update && apt-get install --no-install-recommends -y --allow-downgrades \
git=1:2.20.1-2 \
gcc=4:8.3.0-1 \
curl=7.64.0-4 \
diff --git a/Makefile b/Makefile
index 575cac2..9d06f64 100644
--- a/Makefile
+++ b/Makefile
@@ -98,7 +98,7 @@
fi
@echo "Style check OK"
-lint-sanity:
+lint-sanity:proto/importer.pb.go
@echo "Running sanity check..."
@go vet -mod=vendor ./...
@echo "Sanity check OK"
diff --git a/data_collector.go b/data_collector.go
index f1bc1b1..4572526 100644
--- a/data_collector.go
+++ b/data_collector.go
@@ -16,7 +16,7 @@
import (
"encoding/json"
- "fmt"
+ logrus "github.com/sirupsen/logrus"
"io/ioutil"
"net/http"
)
@@ -69,12 +69,12 @@
defer resp.Body.Close()
}
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("Error http get %s", err)
return
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("Error Read %s", err)
return
}
@@ -83,7 +83,7 @@
m := map[string]interface{}{}
err = json.Unmarshal([]byte(body), &m)
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("Error Unmarshal %s", err)
return
}
diff --git a/demo_test/cmd_client/cmd_cl.go b/demo_test/cmd_client/cmd_cl.go
index da19079..ea704bd 100644
--- a/demo_test/cmd_client/cmd_cl.go
+++ b/demo_test/cmd_client/cmd_cl.go
@@ -22,7 +22,6 @@
func main() {
// connect to this socket
- var message string = ""
conn, _ := net.Dial("tcp", "127.0.0.1:9999")
reader := bufio.NewReader(os.Stdin)
for {
@@ -34,7 +33,7 @@
fmt.Fprintf(conn, text+"\n")
// listen for reply
- message, _ = bufio.NewReader(conn).ReadString('\n')
+ message, _ := bufio.NewReader(conn).ReadString('\n')
fmt.Print("Return from server: " + message)
if message == "QUIT\n" {
diff --git a/demo_test/functional_test/test_cli.go b/demo_test/functional_test/test_cli.go
index 114c51b..3a1bc95 100644
--- a/demo_test/functional_test/test_cli.go
+++ b/demo_test/functional_test/test_cli.go
@@ -22,14 +22,13 @@
func main() {
// connect to this socket
- var message string = ""
cmdstr := strings.Join(os.Args[1:], " ")
conn, _ := net.Dial("tcp", "127.0.0.1:9999")
// send to socket
- fmt.Fprintf(conn, cmdstr + "\n")
+ fmt.Fprintf(conn, cmdstr+"\n")
// listen for reply
- message, _ = bufio.NewReader(conn).ReadString(';')
+ message, _ := bufio.NewReader(conn).ReadString(';')
message = strings.TrimSuffix(message, ";")
fmt.Print(message)
}
diff --git a/demo_test/test.go b/demo_test/test.go
index e67f200..438ae74 100644
--- a/demo_test/test.go
+++ b/demo_test/test.go
@@ -21,7 +21,7 @@
"fmt"
"github.com/Shopify/sarama"
"github.com/opencord/device-management/demo_test/proto"
- log "github.com/sirupsen/logrus"
+ logrus "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/status"
@@ -30,13 +30,11 @@
"os"
"os/exec"
"os/signal"
+ "sort"
"strconv"
"strings"
- "sort"
)
-var REDFISH_ROOT = "/redfish/v1"
-var CONTENT_TYPE = "application/json"
var EVENTS_MAP = map[string]string{
"add": "ResourceAdded",
"rm": "ResourceRemoved",
@@ -52,7 +50,7 @@
var conn *grpc.ClientConn
func GetCurrentDevices() (error, []string) {
- fmt.Println("Testing GetCurrentDevices")
+ logrus.Info("Testing GetCurrentDevices")
empty := new(importer.Empty)
var ret_msg *importer.DeviceListByIp
ret_msg, err := cc.GetCurrentDevices(ctx, empty)
@@ -64,17 +62,17 @@
}
func init() {
- Formatter := new(log.TextFormatter)
+ Formatter := new(logrus.TextFormatter)
Formatter.TimestampFormat = "02-01-2006 15:04:05"
Formatter.FullTimestamp = true
- log.SetFormatter(Formatter)
+ logrus.SetFormatter(Formatter)
}
func topicListener(topic *string, master sarama.Consumer) {
- log.Info("Starting topicListener for ", *topic)
+ logrus.Info("Starting topicListener for ", *topic)
consumer, err := master.ConsumePartition(*topic, 0, sarama.OffsetOldest)
if err != nil {
- log.Errorf("topicListener panic, topic=[%s]: %s", *topic, err.Error())
+ logrus.Errorf("topicListener panic, topic=[%s]: %s", *topic, err.Error())
os.Exit(1)
}
signals := make(chan os.Signal, 1)
@@ -84,11 +82,11 @@
for {
select {
case err := <-consumer.Errors():
- log.Errorf("Consumer error: %s", err.Err)
+ logrus.Errorf("Consumer error: %s", err.Err)
case msg := <-consumer.Messages():
- log.Infof("Got message on topic=[%s]: %s", *topic, string(msg.Value))
+ logrus.Infof("Got message on topic=[%s]: %s", *topic, string(msg.Value))
case <-signals:
- log.Warn("Interrupt is detected")
+ logrus.Warn("Interrupt is detected")
os.Exit(1)
}
}
@@ -103,14 +101,14 @@
cmd.Stdout = &out
err := cmd.Run()
if err != nil {
- log.Info(err)
+ logrus.Info(err)
os.Exit(1)
}
kafkaIP = out.String()
kafkaIP = strings.TrimSuffix(kafkaIP, "\n")
kafkaIP = kafkaIP + ":9092"
- fmt.Println("IP address of kafka-cord-0:", kafkaIP)
+ logrus.Infof("IP address of kafka-cord-0:%s", kafkaIP)
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
master, err := sarama.NewConsumer([]string{kafkaIP}, config)
@@ -123,21 +121,20 @@
}
func main() {
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
- fmt.Println("Launching server...")
- log.Info("kafkaInit starting")
+ logrus.Info("Launching server...")
+ logrus.Info("kafkaInit starting")
kafkainit()
ln, err := net.Listen("tcp", ":9999")
if err != nil {
fmt.Println("could not listen")
- log.Fatalf("did not listen: %v", err)
+ logrus.Fatalf("did not listen: %v", err)
}
defer ln.Close()
conn, err = grpc.Dial(default_address, grpc.WithInsecure())
if err != nil {
- fmt.Println("could not connect")
- log.Fatalf("did not connect: %v", err)
+ logrus.Fatalf("did not connect: %v", err)
}
defer conn.Close()
@@ -146,11 +143,10 @@
loop := true
- for loop == true {
+ for loop {
connS, err := ln.Accept()
if err != nil {
- fmt.Println("Accept error")
- log.Fatal("Accept error: %v", err)
+ logrus.Fatalf("Accept error: %v", err)
}
cmdstr, _ := bufio.NewReader(connS).ReadString('\n')
cmdstr = strings.TrimSuffix(cmdstr, "\n")
@@ -191,7 +187,7 @@
if err != nil {
errStatus, _ := status.FromError(err)
newmessage = newmessage + errStatus.Message()
- fmt.Printf("attach error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ logrus.Errorf("attach error - status code %v message %v", errStatus.Code(), errStatus.Message())
} else {
sort.Strings(ipattached)
ips := strings.Join(ipattached, " ")
@@ -218,7 +214,7 @@
if err != nil {
errStatus, _ := status.FromError(err)
newmessage = newmessage + errStatus.Message()
- fmt.Printf("delete error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ logrus.Errorf("delete error - status code %v message %v", errStatus.Code(), errStatus.Message())
} else {
sort.Strings(devicelist.Ip)
ips := strings.Join(devicelist.Ip, " ")
@@ -238,7 +234,7 @@
pv := args[2]
u, err := strconv.ParseUint(pv, 10, 64)
if err != nil {
- fmt.Print("ParseUint error!!\n")
+ logrus.Error("ParseUint error!!\n")
} else {
freqinfo := new(importer.FreqInfo)
freqinfo.Frequency = uint32(u)
@@ -248,7 +244,7 @@
if err != nil {
errStatus, _ := status.FromError(err)
newmessage = newmessage + errStatus.Message()
- fmt.Printf("period error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ logrus.Errorf("period error - status code %v message %v", errStatus.Code(), errStatus.Message())
} else {
newmessage = newmessage + "data collection interval configured to " + pv + " seconds\n"
}
@@ -282,7 +278,7 @@
if err != nil {
errStatus, _ := status.FromError(err)
newmessage = newmessage + errStatus.Message()
- fmt.Printf("Un/subscribe error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ logrus.Errorf("Un/subscribe error - status code %v message %v", errStatus.Code(), errStatus.Message())
} else {
newmessage = newmessage + cmd + " successful\n"
}
@@ -297,7 +293,7 @@
if err != nil {
errStatus, _ := status.FromError(err)
newmessage = errStatus.Message()
- fmt.Printf("showeventlist error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ logrus.Errorf("showeventlist error - status code %v message %v", errStatus.Code(), errStatus.Message())
} else {
fmt.Print("showeventlist ", ret_msg.Events)
sort.Strings(ret_msg.Events[:])
@@ -314,7 +310,7 @@
ret_msg, err := cc.GetCurrentEventList(ctx, currentdeviceinfo)
if err != nil {
errStatus, _ := status.FromError(err)
- fmt.Printf("showdeviceeventlist error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ logrus.Errorf("showdeviceeventlist error - status code %v message %v", errStatus.Code(), errStatus.Message())
newmessage = newmessage + errStatus.Message()
} else {
fmt.Print("showdeviceeventlist ", ret_msg.Events)
@@ -333,7 +329,7 @@
if err != nil {
errStatus, _ := status.FromError(err)
newmessage = newmessage + errStatus.Message()
- fmt.Printf("cleardeviceeventlist error - status code %v message %v", errStatus.Code(), errStatus.Message())
+ logrus.Errorf("cleardeviceeventlist error - status code %v message %v", errStatus.Code(), errStatus.Message())
} else {
newmessage = newmessage + currentdeviceinfo.IpAddress + " events cleared\n"
}
@@ -343,17 +339,16 @@
case "showdevices":
cmd_size := len(s)
- fmt.Print("cmd is :", cmd, cmd_size)
+ logrus.Infof("cmd is : %s cmd_size: %d", cmd, cmd_size)
if cmd_size > 2 || cmd_size < 0 {
- fmt.Print("error event !!")
+ logrus.Error("error event showdevices !!")
newmessage = "error event !!"
} else {
err, currentlist := GetCurrentDevices()
if err != nil {
errStatus, _ := status.FromError(err)
- fmt.Println(errStatus.Message())
- fmt.Println(errStatus.Code())
+ logrus.Errorf("GetCurrentDevice error: %s Status code: %d", errStatus.Message(), errStatus.Code())
newmessage = errStatus.Message()
fmt.Print("showdevices error!!")
} else {
@@ -366,7 +361,11 @@
default:
newmessage = newmessage + "invalid command " + cmdstr + "\n"
}
- // send string back to client
- connS.Write([]byte(newmessage + ";"))
+ // send string back to client
+ n, err := connS.Write([]byte(newmessage + ";"))
+ if err != nil {
+ logrus.Errorf("err writing to client:%s, n:%d", err, n)
+ return
+ }
}
}
diff --git a/event_subscriber.go b/event_subscriber.go
index fd6e013..a6740b4 100644
--- a/event_subscriber.go
+++ b/event_subscriber.go
@@ -18,6 +18,7 @@
"bytes"
"encoding/json"
"fmt"
+ logrus "github.com/sirupsen/logrus"
"io/ioutil"
"net/http"
"os"
@@ -36,6 +37,10 @@
subscrpt_info["Destination"] = RF_DEFAULT_PROTOCOL + destip
subscrpt_info["EventTypes"] = []string{event}
sRequestJson, err := json.Marshal(subscrpt_info)
+ if err != nil {
+ logrus.Errorf("Error JasonMarshal %s", err)
+ return
+ }
uri := RF_DEFAULT_PROTOCOL + ip + RF_SUBSCRIPTION
client := s.httpclient
resp, err := client.Post(uri, CONTENT_TYPE, bytes.NewBuffer(sRequestJson))
@@ -43,16 +48,20 @@
defer resp.Body.Close()
}
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("client post error %s", err)
return
}
if resp.StatusCode != 201 {
result := make(map[string]interface{})
- json.NewDecoder(resp.Body).Decode(&result)
- fmt.Println(result)
+ dec := json.NewDecoder(resp.Body)
+ if err := dec.Decode(&result); err != nil {
+ logrus.Errorf("ERROR while adding event subscription:%s " + err.Error())
+ return
+ }
+ logrus.Infof("Result Decode %s", result)
fmt.Println(result["data"])
- fmt.Println("Add ", event, " subscription failed. HTTP response status: ", resp.Status)
+ logrus.Errorf("Add %s subscription failed. HTTP response status:%s ", event, resp.Status)
return
}
rtn = true
@@ -61,7 +70,7 @@
match := re.FindStringSubmatch(loc[0])
s.devicemap[ip].Subscriptions[event] = match[1]
- fmt.Println("Subscription", event, "id", match[1], "was successfully added")
+ logrus.Infof("Subscription %s id %s was successfully added", event, match[1])
return
}
@@ -74,48 +83,52 @@
defer resp.Body.Close()
}
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("Error DefaultClient.Do %s", err)
return false
}
if code := resp.StatusCode; code < 200 && code > 299 {
result := make(map[string]interface{})
- json.NewDecoder(resp.Body).Decode(&result)
- fmt.Println(result)
+ dec := json.NewDecoder(resp.Body)
+ if err := dec.Decode(&result); err != nil {
+ logrus.Errorf("ERROR while removing event subscription: %s ", err.Error())
+ return false
+ }
+ logrus.Infof("Result %s", result)
fmt.Println(result["data"])
- fmt.Println("Remove subscription failed. HTTP response status:", resp.Status)
+ logrus.Errorf("Remove subscription failed. HTTP response status:%s", resp.Status)
return false
}
delete(s.devicemap[ip].Subscriptions, event)
- fmt.Println("Subscription id", id, "was successfully removed")
+ logrus.Infof("Subscription id %s was successfully removed", id)
return true
}
func (s *Server) get_event_types(ip string) (eventtypes []string) {
resp, err := http.Get(RF_DEFAULT_PROTOCOL + ip + RF_EVENTSERVICE)
- fmt.Println("get_event_types")
+ logrus.Info("get_event_types")
if resp != nil {
defer resp.Body.Close()
}
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("http get Error %s", err)
return
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("Read error %s", err)
return
}
m := map[string]interface{}{}
err = json.Unmarshal([]byte(body), &m)
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("ErrorUnmarshal %s", err)
return
}
e := m["EventTypesForSubscription"].([]interface{})
- fmt.Printf("supported event types %v\n", e)
+ logrus.Infof("supported event types %v\n", e)
for _, val := range e {
eventtypes = append(eventtypes, val.(string))
}
diff --git a/go.mod b/go.mod
index 566a094..9603bf3 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@
go 1.12
require (
- github.com/Shopify/sarama v1.24.1
+ github.com/Shopify/sarama v1.25.0
github.com/golang/protobuf v1.3.2
github.com/grpc-ecosystem/grpc-gateway v1.12.1 // indirect
github.com/klauspost/cpuid v1.2.1 // indirect
@@ -11,6 +11,6 @@
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect
golang.org/x/net v0.0.0-20191119073136-fc4aabc6c914
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect
- google.golang.org/grpc v1.25.1
+ google.golang.org/grpc v1.26.0
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect
)
diff --git a/go.sum b/go.sum
index 92e037b..61712d5 100644
--- a/go.sum
+++ b/go.sum
@@ -2,6 +2,8 @@
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Shopify/sarama v1.24.1 h1:svn9vfN3R1Hz21WR2Gj0VW9ehaDGkiOS+VqlIcZOkMI=
github.com/Shopify/sarama v1.24.1/go.mod h1:fGP8eQ6PugKEI0iUETYYtnP6d1pH/bdDMTel1X5ajsU=
+github.com/Shopify/sarama v1.25.0 h1:ch1ywjRLjfJtU+EaiJ+l0rWffQ6TRpyYmW4DX7Cb2SU=
+github.com/Shopify/sarama v1.25.0/go.mod h1:y/CFFTO9eaMTNriwu/Q+W4eioLqiDMGkA1W+gmdfj8w=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/antihax/optional v0.0.0-20180407024304-ca021399b1a6/go.mod h1:V8iCPQYkqmusNa815XgQio277wI47sdRh1dUOLdyC6Q=
@@ -17,6 +19,7 @@
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
+github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
@@ -43,6 +46,8 @@
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/klauspost/compress v1.8.2 h1:Bx0qjetmNjdFXASH02NSAREKpiaDwkO1DRZ3dV2KCcs=
github.com/klauspost/compress v1.8.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
+github.com/klauspost/compress v1.9.7 h1:hYW1gP94JUmAhBtJ+LNz5My+gBobDxPR1iVuKug26aA=
+github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w=
github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
@@ -111,6 +116,8 @@
google.golang.org/grpc v1.24.0/go.mod h1:XDChyiUovWa60DnaeDeZmSW86xtLtjtZbwvSiRnRtcA=
google.golang.org/grpc v1.25.1 h1:wdKvqQk7IttEw92GoRyKG2IDrUIpgpj6H6m81yfeMW0=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
+google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
+google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
diff --git a/main.go b/main.go
index c17926c..c27b8e2 100644
--- a/main.go
+++ b/main.go
@@ -21,7 +21,7 @@
"github.com/Shopify/sarama"
empty "github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/device-management/proto"
- log "github.com/sirupsen/logrus"
+ logrus "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@@ -32,9 +32,9 @@
"os"
"os/signal"
"path"
- "time"
- "strings"
"strconv"
+ "strings"
+ "time"
)
//globals
@@ -47,8 +47,6 @@
importerTopic = "importer"
)
-var DataProducer sarama.AsyncProducer
-
var redfish_resources = [...]string{"/redfish/v1/Chassis", "/redfish/v1/Systems", "/redfish/v1/EthernetSwitches"}
var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
var subscriptionListPath string
@@ -76,15 +74,15 @@
}
func (s *Server) ClearCurrentEventList(c context.Context, info *importer.Device) (*empty.Empty, error) {
- fmt.Println("Received ClearCurrentEventList")
+ logrus.Info("Received ClearCurrentEventList")
ip_address := info.IpAddress
if msg, ok := s.validate_ip(ip_address, true, true); !ok {
return nil, status.Errorf(codes.InvalidArgument, msg)
}
- for event, _ := range s.devicemap[ip_address].Subscriptions {
+ for event := range s.devicemap[ip_address].Subscriptions {
rtn := s.remove_subscription(ip_address, event)
if !rtn {
- log.WithFields(log.Fields{
+ logrus.WithFields(logrus.Fields{
"Event": event,
}).Info("Error removing event")
}
@@ -94,24 +92,23 @@
}
func (s *Server) GetCurrentEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
- fmt.Println("Received GetCurrentEventList\n")
+ logrus.Info("Received GetCurrentEventList")
if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
return nil, status.Errorf(codes.InvalidArgument, msg)
}
currentevents := new(importer.EventList)
- for event, _ := range s.devicemap[info.IpAddress].Subscriptions {
+ for event := range s.devicemap[info.IpAddress].Subscriptions {
currentevents.Events = append(currentevents.Events, event)
}
return currentevents, nil
}
func (s *Server) GetEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
- fmt.Println("Received GetEventList\n")
+ logrus.Info("Received GetEventList")
if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
return nil, status.Errorf(codes.InvalidArgument, msg)
}
eventstobesubscribed := new(importer.EventList)
-// eventstobesubscribed.Events = s.devicemap[info.IpAddress].Eventtypes
eventstobesubscribed.Events = s.devicemap[info.IpAddress].Eventtypes
if eventstobesubscribed.Events == nil {
return nil, status.Errorf(codes.NotFound, "No events found\n")
@@ -120,7 +117,7 @@
}
func (s *Server) SetFrequency(c context.Context, info *importer.FreqInfo) (*empty.Empty, error) {
- fmt.Println("Received SetFrequency")
+ logrus.Info("Received SetFrequency")
if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
return nil, status.Errorf(codes.InvalidArgument, msg)
}
@@ -135,7 +132,7 @@
func (s *Server) SubscribeGivenEvents(c context.Context, subeventlist *importer.GivenEventList) (*empty.Empty, error) {
errstring := ""
- fmt.Println("Received SubsrcribeEvents")
+ logrus.Info("Received SubsrcribeEvents")
//Call API to subscribe events
ip_address := subeventlist.EventIpAddress
if msg, ok := s.validate_ip(ip_address, true, true); !ok {
@@ -147,28 +144,28 @@
for _, event := range subeventlist.Events {
if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
supported := false
- for _, e := range s.devicemap[ip_address].Eventtypes{
+ for _, e := range s.devicemap[ip_address].Eventtypes {
if e == event {
supported = true
rtn := s.add_subscription(ip_address, event)
if !rtn {
errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
- log.WithFields(log.Fields{
+ logrus.WithFields(logrus.Fields{
"Event": event,
}).Info("Error adding event")
}
break
}
}
- if supported == false {
+ if !supported {
errstring = errstring + "event " + event + " not supported\n"
- log.WithFields(log.Fields{
+ logrus.WithFields(logrus.Fields{
"Event": event,
}).Info("not supported")
}
} else {
errstring = errstring + "event " + event + " already subscribed\n"
- log.WithFields(log.Fields{
+ logrus.WithFields(logrus.Fields{
"Event": event,
}).Info("Already Subscribed")
}
@@ -182,7 +179,7 @@
func (s *Server) UnsubscribeGivenEvents(c context.Context, unsubeventlist *importer.GivenEventList) (*empty.Empty, error) {
errstring := ""
- fmt.Println("Received UnSubsrcribeEvents")
+ logrus.Info("Received UnSubsrcribeEvents")
ip_address := unsubeventlist.EventIpAddress
if msg, ok := s.validate_ip(ip_address, true, true); !ok {
return nil, status.Errorf(codes.InvalidArgument, msg)
@@ -197,13 +194,13 @@
rtn := s.remove_subscription(ip_address, event)
if !rtn {
errstring = errstring + "failed to unsubscribe event " + ip_address + " " + event + "\n"
- log.WithFields(log.Fields{
+ logrus.WithFields(logrus.Fields{
"Event": event,
}).Info("Error removing event")
}
} else {
errstring = errstring + "event " + event + " not found\n"
- log.WithFields(log.Fields{
+ logrus.WithFields(logrus.Fields{
"Event": event,
}).Info("was not Subscribed")
}
@@ -221,18 +218,27 @@
if f != nil {
b, err := json.Marshal(s.devicemap[ip_address])
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("Update_data_file %s", err)
} else {
- f.Truncate(0)
- f.Seek(0, 0)
+ err := f.Truncate(0)
+ if err != nil {
+ logrus.Errorf("err Trunate %s", err)
+ return
+ }
+ pos, err := f.Seek(0, 0)
+ if err != nil {
+ logrus.Errorf("err Seek %s", err)
+ return
+ }
+ fmt.Println("moved back to", pos)
n, err := f.Write(b)
if err != nil {
- fmt.Println("err wrote", n, "bytes")
- fmt.Println(err)
+ logrus.Errorf("err wrote %d bytes", n)
+ logrus.Errorf("write error to file %s", err)
}
}
} else {
- fmt.Println("file handle is nil", ip_address)
+ logrus.Errorf("file handle is nil %s", ip_address)
}
}
@@ -249,7 +255,7 @@
s.devicemap[ip_address].Datacollector.getdata = ticker
}
case err := <-s.dataproducer.Errors():
- fmt.Println("Failed to produce message:", err)
+ logrus.Errorf("Failed to produce message:%s", err)
case <-ticker.C:
for _, resource := range redfish_resources {
data := s.get_status(ip_address, resource)
@@ -260,14 +266,14 @@
msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
select {
case s.dataproducer.Input() <- msg:
- fmt.Println("Produce message")
+ logrus.Info("Produce message")
default:
}
}
}
case <-donechan:
ticker.Stop()
- fmt.Println("getdata ticker stopped")
+ logrus.Info("getdata ticker stopped")
s.devicemap[ip_address].Datacollector.getdataend <- true
return
}
@@ -275,39 +281,39 @@
}
func (s *Server) DeleteDeviceList(c context.Context, list *importer.DeviceListByIp) (*empty.Empty, error) {
- fmt.Println("DeleteDeviceList received")
+ logrus.Info("DeleteDeviceList received")
errstring := ""
for _, ip := range list.Ip {
if _, ok := s.devicemap[ip]; !ok {
- fmt.Printf("Device not found %s\n ", ip)
+ logrus.Infof("Device not found %s ", ip)
errstring = errstring + "Device " + ip + " not found\n"
continue
}
- for event, _ := range s.devicemap[ip].Subscriptions {
+ for event := range s.devicemap[ip].Subscriptions {
rtn := s.remove_subscription(ip, event)
if !rtn {
- log.WithFields(log.Fields{
+ logrus.WithFields(logrus.Fields{
"Event": event,
}).Info("Error removing event")
}
}
- fmt.Println("deleting device", ip)
+ logrus.Infof("deleting device %s", ip)
s.devicemap[ip].Datacollector.quit <- true
f := s.devicemap[ip].Datafile
if f != nil {
- fmt.Println("deleteing file", f.Name())
+ logrus.Infof("deleteing file %s", f.Name())
err := f.Close()
if err != nil {
- fmt.Println("error closing file ", f.Name(), err)
+ logrus.Errorf("error closing file %s %s", f.Name(), err)
errstring = errstring + "error closing file " + f.Name() + "\n"
}
err = os.Remove(f.Name())
if err != nil {
- fmt.Println("error deleting file ", f.Name(), err)
+ logrus.Errorf("error deleting file %s Error:%s ", f.Name(), err)
}
} else {
- errstring = errstring + "file " + ip + " not found\n"
+ logrus.Errorf("File not found %s", errstring+"file "+ip+" not found")
}
<-s.devicemap[ip].Datacollector.getdataend
delete(s.devicemap, ip)
@@ -327,7 +333,7 @@
continue
}
if dev.Frequency > 0 && dev.Frequency < RF_DATA_COLLECT_THRESHOLD {
- fmt.Printf("Device %s data collection interval %d out of range", ip_address, dev.Frequency)
+ logrus.Errorf("Device %s data collection interval %d out of range", ip_address, dev.Frequency)
errstring = errstring + "Device " + ip_address + " data collection frequency out of range\n"
continue
}
@@ -341,7 +347,7 @@
Freqchan: make(chan uint32),
}
s.devicemap[ip_address] = &d
- fmt.Printf("Configuring %s\n", ip_address)
+ logrus.Infof("Configuring %s", ip_address)
/* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
freq := dev.Frequency
@@ -354,14 +360,13 @@
}
eventtypes := s.get_event_types(ip_address)
- if eventtypes != nil {
- for _, event := range eventtypes {
- s.devicemap[ip_address].Eventtypes = append(s.devicemap[ip_address].Eventtypes, event)
- if s.add_subscription(ip_address, event) == false {
- errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
- }
+ for _, event := range eventtypes {
+ s.devicemap[ip_address].Eventtypes = append(s.devicemap[ip_address].Eventtypes, event)
+ if !s.add_subscription(ip_address, event) {
+ errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
}
}
+
go s.collect_data(ip_address)
s.devicemap[ip_address].Datafile = get_data_file(ip_address)
s.update_data_file(ip_address)
@@ -373,7 +378,7 @@
}
func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceListByIp, error) {
- fmt.Println("In Received GetCurrentDevices")
+ logrus.Infof("In Received GetCurrentDevices")
if len(s.devicemap) == 0 {
return nil, status.Errorf(codes.NotFound, "No Device found\n")
@@ -381,7 +386,7 @@
dl := new(importer.DeviceListByIp)
for k, v := range s.devicemap {
if v != nil {
- fmt.Printf("IpAdd[%s] \n", k)
+ logrus.Infof("IpAdd[%s] \n", k)
dl.Ip = append(dl.Ip, k)
}
}
@@ -389,26 +394,25 @@
}
func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
- fmt.Printf("Listening %s\n", grpcport)
+ logrus.Infof("Listening %s\n", grpcport)
g = grpc.NewServer()
l, e = net.Listen("tcp", grpcport)
return
}
-func (s *Server) startgrpcserver() error {
- fmt.Println("starting gRPC Server")
+func (s *Server) startgrpcserver() {
+ logrus.Info("starting gRPC Server")
grpcport := ":50051"
listener, gserver, err := NewGrpcServer(grpcport)
if err != nil {
- fmt.Println("Failed to create gRPC server: ", err)
- return err
+ logrus.Errorf("Failed to create gRPC server: %s ", err)
+ panic(err)
}
s.gRPCserver = gserver
importer.RegisterDeviceManagementServer(gserver, s)
if err := gserver.Serve(listener); err != nil {
- fmt.Println("Failed to run gRPC server: ", err)
- return err
+ logrus.Errorf("Failed to run gRPC server: %s ", err)
+ panic(err)
}
- return nil
}
func (s *Server) kafkaCloseProducer() {
@@ -418,7 +422,7 @@
}
func (s *Server) kafkaInit() {
- fmt.Println("Starting kafka init to Connect to broker: ")
+ logrus.Info("Starting kafka init to Connect to broker: ")
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 10
@@ -433,14 +437,14 @@
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
- fmt.Println(" IN Handle Event ")
+ logrus.Info(" IN Handle Event ")
if r.Method == "POST" {
Body, err := ioutil.ReadAll(r.Body)
if err != nil {
- fmt.Println("Error getting HTTP data", err)
+ logrus.Errorf("Error getting HTTP data %s", err)
}
defer r.Body.Close()
- fmt.Println("Received Event Message ")
+ logrus.Info("Received Event Message ")
fmt.Printf("%s\n", Body)
message := &sarama.ProducerMessage{
Topic: importerTopic,
@@ -451,39 +455,42 @@
}
func (s *Server) runServer() {
- fmt.Println("Starting HTTP Server")
+ logrus.Info("Starting HTTP Server")
http.HandleFunc("/", s.handle_events)
- http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
+ err := http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
+ if err != nil {
+ panic(err)
+ }
}
/* validate_ip() verifies if the ip and port are valid and already registered then return the truth value of the desired state specified by the following 2 switches,
want_registered: 'true' if the fact of an ip is registered is the desired state
- include_port: 'true' further checks if <ip>:<port#> does exist in the devicemap in case an ip is found registered
+ include_port: 'true' further checks if <ip>:<port#> does exist in the devicemap in case an ip is found registered
*/
func (s *Server) validate_ip(ip_address string, want_registered bool, include_port bool) (msg string, ok bool) {
msg = ""
ok = false
if !strings.Contains(ip_address, ":") {
- fmt.Printf("Incorrect IP address %s, expected format <ip>:<port #>", ip_address)
+ logrus.Errorf("Incorrect IP address %s, expected format <ip>:<port #>", ip_address)
msg = "Incorrect IP address format (<ip>:<port #>)\n"
return
}
splits := strings.Split(ip_address, ":")
ip, port := splits[0], splits[1]
if net.ParseIP(ip) == nil {
- fmt.Printf("Invalid IP address %s", ip)
+ logrus.Errorf("Invalid IP address %s", ip)
msg = "Invalid IP address " + ip + "\n"
return
}
if _, err := strconv.Atoi(port); err != nil {
- fmt.Printf("Port # %s is not an integer", port)
+ logrus.Errorf("Port # %s is not an integer", port)
msg = "Port # " + port + " needs to be an integer\n"
return
}
for k := range s.devicemap {
if strings.HasPrefix(k, ip) {
if !want_registered {
- fmt.Printf("Device ip %s already registered", ip)
+ logrus.Errorf("Device ip %s already registered", ip)
msg = "Device ip " + ip + " already registered\n"
return
} else if include_port {
@@ -491,7 +498,7 @@
ok = true
return
} else {
- fmt.Printf("Device %s not registered", ip_address)
+ logrus.Errorf("Device %s not registered", ip_address)
msg = "Device " + ip_address + " not registered\n"
return
}
@@ -502,7 +509,7 @@
}
}
if want_registered {
- fmt.Printf("Device %s not registered", ip_address)
+ logrus.Errorf("Device %s not registered", ip_address)
msg = "Device " + ip_address + " not registered\n"
return
}
@@ -511,23 +518,27 @@
}
func (s *Server) init_data_persistence() {
- fmt.Println("Retrieving persisted data")
+ logrus.Info("Retrieving persisted data")
subscriptionListPath = pvmount + "/subscriptions"
if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
- fmt.Println(err)
+ logrus.Errorf("MkdirAll %s", err)
} else {
files, err := ioutil.ReadDir(subscriptionListPath)
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("ReadDir %s", err)
} else {
for _, f := range files {
b, err := ioutil.ReadFile(path.Join(subscriptionListPath, f.Name()))
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("Readfile %s", err)
} else if f.Size() > 0 {
ip := f.Name()
d := device{}
- json.Unmarshal(b, &d)
+ err := json.Unmarshal(b, &d)
+ if err != nil {
+ logrus.Errorf("Unmarshal %s", err)
+ return
+ }
s.devicemap[ip] = &d
freq := s.devicemap[ip].Freq
@@ -552,36 +563,35 @@
}
func init() {
- Formatter := new(log.TextFormatter)
+ Formatter := new(logrus.TextFormatter)
Formatter.TimestampFormat = "02-01-2006 15:04:05"
Formatter.FullTimestamp = true
- log.SetFormatter(Formatter)
- fmt.Println("Connecting to broker: ")
- fmt.Println("Listening to http server")
- log.Info("log Connecting to broker:")
- log.Info("log Listening to http server ")
+ logrus.SetFormatter(Formatter)
+ logrus.Info("log Connecting to broker:")
+ logrus.Info("log Listening to http server ")
//sarama.Logger = log.New()
}
func get_data_file(ip string) *os.File {
+ logrus.Info("get_data_file")
if pvmount == "" {
return nil
}
f, err := os.OpenFile(subscriptionListPath+"/"+ip, os.O_CREATE|os.O_RDWR, 0664)
if err != nil {
- fmt.Println(err)
+ logrus.Errorf("Openfile err %s", err)
}
return f
}
func (s *Server) close_data_files() {
- for ip, _ := range s.devicemap {
+ for ip := range s.devicemap {
s.devicemap[ip].Datafile.Close()
}
}
func main() {
- fmt.Println("Starting Device-management Container")
+ logrus.Info("Starting Device-management Container")
http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
client := &http.Client{
@@ -601,13 +611,11 @@
s.init_data_persistence()
}
- quit := make(chan os.Signal)
+ quit := make(chan os.Signal, 10)
signal.Notify(quit, os.Interrupt)
- select {
- case sig := <-quit:
- fmt.Println("Shutting down:", sig)
- s.kafkaCloseProducer()
- s.close_data_files()
- }
+ sig := <-quit
+ logrus.Infof("Shutting down:%d", sig)
+ s.kafkaCloseProducer()
+ s.close_data_files()
}