/*
 * Copyright 2018-2024 Open Networking Foundation (ONF) and the ONF Contributors

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package devices

import (
	"context"
	"encoding/binary"
	"encoding/hex"
	"fmt"
	"math/rand"
	"sync"

	"github.com/opencord/bbsim/internal/bbsim/packetHandlers"
	"github.com/opencord/bbsim/internal/bbsim/responders/dhcp"
	"github.com/opencord/bbsim/internal/bbsim/responders/eapol"

	pb "github.com/opencord/bbsim/api/bbsim"
	"github.com/opencord/bbsim/internal/bbsim/alarmsim"

	"net"
	"strconv"
	"time"

	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
	me "github.com/opencord/omci-lib-go/v2/generated"

	"github.com/boguslaw-wojcik/crc32a"
	"github.com/google/gopacket/layers"
	"github.com/jpillora/backoff"
	"github.com/looplab/fsm"
	"github.com/opencord/bbsim/internal/common"
	omcilib "github.com/opencord/bbsim/internal/common/omci"
	"github.com/opencord/omci-lib-go/v2"
	"github.com/opencord/voltha-protos/v5/go/openolt"
	"github.com/opencord/voltha-protos/v5/go/tech_profile"
	log "github.com/sirupsen/logrus"
)

// onuLogger is the logrus entry used by the ONU code in this file; every
// record emitted through it carries the field "module" set to "ONU".
var onuLogger = log.WithFields(log.Fields{
	"module": "ONU",
})

const (
	// maxOmciMsgCounter is the upper bound of Onu.OmciMsgCounter: once the
	// counter has reached this value handleOmciRequest restarts it from 1.
	maxOmciMsgCounter = 10
)

// Names of the events (transitions) and of the states of the finite state
// machine stored in Onu.InternalState; the machine itself, with the allowed
// source/destination states of every event, is assembled in CreateONU.
const (
	// ONU transitions
	// (event names, fired via InternalState.Event)
	OnuTxInitialize            = "initialize"
	OnuTxDiscover              = "discover"
	OnuTxEnable                = "enable"
	OnuTxDisable               = "disable"
	OnuTxPonDisable            = "pon_disable"
	OnuTxStartImageDownload    = "start_image_download"
	OnuTxProgressImageDownload = "progress_image_download"
	OnuTxCompleteImageDownload = "complete_image_download"
	OnuTxFailImageDownload     = "fail_image_download"
	OnuTxActivateImage         = "activate_image"
	OnuTxCommitImage           = "commit_image"

	// ONU States
	// (OnuStateCreated is the initial state of the machine)
	OnuStateCreated                 = "created"
	OnuStateInitialized             = "initialized"
	OnuStateDiscovered              = "discovered"
	OnuStateEnabled                 = "enabled"
	OnuStateDisabled                = "disabled"
	OnuStatePonDisabled             = "pon_disabled"
	OnuStateImageDownloadStarted    = "image_download_started"
	OnuStateImageDownloadInProgress = "image_download_in_progress"
	OnuStateImageDownloadComplete   = "image_download_completed"
	OnuStateImageDownloadError      = "image_download_error"
	OnuStateImageActivated          = "software_image_activated"
	OnuStateImageCommitted          = "software_image_committed"

	// BBR ONU States and Transitions
	// (each "send" event leads to the matching "sent" state)
	BbrOnuTxSendEapolFlow    = "send_eapol_flow"
	BbrOnuStateEapolFlowSent = "eapol_flow_sent"
	BbrOnuTxSendDhcpFlow     = "send_dhcp_flow"
	BbrOnuStateDhcpFlowSent  = "dhcp_flow_sent"
)

// FlowKey identifies a flow by its numeric ID; the Onu struct keeps a slice of
// FlowKey values in its Flows field.
type FlowKey struct {
	ID uint64
}

// Onu holds the state kept for a single ONU: its identifiers, the PON port it
// is attached to, its state machines, its UNI and POTS ports and the OMCI
// related bookkeeping. Instances are built by CreateONU.
//
// InternalState is the state machine driven by the OnuTx*/BbrOnuTx* events
// (CreateONU describes it as the one used to activate the OMCI, EAPOL and DHCP
// clients).
type Onu struct {
	ID                  uint32
	PonPortID           uint32
	PonPort             *PonPort
	InternalState       *fsm.FSM
	DiscoveryRetryDelay time.Duration // this is the time between subsequent Discovery Indication
	DiscoveryDelay      time.Duration // this is the time to send the first Discovery Indication

	Backoff *backoff.Backoff
	// ONU State
	UniPorts  []UniPortIf
	PotsPorts []PotsPortIf
	Flows     []FlowKey
	FlowIds   []uint64 // keep track of the flows we currently have in the ONU

	// OperState tracks the operational state as requested by VOLTHA (see the
	// note in CreateONU); SerialNumber is generated by NewSN in CreateONU.
	OperState    *fsm.FSM
	SerialNumber *openolt.SerialNumber

	AdminLockState uint8 // 0 is enabled, 1 is disabled.

	Channel chan bbsim.Message // this Channel is to track state changes OMCI messages, EAPOL and DHCP packets

	// OMCI params
	// ImageSoftwareExpectedSections is populated during the OMCI
	// StartSoftwareDownloadRequest (see CreateONU).
	// OmciMsgCounter and OmciResponseRate are used by handleOmciRequest to
	// decide whether a request is answered: the counter cycles from 1 to
	// maxOmciMsgCounter and the response is skipped whenever the counter is
	// greater than the rate.
	MibDataSync                   uint8
	ImageSoftwareExpectedSections uint32
	ImageSoftwareReceivedSections uint32
	ActiveImageEntityId           uint16
	CommittedImageEntityId        uint16
	StandbyImageVersion           string
	ActiveImageVersion            string
	InDownloadImageVersion        string
	CommittedImageVersion         string
	OmciResponseRate              uint8
	OmciMsgCounter                uint8
	ImageSectionData              []byte

	// OMCI params (Used in BBR)
	// CreateONU initializes tid to 0x1, hpTid to 0x8000 and seqNumber to 0.
	tid       uint16
	hpTid     uint16
	seqNumber uint16
	MibDb     *omcilib.MibDb

	DoneChannel       chan bool // this channel is used to signal once the onu is complete (when the struct is used by BBR)
	TrafficSchedulers *tech_profile.TrafficSchedulers
	// onuAlarmsInfoLock is held around the accesses to onuAlarmsInfo
	onuAlarmsInfoLock sync.RWMutex
	onuAlarmsInfo     map[omcilib.OnuAlarmInfoMapKey]omcilib.OnuAlarmInfo
}

// Sn returns the serial number of the ONU rendered as a string by
// common.OnuSnToString.
func (o *Onu) Sn() string {
	serial := o.SerialNumber
	return common.OnuSnToString(serial)
}

