[SEBA-342] Publishing logs to kafka

Change-Id: Iaa72945dfd59a5d151cad14ff593eb299229fb3e
diff --git a/core/mediator.go b/core/mediator.go
index 496621a..a6d67b0 100644
--- a/core/mediator.go
+++ b/core/mediator.go
@@ -17,34 +17,37 @@
 package core
 
 import (
-	"sync"
-	"gerrit.opencord.org/voltha-bbsim/common"
+	"flag"
+	"fmt"
 	"os"
 	"os/signal"
-	"fmt"
-	"flag"
-	"strings"
 	"strconv"
+	"strings"
+	"sync"
+
+	"gerrit.opencord.org/voltha-bbsim/common/logger"
+	log "github.com/sirupsen/logrus"
 )
 
-type option struct{
-	address string
-	port uint32
-	oltid uint32
-	npon uint32
-	nonus uint32
-	aaawait int
-	dhcpwait int
-	dhcpservip string
-	intvl int
-	intvl_test int
-	Mode Mode
+type option struct {
+	address     string
+	port        uint32
+	oltid       uint32
+	npon        uint32
+	nonus       uint32
+	aaawait     int
+	dhcpwait    int
+	dhcpservip  string
+	intvl       int
+	intvl_test  int
+	Mode        Mode
+	KafkaBroker string
 }
 
 func GetOptions() *option {
 	o := new(option)
 	addressport := flag.String("H", ":50060", "IP address:port")
-	oltid :=flag.Int("id", 0, "OLT-ID")
+	oltid := flag.Int("id", 0, "OLT-ID")
 	npon := flag.Int("i", 1, "Number of PON-IF ports")
 	nonus := flag.Int("n", 1, "Number of ONUs per PON-IF port")
 	modeopt := flag.String("m", "default", "Emulation mode (default, aaa, both (aaa & dhcp))")
@@ -53,6 +56,7 @@
 	dhcpservip := flag.String("s", "182.21.0.1", "DHCP Server IP Address")
 	intvl := flag.Int("v", 1, "Interval each Indication")
 	intvl_test := flag.Int("V", 1, "Interval each Indication")
+	kafkaBroker := flag.String("k", "", "Kafka broker")
 	o.Mode = DEFAULT
 	flag.Parse()
 	if *modeopt == "aaa" {
@@ -68,27 +72,28 @@
 	o.dhcpservip = *dhcpservip
 	o.intvl = *intvl
 	o.intvl_test = *intvl_test
+	o.KafkaBroker = *kafkaBroker
 	o.address = (strings.Split(*addressport, ":")[0])
 	tmp, _ := strconv.Atoi(strings.Split(*addressport, ":")[1])
 	o.port = uint32(tmp)
 	return o
 }
 
-type stateMachine struct{
+type stateMachine struct {
 	handlers []*handler
-	state coreState
+	state    coreState
 }
 
-type handler struct{
-	dst coreState
-	src coreState
+type handler struct {
+	dst    coreState
+	src    coreState
 	method func(s *Server) error
 }
 
 func (sm *stateMachine) transit(next coreState) func(s *Server) error {
-	for _, handler := range sm.handlers{
+	for _, handler := range sm.handlers {
 		if handler.src == sm.state && handler.dst == next {
-			logger.Debug("Hit (src:%d, dst:%d)",handler.src, handler.dst)
+			logger.Debug("Hit (src:%d, dst:%d)", handler.src, handler.dst)
 			sm.state = next
 			return handler.method
 		}
@@ -98,27 +103,33 @@
 }
 
 type mediator struct {
-	opt *option
-	sm *stateMachine
+	opt    *option
+	sm     *stateMachine
 	server *Server
 	tester *Tester
 }
 
-func NewMediator(o *option) *mediator{
+func NewMediator(o *option) *mediator {
 	m := new(mediator)
 	m.opt = o
-	logger.Debug("ip:%s, baseport:%d, npon:%d, nonus:%d, mode:%d\n", o.address, o.port, o.npon, o.nonus, o.Mode)
+	logger.WithFields(log.Fields{
+		"ip":        o.address,
+		"baseport":  o.port,
+		"pon_ports": o.npon,
+		"onus":      o.nonus,
+		"mode":      o.Mode,
+	}).Debug("New mediator")
 	return m
 }
 
-func (m *mediator) Start(){
+func (m *mediator) Start() {
 	var wg sync.WaitGroup
 	opt := m.opt
 	server := NewCore(opt)
 	wg.Add(1)
-	go func(){
-		if err:= server.Start(); err != nil {	//Blocking
-			logger.Error("%s", err)
+	go func() {
+		if err := server.Start(); err != nil { //Blocking
+			logger.Error(err)
 		}
 		wg.Done()
 		return
@@ -130,18 +141,18 @@
 	m.sm = &stateMachine{
 		state: INACTIVE,
 		handlers: []*handler{
-			&handler{src: PRE_ACTIVE,dst: ACTIVE, method: m.tester.Start},
-			&handler{src: ACTIVE,dst: PRE_ACTIVE, method: m.tester.Stop},
+			&handler{src: PRE_ACTIVE, dst: ACTIVE, method: m.tester.Start},
+			&handler{src: ACTIVE, dst: PRE_ACTIVE, method: m.tester.Stop},
 		},
 	}
-	go func(){
+	go func() {
 		m.Mediate()
 	}()
 
 	c := make(chan os.Signal, 1)
 	signal.Notify(c, os.Interrupt)
 	go func() {
-		defer func(){
+		defer func() {
 			logger.Debug("SIGINT catcher Done")
 			wg.Done()
 		}()
@@ -149,8 +160,8 @@
 			wg.Add(1)
 			fmt.Println("SIGINT", sig)
 			close(c)
-			server.Stop()	//Non-blocking
-			tester.Stop(server)   //Non-blocking
+			server.Stop()       //Non-blocking
+			tester.Stop(server) //Non-blocking
 			return
 		}
 	}()
@@ -158,17 +169,17 @@
 	logger.Debug("Reach to the end line")
 }
 
-func (m *mediator) Mediate(){
+func (m *mediator) Mediate() {
 	wg := sync.WaitGroup{}
 	defer logger.Debug("Mediate Done")
-	for  corestat := range m.server.stateChan {
+	for corestat := range m.server.stateChan {
 		logger.Debug("Mediator receives state %d of server", corestat)
 		method := m.sm.transit(corestat)
 		if method != nil {
 			wg.Add(1)
 			defer wg.Done()
 			go func() error {
-				if err := method(m.server); err != nil{ //blocking
+				if err := method(m.server); err != nil { //blocking
 					m.server.Stop()
 					return err
 				}
@@ -177,4 +188,4 @@
 		}
 	}
 	wg.Wait()
-}
\ No newline at end of file
+}