[VOL-3999] Correctly handling context during OLT Reconcile

Change-Id: I22b8cca74eba3574adee4ed9dae48808ca9af889
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index 6ad068d..1c4e099 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -413,19 +413,20 @@
 	// new ones
 	o.Lock()
 	if o.enableContext != nil && o.enableContextCancel != nil {
-		oltLogger.Info("This is an OLT reboot")
+		oltLogger.Info("This is an OLT reboot or a reconcile")
 		o.enableContextCancel()
 		rebootFlag = true
+		time.Sleep(1 * time.Second)
 	}
 	o.enableContext, o.enableContextCancel = context.WithCancel(context.TODO())
 	o.Unlock()
 
 	wg := sync.WaitGroup{}
-	wg.Add(3)
 
 	o.OpenoltStream = stream
 
 	// create Go routine to process all OLT events
+	wg.Add(1)
 	go o.processOltMessages(o.enableContext, stream, &wg)
 
 	// enable the OLT
@@ -461,6 +462,15 @@
 				}
 				o.channel <- msg
 			}
+			// when the enableContext was canceled the ONUs stopped listening on the channel, so restart their message processing with the new context and stream
+			for _, onu := range pon.Onus {
+				go onu.ProcessOnuMessages(o.enableContext, stream, nil)
+
+				// update the stream on all the services
+				for _, service := range onu.Services {
+					service.UpdateStream(stream)
+				}
+			}
 		}
 	} else {
 
@@ -482,18 +492,22 @@
 		}
 	}
 
-	oltLogger.Debug("Enable OLT Done")
-
 	if !o.enablePerf {
 		// Start a go routine to send periodic port stats to openolt adapter
-		go o.periodicPortStats(o.enableContext)
+		wg.Add(1)
+		go o.periodicPortStats(o.enableContext, &wg, stream)
 	}
 
 	wg.Wait()
+	oltLogger.WithFields(log.Fields{
+		"stream": stream,
+	}).Debug("OpenOLT Stream closed")
 }
 
-func (o *OltDevice) periodicPortStats(ctx context.Context) {
+func (o *OltDevice) periodicPortStats(ctx context.Context, wg *sync.WaitGroup, stream openolt.Openolt_EnableIndicationServer) {
 	var portStats *openolt.PortStatistics
+
+loop:
 	for {
 		select {
 		case <-time.After(time.Duration(o.PortStatsInterval) * time.Second):
@@ -504,7 +518,7 @@
 					incrementStat = false
 				}
 				portStats, port.PacketCount = getPortStats(port.PacketCount, incrementStat)
-				o.sendPortStatsIndication(portStats, port.ID, port.Type)
+				o.sendPortStatsIndication(portStats, port.ID, port.Type, stream)
 			}
 
 			// send PON port stats
@@ -515,14 +529,14 @@
 					incrementStat = false
 				}
 				portStats, port.PacketCount = getPortStats(port.PacketCount, incrementStat)
-				o.sendPortStatsIndication(portStats, port.ID, port.Type)
+				o.sendPortStatsIndication(portStats, port.ID, port.Type, stream)
 			}
 		case <-ctx.Done():
-			log.Debug("Stop sending port stats")
-			return
+			oltLogger.Debug("Stop sending port stats")
+			break loop
 		}
-
 	}
+	wg.Done()
 }
 
 // Helpers method
@@ -695,7 +709,7 @@
 	}).Debug("Sent Indication_IntfOperInd for PON")
 }
 
-func (o *OltDevice) sendPortStatsIndication(stats *openolt.PortStatistics, portID uint32, portType string) {
+func (o *OltDevice) sendPortStatsIndication(stats *openolt.PortStatistics, portID uint32, portType string, stream openolt.Openolt_EnableIndicationServer) {
 	if o.InternalState.Current() == "enabled" {
 		oltLogger.WithFields(log.Fields{
 			"Type":   portType,
@@ -705,7 +719,7 @@
 		data := &openolt.Indication_PortStats{
 			PortStats: stats,
 		}
-		stream := o.OpenoltStream
+
 		if err := stream.Send(&openolt.Indication{Data: data}); err != nil {
 			oltLogger.Errorf("Failed to send PortStats: %v", err)
 			return
@@ -714,8 +728,10 @@
 }
 
 // processOltMessages handles messages received over the OpenOLT interface
-func (o *OltDevice) processOltMessages(ctx context.Context, stream openolt.Openolt_EnableIndicationServer, wg *sync.WaitGroup) {
-	oltLogger.Debug("Starting OLT Indication Channel")
+func (o *OltDevice) processOltMessages(ctx context.Context, stream types.Stream, wg *sync.WaitGroup) {
+	oltLogger.WithFields(log.Fields{
+		"stream": stream,
+	}).Debug("Starting OLT Indication Channel")
 	ch := o.channel
 
 loop:
@@ -724,9 +740,15 @@
 		case <-ctx.Done():
 			oltLogger.Debug("OLT Indication processing canceled via context")
 			break loop
+		case <-stream.Context().Done():
+			oltLogger.Debug("OLT Indication processing canceled via stream context")
+			break loop
 		case message, ok := <-ch:
-			if !ok || ctx.Err() != nil {
-				oltLogger.Debug("OLT Indication processing canceled via closed channel")
+			if !ok {
+				if ctx.Err() != nil {
+					oltLogger.WithField("err", ctx.Err()).Error("OLT EnableContext error")
+				}
+				oltLogger.Warn("OLT Indication processing canceled via closed channel")
 				break loop
 			}
 
@@ -788,7 +810,9 @@
 		}
 	}
 	wg.Done()
-	oltLogger.Warn("Stopped handling OLT Indication Channel")
+	oltLogger.WithFields(log.Fields{
+		"stream": stream,
+	}).Warn("Stopped handling OLT Indication Channel")
 }
 
 // returns an ONU with a given Serial Number