// CreateONU builds the Onu with the given id on the given PON port and returns
// a pointer to it.
//
// The function:
//   - fills the struct with its initial values and generates the serial
//     number via NewSN(olt.ID, pon.ID, id)
//   - creates the OperState and the InternalState state machines, together
//     with the callbacks executed when InternalState enters a state
//   - creates olt.NumUni Ethernet UNIs (IDs from 0) followed by olt.NumPots
//     POTS UNIs (IDs continuing after the Ethernet ones)
//   - generates the MIB database for the ONU
//
// Parameters:
//   - olt: the OLT the ONU belongs to; its enableContext and OpenoltStream are
//     handed to ProcessOnuMessages when the ONU enters the initialized state
//   - pon: the PON port the ONU is attached to
//   - id: the ONU ID
//   - delay: stored as DiscoveryDelay
//   - nextCtag, nextStag: passed through to NewUniPort
//   - isMock: when true the ProcessOnuMessages goroutine is not started on
//     entering the initialized state
//
// A failure in creating a port or the MIB database is logged at Fatal level.
func CreateONU(olt *OltDevice, pon *PonPort, id uint32, delay time.Duration, nextCtag map[string]int, nextStag map[string]int, isMock bool) *Onu {

	o := Onu{
		ID:                            id,
		PonPortID:                     pon.ID,
		PonPort:                       pon,
		tid:                           0x1,
		hpTid:                         0x8000,
		seqNumber:                     0,
		DoneChannel:                   make(chan bool, 1),
		DiscoveryRetryDelay:           60 * time.Second, // this is used to send OnuDiscoveryIndications until an activate call is received
		Flows:                         []FlowKey{},
		DiscoveryDelay:                delay,
		MibDataSync:                   0,
		ImageSoftwareExpectedSections: 0, // populated during OMCI StartSoftwareDownloadRequest
		ImageSoftwareReceivedSections: 0,
		//TODO this needs reworking, it's always 0 or 1, possibly base all on the version
		ActiveImageEntityId:    0, // when we start the SoftwareImage with ID 0 is active and committed
		CommittedImageEntityId: 0,
		StandbyImageVersion:    "BBSM_IMG_00000",
		ActiveImageVersion:     "BBSM_IMG_00001",
		CommittedImageVersion:  "BBSM_IMG_00001",
		OmciResponseRate:       olt.OmciResponseRate,
		OmciMsgCounter:         0,
	}
	o.SerialNumber = NewSN(olt.ID, pon.ID, id)
	// NOTE this state machine is used to track the operational
	// state as requested by VOLTHA
	o.OperState = getOperStateFSM(func(e *fsm.Event) {
		onuLogger.WithFields(log.Fields{
			"OnuId":  o.ID,
			"IntfId": o.PonPortID,
			"OnuSn":  o.Sn(),
		}).Debugf("Changing ONU OperState from %s to %s", e.Src, e.Dst)
	})
	o.onuAlarmsInfo = make(map[omcilib.OnuAlarmInfoMapKey]omcilib.OnuAlarmInfo)

	// NOTE this state machine is used to activate the OMCI, EAPOL and DHCP clients
	o.InternalState = fsm.NewFSM(
		OnuStateCreated,
		fsm.Events{
			// DEVICE Lifecycle
			{Name: OnuTxInitialize, Src: []string{OnuStateCreated, OnuStateDisabled, OnuStatePonDisabled}, Dst: OnuStateInitialized},
			{Name: OnuTxDiscover, Src: []string{OnuStateInitialized}, Dst: OnuStateDiscovered},
			{Name: OnuTxEnable, Src: []string{OnuStateDiscovered, OnuStatePonDisabled}, Dst: OnuStateEnabled},
			// NOTE should disabled state be different for oper_disabled (emulating an error) and admin_disabled (received a disabled call via VOLTHA)?
			{Name: OnuTxDisable, Src: []string{OnuStateEnabled, OnuStatePonDisabled, OnuStateImageActivated, OnuStateImageDownloadError, OnuStateImageCommitted}, Dst: OnuStateDisabled},
			// ONU state when PON port is disabled but ONU is power ON(more states should be added in src?)
			{Name: OnuTxPonDisable, Src: []string{OnuStateEnabled, OnuStateImageActivated, OnuStateImageDownloadError, OnuStateImageCommitted, OnuStateImageDownloadComplete}, Dst: OnuStatePonDisabled},
			// Software Image Download related states
			{Name: OnuTxStartImageDownload, Src: []string{OnuStateEnabled, OnuStateImageDownloadComplete, OnuStateImageDownloadError, OnuStateImageCommitted}, Dst: OnuStateImageDownloadStarted},
			{Name: OnuTxProgressImageDownload, Src: []string{OnuStateImageDownloadStarted}, Dst: OnuStateImageDownloadInProgress},
			{Name: OnuTxCompleteImageDownload, Src: []string{OnuStateImageDownloadInProgress}, Dst: OnuStateImageDownloadComplete},
			{Name: OnuTxFailImageDownload, Src: []string{OnuStateImageDownloadInProgress}, Dst: OnuStateImageDownloadError},
			{Name: OnuTxActivateImage, Src: []string{OnuStateImageDownloadComplete}, Dst: OnuStateImageActivated},
			{Name: OnuTxCommitImage, Src: []string{OnuStateEnabled}, Dst: OnuStateImageCommitted}, // the image is committed after a ONU reboot
			// BBR States
			// TODO add start OMCI state
			{Name: BbrOnuTxSendEapolFlow, Src: []string{OnuStateInitialized}, Dst: BbrOnuStateEapolFlowSent},
			{Name: BbrOnuTxSendDhcpFlow, Src: []string{BbrOnuStateEapolFlowSent}, Dst: BbrOnuStateDhcpFlowSent},
		},
		fsm.Callbacks{
			"enter_state": func(e *fsm.Event) {
				o.logStateChange(e.Src, e.Dst)
			},
			fmt.Sprintf("enter_%s", OnuStateInitialized): func(e *fsm.Event) {
				// create new channel for ProcessOnuMessages Go routine
				o.Channel = make(chan bbsim.Message, 2048)

				if err := o.OperState.Event(OnuTxEnable); err != nil {
					onuLogger.WithFields(log.Fields{
						"OnuId":  o.ID,
						"IntfId": o.PonPortID,
						"OnuSn":  o.Sn(),
					}).Errorf("Cannot change ONU OperState to up: %s", err.Error())
				}

				if !isMock {
					// start ProcessOnuMessages Go routine
					go o.ProcessOnuMessages(olt.enableContext, olt.OpenoltStream, nil)
				}
			},
			fmt.Sprintf("enter_%s", OnuStateDiscovered): func(e *fsm.Event) {
				msg := bbsim.Message{
					Type: bbsim.OnuDiscIndication,
					Data: bbsim.OnuDiscIndicationMessage{
						OperState: bbsim.UP,
					},
				}
				o.Channel <- msg
			},
			fmt.Sprintf("enter_%s", OnuStateEnabled): func(event *fsm.Event) {

				// an ONU ID already allocated on this PON is reported and no
				// indication is sent for it
				if used, sn := o.PonPort.isOnuIdAllocated(o.ID); used {
					onuLogger.WithFields(log.Fields{
						"IntfId":       o.PonPortID,
						"OnuId":        o.ID,
						"SerialNumber": o.Sn(),
					}).Errorf("onu-id-duplicated-with-%s", common.OnuSnToString(sn))
					return
				}
				o.PonPort.storeOnuId(o.ID, o.SerialNumber)

				msg := bbsim.Message{
					Type: bbsim.OnuIndication,
					Data: bbsim.OnuIndicationMessage{
						OnuSN:     o.SerialNumber,
						PonPortID: o.PonPortID,
						OperState: bbsim.UP,
					},
				}
				o.Channel <- msg
			},
			fmt.Sprintf("enter_%s", OnuStateDisabled): func(event *fsm.Event) {

				o.cleanupOnuState()

				// set the OperState to disabled
				if err := o.OperState.Event("disable"); err != nil {
					onuLogger.WithFields(log.Fields{
						"OnuId":  o.ID,
						"IntfId": o.PonPortID,
						"OnuSn":  o.Sn(),
					}).Errorf("Cannot change ONU OperState to down: %s", err.Error())
				}
				// send the OnuIndication DOWN event
				msg := bbsim.Message{
					Type: bbsim.OnuIndication,
					Data: bbsim.OnuIndicationMessage{
						OnuSN:     o.SerialNumber,
						PonPortID: o.PonPortID,
						OperState: bbsim.DOWN,
					},
				}
				o.Channel <- msg

				// disable the UNI ports
				for _, uni := range o.UniPorts {
					if err := uni.Disable(); err != nil {
						onuLogger.WithFields(log.Fields{
							"onuId": o.ID,
							"OnuSn": o.Sn(),
							"UniId": uni.GetID(),
							"err":   err,
						}).Error("failed-to-disable-uni-port")
					}
				}

				// disable the POTS UNI ports
				for _, pots := range o.PotsPorts {
					if err := pots.Disable(); err != nil {
						onuLogger.WithFields(log.Fields{
							"onuId": o.ID,
							"OnuSn": o.Sn(),
							"UniId": pots.GetID(),
							"err":   err,
						}).Error("failed-to-disable-pots-port")
					}
				}

				// verify all the flows removes are handled and
				// terminate the ONU's ProcessOnuMessages Go routine
				// NOTE may need to wait for the UNIs to be down too before shutting down the channel
				if len(o.FlowIds) == 0 {
					close(o.Channel)
				}
			},
			fmt.Sprintf("enter_%s", OnuStatePonDisabled): func(event *fsm.Event) {
				o.cleanupOnuState()
			},
			// BBR states
			fmt.Sprintf("enter_%s", BbrOnuStateEapolFlowSent): func(e *fsm.Event) {
				msg := bbsim.Message{
					Type: bbsim.SendEapolFlow,
				}
				o.Channel <- msg
			},
			fmt.Sprintf("enter_%s", BbrOnuStateDhcpFlowSent): func(e *fsm.Event) {
				msg := bbsim.Message{
					Type: bbsim.SendDhcpFlow,
				}
				o.Channel <- msg
			},
		},
	)
	onuLogger.WithFields(log.Fields{
		"OnuId":   o.ID,
		"IntfId":  o.PonPortID,
		"OnuSn":   o.Sn(),
		"NumUni":  olt.NumUni,
		"NumPots": olt.NumPots,
	}).Debug("creating-uni-ports")

	// create Ethernet UNIs
	for i := 0; i < olt.NumUni; i++ {
		uni, err := NewUniPort(uint32(i), &o, nextCtag, nextStag)
		if err != nil {
			onuLogger.WithFields(log.Fields{
				"OnuId":  o.ID,
				"IntfId": o.PonPortID,
				"OnuSn":  o.Sn(),
				"Err":    err,
			}).Fatal("cannot-create-uni-port")
		}
		o.UniPorts = append(o.UniPorts, uni)
	}
	// create POTS UNIs, with progressive IDs
	for i := olt.NumUni; i < (olt.NumUni + olt.NumPots); i++ {
		pots, err := NewPotsPort(uint32(i), &o)
		if err != nil {
			onuLogger.WithFields(log.Fields{
				"OnuId":  o.ID,
				"IntfId": o.PonPortID,
				"OnuSn":  o.Sn(),
				"Err":    err,
			}).Fatal("cannot-create-pots-port")
		}
		o.PotsPorts = append(o.PotsPorts, pots)
	}

	mibDb, err := omcilib.GenerateMibDatabase(len(o.UniPorts), len(o.PotsPorts), o.PonPort.Technology)
	if err != nil {
		// report the cause as well, as done for the port creation failures
		onuLogger.WithFields(log.Fields{
			"OnuId":  o.ID,
			"IntfId": o.PonPortID,
			"OnuSn":  o.Sn(),
			"Err":    err,
		}).Fatal("cannot-generate-mibdb-for-onu")
	}
	o.MibDb = mibDb

	return &o
}

// logStateChange emits a debug record describing a transition of the ONU
// InternalState from src to dst, tagged with the ONU identifiers.
func (o *Onu) logStateChange(src string, dst string) {
	fields := log.Fields{
		"OnuId":  o.ID,
		"IntfId": o.PonPortID,
		"OnuSn":  o.Sn(),
	}
	onuLogger.WithFields(fields).Debugf("Changing ONU InternalState from %s to %s", src, dst)
}

// cleanupOnuState resets the local state of the ONU when it gets disabled: the
// flow list is emptied, the ONU ID, the AllocIds and the GemPorts stored on the
// PON port for this ONU are removed, and the alarm information is dropped.
func (o *Onu) cleanupOnuState() {
	// forget the flows
	o.Flows = make([]FlowKey, 0)

	// release what the PON port stores for this ONU
	pon, serial := o.PonPort, o.SerialNumber
	pon.removeOnuId(o.ID)
	pon.removeAllocIdsForOnuSn(serial)
	pon.removeGemPortBySn(serial)

	// start again from an empty alarm map (everything is reset on ONU disable)
	o.onuAlarmsInfoLock.Lock()
	defer o.onuAlarmsInfoLock.Unlock()
	o.onuAlarmsInfo = map[omcilib.OnuAlarmInfoMapKey]omcilib.OnuAlarmInfo{}
}

