SEBA-909 store flows and implement GetFlow API

Change-Id: If2c214f8be5808ef89e0521f75d03da49153dc2a
diff --git a/internal/bbsim/api/onus_handler.go b/internal/bbsim/api/onus_handler.go
index b9c697c..975b426 100644
--- a/internal/bbsim/api/onus_handler.go
+++ b/internal/bbsim/api/onus_handler.go
@@ -295,6 +295,38 @@
 	return res, nil
 }
 
+// GetFlows for OLT/ONUs
+func (s BBSimServer) GetFlows(ctx context.Context, req *bbsim.ONURequest) (*bbsim.Flows, error) {
+	logger.WithFields(log.Fields{
+		"OnuSn": req.SerialNumber,
+	}).Info("Received GetFlows request")
+
+	olt := devices.GetOLT()
+	res := &bbsim.Flows{}
+
+	if req.SerialNumber == "" {
+		for flowKey := range olt.Flows {
+			flow := olt.Flows[flowKey]
+			res.Flows = append(res.Flows, &flow)
+		}
+		res.FlowCount = uint32(len(olt.Flows))
+	} else {
+		onu, err := olt.FindOnuBySn(req.SerialNumber)
+		if err != nil {
+			logger.WithFields(log.Fields{
+				"OnuSn": req.SerialNumber,
+			}).Error("Can't get ONU in GetFlows request")
+			return nil, err
+		}
+		for _, flowKey := range onu.Flows {
+			flow := olt.Flows[flowKey]
+			res.Flows = append(res.Flows, &flow)
+		}
+		res.FlowCount = uint32(len(onu.Flows))
+	}
+	return res, nil
+}
+
 func (s BBSimServer) GetOnuTrafficSchedulers(ctx context.Context, req *bbsim.ONURequest) (*bbsim.ONUTrafficSchedulers, error) {
 	olt := devices.GetOLT()
 	ts := bbsim.ONUTrafficSchedulers{}
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index 955d2ad..d5ba84f 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -36,7 +36,9 @@
 	tech_profile "github.com/opencord/voltha-protos/v2/go/tech_profile"
 	log "github.com/sirupsen/logrus"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/reflection"
+	"google.golang.org/grpc/status"
 )
 
 var oltLogger = log.WithFields(log.Fields{
@@ -56,6 +58,7 @@
 	channel              chan Message
 	nniPktInChannel      chan *bbsim.PacketMsg // packets coming in from the NNI and going to VOLTHA
 	nniHandle            *pcap.Handle          // handle on the NNI interface, close it when shutting down the NNI channel
+	Flows                map[FlowKey]openolt.Flow
 	Delay                int
 	ControlledActivation mode
 
@@ -99,6 +102,7 @@
 		Pons:         []*PonPort{},
 		Nnis:         []*NniPort{},
 		Delay:        delay,
+		Flows:        make(map[FlowKey]openolt.Flow),
 		enablePerf:   enablePerf,
 	}
 
@@ -859,8 +863,13 @@
 		"FlowId":    flow.FlowId,
 		"UniID":     flow.UniId,
 		"PortNo":    flow.PortNo,
-	}).Tracef("OLT receives Flow")
-	// TODO optionally store flows somewhere
+	}).Tracef("OLT receives FlowAdd")
+
+	flowKey := FlowKey{}
+	if !o.enablePerf {
+		flowKey = FlowKey{ID: flow.FlowId, Direction: flow.FlowType}
+		olt.Flows[flowKey] = *flow
+	}
 
 	if flow.AccessIntfId == -1 {
 		oltLogger.WithFields(log.Fields{
@@ -883,6 +892,9 @@
 				"err":    err,
 			}).Error("Can't find Onu")
 		}
+		if !o.enablePerf {
+			onu.Flows = append(onu.Flows, flowKey)
+		}
 
 		msg := Message{
 			Type: FlowUpdate,
@@ -898,9 +910,44 @@
 	return new(openolt.Empty), nil
 }
 
-func (o OltDevice) FlowRemove(context.Context, *openolt.Flow) (*openolt.Empty, error) {
-	oltLogger.Tracef("received FlowRemove")
-	// TODO store flows somewhere
+// FlowRemove request from VOLTHA
+func (o OltDevice) FlowRemove(_ context.Context, flow *openolt.Flow) (*openolt.Empty, error) {
+	oltLogger.WithFields(log.Fields{
+		"FlowId":   flow.FlowId,
+		"FlowType": flow.FlowType,
+	}).Tracef("OLT receives FlowRemove")
+
+	if !o.enablePerf { // remove only if flow were stored
+		flowKey := FlowKey{
+			ID:        flow.FlowId,
+			Direction: flow.FlowType,
+		}
+
+		// Check if flow exists
+		storedFlow, ok := o.Flows[flowKey]
+		if !ok {
+			oltLogger.Errorf("Flow %v not found", flow)
+			return new(openolt.Empty), status.Errorf(codes.NotFound, "Flow not found")
+		}
+
+		// if its ONU flow remove it from ONU also
+		if storedFlow.AccessIntfId != -1 {
+			pon := o.Pons[uint32(storedFlow.AccessIntfId)]
+			onu, err := pon.GetOnuById(uint32(storedFlow.OnuId))
+			if err != nil {
+				oltLogger.WithFields(log.Fields{
+					"OnuId":  storedFlow.OnuId,
+					"IntfId": storedFlow.AccessIntfId,
+					"err":    err,
+				}).Error("ONU not found")
+				return new(openolt.Empty), nil
+			}
+			onu.DeleteFlow(flowKey)
+		}
+
+		// delete from olt flows
+		delete(o.Flows, flowKey)
+	}
 	return new(openolt.Empty), nil
 }
 
diff --git a/internal/bbsim/devices/onu.go b/internal/bbsim/devices/onu.go
index c0bdb46..1899f4c 100644
--- a/internal/bbsim/devices/onu.go
+++ b/internal/bbsim/devices/onu.go
@@ -43,6 +43,11 @@
 	"module": "ONU",
 })
 
