[VOL-1547] Add port to logical device when device is active

This commit consists of the following changes:
1)  Fix the issue where flows were received when the logical
device flow graph was not ready.
2) Update the default kafka config for improved performance
3) Add a lock to the device ownership logic to ensure the
lock map does not get corrupted.

Change-Id: I840d572e06ed5acf0f3bc1ce423a0ada8f335543
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 887a167..e5f6dfe 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -431,14 +431,14 @@
 }
 
 func (c *EtcdClient) listenForKeyChange(channel v3Client.WatchChan, ch chan<- *Event) {
-	log.Infow("start-listening-on-channel", log.Fields{"channel": ch})
+	log.Debug("start-listening-on-channel ...")
 	for resp := range channel {
 		for _, ev := range resp.Events {
 			//log.Debugf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
 			ch <- NewEvent(getEventType(ev), ev.Kv.Key, ev.Kv.Value)
 		}
 	}
-	log.Info("stop-listening-on-channel")
+	log.Debug("stop-listening-on-channel ...")
 }
 
 func getEventType(event *v3Client.Event) int {
diff --git a/kafka/client.go b/kafka/client.go
index 4eb3e5a..a4c49ca 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -40,14 +40,14 @@
 	DefaultKafkaPort                = 9092
 	DefaultGroupName                = "voltha"
 	DefaultSleepOnError             = 1
-	DefaultProducerFlushFrequency   = 1
-	DefaultProducerFlushMessages    = 1
-	DefaultProducerFlushMaxmessages = 1
+	DefaultProducerFlushFrequency   = 10
+	DefaultProducerFlushMessages    = 10
+	DefaultProducerFlushMaxmessages = 100
 	DefaultProducerReturnSuccess    = true
 	DefaultProducerReturnErrors     = true
 	DefaultProducerRetryMax         = 3
 	DefaultProducerRetryBackoff     = time.Millisecond * 100
-	DefaultConsumerMaxwait          = 10
+	DefaultConsumerMaxwait          = 100
 	DefaultMaxProcessingTime        = 100
 	DefaultConsumerType             = PartitionConsumer
 	DefaultNumberPartitions         = 3
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 42e5a02..b9c03e6 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -257,7 +257,7 @@
 	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
 	//key := GetDeviceIdFromTopic(*toTopic)
 	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
-	kp.kafkaClient.Send(protoRequest, toTopic, key)
+	go kp.kafkaClient.Send(protoRequest, toTopic, key)
 
 	if waitForResponse {
 		// Create a child context based on the parent context, if any
@@ -550,7 +550,7 @@
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
 		KeyTopic: request.Header.KeyTopic,
-		Timestamp: time.Now().Unix(),
+		Timestamp: time.Now().UnixNano(),
 	}
 
 	// Go over all returned values
@@ -635,7 +635,6 @@
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
 	if msg.Header.Type == ic.MessageType_REQUEST {
-
 		var out []reflect.Value
 		var err error
 
@@ -707,7 +706,7 @@
 			key := msg.Header.KeyTopic
 			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
 			// TODO: handle error response.
-			kp.kafkaClient.Send(icm, replyTopic, key)
+			 go kp.kafkaClient.Send(icm, replyTopic, key)
 		}
 	} else if msg.Header.Type == ic.MessageType_RESPONSE {
 		log.Debugw("response-received", log.Fields{"msg-header": msg.Header})
@@ -765,7 +764,7 @@
 		FromTopic: replyTopic.Name,
 		ToTopic:   toTopic.Name,
 		KeyTopic: key,
-		Timestamp: time.Now().Unix(),
+		Timestamp: time.Now().UnixNano(),
 	}
 	requestBody := &ic.InterContainerRequestBody{
 		Rpc:              rpc,
diff --git a/rw_core/core/device_ownership.go b/rw_core/core/device_ownership.go
index 4a692d3..0921561 100644
--- a/rw_core/core/device_ownership.go
+++ b/rw_core/core/device_ownership.go
@@ -238,39 +238,34 @@
 	if id == nil {
 		return "", status.Error(codes.InvalidArgument, "nil-id")
 	}
+	da.deviceToKeyMapLock.Lock()
+	defer da.deviceToKeyMapLock.Unlock()
 	var device *voltha.Device
 	var lDevice *voltha.LogicalDevice
 	// The id can either be a device Id or a logical device id.
 	if dId, ok := id.(*utils.DeviceID); ok {
 		// Use cache if present
-		if val, err := da.getDeviceKey(dId.Id); err == nil {
+		if val, exist := da.deviceToKeyMap[dId.Id]; exist {
 			return val, nil
 		}
 		if device, _ = da.deviceMgr.GetDevice(dId.Id); device == nil {
 			return "", status.Error(codes.NotFound, fmt.Sprintf("id-absent-%s", dId))
 		}
 		if device.Root {
-			if err := da.updateDeviceKey(dId.Id, device.Id); err != nil {
-				log.Warnw("Error-updating-cache", log.Fields{"id": dId.Id, "key": device.Id, "error": err})
-			}
-			return device.Id, nil
+			da.deviceToKeyMap[dId.Id] = device.Id
 		} else {
-			if err := da.updateDeviceKey(dId.Id, device.ParentId); err != nil {
-				log.Warnw("Error-updating-cache", log.Fields{"id": dId.Id, "key": device.ParentId, "error": err})
-			}
-			return device.ParentId, nil
+			da.deviceToKeyMap[dId.Id] = device.ParentId
 		}
+		return da.deviceToKeyMap[dId.Id], nil
 	} else if ldId, ok := id.(*utils.LogicalDeviceID); ok {
 		// Use cache if present
-		if val, err := da.getDeviceKey(ldId.Id); err == nil {
+		if val, exist := da.deviceToKeyMap[ldId.Id]; exist {
 			return val, nil
 		}
 		if lDevice, _ = da.logicalDeviceMgr.getLogicalDevice(ldId.Id); lDevice == nil {
 			return "", status.Error(codes.NotFound, fmt.Sprintf("id-absent-%s", ldId))
 		}
-		if err := da.updateDeviceKey(ldId.Id, lDevice.RootDeviceId); err != nil {
-			log.Warnw("Error-updating-cache", log.Fields{"id": ldId.Id, "key": lDevice.RootDeviceId, "error": err})
-		}
+		da.deviceToKeyMap[ldId.Id] = lDevice.RootDeviceId
 		return lDevice.RootDeviceId, nil
 	}
 	return "", status.Error(codes.NotFound, fmt.Sprintf("id-%s", id))
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index b1a68cc..732e9cf 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -22,12 +22,12 @@
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
-	ic "github.com/opencord/voltha-protos/go/inter_container"
-	ofp "github.com/opencord/voltha-protos/go/openflow_13"
-	"github.com/opencord/voltha-protos/go/voltha"
 	fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
 	"github.com/opencord/voltha-go/rw_core/graph"
 	fu "github.com/opencord/voltha-go/rw_core/utils"
+	ic "github.com/opencord/voltha-protos/go/inter_container"
+	ofp "github.com/opencord/voltha-protos/go/openflow_13"
+	"github.com/opencord/voltha-protos/go/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"reflect"
@@ -248,24 +248,31 @@
 
 
 func (agent *LogicalDeviceAgent)  addLogicalPort (device *voltha.Device, port *voltha.Port) error {
+	log.Debugw("addLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
+	var changed bool
+	var err error
 	if port.Type == voltha.Port_ETHERNET_NNI {
-		if err := agent.addNNILogicalPort(device, port); err != nil {
+		if changed, err = agent.addNNILogicalPort(device, port); err != nil {
 			return err
 		}
 	} else if port.Type == voltha.Port_ETHERNET_UNI {
-		if err :=  agent.addUNILogicalPort(device, port); err != nil {
+		if changed, err =  agent.addUNILogicalPort(device, port); err != nil {
 			return err
 		}
 	} else {
 		log.Debugw("invalid-port-type", log.Fields{"deviceId": device.Id, "port": port})
 		return nil
 	}
-	go agent.setupDeviceGraph()
+	if changed {
+		go agent.setupDeviceGraph()
+	}
 	return nil
 }
 
 // setupNNILogicalPorts creates an NNI port on the logical device that represents an NNI interface on a root device
 func (agent *LogicalDeviceAgent) setupNNILogicalPorts(ctx context.Context, deviceId string) error {
+	//now := time.Now()
+	//defer fmt.Println("setupNNILogicalPorts:", deviceId, time.Since(now))
 	log.Infow("setupNNILogicalPorts-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
 	var err error
@@ -279,11 +286,12 @@
 	//Get UNI port number
 	changesMade := false
 	for _, port := range device.Ports {
+		changed := false
 		if port.Type == voltha.Port_ETHERNET_NNI {
-			if err = agent.addNNILogicalPort(device, port); err != nil {
+			if changed, err = agent.addNNILogicalPort(device, port); err != nil {
 				log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			} else {
-				changesMade = true
+				changesMade = changed || changesMade
 			}
 		}
 	}
@@ -296,6 +304,8 @@
 
 // setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
 func (agent *LogicalDeviceAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device) error {
+	//now := time.Now()
+	//defer fmt.Println("setupUNILogicalPorts:", childDevice.Id, time.Since(now))
 	log.Infow("setupUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
 	var err error
@@ -303,11 +313,12 @@
 	//Get UNI port number
 	changesMade := false
 	for _, port := range childDevice.Ports {
+		changed := false
 		if port.Type == voltha.Port_ETHERNET_UNI {
-			if err = agent.addUNILogicalPort(childDevice, port); err != nil {
+			if changed, err = agent.addUNILogicalPort(childDevice, port); err != nil {
 				log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			} else {
-				changesMade = true
+				changesMade = changed || changesMade
 			}
 		}
 	}
@@ -797,6 +808,14 @@
 		if isNNIPort(ingressPortNo, nniLogicalPortsNo) {
 			log.Debug("returning-half-route")
 			//This is a trap on the NNI Port
+			if len(agent.deviceGraph.Routes) == 0 {
+				// If there are no routes set (usually when the logical device has only NNI port(s), then just return an
+				// internal route
+				hop := graph.RouteHop{DeviceID:ld.RootDeviceId, Ingress:ingressPortNo, Egress:egressPortNo}
+				routes = append(routes, hop)
+				routes = append(routes, hop)
+				return routes
+			}
 			//Return a 'half' route to make the flow decomposer logic happy
 			for routeLink, route := range agent.deviceGraph.Routes {
 				if isNNIPort(routeLink.Egress, nniLogicalPortsNo) {
@@ -1208,17 +1227,23 @@
 }
 
 
-func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port)  error {
+// addNNILogicalPort adds an NNI port to the logical device.  It returns a bool representing whether a port has been
+// added and an eror in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalDeviceAgent) addNNILogicalPort (device *voltha.Device, port *voltha.Port)  (bool, error) {
+	//now := time.Now()
+	//defer fmt.Println("setupNNILogicalPorts:", device.Id, time.Since(now))
 	log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
-	if device.AdminState != voltha.AdminState_ENABLED {
-		log.Infow("device-not-enabled", log.Fields{"deviceId": device.Id})
-		return nil
+	if device.AdminState != voltha.AdminState_ENABLED || device.OperStatus != voltha.OperStatus_ACTIVE {
+		log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
+		return false, nil
 	}
 	agent.lockLogicalDevice.RLock()
 	if agent.portExist(device, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
 		agent.lockLogicalDevice.RUnlock()
-		return nil
+		return false, nil
 	}
 	agent.lockLogicalDevice.RUnlock()
 
@@ -1227,7 +1252,7 @@
 	// First get the port capability
 	if portCap, err = agent.deviceMgr.getPortCapability(nil, device.Id, port.PortNo); err != nil {
 		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
-		return err
+		return false, err
 	}
 
 	agent.lockLogicalDevice.Lock()
@@ -1235,7 +1260,7 @@
 	// Double check again if this port has been already added since the getPortCapability could have taken a long time
 	if agent.portExist(device, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
-		return nil
+		return false, nil
 	}
 
 	portCap.Port.RootPort = true
@@ -1249,7 +1274,7 @@
 	var ld *voltha.LogicalDevice
 	if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
 		log.Errorw("error-retrieving-logical-device", log.Fields{"error": err})
-		return err
+		return false, err
 	}
 	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
 	if cloned.Ports == nil {
@@ -1259,9 +1284,9 @@
 
 	if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
 		log.Errorw("error-updating-logical-device", log.Fields{"error": err})
-		return err
+		return false, err
 	}
-	return nil
+	return true, nil
 }
 
 func (agent *LogicalDeviceAgent) portExist (device *voltha.Device, port *voltha.Port) bool {
@@ -1275,17 +1300,24 @@
 	return false
 }
 
-func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port)  error {
+
+// addUNILogicalPort adds an UNI port to the logical device.  It returns a bool representing whether a port has been
+// added and an eror in case a valid error is encountered. If the port was successfully added it will return
+// (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
+// scenario. This also applies to the case where the port was already added.
+func (agent *LogicalDeviceAgent) addUNILogicalPort (childDevice *voltha.Device, port *voltha.Port)  (bool, error) {
+	//now := time.Now()
+	//defer fmt.Println("addUNILogicalPort:", childDevice.Id, time.Since(now))
 	log.Debugw("addUNILogicalPort", log.Fields{"port": port})
-	if childDevice.AdminState != voltha.AdminState_ENABLED {
-		log.Infow("device-not-enabled", log.Fields{"deviceId": childDevice.Id})
-		return nil
+	if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
+		log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
+		return false, nil
 	}
 	agent.lockLogicalDevice.RLock()
 	if agent.portExist(childDevice, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
 		agent.lockLogicalDevice.RUnlock()
-		return nil
+		return false, nil
 	}
 	agent.lockLogicalDevice.RUnlock()
 	var portCap *ic.PortCapability
@@ -1293,18 +1325,18 @@
 	// First get the port capability
 	if portCap, err = agent.deviceMgr.getPortCapability(nil, childDevice.Id, port.PortNo); err != nil {
 		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
-		return err
+		return false, err
 	}
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 	// Double check again if this port has been already added since the getPortCapability could have taken a long time
 	if agent.portExist(childDevice, port) {
 		log.Debugw("port-already-exist", log.Fields{"port": port})
-		return nil
+		return false, nil
 	}
 	// Get stored logical device
 	if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
-		return status.Error(codes.NotFound, agent.logicalDeviceId)
+		return false, status.Error(codes.NotFound, agent.logicalDeviceId)
 	} else {
 		log.Debugw("adding-uni", log.Fields{"deviceId": childDevice.Id})
 		portCap.Port.RootPort = false
@@ -1318,7 +1350,7 @@
 			cloned.Ports = make([]*voltha.LogicalPort, 0)
 		}
 		cloned.Ports = append(cloned.Ports, portCap.Port)
-		return agent.updateLogicalDeviceWithoutLock(cloned)
+		return true, agent.updateLogicalDeviceWithoutLock(cloned)
 	}
 }
 
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index 6c67ace..ec2904f 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -819,17 +819,8 @@
 	fg := fu.NewFlowsAndGroups()
 	if agent.GetDeviceGraph().IsRootPort(inPortNo) {
 		log.Debug("trap-nni")
-		var fa *fu.FlowArgs
-		fa = &fu.FlowArgs{
-			KV: fu.OfpFlowModArgs{"priority": uint64(flow.Priority), "cookie": flow.Cookie},
-			MatchFields: []*ofp.OfpOxmOfbField{
-				InPort(egressHop.Egress),
-			},
-			Actions: GetActions(flow),
-		}
-		// Augment the matchfields with the ofpfields from the flow
-		fa.MatchFields = append(fa.MatchFields, GetOfbFields(flow, IN_PORT)...)
-		fg.AddFlow(MkFlowStat(fa))
+		// no decomposition required - it is already an OLT flow from NNI
+		fg.AddFlow(flow)
 	} else {
 		// Trap flow for UNI port
 		log.Debug("trap-uni")