| package core |
| |
| import ( |
| "context" |
| "crypto/tls" |
| "github.com/golang/protobuf/ptypes/empty" |
| "github.com/google/gopacket" |
| "github.com/opencord/voltha/ponsim/v2/common" |
| "github.com/opencord/voltha/protos/go/ponsim" |
| "github.com/sirupsen/logrus" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/connectivity" |
| "google.golang.org/grpc/credentials" |
| "strconv" |
| "strings" |
| "time" |
| ) |
| |
| // TODO: Pass-in the certificate information as a structure parameter |
| |
| /* |
| PonSimOltDevice is the structure responsible for the handling of an OLT device |
| */ |
| type PonSimOltDevice struct { |
| PonSimDevice `json:pon_device` |
| VCoreEndpoint string `json:vcore_ep` |
| MaxOnuCount int `json:max_onu` |
| Onus map[int32]*OnuRegistree `json:onu_registrees` |
| outgoing chan []byte |
| |
| counterLoop *common.IntervalHandler |
| alarmLoop *common.IntervalHandler |
| } |
| |
| /* |
| |
| */ |
| type OnuRegistree struct { |
| Device *PonSimOnuDevice `json:onu_device` |
| Conn *grpc.ClientConn `json:grpc_conn` |
| Client ponsim.PonSimCommonClient `json:client` |
| Stream ponsim.PonSimCommon_ProcessDataClient `json:stream` |
| } |
| |
// BASE_PORT_NUMBER is the first port number tried when looking for a free
// port to assign to an ONU.
const BASE_PORT_NUMBER = 128
| |
| /* |
| NewPonSimOltDevice instantiates a new OLT device structure |
| */ |
| func NewPonSimOltDevice(device PonSimDevice) *PonSimOltDevice { |
| olt := &PonSimOltDevice{PonSimDevice: device} |
| return olt |
| } |
| |
| /* |
| forwardToONU defines a EGRESS function to forward a packet to a specific ONU |
| */ |
| func (o *PonSimOltDevice) forwardToONU(onuPort int32) func(int, gopacket.Packet) { |
| return func(port int, frame gopacket.Packet) { |
| ipAddress := common.GetInterfaceIP(o.ExternalIf) |
| incoming := &ponsim.IncomingData{ |
| Id: "EGRESS.OLT." + ipAddress, |
| Address: ipAddress, |
| Port: int32(port), |
| Payload: frame.Data(), |
| } |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "port": port, |
| "frame": frame, |
| }).Debug("Forwarding to ONU") |
| |
| // Forward packet to ONU |
| if err := o.GetOnu(onuPort).Stream.Send(incoming); err != nil { |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "frameDump": frame.Dump(), |
| "incoming": incoming, |
| "error": err.Error(), |
| }).Error("A problem occurred while forwarding to ONU") |
| } |
| |
| } |
| } |
| |
| /* |
| forwardToLAN defines an INGRESS function to forward a packet to VOLTHA |
| */ |
| func (o *PonSimOltDevice) forwardToLAN() func(int, gopacket.Packet) { |
| return func(port int, frame gopacket.Packet) { |
| common.Logger().WithFields(logrus.Fields{ |
| "frame": frame.Dump(), |
| }).Info("Sending packet") |
| |
| select { |
| case o.outgoing <- frame.Data(): |
| common.Logger().WithFields(logrus.Fields{ |
| "frame": frame.Dump(), |
| }).Info("Sent packet") |
| default: |
| common.Logger().WithFields(logrus.Fields{ |
| "frame": frame.Dump(), |
| }).Warn("Unable to send packet") |
| } |
| } |
| } |
| |
| /* |
| Start performs setup operations for an OLT device |
| */ |
| func (o *PonSimOltDevice) Start(ctx context.Context) { |
| common.Logger().Info("Starting OLT device...") |
| o.PonSimDevice.Start(ctx) |
| |
| // Open network interfaces for listening |
| o.connectNetworkInterfaces() |
| |
| o.outgoing = make(chan []byte, 1) |
| |
| // Add INGRESS operation |
| o.AddLink(2, 0, o.forwardToLAN()) |
| |
| // Start PM counter logging |
| o.counterLoop = common.NewIntervalHandler(90, o.Counter.LogCounts) |
| o.counterLoop.Start() |
| |
| // Start alarm simulation |
| if o.AlarmsOn { |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| }).Debug("Starting alarm simulation") |
| |
| alarms := NewPonSimAlarm(o.InternalIf, o.VCoreEndpoint, o.forwardToLAN()) |
| o.alarmLoop = common.NewIntervalHandler(o.AlarmsFreq, alarms.GenerateAlarm) |
| o.alarmLoop.Start() |
| } |
| } |
| |
| /* |
| Stop performs cleanup operations for an OLT device |
| */ |
| func (o *PonSimOltDevice) Stop(ctx context.Context) { |
| common.Logger().Info("Stopping OLT device...") |
| |
| // Stop PM counters loop |
| o.counterLoop.Stop() |
| o.counterLoop = nil |
| |
| // Stop alarm simulation |
| if o.AlarmsOn { |
| o.alarmLoop.Stop() |
| } |
| o.alarmLoop = nil |
| |
| o.ingressHandler.Close() |
| o.egressHandler.Close() |
| |
| o.PonSimDevice.Stop(ctx) |
| } |
| |
| /* |
| ConnectToRemoteOnu establishes communication to a remote ONU device |
| */ |
| func (o *PonSimOltDevice) ConnectToRemoteOnu(onu *OnuRegistree) error { |
| var err error |
| |
| host := strings.Join([]string{ |
| onu.Device.Address, |
| strconv.Itoa(int(onu.Device.Port)), |
| }, ":") |
| |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "host": host, |
| }).Debug("Formatting host address") |
| |
| // TODO: make it secure |
| ta := credentials.NewTLS(&tls.Config{ |
| //Certificates: []tls.Certificate{peerCert}, |
| //RootCAs: caCertPool, |
| InsecureSkipVerify: true, |
| }) |
| |
| // GRPC communication needs to be secured |
| if onu.Conn, err = grpc.DialContext( |
| context.Background(), |
| host, |
| grpc.WithTransportCredentials(ta), |
| ); err != nil { |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "error": err.Error(), |
| }).Error("Problem with client connection") |
| } |
| |
| return err |
| } |
| |
| /* |
| Listen waits for incoming EGRESS data on the internal interface |
| */ |
| func (o *PonSimOltDevice) Listen(ctx context.Context, port int32) { |
| var reply *empty.Empty |
| var err error |
| |
| // Establish a GRPC connection with the ONU |
| onu := o.GetOnu(port) |
| |
| common.Logger().WithFields(logrus.Fields{ |
| "onu": onu, |
| }).Debug("Connecting to remote ONU") |
| |
| if onu.Client = ponsim.NewPonSimCommonClient(onu.Conn); onu.Client == nil { |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| }).Error("Problem establishing client connection to ONU") |
| o.RemoveOnu(ctx, port) |
| return |
| } |
| |
| // Prepare stream to ONU to forward incoming data as needed |
| if onu.Stream, err = onu.Client.ProcessData(ctx); err != nil { |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| }).Error("Problem establishing stream to ONU") |
| o.RemoveOnu(ctx, port) |
| return |
| } |
| |
| defer o.egressHandler.Close() |
| packetSource := gopacket.NewPacketSource(o.egressHandler, o.egressHandler.LinkType()) |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "interface": o.InternalIf, |
| }).Debug("Listening to incoming EGRESS data") |
| |
| // Wait for incoming EGRESS data |
| for packet := range packetSource.Packets() { |
| if dot1q := common.GetDot1QLayer(packet); dot1q != nil { |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "packet": packet, |
| }).Debug("Received EGRESS packet") |
| |
| o.Forward(ctx, 2, packet) |
| } |
| } |
| |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| }).Debug("No more packets to process") |
| |
| if reply, err = onu.Stream.CloseAndRecv(); err != nil { |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "error": err.Error(), |
| }).Error("A problem occurred while closing client stream") |
| } else { |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "reply": reply, |
| }).Warn("Client stream closed") |
| } |
| } |
| |
| /* |
| GetOnus returns the list of registered ONU devices |
| */ |
| func (o *PonSimOltDevice) GetOnus() map[int32]*OnuRegistree { |
| if o.Onus == nil { |
| o.Onus = make(map[int32]*OnuRegistree) |
| } |
| |
| return o.Onus |
| } |
| |
| /* |
| GetOnu return a specific registered ONU |
| */ |
| func (o *PonSimOltDevice) GetOnu(index int32) *OnuRegistree { |
| var onu *OnuRegistree |
| var ok bool |
| |
| if onu, ok = (o.GetOnus())[index]; ok { |
| return onu |
| } |
| |
| return nil |
| } |
| |
| func (o *PonSimOltDevice) GetOutgoing() chan []byte { |
| return o.outgoing |
| } |
| |
| /* |
| nextAvailablePort returns a port that is not already used by a registered ONU |
| */ |
| func (o *PonSimOltDevice) nextAvailablePort() int32 { |
| var port int32 = BASE_PORT_NUMBER |
| |
| if len(o.GetOnus()) < o.MaxOnuCount { |
| for { |
| if o.GetOnu(port) != nil { |
| // port is already used |
| port += 1 |
| } else { |
| // port is available... use it |
| return port |
| } |
| } |
| } else { |
| // OLT has reached its max number of ONUs |
| return -1 |
| } |
| } |
| |
| /* |
| AddOnu registers an ONU device and sets up all required monitoring and connections |
| */ |
| func (o *PonSimOltDevice) AddOnu(onu *PonSimOnuDevice) (int32, error) { |
| var portNum int32 |
| ctx := context.Background() |
| |
| if portNum = o.nextAvailablePort(); portNum != -1 { |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "port": portNum, |
| "onu": onu, |
| }).Info("Adding ONU") |
| |
| registree := &OnuRegistree{Device: onu} |
| |
| // Setup GRPC communication and check if it succeeded |
| if err := o.ConnectToRemoteOnu(registree); err == nil { |
| o.GetOnus()[portNum] = registree |
| |
| o.AddLink(1, int(portNum), o.forwardToONU(portNum)) |
| go o.MonitorOnu(ctx, portNum) |
| go o.Listen(ctx, portNum) |
| } |
| |
| } else { |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| }).Warn("ONU Map is full") |
| } |
| |
| return int32(portNum), nil |
| } |
| |
| /* |
| RemoveOnu removes the reference to a registered ONU |
| */ |
| func (o *PonSimOltDevice) RemoveOnu(ctx context.Context, onuIndex int32) error { |
| onu := o.GetOnu(onuIndex) |
| if err := onu.Conn.Close(); err != nil { |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "onu": onu.Device, |
| "onuIndex": onuIndex, |
| }).Error("Problem closing connection to ONU") |
| } |
| |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "onu": onu, |
| "onuIndex": onuIndex, |
| }).Info("Removing ONU") |
| |
| delete(o.Onus, onuIndex) |
| |
| // Remove link entries for this ONU |
| o.RemoveLink(1, int(onuIndex)) |
| |
| return nil |
| } |
| |
| /* |
| MonitorOnu verifies the connection status of a specific ONU and cleans up as necessary |
| */ |
| func (o *PonSimOltDevice) MonitorOnu(ctx context.Context, onuIndex int32) { |
| for { |
| if o.GetOnu(onuIndex) != nil { |
| if conn := o.GetOnu(onuIndex).Conn; conn.GetState() == connectivity.Ready { |
| // Wait for any change to occur |
| conn.WaitForStateChange(ctx, conn.GetState()) |
| // We lost communication with the ONU ... remove it |
| o.RemoveOnu(ctx, onuIndex) |
| return |
| } |
| common.Logger().WithFields(logrus.Fields{ |
| "device": o, |
| "ctx": ctx, |
| "onuIndex": onuIndex, |
| }).Debug("ONU is not ready") |
| time.Sleep(1 * time.Second) |
| } else { |
| return |
| } |
| } |
| } |