// ProcessOnuMessages consumes the messages written to the ONU Channel and
// dispatches them according to their type, until one of the following happens:
//   - ctx is done
//   - the context of stream is done
//   - the ONU Channel is closed
//
// Parameters:
//   - ctx: cancels the processing
//   - stream: the indication stream the ONU indications, discovery indications
//     and OMCI indications are sent on
//   - client: handed to the handlers of the BBR specific messages (EAPOL/DHCP
//     packets in, OMCI responses, EAPOL/DHCP flows); CreateONU passes nil
//
// The method blocks, it is meant to run in its own goroutine.
func (o *Onu) ProcessOnuMessages(ctx context.Context, stream openolt.Openolt_EnableIndicationServer, client openolt.OpenoltClient) {
	onuLogger.WithFields(log.Fields{
		"onuID":   o.ID,
		"onuSN":   o.Sn(),
		"ponPort": o.PonPortID,
		"stream":  stream,
	}).Debug("Starting ONU Indication Channel")

	// NOTE the fields of the deferred record are evaluated here, not on exit
	defer onuLogger.WithFields(log.Fields{
		"onuID":  o.ID,
		"onuSN":  o.Sn(),
		"stream": stream,
	}).Debug("Stopped handling ONU Indication Channel")

loop:
	for {
		select {
		case <-ctx.Done():
			onuLogger.WithFields(log.Fields{
				"onuID": o.ID,
				"onuSN": o.Sn(),
			}).Debug("ONU message handling canceled via context")
			break loop
		case <-stream.Context().Done():
			onuLogger.WithFields(log.Fields{
				"onuID": o.ID,
				"onuSN": o.Sn(),
			}).Debug("ONU message handling canceled via stream context")
			break loop
		case message, ok := <-o.Channel:
			if !ok || ctx.Err() != nil {
				onuLogger.WithFields(log.Fields{
					"onuID": o.ID,
					"onuSN": o.Sn(),
				}).Debug("ONU message handling canceled via channel close")
				break loop
			}
			onuLogger.WithFields(log.Fields{
				"onuID":       o.ID,
				"onuSN":       o.Sn(),
				"messageType": message.Type,
			}).Tracef("Received message on ONU Channel")

			switch message.Type {
			case bbsim.OnuDiscIndication:
				msg, _ := message.Data.(bbsim.OnuDiscIndicationMessage)
				// NOTE we need to slow down and send ONU Discovery Indication in batches to better emulate a real scenario
				time.Sleep(o.DiscoveryDelay)
				o.sendOnuDiscIndication(msg, stream)
			case bbsim.OnuIndication:
				msg, _ := message.Data.(bbsim.OnuIndicationMessage)
				o.sendOnuIndication(msg, stream)
			case bbsim.OMCI:
				// these are OMCI messages received by the ONU
				msg, _ := message.Data.(bbsim.OmciMessage)
				_ = o.handleOmciRequest(msg, stream)
			case bbsim.UniStatusAlarm:
				msg, _ := message.Data.(bbsim.UniStatusAlarmMessage)
				onuAlarmMapKey := omcilib.OnuAlarmInfoMapKey{
					MeInstance: msg.EntityID,
					MeClassID:  me.PhysicalPathTerminationPointEthernetUniClassID,
				}
				seqNo := o.IncrementAlarmSequenceNumber(onuAlarmMapKey)
				o.onuAlarmsInfoLock.Lock()
				var alarmInfo = o.onuAlarmsInfo[onuAlarmMapKey]
				pkt, alarmBitMap := omcilib.CreateUniStatusAlarm(msg.RaiseOMCIAlarm, msg.EntityID, seqNo)
				if pkt != nil { //pkt will be nil if we are unable to create the alarm
					if err := o.sendOmciIndication(pkt, 0, stream); err != nil {
						onuLogger.WithFields(log.Fields{
							"IntfId":       o.PonPortID,
							"SerialNumber": o.Sn(),
							"omciPacket":   pkt,
							"adminState":   msg.AdminState,
							"entityID":     msg.EntityID,
						}).Errorf("failed-to-send-UNI-Link-Alarm: %v", err)
						alarmInfo.SequenceNo--
					} else {
						// only claim the alarm was sent when it actually was
						onuLogger.WithFields(log.Fields{
							"IntfId":       o.PonPortID,
							"SerialNumber": o.Sn(),
							"omciPacket":   pkt,
							"adminState":   msg.AdminState,
							"entityID":     msg.EntityID,
						}).Trace("UNI-Link-alarm-sent")
					}
					// an all-zero bitmap means no alarm is left for this entity
					if alarmBitMap == [28]byte{0} {
						delete(o.onuAlarmsInfo, onuAlarmMapKey)
					} else {
						alarmInfo.AlarmBitMap = alarmBitMap
						o.onuAlarmsInfo[onuAlarmMapKey] = alarmInfo
					}
				}
				o.onuAlarmsInfoLock.Unlock()
			case bbsim.FlowAdd:
				msg, _ := message.Data.(bbsim.OnuFlowUpdateMessage)
				o.handleFlowAdd(msg)
			case bbsim.FlowRemoved:
				msg, _ := message.Data.(bbsim.OnuFlowUpdateMessage)
				o.handleFlowRemove(msg)
			case bbsim.OnuPacketOut:

				msg, _ := message.Data.(bbsim.OnuPacketMessage)

				onuLogger.WithFields(log.Fields{
					"IntfId":    msg.IntfId,
					"OnuId":     msg.OnuId,
					"pktType":   msg.Type,
					"OnuSn":     o.Sn(),
					"gemportid": msg.GemPortId,
				}).Trace("Received OnuPacketOut Message")

				if msg.GemPortId == multicastGemPortId {
					// multicast packets go to every UNI with an IGMP membership
					unis := o.findUniWithIgmpMembership()
					if len(unis) == 0 {
						onuLogger.WithFields(log.Fields{
							"IntfId":     msg.IntfId,
							"OnuId":      msg.OnuId,
							"pktType":    msg.Type,
							"portNo":     msg.PortNo,
							"MacAddress": msg.MacAddress,
							"Pkt":        hex.EncodeToString(msg.Packet.Data()),
							"OnuSn":      o.Sn(),
						}).Trace("No uni to forward msg coming to multicast gemport")
					}
					for _, uni := range unis {
						uni.PacketCh <- msg
					}
				} else {
					uni, err := o.findUniByPortNo(msg.PortNo)
					if err != nil {
						onuLogger.WithFields(log.Fields{
							"IntfId":     msg.IntfId,
							"OnuId":      msg.OnuId,
							"pktType":    msg.Type,
							"portNo":     msg.PortNo,
							"MacAddress": msg.MacAddress,
							"Pkt":        hex.EncodeToString(msg.Packet.Data()),
							"OnuSn":      o.Sn(),
						}).Error("Cannot find Uni associated with packet")
						continue
					}
					uni.PacketCh <- msg
				}
			// BBR specific messages
			case bbsim.OnuPacketIn:
				// NOTE we only receive BBR packets here.
				// Eapol.HandleNextPacket can handle both BBSim and BBr cases so the call is the same
				// in the DHCP case VOLTHA only act as a proxy, the behaviour is completely different thus we have a dhcp.HandleNextBbrPacket
				msg, _ := message.Data.(bbsim.OnuPacketMessage)

				onuLogger.WithFields(log.Fields{
					"IntfId":    msg.IntfId,
					"OnuId":     msg.OnuId,
					"PortNo":    msg.PortNo,
					"GemPortId": msg.GemPortId,
					"pktType":   msg.Type,
				}).Trace("Received OnuPacketIn Message")

				uni, err := o.findUniByPortNo(msg.PortNo)
				if err != nil {
					onuLogger.WithFields(log.Fields{
						"IntfId":    msg.IntfId,
						"OnuId":     msg.OnuId,
						"PortNo":    msg.PortNo,
						"GemPortId": msg.GemPortId,
						"pktType":   msg.Type,
					}).Error(err.Error())
				}

				// BBR has one service and one UNI
				serviceId := uint32(0)
				oltId := 0
				if msg.Type == packetHandlers.EAPOL {
					if err != nil {
						// the EAPOL handler needs the ID of the UNI: when the
						// lookup failed the packet is dropped instead of
						// dereferencing uni (presumably nil on error)
						continue
					}
					eapol.HandleNextPacket(msg.OnuId, msg.IntfId, msg.GemPortId, o.Sn(), msg.PortNo, uni.ID, serviceId, oltId, o.InternalState, msg.Packet, stream, client)
				} else if msg.Type == packetHandlers.DHCP {
					_ = dhcp.HandleNextBbrPacket(o.ID, o.PonPortID, o.Sn(), o.DoneChannel, msg.Packet, client)
				}
			case bbsim.OmciIndication:
				// these are OMCI messages received by BBR (VOLTHA emulator)
				msg, _ := message.Data.(bbsim.OmciIndicationMessage)
				o.handleOmciResponse(msg, client)
			case bbsim.SendEapolFlow:
				o.sendEapolFlow(client)
			case bbsim.SendDhcpFlow:
				o.sendDhcpFlow(client)
			default:
				onuLogger.Warnf("Received unknown message data %v for type %v in OLT Channel", message.Data, message.Type)
			}
		}
	}
}

// NewSN builds a serial number whose vendor ID is "BBSM" and whose four
// vendor specific bytes are 0, oltid modulo 256, intfid and onuid; intfid and
// onuid are converted to a single byte each.
func NewSN(oltid int, intfid uint32, onuid uint32) *openolt.SerialNumber {
	vendorSpecific := []byte{
		0,
		byte(oltid % 256),
		byte(intfid),
		byte(onuid),
	}
	return &openolt.SerialNumber{
		VendorId:       []byte("BBSM"),
		VendorSpecific: vendorSpecific,
	}
}

// sendOnuDiscIndication sends an ONU Discovery Indication for this ONU on the
// stream and publishes the matching event. After a successful send it
// schedules a re-send: once DiscoveryRetryDelay has elapsed the indication is
// sent again if the ONU is still in the discovered state. Nothing is scheduled
// when the send fails.
func (o *Onu) sendOnuDiscIndication(msg bbsim.OnuDiscIndicationMessage, stream openolt.Openolt_EnableIndicationServer) {
	indication := &openolt.Indication{
		Data: &openolt.Indication_OnuDiscInd{
			OnuDiscInd: &openolt.OnuDiscIndication{
				IntfId:       o.PonPortID,
				SerialNumber: o.SerialNumber,
			},
		},
	}

	err := stream.Send(indication)
	if err != nil {
		log.Errorf("Failed to send Indication_OnuDiscInd: %v", err)
		return
	}

	sentFields := log.Fields{
		"IntfId": o.PonPortID,
		"OnuSn":  o.Sn(),
		"OnuId":  o.ID,
	}
	onuLogger.WithFields(sentFields).Debug("Sent Indication_OnuDiscInd")
	publishEvent("ONU-discovery-indication-sent", int32(o.PonPortID), int32(o.ID), o.Sn())

	// after DiscoveryRetryDelay check if the state is the same and in case send a new OnuDiscIndication
	retryDelay := o.DiscoveryRetryDelay
	go func() {
		time.Sleep(retryDelay)
		if o.InternalState.Current() != OnuStateDiscovered {
			return
		}
		o.sendOnuDiscIndication(msg, stream)
	}()
}

// sendOnuIndication sends an ONU Indication for this ONU on the stream.
//
// The OperState of the indication comes from msg, the AdminState is the
// current state of the ONU OperState state machine. A failure in sending is
// logged and nothing else happens; on success a debug record with the values
// that have been sent is emitted.
func (o *Onu) sendOnuIndication(msg bbsim.OnuIndicationMessage, stream openolt.Openolt_EnableIndicationServer) {
	// NOTE the ONU ID is set by VOLTHA in the ActivateOnu call (via openolt.proto)
	// and stored in the Onu struct via onu.SetID

	// read the states once, so that the record below reports exactly what has
	// been put in the indication
	operState := msg.OperState.String()
	adminState := o.OperState.Current()

	indData := &openolt.Indication_OnuInd{OnuInd: &openolt.OnuIndication{
		IntfId:       o.PonPortID,
		OnuId:        o.ID,
		OperState:    operState,
		AdminState:   adminState,
		SerialNumber: o.SerialNumber,
	}}
	if err := stream.Send(&openolt.Indication{Data: indData}); err != nil {
		// NOTE do we need to transition to a broken state?
		log.Errorf("Failed to send Indication_OnuInd: %v", err)
		return
	}
	onuLogger.WithFields(log.Fields{
		"IntfId":      o.PonPortID,
		"OnuId":       o.ID,
		"VolthaOnuId": msg.OnuID,
		"OperState":   operState,
		"AdminState":  adminState,
		"OnuSn":       o.Sn(),
	}).Debug("Sent Indication_OnuInd")

}

// HandleShutdownONU shuts the ONU down. In order it:
//   - simulates a DYING_GASP alarm with status "on"
//   - simulates an ONU_ALARM_LOS alarm with status "on" and sends the matching
//     OMCI alarm notification
//   - fires the OnuTxDisable event on the InternalState
//
// The first step that fails is logged and its error is returned, the
// following steps are not executed.
func (o *Onu) HandleShutdownONU() error {
	// the log context is only built when something goes wrong
	logCtx := func() *log.Entry {
		return onuLogger.WithFields(log.Fields{
			"OnuId":  o.ID,
			"IntfId": o.PonPortID,
			"OnuSn":  o.Sn(),
		})
	}

	dyingGasp := &pb.ONUAlarmRequest{
		AlarmType:    "DYING_GASP",
		SerialNumber: o.Sn(),
		Status:       "on",
	}
	err := alarmsim.SimulateOnuAlarm(dyingGasp, o.ID, o.PonPortID, o.PonPort.Olt.channel)
	if err != nil {
		logCtx().Errorf("Cannot send Dying Gasp: %s", err.Error())
		return err
	}

	los := &pb.ONUAlarmRequest{
		AlarmType:    "ONU_ALARM_LOS",
		SerialNumber: o.Sn(),
		Status:       "on",
	}
	err = alarmsim.SimulateOnuAlarm(los, o.ID, o.PonPortID, o.PonPort.Olt.channel)
	if err != nil {
		logCtx().Errorf("Cannot send LOS: %s", err.Error())
		return err
	}
	o.SendOMCIAlarmNotificationMsg(true, los.AlarmType)
	// TODO if it's the last ONU on the PON, then send a PON LOS

	err = o.InternalState.Event(OnuTxDisable)
	if err != nil {
		logCtx().Errorf("Cannot shutdown ONU: %s", err.Error())
		return err
	}

	return nil
}

// HandlePowerOnONU powers the ONU on. In order it:
//   - gives up (logging an error, returning nil) when the OLT is in the
//     "deleted" state
//   - fires OnuTxInitialize when the ONU is in the created or disabled state
//   - simulates an ONU_ALARM_LOS alarm with status "off" and sends the matching
//     OMCI alarm notification
//   - fires OnuTxDiscover
//   - fires OnuTxEnable, only when the ONU was in the disabled state at the
//     time of the call
//
// The first step that fails is logged and its error is returned.
func (o *Onu) HandlePowerOnONU() error {
	// the state the ONU is in before any transition is attempted
	startState := o.InternalState.Current()

	// the log context is only built when something goes wrong
	logCtx := func() *log.Entry {
		return onuLogger.WithFields(log.Fields{
			"OnuId":  o.ID,
			"IntfId": o.PonPortID,
			"OnuSn":  o.Sn(),
		})
	}

	// Do not send discovery if OLT is in Deleted state
	if oltState := o.PonPort.Olt.InternalState.Current(); oltState == "deleted" {
		logCtx().Errorf("Cannot poweron ONU. oltState: %s", oltState)
		return nil
	}

	// initialize the ONU
	switch startState {
	case OnuStateCreated, OnuStateDisabled:
		if err := o.InternalState.Event(OnuTxInitialize); err != nil {
			logCtx().Errorf("Cannot poweron ONU: %s", err.Error())
			return err
		}
	}

	// turn off the LOS Alarm
	los := &pb.ONUAlarmRequest{
		AlarmType:    "ONU_ALARM_LOS",
		SerialNumber: o.Sn(),
		Status:       "off",
	}
	if err := alarmsim.SimulateOnuAlarm(los, o.ID, o.PonPortID, o.PonPort.Olt.channel); err != nil {
		logCtx().Errorf("Cannot send LOS: %s", err.Error())
		return err
	}
	o.SendOMCIAlarmNotificationMsg(false, los.AlarmType)

	// Send a ONU Discovery indication
	if err := o.InternalState.Event(OnuTxDiscover); err != nil {
		logCtx().Errorf("Cannot poweron ONU: %s", err.Error())
		return err
	}

	// the ONU is moved directly to the enabled state only in the powercycle
	// case; on the first poweron it is moved to enabled when the ActivateOnu
	// request is received from the openolt adapter
	if startState != OnuStateDisabled {
		return nil
	}
	if err := o.InternalState.Event(OnuTxEnable); err != nil {
		logCtx().Errorf("Cannot enable ONU: %s", err.Error())
		return err
	}

	return nil
}

