[SEBA-342] Publishing logs to Kafka
Change-Id: Iaa72945dfd59a5d151cad14ff593eb299229fb3e
diff --git a/core/core_server.go b/core/core_server.go
index 0e0ea1b..55166dc 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -19,15 +19,18 @@
import (
"context"
"errors"
- "gerrit.opencord.org/voltha-bbsim/common"
+ "strconv"
+ "sync"
+
+ "gerrit.opencord.org/voltha-bbsim/common/logger"
+ "gerrit.opencord.org/voltha-bbsim/common/utils"
"gerrit.opencord.org/voltha-bbsim/device"
"gerrit.opencord.org/voltha-bbsim/protos"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
+ log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
- "strconv"
- "sync"
)
const (
@@ -64,35 +67,34 @@
type coreState int
const (
- INACTIVE = iota // OLT/ONUs are not instantiated
- PRE_ACTIVE // Before PacketInDaemon Running
- ACTIVE // After PacketInDaemon Running
+ INACTIVE = iota // OLT/ONUs are not instantiated
+ PRE_ACTIVE // Before PacketInDaemon Running
+ ACTIVE // After PacketInDaemon Running
)
/* coreState
INACTIVE -> PRE_ACTIVE -> ACTIVE
(ActivateOLT) (Enable)
<- <-
- */
+*/
-
-func NewCore(opt *option) (*Server) {
+func NewCore(opt *option) *Server {
// TODO: make it decent
oltid := opt.oltid
- npon := opt.npon
+ npon := opt.npon
nonus := opt.nonus
s := Server{
- Olt: device.NewOlt(oltid, npon, 1),
- Onumap: make(map[uint32][]*device.Onu),
- Ioinfos: []*Ioinfo{},
- gRPCAddress: opt.address,
- gRPCPort: opt.port,
- Vethnames: []string{},
- IndInterval: opt.intvl,
- Processes: []string{},
+ Olt: device.NewOlt(oltid, npon, 1),
+ Onumap: make(map[uint32][]*device.Onu),
+ Ioinfos: []*Ioinfo{},
+ gRPCAddress: opt.address,
+ gRPCPort: opt.port,
+ Vethnames: []string{},
+ IndInterval: opt.intvl,
+ Processes: []string{},
EnableServer: new(openolt.Openolt_EnableIndicationServer),
- state: INACTIVE,
- stateChan: make(chan coreState, 8),
+ state: INACTIVE,
+ stateChan: make(chan coreState, 8),
}
nnni := s.Olt.NumNniIntf
@@ -113,10 +115,10 @@
}
//Blocking
-func (s *Server) Start () error {
+func (s *Server) Start() error {
s.wg = &sync.WaitGroup{}
logger.Debug("Start() Start")
- defer func(){
+ defer func() {
close(s.stateChan)
logger.Debug("Start() Done")
}()
@@ -137,7 +139,7 @@
}
//Non-Blocking
-func (s *Server) Stop () {
+func (s *Server) Stop() {
logger.Debug("Stop() Start")
defer logger.Debug("Stop() Done")
if s.gRPCserver != nil {
@@ -150,7 +152,7 @@
// Blocking
func (s *Server) Enable(sv *openolt.Openolt_EnableIndicationServer) error {
- defer func(){
+ defer func() {
olt := s.Olt
olt.InitializeStatus()
for intfid, _ := range s.Onumap {
@@ -171,7 +173,7 @@
coreCtx := context.Background()
coreCtx, corecancel := context.WithCancel(coreCtx)
s.cancel = corecancel
- if err := s.StartPktInDaemon(coreCtx, *sv) ; err != nil {
+ if err := s.StartPktInDaemon(coreCtx, *sv); err != nil {
return err
}
return nil
@@ -179,14 +181,14 @@
//Non-Blocking
func (s *Server) Disable() {
- defer func(){
+ defer func() {
logger.Debug("Disable() Done")
}()
logger.Debug("Disable() Start")
s.StopPktInDaemon()
}
-func (s *Server) updateState(state coreState){
+func (s *Server) updateState(state coreState) {
s.state = state
s.stateChan <- state
logger.Debug("State updated to:%d", state)
@@ -245,11 +247,10 @@
return nil
}
-
// Blocking
func (s *Server) StartPktInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
logger.Debug("StartPktInDaemon() Start")
- defer func(){
+ defer func() {
RemoveVeths(s.Vethnames)
s.Vethnames = []string{}
s.Ioinfos = []*Ioinfo{}
@@ -360,35 +361,60 @@
layerEth := pkt.Layer(layers.LayerTypeEthernet)
le, _ := layerEth.(*layers.Ethernet)
ethtype := le.EthernetType
+ onu, _ := getOnuByID(s.Onumap, onuid)
if ethtype == 0x888e {
- logger.Debug("Received upstream packet is EAPOL.")
- //log.Println(unipkt.Pkt.Dump())
- //log.Println(pkt.Dump())
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "gemId": gemid,
+ "interfaceId": intfid,
+ "onuId": onuid,
+ }).Debug("Received upstream packet is EAPOL.")
} else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
- logger.Debug("Received upstream packet is DHCP.")
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "gemId": gemid,
+ "interfaceId": intfid,
+ "onuId": onuid,
+ }).Debug("Received upstream packet is DHCP.")
//C-TAG
- onu, _ := getOnuByID(s.Onumap, onuid)
sn := convB2S(onu.SerialNumber.VendorSpecific)
if ctag, ok := s.CtagMap[sn]; ok == true {
tagpkt, err := PushVLAN(pkt, uint16(ctag))
if err != nil {
- logger.Error("Fail to tag C-tag")
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "gemId": gemid,
+ "interfaceId": intfid,
+ "onuId": onuid,
+ }).Error("Fail to tag C-tag")
} else {
pkt = tagpkt
}
} else {
- logger.Error("Could not find the onuid %d (SN: %s) in CtagMap %v!\n", onuid, sn, s.CtagMap)
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "gemId": gemid,
+ "interfaceId": intfid,
+ "onuId": onuid,
+ "sn": sn,
+ "cTagMap": s.CtagMap,
+ }).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
}
} else {
continue
}
- logger.Debug("sendPktInd intfid:%d (onuid: %d) gemid:%d\n", intfid, onuid, gemid)
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "gemId": gemid,
+ "interfaceId": intfid,
+ "onuId": onuid,
+ }).Debug("sendPktInd")
data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
- logger.Error("Fail to send PktInd indication. %v\n", err)
+ logger.Error("Fail to send PktInd indication.", err)
return err
}
@@ -397,14 +423,19 @@
logger.Info("WARNING: This packet does not come from NNI ")
continue
}
+ onuid := nnipkt.Info.onuid
+ onu, _ := getOnuByID(s.Onumap, onuid)
logger.Debug("Received packet in grpc Server from NNI.")
intfid := nnipkt.Info.intfid
pkt := nnipkt.Pkt
- logger.Info("sendPktInd intfid:%d\n", intfid)
+ logger.WithFields(log.Fields{
+ "interfaceId": intfid,
+ "serial_number": utils.OnuToSn(onu),
+ }).Info("sendPktInd")
data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
- logger.Error("Fail to send PktInd indication. %v\n", err)
+ logger.Error("Fail to send PktInd indication.", err)
return err
}
@@ -427,10 +458,10 @@
ethtype := pkt.EthernetType
if ethtype == 0x888e {
logger.Debug("Received downstream packet is EAPOL.")
- //log.Println(rawpkt.Dump())
+ //logger.Println(rawpkt.Dump())
} else if layerDHCP := rawpkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
logger.Debug("Received downstream packet is DHCP.")
- //log.Println(rawpkt.Dump())
+ //logger.Println(rawpkt.Dump())
rawpkt, _, _ = PopVLAN(rawpkt)
rawpkt, _, _ = PopVLAN(rawpkt)
} else {