VOL-3419: OpenOLT adapter at scale constantly takes more that 10 seconds to react to flows
- Pass information to agent to do the flow replication
- Consolidate various locks in the adapter and remove reduntant locks
- use voltha-proto version 4.0.2 and voltha-lib-go version 4.0.0
- Bump adapter version to 3.0.0

Change-Id: Ic053c54e5319bb1736ec74facfc79dd10058ecf5
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/client.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/client.go
new file mode 100644
index 0000000..0337432
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/client.go
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	ca "github.com/opencord/voltha-protos/v4/go/inter_container"
+	"time"
+)
+
+const (
+	PartitionConsumer = iota
+	GroupCustomer     = iota
+)
+
+const (
+	OffsetNewest = -1
+	OffsetOldest = -2
+)
+
+const (
+	GroupIdKey = "groupId"
+	Offset     = "offset"
+)
+
+const (
+	DefaultKafkaHost                = "127.0.0.1"
+	DefaultKafkaPort                = 9092
+	DefaultKafkaAddress             = DefaultKafkaHost + ":" + string(DefaultKafkaPort)
+	DefaultGroupName                = "voltha"
+	DefaultSleepOnError             = 1
+	DefaultProducerFlushFrequency   = 10
+	DefaultProducerFlushMessages    = 10
+	DefaultProducerFlushMaxmessages = 100
+	DefaultProducerReturnSuccess    = true
+	DefaultProducerReturnErrors     = true
+	DefaultProducerRetryMax         = 3
+	DefaultProducerRetryBackoff     = time.Millisecond * 100
+	DefaultConsumerMaxwait          = 100
+	DefaultMaxProcessingTime        = 100
+	DefaultConsumerType             = PartitionConsumer
+	DefaultNumberPartitions         = 3
+	DefaultNumberReplicas           = 1
+	DefaultAutoCreateTopic          = false
+	DefaultMetadataMaxRetry         = 3
+	DefaultLivenessChannelInterval  = time.Second * 30
+)
+
+// MsgClient represents the set of APIs  a Kafka MsgClient must implement
+type Client interface {
+	Start(ctx context.Context) error
+	Stop(ctx context.Context)
+	CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error
+	DeleteTopic(ctx context.Context, topic *Topic) error
+	Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
+	UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ca.InterContainerMessage) error
+	SubscribeForMetadata(context.Context, func(fromTopic string, timestamp time.Time))
+	Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error
+	SendLiveness(ctx context.Context) error
+	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+	EnableHealthinessChannel(ctx context.Context, enable bool) chan bool
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/common.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/common.go
new file mode 100644
index 0000000..5db364d
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/common.go
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+)
+
+var logger log.CLogger
+
+func init() {
+	// Setup this package so that it's log level can be modified at run time
+	var err error
+	logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{})
+	if err != nil {
+		panic(err)
+	}
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/endpoint_manager.go
new file mode 100644
index 0000000..796eb72
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/endpoint_manager.go
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v4/pkg/db"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	"github.com/opencord/voltha-protos/v4/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"sync"
+)
+
+const (
+	// All the values below can be tuned to get optimal data distribution.  The numbers below seems to work well when
+	// supporting 1000-10000 devices and 1 - 20 replicas of a service
+
+	// Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+	DefaultPartitionCount = 1117
+
+	// Represents how many times a node is replicated on the consistent ring.
+	DefaultReplicationFactor = 117
+
+	// Load is used to calculate average load.
+	DefaultLoad = 1.1
+)
+
+type Endpoint string // Endpoint of a service instance.  When using kafka, this is the topic name of a service
+type ReplicaID int32 // The replication ID of a service instance
+
+type EndpointManager interface {
+
+	// GetEndpoint is called to get the endpoint to communicate with for a specific device and service type.  For
+	// now this will return the topic name
+	GetEndpoint(ctx context.Context, deviceID string, serviceType string) (Endpoint, error)
+
+	// IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
+	// devices owned by that service need to be reconciled
+	IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error)
+
+	// GetReplicaAssignment returns the replica number of the service that owns the deviceID.  This is used by the
+	// test only
+	GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (ReplicaID, error)
+}
+
+type service struct {
+	id             string // Id of the service.  The same id is used for all replicas
+	totalReplicas  int32
+	replicas       map[ReplicaID]Endpoint
+	consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+	partitionCount           int
+	replicationFactor        int
+	load                     float64
+	backend                  *db.Backend
+	services                 map[string]*service
+	servicesLock             sync.RWMutex
+	deviceTypeServiceMap     map[string]string
+	deviceTypeServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.partitionCount = count
+	}
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.replicationFactor = replicas
+	}
+}
+
+func Load(load float64) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.load = load
+	}
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	tm := &endpointManager{
+		partitionCount:       DefaultPartitionCount,
+		replicationFactor:    DefaultReplicationFactor,
+		load:                 DefaultLoad,
+		backend:              backend,
+		services:             make(map[string]*service),
+		deviceTypeServiceMap: make(map[string]string),
+	}
+
+	for _, option := range opts {
+		option(tm)
+	}
+	return tm
+}
+
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(ctx context.Context, deviceID string, serviceType string) (Endpoint, error) {
+	logger.Debugw(ctx, "getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+	owner, err := ep.getOwner(ctx, deviceID, serviceType)
+	if err != nil {
+		return "", err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	endpoint := m.getEndPoint()
+	if endpoint == "" {
+		return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
+	}
+	logger.Debugw(ctx, "returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+	return endpoint, nil
+}
+
+func (ep *endpointManager) IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+	logger.Debugw(ctx, "device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+	owner, err := ep.getOwner(ctx, deviceID, serviceType)
+	if err != nil {
+		return false, nil
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (ReplicaID, error) {
+	owner, err := ep.getOwner(ctx, deviceID, serviceType)
+	if err != nil {
+		return 0, nil
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica(), nil
+}
+
+func (ep *endpointManager) getOwner(ctx context.Context, deviceID string, serviceType string) (consistent.Member, error) {
+	serv, dType, err := ep.getServiceAndDeviceType(ctx, serviceType)
+	if err != nil {
+		return nil, err
+	}
+	key := ep.makeKey(deviceID, dType, serviceType)
+	return serv.consistentRing.LocateKey(key), nil
+}
+
+func (ep *endpointManager) getServiceAndDeviceType(ctx context.Context, serviceType string) (*service, string, error) {
+	// Check whether service exist
+	ep.servicesLock.RLock()
+	serv, serviceExist := ep.services[serviceType]
+	ep.servicesLock.RUnlock()
+
+	// Load the service and device types if needed
+	if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+		if err := ep.loadServices(ctx); err != nil {
+			return nil, "", err
+		}
+
+		// Check whether the service exists now
+		ep.servicesLock.RLock()
+		serv, serviceExist = ep.services[serviceType]
+		ep.servicesLock.RUnlock()
+		if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+			return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+		}
+	}
+
+	ep.deviceTypeServiceMapLock.RLock()
+	defer ep.deviceTypeServiceMapLock.RUnlock()
+	for dType, sType := range ep.deviceTypeServiceMap {
+		if sType == serviceType {
+			return serv, dType, nil
+		}
+	}
+	return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+}
+
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+	return consistent.Config{
+		PartitionCount:    ep.partitionCount,
+		ReplicationFactor: ep.replicationFactor,
+		Load:              ep.load,
+		Hasher:            hasher{},
+	}
+}
+
+// loadServices loads the services (adapters) and device types in memory. Because of the small size of the data and
+// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
+// instead of watching for updates in the dB and acting on it.
+func (ep *endpointManager) loadServices(ctx context.Context) error {
+	ep.servicesLock.Lock()
+	defer ep.servicesLock.Unlock()
+	ep.deviceTypeServiceMapLock.Lock()
+	defer ep.deviceTypeServiceMapLock.Unlock()
+
+	if ep.backend == nil {
+		return status.Error(codes.Aborted, "backend-not-set")
+	}
+	ep.services = make(map[string]*service)
+	ep.deviceTypeServiceMap = make(map[string]string)
+
+	// Load the adapters
+	blobs, err := ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "adapters")
+	if err != nil {
+		return err
+	}
+
+	// Data is marshalled as proto bytes in the data store
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		adapter := &voltha.Adapter{}
+		if err := proto.Unmarshal(data, adapter); err != nil {
+			return err
+		}
+		// A valid adapter should have the vendorID set
+		if adapter.Vendor != "" {
+			if _, ok := ep.services[adapter.Type]; !ok {
+				ep.services[adapter.Type] = &service{
+					id:             adapter.Type,
+					totalReplicas:  adapter.TotalReplicas,
+					replicas:       make(map[ReplicaID]Endpoint),
+					consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+				}
+
+			}
+			currentReplica := ReplicaID(adapter.CurrentReplica)
+			endpoint := Endpoint(adapter.Endpoint)
+			ep.services[adapter.Type].replicas[currentReplica] = endpoint
+			ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+		}
+	}
+	// Load the device types
+	blobs, err = ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "device_types")
+	if err != nil {
+		return err
+	}
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		deviceType := &voltha.DeviceType{}
+		if err := proto.Unmarshal(data, deviceType); err != nil {
+			return err
+		}
+		if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
+			ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
+		}
+	}
+
+	// Log the loaded data in debug mode to facilitate trouble shooting
+	if logger.V(log.DebugLevel) {
+		for key, val := range ep.services {
+			members := val.consistentRing.GetMembers()
+			logger.Debugw(ctx, "service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+			for _, m := range members {
+				n := m.(Member)
+				logger.Debugw(ctx, "service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+			}
+		}
+		logger.Debugw(ctx, "device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+	}
+	return nil
+}
+
+// makeKey creates the string that the hash function uses to create the hash
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+	return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hasher function.  Based upon numerous testing scenarios, the xxhash package seems to provide the
+// best distribution compare to other hash packages
+func (h hasher) Sum64(data []byte) uint64 {
+	return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+	String() string
+	getReplica() ReplicaID
+	getEndPoint() Endpoint
+	getID() string
+	getServiceType() string
+}
+
+// member implements the Member interface
+type member struct {
+	id          string
+	serviceType string
+	vendor      string
+	version     string
+	replica     ReplicaID
+	endpoint    Endpoint
+}
+
+func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+	return &member{
+		id:          ID,
+		serviceType: serviceType,
+		vendor:      vendor,
+		version:     version,
+		replica:     replica,
+		endpoint:    endPoint,
+	}
+}
+
+func (m *member) String() string {
+	return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+	return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+	return m.endpoint
+}
+
+func (m *member) getID() string {
+	return m.id
+}
+
+func (m *member) getServiceType() string {
+	return m.serviceType
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/kafka_inter_container_library.go
new file mode 100644
index 0000000..3af35d7
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/kafka_inter_container_library.go
@@ -0,0 +1,1085 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"reflect"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/golang/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/any"
+	"github.com/google/uuid"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+	"github.com/opentracing/opentracing-go"
+)
+
+const (
+	DefaultMaxRetries     = 3
+	DefaultRequestTimeout = 60000 // 60000 milliseconds - to handle a wider latency range
+)
+
+const (
+	TransactionKey = "transactionID"
+	FromTopic      = "fromTopic"
+)
+
+var ErrorTransactionNotAcquired = errors.New("transaction-not-acquired")
+var ErrorTransactionInvalidId = errors.New("transaction-invalid-id")
+
+// requestHandlerChannel represents an interface associated with a channel.  Whenever, an event is
+// obtained from that channel, this interface is invoked.   This is used to handle
+// async requests into the Core via the kafka messaging bus
+type requestHandlerChannel struct {
+	requesthandlerInterface interface{}
+	ch                      <-chan *ic.InterContainerMessage
+}
+
+// transactionChannel represents a combination of a topic and a channel onto which a response received
+// on the kafka bus will be sent to
+type transactionChannel struct {
+	topic *Topic
+	ch    chan *ic.InterContainerMessage
+}
+
+type InterContainerProxy interface {
+	Start(ctx context.Context) error
+	Stop(ctx context.Context)
+	GetDefaultTopic() *Topic
+	InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
+	InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
+	SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error
+	SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error
+	UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error
+	DeleteTopic(ctx context.Context, topic Topic) error
+	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+	SendLiveness(ctx context.Context) error
+}
+
+// interContainerProxy represents the messaging proxy
+type interContainerProxy struct {
+	kafkaAddress                   string
+	defaultTopic                   *Topic
+	defaultRequestHandlerInterface interface{}
+	kafkaClient                    Client
+	doneCh                         chan struct{}
+	doneOnce                       sync.Once
+
+	// This map is used to map a topic to an interface and channel.   When a request is received
+	// on that channel (registered to the topic) then that interface is invoked.
+	topicToRequestHandlerChannelMap   map[string]*requestHandlerChannel
+	lockTopicRequestHandlerChannelMap sync.RWMutex
+
+	// This map is used to map a channel to a response topic.   This channel handles all responses on that
+	// channel for that topic and forward them to the appropriate consumers channel, using the
+	// transactionIdToChannelMap.
+	topicToResponseChannelMap   map[string]<-chan *ic.InterContainerMessage
+	lockTopicResponseChannelMap sync.RWMutex
+
+	// This map is used to map a transaction to a consumers channel.  This is used whenever a request has been
+	// sent out and we are waiting for a response.
+	transactionIdToChannelMap     map[string]*transactionChannel
+	lockTransactionIdToChannelMap sync.RWMutex
+}
+
+type InterContainerProxyOption func(*interContainerProxy)
+
+func InterContainerAddress(address string) InterContainerProxyOption {
+	return func(args *interContainerProxy) {
+		args.kafkaAddress = address
+	}
+}
+
+func DefaultTopic(topic *Topic) InterContainerProxyOption {
+	return func(args *interContainerProxy) {
+		args.defaultTopic = topic
+	}
+}
+
+func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
+	return func(args *interContainerProxy) {
+		args.defaultRequestHandlerInterface = handler
+	}
+}
+
+func MsgClient(client Client) InterContainerProxyOption {
+	return func(args *interContainerProxy) {
+		args.kafkaClient = client
+	}
+}
+
+func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
+	proxy := &interContainerProxy{
+		kafkaAddress: DefaultKafkaAddress,
+		doneCh:       make(chan struct{}),
+	}
+
+	for _, option := range opts {
+		option(proxy)
+	}
+
+	return proxy
+}
+
+func NewInterContainerProxy(opts ...InterContainerProxyOption) InterContainerProxy {
+	return newInterContainerProxy(opts...)
+}
+
+func (kp *interContainerProxy) Start(ctx context.Context) error {
+	logger.Info(ctx, "Starting-Proxy")
+
+	// Kafka MsgClient should already have been created.  If not, output fatal error
+	if kp.kafkaClient == nil {
+		logger.Fatal(ctx, "kafka-client-not-set")
+	}
+
+	// Start the kafka client
+	if err := kp.kafkaClient.Start(ctx); err != nil {
+		logger.Errorw(ctx, "Cannot-create-kafka-proxy", log.Fields{"error": err})
+		return err
+	}
+
+	// Create the topic to response channel map
+	kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
+	//
+	// Create the transactionId to Channel Map
+	kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
+
+	// Create the topic to request channel map
+	kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
+
+	return nil
+}
+
+func (kp *interContainerProxy) Stop(ctx context.Context) {
+	logger.Info(ctx, "stopping-intercontainer-proxy")
+	kp.doneOnce.Do(func() { close(kp.doneCh) })
+	// TODO : Perform cleanup
+	kp.kafkaClient.Stop(ctx)
+	err := kp.deleteAllTopicRequestHandlerChannelMap(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "failed-delete-all-topic-request-handler-channel-map", log.Fields{"error": err})
+	}
+	err = kp.deleteAllTopicResponseChannelMap(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "failed-delete-all-topic-response-channel-map", log.Fields{"error": err})
+	}
+	kp.deleteAllTransactionIdToChannelMap(ctx)
+}
+
+func (kp *interContainerProxy) GetDefaultTopic() *Topic {
+	return kp.defaultTopic
+}
+
+// InvokeAsyncRPC is used to make an RPC request asynchronously
+func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+	waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
+
+	logger.Debugw(ctx, "InvokeAsyncRPC", log.Fields{"rpc": rpc, "key": key})
+
+	spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, !waitForResponse)
+	if spanArg != nil {
+		kvArgs = append(kvArgs, &spanArg[0])
+	}
+	defer span.Finish()
+
+	//	If a replyToTopic is provided then we use it, otherwise just use the  default toTopic.  The replyToTopic is
+	// typically the device ID.
+	responseTopic := replyToTopic
+	if responseTopic == nil {
+		responseTopic = kp.GetDefaultTopic()
+	}
+
+	chnl := make(chan *RpcResponse)
+
+	go func() {
+
+		// once we're done,
+		// close the response channel
+		defer close(chnl)
+
+		var err error
+		var protoRequest *ic.InterContainerMessage
+
+		// Encode the request
+		protoRequest, err = encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
+		if err != nil {
+			logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+			log.MarkSpanError(ctx, errors.New("cannot-format-request"))
+			chnl <- NewResponse(RpcFormattingError, err, nil)
+			return
+		}
+
+		// Subscribe for response, if needed, before sending request
+		var ch <-chan *ic.InterContainerMessage
+		if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
+			logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+			log.MarkSpanError(ctx, errors.New("failed-to-subscribe-for-response"))
+			chnl <- NewResponse(RpcTransportError, err, nil)
+			return
+		}
+
+		// Send request - if the topic is formatted with a device Id then we will send the request using a
+		// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
+		// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
+		logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+
+		// if the message is not sent on kafka publish an event an close the channel
+		if err = kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
+			chnl <- NewResponse(RpcTransportError, err, nil)
+			return
+		}
+
+		// if the client is not waiting for a response send the ack and close the channel
+		chnl <- NewResponse(RpcSent, nil, nil)
+		if !waitForResponse {
+			return
+		}
+
+		defer func() {
+			// Remove the subscription for a response on return
+			if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
+				logger.Warnw(ctx, "invoke-async-rpc-unsubscriber-for-response-failed", log.Fields{"err": err})
+			}
+		}()
+
+		// Wait for response as well as timeout or cancellation
+		select {
+		case msg, ok := <-ch:
+			if !ok {
+				logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+				log.MarkSpanError(ctx, errors.New("channel-closed"))
+				chnl <- NewResponse(RpcTransportError, status.Error(codes.Aborted, "channel closed"), nil)
+			}
+			logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+			if responseBody, err := decodeResponse(ctx, msg); err != nil {
+				chnl <- NewResponse(RpcReply, err, nil)
+			} else {
+				if responseBody.Success {
+					chnl <- NewResponse(RpcReply, nil, responseBody.Result)
+				} else {
+					// response body contains an error
+					unpackErr := &ic.Error{}
+					if err := ptypes.UnmarshalAny(responseBody.Result, unpackErr); err != nil {
+						chnl <- NewResponse(RpcReply, err, nil)
+					} else {
+						chnl <- NewResponse(RpcReply, status.Error(codes.Internal, unpackErr.Reason), nil)
+					}
+				}
+			}
+		case <-ctx.Done():
+			logger.Errorw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+			log.MarkSpanError(ctx, errors.New("context-cancelled"))
+			err := status.Error(codes.DeadlineExceeded, ctx.Err().Error())
+			chnl <- NewResponse(RpcTimeout, err, nil)
+		case <-kp.doneCh:
+			chnl <- NewResponse(RpcSystemClosing, nil, nil)
+			logger.Warnw(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+		}
+	}()
+	return chnl
+}
+
+// Method to extract Open-tracing Span from Context and serialize it for transport over Kafka embedded as a additional argument.
+// Additional argument is injected using key as "span" and value as Span marshalled into a byte slice
+//
+// The span name is automatically constructed using the RPC name with following convention (<rpc-name> represents name of invoked method):
+// - RPC invoked in Sync manner (WaitForResponse=true) : kafka-rpc-<rpc-name>
+// - RPC invoked in Async manner (WaitForResponse=false) : kafka-async-rpc-<rpc-name>
+// - Inter Adapter RPC invoked in Sync manner (WaitForResponse=true) : kafka-inter-adapter-rpc-<rpc-name>
+// - Inter Adapter RPC invoked in Async manner (WaitForResponse=false) : kafka-inter-adapter-async-rpc-<rpc-name>
+func (kp *interContainerProxy) embedSpanAsArg(ctx context.Context, rpc string, isAsync bool) ([]KVArg, opentracing.Span, context.Context) {
+	var err error
+	var newCtx context.Context
+	var spanToInject opentracing.Span
+
+	var spanName strings.Builder
+	spanName.WriteString("kafka-")
+
+	// In case of inter adapter message, use Msg Type for constructing RPC name
+	if rpc == "process_inter_adapter_message" {
+		if msgType, ok := ctx.Value("inter-adapter-msg-type").(ic.InterAdapterMessageType_Types); ok {
+			spanName.WriteString("inter-adapter-")
+			rpc = msgType.String()
+		}
+	}
+
+	if isAsync {
+		spanName.WriteString("async-rpc-")
+	} else {
+		spanName.WriteString("rpc-")
+	}
+	spanName.WriteString(rpc)
+
+	if isAsync {
+		spanToInject, newCtx = log.CreateAsyncSpan(ctx, spanName.String())
+	} else {
+		spanToInject, newCtx = log.CreateChildSpan(ctx, spanName.String())
+	}
+
+	spanToInject.SetBaggageItem("rpc-span-name", spanName.String())
+
+	textMapCarrier := opentracing.TextMapCarrier(make(map[string]string))
+	if err = opentracing.GlobalTracer().Inject(spanToInject.Context(), opentracing.TextMap, textMapCarrier); err != nil {
+		logger.Warnw(ctx, "unable-to-serialize-span-to-textmap", log.Fields{"span": spanToInject, "error": err})
+		return nil, spanToInject, newCtx
+	}
+
+	var textMapJson []byte
+	if textMapJson, err = json.Marshal(textMapCarrier); err != nil {
+		logger.Warnw(ctx, "unable-to-marshal-textmap-to-json-string", log.Fields{"textMap": textMapCarrier, "error": err})
+		return nil, spanToInject, newCtx
+	}
+
+	spanArg := make([]KVArg, 1)
+	spanArg[0] = KVArg{Key: "span", Value: &ic.StrType{Val: string(textMapJson)}}
+	return spanArg, spanToInject, newCtx
+}
+
+// InvokeRPC is used to send a request to a given topic
+func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+	waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
+
+	spanArg, span, ctx := kp.embedSpanAsArg(ctx, rpc, false)
+	if spanArg != nil {
+		kvArgs = append(kvArgs, &spanArg[0])
+	}
+	defer span.Finish()
+
+	//	If a replyToTopic is provided then we use it, otherwise just use the  default toTopic.  The replyToTopic is
+	// typically the device ID.
+	responseTopic := replyToTopic
+	if responseTopic == nil {
+		responseTopic = kp.defaultTopic
+	}
+
+	// Encode the request
+	protoRequest, err := encodeRequest(ctx, rpc, toTopic, responseTopic, key, kvArgs...)
+	if err != nil {
+		logger.Warnw(ctx, "cannot-format-request", log.Fields{"rpc": rpc, "error": err})
+		log.MarkSpanError(ctx, errors.New("cannot-format-request"))
+		return false, nil
+	}
+
+	// Subscribe for response, if needed, before sending request
+	var ch <-chan *ic.InterContainerMessage
+	if waitForResponse {
+		var err error
+		if ch, err = kp.subscribeForResponse(ctx, *responseTopic, protoRequest.Header.Id); err != nil {
+			logger.Errorw(ctx, "failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
+		}
+	}
+
+	// Send request - if the topic is formatted with a device Id then we will send the request using a
+	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
+	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
+	//key := GetDeviceIdFromTopic(*toTopic)
+	logger.Debugw(ctx, "sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key, "xId": protoRequest.Header.Id})
+	go func() {
+		if err := kp.kafkaClient.Send(ctx, protoRequest, toTopic, key); err != nil {
+			log.MarkSpanError(ctx, errors.New("send-failed"))
+			logger.Errorw(ctx, "send-failed", log.Fields{
+				"topic": toTopic,
+				"key":   key,
+				"error": err})
+		}
+	}()
+
+	if waitForResponse {
+		// Create a child context based on the parent context, if any
+		var cancel context.CancelFunc
+		childCtx := context.Background()
+		if ctx == nil {
+			ctx, cancel = context.WithTimeout(context.Background(), DefaultRequestTimeout*time.Millisecond)
+		} else {
+			childCtx, cancel = context.WithTimeout(ctx, DefaultRequestTimeout*time.Millisecond)
+		}
+		defer cancel()
+
+		// Wait for response as well as timeout or cancellation
+		// Remove the subscription for a response on return
+		defer func() {
+			if err := kp.unSubscribeForResponse(ctx, protoRequest.Header.Id); err != nil {
+				logger.Errorw(ctx, "response-unsubscribe-failed", log.Fields{
+					"id":    protoRequest.Header.Id,
+					"error": err})
+			}
+		}()
+		select {
+		case msg, ok := <-ch:
+			if !ok {
+				logger.Warnw(ctx, "channel-closed", log.Fields{"rpc": rpc, "replyTopic": replyToTopic.Name})
+				log.MarkSpanError(ctx, errors.New("channel-closed"))
+				protoError := &ic.Error{Reason: "channel-closed"}
+				var marshalledArg *any.Any
+				if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+					return false, nil // Should never happen
+				}
+				return false, marshalledArg
+			}
+			logger.Debugw(ctx, "received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
+			var responseBody *ic.InterContainerResponseBody
+			var err error
+			if responseBody, err = decodeResponse(ctx, msg); err != nil {
+				logger.Errorw(ctx, "decode-response-error", log.Fields{"error": err})
+				// FIXME we should return something
+			}
+			return responseBody.Success, responseBody.Result
+		case <-ctx.Done():
+			logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
+			log.MarkSpanError(ctx, errors.New("context-cancelled"))
+			//	 pack the error as proto any type
+			protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+
+			var marshalledArg *any.Any
+			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+				return false, nil // Should never happen
+			}
+			return false, marshalledArg
+		case <-childCtx.Done():
+			logger.Debugw(ctx, "context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
+			log.MarkSpanError(ctx, errors.New("context-cancelled"))
+			//	 pack the error as proto any type
+			protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+
+			var marshalledArg *any.Any
+			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
+				return false, nil // Should never happen
+			}
+			return false, marshalledArg
+		case <-kp.doneCh:
+			logger.Infow(ctx, "received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
+			return true, nil
+		}
+	}
+	return true, nil
+}
+
+// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
+// when a message is received on a given topic
+func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic Topic, handler interface{}) error {
+
+	// Subscribe to receive messages for that topic
+	var ch <-chan *ic.InterContainerMessage
+	var err error
+	if ch, err = kp.kafkaClient.Subscribe(ctx, &topic); err != nil {
+		//if ch, err = kp.Subscribe(topic); err != nil {
+		logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+		return err
+	}
+
+	kp.defaultRequestHandlerInterface = handler
+	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
+	// Launch a go routine to receive and process kafka messages
+	go kp.waitForMessages(ctx, ch, topic, handler)
+
+	return nil
+}
+
+// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
+// when a message is received on a given topic.  So far there is only 1 target registered per microservice
+func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic Topic, initialOffset int64) error {
+	// Subscribe to receive messages for that topic
+	var ch <-chan *ic.InterContainerMessage
+	var err error
+	if ch, err = kp.kafkaClient.Subscribe(ctx, &topic, &KVArg{Key: Offset, Value: initialOffset}); err != nil {
+		logger.Errorw(ctx, "failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+		return err
+	}
+	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
+
+	// Launch a go routine to receive and process kafka messages
+	go kp.waitForMessages(ctx, ch, topic, kp.defaultRequestHandlerInterface)
+
+	return nil
+}
+
+func (kp *interContainerProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic Topic) error {
+	return kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name)
+}
+
+func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(ctx context.Context, topic string) error {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	if _, exist := kp.topicToResponseChannelMap[topic]; exist {
+		// Unsubscribe to this topic first - this will close the subscribed channel
+		var err error
+		if err = kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic})
+		}
+		delete(kp.topicToResponseChannelMap, topic)
+		return err
+	} else {
+		return fmt.Errorf("%s-Topic-not-found", topic)
+	}
+}
+
+// nolint: unused
+func (kp *interContainerProxy) deleteAllTopicResponseChannelMap(ctx context.Context) error {
+	logger.Debug(ctx, "delete-all-topic-response-channel")
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	var unsubscribeFailTopics []string
+	for topic := range kp.topicToResponseChannelMap {
+		// Unsubscribe to this topic first - this will close the subscribed channel
+		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
+			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			// Do not return. Continue to try to unsubscribe to other topics.
+		} else {
+			// Only delete from channel map if successfully unsubscribed.
+			delete(kp.topicToResponseChannelMap, topic)
+		}
+	}
+	if len(unsubscribeFailTopics) > 0 {
+		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+	}
+	return nil
+}
+
+func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
+		kp.topicToRequestHandlerChannelMap[topic] = arg
+	}
+}
+
+func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(ctx context.Context, topic string) error {
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
+		// Close the kafka client client first by unsubscribing to this topic
+		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+			return err
+		}
+		delete(kp.topicToRequestHandlerChannelMap, topic)
+		return nil
+	} else {
+		return fmt.Errorf("%s-Topic-not-found", topic)
+	}
+}
+
+// nolint: unused
+func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap(ctx context.Context) error {
+	logger.Debug(ctx, "delete-all-topic-request-channel")
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	var unsubscribeFailTopics []string
+	for topic := range kp.topicToRequestHandlerChannelMap {
+		// Close the kafka client client first by unsubscribing to this topic
+		if err := kp.kafkaClient.UnSubscribe(ctx, &Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+			unsubscribeFailTopics = append(unsubscribeFailTopics, topic)
+			logger.Errorw(ctx, "unsubscribing-error", log.Fields{"topic": topic, "error": err})
+			// Do not return. Continue to try to unsubscribe to other topics.
+		} else {
+			// Only delete from channel map if successfully unsubscribed.
+			delete(kp.topicToRequestHandlerChannelMap, topic)
+		}
+	}
+	if len(unsubscribeFailTopics) > 0 {
+		return fmt.Errorf("unsubscribe-errors: %v", unsubscribeFailTopics)
+	}
+	return nil
+}
+
+func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
+	kp.lockTransactionIdToChannelMap.Lock()
+	defer kp.lockTransactionIdToChannelMap.Unlock()
+	if _, exist := kp.transactionIdToChannelMap[id]; !exist {
+		kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
+	}
+}
+
+func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
+	kp.lockTransactionIdToChannelMap.Lock()
+	defer kp.lockTransactionIdToChannelMap.Unlock()
+	if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
+		// Close the channel first
+		close(transChannel.ch)
+		delete(kp.transactionIdToChannelMap, id)
+	}
+}
+
+func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
+	kp.lockTransactionIdToChannelMap.Lock()
+	defer kp.lockTransactionIdToChannelMap.Unlock()
+	for key, value := range kp.transactionIdToChannelMap {
+		if value.topic.Name == id {
+			close(value.ch)
+			delete(kp.transactionIdToChannelMap, key)
+		}
+	}
+}
+
+// nolint: unused
+func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap(ctx context.Context) {
+	logger.Debug(ctx, "delete-all-transaction-id-channel-map")
+	kp.lockTransactionIdToChannelMap.Lock()
+	defer kp.lockTransactionIdToChannelMap.Unlock()
+	for key, value := range kp.transactionIdToChannelMap {
+		close(value.ch)
+		delete(kp.transactionIdToChannelMap, key)
+	}
+}
+
+func (kp *interContainerProxy) DeleteTopic(ctx context.Context, topic Topic) error {
+	// If we have any consumers on that topic we need to close them
+	if err := kp.deleteFromTopicResponseChannelMap(ctx, topic.Name); err != nil {
+		logger.Errorw(ctx, "delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
+	}
+	if err := kp.deleteFromTopicRequestHandlerChannelMap(ctx, topic.Name); err != nil {
+		logger.Errorw(ctx, "delete-from-topic-requesthandlerchannelmap-failed", log.Fields{"error": err})
+	}
+	kp.deleteTopicTransactionIdToChannelMap(topic.Name)
+
+	return kp.kafkaClient.DeleteTopic(ctx, &topic)
+}
+
+func encodeReturnedValue(ctx context.Context, returnedVal interface{}) (*any.Any, error) {
+	// Encode the response argument - needs to be a proto message
+	if returnedVal == nil {
+		return nil, nil
+	}
+	protoValue, ok := returnedVal.(proto.Message)
+	if !ok {
+		logger.Warnw(ctx, "response-value-not-proto-message", log.Fields{"error": ok, "returnVal": returnedVal})
+		err := errors.New("response-value-not-proto-message")
+		return nil, err
+	}
+
+	// Marshal the returned value, if any
+	var marshalledReturnedVal *any.Any
+	var err error
+	if marshalledReturnedVal, err = ptypes.MarshalAny(protoValue); err != nil {
+		logger.Warnw(ctx, "cannot-marshal-returned-val", log.Fields{"error": err})
+		return nil, err
+	}
+	return marshalledReturnedVal, nil
+}
+
+func encodeDefaultFailedResponse(ctx context.Context, request *ic.InterContainerMessage) *ic.InterContainerMessage {
+	responseHeader := &ic.Header{
+		Id:        request.Header.Id,
+		Type:      ic.MessageType_RESPONSE,
+		FromTopic: request.Header.ToTopic,
+		ToTopic:   request.Header.FromTopic,
+		Timestamp: ptypes.TimestampNow(),
+	}
+	responseBody := &ic.InterContainerResponseBody{
+		Success: false,
+		Result:  nil,
+	}
+	var marshalledResponseBody *any.Any
+	var err error
+	// Error should never happen here
+	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
+		logger.Warnw(ctx, "cannot-marshal-failed-response-body", log.Fields{"error": err})
+	}
+
+	return &ic.InterContainerMessage{
+		Header: responseHeader,
+		Body:   marshalledResponseBody,
+	}
+
+}
+
+//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
+//or an error on failure
+func encodeResponse(ctx context.Context, request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
+	//logger.Debugw(ctx, "encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
+	responseHeader := &ic.Header{
+		Id:        request.Header.Id,
+		Type:      ic.MessageType_RESPONSE,
+		FromTopic: request.Header.ToTopic,
+		ToTopic:   request.Header.FromTopic,
+		KeyTopic:  request.Header.KeyTopic,
+		Timestamp: ptypes.TimestampNow(),
+	}
+
+	// Go over all returned values
+	var marshalledReturnedVal *any.Any
+	var err error
+
+	// for now we support only 1 returned value - (excluding the error)
+	if len(returnedValues) > 0 {
+		if marshalledReturnedVal, err = encodeReturnedValue(ctx, returnedValues[0]); err != nil {
+			logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
+		}
+	}
+
+	responseBody := &ic.InterContainerResponseBody{
+		Success: success,
+		Result:  marshalledReturnedVal,
+	}
+
+	// Marshal the response body
+	var marshalledResponseBody *any.Any
+	if marshalledResponseBody, err = ptypes.MarshalAny(responseBody); err != nil {
+		logger.Warnw(ctx, "cannot-marshal-response-body", log.Fields{"error": err})
+		return nil, err
+	}
+
+	return &ic.InterContainerMessage{
+		Header: responseHeader,
+		Body:   marshalledResponseBody,
+	}, nil
+}
+
+func CallFuncByName(ctx context.Context, myClass interface{}, funcName string, params ...interface{}) (out []reflect.Value, err error) {
+	myClassValue := reflect.ValueOf(myClass)
+	// Capitalize the first letter in the funcName to workaround the first capital letters required to
+	// invoke a function from a different package
+	funcName = strings.Title(funcName)
+	m := myClassValue.MethodByName(funcName)
+	if !m.IsValid() {
+		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
+	}
+	in := make([]reflect.Value, len(params)+1)
+	in[0] = reflect.ValueOf(ctx)
+	for i, param := range params {
+		in[i+1] = reflect.ValueOf(param)
+	}
+	out = m.Call(in)
+	return
+}
+
+func (kp *interContainerProxy) addTransactionId(ctx context.Context, transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+	arg := &KVArg{
+		Key:   TransactionKey,
+		Value: &ic.StrType{Val: transactionId},
+	}
+
+	var marshalledArg *any.Any
+	var err error
+	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: transactionId}); err != nil {
+		logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
+		return currentArgs
+	}
+	protoArg := &ic.Argument{
+		Key:   arg.Key,
+		Value: marshalledArg,
+	}
+	return append(currentArgs, protoArg)
+}
+
+func (kp *interContainerProxy) addFromTopic(ctx context.Context, fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+	var marshalledArg *any.Any
+	var err error
+	if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
+		logger.Warnw(ctx, "cannot-add-transactionId", log.Fields{"error": err})
+		return currentArgs
+	}
+	protoArg := &ic.Argument{
+		Key:   FromTopic,
+		Value: marshalledArg,
+	}
+	return append(currentArgs, protoArg)
+}
+
+// Method to extract the Span embedded in Kafka RPC request on the receiver side. If span is found embedded in the KV args (with key as "span"),
+// it is de-serialized and injected into the Context to be carried forward by the RPC request processor thread.
+// If no span is found embedded, even then a span is created with name as "kafka-rpc-<rpc-name>" to enrich the Context for RPC calls coming
+// from components currently not sending the span (e.g. openonu adapter)
+func (kp *interContainerProxy) enrichContextWithSpan(ctx context.Context, rpcName string, args []*ic.Argument) (opentracing.Span, context.Context) {
+
+	for _, arg := range args {
+		if arg.Key == "span" {
+			var err error
+			var textMapString ic.StrType
+			if err = ptypes.UnmarshalAny(arg.Value, &textMapString); err != nil {
+				logger.Warnw(ctx, "unable-to-unmarshal-kvarg-to-textmap-string", log.Fields{"value": arg.Value})
+				break
+			}
+
+			spanTextMap := make(map[string]string)
+			if err = json.Unmarshal([]byte(textMapString.Val), &spanTextMap); err != nil {
+				logger.Warnw(ctx, "unable-to-unmarshal-textmap-from-json-string", log.Fields{"textMapString": textMapString, "error": err})
+				break
+			}
+
+			var spanContext opentracing.SpanContext
+			if spanContext, err = opentracing.GlobalTracer().Extract(opentracing.TextMap, opentracing.TextMapCarrier(spanTextMap)); err != nil {
+				logger.Warnw(ctx, "unable-to-deserialize-textmap-to-span", log.Fields{"textMap": spanTextMap, "error": err})
+				break
+			}
+
+			var receivedRpcName string
+			extractBaggage := func(k, v string) bool {
+				if k == "rpc-span-name" {
+					receivedRpcName = v
+					return false
+				}
+				return true
+			}
+
+			spanContext.ForeachBaggageItem(extractBaggage)
+
+			return opentracing.StartSpanFromContext(ctx, receivedRpcName, opentracing.FollowsFrom(spanContext))
+		}
+	}
+
+	// Create new Child Span with rpc as name if no span details were received in kafka arguments
+	var spanName strings.Builder
+	spanName.WriteString("kafka-")
+
+	// In case of inter adapter message, use Msg Type for constructing RPC name
+	if rpcName == "process_inter_adapter_message" {
+		for _, arg := range args {
+			if arg.Key == "msg" {
+				iamsg := ic.InterAdapterMessage{}
+				if err := ptypes.UnmarshalAny(arg.Value, &iamsg); err == nil {
+					spanName.WriteString("inter-adapter-")
+					rpcName = iamsg.Header.Type.String()
+				}
+			}
+		}
+	}
+
+	spanName.WriteString("rpc-")
+	spanName.WriteString(rpcName)
+
+	return opentracing.StartSpanFromContext(ctx, spanName.String())
+}
+
+func (kp *interContainerProxy) handleMessage(ctx context.Context, msg *ic.InterContainerMessage, targetInterface interface{}) {
+
+	// First extract the header to know whether this is a request - responses are handled by a different handler
+	if msg.Header.Type == ic.MessageType_REQUEST {
+		var out []reflect.Value
+		var err error
+
+		// Get the request body
+		requestBody := &ic.InterContainerRequestBody{}
+		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
+			logger.Warnw(ctx, "cannot-unmarshal-request", log.Fields{"error": err})
+		} else {
+			span, ctx := kp.enrichContextWithSpan(ctx, requestBody.Rpc, requestBody.Args)
+			defer span.Finish()
+
+			logger.Debugw(ctx, "received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
+			// let the callee unpack the arguments as its the only one that knows the real proto type
+			// Augment the requestBody with the message Id as it will be used in scenarios where cores
+			// are set in pairs and competing
+			requestBody.Args = kp.addTransactionId(ctx, msg.Header.Id, requestBody.Args)
+
+			// Augment the requestBody with the From topic name as it will be used in scenarios where a container
+			// needs to send an unsollicited message to the currently requested container
+			requestBody.Args = kp.addFromTopic(ctx, msg.Header.FromTopic, requestBody.Args)
+
+			out, err = CallFuncByName(ctx, targetInterface, requestBody.Rpc, requestBody.Args)
+			if err != nil {
+				logger.Warn(ctx, err)
+			}
+		}
+		// Response required?
+		if requestBody.ResponseRequired {
+			// If we already have an error before then just return that
+			var returnError *ic.Error
+			var returnedValues []interface{}
+			var success bool
+			if err != nil {
+				returnError = &ic.Error{Reason: err.Error()}
+				returnedValues = make([]interface{}, 1)
+				returnedValues[0] = returnError
+			} else {
+				returnedValues = make([]interface{}, 0)
+				// Check for errors first
+				lastIndex := len(out) - 1
+				if out[lastIndex].Interface() != nil { // Error
+					if retError, ok := out[lastIndex].Interface().(error); ok {
+						if retError.Error() == ErrorTransactionNotAcquired.Error() {
+							logger.Debugw(ctx, "Ignoring request", log.Fields{"error": retError, "txId": msg.Header.Id})
+							return // Ignore - process is in competing mode and ignored transaction
+						}
+						returnError = &ic.Error{Reason: retError.Error()}
+						returnedValues = append(returnedValues, returnError)
+					} else { // Should never happen
+						returnError = &ic.Error{Reason: "incorrect-error-returns"}
+						returnedValues = append(returnedValues, returnError)
+					}
+				} else if len(out) == 2 && reflect.ValueOf(out[0].Interface()).IsValid() && reflect.ValueOf(out[0].Interface()).IsNil() {
+					logger.Warnw(ctx, "Unexpected response of (nil,nil)", log.Fields{"txId": msg.Header.Id})
+					return // Ignore - should not happen
+				} else { // Non-error case
+					success = true
+					for idx, val := range out {
+						//logger.Debugw(ctx, "returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
+						if idx != lastIndex {
+							returnedValues = append(returnedValues, val.Interface())
+						}
+					}
+				}
+			}
+
+			var icm *ic.InterContainerMessage
+			if icm, err = encodeResponse(ctx, msg, success, returnedValues...); err != nil {
+				logger.Warnw(ctx, "error-encoding-response-returning-failure-result", log.Fields{"error": err})
+				icm = encodeDefaultFailedResponse(ctx, msg)
+			}
+			// To preserve ordering of messages, all messages to a given topic are sent to the same partition
+			// by providing a message key.   The key is encoded in the topic name.  If the deviceId is not
+			// present then the key will be empty, hence all messages for a given topic will be sent to all
+			// partitions.
+			replyTopic := &Topic{Name: msg.Header.FromTopic}
+			key := msg.Header.KeyTopic
+			logger.Debugw(ctx, "sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": icm.Header, "key": key})
+			// TODO: handle error response.
+			go func() {
+				if err := kp.kafkaClient.Send(ctx, icm, replyTopic, key); err != nil {
+					logger.Errorw(ctx, "send-reply-failed", log.Fields{
+						"topic": replyTopic,
+						"key":   key,
+						"error": err})
+				}
+			}()
+		}
+	} else if msg.Header.Type == ic.MessageType_RESPONSE {
+		logger.Debugw(ctx, "response-received", log.Fields{"msg-header": msg.Header})
+		go kp.dispatchResponse(ctx, msg)
+	} else {
+		logger.Warnw(ctx, "unsupported-message-received", log.Fields{"msg-header": msg.Header})
+	}
+}
+
+func (kp *interContainerProxy) waitForMessages(ctx context.Context, ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+	//	Wait for messages
+	for msg := range ch {
+		//logger.Debugw(ctx, "request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
+		go kp.handleMessage(context.Background(), msg, targetInterface)
+	}
+}
+
+func (kp *interContainerProxy) dispatchResponse(ctx context.Context, msg *ic.InterContainerMessage) {
+	kp.lockTransactionIdToChannelMap.RLock()
+	defer kp.lockTransactionIdToChannelMap.RUnlock()
+	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
+		logger.Debugw(ctx, "no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
+		return
+	}
+	kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
+}
+
+// subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
+// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
+// API. There is one response channel waiting for kafka messages before dispatching the message to the
+// corresponding waiting channel
+func (kp *interContainerProxy) subscribeForResponse(ctx context.Context, topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
+	logger.Debugw(ctx, "subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
+
+	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
+	// broadcast any message for this topic to all channels waiting on it.
+	// Set channel size to 1 to prevent deadlock, see VOL-2708
+	ch := make(chan *ic.InterContainerMessage, 1)
+	kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
+
+	return ch, nil
+}
+
+func (kp *interContainerProxy) unSubscribeForResponse(ctx context.Context, trnsId string) error {
+	logger.Debugw(ctx, "unsubscribe-for-response", log.Fields{"trnsId": trnsId})
+	kp.deleteFromTransactionIdToChannelMap(trnsId)
+	return nil
+}
+
+func (kp *interContainerProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+	return kp.kafkaClient.EnableLivenessChannel(ctx, enable)
+}
+
+func (kp *interContainerProxy) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
+	return kp.kafkaClient.EnableHealthinessChannel(ctx, enable)
+}
+
+func (kp *interContainerProxy) SendLiveness(ctx context.Context) error {
+	return kp.kafkaClient.SendLiveness(ctx)
+}
+
+//formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
+//or an error on failure
+func encodeRequest(ctx context.Context, rpc string, toTopic *Topic, replyTopic *Topic, key string, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+	requestHeader := &ic.Header{
+		Id:        uuid.New().String(),
+		Type:      ic.MessageType_REQUEST,
+		FromTopic: replyTopic.Name,
+		ToTopic:   toTopic.Name,
+		KeyTopic:  key,
+		Timestamp: ptypes.TimestampNow(),
+	}
+	requestBody := &ic.InterContainerRequestBody{
+		Rpc:              rpc,
+		ResponseRequired: true,
+		ReplyToTopic:     replyTopic.Name,
+	}
+
+	for _, arg := range kvArgs {
+		if arg == nil {
+			// In case the caller sends an array with empty args
+			continue
+		}
+		var marshalledArg *any.Any
+		var err error
+		// ascertain the value interface type is a proto.Message
+		protoValue, ok := arg.Value.(proto.Message)
+		if !ok {
+			logger.Warnw(ctx, "argument-value-not-proto-message", log.Fields{"error": ok, "Value": arg.Value})
+			err := errors.New("argument-value-not-proto-message")
+			return nil, err
+		}
+		if marshalledArg, err = ptypes.MarshalAny(protoValue); err != nil {
+			logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
+			return nil, err
+		}
+		protoArg := &ic.Argument{
+			Key:   arg.Key,
+			Value: marshalledArg,
+		}
+		requestBody.Args = append(requestBody.Args, protoArg)
+	}
+
+	var marshalledData *any.Any
+	var err error
+	if marshalledData, err = ptypes.MarshalAny(requestBody); err != nil {
+		logger.Warnw(ctx, "cannot-marshal-request", log.Fields{"error": err})
+		return nil, err
+	}
+	request := &ic.InterContainerMessage{
+		Header: requestHeader,
+		Body:   marshalledData,
+	}
+	return request, nil
+}
+
+func decodeResponse(ctx context.Context, response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
+	//	Extract the message body
+	responseBody := ic.InterContainerResponseBody{}
+	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
+		logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+		return nil, err
+	}
+	//logger.Debugw(ctx, "response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
+
+	return &responseBody, nil
+
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/sarama_client.go
new file mode 100644
index 0000000..1e4efae
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/sarama_client.go
@@ -0,0 +1,1154 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/Shopify/sarama"
+	scc "github.com/bsm/sarama-cluster"
+	"github.com/eapache/go-resiliency/breaker"
+	"github.com/golang/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/google/uuid"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
+)
+
+// consumerChannels represents one or more consumers listening on a kafka topic.  Once a message is received on that
+// topic, the consumer(s) broadcasts the message to all the listening channels.   The consumer can be a partition
+//consumer or a group consumer
+type consumerChannels struct {
+	consumers []interface{}
+	channels  []chan *ic.InterContainerMessage
+}
+
+// static check to ensure SaramaClient implements Client
+var _ Client = &SaramaClient{}
+
+// SaramaClient represents the messaging proxy
+type SaramaClient struct {
+	cAdmin                        sarama.ClusterAdmin
+	KafkaAddress                  string
+	producer                      sarama.AsyncProducer
+	consumer                      sarama.Consumer
+	groupConsumers                map[string]*scc.Consumer
+	lockOfGroupConsumers          sync.RWMutex
+	consumerGroupPrefix           string
+	consumerType                  int
+	consumerGroupName             string
+	producerFlushFrequency        int
+	producerFlushMessages         int
+	producerFlushMaxmessages      int
+	producerRetryMax              int
+	producerRetryBackOff          time.Duration
+	producerReturnSuccess         bool
+	producerReturnErrors          bool
+	consumerMaxwait               int
+	maxProcessingTime             int
+	numPartitions                 int
+	numReplicas                   int
+	autoCreateTopic               bool
+	doneCh                        chan int
+	metadataCallback              func(fromTopic string, timestamp time.Time)
+	topicToConsumerChannelMap     map[string]*consumerChannels
+	lockTopicToConsumerChannelMap sync.RWMutex
+	topicLockMap                  map[string]*sync.RWMutex
+	lockOfTopicLockMap            sync.RWMutex
+	metadataMaxRetry              int
+	alive                         bool
+	livenessMutex                 sync.Mutex
+	liveness                      chan bool
+	livenessChannelInterval       time.Duration
+	lastLivenessTime              time.Time
+	started                       bool
+	healthinessMutex              sync.Mutex
+	healthy                       bool
+	healthiness                   chan bool
+}
+
+type SaramaClientOption func(*SaramaClient)
+
+func Address(address string) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.KafkaAddress = address
+	}
+}
+
+func ConsumerGroupPrefix(prefix string) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.consumerGroupPrefix = prefix
+	}
+}
+
+func ConsumerGroupName(name string) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.consumerGroupName = name
+	}
+}
+
+func ConsumerType(consumer int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.consumerType = consumer
+	}
+}
+
+func ProducerFlushFrequency(frequency int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerFlushFrequency = frequency
+	}
+}
+
+func ProducerFlushMessages(num int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerFlushMessages = num
+	}
+}
+
+func ProducerFlushMaxMessages(num int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerFlushMaxmessages = num
+	}
+}
+
+func ProducerMaxRetries(num int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerRetryMax = num
+	}
+}
+
+func ProducerRetryBackoff(duration time.Duration) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerRetryBackOff = duration
+	}
+}
+
+func ProducerReturnOnErrors(opt bool) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerReturnErrors = opt
+	}
+}
+
+func ProducerReturnOnSuccess(opt bool) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerReturnSuccess = opt
+	}
+}
+
+func ConsumerMaxWait(wait int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.consumerMaxwait = wait
+	}
+}
+
+func MaxProcessingTime(pTime int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.maxProcessingTime = pTime
+	}
+}
+
+func NumPartitions(number int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.numPartitions = number
+	}
+}
+
+func NumReplicas(number int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.numReplicas = number
+	}
+}
+
+func AutoCreateTopic(opt bool) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.autoCreateTopic = opt
+	}
+}
+
+func MetadatMaxRetries(retry int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.metadataMaxRetry = retry
+	}
+}
+
+func LivenessChannelInterval(opt time.Duration) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.livenessChannelInterval = opt
+	}
+}
+
+func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
+	client := &SaramaClient{
+		KafkaAddress: DefaultKafkaAddress,
+	}
+	client.consumerType = DefaultConsumerType
+	client.producerFlushFrequency = DefaultProducerFlushFrequency
+	client.producerFlushMessages = DefaultProducerFlushMessages
+	client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
+	client.producerReturnErrors = DefaultProducerReturnErrors
+	client.producerReturnSuccess = DefaultProducerReturnSuccess
+	client.producerRetryMax = DefaultProducerRetryMax
+	client.producerRetryBackOff = DefaultProducerRetryBackoff
+	client.consumerMaxwait = DefaultConsumerMaxwait
+	client.maxProcessingTime = DefaultMaxProcessingTime
+	client.numPartitions = DefaultNumberPartitions
+	client.numReplicas = DefaultNumberReplicas
+	client.autoCreateTopic = DefaultAutoCreateTopic
+	client.metadataMaxRetry = DefaultMetadataMaxRetry
+	client.livenessChannelInterval = DefaultLivenessChannelInterval
+
+	for _, option := range opts {
+		option(client)
+	}
+
+	client.groupConsumers = make(map[string]*scc.Consumer)
+
+	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
+	client.topicLockMap = make(map[string]*sync.RWMutex)
+	client.lockOfTopicLockMap = sync.RWMutex{}
+	client.lockOfGroupConsumers = sync.RWMutex{}
+
+	// healthy and alive until proven otherwise
+	client.alive = true
+	client.healthy = true
+
+	return client
+}
+
+func (sc *SaramaClient) Start(ctx context.Context) error {
+	logger.Info(ctx, "Starting-kafka-sarama-client")
+
+	// Create the Done channel
+	sc.doneCh = make(chan int, 1)
+
+	var err error
+
+	// Add a cleanup in case of failure to startup
+	defer func() {
+		if err != nil {
+			sc.Stop(ctx)
+		}
+	}()
+
+	// Create the Cluster Admin
+	if err = sc.createClusterAdmin(ctx); err != nil {
+		logger.Errorw(ctx, "Cannot-create-cluster-admin", log.Fields{"error": err})
+		return err
+	}
+
+	// Create the Publisher
+	if err := sc.createPublisher(ctx); err != nil {
+		logger.Errorw(ctx, "Cannot-create-kafka-publisher", log.Fields{"error": err})
+		return err
+	}
+
+	if sc.consumerType == DefaultConsumerType {
+		// Create the master consumers
+		if err := sc.createConsumer(ctx); err != nil {
+			logger.Errorw(ctx, "Cannot-create-kafka-consumers", log.Fields{"error": err})
+			return err
+		}
+	}
+
+	// Create the topic to consumers/channel map
+	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
+
+	logger.Info(ctx, "kafka-sarama-client-started")
+
+	sc.started = true
+
+	return nil
+}
+
+func (sc *SaramaClient) Stop(ctx context.Context) {
+	logger.Info(ctx, "stopping-sarama-client")
+
+	sc.started = false
+
+	//Send a message over the done channel to close all long running routines
+	sc.doneCh <- 1
+
+	if sc.producer != nil {
+		if err := sc.producer.Close(); err != nil {
+			logger.Errorw(ctx, "closing-producer-failed", log.Fields{"error": err})
+		}
+	}
+
+	if sc.consumer != nil {
+		if err := sc.consumer.Close(); err != nil {
+			logger.Errorw(ctx, "closing-partition-consumer-failed", log.Fields{"error": err})
+		}
+	}
+
+	for key, val := range sc.groupConsumers {
+		logger.Debugw(ctx, "closing-group-consumer", log.Fields{"topic": key})
+		if err := val.Close(); err != nil {
+			logger.Errorw(ctx, "closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
+		}
+	}
+
+	if sc.cAdmin != nil {
+		if err := sc.cAdmin.Close(); err != nil {
+			logger.Errorw(ctx, "closing-cluster-admin-failed", log.Fields{"error": err})
+		}
+	}
+
+	//TODO: Clear the consumers map
+	//sc.clearConsumerChannelMap()
+
+	logger.Info(ctx, "sarama-client-stopped")
+}
+
+//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
+// the invoking function must hold the lock
+func (sc *SaramaClient) createTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
+	// Set the topic details
+	topicDetail := &sarama.TopicDetail{}
+	topicDetail.NumPartitions = int32(numPartition)
+	topicDetail.ReplicationFactor = int16(repFactor)
+	topicDetail.ConfigEntries = make(map[string]*string)
+	topicDetails := make(map[string]*sarama.TopicDetail)
+	topicDetails[topic.Name] = topicDetail
+
+	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
+		if err == sarama.ErrTopicAlreadyExists {
+			//	Not an error
+			logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
+			return nil
+		}
+		logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
+		return err
+	}
+	// TODO: Wait until the topic has been created.  No API is available in the Sarama clusterAdmin to
+	// do so.
+	logger.Debugw(ctx, "topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
+	return nil
+}
+
+//CreateTopic is a public API to create a topic on the Kafka Broker.  It uses a lock on a specific topic to
+// ensure no two go routines are performing operations on the same topic
+func (sc *SaramaClient) CreateTopic(ctx context.Context, topic *Topic, numPartition int, repFactor int) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
+
+	return sc.createTopic(ctx, topic, numPartition, repFactor)
+}
+
+//DeleteTopic removes a topic from the kafka Broker
+func (sc *SaramaClient) DeleteTopic(ctx context.Context, topic *Topic) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
+
+	// Remove the topic from the broker
+	if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
+		if err == sarama.ErrUnknownTopicOrPartition {
+			//	Not an error as does not exist
+			logger.Debugw(ctx, "topic-not-exist", log.Fields{"topic": topic.Name})
+			return nil
+		}
+		logger.Errorw(ctx, "delete-topic-failed", log.Fields{"topic": topic, "error": err})
+		return err
+	}
+
+	// Clear the topic from the consumer channel.  This will also close any consumers listening on that topic.
+	if err := sc.clearTopicFromConsumerChannelMap(ctx, *topic); err != nil {
+		logger.Errorw(ctx, "failure-clearing-channels", log.Fields{"topic": topic, "error": err})
+		return err
+	}
+	return nil
+}
+
+// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
+// messages from that topic
+func (sc *SaramaClient) Subscribe(ctx context.Context, topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
+
+	logger.Debugw(ctx, "subscribe", log.Fields{"topic": topic.Name})
+
+	// If a consumers already exist for that topic then resuse it
+	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
+		logger.Debugw(ctx, "topic-already-subscribed", log.Fields{"topic": topic.Name})
+		// Create a channel specific for that consumers and add it to the consumers channel map
+		ch := make(chan *ic.InterContainerMessage)
+		sc.addChannelToConsumerChannelMap(ctx, topic, ch)
+		return ch, nil
+	}
+
+	// Register for the topic and set it up
+	var consumerListeningChannel chan *ic.InterContainerMessage
+	var err error
+
+	// Use the consumerType option to figure out the type of consumer to launch
+	if sc.consumerType == PartitionConsumer {
+		if sc.autoCreateTopic {
+			if err = sc.createTopic(ctx, topic, sc.numPartitions, sc.numReplicas); err != nil {
+				logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+				return nil, err
+			}
+		}
+		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(ctx, topic, getOffset(kvArgs...)); err != nil {
+			logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+			return nil, err
+		}
+	} else if sc.consumerType == GroupCustomer {
+		// TODO: create topic if auto create is on.  There is an issue with the sarama cluster library that
+		// does not consume from a precreated topic in some scenarios
+		//if sc.autoCreateTopic {
+		//	if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+		//		logger.Errorw(ctx, "create-topic-failure", logger.Fields{"error": err, "topic": topic.Name})
+		//		return nil, err
+		//	}
+		//}
+		//groupId := sc.consumerGroupName
+		groupId := getGroupId(kvArgs...)
+		// Include the group prefix
+		if groupId != "" {
+			groupId = sc.consumerGroupPrefix + groupId
+		} else {
+			// Need to use a unique group Id per topic
+			groupId = sc.consumerGroupPrefix + topic.Name
+		}
+		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(ctx, topic, groupId, getOffset(kvArgs...)); err != nil {
+			logger.Warnw(ctx, "create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+			return nil, err
+		}
+
+	} else {
+		logger.Warnw(ctx, "unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
+		return nil, errors.New("unknown-consumer-type")
+	}
+
+	return consumerListeningChannel, nil
+}
+
+//UnSubscribe unsubscribe a consumer from a given topic
+func (sc *SaramaClient) UnSubscribe(ctx context.Context, topic *Topic, ch <-chan *ic.InterContainerMessage) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
+
+	logger.Debugw(ctx, "unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
+	var err error
+	if err = sc.removeChannelFromConsumerChannelMap(ctx, *topic, ch); err != nil {
+		logger.Errorw(ctx, "failed-removing-channel", log.Fields{"error": err})
+	}
+	if err = sc.deleteFromGroupConsumers(ctx, topic.Name); err != nil {
+		logger.Errorw(ctx, "failed-deleting-group-consumer", log.Fields{"error": err})
+	}
+	return err
+}
+
+func (sc *SaramaClient) SubscribeForMetadata(ctx context.Context, callback func(fromTopic string, timestamp time.Time)) {
+	sc.metadataCallback = callback
+}
+
+func (sc *SaramaClient) updateLiveness(ctx context.Context, alive bool) {
+	// Post a consistent stream of liveness data to the channel,
+	// so that in a live state, the core does not timeout and
+	// send a forced liveness message. Production of liveness
+	// events to the channel is rate-limited by livenessChannelInterval.
+	sc.livenessMutex.Lock()
+	defer sc.livenessMutex.Unlock()
+	if sc.liveness != nil {
+		if sc.alive != alive {
+			logger.Info(ctx, "update-liveness-channel-because-change")
+			sc.liveness <- alive
+			sc.lastLivenessTime = time.Now()
+		} else if time.Since(sc.lastLivenessTime) > sc.livenessChannelInterval {
+			logger.Info(ctx, "update-liveness-channel-because-interval")
+			sc.liveness <- alive
+			sc.lastLivenessTime = time.Now()
+		}
+	}
+
+	// Only emit a log message when the state changes
+	if sc.alive != alive {
+		logger.Info(ctx, "set-client-alive", log.Fields{"alive": alive})
+		sc.alive = alive
+	}
+}
+
+// Once unhealthy, we never go back
+func (sc *SaramaClient) setUnhealthy(ctx context.Context) {
+	sc.healthy = false
+	sc.healthinessMutex.Lock()
+	defer sc.healthinessMutex.Unlock()
+	if sc.healthiness != nil {
+		logger.Infow(ctx, "set-client-unhealthy", log.Fields{"healthy": sc.healthy})
+		sc.healthiness <- sc.healthy
+	}
+}
+
+func (sc *SaramaClient) isLivenessError(ctx context.Context, err error) bool {
+	// Sarama producers and consumers encapsulate the error inside
+	// a ProducerError or ConsumerError struct.
+	if prodError, ok := err.(*sarama.ProducerError); ok {
+		err = prodError.Err
+	} else if consumerError, ok := err.(*sarama.ConsumerError); ok {
+		err = consumerError.Err
+	}
+
+	// Sarama-Cluster will compose the error into a ClusterError struct,
+	// which we can't do a compare by reference. To handle that, we the
+	// best we can do is compare the error strings.
+
+	switch err.Error() {
+	case context.DeadlineExceeded.Error():
+		logger.Info(ctx, "is-liveness-error-timeout")
+		return true
+	case sarama.ErrOutOfBrokers.Error(): // "Kafka: client has run out of available brokers"
+		logger.Info(ctx, "is-liveness-error-no-brokers")
+		return true
+	case sarama.ErrShuttingDown.Error(): // "Kafka: message received by producer in process of shutting down"
+		logger.Info(ctx, "is-liveness-error-shutting-down")
+		return true
+	case sarama.ErrControllerNotAvailable.Error(): // "Kafka: controller is not available"
+		logger.Info(ctx, "is-liveness-error-not-available")
+		return true
+	case breaker.ErrBreakerOpen.Error(): // "circuit breaker is open"
+		logger.Info(ctx, "is-liveness-error-circuit-breaker-open")
+		return true
+	}
+
+	if strings.HasSuffix(err.Error(), "connection refused") { // "dial tcp 10.244.1.176:9092: connect: connection refused"
+		logger.Info(ctx, "is-liveness-error-connection-refused")
+		return true
+	}
+
+	if strings.HasSuffix(err.Error(), "i/o timeout") { // "dial tcp 10.244.1.176:9092: i/o timeout"
+		logger.Info(ctx, "is-liveness-error-io-timeout")
+		return true
+	}
+
+	// Other errors shouldn't trigger a loss of liveness
+
+	logger.Infow(ctx, "is-liveness-error-ignored", log.Fields{"err": err})
+
+	return false
+}
+
+// send formats and sends the request onto the kafka messaging bus.
+func (sc *SaramaClient) Send(ctx context.Context, msg interface{}, topic *Topic, keys ...string) error {
+
+	// Assert message is a proto message
+	var protoMsg proto.Message
+	var ok bool
+	// ascertain the value interface type is a proto.Message
+	if protoMsg, ok = msg.(proto.Message); !ok {
+		logger.Warnw(ctx, "message-not-proto-message", log.Fields{"msg": msg})
+		return fmt.Errorf("not-a-proto-msg-%s", msg)
+	}
+
+	var marshalled []byte
+	var err error
+	//	Create the Sarama producer message
+	if marshalled, err = proto.Marshal(protoMsg); err != nil {
+		logger.Errorw(ctx, "marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
+		return err
+	}
+	key := ""
+	if len(keys) > 0 {
+		key = keys[0] // Only the first key is relevant
+	}
+	kafkaMsg := &sarama.ProducerMessage{
+		Topic: topic.Name,
+		Key:   sarama.StringEncoder(key),
+		Value: sarama.ByteEncoder(marshalled),
+	}
+
+	// Send message to kafka
+	sc.producer.Input() <- kafkaMsg
+	// Wait for result
+	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+	select {
+	case ok := <-sc.producer.Successes():
+		logger.Debugw(ctx, "message-sent", log.Fields{"status": ok.Topic})
+		sc.updateLiveness(ctx, true)
+	case notOk := <-sc.producer.Errors():
+		logger.Debugw(ctx, "error-sending", log.Fields{"status": notOk})
+		if sc.isLivenessError(ctx, notOk) {
+			sc.updateLiveness(ctx, false)
+		}
+		return notOk
+	}
+	return nil
+}
+
+// Enable the liveness monitor channel. This channel will report
+// a "true" or "false" on every publish, which indicates whether
+// or not the channel is still live. This channel is then picked up
+// by the service (i.e. rw_core / ro_core) to update readiness status
+// and/or take other actions.
+func (sc *SaramaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+	logger.Infow(ctx, "kafka-enable-liveness-channel", log.Fields{"enable": enable})
+	if enable {
+		sc.livenessMutex.Lock()
+		defer sc.livenessMutex.Unlock()
+		if sc.liveness == nil {
+			logger.Info(ctx, "kafka-create-liveness-channel")
+			// At least 1, so we can immediately post to it without blocking
+			// Setting a bigger number (10) allows the monitor to fall behind
+			// without blocking others. The monitor shouldn't really fall
+			// behind...
+			sc.liveness = make(chan bool, 10)
+			// post intial state to the channel
+			sc.liveness <- sc.alive
+		}
+	} else {
+		// TODO: Think about whether we need the ability to turn off
+		// liveness monitoring
+		panic("Turning off liveness reporting is not supported")
+	}
+	return sc.liveness
+}
+
+// Enable the Healthiness monitor channel. This channel will report "false"
+// if the kafka consumers die, or some other problem occurs which is
+// catastrophic that would require re-creating the client.
+func (sc *SaramaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
+	logger.Infow(ctx, "kafka-enable-healthiness-channel", log.Fields{"enable": enable})
+	if enable {
+		sc.healthinessMutex.Lock()
+		defer sc.healthinessMutex.Unlock()
+		if sc.healthiness == nil {
+			logger.Info(ctx, "kafka-create-healthiness-channel")
+			// At least 1, so we can immediately post to it without blocking
+			// Setting a bigger number (10) allows the monitor to fall behind
+			// without blocking others. The monitor shouldn't really fall
+			// behind...
+			sc.healthiness = make(chan bool, 10)
+			// post intial state to the channel
+			sc.healthiness <- sc.healthy
+		}
+	} else {
+		// TODO: Think about whether we need the ability to turn off
+		// liveness monitoring
+		panic("Turning off healthiness reporting is not supported")
+	}
+	return sc.healthiness
+}
+
+// send an empty message on the liveness channel to check whether connectivity has
+// been restored.
+func (sc *SaramaClient) SendLiveness(ctx context.Context) error {
+	if !sc.started {
+		return fmt.Errorf("SendLiveness() called while not started")
+	}
+
+	kafkaMsg := &sarama.ProducerMessage{
+		Topic: "_liveness_test",
+		Value: sarama.StringEncoder(time.Now().Format(time.RFC3339)), // for debugging / informative use
+	}
+
+	// Send message to kafka
+	sc.producer.Input() <- kafkaMsg
+	// Wait for result
+	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
+	select {
+	case ok := <-sc.producer.Successes():
+		logger.Debugw(ctx, "liveness-message-sent", log.Fields{"status": ok.Topic})
+		sc.updateLiveness(ctx, true)
+	case notOk := <-sc.producer.Errors():
+		logger.Debugw(ctx, "liveness-error-sending", log.Fields{"status": notOk})
+		if sc.isLivenessError(ctx, notOk) {
+			sc.updateLiveness(ctx, false)
+		}
+		return notOk
+	}
+	return nil
+}
+
+// getGroupId returns the group id from the key-value args.
+func getGroupId(kvArgs ...*KVArg) string {
+	for _, arg := range kvArgs {
+		if arg.Key == GroupIdKey {
+			return arg.Value.(string)
+		}
+	}
+	return ""
+}
+
+// getOffset returns the offset from the key-value args.
+func getOffset(kvArgs ...*KVArg) int64 {
+	for _, arg := range kvArgs {
+		if arg.Key == Offset {
+			return arg.Value.(int64)
+		}
+	}
+	return sarama.OffsetNewest
+}
+
+func (sc *SaramaClient) createClusterAdmin(ctx context.Context) error {
+	config := sarama.NewConfig()
+	config.Version = sarama.V1_0_0_0
+
+	// Create a cluster Admin
+	var cAdmin sarama.ClusterAdmin
+	var err error
+	if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
+		logger.Errorw(ctx, "cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+		return err
+	}
+	sc.cAdmin = cAdmin
+	return nil
+}
+
+func (sc *SaramaClient) lockTopic(topic *Topic) {
+	sc.lockOfTopicLockMap.Lock()
+	if _, exist := sc.topicLockMap[topic.Name]; exist {
+		sc.lockOfTopicLockMap.Unlock()
+		sc.topicLockMap[topic.Name].Lock()
+	} else {
+		sc.topicLockMap[topic.Name] = &sync.RWMutex{}
+		sc.lockOfTopicLockMap.Unlock()
+		sc.topicLockMap[topic.Name].Lock()
+	}
+}
+
+func (sc *SaramaClient) unLockTopic(topic *Topic) {
+	sc.lockOfTopicLockMap.Lock()
+	defer sc.lockOfTopicLockMap.Unlock()
+	if _, exist := sc.topicLockMap[topic.Name]; exist {
+		sc.topicLockMap[topic.Name].Unlock()
+	}
+}
+
+func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
+		sc.topicToConsumerChannelMap[id] = arg
+	}
+}
+
+func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
+	sc.lockTopicToConsumerChannelMap.RLock()
+	defer sc.lockTopicToConsumerChannelMap.RUnlock()
+
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		return consumerCh
+	}
+	return nil
+}
+
+func (sc *SaramaClient) addChannelToConsumerChannelMap(ctx context.Context, topic *Topic, ch chan *ic.InterContainerMessage) {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		consumerCh.channels = append(consumerCh.channels, ch)
+		return
+	}
+	logger.Warnw(ctx, "consumers-channel-not-exist", log.Fields{"topic": topic.Name})
+}
+
+//closeConsumers closes a list of sarama consumers.  The consumers can either be a partition consumers or a group consumers
+func closeConsumers(ctx context.Context, consumers []interface{}) error {
+	var err error
+	for _, consumer := range consumers {
+		//	Is it a partition consumers?
+		if partionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
+			if errTemp := partionConsumer.Close(); errTemp != nil {
+				logger.Debugw(ctx, "partition!!!", log.Fields{"err": errTemp})
+				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
+					// This can occur on race condition
+					err = nil
+				} else {
+					err = errTemp
+				}
+			}
+		} else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
+			if errTemp := groupConsumer.Close(); errTemp != nil {
+				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
+					// This can occur on race condition
+					err = nil
+				} else {
+					err = errTemp
+				}
+			}
+		}
+	}
+	return err
+}
+
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(ctx context.Context, topic Topic, ch <-chan *ic.InterContainerMessage) error {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		// Channel will be closed in the removeChannel method
+		consumerCh.channels = removeChannel(ctx, consumerCh.channels, ch)
+		// If there are no more channels then we can close the consumers itself
+		if len(consumerCh.channels) == 0 {
+			logger.Debugw(ctx, "closing-consumers", log.Fields{"topic": topic})
+			err := closeConsumers(ctx, consumerCh.consumers)
+			//err := consumerCh.consumers.Close()
+			delete(sc.topicToConsumerChannelMap, topic.Name)
+			return err
+		}
+		return nil
+	}
+	logger.Warnw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
+	return errors.New("topic-does-not-exist")
+}
+
+func (sc *SaramaClient) clearTopicFromConsumerChannelMap(ctx context.Context, topic Topic) error {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		for _, ch := range consumerCh.channels {
+			// Channel will be closed in the removeChannel method
+			removeChannel(ctx, consumerCh.channels, ch)
+		}
+		err := closeConsumers(ctx, consumerCh.consumers)
+		//if err == sarama.ErrUnknownTopicOrPartition {
+		//	// Not an error
+		//	err = nil
+		//}
+		//err := consumerCh.consumers.Close()
+		delete(sc.topicToConsumerChannelMap, topic.Name)
+		return err
+	}
+	logger.Debugw(ctx, "topic-does-not-exist", log.Fields{"topic": topic.Name})
+	return nil
+}
+
+//createPublisher creates the publisher which is used to send a message onto kafka
+func (sc *SaramaClient) createPublisher(ctx context.Context) error {
+	// This Creates the publisher
+	config := sarama.NewConfig()
+	config.Producer.Partitioner = sarama.NewRandomPartitioner
+	config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
+	config.Producer.Flush.Messages = sc.producerFlushMessages
+	config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
+	config.Producer.Return.Errors = sc.producerReturnErrors
+	config.Producer.Return.Successes = sc.producerReturnSuccess
+	//config.Producer.RequiredAcks = sarama.WaitForAll
+	config.Producer.RequiredAcks = sarama.WaitForLocal
+
+	brokers := []string{sc.KafkaAddress}
+
+	if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
+		logger.Errorw(ctx, "error-starting-publisher", log.Fields{"error": err})
+		return err
+	} else {
+		sc.producer = producer
+	}
+	logger.Info(ctx, "Kafka-publisher-created")
+	return nil
+}
+
+func (sc *SaramaClient) createConsumer(ctx context.Context) error {
+	config := sarama.NewConfig()
+	config.Consumer.Return.Errors = true
+	config.Consumer.Fetch.Min = 1
+	config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
+	config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
+	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	config.Metadata.Retry.Max = sc.metadataMaxRetry
+	brokers := []string{sc.KafkaAddress}
+
+	if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
+		logger.Errorw(ctx, "error-starting-consumers", log.Fields{"error": err})
+		return err
+	} else {
+		sc.consumer = consumer
+	}
+	logger.Info(ctx, "Kafka-consumers-created")
+	return nil
+}
+
+// createGroupConsumer creates a consumers group
+func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
+	config := scc.NewConfig()
+	config.ClientID = uuid.New().String()
+	config.Group.Mode = scc.ConsumerModeMultiplex
+	config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")
+	config.Consumer.Return.Errors = true
+	//config.Group.Return.Notifications = false
+	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
+	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
+	config.Consumer.Offsets.Initial = initialOffset
+	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
+	brokers := []string{sc.KafkaAddress}
+
+	topics := []string{topic.Name}
+	var consumer *scc.Consumer
+	var err error
+
+	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
+		logger.Errorw(ctx, "create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+		return nil, err
+	}
+	logger.Debugw(ctx, "create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+
+	//sc.groupConsumers[topic.Name] = consumer
+	sc.addToGroupConsumers(topic.Name, consumer)
+	return consumer, nil
+}
+
+// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
+// topic via the unique channel each subscriber received during subscription
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
+	// Need to go over all channels and publish messages to them - do we need to copy msg?
+	sc.lockTopicToConsumerChannelMap.RLock()
+	for _, ch := range consumerCh.channels {
+		go func(c chan *ic.InterContainerMessage) {
+			c <- protoMessage
+		}(ch)
+	}
+	sc.lockTopicToConsumerChannelMap.RUnlock()
+
+	if callback := sc.metadataCallback; callback != nil {
+		ts, _ := ptypes.Timestamp(protoMessage.Header.Timestamp)
+		callback(protoMessage.Header.FromTopic, ts)
+	}
+}
+
+func (sc *SaramaClient) consumeFromAPartition(ctx context.Context, topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
+	logger.Debugw(ctx, "starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
+startloop:
+	for {
+		select {
+		case err, ok := <-consumer.Errors():
+			if ok {
+				if sc.isLivenessError(ctx, err) {
+					sc.updateLiveness(ctx, false)
+					logger.Warnw(ctx, "partition-consumers-error", log.Fields{"error": err})
+				}
+			} else {
+				// Channel is closed
+				break startloop
+			}
+		case msg, ok := <-consumer.Messages():
+			//logger.Debugw(ctx, "message-received", logger.Fields{"msg": msg, "receivedTopic": msg.Topic})
+			if !ok {
+				// channel is closed
+				break startloop
+			}
+			msgBody := msg.Value
+			sc.updateLiveness(ctx, true)
+			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+			icm := &ic.InterContainerMessage{}
+			if err := proto.Unmarshal(msgBody, icm); err != nil {
+				logger.Warnw(ctx, "partition-invalid-message", log.Fields{"error": err})
+				continue
+			}
+			go sc.dispatchToConsumers(consumerChnls, icm)
+		case <-sc.doneCh:
+			logger.Infow(ctx, "partition-received-exit-signal", log.Fields{"topic": topic.Name})
+			break startloop
+		}
+	}
+	logger.Infow(ctx, "partition-consumer-stopped", log.Fields{"topic": topic.Name})
+	sc.setUnhealthy(ctx)
+}
+
+func (sc *SaramaClient) consumeGroupMessages(ctx context.Context, topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
+	logger.Debugw(ctx, "starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+
+startloop:
+	for {
+		select {
+		case err, ok := <-consumer.Errors():
+			if ok {
+				if sc.isLivenessError(ctx, err) {
+					sc.updateLiveness(ctx, false)
+				}
+				logger.Warnw(ctx, "group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
+			} else {
+				logger.Warnw(ctx, "group-consumers-closed-err", log.Fields{"topic": topic.Name})
+				// channel is closed
+				break startloop
+			}
+		case msg, ok := <-consumer.Messages():
+			if !ok {
+				logger.Warnw(ctx, "group-consumers-closed-msg", log.Fields{"topic": topic.Name})
+				// Channel closed
+				break startloop
+			}
+			sc.updateLiveness(ctx, true)
+			logger.Debugw(ctx, "message-received", log.Fields{"timestamp": msg.Timestamp, "receivedTopic": msg.Topic})
+			msgBody := msg.Value
+			icm := &ic.InterContainerMessage{}
+			if err := proto.Unmarshal(msgBody, icm); err != nil {
+				logger.Warnw(ctx, "invalid-message", log.Fields{"error": err})
+				continue
+			}
+			go sc.dispatchToConsumers(consumerChnls, icm)
+			consumer.MarkOffset(msg, "")
+		case ntf := <-consumer.Notifications():
+			logger.Debugw(ctx, "group-received-notification", log.Fields{"notification": ntf})
+		case <-sc.doneCh:
+			logger.Infow(ctx, "group-received-exit-signal", log.Fields{"topic": topic.Name})
+			break startloop
+		}
+	}
+	logger.Infow(ctx, "group-consumer-stopped", log.Fields{"topic": topic.Name})
+	sc.setUnhealthy(ctx)
+}
+
+func (sc *SaramaClient) startConsumers(ctx context.Context, topic *Topic) error {
+	logger.Debugw(ctx, "starting-consumers", log.Fields{"topic": topic.Name})
+	var consumerCh *consumerChannels
+	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
+		logger.Errorw(ctx, "consumers-not-exist", log.Fields{"topic": topic.Name})
+		return errors.New("consumers-not-exist")
+	}
+	// For each consumer listening for that topic, start a consumption loop
+	for _, consumer := range consumerCh.consumers {
+		if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
+			go sc.consumeFromAPartition(ctx, topic, pConsumer, consumerCh)
+		} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
+			go sc.consumeGroupMessages(ctx, topic, gConsumer, consumerCh)
+		} else {
+			logger.Errorw(ctx, "invalid-consumer", log.Fields{"topic": topic})
+			return errors.New("invalid-consumer")
+		}
+	}
+	return nil
+}
+
+//// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
+//// for that topic.  It also starts the routine that listens for messages on that topic.
+func (sc *SaramaClient) setupPartitionConsumerChannel(ctx context.Context, topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+	var pConsumers []sarama.PartitionConsumer
+	var err error
+
+	if pConsumers, err = sc.createPartitionConsumers(ctx, topic, initialOffset); err != nil {
+		logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+
+	consumersIf := make([]interface{}, 0)
+	for _, pConsumer := range pConsumers {
+		consumersIf = append(consumersIf, pConsumer)
+	}
+
+	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
+	// unbuffered to verify race conditions.
+	consumerListeningChannel := make(chan *ic.InterContainerMessage)
+	cc := &consumerChannels{
+		consumers: consumersIf,
+		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
+	}
+
+	// Add the consumers channel to the map
+	sc.addTopicToConsumerChannelMap(topic.Name, cc)
+
+	//Start a consumers to listen on that specific topic
+	go func() {
+		if err := sc.startConsumers(ctx, topic); err != nil {
+			logger.Errorw(ctx, "start-consumers-failed", log.Fields{
+				"topic": topic,
+				"error": err})
+		}
+	}()
+
+	return consumerListeningChannel, nil
+}
+
+// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
+// for that topic.  It also starts the routine that listens for messages on that topic.
+func (sc *SaramaClient) setupGroupConsumerChannel(ctx context.Context, topic *Topic, groupId string, initialOffset int64) (chan *ic.InterContainerMessage, error) {
+	// TODO:  Replace this development partition consumers with a group consumers
+	var pConsumer *scc.Consumer
+	var err error
+	if pConsumer, err = sc.createGroupConsumer(ctx, topic, groupId, initialOffset, DefaultMaxRetries); err != nil {
+		logger.Errorw(ctx, "creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
+	// unbuffered to verify race conditions.
+	consumerListeningChannel := make(chan *ic.InterContainerMessage)
+	cc := &consumerChannels{
+		consumers: []interface{}{pConsumer},
+		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
+	}
+
+	// Add the consumers channel to the map
+	sc.addTopicToConsumerChannelMap(topic.Name, cc)
+
+	//Start a consumers to listen on that specific topic
+	go func() {
+		if err := sc.startConsumers(ctx, topic); err != nil {
+			logger.Errorw(ctx, "start-consumers-failed", log.Fields{
+				"topic": topic,
+				"error": err})
+		}
+	}()
+
+	return consumerListeningChannel, nil
+}
+
+func (sc *SaramaClient) createPartitionConsumers(ctx context.Context, topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
+	logger.Debugw(ctx, "creating-partition-consumers", log.Fields{"topic": topic.Name})
+	partitionList, err := sc.consumer.Partitions(topic.Name)
+	if err != nil {
+		logger.Warnw(ctx, "get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+
+	pConsumers := make([]sarama.PartitionConsumer, 0)
+	for _, partition := range partitionList {
+		var pConsumer sarama.PartitionConsumer
+		if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
+			logger.Warnw(ctx, "consumers-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+			return nil, err
+		}
+		pConsumers = append(pConsumers, pConsumer)
+	}
+	return pConsumers, nil
+}
+
+func removeChannel(ctx context.Context, channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
+	var i int
+	var channel chan *ic.InterContainerMessage
+	for i, channel = range channels {
+		if channel == ch {
+			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
+			close(channel)
+			logger.Debug(ctx, "channel-closed")
+			return channels[:len(channels)-1]
+		}
+	}
+	return channels
+}
+
+func (sc *SaramaClient) addToGroupConsumers(topic string, consumer *scc.Consumer) {
+	sc.lockOfGroupConsumers.Lock()
+	defer sc.lockOfGroupConsumers.Unlock()
+	if _, exist := sc.groupConsumers[topic]; !exist {
+		sc.groupConsumers[topic] = consumer
+	}
+}
+
+func (sc *SaramaClient) deleteFromGroupConsumers(ctx context.Context, topic string) error {
+	sc.lockOfGroupConsumers.Lock()
+	defer sc.lockOfGroupConsumers.Unlock()
+	if _, exist := sc.groupConsumers[topic]; exist {
+		consumer := sc.groupConsumers[topic]
+		delete(sc.groupConsumers, topic)
+		if err := consumer.Close(); err != nil {
+			logger.Errorw(ctx, "failure-closing-consumer", log.Fields{"error": err})
+			return err
+		}
+	}
+	return nil
+}
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/utils.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/utils.go
new file mode 100644
index 0000000..bdc615f
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/utils.go
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"github.com/golang/protobuf/ptypes/any"
+	"strings"
+)
+
+const (
+	TopicSeparator = "_"
+	DeviceIdLength = 24
+)
+
+// A Topic definition - may be augmented with additional attributes eventually
+type Topic struct {
+	// The name of the topic. It must start with a letter,
+	// and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
+	// underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
+	// signs (`%`).
+	Name string
+}
+
+type KVArg struct {
+	Key   string
+	Value interface{}
+}
+
+type RpcMType int
+
+const (
+	RpcFormattingError RpcMType = iota
+	RpcSent
+	RpcReply
+	RpcTimeout
+	RpcTransportError
+	RpcSystemClosing
+)
+
+type RpcResponse struct {
+	MType RpcMType
+	Err   error
+	Reply *any.Any
+}
+
+func NewResponse(messageType RpcMType, err error, body *any.Any) *RpcResponse {
+	return &RpcResponse{
+		MType: messageType,
+		Err:   err,
+		Reply: body,
+	}
+}
+
+// TODO:  Remove and provide better may to get the device id
+// GetDeviceIdFromTopic extract the deviceId from the topic name.  The topic name is formatted either as:
+//			<any string> or <any string>_<deviceId>.  The device Id is 24 characters long.
+func GetDeviceIdFromTopic(topic Topic) string {
+	pos := strings.LastIndex(topic.Name, TopicSeparator)
+	if pos == -1 {
+		return ""
+	}
+	adjustedPos := pos + len(TopicSeparator)
+	if adjustedPos >= len(topic.Name) {
+		return ""
+	}
+	deviceId := topic.Name[adjustedPos:len(topic.Name)]
+	if len(deviceId) != DeviceIdLength {
+		return ""
+	}
+	return deviceId
+}