// SetAlarm simulates the alarm of the given type with the given status for
// this ONU and, when that succeeds, sends the matching OMCI alarm
// notification: the alarm is raised when status is "on", cleared otherwise.
// The error of the simulation, if any, is returned as is.
func (o *Onu) SetAlarm(alarmType string, status string) error {
	request := &pb.ONUAlarmRequest{
		AlarmType:    alarmType,
		SerialNumber: o.Sn(),
		Status:       status,
	}

	if err := alarmsim.SimulateOnuAlarm(request, o.ID, o.PonPortID, o.PonPort.Olt.channel); err != nil {
		return err
	}

	o.SendOMCIAlarmNotificationMsg(request.Status == "on", request.AlarmType)
	return nil
}

// publishOmciEvent publishes the events related to the MIB upload, when
// olt.PublishEvents is set (olt is declared outside of this function):
//   - a MIB upload request resets seqNumber and publishes "MIB-upload-received"
//   - a MIB upload next request increments seqNumber and publishes
//     "MIB-upload-done" whenever the counter is greater than 290
//
// Any other message type is ignored; a packet that cannot be parsed is logged.
func (o *Onu) publishOmciEvent(msg bbsim.OmciMessage) {
	if !olt.PublishEvents {
		return
	}

	_, omciMsg, err := omcilib.ParseOpenOltOmciPacket(msg.OmciPkt.Data())
	if err != nil {
		log.Errorf("error in getting msgType %v", err)
		return
	}

	switch omciMsg.MessageType {
	case omci.MibUploadRequestType:
		o.seqNumber = 0
		publishEvent("MIB-upload-received", int32(o.PonPortID), int32(o.ID), o.Sn())
	case omci.MibUploadNextRequestType:
		o.seqNumber++
		if o.seqNumber > 290 {
			publishEvent("MIB-upload-done", int32(o.PonPortID), int32(o.ID), o.Sn())
		}
	}
}