+type FlowKey struct {
+	ID        uint32
+	Direction string
+}
+
 type Onu struct {
 	ID                  uint32
 	PonPortID           uint32
@@ -62,6 +67,7 @@
 	// FIXME add support for multiple UNIs
 	PortNo           uint32
 	DhcpFlowReceived bool
+	Flows            []FlowKey
 
 	OperState    *fsm.FSM
 	SerialNumber *openolt.SerialNumber
@@ -100,6 +106,7 @@
 		DoneChannel:         make(chan bool, 1),
 		DhcpFlowReceived:    false,
 		DiscoveryRetryDelay: 60 * time.Second, // this is used to send OnuDiscoveryIndications until an activate call is received
+		Flows:               []FlowKey{},
 		DiscoveryDelay:      delay,
 	}
 	o.SerialNumber = o.NewSN(olt.ID, pon.ID, id)
@@ -897,3 +904,17 @@
 		"SerialNumber": common.OnuSnToString(o.SerialNumber),
 	}).Info("Sent DHCP Flow")
 }
+
+// DeleteFlow method search and delete flowKey from the onu flows slice
+func (onu *Onu) DeleteFlow(key FlowKey) {
+	for pos, flowKey := range onu.Flows {
+		if flowKey == key {
+			// delete the flowKey by shifting all flowKeys by one
+			onu.Flows = append(onu.Flows[:pos], onu.Flows[pos+1:]...)
+			t := make([]FlowKey, len(onu.Flows))
+			copy(t, onu.Flows)
+			onu.Flows = t
+			break
+		}
+	}
+}
diff --git a/internal/bbsimctl/commands/olt.go b/internal/bbsimctl/commands/olt.go
index d77cf77..f1553db 100644
--- a/internal/bbsimctl/commands/olt.go
+++ b/internal/bbsimctl/commands/olt.go
@@ -20,13 +20,16 @@
 import (
 	"context"
 	"fmt"
+	"os"
+	"strconv"
+
 	"github.com/jessevdk/go-flags"
+	"github.com/olekukonko/tablewriter"
 	pb "github.com/opencord/bbsim/api/bbsim"
 	"github.com/opencord/bbsim/internal/bbsimctl/config"
 	"github.com/opencord/cordctl/pkg/format"
 	log "github.com/sirupsen/logrus"
 	"google.golang.org/grpc"
-	"os"
 )
 
 const (
@@ -46,6 +49,8 @@
 
 type OltReboot struct{}
 
+type OltFlows struct{}
+
 type oltOptions struct {
 	Get      OltGet          `command:"get"`
 	NNI      OltNNIs         `command:"nnis"`
@@ -54,6 +59,7 @@
 	Poweron  OltPoweron      `command:"poweron"`
 	Reboot   OltReboot       `command:"reboot"`
 	Alarms   OltAlarmOptions `command:"alarms"`
+	Flows    OltFlows        `command:"flows"`
 }
 
 func RegisterOltCommands(parser *flags.Parser) {
@@ -171,3 +177,69 @@
 	fmt.Println(fmt.Sprintf("[Status: %d] %s", res.StatusCode, res.Message))
 	return nil
 }
+
+func (o *OltFlows) Execute(args []string) error {
+	client, conn := connect()
+	defer conn.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), config.GlobalConfig.Grpc.Timeout)
+	defer cancel()
+
+	req := pb.ONURequest{}
+	res, err := client.GetFlows(ctx, &req)
+	if err != nil {
+		log.Errorf("Cannot get flows for OLT: %v", err)
+		return err
+	}
+
+	if res.Flows == nil {
+		fmt.Println("OLT has no flows")
+		return nil
+	}
+
+	flowHeader := []string{
+		"access_intf_id",
+		"onu_id",
+		"uni_id",
+		"flow_id",
+		"flow_type",
+		"eth_type",
+		"alloc_id",
+		"network_intf_id",
+		"gemport_id",
+		"classifier",
+		"action",
+		"priority",
+		"cookie",
+		"port_no",
+	}
+
+	tableFlow := tablewriter.NewWriter(os.Stdout)
+	tableFlow.SetRowLine(true)
+	fmt.Fprintf(os.Stdout, "OLT Flows:\n")
+	tableFlow.SetHeader(flowHeader)
+
+	for _, flow := range res.Flows {
+		flowInfo := []string{}
+		flowInfo = append(flowInfo,
+			strconv.Itoa(int(flow.AccessIntfId)),
+			strconv.Itoa(int(flow.OnuId)),
+			strconv.Itoa(int(flow.UniId)),
+			strconv.Itoa(int(flow.FlowId)),
+			flow.FlowType,
+			fmt.Sprintf("%x", flow.Classifier.EthType),
+			strconv.Itoa(int(flow.AllocId)),
+			strconv.Itoa(int(flow.NetworkIntfId)),
+			strconv.Itoa(int(flow.GemportId)),
+			flow.Classifier.String(),
+			flow.Action.String(),
+			strconv.Itoa(int(flow.Priority)),
+			strconv.Itoa(int(flow.Cookie)),
+			strconv.Itoa(int(flow.PortNo)),
+		)
+		tableFlow.Append(flowInfo)
+	}
+	tableFlow.Render()
+	tableFlow.SetNewLine("")
+	return nil
+}
diff --git a/internal/bbsimctl/commands/onu.go b/internal/bbsimctl/commands/onu.go
index 2815501..c331502 100644
--- a/internal/bbsimctl/commands/onu.go
+++ b/internal/bbsimctl/commands/onu.go
@@ -83,6 +83,18 @@
 	} `positional-args:"yes" required:"yes"`
 }
 
