// Copyright 2018 Open Networking Foundation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"crypto/tls"
	"encoding/json"
	"fmt"
	"github.com/Shopify/sarama"
	empty "github.com/golang/protobuf/ptypes/empty"
	"github.com/opencord/device-management/proto"
	logrus "github.com/sirupsen/logrus"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	"io/ioutil"
	"net"
	"net/http"
	"os"
	"os/signal"
	"path"
	"strconv"
	"strings"
	"time"
)

//globals
const RF_DEFAULT_PROTOCOL = "https://"
const RF_DATA_COLLECT_THRESHOLD = 5
const RF_DATA_COLLECT_DUMMY_INTERVAL = 1000
const CONTENT_TYPE = "application/json"

var (
	importerTopic = "importer"
)

var redfish_resources = [...]string{"/redfish/v1/Chassis", "/redfish/v1/Systems", "/redfish/v1/EthernetSwitches"}
var pvmount = os.Getenv("DEVICE_MANAGEMENT_PVMOUNT")
var subscriptionListPath string

type scheduler struct {
	getdata    *time.Ticker
	quit       chan bool
	getdataend chan bool
}

type device struct {
	Subscriptions map[string]string `json:"ss"`
	Freq          uint32            `json:"freq"`
	Datacollector scheduler         `json:"-"`
	Freqchan      chan uint32       `json:"-"`
	Eventtypes    []string          `json:"eventtypes"`
	Datafile      *os.File          `json:"-"`
}

type Server struct {
	devicemap    map[string]*device
	gRPCserver   *grpc.Server
	dataproducer sarama.AsyncProducer
	httpclient   *http.Client
}

func (s *Server) ClearCurrentEventList(c context.Context, info *importer.Device) (*empty.Empty, error) {
	logrus.Info("Received ClearCurrentEventList")
	ip_address := info.IpAddress
	if msg, ok := s.validate_ip(ip_address, true, true); !ok {
		return nil, status.Errorf(codes.InvalidArgument, msg)
	}
	for event := range s.devicemap[ip_address].Subscriptions {
		rtn := s.remove_subscription(ip_address, event)
		if !rtn {
			logrus.WithFields(logrus.Fields{
				"Event": event,
			}).Info("Error removing event")
		}
	}
	s.update_data_file(ip_address)
	return &empty.Empty{}, nil
}

func (s *Server) GetCurrentEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
	logrus.Info("Received GetCurrentEventList")
	if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
		return nil, status.Errorf(codes.InvalidArgument, msg)
	}
	currentevents := new(importer.EventList)
	for event := range s.devicemap[info.IpAddress].Subscriptions {
		currentevents.Events = append(currentevents.Events, event)
	}
	return currentevents, nil
}

func (s *Server) GetEventList(c context.Context, info *importer.Device) (*importer.EventList, error) {
	logrus.Info("Received GetEventList")
	if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
		return nil, status.Errorf(codes.InvalidArgument, msg)
	}
	eventstobesubscribed := new(importer.EventList)
	eventstobesubscribed.Events = s.devicemap[info.IpAddress].Eventtypes
	if eventstobesubscribed.Events == nil {
		return nil, status.Errorf(codes.NotFound, "No events found\n")
	}
	return eventstobesubscribed, nil
}

func (s *Server) SetFrequency(c context.Context, info *importer.FreqInfo) (*empty.Empty, error) {
	logrus.Info("Received SetFrequency")
	if msg, ok := s.validate_ip(info.IpAddress, true, true); !ok {
		return nil, status.Errorf(codes.InvalidArgument, msg)
	}
	if info.Frequency > 0 && info.Frequency < RF_DATA_COLLECT_THRESHOLD {
		return nil, status.Errorf(codes.InvalidArgument, "Invalid interval\n")
	}
	s.devicemap[info.IpAddress].Freqchan <- info.Frequency
	s.devicemap[info.IpAddress].Freq = info.Frequency
	s.update_data_file(info.IpAddress)
	return &empty.Empty{}, nil
}