// handleOmciRequest is responsible to parse the OMCI packets received from the openolt adapter
// and generate the appropriate response to it.
//
// The request is dispatched on msg.OmciMsg.MessageType. When a response packet is
// produced it is sent on stream as an OMCI indication. For Set, Create, Delete and
// the software image requests the MibDataSync counter is incremented once the
// successful response has been created.
//
// A non-nil error is returned when the response is skipped because of
// OmciResponseRate, when a SetRequest cannot be parsed, or when the response to a
// DownloadSectionRequestWithResponse cannot be created. Failures while sending the
// indication are only logged.
func (o *Onu) handleOmciRequest(msg bbsim.OmciMessage, stream openolt.Openolt_EnableIndicationServer) error {

	onuLogger.WithFields(log.Fields{
		"omciMsgType":  msg.OmciMsg.MessageType,
		"transCorrId":  strconv.FormatInt(int64(msg.OmciMsg.TransactionID), 16),
		"DeviceIdent":  msg.OmciMsg.DeviceIdentifier,
		"IntfId":       o.PonPortID,
		"SerialNumber": o.Sn(),
	}).Trace("omci-message-decoded")

	// OmciMsgCounter cycles between 1 and maxOmciMsgCounter, within each cycle
	// only the first OmciResponseRate messages are answered
	if o.OmciMsgCounter < maxOmciMsgCounter {
		o.OmciMsgCounter++
	} else {
		o.OmciMsgCounter = 1
	}
	if o.OmciMsgCounter > o.OmciResponseRate {
		onuLogger.WithFields(log.Fields{
			"OmciMsgCounter":   o.OmciMsgCounter,
			"OmciResponseRate": o.OmciResponseRate,
			"omciMsgType":      msg.OmciMsg.MessageType,
			"txId":             msg.OmciMsg.TransactionID,
		}).Debug("skipping-omci-msg-response")
		return fmt.Errorf("skipping-omci-msg-response-because-of-response-rate-%d", o.OmciResponseRate)
	}
	var responsePkt []byte
	var errResp error
	switch msg.OmciMsg.MessageType {
	case omci.MibResetRequestType:
		onuLogger.WithFields(log.Fields{
			"IntfId":       o.PonPortID,
			"OnuId":        o.ID,
			"SerialNumber": o.Sn(),
		}).Debug("received-mib-reset-request")
		if responsePkt, errResp = omcilib.CreateMibResetResponse(msg.OmciMsg.TransactionID); errResp == nil {
			o.MibDataSync = 0

			// if the MIB reset is successful then remove all the stored AllocIds and GemPorts
			o.PonPort.removeAllocIdsForOnuSn(o.SerialNumber)
			o.PonPort.removeGemPortBySn(o.SerialNumber)
		}
	case omci.MibUploadRequestType:
		responsePkt, _ = omcilib.CreateMibUploadResponse(msg.OmciMsg, o.MibDb)
	case omci.MibUploadNextRequestType:
		responsePkt, _ = omcilib.CreateMibUploadNextResponse(msg.OmciPkt, msg.OmciMsg, o.MibDb)
	case omci.GetRequestType:
		// an AdminLockState of 1 is reported as "ONU down" in the get response
		onuDown := o.AdminLockState == 1
		responsePkt, _ = omcilib.CreateGetResponse(msg.OmciPkt, msg.OmciMsg, o.SerialNumber, o.MibDataSync, o.ActiveImageEntityId,
			o.CommittedImageEntityId, o.StandbyImageVersion, o.ActiveImageVersion, o.CommittedImageVersion, onuDown)

	case omci.SetRequestType:
		success := true
		msgObj, parseErr := omcilib.ParseSetRequest(msg.OmciPkt)
		if parseErr != nil {
			// without a decoded request there is nothing to apply: bail out
			// instead of dereferencing an unusable msgObj
			onuLogger.WithFields(log.Fields{
				"OmciMsgType":  msg.OmciMsg.MessageType,
				"TransCorrId":  msg.OmciMsg.TransactionID,
				"Err":          parseErr.Error(),
				"IntfId":       o.PonPortID,
				"SerialNumber": o.Sn(),
			}).Error("error-while-processing-set-request")
			return fmt.Errorf("error-while-processing-set-request: %s", parseErr.Error())
		}
		// NOTE(review): the attribute type assertions below are unchecked and panic
		// when the expected attribute is not part of the request - confirm that the
		// adapter always sends them
		switch msgObj.EntityClass {
		case me.PhysicalPathTerminationPointEthernetUniClassID:
			// if we're Setting a PPTP state
			// we need to send the appropriate alarm (handled in the UNI struct)
			uni, err := o.FindUniByEntityId(msgObj.EntityInstance)
			if err != nil {
				onuLogger.Error(err)
				success = false
			} else {
				// 1 locks the UNI, 0 unlocks it
				adminState := msgObj.Attributes[me.PhysicalPathTerminationPointEthernetUni_AdministrativeState].(uint8)
				var err error
				if adminState == 1 {
					err = uni.Disable()
				} else {
					err = uni.Enable()
				}
				// a failed state change is only logged, the request is still answered with success
				if err != nil {
					onuLogger.WithFields(log.Fields{
						"IntfId":       o.PonPortID,
						"OnuId":        o.ID,
						"UniMeId":      uni.MeId,
						"UniId":        uni.ID,
						"SerialNumber": o.Sn(),
						"Err":          err.Error(),
					}).Warn("cannot-change-uni-status")
				}
			}
		case me.PhysicalPathTerminationPointPotsUniClassID:
			// if we're Setting a PPTP state
			// we need to send the appropriate alarm (handled in the POTS struct)
			pots, err := o.FindPotsByEntityId(msgObj.EntityInstance)
			if err != nil {
				onuLogger.Error(err)
				success = false
			} else {
				// 1 locks the UNI, 0 unlocks it
				adminState := msgObj.Attributes[me.PhysicalPathTerminationPointPotsUni_AdministrativeState].(uint8)
				var err error
				if adminState == 1 {
					err = pots.Disable()
				} else {
					err = pots.Enable()
				}
				// a failed state change is only logged, the request is still answered with success
				if err != nil {
					onuLogger.WithFields(log.Fields{
						"IntfId":       o.PonPortID,
						"OnuId":        o.ID,
						"PotsMeId":     pots.MeId,
						"PotsId":       pots.ID,
						"SerialNumber": o.Sn(),
						"Err":          err.Error(),
					}).Warn("cannot-change-pots-status")
				}
			}
		case me.OnuGClassID:
			o.AdminLockState = msgObj.Attributes[me.OnuG_AdministrativeState].(uint8)
			onuLogger.WithFields(log.Fields{
				"IntfId":         o.PonPortID,
				"OnuId":          o.ID,
				"SerialNumber":   o.Sn(),
				"AdminLockState": o.AdminLockState,
			}).Debug("set-onu-admin-lock-state")
		case me.TContClassID:
			allocId := msgObj.Attributes[me.TCont_AllocId].(uint16)
			entityID := msgObj.Attributes["ManagedEntityId"].(uint16)

			// if the AllocId is 255 (0xFF) or 65535 (0xFFFF) it means we are removing it,
			// otherwise we are adding it
			if allocId == 255 || allocId == 65535 {
				onuLogger.WithFields(log.Fields{
					"IntfId":       o.PonPortID,
					"OnuId":        o.ID,
					"TContId":      msgObj.EntityInstance,
					"AllocId":      allocId,
					"SerialNumber": o.Sn(),
				}).Trace("freeing-alloc-id-via-omci")
				o.PonPort.removeAllocId(o.PonPortID, o.ID, entityID)
			} else {
				if used, allocObj := o.PonPort.isAllocIdAllocated(o.PonPortID, o.ID, entityID); used {
					onuLogger.WithFields(log.Fields{
						"IntfId":       o.PonPortID,
						"OnuId":        o.ID,
						"AllocId":      allocId,
						"SerialNumber": o.Sn(),
					}).Errorf("allocid-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(allocObj.OnuSn))
					success = false
				} else {
					onuLogger.WithFields(log.Fields{
						"IntfId":       o.PonPortID,
						"OnuId":        o.ID,
						"TContId":      msgObj.EntityInstance,
						"AllocId":      allocId,
						"SerialNumber": o.Sn(),
					}).Trace("storing-alloc-id-via-omci")
					o.PonPort.storeAllocId(o.PonPortID, o.ID, entityID, allocId, o.SerialNumber)
				}
			}
		case me.EthernetFrameExtendedPmClassID,
			me.EthernetFrameExtendedPm64BitClassID:
			onuLogger.WithFields(log.Fields{
				"me-instance": msgObj.EntityInstance,
			}).Debug("set-request-received")
			// No need to reset counters as onu adapter will simply send the set control block request to actually reset
			// the counters, and respond with 0's without sending the get request to device.
			// Also, if we even reset the counters here in cache, then on get we need to restore the counters back which
			// would be of no use as ultimately the counters need to be restored.
		}

		if success {
			if responsePkt, errResp = omcilib.CreateSetResponse(msg.OmciPkt, msg.OmciMsg, me.Success); errResp == nil {
				o.MibDataSync++
			}
		} else {
			responsePkt, _ = omcilib.CreateSetResponse(msg.OmciPkt, msg.OmciMsg, me.AttributeFailure)
		}
	case omci.CreateRequestType:
		// check for GemPortNetworkCtp and make sure there are no duplicates on the same PON
		var used bool
		var sn *openolt.SerialNumber
		msgObj, err := omcilib.ParseCreateRequest(msg.OmciPkt)
		if err == nil {
			if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
				// GemPort 4069 is reserved for multicast and shared across ONUs
				if msgObj.EntityInstance != 4069 {
					if used, sn = o.PonPort.isGemPortAllocated(msgObj.EntityInstance); used {
						onuLogger.WithFields(log.Fields{
							"IntfId":       o.PonPortID,
							"OnuId":        o.ID,
							"GemPortId":    msgObj.EntityInstance,
							"SerialNumber": o.Sn(),
						}).Errorf("gemport-already-allocated-to-onu-with-sn-%s", common.OnuSnToString(sn))
					} else {
						onuLogger.WithFields(log.Fields{
							"IntfId":       o.PonPortID,
							"OnuId":        o.ID,
							"GemPortId":    msgObj.EntityInstance,
							"SerialNumber": o.Sn(),
						}).Trace("storing-gem-port-id-via-omci")
						o.PonPort.storeGemPort(msgObj.EntityInstance, o.SerialNumber)
					}
				}
			}
		}

		// if the gemPort is valid then increment the MDS and return a successful response
		// otherwise fail the request
		// for now the CreateRequest for the gemPort is the only one that can fail, if we start supporting multiple
		// validation this check will need to be rewritten
		if !used {
			if responsePkt, errResp = omcilib.CreateCreateResponse(msg.OmciPkt, msg.OmciMsg, me.Success); errResp == nil {
				o.MibDataSync++
			}
		} else {
			responsePkt, _ = omcilib.CreateCreateResponse(msg.OmciPkt, msg.OmciMsg, me.ProcessingError)
		}
	case omci.DeleteRequestType:
		msgObj, err := omcilib.ParseDeleteRequest(msg.OmciPkt)
		if err == nil {
			if msgObj.EntityClass == me.GemPortNetworkCtpClassID {
				onuLogger.WithFields(log.Fields{
					"IntfId":       o.PonPortID,
					"OnuId":        o.ID,
					"GemPortId":    msgObj.EntityInstance,
					"SerialNumber": o.Sn(),
				}).Trace("freeing-gem-port-id-via-omci")
				o.PonPort.removeGemPort(msgObj.EntityInstance)
			}
		}

		if responsePkt, errResp = omcilib.CreateDeleteResponse(msg.OmciPkt, msg.OmciMsg); errResp == nil {
			o.MibDataSync++
		}
	case omci.RebootRequestType:

		responsePkt, _ = omcilib.CreateRebootResponse(msg.OmciPkt, msg.OmciMsg)

		// powercycle the ONU
		// we run this in a separate goroutine so that
		// the RebootRequestResponse is sent to VOLTHA
		go func() {
			if err := o.Reboot(10 * time.Second); err != nil {
				log.WithFields(log.Fields{
					"IntfId":       o.PonPortID,
					"OnuId":        o.ID,
					"SerialNumber": o.Sn(),
					"err":          err,
				}).Error("cannot-reboot-onu-after-omci-reboot-request")
			}
		}()
	case omci.TestRequestType:
		var classID me.ClassID
		var omciResult me.Results
		var instID uint16
		responsePkt, errResp, classID, instID, omciResult = omcilib.CreateTestResponse(msg.OmciPkt, msg.OmciMsg)
		// Send TestResult only in case the TestResponse omci result code is me.Success
		if responsePkt != nil && errResp == nil && omciResult == me.Success {
			if testResultPkt, err := omcilib.CreateTestResult(classID, instID, msg.OmciMsg.TransactionID); err == nil {
				// send test results asynchronously
				go func() {
					// Send test results after a second to emulate async behavior
					time.Sleep(1 * time.Second)
					if testResultPkt != nil {
						if err := o.sendOmciIndication(testResultPkt, msg.OmciMsg.TransactionID, stream); err != nil {
							onuLogger.WithFields(log.Fields{
								"IntfId":          o.PonPortID,
								"SerialNumber":    o.Sn(),
								"omciPacket":      testResultPkt,
								"msg.OmciMsgType": msg.OmciMsg.MessageType,
								"transCorrId":     msg.OmciMsg.TransactionID,
							}).Errorf("failed-to-send-omci-message: %v", err)
						}
					}
				}()
			}
		}
	case omci.SynchronizeTimeRequestType:
		// MDS counter increment is not required for this message type
		responsePkt, _ = omcilib.CreateSyncTimeResponse(msg.OmciPkt, msg.OmciMsg)
	case omci.StartSoftwareDownloadRequestType:

		// a new download discards whatever was received for a previous one
		o.ImageSoftwareReceivedSections = 0
		o.ImageSectionData = []byte{}
		o.ImageSoftwareExpectedSections = omcilib.ComputeDownloadSectionsCount(msg.OmciPkt)

		if responsePkt, errResp = omcilib.CreateStartSoftwareDownloadResponse(msg.OmciPkt, msg.OmciMsg); errResp == nil {
			o.MibDataSync++
			if err := o.InternalState.Event(OnuTxStartImageDownload); err != nil {
				onuLogger.WithFields(log.Fields{
					"OnuId":  o.ID,
					"IntfId": o.PonPortID,
					"OnuSn":  o.Sn(),
					"Err":    err.Error(),
				}).Errorf("cannot-change-onu-internal-state-to-%s", OnuStateImageDownloadStarted)
			}
		} else {
			onuLogger.WithFields(log.Fields{
				"OmciMsgType":  msg.OmciMsg.MessageType,
				"TransCorrId":  msg.OmciMsg.TransactionID,
				"Err":          errResp.Error(),
				"IntfId":       o.PonPortID,
				"SerialNumber": o.Sn(),
			}).Error("error-while-processing-start-software-download-request")
		}
	case omci.DownloadSectionRequestType:
		// no response is generated for this message type, the section is only stored
		if msgObj, err := omcilib.ParseDownloadSectionRequest(msg.OmciPkt); err == nil {
			onuLogger.WithFields(log.Fields{
				"OmciMsgType":    msg.OmciMsg.MessageType,
				"TransCorrId":    msg.OmciMsg.TransactionID,
				"EntityInstance": msgObj.EntityInstance,
				"SectionNumber":  msgObj.SectionNumber,
				"SectionData":    msgObj.SectionData,
			}).Trace("received-download-section-request")
			//Extracting the first 14 bytes to use as a version for this image.
			if o.ImageSoftwareReceivedSections == 0 {
				// a first section shorter than 14 bytes must not cause an
				// out-of-range slice, use whatever is available in that case
				versionLen := len(msgObj.SectionData)
				if versionLen > 14 {
					versionLen = 14
				}
				o.InDownloadImageVersion = string(msgObj.SectionData[:versionLen])
			}
			o.ImageSectionData = append(o.ImageSectionData, msgObj.SectionData...)
			o.ImageSoftwareReceivedSections++
			if o.InternalState.Current() != OnuStateImageDownloadInProgress {
				if err := o.InternalState.Event(OnuTxProgressImageDownload); err != nil {
					onuLogger.WithFields(log.Fields{
						"OnuId":  o.ID,
						"IntfId": o.PonPortID,
						"OnuSn":  o.Sn(),
						"Err":    err.Error(),
					}).Errorf("cannot-change-onu-internal-state-to-%s", OnuStateImageDownloadInProgress)
				}
			}
		}
	case omci.DownloadSectionRequestWithResponseType:
		// NOTE we only need to respond if an ACK is requested
		if msgObj, err := omcilib.ParseDownloadSectionRequest(msg.OmciPkt); err == nil {
			onuLogger.WithFields(log.Fields{
				"OmciMsgType":    msg.OmciMsg.MessageType,
				"TransCorrId":    msg.OmciMsg.TransactionID,
				"EntityInstance": msgObj.EntityInstance,
				"SectionNumber":  msgObj.SectionNumber,
				"SectionData":    msgObj.SectionData,
			}).Trace("received-download-section-request-with-response-type")
			o.ImageSectionData = append(o.ImageSectionData, msgObj.SectionData...)
			responsePkt, errResp = omcilib.CreateDownloadSectionResponse(msg.OmciPkt, msg.OmciMsg)

			if errResp != nil {
				onuLogger.WithFields(log.Fields{
					"OmciMsgType":  msg.OmciMsg.MessageType,
					"TransCorrId":  msg.OmciMsg.TransactionID,
					"Err":          errResp.Error(),
					"IntfId":       o.PonPortID,
					"SerialNumber": o.Sn(),
				}).Error("error-while-processing-create-download-section-response")
				return fmt.Errorf("error-while-processing-create-download-section-response: %s", errResp.Error())
			}
			o.ImageSoftwareReceivedSections++
		}
	case omci.EndSoftwareDownloadRequestType:
		success := o.handleEndSoftwareDownloadRequest(msg)

		if success {
			if responsePkt, errResp = omcilib.CreateEndSoftwareDownloadResponse(msg.OmciPkt, msg.OmciMsg, me.Success); errResp == nil {
				o.MibDataSync++
				if err := o.InternalState.Event(OnuTxCompleteImageDownload); err != nil {
					onuLogger.WithFields(log.Fields{
						"OnuId":  o.ID,
						"IntfId": o.PonPortID,
						"OnuSn":  o.Sn(),
						"Err":    err.Error(),
					}).Errorf("cannot-change-onu-internal-state-to-%s", OnuStateImageDownloadComplete)
				}
			} else {
				onuLogger.WithFields(log.Fields{
					"OmciMsgType":  msg.OmciMsg.MessageType,
					"TransCorrId":  msg.OmciMsg.TransactionID,
					"Err":          errResp.Error(),
					"IntfId":       o.PonPortID,
					"SerialNumber": o.Sn(),
				}).Error("error-while-responding-to-end-software-download-request")
			}
		} else {
			if responsePkt, errResp = omcilib.CreateEndSoftwareDownloadResponse(msg.OmciPkt, msg.OmciMsg, me.ProcessingError); errResp == nil {
				if err := o.InternalState.Event(OnuTxFailImageDownload); err != nil {
					onuLogger.WithFields(log.Fields{
						"OnuId":  o.ID,
						"IntfId": o.PonPortID,
						"OnuSn":  o.Sn(),
						"Err":    err.Error(),
					}).Errorf("cannot-change-onu-internal-state-to-%s", OnuStateImageDownloadError)
				}
			}
		}
	case omci.ActivateSoftwareRequestType:
		if responsePkt, errResp = omcilib.CreateActivateSoftwareResponse(msg.OmciPkt, msg.OmciMsg); errResp == nil {
			o.MibDataSync++
			if err := o.InternalState.Event(OnuTxActivateImage); err != nil {
				onuLogger.WithFields(log.Fields{
					"OnuId":  o.ID,
					"IntfId": o.PonPortID,
					"OnuSn":  o.Sn(),
					"Err":    err.Error(),
				}).Errorf("cannot-change-onu-internal-state-to-%s", OnuStateImageActivated)
			}
			if msgObj, err := omcilib.ParseActivateSoftwareRequest(msg.OmciPkt); err == nil {
				// the standby image becomes the active one and vice versa
				o.ActiveImageEntityId = msgObj.EntityInstance
				previousActiveImage := o.ActiveImageVersion
				o.ActiveImageVersion = o.StandbyImageVersion
				o.StandbyImageVersion = previousActiveImage
			} else {
				onuLogger.Errorf("something-went-wrong-while-activating: %s", err)
			}
			onuLogger.WithFields(log.Fields{
				"OnuId":                  o.ID,
				"IntfId":                 o.PonPortID,
				"OnuSn":                  o.Sn(),
				"ActiveImageEntityId":    o.ActiveImageEntityId,
				"CommittedImageEntityId": o.CommittedImageEntityId,
			}).Info("onu-software-image-activated")

			// powercycle the ONU
			// we run this in a separate goroutine so that
			// the ActivateSoftwareResponse is sent to VOLTHA
			// NOTE do we need to wait before rebooting?
			go func() {
				if err := o.Reboot(10 * time.Second); err != nil {
					log.WithFields(log.Fields{
						"IntfId":       o.PonPortID,
						"OnuId":        o.ID,
						"SerialNumber": o.Sn(),
						"err":          err,
					}).Error("cannot-reboot-onu-after-omci-activate-software-request")
				}
			}()
		}
	case omci.CommitSoftwareRequestType:
		if responsePkt, errResp = omcilib.CreateCommitSoftwareResponse(msg.OmciPkt, msg.OmciMsg); errResp == nil {
			o.MibDataSync++
			if msgObj, err := omcilib.ParseCommitSoftwareRequest(msg.OmciPkt); err == nil {
				// TODO validate that the image to commit is:
				// - active
				// - not already committed
				o.ActiveImageEntityId = msgObj.EntityInstance
				o.CommittedImageEntityId = msgObj.EntityInstance
				//committed becomes standby
				o.StandbyImageVersion = o.CommittedImageVersion
				o.CommittedImageVersion = o.ActiveImageVersion
			} else {
				onuLogger.Errorf("something-went-wrong-while-committing: %s", err)
			}
			if err := o.InternalState.Event(OnuTxCommitImage); err != nil {
				onuLogger.WithFields(log.Fields{
					"OnuId":  o.ID,
					"IntfId": o.PonPortID,
					"OnuSn":  o.Sn(),
					"Err":    err.Error(),
				}).Errorf("cannot-change-onu-internal-state-to-%s", OnuStateImageCommitted)
			}
			onuLogger.WithFields(log.Fields{
				"OnuId":                  o.ID,
				"IntfId":                 o.PonPortID,
				"OnuSn":                  o.Sn(),
				"ActiveImageEntityId":    o.ActiveImageEntityId,
				"CommittedImageEntityId": o.CommittedImageEntityId,
			}).Info("onu-software-image-committed")
		}
	case omci.GetAllAlarmsRequestType:
		// Reset the alarm sequence number on receiving get all alarms request.
		o.onuAlarmsInfoLock.Lock()
		for key, alarmInfo := range o.onuAlarmsInfo {
			// reset the alarm sequence no
			alarmInfo.SequenceNo = 0
			o.onuAlarmsInfo[key] = alarmInfo
		}
		o.onuAlarmsInfoLock.Unlock()
		// NOTE(review): onuAlarmsInfo is read here, and in the GetAllAlarmsNext case
		// below, without holding onuAlarmsInfoLock - confirm that this is safe
		responsePkt, _ = omcilib.CreateGetAllAlarmsResponse(msg.OmciMsg, o.onuAlarmsInfo)
	case omci.GetAllAlarmsNextRequestType:
		if responsePkt, errResp = omcilib.CreateGetAllAlarmsNextResponse(msg.OmciPkt, msg.OmciMsg, o.onuAlarmsInfo); errResp != nil {
			responsePkt = nil //Do not send any response for error case
		}
	default:
		onuLogger.WithFields(log.Fields{
			"omciBytes":    hex.EncodeToString(msg.OmciPkt.Data()),
			"omciPkt":      msg.OmciPkt,
			"omciMsgType":  msg.OmciMsg.MessageType,
			"transCorrId":  msg.OmciMsg.TransactionID,
			"IntfId":       o.PonPortID,
			"SerialNumber": o.Sn(),
		}).Warnf("OMCI-message-not-supported")
	}

	// a nil responsePkt means that nothing has to be sent back for this request
	if responsePkt != nil {
		if err := o.sendOmciIndication(responsePkt, msg.OmciMsg.TransactionID, stream); err != nil {
			onuLogger.WithFields(log.Fields{
				"IntfId":          o.PonPortID,
				"SerialNumber":    o.Sn(),
				"omciPacket":      responsePkt,
				"msg.OmciMsgType": msg.OmciMsg.MessageType,
				"transCorrId":     msg.OmciMsg.TransactionID,
			}).Errorf("failed-to-send-omci-message: %v", err)
		}
	}

	o.publishOmciEvent(msg)
	return nil
}

