Merge "[SEBA-342] Publishing logs to kafka"
diff --git a/Makefile b/Makefile
index 2c3bf1e..cd3402e 100644
--- a/Makefile
+++ b/Makefile
@@ -14,6 +14,7 @@
BBSIM_DEPS = $(wildcard ./*.go)
DOCKERTAG ?= "latest"
+REGISTRY ?= ""
.PHONY: dep test clean docker
@@ -47,4 +48,4 @@
rm -f bbsim openolt/openolt.pb.go
docker:
- docker build -t voltha/voltha-bbsim:${DOCKERTAG} .
+ docker build -t ${REGISTRY}voltha/voltha-bbsim:${DOCKERTAG} .
diff --git a/README.md b/README.md
index 6bd5eae..d125259 100644
--- a/README.md
+++ b/README.md
@@ -51,9 +51,9 @@
Usage of ./bbsim:
-H string
IP address:port (default ":50060")
- -a int
+ -aw int
Wait time (sec) for activation WPA supplicants (default 30)
- -d int
+ -dw int
Wait time (sec) for activation DHCP clients (default 10)
-i int
Number of PON-IF ports (default 1)
diff --git a/bbsim.go b/bbsim.go
index 13f95f1..33c6cba 100644
--- a/bbsim.go
+++ b/bbsim.go
@@ -17,12 +17,12 @@
package main
import (
- "gerrit.opencord.org/voltha-bbsim/core"
"log"
+
+ "gerrit.opencord.org/voltha-bbsim/common/logger"
+ "gerrit.opencord.org/voltha-bbsim/core"
)
-
-
func printBanner() {
log.Println(" ________ _______ ________ ")
log.Println(" / ____ | / ____ | / ______/ __ ")
@@ -33,10 +33,13 @@
}
func main() {
+
// CLI Shows up
printBanner()
opt := core.GetOptions()
+ logger.Setup(opt.KafkaBroker, "DEBUG")
mediator := core.NewMediator(opt)
+
mediator.Start()
}
diff --git a/common/logger.go b/common/logger.go
deleted file mode 100644
index c707601..0000000
--- a/common/logger.go
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package logger
-
-import (
- "log"
- "strings"
-)
-
-func Error(s string, opts ...interface{}) {
- trimmed := strings.TrimRight(s, "\n")
- if len(opts) == 0 {
- log.Printf("[ERROR]:%s\n", trimmed)
- } else {
- fmt := "[ERROR]:" + trimmed + "\n"
- log.Printf(fmt, opts...)
- }
-}
-
-func Debug(s string, opts ...interface{}) {
- trimmed := strings.TrimRight(s, "\n")
- if len(opts) == 0 {
- log.Printf("[DEBUG]:%s\n", trimmed)
- } else {
- fmt := "[DEBUG]:" + trimmed + "\n"
- log.Printf(fmt, opts...)
- }
-}
-
-func Info(s string, opts ...interface{}) {
- trimmed := strings.TrimRight(s, "\n")
- if len(opts) == 0 {
- log.Printf("[INFO]:%s\n", trimmed)
- } else {
- fmt := "[INFO]:" + trimmed + "\n"
- log.Printf(fmt, opts...)
- }
-}
diff --git a/common/logger/logger.go b/common/logger/logger.go
new file mode 100644
index 0000000..f0bf4aa
--- /dev/null
+++ b/common/logger/logger.go
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package logger provides the logging helpers shared by the bbsim packages.
+// All helpers write through one logrus entry, which Setup configures and can
+// additionally mirror to a Kafka broker.
+package logger
+
+import (
+	"fmt"
+
+	lkh "github.com/gfremex/logrus-kafka-hook"
+	log "github.com/sirupsen/logrus"
+)
+
+var (
+	// myLogger is the entry every package-level helper logs through. It starts
+	// out bound to the logrus standard logger so that a helper called before
+	// Setup does not dereference a nil entry; Setup replaces it.
+	myLogger = log.NewEntry(log.StandardLogger())
+)
+
+// Setup creates a new logrus logger and makes it the target of every helper in
+// this package.
+//
+// kafkaBroker is the address of a Kafka broker. When it is non-empty a Kafka
+// hook is registered for the debug, info, warn and error levels, formatting
+// entries as JSON with the "@timestamp", "level" and "message" keys. When it is
+// empty no hook is installed.
+//
+// level selects the verbosity: only the exact string "DEBUG" switches the
+// logger to debug level, any other value keeps the logrus default.
+func Setup(kafkaBroker string, level string) {
+	logger := log.New()
+	// Every entry carries the "topics" field.
+	// NOTE(review): presumably the Kafka hook reads it to choose the destination
+	// topic - confirm against logrus-kafka-hook.
+	myLogger = logger.WithField("topics", []string{"bbsim.log"})
+
+	// TODO make this configurable via cli arg
+	if level == "DEBUG" {
+		logger.SetLevel(log.DebugLevel)
+	}
+
+	if len(kafkaBroker) > 0 {
+		myLogger.Debug("Setting up kafka integration")
+		hook, err := lkh.NewKafkaHook(
+			"kh",
+			[]log.Level{log.DebugLevel, log.InfoLevel, log.WarnLevel, log.ErrorLevel},
+			&log.JSONFormatter{
+				FieldMap: log.FieldMap{
+					log.FieldKeyTime:  "@timestamp",
+					log.FieldKeyLevel: "level",
+					log.FieldKeyMsg:   "message",
+				},
+			},
+			[]string{kafkaBroker},
+		)
+
+		if err != nil {
+			// Carry on without Kafka. A hook returned together with an error is
+			// not guaranteed to be usable (it may be nil), so it must not be
+			// registered.
+			myLogger.Error(err)
+		} else {
+			logger.Hooks.Add(hook)
+		}
+	}
+
+	myLogger.WithField("kafkaBroker", kafkaBroker).Debug("Logger setup done")
+}
+
+// WithField returns an entry derived from the package logger with the single
+// field key=value attached.
+func WithField(key string, value interface{}) *log.Entry {
+	return myLogger.WithField(key, value)
+}
+
+// WithFields returns an entry derived from the package logger with all of
+// fields attached.
+func WithFields(fields log.Fields) *log.Entry {
+	return myLogger.WithFields(fields)
+}
+
+// Panic logs args at panic level. The arguments are joined with fmt.Sprint, so
+// printf-style verbs are not expanded. logrus panics after writing the entry.
+func Panic(args ...interface{}) {
+	myLogger.Panic(fmt.Sprint(args...))
+}
+
+// Fatal logs args at fatal level. The arguments are joined with fmt.Sprint, so
+// printf-style verbs are not expanded. logrus exits the process after writing
+// the entry.
+func Fatal(args ...interface{}) {
+	myLogger.Fatal(fmt.Sprint(args...))
+}
+
+// Error logs args at error level. The arguments are joined with fmt.Sprint, so
+// printf-style verbs are not expanded.
+func Error(args ...interface{}) {
+	myLogger.Error(fmt.Sprint(args...))
+}
+
+// Warn logs args at warn level. The arguments are joined with fmt.Sprint, so
+// printf-style verbs are not expanded.
+func Warn(args ...interface{}) {
+	myLogger.Warn(fmt.Sprint(args...))
+}
+
+// Info logs args at info level. The arguments are joined with fmt.Sprint, so
+// printf-style verbs are not expanded.
+func Info(args ...interface{}) {
+	myLogger.Info(fmt.Sprint(args...))
+}
+
+// Debug logs args at debug level. The arguments are joined with fmt.Sprint, so
+// printf-style verbs are not expanded.
+func Debug(args ...interface{}) {
+	myLogger.Debug(fmt.Sprint(args...))
+}
diff --git a/common/utils/utils.go b/common/utils/utils.go
new file mode 100644
index 0000000..a8fc3ba
--- /dev/null
+++ b/common/utils/utils.go
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package utils holds small helpers shared by the bbsim packages.
+package utils
+
+import (
+	"fmt"
+
+	"gerrit.opencord.org/voltha-bbsim/device"
+)
+
+// OnuToSn builds the serial-number string used to identify onu in log fields.
+//
+// The result is the vendor id bytes as text, the literal "00000", IntfID in
+// decimal, the literal "0" and OnuID formatted with %x. An empty string is
+// returned when onu or its SerialNumber is nil, so that callers which could
+// not resolve the ONU get an empty field instead of a nil-pointer panic.
+func OnuToSn(onu *device.Onu) string {
+	if onu == nil || onu.SerialNumber == nil {
+		return ""
+	}
+	// TODO this can be more elegant,
+	// see https://github.com/opencord/voltha/blob/master/voltha/adapters/openolt/openolt_device.py#L929-L943
+	return string(onu.SerialNumber.VendorId) + "00000" + fmt.Sprint(onu.IntfID) + "0" + fmt.Sprintf("%x", onu.OnuID)
+}
diff --git a/core/core_server.go b/core/core_server.go
index 0e0ea1b..55166dc 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -19,15 +19,18 @@
import (
"context"
"errors"
- "gerrit.opencord.org/voltha-bbsim/common"
+ "strconv"
+ "sync"
+
+ "gerrit.opencord.org/voltha-bbsim/common/logger"
+ "gerrit.opencord.org/voltha-bbsim/common/utils"
"gerrit.opencord.org/voltha-bbsim/device"
"gerrit.opencord.org/voltha-bbsim/protos"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/google/gopacket/pcap"
+ log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
- "strconv"
- "sync"
)
const (
@@ -64,35 +67,34 @@
type coreState int
const (
- INACTIVE = iota // OLT/ONUs are not instantiated
- PRE_ACTIVE // Before PacketInDaemon Running
- ACTIVE // After PacketInDaemon Running
+ INACTIVE = iota // OLT/ONUs are not instantiated
+ PRE_ACTIVE // Before PacketInDaemon Running
+ ACTIVE // After PacketInDaemon Running
)
/* coreState
INACTIVE -> PRE_ACTIVE -> ACTIVE
(ActivateOLT) (Enable)
<- <-
- */
+*/
-
-func NewCore(opt *option) (*Server) {
+func NewCore(opt *option) *Server {
// TODO: make it decent
oltid := opt.oltid
- npon := opt.npon
+ npon := opt.npon
nonus := opt.nonus
s := Server{
- Olt: device.NewOlt(oltid, npon, 1),
- Onumap: make(map[uint32][]*device.Onu),
- Ioinfos: []*Ioinfo{},
- gRPCAddress: opt.address,
- gRPCPort: opt.port,
- Vethnames: []string{},
- IndInterval: opt.intvl,
- Processes: []string{},
+ Olt: device.NewOlt(oltid, npon, 1),
+ Onumap: make(map[uint32][]*device.Onu),
+ Ioinfos: []*Ioinfo{},
+ gRPCAddress: opt.address,
+ gRPCPort: opt.port,
+ Vethnames: []string{},
+ IndInterval: opt.intvl,
+ Processes: []string{},
EnableServer: new(openolt.Openolt_EnableIndicationServer),
- state: INACTIVE,
- stateChan: make(chan coreState, 8),
+ state: INACTIVE,
+ stateChan: make(chan coreState, 8),
}
nnni := s.Olt.NumNniIntf
@@ -113,10 +115,10 @@
}
//Blocking
-func (s *Server) Start () error {
+func (s *Server) Start() error {
s.wg = &sync.WaitGroup{}
logger.Debug("Start() Start")
- defer func(){
+ defer func() {
close(s.stateChan)
logger.Debug("Start() Done")
}()
@@ -137,7 +139,7 @@
}
//Non-Blocking
-func (s *Server) Stop () {
+func (s *Server) Stop() {
logger.Debug("Stop() Start")
defer logger.Debug("Stop() Done")
if s.gRPCserver != nil {
@@ -150,7 +152,7 @@
// Blocking
func (s *Server) Enable(sv *openolt.Openolt_EnableIndicationServer) error {
- defer func(){
+ defer func() {
olt := s.Olt
olt.InitializeStatus()
for intfid, _ := range s.Onumap {
@@ -171,7 +173,7 @@
coreCtx := context.Background()
coreCtx, corecancel := context.WithCancel(coreCtx)
s.cancel = corecancel
- if err := s.StartPktInDaemon(coreCtx, *sv) ; err != nil {
+ if err := s.StartPktInDaemon(coreCtx, *sv); err != nil {
return err
}
return nil
@@ -179,14 +181,14 @@
//Non-Blocking
func (s *Server) Disable() {
- defer func(){
+ defer func() {
logger.Debug("Disable() Done")
}()
logger.Debug("Disable() Start")
s.StopPktInDaemon()
}
-func (s *Server) updateState(state coreState){
+func (s *Server) updateState(state coreState) {
s.state = state
s.stateChan <- state
logger.Debug("State updated to:%d", state)
@@ -245,11 +247,10 @@
return nil
}
-
// Blocking
func (s *Server) StartPktInDaemon(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
logger.Debug("StartPktInDaemon() Start")
- defer func(){
+ defer func() {
RemoveVeths(s.Vethnames)
s.Vethnames = []string{}
s.Ioinfos = []*Ioinfo{}
@@ -360,35 +361,60 @@
layerEth := pkt.Layer(layers.LayerTypeEthernet)
le, _ := layerEth.(*layers.Ethernet)
ethtype := le.EthernetType
+ onu, _ := getOnuByID(s.Onumap, onuid)
if ethtype == 0x888e {
- logger.Debug("Received upstream packet is EAPOL.")
- //log.Println(unipkt.Pkt.Dump())
- //log.Println(pkt.Dump())
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "gemId": gemid,
+ "interfaceId": intfid,
+ "onuId": onuid,
+ }).Debug("Received upstream packet is EAPOL.")
} else if layerDHCP := pkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
- logger.Debug("Received upstream packet is DHCP.")
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "gemId": gemid,
+ "interfaceId": intfid,
+ "onuId": onuid,
+ }).Debug("Received upstream packet is DHCP.")
//C-TAG
- onu, _ := getOnuByID(s.Onumap, onuid)
sn := convB2S(onu.SerialNumber.VendorSpecific)
if ctag, ok := s.CtagMap[sn]; ok == true {
tagpkt, err := PushVLAN(pkt, uint16(ctag))
if err != nil {
- logger.Error("Fail to tag C-tag")
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "gemId": gemid,
+ "interfaceId": intfid,
+ "onuId": onuid,
+ }).Error("Fail to tag C-tag")
} else {
pkt = tagpkt
}
} else {
- logger.Error("Could not find the onuid %d (SN: %s) in CtagMap %v!\n", onuid, sn, s.CtagMap)
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "gemId": gemid,
+ "interfaceId": intfid,
+ "onuId": onuid,
+ "sn": sn,
+ "cTagMap": s.CtagMap,
+ }).Error("Could not find onuid in CtagMap", onuid, sn, s.CtagMap)
}
} else {
continue
}
- logger.Debug("sendPktInd intfid:%d (onuid: %d) gemid:%d\n", intfid, onuid, gemid)
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "gemId": gemid,
+ "interfaceId": intfid,
+ "onuId": onuid,
+ }).Debug("sendPktInd")
data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "pon", IntfId: intfid, GemportId: gemid, Pkt: pkt.Data()}}
if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
- logger.Error("Fail to send PktInd indication. %v\n", err)
+ logger.Error("Fail to send PktInd indication.", err)
return err
}
@@ -397,14 +423,19 @@
logger.Info("WARNING: This packet does not come from NNI ")
continue
}
+ onuid := nnipkt.Info.onuid
+ onu, _ := getOnuByID(s.Onumap, onuid)
logger.Debug("Received packet in grpc Server from NNI.")
intfid := nnipkt.Info.intfid
pkt := nnipkt.Pkt
- logger.Info("sendPktInd intfid:%d\n", intfid)
+ logger.WithFields(log.Fields{
+ "interfaceId": intfid,
+ "serial_number": utils.OnuToSn(onu),
+ }).Info("sendPktInd")
data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
- logger.Error("Fail to send PktInd indication. %v\n", err)
+ logger.Error("Fail to send PktInd indication.", err)
return err
}
@@ -427,10 +458,10 @@
ethtype := pkt.EthernetType
if ethtype == 0x888e {
logger.Debug("Received downstream packet is EAPOL.")
- //log.Println(rawpkt.Dump())
+ //logger.Println(rawpkt.Dump())
} else if layerDHCP := rawpkt.Layer(layers.LayerTypeDHCPv4); layerDHCP != nil {
logger.Debug("Received downstream packet is DHCP.")
- //log.Println(rawpkt.Dump())
+ //logger.Println(rawpkt.Dump())
rawpkt, _, _ = PopVLAN(rawpkt)
rawpkt, _, _ = PopVLAN(rawpkt)
} else {
diff --git a/core/grpc_service.go b/core/grpc_service.go
index e1866b8..75d8845 100644
--- a/core/grpc_service.go
+++ b/core/grpc_service.go
@@ -17,15 +17,15 @@
package core
import (
- "gerrit.opencord.org/voltha-bbsim/common"
+ "net"
+
+ "gerrit.opencord.org/voltha-bbsim/common/logger"
"gerrit.opencord.org/voltha-bbsim/device"
"gerrit.opencord.org/voltha-bbsim/protos"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"golang.org/x/net/context"
"google.golang.org/grpc"
- "log"
- "net"
)
// gRPC Service
@@ -117,7 +117,9 @@
 	if result == true {
 		matched, error := getOnuBySN(s.Onumap, onu.SerialNumber)
 		if error != nil {
-			log.Fatalf("%s\n", error)
+			// logger.Fatal joins its arguments with fmt.Sprint, so a format
+			// string would be printed verbatim; pass the error itself.
+			logger.Fatal(error)
 		}
 		onuid := onu.OnuId
 		matched.OnuID = onuid
diff --git a/core/io_info.go b/core/io_info.go
index 8f65280..867c88c 100644
--- a/core/io_info.go
+++ b/core/io_info.go
@@ -18,9 +18,11 @@
import (
"errors"
- "gerrit.opencord.org/voltha-bbsim/common"
- "github.com/google/gopacket/pcap"
"os/exec"
+
+ "gerrit.opencord.org/voltha-bbsim/common/logger"
+ "github.com/google/gopacket/pcap"
+ log "github.com/sirupsen/logrus"
)
type Ioinfo struct {
@@ -39,7 +41,7 @@
}
}
err := errors.New("No matched Ioinfo is found")
- logger.Error("%s", err)
+ logger.Error(err)
return nil, err
}
@@ -50,7 +52,7 @@
}
}
err := errors.New("No matched Ioinfo is found")
- logger.Error("%s", err)
+ logger.Error(err)
return nil, err
}
@@ -63,39 +65,47 @@
 	}
 	if len(ioinfos) == 0 {
 		err := errors.New("No matched Ioinfo is found")
-		logger.Error("%s", err)
+		logger.Error(err)
 		return nil, err
 	}
 	return ioinfos, nil
 }
 