func (s *Server) SubscribeGivenEvents(c context.Context, subeventlist *importer.GivenEventList) (*empty.Empty, error) {
	errstring := ""
	logrus.Info("Received SubsrcribeEvents")
	//Call API to subscribe events
	ip_address := subeventlist.EventIpAddress
	if msg, ok := s.validate_ip(ip_address, true, true); !ok {
		return nil, status.Errorf(codes.InvalidArgument, msg)
	}
	if len(subeventlist.Events) <= 0 {
		return nil, status.Errorf(codes.InvalidArgument, "Event list is empty\n")
	}
	for _, event := range subeventlist.Events {
		if _, ok := s.devicemap[ip_address].Subscriptions[event]; !ok {
			supported := false
			for _, e := range s.devicemap[ip_address].Eventtypes {
				if e == event {
					supported = true
					rtn := s.add_subscription(ip_address, event)
					if !rtn {
						errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
						logrus.WithFields(logrus.Fields{
							"Event": event,
						}).Info("Error adding  event")
					}
					break
				}
			}
			if !supported {
				errstring = errstring + "event " + event + " not supported\n"
				logrus.WithFields(logrus.Fields{
					"Event": event,
				}).Info("not supported")
			}
		} else {
			errstring = errstring + "event " + event + " already subscribed\n"
			logrus.WithFields(logrus.Fields{
				"Event": event,
			}).Info("Already Subscribed")
		}
	}
	s.update_data_file(ip_address)
	if errstring != "" {
		return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
	}
	return &empty.Empty{}, nil
}

func (s *Server) UnsubscribeGivenEvents(c context.Context, unsubeventlist *importer.GivenEventList) (*empty.Empty, error) {
	errstring := ""
	logrus.Info("Received UnSubsrcribeEvents")
	ip_address := unsubeventlist.EventIpAddress
	if msg, ok := s.validate_ip(ip_address, true, true); !ok {
		return nil, status.Errorf(codes.InvalidArgument, msg)
	}

	if len(unsubeventlist.Events) <= 0 {
		return nil, status.Errorf(codes.InvalidArgument, "Event list is empty\n")
	}
	//Call API to unsubscribe events
	for _, event := range unsubeventlist.Events {
		if _, ok := s.devicemap[ip_address].Subscriptions[event]; ok {
			rtn := s.remove_subscription(ip_address, event)
			if !rtn {
				errstring = errstring + "failed to unsubscribe event " + ip_address + " " + event + "\n"
				logrus.WithFields(logrus.Fields{
					"Event": event,
				}).Info("Error removing event")
			}
		} else {
			errstring = errstring + "event " + event + " not found\n"
			logrus.WithFields(logrus.Fields{
				"Event": event,
			}).Info("was not Subscribed")
		}
	}
	s.update_data_file(ip_address)

	if errstring != "" {
		return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
	}
	return &empty.Empty{}, nil
}

func (s *Server) update_data_file(ip_address string) {
	f := s.devicemap[ip_address].Datafile
	if f != nil {
		b, err := json.Marshal(s.devicemap[ip_address])
		if err != nil {
			logrus.Errorf("Update_data_file %s", err)
		} else {
			err := f.Truncate(0)
			if err != nil {
				logrus.Errorf("err Trunate %s", err)
				return
			}
			pos, err := f.Seek(0, 0)
			if err != nil {
				logrus.Errorf("err Seek %s", err)
				return
			}
			fmt.Println("moved back to", pos)
			n, err := f.Write(b)
			if err != nil {
				logrus.Errorf("err wrote %d bytes", n)
				logrus.Errorf("write error to file %s", err)
			}
		}
	} else {
		logrus.Errorf("file handle is nil %s", ip_address)
	}
}