// sendOmciIndication wraps an OMCI packet in an OmciIndication for this ONU and
// sends it up to VOLTHA over the given stream.
// It returns an error when the stream refuses the message; on success the sent
// packet is traced in its hex form together with the transaction ID.
func (o *Onu) sendOmciIndication(responsePkt []byte, txId uint16, stream bbsim.Stream) error {
	omciInd := &openolt.OmciIndication{
		IntfId: o.PonPortID,
		OnuId:  o.ID,
		Pkt:    responsePkt,
	}
	outgoing := &openolt.Indication{
		Data: &openolt.Indication_OmciInd{OmciInd: omciInd},
	}

	err := stream.Send(outgoing)
	if err != nil {
		return fmt.Errorf("failed-to-send-omci-message: %v", err)
	}

	fields := log.Fields{
		"IntfId":       o.PonPortID,
		"SerialNumber": o.Sn(),
		"omciPacket":   hex.EncodeToString(omciInd.Pkt),
		"transCorrId":  txId,
	}
	onuLogger.WithFields(fields).Trace("omci-message-sent")
	return nil
}

// FindUniById returns the UNI port of this ONU whose ID matches uniID,
// or an error when no such port exists.
func (o *Onu) FindUniById(uniID uint32) (*UniPort, error) {
	for _, port := range o.UniPorts {
		if candidate := port.(*UniPort); candidate.ID == uniID {
			return candidate, nil
		}
	}
	return nil, fmt.Errorf("cannot-find-uni-with-id-%d-on-onu-%s", uniID, o.Sn())
}

// FindPotsById returns the POTS port of this ONU whose ID matches uniID,
// or an error when no such port exists.
func (o *Onu) FindPotsById(uniID uint32) (*PotsPort, error) {
	for _, port := range o.PotsPorts {
		if candidate := port.(*PotsPort); candidate.ID == uniID {
			return candidate, nil
		}
	}
	return nil, fmt.Errorf("cannot-find-pots-with-id-%d-on-onu-%s", uniID, o.Sn())
}

// FindUniByEntityId returns the UNI port of this ONU whose MeId (the OMCI
// entity ID) equals meId, or an error when no such port exists.
func (o *Onu) FindUniByEntityId(meId uint16) (*UniPort, error) {
	wanted := omcilib.EntityID{}.FromUint16(meId)
	for _, port := range o.UniPorts {
		if candidate := port.(*UniPort); candidate.MeId.Equals(wanted) {
			return candidate, nil
		}
	}
	return nil, fmt.Errorf("cannot-find-uni-with-meid-%s-on-onu-%s", wanted.ToString(), o.Sn())
}

// FindPotsByEntityId returns the POTS port of this ONU whose MeId (the OMCI
// entity ID) equals meId, or an error when no such port exists.
func (o *Onu) FindPotsByEntityId(meId uint16) (*PotsPort, error) {
	wanted := omcilib.EntityID{}.FromUint16(meId)
	for _, port := range o.PotsPorts {
		if candidate := port.(*PotsPort); candidate.MeId.Equals(wanted) {
			return candidate, nil
		}
	}
	return nil, fmt.Errorf("cannot-find-pots-with-meid-%s-on-onu-%s", wanted.ToString(), o.Sn())
}

// SetID stores id as the ONU ID, logging the new value before it is assigned.
func (o *Onu) SetID(id uint32) {
	fields := log.Fields{
		"IntfId":       o.PonPortID,
		"OnuId":        id,
		"SerialNumber": o.Sn(),
	}
	onuLogger.WithFields(fields).Debug("Storing OnuId ")

	o.ID = id
}

// handleFlowAdd processes a FlowAdd received for this ONU.
//
// The flow ID is appended to o.FlowIds, then the UNI referenced by the flow is
// looked up and the GemPort and PortNo carried by the flow are stored on it.
// A flow classified as EAPOL on VLAN 4091 triggers uni.HandleAuth(), an IPv4
// flow from port 68 to port 67 triggers uni.HandleDhcp().
//
// When the UNI cannot be found the error is logged and the flow is not
// processed any further.
func (o *Onu) handleFlowAdd(msg bbsim.OnuFlowUpdateMessage) {
	onuLogger.WithFields(log.Fields{
		"AllocId":           msg.Flow.AllocId,
		"Cookie":            msg.Flow.Cookie,
		"DstPort":           msg.Flow.Classifier.DstPort,
		"FlowId":            msg.Flow.FlowId,
		"FlowType":          msg.Flow.FlowType,
		"GemportId":         msg.Flow.GemportId,
		"InnerVlan":         msg.Flow.Classifier.IVid,
		"IntfId":            msg.Flow.AccessIntfId,
		"IpProto":           msg.Flow.Classifier.IpProto,
		"OnuId":             msg.Flow.OnuId,
		"OnuSn":             o.Sn(),
		"OuterVlan":         msg.Flow.Classifier.OVid,
		"PortNo":            msg.Flow.PortNo,
		"SrcPort":           msg.Flow.Classifier.SrcPort,
		"UniID":             msg.Flow.UniId,
		"ClassifierEthType": fmt.Sprintf("%x", msg.Flow.Classifier.EthType),
		"ClassifierOPbits":  msg.Flow.Classifier.OPbits,
		"ClassifierIVid":    msg.Flow.Classifier.IVid,
		"ClassifierOVid":    msg.Flow.Classifier.OVid,
		"ReplicateFlow":     msg.Flow.ReplicateFlow,
		"PbitToGemport":     msg.Flow.PbitToGemport,
	}).Debug("OLT receives FlowAdd for ONU")

	o.FlowIds = append(o.FlowIds, msg.Flow.FlowId)

	var gemPortId uint32
	if msg.Flow.ReplicateFlow {
		// This means that the OLT should replicate the flow for each PBIT, for BBSim it's enough to use the
		// first available gemport (we only need to send one packet)
		// NOTE different TP may create different mapping between PBits and GemPorts, this may require some changes
		gemPortId = msg.Flow.PbitToGemport[0]
	} else {
		// if replicateFlows is false, then the flow is carrying the correct GemPortId
		gemPortId = uint32(msg.Flow.GemportId)
	}

	uni, err := o.FindUniById(uint32(msg.Flow.UniId))
	if err != nil {
		onuLogger.WithFields(log.Fields{
			"IntfId":       o.PonPortID,
			"OnuId":        o.ID,
			"UniId":        msg.Flow.UniId,
			"PortNo":       msg.Flow.PortNo,
			"SerialNumber": o.Sn(),
			"FlowId":       msg.Flow.FlowId,
			"FlowType":     msg.Flow.FlowType,
		}).Error("cannot-find-uni-port-for-flow")
		// uni is nil here, there is nothing the flow can be applied to
		return
	}

	uni.addGemPortToService(gemPortId, msg.Flow.Classifier.EthType, msg.Flow.Classifier.OVid, msg.Flow.Classifier.IVid)
	uni.StorePortNo(msg.Flow.PortNo)

	if msg.Flow.Classifier.EthType == uint32(layers.EthernetTypeEAPOL) && msg.Flow.Classifier.OVid == 4091 {
		onuLogger.WithFields(log.Fields{
			"IntfId":       o.PonPortID,
			"OnuId":        o.ID,
			"UniId":        msg.Flow.UniId,
			"PortNo":       msg.Flow.PortNo,
			"SerialNumber": o.Sn(),
			"FlowId":       msg.Flow.FlowId,
		}).Debug("EAPOL flow detected")
		uni.HandleAuth()
	} else if msg.Flow.Classifier.EthType == uint32(layers.EthernetTypeIPv4) &&
		msg.Flow.Classifier.SrcPort == uint32(68) &&
		msg.Flow.Classifier.DstPort == uint32(67) {
		onuLogger.WithFields(log.Fields{
			"IntfId":       o.PonPortID,
			"OnuId":        o.ID,
			"UniId":        msg.Flow.UniId,
			"PortNo":       msg.Flow.PortNo,
			"SerialNumber": o.Sn(),
			"FlowId":       msg.Flow.FlowId,
			"FlowType":     msg.Flow.FlowType,
		}).Debug("DHCP flow detected")
		uni.HandleDhcp(uint8(msg.Flow.Classifier.OPbits), int(msg.Flow.Classifier.OVid))
	}
}

// handleFlowRemove processes a FlowRemove received for this ONU: the flow ID is
// dropped from o.FlowIds and, once no flows are left while the ONU is in the
// disabled state, the ONU channel is closed.
func (o *Onu) handleFlowRemove(msg bbsim.OnuFlowUpdateMessage) {
	onuLogger.WithFields(log.Fields{
		"IntfId":       o.PonPortID,
		"OnuId":        o.ID,
		"SerialNumber": o.Sn(),
		"FlowId":       msg.Flow.FlowId,
		"FlowType":     msg.Flow.FlowType,
	}).Debug("ONU receives FlowRemove")

	// drop the first occurrence of the flow ID from the local cache
	for i := range o.FlowIds {
		if o.FlowIds[i] != msg.Flow.FlowId {
			continue
		}
		o.FlowIds = append(o.FlowIds[:i], o.FlowIds[i+1:]...)
		break
	}

	if len(o.FlowIds) != 0 {
		return
	}

	onuLogger.WithFields(log.Fields{
		"IntfId":       o.PonPortID,
		"OnuId":        o.ID,
		"SerialNumber": o.Sn(),
	}).Info("Resetting GemPort")

	// check if ONU delete is performed and
	// terminate the ONU's ProcessOnuMessages Go routine
	if o.InternalState.Current() == OnuStateDisabled {
		close(o.Channel)
	}
}

