SEBA-909 store flows and implement GetFlows API

Change-Id: If2c214f8be5808ef89e0521f75d03da49153dc2a
diff --git a/internal/bbsimctl/commands/onu.go b/internal/bbsimctl/commands/onu.go
index 2815501..c331502 100644
--- a/internal/bbsimctl/commands/onu.go
+++ b/internal/bbsimctl/commands/onu.go
@@ -83,6 +83,18 @@
 	} `positional-args:"yes" required:"yes"`
 }
 
+type ONUTrafficSchedulers struct { // options bound to the "traffic_schedulers" command in ONUOptions
+	Args struct {
+		OnuSn OnuSnString // presumably the serial number of the target ONU; its use is not visible here
+	} `positional-args:"yes" required:"yes"`
+}
+
+type ONUFlows struct { // options bound to the "flows" command in ONUOptions
+	Args struct {
+		OnuSn OnuSnString // sent as ONURequest.SerialNumber by Execute
+	} `positional-args:"yes" required:"yes"`
+}
+
 type ONUOptions struct {
 	List              ONUList              `command:"list"`
 	Get               ONUGet               `command:"get"`
@@ -93,12 +105,7 @@
 	Igmp              ONUIgmp              `command:"igmp"`
 	TrafficSchedulers ONUTrafficSchedulers `command:"traffic_schedulers"`
 	Alarms            AlarmOptions         `command:"alarms"`
-}
-
-type ONUTrafficSchedulers struct {
-	Args struct {
-		OnuSn OnuSnString
-	} `positional-args:"yes" required:"yes"`
+	Flows             ONUFlows             `command:"flows"`
 }
 
 func RegisterONUCommands(parser *flags.Parser) {
@@ -302,6 +309,73 @@
 	return nil
 }
 
+// Execute prints, as a table, the flows that the GetFlows RPC returns for the
+// ONU serial number given on the command line; RPC errors are logged and returned.
+func (options *ONUFlows) Execute(args []string) error {
+	client, conn := connect()
+	defer conn.Close()
+
+	ctx, cancel := context.WithTimeout(context.Background(), config.GlobalConfig.Grpc.Timeout)
+	defer cancel()
+	req := pb.ONURequest{
+		SerialNumber: string(options.Args.OnuSn),
+	}
+	res, err := client.GetFlows(ctx, &req)
+	if err != nil {
+		log.Errorf("Cannot get flows for ONU %s: %v", options.Args.OnuSn, err)
+		return err
+	}
+
+	if len(res.Flows) == 0 { // also true for an empty, non-nil slice
+		fmt.Printf("ONU %s has no flows\n", options.Args.OnuSn)
+		return nil
+	}
+
+	flowHeader := []string{
+		"access_intf_id",
+		"onu_id",
+		"uni_id",
+		"flow_id",
+		"flow_type",
+		"eth_type",
+		"alloc_id",
+		"network_intf_id",
+		"gemport_id",
+		"classifier",
+		"action",
+		"priority",
+		"cookie",
+		"port_no",
+	}
+
+	tableFlow := tablewriter.NewWriter(os.Stdout)
+	tableFlow.SetRowLine(true)
+	fmt.Println("ONU Flows:")
+	tableFlow.SetHeader(flowHeader)
+
+	for _, flow := range res.Flows {
+		tableFlow.Append([]string{
+			strconv.Itoa(int(flow.AccessIntfId)),
+			strconv.Itoa(int(flow.OnuId)),
+			strconv.Itoa(int(flow.UniId)),
+			strconv.Itoa(int(flow.FlowId)),
+			flow.FlowType,
+			// Protobuf getters are nil-safe: a flow without classifier prints 0, no panic.
+			fmt.Sprintf("%x", flow.GetClassifier().GetEthType()),
+			strconv.Itoa(int(flow.AllocId)),
+			strconv.Itoa(int(flow.NetworkIntfId)),
+			strconv.Itoa(int(flow.GemportId)),
+			flow.Classifier.String(),
+			flow.Action.String(),
+			strconv.Itoa(int(flow.Priority)),
+			strconv.FormatUint(uint64(flow.Cookie), 10), // unsigned: large cookies must not wrap negative
+			strconv.Itoa(int(flow.PortNo)),
+		})
+	}
+	tableFlow.Render()
+	return nil
+}
+
 func (onuSn *OnuSnString) Complete(match string) []flags.Completion {
 	client, conn := connect()
 	defer conn.Close()