blob: 4f05c882cd5eaa41b1e976f156fed6bf8591e6a1 [file] [log] [blame]
// Copyright 2018 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package main
import (
"crypto/tls"
"encoding/json"
"fmt"
"github.com/Shopify/sarama"
empty "github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/device-management/proto"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"io/ioutil"
"net"
"net/http"
"os"
"os/signal"
"path"
"time"
)
// Package-level constants.
const (
	// RF_DEFAULT_PROTOCOL is a URL scheme prefix; presumably prepended to a
	// device address by the Redfish helpers (not used in this part of the
	// file — confirm against its callers).
	RF_DEFAULT_PROTOCOL = "https://"

	// RF_DATA_COLLECT_THRESHOLD is the smallest non-zero polling interval, in
	// seconds, accepted by SetFrequency and SendDeviceList.
	RF_DATA_COLLECT_THRESHOLD = 5

	// RF_DATA_COLLECT_DUMMY_INTERVAL is the interval, in seconds, of the
	// placeholder ticker created (and immediately stopped) for a device whose
	// polling interval is 0.
	RF_DATA_COLLECT_DUMMY_INTERVAL = 1000

	// CONTENT_TYPE is a MIME type string; presumably the Content-Type of
	// outgoing Redfish requests (not used in this part of the file — confirm).
	CONTENT_TYPE = "application/json"
)
// Package-level state shared by the server and its helpers.
var (
	// importerTopic is the Kafka topic that collected data and received
	// events are published to.
	importerTopic = "importer"

	// DataProducer is a package-level Kafka producer handle. It is not
	// referenced in this part of the file; the Server keeps its own producer
	// in Server.dataproducer.
	DataProducer sarama.AsyncProducer

	// redfish_resources lists the Redfish resource paths polled on every
	// collection tick.
	redfish_resources = [...]string{"/redfish/v1/Chassis", "/redfish/v1/Systems", "/redfish/v1/EthernetSwitches"}

	// pvmount is the persistence mount point read from the
	// DEVICE_MANAGEMENT_PVMOUNT environment variable; an empty value disables
	// persistence.
	pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")

	// subscriptionListPath is the directory holding one data file per
	// device; it is set by init_data_persistence.
	subscriptionListPath string
)
// scheduler bundles the polling ticker of one device with the channels used
// to stop its collect_data goroutine and to confirm that it has exited.
type scheduler struct {
// getdata fires each time the device should be polled.
getdata *time.Ticker
// quit is sent to by DeleteDeviceList to ask collect_data to stop.
quit chan bool
// getdataend is sent to by collect_data right before it returns.
getdataend chan bool
}
// device is the per-device state stored in Server.devicemap. The fields that
// are not tagged `json:"-"` are what update_data_file writes to the device's
// data file and what init_data_persistence reads back.
type device struct {
// Subscriptions is keyed by event name. The values are written by
// add_subscription, which is not shown here — presumably an identifier of
// the subscription on the device; confirm there.
Subscriptions map[string]string `json:"ss"`
// Freq is the polling interval in seconds; 0 means polling is stopped.
Freq uint32 `json:"freq"`
// Datacollector holds the ticker and stop/ack channels of the collector.
Datacollector scheduler `json:"-"`
// Freqchan carries a new polling interval to the collect_data goroutine.
Freqchan chan uint32 `json:"-"`
// Eventtypes records the event types returned by get_event_types when the
// device was registered.
Eventtypes []string `json:"eventtypes"`
// Datafile is the open persistence file of this device; it is nil when no
// persistence mount is configured or the file could not be opened.
Datafile *os.File `json:"-"`
}
// Server implements the device-management gRPC service and owns the state
// shared by the gRPC handlers, the HTTP event handler and the collectors.
type Server struct {
// devicemap holds the registered devices keyed by IP address string.
devicemap map[string]*device
// gRPCserver is the gRPC server instance, set by startgrpcserver.
gRPCserver *grpc.Server
// dataproducer is the Kafka async producer, set by kafkaInit.
dataproducer sarama.AsyncProducer
// httpclient is the HTTP client created in main; presumably used by the
// Redfish helper methods that are not part of this section — confirm.
httpclient *http.Client
}
// ClearCurrentEventList removes every event subscription recorded for the
// device identified by info.IpAddress and then rewrites the device's data
// file. It returns a NotFound status when the device is not registered. A
// failure to remove an individual event is only logged.
func (s *Server) ClearCurrentEventList(c context.Context, info *importer.Device) (*empty.Empty, error) {
	fmt.Println("Received ClearCurrentEventList")
	addr := info.IpAddress
	dev, registered := s.devicemap[addr]
	if !registered {
		return nil, status.Errorf(codes.NotFound, "Device not registered")
	}
	for name := range dev.Subscriptions {
		if s.remove_subscription(addr, name) {
			continue
		}
		log.WithFields(log.Fields{
			"Event": name,
		}).Info("Error removing event")
	}
	s.update_data_file(addr)
	return &empty.Empty{}, nil
}
// GetCurrentEventList returns the names of the events the device identified
// by info.IpAddress is currently subscribed to, or a NotFound status when the
// device is not registered. The order of the returned events is unspecified
// because it follows map iteration order.
func (s *Server) GetCurrentEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
	fmt.Println("Received GetCurrentEventList")
	dev, registered := s.devicemap[info.IpAddress]
	if !registered {
		return nil, status.Errorf(codes.NotFound, "Device not registered")
	}
	result := &importer.EventList{}
	for name := range dev.Subscriptions {
		result.Events = append(result.Events, name)
	}
	return result, nil
}
// GetEventList returns the event types that can be subscribed to on the
// device at info.IpAddress. The list is obtained from get_event_types on each
// call rather than from the Eventtypes field cached at registration. A
// NotFound status is returned when get_event_types yields nil.
func (s *Server) GetEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
	fmt.Println("Received GetEventList")
	available := s.get_event_types(info.IpAddress)
	if available == nil {
		return nil, status.Errorf(codes.NotFound, "No events found")
	}
	return &importer.EventList{Events: available}, nil
}
// SetFrequency changes the polling interval (in seconds) of a registered
// device. An interval of 0 is accepted and stops polling; a non-zero interval
// below RF_DATA_COLLECT_THRESHOLD is rejected with InvalidArgument. The new
// interval is handed to the collector goroutine over Freqchan (this send
// blocks until the collector receives it), stored on the device, and
// persisted to the data file.
func (s *Server) SetFrequency(c context.Context, info *importer.FreqInfo) (*empty.Empty, error) {
	fmt.Println("Received SetFrequency")
	dev, registered := s.devicemap[info.IpAddress]
	if !registered {
		return nil, status.Errorf(codes.NotFound, "Device not registered")
	}
	if requested := info.Frequency; requested > 0 && requested < RF_DATA_COLLECT_THRESHOLD {
		return nil, status.Errorf(codes.InvalidArgument, "Invalid frequency")
	}
	dev.Freqchan <- info.Frequency
	dev.Freq = info.Frequency
	s.update_data_file(info.IpAddress)
	return &empty.Empty{}, nil
}
// SubsrcribeGivenEvents subscribes the device at subeventlist.EventIpAddress
// to each event in subeventlist.Events.
//
// It returns NotFound when the device is not registered and InvalidArgument
// when the event list is empty. Events that are already subscribed, or whose
// subscription fails, do not stop processing of the remaining events; they
// are collected into a single InvalidArgument error returned after the data
// file has been updated.
func (s *Server) SubsrcribeGivenEvents(c context.Context, subeventlist *importer.GivenEventList) (*empty.Empty, error) {
	fmt.Println("Received SubsrcribeEvents")
	ip_address := subeventlist.EventIpAddress
	dev, found := s.devicemap[ip_address]
	if !found {
		return nil, status.Errorf(codes.NotFound, "Device not registered")
	}
	if len(subeventlist.Events) == 0 {
		return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
	}

	errstring := ""
	//Call API to subscribe events
	for _, event := range subeventlist.Events {
		if _, ok := dev.Subscriptions[event]; ok {
			errstring += "event " + event + " already subscribed\n"
			log.WithFields(log.Fields{
				"Event": event,
			}).Info("Already Subscribed")
			continue
		}
		if !s.add_subscription(ip_address, event) {
			errstring += "failed to subscribe event " + ip_address + " " + event + "\n"
			log.WithFields(log.Fields{
				"Event": event,
			}).Info("Error adding event")
		}
	}
	s.update_data_file(ip_address)
	if errstring != "" {
		// status.Error, not Errorf: errstring embeds caller-supplied event
		// names and must not be interpreted as a format string.
		return &empty.Empty{}, status.Error(codes.InvalidArgument, errstring)
	}
	return &empty.Empty{}, nil
}
// UnSubsrcribeGivenEvents removes the subscriptions of the device at
// unsubeventlist.EventIpAddress for each event in unsubeventlist.Events.
//
// It returns NotFound when the device is not registered and InvalidArgument
// when the event list is empty. Events that are not currently subscribed, or
// whose removal fails, do not stop processing of the remaining events; they
// are collected into a single InvalidArgument error returned after the data
// file has been updated.
func (s *Server) UnSubsrcribeGivenEvents(c context.Context, unsubeventlist *importer.GivenEventList) (*empty.Empty, error) {
	fmt.Println("Received UnSubsrcribeEvents")
	ip_address := unsubeventlist.EventIpAddress
	dev, found := s.devicemap[ip_address]
	if !found {
		return nil, status.Errorf(codes.NotFound, "Device not registered")
	}
	if len(unsubeventlist.Events) == 0 {
		return nil, status.Errorf(codes.InvalidArgument, "Event list is empty")
	}

	errstring := ""
	//Call API to unsubscribe events
	for _, event := range unsubeventlist.Events {
		if _, ok := dev.Subscriptions[event]; !ok {
			errstring += "event " + event + " not found\n"
			log.WithFields(log.Fields{
				"Event": event,
			}).Info("was not Subscribed")
			continue
		}
		if !s.remove_subscription(ip_address, event) {
			errstring += "failed to unsubscribe event " + ip_address + " " + event + "\n"
			log.WithFields(log.Fields{
				"Event": event,
			}).Info("Error removing event")
		}
	}
	s.update_data_file(ip_address)
	if errstring != "" {
		// status.Error, not Errorf: errstring embeds caller-supplied event
		// names and must not be interpreted as a format string.
		return &empty.Empty{}, status.Error(codes.InvalidArgument, errstring)
	}
	return &empty.Empty{}, nil
}
// update_data_file rewrites the data file of the device registered under
// ip_address with the JSON encoding of its current state.
//
// Nothing is written when the device is unknown or has no open data file.
// The file is truncated and rewound before writing; if either step fails the
// write is skipped so that a shorter document is never written over a longer
// one, which would leave trailing bytes and invalid JSON. All failures are
// reported on stdout only.
func (s *Server) update_data_file(ip_address string) {
	dev := s.devicemap[ip_address]
	if dev == nil || dev.Datafile == nil {
		fmt.Println("file handle is nil", ip_address)
		return
	}
	f := dev.Datafile

	b, err := json.Marshal(dev)
	if err != nil {
		fmt.Println(err)
		return
	}
	if err := f.Truncate(0); err != nil {
		fmt.Println("error truncating file", f.Name(), err)
		return
	}
	if _, err := f.Seek(0, 0); err != nil {
		fmt.Println("error seeking file", f.Name(), err)
		return
	}
	if n, err := f.Write(b); err != nil {
		fmt.Println("err wrote", n, "bytes")
		fmt.Println(err)
	}
}
// collect_data is the per-device collector loop; it is started in its own
// goroutine when a device is registered or restored from disk and runs until
// a value is received on the device's quit channel.
//
// On every ticker tick it fetches each path in redfish_resources through
// get_status and offers each returned string, prefixed with the device IP,
// to the Kafka producer without blocking (a message is dropped when the
// producer input is not ready). A value received on Freqchan stops the
// current ticker and, when non-zero, replaces it with one of the new interval
// in seconds. Producer errors are drained and printed.
//
// The device is looked up once at start-up and the pointer is reused, so the
// loop does not read the shared devicemap while handlers add or remove other
// devices.
func (s *Server) collect_data(ip_address string) {
	dev := s.devicemap[ip_address]
	if dev == nil {
		fmt.Println("collect_data: device not registered", ip_address)
		return
	}
	freqchan := dev.Freqchan
	ticker := dev.Datacollector.getdata
	donechan := dev.Datacollector.quit
	for {
		select {
		case freq := <-freqchan:
			ticker.Stop()
			// With freq == 0 the stopped ticker is kept, so its channel
			// simply never fires again until a new interval arrives.
			if freq > 0 {
				ticker = time.NewTicker(time.Duration(freq) * time.Second)
				dev.Datacollector.getdata = ticker
			}
		case err := <-s.dataproducer.Errors():
			fmt.Println("Failed to produce message:", err)
		case <-ticker.C:
			for _, resource := range redfish_resources {
				for _, str := range s.get_status(ip_address, resource) {
					str = "Device IP: " + ip_address + " " + str
					fmt.Printf("collected data %s\n ...", str)
					msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(str)}
					// Non-blocking send: never stall the collector on Kafka.
					select {
					case s.dataproducer.Input() <- msg:
						fmt.Println("Produce message")
					default:
					}
				}
			}
		case <-donechan:
			ticker.Stop()
			fmt.Println("getdata ticker stopped")
			// Acknowledge shutdown to DeleteDeviceList, which waits for it
			// before removing the device.
			dev.Datacollector.getdataend <- true
			return
		}
	}
}
// DeleteDeviceList unregisters every device whose IP address is in list.Ip.
//
// For each known device it removes all event subscriptions (failures are
// logged only), asks the collector goroutine to stop, closes and deletes the
// device's data file, waits for the collector to acknowledge, and finally
// removes the device from devicemap. Unknown addresses, a missing data file
// handle and file close errors are accumulated and returned together as one
// InvalidArgument error after all addresses have been processed.
func (s *Server) DeleteDeviceList(c context.Context, list *importer.DeviceListByIp) (*empty.Empty, error) {
	fmt.Println("DeleteDeviceList received")
	errstring := ""
	for _, ip := range list.Ip {
		dev, ok := s.devicemap[ip]
		if !ok {
			fmt.Printf("Device not found %s\n ", ip)
			errstring += "Device " + ip + " not found\n"
			continue
		}
		for event := range dev.Subscriptions {
			if !s.remove_subscription(ip, event) {
				log.WithFields(log.Fields{
					"Event": event,
				}).Info("Error removing event")
			}
		}
		fmt.Println("deleting device", ip)
		dev.Datacollector.quit <- true
		if f := dev.Datafile; f != nil {
			fmt.Println("deleteing file", f.Name())
			if err := f.Close(); err != nil {
				fmt.Println("error closing file ", f.Name(), err)
				errstring += "error closing file " + f.Name() + "\n"
			}
			if err := os.Remove(f.Name()); err != nil {
				fmt.Println("error deleting file ", f.Name(), err)
			}
		} else {
			errstring += "file " + ip + " not found\n"
		}
		// Wait for collect_data to exit before dropping the device it uses.
		<-dev.Datacollector.getdataend
		delete(s.devicemap, ip)
	}
	if errstring != "" {
		// status.Error, not Errorf: errstring embeds caller-supplied
		// addresses and must not be interpreted as a format string.
		return &empty.Empty{}, status.Error(codes.InvalidArgument, errstring)
	}
	return &empty.Empty{}, nil
}
// SendDeviceList registers every device in list.Device.
//
// A device is skipped when its address is already registered or when its
// polling interval is non-zero but below RF_DATA_COLLECT_THRESHOLD. For each
// accepted device the handler creates the device state and ticker, subscribes
// to every event type reported by get_event_types, starts the collector
// goroutine, opens the data file and persists the state. Skipped devices and
// failed subscriptions are accumulated and returned together as one
// InvalidArgument error after the whole list has been processed.
func (s *Server) SendDeviceList(c context.Context, list *importer.DeviceList) (*empty.Empty, error) {
	errstring := ""
	for _, dev := range list.Device {
		ip_address := dev.IpAddress
		if _, ok := s.devicemap[ip_address]; ok {
			fmt.Printf("Device %s already exists", ip_address)
			errstring += "Device " + ip_address + " already exists\n"
			continue
		}
		if dev.Frequency > 0 && dev.Frequency < RF_DATA_COLLECT_THRESHOLD {
			fmt.Printf("Device %s data collection frequency %d out of range", ip_address, dev.Frequency)
			errstring += "Device " + ip_address + " data collection frequency out of range\n"
			continue
		}
		d := &device{
			Subscriptions: make(map[string]string),
			Freq:          dev.Frequency,
			Datacollector: scheduler{
				quit:       make(chan bool),
				getdataend: make(chan bool),
			},
			Freqchan: make(chan uint32),
		}
		s.devicemap[ip_address] = d
		fmt.Printf("Configuring %s\n", ip_address)
		/* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
		freq := dev.Frequency
		if freq == 0 {
			freq = RF_DATA_COLLECT_DUMMY_INTERVAL
		}
		d.Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
		if dev.Frequency == 0 {
			d.Datacollector.getdata.Stop()
		}
		// Ranging over a nil result is a no-op, so no separate nil check.
		for _, event := range s.get_event_types(ip_address) {
			d.Eventtypes = append(d.Eventtypes, event)
			if !s.add_subscription(ip_address, event) {
				errstring += "failed to subscribe event " + ip_address + " " + event + "\n"
			}
		}
		go s.collect_data(ip_address)
		d.Datafile = get_data_file(ip_address)
		s.update_data_file(ip_address)
	}
	if errstring != "" {
		// status.Error, not Errorf: errstring embeds caller-supplied
		// addresses and event names and must not be used as a format string.
		return &empty.Empty{}, status.Error(codes.InvalidArgument, errstring)
	}
	return &empty.Empty{}, nil
}
// GetCurrentDevices returns the IP addresses of all registered devices, in
// map iteration (unspecified) order. It returns a NotFound status when no
// device is registered. Entries whose device pointer is nil are left out.
func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceListByIp, error) {
	fmt.Println("In Received GetCurrentDevices")
	if len(s.devicemap) == 0 {
		return nil, status.Errorf(codes.NotFound, "Devices not registered")
	}
	result := &importer.DeviceListByIp{}
	for addr, dev := range s.devicemap {
		if dev == nil {
			continue
		}
		fmt.Printf("IpAdd[%s] \n", addr)
		result.Ip = append(result.Ip, addr)
	}
	return result, nil
}
// NewGrpcServer creates a gRPC server and a TCP listener bound to grpcport
// (an address such as ":50051"). The gRPC server is returned even when
// listening fails; callers must check e before using l.
func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
	fmt.Printf("Listening %s\n", grpcport)
	server := grpc.NewServer()
	listener, err := net.Listen("tcp", grpcport)
	return listener, server, err
}
// startgrpcserver creates the gRPC server on port 50051, registers s as the
// device-management service implementation and serves requests. It blocks
// until Serve returns, and reports the listener or serve error, if any.
func (s *Server) startgrpcserver() error {
	fmt.Println("starting gRPC Server")
	const addr = ":50051"
	lis, srv, err := NewGrpcServer(addr)
	if err != nil {
		fmt.Println("Failed to create gRPC server: ", err)
		return err
	}
	s.gRPCserver = srv
	importer.RegisterDeviceManagementServer(srv, s)
	err = srv.Serve(lis)
	if err != nil {
		fmt.Println("Failed to run gRPC server: ", err)
	}
	return err
}
// kafkaCloseProducer closes the Kafka producer and panics with the returned
// error if closing fails.
func (s *Server) kafkaCloseProducer() {
	err := s.dataproducer.Close()
	if err == nil {
		return
	}
	panic(err)
}
// kafkaInit creates the asynchronous Kafka producer used by the server and
// stores it in s.dataproducer. The producer waits for acknowledgement from
// all in-sync replicas and retries a send up to 10 times. The function panics
// if the producer cannot be created.
func (s *Server) kafkaInit() {
	fmt.Println("Starting kafka init to Connect to broker: ")
	brokers := []string{"cord-kafka.default.svc.cluster.local:9092"}

	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Retry.Max = 10

	p, err := sarama.NewAsyncProducer(brokers, cfg)
	if err != nil {
		panic(err)
	}
	s.dataproducer = p
}
// handle_events is the HTTP handler for events pushed by devices. The body of
// a POST request is published unchanged to the importer Kafka topic; requests
// with any other method are ignored.
//
// If the body cannot be read completely, nothing is published and the client
// receives 400 Bad Request, so truncated events never reach Kafka. The
// handler does not register any signal handlers: doing so per request would
// add a never-released registration for every event received.
func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
	fmt.Println(" IN Handle Event ")
	if r.Method != http.MethodPost {
		return
	}
	defer r.Body.Close()

	body, err := ioutil.ReadAll(r.Body)
	if err != nil {
		fmt.Println("Error getting HTTP data", err)
		http.Error(w, "error reading request body", http.StatusBadRequest)
		return
	}
	fmt.Println("Received Event Message ")
	fmt.Printf("%s\n", body)
	// This send blocks until the producer accepts the message.
	s.dataproducer.Input() <- &sarama.ProducerMessage{
		Topic: importerTopic,
		Value: sarama.StringEncoder(body),
	}
}
// runServer registers handle_events for all paths on the default mux and
// serves HTTPS on port 8080 using https-server.crt / https-server.key from
// the working directory. It blocks until the server stops; the reason is
// printed, since ListenAndServeTLS always returns a non-nil error (for
// example when the certificate files are missing or the port is in use).
func (s *Server) runServer() {
	fmt.Println("Starting HTTP Server")
	http.HandleFunc("/", s.handle_events)
	if err := http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil); err != nil {
		fmt.Println("HTTP server stopped:", err)
	}
}
// init_data_persistence restores devices from the data files stored under
// pvmount + "/subscriptions", creating that directory if needed.
//
// Every non-empty file is named after a device IP address and contains the
// JSON written by update_data_file. For each one the device state is decoded,
// its ticker and channels are recreated, the data file is reopened, the
// device is added to devicemap and its collector goroutine is started.
// Unreadable or undecodable files are reported on stdout and skipped, so a
// corrupt file never produces a half-initialised device.
func (s *Server) init_data_persistence() {
	fmt.Println("Retrieving persisted data")
	subscriptionListPath = pvmount + "/subscriptions"
	if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
		fmt.Println(err)
		return
	}
	files, err := ioutil.ReadDir(subscriptionListPath)
	if err != nil {
		fmt.Println(err)
		return
	}
	for _, f := range files {
		b, err := ioutil.ReadFile(path.Join(subscriptionListPath, f.Name()))
		if err != nil {
			fmt.Println(err)
			continue
		}
		if f.Size() <= 0 {
			continue
		}
		ip := f.Name()
		d := &device{}
		if err := json.Unmarshal(b, d); err != nil {
			fmt.Println("skipping unreadable data file", ip, err)
			continue
		}
		// A file without subscriptions decodes to a nil map; later writes
		// to it would panic.
		if d.Subscriptions == nil {
			d.Subscriptions = make(map[string]string)
		}
		/* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
		freq := d.Freq
		if freq == 0 {
			freq = RF_DATA_COLLECT_DUMMY_INTERVAL
		}
		d.Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
		if d.Freq == 0 {
			d.Datacollector.getdata.Stop()
		}
		d.Datacollector.quit = make(chan bool)
		d.Datacollector.getdataend = make(chan bool)
		d.Freqchan = make(chan uint32)
		d.Datafile = get_data_file(ip)
		// Publish the device only once it is fully initialised.
		s.devicemap[ip] = d
		go s.collect_data(ip)
	}
}
// init configures the logrus text formatter (full timestamps in
// day-month-year order) and prints the start-up banner lines.
func init() {
	log.SetFormatter(&log.TextFormatter{
		TimestampFormat: "02-01-2006 15:04:05",
		FullTimestamp:   true,
	})

	fmt.Println("Connecting to broker: ")
	fmt.Println("Listening to http server")
	log.Info("log Connecting to broker:")
	log.Info("log Listening to http server ")
	//sarama.Logger = log.New()
}
// get_data_file opens, creating it if necessary, the read/write data file of
// the device with the given IP under subscriptionListPath. It returns nil
// when persistence is disabled (pvmount is empty); an open error is printed
// and the result of os.OpenFile is returned as is.
func get_data_file(ip string) *os.File {
	if pvmount == "" {
		return nil
	}
	const flags = os.O_CREATE | os.O_RDWR
	name := subscriptionListPath + "/" + ip
	file, err := os.OpenFile(name, flags, 0664)
	if err != nil {
		fmt.Println(err)
	}
	return file
}
// close_data_files closes the data file of every registered device. Close
// errors are ignored.
func (s *Server) close_data_files() {
	for _, dev := range s.devicemap {
		dev.Datafile.Close()
	}
}
// main wires the service together: it connects the Kafka producer, restores
// persisted devices when a persistence mount is configured, starts the HTTPS
// event receiver and the gRPC server, and then blocks until the process is
// interrupted, at which point the producer and data files are closed.
func main() {
	fmt.Println("Starting Device-management Container")
	// NOTE(review): this disables TLS certificate verification for every
	// client that uses http.DefaultTransport, including the one below.
	http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
	client := &http.Client{
		Timeout: 10 * time.Second,
	}
	s := Server{
		devicemap:  make(map[string]*device),
		httpclient: client,
	}
	s.kafkaInit()
	// Restore persisted devices before any request can be served, so the
	// handlers never touch devicemap while it is still being populated.
	if pvmount != "" {
		s.init_data_persistence()
	}
	go s.runServer()
	go s.startgrpcserver()

	// signal.Notify does not block when delivering, so the channel must be
	// buffered or an interrupt arriving early could be dropped.
	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)
	sig := <-quit
	fmt.Println("Shutting down:", sig)
	s.kafkaCloseProducer()
	s.close_data_files()
}