// Reboot power cycles the ONU: it is shut down, kept off for timeout and then
// powered on again. The calling goroutine is blocked for the whole duration.
// The first error met while shutting down or powering on is returned.
func (o *Onu) Reboot(timeout time.Duration) error {
	logFields := func() log.Fields {
		return log.Fields{
			"IntfId":       o.PonPortID,
			"OnuId":        o.ID,
			"SerialNumber": o.Sn(),
		}
	}

	onuLogger.WithFields(logFields()).Debug("shutting-down-onu")
	shutdownErr := o.HandleShutdownONU()
	if shutdownErr != nil {
		return shutdownErr
	}

	time.Sleep(timeout)

	onuLogger.WithFields(logFields()).Debug("power-on-onu")
	return o.HandlePowerOnONU()
}

// handleEndSoftwareDownloadRequest validates an EndSoftwareDownloadRequest against
// the image sections received so far.
//
// It returns true if the request is successful, false otherwise. The request
// succeeds when:
//   - it aborts an ongoing download (ImageSize = 0 and CRC = 0xFFFFFFFF), or
//   - the number of received sections matches the expected one, the declared
//     image size does not exceed the received data and the CRC computed on the
//     received data matches the one carried by the request.
//
// On a successful validation the version extracted during the download becomes
// the standby image version.
func (o *Onu) handleEndSoftwareDownloadRequest(msg bbsim.OmciMessage) bool {
	msgObj, err := omcilib.ParseEndSoftwareDownloadRequest(msg.OmciPkt)
	if err != nil {
		onuLogger.WithFields(log.Fields{
			"OmciMsgType":  msg.OmciMsg.MessageType,
			"TransCorrId":  msg.OmciMsg.TransactionID,
			"Err":          err.Error(),
			"IntfId":       o.PonPortID,
			"SerialNumber": o.Sn(),
		}).Error("error-while-processing-end-software-download-request")
		return false
	}

	onuLogger.WithFields(log.Fields{
		"OnuId":  o.ID,
		"IntfId": o.PonPortID,
		"OnuSn":  o.Sn(),
		"msgObj": msgObj,
	}).Trace("EndSoftwareDownloadRequest received message")

	// if the image download is ongoing and we receive a message with
	// ImageSize = 0 and Crc = 4294967295 (0xFFFFFFFF) respond with success
	if o.ImageSoftwareReceivedSections > 0 &&
		msgObj.ImageSize == 0 &&
		msgObj.CRC32 == 4294967295 {
		o.ImageSoftwareReceivedSections = 0
		// NOTE potentially we may want to add a ONU state to reflect
		// the software download abort
		return true
	}

	// In the startSoftwareDownload we get the image size and the window size.
	// We calculate how many DownloadSection we should receive and validate
	// that we got the correct amount when we receive this message
	// If the received sections are different from the expected sections
	// respond with failure
	if o.ImageSoftwareExpectedSections != o.ImageSoftwareReceivedSections {
		onuLogger.WithFields(log.Fields{
			"OnuId":            o.ID,
			"IntfId":           o.PonPortID,
			"OnuSn":            o.Sn(),
			"ExpectedSections": o.ImageSoftwareExpectedSections,
			"ReceivedSections": o.ImageSoftwareReceivedSections,
		}).Errorf("onu-did-not-receive-all-image-sections")
		return false
	}

	// the declared image size cannot exceed the amount of data actually received:
	// slicing past it would either panic or checksum bytes that were never sent
	if uint64(msgObj.ImageSize) > uint64(len(o.ImageSectionData)) {
		onuLogger.WithFields(log.Fields{
			"OnuId":        o.ID,
			"IntfId":       o.PonPortID,
			"OnuSn":        o.Sn(),
			"ImageSize":    msgObj.ImageSize,
			"ReceivedSize": len(o.ImageSectionData),
		}).Errorf("onu-image-size-exceeds-received-data")
		return false
	}

	// check the received CRC vs the computed CRC
	computedCRC := crc32a.Checksum(o.ImageSectionData[:int(msgObj.ImageSize)])
	//Convert the crc to network byte order
	var byteSlice = make([]byte, 4)
	binary.LittleEndian.PutUint32(byteSlice, computedCRC)
	computedCRC = binary.BigEndian.Uint32(byteSlice)
	if msgObj.CRC32 != computedCRC {
		onuLogger.WithFields(log.Fields{
			"OnuId":         o.ID,
			"IntfId":        o.PonPortID,
			"OnuSn":         o.Sn(),
			"ReceivedCRC":   msgObj.CRC32,
			"CalculatedCRC": computedCRC,
		}).Errorf("onu-image-crc-validation-failed")
		return false
	}

	o.StandbyImageVersion = o.InDownloadImageVersion
	onuLogger.WithFields(log.Fields{
		"OnuId":          o.ID,
		"IntfId":         o.PonPortID,
		"OnuSn":          o.Sn(),
		"StandbyVersion": o.StandbyImageVersion,
	}).Debug("onu-image-version-updated")
	return true
}

// BBR methods

// sendOmciMsg is used in BBR to send an OMCI packet to the given ONU through the
// OpenOLT client.
//
// pktBytes is the encoded OMCI message, intfId and onuId address the ONU, sn is
// only used for logging and msgType is a human readable name of the message
// reported in the logs.
//
// A failure to send the message is logged with Fatalf, which terminates the
// process.
func sendOmciMsg(pktBytes []byte, intfId uint32, onuId uint32, sn *openolt.SerialNumber, msgType string, client openolt.OpenoltClient) {
	omciMsg := openolt.OmciMsg{
		IntfId: intfId,
		OnuId:  onuId,
		Pkt:    pktBytes,
	}

	if _, err := client.OmciMsgOut(context.Background(), &omciMsg); err != nil {
		// report the message that actually failed, this function is not only
		// used for the MIB reset
		log.WithFields(log.Fields{
			"IntfId":       intfId,
			"OnuId":        onuId,
			"SerialNumber": common.OnuSnToString(sn),
			"Pkt":          omciMsg.Pkt,
		}).Fatalf("Failed to send OMCI message %s", msgType)
	}
	log.WithFields(log.Fields{
		"IntfId":       intfId,
		"OnuId":        onuId,
		"SerialNumber": common.OnuSnToString(sn),
		"Pkt":          omciMsg.Pkt,
	}).Tracef("Sent OMCI message %s", msgType)
}

// getNextTid returns the transaction ID to use for the next OMCI request and
// advances the matching counter.
//
// When the first optional argument is true the high priority counter is used:
// it is reset to 0x8000 whenever the increment leaves it below 0x8000. Otherwise
// the regular counter is used: it is reset to 1 once it reaches 0x8000.
func (o *Onu) getNextTid(highPriority ...bool) uint16 {
	useHighPriority := len(highPriority) > 0 && highPriority[0]

	if !useHighPriority {
		current := o.tid
		o.tid++
		if o.tid >= 0x8000 {
			o.tid = 1
		}
		return current
	}

	current := o.hpTid
	o.hpTid++
	// the uint16 increment wraps around to 0 after 0xFFFF
	if o.hpTid < 0x8000 {
		o.hpTid = 0x8000
	}
	return current
}

// StartOmci is called in BBR to start the OMCI state machine: it sends a MIB
// reset request for this ONU through the given OpenOLT client.
// TODO move this method in responders/omcisim
func (o *Onu) StartOmci(client openolt.OpenoltClient) {
	tid := o.getNextTid(false)
	// NOTE the error returned while creating the request is not checked
	mibResetPkt, _ := omcilib.CreateMibResetRequest(tid)
	sendOmciMsg(mibResetPkt, o.PonPortID, o.ID, o.SerialNumber, "mibReset", client)
}

// handleOmciResponse is used in BBR to generate the OMCI packets the openolt-adapter would send to the device.
//
// The received indication is parsed and, depending on its message type, the next
// request of the sequence is sent through client:
//   - MibResetResponse triggers a MibUploadRequest
//   - MibUploadResponse and MibUploadNextResponse trigger a MibUploadNextRequest
//     until o.MibDb.NumberOfBaselineCommands responses have been received, then a
//     SetRequest that unlocks the first UNI is sent
//   - SetResponse fires the BbrOnuTxSendEapolFlow transition
//   - AlarmNotification is only logged
//
// An indication that cannot be parsed is logged and ignored, any other message
// type is logged with Fatalf, which terminates the process.
func (o *Onu) handleOmciResponse(msg bbsim.OmciIndicationMessage, client openolt.OpenoltClient) {

	// we need to encode the packet in HEX
	pkt := make([]byte, len(msg.OmciInd.Pkt)*2)
	hex.Encode(pkt, msg.OmciInd.Pkt)
	packet, omciMsg, err := omcilib.ParseOpenOltOmciPacket(pkt)
	if err != nil {
		log.WithFields(log.Fields{
			"IntfId":       o.PonPortID,
			"SerialNumber": o.Sn(),
			"omciPacket":   msg.OmciInd.Pkt,
		}).Error("BBR Cannot parse OMCI packet")
		// omciMsg is not usable after a parse failure, there is nothing to dispatch on
		return
	}

	log.WithFields(log.Fields{
		"IntfId":  msg.OmciInd.IntfId,
		"OnuId":   msg.OmciInd.OnuId,
		"OnuSn":   o.Sn(),
		"Pkt":     msg.OmciInd.Pkt,
		"msgType": omciMsg.MessageType,
	}).Trace("ONU Receives OMCI Msg")
	switch omciMsg.MessageType {
	default:
		log.WithFields(log.Fields{
			"IntfId":  msg.OmciInd.IntfId,
			"OnuId":   msg.OmciInd.OnuId,
			"OnuSn":   o.Sn(),
			"Pkt":     msg.OmciInd.Pkt,
			"msgType": omciMsg.MessageType,
		}).Fatalf("unexpected frame: %v", packet)
	case omci.MibResetResponseType:
		mibUpload, _ := omcilib.CreateMibUploadRequest(o.getNextTid(false))
		sendOmciMsg(mibUpload, o.PonPortID, o.ID, o.SerialNumber, "mibUpload", client)
	case omci.MibUploadResponseType:
		mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
		sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
	case omci.MibUploadNextResponseType:
		o.seqNumber++
		// once the mibUpload is complete send a SetRequest for the PPTP to enable the UNI
		// NOTE that in BBR we only enable the first UNI
		if o.seqNumber == o.MibDb.NumberOfBaselineCommands {
			meId := omcilib.GenerateUniPortEntityId(1)

			// an AdministrativeState of 0 unlocks the UNI
			meParams := me.ParamData{
				EntityID:   meId.ToUint16(),
				Attributes: me.AttributeValueMap{me.PhysicalPathTerminationPointEthernetUni_AdministrativeState: 0},
			}
			managedEntity, omciError := me.NewPhysicalPathTerminationPointEthernetUni(meParams)
			if omciError.GetError() != nil {
				onuLogger.WithFields(log.Fields{
					"OnuId":  o.ID,
					"IntfId": o.PonPortID,
					"OnuSn":  o.Sn(),
				}).Fatal(omciError.GetError())
			}

			setPPtp, _ := omcilib.CreateSetRequest(managedEntity, 1)
			sendOmciMsg(setPPtp, o.PonPortID, o.ID, o.SerialNumber, "setRquest", client)
		} else {
			mibUploadNext, _ := omcilib.CreateMibUploadNextRequest(o.getNextTid(false), o.seqNumber)
			sendOmciMsg(mibUploadNext, o.PonPortID, o.ID, o.SerialNumber, "mibUploadNext", client)
		}
	case omci.SetResponseType:
		// once we set the PPTP to active we can start sending flows

		if err := o.InternalState.Event(BbrOnuTxSendEapolFlow); err != nil {
			onuLogger.WithFields(log.Fields{
				"OnuId":  o.ID,
				"IntfId": o.PonPortID,
				"OnuSn":  o.Sn(),
			}).Errorf("Error while transitioning ONU State %v", err)
		}
	case omci.AlarmNotificationType:
		log.Info("bbr-received-alarm")
	}
}

