Merge "[SEBA-342] Publishing logs to kafka"
diff --git a/Makefile b/Makefile
index 2c3bf1e..cd3402e 100644
--- a/Makefile
+++ b/Makefile
@@ -14,6 +14,7 @@
 
 BBSIM_DEPS  = $(wildcard ./*.go)
 DOCKERTAG  ?= "latest"
+REGISTRY ?= ""
 
 .PHONY: dep test clean docker
 
@@ -47,4 +48,4 @@
 	rm -f bbsim openolt/openolt.pb.go
 
 docker:
-	docker build -t voltha/voltha-bbsim:${DOCKERTAG} .
+	docker build -t ${REGISTRY}voltha/voltha-bbsim:${DOCKERTAG} .
diff --git a/README.md b/README.md
index 6bd5eae..d125259 100644
--- a/README.md
+++ b/README.md
@@ -51,9 +51,9 @@
 Usage of ./bbsim:
   -H string
     	IP address:port (default ":50060")
-  -a int
+  -aw int
     	Wait time (sec) for activation WPA supplicants (default 30)
-  -d int
+  -dw int
     	Wait time (sec) for activation DHCP clients (default 10)
   -i int
     	Number of PON-IF ports (default 1)
diff --git a/bbsim.go b/bbsim.go
index 13f95f1..33c6cba 100644
--- a/bbsim.go
+++ b/bbsim.go
@@ -17,12 +17,12 @@
 package main
 
 import (
-	"gerrit.opencord.org/voltha-bbsim/core"
 	"log"
+
+	"gerrit.opencord.org/voltha-bbsim/common/logger"
+	"gerrit.opencord.org/voltha-bbsim/core"
 )
 
-
-
 func printBanner() {
 	log.Println("     ________    _______   ________                 ")
 	log.Println("    / ____   | / ____   | / ______/  __            ")
@@ -33,10 +33,13 @@
 }
 
 func main() {
+
 	// CLI Shows up
 	printBanner()
 	opt := core.GetOptions()
+	logger.Setup(opt.KafkaBroker, "DEBUG")
 
 	mediator := core.NewMediator(opt)
+
 	mediator.Start()
 }
diff --git a/common/logger.go b/common/logger.go
deleted file mode 100644
index c707601..0000000
--- a/common/logger.go
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package logger
-
-import (
-	"log"
-	"strings"
-)
-
-func Error(s string, opts ...interface{}) {
-	trimmed := strings.TrimRight(s, "\n")
-	if len(opts) == 0 {
-		log.Printf("[ERROR]:%s\n", trimmed)
-	} else {
-		fmt := "[ERROR]:" + trimmed + "\n"
-		log.Printf(fmt, opts...)
-	}
-}
-
-func Debug(s string, opts ...interface{}) {
-	trimmed := strings.TrimRight(s, "\n")
-	if len(opts) == 0 {
-		log.Printf("[DEBUG]:%s\n", trimmed)
-	} else {
-		fmt := "[DEBUG]:" + trimmed + "\n"
-		log.Printf(fmt, opts...)
-	}
-}
-
-func Info(s string, opts ...interface{}) {
-	trimmed := strings.TrimRight(s, "\n")
-	if len(opts) == 0 {
-		log.Printf("[INFO]:%s\n", trimmed)
-	} else {
-		fmt := "[INFO]:" + trimmed + "\n"
-		log.Printf(fmt, opts...)
-	}
-}
diff --git a/common/logger/logger.go b/common/logger/logger.go
new file mode 100644
index 0000000..f0bf4aa
--- /dev/null
+++ b/common/logger/logger.go
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package logger
+
+import (
+	"fmt"
+
+	lkh "github.com/gfremex/logrus-kafka-hook"
+	log "github.com/sirupsen/logrus"
+)
+
+var (
+	myLogger *log.Entry
+)
+
+func Setup(kafkaBroker string, level string) {
+
+	logger := log.New()
+	myLogger = logger.WithField("topics", []string{"bbsim.log"})
+
+	// TODO make this configurable via cli arg
+	if level == "DEBUG" {
+		logger.SetLevel(log.DebugLevel)
+	}
+
+	if len(kafkaBroker) > 0 {
+		myLogger.Debug("Setting up kafka integration")
+		hook, err := lkh.NewKafkaHook(
+			"kh",
+			[]log.Level{log.DebugLevel, log.InfoLevel, log.WarnLevel, log.ErrorLevel},
+			&log.JSONFormatter{
+				FieldMap: log.FieldMap{
+					log.FieldKeyTime:  "@timestamp",
+					log.FieldKeyLevel: "level",
+					log.FieldKeyMsg:   "message",
+				},
+			},
+			[]string{kafkaBroker},
+		)
+
+		if err != nil {
+			myLogger.Error(err)
+		}
+
+		logger.Hooks.Add(hook)
+
+	}
+
+	myLogger.WithField("kafkaBroker", kafkaBroker).Debug("Logger setup done")
+}
+
+func WithField(key string, value interface{}) *log.Entry {
+	return myLogger.WithField(key, value)
+}
+
+func WithFields(fields log.Fields) *log.Entry {
+	return myLogger.WithFields(fields)
+}
+
+func Panic(args ...interface{}) {
+	myLogger.Panic(fmt.Sprint(args...))
+}
+
+func Fatal(args ...interface{}) {
+	myLogger.Fatal(fmt.Sprint(args...))
+}
+
+func Error(args ...interface{}) {
+	myLogger.Error(fmt.Sprint(args...))
+}
+
+func Warn(args ...interface{}) {
+	myLogger.Warn(fmt.Sprint(args...))
+}
+
+func Info(args ...interface{}) {
+	myLogger.Info(fmt.Sprint(args...))
+}
+
+func Debug(args ...interface{}) {
+	myLogger.Debug(fmt.Sprint(args...))
+}
diff --git a/common/utils/utils.go b/common/utils/utils.go
new file mode 100644
index 0000000..a8fc3ba
--- /dev/null
+++ b/common/utils/utils.go
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+	"fmt"
+
+	"gerrit.opencord.org/voltha-bbsim/device"
+)
+
+func OnuToSn(onu *device.Onu) string {
+	// TODO this can be more elegant,
+	// see https://github.com/opencord/voltha/blob/master/voltha/adapters/openolt/openolt_device.py#L929-L943
+	return string(onu.SerialNumber.VendorId) + "00000" + fmt.Sprint(onu.IntfID) + "0" + fmt.Sprintf("%x", onu.OnuID)
+}
diff --git a/core/core_server.go b/core/core_server.go
index 0e0ea1b..55166dc 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -19,15 +19,18 @@
 import (
 	"context"
 	"errors"
-	"gerrit.opencord.org/voltha-bbsim/common"
+	"strconv"
+	"sync"
+
+	"gerrit.opencord.org/voltha-bbsim/common/logger"
+	"gerrit.opencord.org/voltha-bbsim/common/utils"
 	"gerrit.opencord.org/voltha-bbsim/device"
 	"gerrit.opencord.org/voltha-bbsim/protos"
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
 	"github.com/google/gopacket/pcap"
+	log "github.com/sirupsen/logrus"
 	"google.golang.org/grpc"
-	"strconv"
-	"sync"
 )
 
 const (
@@ -64,35 +67,34 @@
 type coreState int
 
 const (
-	INACTIVE   = iota   // OLT/ONUs are not instantiated
-	PRE_ACTIVE       	// Before PacketInDaemon Running
-	ACTIVE				// After PacketInDaemon Running
+	INACTIVE   = iota // OLT/ONUs are not instantiated
+	PRE_ACTIVE        // Before PacketInDaemon Running
+	ACTIVE            // After PacketInDaemon Running
 )
 
 /* coreState
 INACTIVE -> PRE_ACTIVE -> ACTIVE
     (ActivateOLT)   (Enable)
        <-              <-
- */
+*/
 
-
-func NewCore(opt *option) (*Server) {
+func NewCore(opt *option) *Server {
 	// TODO: make it decent
 	oltid := opt.oltid
-	npon  := opt.npon
+	npon := opt.npon
 	nonus := opt.nonus
 	s := Server{
-		Olt: device.NewOlt(oltid, npon, 1),
-		Onumap: make(map[uint32][]*device.Onu),
-		Ioinfos: []*Ioinfo{},
-		gRPCAddress: opt.address,
-		gRPCPort: opt.port,
-		Vethnames: []string{},
-		IndInterval: opt.intvl,
-		Processes: []string{},
+		Olt:          device.NewOlt(oltid, npon, 1),
+		Onumap:       make(map[uint32][]*device.Onu),
+		Ioinfos:      []*Ioinfo{},
+		gRPCAddress:  opt.address,
+		gRPCPort:     opt.port,
+		Vethnames:    []string{},
+		IndInterval:  opt.intvl,
+		Processes:    []string{},
 		EnableServer: new(openolt.Openolt_EnableIndicationServer),
-		state: INACTIVE,
-		stateChan: make(chan coreState, 8),
+		state:        INACTIVE,
+		stateChan:    make(chan coreState, 8),
 	}
 
 	nnni := s.Olt.NumNniIntf
@@ -113,10 +115,10 @@
 }
 
 //Blocking
-func (s *Server) Start () error {
+func (s *Server) Start() error {
 	s.wg = &sync.WaitGroup{}
 	logger.Debug("Start() Start")
-	defer func(){
+	defer func() {
 		close(s.stateChan)
 		logger.Debug("Start() Done")
 	}()
@@ -137,7 +139,7 @@
 }
 
 //Non-Blocking
-func (s *Server) Stop () {
+func (s *Server) Stop() {
 	logger.Debug("Stop() Start")
 	defer logger.Debug("Stop() Done")
 	if s.gRPCserver != nil {
@@ -150,7 +152,7 @@
 
 // Blocking
 func (s *Server) Enable(sv *openolt.Openolt_EnableIndicationServer) error {
-	defer func(){
+	defer func() {
 		olt := s.Olt
 		olt.InitializeStatus()
 		for intfid, _ := range s.Onumap {
@@ -171,7 +173,7 @@
 	coreCtx := context.Background()
 	coreCtx, corecancel := context.WithCancel(coreCtx)
 	s.cancel = corecancel
-	if err := s.StartPktInDaemon(coreCtx, *sv) ; err != nil {
+	if err := s.StartPktInDaemon(coreCtx, *sv); err != nil {
 		return err
 	}
 	return nil
@@ -179,14 +181,14 @@
 
 //Non-Blocking
 func (s *Server) Disable() {
-	defer func(){
+	defer func() {
 		logger.Debug("Disable() Done")
 	}()
 	logger.Debug("Disable() Start")
 	s.StopPktInDaemon()
 }
 
-func (s *Server) updateState(state coreState){
+func (s *Server) updateState(state coreState) {
 	s.state = state
 	s.stateChan <- state
 	logger.Debug("State updated to:%d", state)
@@ -245,11 +247,10 @@
 	return nil
 }
 
-
 // Blocking
 func (s *Server) StartPktInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
 	logger.Debug("StartPktInDaemon() Start")
-	defer func(){
+	defer func() {
 		RemoveVeths(s.Vethnames)
 		s.Vethnames = []string{}
 		s.Ioinfos = []*Ioinfo{}
@@ -360,35 +361,60 @@
 			layerEth := pkt.Layer(layers.LayerTypeEthernet)
 			le, _ := layerEth.(*layers.Ethernet)
 			ethtype := le.EthernetType
+			onu, _ := getOnuByID(s.Onumap, onuid)
 
 			if ethtype == 0x888e {
-				logger.Debug("Received upstream packet is EAPOL.")
-				//log.Println(unipkt.Pkt.Dump())
-				//log.Println(pkt.Dump())
+				logger.WithFields(log.Fields{
+					"serial_number": utils.OnuToSn(onu),
+					"gemId":         gemid,
+					"interfaceId":   intfid,
+					"onuId":         onuid,
+				}).Debug("Received upstream packet is EAPOL.")
 			} else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
-				logger.Debug("Received upstream packet is DHCP.")
+				logger.WithFields(log.Fields{
+					"serial_number": utils.OnuToSn(onu),
+					"gemId":         gemid,
+					"interfaceId":   intfid,
+					"onuId":         onuid,
+				}).Debug("Received upstream packet is DHCP.")
 
 				//C-TAG
-				onu, _ := getOnuByID(s.Onumap, onuid)
 				sn := convB2S(onu.SerialNumber.VendorSpecific)
 				if ctag, ok := s.CtagMap[sn]; ok == true {
 					tagpkt, err := PushVLAN(pkt, uint16(ctag))
 					if err != nil {
-						logger.Error("Fail to tag C-tag")
+						logger.WithFields(log.Fields{
+							"serial_number": utils.OnuToSn(onu),
+							"gemId":         gemid,
+							"interfaceId":   intfid,
+							"onuId":         onuid,
+						}).Error("Fail to tag C-tag")
 					} else {
 						pkt = tagpkt
 					}
 				} else {
-					logger.Error("Could not find the onuid %d (SN: %s) in CtagMap %v!\n", onuid, sn, s.CtagMap)
+					logger.WithFields(log.Fields{
+						"serial_number": utils.OnuToSn(onu),
+						"gemId":         gemid,
+						"interfaceId":   intfid,
+						"onuId":         onuid,
+						"sn":            sn,
+						"cTagMap":       s.CtagMap,
+					}).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
 				}
 			} else {
 				continue
 			}
 
-			logger.Debug("sendPktInd intfid:%d (onuid: %d) gemid:%d\n", intfid, onuid, gemid)
+			logger.WithFields(log.Fields{
+				"serial_number": utils.OnuToSn(onu),
+				"gemId":         gemid,
+				"interfaceId":   intfid,
+				"onuId":         onuid,
+			}).Debug("sendPktInd")
 			data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
 			if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
-				logger.Error("Fail to send PktInd indication. %v\n", err)
+				logger.Error("Fail to send PktInd indication.", err)
 				return err
 			}
 
@@ -397,14 +423,19 @@
 				logger.Info("WARNING: This packet does not come from NNI ")
 				continue
 			}
+			onuid := nnipkt.Info.onuid
+			onu, _ := getOnuByID(s.Onumap, onuid)
 
 			logger.Debug("Received packet in grpc Server from NNI.")
 			intfid := nnipkt.Info.intfid
 			pkt := nnipkt.Pkt
-			logger.Info("sendPktInd intfid:%d\n", intfid)
+			logger.WithFields(log.Fields{
+				"interfaceId":   intfid,
+				"serial_number": utils.OnuToSn(onu),
+			}).Info("sendPktInd")
 			data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
 			if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
-				logger.Error("Fail to send PktInd indication. %v\n", err)
+				logger.Error("Fail to send PktInd indication.", err)
 				return err
 			}
 
@@ -427,10 +458,10 @@
 		ethtype := pkt.EthernetType
 		if ethtype == 0x888e {
 			logger.Debug("Received downstream packet is EAPOL.")
-			//log.Println(rawpkt.Dump())
+			//logger.Println(rawpkt.Dump())
 		} else if layerDHCP := rawpkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
 			logger.Debug("Received downstream packet is DHCP.")
-			//log.Println(rawpkt.Dump())
+			//logger.Println(rawpkt.Dump())
 			rawpkt, _, _ = PopVLAN(rawpkt)
 			rawpkt, _, _ = PopVLAN(rawpkt)
 		} else {
diff --git a/core/grpc_service.go b/core/grpc_service.go
index e1866b8..75d8845 100644
--- a/core/grpc_service.go
+++ b/core/grpc_service.go
@@ -17,15 +17,15 @@
 package core
 
 import (
-	"gerrit.opencord.org/voltha-bbsim/common"
+	"net"
+
+	"gerrit.opencord.org/voltha-bbsim/common/logger"
 	"gerrit.opencord.org/voltha-bbsim/device"
 	"gerrit.opencord.org/voltha-bbsim/protos"
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
-	"log"
-	"net"
 )
 
 // gRPC Service
@@ -117,7 +117,7 @@
 	if result == true {
 		matched, error := getOnuBySN(s.Onumap, onu.SerialNumber)
 		if error != nil {
-			log.Fatalf("%s\n", error)
+			logger.Fatal("%s\n", error)
 		}
 		onuid := onu.OnuId
 		matched.OnuID = onuid
diff --git a/core/io_info.go b/core/io_info.go
index 8f65280..867c88c 100644
--- a/core/io_info.go
+++ b/core/io_info.go
@@ -18,9 +18,11 @@
 
 import (
 	"errors"
-	"gerrit.opencord.org/voltha-bbsim/common"
-	"github.com/google/gopacket/pcap"
 	"os/exec"
+
+	"gerrit.opencord.org/voltha-bbsim/common/logger"
+	"github.com/google/gopacket/pcap"
+	log "github.com/sirupsen/logrus"
 )
 
 type Ioinfo struct {
@@ -39,7 +41,7 @@
 		}
 	}
 	err := errors.New("No matched Ioinfo is found")
-	logger.Error("%s", err)
+	logger.Error(err)
 	return nil, err
 }
 
@@ -50,7 +52,7 @@
 		}
 	}
 	err := errors.New("No matched Ioinfo is found")
-	logger.Error("%s", err)
+	logger.Error(err)
 	return nil, err
 }
 
@@ -63,39 +65,42 @@
 	}
 	if len(ioinfos) == 0 {
 		err := errors.New("No matched Ioinfo is found")
-		logger.Error("%s", err)
+		logger.Error(err)
 		return nil, err
 	}
 	return ioinfos, nil
 }
 
-func CreateVethPairs(name1 string, name2 string) (err error) {
-	err = exec.Command("ip", "link", "add", name1, "type", "veth", "peer", "name", name2).Run()
+func CreateVethPairs(veth1 string, veth2 string) (err error) {
+	err = exec.Command("ip", "link", "add", veth1, "type", "veth", "peer", "name", veth2).Run()
 	if err != nil {
-		logger.Error("Fail to createVeth() for %s and %s veth creation error: %s\n", name1, name2, err.Error())
+		logger.WithFields(log.Fields{
+			"veth1": veth1,
+			"veth2": veth2,
+		}).Error("Fail to createVethPair()", err.Error())
 		return
 	}
-	logger.Info("%s & %s was created.", name1, name2)
-	err = exec.Command("ip", "link", "set", name1, "up").Run()
+	logger.Info("%s & %s was created.", veth1, veth2)
+	err = exec.Command("ip", "link", "set", veth1, "up").Run()
 	if err != nil {
 		logger.Error("Fail to createVeth() veth1 up", err)
 		return
 	}
-	err = exec.Command("ip", "link", "set", name2, "up").Run()
+	err = exec.Command("ip", "link", "set", veth2, "up").Run()
 	if err != nil {
 		logger.Error("Fail to createVeth() veth2 up", err)
 		return
 	}
-	logger.Info("%s & %s was up.", name1, name2)
+	logger.Info("%s & %s was up.", veth1, veth2)
 	return
 }
 
 func RemoveVeth(name string) error {
 	err := exec.Command("ip", "link", "del", name).Run()
 	if err != nil {
-		logger.Error("Fail to removeVeth()", err)
+		logger.WithField("veth", name).Error("Fail to removeVeth()", err)
 	}
-	logger.Info("%s was removed.", name)
+	logger.WithField("veth", name).Info("Veth was removed.")
 	return err
 }
 
@@ -103,6 +108,6 @@
 	for _, name := range names {
 		RemoveVeth(name)
 	}
-	logger.Info("RemoveVeths() :%s\n", names)
+	logger.WithField("veths", names).Info("RemoveVeths(): ")
 	return
 }
diff --git a/core/io_worker.go b/core/io_worker.go
index 41aec4b..9a6cb6c 100644
--- a/core/io_worker.go
+++ b/core/io_worker.go
@@ -18,13 +18,14 @@
 
 import (
 	"errors"
-	"gerrit.opencord.org/voltha-bbsim/common"
-	"github.com/google/gopacket"
-	"github.com/google/gopacket/layers"
-	"github.com/google/gopacket/pcap"
 	"net"
 	"strconv"
 	"time"
+
+	"gerrit.opencord.org/voltha-bbsim/common/logger"
+	"github.com/google/gopacket"
+	"github.com/google/gopacket/layers"
+	"github.com/google/gopacket/pcap"
 )
 
 func RecvWorker(io *Ioinfo, handler *pcap.Handle, r chan Packet) {
@@ -32,7 +33,7 @@
 	packetSource := gopacket.NewPacketSource(handler, handler.LinkType())
 	for packet := range packetSource.Packets() {
 		logger.Debug("recv packet from IF: %v \n", *handler)
-		//log.Println(packet.Dump())
+		//logger.Println(packet.Dump())
 		pkt := Packet{}
 		pkt.Info = io
 		pkt.Pkt = packet
@@ -46,7 +47,7 @@
 		logger.Error("Error in send packet to UNI-IF: %v e:%s\n", *handle, err)
 	}
 	logger.Debug("Successfully send packet to UNI-IF: %v \n", *handle)
-	//log.Println(packet.Dump())
+	//logger.Println(packet.Dump())
 }
 
 func SendNni(handle *pcap.Handle, packet gopacket.Packet) {
@@ -55,7 +56,7 @@
 		logger.Error("Error in send packet to NNI e:%s\n", err)
 	}
 	logger.Debug("send packet to NNI-IF: %v \n", *handle)
-	//log.Println(packet.Dump())
+	//logger.Println(packet.Dump())
 }
 
 func PopVLAN(pkt gopacket.Packet) (gopacket.Packet, uint16, error) {
diff --git a/core/mediator.go b/core/mediator.go
index 496621a..a6d67b0 100644
--- a/core/mediator.go
+++ b/core/mediator.go
@@ -17,34 +17,37 @@
 package core
 
 import (
-	"sync"
-	"gerrit.opencord.org/voltha-bbsim/common"
+	"flag"
+	"fmt"
 	"os"
 	"os/signal"
-	"fmt"
-	"flag"
-	"strings"
 	"strconv"
+	"strings"
+	"sync"
+
+	"gerrit.opencord.org/voltha-bbsim/common/logger"
+	log "github.com/sirupsen/logrus"
 )
 
-type option struct{
-	address string
-	port uint32
-	oltid uint32
-	npon uint32
-	nonus uint32
-	aaawait int
-	dhcpwait int
-	dhcpservip string
-	intvl int
-	intvl_test int
-	Mode Mode
+type option struct {
+	address     string
+	port        uint32
+	oltid       uint32
+	npon        uint32
+	nonus       uint32
+	aaawait     int
+	dhcpwait    int
+	dhcpservip  string
+	intvl       int
+	intvl_test  int
+	Mode        Mode
+	KafkaBroker string
 }
 
 func GetOptions() *option {
 	o := new(option)
 	addressport := flag.String("H", ":50060", "IP address:port")
-	oltid :=flag.Int("id", 0, "OLT-ID")
+	oltid := flag.Int("id", 0, "OLT-ID")
 	npon := flag.Int("i", 1, "Number of PON-IF ports")
 	nonus := flag.Int("n", 1, "Number of ONUs per PON-IF port")
 	modeopt := flag.String("m", "default", "Emulation mode (default, aaa, both (aaa & dhcp))")
@@ -53,6 +56,7 @@
 	dhcpservip := flag.String("s", "182.21.0.1", "DHCP Server IP Address")
 	intvl := flag.Int("v", 1, "Interval each Indication")
 	intvl_test := flag.Int("V", 1, "Interval each Indication")
+	kafkaBroker := flag.String("k", "", "Kafka broker")
 	o.Mode = DEFAULT
 	flag.Parse()
 	if *modeopt == "aaa" {
@@ -68,27 +72,28 @@
 	o.dhcpservip = *dhcpservip
 	o.intvl = *intvl
 	o.intvl_test = *intvl_test
+	o.KafkaBroker = *kafkaBroker
 	o.address = (strings.Split(*addressport, ":")[0])
 	tmp, _ := strconv.Atoi(strings.Split(*addressport, ":")[1])
 	o.port = uint32(tmp)
 	return o
 }
 
-type stateMachine struct{
+type stateMachine struct {
 	handlers []*handler
-	state coreState
+	state    coreState
 }
 
-type handler struct{
-	dst coreState
-	src coreState
+type handler struct {
+	dst    coreState
+	src    coreState
 	method func(s *Server) error
 }
 
 func (sm *stateMachine) transit(next coreState) func(s *Server) error {
-	for _, handler := range sm.handlers{
+	for _, handler := range sm.handlers {
 		if handler.src == sm.state && handler.dst == next {
-			logger.Debug("Hit (src:%d, dst:%d)",handler.src, handler.dst)
+			logger.Debug("Hit (src:%d, dst:%d)", handler.src, handler.dst)
 			sm.state = next
 			return handler.method
 		}
@@ -98,27 +103,33 @@
 }
 
 type mediator struct {
-	opt *option
-	sm *stateMachine
+	opt    *option
+	sm     *stateMachine
 	server *Server
 	tester *Tester
 }
 
-func NewMediator(o *option) *mediator{
+func NewMediator(o *option) *mediator {
 	m := new(mediator)
 	m.opt = o
-	logger.Debug("ip:%s, baseport:%d, npon:%d, nonus:%d, mode:%d\n", o.address, o.port, o.npon, o.nonus, o.Mode)
+	logger.WithFields(log.Fields{
+		"ip":        o.address,
+		"baseport":  o.port,
+		"pon_ports": o.npon,
+		"onus":      o.nonus,
+		"mode":      o.Mode,
+	}).Debug("New mediator")
 	return m
 }
 
-func (m *mediator) Start(){
+func (m *mediator) Start() {
 	var wg sync.WaitGroup
 	opt := m.opt
 	server := NewCore(opt)
 	wg.Add(1)
-	go func(){
-		if err:= server.Start(); err != nil {	//Blocking
-			logger.Error("%s", err)
+	go func() {
+		if err := server.Start(); err != nil { //Blocking
+			logger.Error(err)
 		}
 		wg.Done()
 		return
@@ -130,18 +141,18 @@
 	m.sm = &stateMachine{
 		state: INACTIVE,
 		handlers: []*handler{
-			&handler{src: PRE_ACTIVE,dst: ACTIVE, method: m.tester.Start},
-			&handler{src: ACTIVE,dst: PRE_ACTIVE, method: m.tester.Stop},
+			&handler{src: PRE_ACTIVE, dst: ACTIVE, method: m.tester.Start},
+			&handler{src: ACTIVE, dst: PRE_ACTIVE, method: m.tester.Stop},
 		},
 	}
-	go func(){
+	go func() {
 		m.Mediate()
 	}()
 
 	c := make(chan os.Signal, 1)
 	signal.Notify(c, os.Interrupt)
 	go func() {
-		defer func(){
+		defer func() {
 			logger.Debug("SIGINT catcher Done")
 			wg.Done()
 		}()
@@ -149,8 +160,8 @@
 			wg.Add(1)
 			fmt.Println("SIGINT", sig)
 			close(c)
-			server.Stop()	//Non-blocking
-			tester.Stop(server)   //Non-blocking
+			server.Stop()       //Non-blocking
+			tester.Stop(server) //Non-blocking
 			return
 		}
 	}()
@@ -158,17 +169,17 @@
 	logger.Debug("Reach to the end line")
 }
 
-func (m *mediator) Mediate(){
+func (m *mediator) Mediate() {
 	wg := sync.WaitGroup{}
 	defer logger.Debug("Mediate Done")
-	for  corestat := range m.server.stateChan {
+	for corestat := range m.server.stateChan {
 		logger.Debug("Mediator receives state %d of server", corestat)
 		method := m.sm.transit(corestat)
 		if method != nil {
 			wg.Add(1)
 			defer wg.Done()
 			go func() error {
-				if err := method(m.server); err != nil{ //blocking
+				if err := method(m.server); err != nil { //blocking
 					m.server.Stop()
 					return err
 				}
@@ -177,4 +188,4 @@
 		}
 	}
 	wg.Wait()
-}
\ No newline at end of file
+}
diff --git a/core/openolt_service.go b/core/openolt_service.go
index 97047bf..41e6548 100644
--- a/core/openolt_service.go
+++ b/core/openolt_service.go
@@ -17,10 +17,13 @@
 package core
 
 import (
-	"gerrit.opencord.org/voltha-bbsim/common"
+	"time"
+
+	"gerrit.opencord.org/voltha-bbsim/common/logger"
+	"gerrit.opencord.org/voltha-bbsim/common/utils"
 	"gerrit.opencord.org/voltha-bbsim/device"
 	"gerrit.opencord.org/voltha-bbsim/protos"
-	"time"
+	log "github.com/sirupsen/logrus"
 )
 
 func sendOltIndUp(stream openolt.Openolt_EnableIndicationServer, olt *device.Olt) error {
@@ -76,7 +79,12 @@
 			logger.Error("Failed to send ONUDiscInd [id: %d]: %v\n", i, err)
 			return err
 		}
-		logger.Info("sendONUDiscInd Onuid: %d\n", i)
+		logger.WithFields(log.Fields{
+			"serial_number": utils.OnuToSn(onu),
+			"interfaceId":   onu.IntfID,
+			"onuId":         onu.OnuID,
+			"oltId":         onu.OltID,
+		}).Info("sendONUDiscInd Onuid")
 	}
 	return nil
 }
@@ -89,7 +97,12 @@
 			logger.Error("Failed to send ONUInd [id: %d]: %v\n", i, err)
 			return err
 		}
-		logger.Info("sendONUInd Onuid: %d\n", i)
+		logger.WithFields(log.Fields{
+			"serial_number": utils.OnuToSn(onu),
+			"interfaceId":   onu.IntfID,
+			"onuId":         onu.OnuID,
+			"oltId":         onu.OltID,
+		}).Info("sendONUInd Onuid")
 	}
 	return nil
 }
diff --git a/core/tester.go b/core/tester.go
index ba4792b..cd4f96a 100644
--- a/core/tester.go
+++ b/core/tester.go
@@ -18,12 +18,13 @@
 
 import (
 	"context"
-	"gerrit.opencord.org/voltha-bbsim/common"
-	"golang.org/x/sync/errgroup"
-	"log"
 	"os/exec"
-	"time"
 	"sync"
+	"time"
+
+	"gerrit.opencord.org/voltha-bbsim/common/logger"
+	log "github.com/sirupsen/logrus"
+	"golang.org/x/sync/errgroup"
 )
 
 const (
@@ -44,7 +45,7 @@
 	cancel       context.CancelFunc
 }
 
-func NewTester(opt *option) *Tester{
+func NewTester(opt *option) *Tester {
 	t := new(Tester)
 	t.AAAWait = opt.aaawait
 	t.DhcpWait = opt.dhcpwait
@@ -55,11 +56,11 @@
 }
 
 //Blocking
-func (t *Tester) Start (s *Server) error {
+func (t *Tester) Start(s *Server) error {
 	ctx := context.Background()
 	ctx, cancel := context.WithCancel(ctx)
 	t.cancel = cancel
-	defer func(){
+	defer func() {
 		cancel()
 		t.Initialize()
 		logger.Debug("Tester Done")
@@ -106,16 +107,15 @@
 	return nil
 }
 
-func (t *Tester) Initialize (){
+func (t *Tester) Initialize() {
 	logger.Info("Tester Initialize () called")
 	processes := t.Processes
 	logger.Debug("Runnig Process: %s", processes)
 	KillProcesses(processes)
-	exec.Command("rm", "/var/run/dhcpd.pid").Run()	//This is for DHCP server activation
-	exec.Command("touch", "/var/run/dhcpd.pid").Run()	//This is for DHCP server activation
+	exec.Command("rm", "/var/run/dhcpd.pid").Run()    //This is for DHCP server activation
+	exec.Command("touch", "/var/run/dhcpd.pid").Run() //This is for DHCP server activation
 }
 
-
 func (t *Tester) exeAAATest(ctx context.Context, s *Server, wait int) error {
 	tick := time.NewTicker(time.Second)
 	defer tick.Stop()
@@ -130,17 +130,17 @@
 		univeths = append(univeths, info.Name)
 	}
 
-	for sec := 1; sec <= wait; sec ++ {
+	for sec := 1; sec <= wait; sec++ {
 		select {
 		case <-ctx.Done():
 			logger.Debug("exeAAATest thread receives close ")
 			return nil
 		case <-tick.C:
-			logger.Info("exeAAATest stands by ... %dsec\n", wait - sec)
+			logger.WithField("seconds", wait-sec).Info("exeAAATest stands by ...")
 			if sec == wait {
 				wg := sync.WaitGroup{}
 				wg.Add(1)
-				go func() error{
+				go func() error {
 					defer wg.Done()
 					err = activateWPASups(ctx, univeths, t.Intvl)
 					if err != nil {
@@ -184,25 +184,27 @@
 		univeths = append(univeths, info.Name)
 	}
 
-	for sec := 1; sec <= wait; sec ++ {
+	for sec := 1; sec <= wait; sec++ {
 		select {
-		case <- ctx.Done():
+		case <-ctx.Done():
 			logger.Debug("exeDHCPTest thread receives close ")
 			return nil
-		case <- tick.C:
-			logger.Info("exeDHCPTest stands by ... %dsec\n", wait- sec)
+		case <-tick.C:
+			logger.WithField("seconds", wait-sec).Info("exeDHCPTest stands by ...")
 			if sec == wait {
 				wg := sync.WaitGroup{}
 				wg.Add(1)
-				go func() error{
+				go func() error {
 					defer wg.Done()
 					err = activateDHCPClients(ctx, univeths, t.Intvl)
 					if err != nil {
 						return err
 					}
-					logger.Info("DHCP clients are successfully activated ")
+					logger.WithFields(log.Fields{
+						"univeths": univeths,
+					}).Info("DHCP clients are successfully activated")
 					t.Processes = append(t.Processes, "dhclient")
-					logger.Debug("Running Process:%s", t.Processes)
+					logger.Debug("Running Process: ", t.Processes)
 					return nil
 				}()
 				wg.Wait()
@@ -224,17 +226,17 @@
 	defer tick.Stop()
 	i := 0
 	for {
-		select{
-		case <- tick.C:
+		select {
+		case <-tick.C:
 			if i < len(vethnames) {
 				vethname := vethnames[i]
 				if err := activateWPASupplicant(vethname); err != nil {
 					return err
 				}
 				logger.Debug("activateWPASupplicant for interface %v\n", vethname)
-				i ++
+				i++
 			}
-		case <- ctx.Done():
+		case <-ctx.Done():
 			logger.Debug("activateWPASups was canceled by context.")
 			return nil
 		}
@@ -247,17 +249,19 @@
 	defer tick.Stop()
 	i := 0
 	for {
-		select{
-		case <- tick.C:
-			if i < len(vethnames){
+		select {
+		case <-tick.C:
+			if i < len(vethnames) {
 				vethname := vethnames[i]
 				if err := activateDHCPClient(vethname); err != nil {
 					return err
 				}
-				logger.Debug("activateDHCPClient for interface %v\n", vethname)
-				i ++
+				logger.WithFields(log.Fields{
+					"interface": vethname,
+				}).Debug("activateDHCPClient")
+				i++
 			}
-		case <- ctx.Done():
+		case <-ctx.Done():
 			logger.Debug("activateDHCPClients was canceled by context.")
 			return nil
 		}
@@ -293,7 +297,7 @@
 	// if err := cmd.Run(); err != nil {
 	if err := cmd.Start(); err != nil {
 		logger.Error("Fail to activateDHCPClient() for: %s", vethname)
-		log.Panic(err)
+		logger.Panic(err)
 	}
 	logger.Debug("activateDHCPClient() done for: %s\n", vethname)
 	return
diff --git a/device/device_onu.go b/device/device_onu.go
index de99eb3..24f2cbf 100644
--- a/device/device_onu.go
+++ b/device/device_onu.go
@@ -17,10 +17,12 @@
 package device
 
 import (
-	"gerrit.opencord.org/voltha-bbsim/common"
-	"gerrit.opencord.org/voltha-bbsim/protos"
 	"reflect"
 	"sync"
+
+	"gerrit.opencord.org/voltha-bbsim/common/logger"
+	"gerrit.opencord.org/voltha-bbsim/protos"
+	log "github.com/sirupsen/logrus"
 )
 
 type onuState int
@@ -32,6 +34,7 @@
 
 type Onu struct {
 	InternalState *onuState
+	OltID         uint32
 	IntfID        uint32
 	OperState     string
 	SerialNumber  *openolt.SerialNumber
@@ -52,6 +55,7 @@
 		*onu.InternalState = ONU_PRE_ACTIVATED
 		onu.mu = &sync.Mutex{}
 		onu.IntfID = intfid
+		onu.OltID = oltid
 		onu.OperState = "up"
 		onu.SerialNumber = new(openolt.SerialNumber)
 		onu.SerialNumber.VendorId = []byte("BBSM")
@@ -82,9 +86,12 @@
 }
 
 func UpdateOnusOpStatus(ponif uint32, onus []*Onu, opstatus string) {
-	for i, onu := range onus {
+	for _, onu := range onus {
 		onu.OperState = "up"
-		logger.Info("(PONIF:%d) ONU [%d] discovered.\n", ponif, i)
+		logger.WithFields(log.Fields{
+			"onu":           onu.SerialNumber,
+			"pon_interface": ponif,
+		}).Info("ONU discovered.")
 	}
 }