[SEBA-342] Publishing logs to kafka

Change-Id: Iaa72945dfd59a5d151cad14ff593eb299229fb3e
diff --git a/core/core_server.go b/core/core_server.go
index 0e0ea1b..55166dc 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -19,15 +19,18 @@
 import (
 	"context"
 	"errors"
-	"gerrit.opencord.org/voltha-bbsim/common"
+	"strconv"
+	"sync"
+
+	"gerrit.opencord.org/voltha-bbsim/common/logger"
+	"gerrit.opencord.org/voltha-bbsim/common/utils"
 	"gerrit.opencord.org/voltha-bbsim/device"
 	"gerrit.opencord.org/voltha-bbsim/protos"
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
 	"github.com/google/gopacket/pcap"
+	log "github.com/sirupsen/logrus"
 	"google.golang.org/grpc"
-	"strconv"
-	"sync"
 )
 
 const (
@@ -64,35 +67,34 @@
 type coreState int
 
 const (
-	INACTIVE   = iota   // OLT/ONUs are not instantiated
-	PRE_ACTIVE       	// Before PacketInDaemon Running
-	ACTIVE				// After PacketInDaemon Running
+	INACTIVE   = iota // OLT/ONUs are not instantiated
+	PRE_ACTIVE        // Before PacketInDaemon Running
+	ACTIVE            // After PacketInDaemon Running
 )
 
 /* coreState
 INACTIVE -> PRE_ACTIVE -> ACTIVE
     (ActivateOLT)   (Enable)
        <-              <-
- */
+*/
 
-
-func NewCore(opt *option) (*Server) {
+func NewCore(opt *option) *Server {
 	// TODO: make it decent
 	oltid := opt.oltid
-	npon  := opt.npon
+	npon := opt.npon
 	nonus := opt.nonus
 	s := Server{
-		Olt: device.NewOlt(oltid, npon, 1),
-		Onumap: make(map[uint32][]*device.Onu),
-		Ioinfos: []*Ioinfo{},
-		gRPCAddress: opt.address,
-		gRPCPort: opt.port,
-		Vethnames: []string{},
-		IndInterval: opt.intvl,
-		Processes: []string{},
+		Olt:          device.NewOlt(oltid, npon, 1),
+		Onumap:       make(map[uint32][]*device.Onu),
+		Ioinfos:      []*Ioinfo{},
+		gRPCAddress:  opt.address,
+		gRPCPort:     opt.port,
+		Vethnames:    []string{},
+		IndInterval:  opt.intvl,
+		Processes:    []string{},
 		EnableServer: new(openolt.Openolt_EnableIndicationServer),
-		state: INACTIVE,
-		stateChan: make(chan coreState, 8),
+		state:        INACTIVE,
+		stateChan:    make(chan coreState, 8),
 	}
 
 	nnni := s.Olt.NumNniIntf
@@ -113,10 +115,10 @@
 }
 
 //Blocking
-func (s *Server) Start () error {
+func (s *Server) Start() error {
 	s.wg = &sync.WaitGroup{}
 	logger.Debug("Start() Start")
-	defer func(){
+	defer func() {
 		close(s.stateChan)
 		logger.Debug("Start() Done")
 	}()
@@ -137,7 +139,7 @@
 }
 
 //Non-Blocking
-func (s *Server) Stop () {
+func (s *Server) Stop() {
 	logger.Debug("Stop() Start")
 	defer logger.Debug("Stop() Done")
 	if s.gRPCserver != nil {
@@ -150,7 +152,7 @@
 
 // Blocking
 func (s *Server) Enable(sv *openolt.Openolt_EnableIndicationServer) error {
-	defer func(){
+	defer func() {
 		olt := s.Olt
 		olt.InitializeStatus()
 		for intfid, _ := range s.Onumap {
@@ -171,7 +173,7 @@
 	coreCtx := context.Background()
 	coreCtx, corecancel := context.WithCancel(coreCtx)
 	s.cancel = corecancel
-	if err := s.StartPktInDaemon(coreCtx, *sv) ; err != nil {
+	if err := s.StartPktInDaemon(coreCtx, *sv); err != nil {
 		return err
 	}
 	return nil
@@ -179,14 +181,14 @@
 
 //Non-Blocking
 func (s *Server) Disable() {
-	defer func(){
+	defer func() {
 		logger.Debug("Disable() Done")
 	}()
 	logger.Debug("Disable() Start")
 	s.StopPktInDaemon()
 }
 
-func (s *Server) updateState(state coreState){
+func (s *Server) updateState(state coreState) {
 	s.state = state
 	s.stateChan <- state
 	logger.Debug("State updated to:%d", state)
@@ -245,11 +247,10 @@
 	return nil
 }
 
-
 // Blocking
 func (s *Server) StartPktInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
 	logger.Debug("StartPktInDaemon() Start")
-	defer func(){
+	defer func() {
 		RemoveVeths(s.Vethnames)
 		s.Vethnames = []string{}
 		s.Ioinfos = []*Ioinfo{}
@@ -360,35 +361,60 @@
 			layerEth := pkt.Layer(layers.LayerTypeEthernet)
 			le, _ := layerEth.(*layers.Ethernet)
 			ethtype := le.EthernetType
+			onu, _ := getOnuByID(s.Onumap, onuid)
 
 			if ethtype == 0x888e {
-				logger.Debug("Received upstream packet is EAPOL.")
-				//log.Println(unipkt.Pkt.Dump())
-				//log.Println(pkt.Dump())
+				logger.WithFields(log.Fields{
+					"serial_number": utils.OnuToSn(onu),
+					"gemId":         gemid,
+					"interfaceId":   intfid,
+					"onuId":         onuid,
+				}).Debug("Received upstream packet is EAPOL.")
 			} else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
-				logger.Debug("Received upstream packet is DHCP.")
+				logger.WithFields(log.Fields{
+					"serial_number": utils.OnuToSn(onu),
+					"gemId":         gemid,
+					"interfaceId":   intfid,
+					"onuId":         onuid,
+				}).Debug("Received upstream packet is DHCP.")
 
 				//C-TAG
-				onu, _ := getOnuByID(s.Onumap, onuid)
 				sn := convB2S(onu.SerialNumber.VendorSpecific)
 				if ctag, ok := s.CtagMap[sn]; ok == true {
 					tagpkt, err := PushVLAN(pkt, uint16(ctag))
 					if err != nil {
-						logger.Error("Fail to tag C-tag")
+						logger.WithFields(log.Fields{
+							"serial_number": utils.OnuToSn(onu),
+							"gemId":         gemid,
+							"interfaceId":   intfid,
+							"onuId":         onuid,
+						}).Error("Fail to tag C-tag")
 					} else {
 						pkt = tagpkt
 					}
 				} else {
-					logger.Error("Could not find the onuid %d (SN: %s) in CtagMap %v!\n", onuid, sn, s.CtagMap)
+					logger.WithFields(log.Fields{
+						"serial_number": utils.OnuToSn(onu),
+						"gemId":         gemid,
+						"interfaceId":   intfid,
+						"onuId":         onuid,
+						"sn":            sn,
+						"cTagMap":       s.CtagMap,
+					}).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
 				}
 			} else {
 				continue
 			}
 
-			logger.Debug("sendPktInd intfid:%d (onuid: %d) gemid:%d\n", intfid, onuid, gemid)
+			logger.WithFields(log.Fields{
+				"serial_number": utils.OnuToSn(onu),
+				"gemId":         gemid,
+				"interfaceId":   intfid,
+				"onuId":         onuid,
+			}).Debug("sendPktInd")
 			data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
 			if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
-				logger.Error("Fail to send PktInd indication. %v\n", err)
+				logger.Error("Fail to send PktInd indication.", err)
 				return err
 			}
 
@@ -397,14 +423,19 @@
 				logger.Info("WARNING: This packet does not come from NNI ")
 				continue
 			}
+			onuid := nnipkt.Info.onuid
+			onu, _ := getOnuByID(s.Onumap, onuid)
 
 			logger.Debug("Received packet in grpc Server from NNI.")
 			intfid := nnipkt.Info.intfid
 			pkt := nnipkt.Pkt
-			logger.Info("sendPktInd intfid:%d\n", intfid)
+			logger.WithFields(log.Fields{
+				"interfaceId":   intfid,
+				"serial_number": utils.OnuToSn(onu),
+			}).Info("sendPktInd")
 			data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
 			if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
-				logger.Error("Fail to send PktInd indication. %v\n", err)
+				logger.Error("Fail to send PktInd indication.", err)
 				return err
 			}
 
@@ -427,10 +458,10 @@
 		ethtype := pkt.EthernetType
 		if ethtype == 0x888e {
 			logger.Debug("Received downstream packet is EAPOL.")
-			//log.Println(rawpkt.Dump())
+			//logger.Println(rawpkt.Dump())
 		} else if layerDHCP := rawpkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
 			logger.Debug("Received downstream packet is DHCP.")
-			//log.Println(rawpkt.Dump())
+			//logger.Println(rawpkt.Dump())
 			rawpkt, _, _ = PopVLAN(rawpkt)
 			rawpkt, _, _ = PopVLAN(rawpkt)
 		} else {