func (s *Server) collect_data(ip_address string) {
	freqchan := s.devicemap[ip_address].Freqchan
	ticker := s.devicemap[ip_address].Datacollector.getdata
	donechan := s.devicemap[ip_address].Datacollector.quit
	for {
		select {
		case freq := <-freqchan:
			ticker.Stop()
			if freq > 0 {
				ticker = time.NewTicker(time.Duration(freq) * time.Second)
				s.devicemap[ip_address].Datacollector.getdata = ticker
			}
		case err := <-s.dataproducer.Errors():
			logrus.Errorf("Failed to produce message:%s", err)
		case <-ticker.C:
			for _, resource := range redfish_resources {
				data := s.get_status(ip_address, resource)
				for _, str := range data {
					str = "Device IP: " + ip_address + " " + str
					fmt.Printf("collected data  %s\n ...", str)
					b := []byte(str)
					msg := &sarama.ProducerMessage{Topic: importerTopic, Value: sarama.StringEncoder(b)}
					select {
					case s.dataproducer.Input() <- msg:
						logrus.Info("Produce message")
					default:
					}
				}
			}
		case <-donechan:
			ticker.Stop()
			logrus.Info("getdata ticker stopped")
			s.devicemap[ip_address].Datacollector.getdataend <- true
			return
		}
	}
}

func (s *Server) DeleteDeviceList(c context.Context, list *importer.DeviceListByIp) (*empty.Empty, error) {
	logrus.Info("DeleteDeviceList received")
	errstring := ""
	for _, ip := range list.Ip {
		if _, ok := s.devicemap[ip]; !ok {
			logrus.Infof("Device not found %s ", ip)
			errstring = errstring + "Device " + ip + " not found\n"
			continue
		}
		for event := range s.devicemap[ip].Subscriptions {
			rtn := s.remove_subscription(ip, event)
			if !rtn {
				logrus.WithFields(logrus.Fields{
					"Event": event,
				}).Info("Error removing event")
			}
		}
		logrus.Infof("deleting device %s", ip)
		s.devicemap[ip].Datacollector.quit <- true

		f := s.devicemap[ip].Datafile
		if f != nil {
			logrus.Infof("deleteing file %s", f.Name())
			err := f.Close()
			if err != nil {
				logrus.Errorf("error closing file %s %s", f.Name(), err)
				errstring = errstring + "error closing file " + f.Name() + "\n"
			}
			err = os.Remove(f.Name())
			if err != nil {
				logrus.Errorf("error deleting file %s Error:%s ", f.Name(), err)
			}
		} else {
			logrus.Errorf("File not found %s", errstring+"file "+ip+" not found")
		}
		<-s.devicemap[ip].Datacollector.getdataend
		delete(s.devicemap, ip)
	}
	if errstring != "" {
		return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
	}
	return &empty.Empty{}, nil
}

