| /* |
| * Copyright 2018-present Open Networking Foundation |
| |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package devices |
| |
| import ( |
| "context" |
| "encoding/hex" |
| "fmt" |
| "net" |
| "strconv" |
| "sync" |
| "time" |
| |
| "github.com/opencord/voltha-protos/v5/go/extension" |
| |
| "github.com/opencord/bbsim/internal/bbsim/responders/dhcp" |
| "github.com/opencord/bbsim/internal/bbsim/types" |
| omcilib "github.com/opencord/bbsim/internal/common/omci" |
| "github.com/opencord/voltha-protos/v5/go/ext/config" |
| |
| "github.com/google/gopacket" |
| "github.com/google/gopacket/layers" |
| "github.com/looplab/fsm" |
| "github.com/opencord/bbsim/internal/bbsim/packetHandlers" |
| "github.com/opencord/bbsim/internal/common" |
| "github.com/opencord/voltha-protos/v5/go/openolt" |
| "github.com/opencord/voltha-protos/v5/go/tech_profile" |
| log "github.com/sirupsen/logrus" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/reflection" |
| "google.golang.org/grpc/status" |
| ) |
| |
| var oltLogger = log.WithFields(log.Fields{ |
| "module": "OLT", |
| }) |
| |
| const ( |
| onuIdStart = 1 |
| // Least pon port on the real OLTs we test with today have 16 ports (not including the pluggable OLTs). |
| // Given that we have to support 512 ONUs per OLT, we have to have up to 32 ONU per PON (could be less |
| // on a higher density port OLT). |
| // We pass just enough limits here to ensure the resource pool is as |
| // compact as possible on the etcd to reduce storage. |
| onuIdEnd = onuIdStart + 31 |
| allocIdStart = 1024 |
| allocIdPerOnu = 4 |
| allocIdEnd = allocIdStart + (onuIdEnd-onuIdStart+1)*allocIdPerOnu // up to 4 alloc-id per ONU |
| gemPortIdPerAllocId = 8 |
| gemportIdStart = 1024 |
| gemportIdEnd = gemportIdStart + (onuIdEnd-onuIdStart+1)*allocIdPerOnu*gemPortIdPerAllocId // up to 8 gemport-id per tcont/alloc-id |
| // The flow ids are no more necessary by the adapter, but still need to pass something dummy. Pass a very small valid range. |
| flowIdStart = 1 |
| flowIdEnd = flowIdStart + 1 |
| ) |
| |
| type OltDevice struct { |
| sync.Mutex |
| OltServer *grpc.Server |
| |
| // BBSIM Internals |
| ID int |
| SerialNumber string |
| NumNni int |
| NumPon int |
| NumOnuPerPon int |
| NumUni int |
| NumPots int |
| InternalState *fsm.FSM |
| channel chan types.Message |
| dhcpServer dhcp.DHCPServerIf |
| Flows sync.Map |
| Delay int |
| ControlledActivation mode |
| EventChannel chan common.Event |
| PublishEvents bool |
| PortStatsInterval int |
| PreviouslyConnected bool |
| |
| Pons []*PonPort |
| Nnis []*NniPort |
| |
| // OLT Attributes |
| OperState *fsm.FSM |
| |
| enableContext context.Context |
| enableContextCancel context.CancelFunc |
| |
| OpenoltStream openolt.Openolt_EnableIndicationServer |
| enablePerf bool |
| |
| // Allocated Resources |
| // this data are to verify that the openolt adapter does not duplicate resources |
| AllocIDsLock sync.RWMutex |
| AllocIDs map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool // map[ponPortId]map[OnuId]map[PortNo]map[AllocIds]map[FlowId]bool |
| GemPortIDsLock sync.RWMutex |
| GemPortIDs map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool // map[ponPortId]map[OnuId]map[PortNo]map[GemPortIDs]map[FlowId]bool |
| OmciResponseRate uint8 |
| } |
| |
| var olt OltDevice |
| |
| func GetOLT() *OltDevice { |
| return &olt |
| } |
| |
| func CreateOLT(options common.GlobalConfig, services []common.ServiceYaml, isMock bool) *OltDevice { |
| oltLogger.WithFields(log.Fields{ |
| "ID": options.Olt.ID, |
| "NumNni": options.Olt.NniPorts, |
| "NumPon": options.Olt.PonPorts, |
| "NumOnuPerPon": options.Olt.OnusPonPort, |
| "NumUni": options.Olt.UniPorts, |
| "NumPots": options.Olt.PotsPorts, |
| }).Debug("CreateOLT") |
| |
| olt = OltDevice{ |
| ID: options.Olt.ID, |
| SerialNumber: fmt.Sprintf("BBSIM_OLT_%d", options.Olt.ID), |
| OperState: getOperStateFSM(func(e *fsm.Event) { |
| oltLogger.Debugf("Changing OLT OperState from %s to %s", e.Src, e.Dst) |
| }), |
| NumNni: int(options.Olt.NniPorts), |
| NumPon: int(options.Olt.PonPorts), |
| NumOnuPerPon: int(options.Olt.OnusPonPort), |
| NumUni: int(options.Olt.UniPorts), |
| NumPots: int(options.Olt.PotsPorts), |
| Pons: []*PonPort{}, |
| Nnis: []*NniPort{}, |
| Delay: options.BBSim.Delay, |
| enablePerf: options.BBSim.EnablePerf, |
| PublishEvents: options.BBSim.Events, |
| PortStatsInterval: options.Olt.PortStatsInterval, |
| dhcpServer: dhcp.NewDHCPServer(), |
| PreviouslyConnected: false, |
| AllocIDs: make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool), |
| GemPortIDs: make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool), |
| OmciResponseRate: options.Olt.OmciResponseRate, |
| } |
| |
| if val, ok := ControlledActivationModes[options.BBSim.ControlledActivation]; ok { |
| olt.ControlledActivation = val |
| } else { |
| // FIXME throw an error if the ControlledActivation is not valid |
| oltLogger.Warn("Unknown ControlledActivation Mode given, running in Default mode") |
| olt.ControlledActivation = Default |
| } |
| |
| // OLT State machine |
| // NOTE do we need 2 state machines for the OLT? (InternalState and OperState) |
| olt.InternalState = fsm.NewFSM( |
| "created", |
| fsm.Events{ |
| {Name: "initialize", Src: []string{"created", "deleted"}, Dst: "initialized"}, |
| {Name: "enable", Src: []string{"initialized", "disabled"}, Dst: "enabled"}, |
| {Name: "disable", Src: []string{"enabled"}, Dst: "disabled"}, |
| // delete event in enabled state below is for reboot OLT case. |
| {Name: "delete", Src: []string{"disabled", "enabled"}, Dst: "deleted"}, |
| }, |
| fsm.Callbacks{ |
| "enter_state": func(e *fsm.Event) { |
| oltLogger.Debugf("Changing OLT InternalState from %s to %s", e.Src, e.Dst) |
| }, |
| "enter_initialized": func(e *fsm.Event) { olt.InitOlt() }, |
| "enter_deleted": func(e *fsm.Event) { |
| // remove all the resource allocations |
| olt.clearAllResources() |
| }, |
| }, |
| ) |
| |
| if !isMock { |
| // create NNI Port |
| nniPort, err := CreateNNI(&olt) |
| if err != nil { |
| oltLogger.Fatalf("Couldn't create NNI Port: %v", err) |
| } |
| |
| olt.Nnis = append(olt.Nnis, &nniPort) |
| } |
| |
| // Create device and Services |
| nextCtag := map[string]int{} |
| nextStag := map[string]int{} |
| |
| // create PON ports |
| for i := 0; i < olt.NumPon; i++ { |
| |
| // initialize the resource maps for every PON Ports |
| olt.AllocIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool) |
| olt.GemPortIDs[uint32(i)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool) |
| |
| p := CreatePonPort(&olt, uint32(i)) |
| |
| // create ONU devices |
| for j := 0; j < olt.NumOnuPerPon; j++ { |
| delay := time.Duration(olt.Delay*j) * time.Millisecond |
| o := CreateONU(&olt, p, uint32(j+1), delay, nextCtag, nextStag, isMock) |
| |
| p.Onus = append(p.Onus, o) |
| } |
| olt.Pons = append(olt.Pons, p) |
| } |
| |
| if !isMock { |
| if err := olt.InternalState.Event("initialize"); err != nil { |
| log.Errorf("Error initializing OLT: %v", err) |
| return nil |
| } |
| } |
| |
| if olt.PublishEvents { |
| log.Debugf("BBSim event publishing is enabled") |
| // Create a channel to write event messages |
| olt.EventChannel = make(chan common.Event, 100) |
| } |
| |
| return &olt |
| } |
| |
| func (o *OltDevice) InitOlt() { |
| |
| if o.OltServer == nil { |
| o.OltServer, _ = o.StartOltServer() |
| } else { |
| oltLogger.Fatal("OLT server already running.") |
| } |
| |
| // create new channel for processOltMessages Go routine |
| o.channel = make(chan types.Message) |
| |
| // FIXME we are assuming we have only one NNI |
| if o.Nnis[0] != nil { |
| // NOTE we want to make sure the state is down when we initialize the OLT, |
| // the NNI may be in a bad state after a disable/reboot as we are not disabling it for |
| // in-band management |
| o.Nnis[0].OperState.SetState("down") |
| } |
| |
| for ponId := range o.Pons { |
| // initialize the resource maps for every PON Ports |
| olt.AllocIDs[uint32(ponId)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool) |
| olt.GemPortIDs[uint32(ponId)] = make(map[uint32]map[uint32]map[int32]map[uint64]bool) |
| } |
| } |
| |
| func (o *OltDevice) RestartOLT() error { |
| |
| o.PreviouslyConnected = false |
| |
| softReboot := false |
| rebootDelay := common.Config.Olt.OltRebootDelay |
| |
| oltLogger.WithFields(log.Fields{ |
| "oltId": o.ID, |
| }).Infof("Simulating OLT restart... (%ds)", rebootDelay) |
| |
| if o.InternalState.Is("enabled") { |
| oltLogger.WithFields(log.Fields{ |
| "oltId": o.ID, |
| }).Info("This is an OLT soft reboot") |
| softReboot = true |
| } |
| |
| // transition internal state to deleted |
| if err := o.InternalState.Event("delete"); err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "oltId": o.ID, |
| }).Errorf("Error deleting OLT: %v", err) |
| return err |
| } |
| |
| if softReboot { |
| for _, pon := range o.Pons { |
| if pon.InternalState.Current() == "enabled" { |
| // disable PONs |
| msg := types.Message{ |
| Type: types.PonIndication, |
| Data: types.PonIndicationMessage{ |
| OperState: types.DOWN, |
| PonPortID: pon.ID, |
| }, |
| } |
| o.channel <- msg |
| } |
| |
| for _, onu := range pon.Onus { |
| _ = onu.InternalState.Event(OnuTxDisable) |
| } |
| } |
| } else { |
| // PONs are already handled in the Disable call |
| for _, pon := range olt.Pons { |
| // ONUs are not automatically disabled when a PON goes down |
| // as it's possible that it's an admin down and in that case the ONUs need to keep their state |
| for _, onu := range pon.Onus { |
| _ = onu.InternalState.Event(OnuTxDisable) |
| } |
| } |
| } |
| |
| time.Sleep(1 * time.Second) // we need to give the OLT the time to respond to all the pending gRPC request before stopping the server |
| o.StopOltServer() |
| |
| // terminate the OLT's processOltMessages go routine |
| close(o.channel) |
| |
| o.enableContextCancel() |
| time.Sleep(time.Duration(rebootDelay) * time.Second) |
| |
| if err := o.InternalState.Event("initialize"); err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "oltId": o.ID, |
| }).Errorf("Error initializing OLT: %v", err) |
| return err |
| } |
| oltLogger.WithFields(log.Fields{ |
| "oltId": o.ID, |
| }).Info("OLT restart completed") |
| return nil |
| } |
| |
| // newOltServer launches a new grpc server for OpenOLT |
| func (o *OltDevice) newOltServer() (*grpc.Server, error) { |
| address := common.Config.BBSim.OpenOltAddress |
| lis, err := net.Listen("tcp", address) |
| if err != nil { |
| oltLogger.Fatalf("OLT failed to listen: %v", err) |
| } |
| grpcServer := grpc.NewServer() |
| |
| openolt.RegisterOpenoltServer(grpcServer, o) |
| |
| reflection.Register(grpcServer) |
| |
| go func() { _ = grpcServer.Serve(lis) }() |
| oltLogger.Debugf("OLT listening on %v", address) |
| |
| return grpcServer, nil |
| } |
| |
| // StartOltServer will create the grpc server that VOLTHA uses |
| // to communicate with the device |
| func (o *OltDevice) StartOltServer() (*grpc.Server, error) { |
| oltServer, err := o.newOltServer() |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "err": err, |
| }).Error("Cannot OLT gRPC server") |
| return nil, err |
| } |
| return oltServer, nil |
| } |
| |
| // StopOltServer stops the OpenOLT grpc server |
| func (o *OltDevice) StopOltServer() { |
| if o.OltServer != nil { |
| oltLogger.WithFields(log.Fields{ |
| "oltId": o.SerialNumber, |
| }).Warnf("Stopping OLT gRPC server") |
| o.OltServer.Stop() |
| o.OltServer = nil |
| } |
| } |
| |
| // Device Methods |
| |
| // Enable implements the OpenOLT EnableIndicationServer functionality |
| func (o *OltDevice) Enable(stream openolt.Openolt_EnableIndicationServer) { |
| oltLogger.Debug("Enable OLT called") |
| rebootFlag := false |
| |
| // If enabled has already been called then an enabled context has |
| // been created. If this is the case then we want to cancel all the |
| // proessing loops associated with that enable before we recreate |
| // new ones |
| o.Lock() |
| if o.enableContext != nil && o.enableContextCancel != nil { |
| oltLogger.Info("This is an OLT reboot or a reconcile") |
| o.enableContextCancel() |
| rebootFlag = true |
| time.Sleep(1 * time.Second) |
| } |
| o.enableContext, o.enableContextCancel = context.WithCancel(context.TODO()) |
| o.Unlock() |
| |
| wg := sync.WaitGroup{} |
| |
| o.OpenoltStream = stream |
| |
| // create Go routine to process all OLT events |
| wg.Add(1) |
| go o.processOltMessages(o.enableContext, stream, &wg) |
| |
| // enable the OLT |
| oltMsg := types.Message{ |
| Type: types.OltIndication, |
| Data: types.OltIndicationMessage{ |
| OperState: types.UP, |
| }, |
| } |
| o.channel <- oltMsg |
| |
| // send NNI Port Indications |
| for _, nni := range o.Nnis { |
| msg := types.Message{ |
| Type: types.NniIndication, |
| Data: types.NniIndicationMessage{ |
| OperState: types.UP, |
| NniPortID: nni.ID, |
| }, |
| } |
| o.channel <- msg |
| } |
| |
| if rebootFlag { |
| for _, pon := range o.Pons { |
| if pon.InternalState.Current() == "disabled" { |
| msg := types.Message{ |
| Type: types.PonIndication, |
| Data: types.PonIndicationMessage{ |
| OperState: types.UP, |
| PonPortID: pon.ID, |
| }, |
| } |
| o.channel <- msg |
| } |
| // when the enableContext was canceled the ONUs stopped listening on the channel |
| for _, onu := range pon.Onus { |
| go onu.ProcessOnuMessages(o.enableContext, stream, nil) |
| |
| // update the stream on all the services |
| for _, uni := range onu.UniPorts { |
| uni.UpdateStream(stream) |
| } |
| } |
| } |
| } else { |
| |
| // 1. controlledActivation == Default: Send both PON and ONUs indications |
| // 2. controlledActivation == only-onu: that means only ONUs will be controlled activated, so auto send PON indications |
| |
| if o.ControlledActivation == Default || o.ControlledActivation == OnlyONU { |
| // send PON Port indications |
| for _, pon := range o.Pons { |
| msg := types.Message{ |
| Type: types.PonIndication, |
| Data: types.PonIndicationMessage{ |
| OperState: types.UP, |
| PonPortID: pon.ID, |
| }, |
| } |
| o.channel <- msg |
| } |
| } |
| } |
| |
| if !o.enablePerf { |
| // Start a go routine to send periodic port stats to openolt adapter |
| wg.Add(1) |
| go o.periodicPortStats(o.enableContext, &wg, stream) |
| } |
| |
| wg.Wait() |
| oltLogger.WithFields(log.Fields{ |
| "stream": stream, |
| }).Debug("OpenOLT Stream closed") |
| } |
| |
| func (o *OltDevice) periodicPortStats(ctx context.Context, wg *sync.WaitGroup, stream openolt.Openolt_EnableIndicationServer) { |
| var portStats *openolt.PortStatistics |
| |
| loop: |
| for { |
| select { |
| case <-time.After(time.Duration(o.PortStatsInterval) * time.Second): |
| // send NNI port stats |
| for _, port := range o.Nnis { |
| incrementStat := true |
| if port.OperState.Current() == "down" { |
| incrementStat = false |
| } |
| portStats, port.PacketCount = getPortStats(port.PacketCount, incrementStat) |
| o.sendPortStatsIndication(portStats, port.ID, port.Type, stream) |
| } |
| |
| // send PON port stats |
| for _, port := range o.Pons { |
| incrementStat := true |
| // do not increment port stats if PON port is down or no ONU is activated on PON port |
| if port.OperState.Current() == "down" || port.GetNumOfActiveOnus() < 1 { |
| incrementStat = false |
| } |
| portStats, port.PacketCount = getPortStats(port.PacketCount, incrementStat) |
| o.sendPortStatsIndication(portStats, port.ID, port.Type, stream) |
| } |
| case <-ctx.Done(): |
| oltLogger.Debug("Stop sending port stats") |
| break loop |
| } |
| } |
| wg.Done() |
| } |
| |
| // Helpers method |
| |
| func (o *OltDevice) SetAlarm(interfaceId uint32, interfaceType string, alarmStatus string) error { |
| |
| switch interfaceType { |
| case "nni": |
| if !o.HasNni(interfaceId) { |
| return status.Errorf(codes.NotFound, strconv.Itoa(int(interfaceId))+" NNI not present in olt") |
| } |
| |
| case "pon": |
| if !o.HasPon(interfaceId) { |
| return status.Errorf(codes.NotFound, strconv.Itoa(int(interfaceId))+" PON not present in olt") |
| } |
| } |
| |
| alarmIndication := &openolt.AlarmIndication{ |
| Data: &openolt.AlarmIndication_LosInd{LosInd: &openolt.LosIndication{ |
| Status: alarmStatus, |
| IntfId: InterfaceIDToPortNo(interfaceId, interfaceType), |
| }}, |
| } |
| |
| msg := types.Message{ |
| Type: types.AlarmIndication, |
| Data: alarmIndication, |
| } |
| |
| o.channel <- msg |
| |
| return nil |
| } |
| |
| func (o *OltDevice) HasNni(id uint32) bool { |
| for _, intf := range o.Nnis { |
| if intf.ID == id { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func (o *OltDevice) HasPon(id uint32) bool { |
| for _, intf := range o.Pons { |
| if intf.ID == id { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func (o *OltDevice) GetPonById(id uint32) (*PonPort, error) { |
| for _, pon := range o.Pons { |
| if pon.ID == id { |
| return pon, nil |
| } |
| } |
| return nil, fmt.Errorf("Cannot find PonPort with id %d in OLT %d", id, o.ID) |
| } |
| |
| func (o *OltDevice) getNniById(id uint32) (*NniPort, error) { |
| for _, nni := range o.Nnis { |
| if nni.ID == id { |
| return nni, nil |
| } |
| } |
| return nil, fmt.Errorf("Cannot find NniPort with id %d in OLT %d", id, o.ID) |
| } |
| |
| func (o *OltDevice) sendAlarmIndication(alarmInd *openolt.AlarmIndication, stream openolt.Openolt_EnableIndicationServer) { |
| data := &openolt.Indication_AlarmInd{AlarmInd: alarmInd} |
| if err := stream.Send(&openolt.Indication{Data: data}); err != nil { |
| oltLogger.Errorf("Failed to send Alarm Indication: %v", err) |
| return |
| } |
| |
| oltLogger.WithFields(log.Fields{ |
| "AlarmIndication": alarmInd, |
| }).Debug("Sent Indication_AlarmInd") |
| } |
| |
| func (o *OltDevice) sendOltIndication(msg types.OltIndicationMessage, stream openolt.Openolt_EnableIndicationServer) { |
| data := &openolt.Indication_OltInd{OltInd: &openolt.OltIndication{OperState: msg.OperState.String()}} |
| if err := stream.Send(&openolt.Indication{Data: data}); err != nil { |
| oltLogger.Errorf("Failed to send Indication_OltInd: %v", err) |
| return |
| } |
| |
| oltLogger.WithFields(log.Fields{ |
| "OperState": msg.OperState, |
| }).Debug("Sent Indication_OltInd") |
| } |
| |
| func (o *OltDevice) sendNniIndication(msg types.NniIndicationMessage, stream openolt.Openolt_EnableIndicationServer) { |
| nni, _ := o.getNniById(msg.NniPortID) |
| if msg.OperState == types.UP { |
| if err := nni.OperState.Event("enable"); err != nil { |
| log.WithFields(log.Fields{ |
| "Type": nni.Type, |
| "IntfId": nni.ID, |
| "OperState": nni.OperState.Current(), |
| }).Errorf("Can't move NNI Port to enabled state: %v", err) |
| } |
| } else if msg.OperState == types.DOWN { |
| if err := nni.OperState.Event("disable"); err != nil { |
| log.WithFields(log.Fields{ |
| "Type": nni.Type, |
| "IntfId": nni.ID, |
| "OperState": nni.OperState.Current(), |
| }).Errorf("Can't move NNI Port to disable state: %v", err) |
| } |
| } |
| // NOTE Operstate may need to be an integer |
| operData := &openolt.Indication_IntfOperInd{IntfOperInd: &openolt.IntfOperIndication{ |
| Type: nni.Type, |
| IntfId: nni.ID, |
| OperState: nni.OperState.Current(), |
| }} |
| |
| if err := stream.Send(&openolt.Indication{Data: operData}); err != nil { |
| oltLogger.Errorf("Failed to send Indication_IntfOperInd for NNI: %v", err) |
| return |
| } |
| |
| oltLogger.WithFields(log.Fields{ |
| "Type": nni.Type, |
| "IntfId": nni.ID, |
| "OperState": nni.OperState.Current(), |
| }).Debug("Sent Indication_IntfOperInd for NNI") |
| } |
| |
| func (o *OltDevice) sendPonIndication(ponPortID uint32) { |
| |
| stream := o.OpenoltStream |
| pon, _ := o.GetPonById(ponPortID) |
| // Send IntfIndication for PON port |
| discoverData := &openolt.Indication_IntfInd{IntfInd: &openolt.IntfIndication{ |
| IntfId: pon.ID, |
| OperState: pon.OperState.Current(), |
| }} |
| |
| if err := stream.Send(&openolt.Indication{Data: discoverData}); err != nil { |
| oltLogger.Errorf("Failed to send Indication_IntfInd: %v", err) |
| return |
| } |
| |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": pon.ID, |
| "OperState": pon.OperState.Current(), |
| }).Debug("Sent Indication_IntfInd for PON") |
| |
| // Send IntfOperIndication for PON port |
| operData := &openolt.Indication_IntfOperInd{IntfOperInd: &openolt.IntfOperIndication{ |
| Type: pon.Type, |
| IntfId: pon.ID, |
| OperState: pon.OperState.Current(), |
| }} |
| |
| if err := stream.Send(&openolt.Indication{Data: operData}); err != nil { |
| oltLogger.Errorf("Failed to send Indication_IntfOperInd for PON: %v", err) |
| return |
| } |
| |
| oltLogger.WithFields(log.Fields{ |
| "Type": pon.Type, |
| "IntfId": pon.ID, |
| "OperState": pon.OperState.Current(), |
| }).Debug("Sent Indication_IntfOperInd for PON") |
| } |
| |
| func (o *OltDevice) sendPortStatsIndication(stats *openolt.PortStatistics, portID uint32, portType string, stream openolt.Openolt_EnableIndicationServer) { |
| if o.InternalState.Current() == "enabled" { |
| oltLogger.WithFields(log.Fields{ |
| "Type": portType, |
| "IntfId": portID, |
| }).Trace("Sending port stats") |
| stats.IntfId = InterfaceIDToPortNo(portID, portType) |
| data := &openolt.Indication_PortStats{ |
| PortStats: stats, |
| } |
| |
| if err := stream.Send(&openolt.Indication{Data: data}); err != nil { |
| oltLogger.Errorf("Failed to send PortStats: %v", err) |
| return |
| } |
| } |
| } |
| |
| // processOltMessages handles messages received over the OpenOLT interface |
| func (o *OltDevice) processOltMessages(ctx context.Context, stream types.Stream, wg *sync.WaitGroup) { |
| oltLogger.WithFields(log.Fields{ |
| "stream": stream, |
| }).Debug("Starting OLT Indication Channel") |
| ch := o.channel |
| |
| loop: |
| for { |
| select { |
| case <-ctx.Done(): |
| oltLogger.Debug("OLT Indication processing canceled via context") |
| break loop |
| // do not terminate this loop if the stream is closed, |
| // when we restart the gRPC server it will automatically reconnect and we need this loop to send indications |
| //case <-stream.Context().Done(): |
| // oltLogger.Debug("OLT Indication processing canceled via stream context") |
| // break loop |
| case message, ok := <-ch: |
| if !ok { |
| if ctx.Err() != nil { |
| oltLogger.WithField("err", ctx.Err()).Error("OLT EnableContext error") |
| } |
| oltLogger.Warn("OLT Indication processing canceled via closed channel") |
| break loop |
| } |
| |
| oltLogger.WithFields(log.Fields{ |
| "oltId": o.ID, |
| "messageType": message.Type, |
| }).Trace("Received message") |
| |
| switch message.Type { |
| case types.OltIndication: |
| msg, _ := message.Data.(types.OltIndicationMessage) |
| if msg.OperState == types.UP { |
| _ = o.InternalState.Event("enable") |
| _ = o.OperState.Event("enable") |
| } else if msg.OperState == types.DOWN { |
| _ = o.InternalState.Event("disable") |
| _ = o.OperState.Event("disable") |
| } |
| o.sendOltIndication(msg, stream) |
| case types.AlarmIndication: |
| alarmInd, _ := message.Data.(*openolt.AlarmIndication) |
| o.sendAlarmIndication(alarmInd, stream) |
| case types.NniIndication: |
| msg, _ := message.Data.(types.NniIndicationMessage) |
| o.sendNniIndication(msg, stream) |
| case types.PonIndication: |
| msg, _ := message.Data.(types.PonIndicationMessage) |
| pon, _ := o.GetPonById(msg.PonPortID) |
| if msg.OperState == types.UP { |
| if err := pon.OperState.Event("enable"); err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": msg.PonPortID, |
| "Err": err, |
| }).Error("Can't Enable Oper state for PON Port") |
| } |
| if err := pon.InternalState.Event("enable"); err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": msg.PonPortID, |
| "Err": err, |
| }).Error("Can't Enable Internal state for PON Port") |
| } |
| } else if msg.OperState == types.DOWN { |
| if err := pon.OperState.Event("disable"); err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": msg.PonPortID, |
| "Err": err, |
| }).Error("Can't Disable Oper state for PON Port") |
| } |
| if err := pon.InternalState.Event("disable"); err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": msg.PonPortID, |
| "Err": err, |
| }).Error("Can't Disable Internal state for PON Port") |
| } |
| } |
| default: |
| oltLogger.Warnf("Received unknown message data %v for type %v in OLT Channel", message.Data, message.Type) |
| } |
| } |
| } |
| wg.Done() |
| oltLogger.WithFields(log.Fields{ |
| "stream": stream, |
| }).Warn("Stopped handling OLT Indication Channel") |
| } |
| |
| // returns an ONU with a given Serial Number |
| func (o *OltDevice) FindOnuBySn(serialNumber string) (*Onu, error) { |
| // NOTE this function can be a performance bottleneck when we have many ONUs, |
| // memoizing it will remove the bottleneck |
| for _, pon := range o.Pons { |
| for _, onu := range pon.Onus { |
| if onu.Sn() == serialNumber { |
| return onu, nil |
| } |
| } |
| } |
| |
| return &Onu{}, fmt.Errorf("cannot-find-onu-by-serial-number-%s", serialNumber) |
| } |
| |
| // returns an ONU with a given interface/Onu Id |
| func (o *OltDevice) FindOnuById(intfId uint32, onuId uint32) (*Onu, error) { |
| // NOTE this function can be a performance bottleneck when we have many ONUs, |
| // memoizing it will remove the bottleneck |
| for _, pon := range o.Pons { |
| if pon.ID == intfId { |
| for _, onu := range pon.Onus { |
| if onu.ID == onuId { |
| return onu, nil |
| } |
| } |
| } |
| } |
| return &Onu{}, fmt.Errorf("cannot-find-onu-by-id-%v-%v", intfId, onuId) |
| } |
| |
| // returns a Service with a given Mac Address |
| func (o *OltDevice) FindServiceByMacAddress(mac net.HardwareAddr) (ServiceIf, error) { |
| // NOTE this function can be a performance bottleneck when we have many ONUs, |
| // memoizing it will remove the bottleneck |
| for _, pon := range o.Pons { |
| for _, onu := range pon.Onus { |
| s, err := onu.findServiceByMacAddress(mac) |
| if err == nil { |
| return s, nil |
| } |
| } |
| } |
| |
| return nil, fmt.Errorf("cannot-find-service-by-mac-address-%s", mac) |
| } |
| |
| // GRPC Endpoints |
| |
| func (o *OltDevice) ActivateOnu(context context.Context, onu *openolt.Onu) (*openolt.Empty, error) { |
| |
| pon, _ := o.GetPonById(onu.IntfId) |
| |
| // Enable the resource maps for this ONU |
| olt.AllocIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool) |
| olt.GemPortIDs[onu.IntfId][onu.OnuId] = make(map[uint32]map[int32]map[uint64]bool) |
| |
| _onu, _ := pon.GetOnuBySn(onu.SerialNumber) |
| |
| publishEvent("ONU-activate-indication-received", int32(onu.IntfId), int32(onu.OnuId), _onu.Sn()) |
| oltLogger.WithFields(log.Fields{ |
| "OnuSn": _onu.Sn(), |
| }).Info("Received ActivateOnu call from VOLTHA") |
| |
| _onu.SetID(onu.OnuId) |
| |
| if err := _onu.InternalState.Event(OnuTxEnable); err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": _onu.PonPortID, |
| "OnuSn": _onu.Sn(), |
| "OnuId": _onu.ID, |
| }).Infof("Failed to transition ONU to %s state: %s", OnuStateEnabled, err.Error()) |
| } |
| |
| // NOTE we need to immediately activate the ONU or the OMCI state machine won't start |
| |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) DeactivateOnu(_ context.Context, onu *openolt.Onu) (*openolt.Empty, error) { |
| oltLogger.Error("DeactivateOnu not implemented") |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) DeleteOnu(_ context.Context, onu *openolt.Onu) (*openolt.Empty, error) { |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": onu.IntfId, |
| "OnuId": onu.OnuId, |
| }).Info("Received DeleteOnu call from VOLTHA") |
| |
| pon, err := o.GetPonById(onu.IntfId) |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": onu.OnuId, |
| "IntfId": onu.IntfId, |
| "err": err, |
| }).Error("Can't find PonPort") |
| } |
| _onu, err := pon.GetOnuById(onu.OnuId) |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": onu.OnuId, |
| "IntfId": onu.IntfId, |
| "err": err, |
| }).Error("Can't find Onu") |
| } |
| |
| if err := _onu.InternalState.Event(OnuTxDisable); err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": _onu.PonPortID, |
| "OnuSn": _onu.Sn(), |
| "OnuId": _onu.ID, |
| }).Infof("Failed to transition ONU to %s state: %s", OnuStateDisabled, err.Error()) |
| } |
| |
| // ONU Re-Discovery |
| if o.InternalState.Current() == "enabled" && pon.InternalState.Current() == "enabled" { |
| go _onu.ReDiscoverOnu() |
| } |
| |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) DisableOlt(context.Context, *openolt.Empty) (*openolt.Empty, error) { |
| // NOTE when we disable the OLT should we disable NNI, PONs and ONUs altogether? |
| oltLogger.WithFields(log.Fields{ |
| "oltId": o.ID, |
| }).Info("Disabling OLT") |
| publishEvent("OLT-disable-received", -1, -1, "") |
| |
| for _, pon := range o.Pons { |
| if pon.InternalState.Current() == "enabled" { |
| // disable PONs |
| msg := types.Message{ |
| Type: types.PonIndication, |
| Data: types.PonIndicationMessage{ |
| OperState: types.DOWN, |
| PonPortID: pon.ID, |
| }, |
| } |
| o.channel <- msg |
| } |
| } |
| |
| // Note that we are not disabling the NNI as the real OLT does not. |
| // The reason for that is in-band management |
| |
| // disable OLT |
| oltMsg := types.Message{ |
| Type: types.OltIndication, |
| Data: types.OltIndicationMessage{ |
| OperState: types.DOWN, |
| }, |
| } |
| o.channel <- oltMsg |
| |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) DisablePonIf(_ context.Context, intf *openolt.Interface) (*openolt.Empty, error) { |
| oltLogger.Infof("DisablePonIf request received for PON %d", intf.IntfId) |
| ponID := intf.GetIntfId() |
| pon, _ := o.GetPonById(intf.IntfId) |
| |
| msg := types.Message{ |
| Type: types.PonIndication, |
| Data: types.PonIndicationMessage{ |
| OperState: types.DOWN, |
| PonPortID: ponID, |
| }, |
| } |
| o.channel <- msg |
| |
| for _, onu := range pon.Onus { |
| |
| onuIndication := types.OnuIndicationMessage{ |
| OperState: types.DOWN, |
| PonPortID: ponID, |
| OnuID: onu.ID, |
| OnuSN: onu.SerialNumber, |
| } |
| onu.sendOnuIndication(onuIndication, o.OpenoltStream) |
| |
| } |
| |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) EnableIndication(_ *openolt.Empty, stream openolt.Openolt_EnableIndicationServer) error { |
| oltLogger.WithField("oltId", o.ID).Info("OLT receives EnableIndication call from VOLTHA") |
| publishEvent("OLT-enable-received", -1, -1, "") |
| o.Enable(stream) |
| return nil |
| } |
| |
| func (o *OltDevice) EnablePonIf(_ context.Context, intf *openolt.Interface) (*openolt.Empty, error) { |
| oltLogger.Infof("EnablePonIf request received for PON %d", intf.IntfId) |
| ponID := intf.GetIntfId() |
| pon, _ := o.GetPonById(intf.IntfId) |
| |
| msg := types.Message{ |
| Type: types.PonIndication, |
| Data: types.PonIndicationMessage{ |
| OperState: types.UP, |
| PonPortID: ponID, |
| }, |
| } |
| o.channel <- msg |
| |
| for _, onu := range pon.Onus { |
| |
| onuIndication := types.OnuIndicationMessage{ |
| OperState: types.UP, |
| PonPortID: ponID, |
| OnuID: onu.ID, |
| OnuSN: onu.SerialNumber, |
| } |
| onu.sendOnuIndication(onuIndication, o.OpenoltStream) |
| |
| } |
| |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) FlowAdd(ctx context.Context, flow *openolt.Flow) (*openolt.Empty, error) { |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": flow.AccessIntfId, |
| "OnuId": flow.OnuId, |
| "EthType": fmt.Sprintf("%x", flow.Classifier.EthType), |
| "InnerVlan": flow.Classifier.IVid, |
| "OuterVlan": flow.Classifier.OVid, |
| "FlowType": flow.FlowType, |
| "FlowId": flow.FlowId, |
| "UniID": flow.UniId, |
| "PortNo": flow.PortNo, |
| }).Tracef("OLT receives FlowAdd") |
| |
| if !o.enablePerf { |
| o.Flows.Store(flow.FlowId, *flow) |
| } |
| |
| if flow.AccessIntfId == -1 { |
| oltLogger.WithFields(log.Fields{ |
| "FlowId": flow.FlowId, |
| }).Debug("Adding OLT flow") |
| } else if flow.FlowType == "multicast" { |
| oltLogger.WithFields(log.Fields{ |
| "Cookie": flow.Cookie, |
| "DstPort": flow.Classifier.DstPort, |
| "EthType": fmt.Sprintf("%x", flow.Classifier.EthType), |
| "FlowId": flow.FlowId, |
| "FlowType": flow.FlowType, |
| "GemportId": flow.GemportId, |
| "InnerVlan": flow.Classifier.IVid, |
| "IntfId": flow.AccessIntfId, |
| "IpProto": flow.Classifier.IpProto, |
| "OnuId": flow.OnuId, |
| "OuterVlan": flow.Classifier.OVid, |
| "PortNo": flow.PortNo, |
| "SrcPort": flow.Classifier.SrcPort, |
| "UniID": flow.UniId, |
| "ClassifierOPbits": flow.Classifier.OPbits, |
| }).Debug("Adding OLT multicast flow") |
| } else { |
| pon, err := o.GetPonById(uint32(flow.AccessIntfId)) |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": flow.OnuId, |
| "IntfId": flow.AccessIntfId, |
| "err": err, |
| }).Error("Can't find PonPort") |
| } |
| onu, err := pon.GetOnuById(uint32(flow.OnuId)) |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": flow.OnuId, |
| "IntfId": flow.AccessIntfId, |
| "err": err, |
| }).Error("Can't find Onu") |
| return nil, err |
| } |
| |
| // if the ONU is disabled reject the flow |
| // as per VOL-4061 there is a small window during which the ONU is disabled |
| // but the port has not been reported as down to ONOS |
| if onu.InternalState.Is(OnuStatePonDisabled) || onu.InternalState.Is(OnuStateDisabled) { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": flow.OnuId, |
| "IntfId": flow.AccessIntfId, |
| "Flow": flow, |
| "SerialNumber": onu.Sn(), |
| "InternalState": onu.InternalState.Current(), |
| }).Error("rejected-flow-because-of-onu-state") |
| return nil, fmt.Errorf("onu-%s-is-currently-%s", onu.Sn(), onu.InternalState.Current()) |
| } |
| |
| if !o.enablePerf { |
| // Generate event on first flow for ONU |
| if len(onu.FlowIds) == 0 { |
| publishEvent("Flow-add-received", int32(onu.PonPortID), int32(onu.ID), onu.Sn()) |
| } |
| } |
| |
| // validate that the flow reference correct IDs (Alloc, Gem) |
| if err := o.validateFlow(flow); err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": flow.OnuId, |
| "IntfId": flow.AccessIntfId, |
| "Flow": flow, |
| "SerialNumber": onu.Sn(), |
| "err": err, |
| }).Error("invalid-flow-for-onu") |
| return nil, err |
| } |
| |
| o.storeGemPortIdByFlow(flow) |
| o.storeAllocId(flow) |
| |
| msg := types.Message{ |
| Type: types.FlowAdd, |
| Data: types.OnuFlowUpdateMessage{ |
| PonPortID: pon.ID, |
| OnuID: onu.ID, |
| Flow: flow, |
| }, |
| } |
| onu.Channel <- msg |
| } |
| |
| return new(openolt.Empty), nil |
| } |
| |
| // FlowRemove request from VOLTHA |
| func (o *OltDevice) FlowRemove(_ context.Context, flow *openolt.Flow) (*openolt.Empty, error) { |
| |
| oltLogger.WithFields(log.Fields{ |
| "AllocId": flow.AllocId, |
| "Cookie": flow.Cookie, |
| "FlowId": flow.FlowId, |
| "FlowType": flow.FlowType, |
| "GemportId": flow.GemportId, |
| "IntfId": flow.AccessIntfId, |
| "OnuId": flow.OnuId, |
| "PortNo": flow.PortNo, |
| "UniID": flow.UniId, |
| "ReplicateFlow": flow.ReplicateFlow, |
| "PbitToGemport": flow.PbitToGemport, |
| }).Debug("OLT receives FlowRemove") |
| |
| olt.freeGemPortId(flow) |
| olt.freeAllocId(flow) |
| |
| if !o.enablePerf { // remove only if flow were stored |
| // Check if flow exists |
| storedFlowIntf, ok := o.Flows.Load(flow.FlowId) |
| if !ok { |
| oltLogger.Errorf("Flow %v not found", flow) |
| return new(openolt.Empty), status.Errorf(codes.NotFound, "Flow not found") |
| } |
| |
| storedFlow := storedFlowIntf.(openolt.Flow) |
| |
| // if its ONU flow remove it from ONU also |
| if storedFlow.AccessIntfId != -1 { |
| pon, err := o.GetPonById(uint32(storedFlow.AccessIntfId)) |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": storedFlow.OnuId, |
| "IntfId": storedFlow.AccessIntfId, |
| "PONs": olt.Pons, |
| "err": err, |
| }).Error("PON-port-not-found") |
| return new(openolt.Empty), nil |
| } |
| onu, err := pon.GetOnuById(uint32(storedFlow.OnuId)) |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": storedFlow.OnuId, |
| "IntfId": storedFlow.AccessIntfId, |
| "err": err, |
| }).Error("ONU-not-found") |
| return new(openolt.Empty), nil |
| } |
| publishEvent("Flow-remove-received", int32(onu.PonPortID), int32(onu.ID), onu.Sn()) |
| } |
| |
| // delete from olt flows |
| o.Flows.Delete(flow.FlowId) |
| } |
| |
| if flow.AccessIntfId == -1 { |
| oltLogger.WithFields(log.Fields{ |
| "FlowId": flow.FlowId, |
| }).Debug("Removing OLT flow") |
| } else if flow.FlowType == "multicast" { |
| oltLogger.WithFields(log.Fields{ |
| "FlowId": flow.FlowId, |
| }).Debug("Removing OLT multicast flow") |
| } else { |
| |
| onu, err := o.GetOnuByFlowId(flow.FlowId) |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": flow.OnuId, |
| "IntfId": flow.AccessIntfId, |
| "err": err, |
| }).Error("Can't find Onu") |
| return nil, err |
| } |
| |
| msg := types.Message{ |
| Type: types.FlowRemoved, |
| Data: types.OnuFlowUpdateMessage{ |
| Flow: flow, |
| }, |
| } |
| onu.Channel <- msg |
| } |
| |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) HeartbeatCheck(context.Context, *openolt.Empty) (*openolt.Heartbeat, error) { |
| res := openolt.Heartbeat{HeartbeatSignature: uint32(time.Now().Unix())} |
| oltLogger.WithFields(log.Fields{ |
| "signature": res.HeartbeatSignature, |
| }).Trace("HeartbeatCheck") |
| return &res, nil |
| } |
| |
| func (o *OltDevice) GetOnuByFlowId(flowId uint64) (*Onu, error) { |
| for _, pon := range o.Pons { |
| for _, onu := range pon.Onus { |
| for _, fId := range onu.FlowIds { |
| if fId == flowId { |
| return onu, nil |
| } |
| } |
| } |
| } |
| return nil, fmt.Errorf("Cannot find Onu by flowId %d", flowId) |
| } |
| |
| func (o *OltDevice) GetDeviceInfo(context.Context, *openolt.Empty) (*openolt.DeviceInfo, error) { |
| |
| intfIDs := []uint32{} |
| for i := 0; i < o.NumPon; i++ { |
| intfIDs = append(intfIDs, uint32(i)) |
| } |
| |
| devinfo := &openolt.DeviceInfo{ |
| Vendor: common.Config.Olt.Vendor, |
| Model: common.Config.Olt.Model, |
| HardwareVersion: common.Config.Olt.HardwareVersion, |
| FirmwareVersion: common.Config.Olt.FirmwareVersion, |
| Technology: common.Config.Olt.Technology, |
| PonPorts: uint32(o.NumPon), |
| OnuIdStart: onuIdStart, |
| OnuIdEnd: onuIdEnd, |
| AllocIdStart: allocIdStart, |
| AllocIdEnd: allocIdEnd, |
| GemportIdStart: gemportIdStart, |
| GemportIdEnd: gemportIdEnd, |
| FlowIdStart: flowIdStart, |
| FlowIdEnd: flowIdEnd, |
| DeviceSerialNumber: o.SerialNumber, |
| DeviceId: common.Config.Olt.DeviceId, |
| PreviouslyConnected: o.PreviouslyConnected, |
| Ranges: []*openolt.DeviceInfo_DeviceResourceRanges{ |
| { |
| IntfIds: intfIDs, |
| Technology: common.Config.Olt.Technology, |
| Pools: []*openolt.DeviceInfo_DeviceResourceRanges_Pool{ |
| { |
| Type: openolt.DeviceInfo_DeviceResourceRanges_Pool_ONU_ID, |
| Sharing: openolt.DeviceInfo_DeviceResourceRanges_Pool_DEDICATED_PER_INTF, |
| Start: onuIdStart, |
| End: onuIdEnd, |
| }, |
| { |
| Type: openolt.DeviceInfo_DeviceResourceRanges_Pool_ALLOC_ID, |
| Sharing: openolt.DeviceInfo_DeviceResourceRanges_Pool_DEDICATED_PER_INTF, |
| Start: allocIdStart, |
| End: allocIdEnd, |
| }, |
| { |
| Type: openolt.DeviceInfo_DeviceResourceRanges_Pool_GEMPORT_ID, |
| Sharing: openolt.DeviceInfo_DeviceResourceRanges_Pool_DEDICATED_PER_INTF, |
| Start: gemportIdStart, |
| End: gemportIdEnd, |
| }, |
| { |
| Type: openolt.DeviceInfo_DeviceResourceRanges_Pool_FLOW_ID, |
| Sharing: openolt.DeviceInfo_DeviceResourceRanges_Pool_SHARED_BY_ALL_INTF_ALL_TECH, |
| Start: flowIdStart, |
| End: flowIdEnd, |
| }, |
| }, |
| }, |
| }, |
| } |
| |
| oltLogger.WithFields(log.Fields{ |
| "Vendor": devinfo.Vendor, |
| "Model": devinfo.Model, |
| "HardwareVersion": devinfo.HardwareVersion, |
| "FirmwareVersion": devinfo.FirmwareVersion, |
| "Technology": devinfo.Technology, |
| "PonPorts": devinfo.PonPorts, |
| "OnuIdStart": devinfo.OnuIdStart, |
| "OnuIdEnd": devinfo.OnuIdEnd, |
| "AllocIdStart": devinfo.AllocIdStart, |
| "AllocIdEnd": devinfo.AllocIdEnd, |
| "GemportIdStart": devinfo.GemportIdStart, |
| "GemportIdEnd": devinfo.GemportIdEnd, |
| "FlowIdStart": devinfo.FlowIdStart, |
| "FlowIdEnd": devinfo.FlowIdEnd, |
| "DeviceSerialNumber": devinfo.DeviceSerialNumber, |
| "DeviceId": devinfo.DeviceId, |
| "PreviouslyConnected": devinfo.PreviouslyConnected, |
| }).Info("OLT receives GetDeviceInfo call from VOLTHA") |
| |
| // once we connect, set the flag |
| o.PreviouslyConnected = true |
| |
| return devinfo, nil |
| } |
| |
| func (o *OltDevice) OmciMsgOut(ctx context.Context, omci_msg *openolt.OmciMsg) (*openolt.Empty, error) { |
| pon, err := o.GetPonById(omci_msg.IntfId) |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "error": err, |
| "onu_id": omci_msg.OnuId, |
| "pon_id": omci_msg.IntfId, |
| }).Error("pon ID not found") |
| return nil, err |
| } |
| |
| onu, err := pon.GetOnuById(omci_msg.OnuId) |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "error": err, |
| "onu_id": omci_msg.OnuId, |
| "pon_id": omci_msg.IntfId, |
| }).Error("onu ID not found") |
| return nil, err |
| } |
| |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": onu.PonPortID, |
| "OnuId": onu.ID, |
| "OnuSn": onu.Sn(), |
| }).Tracef("Received OmciMsgOut") |
| omciPkt, omciMsg, err := omcilib.ParseOpenOltOmciPacket(omci_msg.Pkt) |
| if err != nil { |
| log.WithFields(log.Fields{ |
| "IntfId": onu.PonPortID, |
| "SerialNumber": onu.Sn(), |
| "omciPacket": omcilib.HexDecode(omci_msg.Pkt), |
| "err": err.Error(), |
| }).Error("cannot-parse-OMCI-packet") |
| return nil, fmt.Errorf("olt-received-malformed-omci-packet") |
| } |
| if onu.InternalState.Current() == OnuStateDisabled { |
| // if the ONU is disabled just drop the message |
| log.WithFields(log.Fields{ |
| "IntfId": onu.PonPortID, |
| "SerialNumber": onu.Sn(), |
| "omciBytes": hex.EncodeToString(omciPkt.Data()), |
| "omciPkt": omciPkt, |
| "omciMsgType": omciMsg.MessageType, |
| }).Warn("dropping-omci-message") |
| } else { |
| msg := types.Message{ |
| Type: types.OMCI, |
| Data: types.OmciMessage{ |
| OnuSN: onu.SerialNumber, |
| OnuID: onu.ID, |
| OmciMsg: omciMsg, |
| OmciPkt: omciPkt, |
| }, |
| } |
| onu.Channel <- msg |
| } |
| return new(openolt.Empty), nil |
| } |
| |
| // this gRPC methods receives packets from VOLTHA and sends them to the subscriber on the ONU |
| func (o *OltDevice) OnuPacketOut(ctx context.Context, onuPkt *openolt.OnuPacket) (*openolt.Empty, error) { |
| pon, err := o.GetPonById(onuPkt.IntfId) |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": onuPkt.OnuId, |
| "IntfId": onuPkt.IntfId, |
| "err": err, |
| }).Error("Can't find PonPort") |
| } |
| onu, err := pon.GetOnuById(onuPkt.OnuId) |
| if err != nil { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": onuPkt.OnuId, |
| "IntfId": onuPkt.IntfId, |
| "err": err, |
| }).Error("Can't find Onu") |
| } |
| |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": onu.PonPortID, |
| "OnuId": onu.ID, |
| "OnuSn": onu.Sn(), |
| "Packet": hex.EncodeToString(onuPkt.Pkt), |
| }).Trace("Received OnuPacketOut") |
| |
| rawpkt := gopacket.NewPacket(onuPkt.Pkt, layers.LayerTypeEthernet, gopacket.Default) |
| |
| pktType, err := packetHandlers.GetPktType(rawpkt) |
| if err != nil { |
| onuLogger.WithFields(log.Fields{ |
| "IntfId": onu.PonPortID, |
| "OnuId": onu.ID, |
| "OnuSn": onu.Sn(), |
| "Pkt": hex.EncodeToString(rawpkt.Data()), |
| }).Error("Can't find pktType in packet, droppint it") |
| return new(openolt.Empty), nil |
| } |
| |
| pktMac, err := packetHandlers.GetDstMacAddressFromPacket(rawpkt) |
| if err != nil { |
| onuLogger.WithFields(log.Fields{ |
| "IntfId": onu.PonPortID, |
| "OnuId": onu.ID, |
| "OnuSn": onu.Sn(), |
| "Pkt": rawpkt.Data(), |
| }).Error("Can't find Dst MacAddress in packet, droppint it") |
| return new(openolt.Empty), nil |
| } |
| |
| msg := types.Message{ |
| Type: types.OnuPacketOut, |
| Data: types.OnuPacketMessage{ |
| IntfId: onuPkt.IntfId, |
| OnuId: onuPkt.OnuId, |
| PortNo: onuPkt.PortNo, |
| Packet: rawpkt, |
| Type: pktType, |
| MacAddress: pktMac, |
| }, |
| } |
| |
| onu.Channel <- msg |
| |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) Reboot(context.Context, *openolt.Empty) (*openolt.Empty, error) { |
| |
| // OLT Reboot is called in two cases: |
| // - when an OLT is being removed (voltctl device disable -> voltctl device delete are called, then a new voltctl device create -> voltctl device enable will be issued) |
| // - when an OLT needs to be rebooted (voltcl device reboot) |
| |
| oltLogger.WithFields(log.Fields{ |
| "oltId": o.ID, |
| }).Info("Shutting down") |
| publishEvent("OLT-reboot-received", -1, -1, "") |
| go func() { _ = o.RestartOLT() }() |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) ReenableOlt(context.Context, *openolt.Empty) (*openolt.Empty, error) { |
| oltLogger.WithFields(log.Fields{ |
| "oltId": o.ID, |
| }).Info("Received ReenableOlt request from VOLTHA") |
| publishEvent("OLT-reenable-received", -1, -1, "") |
| |
| // enable OLT |
| oltMsg := types.Message{ |
| Type: types.OltIndication, |
| Data: types.OltIndicationMessage{ |
| OperState: types.UP, |
| }, |
| } |
| o.channel <- oltMsg |
| |
| for _, pon := range o.Pons { |
| if pon.InternalState.Current() == "disabled" { |
| msg := types.Message{ |
| Type: types.PonIndication, |
| Data: types.PonIndicationMessage{ |
| OperState: types.UP, |
| PonPortID: pon.ID, |
| }, |
| } |
| o.channel <- msg |
| } |
| } |
| |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) UplinkPacketOut(context context.Context, packet *openolt.UplinkPacket) (*openolt.Empty, error) { |
| pkt := gopacket.NewPacket(packet.Pkt, layers.LayerTypeEthernet, gopacket.Default) |
| |
| err := o.Nnis[0].handleNniPacket(pkt) // FIXME we are assuming we have only one NNI |
| |
| if err != nil { |
| return nil, err |
| } |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) CollectStatistics(context.Context, *openolt.Empty) (*openolt.Empty, error) { |
| oltLogger.Error("CollectStatistics not implemented") |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) GetOnuInfo(context context.Context, packet *openolt.Onu) (*openolt.OnuIndication, error) { |
| oltLogger.Error("GetOnuInfo not implemented") |
| return new(openolt.OnuIndication), nil |
| } |
| |
| func (o *OltDevice) GetPonIf(context context.Context, packet *openolt.Interface) (*openolt.IntfIndication, error) { |
| oltLogger.Error("GetPonIf not implemented") |
| return new(openolt.IntfIndication), nil |
| } |
| |
| func (s *OltDevice) CreateTrafficQueues(context.Context, *tech_profile.TrafficQueues) (*openolt.Empty, error) { |
| oltLogger.Info("received CreateTrafficQueues") |
| return new(openolt.Empty), nil |
| } |
| |
| func (s *OltDevice) RemoveTrafficQueues(_ context.Context, tq *tech_profile.TrafficQueues) (*openolt.Empty, error) { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": tq.OnuId, |
| "IntfId": tq.IntfId, |
| "OnuPortNo": tq.PortNo, |
| "UniId": tq.UniId, |
| }).Info("received RemoveTrafficQueues") |
| return new(openolt.Empty), nil |
| } |
| |
| func (s *OltDevice) CreateTrafficSchedulers(_ context.Context, trafficSchedulers *tech_profile.TrafficSchedulers) (*openolt.Empty, error) { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": trafficSchedulers.OnuId, |
| "IntfId": trafficSchedulers.IntfId, |
| "OnuPortNo": trafficSchedulers.PortNo, |
| "UniId": trafficSchedulers.UniId, |
| }).Info("received CreateTrafficSchedulers") |
| |
| if !s.enablePerf { |
| pon, err := s.GetPonById(trafficSchedulers.IntfId) |
| if err != nil { |
| oltLogger.Errorf("Error retrieving PON by IntfId: %v", err) |
| return new(openolt.Empty), err |
| } |
| onu, err := pon.GetOnuById(trafficSchedulers.OnuId) |
| if err != nil { |
| oltLogger.Errorf("Error retrieving ONU from pon by OnuId: %v", err) |
| return new(openolt.Empty), err |
| } |
| onu.TrafficSchedulers = trafficSchedulers |
| } |
| return new(openolt.Empty), nil |
| } |
| |
| func (s *OltDevice) RemoveTrafficSchedulers(context context.Context, trafficSchedulers *tech_profile.TrafficSchedulers) (*openolt.Empty, error) { |
| oltLogger.WithFields(log.Fields{ |
| "OnuId": trafficSchedulers.OnuId, |
| "IntfId": trafficSchedulers.IntfId, |
| "OnuPortNo": trafficSchedulers.PortNo, |
| }).Info("received RemoveTrafficSchedulers") |
| if !s.enablePerf { |
| pon, err := s.GetPonById(trafficSchedulers.IntfId) |
| if err != nil { |
| oltLogger.Errorf("Error retrieving PON by IntfId: %v", err) |
| return new(openolt.Empty), err |
| } |
| onu, err := pon.GetOnuById(trafficSchedulers.OnuId) |
| if err != nil { |
| oltLogger.Errorf("Error retrieving ONU from pon by OnuId: %v", err) |
| return new(openolt.Empty), err |
| } |
| |
| onu.TrafficSchedulers = nil |
| } |
| return new(openolt.Empty), nil |
| } |
| |
| func (o *OltDevice) PerformGroupOperation(ctx context.Context, group *openolt.Group) (*openolt.Empty, error) { |
| oltLogger.WithFields(log.Fields{ |
| "GroupId": group.GroupId, |
| "Command": group.Command, |
| "Members": group.Members, |
| "Action": group.Action, |
| }).Debug("received PerformGroupOperation") |
| return &openolt.Empty{}, nil |
| } |
| |
| func (o *OltDevice) DeleteGroup(ctx context.Context, group *openolt.Group) (*openolt.Empty, error) { |
| oltLogger.WithFields(log.Fields{ |
| "GroupId": group.GroupId, |
| "Command": group.Command, |
| "Members": group.Members, |
| "Action": group.Action, |
| }).Debug("received PerformGroupOperation") |
| return &openolt.Empty{}, nil |
| } |
| |
| func (o *OltDevice) GetExtValue(ctx context.Context, in *openolt.ValueParam) (*extension.ReturnValues, error) { |
| return &extension.ReturnValues{}, nil |
| } |
| |
| func (o *OltDevice) OnuItuPonAlarmSet(ctx context.Context, in *config.OnuItuPonAlarm) (*openolt.Empty, error) { |
| return &openolt.Empty{}, nil |
| } |
| |
| func (o *OltDevice) GetLogicalOnuDistanceZero(ctx context.Context, in *openolt.Onu) (*openolt.OnuLogicalDistance, error) { |
| return &openolt.OnuLogicalDistance{}, nil |
| } |
| |
| func (o *OltDevice) GetLogicalOnuDistance(ctx context.Context, in *openolt.Onu) (*openolt.OnuLogicalDistance, error) { |
| return &openolt.OnuLogicalDistance{}, nil |
| } |
| |
| func (o *OltDevice) GetPonRxPower(ctx context.Context, in *openolt.Onu) (*openolt.PonRxPowerData, error) { |
| return &openolt.PonRxPowerData{}, nil |
| } |
| |
| func (o *OltDevice) GetGemPortStatistics(ctx context.Context, in *openolt.OnuPacket) (*openolt.GemPortStatistics, error) { |
| return &openolt.GemPortStatistics{}, nil |
| } |
| |
| func (o *OltDevice) GetOnuStatistics(ctx context.Context, in *openolt.Onu) (*openolt.OnuStatistics, error) { |
| return &openolt.OnuStatistics{}, nil |
| } |
| |
| func (o *OltDevice) storeAllocId(flow *openolt.Flow) { |
| o.AllocIDsLock.Lock() |
| defer o.AllocIDsLock.Unlock() |
| |
| if _, ok := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)]; !ok { |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": flow.AccessIntfId, |
| "OnuId": flow.OnuId, |
| "PortNo": flow.PortNo, |
| "GemportId": flow.GemportId, |
| "FlowId": flow.FlowId, |
| }).Error("trying-to-store-alloc-id-for-unknown-onu") |
| } |
| |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": flow.AccessIntfId, |
| "OnuId": flow.OnuId, |
| "PortNo": flow.PortNo, |
| "GemportId": flow.GemportId, |
| "FlowId": flow.FlowId, |
| }).Trace("storing-alloc-id-via-flow") |
| |
| if _, ok := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo]; !ok { |
| o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo] = make(map[int32]map[uint64]bool) |
| } |
| if _, ok := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.AllocId]; !ok { |
| o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.AllocId] = make(map[uint64]bool) |
| } |
| o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.AllocId][flow.FlowId] = true |
| } |
| |
| func (o *OltDevice) freeAllocId(flow *openolt.Flow) { |
| // if this is the last flow referencing the AllocId then remove it |
| o.AllocIDsLock.Lock() |
| defer o.AllocIDsLock.Unlock() |
| |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": flow.AccessIntfId, |
| "OnuId": flow.OnuId, |
| "PortNo": flow.PortNo, |
| "GemportId": flow.GemportId, |
| }).Trace("freeing-alloc-id-via-flow") |
| |
| // NOTE look at the freeGemPortId implementation for comments and context |
| for ponId, ponValues := range o.AllocIDs { |
| for onuId, onuValues := range ponValues { |
| for uniId, uniValues := range onuValues { |
| for allocId, flows := range uniValues { |
| for flowId := range flows { |
| // if the flow matches, remove it from the map. |
| if flow.FlowId == flowId { |
| delete(o.AllocIDs[ponId][onuId][uniId][allocId], flow.FlowId) |
| } |
| // if that was the last flow for a particular allocId, remove the entire allocId |
| if len(o.AllocIDs[ponId][onuId][uniId][allocId]) == 0 { |
| delete(o.AllocIDs[ponId][onuId][uniId], allocId) |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| func (o *OltDevice) storeGemPortId(ponId uint32, onuId uint32, portNo uint32, gemId int32, flowId uint64) { |
| o.GemPortIDsLock.Lock() |
| defer o.GemPortIDsLock.Unlock() |
| |
| if _, ok := o.GemPortIDs[ponId][onuId]; !ok { |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": ponId, |
| "OnuId": onuId, |
| "PortNo": portNo, |
| "GemportId": gemId, |
| "FlowId": flowId, |
| }).Error("trying-to-store-gemport-for-unknown-onu") |
| } |
| |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": ponId, |
| "OnuId": onuId, |
| "PortNo": portNo, |
| "GemportId": gemId, |
| "FlowId": flowId, |
| }).Trace("storing-alloc-id-via-flow") |
| |
| if _, ok := o.GemPortIDs[ponId][onuId][portNo]; !ok { |
| o.GemPortIDs[ponId][onuId][portNo] = make(map[int32]map[uint64]bool) |
| } |
| if _, ok := o.GemPortIDs[ponId][onuId][portNo][gemId]; !ok { |
| o.GemPortIDs[ponId][onuId][portNo][gemId] = make(map[uint64]bool) |
| } |
| o.GemPortIDs[ponId][onuId][portNo][gemId][flowId] = true |
| } |
| |
| func (o *OltDevice) storeGemPortIdByFlow(flow *openolt.Flow) { |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": flow.AccessIntfId, |
| "OnuId": flow.OnuId, |
| "PortNo": flow.PortNo, |
| "GemportId": flow.GemportId, |
| "FlowId": flow.FlowId, |
| "ReplicateFlow": flow.ReplicateFlow, |
| "PbitToGemport": flow.PbitToGemport, |
| }).Trace("storing-gem-port-id-via-flow") |
| |
| if flow.ReplicateFlow { |
| for _, gem := range flow.PbitToGemport { |
| o.storeGemPortId(uint32(flow.AccessIntfId), uint32(flow.OnuId), flow.PortNo, int32(gem), flow.FlowId) |
| } |
| } else { |
| o.storeGemPortId(uint32(flow.AccessIntfId), uint32(flow.OnuId), flow.PortNo, flow.GemportId, flow.FlowId) |
| } |
| } |
| |
| func (o *OltDevice) freeGemPortId(flow *openolt.Flow) { |
| // if this is the last flow referencing the GemPort then remove it |
| o.GemPortIDsLock.Lock() |
| defer o.GemPortIDsLock.Unlock() |
| |
| oltLogger.WithFields(log.Fields{ |
| "IntfId": flow.AccessIntfId, |
| "OnuId": flow.OnuId, |
| "PortNo": flow.PortNo, |
| "GemportId": flow.GemportId, |
| }).Trace("freeing-gem-port-id-via-flow") |
| |
| // NOTE that this loop is not very performant, it would be better if the flow carries |
| // the same information that it carries during a FlowAdd. If so we can directly remove |
| // items from the map |
| |
| //delete(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId], flow.FlowId) |
| //if len(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo][flow.GemportId]) == 0 { |
| // delete(o.GemPortIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo], flow.GemportId) |
| //} |
| |
| // NOTE this loop assumes that flow IDs are unique per device |
| for ponId, ponValues := range o.GemPortIDs { |
| for onuId, onuValues := range ponValues { |
| for uniId, uniValues := range onuValues { |
| for gemId, flows := range uniValues { |
| for flowId := range flows { |
| // if the flow matches, remove it from the map. |
| if flow.FlowId == flowId { |
| delete(o.GemPortIDs[ponId][onuId][uniId][gemId], flow.FlowId) |
| } |
| // if that was the last flow for a particular gem, remove the entire gem |
| if len(o.GemPortIDs[ponId][onuId][uniId][gemId]) == 0 { |
| delete(o.GemPortIDs[ponId][onuId][uniId], gemId) |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| // validateFlow checks that: |
| // - the AllocId is not used in any flow referencing other ONUs/UNIs on the same PON |
| // - the GemPortId is not used in any flow referencing other ONUs/UNIs on the same PON |
| func (o *OltDevice) validateFlow(flow *openolt.Flow) error { |
| // validate gemPort |
| o.GemPortIDsLock.RLock() |
| defer o.GemPortIDsLock.RUnlock() |
| for onuId, onu := range o.GemPortIDs[uint32(flow.AccessIntfId)] { |
| if onuId == uint32(flow.OnuId) { |
| continue |
| } |
| for uniId, uni := range onu { |
| for gem := range uni { |
| if flow.ReplicateFlow { |
| for _, flowGem := range flow.PbitToGemport { |
| if gem == int32(flowGem) { |
| return fmt.Errorf("gem-%d-already-in-use-on-uni-%d-onu-%d-replicated-flow-%d", gem, uniId, onuId, flow.FlowId) |
| } |
| } |
| } else { |
| if gem == flow.GemportId { |
| return fmt.Errorf("gem-%d-already-in-use-on-uni-%d-onu-%d-flow-%d", gem, uniId, onuId, flow.FlowId) |
| } |
| } |
| } |
| } |
| } |
| |
| o.AllocIDsLock.RLock() |
| defer o.AllocIDsLock.RUnlock() |
| for onuId, onu := range o.AllocIDs[uint32(flow.AccessIntfId)] { |
| if onuId == uint32(flow.OnuId) { |
| continue |
| } |
| for uniId, uni := range onu { |
| for allocId := range uni { |
| if allocId == flow.AllocId { |
| return fmt.Errorf("allocId-%d-already-in-use-on-uni-%d-onu-%d-flow-%d", allocId, uniId, onuId, flow.FlowId) |
| } |
| } |
| } |
| } |
| |
| return nil |
| } |
| |
| // clearAllResources is invoked up OLT Reboot to remove all the allocated |
| // GemPorts, AllocId and ONU-IDs across the PONs |
| func (o *OltDevice) clearAllResources() { |
| |
| // remove the resources received via flows |
| o.GemPortIDsLock.Lock() |
| o.GemPortIDs = make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool) |
| o.GemPortIDsLock.Unlock() |
| o.AllocIDsLock.Lock() |
| o.AllocIDs = make(map[uint32]map[uint32]map[uint32]map[int32]map[uint64]bool) |
| o.AllocIDsLock.Unlock() |
| |
| // remove the resources received via OMCI |
| for _, pon := range o.Pons { |
| pon.removeAllAllocIds() |
| pon.removeAllGemPorts() |
| pon.removeAllOnuIds() |
| } |
| } |