-func CreateVethPairs(name1 string, name2 string) (err error) {
-	err = exec.Command("ip", "link", "add", name1, "type", "veth", "peer", "name", name2).Run()
+// CreateVethPairs creates the veth pair veth1/veth2 with "ip link add" and
+// brings both ends up. It returns the error of the first command that fails.
+func CreateVethPairs(veth1 string, veth2 string) (err error) {
+	err = exec.Command("ip", "link", "add", veth1, "type", "veth", "peer", "name", veth2).Run()
 	if err != nil {
-		logger.Error("Fail to createVeth() for %s and %s veth creation error: %s\n", name1, name2, err.Error())
+		logger.WithFields(log.Fields{
+			"veth1": veth1,
+			"veth2": veth2,
+		}).Error("Fail to createVethPair()", err.Error())
 		return
 	}
-	logger.Info("%s & %s was created.", name1, name2)
-	err = exec.Command("ip", "link", "set", name1, "up").Run()
+	logger.WithFields(log.Fields{"veth1": veth1, "veth2": veth2}).Info("Veth pair was created.")
+	err = exec.Command("ip", "link", "set", veth1, "up").Run()
 	if err != nil {
 		logger.Error("Fail to createVeth() veth1 up", err)
 		return
 	}
-	err = exec.Command("ip", "link", "set", name2, "up").Run()
+	err = exec.Command("ip", "link", "set", veth2, "up").Run()
 	if err != nil {
 		logger.Error("Fail to createVeth() veth2 up", err)
 		return
 	}
-	logger.Info("%s & %s was up.", name1, name2)
+	logger.WithFields(log.Fields{"veth1": veth1, "veth2": veth2}).Info("Veth pair was up.")
 	return
 }
 
+// RemoveVeth deletes the interface name with "ip link del" and returns the
+// command's error, if any. The success message is logged only on success.
 func RemoveVeth(name string) error {
 	err := exec.Command("ip", "link", "del", name).Run()
 	if err != nil {
-		logger.Error("Fail to removeVeth()", err)
+		logger.WithField("veth", name).Error("Fail to removeVeth()", err)
+		return err
 	}
-	logger.Info("%s was removed.", name)
+	logger.WithField("veth", name).Info("Veth was removed.")
 	return err
 }
 
@@ -103,6 +108,6 @@
for _, name := range names {
RemoveVeth(name)
}
- logger.Info("RemoveVeths() :%s\n", names)
+ logger.WithField("veths", names).Info("RemoveVeths(): ")
return
}
diff --git a/core/io_worker.go b/core/io_worker.go
index 41aec4b..9a6cb6c 100644
--- a/core/io_worker.go
+++ b/core/io_worker.go
@@ -18,13 +18,14 @@
import (
"errors"
- "gerrit.opencord.org/voltha-bbsim/common"
- "github.com/google/gopacket"
- "github.com/google/gopacket/layers"
- "github.com/google/gopacket/pcap"
"net"
"strconv"
"time"
+
+ "gerrit.opencord.org/voltha-bbsim/common/logger"
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/google/gopacket/pcap"
)
func RecvWorker(io *Ioinfo, handler *pcap.Handle, r chan Packet) {
@@ -32,7 +33,7 @@
packetSource := gopacket.NewPacketSource(handler, handler.LinkType())
for packet := range packetSource.Packets() {
logger.Debug("recv packet from IF: %v \n", *handler)
- //log.Println(packet.Dump())
+ //logger.Println(packet.Dump())
pkt := Packet{}
pkt.Info = io
pkt.Pkt = packet
@@ -46,7 +47,7 @@
logger.Error("Error in send packet to UNI-IF: %v e:%s\n", *handle, err)
}
logger.Debug("Successfully send packet to UNI-IF: %v \n", *handle)
- //log.Println(packet.Dump())
+ //logger.Println(packet.Dump())
}
func SendNni(handle *pcap.Handle, packet gopacket.Packet) {
@@ -55,7 +56,7 @@
logger.Error("Error in send packet to NNI e:%s\n", err)
}
logger.Debug("send packet to NNI-IF: %v \n", *handle)
- //log.Println(packet.Dump())
+ //logger.Println(packet.Dump())
}
func PopVLAN(pkt gopacket.Packet) (gopacket.Packet, uint16, error) {
diff --git a/core/mediator.go b/core/mediator.go
index 496621a..a6d67b0 100644
--- a/core/mediator.go
+++ b/core/mediator.go
@@ -17,34 +17,37 @@
package core
import (
- "sync"
- "gerrit.opencord.org/voltha-bbsim/common"
+ "flag"
+ "fmt"
"os"
"os/signal"
- "fmt"
- "flag"
- "strings"
"strconv"
+ "strings"
+ "sync"
+
+ "gerrit.opencord.org/voltha-bbsim/common/logger"
+ log "github.com/sirupsen/logrus"
)
-type option struct{
- address string
- port uint32
- oltid uint32
- npon uint32
- nonus uint32
- aaawait int
- dhcpwait int
- dhcpservip string
- intvl int
- intvl_test int
- Mode Mode
+type option struct {
+ address string
+ port uint32
+ oltid uint32
+ npon uint32
+ nonus uint32
+ aaawait int
+ dhcpwait int
+ dhcpservip string
+ intvl int
+ intvl_test int
+ Mode Mode
+ KafkaBroker string
}
func GetOptions() *option {
o := new(option)
addressport := flag.String("H", ":50060", "IP address:port")
- oltid :=flag.Int("id", 0, "OLT-ID")
+ oltid := flag.Int("id", 0, "OLT-ID")
npon := flag.Int("i", 1, "Number of PON-IF ports")
nonus := flag.Int("n", 1, "Number of ONUs per PON-IF port")
modeopt := flag.String("m", "default", "Emulation mode (default, aaa, both (aaa & dhcp))")
@@ -53,6 +56,7 @@
dhcpservip := flag.String("s", "182.21.0.1", "DHCP Server IP Address")
intvl := flag.Int("v", 1, "Interval each Indication")
intvl_test := flag.Int("V", 1, "Interval each Indication")
+ kafkaBroker := flag.String("k", "", "Kafka broker")
o.Mode = DEFAULT
flag.Parse()
if *modeopt == "aaa" {
@@ -68,27 +72,28 @@
o.dhcpservip = *dhcpservip
o.intvl = *intvl
o.intvl_test = *intvl_test
+ o.KafkaBroker = *kafkaBroker
o.address = (strings.Split(*addressport, ":")[0])
tmp, _ := strconv.Atoi(strings.Split(*addressport, ":")[1])
o.port = uint32(tmp)
return o
}
-type stateMachine struct{
+type stateMachine struct {
handlers []*handler
- state coreState
+ state coreState
}
-type handler struct{
- dst coreState
- src coreState
+type handler struct {
+ dst coreState
+ src coreState
method func(s *Server) error
}
func (sm *stateMachine) transit(next coreState) func(s *Server) error {
- for _, handler := range sm.handlers{
+ for _, handler := range sm.handlers {
if handler.src == sm.state && handler.dst == next {
- logger.Debug("Hit (src:%d, dst:%d)",handler.src, handler.dst)
+ logger.Debug("Hit (src:%d, dst:%d)", handler.src, handler.dst)
sm.state = next
return handler.method
}
@@ -98,27 +103,33 @@
}
type mediator struct {
- opt *option
- sm *stateMachine
+ opt *option
+ sm *stateMachine
server *Server
tester *Tester
}
-func NewMediator(o *option) *mediator{
+func NewMediator(o *option) *mediator {
m := new(mediator)
m.opt = o
- logger.Debug("ip:%s, baseport:%d, npon:%d, nonus:%d, mode:%d\n", o.address, o.port, o.npon, o.nonus, o.Mode)
+ logger.WithFields(log.Fields{
+ "ip": o.address,
+ "baseport": o.port,
+ "pon_ports": o.npon,
+ "onus": o.nonus,
+ "mode": o.Mode,
+ }).Debug("New mediator")
return m
}
-func (m *mediator) Start(){
+func (m *mediator) Start() {
var wg sync.WaitGroup
opt := m.opt
server := NewCore(opt)
wg.Add(1)
- go func(){
- if err:= server.Start(); err != nil { //Blocking
- logger.Error("%s", err)
+ go func() {
+ if err := server.Start(); err != nil { //Blocking
+ logger.Error(err)
}
wg.Done()
return
@@ -130,18 +141,18 @@
m.sm = &stateMachine{
state: INACTIVE,
handlers: []*handler{
- &handler{src: PRE_ACTIVE,dst: ACTIVE, method: m.tester.Start},
- &handler{src: ACTIVE,dst: PRE_ACTIVE, method: m.tester.Stop},
+ &handler{src: PRE_ACTIVE, dst: ACTIVE, method: m.tester.Start},
+ &handler{src: ACTIVE, dst: PRE_ACTIVE, method: m.tester.Stop},
},
}
- go func(){
+ go func() {
m.Mediate()
}()
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
- defer func(){
+ defer func() {
logger.Debug("SIGINT catcher Done")
wg.Done()
}()
@@ -149,8 +160,8 @@
wg.Add(1)
fmt.Println("SIGINT", sig)
close(c)
- server.Stop() //Non-blocking
- tester.Stop(server) //Non-blocking
+ server.Stop() //Non-blocking
+ tester.Stop(server) //Non-blocking
return
}
}()
@@ -158,17 +169,17 @@
logger.Debug("Reach to the end line")
}
-func (m *mediator) Mediate(){
+func (m *mediator) Mediate() {
wg := sync.WaitGroup{}
defer logger.Debug("Mediate Done")
- for corestat := range m.server.stateChan {
+ for corestat := range m.server.stateChan {
logger.Debug("Mediator receives state %d of server", corestat)
method := m.sm.transit(corestat)
if method != nil {
wg.Add(1)
defer wg.Done()
go func() error {
- if err := method(m.server); err != nil{ //blocking
+ if err := method(m.server); err != nil { //blocking
m.server.Stop()
return err
}
@@ -177,4 +188,4 @@
}
}
wg.Wait()
-}
\ No newline at end of file
+}
diff --git a/core/openolt_service.go b/core/openolt_service.go
index 97047bf..41e6548 100644
--- a/core/openolt_service.go
+++ b/core/openolt_service.go
@@ -17,10 +17,13 @@
package core
import (
- "gerrit.opencord.org/voltha-bbsim/common"
+ "time"
+
+ "gerrit.opencord.org/voltha-bbsim/common/logger"
+ "gerrit.opencord.org/voltha-bbsim/common/utils"
"gerrit.opencord.org/voltha-bbsim/device"
"gerrit.opencord.org/voltha-bbsim/protos"
- "time"
+ log "github.com/sirupsen/logrus"
)
func sendOltIndUp(stream openolt.Openolt_EnableIndicationServer, olt *device.Olt) error {
@@ -76,7 +79,12 @@
logger.Error("Failed to send ONUDiscInd [id: %d]: %v\n", i, err)
return err
}
- logger.Info("sendONUDiscInd Onuid: %d\n", i)
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "interfaceId": onu.IntfID,
+ "onuId": onu.OnuID,
+ "oltId": onu.OltID,
+ }).Info("sendONUDiscInd Onuid")
}
return nil
}
@@ -89,7 +97,12 @@
logger.Error("Failed to send ONUInd [id: %d]: %v\n", i, err)
return err
}
- logger.Info("sendONUInd Onuid: %d\n", i)
+ logger.WithFields(log.Fields{
+ "serial_number": utils.OnuToSn(onu),
+ "interfaceId": onu.IntfID,
+ "onuId": onu.OnuID,
+ "oltId": onu.OltID,
+ }).Info("sendONUInd Onuid")
}
return nil
}
diff --git a/core/tester.go b/core/tester.go
index ba4792b..cd4f96a 100644
--- a/core/tester.go
+++ b/core/tester.go
@@ -18,12 +18,13 @@
import (
"context"
- "gerrit.opencord.org/voltha-bbsim/common"
- "golang.org/x/sync/errgroup"
- "log"
"os/exec"
- "time"
"sync"
+ "time"
+
+ "gerrit.opencord.org/voltha-bbsim/common/logger"
+ log "github.com/sirupsen/logrus"
+ "golang.org/x/sync/errgroup"
)
const (
@@ -44,7 +45,7 @@
cancel context.CancelFunc
}
-func NewTester(opt *option) *Tester{
+func NewTester(opt *option) *Tester {
t := new(Tester)
t.AAAWait = opt.aaawait
t.DhcpWait = opt.dhcpwait
@@ -55,11 +56,11 @@
}
//Blocking
-func (t *Tester) Start (s *Server) error {
+func (t *Tester) Start(s *Server) error {
ctx := context.Background()
ctx, cancel := context.WithCancel(ctx)
t.cancel = cancel
- defer func(){
+ defer func() {
cancel()
t.Initialize()
logger.Debug("Tester Done")
@@ -106,16 +107,15 @@
return nil
}
-func (t *Tester) Initialize (){
+func (t *Tester) Initialize() {
logger.Info("Tester Initialize () called")
processes := t.Processes
logger.Debug("Runnig Process: %s", processes)
KillProcesses(processes)
- exec.Command("rm", "/var/run/dhcpd.pid").Run() //This is for DHCP server activation
- exec.Command("touch", "/var/run/dhcpd.pid").Run() //This is for DHCP server activation
+ exec.Command("rm", "/var/run/dhcpd.pid").Run() //This is for DHCP server activation
+ exec.Command("touch", "/var/run/dhcpd.pid").Run() //This is for DHCP server activation
}
-
func (t *Tester) exeAAATest(ctx context.Context, s *Server, wait int) error {
tick := time.NewTicker(time.Second)
defer tick.Stop()
@@ -130,17 +130,17 @@
univeths = append(univeths, info.Name)
}
- for sec := 1; sec <= wait; sec ++ {
+ for sec := 1; sec <= wait; sec++ {
select {
case <-ctx.Done():
logger.Debug("exeAAATest thread receives close ")
return nil
case <-tick.C:
- logger.Info("exeAAATest stands by ... %dsec\n", wait - sec)
+ logger.WithField("seconds", wait-sec).Info("exeAAATest stands by ...")
if sec == wait {
wg := sync.WaitGroup{}
wg.Add(1)
- go func() error{
+ go func() error {
defer wg.Done()
err = activateWPASups(ctx, univeths, t.Intvl)
if err != nil {
@@ -184,25 +184,27 @@
univeths = append(univeths, info.Name)
}
- for sec := 1; sec <= wait; sec ++ {
+ for sec := 1; sec <= wait; sec++ {
select {
- case <- ctx.Done():
+ case <-ctx.Done():
logger.Debug("exeDHCPTest thread receives close ")
return nil
- case <- tick.C:
- logger.Info("exeDHCPTest stands by ... %dsec\n", wait- sec)
+ case <-tick.C:
+ logger.WithField("seconds", wait-sec).Info("exeDHCPTest stands by ...")
if sec == wait {
wg := sync.WaitGroup{}
wg.Add(1)
- go func() error{
+ go func() error {
defer wg.Done()
err = activateDHCPClients(ctx, univeths, t.Intvl)
if err != nil {
return err
}
- logger.Info("DHCP clients are successfully activated ")
+ logger.WithFields(log.Fields{
+ "univeths": univeths,
+ }).Info("DHCP clients are successfully activated")
t.Processes = append(t.Processes, "dhclient")
- logger.Debug("Running Process:%s", t.Processes)
+ logger.Debug("Running Process: ", t.Processes)
return nil
}()
wg.Wait()
@@ -224,17 +226,17 @@
defer tick.Stop()
i := 0
for {
- select{
- case <- tick.C:
+ select {
+ case <-tick.C:
if i < len(vethnames) {
vethname := vethnames[i]
if err := activateWPASupplicant(vethname); err != nil {
return err
}
logger.Debug("activateWPASupplicant for interface %v\n", vethname)
- i ++
+ i++
}
- case <- ctx.Done():
+ case <-ctx.Done():
logger.Debug("activateWPASups was canceled by context.")
return nil
}
@@ -247,17 +249,19 @@
defer tick.Stop()
i := 0
for {
- select{
- case <- tick.C:
- if i < len(vethnames){
+ select {
+ case <-tick.C:
+ if i < len(vethnames) {
vethname := vethnames[i]
if err := activateDHCPClient(vethname); err != nil {
return err
}
- logger.Debug("activateDHCPClient for interface %v\n", vethname)
- i ++
+ logger.WithFields(log.Fields{
+ "interface": vethname,
+ }).Debug("activateDHCPClient")
+ i++
}
- case <- ctx.Done():
+ case <-ctx.Done():
logger.Debug("activateDHCPClients was canceled by context.")
return nil
}
@@ -293,7 +297,7 @@
// if err := cmd.Run(); err != nil {
if err := cmd.Start(); err != nil {
logger.Error("Fail to activateDHCPClient() for: %s", vethname)
- log.Panic(err)
+ logger.Panic(err)
}
logger.Debug("activateDHCPClient() done for: %s\n", vethname)
return
diff --git a/device/device_onu.go b/device/device_onu.go
index de99eb3..24f2cbf 100644
--- a/device/device_onu.go
+++ b/device/device_onu.go
@@ -17,10 +17,12 @@
package device
import (
- "gerrit.opencord.org/voltha-bbsim/common"
- "gerrit.opencord.org/voltha-bbsim/protos"
"reflect"
"sync"
+
+ "gerrit.opencord.org/voltha-bbsim/common/logger"
+ "gerrit.opencord.org/voltha-bbsim/protos"
+ log "github.com/sirupsen/logrus"
)
type onuState int
@@ -32,6 +34,7 @@
type Onu struct {
InternalState *onuState
+ OltID uint32
IntfID uint32
OperState string
SerialNumber *openolt.SerialNumber
@@ -52,6 +55,7 @@
*onu.InternalState = ONU_PRE_ACTIVATED
onu.mu = &sync.Mutex{}
onu.IntfID = intfid
+ onu.OltID = oltid
onu.OperState = "up"
onu.SerialNumber = new(openolt.SerialNumber)
onu.SerialNumber.VendorId = []byte("BBSM")
@@ -82,9 +86,12 @@
}
func UpdateOnusOpStatus(ponif uint32, onus []*Onu, opstatus string) {
- for i, onu := range onus {
+ for _, onu := range onus {
onu.OperState = "up"
- logger.Info("(PONIF:%d) ONU [%d] discovered.\n", ponif, i)
+ logger.WithFields(log.Fields{
+ "onu": onu.SerialNumber,
+ "pon_interface": ponif,
+ }).Info("ONU discovered.")
}
}