SEBA-432
SEBA-565
SEBA-654 (alarms)

implemented

fix Jenkins make errors
fix merge conflicts
address review comments

Change-Id: Ia2e95afb33ce55054afa1fcbd9beb6ada62dd764
diff --git a/core/core_server.go b/core/core_server.go
index ba0727a..4c4fcab 100644
--- a/core/core_server.go
+++ b/core/core_server.go
@@ -18,54 +18,73 @@
 
 import (
 	"context"
+	"encoding/hex"
 	"errors"
+	"reflect"
 	"strconv"
 	"sync"
-	"reflect"
 
-	omci "github.com/opencord/omci-sim"
+	pb "gerrit.opencord.org/voltha-bbsim/api"
 	"gerrit.opencord.org/voltha-bbsim/common/logger"
 	"gerrit.opencord.org/voltha-bbsim/common/utils"
 	"gerrit.opencord.org/voltha-bbsim/device"
+	flowHandler "gerrit.opencord.org/voltha-bbsim/flow"
 	openolt "gerrit.opencord.org/voltha-bbsim/protos"
 	"github.com/google/gopacket"
 	"github.com/google/gopacket/layers"
 	"github.com/google/gopacket/pcap"
+	omci "github.com/opencord/omci-sim"
 	log "github.com/sirupsen/logrus"
 	"golang.org/x/sync/errgroup"
 	"google.golang.org/grpc"
 )
 
+// Port/ONU limits, serial-number field lengths and OpenOLT start/stop action strings.
 const (
-	NNI_VETH_NORTH_PFX = "nni_north"
-	NNI_VETH_SOUTH_PFX = "nni_south"
-	MAX_ONUS_PER_PON   = 64 // This value should be the same with the value in AdapterPlatrorm class
+	NniVethNorthPfx    = "nni_north"
+	NniVethSouthPfx    = "nni_south"
+	MaxPonPorts        = 64
+	MaxOnusPerPon      = 64 // This value must be the same as the value in the AdapterPlatform class
+	VendorIDLength     = 4
+	SerialNumberLength = 12
+	OpenOltStart       = "start"
+	OpenOltStop        = "stop"
 )
 
 // Server structure consists of all the params required for BBsim.
 type Server struct {
-	wg           *sync.WaitGroup
-	Olt          *device.Olt
-	Onumap       map[uint32][]*device.Onu
-	Ioinfos      []*Ioinfo
-	gRPCserver   *grpc.Server
-	gRPCAddress  string
-	gRPCPort     uint32
-	Vethnames    []string
-	IndInterval  int
-	Processes    []string
-	EnableServer *openolt.Openolt_EnableIndicationServer
-	CtagMap      map[string]uint32
-	cancel       context.CancelFunc
-	stateRepCh   chan stateReport
-	omciIn       chan openolt.OmciIndication
-	omciOut      chan openolt.OmciMsg
-	eapolIn      chan *byteMsg
-	eapolOut     chan *byteMsg
-	dhcpIn       chan *byteMsg
-	dhcpOut      chan *byteMsg
+	wg              *sync.WaitGroup
+	Olt             *device.Olt
+	Onumap          map[uint32][]*device.Onu
+	SNmap           sync.Map
+	AutoONUActivate bool
+	Ioinfos         []*Ioinfo
+	gRPCserver      *grpc.Server
+	gRPCAddress     string
+	gRPCPort        uint32
+	mgmtServer      *grpc.Server
+	mgmtGrpcPort    uint32
+	mgmtRestPort    uint32
+	Vethnames       []string
+	IndInterval     int
+	Processes       []string
+	EnableServer    *openolt.Openolt_EnableIndicationServer
+	CtagMap         map[string]uint32
+	cancel          context.CancelFunc
+	stateRepCh      chan stateReport
+	omciIn          chan openolt.OmciIndication
+	omciOut         chan openolt.OmciMsg
+	eapolIn         chan *byteMsg
+	eapolOut        chan *byteMsg
+	dhcpIn          chan *byteMsg
+	dhcpOut         chan *byteMsg
+	FlowMap         map[FlowKey]*openolt.Flow
+	alarmCh         chan *openolt.Indication
+	deviceActionCh  chan *pb.DeviceAction
+	serverActionCh  chan string
 }
 