func (s *Server) SendDeviceList(c context.Context, list *importer.DeviceList) (*empty.Empty, error) {
	errstring := ""
	for _, dev := range list.Device {
		ip_address := dev.IpAddress
		if msg, ok := s.validate_ip(ip_address, false, false); !ok {
			errstring = errstring + msg
			continue
		}
		if dev.Frequency > 0 && dev.Frequency < RF_DATA_COLLECT_THRESHOLD {
			logrus.Errorf("Device %s data collection interval %d out of range", ip_address, dev.Frequency)
			errstring = errstring + "Device " + ip_address + " data collection frequency out of range\n"
			continue
		}
		d := device{
			Subscriptions: make(map[string]string),
			Freq:          dev.Frequency,
			Datacollector: scheduler{
				quit:       make(chan bool),
				getdataend: make(chan bool),
			},
			Freqchan: make(chan uint32),
		}
		s.devicemap[ip_address] = &d
		logrus.Infof("Configuring  %s", ip_address)

		/* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
		freq := dev.Frequency
		if freq == 0 {
			freq = RF_DATA_COLLECT_DUMMY_INTERVAL
		}
		s.devicemap[ip_address].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
		if dev.Frequency == 0 {
			s.devicemap[ip_address].Datacollector.getdata.Stop()
		}

		eventtypes := s.get_event_types(ip_address)
		for _, event := range eventtypes {
			s.devicemap[ip_address].Eventtypes = append(s.devicemap[ip_address].Eventtypes, event)
			if !s.add_subscription(ip_address, event) {
				errstring = errstring + "failed to subscribe event " + ip_address + " " + event + "\n"
			}
		}

		go s.collect_data(ip_address)
		s.devicemap[ip_address].Datafile = get_data_file(ip_address)
		s.update_data_file(ip_address)
	}
	if errstring != "" {
		return &empty.Empty{}, status.Errorf(codes.InvalidArgument, errstring)
	}
	return &empty.Empty{}, nil
}

func (s *Server) GetCurrentDevices(c context.Context, e *importer.Empty) (*importer.DeviceListByIp, error) {
	logrus.Infof("In Received GetCurrentDevices")

	if len(s.devicemap) == 0 {
		return nil, status.Errorf(codes.NotFound, "No Device found\n")
	}
	dl := new(importer.DeviceListByIp)
	for k, v := range s.devicemap {
		if v != nil {
			logrus.Infof("IpAdd[%s] \n", k)
			dl.Ip = append(dl.Ip, k)
		}
	}
	return dl, nil
}

func NewGrpcServer(grpcport string) (l net.Listener, g *grpc.Server, e error) {
	logrus.Infof("Listening %s\n", grpcport)
	g = grpc.NewServer()
	l, e = net.Listen("tcp", grpcport)
	return
}
func (s *Server) startgrpcserver() {
	logrus.Info("starting gRPC Server")
	grpcport := ":50051"
	listener, gserver, err := NewGrpcServer(grpcport)
	if err != nil {
		logrus.Errorf("Failed to create gRPC server: %s ", err)
		panic(err)
	}
	s.gRPCserver = gserver
	importer.RegisterDeviceManagementServer(gserver, s)
	if err := gserver.Serve(listener); err != nil {
		logrus.Errorf("Failed to run gRPC server: %s ", err)
		panic(err)
	}

}
func (s *Server) kafkaCloseProducer() {
	if err := s.dataproducer.Close(); err != nil {
		panic(err)
	}

}
func (s *Server) kafkaInit() {
	logrus.Info("Starting kafka init to Connect to broker: ")
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 10
	producer, err := sarama.NewAsyncProducer([]string{"cord-kafka.default.svc.cluster.local:9092"}, config)
	if err != nil {
		panic(err)
	}
	s.dataproducer = producer
}

func (s *Server) handle_events(w http.ResponseWriter, r *http.Request) {
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	logrus.Info(" IN Handle Event  ")
	if r.Method == "POST" {
		Body, err := ioutil.ReadAll(r.Body)
		if err != nil {
			logrus.Errorf("Error getting HTTP data %s", err)
		}
		defer r.Body.Close()
		logrus.Info("Received Event Message ")
		fmt.Printf("%s\n", Body)
		message := &sarama.ProducerMessage{
			Topic: importerTopic,
			Value: sarama.StringEncoder(Body),
		}
		s.dataproducer.Input() <- message
	}
}

func (s *Server) runServer() {
	logrus.Info("Starting HTTP Server")
	http.HandleFunc("/", s.handle_events)
	err := http.ListenAndServeTLS(":8080", "https-server.crt", "https-server.key", nil)
	if err != nil {
		panic(err)
	}
}

/* validate_ip() verifies if the ip and port are valid and already registered then return the truth value of the desired state specified by the following 2 switches,
   want_registered: 'true' if the fact of an ip is registered is the desired state
   include_port: 'true' further checks if <ip>:<port#> does exist in the devicemap in case an ip is found registered
*/
func (s *Server) validate_ip(ip_address string, want_registered bool, include_port bool) (msg string, ok bool) {
	msg = ""
	ok = false
	if !strings.Contains(ip_address, ":") {
		logrus.Errorf("Incorrect IP address %s, expected format <ip>:<port #>", ip_address)
		msg = "Incorrect IP address format (<ip>:<port #>)\n"
		return
	}
	splits := strings.Split(ip_address, ":")
	ip, port := splits[0], splits[1]
	if net.ParseIP(ip) == nil {
		logrus.Errorf("Invalid IP address %s", ip)
		msg = "Invalid IP address " + ip + "\n"
		return
	}
	if _, err := strconv.Atoi(port); err != nil {
		logrus.Errorf("Port # %s is not an integer", port)
		msg = "Port # " + port + " needs to be an integer\n"
		return
	}
	for k := range s.devicemap {
		if strings.HasPrefix(k, ip) {
			if !want_registered {
				logrus.Errorf("Device ip %s already registered", ip)
				msg = "Device ip " + ip + " already registered\n"
				return
			} else if include_port {
				if _, found := s.devicemap[ip_address]; found {
					ok = true
					return
				} else {
					logrus.Errorf("Device %s not registered", ip_address)
					msg = "Device " + ip_address + " not registered\n"
					return
				}
			} else {
				ok = true
				return
			}
		}
	}
	if want_registered {
		logrus.Errorf("Device %s not registered", ip_address)
		msg = "Device " + ip_address + " not registered\n"
		return
	}
	ok = true
	return
}

func (s *Server) init_data_persistence() {
	logrus.Info("Retrieving persisted data")
	subscriptionListPath = pvmount + "/subscriptions"
	if err := os.MkdirAll(subscriptionListPath, 0777); err != nil {
		logrus.Errorf("MkdirAll %s", err)
	} else {
		files, err := ioutil.ReadDir(subscriptionListPath)
		if err != nil {
			logrus.Errorf("ReadDir %s", err)
		} else {
			for _, f := range files {
				b, err := ioutil.ReadFile(path.Join(subscriptionListPath, f.Name()))
				if err != nil {
					logrus.Errorf("Readfile %s", err)
				} else if f.Size() > 0 {
					ip := f.Name()
					d := device{}
					err := json.Unmarshal(b, &d)
					if err != nil {
						logrus.Errorf("Unmarshal %s", err)
						return
					}
					s.devicemap[ip] = &d
					freq := s.devicemap[ip].Freq

					/* if initial interval is 0, create a dummy ticker, which is stopped right away, so getdata is not nil */
					if freq == 0 {
						freq = RF_DATA_COLLECT_DUMMY_INTERVAL
					}
					s.devicemap[ip].Datacollector.getdata = time.NewTicker(time.Duration(freq) * time.Second)
					if s.devicemap[ip].Freq == 0 {
						s.devicemap[ip].Datacollector.getdata.Stop()
					}

					s.devicemap[ip].Datacollector.quit = make(chan bool)
					s.devicemap[ip].Datacollector.getdataend = make(chan bool)
					s.devicemap[ip].Freqchan = make(chan uint32)
					s.devicemap[ip].Datafile = get_data_file(ip)
					go s.collect_data(ip)
				}
			}
		}
	}
}

func init() {
	Formatter := new(logrus.TextFormatter)
	Formatter.TimestampFormat = "02-01-2006 15:04:05"
	Formatter.FullTimestamp = true
	logrus.SetFormatter(Formatter)
	logrus.Info("log Connecting to broker:")
	logrus.Info("log Listening to  http server ")
	//sarama.Logger = log.New()
}

func get_data_file(ip string) *os.File {
	logrus.Info("get_data_file")
	if pvmount == "" {
		return nil
	}
	f, err := os.OpenFile(subscriptionListPath+"/"+ip, os.O_CREATE|os.O_RDWR, 0664)
	if err != nil {
		logrus.Errorf("Openfile err %s", err)
	}
	return f
}

func (s *Server) close_data_files() {
	for ip := range s.devicemap {
		s.devicemap[ip].Datafile.Close()
	}
}

func main() {
	logrus.Info("Starting Device-management Container")

	http.DefaultTransport.(*http.Transport).TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
	client := &http.Client{
		Timeout: 10 * time.Second,
	}

	s := Server{
		devicemap:  make(map[string]*device),
		httpclient: client,
	}

	s.kafkaInit()
	go s.runServer()
	go s.startgrpcserver()

	if pvmount != "" {
		s.init_data_persistence()
	}

	quit := make(chan os.Signal, 10)
	signal.Notify(quit, os.Interrupt)

	sig := <-quit
	logrus.Infof("Shutting down:%d", sig)
	s.kafkaCloseProducer()
	s.close_data_files()
}