+type ONUTrafficSchedulers struct {
+	Args struct {
+		OnuSn OnuSnString
+	} `positional-args:"yes" required:"yes"`
+}
+
+type ONUFlows struct {
+	Args struct {
+		OnuSn OnuSnString
+	} `positional-args:"yes" required:"yes"`
+}
+
 type ONUOptions struct {
 	List              ONUList              `command:"list"`
 	Get               ONUGet               `command:"get"`
@@ -93,12 +105,7 @@
 	Igmp              ONUIgmp              `command:"igmp"`
 	TrafficSchedulers ONUTrafficSchedulers `command:"traffic_schedulers"`
 	Alarms            AlarmOptions         `command:"alarms"`
-}
-
-type ONUTrafficSchedulers struct {
-	Args struct {
-		OnuSn OnuSnString
-	} `positional-args:"yes" required:"yes"`
+	Flows             ONUFlows             `command:"flows"`
 }
 
 func RegisterONUCommands(parser *flags.Parser) {
@@ -302,6 +309,73 @@
 	return nil
 }
 
+func (options *ONUFlows) Execute(args []string) error {
+	client, conn := connect()
+	defer conn.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), config.GlobalConfig.Grpc.Timeout)
+	defer cancel()
+	req := pb.ONURequest{
+		SerialNumber: string(options.Args.OnuSn),
+	}
+	res, err := client.GetFlows(ctx, &req)
+	if err != nil {
+		log.Errorf("Cannot get flows for ONU %s: %v", options.Args.OnuSn, err)
+		return err
+	}
+
+	if res.Flows == nil {
+		fmt.Println(fmt.Sprintf("ONU %s has no flows", options.Args.OnuSn))
+		return nil
+	}
+
+	flowHeader := []string{
+		"access_intf_id",
+		"onu_id",
+		"uni_id",
+		"flow_id",
+		"flow_type",
+		"eth_type",
+		"alloc_id",
+		"network_intf_id",
+		"gemport_id",
+		"classifier",
+		"action",
+		"priority",
+		"cookie",
+		"port_no",
+	}
+
+	tableFlow := tablewriter.NewWriter(os.Stdout)
+	tableFlow.SetRowLine(true)
+	fmt.Fprintf(os.Stdout, "ONU Flows:\n")
+	tableFlow.SetHeader(flowHeader)
+
+	for _, flow := range res.Flows {
+		flowInfo := []string{}
+		flowInfo = append(flowInfo,
+			strconv.Itoa(int(flow.AccessIntfId)),
+			strconv.Itoa(int(flow.OnuId)),
+			strconv.Itoa(int(flow.UniId)),
+			strconv.Itoa(int(flow.FlowId)),
+			flow.FlowType,
+			fmt.Sprintf("%x", flow.Classifier.EthType),
+			strconv.Itoa(int(flow.AllocId)),
+			strconv.Itoa(int(flow.NetworkIntfId)),
+			strconv.Itoa(int(flow.GemportId)),
+			flow.Classifier.String(),
+			flow.Action.String(),
+			strconv.Itoa(int(flow.Priority)),
+			strconv.Itoa(int(flow.Cookie)),
+			strconv.Itoa(int(flow.PortNo)),
+		)
+		tableFlow.Append(flowInfo)
+	}
+	tableFlow.Render()
+	tableFlow.SetNewLine("")
+	return nil
+}
+
 func (onuSn *OnuSnString) Complete(match string) []flags.Completion {
 	client, conn := connect()
 	defer conn.Close()