SEBA-909 Store flows and implement GetFlows API
Change-Id: If2c214f8be5808ef89e0521f75d03da49153dc2a
diff --git a/internal/bbsim/api/onus_handler.go b/internal/bbsim/api/onus_handler.go
index b9c697c..975b426 100644
--- a/internal/bbsim/api/onus_handler.go
+++ b/internal/bbsim/api/onus_handler.go
@@ -295,6 +295,38 @@
return res, nil
}
+// GetFlows returns the flows held in the OLT flow store.
+//
+// When req.SerialNumber is empty every flow in the OLT store is returned.
+// Otherwise only the flows whose keys are recorded on the ONU with that
+// serial number are returned; an error is returned if the ONU lookup fails.
+// FlowCount always equals the number of flows placed in the response.
+func (s BBSimServer) GetFlows(ctx context.Context, req *bbsim.ONURequest) (*bbsim.Flows, error) {
+	logger.WithFields(log.Fields{
+		"OnuSn": req.SerialNumber,
+	}).Info("Received GetFlows request")
+
+	olt := devices.GetOLT()
+	res := &bbsim.Flows{}
+
+	if req.SerialNumber == "" {
+		for flowKey := range olt.Flows {
+			// Copy into a variable declared inside the loop so that every
+			// appended pointer refers to its own flow value.
+			flow := olt.Flows[flowKey]
+			res.Flows = append(res.Flows, &flow)
+		}
+		res.FlowCount = uint32(len(res.Flows))
+		return res, nil
+	}
+
+	onu, err := olt.FindOnuBySn(req.SerialNumber)
+	if err != nil {
+		logger.WithFields(log.Fields{
+			"OnuSn": req.SerialNumber,
+		}).Error("Can't get ONU in GetFlows request")
+		return nil, err
+	}
+
+	for _, flowKey := range onu.Flows {
+		flow, ok := olt.Flows[flowKey]
+		if !ok {
+			// The ONU still references a key that is no longer in the OLT
+			// store; skip it instead of reporting a zero-valued flow.
+			continue
+		}
+		res.Flows = append(res.Flows, &flow)
+	}
+	res.FlowCount = uint32(len(res.Flows))
+	return res, nil
+}
+
func (s BBSimServer) GetOnuTrafficSchedulers(ctx context.Context, req *bbsim.ONURequest) (*bbsim.ONUTrafficSchedulers, error) {
olt := devices.GetOLT()
ts := bbsim.ONUTrafficSchedulers{}
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index 955d2ad..d5ba84f 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -36,7 +36,9 @@
tech_profile "github.com/opencord/voltha-protos/v2/go/tech_profile"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
"google.golang.org/grpc/reflection"
+ "google.golang.org/grpc/status"
)
var oltLogger = log.WithFields(log.Fields{
@@ -56,6 +58,7 @@
channel chan Message
nniPktInChannel chan *bbsim.PacketMsg // packets coming in from the NNI and going to VOLTHA
nniHandle *pcap.Handle // handle on the NNI interface, close it when shutting down the NNI channel
+ Flows map[FlowKey]openolt.Flow
Delay int
ControlledActivation mode
@@ -99,6 +102,7 @@
Pons: []*PonPort{},
Nnis: []*NniPort{},
Delay: delay,
+ Flows: make(map[FlowKey]openolt.Flow),
enablePerf: enablePerf,
}
@@ -859,8 +863,13 @@
"FlowId": flow.FlowId,
"UniID": flow.UniId,
"PortNo": flow.PortNo,
- }).Tracef("OLT receives Flow")
- // TODO optionally store flows somewhere
+ }).Tracef("OLT receives FlowAdd")
+
+ flowKey := FlowKey{}
+ if !o.enablePerf {
+ flowKey = FlowKey{ID: flow.FlowId, Direction: flow.FlowType}
+ olt.Flows[flowKey] = *flow
+ }
if flow.AccessIntfId == -1 {
oltLogger.WithFields(log.Fields{
@@ -883,6 +892,9 @@
"err": err,
}).Error("Can't find Onu")
}
+ if !o.enablePerf {
+ onu.Flows = append(onu.Flows, flowKey)
+ }
msg := Message{
Type: FlowUpdate,
@@ -898,9 +910,44 @@
return new(openolt.Empty), nil
}
-func (o OltDevice) FlowRemove(context.Context, *openolt.Flow) (*openolt.Empty, error) {
- oltLogger.Tracef("received FlowRemove")
- // TODO store flows somewhere
+// FlowRemove handles a flow removal request from VOLTHA.
+//
+// Flows are only stored when enablePerf is not set, so in perf mode the call
+// is a no-op. Otherwise the flow identified by (FlowId, FlowType) is removed
+// from the OLT flow store and, when it belongs to an ONU (AccessIntfId other
+// than -1), its key is also removed from that ONU. A NotFound status error is
+// returned when the flow is not in the store.
+func (o OltDevice) FlowRemove(_ context.Context, flow *openolt.Flow) (*openolt.Empty, error) {
+	oltLogger.WithFields(log.Fields{
+		"FlowId":   flow.FlowId,
+		"FlowType": flow.FlowType,
+	}).Tracef("OLT receives FlowRemove")
+
+	if o.enablePerf {
+		// Nothing was stored for this flow, so there is nothing to remove.
+		return new(openolt.Empty), nil
+	}
+
+	flowKey := FlowKey{ID: flow.FlowId, Direction: flow.FlowType}
+
+	storedFlow, ok := o.Flows[flowKey]
+	if !ok {
+		oltLogger.Errorf("Flow %v not found", flow)
+		return new(openolt.Empty), status.Errorf(codes.NotFound, "Flow not found")
+	}
+
+	// Remove the flow from the OLT store first, so it does not linger there
+	// when the owning PON or ONU cannot be resolved below.
+	delete(o.Flows, flowKey)
+
+	if storedFlow.AccessIntfId == -1 {
+		// Not an ONU flow: no per-ONU bookkeeping to update.
+		return new(openolt.Empty), nil
+	}
+
+	// Guard the slice index: an interface id outside o.Pons would panic.
+	ponID := storedFlow.AccessIntfId
+	if ponID < 0 || int(ponID) >= len(o.Pons) {
+		oltLogger.WithFields(log.Fields{
+			"OnuId":  storedFlow.OnuId,
+			"IntfId": storedFlow.AccessIntfId,
+		}).Error("PON port not found")
+		return new(openolt.Empty), nil
+	}
+
+	onu, err := o.Pons[ponID].GetOnuById(uint32(storedFlow.OnuId))
+	if err != nil {
+		oltLogger.WithFields(log.Fields{
+			"OnuId":  storedFlow.OnuId,
+			"IntfId": storedFlow.AccessIntfId,
+			"err":    err,
+		}).Error("ONU not found")
+		return new(openolt.Empty), nil
+	}
+	onu.DeleteFlow(flowKey)
+
+	return new(openolt.Empty), nil
+}
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index c0bdb46..1899f4c 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -43,6 +43,11 @@
"module": "ONU",
})
+// FlowKey identifies a stored flow. It is used as the key of the OLT flow
+// map and as the element type of the per-ONU flow list.
+type FlowKey struct {
+	ID        uint32 // flow ID, taken from the flow's FlowId
+	Direction string // flow direction, taken from the flow's FlowType
+}
+
type Onu struct {
ID uint32
PonPortID uint32
@@ -62,6 +67,7 @@
// FIXME add support for multiple UNIs
PortNo uint32
DhcpFlowReceived bool
+ Flows []FlowKey
OperState *fsm.FSM
SerialNumber *openolt.SerialNumber
@@ -100,6 +106,7 @@
DoneChannel: make(chan bool, 1),
DhcpFlowReceived: false,
DiscoveryRetryDelay: 60 * time.Second, // this is used to send OnuDiscoveryIndications until an activate call is received
+ Flows: []FlowKey{},
DiscoveryDelay: delay,
}
o.SerialNumber = o.NewSN(olt.ID, pon.ID, id)
@@ -897,3 +904,17 @@
"SerialNumber": common.OnuSnToString(o.SerialNumber),
}).Info("Sent DHCP Flow")
}
+
+// DeleteFlow removes every occurrence of key from the ONU flow list.
+//
+// The surviving keys are collected into a freshly allocated slice, so the
+// previous backing array is never modified in place. If key is not present
+// the list keeps the same contents.
+func (o *Onu) DeleteFlow(key FlowKey) {
+	kept := make([]FlowKey, 0, len(o.Flows))
+	for _, flowKey := range o.Flows {
+		if flowKey != key {
+			kept = append(kept, flowKey)
+		}
+	}
+	o.Flows = kept
+}
diff --git a/internal/bbsimctl/commands/olt.go b/internal/bbsimctl/commands/olt.go
index d77cf77..f1553db 100644
--- a/internal/bbsimctl/commands/olt.go
+++ b/internal/bbsimctl/commands/olt.go
@@ -20,13 +20,16 @@
import (
"context"
"fmt"
+ "os"
+ "strconv"
+
"github.com/jessevdk/go-flags"
+ "github.com/olekukonko/tablewriter"
pb "github.com/opencord/bbsim/api/bbsim"
"github.com/opencord/bbsim/internal/bbsimctl/config"
"github.com/opencord/cordctl/pkg/format"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
- "os"
)
const (
@@ -46,6 +49,8 @@
type OltReboot struct{}
+// OltFlows is the handler for the "flows" command registered in oltOptions;
+// its Execute method prints the flows reported for the OLT.
+type OltFlows struct{}
+
type oltOptions struct {
Get OltGet `command:"get"`
NNI OltNNIs `command:"nnis"`
@@ -54,6 +59,7 @@
Poweron OltPoweron `command:"poweron"`
Reboot OltReboot `command:"reboot"`
Alarms OltAlarmOptions `command:"alarms"`
+ Flows OltFlows `command:"flows"`
}
func RegisterOltCommands(parser *flags.Parser) {
@@ -171,3 +177,69 @@
fmt.Println(fmt.Sprintf("[Status: %d] %s", res.StatusCode, res.Message))
return nil
}
+
+// Execute runs the OLT "flows" command: it requests the flows with an empty
+// ONU serial number and prints them to stdout as a table, one row per flow.
+// It prints "OLT has no flows" when the response carries no flows and returns
+// the gRPC error when the request fails.
+func (o *OltFlows) Execute(args []string) error {
+	client, conn := connect()
+	defer conn.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), config.GlobalConfig.Grpc.Timeout)
+	defer cancel()
+
+	// The serial number is left empty on purpose.
+	req := pb.ONURequest{}
+	res, err := client.GetFlows(ctx, &req)
+	if err != nil {
+		log.Errorf("Cannot get flows for OLT: %v", err)
+		return err
+	}
+
+	// len covers both a nil and an empty, non-nil slice.
+	if len(res.Flows) == 0 {
+		fmt.Println("OLT has no flows")
+		return nil
+	}
+
+	flowHeader := []string{
+		"access_intf_id",
+		"onu_id",
+		"uni_id",
+		"flow_id",
+		"flow_type",
+		"eth_type",
+		"alloc_id",
+		"network_intf_id",
+		"gemport_id",
+		"classifier",
+		"action",
+		"priority",
+		"cookie",
+		"port_no",
+	}
+
+	tableFlow := tablewriter.NewWriter(os.Stdout)
+	tableFlow.SetRowLine(true)
+	fmt.Println("OLT Flows:")
+	tableFlow.SetHeader(flowHeader)
+
+	for _, flow := range res.Flows {
+		tableFlow.Append([]string{
+			strconv.Itoa(int(flow.AccessIntfId)),
+			strconv.Itoa(int(flow.OnuId)),
+			strconv.Itoa(int(flow.UniId)),
+			strconv.Itoa(int(flow.FlowId)),
+			flow.FlowType,
+			// The generated getter is nil-safe, so a flow without a
+			// classifier prints 0 instead of panicking.
+			fmt.Sprintf("%x", flow.Classifier.GetEthType()),
+			strconv.Itoa(int(flow.AllocId)),
+			strconv.Itoa(int(flow.NetworkIntfId)),
+			strconv.Itoa(int(flow.GemportId)),
+			flow.Classifier.String(),
+			flow.Action.String(),
+			strconv.Itoa(int(flow.Priority)),
+			// Format as unsigned: converting a large cookie to int would
+			// print a negative number.
+			strconv.FormatUint(uint64(flow.Cookie), 10),
+			strconv.Itoa(int(flow.PortNo)),
+		})
+	}
+	tableFlow.Render()
+	return nil
+}
diff --git a/internal/bbsimctl/commands/onu.go b/internal/bbsimctl/commands/onu.go
index 2815501..c331502 100644
--- a/internal/bbsimctl/commands/onu.go
+++ b/internal/bbsimctl/commands/onu.go
@@ -83,6 +83,18 @@
} `positional-args:"yes" required:"yes"`
}
+// ONUTrafficSchedulers holds the arguments of the "traffic_schedulers"
+// command registered in ONUOptions.
+type ONUTrafficSchedulers struct {
+	Args struct {
+		OnuSn OnuSnString // required positional argument
+	} `positional-args:"yes" required:"yes"`
+}
+
+// ONUFlows holds the arguments of the "flows" command registered in
+// ONUOptions; its Execute method prints the flows reported for one ONU.
+type ONUFlows struct {
+	Args struct {
+		OnuSn OnuSnString // required positional argument, sent as the request serial number
+	} `positional-args:"yes" required:"yes"`
+}
+
type ONUOptions struct {
List ONUList `command:"list"`
Get ONUGet `command:"get"`
@@ -93,12 +105,7 @@
Igmp ONUIgmp `command:"igmp"`
TrafficSchedulers ONUTrafficSchedulers `command:"traffic_schedulers"`
Alarms AlarmOptions `command:"alarms"`
-}
-
-type ONUTrafficSchedulers struct {
- Args struct {
- OnuSn OnuSnString
- } `positional-args:"yes" required:"yes"`
+ Flows ONUFlows `command:"flows"`
}
func RegisterONUCommands(parser *flags.Parser) {
@@ -302,6 +309,73 @@
return nil
}
+// Execute runs the ONU "flows" command: it requests the flows for the ONU
+// whose serial number was given as the positional argument and prints them to
+// stdout as a table, one row per flow. It prints "ONU <sn> has no flows" when
+// the response carries no flows and returns the gRPC error when the request
+// fails.
+func (options *ONUFlows) Execute(args []string) error {
+	client, conn := connect()
+	defer conn.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), config.GlobalConfig.Grpc.Timeout)
+	defer cancel()
+
+	req := pb.ONURequest{
+		SerialNumber: string(options.Args.OnuSn),
+	}
+	res, err := client.GetFlows(ctx, &req)
+	if err != nil {
+		log.Errorf("Cannot get flows for ONU %s: %v", options.Args.OnuSn, err)
+		return err
+	}
+
+	// len covers both a nil and an empty, non-nil slice.
+	if len(res.Flows) == 0 {
+		fmt.Printf("ONU %s has no flows\n", options.Args.OnuSn)
+		return nil
+	}
+
+	flowHeader := []string{
+		"access_intf_id",
+		"onu_id",
+		"uni_id",
+		"flow_id",
+		"flow_type",
+		"eth_type",
+		"alloc_id",
+		"network_intf_id",
+		"gemport_id",
+		"classifier",
+		"action",
+		"priority",
+		"cookie",
+		"port_no",
+	}
+
+	tableFlow := tablewriter.NewWriter(os.Stdout)
+	tableFlow.SetRowLine(true)
+	fmt.Println("ONU Flows:")
+	tableFlow.SetHeader(flowHeader)
+
+	for _, flow := range res.Flows {
+		tableFlow.Append([]string{
+			strconv.Itoa(int(flow.AccessIntfId)),
+			strconv.Itoa(int(flow.OnuId)),
+			strconv.Itoa(int(flow.UniId)),
+			strconv.Itoa(int(flow.FlowId)),
+			flow.FlowType,
+			// The generated getter is nil-safe, so a flow without a
+			// classifier prints 0 instead of panicking.
+			fmt.Sprintf("%x", flow.Classifier.GetEthType()),
+			strconv.Itoa(int(flow.AllocId)),
+			strconv.Itoa(int(flow.NetworkIntfId)),
+			strconv.Itoa(int(flow.GemportId)),
+			flow.Classifier.String(),
+			flow.Action.String(),
+			strconv.Itoa(int(flow.Priority)),
+			// Format as unsigned: converting a large cookie to int would
+			// print a negative number.
+			strconv.FormatUint(uint64(flow.Cookie), 10),
+			strconv.Itoa(int(flow.PortNo)),
+		})
+	}
+	tableFlow.Render()
+	return nil
+}
+
func (onuSn *OnuSnString) Complete(match string) []flags.Completion {
client, conn := connect()
defer conn.Close()