[VOL-1024]  This update consists of:

1) Inter-container Kafka library in Go
2) Initial set of proto definitions
3) Test cases for the Kafka library

Change-Id: Ibe8b0f673a90bbe4cb92847ce40f31ec2d0b6244
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index a223318..8b2c165 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,7 +24,7 @@
 	default_KVStoreTimeout   = 5 //in seconds
 	default_KVStoreHost      = "10.100.198.240"
 	default_KVStorePort      = 8500 // Etcd = 2379
-	default_LogLevel         = 1
+	default_LogLevel         = 0
 	default_Banner           = false
 	default_CoreTopic        = "rwcore"
 	default_RWCoreEndpoint   = "rwcore"
diff --git a/rw_core/core/requestHandlerProxy.go b/rw_core/core/requestHandlerProxy.go
new file mode 100644
index 0000000..4bfafa5
--- /dev/null
+++ b/rw_core/core/requestHandlerProxy.go
@@ -0,0 +1,154 @@
+package core
+
+import (
+	"github.com/opencord/voltha-go/common/log"
+	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	"github.com/opencord/voltha-go/protos/voltha"
+	"github.com/golang/protobuf/ptypes"
+	"errors"
+)
+
+type RequestHandlerProxy struct {
+	TestMode bool // when set, the Get* handlers return canned test responses rather than nil
+}
+
+// GetDevice decodes its single StrType device-id argument; the lookup is still a TODO (returns nil, nil outside TestMode).
+func (rhp *RequestHandlerProxy) GetDevice(args []*ca.Argument) (error, *voltha.Device) {
+	if len(args) != 1 {
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args"), nil
+	}
+	pID := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err, nil
+	}
+	log.Debugw("GetDevice", log.Fields{"deviceId": pID.Val})
+	// TODO process the request
+
+	if rhp.TestMode { // Execute only for test cases
+		return nil, &voltha.Device{Id: pID.Val}
+	}
+	return nil, nil
+}
+
+
+// GetChildDevice decodes the device id from args[0]; further matching arguments are not decoded yet (returns nil, nil outside TestMode).
+func (rhp *RequestHandlerProxy) GetChildDevice(args []*ca.Argument) (error, *voltha.Device) {
+	if len(args) < 1 {
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args"), nil
+	}
+	pID := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err, nil
+	}
+	// TODO decompose the other parameters for matching criteria and process
+	log.Debugw("GetChildDevice", log.Fields{"deviceId": pID.Val})
+
+	if rhp.TestMode { // Execute only for test cases
+		return nil, &voltha.Device{Id: pID.Val}
+	}
+	return nil, nil
+}
+
+// GetPorts decodes a device id (args[0]) and a port type (args[1], an enum carried
+// as an integer proto); the port lookup is still a TODO (returns nil, nil outside TestMode).
+func (rhp *RequestHandlerProxy) GetPorts(args []*ca.Argument) (error, *voltha.Ports) {
+	if len(args) != 2 {
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args"), nil
+	}
+	pID := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err, nil
+	}
+	// Porttype is an enum sent as an integer proto
+	pt := &ca.IntType{}
+	if err := ptypes.UnmarshalAny(args[1].Value, pt); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err, nil
+	}
+
+	// TODO decompose the other parameters for matching criteria
+	log.Debugw("GetPorts", log.Fields{"deviceId": pID.Val, "portType": pt.Val})
+
+	if rhp.TestMode { // Execute only for test cases
+		aPort := &voltha.Port{Label: "test_port"}
+		allPorts := &voltha.Ports{}
+		allPorts.Items = append(allPorts.Items, aPort)
+		return nil, allPorts
+	}
+	return nil, nil
+}
+
+// GetChildDevices decodes its single StrType device-id argument; processing is still a TODO (returns nil, nil outside TestMode).
+func (rhp *RequestHandlerProxy) GetChildDevices(args []*ca.Argument) (error, *voltha.Device) {
+	if len(args) != 1 {
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args"), nil
+	}
+	pID := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err, nil
+	}
+	// TODO decompose the other parameters for matching criteria and process
+	log.Debugw("GetChildDevices", log.Fields{"deviceId": pID.Val})
+
+	if rhp.TestMode { // Execute only for test cases
+		return nil, &voltha.Device{Id: pID.Val}
+	}
+	return nil, nil
+}
+
+// ChildDeviceDetected is invoked when a child device is detected.  The following
+// parameters are expected, in order (only the first five are decoded for now):
+// (parent_device_id, parent_port_no, child_device_type, proxy_address, admin_state, **kw)
+// It returns an error on too few arguments or on the first argument that fails to unmarshal.
+func (rhp *RequestHandlerProxy) ChildDeviceDetected(args []*ca.Argument) error {
+	if len(args) < 5 {
+		log.Warnw("invalid-number-of-args", log.Fields{"args": args})
+		return errors.New("invalid-number-of-args")
+	}
+
+	pID := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[0].Value, pID); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err
+	}
+	portNo := &ca.IntType{}
+	if err := ptypes.UnmarshalAny(args[1].Value, portNo); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err
+	}
+	dt := &ca.StrType{}
+	if err := ptypes.UnmarshalAny(args[2].Value, dt); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err
+	}
+	pAddr := &voltha.Device_ProxyAddress{}
+	if err := ptypes.UnmarshalAny(args[3].Value, pAddr); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err
+	}
+	adminState := &ca.IntType{}
+	if err := ptypes.UnmarshalAny(args[4].Value, adminState); err != nil {
+		log.Warnw("cannot-unmarshal-argument", log.Fields{"error": err})
+		return err
+	}
+
+	// Need to decode the other params - in this case the key will represent the proto type
+	// TODO decompose the other parameters for matching criteria and process
+	log.Debugw("ChildDeviceDetected", log.Fields{"deviceId": pID.Val, "portNo": portNo.Val,
+		"deviceType": dt.Val, "proxyAddress": pAddr, "adminState": adminState.Val})
+
+	if rhp.TestMode { // Execute only for test cases
+		return nil
+	}
+	return nil
+}
+
+
diff --git a/rw_core/main.go b/rw_core/main.go
index faa5851..a0928c5 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -6,6 +6,8 @@
 	"fmt"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
