[SEBA-342] Publishing logs to Kafka
Change-Id: Iaa72945dfd59a5d151cad14ff593eb299229fb3e
diff --git a/core/mediator.go b/core/mediator.go
index 496621a..a6d67b0 100644
--- a/core/mediator.go
+++ b/core/mediator.go
@@ -17,34 +17,37 @@
package core
import (
- "sync"
- "gerrit.opencord.org/voltha-bbsim/common"
+ "flag"
+ "fmt"
"os"
"os/signal"
- "fmt"
- "flag"
- "strings"
"strconv"
+ "strings"
+ "sync"
+
+ "gerrit.opencord.org/voltha-bbsim/common/logger"
+ log "github.com/sirupsen/logrus"
)
-type option struct{
- address string
- port uint32
- oltid uint32
- npon uint32
- nonus uint32
- aaawait int
- dhcpwait int
- dhcpservip string
- intvl int
- intvl_test int
- Mode Mode
+type option struct {
+ address string
+ port uint32
+ oltid uint32
+ npon uint32
+ nonus uint32
+ aaawait int
+ dhcpwait int
+ dhcpservip string
+ intvl int
+ intvl_test int
+ Mode Mode
+ KafkaBroker string
}
func GetOptions() *option {
o := new(option)
addressport := flag.String("H", ":50060", "IP address:port")
- oltid :=flag.Int("id", 0, "OLT-ID")
+ oltid := flag.Int("id", 0, "OLT-ID")
npon := flag.Int("i", 1, "Number of PON-IF ports")
nonus := flag.Int("n", 1, "Number of ONUs per PON-IF port")
modeopt := flag.String("m", "default", "Emulation mode (default, aaa, both (aaa & dhcp))")
@@ -53,6 +56,7 @@
dhcpservip := flag.String("s", "182.21.0.1", "DHCP Server IP Address")
intvl := flag.Int("v", 1, "Interval each Indication")
intvl_test := flag.Int("V", 1, "Interval each Indication")
+ kafkaBroker := flag.String("k", "", "Kafka broker")
o.Mode = DEFAULT
flag.Parse()
if *modeopt == "aaa" {
@@ -68,27 +72,28 @@
o.dhcpservip = *dhcpservip
o.intvl = *intvl
o.intvl_test = *intvl_test
+ o.KafkaBroker = *kafkaBroker
o.address = (strings.Split(*addressport, ":")[0])
tmp, _ := strconv.Atoi(strings.Split(*addressport, ":")[1])
o.port = uint32(tmp)
return o
}
-type stateMachine struct{
+type stateMachine struct {
handlers []*handler
- state coreState
+ state coreState
}
-type handler struct{
- dst coreState
- src coreState
+type handler struct {
+ dst coreState
+ src coreState
method func(s *Server) error
}
func (sm *stateMachine) transit(next coreState) func(s *Server) error {
- for _, handler := range sm.handlers{
+ for _, handler := range sm.handlers {
if handler.src == sm.state && handler.dst == next {
- logger.Debug("Hit (src:%d, dst:%d)",handler.src, handler.dst)
+ logger.Debug("Hit (src:%d, dst:%d)", handler.src, handler.dst)
sm.state = next
return handler.method
}
@@ -98,27 +103,33 @@
}
type mediator struct {
- opt *option
- sm *stateMachine
+ opt *option
+ sm *stateMachine
server *Server
tester *Tester
}
-func NewMediator(o *option) *mediator{
+func NewMediator(o *option) *mediator {
m := new(mediator)
m.opt = o
- logger.Debug("ip:%s, baseport:%d, npon:%d, nonus:%d, mode:%d\n", o.address, o.port, o.npon, o.nonus, o.Mode)
+ logger.WithFields(log.Fields{
+ "ip": o.address,
+ "baseport": o.port,
+ "pon_ports": o.npon,
+ "onus": o.nonus,
+ "mode": o.Mode,
+ }).Debug("New mediator")
return m
}
-func (m *mediator) Start(){
+func (m *mediator) Start() {
var wg sync.WaitGroup
opt := m.opt
server := NewCore(opt)
wg.Add(1)
- go func(){
- if err:= server.Start(); err != nil { //Blocking
- logger.Error("%s", err)
+ go func() {
+ if err := server.Start(); err != nil { //Blocking
+ logger.Error(err)
}
wg.Done()
return
@@ -130,18 +141,18 @@
m.sm = &stateMachine{
state: INACTIVE,
handlers: []*handler{
- &handler{src: PRE_ACTIVE,dst: ACTIVE, method: m.tester.Start},
- &handler{src: ACTIVE,dst: PRE_ACTIVE, method: m.tester.Stop},
+ &handler{src: PRE_ACTIVE, dst: ACTIVE, method: m.tester.Start},
+ &handler{src: ACTIVE, dst: PRE_ACTIVE, method: m.tester.Stop},
},
}
- go func(){
+ go func() {
m.Mediate()
}()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
- defer func(){
+ defer func() {
logger.Debug("SIGINT catcher Done")
wg.Done()
}()
@@ -149,8 +160,8 @@
wg.Add(1)
fmt.Println("SIGINT", sig)
close(c)
- server.Stop() //Non-blocking
- tester.Stop(server) //Non-blocking
+ server.Stop() //Non-blocking
+ tester.Stop(server) //Non-blocking
return
}
}()
@@ -158,17 +169,17 @@
logger.Debug("Reach to the end line")
}
-func (m *mediator) Mediate(){
+func (m *mediator) Mediate() {
wg := sync.WaitGroup{}
defer logger.Debug("Mediate Done")
- for corestat := range m.server.stateChan {
+ for corestat := range m.server.stateChan {
logger.Debug("Mediator receives state %d of server", corestat)
method := m.sm.transit(corestat)
if method != nil {
wg.Add(1)
defer wg.Done()
go func() error {
- if err := method(m.server); err != nil{ //blocking
+ if err := method(m.server); err != nil { //blocking
m.server.Stop()
return err
}
@@ -177,4 +188,4 @@
}
}
wg.Wait()
-}
\ No newline at end of file
+}