// sendEapolFlow asks the OLT, through the given client, to add a downstream
// flow classifying EAPOL frames for this ONU.
//
// The ONU ID is reused as flow ID, cookie, port number, alloc ID and gem port
// ID. A failure of the FlowAdd call is logged at fatal level.
func (o *Onu) sendEapolFlow(client openolt.OpenoltClient) {
	flow := &openolt.Flow{
		AccessIntfId:  int32(o.PonPortID),
		OnuId:         int32(o.ID),
		UniId:         int32(0), // NOTE do not hardcode this, we need to support multiple UNIs
		FlowId:        uint64(o.ID),
		FlowType:      flowTypeDownstream,
		NetworkIntfId: int32(0),
		Classifier: &openolt.Classifier{
			EthType: uint32(layers.EthernetTypeEAPOL),
			OVid:    4091,
		},
		Action:   &openolt.Action{},
		Priority: int32(100),
		Cookie:   uint64(o.ID),
		// NOTE the port number is what maps an incoming packetIndication to an ONU
		PortNo: o.ID,
		// AllocId and GemPorts need to be unique per PON
		// for now use the ONU-ID, will need to change once we support multiple UNIs
		AllocId:   int32(o.ID),
		GemportId: int32(o.ID),
	}

	_, err := client.FlowAdd(context.Background(), flow)

	// Both the failure and the success log lines carry the same identifiers.
	fields := log.Fields{
		"IntfId":       o.PonPortID,
		"OnuId":        o.ID,
		"FlowId":       flow.FlowId,
		"PortNo":       flow.PortNo,
		"SerialNumber": common.OnuSnToString(o.SerialNumber),
	}

	if err != nil {
		log.WithFields(fields).WithField("Err", err).Fatalf("Failed to add EAPOL Flow")
	}
	log.WithFields(fields).Info("Sent EAPOL Flow")
}

// sendDhcpFlow asks the OLT, through the given client, to add a downstream
// flow classifying IPv4 traffic from port 68 to port 67, tagged with the CTag
// of the first service of the first UNI.
//
// BBR only works with a single UNI and a single service (ATT HSIA), so the
// first UNI and its first service are read unconditionally. A failure of the
// FlowAdd call is logged at fatal level.
func (o *Onu) sendDhcpFlow(client openolt.OpenoltClient) {
	firstUni := o.UniPorts[0].(*UniPort)
	hsia := firstUni.Services[0].(*Service)

	flow := &openolt.Flow{
		AccessIntfId:  int32(o.PonPortID),
		OnuId:         int32(o.ID),
		UniId:         int32(0), // BBR only supports a single UNI
		FlowId:        uint64(o.ID),
		FlowType:      flowTypeDownstream,
		NetworkIntfId: int32(0),
		Classifier: &openolt.Classifier{
			EthType: uint32(layers.EthernetTypeIPv4),
			SrcPort: uint32(68),
			DstPort: uint32(67),
			OVid:    uint32(hsia.CTag),
			OPbits:  255,
		},
		Action:   &openolt.Action{},
		Priority: int32(100),
		Cookie:   uint64(o.ID),
		// NOTE the port number is what maps an incoming packetIndication to an ONU
		PortNo: o.ID,
		// AllocId and GemPorts need to be unique per PON
		// for now use the ONU-ID, will need to change once we support multiple UNIs
		AllocId:   int32(o.ID),
		GemportId: int32(o.ID),
	}

	_, err := client.FlowAdd(context.Background(), flow)

	// Both the failure and the success log lines carry the same identifiers.
	fields := log.Fields{
		"IntfId":       o.PonPortID,
		"OnuId":        o.ID,
		"FlowId":       flow.FlowId,
		"PortNo":       flow.PortNo,
		"SerialNumber": common.OnuSnToString(o.SerialNumber),
	}

	if err != nil {
		log.WithFields(fields).WithField("Err", err).Fatalf("Failed to send DHCP Flow")
	}
	log.WithFields(fields).Info("Sent DHCP Flow")
}

// DeleteFlow removes the first occurrence of key from the ONU flows slice.
//
// The remaining keys are written into a freshly allocated slice, so the
// backing array of the previous onu.Flows value is never modified: anyone
// still holding (or ranging over) the old slice keeps seeing its original
// content. If key is not present onu.Flows is left untouched.
func (onu *Onu) DeleteFlow(key FlowKey) {
	for pos, flowKey := range onu.Flows {
		if flowKey != key {
			continue
		}

		// Build the reduced slice directly instead of shifting elements in
		// place and copying afterwards.
		remaining := make([]FlowKey, 0, len(onu.Flows)-1)
		remaining = append(remaining, onu.Flows[:pos]...)
		remaining = append(remaining, onu.Flows[pos+1:]...)
		onu.Flows = remaining
		return
	}
}

// ReDiscoverOnu makes the ONU announce itself again.
//
// When isReboot is false the call first sleeps for 5 seconds to be sure of
// the cleanup. During a reboot true is passed: all ONU resources are assumed
// to be already cleared, so there is no delay in the ONU discovery.
//
// Nothing is sent when the OLT is in the "deleted" state. An ONU that is not
// in the discovered state is driven through the initialize and discover
// transitions (a failing transition is only logged); an ONU that is already
// discovered keeps its state and only gets a discovery indication message
// pushed on its channel again.
func (onu *Onu) ReDiscoverOnu(isReboot bool) {
	if !isReboot {
		// Wait for few seconds to be sure of the cleanup
		time.Sleep(5 * time.Second)
	}

	// identifiers attached to every log line emitted below
	logFields := func() log.Fields {
		return log.Fields{
			"IntfId": onu.PonPortID,
			"OnuId":  onu.ID,
			"OnuSn":  onu.Sn(),
		}
	}

	onuLogger.WithFields(logFields()).Debug("Send ONU Re-Discovery")

	oltState := onu.PonPort.Olt.InternalState.Current()
	switch {
	case oltState == "deleted":
		// Do not send discovery if OLT is in Deleted state
		onuLogger.WithFields(logFields()).Infof("Skip ONU Re-Discovery. oltState=%s", oltState)

	case onu.InternalState.Current() != OnuStateDiscovered:
		// ONU Re-Discovery: both transitions are always attempted, in order,
		// even if the first one fails.
		transitions := []struct{ event, target string }{
			{event: OnuTxInitialize, target: OnuStateInitialized},
			{event: OnuTxDiscover, target: OnuStateDiscovered},
		}
		for _, tr := range transitions {
			if err := onu.InternalState.Event(tr.event); err != nil {
				log.WithFields(logFields()).Infof("Failed to transition ONU to %s state: %s", tr.target, err.Error())
			}
		}

	default:
		// if the ONU is already discovered don't change the state but rather
		// fire the indication again (this case happens if voltha misses the
		// indications)
		onu.Channel <- bbsim.Message{
			Type: bbsim.OnuDiscIndication,
			Data: bbsim.OnuDiscIndicationMessage{
				OperState: bbsim.UP,
			},
		}
	}
}

// findServiceByMacAddress walks every service of every UNI of the ONU and
// returns the first one whose hardware address has the same textual form as
// macAddress. An error is returned when no service matches.
//
// NOTE this is deprecated, delegate the lookup to the uniPort.
func (onu *Onu) findServiceByMacAddress(macAddress net.HardwareAddr) (*Service, error) {
	// the textual form of the wanted address does not change while searching
	wanted := macAddress.String()

	// FIXME is there a better way to avoid this loop?
	for _, u := range onu.UniPorts {
		for _, s := range u.(*UniPort).Services {
			if candidate := s.(*Service); candidate.HwAddress.String() == wanted {
				return candidate, nil
			}
		}
	}
	return nil, fmt.Errorf("cannot-find-service-with-mac-address-%s", wanted)
}

// findUniByPortNo returns the first UNI of the ONU whose PortNo equals
// portNo, or an error when the ONU has no such UNI.
func (onu *Onu) findUniByPortNo(portNo uint32) (*UniPort, error) {
	for _, candidate := range onu.UniPorts {
		if uni := candidate.(*UniPort); uni.PortNo == portNo {
			return uni, nil
		}
	}
	return nil, fmt.Errorf("cannot-find-uni-with-port-no-%d", portNo)
}

// findUniWithIgmpMembership returns the UNIs that currently have active IGMP
// memberships: the UNI must be up and own a service that needs IGMP, is in
// the initialized state and holds at least one group address.
//
// The result contains copies of the UNI ports; a UNI is added once for every
// service that satisfies the conditions above. IGMP services that are not
// initialized are skipped with a warning.
func (onu *Onu) findUniWithIgmpMembership() []UniPort {
	var uniPorts []UniPort

	for _, u := range onu.UniPorts {
		uni := u.(*UniPort)

		// if the UNI is disabled, ignore it
		if !uni.OperState.Is(UniStateUp) {
			continue
		}

		for _, s := range uni.Services {
			service := s.(*Service)

			if !service.NeedsIgmp {
				continue
			}

			if !service.InternalState.Is(ServiceStateInitialized) {
				log.WithFields(log.Fields{
					"OnuId":   onu.ID,
					"UniId":   uni.ID,
					"IntfId":  onu.PonPortID,
					"OnuSn":   onu.Sn(),
					"Service": service.Name,
				}).Warn("service-not-initialized-skipping")
				continue
			}

			// no joined group means no active membership on this service
			if len(service.groupAddresses) == 0 {
				continue
			}

			uniPorts = append(uniPorts, *uni)
		}
	}

	return uniPorts
}

// SendOMCIAlarmNotificationMsg queues a UNI status alarm message on the ONU
// channel for the given alarm type.
//
// Only the "ONU_ALARM_LOS" alarm type is handled, any other value is silently
// ignored. raiseOMCIAlarm is forwarded unchanged in the message.
func (o *Onu) SendOMCIAlarmNotificationMsg(raiseOMCIAlarm bool, alarmType string) {
	if alarmType != "ONU_ALARM_LOS" {
		return
	}

	alarm := bbsim.UniStatusAlarmMessage{
		OnuSN:          o.SerialNumber,
		OnuID:          o.ID,
		EntityID:       257, // NOTE(review): hardcoded entity ID, presumably the first UNI — confirm
		RaiseOMCIAlarm: raiseOMCIAlarm,
	}

	o.Channel <- bbsim.Message{
		Type: bbsim.UniStatusAlarm,
		Data: alarm,
	}
}

// IncrementAlarmSequenceNumber advances the alarm sequence number stored for
// key and returns the new value.
//
// The first request for a key stores and returns 1. After that the number
// grows by one per call and goes from 255 back to 1. The alarm info map is
// accessed while holding onuAlarmsInfoLock.
func (o *Onu) IncrementAlarmSequenceNumber(key omcilib.OnuAlarmInfoMapKey) uint8 {
	o.onuAlarmsInfoLock.Lock()
	defer o.onuAlarmsInfoLock.Unlock()

	info, known := o.onuAlarmsInfo[key]
	if !known {
		// This is the first time alarm notification message is being sent
		o.onuAlarmsInfo[key] = omcilib.OnuAlarmInfo{
			SequenceNo: 1,
		}
		return 1
	}

	// restart from 1 once the counter reached 255
	if info.SequenceNo == 255 {
		info.SequenceNo = 1
	} else {
		info.SequenceNo++
	}

	// info is a copy of the map value, write it back
	o.onuAlarmsInfo[key] = info
	return info.SequenceNo
}

// InvalidateMibDataSync moves MibDataSync forward by a random amount between
// 1 and 10 so that it no longer matches its previous value. The result is
// never left at 0.
//
// A private random generator is used: the process-wide math/rand source is
// not reseeded, so other users of the global generator are not affected.
func (o *Onu) InvalidateMibDataSync() {
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	step := uint8(rng.Intn(10) + 1)

	o.MibDataSync += step

	// MibDataSync is a uint8, so the sum wraps around instead of exceeding
	// 255 and may therefore land on 0: skip that value.
	if o.MibDataSync == 0 {
		o.MibDataSync++
	}
}
