[VOL-3999] Correctly handle context during OLT reconcile
Change-Id: I22b8cca74eba3574adee4ed9dae48808ca9af889
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index 6ad068d..1c4e099 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -413,19 +413,20 @@
// new ones
o.Lock()
if o.enableContext != nil && o.enableContextCancel != nil {
- oltLogger.Info("This is an OLT reboot")
+ oltLogger.Info("This is an OLT reboot or a reconcile")
o.enableContextCancel()
rebootFlag = true
+ time.Sleep(1 * time.Second)
}
o.enableContext, o.enableContextCancel = context.WithCancel(context.TODO())
o.Unlock()
wg := sync.WaitGroup{}
- wg.Add(3)
o.OpenoltStream = stream
// create Go routine to process all OLT events
+ wg.Add(1)
go o.processOltMessages(o.enableContext, stream, &wg)
// enable the OLT
@@ -461,6 +462,15 @@
}
o.channel <- msg
}
+ // when the enableContext was canceled, the ONUs stopped listening on the channel, so restart their message processing with the new context
+ for _, onu := range pon.Onus {
+ go onu.ProcessOnuMessages(o.enableContext, stream, nil)
+
+ // update the stream on all the services
+ for _, service := range onu.Services {
+ service.UpdateStream(stream)
+ }
+ }
}
} else {
@@ -482,18 +492,22 @@
}
}
- oltLogger.Debug("Enable OLT Done")
-
if !o.enablePerf {
// Start a go routine to send periodic port stats to openolt adapter
- go o.periodicPortStats(o.enableContext)
+ wg.Add(1)
+ go o.periodicPortStats(o.enableContext, &wg, stream)
}
wg.Wait()
+ oltLogger.WithFields(log.Fields{
+ "stream": stream,
+ }).Debug("OpenOLT Stream closed")
}
-func (o *OltDevice) periodicPortStats(ctx context.Context) {
+func (o *OltDevice) periodicPortStats(ctx context.Context, wg *sync.WaitGroup, stream openolt.Openolt_EnableIndicationServer) {
var portStats *openolt.PortStatistics
+
+loop:
for {
select {
case <-time.After(time.Duration(o.PortStatsInterval) * time.Second):
@@ -504,7 +518,7 @@
incrementStat = false
}
portStats, port.PacketCount = getPortStats(port.PacketCount, incrementStat)
- o.sendPortStatsIndication(portStats, port.ID, port.Type)
+ o.sendPortStatsIndication(portStats, port.ID, port.Type, stream)
}
// send PON port stats
@@ -515,14 +529,14 @@
incrementStat = false
}
portStats, port.PacketCount = getPortStats(port.PacketCount, incrementStat)
- o.sendPortStatsIndication(portStats, port.ID, port.Type)
+ o.sendPortStatsIndication(portStats, port.ID, port.Type, stream)
}
case <-ctx.Done():
- log.Debug("Stop sending port stats")
- return
+ oltLogger.Debug("Stop sending port stats")
+ break loop
}
-
}
+ wg.Done()
}
// Helpers method
@@ -695,7 +709,7 @@
}).Debug("Sent Indication_IntfOperInd for PON")
}
-func (o *OltDevice) sendPortStatsIndication(stats *openolt.PortStatistics, portID uint32, portType string) {
+func (o *OltDevice) sendPortStatsIndication(stats *openolt.PortStatistics, portID uint32, portType string, stream openolt.Openolt_EnableIndicationServer) {
if o.InternalState.Current() == "enabled" {
oltLogger.WithFields(log.Fields{
"Type": portType,
@@ -705,7 +719,7 @@
data := &openolt.Indication_PortStats{
PortStats: stats,
}
- stream := o.OpenoltStream
+
if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
oltLogger.Errorf("Failed to send PortStats: %v", err)
return
@@ -714,8 +728,10 @@
}
// processOltMessages handles messages received over the OpenOLT interface
-func (o *OltDevice) processOltMessages(ctx context.Context, stream openolt.Openolt_EnableIndicationServer, wg *sync.WaitGroup) {
- oltLogger.Debug("Starting OLT Indication Channel")
+func (o *OltDevice) processOltMessages(ctx context.Context, stream types.Stream, wg *sync.WaitGroup) {
+ oltLogger.WithFields(log.Fields{
+ "stream": stream,
+ }).Debug("Starting OLT Indication Channel")
ch := o.channel
loop:
@@ -724,9 +740,15 @@
case <-ctx.Done():
oltLogger.Debug("OLT Indication processing canceled via context")
break loop
+ case <-stream.Context().Done():
+ oltLogger.Debug("OLT Indication processing canceled via stream context")
+ break loop
case message, ok := <-ch:
- if !ok || ctx.Err() != nil {
- oltLogger.Debug("OLT Indication processing canceled via closed channel")
+ if !ok {
+ if ctx.Err() != nil {
+ oltLogger.WithField("err", ctx.Err()).Error("OLT EnableContext error")
+ }
+ oltLogger.Warn("OLT Indication processing canceled via closed channel")
break loop
}
@@ -788,7 +810,9 @@
}
}
wg.Done()
- oltLogger.Warn("Stopped handling OLT Indication Channel")
+ oltLogger.WithFields(log.Fields{
+ "stream": stream,
+ }).Warn("Stopped handling OLT Indication Channel")
}
// returns an ONU with a given Serial Number