This update consists of the following changes:
    1) Add GroupConsumer to the Go sarama_client and modify the Core
    code to use a groupConsumer instead of a partition consumer. This
    change will ensure that multiple consumers (with different group Ids)
    can consume kafka messages from the same topic.
    2) Remove afkak kafka client and replace it with confluent kafka,
    a change done in voltha 1.x. Modify the code accordingly.
    3) Add a Group Consumer to the Python kafka client such that
    several instances of an Adapter can consume the same messages from
    the same topic.
    4) Set the datapath_id for the logical device in the Core.

Change-Id: I5d7ced27c9aeca4f6211baa3dc8cb3db861545e4
diff --git a/rw_core/core/id.go b/rw_core/core/id.go
index b28151f..369b4f1 100644
--- a/rw_core/core/id.go
+++ b/rw_core/core/id.go
@@ -18,7 +18,10 @@
 import (
 	"crypto/rand"
 	"encoding/hex"
+	"errors"
+	"fmt"
 	m "math/rand"
+	"strconv"
 )
 
 func randomHex(n int) (string, error) {
@@ -49,3 +52,16 @@
 	//	A logical port is a uint32
 	return m.Uint32()
 }
+
+func CreateDataPathId(idInHexString string) (uint64, error) {
+	if idInHexString == "" {
+		return 0, errors.New("id-empty")
+	}
+	// First prepend 0x to the string
+	newId := fmt.Sprintf("0x%s", idInHexString)
+	if d, err := strconv.ParseUint(newId, 0, 64); err != nil {
+		return 0, err
+	} else {
+		return d, nil
+	}
+}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 86b79f9..ee3d82a 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -75,7 +75,16 @@
 		log.Errorw("error-creating-logical-device", log.Fields{"error": err})
 		return err
 	}
+
 	ld := &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
+
+	// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
+	var datapathID uint64
+	if datapathID, err = CreateDataPathId(agent.logicalDeviceId); err != nil {
+		log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
+		return err
+	}
+	ld.DatapathId = datapathID
 	ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
 	ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
 	ld.Flows = &ofp.Flows{Items: nil}
@@ -596,7 +605,7 @@
 	groupsChanged := false
 	groupId := groupMod.GroupId
 	if idx := fu.FindGroup(groups, groupId); idx == -1 {
-		return errors.New(fmt.Sprintf("group-absent:%s", groupId))
+		return errors.New(fmt.Sprintf("group-absent:%d", groupId))
 	} else {
 		//replace existing group entry with new group definition
 		groupEntry := fd.GroupEntryFromGroupMod(groupMod)
@@ -878,7 +887,7 @@
 	}
 
 	deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
-	for deviceId, _ := range deviceNodeIds {
+	for deviceId := range deviceNodeIds {
 		if deviceId == ld.RootDeviceId {
 			rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
 		} else {
@@ -960,9 +969,14 @@
 	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
 	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
+	var err error
 	for deviceId, value := range deviceRules.GetRules() {
-		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+		if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
+			log.Error("update-flows-failed", log.Fields{"deviceID":deviceId})
+		}
+		if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
+			log.Error("update-groups-failed", log.Fields{"deviceID":deviceId})
+		}
 	}
 
 	return nil
@@ -996,11 +1010,16 @@
 	log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
 	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
 	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+	var err error
 	for deviceId, value := range deviceRules.GetRules() {
-		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
-	}
+		if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
+			log.Error("update-flows-failed", log.Fields{"deviceID":deviceId})
+		}
+		if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
+			log.Error("update-groups-failed", log.Fields{"deviceID":deviceId})
+		}
 
+	}
 	return nil
 }
 
@@ -1009,12 +1028,14 @@
 	outPort := fd.GetPacketOutPort(packet)
 	//frame := packet.GetData()
 	//TODO: Use a channel between the logical agent and the device agent
-	agent.deviceMgr.packetOut(agent.rootDeviceId, outPort, packet)
+	if err := agent.deviceMgr.packetOut(agent.rootDeviceId, outPort, packet); err != nil {
+		log.Error("packetout-failed", log.Fields{"logicalDeviceID":agent.rootDeviceId})
+	}
 }
 
 func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
 	log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
-	packet_in := fd.MkPacketIn(port, packet)
-	agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, packet_in)
-	log.Debugw("sending-packet-in", log.Fields{"packet-in": packet_in})
+	packetIn := fd.MkPacketIn(port, packet)
+	agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, packetIn)
+	log.Debugw("sending-packet-in", log.Fields{"packet-in": packetIn})
 }
diff --git a/rw_core/main.go b/rw_core/main.go
index dd830c1..336e731 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -62,7 +62,7 @@
 	return nil, errors.New("unsupported-kv-store")
 }
 
-func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
 
 	log.Infow("kafka-client-type", log.Fields{"client": clientType})
 	switch clientType {
@@ -70,9 +70,15 @@
 		return kafka.NewSaramaClient(
 			kafka.Host(host),
 			kafka.Port(port),
+			kafka.ConsumerType(kafka.GroupCustomer),
 			kafka.ProducerReturnOnErrors(true),
 			kafka.ProducerReturnOnSuccess(true),
 			kafka.ProducerMaxRetries(6),
+			kafka.NumPartitions(3),
+			kafka.ConsumerGroupName(instanceID),
+			kafka.ConsumerGroupPrefix(instanceID),
+			kafka.AutoCreateTopic(false),
+			kafka.ProducerFlushFrequency(5),
 			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
 	}
 	return nil, errors.New("unsupported-client-type")
@@ -126,7 +132,7 @@
 	}
 
 	// Setup Kafka Client
-	if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort); err != nil {
+	if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort, rw.config.InstanceID); err != nil {
 		log.Fatal("Unsupported-kafka-client")
 	}
 
@@ -219,7 +225,7 @@
 	}
 
 	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
-	//log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
 
 	defer log.CleanUp()