+	"github.com/opencord/voltha-go/kafka"
+	ca "github.com/opencord/voltha-go/protos/core_adapter"
 	"github.com/opencord/voltha-go/rw_core/config"
 	"os"
 	"os/signal"
@@ -19,6 +21,9 @@
 	config      *config.RWCoreFlags
 	halted      bool
 	exitChannel chan int
+	kmp         *kafka.KafkaMessagingProxy
+	//For test
+	receiverChannels []<-chan *ca.InterContainerMessage
 }
 
 func newKVClient(storeType string, address string, timeout int) (kvstore.Client, error) {
@@ -38,6 +43,7 @@
 	rwCore.config = cf
 	rwCore.halted = false
 	rwCore.exitChannel = make(chan int, 1)
+	rwCore.receiverChannels = make([]<-chan *ca.InterContainerMessage, 0)
 	return &rwCore
 }
 
@@ -63,6 +69,7 @@
 	}
 }
 
+
 func (core *rwCore) start(ctx context.Context) {
 	log.Info("Starting RW Core components")
 	// Setup GRPC Server
@@ -70,13 +77,29 @@
 	// Setup KV Client
 
 	// Setup Kafka messaging services
-
+	var err error
+	if core.kmp, err = kafka.NewKafkaMessagingProxy(
+		kafka.KafkaHost("10.100.198.220"),
+		kafka.KafkaPort(9092),
+		kafka.DefaultTopic(&kafka.Topic{Name: "Adapter"})); err != nil {
+		log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
+		return
+	}
+	// Start the Kafka messaging service - a synchronous call; failure to start is fatal
+	if err = core.kmp.Start(); err != nil {
+		log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
+	}
 }
 
 func (core *rwCore) stop() {
 	// Stop leadership tracking
 	core.halted = true
 
+	// Stop the Kafka messaging service
+	if core.kmp != nil {
+		core.kmp.Stop()
+	}
+
 	// send exit signal
 	core.exitChannel <- 0