VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC API, the context was passed through directly.
Where context reached the Kafka API, context.TODO() was used (as this NBI does not support context or request cancellation).
Anywhere a new thread is started, and the creating thread makes no attempt to wait, context.Background() was used.
Anywhere a new thread is started, and the creating thread waits for completion, the ctx is passed through from the creating thread.
Cancellation of a gRPC NBI request should now propagate all the way through to the KV store.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/rw_core/flowdecomposition/flow_decomposer.go b/rw_core/flowdecomposition/flow_decomposer.go
index 09592a8..5819705 100644
--- a/rw_core/flowdecomposition/flow_decomposer.go
+++ b/rw_core/flowdecomposition/flow_decomposer.go
@@ -17,6 +17,7 @@
 package flowdecomposition
 
 import (
+	"context"
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/rw_core/coreif"
 	"github.com/opencord/voltha-go/rw_core/graph"
@@ -46,7 +47,7 @@
 }
 
 //DecomposeRules decomposes per-device flows and flow-groups from the flows and groups defined on a logical device
-func (fd *FlowDecomposer) DecomposeRules(agent coreif.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups) *fu.DeviceRules {
+func (fd *FlowDecomposer) DecomposeRules(ctx context.Context, agent coreif.LogicalDeviceAgent, flows ofp.Flows, groups ofp.FlowGroups) *fu.DeviceRules {
 	deviceRules := *fu.NewDeviceRules()
 	devicesToUpdate := make(map[string]string)
 
@@ -57,7 +58,7 @@
 
 	var decomposedRules *fu.DeviceRules
 	for _, flow := range flows.Items {
-		decomposedRules = fd.decomposeFlow(agent, flow, groupMap)
+		decomposedRules = fd.decomposeFlow(ctx, agent, flow, groupMap)
 		for deviceID, flowAndGroups := range decomposedRules.Rules {
 			deviceRules.CreateEntryIfNotExist(deviceID)
 			deviceRules.Rules[deviceID].AddFrom(flowAndGroups)
@@ -416,7 +417,7 @@
 }
 
 // decomposeFlow decomposes a flow for a logical device into flows for each physical device
-func (fd *FlowDecomposer) decomposeFlow(agent coreif.LogicalDeviceAgent, flow *ofp.OfpFlowStats,
+func (fd *FlowDecomposer) decomposeFlow(ctx context.Context, agent coreif.LogicalDeviceAgent, flow *ofp.OfpFlowStats,
 	groupMap map[uint32]*ofp.OfpGroupEntry) *fu.DeviceRules {
 
 	inPortNo := fu.GetInPort(flow)
@@ -451,7 +452,7 @@
 	} else {
 		var ingressDevice *voltha.Device
 		var err error
-		if ingressDevice, err = fd.deviceMgr.GetDevice(route[0].DeviceID); err != nil {
+		if ingressDevice, err = fd.deviceMgr.GetDevice(ctx, route[0].DeviceID); err != nil {
 			log.Errorw("ingress-device-not-found", log.Fields{"deviceId": route[0].DeviceID, "flow": flow})
 			return deviceRules
 		}
diff --git a/rw_core/flowdecomposition/flow_decomposer_test.go b/rw_core/flowdecomposition/flow_decomposer_test.go
index cf32c4d..8e2d9f3 100644
--- a/rw_core/flowdecomposition/flow_decomposer_test.go
+++ b/rw_core/flowdecomposition/flow_decomposer_test.go
@@ -16,6 +16,7 @@
 package flowdecomposition
 
 import (
+	"context"
 	"errors"
 	"github.com/opencord/voltha-go/rw_core/graph"
 	"github.com/opencord/voltha-go/rw_core/mocks"
@@ -99,7 +100,7 @@
 	return &tdm
 }
 
-func (tdm *testDeviceManager) GetDevice(deviceID string) (*voltha.Device, error) {
+func (tdm *testDeviceManager) GetDevice(ctx context.Context, deviceID string) (*voltha.Device, error) {
 	if d, ok := tdm.devices[deviceID]; ok {
 		return d, nil
 	}
@@ -398,8 +399,8 @@
 	return &tfd
 }
 
-func (tfd *testFlowDecomposer) getDeviceHelper(deviceID string) (*voltha.Device, error) {
-	return tfd.dMgr.GetDevice(deviceID)
+func (tfd *testFlowDecomposer) getDeviceHelper(ctx context.Context, deviceID string) (*voltha.Device, error) {
+	return tfd.dMgr.GetDevice(ctx, deviceID)
 }
 
 func (tfd *testFlowDecomposer) GetDeviceLogicalID() string {
@@ -483,7 +484,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -545,7 +546,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -606,7 +607,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -667,7 +668,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 1, onu1FlowAndGroup.Flows.Len())
@@ -731,7 +732,7 @@
 	flows := ofp.Flows{Items: []*ofp.OfpFlowStats{fu.MkFlowStat(fa)}}
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Nil(t, onu1FlowAndGroup)
@@ -793,7 +794,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.NotNil(t, onu1FlowAndGroup)
@@ -895,7 +896,7 @@
 	groups := ofp.FlowGroups{}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
 
 	onu1FlowAndGroup := deviceRules.Rules["onu1"]
 	oltFlowAndGroup := deviceRules.Rules["olt"]
@@ -982,7 +983,7 @@
 	groups := ofp.FlowGroups{Items: []*ofp.OfpGroupEntry{fu.MkGroupStat(ga)}}
 	tfd := newTestFlowDecomposer(newTestDeviceManager())
 
-	deviceRules := tfd.fd.DecomposeRules(tfd, flows, groups)
+	deviceRules := tfd.fd.DecomposeRules(context.Background(), tfd, flows, groups)
 	oltFlowAndGroup := deviceRules.Rules["olt"]
 	assert.Equal(t, 1, oltFlowAndGroup.Flows.Len())
 	assert.Equal(t, 0, oltFlowAndGroup.Groups.Len())