[SEBA-836] BBSim Reflector
Change-Id: Ib4ae5a2c24880dc62209bebb81188eca5f57865d
diff --git a/internal/bbr/devices/olt.go b/internal/bbr/devices/olt.go
new file mode 100644
index 0000000..103be3b
--- /dev/null
+++ b/internal/bbr/devices/olt.go
@@ -0,0 +1,380 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package devices
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "github.com/google/gopacket"
+ "github.com/google/gopacket/layers"
+ "github.com/opencord/bbsim/internal/bbsim/devices"
+ "github.com/opencord/bbsim/internal/bbsim/packetHandlers"
+ "github.com/opencord/bbsim/internal/common"
+ "github.com/opencord/voltha-protos/go/openolt"
+ log "github.com/sirupsen/logrus"
+ "google.golang.org/grpc"
+ "io"
+ "reflect"
+ "time"
+)
+
+type OltMock struct {
+ Olt *devices.OltDevice
+ BBSimIp string
+ BBSimPort string
+ BBSimApiPort string
+
+ conn *grpc.ClientConn
+
+ TargetOnus int
+ CompletedOnus int // Number of ONUs that have received a DHCPAck
+}
+
+// trigger an enable call and start the same listeners on the gRPC stream that VOLTHA would create
+// this method is blocking
+func (o *OltMock) Start() {
+ log.Info("Starting Mock OLT")
+
+ for _, pon := range o.Olt.Pons {
+ for _, onu := range pon.Onus {
+ log.Debugf("Created ONU: %s (%d:%d)", onu.Sn(), onu.STag, onu.CTag)
+ }
+ }
+
+ client, conn := Connect(o.BBSimIp, o.BBSimPort)
+ o.conn = conn
+ defer conn.Close()
+
+ deviceInfo, err := o.getDeviceInfo(client)
+
+ if err != nil {
+ log.WithFields(log.Fields{
+ "error": err,
+ }).Fatal("Can't read device info")
+ }
+
+ log.WithFields(log.Fields{
+ "Vendor": deviceInfo.Vendor,
+ "Model": deviceInfo.Model,
+ "DeviceSerialNumber": deviceInfo.DeviceSerialNumber,
+ "PonPorts": deviceInfo.PonPorts,
+ }).Info("Retrieved device info")
+
+ o.readIndications(client)
+
+}
+
+func (o *OltMock) getDeviceInfo(client openolt.OpenoltClient) (*openolt.DeviceInfo, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ return client.GetDeviceInfo(ctx, new(openolt.Empty))
+}
+
+func (o *OltMock) getOnuByTags(sTag int, cTag int) (*devices.Onu, error) {
+
+ for _, pon := range o.Olt.Pons {
+ for _, onu := range pon.Onus {
+ if onu.STag == sTag && onu.CTag == cTag {
+ return onu, nil
+ }
+ }
+ }
+
+ return nil, errors.New("cant-find-onu-by-c-s-tags")
+}
+
+func (o *OltMock) readIndications(client openolt.OpenoltClient) {
+ defer func() {
+ log.Info("OLT readIndications done")
+ }()
+
+ // Tell the OLT to start sending indications
+ indications, err := client.EnableIndication(context.Background(), new(openolt.Empty))
+ if err != nil {
+ log.WithFields(log.Fields{
+ "error": err,
+ }).Error("Failed to enable indication stream")
+ return
+ }
+
+ // listen for indications
+ for {
+ indication, err := indications.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+
+ // the connection is closed once we have sent the DHCP_ACK packet to all of the ONUs
+ // it means BBR completed, it's not an error
+
+ log.WithFields(log.Fields{
+ "error": err,
+ }).Debug("Failed to read from indications")
+ break
+ }
+
+ o.handleIndication(client, indication)
+ }
+}
+
+func (o *OltMock) handleIndication(client openolt.OpenoltClient, indication *openolt.Indication) {
+ switch indication.Data.(type) {
+ case *openolt.Indication_OltInd:
+ log.Info("Received Indication_OltInd")
+ case *openolt.Indication_IntfInd:
+ log.Info("Received Indication_IntfInd")
+ case *openolt.Indication_IntfOperInd:
+ log.Info("Received Indication_IntfOperInd")
+ case *openolt.Indication_OnuDiscInd:
+ onuDiscInd := indication.GetOnuDiscInd()
+ o.handleOnuDiscIndication(client, onuDiscInd)
+ case *openolt.Indication_OnuInd:
+ onuInd := indication.GetOnuInd()
+ o.handleOnuIndication(client, onuInd)
+ case *openolt.Indication_OmciInd:
+ omciIndication := indication.GetOmciInd()
+ o.handleOmciIndication(client, omciIndication)
+ case *openolt.Indication_PktInd:
+ pktIndication := indication.GetPktInd()
+ o.handlePktIndication(client, pktIndication)
+ case *openolt.Indication_PortStats:
+ case *openolt.Indication_FlowStats:
+ case *openolt.Indication_AlarmInd:
+ default:
+ log.WithFields(log.Fields{
+ "data": indication.Data,
+ "type": reflect.TypeOf(indication.Data),
+ }).Warn("Indication unsupported")
+ }
+}
+
+func (o *OltMock) handleOnuDiscIndication(client openolt.OpenoltClient, onuDiscInd *openolt.OnuDiscIndication) {
+ log.WithFields(log.Fields{
+ "IntfId": onuDiscInd.IntfId,
+ "SerialNumber": common.OnuSnToString(onuDiscInd.SerialNumber),
+ }).Info("Received Onu discovery indication")
+
+ onu, err := o.Olt.FindOnuBySn(common.OnuSnToString(onuDiscInd.SerialNumber))
+
+ if err != nil {
+ log.WithFields(log.Fields{
+ "IntfId": onuDiscInd.IntfId,
+ "SerialNumber": common.OnuSnToString(onuDiscInd.SerialNumber),
+ }).Fatal("Cannot find ONU")
+ }
+
+ var pir uint32 = 1000000
+ Onu := openolt.Onu{
+ IntfId: onu.PonPortID,
+ OnuId: onu.ID,
+ SerialNumber: onu.SerialNumber,
+ Pir: pir,
+ }
+
+ if _, err := client.ActivateOnu(context.Background(), &Onu); err != nil {
+ log.WithFields(log.Fields{
+ "IntfId": onuDiscInd.IntfId,
+ "SerialNumber": common.OnuSnToString(onuDiscInd.SerialNumber),
+ }).Error("Failed to activate ONU")
+ }
+}
+
+func (o *OltMock) handleOnuIndication(client openolt.OpenoltClient, onuInd *openolt.OnuIndication) {
+ log.WithFields(log.Fields{
+ "IntfId": onuInd.IntfId,
+ "SerialNumber": common.OnuSnToString(onuInd.SerialNumber),
+ }).Info("Received Onu indication")
+
+ onu, err := o.Olt.FindOnuBySn(common.OnuSnToString(onuInd.SerialNumber))
+
+ if err != nil {
+ log.WithFields(log.Fields{
+ "IntfId": onuInd.IntfId,
+ "SerialNumber": common.OnuSnToString(onuInd.SerialNumber),
+ }).Fatal("Cannot find ONU")
+ }
+
+ go onu.ProcessOnuMessages(nil, client)
+
+ go func() {
+
+ defer func() {
+ log.WithFields(log.Fields{
+ "onuSn": common.OnuSnToString(onuInd.SerialNumber),
+ "CompletedOnus": o.CompletedOnus,
+ "TargetOnus": o.TargetOnus,
+ }).Debugf("Onu done")
+
+ }()
+
+ for message := range onu.DoneChannel {
+ if message == true {
+ o.CompletedOnus++
+ if o.CompletedOnus == o.TargetOnus {
+ // NOTE once all the ONUs are completed, exit
+ // closing the connection is not the most elegant way,
+ // but I haven't found any other way to stop
+ // the indications.Recv() infinite loop
+ log.Info("Simulation Done")
+ ValidateAndClose(o)
+ }
+
+ break
+ }
+ }
+
+ }()
+
+ // TODO change the state instead of calling an ONU method from here
+ onu.StartOmci(client)
+}
+
+func (o *OltMock) handleOmciIndication(client openolt.OpenoltClient, omciInd *openolt.OmciIndication) {
+
+ pon, err := o.Olt.GetPonById(omciInd.IntfId)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "OnuId": omciInd.OnuId,
+ "IntfId": omciInd.IntfId,
+ "err": err,
+ }).Fatal("Can't find PonPort")
+ }
+ onu, _ := pon.GetOnuById(omciInd.OnuId)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "OnuId": omciInd.OnuId,
+ "IntfId": omciInd.IntfId,
+ "err": err,
+ }).Fatal("Can't find Onu")
+ }
+
+ log.WithFields(log.Fields{
+ "IntfId": onu.PonPortID,
+ "OnuId": onu.ID,
+ "OnuSn": onu.Sn(),
+ "Pkt": omciInd.Pkt,
+ }).Trace("Received Onu omci indication")
+
+ msg := devices.Message{
+ Type: devices.OmciIndication,
+ Data: devices.OmciIndicationMessage{
+ OnuSN: onu.SerialNumber,
+ OnuID: onu.ID,
+ OmciInd: omciInd,
+ },
+ }
+ onu.Channel <- msg
+}
+
+func (o *OltMock) handlePktIndication(client openolt.OpenoltClient, pktIndication *openolt.PacketIndication) {
+
+ pkt := gopacket.NewPacket(pktIndication.Pkt, layers.LayerTypeEthernet, gopacket.Default)
+
+ pktType, err := packetHandlers.IsEapolOrDhcp(pkt)
+
+ if err != nil {
+ log.Warnf("Ignoring packet as it's neither EAPOL or DHCP")
+ return
+ }
+
+ log.WithFields(log.Fields{
+ "IntfType": pktIndication.IntfType,
+ "IntfId": pktIndication.IntfId,
+ "GemportId": pktIndication.GemportId,
+ "FlowId": pktIndication.FlowId,
+ "PortNo": pktIndication.PortNo,
+ "Cookie": pktIndication.Cookie,
+ "pktType": pktType,
+ }).Trace("Received PktIndication")
+
+ msg := devices.Message{}
+ if pktIndication.IntfType == "nni" {
+ // This is an packet that is arriving from the NNI and needs to be sent to an ONU
+ // in this case we need to fin the ONU from the C/S tags
+ // TODO: handle errors in the untagging process
+ sTag, _ := packetHandlers.GetVlanTag(pkt)
+ singleTagPkt, _ := packetHandlers.PopSingleTag(pkt)
+ cTag, _ := packetHandlers.GetVlanTag(singleTagPkt)
+
+ onu, err := o.getOnuByTags(int(sTag), int(cTag))
+
+ if err != nil {
+ log.WithFields(log.Fields{
+ "sTag": sTag,
+ "cTag": cTag,
+ }).Fatalf("Can't find ONU from c/s tags")
+ }
+
+ msg = devices.Message{
+ Type: devices.OnuPacketIn,
+ Data: devices.OnuPacketMessage{
+ IntfId: pktIndication.IntfId,
+ OnuId: onu.ID,
+ Packet: pkt,
+ Type: pktType,
+ },
+ }
+ // NOTE we send it on the ONU channel so that is handled as all the others packets in a separate thread
+ onu.Channel <- msg
+ } else {
+ // TODO a very similar construct is used in many places,
+ // abstract this in an OLT method
+ pon, err := o.Olt.GetPonById(pktIndication.IntfId)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "OnuId": pktIndication.PortNo,
+ "IntfId": pktIndication.IntfId,
+ "err": err,
+ }).Fatal("Can't find PonPort")
+ }
+ onu, err := pon.GetOnuById(pktIndication.PortNo)
+ if err != nil {
+ log.WithFields(log.Fields{
+ "OnuId": pktIndication.PortNo,
+ "IntfId": pktIndication.IntfId,
+ "err": err,
+ }).Fatal("Can't find Onu")
+ }
+ // NOTE when we push the EAPOL flow we set the PortNo = OnuId for convenience sake
+ // BBsim responds setting the port number that was sent with the flow
+ msg = devices.Message{
+ Type: devices.OnuPacketIn,
+ Data: devices.OnuPacketMessage{
+ IntfId: pktIndication.IntfId,
+ OnuId: pktIndication.PortNo,
+ Packet: pkt,
+ Type: pktType,
+ },
+ }
+ onu.Channel <- msg
+ }
+}
+
+// TODO Move in a different file
+func Connect(ip string, port string) (openolt.OpenoltClient, *grpc.ClientConn) {
+ server := fmt.Sprintf("%s:%s", ip, port)
+ conn, err := grpc.Dial(server, grpc.WithInsecure())
+
+ if err != nil {
+ log.Fatalf("did not connect: %v", err)
+ return nil, conn
+ }
+ return openolt.NewOpenoltClient(conn), conn
+}
diff --git a/internal/bbr/devices/validate.go b/internal/bbr/devices/validate.go
new file mode 100644
index 0000000..cb2551f
--- /dev/null
+++ b/internal/bbr/devices/validate.go
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package devices
+
+import (
+ "context"
+ "fmt"
+ "github.com/opencord/bbsim/api/bbsim"
+ log "github.com/sirupsen/logrus"
+ "google.golang.org/grpc"
+ "time"
+)
+
+func ValidateAndClose(olt *OltMock) {
+
+ // connect to the BBSim control APIs to check that all the ONUs are in the correct state
+ client, conn := ApiConnect(olt.BBSimIp, olt.BBSimApiPort)
+ defer conn.Close()
+
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ onus, err := client.GetONUs(ctx, &bbsim.Empty{})
+
+ if err != nil {
+ log.WithFields(log.Fields{
+ "error": err,
+ }).Fatalf("Can't reach BBSim API")
+ }
+
+ expectedState := "dhcp_ack_received"
+
+ res := true
+ for _, onu := range onus.Items {
+ if onu.InternalState != expectedState {
+ res = false
+ log.WithFields(log.Fields{
+ "OnuSN": onu.SerialNumber,
+ "OnuId": onu.ID,
+ "InternalState": onu.InternalState,
+ "ExpectedSatte": expectedState,
+ }).Error("Not matching expected state")
+ }
+ }
+
+ if res == true {
+ log.WithFields(log.Fields{
+ "ExpectedState": expectedState,
+ }).Infof("%d ONUs matching expected state", len(onus.Items))
+ }
+
+ olt.conn.Close()
+}
+
+func ApiConnect(ip string, port string) (bbsim.BBSimClient, *grpc.ClientConn) {
+ server := fmt.Sprintf("%s:%s", ip, port)
+ conn, err := grpc.Dial(server, grpc.WithInsecure())
+
+ if err != nil {
+ log.Fatalf("did not connect: %v", err)
+ return nil, conn
+ }
+ return bbsim.NewBBSimClient(conn), conn
+}