+// Packet carries a packet (Pkt) along with its Ioinfo (Info).
 type Packet struct {
 	Info *Ioinfo
 	Pkt  gopacket.Packet
@@ -83,56 +102,89 @@
 	next    device.DeviceState
 }
 
+// FlowKey used for FlowMap key
+type FlowKey struct {
+	FlowID        uint32
+	FlowDirection string
+}
+
 // NewCore initialize OLT and ONU objects
 func NewCore(opt *option) *Server {
 	// TODO: make it decent
 	oltid := opt.oltid
 	npon := opt.npon
-	nonus := opt.nonus
-	s := Server{
-		Olt:          device.NewOlt(oltid, npon, 1),
-		Onumap:       make(map[uint32][]*device.Onu),
-		Ioinfos:      []*Ioinfo{},
-		gRPCAddress:  opt.address,
-		gRPCPort:     opt.port,
-		Vethnames:    []string{},
-		IndInterval:  opt.intvl,
-		Processes:    []string{},
-		EnableServer: nil,
-		stateRepCh:   make(chan stateReport, 8),
-		omciIn:       make(chan openolt.OmciIndication, 1024),
-		omciOut:      make(chan openolt.OmciMsg, 1024),
-		eapolIn:      make(chan *byteMsg, 1024),
-		eapolOut:     make(chan *byteMsg, 1024),
-		dhcpIn:       make(chan *byteMsg, 1024),
-		dhcpOut:      make(chan *byteMsg, 1024),
+	if npon > MaxPonPorts {
+		logger.Warn("Provided number of PON ports exceeds limit of %d", MaxPonPorts)
+		logger.Info("Setting number of PON ports to %d", MaxPonPorts)
+		npon = MaxPonPorts
 	}
+	nonus := opt.nonus
+	if nonus > MaxOnusPerPon {
+		logger.Warn("Provided number of ONUs per PON port exceeds limit of %d", MaxOnusPerPon)
+		logger.Info("Setting number of ONUs per PON port to %d", MaxOnusPerPon)
+		nonus = MaxOnusPerPon
+	}
+	s := Server{
+		Olt:             device.NewOlt(oltid, npon, 1), // TODO nnni is to be taken from options
+		Onumap:          make(map[uint32][]*device.Onu),
+		Ioinfos:         []*Ioinfo{},
+		gRPCAddress:     opt.address,
+		gRPCPort:        opt.port,
+		Vethnames:       []string{},
+		IndInterval:     opt.intvl,
+		AutoONUActivate: !opt.interactiveOnuActivation,
+		Processes:       []string{},
+		mgmtGrpcPort:    opt.mgmtGrpcPort,
+		mgmtRestPort:    opt.mgmtRestPort,
+		EnableServer:    nil,
+		stateRepCh:      make(chan stateReport, 8),
+		omciIn:          make(chan openolt.OmciIndication, 1024),
+		omciOut:         make(chan openolt.OmciMsg, 1024),
+		eapolIn:         make(chan *byteMsg, 1024),
+		eapolOut:        make(chan *byteMsg, 1024),
+		dhcpIn:          make(chan *byteMsg, 1024),
+		dhcpOut:         make(chan *byteMsg, 1024),
+		FlowMap:         make(map[FlowKey]*openolt.Flow),
+		serverActionCh:  make(chan string),
+	}
+	logger.Info("OLT %d created: %v", s.Olt.ID, s.Olt)
 
 	nnni := s.Olt.NumNniIntf
 	logger.Info("OLT ID: %d was retrieved.", s.Olt.ID)
-	for intfid := nnni; intfid < npon+nnni; intfid++ {
+	logger.Info("OLT Serial-Number: %v", s.Olt.SerialNumber)
+	// Creating Onu Map
+	for intfid := uint32(0); intfid < npon; intfid++ {
 		s.Onumap[intfid] = device.NewOnus(oltid, intfid, nonus, nnni)
 	}
 
-	//TODO: To be fixed because it is hardcoded
+	logger.Debug("Onu Map:")
+	for _, onus := range s.Onumap {
+		for _, onu := range onus {
+			logger.Debug("%+v", *onu)
+		}
+	}
+
+	// TODO: To be fixed because it is hardcoded
 	s.CtagMap = make(map[string]uint32)
-	for i := 0; i < MAX_ONUS_PER_PON; i++ {
+	for i := 0; i < MaxOnusPerPon; i++ {
 		oltid := s.Olt.ID
 		intfid := uint32(1)
-		sn := convB2S(device.NewSN(oltid, intfid, uint32(i)))
+		sn := utils.ConvB2S(device.NewSN(oltid, intfid, uint32(i)))
 		s.CtagMap[sn] = uint32(900 + i) // This is hard coded for BBWF
 	}
+
+	flowHandler.InitializeFlowManager(s.Olt.ID)
 	return &s
 }
 
-// Start starts the BBSim and openolt gRPC servers (blocking)
+// Start starts the openolt gRPC server (blocking)
 func (s *Server) Start() error {
-	s.wg = &sync.WaitGroup{}
-	logger.Debug("Start() Start")
+	logger.Debug("Starting OpenOLT gRPC Server")
 	defer func() {
-		close(s.stateRepCh)
-		logger.Debug("Start() Done")
+		logger.Debug("OpenOLT gRPC Server Stopped")
 	}()
+
+	// Start Openolt gRPC server
 	addressport := s.gRPCAddress + ":" + strconv.Itoa(int(s.gRPCPort))
 	listener, gserver, err := NewGrpcServer(addressport)
 	if err != nil {
@@ -145,32 +197,67 @@
 		logger.Error("Failed to run gRPC server: %v", err)
 		return err
 	}
-	s.wg.Wait()
 	return nil
 }
 
-// Stop stops the BBSim and openolt gRPC servers (non-blocking).
+// Stop stops the openolt gRPC server and the packet loops (non-blocking).
 func (s *Server) Stop() {
-	logger.Debug("Stop() Start")
-	defer logger.Debug("Stop() Done")
+	logger.Debug("Stopping OpenOLT gRPC Server & PktLoops")
+	defer logger.Debug("OpenOLT gRPC Server & PktLoops Stopped")
+
 	if s.gRPCserver != nil {
 		s.gRPCserver.Stop()
 		logger.Debug("gRPCserver.Stop()")
 	}
+
 	s.StopPktLoops()
 	return
 }
 
+func (s *Server) startMgmtServer(wg *sync.WaitGroup) {
+	defer logger.Debug("Management api server exited")
+
+	grpcAddressPort := s.gRPCAddress + ":" + strconv.Itoa(int(s.mgmtGrpcPort))
+	restAddressPort := s.gRPCAddress + ":" + strconv.Itoa(int(s.mgmtRestPort))
+	// Start rest gateway for BBSim server
+	go StartRestGatewayService(grpcAddressPort, restAddressPort, wg)
+	addressPort := s.gRPCAddress + ":" + strconv.Itoa(int(s.mgmtGrpcPort))
+
+	listener, apiserver, err := NewMgmtAPIServer(addressPort)
+	if err != nil {
+		logger.Error("Unable to create management api server %v", err)
+		return
+	}
+
+	s.mgmtServer = apiserver
+	pb.RegisterBBSimServiceServer(apiserver, s)
+	if e := apiserver.Serve(listener); e != nil {
+		logger.Error("Failed to run management api server %v", e)
+		return
+	}
+
+}
+
+func (s *Server) stopMgmtServer() error {
+	if s.mgmtServer != nil {
+		s.mgmtServer.GracefulStop()
+		logger.Debug("Management server stopped")
+		return nil
+	}
+	return errors.New("can not stop management server, server not created")
+}
+
 // Enable invokes methods for activation of OLT and ONU (blocking)
 func (s *Server) Enable(sv *openolt.Openolt_EnableIndicationServer) error {
 	olt := s.Olt
 	defer func() {
 		olt.Initialize()
-		for intfid := range s.Onumap {
-			for _, onu := range s.Onumap[intfid] {
-				onu.Initialize()
-			}
-		}
+		// The lines below are commented out because we don't want to change the ONU state on restart
+		// for intfid := range s.Onumap {
+		// 	for _, onu := range s.Onumap[intfid] {
+		// 		onu.Initialize()
+		// 	}
+		// }
 		s.updateDevIntState(olt, device.OLT_INACTIVE)
 		logger.Debug("Enable() Done")
 	}()
@@ -185,11 +272,29 @@
 	coreCtx := context.Background()
 	coreCtx, corecancel := context.WithCancel(coreCtx)
 	s.cancel = corecancel
-	go s.sendDiscovertoONUs(*sv)
 
-	if err := s.StartPktLoops(coreCtx, *sv); err != nil {
-		return err
+	errorchan := make(chan error, 5)
+	go s.StartPktLoops(coreCtx, *sv, errorchan)
+
+	if s.AutoONUActivate == true {
+		// Initialize all ONUs
+		for intfid := range s.Onumap {
+			for _, onu := range s.Onumap[intfid] {
+				onu.Initialize()
+			}
+		}
+		// Activate all ONUs
+		s.activateONUs(*sv, s.Onumap)
 	}
+
+	select {
+	case err := <-errorchan:
+		if err != nil {
+			logger.Debug("Error: %v", err)
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -224,10 +329,28 @@
 	s.updateDevIntState(onu, state)
 	return nil
 }
-func (s *Server) Activate(onu *device.Onu) error {
-	utils.LoggerWithOnu(onu).Info("sending ONUInd Onuid")
-	go sendOnuIndtoONU(*s.EnableServer, onu)
-	return nil
+
+func (s *Server) activateOnu(onu *device.Onu) {
+	snKey := stringifySerialNumber(onu.SerialNumber)
+	s.SNmap.Store(snKey, onu)
+	device.UpdateOnusOpStatus(onu.IntfID, onu, "up")
+
+	err := sendOnuDiscInd(*s.EnableServer, onu)
+	if err != nil {
+		logger.Error(err.Error())
+		return
+	}
+	logger.Info("OLT id:%d sent ONUDiscInd.", s.Olt.ID)
+	logger.Debug("activateONUs Entry in SNmap %v", snKey)
+}
+
+func (s *Server) activateONUs(stream openolt.Openolt_EnableIndicationServer, Onumap map[uint32][]*device.Onu) {
+	// Activate every ONU: add it to the SerialNumber map and send its discovery indication
+	for intfid := range Onumap {
+		for _, onu := range Onumap[intfid] {
+			s.activateOnu(onu)
+		}
+	}
 }
 
 func (s *Server) activateOLT(stream openolt.Openolt_EnableIndicationServer) error {
@@ -249,37 +372,16 @@
 	logger.Info("OLT %s sent IntfInd.", olt.Name)
 
 	// OLT sends Operation Indication to Adapter after activating each interface
-	//time.Sleep(IF_UP_TIME * time.Second)
 	if err := sendOperInd(stream, olt); err != nil {
 		logger.Error("Fail to sendOperInd: %v", err)
 		return err
 	}
 	logger.Info("OLT %s sent OperInd.", olt.Name)
-
 	return nil
 }
-func (s *Server) sendDiscovertoONUs(stream openolt.Openolt_EnableIndicationServer) {
-	// OLT sends ONU Discover Indication to Adapter after ONU discovery
-	for intfid := range s.Onumap {
-		device.UpdateOnusOpStatus(intfid, s.Onumap[intfid], "up")
-	}
-
-	// Initialize all ONUs
-	for intfid := range s.Onumap {
-		for _, onu := range s.Onumap[intfid] {
-			onu.Initialize()
-		}
-	}
-
-	// Send discovery indication for all ONUs
-	for intfid, _ := range s.Onumap {
-		sendOnuDiscInd(stream, s.Onumap[intfid], s.IndInterval)
-		logger.Info("OLT sent ONUDiscInd for intfId:%d.", intfid)
-	}
-}
 
 // StartPktLoops creates veth pairs and invokes runPktLoops (blocking)
-func (s *Server) StartPktLoops(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
+func (s *Server) StartPktLoops(ctx context.Context, stream openolt.Openolt_EnableIndicationServer, errorchan chan error) {
 	logger.Debug("StartPktLoops () Start")
 	defer func() {
 		RemoveVeths(s.Vethnames)
@@ -289,11 +391,14 @@
 		s.updateDevIntState(s.Olt, device.OLT_PREACTIVE)
 		logger.Debug("StartPktLoops () Done")
 	}()
+	s.alarmCh = make(chan *openolt.Indication, 10)
+	go startAlarmLoop(stream, s.alarmCh)
+	go s.startDeviceActionLoop()
 	s.wg.Add(1)
 	ioinfos, veths, err := createIoinfos(s.Olt.ID, s.Vethnames)
 	if err != nil {
 		logger.Error("createIoinfos failed: %v", err)
-		return err
+		errorchan <- err
 	}
 	s.Ioinfos = ioinfos
 	s.Vethnames = veths
@@ -305,9 +410,9 @@
 
 	if err = s.runPktLoops(child, stream); err != nil {
 		logger.Error("runPktLoops failed: %v", err)
-		return err
+		errorchan <- err
 	}
-	return nil
+	errorchan <- nil
 }
 
 // StopPktLoops (non-blocking)
@@ -335,13 +440,13 @@
 	return ioinfos, Vethnames, nil
 }
 
-//Blocking
+// Blocking
 func (s *Server) runPktLoops(ctx context.Context, stream openolt.Openolt_EnableIndicationServer) error {
 	logger.Debug("runPacketPktLoops Start")
 	defer logger.Debug("runPacketLoops Done")
 
 	errchOmci := make(chan error)
-	RunOmciResponder(ctx, s.omciOut, s.omciIn, errchOmci)
+	s.RunOmciResponder(ctx, s.omciOut, s.omciIn, errchOmci)
 	eg, child := errgroup.WithContext(ctx)
 	child, cancel := context.WithCancel(child)
 
@@ -356,7 +461,7 @@
 		defer logger.Debug("runOMCIResponder Done")
 		select {
 		case v, ok := <-errchOmci: // Wait for OmciInitialization
-			if ok { //Error
+			if ok { // Error
 				logger.Error("Error happend in Omci: %s", v)
 				return v
 			}
@@ -371,7 +476,7 @@
 		defer logger.Debug("runEapolResponder Done")
 		select {
 		case v, ok := <-errchEapol:
-			if ok { //Error
+			if ok { // Error
 				logger.Error("Error happend in Eapol:%s", v)
 				return v
 			}
@@ -386,7 +491,7 @@
 		defer logger.Debug("runDhcpResponder Done")
 		select {
 		case v, ok := <-errchDhcp:
-			if ok { //Error
+			if ok { // Error
 				logger.Error("Error happend in Dhcp:%s", v)
 				return v
 			}
@@ -451,7 +556,7 @@
 				logger.Error("Fail to send EAPOL PktInd indication. %v", err)
 				return err
 			}
-		case msg := <-s.dhcpIn: //TODO: We should put omciIn, eapolIn, dhcpIn toghether
+		case msg := <-s.dhcpIn: // TODO: We should put omciIn, eapolIn, dhcpIn together
 			intfid := msg.IntfId
 			onuid := msg.OnuId
 			gemid, err := s.getGemPortID(intfid, onuid)
@@ -468,7 +573,7 @@
 				logger.Error("Failed to GetOnuByID:%d", onuid)
 				continue
 			}
-			sn := convB2S(onu.SerialNumber.VendorSpecific)
+			sn := utils.ConvB2S(onu.SerialNumber.VendorSpecific)
 			if ctag, ok := s.CtagMap[sn]; ok == true {
 				tagpkt, err := PushVLAN(pkt, uint16(ctag), onu)
 				if err != nil {
@@ -499,7 +604,12 @@
 				logger.Debug("WARNING: This packet does not come from NNI ")
 				continue
 			}
+
+			onuid := nnipkt.Info.onuid
 			intfid := nnipkt.Info.intfid
+			onu, _ := s.GetOnuByID(onuid, intfid)
+
+			utils.LoggerWithOnu(onu).Info("Received packet from NNI in grpc Server.")
 
 			pkt := nnipkt.Pkt
 			data = &openolt.Indication_PktInd{PktInd: &openolt.PacketIndication{IntfType: "nni", IntfId: intfid, Pkt: pkt.Data()}}
@@ -513,7 +623,6 @@
 			return nil
 		}
 	}
-	return nil
 }
 
 func (s *Server) onuPacketOut(intfid uint32, onuid uint32, rawpkt gopacket.Packet) error {
@@ -622,7 +731,7 @@
 			}
 		}
 	}
-	err := errors.New("No mathced SN is found ")
+	err := errors.New("no matching serial number found")
 	logger.Error("%s", err)
 	return nil, err
 }
@@ -638,7 +747,7 @@
 			return onu, nil
 		}
 	}
-	err := errors.New("No matched OnuID is found ")
+	err := errors.New("no matching OnuID found")
 	logger.WithFields(log.Fields{
 		"onumap": onumap,
 		"onuid":  onuid,
@@ -647,10 +756,111 @@
 	return nil, err
 }
 
-func convB2S(b []byte) string {
-	s := ""
-	for _, i := range b {
-		s = s + strconv.FormatInt(int64(i/16), 16) + strconv.FormatInt(int64(i%16), 16)
+// getOnuFromSNmap method returns onu object from SNmap if found
+func (s *Server) getOnuFromSNmap(serialNumber *openolt.SerialNumber) (*device.Onu, bool) {
+	snkey := stringifySerialNumber(serialNumber)
+
+	logger.Debug("getOnuFromSNmap received serial number %s", snkey)
+
+	if onu, exist := s.SNmap.Load(snkey); exist {
+		logger.Info("Serial number found in map")
+		return onu.(*device.Onu), true
 	}
-	return s
+	logger.Info("Serial number not found in map")
+	return nil, false
+}
+
+func stringifySerialNumber(serialNum *openolt.SerialNumber) string {
+	return string(serialNum.VendorId) + utils.ConvB2S(serialNum.VendorSpecific)
+}
+
+func getOpenoltSerialNumber(SerialNumber string) (*openolt.SerialNumber, error) {
+	if len(SerialNumber) != SerialNumberLength {
+		logger.Error("Invalid serial number %s", SerialNumber)
+		return nil, errors.New("invalid serial number")
+	}
+	// First four characters are vendorId
+	vendorID := SerialNumber[:VendorIDLength]
+	vendorSpecific := SerialNumber[VendorIDLength:]
+
+	vsbyte, _ := hex.DecodeString(vendorSpecific)
+
+	// Convert to Openolt serial number
+	serialNum := new(openolt.SerialNumber)
+	serialNum.VendorId = []byte(vendorID)
+	serialNum.VendorSpecific = vsbyte
+
+	return serialNum, nil
+}
+
+// TODO move to device_onu.go
+func (s *Server) sendOnuIndicationsOnOltReboot() {
+	if AutoONUActivate == 1 {
+		// In auto-activate mode, ONU indications are sent in Enable()
+		return
+	}
+
+	s.SNmap.Range(
+		func(key, value interface{}) bool {
+			onu := value.(*device.Onu)
+			if onu.InternalState == device.ONU_LOS_RAISED {
+				return true
+			}
+
+			err := sendOnuDiscInd(*s.EnableServer, onu)
+			if err != nil {
+				logger.Error(err.Error())
+			}
+
+			return true
+		})
+}
+
+// StartServerActionLoop reads from the server-action channel and starts or stops the server according to the value received.
+func (s *Server) StartServerActionLoop(wg *sync.WaitGroup) {
+	for {
+		select {
+		case Req := <-s.serverActionCh:
+			logger.Debug("Request Received On serverActionCh: %+v", Req)
+			switch Req {
+			case "start":
+				logger.Debug("Server Start Request Received On ServerActionChannel")
+				go s.Start() // blocking
+			case "stop":
+				logger.Debug("Server Stop Request Received On ServerActionChannel")
+				s.Stop()
+			default:
+				logger.Error("Invalid value received in deviceActionCh")
+			}
+		}
+	}
+}
+
+// startDeviceActionLoop reads from the device-action channel and performs ONU and OLT reboot related actions.
+// TODO: all ONU and OLT related actions (like alarms) should be handled using this function.
+func (s *Server) startDeviceActionLoop() {
+	logger.Debug("startDeviceActionLoop invoked")
+	s.deviceActionCh = make(chan *pb.DeviceAction, 10)
+	for {
+		logger.Debug("Action channel loop started")
+		select {
+		case Req := <-s.deviceActionCh:
+			logger.Debug("Reboot Action Type: %+v", Req.DeviceAction)
+			switch Req.DeviceType {
+			case DeviceTypeOnu:
+				value, _ := s.SNmap.Load(Req.DeviceSerialNumber)
+				onu := value.(*device.Onu)
+				if Req.DeviceAction == SoftReboot {
+					s.handleONUSoftReboot(onu.IntfID, onu.OnuID)
+				} else if Req.DeviceAction == HardReboot {
+					s.handleONUHardReboot(onu)
+				}
+			case DeviceTypeOlt:
+				logger.Debug("Reboot For OLT Received")
+				s.handleOLTReboot()
+			default:
+				logger.Error("Invalid value received in deviceActionCh")
+			}
+		}
+	}
 }